Commit 723ebf3f authored by S. Lackner

[code] Let pybatsim fail on communication timeout (e.g. when Batsim/Simgrid deadlocks)

parent 92f9cb37
......@@ -12,9 +12,10 @@ import zmq
class NetworkHandler:
def __init__(self, socket_endpoint='tcp://*:28000', verbose=0):
def __init__(self, socket_endpoint='tcp://*:28000', verbose=0, timeout=1000):
self.socket_endpoint = socket_endpoint
self.verbose = verbose
self.timeout = timeout
self.context = zmq.Context()
self.connection = None
......@@ -27,7 +28,11 @@ class NetworkHandler:
def recv(self):
assert self.connection, "Connection not open"
msg = json.loads(self.connection.recv().decode('utf-8'))
try:
msg = self.connection.recv()
except zmq.error.Again:
return None
msg = json.loads(msg.decode('utf-8'))
if self.verbose > 0:
print('[PYBATSIM]: RECEIVED_MSG\n {}'.format(
......@@ -43,6 +48,7 @@ class NetworkHandler:
print("[PYBATSIM]: binding to {addr}"
.format(addr=self.socket_endpoint), flush=True)
self.connection.bind(self.socket_endpoint)
self.connection.RCVTIMEO = self.timeout
def close(self):
if self.connection:
......@@ -313,7 +319,11 @@ class Batsim(object):
cont = self.do_next_event()
def _read_bat_msg(self):
msg = self.network.recv()
msg = None
while msg is None:
msg = self.network.recv()
if msg is None:
self.scheduler.onDeadlock()
self._current_time = msg["now"]
......@@ -533,6 +543,9 @@ class BatsimScheduler(object):
# Default BatsimScheduler hook: a no-op that subclasses may override.
# NOTE(review): the caller is outside this diff; presumably invoked once the
# simulation has ended -- confirm against Batsim's event dispatch.
def onSimulationEnds(self):
pass
# Hook reached through self.scheduler.onDeadlock() in Batsim._read_bat_msg
# each time NetworkHandler.recv() returns None, i.e. when the socket receive
# raised zmq.error.Again because no message arrived within the timeout.
# The default aborts by raising ValueError; an override that returns normally
# makes _read_bat_msg loop and retry the receive.
def onDeadlock(self):
raise ValueError("[PYBATSIM]: Batsim is not responding (maybe deadlocked)")
# Default onNOP hook: deliberately unimplemented, so calling it on a scheduler
# that does not override it raises NotImplementedError.
def onNOP(self):
raise NotImplementedError()
......
......@@ -57,6 +57,12 @@ class BaseBatsimScheduler(BatsimScheduler):
self._scheduler.on_end()
self._scheduler._on_post_end()
# Deadlock hook of the adapter around the high-level scheduler: emit a debug
# message tagged type="deadlock" through the wrapped scheduler, then delegate
# to its on_deadlock() hook.
def onDeadlock(self):
self._scheduler.debug(
"batsim has reached a deadlock or is not responding",
type="deadlock")
# NOTE(review): presumably self._scheduler is a Scheduler, whose default
# on_deadlock() raises ValueError -- confirm.
self._scheduler.on_deadlock()
def onNOP(self):
self._scheduler._update_time()
self._scheduler.debug(
......@@ -500,6 +506,9 @@ class Scheduler(metaclass=ABCMeta):
"""Hook similar to the low-level API."""
pass
# High-level deadlock hook, called from BaseBatsimScheduler.onDeadlock after
# it has emitted its debug message. The default raises ValueError; override
# it to react differently when Batsim stops responding.
def on_deadlock(self):
raise ValueError("Batsim has reached a deadlock")
def on_jobs_killed(self, jobs):
"""Hook similar to the low-level API.
......
......@@ -41,6 +41,9 @@ class ValidatingMachine(BatsimScheduler):
# Pass the onNOP call straight through to the wrapped scheduler; no
# validation is performed here.
def onNOP(self):
self.scheduler.onNOP()
# Pass the deadlock notification straight through to the wrapped scheduler,
# so its onDeadlock() decides whether to raise or to let the caller retry.
def onDeadlock(self):
self.scheduler.onDeadlock()
def onJobSubmission(self, job):
self.jobs_waiting.append(job)
self.scheduler.onJobSubmission(job)
......
......@@ -11,6 +11,7 @@ Options:
-p --protect Protect the scheduler using a validating machine.
-s --socket-endpoint=<endpoint> Batsim socket endpoint to use [default: tcp://*:28000]
-o --options=<options_string> A Json string to pass to the scheduler [default: {}]
-t --timeout=<timeout> How long to wait for responses from Batsim, in milliseconds [default: 1000]
'''
import json
......@@ -90,6 +91,8 @@ if __name__ == "__main__":
else:
vm = None
timeout = int(arguments['--timeout'] or 1000)
options = json.loads(arguments['--options'])
scheduler_filename = arguments['<scheduler>']
......@@ -102,7 +105,7 @@ if __name__ == "__main__":
scheduler = instanciate_scheduler(scheduler_filename, options=options)
bs = Batsim(scheduler,
NetworkHandler(socket_endpoint, verbose),
NetworkHandler(socket_endpoint, verbose, timeout),
validatingmachine=vm)
bs.start()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment