Commit e5c0a544 authored by GARNIER Laurent's avatar GARNIER Laurent
Browse files

Merge branch 'api-job-events' into 'django'

add api endpoint to stream job state updates and logs


See merge request !208
parents 28e10237 39e009c9
Pipeline #136434 failed with stages
in 1 second
......@@ -167,7 +167,8 @@ class RateLimiter:
class JsonSeqStreamResponse(StreamResponse):
"""aiohttp response class for streaming json objects
The objects are streamed in the application/json-seq format (RFC 7464).
The stream is line-delimited, i.e: a newline <LF> character is inserted between each JSON
object (and the objects are guaranteed not to contain any newline)
resp = JsonSeqStreamResponse()
......@@ -180,14 +181,11 @@ class JsonSeqStreamResponse(StreamResponse):
def __init__(self, *k, **kw):
super().__init__(*k, **kw)
self.content_type = "application/json-seq"
self.content_type = "application/json"
self.charset = "utf-8"
async def prepare(self, request):
prepared = self.prepared
await super().prepare(request)
if not prepared:
# send periodic nop messages (empty dict) to keep the connection alive
......@@ -200,7 +198,7 @@ class JsonSeqStreamResponse(StreamResponse):
loop = asyncio.get_event_loop()
def keepalive_cb():
except StopIteration:
# it seems that self.write raises StopIteration when the
# connection is closed
......@@ -209,7 +207,7 @@ class JsonSeqStreamResponse(StreamResponse):
loop.call_later(KEEPALIVE_INTERVAL, keepalive_cb)
def send(self, json_object):
self.write(json.dumps(json_object).encode() + b"\n\x1e")
self.write(json.dumps(json_object).encode() + b"\n")
class DoneException(Exception):
......@@ -308,6 +306,7 @@ class AllgoAio:
rtr.add_route("GET", r"/aio/jobs/{job_id:\d+}/events", self.handle_job_events)
rtr.add_route("GET", r"/aio/jobs/events", self.handle_job_list_events)
rtr.add_route("GET", r"/aio/apps/{docker_name}/events", self.handle_webapp_events)
rtr.add_route("GET", r"/api/v1/jobs/{job_id:\d+}/events",self.handle_job_events)
self.handler =, self.port = bind
......@@ -535,23 +534,32 @@ class AllgoAio:
async def handle_job_events(self, request):
"""Channel for monitoring job events
This HTTP endpoint streams a sequence json objects describing events
related to a job.
Currently two events are defined:
- state update (when the state of the job is updated)
{"state": "<NEW_STATE>"}
This function a sequence json objects describing events related to a job. It is used by two
- /aio/jobs/{id}/events (UI)
- /api/v1/jobs/{id}/events (API v1)
- new logs (when content is appended to the job logs)
It accepts one optional query parameter `offset` which gives the starting offset for
streaming the logs (useful for resuming an interupted stream). If negative, then the logs
will not be streamed at all.
Currently it streams two kind of events:
- new logs:
{"logs": "<CONTENT>"}
- state changes:
{"state": "<NEW_STATE>"} (UI variant)
{"status": "<NEW_STATUS>"} (API variant)
Note: the API variant returns a 'status' field to be consistent with the existing
API calls (also 'state' and 'results' are implementation details and are not
shown to the user)
job_id = int(request.match_info["job_id"])
offset = int(request.url.query.get("offset", 0))
if offset < 0:
offset = None
except ValueError:
return Response(status=400)
......@@ -562,22 +570,52 @@ class AllgoAio:
rep = JsonSeqStreamResponse()
# query the django server to have the job details and ensure this user
# is allowed to view this job (thanks to the "Cookie" header)
# is allowed to view this job
if request.path.startswith("/aio/"):
# using the "Cookie" header)
headers = prepare_headers(request)
headers["Accept"] = "application/json"
async with self.django_request("GET", "/jobs/%d" % job_id,
headers=headers, allow_redirects=False) as upstream_rep:
if upstream_rep.status != 200:
# FIXME: django should be able to return 401 directly
return Response(status=401 if upstream_rep.status==302 else upstream_rep.status)
state = (await upstream_rep.json())["state"]
result = None
def send_state(state, result):
rep.send({"state": state})
elif request.path.startswith("/api/v1/"):
# using an API token
async with self.django_request("GET", "/api/v1/jobs/%d" % job_id,
headers=prepare_headers(request)) as upstream_rep:
if upstream_rep.status != 200:
return Response(status=upstream_rep.status, body=await
status = (await upstream_rep.json())["status"]
if not isinstance(status, str):
raise TypeError(type(status))
# API v1 uses 'status' which has to be computed from 'state' and 'result'
if status in ("new", "waiting", "running", "aborting"):
# job running
state = status.upper()
result = None
# job terminated
state = "DONE"
result = "SUCCESS" if status=="done" else status.upper()
def send_state(state, result):
rep.send({"status": (
state if state not in FINAL_STATES else
"DONE" if result == "SUCCESS" else
(result or "NONE")).lower()})
headers = prepare_headers(request)
headers["Accept"] = "application/json"
async with self.django_request("GET", "/jobs/%d" % job_id,
headers=headers, allow_redirects=False) as rep:
if rep.status != 200:
# FIXME: django should be able to return 401 directly
return Response(status=401 if rep.status==302 else rep.status)
state = (await rep.json())["state"]
rep = JsonSeqStreamResponse()
await rep.prepare(request)
rep.send({"state": state})
await rep.drain()
send_state(state, result)
# poll the redis db and forward the data to the http response
......@@ -602,7 +640,7 @@ class AllgoAio:
# note: null state is possible if the redis entry does not
# exists yet
if job.state:
rep.send({"state": job.state})
send_state(job.state, job.result)
state = job.state
updated = True
......@@ -6,6 +6,6 @@ app_name = 'api'
urlpatterns = [
url(r'^jobs$',, name='jobs'),
url(r'^jobs/(?P<pk>\d+)', views.APIJobView.as_view(), name='job'),
url(r'^datastore/(?P<pk>\d+)/(.*)', views.APIDownloadView.as_view(), name='download'),
url(r'^jobs/(?P<pk>\d+)$', views.APIJobView.as_view(), name='job'),
url(r'^datastore/(?P<pk>\d+)/(.*)$', views.APIDownloadView.as_view(), name='download'),
// Start an asynchronous long-polling HTTP GET request for streaming a sequence
// of JSON objects (RFC 7464)
// of line-delimited JSON objects
// url: string http url
// on_event: function(event) event callback
......@@ -54,7 +54,7 @@ function json_seq_event_listener(url, on_event=null, on_error=null)
// streaming
while(true) {
var end = req.responseText.indexOf("\x1e", req.offset);
var end = req.responseText.indexOf("\n", req.offset);
if (end < 0) {
......@@ -67,6 +67,13 @@ server
disable_symlinks on;
location ~ ^/api/v1/jobs/\d+/events$ {
proxy_pass http://aio;
proxy_redirect off;
proxy_buffering off;
} #location /api/
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment