Commit 1bb4acf4 authored by BAIRE Anthony's avatar BAIRE Anthony
Browse files

Stream job logs and job state updates to the user

This commit makes several changes.

In the controller:

- duplicates the logs produced by the jobs. Initially they were only
  stored into allgo.log, now they are also forwarded to the container
  output (using the 'tee' command) so that the controller can read

- add a log_task that reads the logs from docker and feeds them into
  the redis db key "log:job:<ID>" (this is implemented with aiohttp
  in order to be fully asynchronous)

- store the job state in a new redis key "state:job:<ID>"

- send notification to the redis pubsub 'notify:aio' channel when
  the job state has changed or when new logs are available

In the allgo.aio frontend:

- implement the /aio/jobs/<ID>/events endpoints which streams all
  job events & logs to the client (using json formatted messages)

In django:

- refactor the JobDetail view and template to update the page
  dynamically for job updates (state/logs)
    - allgo.log is read only when the job is already terminated.
      Otherwise the page uses the /aio/jobs/<ID>/events channel
      to stream the logs
    - the state icon is patched on the page when the state changes,
      except for the DONE state which triggers a full page reload
      (because there are other parts to be updated)
parent 4db47c44
Pipeline #30399 failed with stage
in 1 minute and 31 seconds
......@@ -4,7 +4,7 @@ RUN apt-getq install python3-websocket python3-six python3-requests \
python3-mysqldb python3-sqlalchemy python3-fasteners \
python3-nose python3-coverage libjs-jquery python3-yaml \
python3-termcolor python3-iso8601 python3-docker \
python3-aioredis python3-aiohttp
COPY files/. /
......@@ -15,11 +15,13 @@ import os
import shlex
import signal
import socket
import struct
import sys
import time
import threading
import traceback
import aiohttp
import aioredis
import docker
import iso8601
......@@ -169,6 +171,29 @@ def cascade_future(src, dst):
async def _make_aiohttp_client(host):
"""Create a aiohttp session for connecting to a docker engine
`host` is a docker host url (DOCKER_HOST) it may be either a `tcp://`
or `unix://` url
Return a tuple `(session, url)` where `session` is a aiohttp client
session and `url` is the http(s) url of the docker engine
url = docker.utils.parse_host(host)
mo = re.match(r"http\+unix:/+(/.*)", url)
if mo is None:
# HTTP over TCP
connector = None
# HTTP over unix socket
url = "http://DUMMY.localhost"
path =
connector = aiohttp.UnixConnector(path)
session = aiohttp.ClientSession(connector=connector)
return session, url
def make_aiohttp_client(host):
return asyncio.get_event_loop().run_until_complete(_make_aiohttp_client(host))
class Manager:
"""A class for scheduling asynchronous jobs on a collection of keys
......@@ -1024,16 +1049,16 @@ class JobManager(Manager):
sighnd() {{
echo "==== ALLGO JOB $2 ===="
kill "-$1" "$pid") >>allgo.log 2>&1
kill "-$1" "$pid") 2>&1 | tee -a allgo.log
trap '' TERM ALRM
trap "sighnd TERM ABORT" TERM
trap "sighnd ALRM TIMEOUT" ALRM
mkfifo "$fifo" 2>>allgo.log || exit $?
mkfifo "$fifo" 2>&1 | tee -a allgo.log || exit $?
exec cat <"$fifo" >>allgo.log &
exec cat <"$fifo" | tee -a allgo.log &
exec "$@" >"$fifo" 2>&1 &
......@@ -1059,7 +1084,7 @@ class JobManager(Manager):
if [ "$failcnt" -ne 0 ] ; then
echo "WARNING: memory limit was reached (memory.failcnt=$failcnt)"
) >>allgo.log
) | tee -a allgo.log
exit $code
......@@ -1158,6 +1183,7 @@ class JobManager(Manager):
# hard kill (after 5 seconds)
log_task = asyncio.ensure_future(self._log_task(info))
wait_task = asyncio.async(info.client.wait_async(info.ctr_id))
timeout_task = (asyncio.Future() if info.timeout is None
else asyncio.async(asyncio.sleep(info.timeout)))
......@@ -1175,6 +1201,7 @@ class JobManager(Manager):
with ses.begin():
state, = ses.query(Job.state).filter_by(id=info.job_id).one()
if state == JobState.ABORTING:
yield from self._notif_job_state(info, "ABORTING")
yield from stop(signal.SIGTERM, "user abort")
result = result or JobResult.ABORTED
......@@ -1189,7 +1216,11 @@ class JobManager(Manager):
returncode = wait_task.result()
log.debug("job %d exit code: %r", info.job_id, returncode)
result = result or (JobResult.SUCCESS if returncode==0 else JobResult.ERROR)
# note: we do not cancel the log task on normal termination
# (we let it finish reading the logs but in the background)
......@@ -1205,6 +1236,52 @@ class JobManager(Manager):
# remove container
yield from self.run_in_executor(self._remove_job, info, lock=False)
# task that streams the logs from the docker engine into the redis database
async def _log_task(self, info):
offset = 0
log_key = REDIS_KEY_JOB_LOG % info.job_id
notif_msg = REDIS_MESSAGE_JOB_UPDATED % info.job_id
timeout = 3600 if info.timeout is None else (info.timeout + 60)
if timeout <= 0:
timeout = 600
log.debug("job %d: start log task (timeout %.1f seconds)",
info.job_id, timeout)
async with info.client.aiohttp_session.get(
"%s/containers/%s/logs" % (info.client.aiohttp_url, info.ctr_id),
params={"follow": "1", "stdout": "1", "stderr": "1"},
timeout=timeout) as rep:
while True:
# read the header (8-byte) and extract the chunk size
hdr = await
if not hdr:
size, = struct.unpack("!L", hdr[4:])
#read the chunk
data = await
# store it in redis
await self.ctrl.redis_client.setrange(log_key, offset, data)
await self.ctrl.redis_client.publish(REDIS_CHANNEL_AIO, notif_msg)
offset += len(data)
# send eof
await self.ctrl.redis_client.setrange(log_key, offset, b"\x04")
await self.ctrl.redis_client.publish(REDIS_CHANNEL_AIO, notif_msg)
except Exception as e:
# NOTE: if the redis server goes down, then the log_task is totally
# interrupted
log.exception("job %d: exception in log task", info.job_id)
log.debug("job %d: log task terminated (%r bytes logged)",
info.job_id, offset)
await self.ctrl.redis_client.expire(log_key, 86400)
def _process(self, job_id, reset, rescheduled):
ctrl = self.ctrl
......@@ -1329,6 +1406,7 @@ class JobManager(Manager):
if state == JobState.WAITING:
# job is not yet started
yield from self._notif_job_state(info, "WAITING")
# pull the image to the swarm
if info.ver_id is not None:
......@@ -1340,6 +1418,7 @@ class JobManager(Manager):
with info.client.request_slot(info.ctr_name, info.cpu or 0, info.mem or 0):
info.node_id = yield from info.client.wait_slot(info.ctr_name)
yield from self.run_in_executor(self._create_job, info, lock=False)
yield from self._notif_job_state(info, "RUNNING")
yield from self._finish_job(info, reset)
elif state in (JobState.RUNNING, JobState.ABORTING): # pragma: nobranch
......@@ -1347,6 +1426,26 @@ class JobManager(Manager):
# -> wait for its termination
yield from self._finish_job(info, reset)
yield from self._notif_job_state(info, "DONE")
# send a notification to the aio frontend when the job state is changed
# this function never fail (redis errors are caught and written in the
# logs)
async def _notif_job_state(self, info, state):
try:"notify the frontend: job %d state is now %r",
info.job_id, state)
await self.ctrl.redis_client.setex(REDIS_KEY_JOB_STATE % info.job_id,
86400, state)
await self.ctrl.redis_client.publish(REDIS_CHANNEL_AIO,
except asyncio.CancelledError:
except Exception as e:
log.error("redis notification failed for job %d (%e)",
info.job_id, e)
# NOTE: for the push/pull managers, interruptible=True guarantees that the
# managers terminate immediately, however it cannot guarantee that the
# process will terminate immediately because the ThreadPoolExecuter installs
......@@ -1538,10 +1637,14 @@ class DockerController:
} if dct else None
self.sandbox = SharedSwarmClient(sandbox_host, config=cfg.get("sandbox", {}, dict), alias="sandbox")
self.sandbox.aiohttp_session, self.sandbox.aiohttp_url = (
if sandbox_host == swarm_host:
self.swarm = self.sandbox
self.swarm = SharedSwarmClient(swarm_host, config=cfg.get("swarm", {}, dict), alias="swarm")
self.swarm.aiohttp_session, self.swarm.aiohttp_url = (
self._db_sessionmaker = sqlalchemy.orm.scoping.scoped_session(
......@@ -41,6 +41,7 @@ import json
import logging
import os
import time
import weakref
import asyncio
import aiohttp
......@@ -55,6 +56,27 @@ import config.env
# TODO: return error pages with some content
# redis keys
# job log
REDIS_KEY_JOB_LOG = "log:job:%d"
REDIS_KEY_JOB_STATE = "state:job:%d"
# pubsub channels for waking up allgo.aio (frontend) and the controller
# (ourselves)
REDIS_CHANNEL_AIO = "notify:aio"
REDIS_CHANNEL_CONTROLLER = "notify:controller"
# pubsub messages
log = logging.getLogger("allgo-aio")
def daemonise(fork: bool, pidfile: str):
......@@ -169,6 +191,8 @@ class JsonSeqStreamResponse(StreamResponse):
def send(self, json_object):
self.write(json.dumps(json_object).encode() + b"\n\x1e")
class DoneException(Exception):
async def forward_response(reply: aiohttp.ClientResponse) -> aiohttp.web.Response:
......@@ -193,6 +217,28 @@ class ErrorResponse(Response):
text="%d: %s" % (status, http.HTTPStatus(status).phrase))
class ConditionDict(weakref.WeakValueDictionary):
"""Dictionary storing asyncio conditions variables
This dict & conditions are used for routing the reddis notifications
to the relevant HTTP request handlers.
The dict key identifies a resoursce (eg: job id) and the dict valuse is the
relevant asyncio.Condition for reporting updates to this resource.
Conditions are created on the fly if the key is not found in the dict.
This dictionary is a weakref dict, so that the condition variables are
destroyed when they are no longer used (i.e. nobody is listening to them).
def __getitem__(self, key):
cond = self.get(key)
if cond is None:
cond = self[key] = asyncio.Condition()
return cond
class AllgoAio:
def __init__(self, bind, loop):
self.loop = loop
......@@ -210,10 +256,14 @@ class AllgoAio:
# (not used by the notification task which has its own reddis connection)
self.redis_client = None
# condition dict for job notifications
self.job_conditions = ConditionDict()
# ---------- routes ---------- #
ar =
ar.add_route("*", r"/v2/{repo:.*}/manifests/{tag}", self.handle_image_manifest)
ar.add_route("GET", r"/aio/jobs/{job_id:\d+}/events", self.handle_job_events)
self.handler =, self.port = bind
......@@ -305,17 +355,16 @@ class AllgoAio:
async def redis_notification_listener_task(self):
"""Task for listening for redis notification
- subscribes to 'aio:notify'
- subscribes to REDIS_CHANNEL_AIO
- automatically reconnects to the server (with rate limiting)
async for _ in RateLimiter(60):
# create redis connection and subscribe to the notification channel
conn = await self.create_redis()
sub, = await conn.subscribe("aio:notify")"subscribed to redis pub/sub channel 'aio:notify'")
sub, = await conn.subscribe(REDIS_CHANNEL_AIO)"subscribed to redis pub/sub channel %r" % REDIS_CHANNEL_AIO)
async for msg in sub.iter():"redis notification: %r", msg)
......@@ -325,9 +374,18 @@ class AllgoAio:
except ValueError as e:
log.warning("ignored malformatted notification: %r (%s)", msg, e)
log.warning("ignored notification for unknown item %r", msg)
if item_type == b"job":
cond = self.job_conditions.get(item_id)
if cond is not None:
async with cond:"cond notified: %r", cond)
log.warning("ignored notification for unknown item %r", msg)
except OSError as e:
log.error("I/O error in the redis listener (%s)", e)
except asyncio.CancelledError:"notification task terminated")
......@@ -394,3 +452,107 @@ class AllgoAio:
return await forward_response(registry_reply)
return await forward_response(django_reply)
async def handle_job_events(self, request):
"""Channel for monitoring job events
This HTTP endpoint streams a sequence json objects describing events
related to a job.
Currently two events are defined:
- state update (when the state of the job is updated)
{"state": "<NEW_STATE>"}
- new logs (when content is appended to the job logs)
{"logs": "<CONTENT>"}
job_id = int(request.match_info["job_id"])
offset = int(request.url.query.get("offset", 0))
except ValueError:
return Response(status=400)
log_key = REDIS_KEY_JOB_LOG % job_id
state_key = REDIS_KEY_JOB_STATE % job_id
state = None
cond = self.job_conditions[job_id]
# query the django server to have the job details and ensure this user
# is allowed to view this job (thanks to the "Cookie" header)
headers = prepare_headers(request)
headers["Accept"] = "application/json"
async with self.django_request("GET", "/jobs/%d/" % job_id,
headers=headers, allow_redirects=False) as rep:
if rep.status != 200:
# FIXME: django should be able to return 401 directly
return Response(status=401 if rep.status==302 else rep.status)
state = (await rep.json())["state"].encode()
rep = JsonSeqStreamResponse()
await rep.prepare(request)
rep.send({"state": state.decode()})
await rep.drain()
# poll the redis db and forward the data to the http response
# return true if something was read
async def poll():
nonlocal state, offset
if offset is None and state == b"DONE":
raise DoneException()
updated = False
# poll state change
new_state = await self.redis_client.get(state_key)
if new_state != state:"job %d state updated %s -> %s", job_id, state, new_state)
# note: null state is possible if the redis entry does not
# exists yet
if new_state:
rep.send({"state": new_state.decode()})
state = new_state
updated = True
# poll new logs
if offset is not None:
data = await self.redis_client.getrange(log_key, offset, offset+8191)"job %d log read %d bytes at offset %d: %r",
job_id, len(data), offset, data[:40])
if data:
if data[-1] == 0x04:
data = data[:-1]
offset = None
offset += len(data)
rep.send({"logs": data.decode(errors="replace")})
updated = True
if updated:
await rep.drain()
return updated
while True:
async with cond:
if not await poll():
await cond.wait()
while await poll():
except (DoneException, asyncio.CancelledError):
except Exception:
log.exception("exception in handle_job_log(job_id=%d)", job_id)
return rep
......@@ -58,14 +58,3 @@ def upload_data(file_obj, job_id):
for chunk in file_data.chunks():
def read_runner_logs(jobid):
redis_host = config.env.ALLGO_REDIS_HOST
r = redis.StrictRedis(host=redis_host, port=6379, db=0)
linenumber = 0
k = "job:log:%s:%s" % (jobid, linenumber)
log.debug("Runner read logs k %s - %s", k, r.exists(k))
while r.exists(k):
yield r.get(k)
linenumber += 1
k = "job:log:%s:%s" % (jobid, linenumber)
import json
from django import template
from django.utils.safestring import mark_safe
from misaka import Markdown, HtmlRenderer
......@@ -66,6 +68,9 @@ def status_icon(obj):
if isinstance(obj, Job):
return mark_safe(render_job_status(obj.status))
elif obj == "job-status-dict":
return json.dumps({status: render_job_status(status)
raise TypeError(type(obj))
......@@ -25,7 +25,7 @@ from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.auth.models import User
from django.contrib.messages.views import SuccessMessageMixin
from django.core.urlresolvers import reverse
from django.http import HttpResponse
from django.http import HttpResponse, JsonResponse
from django.shortcuts import render, get_object_or_404
from django.urls import reverse, reverse_lazy
from django.utils.crypto import get_random_string
......@@ -64,7 +64,8 @@ from .forms import (
from .helpers import get_ssh_data, read_runner_logs, upload_data
from .helpers import get_ssh_data, upload_data
from .templatetags.converters import status_icon
# Start logger
......@@ -502,24 +503,24 @@ class JobDetail(LoginRequiredMixin, DetailView):
job = Job.objects.get(
dirname = os.path.join(settings.DATASTORE, str(
# Read the Redis database if job is new or running or
# read the `allgo.log` file if job is done
if job.state in (Job.NEW, Job.WAITING, Job.RUNNING, Job.ABORTING):
logs = read_runner_logs(
if job.state == Job.DONE:
# job is done
# -> read the `allgo.log` file
log_file = os.path.join(dirname, 'allgo.log')
if os.path.isfile(log_file):
with open(log_file, 'r', errors="replace") as log_data:
logs =
logs = 'Can\'t load the log file'
log.error("Log file doesn't exists (job #%d)",
except OSError as e:
logs = '(logs not available)'
log.error("Log file not available for job #%d (%s)",, e)
# job is pending
# -> logs will be streamed (ajax request)
logs = ""
kwargs['logs'] = logs
# Get the files and some metadata such as the webapp version
webapp = Webapp.objects.get(docker_name=self.object.webapp.docker_name)
webapp_version = WebappVersion.objects.get(webapp=webapp)
kwargs['webapp_version'] = webapp_version
if os.path.exists(dirname):
# put all files in a list
kwargs['files'] = [os.path.basename(x) for x in glob.glob(os.path.join(dirname, '*'))]
......@@ -527,6 +528,18 @@ class JobDetail(LoginRequiredMixin, DetailView):
kwargs['files'] = []
return super().get_context_data(**kwargs)
def render_to_response(self, context, **kwargs):
if self.request.META.get("HTTP_ACCEPT") == "application/json":
job = context["job"]
return JsonResponse({
"state": job.get_state_display(),
"status": job.status,
"rendered_status": status_icon(job),
"exec_time": job.exec_time,
return super().render_to_response(context, **kwargs)
class JobCreate(SuccessMessageMixin, LoginRequiredMixin, CreateView):
"""Save a job into the database
......@@ -95,14 +95,15 @@
<td class="text-muted">Status</td>
<td>{{ job | status_icon }}</td>
<td id="job-status">{{ job | status_icon }}</td>
<div class="col-sm">
<h5 class="card-title">Logs</h5>
<pre><code class="lang-json">{{ logs }}</code></pre>
<pre style="overflow: auto; height: 40em;"
><code class="lang-console" id="job-logs">{{ logs }}</code></pre>
......@@ -112,4 +113,50 @@
{% block javascript %}
{{ block.super }}
<script defer src="{% static 'js/tooltip.js' %}"></script>
{% if job.state != job.DONE %}
<script src="{% static 'js/json_seq_events.js' %}"></script>
JOB_STATUS_DICT = {{ 'job-status-dict' | status_icon | safe }};
function escapeHtml(text) {
return text.replace(/[\"&<>]/g, function (a) {
return { '"': '&quot;', '&': '&amp;', '<': '&lt;', '>': '&gt;' }[a];
json_seq_event_listener("/aio/jobs/{{ }}/events",
function(msg) {
console.log("message:", msg);
if (msg.state == "DONE") {
// job finished
// Note: we refresh the job page because:
// - the logs may not be complete (need to reload allgo.log)
// - the list of output files has to be updated
// - some parameters (eg: execution time) have to be updated