Mentions légales du service

Skip to content
Snippets Groups Projects
Commit 375baac6 authored by DA SILVA Anderson Andrei 's avatar DA SILVA Anderson Andrei
Browse files

Changed the rules on get_free_bw...() method, to force the storage controller...

Changed the rules on get_free_bw...() method, to force the storage controller to select another source than the CEPH. It got a SIMGRID error.
parent 40d6d175
No related tags found
No related merge requests found
......@@ -210,8 +210,10 @@ class StorageController:
self._bs = bs # Pybatsim
self._qn = qn # The QNode Scheduler
self._logger = bs.logger
self._total_transferred_from_CEPH = 0 # In Bytes
self._total_transferred = 0 # In Bytes
self._nb_transfers_zero = 0 # The number of times a dataset was already present on disk
self._nb_transfers_real = 0 # The number of "real" transfers
......@@ -250,6 +252,7 @@ class StorageController:
print('!!!!!!!!!!!!!!!!!!!!!!!! Bandwidth', self.bandwidth)
print('!!!!!!!!!!!!!!!!!!!!!!!! Storages', self._storages)
print('!!!!!!!!!!!!!!!!!!!!!!!! ceph ID', self._ceph_id)
def get_storage(self, storage_id):
""" Returns the Storage corresponding to given storage_id if it exists or returns None. """
......@@ -263,7 +266,6 @@ class StorageController:
""" Add storage to storages list """
self._storages[storage._id] = storage
def add_dataset(self, storage_id, dataset, timestamp=0):
""" Add to the given storage the dataset
......@@ -309,8 +311,9 @@ class StorageController:
else:
return storage.has_dataset(dataset_id)
'''
def get_free_bandwidth_between_storages(self, source_id, dest_id):
'''
"""
Gets a rough estimate of the free bandwidth (in bits/sec) to move from one storage to other.
Uses staged jobs to find which of the network links are congested.
If source_id is not CEPH, then dest_id is replaced by CEPH
......@@ -319,9 +322,7 @@ class StorageController:
:param source_id: Source of the transfer
:param dest_id: Destination of the transfer
:return: Cost of free bandwidth assuming equal share
'''
"""
print("!!!!!!!! get_free_bandwidth_between_storages")
......@@ -339,6 +340,8 @@ class StorageController:
if dest_id in self.bandwidth:
speed = self.bandwidth[dest_id]
print("!!!!!!!! get_free_bandwidth_between_storages - ID" + str(id))
transfers = 0
for val in self.staging_map:
if val[0] == id:
......@@ -348,7 +351,40 @@ class StorageController:
transfers = transfers + 1 # Assuming the current one starts
return float(speed)/float(transfers)
'''
def get_free_bandwidth_between_storages(self, source_id, dest_id):
print('========================================')
print('source: ' + str(source_id))
if (source_id != 1704):
print('in: ' + str(self.mappingQBoxes[source_id].site))
if(self.mappingQBoxes[source_id].site == 'paris'):
return 1
if(self.mappingQBoxes[source_id].site == 'bordeaux'):
return 2
print('source: ' + str(dest_id))
print('in: ' + str(self.mappingQBoxes[dest_id].site))
print('========================================')
'''
place_a = source_id.site
wan_lat_a = source_id.wan_lat
#frac_a = place_a / wan_lat_a
place_b = dest_id.site
wan_lat_b = dest_id.wan_lat
#frac_b = place_b / wan_lat_b
'''
print('---------------------hey')
if(source_id == 1704):
print('CEPH')
return 3
return self.bandwidth[source_id]
def copy_from_CEPH_to_dest(self, dataset_ids, dest_id):
""" Method used to move datasets from the CEPH to the disk of a QBox
......@@ -392,9 +428,16 @@ class StorageController:
If we can't move the Dataset, then no job for it is scheduled
"""
self._logger.debug("StorageController : Request for dataset {} to transfer from {} to {}"\
print("!!!!!! Copy ----------------------------------")
print("StorageController : Request for dataset {} to transfer from {} to {}"\
.format(dataset_id, source_id, dest_id))
#self._logger.debug
self._logger.info("StorageController : Request for dataset {} to transfer from {} to {}"\
.format(dataset_id, source_id, dest_id))
source = self.get_storage(source_id)
dest = self.get_storage(dest_id)
dataset = source.get_dataset(dataset_id)
......@@ -463,6 +506,7 @@ class StorageController:
self._next_staging_job_id += 1
jid1 = "dyn-staging!" + str(self._next_staging_job_id)
self._bs.register_job(id=jid1, res=2, walltime=-1, profile_name=profile_name)
self._logger.info("Heeeeeeeeeeeeeeeey")
# Job Execution
job1 = Job(jid1, 0, -1, 1, "", "")
......@@ -470,13 +514,19 @@ class StorageController:
job1.storage_mapping = {}
job1.storage_mapping[source._name] = source_id
job1.storage_mapping[dest._name] = dest_id
self._logger.info("Hoooooooooooooooooooooow")
self._bs.execute_jobs([job1])
self.moveRequested[jid1] = dataset.get_id()
self._logger.info("[ {} ] Storage Controller staging job for dataset {} from {} to {} started".format(self._bs.time(), dataset_id, source_id, dest_id))
self._nb_transfers_real += 1
self._total_transferred += dataset.get_size()
if(source_id == self._ceph_id):
self._total_transferred_from_CEPH += dataset.get_size()
else:
self._total_transferred += dataset.get_size()
self.staging_map.add((dest_id, dataset_id))
......@@ -585,7 +635,6 @@ class StorageController:
return qboxes_list
''' This function should be called during init of the QBox Scheduler '''
def onQBoxRegistration(self, qbox_name, qbox):
qbox_disk_name = qbox_name + "_disk"
......@@ -635,14 +684,16 @@ class StorageController:
storage.releaseHardLinks(qtask_id)
def onQBoxAskDataset(self, storage_id, dataset_id):
'''
"""
This function is called from a QBox scheduler and asks for a dataset to be on disk.
Only returns true if the dataset is already present in the Qbox.
If the data staging of that dataset on this qbox disk was already asked, returns False but doesnt start
another data staging job.
'''
"""
print('!!!!!!! required by:' + str(storage_id) +' ' + str(dataset_id))
if((storage_id, dataset_id) in self.staging_map):
self._nb_transfers_zero+=1
return False
......@@ -653,13 +704,16 @@ class StorageController:
faster = self._ceph_id
faster_bw = self.get_free_bandwidth_between_storages(self._ceph_id, storage_id)
for storage in self.get_storages().values():
if(storage.has_dataset(dataset_id)):
print('!!!!!! storages: ' + str(self.get_storages()))
for storage in self.get_storages():
print('!!!!!! storage: ' + str(storage), storage_id)
if(self.has_dataset(storage, dataset_id)):
storage_bw = self.get_free_bandwidth_between_storages(storage, storage_id)
print('----------------------------')
print(str(storage) + 'tem ' + ' dataset_id ' + str(dataset_id) + '----')
print(str(storage) + ' has ' + ' dataset_id ' + str(dataset_id) + '----')
print('faster: ' + str(faster) + ' bw: ' + str(faster_bw))
print('compared: ' + str(storage) + 'bw: ' + str(storage_bw))
print('compared: ' + str(storage) + ' bw: ' + str(storage_bw))
print('----------------------------')
if(storage_bw < faster_bw):
......@@ -674,13 +728,13 @@ class StorageController:
'''
def onQBoxAskDataset(self, storage_id, dataset_id):
"""
This function is called from a QBox scheduler and asks for a dataset to be on disk.
Only returns true if the dataset is already present in the Qbox.
If the data staging of that dataset on this qbox disk was already asked, returns False but doesnt start
another data staging job.
"""
if((storage_id, dataset_id) in self.staging_map):
self._nb_transfers_zero+=1
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment