Verified Commit 9d936be8 authored by Raphaël Bleuse's avatar Raphaël Bleuse
Browse files

Remove validating machine

The validating machine checks a very simple set of constraints.
The use of the machine has been obsoleted by Batsim internal checks:
Batsim checks more and is more opinionated.
parent 22e4d18c
......@@ -23,8 +23,7 @@ class Batsim(object):
def __init__(self, scheduler,
network_endpoint,
timeout,
event_endpoint=None,
validatingmachine=None):
event_endpoint=None):
self.logger = logging.getLogger(__name__)
......@@ -43,10 +42,7 @@ class Batsim(object):
sys.setrecursionlimit(10000)
if validatingmachine is None:
self.scheduler = scheduler
else:
self.scheduler = validatingmachine(scheduler)
self.scheduler = scheduler
# initialize some public attributes
self.nb_jobs_submitted_from_batsim = 0
......
......@@ -10,7 +10,6 @@ Options:
-v --verbosity=<verbosity-level> Sets the verbosity level. Available
values are {debug, info, warning, error, critical}
Default: info
-p --protect Protect the scheduler using a validating machine.
-s --socket-endpoint=<endpoint> Batsim socket endpoint to use [default: tcp://*:28000]
-e --event-socket-endpoint=<endpoint> Socket endpoint to use to publish scheduler events
-o --options=<options_string> A Json string to pass to the scheduler [default: {}]
......@@ -41,8 +40,6 @@ def main():
timeout = int(arguments['--timeout'] or float("inf"))
protect = bool(arguments['--protect'])
if arguments["--options-file"]:
with open(arguments["--options-file"]) as options_file:
options = json.load(options_file)
......@@ -61,8 +58,7 @@ def main():
socket_endpoint,
event_socket_endpoint,
options,
timeout,
protect)
timeout)
if __name__ == "__main__":
......
......@@ -254,9 +254,6 @@ def prepare_scheduler_cl(options, verbose):
if options["scheduler"].get("verbose", False):
sched_cl.append('-v')
if options["scheduler"].get("protection", False):
sched_cl.append('-p')
if "socket-endpoint" in options["scheduler"]:
sched_cl.append('-s')
sched_cl.append(options["scheduler"]["socket-endpoint"])
......
......@@ -14,7 +14,6 @@ import importlib.util
import os.path
from batsim.batsim import Batsim, BatsimScheduler, NetworkHandler
from batsim.validatingmachine import ValidatingMachine
from docopt import docopt
import zmq
......@@ -103,13 +102,7 @@ def launch_scheduler(scheduler,
socket_endpoint,
event_socket_endpoint,
options,
timeout,
protect):
if protect:
vm = ValidatingMachine
else:
vm = None
timeout):
print("Scheduler: {} ({})".format(scheduler.__class__.__name__, options))
time_start = time.time()
......@@ -118,8 +111,7 @@ def launch_scheduler(scheduler,
bs = Batsim(scheduler,
socket_endpoint,
timeout,
event_socket_endpoint,
validatingmachine=vm)
event_socket_endpoint)
aborted = False
# try:
bs.start()
......@@ -154,9 +146,7 @@ def launch_scheduler_main(
standalone=True,
**kwargs):
for arg in argv or sys.argv[1:]:
if arg == "--protect":
kwargs["protect"] = True
elif arg == "--verbose":
if arg == "--verbose":
kwargs["verbose"] = 999
elif arg.startswith("--options="):
kwargs["options"] = json.loads(arg[arg.index("=") + 1:])
......
from batsim.batsim import BatsimScheduler
from sortedcontainers import SortedSet
from procset import ProcSet
class ValidatingMachine(BatsimScheduler):
"""
This class tries to do a lot of checks to prevent from stupid and invisible errors.
You should use this when you are developping and testing a scheduler.
It checks that:
- not 2 jobs use the same resource as the same time
- a job is only started once
- a job is launched after his submit time
"""
def __init__(self, scheduler):
super().__init__()
self.scheduler = scheduler
def onAfterBatsimInit(self):
self.scheduler.onAfterBatsimInit()
def onSimulationBegins(self):
self.nb_res = self.bs.nb_compute_resources
self.availableResources = SortedSet(range(self.nb_res))
self.jobs_waiting = []
self.previousAllocations = dict()
# save real job start function
self.real_start_jobs = self.bs.start_jobs
self.real_execute_jobs = self.bs.execute_jobs
# intercept job start
self.scheduler.bs = self.bs
self.scheduler.bs.start_jobs = self.start_jobs_valid
self.scheduler.bs.execute_jobs = self.execute_jobs_valid
self.scheduler.onSimulationBegins()
def onSimulationEnds(self):
self.scheduler.onSimulationEnds()
def onDeadlock(self):
self.scheduler.onDeadlock()
def onJobSubmission(self, job):
self.jobs_waiting.append(job)
self.scheduler.onJobSubmission(job)
def onJobCompletion(self, job):
for res in self.previousAllocations[job.id]:
self.availableResources.add(res)
self.previousAllocations.pop(job.id)
self.scheduler.onJobCompletion(job)
def onJobMessage(self, timestamp, job, message):
self.scheduler.onJobMessage(timestamp, job, message)
def onJobsKilled(self, jobs):
self.scheduler.onJobsKilled(jobs)
def onMachinePStateChanged(self, nodeid, pstate):
self.scheduler.onMachinePStateChanged(nodeid, pstate)
def onReportEnergyConsumed(self, consumed_energy):
self.scheduler.onReportEnergyConsumed(consumed_energy)
def onRequestedCall(self):
self.scheduler.onRequestedCall()
def start_jobs_valid(self, jobs, res):
for j in jobs:
try:
self.jobs_waiting.remove(j)
except KeyError:
raise ValueError(
"Job {} was not waiting (waiting: {})".format(
j, [j2.id for j2 in self.jobs_waiting]))
self.previousAllocations[j.id] = res[j.id]
for r in res[j.id]:
try:
self.availableResources.remove(r)
except KeyError:
raise ValueError(
"Resource {} was not available (available: {})".format(
r, list(
self.availableResources)))
j.allocation = ProcSet(*res[j.id])
self.real_execute_jobs(jobs)
def execute_jobs_valid(self, jobs, io_jobs=None):
for j in jobs:
try:
self.jobs_waiting.remove(j)
except KeyError:
raise ValueError(
"Job {} was not waiting (waiting: {})".format(
j, [j2.id for j2 in self.jobs_waiting]))
self.previousAllocations[j.id] = j.allocation
for r in j.allocation:
try:
self.availableResources.remove(r)
except KeyError:
raise ValueError(
"Resource {} was not available (available: {})".format(
r, list(
self.availableResources)))
self.real_execute_jobs(jobs, io_jobs)
\ No newline at end of file
......@@ -45,7 +45,6 @@
"name":"fillerSched",
"_comment": "The name of a standard scheduler or the path to a python module containing a scheduler",
"verbose": true,
"protection": true
"verbose": true
}
}
......@@ -21,7 +21,6 @@ def generate_basic(
"name_expe": "basic_filler_sched_",
"name": "fillerSched",
"verbose": False,
"protection": True,
"interpreter": "coverage",
"options": {
}
......@@ -70,7 +69,6 @@ def generate_sched_static(
"name_expe": "sched_fillerSched_",
"name": "schedFiller",
"verbose": False,
"protection": True,
"interpreter": "coverage",
"options": {
},
......@@ -80,7 +78,6 @@ def generate_sched_static(
"name_expe": "sched_backfilling_",
"name": "schedEasySjfBackfill",
"verbose": False,
"protection": True,
"interpreter": "coverage",
"options": {
},
......@@ -130,7 +127,6 @@ def generate_sched_script(
"name_expe": "sched_fillerSched_",
"name": "schedFiller",
"verbose": False,
"protection": True,
"interpreter": "coverage",
"options": {
}
......@@ -139,7 +135,6 @@ def generate_sched_script(
"name_expe": "sched_backfilling_",
"name": "schedEasySjfBackfill",
"verbose": False,
"protection": True,
"interpreter": "coverage",
"options": {
}
......@@ -193,7 +188,6 @@ def generate_sched_dynamic(
"name_expe": "sched_dynamic",
"name": "tests/schedulers/dynamicTestScheduler.py",
"verbose": False,
"protection": True,
"interpreter": "coverage",
"options": {
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment