controller.py 79 KB
Newer Older
1 2 3 4
#!/usr/bin/python3


import asyncio
5
import collections
6
from   concurrent.futures   import ThreadPoolExecutor
7
import contextlib
8
import datetime
9
import enum
BAIRE Anthony's avatar
BAIRE Anthony committed
10
import itertools
BAIRE Anthony's avatar
BAIRE Anthony committed
11
import json
12 13 14
import logging
import re
import os
15
import shlex
16
import signal
17
import struct
18
import sys
19 20 21
import time
import threading
import traceback
22

23
import aiohttp
24
import aioredis
25
import docker
BAIRE Anthony's avatar
BAIRE Anthony committed
26
import iso8601
27
import MySQLdb
BAIRE Anthony's avatar
BAIRE Anthony committed
28 29
from   sqlalchemy import desc
import sqlalchemy.orm.scoping
30
import yaml
31

32
import config_reader
33
from database import *
34
from shared_swarm import SharedSwarmClient, ShuttingDown
35

36 37
# NOTE(review): presumably the mount point of the host filesystem inside the
# controller container -- confirm
HOST_PATH = "/vol/host/"

##################################################
# default number of concurrent tasks

# pushing images from the sandbox to the registry
NB_PUSH_SANDBOX = 2

# pulling images from the registry to the sandbox
NB_PULL_SANDBOX = 2

# pulling images from the registry to the swarm
NB_PULL_SWARM   = 2

# sandbox actions (start, stop, commit, ...)
NB_SANDBOX_TASKS = 4

# jobs
NB_JOB_TASKS     = 4


# default thread pool (used by most of the tasks; this is the default
# `executor` of Manager)
default_executor = ThreadPoolExecutor(10)

##################################################
# redis keys
# (the %d placeholder is presumably the job id -- confirm with the callers)

# job log, state and result
REDIS_KEY_JOB_LOG       = "log:job:%d"
REDIS_KEY_JOB_STATE     = "state:job:%d"
REDIS_KEY_JOB_RESULT    = "result:job:%d"

# pubsub channels for waking up allgo.aio (frontend) and the controller
# (ourselves)
REDIS_CHANNEL_AIO        = "notify:aio"
REDIS_CHANNEL_CONTROLLER = "notify:controller"

# pubsub messages
REDIS_MESSAGE_JOB_UPDATED    = "job:%d"
REDIS_MESSAGE_WEBAPP_UPDATED = "webapp:%d"


##################################################

# interval (in seconds) for polling the db
DB_CHECK_PERIOD = 3600

log = logging.getLogger("controller")
85

86 87 88 89 90
# DB-API threadsafety level 1 means "threads may share the module, but not
# connections". This level is required presumably because database work is
# also done from executor threads (see default_executor) -- confirm.
assert MySQLdb.threadsafety >= 1


class Error(Exception):
    """Base class for the controller's own errors

    report_error() logs exceptions of this class with log.error() (message
    only) instead of log.exception() (with traceback).
    """
    pass
91

92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121

class RateLimiter:
    """Asynchronous iterator that enforces a minimum duration per iteration

    Each step of the iteration sleeps as needed so that successive steps
    start at least `period` seconds apart. The first step is normally not
    delayed. The iterator never ends (it does not raise StopAsyncIteration).

    usage:
        async for _ in RateLimiter(60):
            ...
    """
    def __init__(self, period):
        assert period > 0
        self.period = period
        # Pretend the previous step started one full period ago, so that
        # the first step does not wait.
        self._t0 = time.monotonic()-period

    def __aiter__(self):
        return self

    async def __anext__(self):
        now = time.monotonic()
        remaining = self._t0 - now + self.period
        if remaining <= 0:
            # The period has already elapsed, so start the step right away.
            self._t0 = now
            return
        log.debug("rate_limit: sleep %f seconds", remaining)
        await asyncio.sleep(remaining)
        self._t0 = now + remaining


BAIRE Anthony's avatar
BAIRE Anthony committed
122 123 124 125 126 127 128 129 130 131 132 133 134
def docker_check_error(func, *k, **kw):
    """Wrapper for docker-py methods that produce a stream

    Methods producing a stream (eg: push, build) do not report all errors
    by raising exceptions. Some errors are reported later in the stream, as
    JSON objects carrying an "error" key.

    This function calls ``func(*k, stream=True, **kw)``, consumes the whole
    stream and raises Error() on the first error reported.

    A single chunk of the stream may contain several newline-separated JSON
    objects (or none at all), so every chunk is split into lines and each
    non-empty line is decoded separately. Decoding a whole chunk at once would
    fail with a JSON "Extra data" error.
    """
    for elem in func(*k, stream=True, **kw):
        for line in elem.decode().splitlines():
            if not line.strip():
                continue
            js = json.loads(line)
            if "error" in js:
                raise Error("push error: " + js["error"])

BAIRE Anthony's avatar
BAIRE Anthony committed
135 136 137 138 139 140 141 142 143
@contextlib.contextmanager
def docker_warning(msg, *k, ignore=None):
    """Turn docker API errors raised in the `with` body into log warnings

    A docker.errors.APIError escaping the body is swallowed. It is logged
    with log.warning(), using `msg` followed by " (%s)" as the format string;
    the format arguments are `k` plus the exception itself.

    `ignore` (an exception class or a tuple of classes) selects the errors
    that are swallowed silently, without any warning.
    """
    try:
        yield
    except docker.errors.APIError as err:
        silenced = ignore is not None and isinstance(err, ignore)
        if not silenced:
            log.warning(msg + " (%s)", *(k + (err,)))
BAIRE Anthony's avatar
BAIRE Anthony committed
144

BAIRE Anthony's avatar
BAIRE Anthony committed
145 146
@contextlib.contextmanager
def report_error(fmt, *k):
    """Context manager for logging exceptions

    Any exception leaving the context is logged, then re-raised:
     - with log.error() (message only) if the exception inherits from Error
     - with log.exception() (message and traceback) otherwise

    The log message is the string generated by ``fmt % k``, followed by the
    exception type and message between parentheses.
    """
    try:
        yield
    except Exception as e:
        msg = fmt % k
        log_func = log.error if isinstance(e, Error) else log.exception
        # last line of the formatted exception, e.g. "ValueError: bad value"
        log_func("%s (%s)", msg,
                traceback.format_exception_only(type(e), e)[-1].strip())
        raise
162

BAIRE Anthony's avatar
BAIRE Anthony committed
163 164 165 166 167 168 169 170 171 172 173 174 175
def disable_future_warning(fut):
    """Add a dummy callback to a future to prevent asyncio warnings

    Return: the future

    The asyncio module logs a warning message when the exception of a future
    is never retrieved. This function installs a callback that retrieves it,
    so as to avoid this warning. This is useful for futures whose result
    *may* be ignored by the application.
    """
    # A cancelled future is skipped: calling .exception() on it raises
    # CancelledError inside the callback, which the event loop would then
    # report as an error.
    fut.add_done_callback(lambda f: f.cancelled() or f.exception())
    return fut

176

177 178 179 180 181 182 183 184 185
def auto_create_task(func):
    """Decorator for forcing the creation of a task when a coroutine function is called

    Return a wrapper that calls the function, wraps the resulting coroutine
    in a task (asyncio.ensure_future) and returns that task. It also
    installs a no-op callback on the task (see disable_future_warning()) to
    avoid warnings in case the result is not used.
    """
    import functools

    assert asyncio.iscoroutinefunction(func)

    @functools.wraps(func)
    def wrapper(*k, **kw):
        # asyncio.ensure_future() replaces asyncio.async(), a deprecated alias
        # that is a syntax error since python 3.7
        return disable_future_warning(asyncio.ensure_future(func(*k, **kw)))

    return wrapper
BAIRE Anthony's avatar
BAIRE Anthony committed
188

189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
def cascade_future(src, dst):
    """propagate the result of a future to another future

    This function installs a callback on the future `src` that propagates its
    outcome (result, exception or cancellation) to the future `dst`. If `src`
    is already done, the outcome is propagated immediately.

    Nothing is propagated if `dst` is already done at that time (e.g. it was
    cancelled by the code waiting on it).
    """
    def callback(fut):
        if dst.done():
            # setting the result again would raise InvalidStateError
            return
        if fut.cancelled():
            # fut.exception() would raise CancelledError and leave dst
            # pending forever
            dst.cancel()
            return
        ex = fut.exception()
        if ex is None:
            dst.set_result(fut.result())
        else:
            dst.set_exception(ex)

    if src.done():
        callback(src)
    else:
        src.add_done_callback(callback)

207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
async def _make_aiohttp_client(host):
    """Create an aiohttp session for talking to a docker engine

    `host` is a docker host url (as in DOCKER_HOST), either a `tcp://` or a
    `unix://` url.

    Return a `(session, url)` pair: `session` is an aiohttp client session
    and `url` is the http(s) base url of the docker engine. For a unix
    socket, the session goes through an aiohttp.UnixConnector and `url` is
    the placeholder "http://DUMMY.localhost".
    """
    url = docker.utils.parse_host(host)
    unix_match = re.match(r"http\+unix:/+(/.*)", url)
    if unix_match is not None:
        # HTTP over unix socket
        socket_path = unix_match.group(1)
        unix_connector = aiohttp.UnixConnector(socket_path)
        return (aiohttp.ClientSession(connector=unix_connector),
                "http://DUMMY.localhost")

    # HTTP over TCP
    return aiohttp.ClientSession(connector=None), url


def make_aiohttp_client(host):
    """Synchronous wrapper for _make_aiohttp_client()

    Run the coroutine to completion on the default event loop and return its
    `(session, url)` result.
    """
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(_make_aiohttp_client(host))
BAIRE Anthony's avatar
todo  
BAIRE Anthony committed
230

231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
class DockerAuthConfigDict(dict):
    """dict of authentication credentials for docker.Client

    Keys: repository names (or possibly subrepositories)
    Values: auth config dicts (suitable for docker.Client.{pull,push})
    """

    def get_auth_config(self, image):
        """Get the suitable authentication credential for a given docker_image

        A key matches `image` if `image` starts with the key, immediately
        followed by the end of the string, a '/' or a ':'.

        When several keys match (e.g. a registry and one of its
        subrepositories), the longest (most specific) one is selected, so
        the result does not depend on the dict iteration order.

        Return a 'auth_config' dict or None if no credentials are found
        """
        matches = [registry for registry in self
                   if image.startswith(registry)
                   and image[len(registry):len(registry)+1] in ("", "/", ":")]
        if not matches:
            log.debug("select no auth_config for image %r", image)
            return None

        registry = max(matches, key=len)
        log.debug("select auth_config %r for image %r", registry, image)
        return self[registry]

251
class Manager:
    """A class for scheduling asynchronous jobs on a collection of keys

    (the one-line summary is not very helpful, sorry)


    Job submission

    The job is scheduled on key KEY by calling .process(KEY), the function
    returns an asyncio.Future object (which will receive the result)

    If .process(KEY) is called while a job is already running for KEY, then the
    manager arranges the job to be run a second time in a row for the same key


    Job implementation

    This is an abstract class. The job shall be implemented as the coroutine
    named ._process() in the inherited class.

    The manager guarantees that ._process() cannot be called multiple times
    concurrently for the same key

    In case a shutdown is requested (see .shutdown()):
     - all jobs that are not yet started are cancelled
     - running jobs continue until they return or until they try to acquire the
       internal semaphore (which raises ShuttingDown())


    Concurrency

    The job tasks are all started immediately (whatever the number of existing
    jobs)

    It is possible to limit the concurrency to a maximum number of parallel
    tasks. Manager provides an internal semaphore (the number of tokens is set
    in .__init__())

    The semaphore can be locked in two ways:
     - by locking the manager:

            with (yield from self): ... concurrency limited to `nb_tokens`
            tasks ...

     - by calling .run_in_executor (this is for running non-async code):

            yield from self.run_in_executor(func, args...)


    Shutdown

    When .shutdown() is called the Manager ensures that all jobs are properly
    terminated
     - it cancels all jobs that are not yet started
     - it prevents starting new jobs
     - it lets running jobs continue, but interrupts them when they try to
       acquire the internal semaphore

    All cancelled/interrupted tasks have their future raise ShuttingDown().

    .shutdown() returns after all pending jobs are terminated. An
    `interruptible` manager does not wait for them; instead its pending
    .run_in_executor() calls raise ShuttingDown() immediately.


    Thread safety

    - Manager is *not* thread safe, all public methods must be called from the
      same thread
    """

    class _Handle:
        # per-key bookkeeping:
        #   key          the key given to .process()
        #   cur          task currently running ._process() (or None)
        #   nxt          future returned to the callers who requested a new
        #                run while `cur` was running (or None)
        #   rescheduled  future resolved when .process() is called again
        #                while `cur` is running
        __slots__ = "key", "cur", "nxt", "rescheduled"

    def __init__(self, nb_tokens=1, *, executor = default_executor, interruptible = False):
        # {key: _Handle}
        self._handles = {}
        # limits the number of concurrent jobs (see __iter__/run_in_executor)
        self._semaphore = asyncio.Semaphore(nb_tokens)
        # gets a ShuttingDown exception when .shutdown() is called
        self._shutdown = asyncio.Future()
        self._executor = executor
        # if true, .run_in_executor() calls are interrupted by .shutdown()
        self._interruptible = interruptible

    def _create_task(self, hnd):
        """Start a ._process() task for the handle `hnd` and return it"""
        assert hnd.nxt is None
        def reset():
            # Called from ._process() to declare that the current run also
            # satisfies the pending re-run requests: their future is bound to
            # the current task instead of triggering another run.
            assert not hnd.cur.done()
            nxt = hnd.nxt
            if nxt is not None:
                cascade_future(hnd.cur, nxt)
                hnd.nxt = None

            # recreate the 'rescheduled' future and return it
            hnd.rescheduled = disable_future_warning(asyncio.Future())
            return hnd.rescheduled

        hnd.rescheduled = disable_future_warning(asyncio.Future())
        # asyncio.ensure_future() replaces asyncio.async() (a deprecated alias
        # that is a syntax error since python 3.7)
        hnd.cur = asyncio.ensure_future(self._process(hnd.key, reset, hnd.rescheduled))
        hnd.cur.add_done_callback(lambda fut: self._done(hnd))
        log.debug("task scheduled %r %r", self, hnd.key)
        return hnd.cur

    def process(self, key):
        """Schedule the job to be run on key `key`

        returns an asyncio.Future (that will provide the result of ._process())

        After a shutdown has been initiated, the returned future raises
        ShuttingDown().
        """
        if self._shutdown.done():
            return self._shutdown

        hnd = self._handles.get(key)
        if hnd is None:
            # create handle
            self._handles[key] = hnd = self._Handle()
            hnd.key = key
            hnd.cur = None
            hnd.nxt = None
            hnd.rescheduled = None

        if hnd.cur is None:
            # create task
            return self._create_task(hnd)
        else:
            # reschedule task
            if hnd.nxt is None:
                hnd.nxt = asyncio.Future()
            if not hnd.rescheduled.done():
                hnd.rescheduled.set_result(None)
            return hnd.nxt

    def _done(self, hnd):
        """Done-callback of the task `hnd.cur`

        Log unexpected failures, then either start the next run (if .process()
        was called again meanwhile) or forget the handle.
        """
        assert hnd is self._handles.get(hnd.key)
        assert hnd.cur.done()

        if hnd.cur.cancelled():
            # .result() would raise CancelledError. Since python 3.8 this is a
            # BaseException: it would escape this callback and leave the
            # handle registered forever.
            log.debug("task %r %r cancelled", self, hnd.key)
        else:
            try:
                hnd.cur.result()
            except ShuttingDown:
                pass
            except Exception:
                log.exception("task %r %r unhandled exception", self, hnd.key)

        nxt     = hnd.nxt
        hnd.cur = hnd.nxt = None
        if nxt is None:
            del self._handles[hnd.key]
        else:
            cascade_future(self._create_task(hnd), nxt)


    @asyncio.coroutine
    def __iter__(self):
        """Coroutine for locking the internal semaphore

        Usage:
            with (yield from manager):
              ...

        Warning:
            after a shutdown is initiated, this function will always raise
            ShuttingDown()
        """
        ctx = yield from iter(self._semaphore)
        if self._shutdown.done():
            # release the token we just acquired before raising
            with ctx:
                raise ShuttingDown()
        return ctx


    @asyncio.coroutine
    def run_in_executor(self, *k, lock=True):
        """Run a function in a separate thread (with limited concurrency)

        `k` is the function followed by its positional arguments. The call is
        run in the manager's executor.

        If `lock` is true (the default), the internal semaphore is held while
        the function runs (so ShuttingDown() is raised if a shutdown was
        already initiated). If the manager is `interruptible`, the coroutine
        raises ShuttingDown() as soon as a shutdown is initiated, without
        waiting for the thread to finish.
        """

        def run():
            fut = asyncio.get_event_loop().run_in_executor(self._executor, *k)
            if self._interruptible:
                # whichever comes first: the result or the shutdown
                return next(asyncio.as_completed((fut, self._shutdown)))
            else:
                return fut

        if lock:
            with (yield from self):
                return (yield from run())
        else:
            return (yield from run())


    @asyncio.coroutine
    def _process(self, key, reset, rescheduled):
        """Actual implementation of the job (to be reimplemented in inherited classes)

        The Manager class guarantees that this function cannot be called
        multiple times concurrently on the same key (in case the same key is
        submitted multiple times, the Manager will call this function a second
        time after it has terminated).

        `rescheduled` is a future whose result is set when the job is being
        rescheduled (if process(key) is called before _process(key ...)
        terminates).

        `reset` is a function that may be called to reset the 'dirty' state of
        this key (this is to avoid calling ._process() a second time if not
        necessary). In that case, a new `rescheduled` future is returned.
        """
        raise NotImplementedError()

    @asyncio.coroutine
    def shutdown(self):
        """Initiate a graceful shutdown

        This coroutine terminates once all tasks are properly terminated
        (unless the manager is `interruptible`, in which case it does not
        wait for the running tasks).

        Calling it more than once is harmless.
        """
        exc = ShuttingDown()
        if not self._shutdown.done():
            self._shutdown.set_exception(exc)
            self._shutdown.exception()  # to avoid asyncio warnings

        # cancel all 'next' tasks
        for hnd in self._handles.values():
            if hnd.nxt is not None:
                # the future may already have been cancelled by its consumer
                if not hnd.nxt.done():
                    hnd.nxt.set_exception(exc)
                hnd.nxt = None

        if not self._interruptible:
            yield from asyncio.gather(
                    *(h.cur for h in self._handles.values() if h.cur is not None),
                    return_exceptions=True)
BAIRE Anthony's avatar
BAIRE Anthony committed
476

477 478

class SandboxManager(Manager):
BAIRE Anthony's avatar
BAIRE Anthony committed
479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517
    """Manager for sandbox operation

    This manager handles all sandbox operations (start, stop, commit).

    Operations are requested asynchronously in the database:
     - Webapp.sandbox_state=starting (for starting a sandbox)
     - Webapp.sandbox_state=stopping (for stopping a sandbox)
     - WebappVersion with state=sandbox (for committing a new image)

    All state changes on Webapp.sandbox_state are atomic (e.g: if the user
    requests a stop while the sandbox is starting then the manager finishes
    with starting the sandbox but does not update the state, and it runs
    immediately again to stop the sandbox)

    Whatever is the value of Webapp.sandbox_state, the manager first examines
    commit requests and makes the commit if requested.

    If the container already exists before starting the webapp (not possible in
    normal operations), then a recovery image is committed first.

    When a commit is successful. The image manager is notified for pushing the
    image to the registry and (if it is not a recovery image) to pull it to the
    swarm (because this image will very likely be used soon).

    State changes:

     - sandbox start:
            starting->running     (normal case)
            starting->start_error (error case)

     - sandbox stop:
            stopping->idle        (normal case)
            stopping->stop_error  (error case)

     - image commit:
            sandbox->committed  (normal case)
            sandbox->error      (error case)
            (none)->committed   (recovery version) 
    """
518

519
    def __init__(self, ctrl, nb_threads = NB_SANDBOX_TASKS):
        """`ctrl` is the controller object, whose docker client, db session
        and paths are used by the sandbox operations. `nb_threads` is the
        maximum number of concurrent sandbox operations (the number of
        tokens of the Manager semaphore).
        """
        super().__init__(nb_threads)
        self.ctrl = ctrl


BAIRE Anthony's avatar
BAIRE Anthony committed
524 525
    def inspect_sandbox(self, webapp):
        """Inspect the sandbox container of `webapp`

        Return the result of ctrl.sandbox.inspect_container() for the
        container named ctrl.gen_sandbox_name(webapp), or None if docker
        reports that this container does not exist (docker.errors.NotFound).
        """
        try:
            return self.ctrl.sandbox.inspect_container(
                    self.ctrl.gen_sandbox_name(webapp))
        except docker.errors.NotFound:
            return None

BAIRE Anthony's avatar
BAIRE Anthony committed
531 532 533 534 535 536

    @staticmethod
    def filter_commit_version(query, webapp_id):
        """Narrow a WebappVersion query to select the candidate versions to be committed

        Keep only the versions of the webapp `webapp_id` whose state is
        VersionState.SANDBOX (compared as an int).
        """
        return query.filter_by(webapp_id=webapp_id,
                               state=int(VersionState.SANDBOX))

540
    def _start(self, webapp, image_tag):
541 542 543 544 545 546
        """Start a webapp sandbox

        (to be executed in a thread pool)
        """

        ctrl = self.ctrl
BAIRE Anthony's avatar
BAIRE Anthony committed
547
        ses  = ctrl.session
548 549

        # prepare sandbox parameters
550
        image = ":".join(image_tag)
BAIRE Anthony's avatar
BAIRE Anthony committed
551
        log.debug("sandbox %r: using image %r", webapp.docker_name, image)
552 553 554 555 556 557 558 559 560 561 562 563

        # safety checks
        # (because docker_name is used it the paths of the external volumes
        if ("/" in webapp.docker_name) or (webapp.docker_name in ("", ".", "..")):
            raise Error("malformatted docker_name")

        uid = webapp.id + 2000
        if uid < 2000:
            # just for safety
            raise Error("bad webapp id")

        # remove stale container (if any)
564 565
        if self.inspect_sandbox(webapp) is not None:
            self._stop(webapp)
566

BAIRE Anthony's avatar
BAIRE Anthony committed
567
        container = webapp.sandbox_name
568 569
        try:
            # prepare the sandbox
570 571 572 573
            # (create ssh keys)
            ctrl.check_host_path("isdir", ctrl.toolbox_path)
            ctrl.check_host_path("isdir", ctrl.sandbox_path)

BAIRE Anthony's avatar
BAIRE Anthony committed
574
            ctrl.sandbox.create_container("busybox:latest", name=container,
575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674
                    command = ["/bin/sh", "-c", """
set -ex

export PATH="$PATH:/.toolbox/bin"

# clean sandbox dir
rm -rf {sbx}

# create dirs
for dir in {sbx} {etc} {run}
do
    mkdir -p            ${{dir}}
    chown {uid}:65534   ${{dir}}
    chmod 0700          ${{dir}}
done

# xauth file
touch               {run}/XAuthority
chown {uid}:65534   {run}/XAuthority
chmod 0600          {run}/XAuthority

# generate ssh keys
(for type in ecdsa ed25519 rsa
do
    key={etc}/ssh_host_${{type}}_key
    [ -f $key ] || ssh-keygen -N '' -f $key -t $type >&2
    
    echo -n '{hostname}. ' | cat - ${{key}}.pub
done) > {etc}/ssh_known_hosts

# known_host file for allgo-shell
ssh-keygen -H -f {etc}/ssh_known_hosts
chmod 0644       {etc}/ssh_known_hosts
rm -f            {etc}/ssh_known_hosts.old

# authentication key for allgo-shell
rm -f               {etc}/identity
ssh-keygen -N '' -f {etc}/identity
chown {uid}:65534   {etc}/identity

# forced shell for the sshd config 
cat > {etc}/shell <<EOF
#!/bin/sh

export PATH="\$PATH:/.toolbox/bin"

uid=\`id -u\`
shell="\`getent passwd \$uid 2>/dev/null| cut -d : -f 7\`"
if [ -z "\$shell" ] ; then
    shell=/bin/sh
fi

if [ -n "\$SSH_ORIGINAL_COMMAND" ] ; then
    exec "\$shell" -c "\$SSH_ORIGINAL_COMMAND"
else
    exec "\$shell"
fi

EOF
chmod 755 {etc}/shell

# sshd config
cat > {etc}/sshd_config <<EOF
Port 22
Protocol 2

# turned off because it requires creating a 'sshd' user inside the sandbox
UsePrivilegeSeparation no

StrictModes no

ForceCommand /.sandbox/etc/ssh/shell

PermitRootLogin without-password

PubkeyAuthentication yes
AuthorizedKeysFile  /.sandbox/etc/ssh/identity.pub .ssh/authorized_keys .ssh/authorized_keys2

ChallengeResponseAuthentication no
PasswordAuthentication          no

X11Forwarding yes
X11DisplayOffset 10
PrintMotd no
PrintLastLog no
TCPKeepAlive yes

# Allow client to pass locale environment variables
AcceptEnv LANG LC_*

Subsystem sftp internal-sftp

UsePAM no
EOF
                    """.format(uid=uid,
                        hostname = "%s-sandbox-%s" % (ctrl.env, webapp.docker_name),
                        sbx = "/mnt/%s"         % webapp.docker_name,
                        etc = "/mnt/%s/etc/ssh" % webapp.docker_name,
                        run = "/mnt/%s/run"     % webapp.docker_name,
                        )],
675
                    host_config = ctrl.sandbox.create_host_config(
676 677 678 679
                        binds   = {
                            ctrl.sandbox_path: {"bind": "/mnt"},
                            ctrl.toolbox_path: {"bind": "/.toolbox", "mode": "ro"},
                    }))
680 681
            ctrl.sandbox.start(container)
            if ctrl.sandbox.wait(container):
682 683
                log.debug("sandbox %s output:\n%s", webapp.docker_name,
                        ctrl.sandbox.logs(container).decode(errors="replace"))
684 685 686 687
                raise Error("sandbox preparation failed")
            ctrl.sandbox.remove_container(container)

            # create and start the sandbox
688 689 690 691 692

            etc_dir = os.path.join(ctrl.sandbox_path, webapp.docker_name, "etc")
            run_dir = os.path.join(ctrl.sandbox_path, webapp.docker_name, "run")
            ctrl.check_host_path("isdir", etc_dir)
            ctrl.check_host_path("isdir", run_dir)
693

694
            if webapp.sandbox_version_id is None and webapp.entrypoint:
695 696
                # prepend instructions to initialise a dummy entrypoint
                dn, bn = os.path.split(webapp.entrypoint)
BAIRE Anthony's avatar
todo  
BAIRE Anthony committed
697
                # FIXME: do nothing if entrypoint already exists
698
                prepare = """
699
                    {mkdir}
700
                    test -f {entrypoint} || cat > {entrypoint} <<EOF
701 702 703 704 705 706 707
#!/bin/sh
echo
echo "This is app '{name}' called with parameters '\$@'"
echo
echo "The workdir contains:"
ls -l
EOF
708
                    chmod 0755 -- {entrypoint}
709

710
                """.format( entrypoint  = shlex.quote(webapp.entrypoint),
711
                            name        = webapp.docker_name,
712 713 714 715 716 717 718 719 720 721 722 723 724 725
                            mkdir       = (("mkdir -p -- %s" % shlex.quote(dn)) if dn else ""))
            else:
                prepare = ""

            command = ["/bin/sh", "-c", """
set -x
export PATH="$PATH:/.toolbox/bin"

{prepare}

# xauth file (needed for X11 forwarding)
touch       /root/.Xauthority
chmod 600   /root/.Xauthority

726 727 728 729
# unlock the root account in /etc/shadow
# (needed at least on Fedora)
usermod --unlock root

730 731
exec /.toolbox/bin/sshd -D
            """.format(prepare=prepare)]
732

733
            ctrl.sandbox.create_container(image, name=container, hostname=container,
734
                    entrypoint = [],
735
                    command = command,
736 737
                    host_config = ctrl.sandbox.create_host_config(
                        binds = {
738 739 740
                            etc_dir: {"bind": "/.sandbox/etc", "mode": "ro"},
                            run_dir: {"bind": "/.sandbox/run", "mode": "rw"},
                            ctrl.toolbox_path: {"bind": "/.toolbox", "mode": "ro"},
741 742 743
                            },
                        # TODO: maybe drop other caps
                        cap_drop = ["NET_RAW"],
744
                        restart_policy = {"Name": "unless-stopped"},
745
                        network_mode = ctrl.sandbox_network,
746 747 748 749 750
                        ))

            ctrl.sandbox.start(container)

        except:
BAIRE Anthony's avatar
BAIRE Anthony committed
751 752
            with docker_warning("cleanup error: unable to remove container %r",
                    container, ignore=docker.errors.NotFound):
753 754 755
                ctrl.sandbox.remove_container(container, force=True)
            raise

BAIRE Anthony's avatar
BAIRE Anthony committed
756
    def _commit(self, webapp, versions):
        """Commit a webapp sandbox into a docker image

        (to be executed in a thread pool)

        `versions` is the list of candidate WebappVersion entries requested
        for this webapp (state==sandbox). Exactly one candidate with a
        non-empty version number is expected.

        In case of any pre-commit error (no candidate, several candidates or
        an empty version number), a recovery version is committed instead (to
        avoid losing the work done inside the sandbox) and the candidates are
        put in error state.

        Returns a tuple (version, error):
        - version: the WebappVersion that was committed (the recovery version
          if a pre-commit error occurred), or None if the sandbox container
          was not found
        - error: a description of the error, or None if there was no error

        Any other error is logged and propagated; in that case the version is
        left in state 'sandbox'.
        """
        ctrl = self.ctrl
        ses  = ctrl.session

        # pre-commit checks
        # - ensure that there is exactly one candidate webapp_version with
        #   state=sandbox (and a non-empty version number)
        # - otherwise:
        #   - put all candidates in error state
        #   - create a recovery version

        # version to be committed
        version = None

        # error msg (if any)
        error = None

        # ids of the candidate versions to be put in error state
        recover = ()

        if len(versions) == 1:
            # normal case (sandbox commit)
            version = versions[0]
            if not version.number:
                error   = "empty version number"
                recover = (version.id,)

        elif not versions:
            # sandbox rollback (when user drops a sandbox without committing a new image)
            error = "dangling sandbox"

        else:
            # multiple candidates (should never happen)
            error   = "multiple candidate versions (%s)" % (
                ", ".join(map(repr, sorted(v.number for v in versions))))
            recover = tuple(v.id for v in versions)

        if error:
            description = "pre-commit error: " + error
            log.error("sandbox %r version id %r: %s", webapp.docker_name, recover, description)

            with ses.begin():
                # put all candidates in 'error state'
                if recover:
                    ses.execute('''UPDATE dj_webapp_versions
                            SET description=CONCAT(description, " [", :description, "]"), state=:state
                            WHERE id IN :ids''', dict(description=description, ids=recover,
                                state=int(VersionState.ERROR)))

                # create a recovery version
                version = WebappVersion(
                        webapp_id   = webapp.id,
                        number      = time.strftime("recovery-%Y%m%d-%H%M%S"),
                        description = description,
                        published   = False,
                        state       = int(VersionState.SANDBOX))
                ses.add(version)
                # presumably loads the relationship before expunge_all()
                # detaches the object -- TODO confirm
                version.webapp
            ses.refresh(version)
            ses.expunge_all()

        assert version is not None

        # commit the docker image

        image, tag = ctrl.gen_image_tag(version, webapp)

        log.debug("dicts %r %r", webapp.__dict__, version.__dict__)
        log.info("commit sandbox %r version %r (%s)",
                webapp.docker_name, version.number, tag)

        container = webapp.sandbox_name

        # outcome to be recorded in the db by the 'finally' clause
        # (left to None on unexpected errors, so that the version stays in
        # state 'sandbox')
        next_state = image_size = None

        # text to be appended to the version description (if any)
        note = None
        try:
            # stop the container (if stopping or if creating a new sandbox)
            if webapp.sandbox_state in (SandboxState.STOPPING, SandboxState.STARTING):
                ctrl.sandbox.stop(container)
                ctrl.sandbox.wait(container)

            # commit
            cid = ctrl.sandbox.commit(container, image, tag)

            # next_state is set only once the image size is known: if
            # inspect_image() fails, the version must stay in state 'sandbox'
            # (instead of recording an incomplete entry with image_size=None)
            image_size = ctrl.sandbox.inspect_image(cid)["Size"]
            next_state = VersionState.COMMITTED

            return version, error

        except docker.errors.NotFound:
            error = "commit error: container not found %r" % container
            log.error("%s", error)
            next_state = VersionState.ERROR
            image_size = 0
            note       = "commit error: sandbox is down"

            # here we do not propagate the error to allow starting/stopping the
            # sandbox immediately (without going through sandbox_state=:error)
            return None, error

        except Exception:
            log.exception("sandbox %r version %r: unexpected commit error (sandbox may still be recovered)",
                    webapp.docker_name, version.number)
            raise

        finally:
            # NOTE: if anything unexpected happens, the version is
            # left in state 'sandbox' and we propagate the exception to
            # ensure the work done inside the sandbox is not dropped
            # and the sandbox is put in 'error' state
            #
            # The error will not be reported to the user. If this is an issue,
            # then the solution would be to create another error state (to be
            # used when the sandbox is still there).
            #
            if next_state is not None:
                # description note and new state are stored atomically
                with ses.begin():
                    if note is not None:
                        ses.execute('''UPDATE dj_webapp_versions
                                SET description=CONCAT(description, " [", :note, "]")
                                WHERE id=:id''', dict(note=note, id=version.id))
                    ses.execute('''UPDATE dj_webapp_versions
                            SET state=:state, docker_image_size=:size
                            WHERE id=:id''', dict(state=int(next_state), size=image_size,
                                id=version.id))
BAIRE Anthony's avatar
BAIRE Anthony committed
885

886

BAIRE Anthony's avatar
BAIRE Anthony committed
887
    @asyncio.coroutine
    def _manage_commit(self, webapp, versions, *, force=False):
        """Commit a sandbox (when needed) and hand the new image to the image manager

        A commit is performed when either:
        - at least one commit was requested (`versions` is non-empty: the
          WebappVersion entries with state=sandbox for this app), or
        - `force` is true and self.inspect_sandbox() finds a docker container
          for this sandbox

        When a version was committed, a push to the registry is always
        triggered, and a preemptive pull to the swarm is triggered as well
        unless the commit reported an error. Neither operation is awaited here.
        """
        if not versions:
            # nothing requested: commit only if forced and the container exists
            if not force or self.inspect_sandbox(webapp) is None:
                return

        # the commit is blocking -> run it in the executor
        committed, error = yield from self.run_in_executor(self._commit, webapp, versions)
        if committed is None:
            # nothing was committed (_commit() returns None when the container
            # was not found)
            return

        images = self.ctrl.image_manager

        # push to the registry
        images.push(committed.id)

        if not error:
            # preemptive pull to the swarm (the image may be needed soon)
            images.pull(committed.id, swarm=True)
913 914


BAIRE Anthony's avatar
BAIRE Anthony committed
915 916
    def _stop(self, webapp):
        """Remove the docker container of a webapp sandbox

        (to be executed in a thread pool)

        The container is removed with force=True, so a running container is
        removed too. A container that does not exist is silently ignored.
        """
        # FIXME: remove volumes (v=True) too ?
        with contextlib.suppress(docker.errors.NotFound):
            self.ctrl.sandbox.remove_container(webapp.sandbox_name, force=True)

926 927

    @asyncio.coroutine
    def _process(self, webapp_id, reset, rescheduled):
        """Process the pending action on a webapp sandbox

        The sandbox state is read from the db and:
        - STARTING: commit the existing sandbox (if any), pull the image (the
          requested sandbox_version, or the factory image of the webapp's
          docker_os), then start the sandbox
          -> RUNNING (START_ERROR on failure)
        - STOPPING: commit (if requested), then remove the container
          -> IDLE (STOP_ERROR on failure)
        - any other state: only perform the requested commits (the sandbox
          state is not changed)

        The new state is written only if dj_webapps.sandbox_state still holds
        the value read at the beginning, and then
        self.ctrl.notif_webapp_updated() is called. On ShuttingDown the state
        is left unchanged.

        `reset` and `rescheduled` are not used by this method.
        """
        ctrl = self.ctrl

        log.debug("process sandbox %d", webapp_id)

        ses = ctrl.session
        with ses.begin():
            # current state of the sandbox + load docker_os
            webapp = ses.query(Webapp).filter_by(id=webapp_id).one()
            webapp.docker_os

            # version to be started
            sandbox_version = webapp.sandbox_version

            # requested commits
            commit_versions = self.filter_commit_version(ses.query(WebappVersion), webapp_id).all()

            # detach the loaded objects (they are used after the transaction)
            ses.expunge_all()

        # docker name of the sandbox & image
        webapp.sandbox_name = ctrl.gen_sandbox_name(webapp)

        # phase: used only in log messages
        # next_state: state recorded by the 'finally' clause (None -> no update)
        # fail_state: state recorded if an exception is raised
        phase = "inspect"
        next_state = fail_state = None
        try:
            if webapp.sandbox_state == SandboxState.STARTING:
                # start the sandbox
                phase = "start"
                next_state = SandboxState.RUNNING
                fail_state = SandboxState.START_ERROR

                # commit (if a sandbox exists)
                yield from self._manage_commit(webapp, commit_versions, force=True)

                if sandbox_version is not None:
                    # ensure version belongs to this application
                    if sandbox_version.webapp_id != webapp.id:
                        raise Error("invalid version id %d (belongs to webapp %d)" % (
                            sandbox_version.id, sandbox_version.webapp_id))

                    image_tag = ctrl.gen_image_tag(sandbox_version, webapp)

                    # pull requested image
                    yield from ctrl.image_manager.pull(sandbox_version.id)
                else:
                    # no version requested -> start from the factory image of
                    # the webapp's OS
                    image_tag = (ctrl.gen_factory_name(webapp.docker_os),
                             webapp.docker_os.version)

                    # pull image
                    yield from ctrl.image_manager.sandbox_pull_manager.process(image_tag)

                # start sandbox
                yield from self.run_in_executor(self._start, webapp, image_tag)

            elif webapp.sandbox_state == SandboxState.STOPPING:
                # stop the sandbox
                phase = "stop"
                next_state = SandboxState.IDLE
                fail_state = SandboxState.STOP_ERROR

                # commit (if requested)
                yield from self._manage_commit(webapp, commit_versions)

                yield from self.run_in_executor(self._stop, webapp)

            else:
                # commit (if requested)
                # (next_state stays None: the sandbox state is not modified)
                phase = "commit"
                yield from self._manage_commit(webapp, commit_versions)

        except ShuttingDown:
            # leave the state unchanged (the action is aborted, not failed)
            next_state = None
            log.info("sandbox %r %s aborted (controller shutdown)", webapp.docker_name, phase)

        # NOTE(review): next_state is set to the success state *before* the
        # work is done, so any exception escaping this handler would make the
        # 'finally' clause record a success; catching BaseException presumably
        # prevents that. As a side effect it also swallows
        # asyncio.CancelledError (a BaseException since Python 3.8) and
        # GeneratorExit -- with GeneratorExit, the 'yield from' in the 'finally'
        # clause would raise RuntimeError. Confirm this is acceptable.
        except BaseException as e:
            next_state = fail_state

            # expected errors get a one-line log, unexpected ones a traceback
            log_func = log.error if isinstance(e, (docker.errors.APIError, Error)) else log.exception
            log_func ("sandbox %r %s error (%s)", webapp.docker_name, phase,
                    traceback.format_exception_only(type(e), e)[-1].strip())

        finally:
            if next_state is not None:
                # atomically update the sandbox state in the db
                # (in case another action is requested during the process, eg: the user
                #  stops the sandbox while it is not fully started)
                log.info("sandbox %r is now in state %r", webapp.docker_name, next_state.name)
                with ses.begin():
                    ses.execute("UPDATE dj_webapps SET sandbox_state=%d WHERE id=%d AND sandbox_state=%d" %
                            (next_state, webapp_id, webapp.sandbox_state))

                yield from self.ctrl.notif_webapp_updated(webapp_id)

            log.debug("done    sandbox %d", webapp_id)


1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068
class ImportManager(Manager):
    """Manager for importing webapp versions

    For each version of a webapp in state IMPORT, the image is pulled from
    the old registry (ctrl.import_registry), re-tagged with its new name,
    marked as committed in the db and pushed to the new registry.
    """

    def __init__(self, ctrl, nb_threads=1):
        super().__init__(nb_threads)
        self.ctrl = ctrl

    async def _process(self, webapp_id, reset, rescheduled):
        ctrl = self.ctrl
        ses  = ctrl.session

        # The manager lock is held during the whole import, so only a single
        # import runs at a time (to avoid overloading the push manager and
        # affecting users working on a sandbox).
        with await iter(self):
            log.debug("process import %d", webapp_id)

            with ses.begin():
                (docker_name,) = ses.query(Webapp.docker_name).filter_by(id=webapp_id).one()
                pending = ses.query(WebappVersion).filter_by(
                        webapp_id=webapp_id, state=int(VersionState.IMPORT))
                # (version id, version number, new repository, new tag)
                to_import = [(ver.id, ver.number) + ctrl.gen_image_tag(ver) for ver in pending]

            for version_id, number, dst_repo, dst_tag in to_import:
                await self._import_version(docker_name, version_id, number, dst_repo, dst_tag)

    async def _import_version(self, docker_name, version_id, number, dst_repo, dst_tag):
        """Import one version (pull, re-tag, mark as committed, push)"""
        ctrl = self.ctrl
        ses  = ctrl.session

        log.info("importing %s:%s (id%s)", docker_name, number, version_id)

        # fetch the image from the old registry
        old_repo  = "%s/%s" % (ctrl.import_registry, docker_name)
        old_image = "%s:%s" % (old_repo, number)
        await ctrl.image_manager.sandbox_pull_manager.process((old_repo, number))

        # give it its new name (docker tag, no container commit involved)
        log.info("tag %s as %s:%s", old_image, dst_repo, dst_tag)
        await self.run_in_executor(ctrl.sandbox.tag, old_image, dst_repo, dst_tag, lock=False)

        # atomically mark the image as committed (only if still in state IMPORT)
        with ses.begin():
            ses.execute("UPDATE dj_webapp_versions SET state=%d WHERE id=%d AND state=%d"
                    % (VersionState.COMMITTED, version_id, VersionState.IMPORT))

        # upload it to the new registry
        await ctrl.image_manager.push(version_id)


BAIRE Anthony's avatar
BAIRE Anthony committed
1069
class JobManager(Manager):
1070
    class JobInfo:
BAIRE Anthony's avatar
BAIRE Anthony committed
1071
        __slots__ = "job_id", "ver_id", "ctr_id", "version", "ctr_name", "client", "cpu", "mem", "node_id", "timeout"
1072

1073
    def __init__(self, ctrl):
1074
        super().__init__(0)
BAIRE Anthony's avatar
BAIRE Anthony committed
1075 1076
        self.ctrl = ctrl

1077 1078 1079 1080 1081 1082

    @asyncio.coroutine
    def __iter__(self):
        raise NotImplementedError()


1083 1084 1085 1086
    # create the job and:
    # - return True  if started
    # - return False if cancelled by the user
    # - raise exception on error
1087
    def _create_job(self, info):
BAIRE Anthony's avatar
BAIRE Anthony committed
1088 1089
        ctrl = self.ctrl
        ses  = ctrl.session
1090 1091 1092
        tmp_img = None

        assert info.ctr_id is None
BAIRE Anthony's avatar
BAIRE Anthony committed
1093 1094 1095

        try:
            with ses.begin():</