controller.py 74.8 KB
Newer Older
1
2
3
4
#!/usr/bin/python3


import asyncio
5
import collections
6
from   concurrent.futures   import ThreadPoolExecutor
7
import contextlib
8
import datetime
9
import enum
BAIRE Anthony's avatar
BAIRE Anthony committed
10
import itertools
BAIRE Anthony's avatar
BAIRE Anthony committed
11
import json
12
13
14
import logging
import re
import os
15
import shlex
16
import signal
17
import struct
18
import sys
19
20
21
import time
import threading
import traceback
22

23
import aiohttp
24
import aioredis
25
import docker
BAIRE Anthony's avatar
BAIRE Anthony committed
26
import iso8601
27
import MySQLdb
BAIRE Anthony's avatar
BAIRE Anthony committed
28
29
from   sqlalchemy import desc
import sqlalchemy.orm.scoping
30
import yaml
31

32
import config_reader
33
from database import *
34
from shared_swarm import SharedSwarmClient, ShuttingDown
35

36
37
HOST_PATH = "/vol/host/"

##################################################
# default number of concurrent tasks

# pushing images from the sandbox to the registry
NB_PUSH_SANDBOX  = 2

# pulling images from the registry to the sandbox
NB_PULL_SANDBOX  = 2

# pulling images from the registry to the swarm
NB_PULL_SWARM    = 2

# sandbox actions (start, stop, commit, ...)
NB_SANDBOX_TASKS = 4

# jobs
NB_JOB_TASKS     = 4


# default thread pool (used by most of the tasks), 10 worker threads
default_executor = ThreadPoolExecutor(max_workers=10)

##################################################
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# redis keys
# (%-format templates, filled with an integer -- presumably the job id)

# job log / job state
REDIS_KEY_JOB_LOG   = "log:job:%d"
REDIS_KEY_JOB_STATE = "state:job:%d"

# pubsub channels for waking up allgo.aio (frontend) and the controller
# (ourselves)
REDIS_CHANNEL_AIO        = "notify:aio"
REDIS_CHANNEL_CONTROLLER = "notify:controller"

# pubsub messages (%-format templates filled with an integer id)
REDIS_MESSAGE_JOB_UPDATED    = "job:%d"
REDIS_MESSAGE_WEBAPP_UPDATED = "webapp:%d"


##################################################

# interval (in seconds) for polling the db
DB_CHECK_PERIOD = 3600

# logger shared by the whole module
log = logging.getLogger("controller")
84

85
86
87
88
89
# Sanity check of the MySQL driver: DB-API 2.0 threadsafety level >= 1 means
# that threads may at least share the module (presumably required because the
# database is accessed from the thread pool -- TODO confirm).
# This is an explicit check rather than an 'assert' so that it is not skipped
# when python runs with -O.
if MySQLdb.threadsafety < 1:
    raise RuntimeError("MySQLdb.threadsafety must be >= 1 (got %r)"
            % (MySQLdb.threadsafety,))


class Error(Exception):
    """Base class for the errors raised by the controller

    report_error() logs exceptions of this class with log.error() (message
    only, no traceback) instead of log.exception().
    """
90

91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120

class RateLimiter:
    """Asynchronous iterator enforcing a minimum duration per iteration

    Each step of the iteration lasts at least `period` seconds (measured with
    time.monotonic()). The first step is never delayed. The iterator never
    terminates by itself.

    usage:
        async for _ in RateLimiter(60):
            ...
    """
    def __init__(self, period):
        assert period > 0
        self.period = period
        # pretend the previous step started `period` seconds ago, so that the
        # first step does not wait
        self._t0 = time.monotonic()-period

    def __aiter__(self):
        return self

    async def __anext__(self):
        now = time.monotonic()
        delay = self._t0 - now + self.period
        if delay <= 0:
            # the previous step already lasted long enough
            self._t0 = now
            return
        log.debug("rate_limit: sleep %f seconds", delay)
        await asyncio.sleep(delay)
        # the new step starts exactly `period` seconds after the previous one
        self._t0 = now + delay


BAIRE Anthony's avatar
BAIRE Anthony committed
121
122
123
124
125
126
127
128
129
130
131
132
133
def docker_check_error(func, *k, **kw):
    """Wrapper for docker-py methods that produce a stream

    Methods producing a stream (eg: push, build) do not report all errors
    by raising exceptions. Some errors are reported later in the stream.

    This function calls `func(*k, stream=True, **kw)`, consumes the stream of
    JSON status messages and raises Error() at the first message carrying an
    "error" key.

    A chunk of the stream may contain several newline-separated JSON
    documents (and blank keep-alive lines), so every chunk is split on
    newlines and each non-empty line is decoded separately. Chunks may be
    bytes (utf-8 decoded) or str.
    """
    for chunk in func(*k, stream=True, **kw):
        if isinstance(chunk, bytes):
            chunk = chunk.decode()
        # split on "\n" only: raw newlines cannot appear inside a JSON
        # document, whereas str.splitlines() would also split on characters
        # that JSON allows unescaped (eg: U+2028)
        for line in chunk.split("\n"):
            line = line.strip()
            if not line:
                continue
            js = json.loads(line)
            if isinstance(js, dict) and "error" in js:
                raise Error("push error: " + js["error"])

BAIRE Anthony's avatar
BAIRE Anthony committed
134
135
136
137
138
139
140
141
142
@contextlib.contextmanager
def docker_warning(msg, *k, ignore=None):
    """Downgrade docker API errors raised inside the context to warnings

    A docker.errors.APIError raised in the body is swallowed:
     - silently if it is an instance of `ignore` (exception class or tuple)
     - otherwise after logging log.warning(msg + " (%s)", *k, error)

    Any other exception propagates unchanged.
    """
    try:
        yield
    except docker.errors.APIError as err:
        if ignore is not None and isinstance(err, ignore):
            return
        log.warning(msg + " (%s)", *(k + (err,)))
BAIRE Anthony's avatar
BAIRE Anthony committed
143

BAIRE Anthony's avatar
BAIRE Anthony committed
144
145
@contextlib.contextmanager
def report_error(fmt, *k):
    """Context manager logging the exceptions that escape its body

    When the body raises an Exception, a message "<fmt % k> (<exception
    summary>)" is logged, then the exception is re-raised:
     - with log.error() if the exception is an instance of Error
     - with log.exception() (ie: including the traceback) otherwise
    """
    try:
        yield
    except Exception as exc:
        prefix = fmt % k
        if isinstance(exc, Error):
            report = log.error
        else:
            report = log.exception
        summary = traceback.format_exception_only(type(exc), exc)[-1].strip()
        report("%s (%s)", prefix, summary)
        raise
161

BAIRE Anthony's avatar
BAIRE Anthony committed
162
163
164
165
166
167
168
169
170
171
172
173
174
def disable_future_warning(fut):
    """Add a dummy callback to a future to prevent asyncio warnings

    Return: the future

    The asyncio module logs a warning message when the exception of a future
    is never retrieved. This function installs a callback that retrieves it,
    which avoids this warning. This is useful for futures whose result *may*
    be ignored by the application.

    Cancelled futures are skipped: calling .exception() on them would raise
    CancelledError inside the callback, which the event loop would then
    report as an error.
    """
    def _retrieve_exception(f):
        if not f.cancelled():
            f.exception()

    fut.add_done_callback(_retrieve_exception)
    return fut

175

176
177
178
179
180
181
182
183
184
def auto_create_task(func):
    """Decorator for forcing the creation of a task when a coroutine function is called

    Return a wrapper that calls the function, wraps the resulting coroutine
    in a task (asyncio.ensure_future()) and returns the task. A no-op
    callback is also installed (see disable_future_warning()) to avoid
    warnings in case the result is not used.

    The wrapper keeps the __name__/__qualname__/__doc__ of `func`.
    """
    import functools

    assert asyncio.iscoroutinefunction(func)

    # updated=() : do not copy func.__dict__, otherwise the '_is_coroutine'
    # marker set by @asyncio.coroutine would make the wrapper look like a
    # coroutine function (it returns a task, not a coroutine)
    @functools.wraps(func, updated=())
    def wrapper(*k, **kw):
        # asyncio.ensure_future() replaces asyncio.async(), which is
        # deprecated since Python 3.4.4 and a syntax error since Python 3.7
        # ('async' became a reserved keyword)
        return disable_future_warning(asyncio.ensure_future(func(*k, **kw)))

    return wrapper
BAIRE Anthony's avatar
BAIRE Anthony committed
187

188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def cascade_future(src, dst):
    """Propagate the outcome of a future to another future

    This function installs a callback on the future `src` (or runs it
    immediately if `src` is already done) that copies its outcome to the
    future `dst`:
     - result       -> dst.set_result()
     - exception    -> dst.set_exception()
     - cancellation -> dst.cancel()

    If `dst` is already done by then (eg: cancelled by its consumer), it is
    left untouched (instead of raising InvalidStateError in the callback).
    """
    def callback(fut):
        if fut.cancelled():
            # fut.exception() would raise CancelledError here
            if not dst.done():
                dst.cancel()
            return

        # retrieve the exception even if `dst` is done, so that asyncio does
        # not warn about an exception never retrieved on `src`
        ex = fut.exception()
        if dst.done():
            return
        if ex is None:
            dst.set_result(fut.result())
        else:
            dst.set_exception(ex)

    if src.done():
        callback(src)
    else:
        src.add_done_callback(callback)

206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
async def _make_aiohttp_client(host):
    """Open an aiohttp client session to a docker engine

    `host` is a docker host url (as in DOCKER_HOST), either a `tcp://` or a
    `unix://` url.

    Return a `(session, url)` tuple where `session` is an aiohttp client
    session and `url` is the http(s) base url of the docker engine. For a
    unix socket, the session uses an aiohttp.UnixConnector bound to the
    socket path and `url` is a placeholder ("http://DUMMY.localhost").

    It is declared as a coroutine and run by make_aiohttp_client() --
    presumably so that the session is created inside the event loop.
    """
    url = docker.utils.parse_host(host)
    unix_socket = re.match(r"http\+unix:/+(/.*)", url)
    if unix_socket is not None:
        # HTTP over unix socket
        connector = aiohttp.UnixConnector(unix_socket.group(1))
        url = "http://DUMMY.localhost"
    else:
        # HTTP over TCP
        connector = None
    return aiohttp.ClientSession(connector=connector), url


def make_aiohttp_client(host):
    """Synchronous version of _make_aiohttp_client()

    Run the coroutine to completion on the current event loop (which must not
    be running) and return its `(session, url)` result.
    """
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(_make_aiohttp_client(host))
BAIRE Anthony's avatar
todo    
BAIRE Anthony committed
229

230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
class DockerAuthConfigDict(dict):
    """dict of authentication credentials for docker.Client

    Keys: repository names (or possibly subrepositories)
    Values: auth config dicts (suitable for docker.Client.{pull,push})
    """

    def get_auth_config(self, image):
        """Get the suitable authentication credentials for a given docker image

        A key matches `image` if it is a prefix of `image` ending on a
        component boundary (ie: followed by nothing, "/" or ":"). Hence
        "registry/foo" matches "registry/foo:1.0" and "registry/foo/bar" but
        not "registry/foobar".

        When several keys match, the longest (most specific) one is selected,
        so that the result does not depend on the iteration order of the dict.

        Return an 'auth_config' dict or None if no credentials are found
        """
        best = None     # (registry, auth_config) of the longest match
        for registry, auth_config in self.items():
            if not image.startswith(registry):
                continue
            l = len(registry)
            if image[l:l+1] not in ("", "/", ":"):
                continue
            if best is None or l > len(best[0]):
                best = registry, auth_config

        if best is None:
            log.debug("select no auth_config for image %r", image)
            return None

        log.debug("select auth_config %r for image %r", best[0], image)
        return best[1]

250
class Manager:
    """A class for scheduling asynchronous jobs on a collection of keys

    (the one-line summary is not very helpful, sorry)


    Job submission

    The job is scheduled on key KEY by calling .process(KEY), the function
    returns an asyncio.Future object (which will receive the result)

    If .process(KEY) is called while a job is already running for KEY, then the
    manager arranges the job to be run a second time in a row for the same key


    Job implementation

    This is an abstract class. The job shall be implemented as the coroutine
    named ._process() in the inherited class.

    The manager guarantees that ._process() cannot be called multiple times
    concurrently for the same key

    In case a shutdown is requested (see .shutdown()):
     - all jobs that are not yet started are cancelled
     - running jobs continue until they return or until they try to acquire the
       internal semaphore (which raises ShuttingDown())


    Concurrency

    The job tasks are all started immediately (whatever the number of existing
    jobs)

    It is possible to limit the concurrency to a maximum number of parallel
    tasks. Manager provides an internal semaphore (the number of tokens is set
    in .__init__())

    The semaphore can be locked in two ways:
     - by locking the manager:

            with (yield from self): ... concurrency limited to `nb_tokens`
            tasks ...

     - by calling .run_in_executor (this is for running non-async code):

            yield from self.run_in_executor(func, args...)

    Shutdown

    When .shutdown() is called the Manager ensures that all jobs are properly
    terminated
     - it cancels all jobs that are not yet started
     - it prevents starting new jobs
     - it lets pending jobs run, but interrupts them when they try to acquire
       the internal semaphore

    All cancelled/interrupted tasks have their future raise ShuttingDown().

    .shutdown() returns after all pending jobs are terminated (or immediately
    if the manager is interruptible).


    Thread safety

    - Manager is *not* thread safe, all public methods must be called from the
      same thread
    """

    class _Handle:
        # per-key state:
        #   key         - the key
        #   cur         - task currently running ._process() (or None)
        #   nxt         - future returned by .process() calls made while `cur`
        #                 is running, ie: result of the next run (or None)
        #   rescheduled - future given to ._process(), resolved when a new run
        #                 is requested
        __slots__ = "key", "cur", "nxt", "rescheduled"

    def __init__(self, nb_tokens=1, *, executor=default_executor, interruptible=False):
        """
        nb_tokens:      number of tokens of the internal semaphore
        executor:       executor used by .run_in_executor()
        interruptible:  if true, .run_in_executor() raises ShuttingDown() as
                        soon as a shutdown is initiated (without waiting for
                        the function to return) and .shutdown() does not
                        wait for the running jobs
        """
        # {key: Manager._Handle}
        self._handles = {}
        self._semaphore = asyncio.Semaphore(nb_tokens)
        # done (with a ShuttingDown exception) once .shutdown() is called
        self._shutdown = asyncio.Future()
        self._executor = executor
        self._interruptible = interruptible

    def _create_task(self, hnd):
        """Start a new task running ._process() on the key of `hnd`

        Return the task (also stored in hnd.cur).
        """
        assert hnd.nxt is None
        def reset():
            # called by ._process() to clear the 'dirty' state of the key: the
            # callers waiting for the next run (hnd.nxt) will get the result
            # of the current run instead
            assert not hnd.cur.done()
            nxt = hnd.nxt
            if nxt is not None:
                cascade_future(hnd.cur, nxt)
                hnd.nxt = None

            # recreate the 'rescheduled' future and return it
            hnd.rescheduled = disable_future_warning(asyncio.Future())
            return hnd.rescheduled

        hnd.rescheduled = disable_future_warning(asyncio.Future())
        # asyncio.ensure_future() replaces asyncio.async() (deprecated since
        # Python 3.4.4, syntax error since Python 3.7)
        hnd.cur = asyncio.ensure_future(self._process(hnd.key, reset, hnd.rescheduled))
        hnd.cur.add_done_callback(lambda fut: self._done(hnd))
        log.debug("task scheduled %r %r", self, hnd.key)
        return hnd.cur

    def process(self, key):
        """Schedule the job to be run on key `key`

        returns an asyncio.Future (that will provide the result of ._process())

        After a shutdown is initiated, the returned future raises
        ShuttingDown().
        """
        if self._shutdown.done():
            return self._shutdown

        hnd = self._handles.get(key)
        if hnd is None:
            # create handle
            self._handles[key] = hnd = self._Handle()
            hnd.key = key
            hnd.cur = None
            hnd.nxt = None
            hnd.rescheduled = None

        if hnd.cur is None:
            # create task
            return self._create_task(hnd)
        else:
            # reschedule task
            # (all the calls made while the task is running share the same
            # 'nxt' future, ie: they result in a single additional run)
            if hnd.nxt is None:
                hnd.nxt = asyncio.Future()
            if not hnd.rescheduled.done():
                hnd.rescheduled.set_result(None)
            return hnd.nxt

    def _done(self, hnd):
        """Callback invoked when the task of `hnd` terminates

        Log unexpected exceptions, then either start the next run (if one was
        requested meanwhile, its result being propagated to hnd.nxt) or drop
        the handle.
        """
        assert hnd is self._handles.get(hnd.key)
        assert hnd.cur.done()

        try:
            hnd.cur.result()
        except ShuttingDown:
            pass
        except Exception:
            log.exception("task %r %r unhandled exception", self, hnd.key)

        nxt     = hnd.nxt
        hnd.cur = hnd.nxt = None
        if nxt is None:
            del self._handles[hnd.key]
        else:
            cascade_future(self._create_task(hnd), nxt)

    @asyncio.coroutine
    def __iter__(self):
        """Coroutine for locking the internal semaphore

        Usage:
            with (yield from manager):
              ...

        Warning:
            after a shutdown is initiated, this function will always raise
            ShuttingDown()
        """
        # NOTE(review): `yield from` on asyncio locks/semaphores was deprecated
        # in Python 3.7 and removed in Python 3.9
        ctx = yield from iter(self._semaphore)
        if self._shutdown.done():
            # release the semaphore immediately
            with ctx:
                raise ShuttingDown()
        return ctx

    @asyncio.coroutine
    def run_in_executor(self, *k, lock=True):
        """Run a function in a separate thread (with limited concurrency)

        This coroutine runs the provided function call (`*k` is: func,
        args...) in the executor and returns its result.

        lock:   if true (the default), the internal semaphore is held while
                the function runs (thus ShuttingDown() is raised if a shutdown
                is initiated before the semaphore is acquired)

        If the manager is interruptible, ShuttingDown() is raised as soon as a
        shutdown is initiated (the thread itself is not interrupted).
        """
        def run():
            coro = asyncio.get_event_loop().run_in_executor(self._executor, *k)
            if self._interruptible:
                # whichever completes first: the function or the shutdown
                return next(asyncio.as_completed((coro, self._shutdown)))
            else:
                return coro

        if lock:
            with (yield from self):
                return (yield from run())
        else:
            return (yield from run())

    @asyncio.coroutine
    def _process(self, key, reset, rescheduled):
        """Actual implementation of the job (to be reimplemented in inherited classes)

        The Manager class guarantees that this function cannot be called
        multiple times concurrently on the same key (in case the same key is
        submitted multiple times, the Manager will call this function a second
        time after it has terminated).

        `rescheduled` is a future whose result is set when the job is being
        rescheduled (if process(key) is called before _process(key ...)
        terminates).

        `reset` is a function that may be called to reset the 'dirty' state of
        this key (this is to avoid calling ._process() a second time if not
        necessary) in that case, a new `rescheduled` future is returned.
        """
        raise NotImplementedError()

    @asyncio.coroutine
    def shutdown(self):
        """Initiate a graceful shutdown

        This coroutine terminates once all tasks are properly terminated (if
        the manager is interruptible, it returns without waiting for them).

        Calling it more than once is harmless.
        """
        exc = ShuttingDown()
        if not self._shutdown.done():
            self._shutdown.set_exception(exc)
            self._shutdown.exception()  # to avoid asyncio warnings

        # cancel all 'next' tasks
        for hnd in self._handles.values():
            if hnd.nxt is not None:
                # the consumer may already have cancelled this future
                if not hnd.nxt.done():
                    hnd.nxt.set_exception(exc)
                hnd.nxt = None

        if not self._interruptible:
            yield from asyncio.gather(
                    *(h.cur for h in self._handles.values() if h.cur is not None),
                    return_exceptions=True)
BAIRE Anthony's avatar
BAIRE Anthony committed
475

476
477

class SandboxManager(Manager):
BAIRE Anthony's avatar
BAIRE Anthony committed
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
    """Manager for sandbox operation

    This manager handles all sandbox operations (start, stop, commit).

    Operations are requested asynchronously in the database:
     - Webapp.sandbox_state=starting (for starting a sandbox)
     - Webapp.sandbox_state=stopping (for stopping a sandbox)
     - WebappVersion with state=sandbox (for committing a new image)

    All state changes on Webapp.sandbox_state are atomic (e.g: if the user
    requests a stop while the sandbox is starting then the manager finishes
    with starting the sandbox but does not update the state, and it runs
    immediately again to stop the sandbox)

    Whatever is the value of Webapp.sandbox_state, the manager first examines
    commit requests and makes the commit if requested.

    If the container already exists before starting the webapp (not possible in
    normal operations), then a recovery image is committed first.

    When a commit is successful. The image manager is notified for pushing the
    image to the registry and (if it is not a recovery image) to pull it to the
    swarm (because this image will very likely be used soon).

    State changes:

     - sandbox start:
            starting->running     (normal case)
            starting->start_error (error case)

     - sandbox stop:
            stopping->idle        (normal case)
            stopping->stop_error  (error case)

     - image commit:
            sandbox->committed  (normal case)
            sandbox->error      (error case)
            (none)->committed   (recovery version) 
    """
517

518
    def __init__(self, ctrl, nb_threads = NB_SANDBOX_TASKS):
        """Create the sandbox manager

        ctrl:       the controller (used for the docker client, the database
                    session and the host paths)
        nb_threads: number of tokens of the Manager semaphore, ie: maximum
                    number of sandbox operations holding it concurrently
        """
        self.ctrl = ctrl
        super().__init__(nb_tokens=nb_threads)


BAIRE Anthony's avatar
BAIRE Anthony committed
523
524
    def inspect_sandbox(self, webapp):
        """Return the docker inspection data of the sandbox container of `webapp`

        Return None if the container does not exist.
        """
        try:
            info = self.ctrl.sandbox.inspect_container(
                    self.ctrl.gen_sandbox_name(webapp))
        except docker.errors.NotFound:
            info = None
        return info

BAIRE Anthony's avatar
BAIRE Anthony committed
530
531
532
533
534
535

    @staticmethod
    def filter_commit_version(query, webapp_id):
        """Restrict a WebappVersion query to the commit candidates of a webapp

        The candidates are the versions of webapp `webapp_id` whose state is
        VersionState.SANDBOX. Return the narrowed query.
        """
        return query.filter_by(
                webapp_id = webapp_id,
                state     = int(VersionState.SANDBOX),
                )

    def _start(self, webapp, version):
540
541
542
543
544
545
        """Start a webapp sandbox

        (to be executed in a thread pool)
        """

        ctrl = self.ctrl
BAIRE Anthony's avatar
BAIRE Anthony committed
546
        ses  = ctrl.session
547
548

        # prepare sandbox parameters
BAIRE Anthony's avatar
BAIRE Anthony committed
549
550
551

        # docker image
        if version is None:
552
            image = "%s:%s" % (ctrl.gen_factory_name(webapp.docker_os),
BAIRE Anthony's avatar
BAIRE Anthony committed
553
                    webapp.docker_os.version)
554
        else:
BAIRE Anthony's avatar
BAIRE Anthony committed
555
            image = "%s:%s" % (webapp.image_name, version.number)
BAIRE Anthony's avatar
BAIRE Anthony committed
556
557

        log.debug("sandbox %r: using image %r", webapp.docker_name, image)
558
559
560
561
562
563
564
565
566
567
568
569

        # safety checks
        # (because docker_name is used it the paths of the external volumes
        if ("/" in webapp.docker_name) or (webapp.docker_name in ("", ".", "..")):
            raise Error("malformatted docker_name")

        uid = webapp.id + 2000
        if uid < 2000:
            # just for safety
            raise Error("bad webapp id")

        # remove stale container (if any)
570
571
        if self.inspect_sandbox(webapp) is not None:
            self._stop(webapp)
572

BAIRE Anthony's avatar
BAIRE Anthony committed
573
        container = webapp.sandbox_name
574
575
        try:
            # prepare the sandbox
576
577
578
579
            # (create ssh keys)
            ctrl.check_host_path("isdir", ctrl.toolbox_path)
            ctrl.check_host_path("isdir", ctrl.sandbox_path)

BAIRE Anthony's avatar
BAIRE Anthony committed
580
            ctrl.sandbox.create_container("busybox:latest", name=container,
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
                    command = ["/bin/sh", "-c", """
set -ex

export PATH="$PATH:/.toolbox/bin"

# clean sandbox dir
rm -rf {sbx}

# create dirs
for dir in {sbx} {etc} {run}
do
    mkdir -p            ${{dir}}
    chown {uid}:65534   ${{dir}}
    chmod 0700          ${{dir}}
done

# xauth file
touch               {run}/XAuthority
chown {uid}:65534   {run}/XAuthority
chmod 0600          {run}/XAuthority

# generate ssh keys
(for type in ecdsa ed25519 rsa
do
    key={etc}/ssh_host_${{type}}_key
    [ -f $key ] || ssh-keygen -N '' -f $key -t $type >&2
    
    echo -n '{hostname}. ' | cat - ${{key}}.pub
done) > {etc}/ssh_known_hosts

# known_host file for allgo-shell
ssh-keygen -H -f {etc}/ssh_known_hosts
chmod 0644       {etc}/ssh_known_hosts
rm -f            {etc}/ssh_known_hosts.old

# authentication key for allgo-shell
rm -f               {etc}/identity
ssh-keygen -N '' -f {etc}/identity
chown {uid}:65534   {etc}/identity

# forced shell for the sshd config 
cat > {etc}/shell <<EOF
#!/bin/sh

export PATH="\$PATH:/.toolbox/bin"

uid=\`id -u\`
shell="\`getent passwd \$uid 2>/dev/null| cut -d : -f 7\`"
if [ -z "\$shell" ] ; then
    shell=/bin/sh
fi

if [ -n "\$SSH_ORIGINAL_COMMAND" ] ; then
    exec "\$shell" -c "\$SSH_ORIGINAL_COMMAND"
else
    exec "\$shell"
fi

EOF
chmod 755 {etc}/shell

# sshd config
cat > {etc}/sshd_config <<EOF
Port 22
Protocol 2

# turned off because it requires creating a 'sshd' user inside the sandbox
UsePrivilegeSeparation no

StrictModes no

ForceCommand /.sandbox/etc/ssh/shell

PermitRootLogin without-password

PubkeyAuthentication yes
AuthorizedKeysFile  /.sandbox/etc/ssh/identity.pub .ssh/authorized_keys .ssh/authorized_keys2

ChallengeResponseAuthentication no
PasswordAuthentication          no

X11Forwarding yes
X11DisplayOffset 10
PrintMotd no
PrintLastLog no
TCPKeepAlive yes

# Allow client to pass locale environment variables
AcceptEnv LANG LC_*

Subsystem sftp internal-sftp

UsePAM no
EOF
                    """.format(uid=uid,
                        hostname = "%s-sandbox-%s" % (ctrl.env, webapp.docker_name),
                        sbx = "/mnt/%s"         % webapp.docker_name,
                        etc = "/mnt/%s/etc/ssh" % webapp.docker_name,
                        run = "/mnt/%s/run"     % webapp.docker_name,
                        )],
681
                    host_config = ctrl.sandbox.create_host_config(
682
683
684
685
                        binds   = {
                            ctrl.sandbox_path: {"bind": "/mnt"},
                            ctrl.toolbox_path: {"bind": "/.toolbox", "mode": "ro"},
                    }))
686
687
            ctrl.sandbox.start(container)
            if ctrl.sandbox.wait(container):
688
689
                log.debug("sandbox %s output:\n%s", webapp.docker_name,
                        ctrl.sandbox.logs(container).decode(errors="replace"))
690
691
692
693
                raise Error("sandbox preparation failed")
            ctrl.sandbox.remove_container(container)

            # create and start the sandbox
694
695
696
697
698

            etc_dir = os.path.join(ctrl.sandbox_path, webapp.docker_name, "etc")
            run_dir = os.path.join(ctrl.sandbox_path, webapp.docker_name, "run")
            ctrl.check_host_path("isdir", etc_dir)
            ctrl.check_host_path("isdir", run_dir)
699
700
701
702

            if version is None and webapp.entrypoint:
                # prepend instructions to initialise a dummy entrypoint
                dn, bn = os.path.split(webapp.entrypoint)
BAIRE Anthony's avatar
todo    
BAIRE Anthony committed
703
                # FIXME: do nothing if entrypoint already exists
704
                prepare = """
705
                    {mkdir}
706
                    test -f {entrypoint} || cat > {entrypoint} <<EOF
707
708
709
710
711
712
713
#!/bin/sh
echo
echo "This is app '{name}' called with parameters '\$@'"
echo
echo "The workdir contains:"
ls -l
EOF
714
                    chmod 0755 -- {entrypoint}
715

716
                """.format( entrypoint  = shlex.quote(webapp.entrypoint),
717
                            name        = webapp.docker_name,
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
                            mkdir       = (("mkdir -p -- %s" % shlex.quote(dn)) if dn else ""))
            else:
                prepare = ""

            command = ["/bin/sh", "-c", """
set -x
export PATH="$PATH:/.toolbox/bin"

{prepare}

# xauth file (needed for X11 forwarding)
touch       /root/.Xauthority
chmod 600   /root/.Xauthority

exec /.toolbox/bin/sshd -D
            """.format(prepare=prepare)]
734

735
            ctrl.sandbox.create_container(image, name=container, hostname=container,
736
                    command = command,
737
738
                    host_config = ctrl.sandbox.create_host_config(
                        binds = {
739
740
741
                            etc_dir: {"bind": "/.sandbox/etc", "mode": "ro"},
                            run_dir: {"bind": "/.sandbox/run", "mode": "rw"},
                            ctrl.toolbox_path: {"bind": "/.toolbox", "mode": "ro"},
742
743
744
                            },
                        # TODO: maybe drop other caps
                        cap_drop = ["NET_RAW"],
745
                        restart_policy = {"Name": "unless-stopped"},
746
                        network_mode = ctrl.sandbox_network,
747
748
749
750
751
                        ))

            ctrl.sandbox.start(container)

        except:
BAIRE Anthony's avatar
BAIRE Anthony committed
752
753
            with docker_warning("cleanup error: unable to remove container %r",
                    container, ignore=docker.errors.NotFound):
754
755
756
                ctrl.sandbox.remove_container(container, force=True)
            raise

BAIRE Anthony's avatar
BAIRE Anthony committed
757
    def _commit(self, webapp, versions):
        """Commit a webapp sandbox

        (to be executed in a thread pool)

        `versions` holds the candidate WebappVersion entries (state==sandbox)
        for this webapp. Exactly one candidate with a non-empty version number
        is expected.

        In case of any pre-commit error (no candidate, several candidates or an
        empty version number), a recovery version is committed instead (to
        avoid losing the work done inside the sandbox) and the candidates are
        put in error state.

        Returns a tuple (version, error):
        - version: the committed WebappVersion, or None if the sandbox
          container was not found (nothing committed)
        - error:   the error message, or None if everything went as requested

        Unexpected errors are logged and propagated (if the image was not
        committed yet, the version is left in state 'sandbox').
        """
        ctrl = self.ctrl
        ses  = ctrl.session

        # pre-commit checks
        # - ensure that there is exactly one candidate webapp_version with
        #   state=sandbox
        # - otherwise:
        #   - put all candidates in error state
        #   - create a recovery version

        # version to be committed
        version = None

        # error msg (if any)
        error = None

        # version ids to be recovered
        recover = ()

        if len(versions) == 1:
            # normal case (sandbox commit)
            version = versions[0]
            if not version.number:
                error   = "empty version number"
                recover = version.id,

        elif not versions:
            # sandbox rollback (when user drops a sandbox without committing a new image)
            error = "dangling sandbox"

        else:
            # multiple candidates (should never happen)
            # NOTE: sort the repr() strings rather than the numbers themselves
            #       because a number may be None (not orderable against str)
            error   = "multiple candidate versions (%s)" % (
                ", ".join(sorted(repr(v.number) for v in versions)))
            recover = tuple(v.id for v in versions)

        # TODO: make 'sandbox' a reserved name

        if error:
            changelog = "pre-commit error: " + error
            log.error("sandbox %r version id %r: %s", webapp.docker_name, recover, changelog)

            with ses.begin():
                # put all candidates in 'error state'
                if recover:
                    ses.execute('''UPDATE dj_webapp_versions
                            SET changelog=CONCAT(changelog, " [", :changelog, "]"), state=:state
                            WHERE id IN :ids''', dict(changelog=changelog, ids=recover,
                                state=int(VersionState.ERROR)))

                # create a recovery version
                version = WebappVersion(
                        webapp_id = webapp.id,
                        number    = time.strftime("recovery-%Y%m%d-%H%M%S"),
                        changelog = changelog,
                        published = False,
                        state     = int(VersionState.SANDBOX))
                ses.add(version)
            # load the generated id, then detach the object from the session
            ses.refresh(version)
            ses.expunge(version)

        assert version is not None

        # commit the docker image

        log.debug("dicts %r %r", webapp.__dict__, version.__dict__)
        log.info("commit sandbox %r version %r", webapp.docker_name, version.number)

        container = webapp.sandbox_name
        next_state = image_size = None
        try:
            # stop the container (if stopping or if creating a new sandbox)
            if webapp.sandbox_state in (SandboxState.STOPPING, SandboxState.STARTING):
                ctrl.sandbox.stop(container)
                ctrl.sandbox.wait(container)

            # commit
            cid = ctrl.sandbox.commit(container, webapp.image_name, version.number)
            # the image exists from now on: the version is committed even if
            # the size lookup below fails
            next_state = VersionState.COMMITTED

            image_size = ctrl.sandbox.inspect_image(cid)["Size"]

            return version, error

        except docker.errors.NotFound:
            error = "commit error: container not found %r" % container
            log.error("%s", error)
            next_state = VersionState.ERROR
            image_size = 0
            # same transaction handling as all other writes in this method
            with ses.begin():
                ses.execute('''UPDATE dj_webapp_versions
                    SET changelog=CONCAT(changelog, " [commit error: sandbox is down]")
                    WHERE id=%d''' % version.id)

            # here we do not propagate the error to allow starting/stopping the
            # sandbox immediately (without going through sandbox_state=:error)
            return None, error

        except Exception:
            log.exception("sandbox %r version %r: unexpected commit error (sandbox may still be recovered)",
                    webapp.docker_name, version.number)
            raise

        finally:
            # NOTE: if anything unexpected happens before the commit, the
            # version is left in state 'sandbox' and we propagate the exception
            # to ensure the work done inside the sandbox is not dropped and the
            # sandbox is put in 'error' state
            #
            # The error will not be reported to the user. If this is an issue,
            # then the solution would be to create another error state (to be
            # used when the sandbox is still there).
            #
            if next_state is not None:
                # image_size is still None when inspect_image() failed after a
                # successful commit; "%d" % None would raise a TypeError here
                # and mask the original exception
                if image_size is None:
                    image_size = 0
                with ses.begin():
                    ses.execute("UPDATE dj_webapp_versions SET state=%d, docker_image_size=%d WHERE id=%d"
                            % (next_state, image_size, version.id))
BAIRE Anthony's avatar
BAIRE Anthony committed
883

884

BAIRE Anthony's avatar
BAIRE Anthony committed
885
    @asyncio.coroutine
    def _manage_commit(self, webapp, versions, *, force=False):
        """Commit the sandbox (if needed) and notify the image manager

        A commit is performed when either:
        - a commit was requested, i.e. `versions` (the WebappVersion entries
          with state=sandbox for this app) is not empty, or
        - `force` is true and a docker container exists for this sandbox

        Once committed, the image is pushed to the registry and, unless the
        commit reported an error, pre-emptively pulled to the swarm. These
        push/pull operations are triggered but not awaited.
        """
        needed = bool(versions) or (force and self.inspect_sandbox(webapp) is not None)
        if not needed:
            return

        # the actual commit runs in the thread pool
        committed, error = yield from self.run_in_executor(self._commit, webapp, versions)
        if committed is None:
            # nothing was committed (eg: the sandbox container was not found)
            return

        # push to the registry
        self.ctrl.image_manager.push(committed.id)

        if not error:
            # the image is likely to be needed soon -> pull it to the swarm now
            self.ctrl.image_manager.pull(committed.id, swarm=True)
911
912


BAIRE Anthony's avatar
BAIRE Anthony committed
913
914
    def _stop(self, webapp):
        """Stop a webapp sandbox by force-removing its docker container

        (to be executed in a thread pool)

        A container that does not exist (anymore) is silently ignored.
        """
        # FIXME: remove volumes (v=True) too ?
        with contextlib.suppress(docker.errors.NotFound):
            self.ctrl.sandbox.remove_container(webapp.sandbox_name, force=True)

924
925

    @asyncio.coroutine
    def _process(self, webapp_id, reset, rescheduled):
        """Bring the sandbox of a webapp into the state requested in the db

        Depending on webapp.sandbox_state:
        - STARTING:  commit the existing sandbox (if any), pull the image to be
                     started and start the sandbox -> RUNNING (or START_ERROR)
        - STOPPING:  commit (if requested) and remove the sandbox
                     -> IDLE (or STOP_ERROR)
        - otherwise: commit (if requested); the state is left unchanged

        The new state is written only if sandbox_state was not modified in the
        meantime, then ctrl.notif_webapp_updated() is called. Errors raised
        while processing are logged, not propagated; on controller shutdown the
        state is left unchanged.

        (`reset` and `rescheduled` are not used here)
        """
        ctrl = self.ctrl

        log.debug("process sandbox %d", webapp_id)

        session = ctrl.session
        with session.begin():
            # current state of the sandbox; docker_os is touched to load it
            # (presumably so that it is still usable after expunge_all())
            webapp = session.query(Webapp).filter_by(id=webapp_id).one()
            webapp.docker_os

            # version to be started (None -> fresh sandbox from the factory image)
            start_version = webapp.sandbox_version

            # pending commit requests
            pending_commits = self.filter_commit_version(
                    session.query(WebappVersion), webapp_id).all()

            session.expunge_all()

        # docker names of the sandbox container and of the image
        webapp.sandbox_name = ctrl.gen_sandbox_name(webapp)
        webapp.image_name   = ctrl.gen_image_name(webapp)

        step = "inspect"
        next_state = None
        fail_state = None
        try:
            if webapp.sandbox_state == SandboxState.STARTING:
                step       = "start"
                next_state = SandboxState.RUNNING
                fail_state = SandboxState.START_ERROR

                # commit the current sandbox first (if a container exists)
                yield from self._manage_commit(webapp, pending_commits, force=True)

                if start_version is None:
                    # fresh sandbox: pull the factory image of the os
                    yield from ctrl.image_manager.sandbox_pull_manager.process((
                        ctrl.gen_factory_name(webapp.docker_os),
                        webapp.docker_os.version))
                else:
                    # the requested version must belong to this application
                    if start_version.webapp_id != webapp.id:
                        raise Error("invalid version id %d (belongs to webapp %d)" % (
                            start_version.id, start_version.webapp_id))

                    yield from ctrl.image_manager.pull(start_version.id)

                yield from self.run_in_executor(self._start, webapp, start_version)

            elif webapp.sandbox_state == SandboxState.STOPPING:
                step       = "stop"
                next_state = SandboxState.IDLE
                fail_state = SandboxState.STOP_ERROR

                yield from self._manage_commit(webapp, pending_commits)
                yield from self.run_in_executor(self._stop, webapp)

            else:
                step = "commit"
                yield from self._manage_commit(webapp, pending_commits)

        except ShuttingDown:
            next_state = None
            log.info("sandbox %r %s aborted (controller shutdown)", webapp.docker_name, step)

        except BaseException as exc:
            # NOTE: deliberately catches everything, so that any failure is
            # logged and recorded as fail_state (if any)
            next_state = fail_state

            if isinstance(exc, (docker.errors.APIError, Error)):
                report = log.error
            else:
                report = log.exception
            report("sandbox %r %s error (%s)", webapp.docker_name, step,
                    traceback.format_exception_only(type(exc), exc)[-1].strip())

        finally:
            if next_state is not None:
                # atomic compare-and-set of the sandbox state: it is not
                # overwritten if another action was requested in the meantime
                # (eg: the user stops the sandbox while it is not fully started)
                log.info("sandbox %r is now in state %r", webapp.docker_name, next_state.name)
                with session.begin():
                    session.execute(
                        "UPDATE dj_webapps SET sandbox_state=%d WHERE id=%d AND sandbox_state=%d"
                        % (next_state, webapp_id, webapp.sandbox_state))

                yield from self.ctrl.notif_webapp_updated(webapp_id)

            log.debug("done    sandbox %d", webapp_id)


BAIRE Anthony's avatar
BAIRE Anthony committed
1020
class JobManager(Manager):
1021
    class JobInfo:
        """Bookkeeping record for a job handled by the JobManager"""
        __slots__ = (
            "job_id", "ver_id", "ctr_id", "version", "ctr_name",
            "client", "cpu", "mem", "node_id", "timeout",
        )
1023

1024
    def __init__(self, ctrl):
        """Create the job manager of controller `ctrl`"""
        # NOTE(review): 0 is presumably the nb of concurrent tasks of the
        # generic Manager -- TODO confirm against Manager.__init__
        super().__init__(0)

        self.ctrl = ctrl

1028
1029
1030
1031
1032
1033

    @asyncio.coroutine
    def __iter__(self):
        """Not implemented for the job manager"""
        raise NotImplementedError


1034
1035
1036
1037
    def _create_job(self, info):
        """Create and start the docker container of a job

        Returns:
        - True  if the job was started
        - False if it was cancelled by the user (its state was no longer
          WAITING)

        On error, the job is cleaned up through _remove_job() and the original
        exception is re-raised (a failure of the cleanup itself is logged and
        does not mask the original error).
        """
        ctrl = self.ctrl
        ses  = ctrl.session

        # temporary image committed from the sandbox (if any)
        tmp_img = None

        assert info.ctr_id is None

        try:
            with ses.begin():
                # atomically switch the job state from WAITING to RUNNING
                # (because the UI may destroy the job before it is started)
                if ses.execute("UPDATE dj_jobs SET state=%d WHERE id=%d AND state=%d"
                        % (JobState.RUNNING, info.job_id, JobState.WAITING)
                        ).rowcount == 0:
                    log.info("job %d not started (destroyed by user)", info.job_id)
                    return False

                job = ses.query(Job).filter_by(id=info.job_id).one()
                webapp = job.webapp

                log.info("start job %d (%s:%s)",
                        info.job_id, webapp.docker_name, info.version)

            repo = ctrl.gen_image_name(webapp)
            image = "%s:%s" % (repo, info.version)

            job_path = ctrl.gen_job_path(job)
            log.debug("job.path: %r", job_path)

            if info.ver_id is None:
                # no version id: run a temporary image committed from the
                # current sandbox
                assert info.version == "sandbox"
                image = tmp_img = info.client.commit(ctrl.gen_sandbox_name(webapp), repo, info.version)["Id"]

            # TODO use another workdir
            # TODO use another uid

            ctrl.check_host_path("isdir", job_path)
            hc = ctrl.sandbox.create_host_config(
                        binds = {job_path: {"bind": "/tmp"}},
                        cap_drop = ["all"],
                        # FIXME: CAP_DAC_OVERRIDE needed because all nfs files have uid,gid=1000,1000
                        cap_add = ["dac_override"],
                        cpu_quota   = (None if info.cpu is None else (info.cpu * 1024)),
                        cpu_period  = (None if info.cpu is None else 1024),
                        # cpu_shares = info.cpu,   (see the NOTE below)
                        mem_limit = info.mem,
                    )
            # NOTE: cpu_shares has a different meaning in docker swarm and docker engine
            #  - swarm:  nb of cpus
            #  - engine: 1/1024 share of the total cpu resources of the machine
            # engine requires  cpu_share>1
            if ctrl.cpu_shares:
                # TODO: upgrade docker-py (and use create_host_config)
                hc["CpuShares"] = info.cpu
            log.debug("host_config %r", hc)
            info.ctr_id = info.client.create_container(image, name=info.ctr_name,
                    working_dir = "/tmp",
                    # NOTE: the command line is a little complex, but this is
                    #   to ensure that (TODO write tests for this):
                    #   - no output is lost (we go through a pipe in case the
                    #     app has multiple processes writing to stdout/stderr
                    #     concurrently)
                    #     FIXME: maybe ">>allgo.log  2>&1" is sufficient
                    #   - we get the exit code of the app (not the exit code of
                    #     cat)
                    #   - SIGTERM & SIGALRM are forwarded to the process (and
                    #     we call wait again because of EINTR)
                    #   - we have no unusual dependencies (only sh, cat, trap,
                    #     kill and mkfifo)
                    #   - we display a warning if the memory limit was reached
                    #     during the job
                    command = ["/bin/sh", "-c", """
                                pid=
                                interrupted=
                                sighnd() {{
                                    (echo
                                    echo "====  ALLGO JOB $2  ===="
                                    kill "-$1" "$pid") 2>&1 | tee -a allgo.log
                                    trap '' TERM ALRM
                                    interrupted=1
                                }}
                                trap "sighnd TERM ABORT"   TERM
                                trap "sighnd ALRM TIMEOUT" ALRM
                                fifo=/.allgo.fifo.{job_id}
                                mkfifo "$fifo" 2>&1 | tee -a allgo.log || exit $?

                                exec cat <"$fifo" | tee -a allgo.log &
                                exec "$@" >"$fifo" 2>&1 &
                                pid=$!

                                wait %2
                                code=$?
                                if [ -n "$interrupted" ] ; then
                                    wait %2
                                    code=$?
                                fi
                                wait %1
                                trap '' TERM ALRM

                                [ -n "$interrupted" ] || (
                                    echo
                                    if [ $code -eq 0 ] ; then
                                        echo "====  ALLGO JOB SUCCESS  ===="
                                    else
                                        echo "====  ALLGO JOB ERROR  ===="
                                        echo "process exited with code $code"
                                    fi

                                    failcnt="`cat /sys/fs/cgroup/memory/memory.failcnt`"
                                    if [ "$failcnt" -ne 0 ] ; then
                                        echo "WARNING: memory limit was reached (memory.failcnt=$failcnt)"
                                    fi
                                ) | tee -a allgo.log 

                                exit $code
                        """.format(job_id=job.id),
                        # $0 of the script, then the app command line ("$@")
                        "job%d" % job.id, webapp.entrypoint]
                        # NOTE: job.param may be None: shlex.split(None) would
                        # read the command line from stdin (python < 3.12) or
                        # raise ValueError (python >= 3.12)
                        + shlex.split(job.param or ""),

                    labels = {"allgo.tmp_img": tmp_img or ""},
                    environment=["constraint:node==" + info.node_id],
                    host_config = hc)["Id"]
            info.client.start(info.ctr_id)

            with ses.begin():
                # save the container_id into the db
                job.container_id = info.ctr_id
            return True

        except BaseException:
            #TODO introduce a state JobState.ERROR
            try:
                self._remove_job(info, tmp_img=tmp_img)
            except Exception:
                # do not let a cleanup failure mask the original error
                log.exception("job %d: cleanup error", info.job_id)
            raise
BAIRE Anthony's avatar
BAIRE Anthony committed
1170
1171


1172
1173
    def _remove_job(self, info, *, tmp_img=None):
        """Remove the container (and temporary image) of a job and mark it as done

        - the execution time is computed from the container timestamps (0 if
          the container is missing or has not both started and finished)
        - the temporary image to be removed is `tmp_img`, or else the one
          recorded in the "allgo.tmp_img" label of the container
        - the job state is set to DONE and, if the job has no result yet, its
          result is set to ERROR

        Container/image removal errors go through docker_warning().
        """
        ses = self.ctrl.session

        # TODO: report launch errors to the user
        # TODO: report exit code to the user
        # TODO: use another uid

        def parse_docker_timestamp(value):
            """Parse a docker 'YYYY-mm-ddTHH:MM:SS[.fraction]Z' timestamp"""
            # limit the precision to the microsecond
            # (otherwise strptime fails)
            value = re.sub(r"(\.\d{,6})\d*Z$", r"\1Z", value)
            # docker trims the trailing zeros of the fraction, which may then
            # be omitted altogether
            fmt = "%Y-%m-%dT%H:%M:%S.%fZ" if "." in value else "%Y-%m-%dT%H:%M:%SZ"
            return datetime.datetime.strptime(value, fmt)

        with ses.begin():
            job = ses.query(Job).filter_by(id=info.job_id).one()

            exec_time = 0.0
            if info.ctr_id is not None:
                try:
                    js = info.client.inspect_container(info.ctr_id)
                except docker.errors.NotFound:
                    pass
                else:
                    started_at  = js["State"].get("StartedAt", "0001-")
                    finished_at = js["State"].get("FinishedAt") or "0001-"

                    # default docker date is '0001-01-01T00:00:00Z' (container
                    # not started / not finished -> no meaningful exec time)
                    if not (started_at.startswith("0001-") or finished_at.startswith("0001-")):
                        try:
                            exec_time = (parse_docker_timestamp(finished_at)
                                        - parse_docker_timestamp(started_at)
                                        ).total_seconds()
                        except Exception: # pragma: nocover
                            log.exception("job %d: unable to compute exec time", info.job_id)

                    if tmp_img is None:
                        # defensive: "Labels" may be null or absent
                        labels = js["Config"].get("Labels") or {}
                        tmp_img = labels.get("allgo.tmp_img") or None

                with docker_warning("job %d: cleanup error: unable to remove container", info.job_id):
                    info.client.remove_container(info.ctr_id)

            if tmp_img is not None:
                with docker_warning("job %d: cleanup error: unable to remove tmp image", info.job_id):
                    info.client.remove_image(tmp_img)

            job.exec_time = exec_time
            job.state     = int(JobState.DONE)
            job.container_id = None

            if job.result == JobResult.NONE:
                # FIXME: maybe we should have a 'unknown' result
                log.warning("job %d has not result, will fallback to 'ERROR'", info.job_id)
                job.result = int(JobResult.ERROR)

        log.info("done  job %d (result=%s, duration=%fs)", info.job_id, JobResult(job.result).name, exec_time)