controller.py 42 KB
Newer Older
1
2
3
4
#!/usr/bin/python3


import asyncio
5
import collections
6
from   concurrent.futures   import ThreadPoolExecutor
7
8
import contextlib
import enum
BAIRE Anthony's avatar
BAIRE Anthony committed
9
import itertools
BAIRE Anthony's avatar
BAIRE Anthony committed
10
import json
11
12
13
14
15
16
import logging
import re
import os
import signal
import socket
import sys
17
18
19
import time
import threading
import traceback
20
21
22

import docker
import MySQLdb
BAIRE Anthony's avatar
BAIRE Anthony committed
23
24
from   sqlalchemy import desc
import sqlalchemy.orm.scoping
25

26
27
from database import *

28
29
HOST_PATH="/vol/host/"

30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
##################################################
# default number of concurrent tasks
# (one value per kind of operation)

# pushing images from the sandbox to the registry
NB_PUSH_SANDBOX = 2

# pulling images from the registry to the sandbox
NB_PULL_SANDBOX = 2

# pulling images from the registry to the swarm
NB_PULL_SWARM   = 2

# sandbox actions (start, stop, commit, ...)
# (default number of tokens of SandboxManager)
NB_SANDBOX_TASKS = 4

# jobs
# (default number of tokens of JobManager)
NB_JOB_TASKS     = 4


# default thread pool (used by most of the tasks)
# (10 worker threads, this is the default executor of every Manager)
default_executor = ThreadPoolExecutor(10)

##################################################

BAIRE Anthony's avatar
BAIRE Anthony committed
54
log = logging.getLogger("controller")
55

56
57
58
59
60
assert MySQLdb.threadsafety >= 1


class Error(Exception):
    """Error raised by the controller itself

    The logging helpers of this module report it with log.error() (i.e.
    without a traceback), whereas other exceptions go through log.exception()
    """
61

62
63
64
class ShuttingDown(Exception):
    """Exception raised when an operation is refused or interrupted because a
    shutdown was requested
    """

BAIRE Anthony's avatar
BAIRE Anthony committed
65
66
67
68
69
70
71
72
73
74
75
76
77
def docker_check_error(func, *k, **kw):
    """Wrapper for docker-py methods that produce a stream

    Methods producing a stream (eg: push, build) do not report all errors
    by raising exceptions. Some errors are reported later in the stream.

    This function calls `func(*k, stream=True, **kw)`, consumes the whole
    stream, parses every JSON document found in it and raises Error() as soon
    as one of them contains an "error" key.

    A chunk of the stream is normally a single JSON document, but it may also
    carry several newline-separated documents: both forms are supported.
    """
    for chunk in func(*k, stream=True, **kw):
        text = chunk.decode()
        try:
            # usual case: exactly one document in the chunk
            documents = [json.loads(text)]
        except ValueError:
            # several documents were coalesced in the same chunk
            # (one per line, blank lines are ignored)
            documents = [json.loads(line)
                         for line in text.splitlines() if line.strip()]

        for js in documents:
            # only JSON objects can carry an error report
            if isinstance(js, dict) and "error" in js:
                raise Error("push error: " + js["error"])

BAIRE Anthony's avatar
BAIRE Anthony committed
78
79
80
81
82
83
84
85
86
@contextlib.contextmanager
def docker_warning(msg, *k, ignore=None):
    """Context manager turning docker API errors into a logged warning

    `msg` and `k` are the format string and the arguments of the warning (the
    exception is appended to the message).

    `ignore` is an optional exception class (or tuple of classes): the
    matching errors are discarded silently.

    Only docker.errors.APIError is caught, any other exception is propagated.
    """
    try:
        yield
    except docker.errors.APIError as exc:
        if ignore is not None and isinstance(exc, ignore):
            # explicitly ignored -> nothing to report
            return
        log.warning(msg + " (%s)", *(k + (exc,)))
BAIRE Anthony's avatar
BAIRE Anthony committed
87

BAIRE Anthony's avatar
BAIRE Anthony committed
88
89
90
91
92
93
94
95
96
97
@contextlib.contextmanager
def report_error(fmt, *k):
    """Context manager logging the exceptions raised inside its body

    The message is `fmt % k` followed by a one-line summary of the exception.
    An Error() is reported with log.error(), any other exception with
    log.exception(). The exception is always re-raised.
    """
    try:
        yield
    except Exception as exc:
        msg = fmt % k
        if isinstance(exc, Error):
            emit = log.error
        else:
            emit = log.exception
        # last line of the formatted exception (type and message)
        summary = traceback.format_exception_only(type(exc), exc)[-1].strip()
        emit("%s (%s)", msg, summary)
        raise
98

99
100
101
102
103
104
105
106
107
def auto_create_task(func):
    """Decorator for forcing the creation of a task when a coroutine function is called

    Return a wrapper that calls the function, creates the task on the fly and
    returns it. It also installs a no-op callback to avoid warnings in case
    the result is not used.
    """
    assert asyncio.iscoroutinefunction(func)

    def consume(fut):
        # mark the exception as retrieved (ignore warning about result not
        # used), a cancelled task has no exception to retrieve and calling
        # .exception() on it would raise CancelledError inside the callback
        if not fut.cancelled():
            fut.exception()

    def wrapper(*k, **kw):
        # NOTE: asyncio.ensure_future() replaces the former asyncio.async(),
        #       which is a syntax error since 'async' became a keyword
        tsk = asyncio.ensure_future(func(*k, **kw))
        tsk.add_done_callback(consume)
        return tsk
    return wrapper

115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def cascade_future(src, dst):
    """propagate the result of a future to another future

    This function installs a callback to the future `src`, that propagates its
    outcome to the future `dst`:
     - the result of `src` becomes the result of `dst`
     - the exception of `src` becomes the exception of `dst`
     - if `src` is cancelled, then `dst` is cancelled too

    If `src` is already done, the propagation is immediate.

    If `dst` was cancelled in the meantime, nothing is propagated.
    """
    def callback(fut):
        if dst.cancelled():
            # nobody is waiting for the outcome anymore
            return

        if fut.cancelled():
            # .exception() would raise CancelledError here and `dst` would
            # stay pending forever
            dst.cancel()
            return

        ex = fut.exception()
        if ex is None:
            dst.set_result(fut.result())
        else:
            dst.set_exception(ex)

    if src.done():
        callback(src)
    else:
        src.add_done_callback(callback)

133
class Manager:
BAIRE Anthony's avatar
BAIRE Anthony committed
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
    """A class for scheduling asynchronous jobs on a collection of keys

    (the one-line summary is not very helpful, sorry)
    

    Job submission

    The job is scheduled on key KEY by calling .process(KEY), the function
    returns an asyncio.Future object (which will receive the result)

    If .process(KEY) is called while a job is already running for KEY, then the
    manager arranges the job to be run a second time in a row for the same key


    Job implementation

    This is an abstract class. The job shall be implemented as the coroutine
    named ._process() in the inherited class.

    The manager guarantees that ._process() cannot be called multiple times
    concurrently for the same key

    In case a shutdown is requested (see .shutdown()):
     - all jobs that are not yet started are cancelled
     - running jobs continue until they return or until they try to acquire the
       internal semaphore (which raises ShuttingDown())


    Concurrency

    The job tasks are all started immediately (whatever the number of existing
    jobs)

    It is possible to limit the concurrency to a maximum number of parallel
    tasks. Manager provides an internal semaphore (the number of tokens is set
    in .__init__())

    The semaphore can be locked in two ways:
     - by locking the manager:

            with (yield from self): ... concurrency limited to `nb_tokens`
            tasks ...

     - by calling .run_in_executor (this is for running non-async code):

            yield from self.run_in_executor(func, args...)

    Shutdown

    When .shutdown() is called the Manager ensures that all jobs are properly
    terminated
     - it cancels all jobs that are not yet started
     - it prevents starting new jobs
     - it lets the running jobs continue, but interrupts them when they try
       to acquire the internal semaphore

    All cancel/interrupted tasks have their future raise ShuttingDown().

    .shutdown() returns after all pending jobs are terminated.


    Thread safety

    - Manager is *not* thread safe, all public methods must be called from the
      same thread
    """
200

201
    class _Handle:
BAIRE Anthony's avatar
BAIRE Anthony committed
202
        __slots__ = "key", "cur", "nxt"
203

204
    def __init__(self, nb_tokens=1, *, executor = default_executor):
        """Create a manager

        `nb_tokens` is the number of tokens of the internal semaphore (the
        maximum number of tasks that may hold it at the same time)

        `executor` is the executor used by .run_in_executor()
        """
        # executor running the non-async code
        self._executor = executor

        # semaphore limiting the concurrency
        self._semaphore = asyncio.Semaphore(nb_tokens)

        # future raising ShuttingDown() (None until .shutdown() is called)
        self._shutdown = None

        # {key: _Handle}
        self._handles = {}
210
211
212
213
214
215
216
217
218
219

    def _create_task(self, hnd):
        """Create and register the task running ._process() for a handle

        The new task is stored in `hnd.cur` and returned. When it terminates,
        ._done() is called with the handle.

        The job receives a `reset` function. When called, it clears the
        pending re-run request (if any): the future already handed out for
        that re-run will receive the result of the current task instead.
        """
        assert hnd.nxt is None
        def reset():
            assert not hnd.cur.done()
            nxt = hnd.nxt
            if nxt is not None:
                cascade_future(hnd.cur, nxt)
                hnd.nxt = None

        # NOTE: asyncio.ensure_future() replaces the former asyncio.async(),
        #       which is a syntax error since 'async' became a keyword
        hnd.cur = asyncio.ensure_future(self._process(hnd.key, reset))
        hnd.cur.add_done_callback(lambda fut: self._done(hnd))
        log.debug("task scheduled %r %r", self, hnd.key)
        return hnd.cur
224

BAIRE Anthony's avatar
BAIRE Anthony committed
225
226
227
228
229
    def process(self, key):
        """Schedule the job to be run on key `key`

        returns an asyncio.Future (that will provide the result of ._process())
        """
230
231
232
        if self._shutdown is not None:
            return self._shutdown

BAIRE Anthony's avatar
BAIRE Anthony committed
233
        hnd = self._handles.get(key)
234
235
        if hnd is None:
            # create handle
BAIRE Anthony's avatar
BAIRE Anthony committed
236
237
            self._handles[key] = hnd = self._Handle()
            hnd.key = key
238
239
240
241
242
243
244
245
246
247
248
249
250
251
            hnd.cur = None
            hnd.nxt = None

        if hnd.cur is None:
            # create task
            return self._create_task(hnd)
        else:
            # reschedule task
            if hnd.nxt is None:
                hnd.nxt = asyncio.Future()
            return hnd.nxt

    def _done(self, hnd):
        """Callback run when the task of a handle terminates

        It logs the exception of the task (if any), then:
         - if a re-run was requested, it starts a new task whose outcome is
           propagated to the future of the re-run
         - otherwise it forgets the handle
        """
        assert hnd is self._handles.get(hnd.key)
        assert hnd.cur.done()

        try:
            hnd.cur.result()
        except (Exception, asyncio.CancelledError):
            # CancelledError is listed explicitly: since python 3.8 it derives
            # from BaseException. If it escaped from this callback, the
            # terminated task would stay registered as the current one.
            log.exception("task %r %r unhandled exception", self, hnd.key)

        nxt     = hnd.nxt
        hnd.cur = hnd.nxt = None
        if nxt is None:
            del self._handles[hnd.key]
        else:
            cascade_future(self._create_task(hnd), nxt)
266

267

268
269
    @asyncio.coroutine
    def __iter__(self):
        """Coroutine for locking the internal semaphore

        Usage:
            with (yield from manager):
              ...

        The result is the context manager obtained from the semaphore.

        Warning:
            after a shutdown is initiated, this function will always raise
            ShuttingDown() (the semaphore is released before raising)
        """
        token = yield from iter(self._semaphore)
        if not self._shutdown:
            return token

        # shutdown in progress: give the token back and refuse to continue
        with token:
            raise ShuttingDown()


    @asyncio.coroutine
    def run_in_executor(self, *k):
        """Run a function in a separate thread (with limited concurrency)

        `k` is the function followed by its positional arguments.

        This coroutine locks the internal semaphore, runs the provided
        function call in a separate thread (using the executor of the manager)
        and returns its result. The semaphore is held until the call returns.
        """
        with (yield from self):
            loop = asyncio.get_event_loop()
            pending = loop.run_in_executor(self._executor, *k)
            outcome = yield from pending
            return outcome
296
297


BAIRE Anthony's avatar
BAIRE Anthony committed
298
    def _process(self, key, reset):
BAIRE Anthony's avatar
doc    
BAIRE Anthony committed
299
300
301
302
303
304
305
306
307
308
309
        """Actual implementation of the job (to be reimplemented in inherited classes)

        The Manager class guarantees that this function cannot be called
        multiple times concurrently on the same key (in case the same key is
        submitted multiple times, they Manager will call this function a second
        time after it has terminated).

        `reset` is a function that may be called to reset the 'dirty' state of
        this key (this is to avoid calling ._process() a second time if not
        necessary)
        """
310
311
        raise NotImplementedError()

312
    @asyncio.coroutine
    def shutdown(self):
        """Initiate a graceful shutdown

        - the futures of the jobs that are not yet started (pending re-runs)
          receive a ShuttingDown() exception
        - any later call to .process() returns a future raising ShuttingDown()

        This coroutine terminates once all tasks are properly terminated.
        """
        self._shutdown = asyncio.Future()
        self._shutdown.set_exception(ShuttingDown())
        self._shutdown.exception()  # to avoid asyncio warnings

        # cancel all 'next' tasks
        for hnd in self._handles.values():
            if hnd.nxt is not None:
                # NOTE: the exception must be a fresh ShuttingDown() instance
                #       (self._shutdown is a future, not an exception)
                if not hnd.nxt.done():
                    hnd.nxt.set_exception(ShuttingDown())
                hnd.nxt = None

        # wait for the running tasks (their errors are not propagated)
        yield from asyncio.gather(*(h.cur for h in self._handles.values() if h.cur is not None),
                return_exceptions=True)
BAIRE Anthony's avatar
BAIRE Anthony committed
329

330
331

class SandboxManager(Manager):
BAIRE Anthony's avatar
BAIRE Anthony committed
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
    """Manager for sandbox operation

    This manager handles all sandbox operations (start, stop, commit).

    Operations are requested asynchronously in the database:
     - Webapp.sandbox_state=starting (for starting a sandbox)
     - Webapp.sandbox_state=stopping (for stopping a sandbox)
     - WebappVersion with state=sandbox (for committing a new image)

    All state changes on Webapp.sandbox_state are atomic (e.g: if the user
    requests a stop while the sandbox is starting then the manager finishes
    with starting the sandbox but does not update the state, and it runs
    immediately again to stop the sandbox)

    Whatever is the value of Webapp.sandbox_state, the manager first examines
    commit requests and makes the commit if requested.

    If the container already exists before starting the webapp (not possible in
    normal operations), then a recovery image is committed first.

    When a commit is successful, the image manager is notified to push the
    image to the registry and (if it is not a recovery image) to pull it to the
    swarm (because this image will very likely be used soon).

    State changes:

     - sandbox start:
            starting->running     (normal case)
            starting->start_error (error case)

     - sandbox stop:
            stopping->idle        (normal case)
            stopping->stop_error  (error case)

     - image commit:
            sandbox->committed  (normal case)
            sandbox->error      (error case)
            (none)->committed   (recovery version) 
    """
371

372
    def __init__(self, ctrl, nb_threads = NB_SANDBOX_TASKS):
        """Create the sandbox manager

        `ctrl` is the controller, `nb_threads` is the number of tokens of the
        internal semaphore (see Manager)
        """
        self.ctrl = ctrl
        super().__init__(nb_tokens=nb_threads)


BAIRE Anthony's avatar
BAIRE Anthony committed
377
378
    def inspect_sandbox(self, webapp):
        try:
BAIRE Anthony's avatar
BAIRE Anthony committed
379
380
            return self.ctrl.sandbox.inspect_container(
                    self.ctrl.gen_sandbox_name(webapp))
BAIRE Anthony's avatar
BAIRE Anthony committed
381
382
383
        except docker.errors.NotFound:
            return None

BAIRE Anthony's avatar
BAIRE Anthony committed
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
    @staticmethod
    def filter_sandbox_version(query, webapp_id):
        """Narrow a WebappVersion query to filter the version that must be started in the sandbox

        The resulting query yields at most one entry: the version of the
        webapp `webapp_id` with the highest id among those in state
        'committed' or 'ready'.
        """
        # filter the right webapp
        narrowed = query.filter_by(webapp_id=webapp_id)

        # keep only versions whose state is 'ready' or 'committed'
        startable = (int(VersionState.committed), int(VersionState.ready))
        narrowed = narrowed.filter(WebappVersion.state.in_(startable))

        # take the latest one
        return narrowed.order_by(desc(WebappVersion.id)).limit(1)

    @staticmethod
    def filter_commit_version(query, webapp_id):
        """Narrow a WebappVersion query to select the candidate versions to be committed

        The candidates are the versions of the webapp `webapp_id` whose state
        is 'sandbox'.
        """
        criteria = {
            "webapp_id": webapp_id,
            "state":     int(VersionState.sandbox),
        }
        return query.filter_by(**criteria)

    def _start(self, webapp, version):
405
406
407
408
409
410
        """Start a webapp sandbox

        (to be executed in a thread pool)
        """

        ctrl = self.ctrl
BAIRE Anthony's avatar
BAIRE Anthony committed
411
        ses  = ctrl.session
412
413

        # prepare sandbox parameters
BAIRE Anthony's avatar
BAIRE Anthony committed
414
415
416
417
418
419

        # docker image
        if version is None:
            image = "%s/factory/%s:%s" % (self.ctrl.registry,
                    webapp.docker_os.docker_name,
                    webapp.docker_os.version)
420
        else:
BAIRE Anthony's avatar
BAIRE Anthony committed
421
            image = "%s:%s" % (webapp.image_name, version.number)
BAIRE Anthony's avatar
BAIRE Anthony committed
422
423

        log.debug("sandbox %r: using image %r", webapp.docker_name, image)
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438

        # safety checks
        # (because docker_name is used it the paths of the external volumes
        if ("/" in webapp.docker_name) or (webapp.docker_name in ("", ".", "..")):
            raise Error("malformatted docker_name")

        pipebin = os.path.join(ctrl.sandbox_path, "bin")
        pipedir = os.path.join(ctrl.sandbox_path, "srvdir", webapp.docker_name)

        uid = webapp.id + 2000
        if uid < 2000:
            # just for safety
            raise Error("bad webapp id")

        # remove stale container (if any)
439
440
        if self.inspect_sandbox(webapp) is not None:
            self._stop(webapp)
441

BAIRE Anthony's avatar
BAIRE Anthony committed
442
        container = webapp.sandbox_name
443
444
445
        try:
            # prepare the sandbox
            # (create files for pipesrv)
BAIRE Anthony's avatar
BAIRE Anthony committed
446
            ctrl.sandbox.create_container("busybox:latest", name=container,
447
448
449
450
451
452
453
454
455
456
457
458
459
                    command = ["/bin/sh", "-c", "set -x ;rm -rf -- /.pipedir/* && chmod 0700 /.pipedir && mkfifo -m 0600 /.pipedir/srv && chown -R %d:65534 /.pipedir" % uid],
                    host_config = ctrl.sandbox.create_host_config(
                        binds   = {pipedir: {"bind": "/.pipedir"}}
                    ))
            ctrl.sandbox.start(container)
            if ctrl.sandbox.wait(container):
                raise Error("sandbox preparation failed")
            ctrl.sandbox.remove_container(container)


            # create and start the sandbox
            ctrl.check_host_path("isdir", pipedir)
            ctrl.check_host_path("isdir", pipebin)
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
            command = ["/.pipebin/pipesrv", "-d", "/.pipedir"]

            if version is None and webapp.entrypoint:
                # prepend instructions to initialise a dummy entrypoint
                dn, bn = os.path.split(webapp.entrypoint)
                command = ["/bin/sh", "-c", """
                    {mkdir}
                    cat > {entrypoint!r} <<EOF
#!/bin/sh
echo
echo "This is app '{name}' called with parameters '\$@'"
echo
echo "The workdir contains:"
ls -l
EOF
                    chmod 0755 -- {entrypoint!r}
                    exec {cmd}
                """.format( entrypoint  = webapp.entrypoint,
                            name        = webapp.docker_name,
                            mkdir       = (("mkdir -p -- %r" % dn) if dn else ""),
                            cmd         = " ".join(map(repr, command)))]

482
            ctrl.sandbox.create_container(image, name=container, hostname=container,
483
                    command = command,
484
485
486
487
488
489
490
491
492
493
494
495
                    host_config = ctrl.sandbox.create_host_config(
                        binds = {
                            pipedir: {"bind": "/.pipedir", "mode": "ro"},
                            pipebin: {"bind": "/.pipebin", "mode": "ro"},
                            },
                        # TODO: maybe drop other caps
                        cap_drop = ["NET_RAW"],
                        ))

            ctrl.sandbox.start(container)

        except:
BAIRE Anthony's avatar
BAIRE Anthony committed
496
497
            with docker_warning("cleanup error: unable to remove container %r",
                    container, ignore=docker.errors.NotFound):
498
499
500
                ctrl.sandbox.remove_container(container, force=True)
            raise

BAIRE Anthony's avatar
BAIRE Anthony committed
501
    def _commit(self, webapp, versions):
        """Commit a webapp sandbox

        (to be executed in a thread pool)

        `versions` is the list of candidate WebappVersion entries (those with
        state==sandbox). The commit is made on the candidate if there is
        exactly one and if its version number is not empty.

        Otherwise a recovery version is committed instead (to avoid losing
        the work done inside the sandbox) and the candidates are put in error
        state.

        Returns a tuple (version, error):
         - `version` is the committed version, or None if the container was
           not found
         - `error` is the error message (None if there was no error)

        Any other exception is logged and propagated (the version is then
        left in state 'sandbox').
        """
        ctrl = self.ctrl
        ses  = ctrl.session

        # pre-commit checks
        # - ensure that there is exactly one candidate webapp_version with
        #   state=sandbox
        # - otherwise:
        #   - put all candidates in error state
        #   - create a recovery version

        # version to be committed
        version = None

        # error msg (if any)
        error = None

        # version ids to be recovered
        recover = ()

        if len(versions) == 1:
            # normal case (sandbox commit)
            version = versions[0]
            if not version.number:
                error   = "empty version number"
                recover = version.id,

        elif not versions:
            # sandbox rollback (when user drops a sandbox without committing a new image)
            error     = "dangling sandbox"

        else:
            # multiple candidates (should never happen)
            error    = "multiple candidate versions (%s)" % (
                ", ".join(map(repr, sorted(v.number for v in versions))))
            recover = tuple(v.id for v in versions)

        # TODO: make 'sandbox' a reserved name

        if error:
            changelog = "pre-commit error: " + error
            log.error("sandbox %r version id %r: %s", webapp.docker_name, recover, changelog)

            with ses.begin():
                # put all candidates in 'error state'
                if recover:
                    ses.execute('''UPDATE webapp_versions
                            SET changelog=CONCAT(changelog, " [", :changelog, "]"), state=:state
                            WHERE id IN :ids''', dict(changelog=changelog, ids=recover,
                                state=int(VersionState.error)))

                # create a recovery version
                version = WebappVersion(
                        webapp_id = webapp.id,
                        number    = time.strftime("recovery-%Y%m%d-%H%M%S"),
                        changelog = changelog,
                        published = False,
                        state     = int(VersionState.sandbox))
                ses.add(version)
            ses.refresh(version)
            ses.expunge(version)

        assert version is not None

        # commit the docker image

        log.debug("dicts %r %r", webapp.__dict__, version.__dict__)
        log.info("commit sandbox %r version %r", webapp.docker_name, version.number)

        container = webapp.sandbox_name
        next_state = image_size = None
        try:
            # stop the container (if stopping or if creating a new sandbox)
            if webapp.sandbox_state in (SandboxState.stopping, SandboxState.starting):
                ctrl.sandbox.stop(container)
                ctrl.sandbox.wait(container)

            # commit
            cid  = ctrl.sandbox.commit(container, webapp.image_name, version.number)
            size = ctrl.sandbox.inspect_image(cid)["Size"]

            # the new state is recorded only once the size is known: if
            # inspect_image() fails, the version is left in state 'sandbox'
            # and the 'finally' clause below has nothing to format (a
            # half-initialised update would raise a TypeError masking the
            # real error)
            next_state = VersionState.committed
            image_size = size

            return version, error

        except docker.errors.NotFound:
            error = "commit error: container not found %r" % container
            log.error("%s", error)
            next_state = VersionState.error
            image_size = 0
            ses.execute('''UPDATE webapp_versions
                    SET changelog=CONCAT(changelog, " [commit error: sandbox is down]")
                    WHERE id=%d''' % version.id)

            # here we do not propagate the error to allow starting/stopping the
            # sandbox immediately (without going through sandbox_state=:error)
            return None, error

        except Exception as e:
            log.exception("sandbox %r version %r: unexpected commit error (sandbox may still be recovered)",
                    webapp.docker_name, version.number)
            raise

        finally:
            # NOTE: if anything unexpected happens, the version is
            # left in state 'sandbox' and we propagate the exception to
            # ensure the work done inside the sandbox is not dropped
            # and the sandbox is put in 'error' state
            #
            # The error will not be reported to the user. If this is an issue,
            # then the solution would be to create another error state (to be
            # used when the sandbox is still there).
            #
            if next_state is not None:
                with ses.begin():
                    ses.execute("UPDATE webapp_versions SET state=%d, docker_image_size=%d WHERE id=%d"
                            % (next_state, image_size, version.id))
BAIRE Anthony's avatar
BAIRE Anthony committed
627

628

