Commit 91306f57 authored by MERCIER Michael

Make locality work as intended and do 3 writes instead of one

parent 97a2258c
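This commit addresses two issues in `generate_dfs_io_profile`: reads are now split up front into an exact local quota and a remote remainder (instead of deciding per iteration inside a single loop), and each written block now produces three writes, one local plus replicas, per the commit title. A minimal sketch of the split arithmetic the hunks below rely on, with illustrative numbers only:

```python
# Sketch of the local/remote read split; names mirror the diff,
# the concrete sizes are illustrative assumptions.
import math

block_size_in_Bytes = 128 * 1024 * 1024  # assumed HDFS-style block size
io_reads = 1000 * 1024 * 1024            # hypothetical bytes read by the job
locality = 70                            # requested locality, in percent

nb_blocks_to_read = int(math.ceil(io_reads / block_size_in_Bytes))       # 8
nb_blocks_to_read_local = math.ceil(nb_blocks_to_read * locality / 100)  # 6
nb_blocks_to_read_remote = nb_blocks_to_read - nb_blocks_to_read_local   # 2
```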
@@ -114,13 +114,14 @@ def generate_dfs_io_profile(
nb_blocks_to_read = int(math.ceil(profile_dict["io_reads"] / block_size_in_Bytes))
nb_blocks_to_read_local = math.ceil(nb_blocks_to_read * locality / 100)
nb_wanted_local_read = nb_blocks_to_read_local
nb_blocks_to_read_remote = nb_blocks_to_read - nb_blocks_to_read_local
comm_matrix = [0] * len(io_alloc) * len(io_alloc)
# Fill in reads in the matrix
host_that_read_index = 0
for _ in range(nb_blocks_to_read):
while nb_blocks_to_read_local > 0:
col = host_that_read_index
host_id = nth(job_alloc, host_that_read_index)
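The hunk above replaces the old single `for` loop over every block with a `while` loop that runs until the local quota is met; the hunk below adds a second `while` loop that drains the remote remainder. A simplified skeleton of this two-phase structure, where `has_local_block` is a hypothetical stand-in for the `storage_map` test used in the diff:

```python
# Two-phase read loop skeleton: phase one satisfies the local quota
# exactly, phase two drains the remote remainder.
def fill_reads(hosts, nb_local, nb_remote, has_local_block, on_local, on_remote):
    i = 0
    # Phase 1: only an actual local read decrements the counter, so the
    # loop assumes at least one host holds a local block (as the diff does).
    while nb_local > 0:
        if has_local_block(hosts[i]):
            on_local(hosts[i])
            nb_local -= 1
        i = (i + 1) % len(hosts)  # round robin through the hosts
    # Phase 2: every remaining block is read remotely.
    while nb_remote > 0:
        on_remote(hosts[i])
        nb_remote -= 1
        i = (i + 1) % len(hosts)
    assert nb_local == nb_remote == 0
```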
@@ -131,23 +132,35 @@ def generate_dfs_io_profile(
):
row = index_of(io_alloc, storage_map[host_id])
comm_matrix[(row * len(io_alloc)) + col] += block_size_in_Bytes
nb_blocks_to_read_local = nb_blocks_to_read_local - 1
# This is a remote read
else:
row = index_of(
io_alloc,
remote_block_location_list[
nb_blocks_to_read_remote % len(remote_block_location_list)
],
)
nb_blocks_to_read_remote = nb_blocks_to_read_remote - 1
# Round robin through the hosts
host_that_read_index = (host_that_read_index + 1) % len(job_alloc)
# This is a remote read
while nb_blocks_to_read_remote > 0:
row = index_of(
io_alloc,
remote_block_location_list[
nb_blocks_to_read_remote % len(remote_block_location_list)
],
)
nb_blocks_to_read_remote = nb_blocks_to_read_remote - 1
comm_matrix[(row * len(io_alloc)) + col] += block_size_in_Bytes
# Round robin through the hosts
host_that_read_index = (host_that_read_index + 1) % len(job_alloc)
assert nb_blocks_to_read_remote == nb_blocks_to_read_local == 0
if nb_blocks_to_read == 0:
real_locality = None
else:
real_locality = (nb_wanted_local_read - nb_blocks_to_read_local) / nb_blocks_to_read
# Generates the write block list
nb_blocks_to_write = int((profile_dict["io_writes"] / block_size_in_Bytes) + 1)
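After both loops the assert holds, so `nb_blocks_to_read_local` is zero and `real_locality` reduces to `nb_wanted_local_read / nb_blocks_to_read`; note that the requested `locality` is a percentage while `real_locality` is a fraction, and that, unlike the read count, the write count uses `int(x + 1)`, which adds a whole extra block even when `io_writes` is an exact multiple of the block size. A worked example with the numbers from the earlier sketch:

```python
# Worked example of the real_locality computation (illustrative numbers).
nb_blocks_to_read = 8
nb_wanted_local_read = 6      # ceil(8 * 70 / 100)
nb_blocks_to_read_local = 0   # the local while loop exits only at zero

real_locality = (nb_wanted_local_read - nb_blocks_to_read_local) / nb_blocks_to_read
print(real_locality)          # 0.75, slightly above the requested 70%
```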
@@ -160,7 +173,7 @@ def generate_dfs_io_profile(
col = index_of(io_alloc, storage_map[host_id])
row = host_that_write_index
# fill the matrix
# fill the matrix with one local write
comm_matrix[(row * len(io_alloc)) + col] += block_size_in_Bytes
# manage the replication
@@ -171,13 +184,15 @@ def generate_dfs_io_profile(
# not on the whole cluster
# NOTE: We can also manage write location here (under HPC node or
# not)
row = random.choice(list(io_alloc))
row = index_of(io_alloc, random.choice(list(io_alloc)))
comm_matrix[(row * len(io_alloc)) + col] += block_size_in_Bytes
# Round robin through the hosts
host_that_write_index = (host_that_write_index + 1) % len(job_alloc)
io_profile = {"type": "parallel", "cpu": [0] * len(io_alloc), "com": comm_matrix}
return io_profile
return io_profile, real_locality
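The replication hunk above also fixes an indexing bug: `io_alloc` holds resource ids, while `comm_matrix` is addressed by position, so the randomly chosen replica target must be mapped through `index_of` before computing the matrix offset. A small sketch of why the old line was wrong, with a hypothetical `index_of` matching the semantics the diff relies on:

```python
# Why the replica row needed index_of(): io_alloc holds resource ids,
# but comm_matrix is a flattened len(io_alloc) x len(io_alloc) grid.
import random

def index_of(alloc, resource_id):
    # hypothetical helper with the semantics the diff relies on
    return list(alloc).index(resource_id)

io_alloc = [4, 7, 9]  # resource ids, not matrix positions
comm_matrix = [0] * len(io_alloc) * len(io_alloc)

# Old code: row = random.choice(list(io_alloc)) could yield 7 or 9,
# addressing cells far outside the 3x3 matrix.
row = index_of(io_alloc, random.choice(list(io_alloc)))  # always 0, 1 or 2
col = 0
comm_matrix[(row * len(io_alloc)) + col] += 1
```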
@@ -246,12 +261,7 @@ class SchedBebida(BatsimScheduler):
super().__init__(*args, **kwargs)
self.to_be_removed_resources = {}
self.load_balanced_jobs = set()
# TODO use CLI options
# self.variant = "pfs"
if "variant" not in self.options:
self.variant = "no-io"
else:
self.variant = self.options["variant"]
self.variant = self.options["variant"]
self.notify_already_send = False
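The scheduler no longer falls back to the "no-io" variant: `variant` becomes a mandatory option, and a missing key now fails fast with a `KeyError` instead of silently selecting a default. An illustration of the behavioural difference:

```python
# Stricter option handling (illustrative values):
options = {"variant": "dfs"}      # hypothetical scheduler options
variant = options["variant"]      # raises KeyError if the caller forgot it
# old behaviour: variant defaulted to "no-io" when the key was absent
```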
@@ -689,7 +699,7 @@
job.id, list(all_profiles.keys()) + list(io_profiles.keys())
)
self.logger.debug("Creating new profile: " + io_profile_name)
io_profiles[io_profile_name] = generate_dfs_io_profile(
io_profiles[io_profile_name], real_locality = generate_dfs_io_profile(
profile,
job.allocation,
io_alloc_read,
@@ -699,6 +709,8 @@
job_locality,
self.storage_map,
)
self.logger.info("Real locality of profile " +
io_profile_name + " is " + str(real_locality))
# submit these profiles
self.bs.register_profiles(job.workload, io_profiles)
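`generate_dfs_io_profile` now returns a `(profile, real_locality)` pair, and the call site above unpacks it directly into the `io_profiles` dict entry plus a local variable, a Python pattern worth spelling out:

```python
# Unpacking a tuple directly into a dict entry and a local variable,
# as the new call site does (standalone illustration, hypothetical values):
io_profiles = {}

def make_profile():
    return {"type": "parallel"}, 0.75

io_profiles["job42!io#0"], real_locality = make_profile()
print(io_profiles, real_locality)  # {'job42!io#0': {'type': 'parallel'}} 0.75
```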
@@ -711,26 +723,12 @@
"profile": {
"type": "composed",
"seq": list(io_profiles.keys()),
"locality": real_locality,
},
}
else:
io_job = {
"alloc": str(alloc),
"profile_name": new_io_profile_name(job.id, all_profiles),
"profile": generate_dfs_io_profile(
job.profile_dict,
job.allocation,
io_alloc_read,
io_alloc,
nb_blocks_to_read_local,
remote_block_location_list,
self.block_size_in_MB,
job_locality,
self.bs.resources,
self.bs.storage_resources,
),
}
raise Exception("Only composed jobs are supported")
io_jobs[job.id] = io_job
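With the tuple return in place, the composed profile also carries the achieved locality, and the former fallback branch that built a single non-composed profile is gone: any non-composed job now raises. The registered `io_job` ends up shaped roughly like this (all values illustrative):

```python
# Approximate shape of the io_job registered for a composed job
# (allocation string, profile names and locality are hypothetical).
io_job = {
    "alloc": "0-3 8",
    "profile_name": "job42!io",
    "profile": {
        "type": "composed",
        "seq": ["job42!io#read", "job42!io#write"],
        "locality": 0.75,  # real_locality reported by the generator
    },
}
```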