Commit 2d52ceca authored by BAIRE Anthony's avatar BAIRE Anthony
Browse files

on shutdown: do not wait for pending pull/push operations

parent 4cb00b7b
......@@ -231,12 +231,13 @@ class Manager:
class _Handle:
__slots__ = "key", "cur", "nxt"
def __init__(self, nb_tokens=1, *, executor = default_executor):
def __init__(self, nb_tokens=1, *, executor = default_executor, interruptible = False):
# {key: _TaskHandler}
self._handles = {}
self._semaphore = asyncio.Semaphore(nb_tokens)
self._shutdown = None
self._shutdown = asyncio.Future()
self._executor = executor
self._interruptible = interruptible
def _create_task(self, hnd):
assert hnd.nxt is None
......@@ -257,7 +258,7 @@ class Manager:
returns an asyncio.Future (that will provide the result of ._process())
"""
if self._shutdown is not None:
if self._shutdown.done():
return self._shutdown
hnd = self._handles.get(key)
......@@ -310,7 +311,7 @@ class Manager:
ShuttingDown()
"""
ctx = yield from iter(self._semaphore)
if self._shutdown:
if self._shutdown.done():
with ctx:
raise ShuttingDown()
return ctx
......@@ -323,12 +324,19 @@ class Manager:
This function locks the internal semaphore and runs the provided
functionc call in a separate thread (using the executor)
"""
run = asyncio.get_event_loop().run_in_executor
def run():
coro = asyncio.get_event_loop().run_in_executor(self._executor, *k)
if self._interruptible:
return next(asyncio.as_completed((coro, self._shutdown)))
else:
return coro
if lock:
with (yield from self):
return (yield from run(self._executor, *k))
return (yield from run())
else:
return (yield from run(self._executor, *k))
return (yield from run())
def _process(self, key, reset):
......@@ -351,17 +359,19 @@ class Manager:
This coroutine terminates once all tasks are properly terminated.
"""
self._shutdown = asyncio.Future()
self._shutdown.set_exception(ShuttingDown())
exc = ShuttingDown()
self._shutdown.set_exception(exc)
self._shutdown.exception() # to avoid asyncio warnings
# cancel all 'next' tasks
for hnd in self._handles.values():
if hnd.nxt is not None:
hnd.nxt.set_exception(self._shutdown)
hnd.nxt.set_exception(exc)
hnd.nxt = None
yield from asyncio.gather(*(h.cur for h in self._handles.values() if h.cur is not None),
return_exceptions=True)
if not self._interruptible:
yield from asyncio.gather(
*(h.cur for h in self._handles.values() if h.cur is not None),
return_exceptions=True)
class SandboxManager(Manager):
......@@ -1138,10 +1148,19 @@ class JobManager(Manager):
with semctx:
yield from self._finish_job(info)
# NOTE: for the push/pull managers, interruptible=True guarantees that the
# managers terminate immediately, however it cannot guarantee that the
# process will terminate immediately because the ThreadPoolExecuter installs
# a atexit handler that joins all the background threads.
#
# Anyway this is not a big issue since all pending push/pull raise
# ShuttingDown immediately, thus we won't end up with a sandbox/job
# in an inconsistent state when SIGKILL arrives.
#
class PullManager(Manager):
def __init__(self, nb_threads, client, name):
super().__init__(nb_threads)
super().__init__(nb_threads, interruptible=True)
self.client = client
self.name = name
......@@ -1153,7 +1172,7 @@ class PullManager(Manager):
class PushManager(Manager):
def __init__(self, nb_threads, ctrl):
super().__init__(nb_threads)
super().__init__(nb_threads, interruptible=True)
self.ctrl = ctrl
def _process(self, version_id, reset):
......
......@@ -111,4 +111,10 @@ def main():
console_handler.setLevel(logging.INFO)
log.info("---- docker controller terminated ----")
# NOTE: this is a crude hack to terminate the process immediatly,
# without joining the background threads (which may still be running
# because of the PushManager/PullManager implementation)
import concurrent.futures.thread
concurrent.futures.thread._threads_queues.clear()
main()
......@@ -265,11 +265,11 @@ class ControllerTestCase(unittest.TestCase):
self.reset()
# run the controller in a separate thread
self.ctrl = self.create_controller()
def run():
self._loop = asyncio.new_event_loop()
self._loop.add_signal_handler = lambda *k: None
asyncio.set_event_loop(self._loop)
self.ctrl = self.create_controller()
self.ctrl.run()
......@@ -1430,7 +1430,7 @@ class ControllerTestCase(unittest.TestCase):
class ManagerTestCase(unittest.TestCase):
def check_manager(self, nb_threads, events):
def check_manager(self, nb_threads, events, *, cleanup_dead_conditions=False):
"""Test the manager class with a sequence of events
expect is a list of events:
......@@ -1452,6 +1452,7 @@ class ManagerTestCase(unittest.TestCase):
# {id: Condition}
conditions = {}
dead_conditions = []
resets = {}
def worker(id):
......@@ -1475,7 +1476,7 @@ class ManagerTestCase(unittest.TestCase):
with lock:
self.assertIn(id, conditions)
del conditions[id]
dead_conditions.append(conditions.pop(id))
del resets[id]
def finish(id):
......@@ -1524,7 +1525,11 @@ class ManagerTestCase(unittest.TestCase):
manager.shutdown()
self.assertListEqual(actual_lst, expect_lst)
if cleanup_dead_conditions:
with lock:
for cond in dead_conditions:
cond.notify()
def test_manager_single(self):
......@@ -1708,6 +1713,24 @@ class ManagerTestCase(unittest.TestCase):
process 2
""")
init = controller.Manager.__init__
with mock.patch("controller.Manager.__init__", lambda *k: init(*k, interruptible=True)):
self.check_manager(2, """
process 1
start 1
process 1
process 2
start 2
process 3
process 4
shutdown
terminated
exc 1
exc 2
exc 3
exc 4
""", cleanup_dead_conditions=True)
def test_manager_notimpl(self):
self.assertRaises(NotImplementedError, controller.Manager(1)._process, 1, None)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment