Commit 56ae85bf authored by BAIRE Anthony's avatar BAIRE Anthony
Browse files

Add the job_list events channel

This updates the job_list page in real time when the state of any
displayed job changes:
- update the status icon
- show/hide the abort button
- enable/disable the delete button
- remove job from the list when destroyed
parent d4ac4721
......@@ -270,7 +270,7 @@ class AllgoAio:
# (not used by the notification task which has its own reddis connection)
self.redis_client = None
# condition dict for job & webapp notifications
# state dicts for job & webapp notifications
# key is the job_id/webapp_id
self.webapp_states = StatesDict()
......@@ -279,11 +279,15 @@ class AllgoAio:
# mutex to be locked when adding new job_states entries
self.job_states_create_lock = asyncio.Lock()
# global condition notified when any job is updated
self.job_list_condition = asyncio.Condition()
# ---------- routes ---------- #
ar =
ar.add_route("*", r"/v2/{repo:.*}/manifests/{tag}", self.handle_image_manifest)
ar.add_route("GET", r"/aio/jobs/{job_id:\d+}/events", self.handle_job_events)
ar.add_route("GET", r"/aio/jobs/events", self.handle_job_list_events)
ar.add_route("GET", r"/aio/apps/{docker_name}/events", self.handle_webapp_events)
self.handler =
......@@ -425,6 +429,8 @@ class AllgoAio:
if item_type == b"job":
await self.update_job_state(item_id)
async with self.job_list_condition:
elif item_type == b"webapp":
webapp = self.webapp_states.get(item_id)
if webapp is not None:
......@@ -609,6 +615,125 @@ class AllgoAio:
log.exception("exception in handle_job_events(job_id=%d)", job_id)
return rep
async def handle_job_list_events(self, request):
"""Channel for monitoring job list events
This HTTP endpoint streams a sequence json objects describing events
related to a list of jobs.
The job ids are provided in the query string (parameter 'id')
Currently only one event is defined:
- state update sent when the state of the job is updated (note: the
result is provided only with the 'DONE' state)
{"id:" <JOB_ID>,
"state": "<NEW_STATE>",
[ "result": "<RESULT>" ],
job_ids = {int(x) for x in request.url.query.getall("id")}
# limit the number of jobs in a single request
if len(job_ids) > 20:
raise ValueError()
except ValueError:
return Response(status=400)
# query the django server to have the current state of each job and
# ensure this user is allowed to view these jobs (thanks to the
# "Cookie" header)
# state dict (key: job id, value: state)
states = {}
results = {}
for job_id in job_ids:
headers = prepare_headers(request)
headers["Accept"] = "application/json"
async with self.django_request("GET", "/jobs/%d/" % job_id,
headers=headers, allow_redirects=False) as rep:
if rep.status == 200:
js = await rep.json()
states[job_id] = js["state"]
results[job_id] = js["result"]
elif rep.status == 404:
states[job_id] = "DELETED"
# FIXME: django should be able to return 401 directly
return Response(status=401 if rep.status==302 else rep.status)
async with self.job_states_create_lock:
jobs = {job_id: self.job_states[job_id] for job_id in job_ids}
rep = JsonSeqStreamResponse()
await rep.prepare(request)
def send_state_update(job_id, state, result):
msg = {"id": job_id, "state": state}
if state == "DONE":
msg["result"] = result or "NONE"
def remove_deleted_jobs():
for job_id in [job_id for job_id, state in states.items()
if state in FINAL_STATES]:
del states[job_id]
del jobs[job_id]"removed job %d from the watch list", job_id)"current states: %r", states)
for job_id, state in states.items():
send_state_update(job_id, state, results.get(job_id))
del results
await rep.drain()
# poll the redis db and forward the data to the http response
# return true if something was read
def poll():
if not states:
raise DoneException()
# poll state changes
updated = False
for job_id, state in states.items():
new_state = jobs[job_id].state
if new_state != state:"job %d state updated %s -> %s", job_id, state, new_state)
# note: null state is possible if the redis entry does not
# exists yet
if new_state:
send_state_update(job_id, new_state, jobs[job_id].result)
states[job_id] = new_state
updated = True
return updated
while True:
async with self.job_list_condition:
if not poll():
await self.job_list_condition.wait()
# ensure the output buffer is flushed before polling for new
# data (but do it when the condition is not locked)
await rep.drain()
except (DoneException, asyncio.CancelledError):
except Exception:
log.exception("exception in handle_job_events(job_id=%d)", job_id)
return rep
async def handle_webapp_events(self, request):
"""Channel for monitoring events related to a webapp events
......@@ -540,7 +540,7 @@ class JobDetail(LoginRequiredMixin, DetailView):
return JsonResponse({
"state": job.get_state_display(),
"status": job.status,
"result": job.get_result_display(),
"rendered_status": status_icon(job),
"exec_time": job.exec_time,
......@@ -27,16 +27,17 @@
{% for job in job_list %}
<tr id="job-{{}}">
<th scope="row"><a href="{% url 'main:job_detail' %}">{{ }}</a></th>
<td>{{ job.created_at | naturalday }}</td>
<td><a href="{% url 'main:webapp_detail' job.webapp.docker_name %}">{{ | fancy_webapp_name }}</a></td>
<td>{{ job.version }}</td>
<td>{{ job.param }}</td>
<td>{{ }}</td>
<td class="text-center">{{ job | status_icon }}</td>
<td class="text-center" id="job-status-{{}}">{{ job | status_icon }}</td>
<td class="text-center">
<form style="display: inline" method="post" action="{% url 'main:job_abort' %}"
{% if job.state != job.RUNNING %} class="invisible" {% endif %}
{% csrf_token %}
......@@ -51,6 +52,7 @@
<a href="{% url 'main:job_delete' %}"
title="Delete this job"
......@@ -91,4 +93,38 @@
{% block javascript %}
{{ block.super }}
<script defer src="{% static 'js/tooltip.js' %}"></script>
<script src="{% static 'js/json_seq_events.js' %}"></script>
JOB_STATUS_DICT = {{ 'job-status-dict' | status_icon | safe }};
json_seq_event_listener("/aio/jobs/events?{% for job in job_list %}id={{}}&{% endfor %}",
function(msg) {
console.log("message:", msg);
if (msg == undefined) {
console.log("events channel closed (EOF from server)");
if (msg.state) {
if (["DELETED", "ARCHIVED"].includes(msg.state)) {
// job deleted
var row = $("tr#job-" +;
} else {
// job status updated
$("td#job-status-" +[
(msg.state != "DONE") ? msg.state : msg.result]);
// display abort button
$("#job-abort-" +"invisible", (msg.state != "RUNNING"))
// disable delete button
$("#job-delete-" +"disabled",
["ABORTING", "RUNNING"].includes(msg.state));
function(status, msg) {
console.log("error:", status, msg);
{% endblock %}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment