Commit faad2ba6 authored by BAIRE Anthony's avatar BAIRE Anthony
Browse files

stream the sandbox state to the client

parent 46975d57
Pipeline #31885 failed with stage
in 6 minutes and 12 seconds
......@@ -1012,6 +1012,8 @@ exec /.toolbox/bin/sshd -D
ses.execute("UPDATE dj_webapps SET sandbox_state=%d WHERE id=%d AND sandbox_state=%d" %
(next_state, webapp_id, webapp.sandbox_state))
yield from self.ctrl.notif_webapp_updated(webapp_id)
log.debug("done sandbox %d", webapp_id)
......@@ -1949,6 +1951,22 @@ class DockerController:
log.exception("error in the redis notification loop")
async def notif_webapp_updated(self, webapp_id):
"""Send a notification to the aio frontend when a webapp is updated
this function never fails (redis errors are caught and written in
the logs)
try:"notify the frontend: webapp %d updated", webapp_id)
await self.redis_client.publish(REDIS_CHANNEL_AIO,
except asyncio.CancelledError:
except Exception as e:
log.error("redis notification failed for webapp %d (%e)",
webapp_id, e)
def shutdown(self):
if not self._shutdown_requested.done():
......@@ -256,14 +256,18 @@ class AllgoAio:
# (not used by the notification task which has its own reddis connection)
self.redis_client = None
# condition dict for job notifications
# condition dict for job & webapp notifications
# key is the job_id/webapp_id
self.job_conditions = ConditionDict()
self.webapp_conditions = ConditionDict()
# ---------- routes ---------- #
ar =
ar.add_route("*", r"/v2/{repo:.*}/manifests/{tag}", self.handle_image_manifest)
ar.add_route("GET", r"/aio/jobs/{job_id:\d+}/events", self.handle_job_events)
ar.add_route("GET", r"/aio/apps/{docker_name}/events", self.handle_webapp_events)
self.handler =, self.port = bind
......@@ -380,12 +384,15 @@ class AllgoAio:
if item_type == b"job":
cond = self.job_conditions.get(item_id)
if cond is not None:
async with cond:"cond notified: %r", cond)
elif item_type == b"webapp":
cond = self.webapp_conditions.get(item_id)
log.warning("ignored notification for unknown item %r", msg)
cond = None
if cond is not None:
async with cond:
except OSError as e:
log.error("I/O error in the redis listener (%s)", e)
......@@ -498,7 +505,6 @@ class AllgoAio:
state = (await rep.json())["state"].encode()
rep = JsonSeqStreamResponse()
await rep.prepare(request)
rep.send({"state": state.decode()})
......@@ -559,6 +565,72 @@ class AllgoAio:
except (DoneException, asyncio.CancelledError):
except Exception:
log.exception("exception in handle_job_log(job_id=%d)", job_id)
log.exception("exception in handle_job_events(job_id=%d)", job_id)
return rep
async def handle_webapp_events(self, request):
"""Channel for monitoring events related to a webapp events
The response is a streamed event sequence:
- state update (when the state of the sandbox is updated)
{"sandbox_state": "<NEW_STATE>"}
docker_name = request.match_info["docker_name"]
# return a dict (on success) or an HTTP error code (on error)
async def get_webapp_details():
headers = prepare_headers(request)
headers["Accept"] = "application/json"
async with self.django_request("GET", "/app/%s/" % docker_name,
headers=headers, allow_redirects=False, timeout=1) as rep:
if rep.status != 200:
# FIXME: django should be able to return 401 directly
return 401 if rep.status==302 else rep.status
return await rep.json()
# query the django server to have the job details and ensure this user
# is allowed to view this job (thanks to the "Cookie" header)
details = await get_webapp_details()
if isinstance(details, int):
# error
return Response(status=details)
webapp_id = details["id"]
state = details["sandbox_state"]
cond = self.webapp_conditions[webapp_id]
rep = JsonSeqStreamResponse()
await rep.prepare(request)
rep.send({"sandbox_state": state})
async def poll():
nonlocal state
# poll state change
details = await get_webapp_details()
if isinstance(details, int):
log.error("handle_webapp_events(webapp_id=%d) got error %d from django",
webapp_id, details)
raise DoneException()"webapp details %r", details)
new_state = details["sandbox_state"]
if new_state != state:"webapp %d state updated %s -> %s", webapp_id, state, new_state)
rep.send({"sandbox_state": new_state})
state = new_state
while True:
async with cond:
await poll()
await cond.wait()
except asyncio.CancelledError:
except Exception:
log.exception("exception in handle_webapp_events(webapp_id=%d)", webapp_id)
return rep
......@@ -183,6 +183,18 @@ class WebappDetail(DetailView):
return context
def render_to_response(self, context, **kwargs):
if self.request.META.get("HTTP_ACCEPT") == "application/json":
# json variant of the application details
# (used by the /aio/apps/<DOCKER_NAME>/events endpoint)
webapp = context["webapp"]
return JsonResponse({
"sandbox_state": webapp.get_sandbox_state_display(),
return super().render_to_response(context, **kwargs)
class UserWebappList(ListView):
"""List of user's webapp
......@@ -554,6 +566,8 @@ class JobDetail(LoginRequiredMixin, DetailView):
def render_to_response(self, context, **kwargs):
if self.request.META.get("HTTP_ACCEPT") == "application/json":
# json variant of the job details
# (used by the /aio/jobs/<ID>/events endpoint)
job = context["job"]
return JsonResponse({
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment