Commit 7b91c3e5 authored by BAIRE Anthony's avatar BAIRE Anthony
Browse files

propagate the exit code of the container in wait_async()

parent 9cf06220
......@@ -32,6 +32,9 @@ _UNITS = collections.OrderedDict([
log=logging.getLogger("shared_swarm")
def _parse_exitcode(txt):
if txt is not None:
return int(txt)
def _parse_sockaddr(txt, *, default_host):
mo = re.match(r"(?:([^:]+):)?(\d+)\Z", str(txt))
......@@ -367,9 +370,10 @@ class _SlaveResourceManager(_ResourceManager):
def _dispatch_event(self, event):
if event.get("status") == "die":
cid = event["id"]
code = _parse_exitcode(event["Actor"]["Attributes"].get("exitCode"))
fut = self._waiters.get(cid)
if fut is not None and not fut.done():
self._loop.call_soon_threadsafe(fut.set_result, None)
self._loop.call_soon_threadsafe(fut.set_result, code)
@asyncio.coroutine
def wait(self, container_id):
......@@ -384,10 +388,14 @@ class _SlaveResourceManager(_ResourceManager):
self._waiters[container_id] = fut = asyncio.Future()
try:
if self.client.inspect_container(container_id)["State"]["Running"]:
ctr = self.client.inspect_container(container_id)
if ctr["State"]["Running"]:
self.log.debug("wait for container: %s", container_id)
yield from fut
returncode = yield from fut
else:
returncode = ctr["State"]["ExitCode"]
self.log.debug("container terminated: %s", container_id)
return returncode
except docker.errors.NotFound:
self.log.debug("container not found: %s", container_id)
raise KeyError(container_id)
......@@ -891,13 +899,15 @@ class _MasterResourceManager(_ResourceManager):
else:
# detect terminated containers
if ctr["State"] not in ("running", "created"):
self._container_cleanup(cid, removed=False)
mo = re.match("Exited \((\d+)\)", ctr["Status"])
self._container_cleanup(cid, removed=False,
returncode=(mo and int(mo.group(1))))
# flush removed containers
# FIXME: ignore disappearring containers if their node is unhealthy
for cid in unseen:
self._container_cleanup(cid, removed=True)
self._container_cleanup(cid, None, removed=True)
self._refresh_group_slots()
......@@ -1033,7 +1043,7 @@ class _MasterResourceManager(_ResourceManager):
slot.id = container_id
self._created[container_id] = slot
def _container_cleanup(self, container_id, *, removed):
def _container_cleanup(self, container_id, returncode, *, removed):
self.log.debug("container event: %s %s", ("removed" if removed else "terminated"), container_id)
assert container_id
......@@ -1041,7 +1051,7 @@ class _MasterResourceManager(_ResourceManager):
slot = self._created.get(container_id)
if slot is not None:
# report container terminated
self._set_futures(slot.fut_terminated)
self._set_futures(slot.fut_terminated, result=returncode)
if removed:
# container removed
......@@ -1114,6 +1124,7 @@ class _MasterResourceManager(_ResourceManager):
return
cid = event["id"]
name = event["Actor"]["Attributes"]["name"]
code = _parse_exitcode(event["Actor"]["Attributes"].get("exitCode"))
with self._lock_events:
......@@ -1124,11 +1135,11 @@ class _MasterResourceManager(_ResourceManager):
# container terminated
elif status == "die":
self._container_cleanup(cid, removed=False)
self._container_cleanup(cid, code, removed=False)
# container removed
elif status == "destroy":
self._container_cleanup(cid, removed=True)
self._container_cleanup(cid, code, removed=True)
#TODO: monitor "update" events too ? (in case cpu/ram changes)
......@@ -1156,10 +1167,14 @@ class _MasterResourceManager(_ResourceManager):
try:
# FIXME: should track 'start' events too (to avoid inspecting the container here)
if self.client.inspect_container(container_id)["State"]["Running"]:
ctr = self.client.inspect_container(container_id)
if ctr["State"]["Running"]:
self.log.debug("wait for container: %s", container_id)
yield from slot.fut_terminated
returncode = yield from slot.fut_terminated
else:
returncode = ctr["State"]["ExitCode"]
self.log.debug("container terminated: %s", container_id)
return returncode
except docker.errors.NotFound:
self.log.debug("container not found: %s", container_id)
raise KeyError(container_id)
......@@ -1207,12 +1222,15 @@ class SharedSwarmClient(swarm_abstraction.Client):
# FIXME: shoud accept container name or partial id
# FIXME: may have a race condition if called just at startup (before refresh() is completed)
# return the exit code of the container or None if unknown (eg. because
# container does not exist)
@asyncio.coroutine
def wait_async(self, cid):
self.__manager.log.debug("wait_async %s", cid)
try:
yield from self.__manager.wait(cid)
self.__manager.log.debug("wait_async %s -> done", cid)
returncode = yield from self.__manager.wait(cid)
self.__manager.log.debug("wait_async %s -> done (exit %r)", cid, returncode)
return returncode
except BaseException as e:
self.__manager.log.debug("wait_async %s -> %r", cid, e)
raise
......
......@@ -37,7 +37,7 @@ class NotReachable(Exception):
pass
class ContainerState(shared_swarm._InitReprMixin):
__slots__ = "node", "cpu", "mem", "state"
__slots__ = "node", "cpu", "mem", "state", "code"
@contextlib.contextmanager
def temp_file(content):
......@@ -162,6 +162,8 @@ class Scenario:
return lambda func: stack.enter_context(mock.patch.object(
cls, func.__name__, func))
# TODO: write integration tests to ensure that the behaviour of these functions
# is consistent with docker.Client()
@patch_method(BaseClient)
def events(client, *, filters):
self.tc.assertEqual(filters, {"event": ["create", "die", "destroy"]})
......@@ -195,7 +197,8 @@ class Scenario:
def containers(client, *, all=False):
self.tc.assertTrue(all)
return [{"Id": generic_id(name), "Names": ["/%s/%s" % (state.node, name)],
"State": state.state,
"State": state.state,
"Status": (("Exited (%d) FOO" % state.code) if state.state=="exited" else "FOO"),
} for name, state in self.containers.items()]
@patch_method(BaseClient)
......@@ -208,7 +211,11 @@ class Scenario:
raise_docker_NotFound()
return dict(Id=cid, Name="/%s" % name, Node={"Name": state.node, "ID":
generic_id(state.node)},
State={"Running": state.state == "running"},
State={"Running": state.state == "running",
# NOTE: ExitCode is always an int (0 if unset) but we force an invalid
# value here to ensure that the ExitCode is used only when the
# container has exited
"ExitCode": "<<TEST_UNSET>>" if state.code is None else state.code},
HostConfig=dict(CpuShares=state.cpu, Memory=state.mem))
# override .info() at startup so that the swarm contains 0 nodes
......@@ -353,25 +360,37 @@ class Scenario:
do_create(node)
self._ensure_tasks_running()
def event_terminate(name):
"""Simulate the termination of a container"""
def event_terminate(name, *, code=0):
"""Simulate the termination of a container
code: exit code of the container
"""
assert isinstance(code, int)
self._ensure_tasks_running()
self.containers[name].state = "exited"
self._send_event("die", name)
self._run_loop(self._wait_task("wait", name, ignore_missing=False),
self.containers[name].code = code
self._send_event("die", name, Actor={"Attributes":{"exitCode": str(code)}})
actual_code = self._run_loop(self._wait_task("wait", name, ignore_missing=False),
timeout=PERIOD)
self.tc.assertEqual(actual_code, code)
def event_destroy(name):
"""Simulate the removal of a container"""
def event_destroy(name, *, code=None):
"""Simulate the removal of a container
code is the expected exit code of the process
(in case the wait task is still active (i.e. no terminated event
were received))
"""
self._ensure_tasks_running()
del self.containers[name]
self._send_event("destroy", name)
self._run_loop(self._wait_task("wait", name, ignore_missing=True),
actual_code = self._run_loop(self._wait_task("wait", name, ignore_missing=True),
timeout=PERIOD)
self.tc.assertEqual(actual_code, code)
def event_refresh(*, nodes):
"""Trigger a refresh event (in the refresh thread)"""
......@@ -402,14 +421,15 @@ class Scenario:
if throw is None:
self.tc.assertEqual(actual_node_id, generic_id(node))
def event_terminated(name, *, event=True):
def event_terminated(name, *, code=None):
"""Expect the termination of a container
This event is observed when a container terminates and when this
termination is not caused by a event_terminate() stimulus.
"""
self._run_loop(self._wait_task("wait", name, ignore_missing=False),
actual_code = self._run_loop(self._wait_task("wait", name, ignore_missing=False),
timeout=PERIOD)
self.tc.assertEqual(actual_code, code)
func = locals().get("event_" + event)
......@@ -530,7 +550,7 @@ class SharedSwarmClientTestCase(unittest.TestCase):
sc("refresh", nodes={"n1":{"mem":5*G}, "n2":{"cpu":8}})
sc("create", "n2", cpu=1, mem=1*G)
sc("created", "n2", node="n1")
sc("terminate", "n2")
sc("terminate", "n2", code=42)
sc("destroy", "n2")
sc("destroy", "h1")
......@@ -636,8 +656,9 @@ class SharedSwarmClientTestCase(unittest.TestCase):
# container n1 destroyed but notification not received
sc.containers["n0"].state = "exited"
sc.containers["n0"].code = 43
sc("refresh", nodes={"n1":{}})
sc("terminated","n0")
sc("terminated","n0", code=43)
sc("destroy", "n0")
def test_scenario_refresh_job_destroyed(self):
......@@ -790,12 +811,12 @@ class SharedSwarmClientTestCase(unittest.TestCase):
sc("create", "h0", cpu=1, mem=1*G)
sc("created", "h0", node="n1")
sc("terminate", "h0")
sc("terminate", "h0", code=44)
# recreate the wait task
sc._create_task("wait", "h0", sc.client.wait_async(generic_id("h0")))
# -> must return immediately
sc("terminated","h0")
sc("terminated","h0", code=44)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment