Commit 03f5d1ae authored by MERCIER Michael's avatar MERCIER Michael

[sched] bebida: put locality in the metadata

parent fa0e85cb
...@@ -186,7 +186,8 @@ def generate_dfs_io_profile( ...@@ -186,7 +186,8 @@ def generate_dfs_io_profile(
# Round robin trough the hosts # Round robin trough the hosts
host_that_write_index = (host_that_write_index + 1) % len(job_alloc) host_that_write_index = (host_that_write_index + 1) % len(job_alloc)
io_profile = {"type": "parallel", "cpu": [0] * len(io_alloc), "com": comm_matrix} io_profile = {"type": "parallel", "cpu": [0] * len(io_alloc), "com":
comm_matrix, "locality": real_locality}
return io_profile, real_locality return io_profile, real_locality
...@@ -671,8 +672,13 @@ class SchedBebida(BatsimScheduler): ...@@ -671,8 +672,13 @@ class SchedBebida(BatsimScheduler):
== nb_blocks_to_read == nb_blocks_to_read
) )
local_disks = ProcSet(
*[self.storage_map[disk] for disk in job.allocation]
)
remote_disks = ProcSet(*list(self.bs.storage_resources.keys()
)) - local_disks
remote_block_location_list = [ remote_block_location_list = [
random.choice(list(self.bs.storage_resources.keys())) random.choice(list(local_disks))
for _ in range(nb_blocks_to_read_remote) for _ in range(nb_blocks_to_read_remote)
] ]
...@@ -687,6 +693,7 @@ class SchedBebida(BatsimScheduler): ...@@ -687,6 +693,7 @@ class SchedBebida(BatsimScheduler):
if job.profile_dict["type"] == "composed": if job.profile_dict["type"] == "composed":
# TODO split IO quantity between stages # TODO split IO quantity between stages
io_profiles = {} io_profiles = {}
seq_locality = {}
# Generate profile sequence # Generate profile sequence
for profile_name in job.profile_dict["seq"]: for profile_name in job.profile_dict["seq"]:
profile = self.bs.profiles[job.workload][profile_name] profile = self.bs.profiles[job.workload][profile_name]
...@@ -705,6 +712,7 @@ class SchedBebida(BatsimScheduler): ...@@ -705,6 +712,7 @@ class SchedBebida(BatsimScheduler):
) )
self.logger.info("Real locality of profile " + self.logger.info("Real locality of profile " +
io_profile_name + " is " + str(real_locality)) io_profile_name + " is " + str(real_locality))
seq_locality[io_profile_name] = real_locality
# submit these profiles # submit these profiles
self.bs.register_profiles(job.workload, io_profiles) self.bs.register_profiles(job.workload, io_profiles)
...@@ -717,9 +725,21 @@ class SchedBebida(BatsimScheduler): ...@@ -717,9 +725,21 @@ class SchedBebida(BatsimScheduler):
"profile": { "profile": {
"type": "composed", "type": "composed",
"seq": list(io_profiles.keys()), "seq": list(io_profiles.keys()),
"locality": real_locality, "locality": seq_locality,
}, },
} }
if job.metadata is None:
metadata = {"locality" : seq_locality}
else:
metadata = deepcopy(job.metadata)
if "locality" not in metadata:
metadata["locality"] = seq_locality
else:
metadata["locality"] = {
**(metadata["locality"]),
**seq_locality}
self.bs.set_job_metadata(job.id, metadata)
else: else:
raise Exception("Only composed jobs are supported") raise Exception("Only composed jobs are supported")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment