Commit 48d997b4 authored by David Glesser's avatar David Glesser
Browse files

Merge branch 'c++'

parents 512d22f5 4823b0e1
......@@ -3,14 +3,20 @@
import json
import struct
import socket
import sys
class Batsim(object):
def __init__(self, json_file, scheduler, server_address = '/tmp/bat_socket', verbose=0):
def __init__(self, json_file, scheduler, validatingmachine=None, server_address = '/tmp/bat_socket', verbose=0):
self.server_address = server_address
self.verbose = verbose
self.scheduler = scheduler
if validatingmachine is None:
self.scheduler = scheduler
else:
self.scheduler = validatingmachine(scheduler)
#load json file
self._load_json_workload_profile(json_file)
......@@ -40,14 +46,14 @@ class Batsim(object):
return self._current_time
def start_job(self, jobid, res):
self._msgs_to_send.append( ( self.time(), "J:"+str(jobid)+"="+ ",".join([str(i) for i in res]) ) )
def start_job(self, job, res):
self._msgs_to_send.append( ( self.time(), "J:"+str(job.id)+"="+ ",".join([str(i) for i in res]) ) )
def start_jobs(self, jobids, res):
def start_jobs(self, jobs, res):
msg = "J:"
for jid in jobids:
msg += str(jid) + "="
for r in res[jid]:
for j in jobs:
msg += str(j.id) + "="
for r in res[j.id]:
msg += str(r) + ","
msg = msg[:-1] + ";" # replace last comma by semicolon separtor between jobs
msg = msg[:-1] # remove last semicolon
......@@ -90,9 +96,11 @@ class Batsim(object):
if data[1] == 'N':
self.scheduler.onNOP()
if data[1] == 'S':
self.scheduler.onJobSubmission(int(data[2]))
self.scheduler.onJobSubmission(self.jobs[int(data[2])])
elif data[1] == 'C':
self.scheduler.onJobCompletion(int(data[2]))
j = self.jobs[int(data[2])]
j.finish_time = float(data[0])
self.scheduler.onJobCompletion(j)
elif data[1] == 'p':
opts = data[2].split('=')
self.scheduler.onMachinePStateChanged(int(opts[0]), int(opts[1]))
......@@ -133,6 +141,7 @@ class Job(object):
self.requested_time = walltime
self.requested_resources = res
self.profile = profile
self.finish_time = None#will be set on completion by batsim
class BatsimScheduler(object):
......
from batsim import BatsimScheduler
from sortedcontainers import SortedSet
class ValidatingMachine(BatsimScheduler):
"""
This class tries to do a lot of checks to prevent from stupid and invisible errors.
You should use this when you are developping and testing a scheduler.
It checks that:
- not 2 jobs use the same resource as the same time
- a job is only started once
- a job is launched after his submit time
"""
def __init__(self, scheduler):
self.scheduler = scheduler
def onAfterBatsimInit(self):
self.nb_res = self.bs.nb_res
self.availableResources = SortedSet(range(self.nb_res))
self.jobs_waiting = []
self.previousAllocations = dict()
#intercept job start
self.bs_start_job = self.bs.start_job
self.bs.start_job = self.start_job
self.bs_start_jobs = self.bs.start_jobs
self.bs.start_jobs = self.start_jobs
self.scheduler.bs = self.bs
self.scheduler.onAfterBatsimInit()
def onJobRejection(self):
self.scheduler.onJobRejection()
def onNOP(self):
self.scheduler.onNOP()
def onJobSubmission(self, job):
self.jobs_waiting.append(job)
self.scheduler.onJobSubmission(job)
def onJobCompletion(self, job):
for res in self.previousAllocations[job.id]:
self.availableResources.add(res)
self.previousAllocations.pop(job.id)
self.scheduler.onJobCompletion(job)
def onMachinePStateChanged(self, nodeid, pstate):
self.scheduler.onMachinePStateChanged(nodeid, pstate)
def start_job(self, job, res):
self.previousAllocations[job.id] = res
self.jobs_waiting.remove(job)
for r in res:
self.availableResources.remove(r)
self.bs_start_job(job, res)
def start_jobs(self, jobs, res):
for j in jobs:
self.jobs_waiting.remove(j)
self.previousAllocations[j.id] = res[j.id]
for r in res[j.id]:
self.availableResources.remove(r)
self.bs_start_jobs(jobs, res)
......@@ -5,6 +5,7 @@
import sys
from batsim.batsim import Batsim
from batsim.validatingmachine import ValidatingMachine
......@@ -43,6 +44,9 @@ def instanciate_scheduler(name):
scheduler = instanciate_scheduler(scheduler_filename)
bs = Batsim(json_filename, scheduler, verbose=999)
#vm = None
vm = ValidatingMachine
bs = Batsim(json_filename, scheduler, validatingmachine=vm, verbose=999)
bs.start()
\ No newline at end of file
......@@ -36,12 +36,12 @@ class FillerSched(BatsimScheduler):
# Iterating over a copy to be able to remove jobs from openJobs at traversal
for job in set(self.openJobs):
nb_res_req = self.bs.jobs[job].requested_resources
nb_res_req = job.requested_resources
if nb_res_req <= len(self.availableResources):
res = self.availableResources[:nb_res_req]
self.jobs_res[job] = res
self.previousAllocations[job] = res
self.jobs_res[job.id] = res
self.previousAllocations[job.id] = res
scheduledJobs.append(job)
for r in res:
......@@ -66,7 +66,7 @@ class FillerSched(BatsimScheduler):
self.scheduleJobs()
def onJobCompletion(self, job):
for res in self.previousAllocations[job]:
for res in self.previousAllocations[job.id]:
self.availableResources.add(res)
self.previousAllocations.pop(job)
self.previousAllocations.pop(job.id)
self.scheduleJobs()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment