Commit d58a0949 authored by MERCIER Michael's avatar MERCIER Michael

[sced] Bebida: Adapt to new protocole version

parent 5b806ab7
...@@ -12,23 +12,21 @@ events. It kills the jobs that are allocated to removed resources. It also ...@@ -12,23 +12,21 @@ events. It kills the jobs that are allocated to removed resources. It also
kill some jobs in the queue in order to re-schedule them on a larger set of kill some jobs in the queue in order to re-schedule them on a larger set of
resources. resources.
The Batsim job profile "msg_hg_tot" or a sequence of that kind of jobs are The Batsim job profile "parallel_homogeneous_total" or a sequence of that kind
MANDATORY for this mechanism to work. of jobs are MANDATORY for this mechanism to work.
Also, the `--dynamic-jobs-enabled` Batsim CLI option MUST be set, while Also, the `--enable-dynamic-jobs` and `--acknowledge-dynamic-jobs` Batsim CLI
`--profiles-forwarded-on-submission` and `--dynamic_jobs_acknowledged` option MUST be set"""
must NOT be set.
"""
from batsim.batsim import BatsimScheduler, Job
from procset import ProcSet, ProcInt
import logging
import copy import copy
import random import logging
import math import math
import random
from itertools import islice from itertools import islice
from batsim.batsim import BatsimScheduler, Job
from procset import ProcInt, ProcSet
def sort_by_id(jobs): def sort_by_id(jobs):
return sorted(jobs, key=lambda j: int(j.id.split('!')[1].split('#')[0])) return sorted(jobs, key=lambda j: int(j.id.split('!')[1].split('#')[0]))
...@@ -53,7 +51,7 @@ def generate_pfs_io_profile(profile_dict, job_alloc, io_alloc, pfs_id): ...@@ -53,7 +51,7 @@ def generate_pfs_io_profile(profile_dict, job_alloc, io_alloc, pfs_id):
comm_matrix.append(bytes_to_write) comm_matrix.append(bytes_to_write)
io_profile = { io_profile = {
"type": "msg_par", "type": "parallel",
"cpu": [0] * len(io_alloc), "cpu": [0] * len(io_alloc),
"com": comm_matrix "com": comm_matrix
} }
...@@ -110,8 +108,8 @@ def generate_dfs_io_profile( ...@@ -110,8 +108,8 @@ def generate_dfs_io_profile(
storage_map, storage_map,
replication_factor=3): replication_factor=3):
""" """
Every element of the remote_block_location_list is a host that detain a block to Every element of the remote_block_location_list is a host that detain a
read. block to read.
""" """
block_size_in_Bytes = block_size_in_MB * 1024 * 1024 block_size_in_Bytes = block_size_in_MB * 1024 * 1024
# Generates blocks read list from block location: Manage the case where # Generates blocks read list from block location: Manage the case where
...@@ -141,9 +139,12 @@ def generate_dfs_io_profile( ...@@ -141,9 +139,12 @@ def generate_dfs_io_profile(
# This is a remote read # This is a remote read
else: else:
row = index_of(io_alloc, row = index_of(
remote_block_location_list[nb_blocks_to_read_remote % io_alloc,
len(remote_block_location_list)]) remote_block_location_list[
nb_blocks_to_read_remote % len(remote_block_location_list)
]
)
nb_blocks_to_read_remote = nb_blocks_to_read_remote - 1 nb_blocks_to_read_remote = nb_blocks_to_read_remote - 1
comm_matrix[(row * len(io_alloc)) + col] += block_size_in_Bytes comm_matrix[(row * len(io_alloc)) + col] += block_size_in_Bytes
...@@ -186,7 +187,7 @@ def generate_dfs_io_profile( ...@@ -186,7 +187,7 @@ def generate_dfs_io_profile(
host_that_write_index = (host_that_write_index + 1) % len(job_alloc) host_that_write_index = (host_that_write_index + 1) % len(job_alloc)
io_profile = { io_profile = {
"type": "msg_par", "type": "parallel",
"cpu": [0] * len(io_alloc), "cpu": [0] * len(io_alloc),
"com": comm_matrix "com": comm_matrix
} }
...@@ -196,8 +197,9 @@ def generate_dfs_io_profile( ...@@ -196,8 +197,9 @@ def generate_dfs_io_profile(
class SchedBebida(BatsimScheduler): class SchedBebida(BatsimScheduler):
def filter_jobs_by_state(self, state): def filter_jobs_by_state(self, state):
return sort_by_id([job for job in self.bs.jobs.values() if return sort_by_id(
job.job_state == state]) [job for job in self.bs.jobs.values() if job.job_state == state]
)
def running_jobs(self): def running_jobs(self):
return self.filter_jobs_by_state(Job.State.RUNNING) return self.filter_jobs_by_state(Job.State.RUNNING)
...@@ -217,7 +219,7 @@ class SchedBebida(BatsimScheduler): ...@@ -217,7 +219,7 @@ class SchedBebida(BatsimScheduler):
return None if no resources at all are available. return None if no resources at all are available.
""" """
self.logger.info("Try to allocate Job: {}".format(job.id)) self.logger.info("Try to allocate Job: {}".format(job.id))
assert job.allocation is None , ( assert job.allocation is None, (
"Job allocation should be None and not {}".format(job.allocation)) "Job allocation should be None and not {}".format(job.allocation))
nb_found_resources = 0 nb_found_resources = 0
...@@ -325,7 +327,7 @@ class SchedBebida(BatsimScheduler): ...@@ -325,7 +327,7 @@ class SchedBebida(BatsimScheduler):
def onJobSubmission(self, job): def onJobSubmission(self, job):
profile_dict = self.bs.profiles[job.workload][job.profile] profile_dict = self.bs.profiles[job.workload][job.profile]
assert "type" in profile_dict, "Forward profile is mandatory" assert "type" in profile_dict, "Forward profile is mandatory"
assert (profile_dict["type"] == "msg_par_hg_tot" or assert (profile_dict["type"] == "parallel_homogeneous_total" or
profile_dict["type"] == "composed") profile_dict["type"] == "composed")
def onJobCompletion(self, job): def onJobCompletion(self, job):
...@@ -489,12 +491,8 @@ class SchedBebida(BatsimScheduler): ...@@ -489,12 +491,8 @@ class SchedBebida(BatsimScheduler):
# only recreate a profile if it has started # only recreate a profile if it has started
if curr_task_progress != 0: if curr_task_progress != 0:
# Now let's modify the current internal profile to reflect progress # Now let's modify the current internal profile to reflect progress
assert "profile" in progress["current_task"], ('The profile' curr_task_profile = profile[old_job.profile]["seq"][progress["current_task"]
' is not forwarded in the job progress: set' assert curr_task_profile["type"] == "parallel_homogeneous_total", "Only parallel_homegeneous_total profile are supported right now"
' {"job_kill": {"forward_profiles": true}} in the '
'batsim config')
curr_task_profile = progress["current_task"]["profile"]
assert curr_task_profile["type"] == "msg_par_hg_tot", "Only msg_par_hg_tot profile are supported right now"
for key, value in curr_task_profile.items(): for key, value in curr_task_profile.items():
if isinstance(value, (int, float)): if isinstance(value, (int, float)):
curr_task_profile[key] = value * (1 - curr_task_progress) curr_task_profile[key] = value * (1 - curr_task_progress)
...@@ -686,4 +684,3 @@ class SchedBebida(BatsimScheduler): ...@@ -686,4 +684,3 @@ class SchedBebida(BatsimScheduler):
self.logger.info("Finished scheduling jobs, nb jobs scheduled: {}".format( self.logger.info("Finished scheduling jobs, nb jobs scheduled: {}".format(
len(to_execute))) len(to_execute)))
self.logger.debug("jobs to be executed: \n{}".format(to_execute)) self.logger.debug("jobs to be executed: \n{}".format(to_execute))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment