#!/usr/bin/env python3
#
# This module implements an auxiliary HTTP server for serving asynchronous
# requests for allgo.
#
# There are two purposes:
# - implement server push (using long-lived HTTP requests) for:
#     - sending status updates for the jobs and sandboxes
#     - live-streaming of the job logs
# - have a really async implementation for pushing image manifests into
#   the registry (the preliminary implementation in
#   5451a6dfdb10a2d0875179d06b43c947ebcd37b4 was blocking)
#
# It is implemented with aiohttp.web (a lightweight HTTP framework,
# similar to what we can do with flask but asynchronously).
#
# The alternative would have been to use the django channels plugin, but:
# - it went through a major refactoring (v2) recently
# - it requires replacing gunicorn with an ASGI server (daphne)
# - django-channels and daphne are not yet in debian, and for the HTTP
#   server i would prefer an implementation for which we have stable
#   security updates
#
# (anyway this can be ported to django-channels later)
#
# The nginx config redirects the /aio/ path to this server (and the image
# manifests pushes too).
#
# The allgo.aio server interacts only with the nginx, redis and django
# containers. It relies totally on the django server for authenticating
# the user and for accessing the mysql db (so there is no ORM).
#
# NOTE: in this design the django server has to trust blindly the requests
# coming from the allgo.aio server (for some endpoints). To prevent
# security issues, the nginx configuration is modified to set the header
# 'X-Origin: nginx'. Thus django knows who he can trust.


import http
import json
import logging
import os
import re
import sys
import time
import weakref

import asyncio
import aiohttp
import aiohttp.web
from   aiohttp.web  import Response, StreamResponse
import aioredis

import config.env




# TODO: return error pages with some content


##################################################
# redis keys
# (all keys are %-format templates to be filled with the job id)

# job log
REDIS_KEY_JOB_LOG       = "log:job:%d"
REDIS_KEY_JOB_STATE     = "state:job:%d"
REDIS_KEY_JOB_RESULT    = "result:job:%d"


# pubsub channels for waking up allgo.aio (frontend) and the controller
# (ourselves)
REDIS_CHANNEL_AIO        = "notify:aio"
REDIS_CHANNEL_CONTROLLER = "notify:controller"

# pubsub messages
REDIS_MESSAGE_JOB_UPDATED    = "job:%d"
REDIS_MESSAGE_WEBAPP_UPDATED = "webapp:%d"

##################################################


# module-level logger used by every class/function below
log = logging.getLogger("allgo-aio")

def daemonise(fork: bool, pidfile: str):
    """Daemonise the process

    - fork the process if 'fork' is true (the parent then exits
      immediately, after recording the child's pid)
    - write the daemon pid to 'pidfile'
    """
    def record_pid(pid):
        # (over)write the pidfile with a newline-terminated pid
        with open(pidfile, "w") as fp:
            fp.write("%d\n" % pid)

    if not fork:
        record_pid(os.getpid())
        return

    child = os.fork()
    if child:
        # parent process: record the child's pid then leave immediately,
        # bypassing any cleanup handlers (the child keeps running)
        record_pid(child)
        os._exit(0)     # pylint: disable=protected-access

def try_set_result(fut: asyncio.Future, result):
    """Set the result of an asyncio.Future, tolerating already-done futures

    Returns True when the result was stored, False when the future was
    already done (in which case it is left untouched).
    """
    if fut.done():
        return False
    fut.set_result(result)
    return True

def prepare_headers(request: aiohttp.web.Request) -> dict:
    """Prepare the headers to be forwarded to django

    This function reuses the headers from the incoming aiohttp.web request
    that are related to:
    - authentication (Authorization, Cookie)
    - reverse-proxy (X-Forwarded-*)

    Also it sets "X-Origin: aio" to make clear that this request comes from
    the allgo.aio server.
    """
    headers = {"X-Origin": "aio"}
    for key in (
            "Authorization",
            "Cookie",
            # fix: was misspelled "User_agent" (underscore), which never
            # matched the real "User-Agent" request header
            "User-Agent",
            "X-Forwarded-For",
            "X-Forwarded-Host",
            "X-Forwarded-Proto"):
        val = request.headers.get(key)
        if val is not None:
            headers[key] = val
    return headers

def is_ok(response: aiohttp.ClientResponse):
    """Return true if this HTTP response is successful (2xx status)"""
    return response.status in range(200, 300)

class RateLimiter:
    """Generator for rate limiting

    An asynchronous iterator guaranteeing that at least `period` seconds
    elapse between the start of two consecutive iterations (it sleeps for
    the remainder of the period when the body was faster than that).

    usage:
        async for _ in RateLimiter(60):
            ...
    """
    def __init__(self, period):
        assert period > 0
        self.period = period
        # timestamp of the start of the current iteration; initialised one
        # period in the past so that the first iteration starts immediately
        self._t0 = time.monotonic() - period

    def __aiter__(self):
        return self

    async def __anext__(self):
        now = time.monotonic()
        remaining = self._t0 + self.period - now
        if remaining <= 0:
            self._t0 = now
        else:
            log.debug("rate_limit: sleep %f seconds", remaining)
            await asyncio.sleep(remaining)
            self._t0 = now + remaining

class JsonSeqStreamResponse(StreamResponse):
    """aiohttp response class for streaming json objects

    The objects are streamed in the application/json-seq format (RFC 7464):
    each record is an ASCII RS byte (0x1e) followed by the JSON text and a
    newline.

    usage:
        resp = JsonSeqStreamResponse()
        await resp.prepare(request)

        ...
        resp.send(json_obj)
        await resp.drain()

    """
    def __init__(self, *k, **kw):
        super().__init__(*k, **kw)
        self.content_type = "application/json-seq"
        self.charset = "utf-8"

    async def prepare(self, request):
        # the leading RS byte must be written only the first time prepare()
        # is actually performed, hence the 'prepared' snapshot taken before
        # calling the parent implementation
        prepared = self.prepared
        await super().prepare(request)
        if not prepared:
            # NOTE: self.write() is used synchronously here — this relies on
            # the (older) aiohttp API where StreamResponse.write is a plain
            # method, not a coroutine
            self.write(b"\x1e")

        # send periodic nop messages (empty dict) to keep the connection alive
        #
        # It seems that firefox systematically closes the connection after 60
        # seconds of inactivity (and i do not know how to configure it),
        # therefore i set the interval to 50 seconds, this will also prevent
        # NAT firewalls from closing the connection.

        KEEPALIVE_INTERVAL = 50
        loop = asyncio.get_event_loop()
        def keepalive_cb():
            try:
                self.write(b"{}\n\x1e")
            except StopIteration:
                # it seems that self.write raises StopIteration when the
                # connection is closed
                return
            # re-arm the timer only while the connection is still alive
            loop.call_later(KEEPALIVE_INTERVAL, keepalive_cb)
        loop.call_later(KEEPALIVE_INTERVAL, keepalive_cb)

    def send(self, json_object):
        # serialise one object and append it to the stream (the caller is
        # responsible for awaiting drain() periodically)
        self.write(json.dumps(json_object).encode() + b"\n\x1e")

class DoneException(Exception):
    """Internal exception raised by the poll() helpers to terminate an
    event-streaming loop once there is nothing left to report."""
    pass
async def forward_response(reply: aiohttp.ClientResponse) -> aiohttp.web.Response:
    """Forward a HTTP response to the client

    This is used for forwarding django's HTTP responses to the client we
    are serving. The Content-Encoding header is dropped because the body
    returned by aiohttp's read() is already decoded, so the original
    encoding no longer applies.
    """
    body = await reply.read()
    headers = [(name, value)
               for name, value in reply.headers.items()
               if name.lower() != "content-encoding"]
    return Response(status=reply.status, body=body, headers=headers)


class ErrorResponse(Response):
    """Simple response class for reporting errors

    eg: `ErrorResponse(status=404)` is equivalent to:
        `Response(status=404, text="404: Not Found")`
    """
    def __init__(self, status):
        # derive the human-readable reason phrase from the status code
        phrase = http.HTTPStatus(status).phrase
        super().__init__(status=status, text="%d: %s" % (status, phrase))


class StatesDict(weakref.WeakValueDictionary):
    """Dictionary for caching job/app states and handling synchronisation

    Each key identifies a resource (eg: a job id) and each value is a small
    state object carrying:
      - a 'cond' attribute: an asyncio.Condition notified whenever the
        resource is updated
      - one attribute per name listed at dict creation, initialised to None

    Missing entries are created on the fly by __getitem__.

    Since this is a weak-value dictionary, an entry disappears automatically
    once nobody holds a reference to it any more (i.e. no request handler is
    listening to that resource).
    """

    class State:
        # plain attribute container (a user-defined class so that weak
        # references can point to its instances)
        pass

    def __init__(self, attrs=()):
        super().__init__()
        self._attrs = attrs

    def __getitem__(self, key):
        # return the existing entry or create a fresh one on the fly
        try:
            return super().__getitem__(key)
        except KeyError:
            entry = self[key] = self.State()
            entry.cond = asyncio.Condition()
            for name in self._attrs:
                setattr(entry, name, None)
            return entry


class AllgoAio:
    """Auxiliary async HTTP server bridging nginx, redis and django"""

    def __init__(self, bind, loop):
        """
        bind: (host, port) tuple on which the aiohttp server will listen
        loop: the asyncio event loop running this server
        """
        self.loop = loop
        self.app = aiohttp.web.Application(loop=loop)

        self.django_url  = "http://127.0.0.1:8000"

        # aiohttp client for making request to django/registry
        # (created in run())
        self.http_client = None

        # aiohttp server task (created in run())
        self.server     = None

        # global redis client for querying data from the redis server
        # (not used by the notification task which has its own redis connection)
        self.redis_client = None

        # state dicts for job & webapp notifications
        #
        # key is the job_id/webapp_id
        self.webapp_states = StatesDict()
        self.job_states = StatesDict(("state", "result"))

        # mutex to be locked when adding new job_states entries
        self.job_states_create_lock = asyncio.Lock()

        # global condition notified when any job is updated
        self.job_list_condition = asyncio.Condition()

        # ---------- routes ---------- #
        rtr = self.app.router

        rtr.add_route("*",   r"/v2/{repo:.*}/manifests/{tag}",   self.handle_image_manifest)
        rtr.add_route("GET", r"/aio/jobs/{job_id:\d+}/events",   self.handle_job_events)
        rtr.add_route("GET", r"/aio/jobs/events",                self.handle_job_list_events)
        rtr.add_route("GET", r"/aio/apps/{docker_name}/events",  self.handle_webapp_events)

        self.handler = self.app.make_handler()
        self.host, self.port = bind
        # future created by run(); resolving it triggers the shutdown
        self._shutdown_requested = None

    def shutdown(self):
        """Request the termination of the server (idempotent)"""
        log.info("shutdown requested")
        try_set_result(self._shutdown_requested, None)


    def django_request(self, method, path, *k, **kw):
        """Create a aiohttp request object to the django server

        Prepends self.django_url to 'path' and delegates to the shared
        http client; every extra argument is forwarded unchanged.
        """
        assert path.startswith("/")
        url = self.django_url + path
        return self.http_client.request(method, url, *k, **kw)


    async def run(self, fork, pidfile):
        """main task (run the server)

        - fork: if true, fork once the server is ready and let the parent
          exit (see daemonise())
        - pidfile: path of the file where the server pid is written

        Returns after shutdown() has been called and all resources have
        been released.
        """
        assert self._shutdown_requested is None, "run() must not be called multiple times"

        self._shutdown_requested = asyncio.Future()
        log.info("starting")
        listener = None
        try:
            # create the aiohttp client
            self.http_client = aiohttp.ClientSession()

            # create redis client (auto-reconnecting: used for queries only)
            self.redis_client = await self.create_redis(reconnecting=True)

            # start the aiohttp server
            self.server = await self.loop.create_server(self.handler,
                    host=self.host, port=self.port)
            log.info("listening on: %s:%d", self.host, self.port)

            log.info("server ready")
            daemonise(fork, pidfile)

            # start listener task (to receive redis notifications)
            # NOTE: this has to be done *after* forking (daemonise()), because
            # the redis subscriber starts a thread and this cause a race
            # condition that hangs the server
            # https://stackoverflow.com/questions/39884898/large-amount-of-multiprocessing-process-causing-deadlock
            listener = asyncio.ensure_future(self.redis_notification_listener_task())

            # block until shutdown() resolves this future
            await self._shutdown_requested
        finally:
            log.info("shutting down")

            # stop the redis client
            #
            # NOTE: it seems that there is no function for closing an
            #       autoreconnecting client, therefore we have an exception
            #       when stopping the server
            #
            #if self.redis_client is not None:
            #    self.redis_client.close()
            #    await self.redis_client.wait_closed()

            # stop the redis listener (it returns cleanly on CancelledError)
            if listener is not None:
                listener.cancel()
                await listener

            # stop the aiohttp server
            if self.server is not None:
                self.server.close()
                await self.server.wait_closed()
            await self.app.shutdown()
            await self.handler.shutdown()
            await self.app.cleanup()

            if self.http_client is not None:
                await self.http_client.close()

        log.info("shutdown complete")



395 396 397 398 399 400 401 402 403

    def create_redis(self, *, reconnecting=False):
        log.info("connecting to redis at %s", config.env.ALLGO_REDIS_HOST)
        addr = (config.env.ALLGO_REDIS_HOST, 6379)
        if reconnecting:
            return aioredis.create_reconnecting_redis(addr)
        else:
            return aioredis.create_redis(addr)

    async def update_job_state(self, job_id):
        """Get the job state from redis and store it into job_states

        Refreshes the cached .state (and .result when the job is DONE) of
        the job_states entry for 'job_id', then notifies all coroutines
        waiting on its condition. Does nothing when nobody is watching this
        job (no entry in the weak dict).
        """
        job = self.job_states.get(job_id)
        if job is not None:
            # hold the condition while updating so that waiters observe a
            # consistent state/result pair
            async with job.cond:
                new_state = await self.redis_client.get(
                        REDIS_KEY_JOB_STATE % job_id)
                # note: null state is possible if the redis entry does not
                # exists yet
                if new_state:
                    if new_state == b"DONE":
                        result = await self.redis_client.get(
                                REDIS_KEY_JOB_RESULT % job_id)
                        job.result = result.decode() if result else "NONE"
                    job.state = new_state.decode()

                log.info("notify job cond %d", job_id)
                job.cond.notify_all()

    async def redis_notification_listener_task(self):
        """Task for listening for redis notification

        - subscribes to REDIS_CHANNEL_AIO
        - automatically reconnects to the server (with rate limiting)

        Expected message format on the channel: b"job:<id>" or
        b"webapp:<id>". The task only exits when cancelled.
        """

        # the RateLimiter guarantees at least 60s between two reconnection
        # attempts
        async for _ in RateLimiter(60):
            try:
                # create redis connection and subscribe to the notification channel
                conn = await self.create_redis()
                sub, = await conn.subscribe(REDIS_CHANNEL_AIO)
                log.info("subscribed to redis pub/sub channel %r", REDIS_CHANNEL_AIO)

                # refresh every watched job: notifications may have been
                # missed while we were disconnected
                async with self.job_states_create_lock:
                    for item_id in self.job_states:
                        await self.update_job_state(item_id)

                async for msg in sub.iter():
                    log.info("redis notification: %r", msg)
                    try:
                        item_type, item_id_str = msg.split(b":")
                        item_id = int(item_id_str)
                    except ValueError as e:
                        log.warning("ignored malformatted notification: %r (%s)", msg, e)
                        continue

                    if item_type == b"job":
                        # refresh the cached state, then wake up both the
                        # per-job watchers and the job-list watchers
                        await self.update_job_state(item_id)
                        async with self.job_list_condition:
                            self.job_list_condition.notify_all()
                    elif item_type == b"webapp":
                        # webapp watchers re-query django themselves; we only
                        # need to wake them up
                        webapp = self.webapp_states.get(item_id)
                        if webapp is not None:
                            async with webapp.cond:
                                webapp.cond.notify_all()
                    else:
                        log.warning("ignored notification for unknown item %r", msg)

            except OSError as e:
                log.error("I/O error in the redis listener (%s)", e)
            except asyncio.CancelledError:
                log.info("notification task terminated")
                return
            except Exception:
                log.exception("error in the redis notification loop")

471 472 473 474 475 476 477 478 479 480
    async def get_image_description(self, request, repo: str, manifest: bytes) -> str:
        """Get the description of an image in the registry

        The WebappVersion.description string is provided by the user:
        - in a UI form (if committing from a sandbox)
        - in the 'allgo.description' label (if pushing a docker image)
        
        'manifest' is the manifest of the image being pushed to the registry.
        It points to a config blob which contains the image labels.

BAIRE Anthony's avatar
BAIRE Anthony committed
481 482
        This function parses the manifest, then it downloads and parses the
        config blob and finally returns the label if present.
483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517

        To be future-proof (the manifest format may change in the future), the
        function returns an empty string and logs a warning if it fails.
        """
        def error(msg, *k, **kw):
            log.warning("unable to get the allgo.description label from pushed image (%s)",
                    msg % k, **kw)
            return ""

        try:
            js = json.loads(manifest.decode())
            if js["schemaVersion"] != 2:
                return error("unknown schemaVersion=%r" % js["schemaVersion"])

            cfg = js["config"]
            digest, size = cfg["digest"], cfg["size"]
            if size > 1024**2:
                return error("config blob too big (%d bytes)" % size)

            if not re.fullmatch("[A-Za-z0-9:]+", digest):
                return error("malformatted digest %r" % digest)

            async with self.http_client.get("%s/v2/%s/blobs/%s" % (
                    config.env.ALLGO_REGISTRY_PRIVATE_URL, repo, digest),
                    headers=prepare_headers(request)) as cfg_reply:
                if not is_ok(cfg_reply):
                    return error("unable to get the config blob (Error %d)" % cfg_reply.status)

                js = json.loads(await cfg_reply.text())
                try:
                    return str(js["config"]["Labels"]["allgo.description"])
                except (TypeError, KeyError):
                    return ""
        except Exception as e:
            return error("unhandled exception", exc_info=sys.exc_info())
518

    async def handle_image_manifest(self, request):
        """Registry endpoint for pushing/pulling image manifests

        The nginx reverse proxy is configured to forward these requests through the
        django server, so that:
        - we are notified when a new image is pushed
        - we can later implement fine-grained permissions (tag-based rather than
          repository-based)

        Note: the DB is transactionally updated before the client receives the 201
        response
        """
        # NOTE: the registry endpoint also supports the DELETE method to delete the
        # image. Since the deletion of a WebappVersion is not yet implemented, we
        # do not allow the delete method here
        if request.method in ("HEAD", "GET"):
            action = "pull"
        elif request.method == "PUT":
            action = "push"
        else:
            # method not allowed
            return ErrorResponse(status=405)

        headers = prepare_headers(request)
        headers["Content-Type"] = request.content_type
        manifest = await request.read()

        repo = request.match_info["repo"]
        tag  = request.match_info["tag"]
        # the user-provided description is only relevant when pushing
        description = (await self.get_image_description(request, repo, manifest)
                ) if action == "push" else ""

        # call django's pre hook (authenticates/authorises the request and,
        # on success, returns the WebappVersion id as the response body)
        async with self.django_request("POST", "/jwt/pre-"+action,
                headers=headers, params={"repo": repo, "tag": tag,
                    "description": description}) as django_reply:

            if not is_ok(django_reply):
                return await forward_response(django_reply)
            version_id = int(await django_reply.read())

        # forward the HTTP request to the registry
        # (the user-visible tag is replaced with "id<version_id>")
        real_url = "%s/v2/%s/manifests/id%d" % (
                config.env.ALLGO_REGISTRY_PRIVATE_URL, repo, version_id)
        async with self.http_client.request(request.method, real_url,
                headers=headers, data=manifest) as registry_reply:

            if action == "pull":
                # pull
                # -> just forward the reply
                return await forward_response(registry_reply)
            else:
                # push
                # -> call the post-push hook to update the db
                registry_success = int(is_ok(registry_reply))

                # call django's post-push hook
                async with self.django_request("POST", "/jwt/post-push",
                        headers=headers, params={"version_id": str(version_id),
                            "success": str(registry_success)},
                        ) as django_reply:
                    # report the django error only when the registry push
                    # itself succeeded (otherwise the registry error is the
                    # relevant one)
                    if registry_success and not is_ok(django_reply):
                        return await forward_response(django_reply)
                    else:
                        return await forward_response(registry_reply)


    async def handle_job_events(self, request):
        """Channel for monitoring job events

        This HTTP endpoint streams a sequence json objects describing events
        related to a job.

        Currently two events are defined:

        - state update (when the state of the job is updated)

            {"state": "<NEW_STATE>"}

        - new logs (when content is appended to the job logs)

            {"logs": "<CONTENT>"}

        Query parameters:
        - offset: initial byte offset into the job log (defaults to 0)
        """
        try:
            job_id = int(request.match_info["job_id"])
            offset = int(request.url.query.get("offset", 0))
        except ValueError:
            # non-numeric job id or offset
            return Response(status=400)

        log_key   = REDIS_KEY_JOB_LOG   % job_id
        state = None
        # create/look up the shared state entry for this job (the lock
        # serialises creations with the notification listener task)
        async with self.job_states_create_lock:
            job = self.job_states[job_id]

        # the stream terminates once the job is in a final state and the
        # log EOF marker has been read
        FINAL_STATES = ("DONE", "ARCHIVED", "DELETED")

        # query the django server to have the job details and ensure this user
        # is allowed to view this job (thanks to the "Cookie" header)
        headers = prepare_headers(request)
        headers["Accept"] = "application/json"
        async with self.django_request("GET", "/jobs/%d" % job_id,
                headers=headers, allow_redirects=False) as rep:
            if rep.status != 200:
                # FIXME: django should be able to return 401 directly
                return Response(status=401 if rep.status==302 else rep.status)
            state = (await rep.json())["state"]
        rep = JsonSeqStreamResponse()
        await rep.prepare(request)

        # send the initial state immediately
        rep.send({"state": state})
        await rep.drain()

        # poll the redis db and forward the data to the http response
        #
        # return true if something was read
        async def poll(drain=True):
            nonlocal state, offset

            # offset is None <=> the log EOF marker was reached
            if offset is None and state in FINAL_STATES:
                raise DoneException()

            if drain:
                # ensure the output buffer is flushed before polling for new
                # data
                await rep.drain()

            updated = False

            # poll state change (cached in job.state by update_job_state())
            if state not in FINAL_STATES:
                if job.state != state:
                    log.info("job %d state updated %s -> %s", job_id, state, job.state)
                    # note: null state is possible if the redis entry does not
                    # exists yet
                    if job.state:
                        rep.send({"state": job.state})
                    state = job.state
                    updated = True

            # poll new logs (read at most 8 KiB per iteration; a trailing
            # 0x04 byte in the redis value marks the end of the logs)
            if offset is not None:
                data = await self.redis_client.getrange(log_key, offset, offset+8191)
                log.info("job %d log read %d bytes at offset %d: %r",
                        job_id, len(data), offset, data[:40])
                if data:
                    if data[-1] == 0x04:
                        # EOF
                        data = data[:-1]
                        offset = None
                    else:
                        offset += len(data)
                    rep.send({"logs": data.decode(errors="replace")})
                    updated = True

            return updated

        try:
            while True:
                # sleep on the job condition until new data may be
                # available, then drain all pending updates
                async with job.cond:
                    if not await poll(False):
                        await job.cond.wait()
                while await poll():
                    pass

        except (DoneException, asyncio.CancelledError):
            pass
        except Exception:
            log.exception("exception in handle_job_events(job_id=%d)", job_id)
        return rep

692 693 694 695 696 697 698 699

    async def handle_job_list_events(self, request):
        """Channel for monitoring job list events

        This HTTP endpoint streams a sequence json objects describing events
        related to a list of jobs.

        The job ids are provided in the query string (parameter 'id')
700

701 702 703 704 705 706 707 708 709 710 711 712 713

        Currently only one event is defined:

        - state update sent when the state of the job is updated (note: the
          result is provided only with the 'DONE' state)

            {"id:"     <JOB_ID>,
             "state": "<NEW_STATE>",
             [ "result": "<RESULT>" ],
             }
        """

        try:
714
            job_ids = {int(x) for x in request.url.query.getall("id", ())}
715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732
            # limit the number of jobs in a single request
            if len(job_ids) > 20:
                raise ValueError()
        except ValueError:
            return Response(status=400)

        FINAL_STATES = (b"DELETED", b"ARCHIVED")

        # query the django server to have the current state of each job and
        # ensure this user is allowed to view these jobs (thanks to the
        # "Cookie" header)

        # state dict (key: job id, value: state)
        states = {}
        results = {}
        for job_id in job_ids:
            headers = prepare_headers(request)
            headers["Accept"] = "application/json"
BAIRE Anthony's avatar
BAIRE Anthony committed
733
            async with self.django_request("GET", "/jobs/%d" % job_id,
734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810
                    headers=headers, allow_redirects=False) as rep:
                if rep.status == 200:
                    js = await rep.json()
                    states[job_id] = js["state"]
                    results[job_id] = js["result"]
                elif rep.status == 404:
                    states[job_id] = "DELETED"
                else:
                    # FIXME: django should be able to return 401 directly
                    return Response(status=401 if rep.status==302 else rep.status)

        async with self.job_states_create_lock:
            jobs = {job_id: self.job_states[job_id] for job_id in job_ids}

        rep = JsonSeqStreamResponse()
        await rep.prepare(request)

        def send_state_update(job_id, state, result):
            msg = {"id": job_id, "state": state}
            if state == "DONE":
                msg["result"] = result or "NONE"
            rep.send(msg)

        def remove_deleted_jobs():
            for job_id in [job_id for job_id, state in states.items()
                    if state in FINAL_STATES]:
                del states[job_id]
                del jobs[job_id]
                log.info("removed job %d from the watch list", job_id)
            log.info("current states: %r", states)

        for job_id, state in states.items():
            send_state_update(job_id, state, results.get(job_id))
        del results
        await rep.drain()

        remove_deleted_jobs()

        # poll the redis db and forward the data to the http response
        #
        # return true if something was read
        def poll():
            if not states:
                raise DoneException()

            # poll state changes
            updated = False
            for job_id, state in states.items():
                new_state = jobs[job_id].state
                if new_state != state:
                    log.info("job %d state updated %s -> %s", job_id, state, new_state)
                    # note: null state is possible if the redis entry does not
                    # exists yet
                    if new_state:
                        send_state_update(job_id, new_state, jobs[job_id].result)
                    states[job_id] = new_state
                    updated = True

            remove_deleted_jobs()
            return updated

        try:
            while True:
                async with self.job_list_condition:
                    if not poll():
                        await self.job_list_condition.wait()

                # ensure the output buffer is flushed before polling for new
                # data (but do it when the condition is not locked)
                await rep.drain()

        except (DoneException, asyncio.CancelledError):
            pass
        except Exception:
            log.exception("exception in handle_job_events(job_id=%d)", job_id)
        return rep

    async def handle_webapp_events(self, request):
        """Channel for monitoring the events of a webapp

        The response is a streamed event sequence:

        - state update (when the state of the sandbox is updated)
            {"sandbox_state": "<NEW_STATE>"}

        """
        docker_name = request.match_info["docker_name"]

        # return a dict (on success) or an HTTP error code (on error)
        async def get_webapp_details():
            async with self.django_request("GET", "/apps/%s/json" % docker_name,
                    headers=prepare_headers(request),
                    allow_redirects=False, timeout=1) as rep:
                if rep.status != 200:
                    # FIXME: django should be able to return 401 directly
                    return 401 if rep.status==302 else rep.status
                return await rep.json()

        # query the django server to have the webapp details and ensure this
        # user is allowed to view this webapp (thanks to the "Cookie" header)
        details = await get_webapp_details()
        if isinstance(details, int):
            # error
            return Response(status=details)

        webapp_id = details["id"]
        state = details["sandbox_state"]
        # shared entry whose condition is notified by the redis listener
        # task when this webapp is updated
        webapp = self.webapp_states[webapp_id]


        rep = JsonSeqStreamResponse()
        await rep.prepare(request)
        # send the initial state immediately
        rep.send({"sandbox_state": state})

        async def poll():
            nonlocal state

            # poll state change (the actual state is re-queried from django
            # on every wakeup — only a notification is stored in redis)
            details = await get_webapp_details()
            if isinstance(details, int):
                log.error("handle_webapp_events(webapp_id=%d) got error %d from django",
                        webapp_id, details)
                raise DoneException()
            log.info("webapp details %r", details)
            new_state = details["sandbox_state"]
            if new_state != state:
                log.info("webapp %d state updated %s -> %s", webapp_id, state, new_state)
                rep.send({"sandbox_state": new_state})
                state = new_state
        try:
            while True:
                # poll once, then sleep until the listener task notifies an
                # update for this webapp
                async with webapp.cond:
                    await poll()
                    await webapp.cond.wait()

        except asyncio.CancelledError:
            pass
        except Exception:
            log.exception("exception in handle_webapp_events(webapp_id=%d)", webapp_id)
        return rep