diff --git a/batsim/batsim.py b/batsim/batsim.py index 41e7d44170531506e642df50f358acefa962429a..cc7edf5e2037414ea4d793befb64a41d067c5847 100644 --- a/batsim/batsim.py +++ b/batsim/batsim.py @@ -136,11 +136,11 @@ class Batsim(object): self.nb_jobs_scheduled += 1 def execute_jobs(self, jobs, io_jobs=None): - """ args:jobs: list of jobs to execute (job.allocation MUST be set) """ + """ args:jobs: list of jobs to execute + job.allocation MUST be not None and should be a non-empty ProcSet""" for job in jobs: assert job.allocation is not None - message = { "timestamp": self.time(), "type": "EXECUTE_JOB", diff --git a/batsim/sched/events.py b/batsim/sched/events.py index bbffa2d136bbbd00184b1700fcfc0c52e811db5b..9c3c4aa9358a88bbe6bf1f1c7c723431a56328b2 100644 --- a/batsim/sched/events.py +++ b/batsim/sched/events.py @@ -67,11 +67,11 @@ class LoggingEvent: return output.getvalue().strip() def to_csv_line(self): - def conv_obj(o): + '''def conv_obj(o): try: return o.__dict__ except (AttributeError, ValueError): - return str(o) + return str(o)''' data = {} for k, v in self.data.items(): @@ -87,12 +87,12 @@ class LoggingEvent: new_v.append(e) v = new_v data[k] = v - try: + '''try: data = json.dumps(data, default=lambda o: conv_obj(o)) except Exception as e: raise ValueError( "Error while dumping json data: {}" - .format(data)) + .format(data))''' output = io.StringIO() csvdata = [self.time, self.level, self.processed_jobs, self.open_jobs, diff --git a/batsim/sched/scheduler.py b/batsim/sched/scheduler.py index 2b1e31209f015943989d1dad530a28bd0a5c6332..25b0d747a9a8d476f842560d08ff7c47c2146fd0 100644 --- a/batsim/sched/scheduler.py +++ b/batsim/sched/scheduler.py @@ -113,7 +113,7 @@ class BaseBatsimScheduler(BatsimScheduler): except KeyError: pass if workload: - job_description = workload[job_id] + job_description = workload[job.id] job_description.job = newjob newjob._workload_description = workload @@ -620,6 +620,9 @@ class Scheduler(metaclass=ABCMeta): self._dynamic_workload.prepare() job.submit(self) + def notify_submission_finished(self): + self._batsim.notify_submission_finished() + def as_scheduler(*args, on_init=[], on_end=[], base_classes=[], **kwargs): """Decorator to convert a function to a scheduler class. diff --git a/batsim/sched/workloads/workloads.py b/batsim/sched/workloads/workloads.py index c1aa229d268fb2ae56aa0632a1287587b12e0966..92e36be5be72af4508cd2bdbe6c0157798b06531 100644 --- a/batsim/sched/workloads/workloads.py +++ b/batsim/sched/workloads/workloads.py @@ -102,13 +102,13 @@ class JobDescription: assert not self.submitted, "Job was already submitted and can not be modified" has_workload = bool(self.workload) - if has_workload: - self.workload.remove_job(self) + #if has_workload: + # self.workload.remove_job(self) self._id = id - if has_workload: - self.workload.add_job(self) + #if has_workload: + # self.workload.add_job(self) @property def res(self): @@ -224,11 +224,10 @@ class JobDescription: for p in self._additional_profiles} scheduler._batsim.submit_job( - self.id, + str(self.id), self.res, self.walltime, - self.profile.name, - self.workload.name, + self.workload.name+'!'+self.profile.name, self.subtime, self.profile.to_dict()) scheduler._batsim.submit_profiles( @@ -389,6 +388,9 @@ class WorkloadDescription: elif job.id > self._last_job_id: self._last_job_id = job.id + 1 + jid = job._workload.name + '!' + str(job.id) + job.id = jid + self._jobmap[job.id] = job def remove_job(self, job): @@ -396,7 +398,7 @@ class WorkloadDescription: :param job: The `JobDescription`. """ - assert not job.self.submitted, "Job was already submitted and can not be removed from the workload" + assert not job.submitted, "Job was already submitted and can not be removed from the workload" self._jobs.remove(job) job._workload = None diff --git a/batsim/validatingmachine.py b/batsim/validatingmachine.py index cf5635c64c5721dc2e004cba38c0899606d238c7..874d36207beb09977a946548d7cb8ba5e5886160 100644 --- a/batsim/validatingmachine.py +++ b/batsim/validatingmachine.py @@ -1,6 +1,6 @@ from batsim.batsim import BatsimScheduler from sortedcontainers import SortedSet -from procset import ProcSet, ProcInt +from procset import ProcSet class ValidatingMachine(BatsimScheduler): @@ -22,18 +22,20 @@ class ValidatingMachine(BatsimScheduler): self.scheduler.onAfterBatsimInit() def onSimulationBegins(self): - self.nb_res = self.bs.nb_res + self.nb_res = self.bs.nb_compute_resources self.availableResources = SortedSet(range(self.nb_res)) self.jobs_waiting = [] self.previousAllocations = dict() - # intercept job start - self.bs_start_jobs_continuous = self.bs.start_jobs_continuous - self.bs.start_jobs_continuous = self.start_jobs_continuous - self.bs_start_jobs = self.bs.start_jobs - self.bs.start_jobs = self.start_jobs + # save real job start function + self.real_start_jobs = self.bs.start_jobs + self.real_execute_jobs = self.bs.execute_jobs + # intercept job start self.scheduler.bs = self.bs + self.scheduler.bs.start_jobs = self.start_jobs_valid + self.scheduler.bs.execute_jobs = self.execute_jobs_valid + self.scheduler.onSimulationBegins() def onSimulationEnds(self): @@ -67,18 +69,17 @@ class ValidatingMachine(BatsimScheduler): def onRequestedCall(self): self.scheduler.onRequestedCall() - def start_jobs_continuous(self, allocs): - jobs = [] - for (job, (first_res, last_res)) in allocs: - self.previousAllocations[job.id] = range(first_res, last_res + 1) + + def start_jobs_valid(self, jobs, res): + for j in jobs: try: - self.jobs_waiting.remove(job) + self.jobs_waiting.remove(j) except KeyError: raise ValueError( "Job {} was not waiting (waiting: {})".format( - job, [ - j2.id for j2 in self.jobs_waiting])) - for r in range(first_res, last_res + 1): + j, [j2.id for j2 in self.jobs_waiting])) + self.previousAllocations[j.id] = res[j.id] + for r in res[j.id]: try: self.availableResources.remove(r) except KeyError: @@ -86,12 +87,10 @@ class ValidatingMachine(BatsimScheduler): "Resource {} was not available (available: {})".format( r, list( self.availableResources))) - job.allocation = ProcInt(first_res, last_res) - jobs.append(job) - - self.bs.execute_jobs(jobs) + j.allocation = ProcSet(*res[j.id]) + self.real_execute_jobs(jobs) - def start_jobs(self, jobs, res): + def execute_jobs_valid(self, jobs, io_jobs=None): for j in jobs: try: self.jobs_waiting.remove(j) @@ -99,8 +98,8 @@ class ValidatingMachine(BatsimScheduler): raise ValueError( "Job {} was not waiting (waiting: {})".format( j, [j2.id for j2 in self.jobs_waiting])) - self.previousAllocations[j.id] = res[j.id] - for r in res[j.id]: + self.previousAllocations[j.id] = j.allocation + for r in j.allocation: try: self.availableResources.remove(r) except KeyError: @@ -108,5 +107,4 @@ class ValidatingMachine(BatsimScheduler): "Resource {} was not available (available: {})".format( r, list( self.availableResources))) - job.allocation = ProcSet(*res[job.id]) - self.bs.execute_jobs(jobs) + self.real_execute_jobs(jobs, io_jobs) \ No newline at end of file diff --git a/schedulers/easyBackfill.py b/schedulers/easyBackfill.py index f08f2197cdcb8f06b410408bf0bca4b402d9e9a8..a32472bdb46ca5d7ee57ab59d53f65dc886b09a9 100644 --- a/schedulers/easyBackfill.py +++ b/schedulers/easyBackfill.py @@ -6,7 +6,7 @@ This scheduler consider job as rectangle. from batsim.batsim import BatsimScheduler from sortedcontainers import SortedListWithKey -from procset import ProcInt +from procset import ProcSet INFINITY = float('inf') @@ -286,7 +286,7 @@ class EasyBackfill(BatsimScheduler): if len(allocs) > 0: jobs = [] for (job, (first_res, last_res)) in allocs: - job.allocation = ProcInt(first_res, last_res) + job.allocation = ProcSet((first_res, last_res)) jobs.append(job) self.bs.execute_jobs(jobs) diff --git a/schedulers/fillerSched.py b/schedulers/fillerSched.py index bb38df835ae5af46f5d4baa2e4e44f7163ef17f6..e2587e7c1d90de23db3adfae96551d4f3a59166f 100644 --- a/schedulers/fillerSched.py +++ b/schedulers/fillerSched.py @@ -2,7 +2,7 @@ from batsim.batsim import BatsimScheduler, Batsim import sys import os -from procset import ProcSet, ProcInt +from procset import ProcSet from itertools import islice @@ -20,7 +20,7 @@ class FillerSched(BatsimScheduler): self.sched_delay = 0.005 self.openJobs = set() - self.availableResources = ProcSet(ProcInt(0,self.bs.nb_compute_resources-1)) + self.availableResources = ProcSet((0,self.bs.nb_compute_resources-1)) def scheduleJobs(self): @@ -34,6 +34,7 @@ class FillerSched(BatsimScheduler): nb_res_req = job.requested_resources if nb_res_req <= len(self.availableResources): + # Retrieve the *nb_res_req* first availables resources job_alloc = ProcSet(*islice(self.availableResources, nb_res_req)) job.allocation = job_alloc scheduledJobs.append(job) diff --git a/schedulers/easyBackfillNotopo.py b/schedulers/unMaintained/easyBackfillNotopo.py similarity index 100% rename from schedulers/easyBackfillNotopo.py rename to schedulers/unMaintained/easyBackfillNotopo.py diff --git a/schedulers/easyEnergyBudget.py b/schedulers/unMaintained/easyEnergyBudget.py similarity index 99% rename from schedulers/easyEnergyBudget.py rename to schedulers/unMaintained/easyEnergyBudget.py index 37c0d400e147e531a2bf70d3928e54fec418d7e0..9d4d09a33e26fd27c63cb65357ff49ab6b87d7e4 100644 --- a/schedulers/easyEnergyBudget.py +++ b/schedulers/unMaintained/easyEnergyBudget.py @@ -3,7 +3,7 @@ from schedulers.intervalContainer import * import math from enum import Enum -from procset import ProcInt +from procset import ProcSet class State(Enum): @@ -190,7 +190,7 @@ class EasyEnergyBudget(EasyBackfill): else: jobs = [] for (job, (first_res, last_res)) in allocs: - job.allocation = ProcInt(first_res, last_res) + job.allocation = ProcSet((first_res, last_res)) jobs.append(job) self.bs.execute_jobs(jobs) @@ -527,7 +527,7 @@ def nodes_states_WantingToStartJob_SwitchedON(self, start, end, fromState, toSta del self.waiting_allocs[a[0]] jobs = [] for (job, (first_res, last_res)) in allocs_to_start: - job.allocation = ProcInt(first_res, last_res) + job.allocation = ProcSet((first_res, last_res)) jobs.append(job) self.bs.execute_jobs(jobs) diff --git a/schedulers/easySjbfBackfillNotopo.py b/schedulers/unMaintained/easySjbfBackfillNotopo.py similarity index 100% rename from schedulers/easySjbfBackfillNotopo.py rename to schedulers/unMaintained/easySjbfBackfillNotopo.py diff --git a/setup.py b/setup.py index 11ed8a33b4932a404ec92854e6c6d16ee3007f99..4cd4078a39f1cae52be5b8459fd2742be249b265 100755 --- a/setup.py +++ b/setup.py @@ -46,10 +46,31 @@ class UserCommand(Command): class TestCommand(UserCommand): description = 'Run tests' - user_options = [] + user_options = [ + ('batsim-bin=', None, 'Path/to/batsim'), + ('workloads-basedir=', None, 'Path/to/batsim/workloads'), + ('platforms-basedir=', None, 'Path/to/batsim/platforms') + ] + + def initialize_options(self): + self.batsim_bin = None + self.workloads_basedir = None + self.platforms_basedir = None + + def finalize_options(self): + self.args = [] + if self.batsim_bin is not None: + self.args.append('BATSIMBIN=--batsim-bin=' + str(self.batsim_bin)) + if self.workloads_basedir is not None: + self.args.append('WORKLOADSDIR=--workloads-basedir=' + str(self.workloads_basedir)) + if self.platforms_basedir is not None: + self.args.append('PLATFORMSDIR=--platforms-basedir=' + str(self.platforms_basedir)) def run(self): - self.run_external_command("make", cwd="tests") + if len(self.args) > 0: + self.run_external_command("make", *self.args ,cwd="tests") + else: + self.run_external_command("make", cwd="tests") class DocCommand(UserCommand): diff --git a/tests/Makefile b/tests/Makefile index 12ece74936e40586c32340b9bb3941f0d15ff1b2..77af9751275cdab6c058db734c2a08a97424f784 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -32,7 +32,7 @@ gantts_okular: $(GANTTS_OUT) python3 ../../../../evalys/evalys/gantt.py $* $*/gantt.png generate: - python3 generate.py + python3 generate.py $(BATSIMBIN) $(WORKLOADSDIR) $(PLATFORMSDIR) clean: rm tests/*/out_jobs.csv diff --git a/tests/generate.py b/tests/generate.py index 28d6bf372a1a0c129b3e1864eb64e4324fbaaed4..4d770e73954a82d976b4a5f7f25c71d8c2de1a69 100644 --- a/tests/generate.py +++ b/tests/generate.py @@ -7,7 +7,7 @@ import os.path import json import copy - +''' def generate_energy( workloads_basedir, platforms_basedir, @@ -94,7 +94,7 @@ def generate_energy( }, "scheduler": copy.deepcopy(s) } for s in schedulers for w in workloads_to_use] - +''' def generate_basic( workloads_basedir, @@ -151,7 +151,7 @@ def generate_sched_static( options): schedulers = [] - schedulers += [ + ''' Problems in parent_job status { "name_expe": "sched_delayProfilesAsTasks", "name": "schedDelayProfilesAsTasks", @@ -159,8 +159,11 @@ def generate_sched_static( "protection": True, "interpreter": "coverage", "options": { - } + }, + "dynamic": True }, + ''' + schedulers += [ { "name_expe": "sched_fillerSched", "name": "schedFiller", @@ -168,7 +171,8 @@ def generate_sched_static( "protection": True, "interpreter": "coverage", "options": { - } + }, + "dynamic":False }, { "name_expe": "sched_backfilling", @@ -177,7 +181,8 @@ def generate_sched_static( "protection": True, "interpreter": "coverage", "options": { - } + }, + "dynamic":False }, ] @@ -208,7 +213,7 @@ def generate_sched_static( "job_submission": { "forward_profiles": True, "from_scheduler": { - "enabled": True, + "enabled": s["dynamic"], "acknowledge": True } } @@ -282,7 +287,7 @@ def generate_sched_script( "job_submission": { "forward_profiles": True, "from_scheduler": { - "enabled": True, + "enabled": False, "acknowledge": True } } @@ -367,12 +372,13 @@ def generate_sched( batsim_bin, batsim_args, options) + ''' Pyhton is not finding the path to batsim.sched.workloads generate_sched_script( workloads_basedir, platforms_basedir, batsim_bin, batsim_args, - options) + options)''' generate_sched_dynamic( workloads_basedir, platforms_basedir, @@ -449,13 +455,14 @@ def main(args): batsim_args, options) + ''' easyEnergyBudget scheduler is no longer maintained if energy: generate_energy( workloads_basedir, platforms_basedir, batsim_bin, batsim_args, - options) + options)''' if sched: generate_sched( diff --git a/tests/schedulers/dynamicTestScheduler.py b/tests/schedulers/dynamicTestScheduler.py index 5fea33da5f842390535f0f2687b6afcc8a6f4a3f..04577f081bb4bfce18d84bef45b874405331f3de 100644 --- a/tests/schedulers/dynamicTestScheduler.py +++ b/tests/schedulers/dynamicTestScheduler.py @@ -38,6 +38,7 @@ class DynamicTestScheduler(Scheduler): ])])) w.submit(self) + self.notify_submission_finished() def schedule(self): return filler_sched(self,