Commit 887950d1 authored by MERCIER Michael's avatar MERCIER Michael

Add Resource Add/Remove message + kill Message fix + debug info

parent 658774f1
......@@ -7,6 +7,7 @@ import sys
from .network import NetworkHandler
from procset import ProcSet
import redis
import zmq
......@@ -20,7 +21,7 @@ class Batsim(object):
network_handler=None,
event_handler=None,
validatingmachine=None,
handle_dynamic_notify=True):
handle_dynamic_notify=False):
self.running_simulation = False
if network_handler is None:
network_handler = NetworkHandler('tcp://*:28000')
......@@ -116,7 +117,7 @@ class Batsim(object):
def start_jobs_continuous(self, allocs):
"""
allocs should have the followinf format:
allocs should have the following format:
[ (job, (first res, last res)), (job, (first res, last res)), ...]
"""
......@@ -143,8 +144,7 @@ class Batsim(object):
"type": "EXECUTE_JOB",
"data": {
"job_id": job.id,
# FixMe do not send "[9]"
"alloc": " ".join(map(str, res[job.id]))
"alloc": str(ProcSet(*res[job.id]))
}
}
)
......@@ -354,9 +354,15 @@ class Batsim(object):
self.scheduler.onJobSubmission(self.jobs[job_id])
self.nb_jobs_received += 1
elif event_type == "JOB_KILLED":
self.scheduler.onJobsKilled(
[self.jobs[jid] for jid in event_data["job_ids"]])
self.nb_jobs_killed += len(event_data["job_ids"])
# get progress
killed_jobs = []
for jid in event_data["job_ids"]:
j = self.jobs[jid]
j.progress = event_data["progress"][jid]
killed_jobs.append(j)
self.scheduler.onJobsKilled(killed_jobs)
self.nb_jobs_killed += len(killed_jobs)
elif event_type == "JOB_COMPLETED":
job_id = event_data["job_id"]
j = self.jobs[job_id]
......@@ -401,6 +407,10 @@ class Batsim(object):
elif event_type == 'REQUESTED_CALL':
self.scheduler.onNOP()
# TODO: separate NOP / REQUESTED_CALL (here and in the algos)
elif event_type == 'ADD_RESOURCES':
self.scheduler.onAddResources(event_data["resources"])
elif event_type == 'REMOVE_RESOURCES':
self.scheduler.onRemoveResources(event_data["resources"])
else:
raise Exception("Unknow event type {}".format(event_type))
......@@ -425,6 +435,8 @@ class Batsim(object):
"events": self._events_to_send
}
self.network.send(new_msg)
print("Sent Message: ", new_msg)
if finished_received:
self.network.close()
......@@ -501,6 +513,7 @@ class Job(object):
self.job_state = Job.State.UNKNOWN
self.kill_reason = None
self.return_code = None
self.progress = None
self.json_dict = json_dict
self.profile_dict = profile_dict
......@@ -570,3 +583,9 @@ class BatsimScheduler(object):
def onReportEnergyConsumed(self, consumed_energy):
raise NotImplementedError()
def onAddResources(self, to_add):
raise NotImplementedError()
def onRemoveResources(self, to_remove):
raise NotImplementedError()
......@@ -132,6 +132,7 @@ class Profiles(metaclass=ABCMeta):
cls.Delay,
cls.Parallel,
cls.ParallelHomogeneous,
cls.ParallelHomogeneousTotal,
cls.Smpi,
cls.Sequence,
cls.ParallelPFS,
......@@ -192,8 +193,7 @@ class Profiles(metaclass=ABCMeta):
@classmethod
def from_dict(cls, dct, name=None):
return cls(nbres=dct["nb_res"],
cpu=dct["cpu"],
return cls(cpu=dct["cpu"],
com=dct["com"],
ret=dct.get("ret", 0),
name=name)
......@@ -201,7 +201,6 @@ class Profiles(metaclass=ABCMeta):
def to_dict(self, embed_references=False):
return {
"type": self.type,
"nb_res": self.nbres,
"cpu": self.cpu,
"com": self.com,
"ret": self.ret,
......@@ -232,6 +231,12 @@ class Profiles(metaclass=ABCMeta):
"ret": self.ret,
}
class ParallelHomogeneousTotal(ParallelHomogeneous):
"""Implementation of the MsgParallelHomogeneousTotal profile."""
type = "msg_par_hg_tot"
class Smpi(Profile):
"""Implementation of the Smpi profile."""
......
......@@ -413,16 +413,16 @@ class Resources(ObserveList):
"""The list of all special resources (managed by the scheduler logic)."""
return self.filter(special=True)
def __getitem__(self, items):
def __getitem__(self, item):
"""Returns either a slice of resources or returns a resource based on a given resource id."""
if isinstance(items, slice):
return self.create(self.all[items])
if isinstance(item, slice):
return self.create(self.all[item])
else:
return self._resource_map[items]
return self._resource_map[item]
def __delitem__(self, index):
def __delitem__(self, item):
"""Deletes a resource with the given resource id."""
resource = self._resource_map[items]
resource = self._resource_map[item]
self.remove(resource)
def _element_new(self, resource):
......
......@@ -195,6 +195,21 @@ class BaseBatsimScheduler(BatsimScheduler):
self._scheduler.on_report_energy_consumed(reply)
self._scheduler._do_schedule(reply)
def onAddResources(self, resources):
self._scheduler._update_time()
self._scheduler.info(
"Received add Resources message: {resources}",
resources=resources,
type="add_resources_received")
self._scheduler.on_add_resources(resources)
def onRemoveResources(self, resources):
self._scheduler.info(
"Received remove Resources message: {resources}",
resources=resources,
type="remove_resources_received")
self._scheduler.on_remove_resources(resources)
class Scheduler(metaclass=ABCMeta):
"""The high-level scheduler which should be interited from by concrete scheduler
......@@ -589,6 +604,20 @@ class Scheduler(metaclass=ABCMeta):
"""
pass
def on_add_resources(self, resources):
"""Hook similar to the low-level API.
:param resources: a procset of resources
"""
pass
def on_remove_resources(self, resources):
"""Hook similar to the low-level API.
:param resources: a procset of resources
"""
pass
def on_event(self, event):
"""Hook called on each event triggered by the scheduler.
......
......@@ -140,6 +140,9 @@ class ObserveList:
def __str__(self):
return str([str(entry) for entry in self._data])
def __rep__(self):
return __str__(self)
def apply(self, apply):
"""Apply a function to modify the list (e.g. sorting the list).
......@@ -329,6 +332,9 @@ class ContainerView:
def __str__(self, *args, **kwargs):
return self._data.__str__(*args, **kwargs)
def __rep__(self, *args, **kwargs):
return self._data.__rep__(*args, **kwargs)
class ListView(ContainerView):
"""A view for dictionaries."""
......
......@@ -11,9 +11,11 @@ buildPythonPackage rec {
redis
pandas
docopt
# for testing
# for testing and debug
coverage
pytest
ipython
ipdb
# for doc generation
sphinx
] ++ [ batsim ];
......
"""
schedBebida
~~~~~~~~~
This scheduler is the implementation of the BigData scheduler for the
Bebida on batsim project.
It is a Simple fcfs algoritihm.
It take into account preemption by respounding to Add/Remove resource
events.
"""
from batsim.sched import Scheduler
from batsim.sched.algorithms.filling import filler_sched
from batsim.sched.algorithms.utils import default_resources_filter
from procset import ProcSet
import itertools
def to_set(objects):
return [o.id for o in objects]
class SchedBebida(Scheduler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.to_be_removed_resources = []
def on_remove_resources(self, resources):
# find the list of jobs that are impacted
# and kill all those jobs
#import ipdb; ipdb.set_trace()
for job in self.jobs.running:
if ProcSet(*to_set(job.allocation)) & ProcSet.from_str(resources):
job.kill()
self.to_be_removed_resources.append(resources)
def on_add_resources(self, resources):
# add the resources
for resource in ProcSet(*to_set(resources)):
bat_res = { id: resource }
self._batsim.resources.append(bat_res)
# Initialize new API data structure
self._on_pre_init()
# find the list of jobs that need more resources
# kill jobs, so tey will be resubmited taking free resources, until
# tere is no more resources
free_resource_nb = len(self.resources.free)
for job in self.jobs.running:
wanted_resource_nb = job.requested_resources - len(job.allocation.resources)
if wanted_resource_nb > 0:
job.kill()
free_resource_nb = free_resource_nb - wanted_resource_nb
if free_resource_nb <= 0:
break
def on_jobs_killed(self, jobs):
# Do remove resources that was decommisionned
for resouce in self.to_be_removed_resources:
del self.resources[resource]
self.to_be_removed_resources = []
# resubmit the job
# TODO get killed jobs progress and resubmit what's left of the jobs
# for job in jobs:
# job.get_job_data("progress")
def schedule(self):
return filler_sched(
self,
resources_filter=default_resources_filter,
abort_on_first_nonfitting=True)
......@@ -12,12 +12,14 @@ requirements = [
"pyzmq",
"redis",
"pandas",
"docopt==0.6.2"
"docopt==0.6.2",
"procset",
]
setup_requirements = [
"coverage",
"autopep8"
"autopep8",
"ipdb"
]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment