Commit bcca9bec authored by S. Lackner's avatar S. Lackner

[tools] improve loading of scheduler module and utilities

parent 5e59ab3c
......@@ -11,6 +11,7 @@ from abc import ABCMeta, abstractmethod
from batsim.batsim import BatsimScheduler, Batsim
from batsim.network import NetworkHandler
from batsim.tools.launcher import launch_scheduler_main
from .resource import Resources, ComputeResource
from .job import Job, Jobs
......@@ -205,6 +206,11 @@ class Scheduler(metaclass=ABCMeta):
"""
@classmethod
def launch_main(cls, **kwargs):
"""Initialise this scheduler class and run it as if it were started with the launcher."""
launch_scheduler_main(cls, **kwargs)
def __init__(self, options={}):
self._options = options
debug = self.options.get("debug", False)
......
......@@ -23,7 +23,6 @@ import importlib.util
import os.path
from batsim.batsim import Batsim, BatsimScheduler, NetworkHandler
from batsim.sched import as_scheduler
from batsim.validatingmachine import ValidatingMachine
from docopt import docopt
......@@ -50,12 +49,10 @@ def instanciate_scheduler(name, options):
# load module(or file)
package = __import__('schedulers', fromlist=[my_module])
if my_module not in package.__dict__:
print("No such scheduler (module file not found).", flush=True)
print("No such scheduler (module file not found).")
sys.exit(1)
if my_class not in package.__dict__[my_module].__dict__:
print(
"No such scheduler (class within the module file not found).",
flush=True)
print("No such scheduler (class within the module file not found).")
sys.exit(1)
# load the class
scheduler_non_instancied = package.__dict__[
......@@ -63,21 +60,40 @@ def instanciate_scheduler(name, options):
# A full file path to the scheduler is expected
else:
# Add path to allow relative imports in the scheduler implementation
sys.path.insert(0, os.path.abspath(os.path.dirname(name)))
sys.path.insert(
0, os.path.abspath(
os.path.dirname(
os.path.dirname(name))))
package_path = os.path.split(os.path.dirname(name))[1]
if package_path:
package_path = [package_path]
else:
package_path = []
module_name = os.path.basename(name).split(".")[0]
module_path = ".".join(package_path + [module_name])
my_class = module_to_class(module_name)
# Add path to allow relative imports in the scheduler implementation
sys.path.append(os.path.abspath(os.path.dirname(name)))
# Try to load the module with the outer package
try:
mod = importlib.import_module(module_path)
# Try to load only the module as fallback
except ModuleNotFoundError:
spec = importlib.util.spec_from_file_location(
"schedulers." + module_name, name)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
# Load python file as module schedulers.<file>
spec = importlib.util.spec_from_file_location(
"schedulers." + module_name, name)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
del sys.path[1]
del sys.path[0]
scheduler_non_instancied = mod.__dict__[my_class]
if isinstance(scheduler_non_instancied, types.FunctionType):
from batsim.sched import as_scheduler
scheduler = as_scheduler()(scheduler_non_instancied)
scheduler = scheduler(options)
else:
......@@ -87,6 +103,87 @@ def instanciate_scheduler(name, options):
return scheduler
def launch_scheduler(scheduler,
socket_endpoint="tcp://*:28000",
event_socket_endpoint="tcp://*:28001",
options={},
timeout=2000,
protect=True,
verbose=0):
if protect:
vm = ValidatingMachine
else:
vm = None
print("Scheduler: {} ({})".format(scheduler.__class__.__name__, options))
time_start = time.time()
try:
bs = Batsim(scheduler,
NetworkHandler(socket_endpoint, verbose, timeout),
NetworkHandler(event_socket_endpoint, type=zmq.PUB),
validatingmachine=vm)
aborted = False
try:
bs.start()
except KeyboardInterrupt:
print("Aborted...")
aborted = True
time_ran = str(timedelta(seconds=time.time() - time_start))
print("Simulation ran for: " + time_ran)
print("Job received:", bs.nb_jobs_received,
", scheduled:", bs.nb_jobs_scheduled,
", rejected:", bs.nb_jobs_rejected,
", killed:", bs.nb_jobs_killed,
", submitted:", bs.nb_jobs_submitted,
", changed:", len(bs.jobs_manually_changed),
", timeout:", bs.nb_jobs_timeout,
", success:", bs.nb_jobs_completed)
if bs.nb_jobs_received != (
bs.nb_jobs_scheduled + bs.nb_jobs_rejected +
len(bs.jobs_manually_changed)):
return 1
return 1 if aborted else 0
except KeyboardInterrupt:
print("Aborted...")
return 1
return 0
def launch_scheduler_main(
scheduler_class,
argv=None,
standalone=True,
**kwargs):
for arg in argv or sys.argv[1:]:
if arg == "--protect":
kwargs["protect"] = True
elif arg == "--verbose":
kwargs["verbose"] = 999
elif arg.startswith("--options="):
kwargs["options"] = json.loads(arg[arg.index("=") + 1:])
elif arg.startswith("--timeout="):
kwargs["timeout"] = int(arg[arg.index("=") + 1:])
elif arg.startswith("--socket-endpoint="):
kwargs["socket_endpoint"] = int(arg[arg.index("=") + 1:])
elif arg.startswith("--event-socket-endpoint="):
kwargs["event_socket_endpoint"] = int(arg[arg.index("=") + 1:])
else:
print("Invalid argument: {}".format(arg))
scheduler = scheduler_class(options)
ret = launch_scheduler(scheduler, **kwargs)
if standalone:
sys.exit(ret)
else:
if ret != 0:
raise ValueError(
"Scheduler exited with return code: {}".format(ret))
def main():
# Retrieve arguments
arguments = docopt(__doc__, version='1.0.0rc2')
......@@ -96,47 +193,26 @@ def main():
else:
verbose = 0
if arguments['--protect']:
vm = ValidatingMachine
else:
vm = None
timeout = int(arguments['--timeout'] or 2000)
protect = bool(arguments['--protect'])
options = json.loads(arguments['--options'])
scheduler_filename = arguments['<scheduler>']
socket_endpoint = arguments['--socket-endpoint']
event_socket_endpoint = arguments['--event-socket-endpoint']
print("Starting simulation...", flush=True)
print("Scheduler:", scheduler_filename, flush=True)
print("Options:", options, flush=True)
time_start = time.time()
scheduler = instanciate_scheduler(scheduler_filename, options=options)
bs = Batsim(scheduler,
NetworkHandler(socket_endpoint, verbose, timeout),
validatingmachine=vm)
bs.start()
time_ran = str(timedelta(seconds=time.time() - time_start))
print("Simulation ran for: " + time_ran, flush=True)
print("Job received:", bs.nb_jobs_received,
", scheduled:", bs.nb_jobs_scheduled,
", rejected:", bs.nb_jobs_rejected,
", killed:", bs.nb_jobs_killed,
", submitted:", bs.nb_jobs_submitted,
", changed:", len(bs.jobs_manually_changed),
", timeout:", bs.nb_jobs_timeout,
", success:", bs.nb_jobs_completed, flush=True)
if bs.nb_jobs_received != (
bs.nb_jobs_scheduled + bs.nb_jobs_rejected +
len(bs.jobs_manually_changed)):
sys.exit(1)
sys.exit(0)
return launch_scheduler(scheduler,
socket_endpoint,
event_socket_endpoint,
options,
timeout,
protect,
verbose)
if __name__ == "__main__":
main()
sys.exit(main())
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment