Commit a3532bd5 authored by MERCIER Michael
Browse files

add locality information

parent 9a84fed0
......@@ -19,7 +19,6 @@ Also, the `--enable-dynamic-jobs` and `--acknowledge-dynamic-jobs` Batsim CLI
options MUST be set"""
import copy
import logging
import math
import random
from itertools import islice
......@@ -148,13 +147,14 @@ def generate_dfs_io_profile(
# Round robin through the hosts
host_that_read_index = (host_that_read_index + 1) % len(job_alloc)
assert nb_blocks_to_read_remote == nb_blocks_to_read_local == 0
if nb_blocks_to_read == 0:
real_locality = None
else:
#import ipdb; ipdb.set_trace()
real_locality = (nb_wanted_local_read - nb_blocks_to_read_local) / nb_blocks_to_read
# import ipdb; ipdb.set_trace()
real_locality = (
nb_wanted_local_read - nb_blocks_to_read_local
) / nb_blocks_to_read
# Generates writes block list
nb_blocks_to_write = int((profile_dict["io_writes"] / block_size_in_Bytes) + 1)
......@@ -180,14 +180,18 @@ def generate_dfs_io_profile(
# NOTE: We can also manage write location here (under HPC node or
# not)
row = index_of(io_alloc, random.choice(list(io_alloc)))
#import ipdb; ipdb.set_trace()
# import ipdb; ipdb.set_trace()
comm_matrix[(row * len(io_alloc)) + col] += block_size_in_Bytes
# Round robin through the hosts
host_that_write_index = (host_that_write_index + 1) % len(job_alloc)
io_profile = {"type": "parallel", "cpu": [0] * len(io_alloc), "com":
comm_matrix, "locality": real_locality}
io_profile = {
"type": "parallel",
"cpu": [0] * len(io_alloc),
"com": comm_matrix,
"locality": real_locality,
}
return io_profile, real_locality
......@@ -217,7 +221,9 @@ class SchedBebida(BatsimScheduler):
self.logger.info("Try to allocate Job: {}".format(job.id))
assert (
job.allocation is None
), "Job allocation should be None and not {}".format(job.allocation)
), "Job allocation should be None and not {}".format(
job.allocation
)
nb_found_resources = 0
allocation = ProcSet()
......@@ -301,11 +307,13 @@ class SchedBebida(BatsimScheduler):
# Level of locality
self.node_locality_in_percent = self.options.get(
"node_locality_in_percent", 70)
"node_locality_in_percent", 70
)
# Level of locality variation (uniform)
self.node_locality_variation_in_percent = self.options.get(
"node_locality_variation_in_percent", 10)
"node_locality_variation_in_percent", 10
)
self.logger.info(
"Node locality is set to: {}% +- {}%".format(
......@@ -512,9 +520,10 @@ class SchedBebida(BatsimScheduler):
if curr_task_progress != 0:
# Now let's modify the current internal profile to reflect progress
curr_task_profile = copy.deepcopy(
self.bs.profiles[old_job.workload][
progress["current_task"]["profile_name"]
])
self.bs.profiles[old_job.workload][
progress["current_task"]["profile_name"]
]
)
assert (
curr_task_profile["type"]
== "parallel_homogeneous_total"
......@@ -535,7 +544,10 @@ class SchedBebida(BatsimScheduler):
)
new_job.profile_dict["seq"][0] = curr_task_profile_name
if curr_task_profile_name not in self.bs.profiles[new_job.workload]:
if (
curr_task_profile_name
not in self.bs.profiles[new_job.workload]
):
to_submit[curr_task_profile_name] = curr_task_profile
# submit the new internal current task profile
......@@ -667,8 +679,9 @@ class SchedBebida(BatsimScheduler):
local_disks = ProcSet(
*[self.storage_map[disk] for disk in job.allocation]
)
remote_disks = ProcSet(*list(self.bs.storage_resources.keys()
)) - local_disks
remote_disks = (
ProcSet(*list(self.bs.storage_resources.keys())) - local_disks
)
remote_block_location_list = [
random.choice(list(local_disks))
for _ in range(nb_blocks_to_read_remote)
......@@ -693,7 +706,9 @@ class SchedBebida(BatsimScheduler):
job.id, list(all_profiles.keys()) + list(io_profiles.keys())
)
self.logger.debug("Creating new profile: " + io_profile_name)
io_profiles[io_profile_name], real_locality = generate_dfs_io_profile(
io_profiles[
io_profile_name
], real_locality = generate_dfs_io_profile(
profile,
job.allocation,
io_alloc,
......@@ -702,8 +717,12 @@ class SchedBebida(BatsimScheduler):
job_locality,
self.storage_map,
)
self.logger.info("Real locality of profile " +
io_profile_name + " is " + str(real_locality))
self.logger.info(
"Real locality of profile "
+ io_profile_name
+ " is "
+ str(real_locality)
)
seq_locality[io_profile_name] = real_locality
# submit these profiles
self.bs.register_profiles(job.workload, io_profiles)
......@@ -721,15 +740,16 @@ class SchedBebida(BatsimScheduler):
},
}
if job.metadata is None:
metadata = {"locality" : seq_locality}
metadata = {"locality": seq_locality}
else:
metadata = copy.deepcopy(job.metadata)
if "locality" not in metadata:
metadata["locality"] = seq_locality
else:
metadata["locality"] = {
**(metadata["locality"]),
**seq_locality}
**(metadata["locality"]),
**seq_locality,
}
self.bs.set_job_metadata(job.id, metadata)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment