Commit 7d5df09e authored by BAIRE Anthony's avatar BAIRE Anthony
Browse files

add the 'rescheduled' future

to let the task implementations detect that they are being rescheduled
parent 9f37db99
......@@ -219,7 +219,7 @@ class Manager:
"""
class _Handle:
__slots__ = "key", "cur", "nxt"
__slots__ = "key", "cur", "nxt", "rescheduled"
def __init__(self, nb_tokens=1, *, executor = default_executor, interruptible = False):
# {key: _TaskHandler}
......@@ -238,7 +238,12 @@ class Manager:
cascade_future(hnd.cur, nxt)
hnd.nxt = None
hnd.cur = asyncio.async(self._process(hnd.key, reset))
# recreate the 'rescheduled' future and return it
hnd.rescheduled = disable_future_warning(asyncio.Future())
return hnd.rescheduled
hnd.rescheduled = disable_future_warning(asyncio.Future())
hnd.cur = asyncio.async(self._process(hnd.key, reset, hnd.rescheduled))
hnd.cur.add_done_callback(lambda fut: self._done(hnd))
log.debug("task scheduled %r %r", self, hnd.key)
return hnd.cur
......@@ -258,6 +263,7 @@ class Manager:
hnd.key = key
hnd.cur = None
hnd.nxt = None
hnd.rescheduled = None
if hnd.cur is None:
# create task
......@@ -266,6 +272,8 @@ class Manager:
# reschedule task
if hnd.nxt is None:
hnd.nxt = asyncio.Future()
if not hnd.rescheduled.done():
hnd.rescheduled.set_result(None)
return hnd.nxt
def _done(self, hnd):
......@@ -330,7 +338,7 @@ class Manager:
@asyncio.coroutine
def _process(self, key, reset):
def _process(self, key, reset, rescheduled):
"""Actual implementation of the job (to be reimplemented in inherited classes)
The Manager class guarantees that this function cannot be called
......@@ -338,9 +346,13 @@ class Manager:
submitted multiple times, they Manager will call this function a second
time after it has terminated).
`rescheduled` is a future whose result is set when the job is being
rescheduled (if process(key) is called before _process(key ...)
terminates.
`reset` is a function that may be called to reset the 'dirty' state of
this key (this is to avoid calling ._process() a second time if not
necessary)
necessary) in that case, a new `rescheduled` future is returned.
"""
raise NotImplementedError()
......@@ -814,7 +826,7 @@ exec /.toolbox/bin/sshd -D
@asyncio.coroutine
def _process(self, webapp_id, reset):
def _process(self, webapp_id, reset, rescheduled):
ctrl = self.ctrl
log.debug("process sandbox %d", webapp_id)
......@@ -1087,7 +1099,7 @@ class JobManager(Manager):
yield from self.run_in_executor(self._remove_job, info, lock=False)
@asyncio.coroutine
def _process(self, job_id, reset):
def _process(self, job_id, reset, rescheduled):
ctrl = self.ctrl
ses = ctrl.session
log.debug("process job id %d", job_id)
......@@ -1227,7 +1239,7 @@ class PullManager(Manager):
self.name = name
@asyncio.coroutine
def _process(self, img, reset):
def _process(self, img, reset, rescheduled):
image, version = img
log.info("pull to the %-10s %s:%s", self.name, image, version)
return self.run_in_executor(self.client.pull, image, version)
......@@ -1239,7 +1251,7 @@ class PushManager(Manager):
self.ctrl = ctrl
@asyncio.coroutine
def _process(self, version_id, reset):
def _process(self, version_id, reset, rescheduled):
ses = self.ctrl.session
with report_error("unable to push version id %d", version_id):
......
......@@ -1564,7 +1564,7 @@ class ManagerTestCase(unittest.TestCase):
conditions[id].wait()
actual_lst.append(("stop", id))
def _process(id, reset):
def _process(id, reset, rescheduled):
with lock:
self.assertNotIn(id, conditions)
conditions[id] = threading.Condition(lock)
......@@ -1837,51 +1837,67 @@ class ManagerTestCase(unittest.TestCase):
def test_manager_notimpl(self):
self.assertRaises(NotImplementedError,
asyncio.get_event_loop().run_until_complete,
controller.Manager(1)._process(1, None))
controller.Manager(1)._process(1, None, None))
@mock.patch("controller.log.exception")
def test_manager_futures(self, mexc):
futures = {}
rescheduled = {}
reset = {}
def worker(id):
def worker(id, rst, rsc_fut):
assert id not in futures or futures[id].done()
fut = futures[id] = concurrent.futures.Future()
reset[id] = rst
rescheduled[id] = rsc_fut
return fut.result()
manager = controller.Manager(2)
manager._process = lambda id, reset: manager.run_in_executor(worker, id)
manager._process = lambda *k: manager.run_in_executor(worker, *k)
@asyncio.coroutine
def run():
# run a task
## (1) run a single task
fut = manager.process(1)
yield from asyncio.sleep(.1)
self.assertFalse(fut.done())
self.assertFalse(rescheduled[1].done())
futures[1].set_result(42)
self.assertEquals(42, (yield from fut))
self.assertFalse(rescheduled[1].done())
# run the same task twice in a row
## (2.a) run the same task twice in a row
fut = manager.process(2)
yield from asyncio.sleep(.1)
self.assertFalse(rescheduled[2].done())
fut2 = manager.process(2)
# -> fut2 should yield after fut (because the task need to be rescheduled)
self.assertIsNot(fut, fut2)
# -> rescheduled[2] must be done (to let the current task know that
# it is being rescheduled)
self.assertIs(rescheduled[2].result(), None)
yield from asyncio.sleep(.1)
self.assertFalse(fut.done())
self.assertFalse(fut2.done())
## (2.b) first execution done
futures[2].set_result(43)
yield from asyncio.sleep(.1)
# expect: fut terminated, fut2 still pending
yield from asyncio.wait_for(fut, timeout=.1)
self.assertEquals(fut.result(), 43)
self.assertFalse(fut2.done())
# since the task is restarted, the 'rescheduled' is recreated
self.assertFalse(rescheduled[2].done())
## (2.c) second execution raises an error
mexc.reset_mock()
exc = RuntimeError(44)
futures[2].set_exception(exc)
......@@ -1893,8 +1909,44 @@ class ManagerTestCase(unittest.TestCase):
self.assertEquals(fut.result(), 43)
self.assertIs(fut2.exception(), exc)
## (3) reset case
## (3.a) call .process() twice in a row
fut = manager.process(3)
yield from asyncio.sleep(.1)
self.assertFalse(rescheduled[3].done())
fut2 = manager.process(3)
self.assertIs(rescheduled[3].result(), None) # got notification
## (3.b) reset the task
rescheduled[3] = reset[3]()
yield from asyncio.sleep(.1)
self.assertFalse(rescheduled[3].done())
self.assertFalse(fut.done())
self.assertFalse(fut2.done())
## (3.c) reschedule the task (again)
fut3 = manager.process(3)
self.assertIs(rescheduled[3].result(), None) # got notification
## (3.d) reset the task (again)
rescheduled[3] = reset[3]()
yield from asyncio.sleep(.1)
self.assertFalse(rescheduled[3].done())
self.assertFalse(fut.done())
self.assertFalse(fut2.done())
self.assertFalse(fut3.done())
## (3.e) finish the task
futures[3].set_result(44)
yield from asyncio.sleep(.1)
self.assertEqual(fut.result(), 44)
self.assertEqual(fut2.result(), 44)
self.assertEqual(fut3.result(), 44)
self.assertFalse(rescheduled[3].done())
# ShuttingDown case
# (4) ShuttingDown case
# - start task 4 before shutdown
# - start task 5 after shutdown
mexc.reset_mock()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment