controller.py 56.5 KB
Newer Older
1
2
3
4
#!/usr/bin/python3


import asyncio
5
import collections
6
from   concurrent.futures   import ThreadPoolExecutor
7
import contextlib
8
import datetime
9
import enum
BAIRE Anthony's avatar
BAIRE Anthony committed
10
import itertools
BAIRE Anthony's avatar
BAIRE Anthony committed
11
import json
12
13
14
import logging
import re
import os
15
import shlex
16
17
18
import signal
import socket
import sys
19
20
21
import time
import threading
import traceback
22
23
24

import docker
import MySQLdb
BAIRE Anthony's avatar
BAIRE Anthony committed
25
26
from   sqlalchemy import desc
import sqlalchemy.orm.scoping
27

28
29
from database import *

30
31
# NOTE(review): presumably the path under which the host filesystem is mounted
# for the controller (its users are outside this part of the file) -- TODO confirm
HOST_PATH="/vol/host/"

32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
##################################################
# default number of concurrent tasks
# (maximum number of tasks of each kind allowed to run at the same time)

# pushing images from the sandbox to the registry
NB_PUSH_SANDBOX = 2

# pulling images from the registry to the sandbox
NB_PULL_SANDBOX = 2

# pulling images from the registry to the swarm
NB_PULL_SWARM   = 2

# sandbox actions (start, stop, commit, ...)
# (default number of semaphore tokens of SandboxManager)
NB_SANDBOX_TASKS = 4

# jobs
NB_JOB_TASKS     = 4


# default thread pool (used by most of the tasks)
# (this is the default `executor` of Manager, it has 10 worker threads)
default_executor = ThreadPoolExecutor(10)

##################################################

BAIRE Anthony's avatar
BAIRE Anthony committed
56
# module-level logger (used by all the functions and classes below)
log = logging.getLogger("controller")
57

58
59
60
61
62
# The database driver is used from worker threads: require at least DB-API
# thread safety level 1 (threads may share the module).
# NOTE(review): this check is skipped when python runs with -O
assert MySQLdb.threadsafety >= 1


class Error(Exception):
    """Base class for the errors raised deliberately by the controller

    report_error() logs the exceptions inheriting from this class with
    log.error() (one-line message) whereas any other exception is logged with
    log.exception() (message + traceback).
    """
    pass
63

64
65
66
class ShuttingDown(Exception):
    """Raised/stored in futures by Manager once a shutdown was initiated

    (see Manager.shutdown() and Manager.__iter__())
    """
    pass

BAIRE Anthony's avatar
BAIRE Anthony committed
67
68
69
70
71
72
73
74
75
76
77
78
79
def docker_check_error(func, *k, **kw):
    """Wrapper for docker-py methods that produce a stream

    Methods producing a stream (eg: push, build) do not report all errors
    by raising exceptions. Some errors are reported later in the stream.

    This function calls `func(*k, stream=True, **kw)`, parses the stream and
    raises Error() as soon as a JSON document containing an "error" key is
    found.

    A single element of the stream may carry several JSON documents
    (separated by whitespace/newlines) or only whitespace: every document is
    examined and blank elements are skipped.

    Returns None when the stream is exhausted without any error.
    """
    decoder = json.JSONDecoder()
    for elem in func(*k, stream=True, **kw):
        text = elem.decode()
        pos, end = 0, len(text)
        while True:
            # raw_decode() does not accept leading whitespace
            while pos < end and text[pos].isspace():
                pos += 1
            if pos >= end:
                break
            js, pos = decoder.raw_decode(text, pos)
            if "error" in js:
                raise Error("push error: " + js["error"])

BAIRE Anthony's avatar
BAIRE Anthony committed
80
81
82
83
84
85
86
87
88
@contextlib.contextmanager
def docker_warning(msg, *k, ignore=None):
    """Context manager turning docker API errors into log warnings

    A docker.errors.APIError raised inside the context is swallowed. Unless
    it is an instance of `ignore` (an exception class or a tuple of classes),
    a warning is logged using the format `msg` with the arguments `k`, the
    exception itself being appended to the message.
    """
    try:
        yield
    except docker.errors.APIError as exc:
        if ignore is not None and isinstance(exc, ignore):
            # explicitly ignored -> swallow it silently
            return
        log.warning(msg + " (%s)", *(k + (exc,)))
BAIRE Anthony's avatar
BAIRE Anthony committed
89

BAIRE Anthony's avatar
BAIRE Anthony committed
90
91
@contextlib.contextmanager
def report_error(fmt, *k):
    """Context manager that logs the exception raised inside the context

    The exception is logged when leaving the context, then it is re-raised:
     - with log.error() if it inherits from Error
     - with log.exception() otherwise

    The log message is made of the string `fmt % k` followed by the last
    line of the formatted exception (between parentheses).
    """
    try:
        yield
    except Exception as exc:
        prefix = fmt % k
        summary = traceback.format_exception_only(type(exc), exc)[-1].strip()
        if isinstance(exc, Error):
            log.error("%s (%s)", prefix, summary)
        else:
            log.exception("%s (%s)", prefix, summary)
        raise
107

108
109
110
111
112
113
114
115
116
117
118
119
120
121

def rate_limit(period):
    """Generator for pacing a loop

    Each time the generator is resumed, it sleeps as long as needed so that
    at least `period` seconds (which must be > 0) have elapsed since the
    previous iteration was released. Nothing useful is yielded (always None).
    """
    assert period > 0

    start = time.monotonic()
    while True:
        yield
        now = time.monotonic()
        remaining = start - now + period
        if remaining <= 0:
            # the iteration was slow enough: restart the clock from now
            start = now
            continue
        log.debug("rate_limit: sleep %f seconds", remaining)
        time.sleep(remaining)
        start = now + remaining

128
129
130
131
132
133
134
135
136
def auto_create_task(func):
    """Decorator for forcing the creation of a task when a coroutine function is called

    Return a wrapper that calls the coroutine function `func`, creates the
    task on the fly and returns it. Also it installs a no-op callback to
    avoid warnings in case the result is not used.
    """
    assert asyncio.iscoroutinefunction(func)

    # `asyncio.async` is a syntax error since python 3.7 (`async` became a
    # keyword). Use ensure_future() and only fall back on the legacy name on
    # the interpreters that do not provide it yet (python < 3.4.4).
    ensure_future = getattr(asyncio, "ensure_future", None)
    if ensure_future is None:
        ensure_future = getattr(asyncio, "async")

    def ignore_result(fut):
        # retrieve the exception to silence the 'exception never retrieved'
        # warning (.exception() would raise if the task was cancelled)
        if not fut.cancelled():
            fut.exception()

    def wrapper(*k, **kw):
        tsk = ensure_future(func(*k, **kw))
        tsk.add_done_callback(ignore_result)
        return tsk

    return wrapper

144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def cascade_future(src, dst):
    """propagate the result of a future to another future

    This function installs a callback to the future `src`, that propagates its
    outcome to the future `dst`:
     - the result if `src` succeeded
     - the exception if `src` failed
     - the cancellation if `src` was cancelled

    Nothing is propagated if `dst` was cancelled in the meantime.

    If `src` is already done, the propagation is performed immediately.
    """
    def callback(fut):
        if dst.cancelled():
            # nobody is interested in the outcome any more
            return
        if fut.cancelled():
            # .exception() would raise CancelledError here (and leave `dst`
            # pending forever)
            dst.cancel()
            return
        ex = fut.exception()
        if ex is None:
            dst.set_result(fut.result())
        else:
            dst.set_exception(ex)

    if src.done():
        callback(src)
    else:
        src.add_done_callback(callback)

BAIRE Anthony's avatar
todo    
BAIRE Anthony committed
162

163
class Manager:
    """A class for scheduling asynchronous jobs on a collection of keys

    (the one-line summary is not very helpful, sorry)


    Job submission

    The job is scheduled on key KEY by calling .process(KEY), the function
    returns an asyncio.Future object (which will receive the result)

    If .process(KEY) is called while a job is already running for KEY, then the
    manager arranges the job to be run a second time in a row for the same key


    Job implementation

    This is an abstract class. The job shall be implemented as the coroutine
    named ._process() in the inherited class.

    The manager guarantees that ._process() cannot be called multiple times
    concurrently for the same key

    In case a shutdown is requested (see .shutdown()):
     - all jobs that are not yet started are cancelled
     - running jobs continue until they return or until they try to acquire the
       internal semaphore (which raises ShuttingDown())


    Concurrency

    The job tasks are all started immediately (whatever the number of existing
    jobs)

    It is possible to limit the concurrency to a maximum number of parallel
    tasks. Manager provides an internal semaphore (the number of tokens is set
    in .__init__())

    The semaphore can be locked in two ways:
     - by locking the manager:

            with (yield from self): ... concurrency limited to `nb_tokens`
            tasks ...

     - by calling .run_in_executor (this is for running non-async code):

            yield from self.run_in_executor(func, args...)

    Shutdown

    When .shutdown() is called the Manager ensures that all jobs are properly
    terminated
     - it cancels all jobs that are not yet started
     - it prevents starting new jobs
     - it lets the running jobs continue, but interrupts them when they try
       to acquire the internal semaphore

    All cancelled/interrupted tasks have their future raise ShuttingDown().

    .shutdown() returns after all pending jobs are terminated.


    Thread safety

    - Manager is *not* thread safe, all public methods must be called from the
      same thread
    """

    class _Handle:
        """Scheduling state of a key

        key -- the key itself
        cur -- the task currently running ._process() for this key (or None)
        nxt -- the future returned to the callers of .process() while `cur`
               is running, i.e. the result of the next run (or None)
        """
        __slots__ = "key", "cur", "nxt"

    def __init__(self, nb_tokens=1, *, executor = default_executor, interruptible = False):
        """Create the manager

        nb_tokens     -- number of tokens of the internal semaphore
        executor      -- executor used by .run_in_executor()
        interruptible -- if true, .run_in_executor() is interrupted by a
                         shutdown and .shutdown() does not wait for the
                         running tasks
        """
        # {key: _Handle}
        self._handles = {}
        self._semaphore = asyncio.Semaphore(nb_tokens)
        # future that is resolved (with a ShuttingDown exception) by .shutdown()
        self._shutdown = asyncio.Future()
        self._executor = executor
        self._interruptible = interruptible

    def _create_task(self, hnd):
        """Start ._process() in a new task for the key of the handle `hnd`

        The task is stored in `hnd.cur` and returned.
        """
        assert hnd.nxt is None
        def reset():
            # clear the 'dirty' state of the key: the re-run requested so far
            # (if any) is dropped and its future receives the result of the
            # current run instead
            assert not hnd.cur.done()
            nxt = hnd.nxt
            if nxt is not None:
                cascade_future(hnd.cur, nxt)
                hnd.nxt = None

        # `asyncio.async` is a syntax error since python 3.7 (`async` became a
        # keyword). Use ensure_future() and only fall back on the legacy name
        # on the interpreters that do not provide it yet (python < 3.4.4).
        ensure_future = getattr(asyncio, "ensure_future", None)
        if ensure_future is None:
            ensure_future = getattr(asyncio, "async")

        hnd.cur = ensure_future(self._process(hnd.key, reset))
        hnd.cur.add_done_callback(lambda fut: self._done(hnd))
        log.debug("task scheduled %r %r", self, hnd.key)
        return hnd.cur

    def process(self, key):
        """Schedule the job to be run on key `key`

        returns an asyncio.Future (that will provide the result of ._process())

        If a job is already running for this key, the returned future is the
        one of the next run (shared by all the callers requesting it).

        After a shutdown is initiated, no job is scheduled and the returned
        future raises ShuttingDown().
        """
        if self._shutdown.done():
            return self._shutdown

        hnd = self._handles.get(key)
        if hnd is None:
            # create handle
            self._handles[key] = hnd = self._Handle()
            hnd.key = key
            hnd.cur = None
            hnd.nxt = None

        if hnd.cur is None:
            # create task
            return self._create_task(hnd)
        else:
            # reschedule task
            if hnd.nxt is None:
                hnd.nxt = asyncio.Future()
            return hnd.nxt

    def _done(self, hnd):
        """Callback run when the current task of the handle `hnd` terminates

        It logs the unhandled exception (if any), then either drops the handle
        or starts the next run (if one was requested in the meantime).
        """

        assert hnd is self._handles.get(hnd.key)
        assert hnd.cur.done()

        try:
            hnd.cur.result()
        except ShuttingDown:
            pass
        except (Exception, asyncio.CancelledError):
            # CancelledError no longer inherits from Exception since python
            # 3.8: it is listed explicitly so that the cleanup below is always
            # performed
            log.exception("task %r %r unhandled exception", self, hnd.key)

        nxt     = hnd.nxt
        hnd.cur = hnd.nxt = None
        if nxt is None:
            del self._handles[hnd.key]
        else:
            cascade_future(self._create_task(hnd), nxt)


    @asyncio.coroutine
    def __iter__(self):
        """Coroutine for locking the internal semaphore

        Usage:
            with (yield from manager):
              ...

        Warning:
            after a shutdown is initiated, this function will always raise
            ShuttingDown()
        """
        ctx = yield from iter(self._semaphore)
        if self._shutdown.done():
            # release the token before raising
            with ctx:
                raise ShuttingDown()
        return ctx


    @asyncio.coroutine
    def run_in_executor(self, *k, lock=True):
        """Run a function in a separate thread (with limited concurrency)

        This function locks the internal semaphore and runs the provided
        function call in a separate thread (using the executor)

        `k` is the function followed by its positional arguments. The
        semaphore is not locked if `lock` is false.

        If the manager is interruptible, the call terminates as soon as a
        shutdown is initiated (and raises ShuttingDown()), without waiting
        for the function to return.
        """

        def run():
            coro = asyncio.get_event_loop().run_in_executor(self._executor, *k)
            if self._interruptible:
                # wait for whichever terminates first
                return next(asyncio.as_completed((coro, self._shutdown)))
            else:
                return coro

        if lock:
            with (yield from self):
                return (yield from run())
        else:
            return (yield from run())


    @asyncio.coroutine
    def _process(self, key, reset):
        """Actual implementation of the job (to be reimplemented in inherited classes)

        The Manager class guarantees that this function cannot be called
        multiple times concurrently on the same key (in case the same key is
        submitted multiple times, the Manager will call this function a second
        time after it has terminated).

        `reset` is a function that may be called to reset the 'dirty' state of
        this key (this is to avoid calling ._process() a second time if not
        necessary)
        """
        raise NotImplementedError()

    @asyncio.coroutine
    def shutdown(self):
        """Initiate a graceful shutdown

        This coroutine terminates once all tasks are properly terminated
        (except if the manager is interruptible: in that case it does not
        wait for the running tasks).
        """
        exc = ShuttingDown()
        self._shutdown.set_exception(exc)
        self._shutdown.exception()  # to avoid asyncio warnings

        # cancel all 'next' tasks
        for hnd in self._handles.values():
            if hnd.nxt is not None:
                hnd.nxt.set_exception(exc)
                hnd.nxt = None

        if not self._interruptible:
            # wait for the running tasks (their exceptions are ignored here)
            yield from asyncio.gather(
                    *(h.cur for h in self._handles.values() if h.cur is not None),
                    return_exceptions=True)
BAIRE Anthony's avatar
BAIRE Anthony committed
376

377
378

class SandboxManager(Manager):
BAIRE Anthony's avatar
BAIRE Anthony committed
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
    """Manager for sandbox operation

    This manager handles all sandbox operations (start, stop, commit).

    Operations are requested asynchronously in the database:
     - Webapp.sandbox_state=starting (for starting a sandbox)
     - Webapp.sandbox_state=stopping (for stopping a sandbox)
     - WebappVersion with state=sandbox (for committing a new image)

    All state changes on Webapp.sandbox_state are atomic (e.g: if the user
    requests a stop while the sandbox is starting then the manager finishes
    with starting the sandbox but does not update the state, and it runs
    immediately again to stop the sandbox)

    Whatever is the value of Webapp.sandbox_state, the manager first examines
    commit requests and makes the commit if requested.

    If the container already exists before starting the webapp (not possible in
    normal operations), then a recovery image is committed first.

    When a commit is successful. The image manager is notified for pushing the
    image to the registry and (if it is not a recovery image) to pull it to the
    swarm (because this image will very likely be used soon).

    State changes:

     - sandbox start:
            starting->running     (normal case)
            starting->start_error (error case)

     - sandbox stop:
            stopping->idle        (normal case)
            stopping->stop_error  (error case)

     - image commit:
            sandbox->committed  (normal case)
            sandbox->error      (error case)
            (none)->committed   (recovery version) 
    """
418

419
    def __init__(self, ctrl, nb_threads = NB_SANDBOX_TASKS):
        """Create the sandbox manager

        `ctrl` is the controller object (stored as `self.ctrl`), `nb_threads`
        is the number of tokens of the internal semaphore of the Manager.
        """
        self.ctrl = ctrl
        super().__init__(nb_tokens=nb_threads)


BAIRE Anthony's avatar
BAIRE Anthony committed
424
425
    def inspect_sandbox(self, webapp):
        """Inspect the sandbox container of `webapp`

        Returns the result of inspect_container() or None if the container
        does not exist.
        """
        ctrl = self.ctrl
        try:
            name = ctrl.gen_sandbox_name(webapp)
            info = ctrl.sandbox.inspect_container(name)
        except docker.errors.NotFound:
            info = None
        return info

BAIRE Anthony's avatar
BAIRE Anthony committed
431
432
433
434
435
436

    @staticmethod
    def filter_commit_version(query, webapp_id):
        """Restrict a WebappVersion query to the commit candidates of a webapp

        The candidates are the versions of the webapp `webapp_id` that are in
        state VersionState.SANDBOX.
        """
        sandbox_state = int(VersionState.SANDBOX)
        return query.filter_by(webapp_id=webapp_id, state=sandbox_state)

    def _start(self, webapp, version):
        """Start a webapp sandbox

        (to be executed in a thread pool)

        The sandbox is started from the image of `version`, or from the
        factory image of the webapp's docker os if `version` is None.

        The function works in two steps:
         1. it runs a temporary busybox container that prepares the sandbox
            directories (ssh host keys, identity, sshd config, xauth file)
         2. it creates and starts the sandbox container itself (running sshd)

        Raises Error() if the docker name or the id of the webapp is not
        acceptable, or if the preparation step fails. If anything fails after
        the checks, the container is removed and the exception is re-raised.
        """

        ctrl = self.ctrl
        ses  = ctrl.session

        # prepare sandbox parameters

        # docker image
        if version is None:
            image = "%s/factory/%s:%s" % (self.ctrl.registry,
                    webapp.docker_os.docker_name,
                    webapp.docker_os.version)
        else:
            image = "%s:%s" % (webapp.image_name, version.number)

        log.debug("sandbox %r: using image %r", webapp.docker_name, image)

        # safety checks
        # (because docker_name is used in the paths of the external volumes)
        if ("/" in webapp.docker_name) or (webapp.docker_name in ("", ".", "..")):
            raise Error("malformatted docker_name")

        # uid owning the sandbox files (derived from the webapp id)
        uid = webapp.id + 2000
        if uid < 2000:
            # just for safety
            raise Error("bad webapp id")

        # remove stale container (if any)
        if self.inspect_sandbox(webapp) is not None:
            self._stop(webapp)

        container = webapp.sandbox_name
        try:
            # prepare the sandbox
            # (create ssh keys)
            ctrl.check_host_path("isdir", ctrl.toolbox_path)
            ctrl.check_host_path("isdir", ctrl.sandbox_path)

            # the preparation script is run by a temporary busybox container
            # (with the sandbox dir mounted on /mnt)
            ctrl.sandbox.create_container("busybox:latest", name=container,
                    command = ["/bin/sh", "-c", """
set -ex

export PATH="$PATH:/.toolbox/bin"

# clean sandbox dir
rm -rf {sbx}

# create dirs
for dir in {sbx} {etc} {run}
do
    mkdir -p            ${{dir}}
    chown {uid}:65534   ${{dir}}
    chmod 0700          ${{dir}}
done

# xauth file
touch               {run}/XAuthority
chown {uid}:65534   {run}/XAuthority
chmod 0600          {run}/XAuthority

# generate ssh keys
(for type in ecdsa ed25519 rsa
do
    key={etc}/ssh_host_${{type}}_key
    [ -f $key ] || ssh-keygen -N '' -f $key -t $type >&2
    
    echo -n '{hostname}. ' | cat - ${{key}}.pub
done) > {etc}/ssh_known_hosts

# known_host file for allgo-shell
ssh-keygen -H -f {etc}/ssh_known_hosts
chmod 0644       {etc}/ssh_known_hosts
rm -f            {etc}/ssh_known_hosts.old

# authentication key for allgo-shell
rm -f               {etc}/identity
ssh-keygen -N '' -f {etc}/identity
chown {uid}:65534   {etc}/identity

# forced shell for the sshd config 
cat > {etc}/shell <<EOF
#!/bin/sh

export PATH="\$PATH:/.toolbox/bin"

uid=\`id -u\`
shell="\`getent passwd \$uid 2>/dev/null| cut -d : -f 7\`"
if [ -z "\$shell" ] ; then
    shell=/bin/sh
fi

if [ -n "\$SSH_ORIGINAL_COMMAND" ] ; then
    exec "\$shell" -c "\$SSH_ORIGINAL_COMMAND"
else
    exec "\$shell"
fi

EOF
chmod 755 {etc}/shell

# sshd config
cat > {etc}/sshd_config <<EOF
Port 22
Protocol 2

# turned off because it requires creating a 'sshd' user inside the sandbox
UsePrivilegeSeparation no

StrictModes no

ForceCommand /.sandbox/etc/ssh/shell

PermitRootLogin without-password

PubkeyAuthentication yes
AuthorizedKeysFile  /.sandbox/etc/ssh/identity.pub .ssh/authorized_keys .ssh/authorized_keys2

ChallengeResponseAuthentication no
PasswordAuthentication          no

X11Forwarding yes
X11DisplayOffset 10
PrintMotd no
PrintLastLog no
TCPKeepAlive yes

# Allow client to pass locale environment variables
AcceptEnv LANG LC_*

Subsystem sftp internal-sftp

UsePAM no
EOF
                    """.format(uid=uid,
                        hostname = "%s-sandbox-%s" % (ctrl.env, webapp.docker_name),
                        sbx = "/mnt/%s"         % webapp.docker_name,
                        etc = "/mnt/%s/etc/ssh" % webapp.docker_name,
                        run = "/mnt/%s/run"     % webapp.docker_name,
                        )],
                    host_config = ctrl.sandbox.create_host_config(
                        binds   = {
                            ctrl.sandbox_path: {"bind": "/mnt"},
                            ctrl.toolbox_path: {"bind": "/.toolbox", "mode": "ro"},
                    }))
            ctrl.sandbox.start(container)
            # a non-zero exit status means that the preparation script failed
            if ctrl.sandbox.wait(container):
                log.debug("sandbox %s output:\n%s", webapp.docker_name,
                        ctrl.sandbox.logs(container).decode(errors="replace"))
                raise Error("sandbox preparation failed")
            # the temporary container is no longer needed (its name is reused
            # for the sandbox container below)
            ctrl.sandbox.remove_container(container)

            # create and start the sandbox

            etc_dir = os.path.join(ctrl.sandbox_path, webapp.docker_name, "etc")
            run_dir = os.path.join(ctrl.sandbox_path, webapp.docker_name, "run")
            ctrl.check_host_path("isdir", etc_dir)
            ctrl.check_host_path("isdir", run_dir)

            if version is None and webapp.entrypoint:
                # prepend instructions to initialise a dummy entrypoint
                # (only when starting from the factory image)
                dn, bn = os.path.split(webapp.entrypoint)
                # FIXME: do nothing if entrypoint already exists
                prepare = """
                    {mkdir}
                    test -f {entrypoint} || cat > {entrypoint} <<EOF
#!/bin/sh
echo
echo "This is app '{name}' called with parameters '\$@'"
echo
echo "The workdir contains:"
ls -l
EOF
                    chmod 0755 -- {entrypoint}

                """.format( entrypoint  = shlex.quote(webapp.entrypoint),
                            name        = webapp.docker_name,
                            mkdir       = (("mkdir -p -- %s" % shlex.quote(dn)) if dn else ""))
            else:
                prepare = ""

            # command run by the sandbox container: optional preparation,
            # then sshd in the foreground
            command = ["/bin/sh", "-c", """
set -x
export PATH="$PATH:/.toolbox/bin"

{prepare}

# xauth file (needed for X11 forwarding)
touch       /root/.Xauthority
chmod 600   /root/.Xauthority

exec /.toolbox/bin/sshd -D
            """.format(prepare=prepare)]

            ctrl.sandbox.create_container(image, name=container, hostname=container,
                    command = command,
                    host_config = ctrl.sandbox.create_host_config(
                        binds = {
                            etc_dir: {"bind": "/.sandbox/etc", "mode": "ro"},
                            run_dir: {"bind": "/.sandbox/run", "mode": "rw"},
                            ctrl.toolbox_path: {"bind": "/.toolbox", "mode": "ro"},
                            },
                        # TODO: maybe drop other caps
                        cap_drop = ["NET_RAW"],
                        restart_policy = {"Name": "unless-stopped"},
                        network_mode = ctrl.sandbox_network,
                        ))

            ctrl.sandbox.start(container)

        except:
            # best-effort cleanup (whatever the error is), then propagate the
            # original exception
            with docker_warning("cleanup error: unable to remove container %r",
                    container, ignore=docker.errors.NotFound):
                ctrl.sandbox.remove_container(container, force=True)
            raise

BAIRE Anthony's avatar
BAIRE Anthony committed
659
    def _commit(self, webapp, versions):
        """Commit a webapp sandbox

        (to be executed in a thread pool)

        `versions` is the list of candidate versions (the WebappVersion
        entries of this webapp with state==sandbox).

        In case of any pre-commit error, a recovery version is committed
        instead (to avoid losing the work done inside the sandbox) and the
        candidates are put in error state.

        Returns a tuple (version, error):
         - (version, error) when the image was committed (`error` is the
           pre-commit error message, or None in the normal case)
         - (None, error) when the container was not found

        Any other exception is logged and propagated (the version is then
        left in state 'sandbox').
        """
        ctrl = self.ctrl
        ses  = ctrl.session

        # pre-commit checks
        # - ensure that there is exactly one candidate webapp_version with
        #   state=sandbox
        # - otherwise:
        #   - put all candidates in error state
        #   - create a recovery version

        # version to be committed
        version = None

        # error msg (if any)
        error = None

        # version ids to be recovered
        recover = ()

        if len(versions) == 1:
            # normal case (sandbox commit)
            version = versions[0]
            if not version.number:
                error   = "empty version number"
                recover = version.id,

        elif not versions:
            # sandbox rollback (when user drops a sandbox without committing a new image)
            error     = "dangling sandbox"

        else:
            # multiple candidates (should never happen)
            error    = "multiple candidate versions (%s)" % (
                ", ".join(map(repr, sorted(v.number for v in versions))))
            recover = tuple(v.id for v in versions)

        # TODO: make 'sandbox' a reserved name

        if error:
            changelog = "pre-commit error: " + error
            log.error("sandbox %r version id %r: %s", webapp.docker_name, recover, changelog)

            with ses.begin():
                # put all candidates in 'error state'
                if recover:
                    ses.execute('''UPDATE webapp_versions
                            SET changelog=CONCAT(changelog, " [", :changelog, "]"), state=:state
                            WHERE id IN :ids''', dict(changelog=changelog, ids=recover,
                                state=int(VersionState.ERROR)))

                # create a recovery version
                # (named after the current time, it replaces the candidate)
                version = WebappVersion(
                        webapp_id = webapp.id,
                        number    = time.strftime("recovery-%Y%m%d-%H%M%S"),
                        changelog = changelog,
                        published = False,
                        state     = int(VersionState.SANDBOX))
                ses.add(version)
            # reload the new entry, then detach it from the session
            ses.refresh(version)
            ses.expunge(version)

        assert version is not None

        # commit the docker image

        log.debug("dicts %r %r", webapp.__dict__, version.__dict__)
        log.info("commit sandbox %r version %r", webapp.docker_name, version.number)

        container = webapp.sandbox_name
        # values stored in the db by the 'finally' clause below (nothing is
        # stored as long as next_state is None)
        next_state = image_size = None
        try:
            # stop the container (if stopping or if creating a new sandbox)
            if webapp.sandbox_state in (SandboxState.STOPPING, SandboxState.STARTING):
               ctrl.sandbox.stop(container)
               ctrl.sandbox.wait(container)

            # commit
            cid = ctrl.sandbox.commit(container, webapp.image_name, version.number)
            next_state = VersionState.COMMITTED
            image_size = ctrl.sandbox.inspect_image(cid)["Size"]

            return version, error

        except docker.errors.NotFound:
            error = "commit error: container not found %r" % container
            log.error("%s", error)
            next_state = VersionState.ERROR
            image_size = 0
            ses.execute('''UPDATE webapp_versions
                    SET changelog=CONCAT(changelog, " [commit error: sandbox is down]")
                    WHERE id=%d''' % version.id)

            # here we do not propagate the error to allow starting/stopping the
            # sandbox immediately (without going through sandbox_state=:error)
            return None, error

        except Exception as e:
            log.exception("sandbox %r version %r: unexpected commit error (sandbox may still be recovered)",
                    webapp.docker_name, version.number)
            raise

        finally:
            # NOTE: if anything unexpected happens, the version is
            # left in state 'sandbox' and we propagate the exception to
            # ensure the work done inside the sandbox is not dropped
            # and the sandbox is put in 'error' state
            #
            # The error will not be reported to the user. If this is an issue,
            # then the solution would be to create another error state (to be
            # used when the sandbox is still there).
            #
            if next_state is not None:
                with ses.begin():
                    ses.execute("UPDATE webapp_versions SET state=%d, docker_image_size=%d WHERE id=%d"
                            % (next_state, image_size, version.id))
BAIRE Anthony's avatar
BAIRE Anthony committed
785

786

BAIRE Anthony's avatar
BAIRE Anthony committed
787
    @asyncio.coroutine
    def _manage_commit(self, webapp, versions, *, force=False):
        """Commit the sandbox (if needed) and notify the image manager

        The commit takes place when:
        - there is at least one candidate in `versions` (WebappVersion entries
          with state=sandbox for this app), or
        - `force` is true and a docker container exists for this sandbox

        If a version was committed, it is pushed to the registry and (unless
        an error was reported) pulled preemptively to the swarm. These
        operations are only triggered, they are not awaited.
        """
        if not versions:
            # no commit requested: only go on if we are asked to save an
            # existing container
            if not force or self.inspect_sandbox(webapp) is None:
                return

        # run the commit in the thread pool
        committed, failure = yield from self.run_in_executor(
                self._commit, webapp, versions)
        if committed is None:
            return

        # registry push (always)
        self.ctrl.image_manager.push(committed.id)
        if failure:
            return

        # swarm pull (the image may be needed soon)
        self.ctrl.image_manager.pull(committed.id, swarm=True)
813
814


BAIRE Anthony's avatar
BAIRE Anthony committed
815
816
    def _stop(self, webapp):
        """Remove the docker container of a webapp sandbox

        (to be executed in a thread pool)

        The container named `webapp.sandbox_name` is removed with force=True.
        A container that does not exist is not an error.
        """
        try:
            sandbox_client = self.ctrl.sandbox
            # FIXME: remove volumes (v=True) too ?
            sandbox_client.remove_container(webapp.sandbox_name, force=True)
        except docker.errors.NotFound:
            # no such container: nothing to remove
            return

826
827

    @asyncio.coroutine
    def _process(self, webapp_id, reset):
        """Process the sandbox of webapp `webapp_id`

        Depending on the current `sandbox_state` of the webapp:
        - STARTING: commit the existing sandbox (if any), pull the requested
          image (if a version is set) and start the sandbox
        - STOPPING: commit (if requested) and stop the sandbox
        - any other state: commit (if requested)

        On completion the sandbox state is updated in the db (RUNNING/IDLE on
        success, START_ERROR/STOP_ERROR on failure). Nothing is updated if
        the operation is aborted by ShuttingDown or if no state transition
        applies.

        `reset` is not used by this method.
        """
        ctrl = self.ctrl

        log.debug("process sandbox %d", webapp_id)

        ses = ctrl.session
        with ses.begin():
            # current state of the sandbox + load docker os
            webapp = ses.query(Webapp).filter_by(id=webapp_id).one()
            webapp.docker_os

            # version to be started
            sandbox_version = webapp.sandbox_version

            # requested commits
            pending_commits = self.filter_commit_version(
                    ses.query(WebappVersion), webapp_id).all()

            # detach everything from the session
            ses.expunge_all()

        # docker name of the sandbox & image
        webapp.sandbox_name = ctrl.gen_sandbox_name(webapp)
        webapp.image_name   = ctrl.gen_image_name(webapp)

        phase = "inspect"
        next_state = fail_state = None
        try:
            current_state = webapp.sandbox_state

            if current_state == SandboxState.STARTING:
                # start the sandbox
                phase = "start"
                next_state = SandboxState.RUNNING
                fail_state = SandboxState.START_ERROR

                # commit (if a sandbox exists)
                yield from self._manage_commit(webapp, pending_commits, force=True)

                if sandbox_version is not None:
                    # ensure version belongs to this application
                    owner_id = sandbox_version.webapp_id
                    if owner_id != webapp.id:
                        raise Error("invalid version id %d (belongs to webapp %d)" % (
                            sandbox_version.id, owner_id))

                    # pull requested image
                    yield from ctrl.image_manager.pull(sandbox_version.id)

                # start sandbox
                yield from self.run_in_executor(self._start, webapp, sandbox_version)

            elif current_state == SandboxState.STOPPING:
                # stop the sandbox
                phase = "stop"
                next_state = SandboxState.IDLE
                fail_state = SandboxState.STOP_ERROR

                # commit (if requested)
                yield from self._manage_commit(webapp, pending_commits)

                yield from self.run_in_executor(self._stop, webapp)

            else:
                # commit (if requested)
                phase = "commit"
                yield from self._manage_commit(webapp, pending_commits)

        except ShuttingDown:
            # controller shutdown: leave the state untouched
            next_state = None
            log.info("sandbox %r %s aborted (controller shutdown)", webapp.docker_name, phase)

        except BaseException as exc:
            next_state = fail_state

            # docker API errors and controller errors are logged with
            # log.error, anything else with log.exception
            if isinstance(exc, (docker.errors.APIError, Error)):
                report = log.error
            else:
                report = log.exception
            report("sandbox %r %s error (%s)", webapp.docker_name, phase,
                    traceback.format_exception_only(type(exc), exc)[-1].strip())

        finally:
            if next_state is not None:
                # atomically update the sandbox state in the db
                # (in case another action is requested during the process, eg: the user
                #  stops the sandbox while it is not fully started)
                log.info("sandbox %r is now in state %r", webapp.docker_name, next_state.name)
                with ses.begin():
                    ses.execute(
                            "UPDATE webapps SET sandbox_state=%d WHERE id=%d AND sandbox_state=%d"
                            % (next_state, webapp_id, current_state))

            log.debug("done    sandbox %d", webapp_id)

914
class DockerWatcher:
    """A class for monitoring docker events from an asyncio loop

    Currently only the "die" events (container termination) are supported.

    The watcher is run in a separate thread (calling docker.Client.events())
    """

    def __init__(self, client: docker.Client):
        """Create the watcher and start its background (daemon) thread"""
        self._client = client

        # {container_id: future}
        self._futures = {}

        self._shutdown = False

        self._thread = threading.Thread(target=self._thread_func)
        self._thread.daemon = True
        self._thread.start()

    @staticmethod
    def _set_done(fut):
        """Resolve `fut` unless it is already done

        Must be run in the event loop thread: the future may have been
        cancelled or failed (eg: by shutdown()) between the moment the
        watcher thread scheduled this call and the moment it is executed.
        """
        if not fut.done():
            fut.set_result(None)

    def _thread_func(self):
        """Body of the watcher thread: dispatch "die" events to the waiters"""
        # limit docker client requests to one per minute
        # (in case the docker daemon has errors)
        limiter = rate_limit(60)
        log.debug("watcher started")

        while not self._shutdown:   # pragma: nobranch
            next(limiter)
            try:
                for event_bytes in self._client.events(filters={"event": "die"}):
                    if self._shutdown:
                        return

                    log.debug("docker event %r", event_bytes)

                    event = json.loads(event_bytes.decode())
                    # use .get(): an unexpected event must not break the
                    # stream (which would delay all further notifications
                    # until the limiter lets us reconnect)
                    if event.get("status") != "die":
                        continue

                    fut = self._futures.get(event.get("id"))
                    if fut is not None and not fut.done():
                        # futures are not thread-safe: the result is set from
                        # the loop thread, which re-checks fut.done()
                        fut._loop.call_soon_threadsafe(self._set_done, fut)
            except Exception:
                log.exception("docker watcher exception")

    def shutdown(self):
        """Shutdown the watcher

        -> all pending .wait() are interrupted with ShuttingDown
        -> all future .wait() calls will immediately raise ShuttingDown
        """
        if not self._shutdown:
            self._shutdown = True
            # iterate over a snapshot: waiters remove their entry from
            # self._futures when they terminate
            for fut in list(self._futures.values()):
                if not fut.done():  # pragma: nobranch
                    fut.set_exception(ShuttingDown())

    @asyncio.coroutine
    def wait(self, container_id):
        """Wait for the termination of a container

        Returns immediately if the container is not running or does not
        exist.

        Raises:
         - RuntimeError if there is already a waiter for this container
         - ShuttingDown if the watcher is shut down (before or during the wait)

        Notes:
         - `container_id` *must* be the full container id (64 digits)
         - the class support only one concurrent waiter for each container id

        """
        assert re.match(r"[0-9a-f]{64}\Z", container_id), "bad container id"

        if container_id in self._futures:
            raise RuntimeError("there is already a watch for this container")

        if self._shutdown:
            raise ShuttingDown()

        # register the future *before* inspecting the container, so that a
        # "die" event arriving in between is not missed
        self._futures[container_id] = fut = asyncio.Future()
        try:
            if self._client.inspect_container(container_id)["State"]["Running"]:
                log.debug("wait for container:   %s", container_id)
                yield from fut
            log.debug("container terminated: %s", container_id)
        except docker.errors.NotFound:
            pass
        finally:
            del self._futures[container_id]



1003
1004


BAIRE Anthony's avatar
BAIRE Anthony committed
1005
class JobManager(Manager):
    """Manager running the jobs (one docker container per job)

    A job is run either in the sandbox (version "sandbox") or in the swarm
    (any other version).
    """

    class JobInfo:
        # per-job working data shared by _process, _create_job, _finish_job
        # and _remove_job:
        #   job_id   -- id of the Job entry in the db
        #   ver_id   -- None when the job runs in the sandbox, otherwise the
        #               WebappVersion id (or -1 if the job is already running)
        #   ctr_id   -- docker id of the job container (None if not created
        #               or not found)
        #   version  -- job.version
        #   ctr_name -- name of the job container
        #   client   -- docker client used for this job (sandbox or swarm)
        #   watcher  -- watcher used for this job (sandbox or swarm)
        __slots__ = "job_id", "ver_id", "ctr_id", "version", "ctr_name", "client", "watcher"

    def __init__(self, ctrl, nb_jobs=NB_JOB_TASKS):
        """Create the manager (`nb_jobs` is forwarded to Manager.__init__)"""
        super().__init__(nb_jobs)
        self.ctrl = ctrl

    def _create_job(self, info):
        """Create and start the docker container of a job

        The job is put in state RUNNING, then its container is created and
        started (info.ctr_id is set once the container is created). For a
        sandbox job (info.ver_id is None), the sandbox is first committed
        into a temporary image.

        If anything fails, the job is cleaned up with _remove_job() and the
        exception is propagated.
        """
        ctrl = self.ctrl
        ses  = ctrl.session
        tmp_img = None

        assert info.ctr_id is None

        try:
            with ses.begin():
                job = ses.query(Job).filter_by(id=info.job_id).one()
                webapp = job.webapp

                log.info("start job %d (%s:%s)",
                        info.job_id, webapp.docker_name, info.version)

                job.state = int(JobState.RUNNING)       # pragma: nobranch (TODO: remove (coverage bug))

            repo = ctrl.gen_image_name(webapp)
            image = "%s:%s" % (repo, info.version)

            job_path = ctrl.gen_job_path(job)
            log.debug("job.path: %r", job_path)

            if info.ver_id is None:
                assert info.version == "sandbox"
                image = tmp_img = info.client.commit(ctrl.gen_sandbox_name(webapp), repo, info.version)["Id"]

            # TODO use another workdir
            # TODO use another uid

            ctrl.check_host_path("isdir", job_path)
            hc = ctrl.sandbox.create_host_config(
                        binds = {job_path: {"bind": "/tmp"}},
                        cap_drop = ["all"],
                        # FIXME: CAP_DAC_OVERRIDE needed because all nfs files have uid,gid=1000,1000
                        cap_add = ["dac_override"],
                        cpu_period = ctrl.cpu_period,
                        cpu_quota  = ctrl.cpu_quota,
                        # mem_reservation = ctrl.mem_soft_limit,
                        mem_limit = ctrl.mem_hard_limit,
                    )
            if ctrl.mem_soft_limit:
                # TODO: upgrade docker-py (and use create_host_config)
                hc["MemoryReservation"] = ctrl.mem_soft_limit
            log.debug("host_config %r", hc)

            # NOTE: shlex.split(None) reads from stdin (or raises, depending
            # on the python version) -> treat a missing param as empty
            app_args = shlex.split(job.param or "")

            info.ctr_id = info.client.create_container(image, name=info.ctr_name,
                    working_dir = "/tmp",
                    # NOTE: the command line is a little complex, but this is
                    #   to ensure that (TODO write tests for this):
                    #   - no output is lost (we go though a pipe in case the
                    #     app has multiple processes writing to stdout/stderr
                    #     concurrently)
                    #   - we get the exit code of the app (not the exit code of
                    #     cat)
                    #   - we are failsafe (if fifo creation fails then the app
                    #     is run anyway, with the exit code of cat)
                    #   - we have no unusual dependencies (only sh, cat and
                    #     mkfifo)
                    command = ["/bin/sh", "-c", """
                                fifo=/.allgo.fifo.{job_id}
                                if mkfifo "$fifo"
                                then
                                    exec cat <"$fifo" >allgo.log &
                                    exec "$@" >"$fifo" 2>&1 &
                                    wait %1
                                    wait %2
                                    rm "$fifo"
                                else
                                    "$@" 2>&1 | cat >allgo.log
                                fi
                                failcnt="`cat /sys/fs/cgroup/memory/memory.failcnt`"
                                if [ "$failcnt" -ne 0 ] ; then
                                    echo "WARNING: out of memory (memory.failcnt=$failcnt)" >>allgo.log
                                fi
                        """.format(job_id=job.id),
                        "job%d" % job.id, webapp.entrypoint] + app_args,

                    labels = {"allgo.tmp_img": tmp_img or ""},
                    host_config = hc)["Id"]
            info.client.start(info.ctr_id)
        except BaseException:
            # cleanup and propagate, whatever the error is
            #TODO introduce a state JobState.ERROR
            self._remove_job(info, tmp_img=tmp_img)
            raise

    def _remove_job(self, info, *, tmp_img=None):
        """Remove the container of a job and put the job in state DONE

        The execution time is computed from the StartedAt/FinishedAt
        timestamps reported by docker (0.0 if they are not available). The
        temporary image (given as `tmp_img` or found in the "allgo.tmp_img"
        label of the container) is removed too.
        """
        ses = self.ctrl.session

        # TODO: report launch errors to the user
        # TODO: report exit code to the user
        # TODO: use another uid

        # default docker date is '0001-01-01T00:00:00Z'
        zero_date = "0001-"

        def parse_docker_timestamp(value):
            # limit the precision to the microsecond
            # (otherwise strptime fails)
            value = re.sub(r"(\.\d{,6})\d*Z$", r"\1Z", value)
            # iso8601 format (a timestamp falling on a whole second has no
            # fractional part)
            fmt = "%Y-%m-%dT%H:%M:%S.%fZ" if "." in value else "%Y-%m-%dT%H:%M:%SZ"
            return datetime.datetime.strptime(value, fmt)

        with ses.begin():
            job = ses.query(Job).filter_by(id=info.job_id).one()

            exec_time = 0.0
            if info.ctr_id is not None:
                try:
                    js = info.client.inspect_container(info.ctr_id)
                except docker.errors.NotFound:
                    pass
                else:
                    started_at  = js["State"].get("StartedAt")  or zero_date
                    finished_at = js["State"].get("FinishedAt") or zero_date

                    # both timestamps are needed: a container that was never
                    # started or never finished still has the default date
                    # (which would yield a huge negative duration)
                    if not (started_at.startswith(zero_date)
                            or finished_at.startswith(zero_date)):
                        try:
                            exec_time = ( parse_docker_timestamp(finished_at)
                                        - parse_docker_timestamp(started_at)
                                        ).total_seconds()
                        except Exception: # pragma: nocover
                            log.exception("job %d: unable to compute exec time", info.job_id)

                    if tmp_img is None:
                        # NOTE: "Labels" may be null when the container has no label
                        labels = js["Config"].get("Labels") or {}
                        tmp_img = labels.get("allgo.tmp_img") or None

                with docker_warning("job %d: cleanup error: unable to remove container", info.job_id):
                    info.client.remove_container(info.ctr_id)

            if tmp_img is not None:
                with docker_warning("job %d: cleanup error: unable to remove tmp image", info.job_id):
                    info.client.remove_image(tmp_img)

            job.exec_time = exec_time
            job.state     = int(JobState.DONE)

        log.info("stop  job %d (duration %fs)", info.job_id, exec_time)

    @asyncio.coroutine
    def _finish_job(self, info):
        """Wait for the termination of the job container, then remove the job"""
        # wait for container to terminate
        if info.ctr_id is not None:
            yield from info.watcher.wait(info.ctr_id)

        # remove container
        yield from self.run_in_executor(self._remove_job, info, lock=False)

    @asyncio.coroutine
    def _process(self, job_id, reset):
        """Process the job `job_id`

        - job in state WAITING: pull the image to the swarm (if needed), then
          create the job container and wait for its termination
        - job in state RUNNING: look up the existing container and wait for
          its termination
        - any other state: log a warning and do nothing

        `reset` is not used by this method.
        """
        ctrl = self.ctrl
        ses  = ctrl.session
        log.debug("process job id %d", job_id)

        with ses.begin():
            # query db
            job = ses.query(Job).filter_by(id=job_id).first()
            if job is None:     # pragma: nocover
                # unknown job
                log.warning("unknown job id %d", job_id)
                return

            state = JobState(job.state)

            if job.webapp is None:
                log.error("job %d: webapp id %r not found", job_id, job.webapp_id)
                if state == JobState.WAITING:       # pragma: nobranch
                    job.state = int(JobState.DONE)
                    job.exec_time = 0
                # TODO report error to the user ?
                return

            info = self.JobInfo()
            info.job_id     = job_id
            info.ctr_id     = None
            info.version    = job.version
            info.ctr_name   = ctrl.gen_job_name(job)

            if job.version == "sandbox":
                info.client  = ctrl.sandbox
                info.watcher = ctrl.sandbox_watcher
            else:
                info.client  = ctrl.swarm
                info.watcher = ctrl.swarm_watcher

            if state == JobState.WAITING:
                # job is not yet started
                if job.version == "sandbox":
                    # to be run in the sandbox
                    info.ver_id = None
                else:
                    # to be run in the swarm

                    #TODO: replace version_id with webapp_version_id
                    ver = ses.query(WebappVersion).filter_by(
                            webapp_id = job.webapp_id,
                            number    = job.version).filter(
                                WebappVersion.state.in_((
                                    int(VersionState.COMMITTED),
                                    int(VersionState.READY)))
                            ).order_by(
                                    WebappVersion.state.desc(),
                                    WebappVersion.id.desc()).first()
                    if ver is None:
                        log.error("job %d: webapp %r version %r not found",
                                job_id, job.webapp.docker_name, job.version)

                        # same as the other transitions to DONE: always
                        # record an execution time
                        job.state = int(JobState.DONE)
                        job.exec_time = 0
                        # TODO report error to the user
                        return
                    info.ver_id = ver.id

            elif state == JobState.RUNNING:     # pragma: nobranch
                # job is already started

                # we do not care about the actual version_id *but* we need to
                # know whether we are in the swarm or in the sandbox
                info.ver_id = None if job.version == "sandbox" else -1

                # look up container id
                try:
                    info.ctr_id = info.client.inspect_container(info.ctr_name)["Id"]
                except docker.errors.NotFound:
                    info.ctr_id = None

            else:   # pragma: nocover
                # unexpected state
                log.warning("job id %d is in unexpected state %s", job_id, state.name)
                return

        if state == JobState.WAITING:
            # job is not yet started

            # pull the image to the swarm
            if info.ver_id is not None:
                # NOTE: race condition: will fail if ver.state==sandbox
                #  jobs must be submitted after the image is committed
                yield from ctrl.image_manager.pull(info.ver_id, swarm=True)

            # lock the semaphore (to limit the total number of jobs)
            with (yield from self):
                yield from self.run_in_executor(self._create_job, info, lock=False)
                yield from self._finish_job(info)

        elif state == JobState.RUNNING: # pragma: nobranch
            # the job is already running
            # -> wait for its termination

            try:
                # try to lock the semaphore
                semctx = yield from asyncio.wait_for(iter(self), 0)

            except asyncio.TimeoutError:
                # no available tokens
                # (this may happen if MAX_JOBS is reduced across a restart)
                # -> wait for the job anyway
                yield from self._finish_job(info)

            else:
                # normal case
                with semctx:
                    yield from self._finish_job(info)
BAIRE Anthony's avatar
BAIRE Anthony committed
1279

1280
1281
1282
1283
1284
1285
1286
1287
1288
# NOTE: for the push/pull managers, interruptible=True guarantees that the
#   managers terminate immediately, however it cannot guarantee that the
#   process will terminate immediately because the ThreadPoolExecutor installs
#   an atexit handler that joins all the background threads.
#
#   Anyway this is not a big issue since all pending push/pull raise
#   ShuttingDown immediately, thus we won't end up with a sandbox/job
#   in an inconsistent state when SIGKILL arrives.
#
1289
1290

class PullManager(Manager):
1291
    def __init__(self, nb_threads, client, name):
1292
        super().__init__(nb_threads, interruptible=True)
1293
        self.client    = client
1294
1295
        self.name      = name