Commit d3d3ddb3 authored by David Glesser's avatar David Glesser
Browse files

Add PyBatsim, a python module that helps you make your own scheduler.

As today, only one scheduler is written here. Soon, more toy schedulers will be added!
parent a288a391
PyBatsim
========
PyBatsim helps you developping your own scheduler in python!
launcher.py -> This is the main entry point to launch things
Ex: python launcher.py filler_sched ../../workload_profiles/test_workload_profile.json (a server should be started)
batsim/
batsim.py -> this class helps you communicate with the batsim server
schedulers/ -> contains all the schedulers
Schedulers name should follow this convention:
foo_bar.py contains the FooBar classname which has as an ancestor BatsimScheduler
filler_sched.py -> A kind of first fit without topology scheduler
#/usr/bin/python3
import json
import struct
import socket
class Batsim(object):
def __init__(self, json_file, scheduler, server_address = '/tmp/bat_socket', verbose=0):
self.server_address = server_address
self.verbose = verbose
self.scheduler = scheduler
#load json file
self._load_json_workload_profile(json_file)
#open connection
self._connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
print("[BATSIM]: connecting to %r" % server_address)
try:
self._connection.connect(server_address)
if self.verbose > 1: print('[BATSIM]: connected')
except socket.error:
print("[BATSIM]: socket error")
sys.exit(1)
#initialize some public attributes
self.last_msg_recv_time = -1
self.scheduler.bs = self
self.scheduler.onAfterBatsimInit()
def time(self):
return self._current_time
def consume_time(self, t):
self._current_time += t
return self._current_time
def start_job(self, jobid, res):
self._msgs_to_send.append( ( self.time(), "J:"+str(jobid)+"="+ ",".join([str(i) for i in res]) ) )
def start_jobs(self, jobids, res):
msg = "J:"
for jid in jobids:
msg += str(jid) + "="
for r in res[jid]:
msg += str(r) + ","
msg = msg[:-1] + ";" # replace last comma by semicolon separtor between jobs
msg = msg[:-1] # remove last semicolon
self._msgs_to_send.append( ( self.time(), msg ) )
def do_next_event(self):
self._read_bat_msg()
def start(self):
while True:
self.do_next_event()
def _read_bat_msg(self):
lg_str = self._connection.recv(4)
if not lg_str:
print("[BATSIM]: connection is closed by batsim core")
exit(1)
lg = struct.unpack("i",lg_str)[0]
msg = self._connection.recv(lg).decode()
if self.verbose > 0: print('[BATSIM]: from batsim : %r' % msg)
sub_msgs = msg.split('|')
data = sub_msgs[0].split(":")
version = int(data[0])
self.last_msg_recv_time = float(data[1])
self._current_time = float(data[1])
if self.verbose > 1: print("[BATSIM]: version: %r now: %r" % (version, self.time()))
# [ (timestamp, txtDATA), ...]
self._msgs_to_send = []
for i in range(1, len(sub_msgs)):
data = sub_msgs[i].split(':')
if data[1] == 'R':
self.scheduler.onJobRejection()
if data[1] == 'N':
self.scheduler.onNOP()
if data[1] == 'S':
self.scheduler.onJobSubmission(int(data[2]))
elif data[1] == 'C':
self.scheduler.onJobCompletion(int(data[2]))
elif data[1] == 'p':
opts = data[2].split('=')
self.scheduler.onMachinePStateChanged(int(opts[0]), int(opts[1]))
elif data[1] == 'J' or data[1] == 'P':
raise "Only the server can receive this kind of message"
else:
raise Exception("Unknow submessage type " + data[1] )
msg = "0:" + str(self.last_msg_recv_time) + "|"
if len(self._msgs_to_send) > 0:
#sort msgs by timestamp
self._msgs_to_send = sorted(self._msgs_to_send, key=lambda m: m[0])
for m in self._msgs_to_send:
msg += str(m[0])+":"+m[1]
else:
msg += str(self.time()) +":N"
if self.verbose > 0: print("[BATSIM]: to batsim : %r" % msg)
lg = struct.pack("i",int(len(msg)))
self._connection.sendall(lg)
self._connection.sendall(msg.encode())
def _load_json_workload_profile(self, filename):
wkp_file = open(filename)
wkp = json.load(wkp_file)
self.nb_res = wkp["nb_res"]
self.jobs = {j["id"]: Job(j["id"], j["subtime"], j["walltime"], j["res"], j["profile"]) for j in wkp["jobs"]}
#TODO: profiles
class Job(object):
def __init__(self, id, subtime, walltime, res, profile):
self.id = id
self.submit_time = subtime
self.requested_time = walltime
self.requested_resources = res
self.profile = profile
class BatsimScheduler(object):
def onAfterBatsimInit(self):
#You now have access to self.bs and all other functions
pass
def onJobRejection(self):
raise "not implemented"
def onNOP(self):
raise "not implemented"
def onJobSubmission(self, job):
raise "not implemented"
def onJobCompletion(self, job):
raise "not implemented"
def onMachinePStateChanged(self, nodeid, pstate):
raise "not implemented"
#filler_sched.py ../../workload_profiles/test_workload_profile.json
import sys
from batsim.batsim import Batsim
scheduler_filename = sys.argv[1]
json_filename = sys.argv[2]
def module_to_class(module):
"""
transform foo_bar.py to FooBar
"""
return ''.join(w.title() for w in str.split(module, "_"))
def filename_to_module(fn):
return str(fn).split(".")[0]
def instanciate_scheduler(name):
my_module = name#filename_to_module(my_filename)
my_class = module_to_class(my_module)
#load module(or file)
package = __import__ ('schedulers', fromlist=[my_module])
if my_module not in package.__dict__:
print "No such scheduler (module file not found)."
exit()
if my_class not in package.__dict__[my_module].__dict__:
print "No such scheduler (class within the module file not found)."
exit()
#load the class
scheduler_non_instancied = package.__dict__[my_module].__dict__[my_class]
scheduler = scheduler_non_instancied()
return scheduler
scheduler = instanciate_scheduler(scheduler_filename)
bs = Batsim(json_filename, scheduler, verbose=999)
bs.start()
\ No newline at end of file
#/usr/bin/python3
from batsim.batsim import BatsimScheduler, Batsim
import sys
import os
from random import sample
from sortedcontainers import SortedSet
class FillerSched(BatsimScheduler):
def __init__(self):
pass
def onAfterBatsimInit(self):
self.nb_completed_jobs = 0
self.jobs_res = {}
self.jobs_completed = []
self.jobs_waiting = []
self.sched_delay = 5.0
self.openJobs = set()
self.availableResources = SortedSet(range(self.bs.nb_res))
self.previousAllocations = dict()
def scheduleJobs(self):
scheduledJobs = []
print('openJobs = ', self.openJobs)
print('available = ', self.availableResources)
print('previous = ', self.previousAllocations)
# Iterating over a copy to be able to remove jobs from openJobs at traversal
for job in set(self.openJobs):
nb_res_req = self.bs.jobs[job].requested_resources
if nb_res_req <= len(self.availableResources):
res = self.availableResources[:nb_res_req]
self.jobs_res[job] = res
self.previousAllocations[job] = res
scheduledJobs.append(job)
for r in res:
self.availableResources.remove(r)
self.openJobs.remove(job)
# update time
self.bs.consume_time(self.sched_delay)
# send to uds
if len(scheduledJobs) > 0:
self.bs.start_jobs(scheduledJobs, self.jobs_res)
print('openJobs = ', self.openJobs)
print('available = ', self.availableResources)
print('previous = ', self.previousAllocations)
print('')
def onJobSubmission(self, job):
self.openJobs.add(job)
self.scheduleJobs()
def onJobCompletion(self, job):
for res in self.previousAllocations[job]:
self.availableResources.add(res)
self.previousAllocations.pop(job)
self.scheduleJobs()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment