Mentions légales du service

Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • batsim/pybatsim
  • x-QGuill/pybatsim
  • adfaure/pybatsim
  • gaupy/pybatsim
4 results
Show changes
Commits on Source (84)
Showing
with 1418 additions and 334 deletions
__version__ = "2.1.1"
from __future__ import print_function
# from __future__ import print_function
from enum import Enum
from copy import deepcopy
import json
import sys
from .network import NetworkHandler
from procset import ProcSet
import redis
import zmq
import logging
class Batsim(object):
WORKLOAD_JOB_SEPARATOR = "!"
ATTEMPT_JOB_SEPARATOR = "#"
WORKLOAD_JOB_SEPARATOR_REPLACEMENT = "%"
def __init__(self, scheduler,
network_handler=None,
event_handler=None,
validatingmachine=None,
handle_dynamic_notify=True):
validatingmachine=None):
FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=FORMAT)
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.DEBUG)
self.running_simulation = False
if network_handler is None:
network_handler = NetworkHandler('tcp://*:28000')
......@@ -29,7 +40,6 @@ class Batsim(object):
'tcp://127.0.0.1:28001', type=zmq.PUB)
self.network = network_handler
self.event_publisher = event_handler
self.handle_dynamic_notify = handle_dynamic_notify
self.jobs = dict()
......@@ -47,11 +57,10 @@ class Batsim(object):
self.nb_jobs_rejected = 0
self.nb_jobs_scheduled = 0
self.nb_jobs_completed = 0
self.nb_jobs_successful = 0
self.nb_jobs_failed = 0
self.nb_jobs_timeout = 0
self.initialized = False
self.jobs_manually_changed = set()
self.has_dynamic_job_submissions = False
......@@ -65,7 +74,6 @@ class Batsim(object):
self._read_bat_msg()
self.scheduler.onAfterBatsimInit()
self.initialized = True
def publish_event(self, event):
"""Sends a message to subscribed event listeners (e.g. external processes which want to
......@@ -116,7 +124,7 @@ class Batsim(object):
def start_jobs_continuous(self, allocs):
"""
allocs should have the followinf format:
allocs should have the following format:
[ (job, (first res, last res)), (job, (first res, last res)), ...]
"""
......@@ -143,15 +151,32 @@ class Batsim(object):
"type": "EXECUTE_JOB",
"data": {
"job_id": job.id,
# FixMe do not send "[9]"
"alloc": " ".join(map(str, res[job.id]))
"alloc": str(ProcSet(*res[job.id]))
}
}
)
self.nb_jobs_scheduled += 1
def execute_jobs(self, jobs):
""" args:jobs: list of jobs to execute (job.allocation MUST be set) """
for job in jobs:
assert job.allocation is not None
self._events_to_send.append({
"timestamp": self.time(),
"type": "EXECUTE_JOB",
"data": {
"job_id": job.id,
"alloc": str(job.allocation)
}
}
)
self.nb_jobs_scheduled += 1
def reject_jobs(self, jobs):
"""Reject the given jobs."""
assert len(jobs) > 0, "The list of jobs to reject is empty"
for job in jobs:
self._events_to_send.append({
"timestamp": self.time(),
......@@ -162,7 +187,7 @@ class Batsim(object):
})
self.nb_jobs_rejected += 1
def change_job_state(self, job, state, kill_reason=""):
def change_job_state(self, job, state):
"""Change the state of a job."""
self._events_to_send.append({
"timestamp": self.time(),
......@@ -170,14 +195,15 @@ class Batsim(object):
"data": {
"job_id": job.id,
"job_state": state.name,
"kill_reason": kill_reason
}
})
self.jobs_manually_changed.add(job)
def kill_jobs(self, jobs):
"""Kill the given jobs."""
assert len(jobs) > 0, "The list of jobs to kill is empty"
for job in jobs:
job.job_state = Job.State.IN_KILLING
self._events_to_send.append({
"timestamp": self.time(),
"type": "KILL_JOB",
......@@ -205,40 +231,40 @@ class Batsim(object):
res,
walltime,
profile_name,
workload_name,
subtime=None,
profile=None):
assert Batsim.WORKLOAD_JOB_SEPARATOR not in workload_name
assert isinstance(id, int)
assert isinstance(workload_name, str)
full_job_id = workload_name + Batsim.WORKLOAD_JOB_SEPARATOR + str(id)
if subtime is None:
subtime = self.time()
job_dict = {
"profile": profile_name,
"id": id,
"res": res,
"walltime": walltime,
"subtime": subtime,
}
msg = {
"timestamp": self.time(),
"type": "SUBMIT_JOB",
"data": {
"job_id": full_job_id,
"job": {
"profile": profile_name,
"id": id,
"res": res,
"walltime": walltime,
"subtime": subtime,
},
"job_id": id,
"job": job_dict,
}
}
if profile:
if profile is not None:
assert isinstance(profile, dict)
msg["data"]["profile"] = profile
self._events_to_send.append(msg)
self.nb_jobs_submitted += 1
self.has_dynamic_job_submissions = True
return full_job_id
# Create the job here
self.jobs[id] = Job.from_json_dict(job_dict, profile_dict=profile)
self.jobs[id].job_state = Job.State.SUBMITTED
return id
def set_resource_state(self, resources, state):
""" args:resources: is a list of resource numbers or intervals as strings (e.g. "1-5").
......@@ -284,13 +310,88 @@ class Batsim(object):
self._events_to_send.append(
{
"timestamp": self.time(),
"type": "QUERY_REQUEST",
"type": "QUERY",
"data": {
"requests": {"consumed_energy": {}}
}
}
)
def notify_resources_added(self, resources):
self._events_to_send.append(
{
"timestamp": self.time(),
"type": "RESOURCES_ADDED",
"data": {
"resources": resources
}
}
)
def notify_resources_removed(self, resources):
self._events_to_send.append(
{
"timestamp": self.time(),
"type": "RESOURCES_REMOVED",
"data": {
"resources": resources
}
}
)
def set_job_metadata(self, job_id, metadata):
# Consume some time to be sur that the job was created before the
# metadata is set
self.jobs[job_id].metadata = metadata
self._events_to_send.append(
{
"timestamp": self.time(),
"type": "SET_JOB_METADATA",
"data": {
"job_id": str(job_id),
"metadata": str(metadata)
}
}
)
def resubmit_job(self, job):
"""
The given job is resubmited but in a dynamic workload. The name of this
workload is "resubmit=N" where N is the number of resubmission.
The job metadata is fill with a dict that contains the original job
full id in "parent_job" and the number of resubmission in "nb_resumit".
Warning: The profile_dict of the given job must be filled
"""
if job.metadata is None:
metadata = {"parent_job" : job.id, "nb_resubmit": 1}
else:
metadata = deepcopy(job.metadata)
if "parent_job" not in metadata:
metadata["parent_job"] = job.id
metadata["nb_resubmit"] = metadata["nb_resubmit"] + 1
# Keep the curent workload and add a resubmit number
splitted_id = job.id.split(Batsim.ATTEMPT_JOB_SEPARATOR)
if len(splitted_id) == 0:
new_job_name = job.id
else:
# This job as already an attempt number
new_job_name = splitted_id[0]
new_job_name = new_job_name + Batsim.ATTEMPT_JOB_SEPARATOR + str(metadata["nb_resubmit"])
new_job_id = self.submit_job(
new_job_name,
job.requested_resources,
job.requested_time,
job.profile,
profile=job.profile_dict)
# log in job metadata parent job and nb resubmit
self.set_job_metadata(new_job_id, metadata)
def do_next_event(self):
return self._read_bat_msg()
......@@ -305,6 +406,8 @@ class Batsim(object):
msg = self.network.recv(blocking=not self.running_simulation)
if msg is None:
self.scheduler.onDeadlock()
continue
self.logger.info("Message Received from Batsim: {}".format(msg))
self._current_time = msg["now"]
......@@ -323,19 +426,26 @@ class Batsim(object):
assert not self.running_simulation, "A simulation is already running (is more than one instance of Batsim active?!)"
self.running_simulation = True
self.nb_res = event_data["nb_resources"]
batconf = event_data["config"]
self.batconf = event_data["config"]
self.time_sharing = event_data["allow_time_sharing"]
self.handle_dynamic_notify = self.batconf["job_submission"]["from_scheduler"]["enabled"]
self.redis_enabled = batconf["redis"]["enabled"]
redis_hostname = batconf["redis"]["hostname"]
redis_port = batconf["redis"]["port"]
redis_prefix = batconf["redis"]["prefix"]
self.redis_enabled = self.batconf["redis"]["enabled"]
redis_hostname = self.batconf["redis"]["hostname"]
redis_port = self.batconf["redis"]["port"]
redis_prefix = self.batconf["redis"]["prefix"]
if self.redis_enabled:
self.redis = DataStorage(redis_prefix, redis_hostname,
redis_port)
self.resources = event_data["resources_data"]
# Retro compatibility for old Batsim API > 1.0 < 3.0
if "resources_data" in event_data:
res_key = "resources_data"
else:
res_key = "compute_resources"
self.resources = {
res["id"]: res for res in event_data[res_key]}
self.hpst = event_data.get("hpst_host", None)
self.lcst = event_data.get("lcst_host", None)
......@@ -344,39 +454,50 @@ class Batsim(object):
elif event_type == "SIMULATION_ENDS":
assert self.running_simulation, "No simulation is currently running"
self.running_simulation = False
print("All jobs have been submitted and completed!")
self.logger.info("All jobs have been submitted and completed!")
finished_received = True
self.scheduler.onSimulationEnds()
elif event_type == "JOB_SUBMITTED":
# Received WORKLOAD_NAME!JOB_ID
job_id = event_data["job_id"]
self.jobs[job_id] = self.get_job(event)
self.scheduler.onJobSubmission(self.jobs[job_id])
job = self.get_job(event)
job.job_state = Job.State.SUBMITTED
# don't override dynamic job
if job_id not in self.jobs:
self.jobs[job_id] = job
self.scheduler.onJobSubmission(job)
self.nb_jobs_received += 1
elif event_type == "JOB_KILLED":
self.scheduler.onJobsKilled(
[self.jobs[jid] for jid in event_data["job_ids"]])
self.nb_jobs_killed += len(event_data["job_ids"])
# get progress
killed_jobs = []
for jid in event_data["job_ids"]:
j = self.jobs[jid]
j.progress = event_data["job_progress"][jid]
killed_jobs.append(j)
self.scheduler.onJobsKilled(killed_jobs)
elif event_type == "JOB_COMPLETED":
job_id = event_data["job_id"]
j = self.jobs[job_id]
j.finish_time = event["timestamp"]
j.status = event["data"]["status"]
try:
j.job_state = Job.State[event["data"]["job_state"]]
except KeyError:
j.job_state = Job.State.UNKNOWN
j.kill_reason = event["data"]["kill_reason"]
j.return_code = event["data"]["return_code"]
self.scheduler.onJobCompletion(j)
if j.status == "TIMEOUT":
if j.job_state == Job.State.COMPLETED_WALLTIME_REACHED:
self.nb_jobs_timeout += 1
elif j.status == "FAILED":
elif j.job_state == Job.State.COMPLETED_FAILED:
self.nb_jobs_failed += 1
else:
self.nb_jobs_completed += 1
elif j.job_state == Job.State.COMPLETED_SUCCESSFULLY:
self.nb_jobs_successful += 1
elif j.job_state == Job.State.COMPLETED_KILLED:
self.nb_jobs_killed += 1
self.nb_jobs_completed += 1
elif event_type == "FROM_JOB_MSG":
job_id = event_data["job_id"]
j = self.jobs[job_id]
......@@ -395,24 +516,28 @@ class Batsim(object):
raise Exception("Multiple intervals are not supported")
self.scheduler.onMachinePStateChanged(
nodeInterval, event_data["state"])
elif event_type == "QUERY_REPLY":
elif event_type == "ANSWER":
consumed_energy = event_data["consumed_energy"]
self.scheduler.onReportEnergyConsumed(consumed_energy)
elif event_type == 'REQUESTED_CALL':
self.scheduler.onNOP()
# TODO: separate NOP / REQUESTED_CALL (here and in the algos)
elif event_type == 'ADD_RESOURCES':
self.scheduler.onAddResources(event_data["resources"])
elif event_type == 'REMOVE_RESOURCES':
self.scheduler.onRemoveResources(event_data["resources"])
else:
raise Exception("Unknow event type {}".format(event_type))
self.scheduler.onNoMoreEvents()
if self.handle_dynamic_notify and not finished_received:
if ((self.nb_jobs_completed +
self.nb_jobs_failed +
self.nb_jobs_timeout +
self.nb_jobs_killed) == self.nb_jobs_scheduled and
not self.has_dynamic_job_submissions) and self.initialized:
if (self.nb_jobs_completed == self.nb_jobs_received != 0):
# All the received and submited jobs are completed or killed
self.notify_submission_finished()
else:
self.notify_submission_continue()
#self.notify_submission_continue()
# Some jobs just have been dynamically submitted
self.has_dynamic_job_submissions = False
if len(self._events_to_send) > 0:
......@@ -425,10 +550,14 @@ class Batsim(object):
"events": self._events_to_send
}
self.network.send(new_msg)
self.logger.info("Message Sent to Batsim: {}".format(new_msg))
if finished_received:
self.network.close()
self.event_publisher.close()
if self.handle_dynamic_notify:
self.notify_submission_finished()
return not finished_received
......@@ -478,8 +607,10 @@ class Job(object):
RUNNING = 2
COMPLETED_SUCCESSFULLY = 3
COMPLETED_FAILED = 4
COMPLETED_KILLED = 5
REJECTED = 6
COMPLETED_WALLTIME_REACHED = 5
COMPLETED_KILLED = 6
REJECTED = 7
IN_KILLING = 8
def __init__(
self,
......@@ -496,19 +627,22 @@ class Job(object):
self.requested_resources = res
self.profile = profile
self.finish_time = None # will be set on completion by batsim
self.status = None
self.job_state = Job.State.UNKNOWN
self.kill_reason = None
self.return_code = None
self.progress = None
self.json_dict = json_dict
self.profile_dict = profile_dict
self.allocation = None
self.metadata = None
def __repr__(self):
return("<Job {0}; sub:{1} res:{2} reqtime:{3} prof:{4} stat:{5} jstat:{6} killreason:{7} ret:{8}>".format(
return(
("<Job {0}; sub:{1} res:{2} reqtime:{3} prof:{4} "
"state:{5} ret:{6} alloc:{7}>\n").format(
self.id, self.submit_time, self.requested_resources,
self.requested_time, self.profile, self.status,
self.job_state, self.kill_reason,
self.return_code))
self.requested_time, self.profile,
self.job_state,
self.return_code, self.allocation))
@staticmethod
def from_json_string(json_str):
......@@ -522,7 +656,8 @@ class Job(object):
json_dict.get("walltime", -1),
json_dict["res"],
json_dict["profile"],
json_dict, profile_dict)
json_dict,
profile_dict)
# def __eq__(self, other):
# return self.id == other.id
# def __ne__(self, other):
......@@ -534,6 +669,11 @@ class BatsimScheduler(object):
def __init__(self, options):
self.options = options
FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=FORMAT)
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.DEBUG)
def onAfterBatsimInit(self):
# You now have access to self.bs and all other functions
pass
......@@ -561,11 +701,19 @@ class BatsimScheduler(object):
raise NotImplementedError()
def onJobsKilled(self, jobs):
for job in jobs:
self.onJobCompletion(job)
raise NotImplementedError()
def onMachinePStateChanged(self, nodeid, pstate):
raise NotImplementedError()
def onReportEnergyConsumed(self, consumed_energy):
raise NotImplementedError()
def onAddResources(self, to_add):
raise NotImplementedError()
def onRemoveResources(self, to_remove):
raise NotImplementedError()
def onNoMoreEvents(self):
pass
"""
batsim.cmds
~~~~~~~~~~~
Additional tools installed in the path.
"""
'''
Run PyBatsim Experiments.
Usage:
pybatsim-experiment <experiment> [options]
Options:
-h --help Show this help message and exit.
-q --quiet Silent experiment output.
-d --debug Print additional debug messages.
'''
import sys
import json
from docopt import docopt
from batsim.tools.experiments import launch_experiment
def main():
arguments = docopt(__doc__, version='2.0')
verbose = not bool(arguments["--quiet"])
debug = bool(arguments["--debug"])
options_file = arguments["<experiment>"]
try:
with open(options_file) as f:
options = json.loads(f.read())
except FileNotFoundError:
if debug:
raise
print("Experiment file does not exist: {}".format(
options_file), file=sys.stderr)
sys.exit(1)
except Exception:
if debug:
raise
print("Error in json file: {}".format(options_file), file=sys.stderr)
sys.exit(1)
options["options-file"] = options_file
if verbose:
print("Running experiment: {}".format(options_file))
return launch_experiment(options, verbose=verbose)
if __name__ == "__main__":
sys.exit(main())
'''
Run PyBatsim Schedulers.
Usage:
pybatsim <scheduler> [-o <options_string>] [options]
Options:
-h --help Show this help message and exit.
-v --verbose Be verbose.
-p --protect Protect the scheduler using a validating machine.
-s --socket-endpoint=<endpoint> Batsim socket endpoint to use [default: tcp://*:28000]
-e --event-socket-endpoint=<endpoint> Socket endpoint to use to publish scheduler events [default: tcp://*:29000]
-o --options=<options_string> A Json string to pass to the scheduler [default: {}]
-O --options-file=<options_file> A file containing the json options
-t --timeout=<timeout> How long to wait for responses from Batsim [default: 2000]
'''
import sys
import json
from docopt import docopt
from batsim.tools.launcher import launch_scheduler, instanciate_scheduler
def main():
arguments = docopt(__doc__, version='2.0')
if arguments['--verbose']:
verbose = 999
else:
verbose = 0
timeout = int(arguments['--timeout'] or float("inf"))
protect = bool(arguments['--protect'])
if arguments["--options-file"]:
with open(arguments["--options-file"]) as options_file:
options = json.load(options_file)
elif arguments["--options"]:
options = json.loads(arguments['--options'])
else:
options = {}
scheduler_filename = arguments['<scheduler>']
socket_endpoint = arguments['--socket-endpoint']
event_socket_endpoint = arguments['--event-socket-endpoint']
scheduler = instanciate_scheduler(scheduler_filename, options=options)
return launch_scheduler(scheduler,
socket_endpoint,
event_socket_endpoint,
options,
timeout,
protect,
verbose)
if __name__ == "__main__":
sys.exit(main())
......@@ -11,6 +11,7 @@ from .job import *
from .resource import *
from .profiles import *
from .alloc import *
from .workloads import *
__all__ = [
"Scheduler",
......@@ -24,4 +25,7 @@ __all__ = [
"Profiles",
"Profile",
"Allocation",
"JobDescription",
"WorkloadDescription",
"generate_workload",
]
......@@ -19,6 +19,7 @@ def do_backfilling(
resources_filter=None,
check_func=None,
handle_scheduled_func=None,
respect_deps=False,
backfilling_sort=None):
"""Helper to implement the backfilling step of backfilling algorithms.
......@@ -44,14 +45,23 @@ def do_backfilling(
:param handle_scheduled_func: a function which will be given the latest scheduled job
as a parameter
:param respect_deps: priority jobs are reserved even if their dependencies are not fulfilled yet.
:param backfilling_sort: after the priority jobs were reserved, this function will
be used to call jobs.sorted(f) with it to sort the remaining
jobs.
"""
runnable_jobs = jobs.runnable
reserved_jobs = runnable_jobs[:reservation_depth]
remaining_jobs = runnable_jobs[reservation_depth:]
if respect_deps:
open_jobs = jobs.open
reserved_jobs = open_jobs[:reservation_depth]
remaining_jobs = open_jobs[reservation_depth:].runnable
else:
runnable_jobs = jobs.runnable
reserved_jobs = runnable_jobs[:reservation_depth]
remaining_jobs = runnable_jobs[reservation_depth:]
if backfilling_sort:
remaining_jobs = remaining_jobs.sorted(backfilling_sort)
......@@ -63,6 +73,7 @@ def do_backfilling(
remaining_jobs,
resources_filter=resources_filter,
check_func=check_func,
respect_deps=respect_deps,
handle_scheduled_func=handle_scheduled_func)
......@@ -75,6 +86,7 @@ def backfilling_sched(
filler_check_func=None,
backfilling_check_func=None,
handle_scheduled_func=None,
respect_deps=False,
backfilling_sort=None,
priority_sort=None):
"""Backfilling algorithm using the filler scheduler to run the first jobs and backfill the remaining jobs.
......@@ -105,6 +117,8 @@ def backfilling_sched(
:param handle_scheduled_func: a function which will be given the latest scheduled job
as a parameter
:param respect_deps: priority jobs are reserved even if their dependencies are not fulfilled yet.
:param backfilling_sort: after the priority jobs were reserved, this function will
be used to call jobs.sorted(f) with it to sort the remaining
jobs.
......@@ -126,13 +140,14 @@ def backfilling_sched(
filler_sched(scheduler, abort_on_first_nonfitting=True, jobs=jobs,
resources=resources, resources_filter=resources_filter,
check_func=filler_check_func,
respect_deps=respect_deps,
handle_scheduled_func=handle_scheduled_func)
# Do backfilling if there are still runnable jobs and free resources.
if resources.free and len(
jobs.runnable) > reservation_depth:
if len(jobs.runnable) > reservation_depth:
do_backfilling(scheduler, reservation_depth, jobs=jobs,
resources=resources, resources_filter=resources_filter,
check_func=backfilling_check_func,
handle_scheduled_func=handle_scheduled_func,
respect_deps=respect_deps,
backfilling_sort=backfilling_sort)
......@@ -10,6 +10,7 @@
def filler_sched(scheduler, abort_on_first_nonfitting=False, jobs=None,
resources=None,
resources_filter=None, check_func=None,
respect_deps=False,
handle_scheduled_func=None):
"""Helper to implement a filler scheduling algorithm.
......@@ -29,6 +30,8 @@ def filler_sched(scheduler, abort_on_first_nonfitting=False, jobs=None,
:param check_func: a function to check whether or not a job is allowed to be scheduled.
Signature: job, res, list_of_already_scheduled_jobs
:param respect_deps: priority jobs are reserved even if their dependencies are not fulfilled yet.
:param handle_scheduled_func: a function which will be given the latest scheduled job
as a parameter
"""
......@@ -45,7 +48,15 @@ def filler_sched(scheduler, abort_on_first_nonfitting=False, jobs=None,
already_scheduled = []
for job in jobs.runnable:
for job in jobs.open:
if not job.runnable:
if abort_on_first_nonfitting:
if respect_deps:
break
else:
continue
else:
continue
res = resources.find_sufficient_resources_for_job(
job, filter=resources_filter)
if res:
......
......@@ -7,11 +7,14 @@
"""
from ..alloc import Allocation
from ..job import Job
from ..utils import increment_float
from ..resource import Resource
def find_resources_without_delaying_priority_jobs(
scheduler, priority_jobs, resources, job, resources_filter=None,
check_func=None):
respect_deps=False, check_func=None):
"""Helper method to find resources for a job without delaying the jobs given as `priority_jobs`.
To accomplish this the resources are temporarily reserved and freed later.
......@@ -27,6 +30,8 @@ def find_resources_without_delaying_priority_jobs(
:param resources_filter: the filter to use for the resource filtering
(can be used to implement topology aware scheduling)
:param respect_deps: priority jobs are reserved even if their dependencies are not fulfilled yet.
:param check_func: a function to check whether or not the job should be scheduled.
The signature is: job, res, temporary_allocations; whereas
the job is the job to be scheduled, res are the resources
......@@ -34,21 +39,53 @@ def find_resources_without_delaying_priority_jobs(
the allocations of the jobs with allocations in the future
(reserved jobs).
"""
scheduler.debug("Reserve priority jobs {jobs}",
jobs=priority_jobs,
type="reserve_priority_jobs")
# Temporarily allocate the priority jobs
temporary_allocations = []
for j in priority_jobs:
time = scheduler.time
if respect_deps:
for dep in j.resolved_dependencies:
if isinstance(dep, Job):
if dep.running:
time = max(
time,
increment_float(
dep.start_time +
dep.requested_time,
Resource.TIME_DELTA,
True))
elif dep.completed:
time = max(
time,
increment_float(
dep.finish_time,
Resource.TIME_DELTA,
True))
else:
return resources.create()
else:
return resources.create()
# Allow allocations in the future to find the first fit for all priority
# jobs
start_time, res = resources.find_with_earliest_start_time(
j, allow_future_allocations=True,
filter=resources_filter)
filter=resources_filter,
time=time)
assert res, "No future allocations were found in backfilling for priority job"
a = Allocation(start_time, resources=res, job=j)
a.allocate_all(scheduler)
temporary_allocations.append(a)
scheduler.debug("Search resources for backfilling job {job}",
job=job,
type="search_backfilling_resources")
# Search for allocations for the given job (not in the future)
res = resources.find_sufficient_resources_for_job(
job, filter=resources_filter)
......@@ -57,17 +94,26 @@ def find_resources_without_delaying_priority_jobs(
if not check_func(job, res, temporary_allocations):
res = None
scheduler.debug("Free priority jobs {jobs}",
jobs=priority_jobs,
type="free_priority_jobs")
# Free the temporarily acquired allocations
for t in temporary_allocations:
t.free()
scheduler.debug("Results of resources for backfilling job {job}: {res}",
job=job,
res=res,
type="search_backfilling_resources_result")
return res
def schedule_jobs_without_delaying_priorities(
scheduler, priority_jobs, resources, jobs,
abort_on_first_nonfitting=False, resources_filter=None,
check_func=None, handle_scheduled_func=None):
check_func=None, respect_deps=False, handle_scheduled_func=None):
"""Schedule jobs without delaying given priority jobs.
:param scheduler: the scheduler
......@@ -91,6 +137,8 @@ def schedule_jobs_without_delaying_priorities(
the allocations of the jobs with allocations in the future
(reserved jobs).
:param respect_deps: priority jobs are reserved even if their dependencies are not fulfilled yet.
:param handle_scheduled_func: a function which will be given the latest scheduled job
as a parameter
"""
......@@ -98,7 +146,7 @@ def schedule_jobs_without_delaying_priorities(
for job in jobs:
res = find_resources_without_delaying_priority_jobs(
scheduler, priority_jobs, resources, job, resources_filter,
check_func)
respect_deps, check_func)
if res:
job.schedule(res)
if handle_scheduled_func is not None:
......
......@@ -41,6 +41,7 @@ class Allocation:
walltime = job.requested_time
self._walltime = walltime
self._estimated_end_time = None
if isinstance(resources, Resource):
self.add_resource(resources)
......@@ -81,12 +82,15 @@ class Allocation:
assert not self.allocated and not self.previously_allocated, "Allocation is in invalid state"
assert self._end_time is None or new_time < self.estimated_end_time, "An allocation can not start after its end time"
self._start_time = new_time
self._estimated_end_time = None
@property
def estimated_end_time(self):
"""The estimated end time of this allocation (either explicit or based on the walltime)."""
return self._end_time or (
self.start_time + (self.walltime or float("Inf")))
if self._estimated_end_time is None:
self._estimated_end_time = self.end_time or (
self.start_time + (self.walltime or float("Inf")))
return self._estimated_end_time
@property
def end_time(self):
......@@ -103,6 +107,7 @@ class Allocation:
assert not self.allocated and not self.previously_allocated, "Allocation is in invalid state"
assert new_time > 0, "The walltime has to be > 0"
self._walltime = new_time
self._estimated_end_time = None
@property
def allocated_resources(self):
......@@ -146,7 +151,7 @@ class Allocation:
:param newjob: the job to be allocated on this allocation
"""
assert not self.allocated and not self.previously_allocated, "Allocation is in invalid state"
assert self._job is None, "Job is not assigned"
assert self._job is None, "Job is already assigned"
assert newjob.open, "Job is not in state open"
assert self.fits_job(newjob), "Job does not fit in allocation"
......@@ -309,6 +314,7 @@ class Allocation:
r._do_free_allocation(self)
self._end_time = self._scheduler.time
self._estimated_end_time = None
self._allocated = False
self._previously_allocated = True
......
"""
batsim.sched.events
~~~~~~~~~~~~~~~~~~~
This module provides handling of scheduling events.
"""
import logging
import json
import csv
import io
from .logging import Logger
from .utils import ObserveList, filter_list
class LoggingEvent:
"""Class for storing data about events triggered by the scheduler.
:param time: the simulation time when the event occurred
:param level: the importance level of the event
:param open_jobs: the number of open jobs
:param processed_jobs: the number of processed jobs (completed, killed, etc.)
:param msg: the actual message of the event
:param type: the type of the event (`str`)
:param data: additional data attached to the event (`dict`)
"""
def __init__(
self,
time,
level,
open_jobs,
processed_jobs,
msg,
type,
data):
self.time = time
self.open_jobs = open_jobs
self.processed_jobs = processed_jobs
self.level = level
self.msg = msg
self.type = type
self.data = data
def __str__(self):
return "[{:.6f}] {}/{} <{}> ({})".format(
self.time, self.processed_jobs, self.open_jobs,
self.type, self.msg)
@classmethod
def get_csv_header(self):
output = io.StringIO()
csvdata = ["time", "level", "processed_jobs", "open_jobs",
"type", "message", "data"]
writer = csv.writer(
output,
quoting=csv.QUOTE_NONNUMERIC,
delimiter=';')
writer.writerow(csvdata)
return output.getvalue().strip()
def to_csv_line(self):
def conv_obj(o):
try:
return o.__dict__
except (AttributeError, ValueError):
return str(o)
data = {}
for k, v in self.data.items():
k = str(k)
if hasattr(v, "to_json_dict"):
v = v.to_json_dict()
elif hasattr(v, "__iter__") and not isinstance(v, str):
new_v = []
for e in v:
if hasattr(e, "to_json_dict"):
e = e.to_json_dict()
new_v.append(e)
v = new_v
data[k] = v
try:
data = json.dumps(data, default=lambda o: conv_obj(o))
except Exception as e:
raise ValueError(
"Error while dumping json data: {}"
.format(data))
output = io.StringIO()
csvdata = [self.time, self.level, self.processed_jobs, self.open_jobs,
self.type, self.msg, data]
writer = csv.writer(
output,
quoting=csv.QUOTE_NONNUMERIC,
delimiter=';')
writer.writerow(csvdata)
return output.getvalue().strip()
@classmethod
def from_entries(cls, parts):
time = float(parts[0])
level = int(parts[1])
processed_jobs = int(parts[2])
open_jobs = int(parts[3])
type = parts[4]
msg = parts[5]
try:
data = json.loads(parts[6])
except Exception:
raise ValueError(
"Error while parsing data entry in line: {}"
.format(parts))
return LoggingEvent(time, level, open_jobs, processed_jobs, msg, type,
data)
class EventList(ObserveList):
def filter(
self,
*args,
time_after=None,
time_at=None,
time_before=None,
level=None,
types=[],
**kwargs):
"""Filter the event list to search for specific events.
:param time_after: Search for events after a specified time.
:param time_at: Search for events at a specified time.
:param time_before: Search for events before a specified time.
:param level: Search for events with a given logging level.
:param types: Search for events with one of the given event types.
"""
no_filters = False
if time_after is None and time_at is None and time_before is None and \
level is None and not types:
no_filters = True
# Filter events
def filter_events(events, **kwargs):
if no_filters:
yield from events
else:
for e in events:
if time_after is not None:
if e.time > time_after:
yield e
continue
if time_before is not None:
if e.time < time_before:
yield e
continue
if time_at is not None:
if e.time == time_at:
yield e
continue
if level is not None:
if e.level == level:
yield e
continue
if types:
if e.type in types:
yield e
continue
return self.create(
filter_list(
self._data,
[filter_events],
*args,
**kwargs))
class EventLogger(Logger):
"""Logger for events which will only log to files and will write the log messages
without any additional formatting.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, streamhandler=False, **kwargs)
@property
def file_formatter(self):
return logging.Formatter('%(message)s')
def load_events_from_file(in_file):
events = EventList()
reader = csv.reader(in_file, quoting=csv.QUOTE_NONNUMERIC, delimiter=';')
# Skip header
next(reader)
for row in reader:
if row:
events.add(LoggingEvent.from_entries(row))
return events
......@@ -196,6 +196,7 @@ class Job:
"""Whether or not this job has been completed."""
return self.state in [
BatsimJob.State.COMPLETED_KILLED,
BatsimJob.State.COMPLETED_WALLTIME_REACHED,
BatsimJob.State.COMPLETED_SUCCESSFULLY,
BatsimJob.State.COMPLETED_FAILED]
......@@ -240,6 +241,7 @@ class Job:
"""Whether this job has failed its execution."""
return self.state in [
BatsimJob.State.COMPLETED_KILLED,
BatsimJob.State.COMPLETED_WALLTIME_REACHED,
BatsimJob.State.COMPLETED_FAILED]
@property
......@@ -319,17 +321,6 @@ class Job:
return self._changed_state
return self._batsim_job.job_state
@property
def kill_reason(self):
"""The kill reason (if any exists) of this job as known by Batsim."""
assert self._batsim_job, "Batsim job is not set => job was not correctly initialised"
return self._batsim_job.kill_reason
@kill_reason.setter
def kill_reason(self, value):
assert self._batsim_job, "Batsim job is not set => job was not correctly initialised"
self._batsim_job.kill_reason = value
@property
def return_code(self):
"""The return code of this job as known by Batsim."""
......@@ -377,6 +368,14 @@ class Job:
result.append(dep)
return ListView(result)
@property
def progress(self):
return self._batsim_job.progress
@progress.setter
def progress(self, value):
self._batsim_job.progress = value
def free(self):
"""Free the current allocation of this job."""
assert self._batsim_job, "Batsim job is not set => job was not correctly initialised"
......@@ -477,8 +476,6 @@ class Job:
self._scheduler.info(
"Rejecting job ({job}), reason={reason}",
job=self, reason=self.rejected_reason, type="job_rejection")
self._scheduler._log_job(
self._scheduler.time, self, "rejected", reason)
self._scheduler._batsim.reject_jobs([self._batsim_job])
del self._scheduler._scheduler._jobmap[self._batsim_job.id]
......@@ -548,8 +545,18 @@ class Job:
alloc = []
for res in self.allocation.allocated_resources:
if res.num_active != 1:
self._scheduler.fatal(
"Scheduled resource {res} was already part of a Batsim allocation",
res=res, type="resource_already_allocated")
alloc.append(res.id)
self._scheduler.debug(
"Start job {batsim_job} on {resources}",
batsim_job=self._batsim_job,
resources=alloc,
type="start_job")
# Start the jobs
self._scheduler._batsim.start_jobs(
[self._batsim_job], {self.id: alloc})
......@@ -562,13 +569,13 @@ class Job:
self._start_time = self._scheduler.time
self._jobs_list.update_element(self)
def change_state(self, state, kill_reason="", return_code=None):
def change_state(self, state, return_code=None):
"""Change the state of a job. This is only needed in rare cases where the real job
should not be executed but instead the state should be set manually.
"""
assert self._batsim_job, "Batsim job is not set => job was not correctly initialised"
self._scheduler._batsim.change_job_state(
self._batsim_job, state, kill_reason)
self._batsim_job, state)
self._changed_state = state
if state == Job.State.RUNNING:
......@@ -576,24 +583,24 @@ class Job:
self._scheduled = True
elif state in [Job.State.COMPLETED_FAILED,
Job.State.COMPLETED_SUCCESSFULLY,
Job.State.COMPLETED_WALLTIME_REACHED,
Job.State.COMPLETED_KILLED]:
self._batsim_job.finish_time = self._scheduler.time
self._batsim_job.kill_reason = kill_reason
self._batsim_job.return_code = return_code or 0 if state == Job.State.COMPLETED_SUCCESSFULLY else 1
self._scheduler._log_job(self._scheduler.time, self, "completed")
self._batsim_job.return_code = (
return_code or 0 if state == Job.State.COMPLETED_SUCCESSFULLY else 1)
self._jobs_list.update_element(self)
def __str__(self):
data = self.to_json_dict()
return (
"<Job {}; queue:{} sub:{} reqtime:{} res:{} prof:{} start:{} fin:{} stat:{} killreason:{} ret:{} comment:{}>"
"<Job {}; number:{} queue:{} sub:{} reqtime:{} res:{} prof:{} start:{} fin:{} stat:{} ret:{} comment:{}>"
.format(
data["id"], data["queue_number"], data["submit_time"], data["requested_time"],
data["id"], data["number"], data["queue_number"],
data["submit_time"], data["requested_time"],
data["requested_resources"], data["profile"],
data["start_time"],
data["finish_time"], data["state"],
data["kill_reason"],
data["return_code"],
data["comment"]))
......@@ -612,8 +619,25 @@ class Job:
if self.state is not None:
state = self.state.name
split_id = self.id.split(Batsim.WORKLOAD_JOB_SEPARATOR)
parent_id = ""
parent_workload_name = ""
parent_number = ""
if self.parent_job:
parent_id = self.parent_job.id
parent_split_id = parent_id.split(Batsim.WORKLOAD_JOB_SEPARATOR)
parent_workload_name = parent_split_id[0]
parent_number = parent_split_id[1]
return {
"id": self.id,
"workload_name": split_id[0],
"number": split_id[1],
"parent_id": parent_id,
"parent_workload_name": parent_workload_name,
"parent_number": parent_number,
"queue_number": self.number,
"submit_time": self.submit_time,
"requested_time": self.requested_time,
......@@ -623,7 +647,7 @@ class Job:
"start_time": self.start_time,
"finish_time": self.finish_time,
"state": state,
"kill_reason": self.kill_reason,
"success": True if self.success else False,
"return_code": self.return_code,
"comment": self.comment
}
......@@ -654,6 +678,16 @@ class Jobs(ObserveList):
self._job_map = {}
super().__init__(*args, **kwargs)
def __eq__(self, other):
if type(other) is not Jobs:
return False
if len(self._job_map) != len(other._job_map):
return False
return all(
[me.id == him.id
for me, him
in zip(self._job_map.values(), other._job_map.values())])
@property
def runnable(self):
"""Returns all jobs which are runnable."""
......
......@@ -7,76 +7,6 @@
import logging
import os
import json
class LoggingEvent:
"""Class for storing data about events triggered by the scheduler.
:param time: the simulation time when the event occurred
:param level: the importance level of the event
:param open_jobs: the number of open jobs
:param processed_jobs: the number of processed jobs (completed, killed, etc.)
:param msg: the actual message of the event
:param type: the type of the event (`str`)
:param data: additional data attached to the event (`dict`)
"""
def __init__(
self,
time,
level,
open_jobs,
processed_jobs,
msg,
type,
data):
self.time = time
self.open_jobs = open_jobs
self.processed_jobs = processed_jobs
self.level = level
self.msg = msg
self.type = type
self.data = data
def to_message(self):
"""Returns a human readable message presentation of this event."""
return "[{:.6f}] {}/{} <{}> ({})".format(
self.time, self.processed_jobs, self.open_jobs,
self.type, self.msg)
def __str__(self):
data = []
for k, v in self.data.items():
try:
k = json.dumps(k, default=lambda o: o.__dict__)
except (AttributeError, ValueError):
k = json.dumps(str(k), default=lambda o: o.__dict__)
if hasattr(v, "to_json_dict"):
v = v.to_json_dict()
try:
v = json.dumps(v, default=lambda o: o.__dict__)
except (AttributeError, ValueError):
raise ValueError(
"Object could not be serialised: {}".format(v))
else:
try:
v = json.dumps(v, default=lambda o: o.__dict__)
except (AttributeError, ValueError):
v = json.dumps(str(v), default=lambda o: o.__dict__)
data.append("{}: {}".format(k, v))
data = "{" + ", ".join(data) + "}"
return "{:.6f};{};{};{};{};{};{}".format(
self.time, self.level, self.processed_jobs, self.open_jobs,
self.type, self.msg, data)
class Logger:
......@@ -117,14 +47,12 @@ class Logger:
self._debug = debug = debug or str(debug).lower() in [
"y", "yes", "true", "1"]
if debug:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
log_level = logging.DEBUG if debug else logging.INFO
logger.setLevel(log_level)
if streamhandler:
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
handler.setLevel(log_level)
handler.setFormatter(self.formatter)
logger.addHandler(handler)
......@@ -171,16 +99,3 @@ class Logger:
def error(self, *args, **kwargs):
"""Writes a error message to the logger."""
self._logger.error(*args, **kwargs)
class EventLogger(Logger):
"""Logger for events which will only log to files and will write the log messages
without any additional formatting.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, streamhandler=False, **kwargs)
@property
def file_formatter(self):
return logging.Formatter('%(message)s')
......@@ -126,12 +126,13 @@ class Profiles(metaclass=ABCMeta):
try:
t = d["type"]
except KeyError:
return Unknown(data=data, name=name)
return Profiles.Unknown(data=d, name=name)
profiles = [
cls.Delay,
cls.Parallel,
cls.ParallelHomogeneous,
cls.ParallelHomogeneousTotal,
cls.Smpi,
cls.Sequence,
cls.ParallelPFS,
......@@ -143,7 +144,7 @@ class Profiles(metaclass=ABCMeta):
for p in profiles:
if t == p.type:
return p.from_dict(d, name=name)
return Unknown(data=data, type=t, name=name)
return Profiles.Unknown(data=d, type=t, name=name)
class Unknown(Profile):
"""The profile was not recognized."""
......@@ -192,8 +193,7 @@ class Profiles(metaclass=ABCMeta):
@classmethod
def from_dict(cls, dct, name=None):
return cls(nbres=dct["nb_res"],
cpu=dct["cpu"],
return cls(cpu=dct["cpu"],
com=dct["com"],
ret=dct.get("ret", 0),
name=name)
......@@ -201,7 +201,6 @@ class Profiles(metaclass=ABCMeta):
def to_dict(self, embed_references=False):
return {
"type": self.type,
"nb_res": self.nbres,
"cpu": self.cpu,
"com": self.com,
"ret": self.ret,
......@@ -232,6 +231,12 @@ class Profiles(metaclass=ABCMeta):
"ret": self.ret,
}
class ParallelHomogeneousTotal(ParallelHomogeneous):
"""Implementation of the MsgParallelHomogeneousTotal profile."""
type = "msg_par_hg_tot"
class Smpi(Profile):
"""Implementation of the Smpi profile."""
......@@ -322,7 +327,7 @@ class Profiles(metaclass=ABCMeta):
return {
"type": self.type,
"size": self.size,
"direction": self.direction.name,
"direction": self.direction.name.lower(),
"host": self.host.name,
"ret": self.ret,
}
......@@ -354,7 +359,7 @@ class Profiles(metaclass=ABCMeta):
return {
"type": self.type,
"size": self.size,
"direction": self.direction.name,
"direction": self.direction.name.lower(),
"ret": self.ret,
}
......
......@@ -7,7 +7,7 @@
"""
from enum import Enum
from .utils import ObserveList, filter_list, ListView, build_filter
from .utils import ObserveList, filter_list, ListView, build_filter, increment_float
class ResourceRequirement:
......@@ -110,17 +110,27 @@ class Resource:
@property
def active(self):
"""Whether or not this resource is currently active in some of its resources."""
return self.num_active > 0
@property
def num_active(self):
"""Number of allocations in which this resource is currently active."""
num = 0
for alloc in self._allocations:
if self in alloc.allocated_resources:
return True
return False
num += 1
return num
@property
def resources(self):
"""Returns a list containing only the resource (for compatibility with the `Resources` class)."""
return [self]
def find_first_time_to_fit_job(self, job, time=None):
def find_first_time_to_fit_job(
self,
job,
time=None,
future_reservation=False):
"""Finds the first time after which the job can start.
:param job: the job to find a time slot for
......@@ -277,37 +287,87 @@ class ComputeResource(Resource):
self._old_pstate = None
self._pstate_update_in_progress = False
def find_first_time_to_fit_job(self, job, time=None):
return self.find_first_time_to_fit_walltime(job.requested_time, time)
def find_first_time_to_fit_job(
self,
job,
time=None,
future_reservation=False):
return self.find_first_time_to_fit_walltime(job.requested_time, time,
future_reservation)
def find_first_time_to_fit_walltime(self, requested_walltime, time=None):
def find_first_time_to_fit_walltime(self, requested_walltime, time=None,
future_reservation=False):
"""Finds the first time after which the requested walltime is available for a job start.
:param requested_walltime: the size of the requested time slot
:param time: the starting time after which a time slot is needed
:param future_reservation: if future_reservation is set to True, it must not be
guaranteed that the resource is already freed by Batsim at the given time.
"""
present_time = self._scheduler.time
if time is None:
time = self._scheduler.time
time = present_time
start_time = time
time_updated = True
while time_updated:
time_updated = False
# Search the earliest time when a slot for an allocation is
# available
for alloc in self._allocations:
if alloc.start_time <= time and alloc.estimated_end_time >= time:
time = alloc.estimated_end_time + Resource.TIME_DELTA
time_updated = True
# If the result should be found for the current scheduling time
# (is_present) than not the estimated_end_time is used but the
# real end_time (or infinity) because there could be allocations
# in the current time which are not yet freed by Batsim (in case
# of jobs getting killed after their walltime). This is due to
# the implementation of killing jobs inside Batsim which is
# implemented by using a new killer process.
if alloc.end_time is None:
end_time = float("Inf")
else:
end_time = alloc.estimated_end_time
if alloc.start_time <= time and end_time == float(
"Inf") and not future_reservation and time <= present_time:
return None
if alloc.start_time <= time and end_time >= time:
new_time = increment_float(
alloc.estimated_end_time,
Resource.TIME_DELTA,
until_changed=True)
if new_time < start_time:
new_time = start_time
if new_time > time:
time_updated = True
time = new_time
break
# Check whether or not the full requested walltime fits into the
# slot, otherwise move the slot at the end of the found conflicting
# allocation and then repeat the loop.
estimated_end_time = time + requested_walltime
for alloc in self._allocations:
if alloc.start_time > time and alloc.start_time < (
estimated_end_time + Resource.TIME_DELTA):
time = alloc.estimated_end_time + Resource.TIME_DELTA
estimated_end_time = time + requested_walltime
time_updated = True
if not time_updated:
estimated_end_time = increment_float(time, requested_walltime,
until_changed=True)
estimated_end_time = increment_float(estimated_end_time,
Resource.TIME_DELTA,
until_changed=True)
for alloc in self._allocations:
if alloc.start_time > time and alloc.start_time < (
estimated_end_time):
new_time = increment_float(
alloc.estimated_end_time,
Resource.TIME_DELTA,
until_changed=True)
if new_time < start_time:
new_time = start_time
if new_time > time:
time_updated = True
time = new_time
estimated_end_time = time + requested_walltime
break
return time
@property
......@@ -390,24 +450,24 @@ class Resources(ObserveList):
"""The list of all special resources (managed by the scheduler logic)."""
return self.filter(special=True)
def __getitem__(self, items):
def __getitem__(self, item):
"""Returns either a slice of resources or returns a resource based on a given resource id."""
if isinstance(items, slice):
return self.create(self.all[items])
if isinstance(item, slice):
return self.create(self.all[item])
else:
return self._resource_map[items]
return self._resource_map[item]
def __delitem__(self, index):
def __delitem__(self, item):
"""Deletes a resource with the given resource id."""
resource = self._resource_map[items]
resource = self._resource_map[item]
self.remove(resource)
def _element_new(self, resource):
if resource.id:
if resource.id is not None:
self._resource_map[resource.id] = resource
def _element_del(self, resource):
if resource.id:
if resource.id is not None:
del self._resource_map[resource.id]
def find_first_time_and_resources_to_fit_walltime(
......@@ -416,7 +476,8 @@ class Resources(ObserveList):
time,
min_matches=None,
max_matches=None,
filter=None):
filter=None,
future_reservation=False):
"""Find sufficient resources and the earlierst start time to fit a job and its resource requirements.
:param requested_walltime: the walltime which should fit in the allocation
......@@ -438,11 +499,12 @@ class Resources(ObserveList):
sufficient_resources_found = False
found_resources = []
earliest_time_available = None
time_changed = False
for r in res:
new_time = r.find_first_time_to_fit_job(
job, time)
job, time, future_reservation)
if new_time is None:
continue
if new_time == time:
found_resources.append(r)
if min_matches is None or len(
......@@ -465,12 +527,24 @@ class Resources(ObserveList):
if found_resources and (
min_matches is None or len(found_resources) >= min_matches):
break
else:
elif earliest_time_available is not None:
time = earliest_time_available
else:
return None, None
else:
break
elif earliest_time_available:
time = earliest_time_available
if time != earliest_time_available:
time = earliest_time_available
else:
return None, None
else:
if max_matches is not None:
found_resources = found_resources[:max_matches]
if min_matches is not None and len(
found_resources) < min_matches:
found_resources = []
return time, found_resources
found_length = len(found_resources)
......@@ -486,47 +560,93 @@ class Resources(ObserveList):
while True:
result = set()
times_found = set()
s_found_all = []
is_valid = True
new_time, found_res = do_find(job,
time,
self._data,
min_matches,
max_matches,
filter)
s_found_all = []
new_time2 = new_time
if new_time is None:
return None, None
if not found_res:
is_valid = False
if new_time < time:
job._scheduler.fatal(
"Found time is before the current time: old={time_old}, new={time_new}",
time_old=time, time_new=new_time,
type="find_resource_failed_time_old")
times_found.add(new_time)
for h in job._scheduler.get_find_resource_handlers:
reqs = h(job._scheduler, job)
for r in reqs:
new_time2, s_found = do_find(job,
new_time2,
new_time,
r.resources,
r.min,
r.max,
r.filter)
if new_time2 is None:
return None, None
if not s_found:
is_valid = False
if new_time2 < new_time:
job._scheduler.fatal(
"Found time is before the current time: old={time_old}, new={time_new}",
time_old=new_time, time_new=new_time2,
type="find_resource_failed_time_old2")
s_found_all += s_found
if new_time == new_time2:
times_found.add(new_time2)
if new_time2 != new_time:
break
if len(times_found) == 1:
if not is_valid:
return None, None
return new_time, self.create(set(found_res + s_found_all))
else:
time = max(new_time, new_time2)
new_time = max(times_found)
if time == new_time:
job._scheduler.fatal(
"Finding new resource failed. Time has not changed: old={time_old}, new={time_new}",
time_old=time, time_new=list(times_found),
type="find_resource_failed_time_not_changed")
time = new_time
def find_with_earliest_start_time(
self, job, allow_future_allocations=False,
filter=None):
filter=None, time=None):
"""Find sufficient resources and the earlierst start time for a given job.
:param job: the job for which the start times and resources should be found
:param allow_future_allocations: whether or not allocations starting after
the current simulation time are allowed
the current simulation time are allowed. If false only resources are returned which are guaranteed to be free in the present time.
:param filter: the filter to be applied when a set of resources was found
"""
start_time, found_resources = self.find_first_time_and_resources_to_fit_walltime(job, max(
job._scheduler.time, job.submit_time), job.requested_resources, job.requested_resources, filter)
if time is None:
time = job._scheduler.time
if not allow_future_allocations and start_time != job._scheduler.time:
found_resources = self.create()
start_time, found_resources = self.find_first_time_and_resources_to_fit_walltime(job, max(
time, job.submit_time), job.requested_resources, job.requested_resources, filter,
allow_future_allocations)
if not allow_future_allocations:
if start_time is None:
start_time = time
found_resources = self.create()
elif start_time != time:
found_resources = self.create()
return start_time, found_resources
......
......@@ -19,7 +19,8 @@ from .reply import ConsumedEnergyReply
from .utils import DictWrapper
from .messages import Message
from .utils import ListView
from .logging import LoggingEvent, Logger, EventLogger
from .logging import Logger
from .events import LoggingEvent, EventLogger, EventList
from .workloads import WorkloadDescription
......@@ -82,14 +83,13 @@ class BaseBatsimScheduler(BatsimScheduler):
for job in jobs:
jobobj = self._jobmap[job.id]
del self._jobmap[job.id]
jobobjs.append(job)
jobobj.progress = job.progress
jobobjs.append(jobobj)
self._scheduler.info("The following jobs were killed: ({jobs})",
jobs=jobobjs, type="jobs_killed_received")
for job in jobobjs:
job._do_complete_job()
self._scheduler._log_job(self._scheduler.time, job, "killed")
jobobj._do_complete_job()
self._scheduler.on_jobs_killed(jobobjs)
self._scheduler._do_schedule()
......@@ -146,7 +146,6 @@ class BaseBatsimScheduler(BatsimScheduler):
self._scheduler.info("Job has completed its execution ({job})",
job=jobobj, type="job_completion_received")
self._scheduler._log_job(self._scheduler.time, jobobj, "completed")
jobobj._do_complete_job()
......@@ -196,6 +195,21 @@ class BaseBatsimScheduler(BatsimScheduler):
self._scheduler.on_report_energy_consumed(reply)
self._scheduler._do_schedule(reply)
def onAddResources(self, resources):
self._scheduler._update_time()
self._scheduler.info(
"Received add Resources message: {resources}",
resources=resources,
type="add_resources_received")
self._scheduler.on_add_resources(resources)
def onRemoveResources(self, resources):
self._scheduler.info(
"Received remove Resources message: {resources}",
resources=resources,
type="remove_resources_received")
self._scheduler.on_remove_resources(resources)
class Scheduler(metaclass=ABCMeta):
"""The high-level scheduler which should be interited from by concrete scheduler
......@@ -215,27 +229,19 @@ class Scheduler(metaclass=ABCMeta):
self._options = options
debug = self.options.get("debug", False)
export_prefix = self.options.get("export-prefix", "out")
write_events = bool(self.options.get("write-events", False))
self._log_debug_events = self.options.get("log-debug-events", False)
# Create the logger
self._logger = Logger(self, debug=debug)
self._last_published_event = None
self._event_logger = None
if write_events:
self._event_logger = EventLogger(
self, "Events", debug=debug,
to_file="{}_last_events.csv".format(export_prefix),
append_to_file="{}_events.csv".format(export_prefix))
self._event_logger = EventLogger(
self, "Events", debug=debug,
to_file="{}_events.csv".format(export_prefix))
self._event_logger.info(LoggingEvent.get_csv_header())
self._sched_jobs_logger = EventLogger(
self,
"SchedJobs",
debug=debug,
to_file="{}_sched_jobs.csv".format(export_prefix))
self._log_job_header()
self._events = []
self._events = EventList()
# Use the basic Pybatsim scheduler to wrap the Batsim API
self._scheduler = BaseBatsimScheduler(self, options)
......@@ -262,7 +268,7 @@ class Scheduler(metaclass=ABCMeta):
@property
def events(self):
"""The events happened in the scheduler."""
return ListView(self._events)
return self._events
@property
def dynamic_workload(self):
......@@ -366,6 +372,7 @@ class Scheduler(metaclass=ABCMeta):
open_jobs = self._batsim.nb_jobs_received
processed_jobs = (self._batsim.nb_jobs_completed +
self._batsim.nb_jobs_failed +
self._batsim.nb_jobs_rejected +
self._batsim.nb_jobs_timeout +
self._batsim.nb_jobs_killed +
len(self._batsim.jobs_manually_changed))
......@@ -377,9 +384,9 @@ class Scheduler(metaclass=ABCMeta):
event = LoggingEvent(self.time, level, open_jobs, processed_jobs,
msg, type, kwargs)
self._events.append(event)
self._events.add(event)
event_str = event.to_message()
event_str = str(event)
try:
do_publish = True
......@@ -399,102 +406,39 @@ class Scheduler(metaclass=ABCMeta):
self.on_event(event)
return str(event)
def _log_job_header(self):
header = [
"time",
"full_job_id",
"workload_name",
"job_id",
"full_parent_job_id",
"parent_workload_name",
"parent_job_id",
"submission_time",
"requested_number_of_processors",
"requested_time",
"success",
"starting_time",
"finish_time",
"comment",
"type",
"reason"
]
self._sched_jobs_logger.info(";".join([str(i) for i in header]))
def _log_job(
self,
time,
job,
type_of_completion,
reason_for_completion=""):
full_parent_job_id = ""
parent_job_id = ""
parent_workload_name = ""
if job.parent_job:
full_parent_job_id = job.parent_job.id
split_parent = full_parent_job_id.split(
Batsim.WORKLOAD_JOB_SEPARATOR)
parent_workload_name = split_parent[0]
parent_job_id = split_parent[1]
id = job.id.split(Batsim.WORKLOAD_JOB_SEPARATOR)
msg = [
time, # time
job.id, # full_job_id
id[0], # workload_name
id[1], # job_id
full_parent_job_id, # full_parent_job_id
parent_workload_name, # parent_workload_name
parent_job_id, # parent_job_id
job.submit_time, # submission_time
job.requested_resources, # requested_number_of_processors
job.requested_time, # requested_time
1 if job.success else 0, # success
job.start_time, # starting_time
job.finish_time, # finish_time
job.comment or "", # comment
type_of_completion, # type
reason_for_completion # reason
]
msg = ["" if s is None else s for s in msg]
self._sched_jobs_logger.info(";".join([str(i) for i in msg]))
return event.to_csv_line()
def debug(self, msg, **kwargs):
"""Writes a debug message to the logging facility."""
self._logger.debug(self._format_log_msg(msg, **kwargs))
event = self._format_event_msg(1, msg, **kwargs)
if self._event_logger:
if self._log_debug_events:
self._logger.debug(self._format_log_msg(msg, **kwargs))
event = self._format_event_msg(1, msg, **kwargs)
self._event_logger.info(event)
def info(self, msg, **kwargs):
"""Writes a info message to the logging facility."""
self._logger.info(self._format_log_msg(msg, **kwargs))
event = self._format_event_msg(2, msg, **kwargs)
if self._event_logger:
self._event_logger.info(event)
self._event_logger.info(event)
def warn(self, msg, **kwargs):
"""Writes a warn message to the logging facility."""
self._logger.warn(self._format_log_msg(msg, **kwargs))
event = self._format_event_msg(3, msg, **kwargs)
if self._event_logger:
self._event_logger.info(event)
self._event_logger.info(event)
def error(self, msg, **kwargs):
"""Writes a error message to the logging facility."""
self._logger.error(self._format_log_msg(msg, **kwargs))
event = self._format_event_msg(4, msg, **kwargs)
if self._event_logger:
self._event_logger.info(event)
self._event_logger.info(event)
def fatal(self, msg, **kwargs):
"""Writes a fatal message to the logging facility and terminates the scheduler."""
error_msg = self._format_log_msg(msg, **kwargs)
self._logger.error(error_msg)
event = self._format_event_msg(5, msg, **kwargs)
if self._event_logger:
self._event_logger.info(event)
self._event_logger.info(event)
raise ValueError("Fatal error: {}".format(error_msg))
def _on_pre_init(self):
......@@ -502,7 +446,7 @@ class Scheduler(metaclass=ABCMeta):
If the _pre_init method is overridden the super method should be called with:
`super()._pre_init()`
"""
for r in self._batsim.resources:
for r in self._batsim.resources.values():
self._resources.add(ComputeResource(self,
id=r["id"],
name=r["name"],
......@@ -659,6 +603,20 @@ class Scheduler(metaclass=ABCMeta):
"""
pass
def on_add_resources(self, resources):
"""Hook similar to the low-level API.
:param resources: a procset of resources
"""
pass
def on_remove_resources(self, resources):
"""Hook similar to the low-level API.
:param resources: a procset of resources
"""
pass
def on_event(self, event):
"""Hook called on each event triggered by the scheduler.
......
......@@ -24,6 +24,28 @@ class ObserveList:
"""A view of the content of the list."""
return ListView(self._data)
def get(self, idx, default=None):
"""Returns the element at the specified index.
:param idx: the index
:param default: the default value if the list does have less elements.
"""
try:
return self._data[idx]
except IndexError:
return default
@property
def first(self):
"""Return the first element."""
return self.get(0)
@property
def last(self):
"""Return the last element."""
return self.get(len(self._data) - 1)
def _check_new_elem(self, element):
"""Checks whether a new element should be added.
......@@ -118,6 +140,9 @@ class ObserveList:
def __str__(self):
return str([str(entry) for entry in self._data])
def __rep__(self):
return __str__(self)
def apply(self, apply):
"""Apply a function to modify the list (e.g. sorting the list).
......@@ -307,6 +332,9 @@ class ContainerView:
def __str__(self, *args, **kwargs):
return self._data.__str__(*args, **kwargs)
def __rep__(self, *args, **kwargs):
return self._data.__rep__(*args, **kwargs)
class ListView(ContainerView):
"""A view for dictionaries."""
......@@ -323,3 +351,16 @@ class DictView(ContainerView):
def get(self, *args, **kwargs):
return self._data.get(*args, **kwargs)
def increment_float(base, delta, until_changed=False):
old_base = base
while True:
base += delta
if not until_changed:
break
elif base == old_base:
delta *= 10
else:
break
return base
"""
batsim.sched.workloads
~~~~~~~~~~~~~~~~~~~~~~
This modules provides utilities to generate workloads from workload models.
"""
from .workloads import *
__all__ = [
"JobDescription",
"WorkloadDescription",
"generate_workload",
]
"""
batsim.sched.workloads.models
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This package provides tools to generate workloads from models.
"""
"""
batsim.sched.workloads.models.generator
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This module contains base classes and declarations which can be extended to implement
concrete workload models which directly work with Batsim.
"""
import argparse
import random
import sys
import time
import copy
import json
from .. import WorkloadDescription
from batsim.sched.profiles import Profiles
from ...utils import increment_float
class ParamRange:
def __init__(self, lower_bound, upper_bound):
self.lower_bound = lower_bound
self.upper_bound = upper_bound
def __call__(self, random_instance):
is_float = isinstance(self.lower_bound, float) or \
isinstance(self.upper_bound, float)
randf = random_instance.uniform if is_float else random_instance.randint
return randf(self.lower_bound, self.upper_bound)
class JobModelData:
def __init__(self, **kwargs):
# Fields defined in Standard Workload Format version 2.2
# Job id (optional)
self.job_number = None
# Submission time (optional)
self.submit_time = None
# Difference between the job's submit time and the start time
# (Not relevant to workloads)
self.wait_time = None
# End time minus start time
# (Not relevant to workloads)
self.run_time = None
# Number of processors used by the job
# (Not relevant to workloads)
self.used_processors = None
# Average used cpu time
# (Not relevant to workloads)
self.average_cpu_time = None
# Used memory
# (Not relevant to workloads)
self.used_memory = None
# Number of requested processors (or nodes)
self.requested_processors = None
# The requested walltime
self.requested_time = None
# Requested memory per processor/node
# (Not relevant since the memory model cannot be generalised in
# Pybatsim.
# It heavily depends on the Simgrid platform how the memory is
# modelled)
self.requested_memory = None
# Whether this job's profile terminates successfully (boolean)
self.completed = None
# Number of the user (optional)
self.user_id = None
# Number of the user's group (optional)
self.group_id = None
# Number of the application (optional)
self.application = None
# Number of the job queue
# (Not relevant to workloads)
self.queue = None
# Number of the partition
# (Not relevant to workloads)
self.partition = None
# The preceding job
# (Not relevant to workloads. This should be modelled instead via
# dependencies)
self.preceding_job = None
# Number of seconds since the preceding job
# (Not relevant to workloads)
self.think_time = None
# Additional comment field
self.comment = None
# Additional data field
self.data = None
# Additional fields known by PyBatsim
self.deps = None
# Update fields with values in parameters
self.__dict__.update(kwargs)
def process(self, random):
keys = [k for k, _ in self.__dict__.items()]
keys.sort() # Prevent changing order for random generator
for k in keys:
v = self.__dict__[k]
if callable(v):
self.__dict__[k] = v(random)
@property
def fields_to_export(self):
return [
"job_number",
# "submit_time", # Exported as subtime
"wait_time",
"run_time",
"used_processors",
"average_cpu_time",
"used_memory",
# "requested_processors", # Exported as res
# "requested_time", # Exported as walltime
"requested_memory",
"completed",
"user_id",
"group_id",
"application",
"queue",
"partition",
"preceding_job",
"think_time",
"comment",
"data",
"deps",
]
def copy_job(self):
return copy.deepcopy(self)
def submit(self, model, random, parameters, workload):
assert None not in [
self.submit_time,
self.requested_time,
self.requested_processors,
], "Job misses required fields: " + str(self)
profile = model.configure_profile(random, parameters, self)
assert profile is not None
kwargs = {}
for field in (self.fields_to_export +
model.additional_job_fields_to_export):
try:
if self.__dict__[field] is not None:
kwargs[field] = self.__dict__[field]
except KeyError:
pass
workload.new_job(id=self.job_number,
subtime=self.submit_time,
walltime=self.requested_time,
res=self.requested_processors,
profile=profile,
**kwargs)
def __str__(self):
return "<Job {}>".format(
", ".join("{}:{}".format(k, v) for k, v in self.__dict__.items()
if not k.startswith("_") and v is not None))
class Option:
def __init__(self, description=None, default=None):
self.description = description
self.default = default
class WorkloadModel:
def __init__(self, **kwargs):
self.parameters = self.merge_parameters(copy.deepcopy(kwargs))
def merge_parameters(self, parameters, base=None):
if base is None:
base = self.default_parameters
base = copy.deepcopy(base)
for k, v in base.items():
try:
new_v = parameters[k]
if new_v.description is not None:
v.description = new_v.description
if new_v.default is not None:
v.default = new_v.default
except KeyError:
pass
for k, v in parameters.items():
if k not in base:
base[k] = v
return base
def apply_parameters(self, parameters, base=None):
if base is None:
base = self.default_parameters
base = copy.deepcopy(base)
kwargs = {}
for k, v in base.items():
kwargs[k] = v.default
for k, v in parameters.items():
if k not in kwargs:
raise ValueError(
"Option parameter '{}' is unknown. Use -H to list valid options.".format(k))
else:
kwargs[k] = v
return kwargs
def configure_profile(self, random, parameters, job):
completed = job.completed if job.completed is not None else True
return (
Profiles.Delay(
job.run_time or increment_float(
job.requested_time, -0.00000000001, True),
ret=0 if completed else 1))
@property
def default_parameters(self):
return {
"random_seed": Option("Seed for the random generator, None means that the current time is used"),
"num_machines": Option(
"Number of machines to generate",
32),
"num_jobs": Option(
"Number of jobs to generate",
100),
}
@property
def additional_job_fields_to_export(self):
return []
def create_jobs(self, random, parameters):
raise NotImplementedError()
def generate(self, name, description=None, date=None, verbose=0,
**kwargs):
if isinstance(verbose, bool):
verbose = 1
parameters = self.apply_parameters(copy.deepcopy(kwargs))
if verbose >= 1:
print("Using workload model: {}".format(self.__class__.__name__),
file=sys.stderr)
seed = parameters.get("random_seed")
if seed is None:
seed = time.time()
if verbose >= 2:
print("Using random seed: {}".format(seed), file=sys.stderr)
r = random.Random()
r.seed(seed)
workload = WorkloadDescription(
name=name,
nb_res=int(parameters["num_machines"]),
description=description,
date=date,
random_seed=seed,
model_class=self.__class__.__name__,
source="Workload generated by Pybatsim model generator")
if verbose >= 1:
print("Generating jobs for chosen workload model", file=sys.stderr)
jobs = self.create_jobs(r, parameters) or []
for job in jobs:
if verbose >= 2:
print("Generating job: {}".format(job), file=sys.stderr)
job.submit(self, r, parameters, workload)
if verbose >= 1:
print("Finalising workload...", file=sys.stderr)
workload.prepare()
return workload
def print_options_help(self):
print("Options:\n")
for k, v in self.default_parameters.items():
print(" {}: {} (default: {})\n".format(
k, v.description or "", v.default))
@classmethod
def as_main(cls, args=sys.argv[1:]):
debug = False
try:
model = cls()
parser = argparse.ArgumentParser(
description="Generates a workload using the model: '{}'"
.format(cls.__name__)
)
parser.add_argument(
"--name", "-n",
help="Sets the name of this workload",
default="generated_workload", type=str
)
parser.add_argument(
"--description", "-d",
help="Sets the description of this workload",
default="Generated workload", type=str
)
parser.add_argument(
"--date", "-i",
help="Sets the date of this workload",
default=None, type=str
)
parser.add_argument(
"--output", "-t",
help="Sets the output file (otherwise stdout is used)",
default=None, type=str
)
parser.add_argument(
"--option",
"-o",
help="Sets additional options of this workload. Use -H to list the known options.",
default=[],
type=str,
action='append')
parser.add_argument(
"--options", "-O",
help="Sets all options from json string. Use -H to list the known options.",
default=None, type=str)
parser.add_argument(
"--verbose", "-v",
help="Increase verbosity",
action="count", default=0
)
parser.add_argument(
"--debug", "-D",
help="Debug output in case of error",
action="store_true"
)
parser.add_argument(
"--options-help", "-H",
help="Print the known option parameters",
action="store_true"
)
args = parser.parse_args(args)
debug = args.debug
if args.options:
try:
options = json.loads(args.options)
except ValueError:
raise ValueError(
"Invalid json data: {}".format(
args.options))
else:
options = {}
for option in args.option:
option = option.split("=")
key = option[0]
if len(option) == 1:
value = True
else:
value = "=".join(option[1:])
try:
value = float(value)
except ValueError:
pass
options[key] = value
if args.options_help:
model.print_options_help()
sys.exit(0)
workload = model.generate(
args.name,
args.description,
args.date,
args.verbose,
**options)
output = args.output
if output is not None:
if args.verbose >= 1:
print(
"Writing workload to file: {}".format(output),
file=sys.stderr)
with open(output, "w") as output:
workload.print(output)
else:
if args.verbose >= 1:
print("Writing workload to stdout", file=sys.stderr)
workload.print()
except Exception as e:
if debug:
raise
else:
print("Error occurred while generating workload: {}".format(e),
file=sys.stderr)
sys.exit(1)