BAIRE Anthony's avatar
BAIRE Anthony committed
629
    @asyncio.coroutine
    def _manage_commit(self, webapp, versions, *, force=False):
        """Manage sandbox commit (if needed) and notify the image manager

        The commit is performed if one of these conditions is fulfilled:
        - a commit was requested (`versions` lists at least one WebappVersion
          entry with state=sandbox for this app)
        - a docker container exists (for this sandbox) and force is true

        After the commit, the image manager is asked to push the version to
        the registry and, if there was no error, to pull it to the swarm
        (these operations are not awaited).
        """
        if not versions:
            # no commit requested: only a forced commit of an existing
            # container remains possible
            if not force or self.inspect_sandbox(webapp) is None:
                return

        # do the commit
        version, error = yield from self.run_in_executor(self._commit, webapp, versions)
        if version is None:
            return

        # trigger push/pull operations (but do not wait)
        images = self.ctrl.image_manager

        # push to the registry
        images.push(version.id)

        if not error:
            # preemptive pull to the swarm
            # (the image may be needed soon)
            images.pull(version.id, swarm=True)
655
656


BAIRE Anthony's avatar
BAIRE Anthony committed
657
658
    def _stop(self, webapp):
        """Stop a webapp sandbox
659

BAIRE Anthony's avatar
BAIRE Anthony committed
660
        (to be executed in a thread pool)
661
662
663
        """
        try:
            # FIXME: remove volumes (v=True) too ?
BAIRE Anthony's avatar
BAIRE Anthony committed
664
            self.ctrl.sandbox.remove_container(webapp.sandbox_name, force=True)
665
666
667
        except docker.errors.NotFound:
            pass

668
669

    @asyncio.coroutine
    def _process(self, webapp_id, reset):
        """Process the pending operations on the sandbox of a webapp

        The webapp and its versions are loaded from the database, then:
         - if sandbox_state is 'starting': commit the existing container (if
           any), pull the image of the version to be started and start the
           sandbox
         - if sandbox_state is 'stopping': commit (if requested) and stop the
           sandbox
         - otherwise: commit (if requested)

        Finally sandbox_state is updated (only if it was not changed in the
        database in the meantime). Errors are logged, not propagated.

        `reset` is not used.
        """
        ctrl = self.ctrl

        log.debug("process sandbox %d", webapp_id)

        session = ctrl.session
        with session.begin():
            # current state of the sandbox + load docker os
            webapp = session.query(Webapp).filter_by(id=webapp_id).one()
            webapp.docker_os

            # version to be started
            version_query   = self.filter_sandbox_version(session.query(WebappVersion), webapp_id)
            sandbox_version = version_query.first()

            # requested commits
            commit_query    = self.filter_commit_version(session.query(WebappVersion), webapp_id)
            commit_versions = commit_query.all()

            session.expunge_all()

        # docker name of the sandbox & image
        webapp.sandbox_name = ctrl.gen_sandbox_name(webapp)
        webapp.image_name   = ctrl.gen_image_name(webapp)

        phase = "inspect"
        next_state = fail_state = None
        try:
            if webapp.sandbox_state == SandboxState.starting:
                # start the sandbox
                phase = "start"
                next_state, fail_state = SandboxState.running, SandboxState.start_error

                # commit (if a sandbox exists)
                yield from self._manage_commit(webapp, commit_versions, force=True)

                # pull requested image
                if sandbox_version is not None:
                    yield from ctrl.image_manager.pull(sandbox_version.id)

                # start sandbox
                yield from self.run_in_executor(self._start, webapp, sandbox_version)

            elif webapp.sandbox_state == SandboxState.stopping:
                # stop the sandbox
                phase = "stop"
                next_state, fail_state = SandboxState.idle, SandboxState.stop_error

                # commit (if requested)
                yield from self._manage_commit(webapp, commit_versions)

                yield from self.run_in_executor(self._stop, webapp)

            else:
                # commit (if requested)
                phase = "commit"
                yield from self._manage_commit(webapp, commit_versions)

        except ShuttingDown:
            # leave the state untouched
            next_state = None
            log.info("sandbox %r %s aborted (controller shutdown)", webapp.docker_name, phase)

        except BaseException as exc:
            next_state = fail_state

            # expected errors are reported without a traceback
            if isinstance(exc, (docker.errors.APIError, Error)):
                emit = log.error
            else:
                emit = log.exception
            emit("sandbox %r %s error (%s)", webapp.docker_name, phase,
                    traceback.format_exception_only(type(exc), exc)[-1].strip())

        finally:
            if next_state is not None:
                # atomically update the sandbox state in the db
                # (in case another action is requested during the process, eg: the user
                #  stops the sandbox while it is not fully started)
                log.info("sandbox %r is now in state %r", webapp.docker_name, next_state.name)
                with session.begin():
                    session.execute("UPDATE webapps SET sandbox_state=%d WHERE id=%d AND sandbox_state=%d" %
                            (next_state, webapp_id, webapp.sandbox_state))

            log.debug("done    sandbox %d", webapp_id)



BAIRE Anthony's avatar
BAIRE Anthony committed
753
class JobManager(Manager):
    """Manager that runs the jobs stored in the database.

    A job is identified by its database id. `_process()` validates the job
    and selects the image version to use, then `_run_job()` runs the job in
    a docker container (from a thread of the executor).
    """

    def __init__(self, ctrl, nb_jobs=NB_JOB_TASKS):
        """Create the job manager.

        ctrl    -- the controller (gives access to the db session, to the
                   docker clients and to the image manager)
        nb_jobs -- forwarded to Manager.__init__() (default: NB_JOB_TASKS)
        """
        super().__init__(nb_jobs)
        self.ctrl = ctrl

    def _run_job(self, job_id, ver_id):
        """Run a job in a docker container and wait for its termination.

        This function is blocking (`_process()` calls it through
        `run_in_executor()`).

        job_id -- id of the job to run
        ver_id -- id of the WebappVersion to run, or None to run the job
                  from a temporary commit of the webapp sandbox

        Whatever the outcome, the job is put into state DONE and the
        container (and the temporary image, if any) is removed. Exceptions
        raised while preparing or running the job are propagated.
        """
        ctrl = self.ctrl
        ses  = ctrl.session

        # every name read by the 'finally' clause is bound beforehand, so that
        # an early failure does not turn into a NameError masking the real error
        job = webapp = tag = client = None
        ctr = tmp_img = start_time = None

        try:
            with ses.begin():
                job = ses.query(Job).filter_by(id=job_id).one()
                webapp = job.webapp
                if ver_id is None:
                    tag = "sandbox"
                else:
                    ver = ses.query(WebappVersion).filter_by(id=ver_id).one()
                    tag = ver.number

                log.info("start job %d (%s:%s)",
                        job_id, webapp.docker_name, tag)

                start_time = time.time()
                job.state = int(JobState.RUNNING)

            repo = ctrl.gen_image_name(webapp)
            image = "%s:%s" % (repo, tag)

            job_path = ctrl.gen_job_path(job)
            log.debug("job.path: %r", job_path)

            if ver_id is None:
                # sandbox job: run it from a temporary image committed from
                # the sandbox container
                client = ctrl.sandbox
                image = tmp_img = client.commit(ctrl.gen_sandbox_name(webapp), repo, tag)["Id"]
            else:
                client = ctrl.swarm

            # TODO use another workdir
            # TODO use another uid

            ctrl.check_host_path("isdir", job_path)
            ctr = client.create_container(image, name=ctrl.gen_job_name(job),
                    working_dir = "/tmp",   # TODO use another dir
                    # TODO parse quotes in job.param (instead of a rough .split())
                    #
                    # NOTE: the command line is a little complex, but this is
                    #   to ensure that:
                    #   - no output is lost (we go through a pipe in case the
                    #     app has multiple processes writing to stdout/stderr
                    #     concurrently)
                    #   - we get the exit code of the app (not the exit code of
                    #     cat)
                    #   - we are failsafe (if fifo creation fails then the app
                    #     is run anyway, with the exit code of cat)
                    #   - we have no unusual dependencies (only sh, cat and
                    #     mkfifo)
                    command = ["/bin/sh", "-c", """
                                fifo=/tmp/.allgo.fifo
                                if mkfifo "$fifo"
                                then
                                    exec cat <"$fifo" >allgo.log &
                                    exec "$@" >"$fifo" 2>&1 &
                                    rm "$fifo"
                                    wait %1
                                    wait %2
                                else
                                    "$@" 2>&1 | cat >allgo.log
                                fi
                        """,
                        "job%d" % job.id, webapp.entrypoint] + job.param.split(),

                    # NOTE(review): the host config is always built with the
                    #   sandbox client, even when the container is created on
                    #   the swarm -- confirm this is intended
                    host_config = ctrl.sandbox.create_host_config(
                        binds = {job_path: {"bind": "/tmp"}},
                        cap_drop = ["all"],
                        # FIXME: CAP_DAC_OVERRIDE needed because all nfs files have uid,gid=1000,1000
                        cap_add = ["dac_override"],
                    ))
            client.start(ctr)
            # TODO report exit code to the user
            client.wait(ctr)

        finally:
            # FIXME: if interrupted (power down) here, the job will remain in
            #        state RUNNING forever ==> do some periodic monitoring

            stop_time = time.time()

            log.info("stop  job %d (%s:%s)", job_id,
                        (webapp.docker_name if webapp is not None else None), tag)

            if job is not None:
                with ses.begin():
                    job.state = int(JobState.DONE)

                    # start_time is only set once job and webapp were loaded
                    if start_time is not None:
                        job.exec_time    = int(stop_time - start_time)
                        webapp.exec_time = (webapp.exec_time + job.exec_time) // 2

            # 'client' is always bound when ctr or tmp_img is set
            if ctr is not None:
                with docker_warning("cleanup error: unable to remove container %r", ctr,
                        ignore=docker.errors.NotFound):

                    client.remove_container(ctr, force=True)

            if tmp_img is not None:
                with docker_warning("cleanup error: unable to remove image %r", tmp_img,
                        ignore=docker.errors.NotFound):

                    client.remove_image(tmp_img)

    @asyncio.coroutine
    def _process(self, job_id, reset):
        """Validate a waiting job, pull its image if needed, then run it.

        job_id -- id of the job to process
        reset  -- not used by this manager

        Nothing is done if the job does not exist or is not in state WAITING.
        If the webapp or the requested version cannot be found, the job is
        put into state DONE without being run.
        """
        ctrl = self.ctrl
        ses  = ctrl.session
        log.debug("process job id %d", job_id)

        with ses.begin():
            # query db
            job = ses.query(Job).filter_by(id=job_id).first()
            if job is None or job.state != JobState.WAITING:
                return

            if job.webapp is None:
                log.error("job %d: webapp id %r not found", job_id, job.webapp_id)

                # stored as an int, like every other write to job.state
                job.state = int(JobState.DONE)
                # TODO report error to the user
                return

            # select version
            if job.version == "sandbox":
                ver_id = None
            else:
                #TODO: replace version_id with webapp_version_id
                # most recent version with this number that is not in error
                ver = ses.query(WebappVersion).filter_by(
                        webapp_id = job.webapp_id,
                        number    = job.version).filter(
                            WebappVersion.state != VersionState.error
                        ).order_by(WebappVersion.id.desc()).first()
                if ver is None:
                    log.error("job %d: webapp %r version %r not found",
                            job_id, job.webapp.docker_name, job.version)

                    job.state = int(JobState.DONE)
                    # TODO report error to the user
                    return
                ver_id = ver.id

        if ver_id is not None:
            # pull image to the swarm
            # FIXME: race condition: will fail if ver.state==sandbox
            yield from ctrl.image_manager.pull(ver_id, swarm=True)

        # run job
        yield from self.run_in_executor(self._run_job, job_id, ver_id)

912
913

class PullManager(Manager):
    """Manager that pulls docker images with a given docker client."""

    def __init__(self, nb_threads, client, name):
        """Create the manager.

        nb_threads -- forwarded to Manager.__init__()
        client     -- docker client used for pulling the images
        name       -- label of the destination (only used in the log messages)
        """
        super().__init__(nb_threads)
        self.name   = name
        self.client = client

    def _process(self, img, reset):
        """Pull the image `img` (a `(repository, tag)` pair) in the executor.

        Return whatever `run_in_executor()` returns for the pull call.
        """
        repo, tag = img
        log.info("pull to the %-10s %s:%s", self.name, repo, tag)
        pending = self.run_in_executor(self.client.pull, repo, tag)
        return pending

924
925

class PushManager(Manager):
    """Manager that pushes committed webapp versions from the sandbox."""

    def __init__(self, nb_threads, ctrl):
        """Create the manager.

        nb_threads -- forwarded to Manager.__init__()
        ctrl       -- the controller (db session and sandbox docker client)
        """
        super().__init__(nb_threads)
        self.ctrl = ctrl

    def _process(self, version_id, reset):
        """Push the image of a committed version (generator-based coroutine).

        version_id -- id of the WebappVersion to push
        reset      -- not used by this manager

        Nothing is done if the version is already in state `ready`. An Error
        is raised (inside the `report_error()` context) if the version is in
        any other state than `committed`, or if another version of the same
        webapp with the same number is still in state `sandbox` or
        `committed`.

        After the push, the version becomes `ready`. If a `ready` version
        with the same number already exists, that one is updated and the
        pushed version row is deleted instead.
        """
        ses = self.ctrl.session

        with report_error("unable to push version id %d", version_id):

            with ses.begin():
                # get the version object and check its state
                version = ses.query(WebappVersion).filter_by(id=version_id).one()
                if version.state != VersionState.committed:
                    if version.state == VersionState.ready:
                        # already pushed
                        return
                    if version.state == VersionState.sandbox:
                        raise Error("unable to push (image not yet committed)")
                    raise Error("unable to push (invalid state: %s)" % version.state)

                # ensure that there is no other version with the same number in the pipeline
                # (to avoid a race condition)
                others = (ses.query(WebappVersion.id)
                        .filter_by(webapp_id=version.webapp_id, number=version.number)
                        .filter(WebappVersion.id     != version.id)
                        .filter(WebappVersion.state.in_((int(VersionState.sandbox), int(VersionState.committed)))))
                if others.count():
                    # each result row is a 1-tuple: flatten them to list the ids
                    raise Error("unable to push (there are other pushable versions with the same number: %s)" % (
                        " ".join(map(str, itertools.chain.from_iterable(others)))))

                image = self.ctrl.gen_image_name(version.webapp)
                tag   = version.number

            # the push is done outside of any db transaction
            log.info("push from the %-8s %s:%s", "sandbox", image, tag)
            yield from self.run_in_executor(docker_check_error, self.ctrl.sandbox.push, image, tag)

            with ses.begin():
                prev = ses.query(WebappVersion).filter_by(
                        webapp_id = version.webapp_id,
                        number    = version.number,
                        state     = int(VersionState.ready)).scalar()
                log.debug("prev version id %r", (prev.id if prev else None))

                if prev is None:
                    # this is a new version
                    # (stored as an int, like the other state values in this file)
                    version.state = int(VersionState.ready)
                else:
                    # overwrite an existing version
                    for key in "updated_at", "changelog", "published":
                        setattr(prev, key, getattr(version, key))

                    ses.delete(version)

979

BAIRE Anthony's avatar
BAIRE Anthony committed
980
981
class ImageManager:
    """Entry point for the transfers of docker images (push and pull).

    The work is delegated to three managers: one pushing from the sandbox,
    one pulling to the sandbox and one pulling to the swarm.
    """

    def __init__(self, ctrl,
            nb_push_sandbox = NB_PUSH_SANDBOX,
            nb_pull_sandbox = NB_PULL_SANDBOX,
            nb_pull_swarm   = NB_PULL_SWARM):
        """Create the three managers.

        ctrl            -- the controller
        nb_push_sandbox -- forwarded to the sandbox PushManager
        nb_pull_sandbox -- forwarded to the sandbox PullManager
        nb_pull_swarm   -- forwarded to the swarm PullManager
        """
        self.ctrl = ctrl

        self.sandbox_push_manager = PushManager(nb_push_sandbox, ctrl)
        self.sandbox_pull_manager = PullManager(
                nb_pull_sandbox, ctrl.sandbox, "sandbox")
        self.swarm_pull_manager = PullManager(
                nb_pull_swarm, ctrl.swarm, "swarm")

    # return a future
    @auto_create_task
    @asyncio.coroutine
    def pull(self, version_id: int, *, swarm=False):
        """Pull the image of a version to the swarm or to the sandbox.

        version_id -- id of the WebappVersion to pull
        swarm      -- pull to the swarm if true, to the sandbox otherwise

        A `committed` version is first pushed when the destination is the
        swarm, and is not pulled at all when the destination is the sandbox.
        In the other cases the version must be `ready`, otherwise an Error is
        raised (inside the `report_error()` context).
        """
        destination = "swarm" if swarm else "sandbox"
        with report_error("unable to pull version id %d to %s", version_id,
                destination):

            ses = self.ctrl.session
            with ses.begin():
                # get the version object and check its state
                version = ses.query(WebappVersion).filter_by(id=version_id).one()

                image = self.ctrl.gen_image_name(version.webapp)
                tag   = version.number

            if swarm:
                if version.state == VersionState.committed:
                    # must be pushed to the registry first
                    yield from self.push(version_id)
                    with ses.begin():
                        version = ses.query(WebappVersion).filter_by(id=version_id).one()
            elif version.state == VersionState.committed:
                # do not pull to the sandbox!
                return

            if version.state != VersionState.ready:
                raise Error("bad version state: %s" % version.state)

            if swarm:
                manager = self.swarm_pull_manager
            else:
                manager = self.sandbox_pull_manager
            yield from manager.process((image, tag))

    # return a future
    def push(self, version_id: int):
        """Schedule the push of a version through the sandbox push manager."""
        push_manager = self.sandbox_push_manager
        return push_manager.process(version_id)

    @asyncio.coroutine
    def shutdown(self, **kw):
        """Shut down the three managers and wait for them.

        The keyword arguments are accepted but ignored. Exceptions raised by
        the managers are collected by gather(), not propagated.
        """
        managers = (
                self.sandbox_pull_manager,
                self.sandbox_push_manager,
                self.swarm_pull_manager)
        yield from asyncio.gather(
                *[manager.shutdown() for manager in managers],
                return_exceptions=True)


BAIRE Anthony's avatar
BAIRE Anthony committed
1048

1049
1050
class DockerController:
    def __init__(self, sandbox_host, swarm_host, mysql_host,
            port, registry, env, datastore_path, sandbox_path,
            max_jobs):
        """Set up the docker clients, the db session factory, the
        notification socket and the managers.

        sandbox_host   -- passed to docker.Client() for the sandbox
        swarm_host     -- passed to docker.Client() for the swarm (the sandbox
                          client is reused when both hosts are equal)
        mysql_host     -- passed to connect_db()
        port           -- TCP port of the notification socket
        registry       -- prefix of the image names (see gen_image_name())
        env            -- prefix of the container names (see gen_sandbox_name())
        datastore_path -- base directory of the job files (see gen_job_path())
        sandbox_path   -- stored as-is in self.sandbox_path
        max_jobs       -- passed to JobManager()
        """
        # docker clients
        self.sandbox = docker.Client(sandbox_host)
        if sandbox_host == swarm_host:
            self.swarm = self.sandbox
        else:
            self.swarm = docker.Client(swarm_host)

        # database
        session_factory = connect_db(mysql_host)
        self._db_sessionmaker = sqlalchemy.orm.scoping.scoped_session(session_factory)
        self._thread_local = threading.local()

        # non-blocking listening socket (incoming connections are handled by
        # _sock_callback())
        self.sock = socket.socket()
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listen_address = ("0.0.0.0", port)
        self.sock.bind(listen_address)
        self.sock.listen(32)
        self.sock.setblocking(False)

        # managers
        self.image_manager   = ImageManager(self)
        self.sandbox_manager = SandboxManager(self)
        self.job_manager     = JobManager(self, max_jobs)

        # configuration
        self.registry       = registry
        self.env            = env
        self.datastore_path = datastore_path
        self.sandbox_path   = sandbox_path

        # both are set up later (see _run() and shutdown())
        self._task               = None
        self._shutdown_requested = None

        # ensure that the busybox image is present on the sandbox
        base_image = "busybox:latest"
        try:
            self.sandbox.inspect_image(base_image)
        except docker.errors.NotFound:
            log.info("pulling docker image %s", base_image)
            self.sandbox.pull(base_image)

    def gen_sandbox_name(self, webapp):
        return "%s-sandbox-%s" % (self.env, webapp.docker_name)

    def gen_image_name(self, webapp):
BAIRE Anthony's avatar
BAIRE Anthony committed
1090
        return "%s/webapp/%s" % (self.registry, webapp.docker_name)
BAIRE Anthony's avatar
BAIRE Anthony committed
1091

BAIRE Anthony's avatar
BAIRE Anthony committed
1092
1093
1094
1095
1096
1097
1098
1099
1100
    def gen_job_name(self, job):
        return "%s-job-%d-%s" % (self.env, job.id, job.webapp.docker_name)

    def gen_job_path(self, job):
        assert self.datastore_path.endswith("/")
        return "%s%d/%d/%s" % (
                self.datastore_path, job.user_id,
                job.webapp_id, job.access_token)

BAIRE Anthony's avatar
BAIRE Anthony committed
1101

1102
    def check_host_path(self, funcname, path, *, nolink=True):
        """Validate a path before using it as an external volume in docker

        - ensure the path is canonical:
            - it is absolute
            - it does not contain any '..' parts
            - it does not contain any symbolic link
        - ensure the path is of the right type (call os.path.`funcname`())

        funcname -- name of the os.path predicate to apply (eg: "isdir")
        path     -- host path to validate
        nolink   -- when true (default), reject paths going through a
                    symbolic link

        The filesystem checks are made on "/vol/host" + path (presumably the
        location where the host filesystem is mounted for the controller --
        TODO confirm).

        Raise Error if any of the checks fails.
        """

        if not os.path.isabs(path):
            raise Error("host path %r is not absolute" % path)

        if path != os.path.normpath(path):
            # the path was formerly missing from this message
            raise Error("host path %r is not canonical (os.path.normpath())" % path)

        ctrpath = "/vol/host" + path
        log.debug("ctrpath %r", ctrpath)

        # resolved before the checks so that an unknown funcname fails early
        func = getattr(os.path, funcname)
        if nolink and os.path.realpath(ctrpath) != os.path.normpath(ctrpath):
            raise Error("host path %r contains a symbolic link" % path)

        if not func(ctrpath):
            raise Error("host path %r not found (os.path.%s())" % (path, funcname))


    @property
BAIRE Anthony's avatar
BAIRE Anthony committed
1131
1132
    def session(self):
        """Return the thread-local sqlalchemy session
1133
1134
1135
1136
        
        WARNING: in async functions, db transaction must not span over any
                 'yield from' statements (to avoid interleaving)
        """
BAIRE Anthony's avatar
BAIRE Anthony committed
1137
        return self._db_sessionmaker()
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150


    def _sock_callback(self):
        try:
            while True:
                self.sock.accept()[0].close()
        except BlockingIOError:
            pass

        self.check_db()

    def check_db(self):
        """Look for pending work in the database and schedule it.

        - the webapps whose sandbox is starting or stopping, or which have a
          version in state `sandbox`, are handed to the sandbox manager
        - the jobs in state WAITING are handed to the job manager

        Database operational errors are logged and otherwise ignored.
        """
        log.debug("check_db")
        try:
            ses = self.session
            with ses.begin():
                pending_webapps = ses.execute("""SELECT webapps.id FROM webapps
                        LEFT JOIN webapp_versions ON webapps.id=webapp_versions.webapp_id
                        WHERE sandbox_state IN (%d,%d) OR state=%d
                        GROUP BY webapps.id""" % (
                        SandboxState.starting, SandboxState.stopping, VersionState.sandbox)).fetchall()
                for row in pending_webapps:
                    (webapp_id,) = row
                    self.sandbox_manager.process(webapp_id)

                waiting_jobs = ses.query(Job.id).filter_by(state = JobState.WAITING.value)
                for (job_id,) in waiting_jobs:
                    log.debug("schedule job %d", job_id)
                    self.job_manager.process(job_id)

                # TODO schedule the push of images versions in state 'committed'
        except sqlalchemy.exc.OperationalError as e:
            log.error("db error %s", e.orig.args)
BAIRE Anthony's avatar
BAIRE Anthony committed
1170

1171
    def shutdown(self):
1172
1173
1174
        if not self._shutdown_requested.done():
            self._shutdown_requested.set_result(None)
        return self._task
1175

1176
1177
1178
1179
1180
1181

    @asyncio.coroutine
    def _run(self):
        """Main coroutine of the controller.

        Install the SIGTERM/SIGINT handlers and the socket reader, check the
        database once, then wait until a shutdown is requested (see
        `shutdown()`). On the way out, stop listening for notifications, shut
        down the managers and close the socket and the docker clients.

        This coroutine must be run only once per controller.
        """
        assert self._shutdown_requested is None

        try:
            loop = asyncio.get_event_loop()

            self._shutdown_requested = asyncio.Future()

            loop.add_signal_handler(signal.SIGTERM, self.shutdown)
            loop.add_signal_handler(signal.SIGINT,  self.shutdown)
            loop.add_reader(self.sock, self._sock_callback)

            self.check_db()
            try:
                yield from self._shutdown_requested
            finally:
                # graceful shutdown

                # unregister and close the socket (to disable incoming
                # notifications); the reader must be removed while the file
                # descriptor is still valid
                loop.remove_reader(self.sock)
                self.sock.close()

                # terminate all pending tasks
                yield from asyncio.gather(
                        self.image_manager.shutdown(),
                        self.sandbox_manager.shutdown(),
                        self.job_manager.shutdown(),
                        return_exceptions=True)
        finally:
            # closing the socket twice is harmless (covers the early failures)
            self.sock.close()
            self.sandbox.close()
            self.swarm.close()
1209

1210
1211
    def run(self):
        """Run the controller in the current event loop; block until
        `_run()` completes.
        """
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self._run())