Commit a0361f11 authored by MOMMESSIN Clement's avatar MOMMESSIN Clement

'dynamic submission' -> 'dynamic registration' && update retrieving of batconf options

parent cd41c625
...@@ -96,21 +96,21 @@ class Batsim(object): ...@@ -96,21 +96,21 @@ class Batsim(object):
"type": "CALL_ME_LATER", "type": "CALL_ME_LATER",
"data": {"timestamp": time}}) "data": {"timestamp": time}})
def notify_submission_finished(self): def notify_registration_finished(self):
self._events_to_send.append({ self._events_to_send.append({
"timestamp": self.time(), "timestamp": self.time(),
"type": "NOTIFY", "type": "NOTIFY",
"data": { "data": {
"type": "submission_finished", "type": "registration_finished",
} }
}) })
def notify_submission_continue(self): def notify_registration_continue(self):
self._events_to_send.append({ self._events_to_send.append({
"timestamp": self.time(), "timestamp": self.time(),
"type": "NOTIFY", "type": "NOTIFY",
"data": { "data": {
"type": "continue_submission", "type": "continue_registration",
} }
}) })
...@@ -197,11 +197,11 @@ class Batsim(object): ...@@ -197,11 +197,11 @@ class Batsim(object):
} }
}) })
def submit_profiles(self, workload_name, profiles): def register_profiles(self, workload_name, profiles):
for profile_name, profile in profiles.items(): for profile_name, profile in profiles.items():
msg = { msg = {
"timestamp": self.time(), "timestamp": self.time(),
"type": "SUBMIT_PROFILE", "type": "REGISTER_PROFILE",
"data": { "data": {
"workload_name": workload_name, "workload_name": workload_name,
"profile_name": profile_name, "profile_name": profile_name,
...@@ -209,11 +209,10 @@ class Batsim(object): ...@@ -209,11 +209,10 @@ class Batsim(object):
} }
} }
self._events_to_send.append(msg) self._events_to_send.append(msg)
self.logger.debug("Submitting profile: {}".format(msg["data"])) self.logger.debug("Registering profile: {}".format(msg["data"]))
# Register profile
self.profiles[workload_name][profile_name] = profile self.profiles[workload_name][profile_name] = profile
def submit_job( def register_job(
self, self,
id, id,
res, res,
...@@ -232,7 +231,7 @@ class Batsim(object): ...@@ -232,7 +231,7 @@ class Batsim(object):
} }
msg = { msg = {
"timestamp": self.time(), "timestamp": self.time(),
"type": "SUBMIT_JOB", "type": "REGISTER_JOB",
"data": { "data": {
"job_id": id, "job_id": id,
"job": job_dict, "job": job_dict,
...@@ -350,8 +349,8 @@ class Batsim(object): ...@@ -350,8 +349,8 @@ class Batsim(object):
""" """
The given job is resubmited but in a dynamic workload. The name of this The given job is resubmited but in a dynamic workload. The name of this
workload is "resubmit=N" where N is the number of resubmission. workload is "resubmit=N" where N is the number of resubmission.
The job metadata is fill with a dict that contains the original job The job metadata is filled with a dict that contains the original job
full id in "parent_job" and the number of resubmission in "nb_resumit". full id in "parent_job" and the number of resubmissions in "nb_resubmit".
""" """
if job.metadata is None: if job.metadata is None:
...@@ -373,7 +372,7 @@ class Batsim(object): ...@@ -373,7 +372,7 @@ class Batsim(object):
new_job_name = new_job_name + Batsim.ATTEMPT_JOB_SEPARATOR + str(metadata["nb_resubmit"]) new_job_name = new_job_name + Batsim.ATTEMPT_JOB_SEPARATOR + str(metadata["nb_resubmit"])
# log in job metadata parent job and nb resubmit # log in job metadata parent job and nb resubmit
self.submit_job( self.register_job(
new_job_name, new_job_name,
job.requested_resources, job.requested_resources,
job.requested_time, job.requested_time,
...@@ -422,16 +421,19 @@ class Batsim(object): ...@@ -422,16 +421,19 @@ class Batsim(object):
storage_resources = event_data["storage_resources"] storage_resources = event_data["storage_resources"]
self.machines = {"compute": compute_resources, "storage": storage_resources} self.machines = {"compute": compute_resources, "storage": storage_resources}
self.batconf = event_data["config"] self.batconf = event_data["config"]
self.time_sharing = event_data["allow_time_sharing"] self.time_sharing_on_compute = event_data["allow_time_sharing_on_compute"]
self.dynamic_job_submission_enabled = self.batconf["job_submission"]["from_scheduler"]["enabled"] self.time_sharing_on_storage = event_data["allow_time_sharing_on_storage"]
self.profiles_forwarded_on_submission = self.batconf["profiles-forwarded-on-submission"]
if self.dynamic_job_submission_enabled: self.dynamic_job_registration_enabled = self.batconf["dynamic-jobs-enabled"]
self.logger.warning("Dynamic submission of jobs is ENABLED. The scheduler must send a NOTIFY event of type 'submission_finished' to let Batsim end the simulation.") self.ack_of_dynamic_jobs = self.batconf["dynamic-jobs-acknowledged"]
self.redis_enabled = self.batconf["redis"]["enabled"] if self.dynamic_job_registration_enabled:
redis_hostname = self.batconf["redis"]["hostname"] self.logger.warning("Dynamic registration of jobs is ENABLED. The scheduler must send a NOTIFY event of type 'registration_finished' to let Batsim end the simulation.")
redis_port = self.batconf["redis"]["port"]
redis_prefix = self.batconf["redis"]["prefix"] self.redis_enabled = self.batconf["redis-enabled"]
redis_hostname = self.batconf["redis-hostname"]
redis_port = self.batconf["redis-port"]
redis_prefix = self.batconf["redis-prefix"]
if self.redis_enabled: if self.redis_enabled:
self.redis = DataStorage(redis_prefix, redis_hostname, self.redis = DataStorage(redis_prefix, redis_hostname,
......
...@@ -615,13 +615,13 @@ class Scheduler(metaclass=ABCMeta): ...@@ -615,13 +615,13 @@ class Scheduler(metaclass=ABCMeta):
""" """
pass pass
def submit_dynamic_job(self, *args, **kwargs): def register_dynamic_job(self, *args, **kwargs):
job = self._dynamic_workload.new_job(*args, **kwargs) job = self._dynamic_workload.new_job(*args, **kwargs)
self._dynamic_workload.prepare() self._dynamic_workload.prepare()
job.submit(self) job.submit(self)
def notify_submission_finished(self): def notify_registration_finished(self):
self._batsim.notify_submission_finished() self._batsim.notify_registration_finished()
def as_scheduler(*args, on_init=[], on_end=[], base_classes=[], **kwargs): def as_scheduler(*args, on_init=[], on_end=[], base_classes=[], **kwargs):
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
dynamically and submit them during the runtime of a scheduler. dynamically and submit them during the runtime of a scheduler.
WorkloadDescriptions are used internally to manage dynamic jobs created with WorkloadDescriptions are used internally to manage dynamic jobs created with
`Scheduler.submit_dynamic_job` and with `Job.submit_sub_job`. For each job `Scheduler.register_dynamic_job` and with `Job.submit_sub_job`. For each job
a workload is generated named after the id of the job which is then submitted a workload is generated named after the id of the job which is then submitted
to Batsim. to Batsim.
""" """
...@@ -223,16 +223,16 @@ class JobDescription: ...@@ -223,16 +223,16 @@ class JobDescription:
additional_profiles = {p.name: p.to_dict() additional_profiles = {p.name: p.to_dict()
for p in self._additional_profiles} for p in self._additional_profiles}
scheduler._batsim.submit_job( scheduler._batsim.register_profiles(
self.workload.name,
additional_profiles)
scheduler._batsim.register_job(
str(self.id), str(self.id),
self.res, self.res,
self.walltime, self.walltime,
self.workload.name+'!'+self.profile.name, self.workload.name+'!'+self.profile.name,
self.subtime, self.subtime,
self.profile.to_dict()) self.profile.to_dict())
scheduler._batsim.submit_profiles(
self.workload.name,
additional_profiles)
self._submitted = True self._submitted = True
# Keep track of the workload object in the scheduler to relate job # Keep track of the workload object in the scheduler to relate job
......
...@@ -15,21 +15,9 @@ resources. ...@@ -15,21 +15,9 @@ resources.
The Batsim job profile "msg_hg_tot" or a sequence of that kind of jobs are The Batsim job profile "msg_hg_tot" or a sequence of that kind of jobs are
MANDATORY for this mechanism to work. MANDATORY for this mechanism to work.
Also, the folowing batsim configuration is mandatory: Also, the `--dynamic-jobs-enabled` Batsim CLI option MUST be set, while
```json `--profiles-forwarded-on-submission` and `--dynamic_jobs_acknowledged`
{ must NOT be set.
"job_submission": {
"forward_profiles": false,
"from_scheduler": {
"enabled": false,
"acknowledge": true
}
},
"job_kill": {
"forward_profiles": false
}
}
```
""" """
from batsim.batsim import BatsimScheduler, Job from batsim.batsim import BatsimScheduler, Job
...@@ -331,7 +319,7 @@ class SchedBebida(BatsimScheduler): ...@@ -331,7 +319,7 @@ class SchedBebida(BatsimScheduler):
# Fix the seed to have reproducible DFS behavior # Fix the seed to have reproducible DFS behavior
random.seed(0) random.seed(0)
assert self.bs.batconf["job_submission"]["forward_profiles"] == True, ( assert self.bs.batconf["profiles-forwarded-on-submission"] == True, (
"Forward profile is mandatory for resubmit to work") "Forward profile is mandatory for resubmit to work")
def onJobSubmission(self, job): def onJobSubmission(self, job):
...@@ -383,7 +371,7 @@ class SchedBebida(BatsimScheduler): ...@@ -383,7 +371,7 @@ class SchedBebida(BatsimScheduler):
and self.bs.nb_jobs_in_submission == 0 and self.bs.nb_jobs_in_submission == 0
and len(self.running_jobs()) == 0 and len(self.running_jobs()) == 0
and len(self.in_killing_jobs()) == 0): and len(self.in_killing_jobs()) == 0):
self.bs.notify_submission_finished() self.bs.notify_registration_finished()
def onRemoveResources(self, resources): def onRemoveResources(self, resources):
self.available_resources = self.available_resources - ProcSet.from_str(resources) self.available_resources = self.available_resources - ProcSet.from_str(resources)
...@@ -517,7 +505,7 @@ class SchedBebida(BatsimScheduler): ...@@ -517,7 +505,7 @@ class SchedBebida(BatsimScheduler):
to_submit[curr_task_profile_name] = curr_task_profile to_submit[curr_task_profile_name] = curr_task_profile
# submit the new internal current task profile # submit the new internal current task profile
self.bs.submit_profiles(new_job.workload, to_submit) self.bs.register_profiles(new_job.workload, to_submit)
elif (new_job_seq_size == old_job_seq_size): elif (new_job_seq_size == old_job_seq_size):
# FIXME does it takes into account current task progress? # FIXME does it takes into account current task progress?
...@@ -528,7 +516,7 @@ class SchedBebida(BatsimScheduler): ...@@ -528,7 +516,7 @@ class SchedBebida(BatsimScheduler):
new_job = copy.deepcopy(old_job) new_job = copy.deepcopy(old_job)
new_job.profile = old_job.profile + "#" + str(curr_task) new_job.profile = old_job.profile + "#" + str(curr_task)
new_job.profile_dict["seq"] = old_job.profile_dict["seq"][curr_task:] new_job.profile_dict["seq"] = old_job.profile_dict["seq"][curr_task:]
self.bs.submit_profiles(new_job.workload, self.bs.register_profiles(new_job.workload,
{new_job.profile: new_job.profile_dict}) {new_job.profile: new_job.profile_dict})
# Re-submit the profile # Re-submit the profile
...@@ -585,7 +573,7 @@ class SchedBebida(BatsimScheduler): ...@@ -585,7 +573,7 @@ class SchedBebida(BatsimScheduler):
self.pfs_id) self.pfs_id)
# submit these profiles # submit these profiles
assert len(io_profiles) == len(job.profile_dict["seq"]) assert len(io_profiles) == len(job.profile_dict["seq"])
self.bs.submit_profiles(job.workload, io_profiles) self.bs.register_profiles(job.workload, io_profiles)
# Create io job # Create io job
io_job = { io_job = {
...@@ -657,7 +645,7 @@ class SchedBebida(BatsimScheduler): ...@@ -657,7 +645,7 @@ class SchedBebida(BatsimScheduler):
job_locality, job_locality,
self.storage_map) self.storage_map)
# submit these profiles # submit these profiles
self.bs.submit_profiles(job.workload, io_profiles) self.bs.register_profiles(job.workload, io_profiles)
# Create io job # Create io job
io_job = { io_job = {
......
...@@ -14,13 +14,13 @@ from batsim.sched.algorithms.utils import default_resources_filter ...@@ -14,13 +14,13 @@ from batsim.sched.algorithms.utils import default_resources_filter
class DynamicTestScheduler(Scheduler): class DynamicTestScheduler(Scheduler):
def on_init(self): def on_init(self):
self.submit_dynamic_job( self.register_dynamic_job(
walltime=10, walltime=10,
res=2, res=2,
id=42, id=42,
profile=Profiles.Delay(7)) profile=Profiles.Delay(7))
self.submit_dynamic_job(walltime=10, res=2, profile=Profiles.Delay(7)) self.register_dynamic_job(walltime=10, res=2, profile=Profiles.Delay(7))
self.submit_dynamic_job(walltime=10, res=2, profile=Profiles.Delay(7)) self.register_dynamic_job(walltime=10, res=2, profile=Profiles.Delay(7))
w = WorkloadDescription(name="TestWorkload") w = WorkloadDescription(name="TestWorkload")
w.new_job(subtime=0, walltime=10, res=4, profile=Profiles.Delay(5)) w.new_job(subtime=0, walltime=10, res=4, profile=Profiles.Delay(5))
...@@ -38,7 +38,7 @@ class DynamicTestScheduler(Scheduler): ...@@ -38,7 +38,7 @@ class DynamicTestScheduler(Scheduler):
])])) ])]))
w.submit(self) w.submit(self)
self.notify_submission_finished() self.notify_registration_finished()
def schedule(self): def schedule(self):
return filler_sched(self, return filler_sched(self,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment