controller.py 63.6 KB
Newer Older
1
2
3
4
#!/usr/bin/python3


import asyncio
5
import collections
6
from   concurrent.futures   import ThreadPoolExecutor
7
import contextlib
8
import datetime
9
import enum
BAIRE Anthony's avatar
BAIRE Anthony committed
10
import itertools
BAIRE Anthony's avatar
BAIRE Anthony committed
11
import json
12
13
14
import logging
import re
import os
15
import shlex
16
17
18
import signal
import socket
import sys
19
20
21
import time
import threading
import traceback
22
23

import docker
BAIRE Anthony's avatar
BAIRE Anthony committed
24
import iso8601
25
import MySQLdb
BAIRE Anthony's avatar
BAIRE Anthony committed
26
27
from   sqlalchemy import desc
import sqlalchemy.orm.scoping
28
import yaml
29

30
import config_reader
31
from database import *
32
from shared_swarm import SharedSwarmClient, ShuttingDown
33

34
35
HOST_PATH="/vol/host/"

##################################################
# default number of concurrent tasks

# pushing images from the sandbox to the registry
NB_PUSH_SANDBOX = 2

# pulling images from the registry to the sandbox
NB_PULL_SANDBOX = 2

# pulling images from the registry to the swarm
NB_PULL_SWARM   = 2

# sandbox actions (start, stop, commit, ...)
NB_SANDBOX_TASKS = 4

# jobs
NB_JOB_TASKS     = 4


# default thread pool (used by most of the tasks)
default_executor = ThreadPoolExecutor(10)

##################################################

log = logging.getLogger("controller")

# Blocking database code is run from the worker threads of the thread pool,
# which requires the DB-API module to be shareable between threads
# (PEP 249: threadsafety >= 1).
#
# This is an explicit check rather than an 'assert' because asserts are
# stripped when running 'python -O'.
if MySQLdb.threadsafety < 1:
    raise RuntimeError("MySQLdb module is not thread-safe (threadsafety=%r)"
            % (MySQLdb.threadsafety,))


class Error(Exception):
    """Base class for the errors raised deliberately by the controller"""
    pass


def docker_check_error(func, *k, **kw):
    """Wrapper for docker-py methods that produce a stream

    Methods producing a stream (eg: push, build) do not report all errors
    by raising exceptions. Some errors are reported later in the stream, as
    JSON messages carrying an "error" key.

    This function calls ``func(*k, stream=True, **kw)``, consumes the whole
    stream and raises Error() on the first error message found.

    A single chunk of a raw stream may contain several newline-separated
    JSON documents (and blank lines), so each chunk is split into lines that
    are decoded separately. Chunks that are already decoded (dicts) are
    accepted as well.
    """
    for chunk in func(*k, stream=True, **kw):
        if isinstance(chunk, dict):
            messages = (chunk,)
        else:
            if isinstance(chunk, bytes):
                chunk = chunk.decode()
            messages = (json.loads(line) for line in chunk.splitlines()
                        if line.strip())

        for js in messages:
            if "error" in js:
                raise Error("push error: " + js["error"])

BAIRE Anthony's avatar
BAIRE Anthony committed
81
82
83
84
85
86
87
88
89
@contextlib.contextmanager
def docker_warning(msg, *k, ignore=None):
    """Downgrade docker API errors raised inside the context to warnings

    `msg` and `k` form a %-style log message. The exception is appended to
    the arguments and rendered in parentheses after the message.

    `ignore` is an exception class (or a tuple of classes) whose instances
    are swallowed silently, without any warning.

    Exceptions that are not docker.errors.APIError propagate unchanged.
    """
    try:
        yield
    except docker.errors.APIError as err:
        silenced = ignore is not None and isinstance(err, ignore)
        if not silenced:
            log.warning(msg + " (%s)", *(k + (err,)))
BAIRE Anthony's avatar
BAIRE Anthony committed
90

BAIRE Anthony's avatar
BAIRE Anthony committed
91
92
@contextlib.contextmanager
def report_error(fmt, *k):
    """Log any exception leaving the context, then re-raise it

    The log line is the string ``fmt % k`` followed (in parentheses) by the
    last line of the exception description.

    Exceptions deriving from Error are logged with log.error(), without a
    traceback. Any other exception is logged with log.exception(), with a
    traceback.

    The exception is always propagated to the caller.
    """
    try:
        yield
    except Exception as exc:
        message = fmt % k
        if isinstance(exc, Error):
            emit = log.error
        else:
            emit = log.exception
        summary = traceback.format_exception_only(type(exc), exc)[-1]
        emit("%s (%s)", message, summary.strip())
        raise
108

BAIRE Anthony's avatar
BAIRE Anthony committed
109
110
111
112
113
114
115
116
117
118
119
120
121
def disable_future_warning(fut):
    """Add a dummy callback to a future to prevent asyncio warnings

    Return: the future (to allow chaining)

    The asyncio module logs a warning message when a future fails and its
    exception is never retrieved. This function installs a callback that
    retrieves the exception, which avoids this warning. This is useful for
    futures whose result *may* be ignored by the application.

    Cancelled futures are skipped. Calling .exception() on them would raise
    CancelledError inside the callback, and the event loop would report that
    as an error.
    """
    def consume_exception(f):
        if not f.cancelled():
            f.exception()

    fut.add_done_callback(consume_exception)
    return fut

122

123
124
125
126
127
128
129
130
131
def auto_create_task(func):
    """Decorator for forcing the creation of a task when a coroutine function is called

    Return a wrapper that calls `func`, creates the task on the fly and
    returns it. It also installs a no-op callback on the task (see
    disable_future_warning()) to avoid warnings in case the result is not
    used.

    Raises TypeError if `func` is not a coroutine function.
    """
    import functools

    if not asyncio.iscoroutinefunction(func):
        raise TypeError("%r is not a coroutine function" % (func,))

    # updated=() -> do not copy func.__dict__ (it may hold the asyncio
    # coroutine marker, which would make the wrapper look like a coroutine
    # function although it returns a task)
    @functools.wraps(func, updated=())
    def wrapper(*k, **kw):
        # asyncio.ensure_future() replaces asyncio.async(), which is
        # deprecated since python 3.4.4 and a syntax error since python 3.7
        return disable_future_warning(asyncio.ensure_future(func(*k, **kw)))

    return wrapper
BAIRE Anthony's avatar
BAIRE Anthony committed
134

135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def cascade_future(src, dst):
    """Propagate the outcome of a future to another future

    This function installs a callback on the future `src` that propagates
    its outcome to the future `dst`:
     - result    -> dst.set_result()
     - exception -> dst.set_exception()
     - cancelled -> dst.cancel()

    If `src` is already done, the outcome is propagated immediately.

    If `dst` is already done when `src` completes (e.g. because its consumer
    cancelled it), nothing is propagated. Setting its outcome again would
    raise InvalidStateError.
    """
    def callback(fut):
        if dst.done():
            return
        if fut.cancelled():
            # fut.exception() would raise CancelledError here
            dst.cancel()
            return
        ex = fut.exception()
        if ex is None:
            dst.set_result(fut.result())
        else:
            dst.set_exception(ex)

    if src.done():
        callback(src)
    else:
        src.add_done_callback(callback)

BAIRE Anthony's avatar
todo    
BAIRE Anthony committed
153

154
class Manager:
    """A class for scheduling asynchronous jobs on a collection of keys

    (the one-line summary is not very helpful, sorry)


    Job submission

    The job is scheduled on key KEY by calling .process(KEY), the function
    returns an asyncio.Future object (which will receive the result)

    If .process(KEY) is called while a job is already running for KEY, then the
    manager arranges the job to be run a second time in a row for the same key


    Job implementation

    This is an abstract class. The job shall be implemented as the coroutine
    named ._process() in the inherited class.

    The manager guarantees that ._process() cannot be called multiple times
    concurrently for the same key

    In case a shutdown is requested (see .shutdown()):
     - all jobs that are not yet started are cancelled
     - running jobs continue until they return or until they try to acquire the
       internal semaphore (which raises ShuttingDown())


    Concurrency

    The job tasks are all started immediately (whatever the number of existing
    jobs)

    It is possible to limit the concurrency to a maximum number of parallel
    tasks. Manager provides an internal semaphore (the number of tokens is set
    in .__init__())

    The semaphore can be locked in two ways:
     - by locking the manager:

            with (yield from self): ... concurrency limited to `nb_tokens`
            tasks ...

     - by calling .run_in_executor (this is for running non-async code):

            yield from self.run_in_executor(func, args...)

    Shutdown

    When .shutdown() is called the Manager ensures that all jobs are properly
    terminated
     - it cancels all jobs that are not yet started
     - it prevents starting new jobs
     - it lets running jobs continue, but interrupts them when they try
       to acquire the internal semaphore
     - for interruptible managers only: jobs waiting in .run_in_executor()
       are interrupted immediately, and .shutdown() does not wait for the
       running jobs

    All cancelled/interrupted jobs have their future raise ShuttingDown().

    .shutdown() returns after all pending jobs are terminated (except for
    interruptible managers, see above).


    Thread safety

    - Manager is *not* thread safe, all public methods must be called from the
      same thread
    """

    class _Handle:
        # per-key scheduling state:
        #  key          key passed to .process()
        #  cur          task running ._process() for this key (or None)
        #  nxt          future returned by .process() while `cur` is running,
        #               it receives the result of the rescheduled run (or None)
        #  rescheduled  future passed to ._process(), its result is set when
        #               the key is rescheduled
        __slots__ = "key", "cur", "nxt", "rescheduled"

    def __init__(self, nb_tokens=1, *, executor = default_executor, interruptible = False):
        # {key: _Handle}
        self._handles = {}
        self._semaphore = asyncio.Semaphore(nb_tokens)
        # failed with ShuttingDown() once .shutdown() is called
        self._shutdown = asyncio.Future()
        self._executor = executor
        self._interruptible = interruptible

    def _create_task(self, hnd):
        """Start a task running ._process() for the key of `hnd`

        Return: the task (also stored in hnd.cur)
        """
        assert hnd.nxt is None
        def reset():
            # clear the 'dirty' state of the key: the pending rescheduled run
            # (if any) is satisfied by the outcome of the current run
            assert not hnd.cur.done()
            nxt = hnd.nxt
            if nxt is not None:
                cascade_future(hnd.cur, nxt)
                hnd.nxt = None

            # recreate the 'rescheduled' future and return it
            hnd.rescheduled = disable_future_warning(asyncio.Future())
            return hnd.rescheduled

        hnd.rescheduled = disable_future_warning(asyncio.Future())
        # asyncio.ensure_future() replaces asyncio.async(), which is
        # deprecated since python 3.4.4 and a syntax error since python 3.7
        hnd.cur = asyncio.ensure_future(self._process(hnd.key, reset, hnd.rescheduled))
        hnd.cur.add_done_callback(lambda fut: self._done(hnd))
        log.debug("task scheduled %r %r", self, hnd.key)
        return hnd.cur

    def process(self, key):
        """Schedule the job to be run on key `key`

        returns an asyncio.Future (that will provide the result of ._process())

        After a shutdown is initiated, the returned future raises
        ShuttingDown().
        """
        if self._shutdown.done():
            return self._shutdown

        hnd = self._handles.get(key)
        if hnd is None:
            # create handle
            self._handles[key] = hnd = self._Handle()
            hnd.key = key
            hnd.cur = None
            hnd.nxt = None
            hnd.rescheduled = None

        if hnd.cur is None:
            # create task
            return self._create_task(hnd)
        else:
            # reschedule task
            if hnd.nxt is None:
                hnd.nxt = asyncio.Future()
            if not hnd.rescheduled.done():
                hnd.rescheduled.set_result(None)
            return hnd.nxt

    def _done(self, hnd):
        """Done-callback of the task of `hnd`

        Logs unexpected failures, then either drops the handle (no rescheduled
        run pending) or starts the rescheduled run and forwards its outcome to
        the future previously returned by .process().
        """
        assert hnd is self._handles.get(hnd.key)
        assert hnd.cur.done()

        try:
            hnd.cur.result()
        except ShuttingDown:
            pass
        except asyncio.CancelledError:
            # CancelledError derives from BaseException since python 3.8, so
            # it must be caught explicitly. Otherwise the cleanup below would
            # be skipped and the stale handle would block this key forever.
            log.warning("task %r %r cancelled", self, hnd.key)
        except Exception:
            log.exception("task %r %r unhandled exception", self, hnd.key)

        nxt     = hnd.nxt
        hnd.cur = hnd.nxt = None
        if nxt is None:
            del self._handles[hnd.key]
        else:
            cascade_future(self._create_task(hnd), nxt)


    @asyncio.coroutine
    def __iter__(self):
        """Coroutine for locking the internal semaphore

        Usage:
            with (yield from manager):
              ...

        Warning:
            after a shutdown is initiated, this function will always raise
            ShuttingDown()
        """
        ctx = yield from iter(self._semaphore)
        if self._shutdown.done():
            # release the token immediately
            with ctx:
                raise ShuttingDown()
        return ctx


    @asyncio.coroutine
    def run_in_executor(self, *k, lock=True):
        """Run a function in a separate thread (with limited concurrency)

        This function locks the internal semaphore (unless lock is false) and
        runs the provided function call in a separate thread (using the
        executor).

        For interruptible managers, the wait is interrupted with
        ShuttingDown() when a shutdown is initiated (the function keeps
        running in its thread).
        """

        def run():
            coro = asyncio.get_event_loop().run_in_executor(self._executor, *k)
            if self._interruptible:
                # whichever completes first: the call or the shutdown
                return next(asyncio.as_completed((coro, self._shutdown)))
            else:
                return coro

        if lock:
            with (yield from self):
                return (yield from run())
        else:
            return (yield from run())


    @asyncio.coroutine
    def _process(self, key, reset, rescheduled):
        """Actual implementation of the job (to be reimplemented in inherited classes)

        The Manager class guarantees that this function cannot be called
        multiple times concurrently on the same key (in case the same key is
        submitted multiple times, the Manager will call this function a second
        time after it has terminated).

        `rescheduled` is a future whose result is set when the job is being
        rescheduled (if process(key) is called before _process(key ...)
        terminates).

        `reset` is a function that may be called to reset the 'dirty' state of
        this key (this is to avoid calling ._process() a second time if not
        necessary) in that case, a new `rescheduled` future is returned.
        """
        raise NotImplementedError()

    @asyncio.coroutine
    def shutdown(self):
        """Initiate a graceful shutdown

        This coroutine terminates once all tasks are properly terminated
        (interruptible managers do not wait for the running tasks).

        Calling it more than once is harmless.
        """
        if self._shutdown.done():
            exc = self._shutdown.exception()
        else:
            exc = ShuttingDown()
            self._shutdown.set_exception(exc)
            self._shutdown.exception()  # to avoid asyncio warnings

        # cancel all 'next' tasks
        # (a 'next' future may already be done, e.g. cancelled by its
        # consumer, in that case it must not be touched)
        for hnd in self._handles.values():
            nxt, hnd.nxt = hnd.nxt, None
            if nxt is not None and not nxt.done():
                nxt.set_exception(exc)

        if not self._interruptible:
            yield from asyncio.gather(
                    *(h.cur for h in self._handles.values() if h.cur is not None),
                    return_exceptions=True)
BAIRE Anthony's avatar
BAIRE Anthony committed
379

380
381

class SandboxManager(Manager):
BAIRE Anthony's avatar
BAIRE Anthony committed
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
    """Manager for sandbox operation

    This manager handles all sandbox operations (start, stop, commit).

    Operations are requested asynchronously in the database:
     - Webapp.sandbox_state=starting (for starting a sandbox)
     - Webapp.sandbox_state=stopping (for stopping a sandbox)
     - WebappVersion with state=sandbox (for committing a new image)

    All state changes on Webapp.sandbox_state are atomic (e.g: if the user
    requests a stop while the sandbox is starting then the manager finishes
    with starting the sandbox but does not update the state, and it runs
    immediately again to stop the sandbox)

    Whatever is the value of Webapp.sandbox_state, the manager first examines
    commit requests and makes the commit if requested.

    If the container already exists before starting the webapp (not possible in
    normal operations), then a recovery image is committed first.

    When a commit is successful. The image manager is notified for pushing the
    image to the registry and (if it is not a recovery image) to pull it to the
    swarm (because this image will very likely be used soon).

    State changes:

     - sandbox start:
            starting->running     (normal case)
            starting->start_error (error case)

     - sandbox stop:
            stopping->idle        (normal case)
            stopping->stop_error  (error case)

     - image commit:
            sandbox->committed  (normal case)
            sandbox->error      (error case)
            (none)->committed   (recovery version) 
    """
421

422
    def __init__(self, ctrl, nb_threads = NB_SANDBOX_TASKS):
        # `nb_threads` is the number of tokens of the Manager semaphore, i.e.
        # the maximal number of sandbox operations running concurrently
        super().__init__(nb_tokens=nb_threads)

        # the controller (gives access to the sandbox docker client, the
        # database session and the other managers)
        self.ctrl = ctrl


BAIRE Anthony's avatar
BAIRE Anthony committed
427
428
    def inspect_sandbox(self, webapp):
        """Return the docker description of the sandbox container of `webapp`

        The container name is given by ctrl.gen_sandbox_name(webapp). Return
        the result of inspect_container(), or None if docker reports that the
        container does not exist.
        """
        try:
            details = self.ctrl.sandbox.inspect_container(
                    self.ctrl.gen_sandbox_name(webapp))
        except docker.errors.NotFound:
            details = None
        return details

BAIRE Anthony's avatar
BAIRE Anthony committed
434
435
436
437
438
439

    @staticmethod
    def filter_commit_version(query, webapp_id):
        """Restrict a WebappVersion query to the candidate versions to be committed

        i.e. the versions of webapp `webapp_id` whose state is 'sandbox'.
        """
        criteria = dict(webapp_id=webapp_id, state=int(VersionState.SANDBOX))
        return query.filter_by(**criteria)

    def _start(self, webapp, version):
444
445
446
447
448
449
        """Start a webapp sandbox

        (to be executed in a thread pool)
        """

        ctrl = self.ctrl
BAIRE Anthony's avatar
BAIRE Anthony committed
450
        ses  = ctrl.session
451
452

        # prepare sandbox parameters
BAIRE Anthony's avatar
BAIRE Anthony committed
453
454
455

        # docker image
        if version is None:
456
            image = "%s:%s" % (ctrl.gen_factory_name(webapp.docker_os),
BAIRE Anthony's avatar
BAIRE Anthony committed
457
                    webapp.docker_os.version)
458
        else:
BAIRE Anthony's avatar
BAIRE Anthony committed
459
            image = "%s:%s" % (webapp.image_name, version.number)
BAIRE Anthony's avatar
BAIRE Anthony committed
460
461

        log.debug("sandbox %r: using image %r", webapp.docker_name, image)
462
463
464
465
466
467
468
469
470
471
472
473

        # safety checks
        # (because docker_name is used it the paths of the external volumes
        if ("/" in webapp.docker_name) or (webapp.docker_name in ("", ".", "..")):
            raise Error("malformatted docker_name")

        uid = webapp.id + 2000
        if uid < 2000:
            # just for safety
            raise Error("bad webapp id")

        # remove stale container (if any)
474
475
        if self.inspect_sandbox(webapp) is not None:
            self._stop(webapp)
476

BAIRE Anthony's avatar
BAIRE Anthony committed
477
        container = webapp.sandbox_name
478
479
        try:
            # prepare the sandbox
480
481
482
483
            # (create ssh keys)
            ctrl.check_host_path("isdir", ctrl.toolbox_path)
            ctrl.check_host_path("isdir", ctrl.sandbox_path)

BAIRE Anthony's avatar
BAIRE Anthony committed
484
            ctrl.sandbox.create_container("busybox:latest", name=container,
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
                    command = ["/bin/sh", "-c", """
set -ex

export PATH="$PATH:/.toolbox/bin"

# clean sandbox dir
rm -rf {sbx}

# create dirs
for dir in {sbx} {etc} {run}
do
    mkdir -p            ${{dir}}
    chown {uid}:65534   ${{dir}}
    chmod 0700          ${{dir}}
done

# xauth file
touch               {run}/XAuthority
chown {uid}:65534   {run}/XAuthority
chmod 0600          {run}/XAuthority

# generate ssh keys
(for type in ecdsa ed25519 rsa
do
    key={etc}/ssh_host_${{type}}_key
    [ -f $key ] || ssh-keygen -N '' -f $key -t $type >&2
    
    echo -n '{hostname}. ' | cat - ${{key}}.pub
done) > {etc}/ssh_known_hosts

# known_host file for allgo-shell
ssh-keygen -H -f {etc}/ssh_known_hosts
chmod 0644       {etc}/ssh_known_hosts
rm -f            {etc}/ssh_known_hosts.old

# authentication key for allgo-shell
rm -f               {etc}/identity
ssh-keygen -N '' -f {etc}/identity
chown {uid}:65534   {etc}/identity

# forced shell for the sshd config 
cat > {etc}/shell <<EOF
#!/bin/sh

export PATH="\$PATH:/.toolbox/bin"

uid=\`id -u\`
shell="\`getent passwd \$uid 2>/dev/null| cut -d : -f 7\`"
if [ -z "\$shell" ] ; then
    shell=/bin/sh
fi

if [ -n "\$SSH_ORIGINAL_COMMAND" ] ; then
    exec "\$shell" -c "\$SSH_ORIGINAL_COMMAND"
else
    exec "\$shell"
fi

EOF
chmod 755 {etc}/shell

# sshd config
cat > {etc}/sshd_config <<EOF
Port 22
Protocol 2

# turned off because it requires creating a 'sshd' user inside the sandbox
UsePrivilegeSeparation no

StrictModes no

ForceCommand /.sandbox/etc/ssh/shell

PermitRootLogin without-password

PubkeyAuthentication yes
AuthorizedKeysFile  /.sandbox/etc/ssh/identity.pub .ssh/authorized_keys .ssh/authorized_keys2

ChallengeResponseAuthentication no
PasswordAuthentication          no

X11Forwarding yes
X11DisplayOffset 10
PrintMotd no
PrintLastLog no
TCPKeepAlive yes

# Allow client to pass locale environment variables
AcceptEnv LANG LC_*

Subsystem sftp internal-sftp

UsePAM no
EOF
                    """.format(uid=uid,
                        hostname = "%s-sandbox-%s" % (ctrl.env, webapp.docker_name),
                        sbx = "/mnt/%s"         % webapp.docker_name,
                        etc = "/mnt/%s/etc/ssh" % webapp.docker_name,
                        run = "/mnt/%s/run"     % webapp.docker_name,
                        )],
585
                    host_config = ctrl.sandbox.create_host_config(
586
587
588
589
                        binds   = {
                            ctrl.sandbox_path: {"bind": "/mnt"},
                            ctrl.toolbox_path: {"bind": "/.toolbox", "mode": "ro"},
                    }))
590
591
            ctrl.sandbox.start(container)
            if ctrl.sandbox.wait(container):
592
593
                log.debug("sandbox %s output:\n%s", webapp.docker_name,
                        ctrl.sandbox.logs(container).decode(errors="replace"))
594
595
596
597
                raise Error("sandbox preparation failed")
            ctrl.sandbox.remove_container(container)

            # create and start the sandbox
598
599
600
601
602

            etc_dir = os.path.join(ctrl.sandbox_path, webapp.docker_name, "etc")
            run_dir = os.path.join(ctrl.sandbox_path, webapp.docker_name, "run")
            ctrl.check_host_path("isdir", etc_dir)
            ctrl.check_host_path("isdir", run_dir)
603
604
605
606

            if version is None and webapp.entrypoint:
                # prepend instructions to initialise a dummy entrypoint
                dn, bn = os.path.split(webapp.entrypoint)
BAIRE Anthony's avatar
todo    
BAIRE Anthony committed
607
                # FIXME: do nothing if entrypoint already exists
608
                prepare = """
609
                    {mkdir}
610
                    test -f {entrypoint} || cat > {entrypoint} <<EOF
611
612
613
614
615
616
617
#!/bin/sh
echo
echo "This is app '{name}' called with parameters '\$@'"
echo
echo "The workdir contains:"
ls -l
EOF
618
                    chmod 0755 -- {entrypoint}
619

620
                """.format( entrypoint  = shlex.quote(webapp.entrypoint),
621
                            name        = webapp.docker_name,
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
                            mkdir       = (("mkdir -p -- %s" % shlex.quote(dn)) if dn else ""))
            else:
                prepare = ""

            command = ["/bin/sh", "-c", """
set -x
export PATH="$PATH:/.toolbox/bin"

{prepare}

# xauth file (needed for X11 forwarding)
touch       /root/.Xauthority
chmod 600   /root/.Xauthority

exec /.toolbox/bin/sshd -D
            """.format(prepare=prepare)]
638

639
            ctrl.sandbox.create_container(image, name=container, hostname=container,
640
                    command = command,
641
642
                    host_config = ctrl.sandbox.create_host_config(
                        binds = {
643
644
645
                            etc_dir: {"bind": "/.sandbox/etc", "mode": "ro"},
                            run_dir: {"bind": "/.sandbox/run", "mode": "rw"},
                            ctrl.toolbox_path: {"bind": "/.toolbox", "mode": "ro"},
646
647
648
                            },
                        # TODO: maybe drop other caps
                        cap_drop = ["NET_RAW"],
649
                        restart_policy = {"Name": "unless-stopped"},
650
                        network_mode = ctrl.sandbox_network,
651
652
653
654
655
                        ))

            ctrl.sandbox.start(container)

        except:
BAIRE Anthony's avatar
BAIRE Anthony committed
656
657
            with docker_warning("cleanup error: unable to remove container %r",
                    container, ignore=docker.errors.NotFound):
658
659
660
                ctrl.sandbox.remove_container(container, force=True)
            raise

BAIRE Anthony's avatar
BAIRE Anthony committed
661
    def _commit(self, webapp, versions):
        """Commit a webapp sandbox

        (to be executed in a thread pool)

        The image version is looked up in `versions` (the candidate
        WebappVersion entries, with state==sandbox).

        In case of any pre-commit error, a recovery version is committed
        instead (to avoid losing the work done inside the sandbox) and the
        candidates are put in error state.

        Returns a tuple (version, error):
         - version: the committed WebappVersion, or None if docker reported
           that the container (or image) was not found
         - error:   the error message, or None if there was no error

        Unexpected errors during the commit are logged and propagated. The
        version is then left in state 'sandbox'.
        """
        ctrl = self.ctrl
        ses  = ctrl.session

        # pre-commit checks
        # - ensure that there is exactly one candidate webapp_version with
        #   state=sandbox
        # - otherwise:
        #   - put all candidates in error state
        #   - create a recovery version

        # version to be committed
        version = None

        # error msg (if any)
        error = None

        # version ids to be recovered
        recover = ()

        if len(versions) == 1:
            # normal case (sandbox commit)
            version = versions[0]
            if not version.number:
                error   = "empty version number"
                recover = version.id,

        elif not versions:
            # sandbox rollback (when user drops a sandbox without committing a new image)
            error   = "dangling sandbox"

        else:
            # multiple candidates (should never happen)
            # (key=str: a number may be empty or None, and None cannot be
            # compared with str in python 3)
            error   = "multiple candidate versions (%s)" % (
                ", ".join(map(repr, sorted((v.number for v in versions), key=str))))
            recover = tuple(v.id for v in versions)

        # TODO: make 'sandbox' a reserved name

        if error:
            changelog = "pre-commit error: " + error
            log.error("sandbox %r version id %r: %s", webapp.docker_name, recover, changelog)

            with ses.begin():
                # put all candidates in 'error state'
                if recover:
                    ses.execute('''UPDATE dj_webapp_versions
                            SET changelog=CONCAT(changelog, " [", :changelog, "]"), state=:state
                            WHERE id IN :ids''', dict(changelog=changelog, ids=recover,
                                state=int(VersionState.ERROR)))

                # create a recovery version
                version = WebappVersion(
                        webapp_id = webapp.id,
                        number    = time.strftime("recovery-%Y%m%d-%H%M%S"),
                        changelog = changelog,
                        published = False,
                        state     = int(VersionState.SANDBOX))
                ses.add(version)
            # load the generated id, then detach the object from the session
            ses.refresh(version)
            ses.expunge(version)

        assert version is not None

        # commit the docker image

        log.debug("dicts %r %r", webapp.__dict__, version.__dict__)
        log.info("commit sandbox %r version %r", webapp.docker_name, version.number)

        container = webapp.sandbox_name
        next_state = image_size = None
        try:
            # stop the container (if stopping or if creating a new sandbox)
            if webapp.sandbox_state in (SandboxState.STOPPING, SandboxState.STARTING):
                ctrl.sandbox.stop(container)
                ctrl.sandbox.wait(container)

            # commit
            cid = ctrl.sandbox.commit(container, webapp.image_name, version.number)
            image_size = ctrl.sandbox.inspect_image(cid)["Size"]
            # set last: if anything above fails unexpectedly, the version must
            # stay in state 'sandbox' (see the NOTE below). The final UPDATE
            # must also never run with image_size=None.
            next_state = VersionState.COMMITTED

            return version, error

        except docker.errors.NotFound:
            error = "commit error: container not found %r" % container
            log.error("%s", error)
            next_state = VersionState.ERROR
            image_size = 0
            with ses.begin():
                ses.execute('''UPDATE dj_webapp_versions
                        SET changelog=CONCAT(changelog, " [commit error: sandbox is down]")
                        WHERE id=:id''', dict(id=version.id))

            # here we do not propagate the error to allow starting/stopping the
            # sandbox immediately (without going through sandbox_state=:error)
            return None, error

        except Exception:
            log.exception("sandbox %r version %r: unexpected commit error (sandbox may still be recovered)",
                    webapp.docker_name, version.number)
            raise

        finally:
            # NOTE: if anything unexpected happens, the version is
            # left in state 'sandbox' and we propagate the exception to
            # ensure the work done inside the sandbox is not dropped
            # and the sandbox is put in 'error' state
            #
            # The error will not be reported to the user. If this is an issue,
            # then the solution would be to create another error state (to be
            # used when the sandbox is still there).
            #
            if next_state is not None:
                with ses.begin():
                    ses.execute("UPDATE dj_webapp_versions SET state=:state, docker_image_size=:size WHERE id=:id",
                            dict(state=int(next_state), size=image_size, id=version.id))
BAIRE Anthony's avatar
BAIRE Anthony committed
787

788

BAIRE Anthony's avatar
BAIRE Anthony committed
789
    @asyncio.coroutine
    def _manage_commit(self, webapp, versions, *, force=False):
        """Commit the sandbox when needed, then trigger the image transfers

        A commit takes place when either:
        - *versions* is not empty (a commit was requested: there is at least
          one WebappVersion entry with state=sandbox for this app), or
        - *force* is true and a docker container exists for this sandbox.

        When the commit yields a version, its image is pushed to the
        registry and, if no error was reported, preemptively pulled to the
        swarm. Neither transfer is awaited here.
        """
        # short-circuit: the sandbox is only inspected when no commit was
        # explicitly requested and force is set
        commit_needed = bool(versions) or (force and self.inspect_sandbox(webapp) is not None)
        if not commit_needed:
            return

        # the commit itself runs in the executor (blocking docker calls)
        version, error = yield from self.run_in_executor(self._commit, webapp, versions)

        if version is None:
            # nothing to transfer
            return

        image_manager = self.ctrl.image_manager

        # push to the registry (fire and forget)
        image_manager.push(version.id)

        if not error:
            # the image may be needed soon -> pull it to the swarm in advance
            image_manager.pull(version.id, swarm=True)
815
816


BAIRE Anthony's avatar
BAIRE Anthony committed
817
818
    def _stop(self, webapp):
        """Remove the container of a webapp sandbox (forcibly)

        A missing container is not an error.

        (to be executed in a thread pool)
        """
        # FIXME: remove volumes (v=True) too ?
        with contextlib.suppress(docker.errors.NotFound):
            self.ctrl.sandbox.remove_container(webapp.sandbox_name, force=True)

828
829

    @asyncio.coroutine
    def _process(self, webapp_id, reset, rescheduled):
        """Handle the pending sandbox action of webapp *webapp_id*

        The action is selected from the sandbox_state loaded from the db:

        - STARTING: commit the current sandbox (see _manage_commit with
          force=True), pull the image to be started (the requested version,
          or the factory image of the webapp's docker os) and start the
          sandbox; new state RUNNING, or START_ERROR on failure
        - STOPPING: perform the requested commits, then remove the sandbox
          container; new state IDLE, or STOP_ERROR on failure
        - any other state: only perform the requested commits (the state is
          never modified, even on failure)

        The new state is written with a conditional UPDATE, so it is dropped
        if sandbox_state was modified in the db in the meantime (eg: the user
        stops the sandbox while it is being started). On controller shutdown
        (ShuttingDown) the state is left untouched.

        NOTE(review): *reset* and *rescheduled* are not used by this method
        (presumably imposed by the Manager calling convention).
        """
        ctrl = self.ctrl

        log.debug("process sandbox %d", webapp_id)

        ses = ctrl.session
        with ses.begin():
            # load the webapp and touch its docker_os relationship so that it
            # is available once the objects are detached from the session
            webapp = ses.query(Webapp).filter_by(id=webapp_id).one()
            webapp.docker_os

            # version to be started (None -> start from the factory image)
            sandbox_version = webapp.sandbox_version

            # versions for which a commit was requested
            commit_versions = self.filter_commit_version(
                    ses.query(WebappVersion), webapp_id).all()

            # detach everything: the objects are used outside the transaction
            ses.expunge_all()

        # docker names of the sandbox container & image
        webapp.sandbox_name = ctrl.gen_sandbox_name(webapp)
        webapp.image_name   = ctrl.gen_image_name(webapp)

        phase = "inspect"
        next_state = None
        fail_state = None
        try:
            current_state = webapp.sandbox_state

            if current_state == SandboxState.STARTING:
                phase = "start"
                next_state, fail_state = SandboxState.RUNNING, SandboxState.START_ERROR

                # commit first (if a sandbox container exists)
                yield from self._manage_commit(webapp, commit_versions, force=True)

                if sandbox_version is None:
                    # fresh sandbox: pull the factory image of the docker os
                    docker_os = webapp.docker_os
                    yield from ctrl.image_manager.sandbox_pull_manager.process(
                            (ctrl.gen_factory_name(docker_os), docker_os.version))
                else:
                    # the requested version must belong to this application
                    if sandbox_version.webapp_id != webapp.id:
                        raise Error("invalid version id %d (belongs to webapp %d)" % (
                            sandbox_version.id, sandbox_version.webapp_id))
                    yield from ctrl.image_manager.pull(sandbox_version.id)

                yield from self.run_in_executor(self._start, webapp, sandbox_version)

            elif current_state == SandboxState.STOPPING:
                phase = "stop"
                next_state, fail_state = SandboxState.IDLE, SandboxState.STOP_ERROR

                # commit (only if requested), then remove the container
                yield from self._manage_commit(webapp, commit_versions)
                yield from self.run_in_executor(self._stop, webapp)

            else:
                # no state transition: only the requested commits
                phase = "commit"
                yield from self._manage_commit(webapp, commit_versions)

        except ShuttingDown:
            next_state = None
            log.info("sandbox %r %s aborted (controller shutdown)", webapp.docker_name, phase)

        except BaseException as exc:
            next_state = fail_state

            # expected failures are logged without traceback
            if isinstance(exc, (docker.errors.APIError, Error)):
                report = log.error
            else:
                report = log.exception
            report("sandbox %r %s error (%s)", webapp.docker_name, phase,
                    traceback.format_exception_only(type(exc), exc)[-1].strip())

        finally:
            if next_state is not None:
                # conditional UPDATE: only applied if sandbox_state still has
                # the value it had when this processing started
                log.info("sandbox %r is now in state %r", webapp.docker_name, next_state.name)
                with ses.begin():
                    ses.execute("UPDATE dj_webapps SET sandbox_state=%d WHERE id=%d AND sandbox_state=%d" %
                            (next_state, webapp_id, webapp.sandbox_state))
            log.debug("done    sandbox %d", webapp_id)


BAIRE Anthony's avatar
BAIRE Anthony committed
922
class JobManager(Manager):
923
    class JobInfo:
        """Attribute bag describing one job (read by _create_job/_remove_job)"""
        __slots__ = (
            "job_id", "ver_id", "ctr_id", "version", "ctr_name",
            "client", "cpu", "mem", "node_id", "timeout",
        )
925

926
    def __init__(self, ctrl):
        """Create the job manager bound to the controller *ctrl*"""
        # NOTE(review): the meaning of the 0 argument is defined by
        # Manager.__init__ (not visible here)
        super(JobManager, self).__init__(0)
        self.ctrl = ctrl

930
931
932
933
934
935

    @asyncio.coroutine
    def __iter__(self):
        """Not implemented for JobManager (always raises NotImplementedError)"""
        raise NotImplementedError


936
    def _create_job(self, info):
        """Create and start the container of job *info.job_id*

        This is a synchronous method performing blocking db and docker calls
        (presumably run in a thread pool by the caller -- not visible here).

        - mark the job as RUNNING in the db
        - if the job runs the 'sandbox' version (info.ver_id is None), commit
          the sandbox container into a temporary image
        - create the container through info.client (job directory bound to
          /tmp, all capabilities dropped except dac_override, cpu/memory
          limits taken from *info*) and start it
        - store the container id into info.ctr_id and into the db

        If anything fails, the job is cleaned up with _remove_job() (which
        also removes the temporary image, if any) and the exception is
        re-raised.
        """
        ctrl = self.ctrl
        ses  = ctrl.session

        # id of the temporary image committed from the sandbox (if any)
        tmp_img = None

        assert info.ctr_id is None

        try:
            with ses.begin():
                job = ses.query(Job).filter_by(id=info.job_id).one()
                webapp = job.webapp

                log.info("start job %d (%s:%s)",
                        info.job_id, webapp.docker_name, info.version)

                job.state = int(JobState.RUNNING)       # pragma: nobranch (TODO: remove (coverage bug))

            repo = ctrl.gen_image_name(webapp)
            image = "%s:%s" % (repo, info.version)

            job_path = ctrl.gen_job_path(job)
            log.debug("job.path: %r", job_path)

            if info.ver_id is None:
                # job on the live sandbox: snapshot it into a temporary image
                assert info.version == "sandbox"
                image = tmp_img = info.client.commit(ctrl.gen_sandbox_name(webapp), repo, info.version)["Id"]

            # TODO use another workdir
            # TODO use another uid

            ctrl.check_host_path("isdir", job_path)
            hc = ctrl.sandbox.create_host_config(
                        binds = {job_path: {"bind": "/tmp"}},
                        cap_drop = ["all"],
                        # FIXME: CAP_DAC_OVERRIDE needed because all nfs files have uid,gid=1000,1000
                        cap_add = ["dac_override"],
                        cpu_quota   = (None if info.cpu is None else (info.cpu * 1024)),
                        cpu_period  = (None if info.cpu is None else 1024),
                        # cpu_shares is not passed here (set below, see NOTE)
                        mem_limit = info.mem,
                    )
            # NOTE: cpu_shares has a different meaning in docker swarm and docker engine
            #  - swarm:  nb of cpus
            #  - engine: 1/1024 share of the total cpu resources of the machine
            # engine requires cpu_shares > 1
            if ctrl.cpu_shares:
                # TODO: upgrade docker-py (and use create_host_config)
                hc["CpuShares"] = info.cpu
            log.debug("host_config %r", hc)
            info.ctr_id = info.client.create_container(image, name=info.ctr_name,
                    working_dir = "/tmp",
                    # NOTE: the command line is a little complex, but this is
                    #   to ensure that (TODO write tests for this):
                    #   - no output is lost (we go through a pipe in case the
                    #     app has multiple processes writing to stdout/stderr
                    #     concurrently)
                    #     FIXME: maybe ">>allgo.log  2>&1" is sufficient
                    #   - we get the exit code of the app (not the exit code of
                    #     cat)
                    #   - SIGTERM & SIGALRM are forwarded to the process (and
                    #     we call wait again because of EINTR)
                    #   - we have no unusual dependencies (only sh, cat, trap,
                    #     kill and mkfifo)
                    #   - we display a warning if the memory limit was reached
                    #     during the job
                    command = ["/bin/sh", "-c", """
                                pid=
                                interrupted=
                                sighnd() {{
                                    (echo
                                    echo "====  ALLGO JOB $2  ===="
                                    kill "-$1" "$pid") >>allgo.log 2>&1
                                    trap '' TERM ALRM
                                    interrupted=1
                                }}
                                trap "sighnd TERM ABORT"   TERM
                                trap "sighnd ALRM TIMEOUT" ALRM
                                fifo=/.allgo.fifo.{job_id}
                                mkfifo "$fifo" 2>>allgo.log || exit $?

                                exec cat <"$fifo" >>allgo.log &
                                exec "$@" >"$fifo" 2>&1 &
                                pid=$!

                                wait %2
                                code=$?
                                if [ -n "$interrupted" ] ; then
                                    wait %2
                                    code=$?
                                fi
                                wait %1
                                trap '' TERM ALRM

                                [ -n "$interrupted" ] || (
                                    echo
                                    if [ $code -eq 0 ] ; then
                                        echo "====  ALLGO JOB SUCCESS  ===="
                                    else
                                        echo "====  ALLGO JOB ERROR  ===="
                                        echo "process exited with code $code"
                                    fi

                                    failcnt="`cat /sys/fs/cgroup/memory/memory.failcnt`"
                                    if [ "$failcnt" -ne 0 ] ; then
                                        echo "WARNING: memory limit was reached (memory.failcnt=$failcnt)"
                                    fi
                                ) >>allgo.log 

                                exit $code
                        """.format(job_id=job.id),
                        "job%d" % job.id, webapp.entrypoint] + shlex.split(job.param),

                    # the tmp image id is kept in a label so that _remove_job
                    # can find it from the container
                    labels = {"allgo.tmp_img": tmp_img or ""},
                    # NOTE(review): presumably read by docker swarm as a
                    # placement constraint (run the job on info.node_id)
                    environment=["constraint:node==" + info.node_id],
                    host_config = hc)["Id"]
            info.client.start(info.ctr_id)

            with ses.begin():
                # save the container_id into the db
                job.container_id = info.ctr_id

        except BaseException:
            # explicit BaseException (instead of a bare except): the cleanup
            # must also happen on KeyboardInterrupt/SystemExit; the exception
            # is always re-raised
            #TODO introduce a state JobState.ERROR
            self._remove_job(info, tmp_img=tmp_img)
            raise
BAIRE Anthony's avatar
BAIRE Anthony committed
1062
1063


1064
1065
    def _remove_job(self, info, *, tmp_img=None):
        """Clean up a job and record its final state in the db

        - compute the execution time from the StartedAt/FinishedAt timestamps
          of the container (0.0 when unavailable)
        - remove the container (if info.ctr_id is set) and the temporary
          image (*tmp_img*, or else the "allgo.tmp_img" label of the
          container); both removals are wrapped in docker_warning()
        - set the job state to DONE, clear its container_id and fall back to
          result=ERROR if no result was reported
        """
        ses = self.ctrl.session

        # TODO: report launch errors to the user
        # TODO: report exit code to the user
        # TODO: use another uid

        def parse_docker_timestamp(value):
            # docker reports RFC3339Nano timestamps: up to 9 fractional digits
            # with trailing zeros removed, and no fractional part at all when
            # it is zero -> ensure there is a fractional part, then limit its
            # precision to the microsecond (otherwise strptime fails)
            value = re.sub(r"(T\d\d:\d\d:\d\d)Z$", r"\1.0Z", value)
            return datetime.datetime.strptime(
                    re.sub(r"(\.\d{,6})\d*Z$", r"\1Z", value),
                    # iso8601 format
                    "%Y-%m-%dT%H:%M:%S.%fZ")

        with ses.begin():
            job = ses.query(Job).filter_by(id=info.job_id).one()

            exec_time = 0.0
            if info.ctr_id is not None:
                try:
                    js = info.client.inspect_container(info.ctr_id)
                except docker.errors.NotFound:
                    pass
                else:
                    ctr_state = js["State"]
                    # default docker date is '0001-01-01T00:00:00Z' (eg: the
                    # container was never started or has not finished yet);
                    # missing/null values are treated the same way
                    started_at  = ctr_state.get("StartedAt")  or "0001-"
                    finished_at = ctr_state.get("FinishedAt") or "0001-"

                    if not (started_at.startswith("0001-") or finished_at.startswith("0001-")):
                        try:
                            exec_time = (parse_docker_timestamp(finished_at)
                                        - parse_docker_timestamp(started_at)
                                        ).total_seconds()
                        except Exception: # pragma: nocover
                            log.exception("job %d: unable to compute exec time", info.job_id)

                    if tmp_img is None:
                        # the label is set by _create_job ("" when there is no tmp image)
                        labels = js["Config"].get("Labels") or {}
                        tmp_img = labels.get("allgo.tmp_img") or None

                with docker_warning("job %d: cleanup error: unable to remove container", info.job_id):
                    info.client.remove_container(info.ctr_id)

            if tmp_img is not None:
                with docker_warning("job %d: cleanup error: unable to remove tmp image", info.job_id):
                    info.client.remove_image(tmp_img)

            job.exec_time = exec_time
            job.state     = int(JobState.DONE)
            job.container_id = None

            if job.result == JobResult.NONE:
                # FIXME: maybe we should have a 'unknown' result
                log.warning("job %d has not result, will fallback to 'ERROR'", info.job_id)
                job.result = int(JobResult.ERROR)

        log.info("done  job %d (result=%s, duration=%fs)", info.job_id, JobResult(job.result).name, exec_time)
1120
1121
1122


    @asyncio.coroutine
BAIRE Anthony's avatar
BAIRE Anthony committed
1123
    def _finish_job(self, info, reset):
1124
        # wait for container termination (if running)
1125
        if info.ctr_id is not None:
BAIRE Anthony's avatar
BAIRE Anthony committed
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
            def kill(sig):
                log.debug("kill job %d (signal %d)", info.job_id, sig)
                with docker_warning("unable to kill container %r", info.ctr_id):
                    info.client.kill(info.ctr_id, sig)

            @asyncio.coroutine
            def stop(sig, reason):
                log.info("stop  job %d (%s)", info.job_id, reason)
                try:
                    # graceful kill
                    kill(sig)
BAIRE Anthony's avatar
BAIRE Anthony committed
1137
                    yield from asyncio.wait_for(asyncio.shield(wait_task), timeout=5)
BAIRE Anthony's avatar
BAIRE Anthony committed
1138
1139
1140
1141
                except asyncio.TimeoutError:
                    # hard kill (after 5 seconds)
                    kill(signal.SIGKILL)

1142
            wait_task = asyncio.async(info.client.wait_async(info.ctr_id))
BAIRE Anthony's avatar
BAIRE Anthony committed
1143
1144
            timeout_task = (asyncio.Future() if info.timeout is None
                    else asyncio.async(asyncio.sleep(info.timeout)))
BAIRE Anthony's avatar
BAIRE Anthony committed
1145
            result = None
1146
            try:
BAIRE Anthony's avatar
BAIRE Anthony committed
1147
                rescheduled = None
1148
                while not wait_task.done():
BAIRE Anthony's avatar
BAIRE Anthony committed
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
                    # we must ensure that the job is not aborted or if the
                    # timeout expires

                    if rescheduled is None or rescheduled.done():
                        # check the job state in the db
                        rescheduled = reset()
                        ses = self.ctrl.session
                        with ses.begin():
                            state, = ses.query(Job.state).filter_by(id=info.job_id).one()
                            if state == JobState.ABORTING:
                                yield from stop(signal.SIGTERM, "user abort")
BAIRE Anthony's avatar
BAIRE Anthony committed
1160
                                result = result or JobResult.ABORTED
BAIRE Anthony's avatar
BAIRE Anthony committed
1161
1162
1163
1164

                    elif timeout_task.done():
                        # timeout !
                        yield from stop(signal.SIGALRM, "timeout")
BAIRE Anthony's avatar
BAIRE Anthony committed
1165
                        result = result or JobResult.TIMEOUT
BAIRE Anthony's avatar
BAIRE Anthony committed
1166
1167

                    yield from asyncio.wait((wait_task, timeout_task, rescheduled),
1168
                            return_when=asyncio.FIRST_COMPLETED)
BAIRE Anthony's avatar
BAIRE Anthony committed
1169
1170
1171
1172
1173

                returncode = wait_task.result()
                log.debug("job %d exit code: %r", info.job_id, returncode)
                result = result or (JobResult.SUCCESS if returncode==0 else JobResult.ERROR)

1174
1175
            finally:
                wait_task.cancel()
BAIRE Anthony's avatar
BAIRE Anthony committed
1176
                timeout_task.cancel()
1177
1178
                with contextlib.suppress(asyncio.CancelledError):
                    yield from wait_task
BAIRE Anthony's avatar
BAIRE Anthony committed
1179

BAIRE Anthony's avatar
BAIRE Anthony committed
1180
1181
1182
                if result is not None:
                    ses = self.ctrl.session
                    with ses.begin():
BAIRE Anthony's avatar
BAIRE Anthony committed
1183
                        ses.execute("UPDATE dj_jobs SET result=%d WHERE id=%d AND result=%d" %
BAIRE Anthony's avatar
BAIRE Anthony committed
1184
1185
                                (int(result),