Compare revisions

Commits on Source (92)
Showing changes with 514 additions and 369 deletions
......@@ -31,6 +31,7 @@ pip-delete-this-directory.txt
.cache
nosetests.xml
coverage.xml
.mypy_cache
# Translations
*.mo
......
.. _changelog:
Changelog
=========
All notable changes to this project will be documented in this file.
The format is based on `Keep a Changelog`_.
........................................................................................................................
Unreleased
----------
- `Commits since v3.0.0 <https://gitlab.inria.fr/batsim/pybatsim/compare/v3.0.0...master>`_
- ``nix-env -f https://github.com/oar-team/kapack/archive/master.tar.gz -iA pybatsim_dev``
........................................................................................................................
v3.1.0
------
- Release date: 2019-01-18
- `Commits since v2.1.1 <https://gitlab.inria.fr/batsim/pybatsim/compare/2.1.1...v3.1.0>`_
- ``nix-env -f https://github.com/oar-team/kapack/archive/master.tar.gz -iA pybatsim3``
- Recommended Batsim version: `v3.0.0 <https://gitlab.inria.fr/batsim/batsim/tags/v3.0.0>`_
This version is synchronized with Batsim v3.0.0.
See `Batsim changelog <https://batsim.readthedocs.io/en/latest/changelog.html#v3-0-0>`_ for more details.
Changes in API
~~~~~~~~~~~~~~
- Mark `start_jobs` as DEPRECATED; please use `execute_jobs` instead.
- `set_resource_state`, `notify_resources_added` and `notify_resources_removed` functions now expect a ProcSet for the `resources` argument.
- `onAddResources` and `onRemoveResources` now send a ProcSet for the `to_add` and `to_remove` arguments, respectively.
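The deprecation of `start_jobs` in favour of `execute_jobs` can be illustrated with a common shim pattern: the old name warns and forwards to the new one. This is a minimal sketch; `SchedulerAPI` is a hypothetical stand-in, and whether PyBatsim itself emits a `DeprecationWarning` is an assumption, not something this changelog states.

```python
import warnings


class SchedulerAPI:
    """Hypothetical stand-in, not PyBatsim's actual class."""

    def __init__(self):
        self.executed = []

    def execute_jobs(self, jobs):
        # Record the jobs instead of talking to a simulator.
        self.executed.extend(jobs)

    def start_jobs(self, jobs):
        # Deprecated alias: warn the caller, then delegate to the new name.
        warnings.warn(
            "start_jobs is deprecated, use execute_jobs instead",
            DeprecationWarning,
            stacklevel=2,
        )
        self.execute_jobs(jobs)


api = SchedulerAPI()
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    api.start_jobs(["w0!1"])
```

Callers that still use the old name keep working, while the warning points them at the replacement.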
........................................................................................................................
v2.1.1
------
- Release date: 2018-08-31
- `Commits since v2.0.0 <https://gitlab.inria.fr/batsim/pybatsim/compare/2.0...2.1.1>`_
- ``nix-env -f https://github.com/oar-team/kapack/archive/master.tar.gz -iA pybatsim2``
- Recommended Batsim version: `v2.0.0 <https://gitlab.inria.fr/batsim/batsim/tags/v2.0.0>`_
........................................................................................................................
v2.0.0
------
- Release date: 2017-10-03
- Recommended Batsim version: `v2.0.0 <https://gitlab.inria.fr/batsim/batsim/tags/v2.0.0>`_
.. _Keep a Changelog: http://keepachangelog.com/en/1.0.0/
\ No newline at end of file
......@@ -28,6 +28,17 @@ The following commands are provided:
To manipulate the `out_jobs.csv` file based on data only available in the
scheduler but not in Batsim.
Batsim Version Compatibilities
------------------------------
Because Batsim and Pybatsim are released at different paces, here is a list of compatible versions of the two projects:
- `Batsim master branch`_ with `Pybatsim master branch`_ (development branches, may be unstable)
- `Batsim v3_0_0`_ with `Pybatsim v3_0_0`_ (latest major release, stable)
- `Batsim v2_0_0`_ with `Pybatsim batsim_2_0_compatible`_
Moreover, all notable changes are listed in the `changelog <https://gitlab.inria.fr/batsim/pybatsim/blob/master/CHANGELOG.rst>`_.
Examples
--------
......@@ -121,6 +132,18 @@ To generate the html documentation use the setup target::
Testing
-------
To run the test experiments use the setup target::
To run the test experiments, it is preferable to first enter a nix shell dedicated to pybatsim development with the following command::
nix-shell https://github.com/oar-team/kapack/archive/master.tar.gz -A pybatsim_dev
Then you can run tests with the setup target::
./setup.py test --batsim-bin=path/to/batsim/binary
./setup.py test
.. _Batsim master branch: https://gitlab.inria.fr/batsim/batsim/tree/master
.. _Pybatsim master branch: https://gitlab.inria.fr/batsim/pybatsim/tree/master
.. _Batsim v3_0_0: https://gitlab.inria.fr/batsim/batsim/tags/v3.0.0
.. _Pybatsim v3_0_0: https://gitlab.inria.fr/batsim/pybatsim/tags/v3.0.0
.. _Batsim v2_0_0: https://gitlab.inria.fr/batsim/batsim/tags/v2.0.0
.. _Pybatsim batsim_2_0_compatible: https://gitlab.inria.fr/batsim/pybatsim/tags/batsim_2.0_compatible
from batsim._version import __version__
__version__ = "3.1.0"
This diff is collapsed.
......@@ -5,6 +5,7 @@ Usage:
pybatsim-experiment <experiment> [options]
Options:
--version Print the version of pybatsim and exit
-h --help Show this help message and exit.
-q --quiet Silent experiment output.
-d --debug Print additional debug messages.
......@@ -16,10 +17,11 @@ import json
from docopt import docopt
from batsim.tools.experiments import launch_experiment
from batsim import __version__
def main():
arguments = docopt(__doc__, version='2.0')
arguments = docopt(__doc__, version=__version__)
verbose = not bool(arguments["--quiet"])
debug = bool(arguments["--debug"])
......
......@@ -5,11 +5,14 @@ Usage:
pybatsim <scheduler> [-o <options_string>] [options]
Options:
--version Print the version of pybatsim and exit
-h --help Show this help message and exit.
-v --verbose Be verbose.
-v --verbosity=<verbosity-level> Sets the verbosity level. Available
values are {debug, info, warning, error, critical}
Default: info
-p --protect Protect the scheduler using a validating machine.
-s --socket-endpoint=<endpoint> Batsim socket endpoint to use [default: tcp://*:28000]
-e --event-socket-endpoint=<endpoint> Socket endpoint to use to publish scheduler events [default: tcp://*:29000]
-e --event-socket-endpoint=<endpoint> Socket endpoint to use to publish scheduler events
-o --options=<options_string> A Json string to pass to the scheduler [default: {}]
-O --options-file=<options_file> A file containing the json options
-t --timeout=<timeout> How long to wait for responses from Batsim [default: 2000]
......@@ -17,19 +20,24 @@ Options:
import sys
import json
import logging
from docopt import docopt
from batsim.tools.launcher import launch_scheduler, instanciate_scheduler
from batsim import __version__
def main():
arguments = docopt(__doc__, version='2.0')
arguments = docopt(__doc__, version=__version__)
if arguments['--verbose']:
verbose = 999
loglevel = logging.WARNING
if not arguments['--verbosity']:
loglevel = logging.INFO
else:
verbose = 0
loglevel = logging.getLevelName(arguments['--verbosity'].upper())
FORMAT = '[pybatsim - %(asctime)s - %(name)s - %(levelname)s] %(message)s'
logging.basicConfig(format=FORMAT, level=loglevel)
timeout = int(arguments['--timeout'] or float("inf"))
......@@ -54,8 +62,7 @@ def main():
event_socket_endpoint,
options,
timeout,
protect,
verbose)
protect)
if __name__ == "__main__":
......
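The launcher hunk above turns the `--verbosity` string into a `logging` level with `logging.getLevelName(...upper())`. For an unknown name that call returns a string such as `"Level LOUD"` rather than raising, so a stricter variant may be useful. The sketch below is one possible approach; `parse_verbosity` and `LEVELS` are hypothetical names, not part of pybatsim.

```python
import logging

# The level names listed in the --verbosity help text.
LEVELS = ("debug", "info", "warning", "error", "critical")


def parse_verbosity(value, default="info"):
    """Map a --verbosity string (or None) to a numeric logging level."""
    name = (value or default).lower()
    if name not in LEVELS:
        raise ValueError("unknown verbosity level: {}".format(value))
    # logging.DEBUG, logging.INFO, ... are module-level integer constants.
    return getattr(logging, name.upper())


loglevel = parse_verbosity("debug")
```

The result can be passed directly as the `level` argument of `logging.basicConfig`.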
......@@ -6,6 +6,7 @@
"""
import zmq
import json
import logging
class NetworkHandler:
......@@ -13,23 +14,22 @@ class NetworkHandler:
def __init__(
self,
socket_endpoint,
verbose=0,
timeout=2000,
type=zmq.REP):
self.socket_endpoint = socket_endpoint
self.verbose = verbose
self.timeout = timeout
self.context = zmq.Context()
self.connection = None
self.type = type
self.logger = logging.getLogger(__name__)
def send(self, msg):
self.send_string(json.dumps(msg))
def send_string(self, msg):
assert self.connection, "Connection not open"
if self.verbose > 0:
print("[PYBATSIM]: SEND_MSG\n {}".format(msg))
self.logger.debug("[PYBATSIM]: SEND_MSG\n {}".format(msg))
self.connection.send_string(msg)
def recv(self, blocking=False):
......@@ -49,8 +49,7 @@ class NetworkHandler:
except zmq.error.Again:
return None
if self.verbose > 0:
print('[PYBATSIM]: RECEIVED_MSG\n {}'.format(msg))
self.logger.debug('[PYBATSIM]: RECEIVED_MSG\n {}'.format(msg))
return msg
......@@ -58,8 +57,7 @@ class NetworkHandler:
assert not self.connection, "Connection already open"
self.connection = self.context.socket(self.type)
if self.verbose > 0:
print("[PYBATSIM]: binding to {addr}"
self.logger.debug("[PYBATSIM]: binding to {addr}"
.format(addr=self.socket_endpoint))
self.connection.bind(self.socket_endpoint)
......@@ -67,8 +65,7 @@ class NetworkHandler:
assert not self.connection, "Connection already open"
self.connection = self.context.socket(self.type)
if self.verbose > 0:
print("[PYBATSIM]: connecting to {addr}"
self.logger.debug("[PYBATSIM]: connecting to {addr}"
.format(addr=self.socket_endpoint))
self.connection.connect(self.socket_endpoint)
......
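The `NetworkHandler` hunks replace `if self.verbose > 0: print(...)` with `self.logger.debug(...)`, which moves the filtering decision from the object to the logging configuration. A minimal sketch of that effect follows; `Sender`, `ListHandler` and the logger name `sketch.network` are illustrative assumptions.

```python
import logging


class ListHandler(logging.Handler):
    """Collects formatted records in memory so the effect is easy to inspect."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


class Sender:
    """Hypothetical stand-in for a class that logs what it sends."""

    def __init__(self):
        self.logger = logging.getLogger("sketch.network")

    def send_string(self, msg):
        # No verbose flag: the logger's level decides whether this is emitted.
        self.logger.debug("SEND_MSG %s", msg)


handler = ListHandler()
logger = logging.getLogger("sketch.network")
logger.addHandler(handler)
logger.propagate = False  # keep the example output out of the root logger

sender = Sender()
logger.setLevel(logging.INFO)
sender.send_string("hidden")   # below the threshold, dropped
logger.setLevel(logging.DEBUG)
sender.send_string("visible")  # now emitted
```

Only the second message reaches the handler, without the sender knowing anything about verbosity.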
......@@ -143,8 +143,8 @@ def backfilling_sched(
respect_deps=respect_deps,
handle_scheduled_func=handle_scheduled_func)
# Do backfilling if there are still runnable jobs and free resources.
if len(jobs.runnable) > reservation_depth:
# Do backfilling if there are still open jobs.
if len(jobs.open) > 0:
do_backfilling(scheduler, reservation_depth, jobs=jobs,
resources=resources, resources_filter=resources_filter,
check_func=backfilling_check_func,
......
......@@ -67,11 +67,11 @@ class LoggingEvent:
return output.getvalue().strip()
def to_csv_line(self):
def conv_obj(o):
'''def conv_obj(o):
try:
return o.__dict__
except (AttributeError, ValueError):
return str(o)
return str(o)'''
data = {}
for k, v in self.data.items():
......@@ -87,12 +87,12 @@ class LoggingEvent:
new_v.append(e)
v = new_v
data[k] = v
try:
'''try:
data = json.dumps(data, default=lambda o: conv_obj(o))
except Exception as e:
raise ValueError(
"Error while dumping json data: {}"
.format(data))
.format(data))'''
output = io.StringIO()
csvdata = [self.time, self.level, self.processed_jobs, self.open_jobs,
......
......@@ -517,14 +517,14 @@ class Job:
job=self, type="job_starting_postponed_too_few_resources")
return
if not self._scheduler.has_time_sharing:
if not self._scheduler.has_resource_sharing_on_compute:
for r in self.allocation.resources:
for a1 in r.allocations:
for a2 in r.allocations:
if a1 != a2:
if a1.overlaps_with(a2):
self._scheduler.fatal(
"Time sharing is not enabled in Batsim (resource allocations are overlapping: own={own}, other={other})",
"Resource sharing is not enabled in Batsim (resource allocations are overlapping: own={own}, other={other})",
own=a1, other=a2,
type="schedule_resource_allocations_overlapping")
......
......@@ -296,39 +296,30 @@ class Profiles(metaclass=ABCMeta):
class ParallelPFS(Profile):
"""Implementation of the MsgParallelHomogeneousPFSMultipleTiers profile."""
type = "msg_par_hg_pfs_tiers"
type = "msg_par_hg_pfs"
class Direction(Enum):
TO_STORAGE = 1
FROM_STORAGE = 2
class Host(Enum):
HPST = 1
LCST = 2
def __init__(self, size,
direction=Direction.TO_STORAGE,
host=Host.LCST,
def __init__(self, size_read, size_write,
storage="pfs",
**kwargs):
super().__init__(**kwargs)
self.size = size
self.direction = direction
self.host = host
self.size_read = size_read
self.size_write = size_write
self.storage = storage
@classmethod
def from_dict(cls, dct, name=None):
return cls(size=dct["size"],
direction=cls.Direction[dct["direction"]],
host=cls.Host[dct["host"]],
return cls(size_read=dct["bytes_to_read"],
size_write=dct["bytes_to_write"],
storage=dct["storage"],
ret=dct.get("ret", 0),
name=name)
def to_dict(self, embed_references=False):
return {
"type": self.type,
"size": self.size,
"direction": self.direction.name.lower(),
"host": self.host.name,
"bytes_to_read": self.size_read,
"bytes_to_write": self.size_write,
"storage": self.storage,
"ret": self.ret,
}
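A profile can only survive a `to_dict`/`from_dict` round trip if both methods agree on the dictionary keys. The sketch below checks that property on a simplified stand-in; `PfsProfile` and `PROFILE_TYPE` are hypothetical, and the key names are copied from the `from_dict` lines of this hunk rather than from Batsim's documentation.

```python
from dataclasses import dataclass

PROFILE_TYPE = "msg_par_hg_pfs"  # type string as shown in the hunk


@dataclass
class PfsProfile:
    """Hypothetical stand-in mirroring the fields shown in the hunk."""

    size_read: int
    size_write: int
    storage: str = "pfs"
    ret: int = 0

    @classmethod
    def from_dict(cls, dct):
        return cls(size_read=dct["bytes_to_read"],
                   size_write=dct["bytes_to_write"],
                   storage=dct["storage"],
                   ret=dct.get("ret", 0))

    def to_dict(self):
        # Emit exactly the keys that from_dict reads.
        return {
            "type": PROFILE_TYPE,
            "bytes_to_read": self.size_read,
            "bytes_to_write": self.size_write,
            "storage": self.storage,
            "ret": self.ret,
        }


original = {"type": PROFILE_TYPE, "bytes_to_read": 1024,
            "bytes_to_write": 2048, "storage": "pfs", "ret": 0}
round_trip = PfsProfile.from_dict(original).to_dict()
```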
......@@ -351,7 +342,7 @@ class Profiles(metaclass=ABCMeta):
@classmethod
def from_dict(cls, dct, name=None):
return cls(size=dct["size"],
direction=cls.Direction[dct["repeat"]],
direction=cls.Direction[dct["direction"].upper()],
ret=dct.get("ret", 0),
name=name)
......
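The last hunk above looks up an `Enum` member by name after upper-casing the serialized string, which pairs with a `to_dict` that writes `direction.name.lower()`. A small self-contained sketch of that pairing, where `parse_direction` and `dump_direction` are hypothetical helper names:

```python
from enum import Enum


class Direction(Enum):
    TO_STORAGE = 1
    FROM_STORAGE = 2


def parse_direction(text):
    """Look up a member by name; Enum names are case sensitive."""
    return Direction[text.upper()]


def dump_direction(direction):
    """Serialize a member as its lower-case name."""
    return direction.name.lower()


parsed = parse_direction("to_storage")
```

Without the `.upper()` call, `Direction["to_storage"]` raises `KeyError`, because subscript lookup matches member names exactly.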
......@@ -56,8 +56,8 @@ class Resource:
:param resources_list: the main resources list where this resource is contained
:param time_sharing: whether this resource allows time sharing (if `None`, the
value set in Batsim will be used)
:param resource_sharing: whether this resource allows jobs to run concurrently
(if `None`, the value set in Batsim will be used)
"""
TIME_DELTA = 0.00000000001
......@@ -68,13 +68,13 @@ class Resource:
name,
id=None,
resources_list=None,
time_sharing=None):
resource_sharing=None):
self._scheduler = scheduler
self._id = id
self._name = name
self._time_sharing = time_sharing
self._resource_sharing = resource_sharing
self._resources_list = resources_list
......@@ -86,11 +86,11 @@ class Resource:
return self._id
@property
def time_sharing(self):
def resource_sharing(self):
"""Whether this resource can be shared."""
if self._time_sharing is None:
return self._scheduler.has_time_sharing
return self._time_sharing
if self._resource_sharing is None:
return self._scheduler.has_resource_sharing_on_compute
return self._resource_sharing
@property
def name(self):
......@@ -144,7 +144,7 @@ class Resource:
"""Get how long this resource remains free until the next reservation starts: `0`
if the resource is allocated, and `Inf` if this resource has no allocations.
Please note that this probably makes less sense for resources which allow time sharing
Please note that this probably makes less sense for resources which allow resource sharing
since this function will only look for times where there is not a single allocation
using this resource.
......@@ -170,16 +170,16 @@ class Resource:
def _do_add_allocation(self, allocation):
"""Adds an allocation to this resource.
It will be checked for overlaps (which are forbidden if time-sharing is not enabled).
It will be checked for overlaps (which are forbidden if resource sharing is not enabled).
:param allocation: the allocation to be added
"""
# If time sharing is not enabled: check that allocations do not overlap
if not self.time_sharing:
# If resource sharing is not enabled: check that allocations do not overlap
if not self.resource_sharing:
for alloc in self._allocations:
if alloc.overlaps_with(allocation):
self._scheduler.fatal(
"Overlapping resource allocation in resource {resource} while time-sharing is not allowed, {own} overlaps with {other}",
"Overlapping resource allocation in resource {resource} while resource sharing is not allowed, {own} overlaps with {other}",
resource=self,
own=alloc,
other=allocation)
......
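The renamed `resource_sharing` property falls back to the scheduler-wide setting when the per-resource value is `None`, and overlapping allocations are rejected only when sharing is off. The sketch below reproduces that logic in isolation; the classes are simplified stand-ins, the half-open interval test is an assumption about what `overlaps_with` means, and a plain `ValueError` replaces `scheduler.fatal`.

```python
class Allocation:
    def __init__(self, start, end):
        self.start, self.end = start, end

    def overlaps_with(self, other):
        # Half-open intervals [start, end) overlap when each one
        # starts before the other ends.
        return self.start < other.end and other.start < self.end


class Resource:
    def __init__(self, global_sharing, resource_sharing=None):
        self._global_sharing = global_sharing      # scheduler-wide default
        self._resource_sharing = resource_sharing  # per-resource override
        self._allocations = []

    @property
    def resource_sharing(self):
        if self._resource_sharing is None:
            return self._global_sharing
        return self._resource_sharing

    def add_allocation(self, allocation):
        if not self.resource_sharing:
            for alloc in self._allocations:
                if alloc.overlaps_with(allocation):
                    raise ValueError("overlapping allocation")
        self._allocations.append(allocation)


exclusive = Resource(global_sharing=False)
exclusive.add_allocation(Allocation(0, 10))
exclusive.add_allocation(Allocation(10, 20))  # adjacent, not overlapping

shared = Resource(global_sharing=False, resource_sharing=True)
shared.add_allocation(Allocation(0, 10))
shared.add_allocation(Allocation(5, 15))      # allowed by the override
```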
......@@ -65,14 +65,6 @@ class BaseBatsimScheduler(BatsimScheduler):
type="deadlock")
self._scheduler.on_deadlock()
def onNOP(self):
self._scheduler._update_time()
self._scheduler.debug(
"decision process received NOP",
type="nop_received")
self._scheduler.on_nop()
self._scheduler._do_schedule()
def onJobsKilled(self, jobs):
self._scheduler._update_time()
self._scheduler.debug(
......@@ -121,7 +113,7 @@ class BaseBatsimScheduler(BatsimScheduler):
except KeyError:
pass
if workload:
job_description = workload[job_id]
job_description = workload[job.id]
job_description.job = newjob
newjob._workload_description = workload
......@@ -204,12 +196,20 @@ class BaseBatsimScheduler(BatsimScheduler):
self._scheduler.on_add_resources(resources)
def onRemoveResources(self, resources):
# TODO: Shouldn't there be an _update_time call here too?
self._scheduler.info(
"Received remove Resources message: {resources}",
resources=resources,
type="remove_resources_received")
self._scheduler.on_remove_resources(resources)
def onRequestedCall(self):
self._scheduler._update_time()
self._scheduler.info(
"Received Requested Call message",
type="requested_call_received")
self._scheduler.on_requested_call()
class Scheduler(metaclass=ABCMeta):
"""The high-level scheduler which should be inherited from by concrete scheduler
......@@ -270,28 +270,16 @@ class Scheduler(metaclass=ABCMeta):
"""The events happened in the scheduler."""
return self._events
@property
def machines(self):
"""The registered machines in Batsim."""
return self._machines
@property
def dynamic_workload(self):
"""The workload of dynamic job submissions of this scheduler."""
return self._dynamic_workload
@property
def hpst(self):
"""The hpst (high-performance storage tier) host managed by Batsim."""
return self._hpst
@property
def lcst(self):
"""The lcst (large-capacity storage tier) host managed by Batsim."""
return self._lcst
@property
def pfs(self):
"""The pfs (parallel file system) host managed by Batsim. This is an alias
to the host of the large-capacity storage tier.
"""
return self.lcst
@property
def options(self):
"""The options given to the launcher."""
......@@ -318,9 +306,9 @@ class Scheduler(metaclass=ABCMeta):
return self._time
@property
def has_time_sharing(self):
"""Whether or not time sharing is enabled."""
return self._batsim.time_sharing
def has_resource_sharing_on_compute(self):
"""Whether or not resource sharing on compute machines is enabled."""
return self._batsim.allow_compute_sharing
@property
def get_find_resource_handlers(self):
......@@ -459,8 +447,7 @@ class Scheduler(metaclass=ABCMeta):
self.resources),
type="resources_registered")
self._hpst = DictWrapper(self._batsim.hpst)
self._lcst = DictWrapper(self._batsim.lcst)
self._machines = DictWrapper(self._batsim.machines)
def on_init(self):
"""The init method called during the start-up phase of the scheduler."""
......@@ -617,6 +604,10 @@ class Scheduler(metaclass=ABCMeta):
"""
pass
def on_requested_call(self):
"""Hook similar to the low-level API."""
pass
def on_event(self, event):
"""Hook called on each event triggered by the scheduler.
......@@ -624,11 +615,14 @@ class Scheduler(metaclass=ABCMeta):
"""
pass
def submit_dynamic_job(self, *args, **kwargs):
def register_dynamic_job(self, *args, **kwargs):
job = self._dynamic_workload.new_job(*args, **kwargs)
self._dynamic_workload.prepare()
job.submit(self)
def notify_registration_finished(self):
self._batsim.notify_registration_finished()
def as_scheduler(*args, on_init=[], on_end=[], base_classes=[], **kwargs):
"""Decorator to convert a function to a scheduler class.
......
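The new `onRequestedCall` handler follows the same shape as the other low-level callbacks in this file: update the time, then forward to a no-op hook that concrete schedulers may override. A minimal sketch of that delegation, with simplified stand-in classes (logging and the Batsim connection are left out):

```python
class Scheduler:
    """Simplified stand-in for the high-level scheduler."""

    def __init__(self):
        self.calls = []

    def _update_time(self):
        self.calls.append("update_time")

    def on_requested_call(self):
        # Default hook does nothing; subclasses override it.
        pass


class LowLevelAdapter:
    """Simplified stand-in for the low-level callback layer."""

    def __init__(self, scheduler):
        self._scheduler = scheduler

    def onRequestedCall(self):
        self._scheduler._update_time()
        self._scheduler.on_requested_call()


class MyScheduler(Scheduler):
    def on_requested_call(self):
        self.calls.append("requested_call")


sched = MyScheduler()
LowLevelAdapter(sched).onRequestedCall()
```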
......@@ -114,7 +114,7 @@ class JobModelData:
self.deps = None
# Update fields with values in parameters
self.__dict__.update(kwargs)
self.update(**kwargs)
def process(self, random):
keys = [k for k, _ in self.__dict__.items()]
......@@ -154,6 +154,10 @@ class JobModelData:
def copy_job(self):
return copy.deepcopy(self)
def update(self, **kwargs):
self.__dict__.update(kwargs)
return self
def submit(self, model, random, parameters, workload):
assert None not in [
self.submit_time,
......@@ -233,13 +237,16 @@ class WorkloadModel:
return kwargs
def configure_profile(self, random, parameters, job):
completed = job.completed if job.completed is not None else True
return (
Profiles.Delay(
job.run_time or increment_float(
job.requested_time, -0.00000000001, True),
ret=0 if completed else 1))
try:
return job.profile
except AttributeError:
completed = job.completed if job.completed is not None else True
return (
Profiles.Delay(
job.run_time or increment_float(
job.requested_time, -0.00000000001, True),
ret=0 if completed else 1))
@property
def default_parameters(self):
......
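Two changes in this file work together: `update` returns `self`, so it can be chained after `copy_job()`, and `configure_profile` prefers a `profile` attribute when one exists. The sketch below shows both on a reduced stand-in; the tuple returned in the fallback branch is a placeholder for `Profiles.Delay`, not the real object.

```python
import copy


class JobModelData:
    """Reduced stand-in with only the methods relevant here."""

    def __init__(self, **kwargs):
        self.run_time = None
        self.update(**kwargs)

    def update(self, **kwargs):
        self.__dict__.update(kwargs)
        return self  # returning self allows call chaining

    def copy_job(self):
        return copy.deepcopy(self)


def configure_profile(job):
    try:
        return job.profile
    except AttributeError:
        # Placeholder for the Profiles.Delay fallback in the hunk.
        return ("delay", job.run_time)


base = JobModelData(run_time=10)
variant = base.copy_job().update(run_time=20, profile="custom")
```

Because `copy_job` deep-copies, updating `variant` leaves `base` untouched.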
......@@ -6,7 +6,7 @@
dynamically and submit them during the runtime of a scheduler.
WorkloadDescriptions are used internally to manage dynamic jobs created with
`Scheduler.submit_dynamic_job` and with `Job.submit_sub_job`. For each job
`Scheduler.register_dynamic_job` and with `Job.submit_sub_job`. For each job
a workload is generated named after the id of the job which is then submitted
to Batsim.
"""
......@@ -65,6 +65,9 @@ class JobDescription:
for k, v in kwargs.items():
setattr(self, k, v)
def clone(self):
return copy.deepcopy(self)
@property
def workload(self):
"""The `WorkloadDescription` if assigned. This is required in order to submit
......@@ -99,13 +102,13 @@ class JobDescription:
assert not self.submitted, "Job was already submitted and can not be modified"
has_workload = bool(self.workload)
if has_workload:
self.workload.remove_job(self)
#if has_workload:
# self.workload.remove_job(self)
self._id = id
if has_workload:
self.workload.add_job(self)
#if has_workload:
# self.workload.add_job(self)
@property
def res(self):
......@@ -220,15 +223,16 @@ class JobDescription:
additional_profiles = {p.name: p.to_dict()
for p in self._additional_profiles}
scheduler._batsim.submit_job(
self.id,
scheduler._batsim.register_profiles(
self.workload.name,
{self.profile.name : self.profile.to_dict()})
scheduler._batsim.register_job(
str(self.id),
self.res,
self.walltime,
self.profile.name,
self.workload.name,
self.subtime,
self.profile.to_dict())
scheduler._batsim.submit_profiles(
self.subtime)
scheduler._batsim.register_profiles(
self.workload.name,
additional_profiles)
self._submitted = True
......@@ -386,6 +390,9 @@ class WorkloadDescription:
elif job.id > self._last_job_id:
self._last_job_id = job.id + 1
jid = job._workload.name + '!' + str(job.id)
job.id = jid
self._jobmap[job.id] = job
def remove_job(self, job):
......@@ -393,7 +400,7 @@ class WorkloadDescription:
:param job: The `JobDescription`.
"""
assert not job.self.submitted, "Job was already submitted and can not be removed from the workload"
assert not job.submitted, "Job was already submitted and can not be removed from the workload"
self._jobs.remove(job)
job._workload = None
......
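The `add_job` hunk builds job identifiers of the form `workload_name!job_number`. The helpers below sketch composing and splitting such identifiers; `qualified_job_id` and `split_job_id` are hypothetical names, and the assumption that the workload name itself contains no `!` is mine.

```python
def qualified_job_id(workload_name, job_number):
    """Compose 'workload!number', the form built in the hunk above."""
    return "{}!{}".format(workload_name, job_number)


def split_job_id(job_id):
    """Split 'workload!number' back into its two parts (both strings)."""
    workload_name, sep, job_number = job_id.partition("!")
    if not sep:
        raise ValueError("not a qualified job id: {!r}".format(job_id))
    return workload_name, job_number


jid = qualified_job_id("dyn", 3)
```

Note that the qualified identifier is a string, so any later numeric comparison on `job.id` would need the numeric part extracted first.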
......@@ -102,29 +102,19 @@ def execute_cl(
stderr=None,
on_failure=None,
verbose=False):
try:
if verbose:
print("Starting: {}".format(" ".join(cl)), end="")
if stdout:
print(" 1>{}".format(stdout.name), end="")
if stderr:
print(" 2>{}".format(stderr.name), end="")
print()
if verbose:
print("Starting: {}".format(" ".join(cl)), end="")
if stdout:
print(" 1>{}".format(stdout.name), end="")
if stderr:
print(" 2>{}".format(stderr.name), end="")
print()
exec = subprocess.Popen(
cl, stdout=stdout, stderr=stderr)
exec.name = name
return exec
exec = subprocess.Popen(
cl, stdout=stdout, stderr=stderr,
preexec_fn=os.setsid)
exec.name = name
return exec
except PermissionError:
print(
"Failed to run {}: {}".format(name,
" ".join(sched_cl)),
file=sys.stderr)
if on_failure:
return on_failure(name, cl)
else:
sys.exit(2)
def terminate_cl(p, terminate=False):
......@@ -165,7 +155,7 @@ def run_workload_script(options, verbose):
if is_executable(script):
return do_run_script(["./" + script] + args)
elif script.endswith(".py"):
return do_run_script(["python", script] + args)
return do_run_script(["python3", script] + args)
else:
raise ValueError(
"Workload script {} is not executable but also does not seem to be a python script.".format(script))
......@@ -232,11 +222,11 @@ def prepare_scheduler_cl(options, verbose):
sched_cl = []
if 'interpreter' in options["scheduler"]:
if options["scheduler"]["interpreter"] == "coverage":
interpreter = ["python", "-m", "coverage", "run", "-a"]
interpreter = ["python3", "-m", "coverage", "run", "-a"]
elif options["scheduler"]["interpreter"] == "pypy":
interpreter = ["pypy", "-OOO"]
elif options["scheduler"]["interpreter"] == "profile":
interpreter = ["python", "-m", "cProfile", "-o", "simul.cprof"]
interpreter = ["python3", "-m", "cProfile", "-o", "simul.cprof"]
else:
assert False, "Unknown interpreter"
sched_cl += interpreter
......
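One of the `Popen` calls in the `execute_cl` hunk passes `preexec_fn=os.setsid`, which puts the child in its own session and process group so that the whole group can be signalled later. The sketch below uses `start_new_session=True`, the `subprocess` parameter that has the same effect without a `preexec_fn`. It is POSIX-only, and the helper names are hypothetical.

```python
import os
import signal
import subprocess
import sys


def start_in_new_session(cl):
    # start_new_session=True makes the child call setsid() before exec.
    return subprocess.Popen(cl, start_new_session=True)


def terminate_group(proc, sig=signal.SIGTERM):
    # Signal every process in the child's group, then reap the child.
    os.killpg(os.getpgid(proc.pid), sig)
    return proc.wait()


proc = start_in_new_session(
    [sys.executable, "-c", "import time; time.sleep(60)"])
own_group = os.getpgid(0)
child_group = os.getpgid(proc.pid)
exit_status = terminate_group(proc)
```

Signalling the group rather than the single PID also reaches any grandchildren the command may have spawned.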
......@@ -100,12 +100,11 @@ def instanciate_scheduler(name, options):
def launch_scheduler(scheduler,
socket_endpoint="tcp://*:28000",
event_socket_endpoint="tcp://*:28001",
options={},
timeout=2000,
protect=True,
verbose=0):
socket_endpoint,
event_socket_endpoint,
options,
timeout,
protect):
if protect:
vm = ValidatingMachine
......@@ -115,37 +114,37 @@ def launch_scheduler(scheduler,
print("Scheduler: {} ({})".format(scheduler.__class__.__name__, options))
time_start = time.time()
try:
bs = Batsim(scheduler,
NetworkHandler(socket_endpoint, verbose, timeout),
NetworkHandler(event_socket_endpoint, type=zmq.PUB),
validatingmachine=vm)
aborted = False
try:
bs.start()
except KeyboardInterrupt:
print("Aborted...")
aborted = True
time_ran = str(timedelta(seconds=time.time() - time_start))
print("Simulation ran for: " + time_ran)
print("Job received:", bs.nb_jobs_received,
", scheduled:", bs.nb_jobs_scheduled,
", rejected:", bs.nb_jobs_rejected,
", killed:", bs.nb_jobs_killed,
", submitted:", bs.nb_jobs_submitted,
", changed:", len(bs.jobs_manually_changed),
", timeout:", bs.nb_jobs_timeout,
", success", bs.nb_jobs_successful,
", complete:", bs.nb_jobs_completed)
if bs.nb_jobs_received != (
bs.nb_jobs_scheduled + bs.nb_jobs_rejected +
len(bs.jobs_manually_changed)):
return 1
return 1 if aborted else 0
except KeyboardInterrupt:
print("Aborted...")
#try:
bs = Batsim(scheduler,
socket_endpoint,
timeout,
event_socket_endpoint,
validatingmachine=vm)
aborted = False
# try:
bs.start()
# except KeyboardInterrupt:
# print("Aborted...")
# aborted = True
time_ran = str(timedelta(seconds=time.time() - time_start))
print("Simulation ran for: " + time_ran)
print("Job submitted:", bs.nb_jobs_submitted,
", scheduled:", bs.nb_jobs_scheduled,
", rejected:", bs.nb_jobs_rejected,
", killed:", bs.nb_jobs_killed,
", changed:", len(bs.jobs_manually_changed),
", timeout:", bs.nb_jobs_timeout,
", success", bs.nb_jobs_successful,
", complete:", bs.nb_jobs_completed)
if bs.nb_jobs_submitted != (
bs.nb_jobs_scheduled + bs.nb_jobs_rejected +
len(bs.jobs_manually_changed)):
return 1
return 1 if aborted else 0
#except KeyboardInterrupt:
# print("Aborted...")
# return 1
return 0
......
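The exit status computed at the end of `launch_scheduler` is a consistency check: every submitted job should have been scheduled, rejected, or manually changed. Pulled out as a pure function (a hypothetical refactoring, not code from this diff), the rule reads:

```python
from datetime import timedelta


def exit_code(submitted, scheduled, rejected, changed, aborted=False):
    """Return 1 if some submitted jobs are unaccounted for or the run was aborted."""
    if submitted != scheduled + rejected + changed:
        return 1
    return 1 if aborted else 0


# The elapsed time is printed by converting a timedelta to a string.
time_ran = str(timedelta(seconds=90))
```

Keeping the rule in a function of plain integers makes it testable without running a simulation.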
from batsim.batsim import BatsimScheduler
from sortedcontainers import SortedSet
from procset import ProcSet
class ValidatingMachine(BatsimScheduler):
......@@ -15,32 +16,32 @@ class ValidatingMachine(BatsimScheduler):
"""
def __init__(self, scheduler):
super().__init__()
self.scheduler = scheduler
def onAfterBatsimInit(self):
self.scheduler.onAfterBatsimInit()
def onSimulationBegins(self):
self.nb_res = self.bs.nb_res
self.nb_res = self.bs.nb_compute_resources
self.availableResources = SortedSet(range(self.nb_res))
self.jobs_waiting = []
self.previousAllocations = dict()
# intercept job start
self.bs_start_jobs_continuous = self.bs.start_jobs_continuous
self.bs.start_jobs_continuous = self.start_jobs_continuous
self.bs_start_jobs = self.bs.start_jobs
self.bs.start_jobs = self.start_jobs
# save real job start function
self.real_start_jobs = self.bs.start_jobs
self.real_execute_jobs = self.bs.execute_jobs
# intercept job start
self.scheduler.bs = self.bs
self.scheduler.bs.start_jobs = self.start_jobs_valid
self.scheduler.bs.execute_jobs = self.execute_jobs_valid
self.scheduler.onSimulationBegins()
def onSimulationEnds(self):
self.scheduler.onSimulationEnds()
def onNOP(self):
self.scheduler.onNOP()
def onDeadlock(self):
self.scheduler.onDeadlock()
......@@ -66,17 +67,20 @@ class ValidatingMachine(BatsimScheduler):
def onReportEnergyConsumed(self, consumed_energy):
self.scheduler.onReportEnergyConsumed(consumed_energy)
def start_jobs_continuous(self, allocs):
for (job, (first_res, last_res)) in allocs:
self.previousAllocations[job.id] = range(first_res, last_res + 1)
def onRequestedCall(self):
self.scheduler.onRequestedCall()
def start_jobs_valid(self, jobs, res):
for j in jobs:
try:
self.jobs_waiting.remove(job)
self.jobs_waiting.remove(j)
except KeyError:
raise ValueError(
"Job {} was not waiting (waiting: {})".format(
job, [
j2.id for j2 in self.jobs_waiting]))
for r in range(first_res, last_res + 1):
j, [j2.id for j2 in self.jobs_waiting]))
self.previousAllocations[j.id] = res[j.id]
for r in res[j.id]:
try:
self.availableResources.remove(r)
except KeyError:
......@@ -84,9 +88,10 @@ class ValidatingMachine(BatsimScheduler):
"Resource {} was not available (available: {})".format(
r, list(
self.availableResources)))
self.bs_start_jobs_continuous(allocs)
j.allocation = ProcSet(*res[j.id])
self.real_execute_jobs(jobs)
def start_jobs(self, jobs, res):
def execute_jobs_valid(self, jobs, io_jobs=None):
for j in jobs:
try:
self.jobs_waiting.remove(j)
......@@ -94,8 +99,8 @@ class ValidatingMachine(BatsimScheduler):
raise ValueError(
"Job {} was not waiting (waiting: {})".format(
j, [j2.id for j2 in self.jobs_waiting]))
self.previousAllocations[j.id] = res[j.id]
for r in res[j.id]:
self.previousAllocations[j.id] = j.allocation
for r in j.allocation:
try:
self.availableResources.remove(r)
except KeyError:
......@@ -103,4 +108,4 @@ class ValidatingMachine(BatsimScheduler):
"Resource {} was not available (available: {})".format(
r, list(
self.availableResources)))
self.bs_start_jobs(jobs, res)
self.real_execute_jobs(jobs, io_jobs)
\ No newline at end of file
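`ValidatingMachine` saves the real `execute_jobs`, substitutes a checking wrapper, and forwards only after the checks pass. The sketch below reduces that interception pattern to its core. `Job` and `Validator` are simplified stand-ins, and a built-in `set` replaces `SortedSet`. One detail worth noting: `list.remove` raises `ValueError`, not `KeyError`, so the sketch catches `ValueError` for the waiting list.

```python
class Job:
    def __init__(self, id, allocation):
        self.id = id
        self.allocation = allocation


class Validator:
    """Simplified stand-in for the validating wrapper."""

    def __init__(self, nb_resources, real_execute_jobs):
        self.available = set(range(nb_resources))
        self.waiting = []
        self.real_execute_jobs = real_execute_jobs  # saved real function

    def execute_jobs_valid(self, jobs):
        for j in jobs:
            try:
                self.waiting.remove(j)
            except ValueError:  # raised by list.remove for a missing item
                raise ValueError(
                    "Job {} was not waiting".format(j.id)) from None
            for r in j.allocation:
                try:
                    self.available.remove(r)
                except KeyError:  # raised by set.remove for a missing item
                    raise ValueError(
                        "Resource {} was not available".format(r)) from None
        self.real_execute_jobs(jobs)


started = []
validator = Validator(3, lambda jobs: started.extend(j.id for j in jobs))
a, b = Job("w!1", {0, 1}), Job("w!2", {1, 2})
validator.waiting = [a, b]
validator.execute_jobs_valid([a])
```

Executing `b` afterwards fails the resource check, because resource `1` is already taken by `a`.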