diff --git a/batsim/batsim.py b/batsim/batsim.py index 770ad528e9d20366dda698a534560aa986db8eae..41e7d44170531506e642df50f358acefa962429a 100644 --- a/batsim/batsim.py +++ b/batsim/batsim.py @@ -120,27 +120,7 @@ class Batsim(object): } }) - def start_jobs_continuous(self, allocs): - """ - allocs should have the following format: - [ (job, (first res, last res)), (job, (first res, last res)), ...] - """ - - if len(allocs) == 0: - return - - for (job, (first_res, last_res)) in allocs: - self._events_to_send.append({ - "timestamp": self.time(), - "type": "EXECUTE_JOB", - "data": { - "job_id": job.id, - "alloc": "{}-{}".format(first_res, last_res) - } - } - ) - self.nb_jobs_scheduled += 1 - + ''' THIS FUNCTION IS DEPRECATED ''' def start_jobs(self, jobs, res): """ args:res: is list of int (resources ids) """ for job in jobs: @@ -280,19 +260,6 @@ class Batsim(object): } }) - def start_jobs_interval_set_strings(self, jobs, res): - """ args:res: is a jobID:interval_set_string dict """ - for job in jobs: - self._events_to_send.append({ - "timestamp": self.time(), - "type": "EXECUTE_JOB", - "data": { - "job_id": job.id, - "alloc": res[job.id] - } - } - ) - self.nb_jobs_scheduled += 1 def get_job(self, event): if self.redis_enabled: diff --git a/batsim/validatingmachine.py b/batsim/validatingmachine.py index e7d08eb5ebb4f375d3a92f8de274973fe658b652..cf5635c64c5721dc2e004cba38c0899606d238c7 100644 --- a/batsim/validatingmachine.py +++ b/batsim/validatingmachine.py @@ -1,5 +1,6 @@ from batsim.batsim import BatsimScheduler from sortedcontainers import SortedSet +from procset import ProcSet, ProcInt class ValidatingMachine(BatsimScheduler): @@ -67,6 +68,7 @@ class ValidatingMachine(BatsimScheduler): self.scheduler.onRequestedCall() def start_jobs_continuous(self, allocs): + jobs = [] for (job, (first_res, last_res)) in allocs: self.previousAllocations[job.id] = range(first_res, last_res + 1) try: @@ -84,7 +86,10 @@ class ValidatingMachine(BatsimScheduler): "Resource {} was not available (available: {})".format( r, list( self.availableResources))) - self.bs_start_jobs_continuous(allocs) + job.allocation = ProcInt(first_res, last_res) + jobs.append(job) + + self.bs.execute_jobs(jobs) def start_jobs(self, jobs, res): for j in jobs: @@ -103,4 +108,5 @@ class ValidatingMachine(BatsimScheduler): "Resource {} was not available (available: {})".format( r, list( self.availableResources))) - self.bs_start_jobs(jobs, res) + job.allocation = ProcSet(*res[job.id]) + self.bs.execute_jobs(jobs) diff --git a/schedulers/easyBackfill.py b/schedulers/easyBackfill.py index 7838bffd65b75d707f2099593446ee8aa0b393dd..f08f2197cdcb8f06b410408bf0bca4b402d9e9a8 100644 --- a/schedulers/easyBackfill.py +++ b/schedulers/easyBackfill.py @@ -6,6 +6,7 @@ This scheduler consider job as rectangle. from batsim.batsim import BatsimScheduler from sortedcontainers import SortedListWithKey +from procset import ProcInt INFINITY = float('inf') @@ -283,7 +284,11 @@ class EasyBackfill(BatsimScheduler): self.listWaitingJob.insert(0, first_job) if len(allocs) > 0: - self.bs.start_jobs_continuous(allocs) + jobs = [] + for (job, (first_res, last_res)) in allocs: + job.allocation = ProcInt(first_res, last_res) + jobs.append(job) + self.bs.execute_jobs(jobs) def allocJobFCFS(self, job, current_time): for l in self.listFreeSpace.generator(): diff --git a/schedulers/easyBackfillNotopo.py b/schedulers/easyBackfillNotopo.py index 8ff2933c6ea04eebd968c9e0987f8d15af410876..7d9b2c24389f940ad37e0f67dae0a53035506624 100644 --- a/schedulers/easyBackfillNotopo.py +++ b/schedulers/easyBackfillNotopo.py @@ -1,7 +1,7 @@ from batsim.batsim import BatsimScheduler from schedulers.common_pyss_adaptator import CpuSnapshot from sortedcontainers import SortedSet - +from procset import ProcSet class EasyBackfillNotopo(BatsimScheduler): """ @@ -16,8 +16,6 @@ class EasyBackfillNotopo(BatsimScheduler): self.cpu_snapshot = CpuSnapshot(self.bs.nb_resources, False) self.unscheduled_jobs = [] - self.jobs_res = {} - self.sched_delay = 5.0 self.availableResources = SortedSet(range(self.bs.nb_resources)) @@ -63,12 +61,12 @@ class EasyBackfillNotopo(BatsimScheduler): for job in scheduledJobs: job.start_time = current_time # just to be sure res = self.availableResources[:job.requested_resources] - self.jobs_res[job.id] = res + job.allocation = ProcSet(*res) self.previousAllocations[job.id] = res for r in res: self.availableResources.remove(r) - self.bs.start_jobs(scheduledJobs, self.jobs_res) + self.bs.execute_jobs(scheduledJobs) def _schedule_head_of_list(self, current_time): tosched = [] diff --git a/schedulers/easyEnergyBudget.py b/schedulers/easyEnergyBudget.py index e746900dc5ffb916a73c6e1f13314e88a46a32fc..37c0d400e147e531a2bf70d3928e54fec418d7e0 100644 --- a/schedulers/easyEnergyBudget.py +++ b/schedulers/easyEnergyBudget.py @@ -3,6 +3,7 @@ from schedulers.intervalContainer import * import math from enum import Enum +from procset import ProcInt class State(Enum): @@ -187,7 +188,11 @@ class EasyEnergyBudget(EasyBackfill): self.nodes_states.changeState( s, e, State.WantingToStartJob, (self, (j, (s, e)))) else: - self.bs.start_jobs_continuous(allocs) + jobs = [] + for (job, (first_res, last_res)) in allocs: + job.allocation = ProcInt(first_res, last_res) + jobs.append(job) + self.bs.execute_jobs(jobs) def power_consumed(self, listFreeSpace, addUsedProc=0): free_procs = float(listFreeSpace.free_processors - addUsedProc) @@ -520,7 +525,12 @@ def nodes_states_WantingToStartJob_SwitchedON(self, start, end, fromState, toSta j, j.requested_time + j.start_time) for a in allocs_to_start: del self.waiting_allocs[a[0]] - self.bs.start_jobs_continuous(allocs_to_start) + jobs = [] + for (job, (first_res, last_res)) in allocs_to_start: + job.allocation = ProcInt(first_res, last_res) + jobs.append(job) + self.bs.execute_jobs(jobs) + def nodes_states_WantingToStartJob_SwitchedOFF(self, start, end, fromState, toState): diff --git a/schedulers/fcfsSchedSleep.py b/schedulers/fcfsSchedSleep.py index 1d13225073ce69bef0fd9b50a31a5736e177bb1b..9cc41aaa38e2054aed468788cbd2c96d6bb1776e 100644 --- a/schedulers/fcfsSchedSleep.py +++ b/schedulers/fcfsSchedSleep.py @@ -20,6 +20,7 @@ from batsim.batsim import BatsimScheduler import sys from sortedcontainers import SortedSet from enum import Enum +from procset import ProcSet class PState(Enum): @@ -47,7 +48,6 @@ class FcfsSchedSleep(BatsimScheduler): def onAfterBatsimInit(self): self.nb_completed_jobs = 0 - self.jobs_res = {} self.jobs_completed = [] self.jobs_waiting = [] @@ -68,7 +68,6 @@ class FcfsSchedSleep(BatsimScheduler): def scheduleJobs(self): print('\n\n\n\n') print('open_jobs = ', self.open_jobs) - print('jobs_res = ', self.jobs_res) print('computingM = ', self.computing_machines) print('idleM = ', self.idle_machines) @@ -90,8 +89,8 @@ class FcfsSchedSleep(BatsimScheduler): # Job fits now -> allocation elif nb_res_req <= len(self.idle_machines): - res = self.idle_machines[:nb_res_req] - self.jobs_res[job.id] = res + res = ProcSet(*self.idle_machines[:nb_res_req]) + job.allocation = res scheduled_jobs.append(job) for r in res: # Machines' states update self.idle_machines.remove(r) @@ -145,15 +144,16 @@ class FcfsSchedSleep(BatsimScheduler): self.bs.consume_time(self.sched_delay) # send to uds - self.bs.start_jobs(scheduled_jobs, self.jobs_res) - self.bs.change_pstates(pstates_to_change) + self.bs.execute_jobs(scheduled_jobs) + for (val, (r1,r2)) in pstates_to_change: + self.bs.set_resource_state([r1], val) def onJobSubmission(self, job): self.open_jobs.append(job) self.scheduleJobs() def onJobCompletion(self, job): - for res in self.jobs_res[job.id]: + for res in job.allocation: self.idle_machines.add(res) self.computing_machines.remove(res) self.machines_states[res] = State.Idle.value diff --git a/schedulers/fillerSched.py b/schedulers/fillerSched.py index 250cd091d68de8b99c114457797d036aa4524886..bb38df835ae5af46f5d4baa2e4e44f7163ef17f6 100644 --- a/schedulers/fillerSched.py +++ b/schedulers/fillerSched.py @@ -2,8 +2,8 @@ from batsim.batsim import BatsimScheduler, Batsim import sys import os -from random import sample -from sortedcontainers import SortedSet +from procset import ProcSet, ProcInt +from itertools import islice class FillerSched(BatsimScheduler): @@ -14,15 +14,13 @@ class FillerSched(BatsimScheduler): def onAfterBatsimInit(self): self.nb_completed_jobs = 0 - self.jobs_res = {} self.jobs_completed = [] self.jobs_waiting = [] self.sched_delay = 0.005 self.openJobs = set() - self.availableResources = SortedSet(range(self.bs.nb_resources)) - self.previousAllocations = dict() + self.availableResources = ProcSet(ProcInt(0,self.bs.nb_compute_resources-1)) def scheduleJobs(self): @@ -30,20 +28,17 @@ class FillerSched(BatsimScheduler): print('openJobs = ', self.openJobs) print('available = ', self.availableResources) - print('previous = ', self.previousAllocations) # Iterating over a copy to be able to remove jobs from openJobs at traversal for job in set(self.openJobs): nb_res_req = job.requested_resources if nb_res_req <= len(self.availableResources): - res = self.availableResources[:nb_res_req] - self.jobs_res[job.id] = res - self.previousAllocations[job.id] = res + job_alloc = ProcSet(*islice(self.availableResources, nb_res_req)) + job.allocation = job_alloc scheduledJobs.append(job) - for r in res: - self.availableResources.remove(r) + self.availableResources -= job_alloc self.openJobs.remove(job) @@ -52,11 +47,10 @@ class FillerSched(BatsimScheduler): # send to uds if len(scheduledJobs) > 0: - self.bs.start_jobs(scheduledJobs, self.jobs_res) + self.bs.execute_jobs(scheduledJobs) print('openJobs = ', self.openJobs) print('available = ', self.availableResources) - print('previous = ', self.previousAllocations) print('') def onJobSubmission(self, job): @@ -64,7 +58,5 @@ class FillerSched(BatsimScheduler): self.scheduleJobs() def onJobCompletion(self, job): - for res in self.previousAllocations[job.id]: - self.availableResources.add(res) - self.previousAllocations.pop(job.id) + self.availableResources |= job.allocation self.scheduleJobs() diff --git a/schedulers/randomSched.py b/schedulers/randomSched.py index 32370b47db6ea7025cc9c92a5dc0bf39c5c92535..e4a53c3ce7a90fbdf3b621dd5ecdffdd014470a8 100644 --- a/schedulers/randomSched.py +++ b/schedulers/randomSched.py @@ -1,7 +1,7 @@ from batsim.batsim import BatsimScheduler from random import sample - +from procset import ProcSet class RandomSched(BatsimScheduler): @@ -17,11 +17,10 @@ class RandomSched(BatsimScheduler): def scheduleJobs(self): scheduledJobs = [] + # Iterating over all open jobs for job in self.openJobs: res = sample(self.res, job.requested_resources) - - # Iterating over all open jobs - for job in set(self.openJobs): + job.allocation = ProcSet(*res) self.jobs_res[job.id] = res scheduledJobs.append(job) @@ -33,7 +32,7 @@ class RandomSched(BatsimScheduler): # send to uds if len(scheduledJobs) > 0: - self.bs.start_jobs(scheduledJobs, self.jobs_res) + self.bs.execute_jobs(scheduledJobs) def onJobSubmission(self, job): self.openJobs.add(job)