#!/usr/bin/python3


import asyncio
import collections
from   concurrent.futures   import ThreadPoolExecutor
import contextlib
import datetime
import enum
import itertools
import json
import logging
import re
import os
import shlex
import signal
import struct
import sys
import time
import threading
import traceback

import aiohttp
import aioredis
import docker
import iso8601
import MySQLdb
from   sqlalchemy import desc
import sqlalchemy.orm.scoping
import yaml

import config_reader
from database import *
from shared_swarm import SharedSwarmClient, ShuttingDown

HOST_PATH="/vol/host/"

##################################################
# default number of concurrent tasks

# pushing images from the sandbox to the registry
NB_PUSH_SANDBOX = 2

# pulling images from the registry to the sandbox
NB_PULL_SANDBOX = 2

# pulling images from the registry to the swarm
NB_PULL_SWARM   = 2

# sandbox actions (start, stop, commit, ...)
NB_SANDBOX_TASKS = 4

# jobs
NB_JOB_TASKS     = 4


# default thread pool (used by most of the tasks)
default_executor = ThreadPoolExecutor(10)

##################################################
# redis keys

# job log
REDIS_KEY_JOB_LOG       = "log:job:%d"
REDIS_KEY_JOB_STATE     = "state:job:%d"

# pubsub channels for waking up allgo.aio (frontend) and the controller
# (ourselves)
REDIS_CHANNEL_AIO        = "notify:aio"
REDIS_CHANNEL_CONTROLLER = "notify:controller"

# pubsub messages
REDIS_MESSAGE_JOB_UPDATED    = "job:%d"
REDIS_MESSAGE_WEBAPP_UPDATED = "webapp:%d"


##################################################


# interval (in seconds) for polling the db
DB_CHECK_PERIOD = 3600

log = logging.getLogger("controller")

assert MySQLdb.threadsafety >= 1


class Error(Exception):
    pass


class RateLimiter:
    """Generator for rate limiting

    This asynchronous iterator ensures we spend at least `period` seconds in
    each iteration.

    usage:
        async for _ in RateLimiter(60):
            ...
    """
    def __init__(self, period):
        assert period > 0
        self.period = period
        self._t0 = time.monotonic()-period

    def __aiter__(self):
        return self

    async def __anext__(self):
        t1 = time.monotonic()
        delay = self._t0 - t1 + self.period
        if delay > 0:
            log.debug("rate_limit: sleep %f seconds", delay)
            await asyncio.sleep(delay)
            self._t0 = t1 + delay
        else:
            self._t0 = t1


def docker_check_error(func, *k, **kw):
    """Wrapper for docker-py methods that produce a stream

    Methods producing a stream (eg: push, build) do not report all errors
    by raising exceptions. Some errors are reported later in the stream.

    This function parses the stream and raises Error() if needed.
    """
    for elem in func(*k, stream=True, **kw):
        js = json.loads(elem.decode())
        if "error" in js:
            raise Error("push error: " + js["error"])

@contextlib.contextmanager
def docker_warning(msg, *k, ignore=None):
    """Catch docker errors and issue a warning instead"""
    try:
        yield
    except docker.errors.APIError as e:
        if ignore is None or not isinstance(e, ignore):
            k += e,
            log.warning(msg + " (%s)", *k)

@contextlib.contextmanager
def report_error(fmt, *k):
    """Context manager for logging exceptions

    This function logs exceptions (when leaving the context) with log.error()
    (if the exception inherits from Error) or log.exception() otherwise.

    The log message is prepended with the string generated by: fmt % k
    """
    try:
        yield
    except Exception as e:
        msg = fmt % k
        log_func = log.error if isinstance(e, Error) else log.exception
        log_func("%s (%s)", msg,
                traceback.format_exception_only(type(e), e)[-1].strip())
        raise
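
# Illustrative usage (sketch): `version_id` and `do_the_push` are hypothetical
# names; the pattern mirrors how report_error() is used further below.
#
#     with report_error("unable to push version id %d", version_id):
#         do_the_push()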

def disable_future_warning(fut):
    """Add a dummy callback to a future to prevent asyncio warnings

    Return: the future

    The asyncio module logs a warning message when the result of a future is
    not used. This function installs a dummy callback on the future so as to
    avoid this warning. This is useful for futures whose result *may* be
    ignored by the application.
    """
    fut.add_done_callback(lambda f: f.exception())
    return fut
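
# Illustrative usage (sketch): the caller may then legitimately ignore the
# outcome of this future without triggering an asyncio warning.
#
#     fut = disable_future_warning(asyncio.Future())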


def auto_create_task(func):
    """Decorator for forcing the creation of a task when a coroutine function is called

    Return a wrapper that calls the function, creates the task on the fly and
    returns it. It also installs a no-op callback to avoid warnings in case the
    result is not used.
    """
    assert asyncio.iscoroutinefunction(func)

    return (lambda *k, **kw:
        disable_future_warning(asyncio.async(func(*k, **kw))))
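
# Illustrative usage (sketch): calling the decorated coroutine function
# immediately schedules it as a task and returns that task.  `refresh` is a
# hypothetical name.
#
#     @auto_create_task
#     @asyncio.coroutine
#     def refresh():
#         yield from asyncio.sleep(1)
#
#     fut = refresh()     # already scheduled; may be awaited or ignored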

def cascade_future(src, dst):
    """propagate the result of a future to another future
    
    This function installs a callback on the future `src` that propagates its
    result to the future `dst`.
    """
    def callback(fut):
        ex = fut.exception()
        if ex is None:
            dst.set_result(fut.result())
        else:
            dst.set_exception(ex)

    if src.done():
        callback(src)
    else:
        src.add_done_callback(callback)
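
# Illustrative usage (sketch): hand out `dst` to a caller while the actual
# work runs in an internal task.  `some_coro` is a hypothetical name.
#
#     dst = asyncio.Future()
#     cascade_future(asyncio.async(some_coro()), dst)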

async def _make_aiohttp_client(host):
    """Create a aiohttp session for connecting to a docker engine

    `host` is a docker host url (DOCKER_HOST) it may be either a `tcp://`
    or `unix://` url

    Return a tuple `(session, url)` where `session` is a aiohttp client
    session and `url` is the http(s) url of the docker engine
    """
    url = docker.utils.parse_host(host)
    mo = re.match(r"http\+unix:/+(/.*)", url)
    if mo is None:
        # HTTP over TCP
        connector = None
    else:
        # HTTP over unix socket
        url = "http://DUMMY.localhost"
        path = mo.group(1)
        connector = aiohttp.UnixConnector(path)
    session = aiohttp.ClientSession(connector=connector)
    return session, url

def make_aiohttp_client(host):
    return asyncio.get_event_loop().run_until_complete(_make_aiohttp_client(host))
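
# Illustrative usage (sketch): `host` may be a tcp:// or unix:// DOCKER_HOST
# url; with a unix socket the returned url is the dummy "http://DUMMY.localhost"
# and the requests actually go through an aiohttp UnixConnector.
#
#     session, url = make_aiohttp_client("unix:///var/run/docker.sock")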

class Manager:
    """A class for scheduling asynchronous jobs on a collection of keys

    (the one-line summary is not very helpful, sorry)
    

    Job submission

    The job is scheduled on key KEY by calling .process(KEY); the function
    returns an asyncio.Future object (which will receive the result)

    If .process(KEY) is called while a job is already running for KEY, then the
    manager arranges for the job to be run a second time in a row for the same key


    Job implementation

    This is an abstract class. The job shall be implemented as the coroutine
    named ._process() in the inherited class.

    The manager guarantees that ._process() cannot be called multiple times
    concurrently for the same key

    In case a shutdown is requested (see .shutdown()):
     - all jobs that are not yet started are cancelled
     - running jobs continue until they return or until they try to acquire the
       internal semaphore (which raises ShuttingDown())


    Concurrency

    The job tasks are all started immediately (regardless of the number of
    existing jobs)

    It is possible to limit the concurrency to a maximum number of parallel
    tasks. Manager provides an internal semaphore (the number of tokens is set
    in .__init__())

    The semaphore can be locked in two ways:
     - by locking the manager:

            with (yield from self): ... concurrency limited to `nb_tokens`
            tasks ...

     - by calling .run_in_executor (this is for running non-async code):

            yield from self.run_in_executor(func, args...)

    Shutdown

    When .shutdown() is called the Manager ensures that all jobs are properly
    terminated
     - it cancels all jobs that are not yet started
     - it prevents starting new jobs
     - it lets running jobs continue, but interrupts them when they try
       to acquire the internal semaphore

    All cancelled/interrupted tasks have their future raise ShuttingDown().

    .shutdown() returns after all pending jobs are terminated.


    Thread safety

    - Manager is *not* thread safe, all public methods must be called from the
      same thread
    """

    class _Handle:
        __slots__ = "key", "cur", "nxt", "rescheduled"

    def __init__(self, nb_tokens=1, *, executor = default_executor, interruptible = False):
        # {key: _TaskHandler}
        self._handles = {}
        self._semaphore = asyncio.Semaphore(nb_tokens)
        self._shutdown = asyncio.Future()
        self._executor = executor
        self._interruptible = interruptible

    def _create_task(self, hnd):
        assert hnd.nxt is None
        def reset():
            assert not hnd.cur.done()
            nxt = hnd.nxt
            if nxt is not None:
                cascade_future(hnd.cur, nxt)
                hnd.nxt = None

            # recreate the 'rescheduled' future and return it
            hnd.rescheduled = disable_future_warning(asyncio.Future())
            return hnd.rescheduled

        hnd.rescheduled = disable_future_warning(asyncio.Future())
        hnd.cur = asyncio.async(self._process(hnd.key, reset, hnd.rescheduled))
        hnd.cur.add_done_callback(lambda fut: self._done(hnd))
        log.debug("task scheduled %r %r", self, hnd.key)
        return hnd.cur

    def process(self, key):
        """Schedule the job to be run on key `key`

        returns an asyncio.Future (that will provide the result of ._process())
        """
        if self._shutdown.done():
            return self._shutdown

        hnd = self._handles.get(key)
        if hnd is None:
            # create handle
            self._handles[key] = hnd = self._Handle()
            hnd.key = key
            hnd.cur = None
            hnd.nxt = None
            hnd.rescheduled = None

        if hnd.cur is None:
            # create task
            return self._create_task(hnd)
        else:
            # reschedule task
            if hnd.nxt is None:
                hnd.nxt = asyncio.Future()
            if not hnd.rescheduled.done():
                hnd.rescheduled.set_result(None)
            return hnd.nxt

    def _done(self, hnd):

        assert hnd is self._handles.get(hnd.key)
        assert hnd.cur.done()

        try:
            hnd.cur.result()
        except ShuttingDown:
            pass
        except Exception:
            log.exception("task %r %r unhandled exception", self, hnd.key)

        nxt     = hnd.nxt
        hnd.cur = hnd.nxt = None
        if nxt is None:
            del self._handles[hnd.key]
        else:
            cascade_future(self._create_task(hnd), nxt)


    @asyncio.coroutine
    def __iter__(self):
        """Coroutine for locking the internal semaphore
        
        Usage:
            with (yield from manager):
              ...

        Warning:
            after a shutdown is initiated, this function will always raise
            ShuttingDown()
        """
        ctx = yield from iter(self._semaphore)
        if self._shutdown.done():
            with ctx:
                raise ShuttingDown()
        return ctx


    @asyncio.coroutine
    def run_in_executor(self, *k, lock=True):
        """Run a function in a separate thread (with limited concurrency)

        This function locks the internal semaphore and runs the provided
        function call in a separate thread (using the executor)
        """

        def run():
            coro = asyncio.get_event_loop().run_in_executor(self._executor, *k)
            if self._interruptible:
                return next(asyncio.as_completed((coro, self._shutdown)))
            else:
                return coro
        
        if lock:
            with (yield from self):
                return (yield from run())
        else:
            return (yield from run())


    @asyncio.coroutine
    def _process(self, key, reset, rescheduled):
        """Actual implementation of the job (to be reimplemented in inherited classes)

        The Manager class guarantees that this function cannot be called
        multiple times concurrently on the same key (in case the same key is
        submitted multiple times, the Manager will call this function a second
        time after it has terminated).

        `rescheduled` is a future whose result is set when the job is being
        rescheduled (if process(key) is called before _process(key ...)
        terminates).

        `reset` is a function that may be called to reset the 'dirty' state of
        this key (this is to avoid calling ._process() a second time if not
        necessary); in that case, a new `rescheduled` future is returned.
        """
        raise NotImplementedError()

    @asyncio.coroutine
    def shutdown(self):
        """Initiate a graceful shutdown

        This coroutine terminates once all tasks are properly terminated.
        """
        exc = ShuttingDown()
        self._shutdown.set_exception(exc)
        self._shutdown.exception()  # to avoid asyncio warnings
        # cancel all 'next' tasks
        for hnd in self._handles.values():
            if hnd.nxt is not None:
                hnd.nxt.set_exception(exc)
                hnd.nxt = None

        if not self._interruptible:
            yield from asyncio.gather(
                    *(h.cur for h in self._handles.values() if h.cur is not None),
                    return_exceptions=True)
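
# Illustrative sketch (not part of the controller): a minimal Manager subclass
# showing how ._process(), `reset` and `rescheduled` fit together.  All names
# below are hypothetical.
#
#     class EchoManager(Manager):
#         @asyncio.coroutine
#         def _process(self, key, reset, rescheduled):
#             with (yield from self):         # bounded by nb_tokens
#                 yield from asyncio.sleep(1)
#             return key
#
#     mgr = EchoManager(nb_tokens=2)
#     fut = mgr.process("some-key")           # asyncio.Future with the result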


class SandboxManager(Manager):
    """Manager for sandbox operation

    This manager handles all sandbox operations (start, stop, commit).

    Operations are requested asynchronously in the database:
     - Webapp.sandbox_state=starting (for starting a sandbox)
     - Webapp.sandbox_state=stopping (for stopping a sandbox)
     - WebappVersion with state=sandbox (for committing a new image)

    All state changes on Webapp.sandbox_state are atomic (e.g: if the user
    requests a stop while the sandbox is starting, then the manager finishes
    starting the sandbox but does not update the state, and it runs
    immediately again to stop the sandbox)

    Whatever the value of Webapp.sandbox_state, the manager first examines
    commit requests and performs the commit if requested.

    If the container already exists before starting the webapp (not possible in
    normal operations), then a recovery image is committed first.

    When a commit is successful, the image manager is notified to push the
    image to the registry and (if it is not a recovery image) to pull it to the
    swarm (because this image will very likely be used soon).

    State changes:

     - sandbox start:
            starting->running     (normal case)
            starting->start_error (error case)

     - sandbox stop:
            stopping->idle        (normal case)
            stopping->stop_error  (error case)

     - image commit:
            sandbox->committed  (normal case)
            sandbox->error      (error case)
            (none)->committed   (recovery version) 
    """

    def __init__(self, ctrl, nb_threads = NB_SANDBOX_TASKS):
        super().__init__(nb_threads)
        self.ctrl = ctrl


    def inspect_sandbox(self, webapp):
        try:
            return self.ctrl.sandbox.inspect_container(
                    self.ctrl.gen_sandbox_name(webapp))
        except docker.errors.NotFound:
            return None


    @staticmethod
    def filter_commit_version(query, webapp_id):
        """Narrow a WebappVersion query to select the candidate versions to be committed"""
        return (query.
                filter_by(webapp_id=webapp_id,
                         state = int(VersionState.SANDBOX))
                    )

    def _start(self, webapp, version):
        """Start a webapp sandbox

        (to be executed in a thread pool)
        """

        ctrl = self.ctrl
        ses  = ctrl.session

        # prepare sandbox parameters

        # docker image
        if version is None:
            image = "%s:%s" % (ctrl.gen_factory_name(webapp.docker_os),
                    webapp.docker_os.version)
        else:
            image = "%s:%s" % (webapp.image_name, version.number)

        log.debug("sandbox %r: using image %r", webapp.docker_name, image)

        # safety checks
        # (because docker_name is used in the paths of the external volumes)
        if ("/" in webapp.docker_name) or (webapp.docker_name in ("", ".", "..")):
            raise Error("malformed docker_name")

        uid = webapp.id + 2000
        if uid < 2000:
            # just for safety
            raise Error("bad webapp id")

        # remove stale container (if any)
        if self.inspect_sandbox(webapp) is not None:
            self._stop(webapp)

        container = webapp.sandbox_name
        try:
            # prepare the sandbox
            # (create ssh keys)
            ctrl.check_host_path("isdir", ctrl.toolbox_path)
            ctrl.check_host_path("isdir", ctrl.sandbox_path)

            ctrl.sandbox.create_container("busybox:latest", name=container,
                    command = ["/bin/sh", "-c", """
set -ex

export PATH="$PATH:/.toolbox/bin"

# clean sandbox dir
rm -rf {sbx}

# create dirs
for dir in {sbx} {etc} {run}
do
    mkdir -p            ${{dir}}
    chown {uid}:65534   ${{dir}}
    chmod 0700          ${{dir}}
done

# xauth file
touch               {run}/XAuthority
chown {uid}:65534   {run}/XAuthority
chmod 0600          {run}/XAuthority

# generate ssh keys
(for type in ecdsa ed25519 rsa
do
    key={etc}/ssh_host_${{type}}_key
    [ -f $key ] || ssh-keygen -N '' -f $key -t $type >&2
    
    echo -n '{hostname}. ' | cat - ${{key}}.pub
done) > {etc}/ssh_known_hosts

# known_host file for allgo-shell
ssh-keygen -H -f {etc}/ssh_known_hosts
chmod 0644       {etc}/ssh_known_hosts
rm -f            {etc}/ssh_known_hosts.old

# authentication key for allgo-shell
rm -f               {etc}/identity
ssh-keygen -N '' -f {etc}/identity
chown {uid}:65534   {etc}/identity

# forced shell for the sshd config 
cat > {etc}/shell <<EOF
#!/bin/sh

export PATH="\$PATH:/.toolbox/bin"

uid=\`id -u\`
shell="\`getent passwd \$uid 2>/dev/null| cut -d : -f 7\`"
if [ -z "\$shell" ] ; then
    shell=/bin/sh
fi

if [ -n "\$SSH_ORIGINAL_COMMAND" ] ; then
    exec "\$shell" -c "\$SSH_ORIGINAL_COMMAND"
else
    exec "\$shell"
fi

EOF
chmod 755 {etc}/shell

# sshd config
cat > {etc}/sshd_config <<EOF
Port 22
Protocol 2

# turned off because it requires creating a 'sshd' user inside the sandbox
UsePrivilegeSeparation no

StrictModes no

ForceCommand /.sandbox/etc/ssh/shell

PermitRootLogin without-password

PubkeyAuthentication yes
AuthorizedKeysFile  /.sandbox/etc/ssh/identity.pub .ssh/authorized_keys .ssh/authorized_keys2

ChallengeResponseAuthentication no
PasswordAuthentication          no

X11Forwarding yes
X11DisplayOffset 10
PrintMotd no
PrintLastLog no
TCPKeepAlive yes

# Allow client to pass locale environment variables
AcceptEnv LANG LC_*

Subsystem sftp internal-sftp

UsePAM no
EOF
                    """.format(uid=uid,
                        hostname = "%s-sandbox-%s" % (ctrl.env, webapp.docker_name),
                        sbx = "/mnt/%s"         % webapp.docker_name,
                        etc = "/mnt/%s/etc/ssh" % webapp.docker_name,
                        run = "/mnt/%s/run"     % webapp.docker_name,
                        )],
                    host_config = ctrl.sandbox.create_host_config(
                        binds   = {
                            ctrl.sandbox_path: {"bind": "/mnt"},
                            ctrl.toolbox_path: {"bind": "/.toolbox", "mode": "ro"},
                    }))
            ctrl.sandbox.start(container)
            if ctrl.sandbox.wait(container):
                log.debug("sandbox %s output:\n%s", webapp.docker_name,
                        ctrl.sandbox.logs(container).decode(errors="replace"))
                raise Error("sandbox preparation failed")
            ctrl.sandbox.remove_container(container)

            # create and start the sandbox

            etc_dir = os.path.join(ctrl.sandbox_path, webapp.docker_name, "etc")
            run_dir = os.path.join(ctrl.sandbox_path, webapp.docker_name, "run")
            ctrl.check_host_path("isdir", etc_dir)
            ctrl.check_host_path("isdir", run_dir)

            if version is None and webapp.entrypoint:
                # prepend instructions to initialise a dummy entrypoint
                dn, bn = os.path.split(webapp.entrypoint)
                # FIXME: do nothing if entrypoint already exists
                prepare = """
                    {mkdir}
                    test -f {entrypoint} || cat > {entrypoint} <<EOF
#!/bin/sh
echo
echo "This is app '{name}' called with parameters '\$@'"
echo
echo "The workdir contains:"
ls -l
EOF
                    chmod 0755 -- {entrypoint}

                """.format( entrypoint  = shlex.quote(webapp.entrypoint),
                            name        = webapp.docker_name,
                            mkdir       = (("mkdir -p -- %s" % shlex.quote(dn)) if dn else ""))
            else:
                prepare = ""

            command = ["/bin/sh", "-c", """
set -x
export PATH="$PATH:/.toolbox/bin"

{prepare}

# xauth file (needed for X11 forwarding)
touch       /root/.Xauthority
chmod 600   /root/.Xauthority

exec /.toolbox/bin/sshd -D
            """.format(prepare=prepare)]

            ctrl.sandbox.create_container(image, name=container, hostname=container,
                    command = command,
                    host_config = ctrl.sandbox.create_host_config(
                        binds = {
                            etc_dir: {"bind": "/.sandbox/etc", "mode": "ro"},
                            run_dir: {"bind": "/.sandbox/run", "mode": "rw"},
                            ctrl.toolbox_path: {"bind": "/.toolbox", "mode": "ro"},
                            },
                        # TODO: maybe drop other caps
                        cap_drop = ["NET_RAW"],
                        restart_policy = {"Name": "unless-stopped"},
                        network_mode = ctrl.sandbox_network,
                        ))

            ctrl.sandbox.start(container)

        except:
            with docker_warning("cleanup error: unable to remove container %r",
                    container, ignore=docker.errors.NotFound):
                ctrl.sandbox.remove_container(container, force=True)
            raise

    def _commit(self, webapp, versions):
        """Commit a webapp sandbox

        (to be executed in a thread pool)

        The image version is looked up in webapp_versions (where state==sandbox).
        
        In case of any error, a recovery version is committed instead (to avoid
        losing the work done inside the sandbox) and the candidates are put in
        error state.
        """
        ctrl = self.ctrl
        ses  = ctrl.session

        # pre-commit checks
        # - ensure that there is exactly one candidate webapp_version with
        #   state=sandbox
        # - otherwise:
        #   - put all candidates in error state
        #   - create a recovery version

        # version to be committed
        version = None
        
        # error msg (if any)
        error = None

        # version ids to be recovered
        recover = ()

        if len(versions) == 1:
            # normal case (sandbox commit)
            version = versions[0]
            if not version.number:
                error   = "empty version number"
                recover = version.id,

        elif not versions:
            # sandbox rollback (when user drops a sandbox without committing a new image)
            error     = "dangling sandbox"

        else:
            # multiple candidates (should never happen)
            error    = "multiple candidate versions (%s)" % (
                ", ".join(map(repr, sorted(v.number for v in versions))))
            recover = tuple(v.id for v in versions)

        # TODO: make 'sandbox' a reserved name

        if error:
            changelog = "pre-commit error: " + error
            log.error("sandbox %r version id %r: %s", webapp.docker_name, recover, changelog)

            with ses.begin():
                # put all candidates in 'error state'
                if recover:
                    ses.execute('''UPDATE dj_webapp_versions
                            SET changelog=CONCAT(changelog, " [", :changelog, "]"), state=:state
                            WHERE id IN :ids''', dict(changelog=changelog, ids=recover,
                                state=int(VersionState.ERROR)))

                # create a recovery version
                version = WebappVersion(
                        webapp_id = webapp.id,
                        number    = time.strftime("recovery-%Y%m%d-%H%M%S"),
                        changelog = changelog,
                        published = False,
                        state     = int(VersionState.SANDBOX))
                ses.add(version)
            ses.refresh(version)
            ses.expunge(version)

        assert version is not None

        # commit the docker image

        log.debug("dicts %r %r", webapp.__dict__, version.__dict__)
        log.info("commit sandbox %r version %r", webapp.docker_name, version.number)

        container = webapp.sandbox_name
        next_state = image_size = None
        try:
            # stop the container (if stopping or if creating a new sandbox)
            if webapp.sandbox_state in (SandboxState.STOPPING, SandboxState.STARTING):
               ctrl.sandbox.stop(container)
               ctrl.sandbox.wait(container)

            # commit
            cid = ctrl.sandbox.commit(container, webapp.image_name, version.number)
            next_state = VersionState.COMMITTED
            image_size = ctrl.sandbox.inspect_image(cid)["Size"]

            return version, error

        except docker.errors.NotFound:
            error = "commit error: container not found %r" % container
            log.error("%s", error)
            next_state = VersionState.ERROR
            image_size = 0
            ses.execute('''UPDATE dj_webapp_versions
                    SET changelog=CONCAT(changelog, " [commit error: sandbox is down]")
                    WHERE id=%d''' % version.id)

            # here we do not propagate the error to allow starting/stopping the
            # sandbox immediately (without going through sandbox_state=:error)
            return None, error

        except Exception as e:
            log.exception("sandbox %r version %r: unexpected commit error (sandbox may still be recovered)",
                    webapp.docker_name, version.number)
            raise

        finally:
            # NOTE: if anything unexpected happens, the version is
            # left in state 'sandbox' and we propagate the exception to
            # ensure the work done inside the sandbox is not dropped
            # and the sandbox is put in 'error' state
            #
            # The error will not be reported to the user. If this is an issue,
            # then the solution would be to create another error state (to be
            # used when the sandbox is still there).
            # 
            if next_state is not None:
                with ses.begin():
                    ses.execute("UPDATE dj_webapp_versions SET state=%d, docker_image_size=%d WHERE id=%d"
                            % (next_state, image_size, version.id))


    @asyncio.coroutine
    def _manage_commit(self, webapp, versions, *, force=False):
        """Manage sandbox commit (if needed) and notify the image manager

        The commit is performed if one of these conditions is fulfilled:
        - a commit was requested (there is at least one WebappVersion entry
          with state=sandbox for this app)
        - a docker container exists (for this sandbox) and force is true
        """

        if not (versions or (force and self.inspect_sandbox(webapp) is not None)):
            return
        
        # do the commit
        version, error = yield from self.run_in_executor(self._commit, webapp, versions)

        # trigger push/pull operations (but do not wait)
        if version is not None:

            # push to the registry
            self.ctrl.image_manager.push(version.id)

            if not error:
                # preemptive pull to the swarm
                # (the image may be needed soon)
                self.ctrl.image_manager.pull(version.id, swarm=True)


    def _stop(self, webapp):
        """Stop a webapp sandbox

        (to be executed in a thread pool)
        """
        try:
            # FIXME: remove volumes (v=True) too ?
            self.ctrl.sandbox.remove_container(webapp.sandbox_name, force=True)
        except docker.errors.NotFound:
            pass


    @asyncio.coroutine
    def _process(self, webapp_id, reset, rescheduled):
        ctrl = self.ctrl

        log.debug("process sandbox %d", webapp_id)

        ses = ctrl.session
        with ses.begin():
            # current state of the sandbox + load docker os
            webapp = ses.query(Webapp).filter_by(id=webapp_id).one()
            webapp.docker_os    # touch the relationship so it is loaded before expunge_all()

            # version to be started
            sandbox_version = webapp.sandbox_version

            # requested commits
            commit_versions = self.filter_commit_version(ses.query(WebappVersion), webapp_id).all()

            ses.expunge_all()

        # docker name of the sandbox & image
        webapp.sandbox_name = ctrl.gen_sandbox_name(webapp)
        webapp.image_name   = ctrl.gen_image_name(webapp)

        phase = "inspect"
        next_state = fail_state = None
        try:
            if webapp.sandbox_state == SandboxState.STARTING:
                # start the sandbox
                phase = "start"
                next_state = SandboxState.RUNNING
                fail_state = SandboxState.START_ERROR

                # commit (if a sandbox exists)
                yield from self._manage_commit(webapp, commit_versions, force=True)

                if sandbox_version is not None:
                    # ensure version belongs to this application
                    if sandbox_version.webapp_id != webapp.id:
                        raise Error("invalid version id %d (belongs to webapp %d)" % (
                            sandbox_version.id, sandbox_version.webapp_id))

                    # pull requested image
                    yield from ctrl.image_manager.pull(sandbox_version.id)
                else:
                    # pull image
                    yield from ctrl.image_manager.sandbox_pull_manager.process((
                        ctrl.gen_factory_name(webapp.docker_os),
                        webapp.docker_os.version))

                # start sandbox
                yield from self.run_in_executor(self._start, webapp, sandbox_version)

            elif webapp.sandbox_state == SandboxState.STOPPING:
                # stop the sandbox
                phase = "stop"
                next_state = SandboxState.IDLE
                fail_state = SandboxState.STOP_ERROR

                # commit (if requested)
                yield from self._manage_commit(webapp, commit_versions)

                yield from self.run_in_executor(self._stop, webapp)

            else:
                # commit (if requested)
                phase = "commit"
                yield from self._manage_commit(webapp, commit_versions)

        except ShuttingDown:
            next_state = None
            log.info("sandbox %r %s aborted (controller shutdown)", webapp.docker_name, phase)

        except BaseException as e:
            next_state = fail_state

            log_func = log.error if isinstance(e, (docker.errors.APIError, Error)) else log.exception
            log_func ("sandbox %r %s error (%s)", webapp.docker_name, phase,
                    traceback.format_exception_only(type(e), e)[-1].strip())

        finally:
            if next_state is not None:
                # atomically update the sandbox state in the db
                # (in case another action is requested during the process, eg: the user
                #  stops the sandbox while it is not fully started)
                log.info("sandbox %r is now in state %r", webapp.docker_name, next_state.name)
                with ses.begin():
                    ses.execute("UPDATE dj_webapps SET sandbox_state=%d WHERE id=%d AND sandbox_state=%d" %
                            (next_state, webapp_id, webapp.sandbox_state))

            log.debug("done    sandbox %d", webapp_id)


class JobManager(Manager):
    class JobInfo:
        __slots__ = "job_id", "ver_id", "ctr_id", "version", "ctr_name", "client", "cpu", "mem", "node_id", "timeout"

    def __init__(self, ctrl):
        super().__init__(0)
        self.ctrl = ctrl


    @asyncio.coroutine
    def __iter__(self):
        raise NotImplementedError()


    def _create_job(self, info):
        ctrl = self.ctrl
        ses  = ctrl.session
        tmp_img = None

        assert info.ctr_id is None

        try:
            with ses.begin():
                job = ses.query(Job).filter_by(id=info.job_id).one()
                webapp = job.webapp

                log.info("start job %d (%s:%s)",
                        info.job_id, webapp.docker_name, info.version)

                job.state = int(JobState.RUNNING)       # pragma: nobranch (TODO: remove (coverage bug))

            
            repo = ctrl.gen_image_name(webapp)
            image = "%s:%s" % (repo, info.version)

            job_path = ctrl.gen_job_path(job)
            log.debug("job.path: %r", job_path)

            if info.ver_id is None:
                assert info.version == "sandbox"
                image = tmp_img = info.client.commit(ctrl.gen_sandbox_name(webapp), repo, info.version)["Id"]
            
            # TODO use another workdir
            # TODO use another uid

            ctrl.check_host_path("isdir", job_path)
            hc = ctrl.sandbox.create_host_config(
                        binds = {job_path: {"bind": "/tmp"}},
                        cap_drop = ["all"],
                        # FIXME: CAP_DAC_OVERRIDE needed because all nfs files have uid,gid=1000,1000
                        cap_add = ["dac_override"],
                        cpu_quota   = (None if info.cpu is None else (info.cpu * 1024)),
                        cpu_period  = (None if info.cpu is None else 1024),
#                        cpu_shares = info.cpu,
                        mem_limit = info.mem,
                    )
            # NOTE: cpu_shares has a different meaning in docker swarm and docker engine
            #  - swarm:  nb of cpus
            #  - engine: 1/1024 share of the total cpu resources of the machine
            # the engine requires cpu_shares > 1
            if ctrl.cpu_shares:
                # TODO: upgrade docker-py (and use create_host_config)
                hc["CpuShares"] = info.cpu
            log.debug("host_config %r", hc)
            info.ctr_id = info.client.create_container(image, name=info.ctr_name,
                    working_dir = "/tmp",
                    # NOTE: the command line is a little complex, but this is
                    #   to ensure that (TODO write tests for this):
                    #   - no output is lost (we go through a pipe in case the
                    #     app has multiple processes writing to stdout/stderr
                    #     concurrently)
                    #     FIXME: maybe ">>allgo.log  2>&1" is sufficient
                    #   - we get the exit code of the app (not the exit code of
                    #     cat)
                    #   - SIGTERM & SIGALRM are forwarded to the process (and
                    #     we call wait again because of EINTR)
                    #   - we have no unusual dependencies (only sh, cat, trap,
                    #     kill and mkfifo)
                    #   - we display a warning if the memory limit was reached
                    #     during the job
                    command = ["/bin/sh", "-c", """
                                pid=
                                interrupted=
                                sighnd() {{
                                    (echo
                                    echo "====  ALLGO JOB $2  ===="
                                    kill "-$1" "$pid") 2>&1 | tee -a allgo.log
                                    trap '' TERM ALRM
                                    interrupted=1
                                }}
                                trap "sighnd TERM ABORT"   TERM
                                trap "sighnd ALRM TIMEOUT" ALRM
                                fifo=/.allgo.fifo.{job_id}
                                mkfifo "$fifo" 2>&1 | tee -a allgo.log || exit $?

                                exec cat <"$fifo" | tee -a allgo.log &
                                exec "$@" >"$fifo" 2>&1 &
                                pid=$!

                                wait %2
                                code=$?
                                if [ -n "$interrupted" ] ; then
                                    wait %2
                                    code=$?
                                fi
                                wait %1
                                trap '' TERM ALRM

                                [ -n "$interrupted" ] || (
                                    echo
                                    if [ $code -eq 0 ] ; then
                                        echo "====  ALLGO JOB SUCCESS  ===="
                                    else
                                        echo "====  ALLGO JOB ERROR  ===="
                                        echo "process exited with code $code"
                                    fi

                                    failcnt="`cat /sys/fs/cgroup/memory/memory.failcnt`"
                                    if [ "$failcnt" -ne 0 ] ; then
                                        echo "WARNING: memory limit was reached (memory.failcnt=$failcnt)"
                                    fi
                                ) | tee -a allgo.log 

                                exit $code
                        """.format(job_id=job.id),
                        "job%d" % job.id, webapp.entrypoint] + shlex.split(job.param),

                    labels = {"allgo.tmp_img": tmp_img or ""},
                    environment=["constraint:node==" + info.node_id],
                    host_config = hc)["Id"]
            info.client.start(info.ctr_id)

            with ses.begin():
                # save the container_id into the db
                job.container_id = info.ctr_id

        except:
            #TODO introduce a state JobState.ERROR
            self._remove_job(info, tmp_img=tmp_img)
            raise


    def _remove_job(self, info, *, tmp_img=None):
        ses = self.ctrl.session

        # TODO: report launch errors to the user
        # TODO: report exit code to the user
        # TODO: use another uid

        def parse_docker_timestamp(value):
            return datetime.datetime.strptime(
                    # limit the precision to the microsecond
                    # (otherwise strptime fails)
                    re.sub(r"(\.\d{,6})\d*Z$", r"\1Z", value),
                    # iso8601 format
                    "%Y-%m-%dT%H:%M:%S.%fZ")

        with ses.begin():
            job = ses.query(Job).filter_by(id=info.job_id).one()

            exec_time = 0.0
            if info.ctr_id is not None:
                try:
                    js = info.client.inspect_container(info.ctr_id)
                except docker.errors.NotFound:
                    pass
                else:
                    started_at  = js["State"].get("StartedAt", "0001-")
                    finished_at = js["State"].get("FinishedAt")

                    # default docker date is '0001-01-01T00:00:00Z'
                    if not started_at.startswith("0001-"):
                        try:
                            exec_time = (parse_docker_timestamp(finished_at)
                                        - parse_docker_timestamp(started_at)
                                        ).total_seconds()
                        except Exception: # pragma: nocover
                            log.exception("job %d: unable to compute exec time", info.job_id)

                    if tmp_img is None:
                        tmp_img = js["Config"]["Labels"].get("allgo.tmp_img") or None

                with docker_warning("job %d: cleanup error: unable to remove container", info.job_id):
                    info.client.remove_container(info.ctr_id)

            if tmp_img is not None:
                with docker_warning("job %d: cleanup error: unable to remove tmp image", info.job_id):
                    info.client.remove_image(tmp_img)

            job.exec_time = exec_time
            job.state     = int(JobState.DONE)
            job.container_id = None
            if job.result == JobResult.NONE:
                # FIXME: maybe we should have a 'unknown' result
                log.warning("job %d has not result, will fallback to 'ERROR'", info.job_id)
                job.result = int(JobResult.ERROR)

        log.info("done  job %d (result=%s, duration=%fs)", info.job_id, JobResult(job.result).name, exec_time)


    @asyncio.coroutine
    def _finish_job(self, info, reset):
        # wait for container termination (if running)
        if info.ctr_id is not None:
            def kill(sig):
                log.debug("kill job %d (signal %d)", info.job_id, sig)
                with docker_warning("unable to kill container %r", info.ctr_id):
                    info.client.kill(info.ctr_id, sig)

            @asyncio.coroutine
            def stop(sig, reason):
                log.info("stop  job %d (%s)", info.job_id, reason)
                try:
                    # graceful kill
                    kill(sig)
                    yield from asyncio.wait_for(asyncio.shield(wait_task), timeout=5)
                except asyncio.TimeoutError:
                    # hard kill (after 5 seconds)
                    kill(signal.SIGKILL)

            log_task  = asyncio.ensure_future(self._log_task(info))
            wait_task = asyncio.async(info.client.wait_async(info.ctr_id))
            timeout_task = (asyncio.Future() if info.timeout is None
                    else asyncio.async(asyncio.sleep(info.timeout)))
            result = None
            try:
                rescheduled = None
                while not wait_task.done():
                    # we must react if the job is aborted by the user or if the
                    # timeout expires

                    if rescheduled is None or rescheduled.done():
                        # check the job state in the db
                        rescheduled = reset()
                        ses = self.ctrl.session
                        with ses.begin():
                            state, = ses.query(Job.state).filter_by(id=info.job_id).one()
                            if state == JobState.ABORTING:
                                yield from self._notif_job_state(info, "ABORTING")
                                yield from stop(signal.SIGTERM, "user abort")
                                result = result or JobResult.ABORTED

                    elif timeout_task.done():
                        # timeout !
                        yield from stop(signal.SIGALRM, "timeout")
                        result = result or JobResult.TIMEOUT

                    yield from asyncio.wait((wait_task, timeout_task, rescheduled),
                            return_when=asyncio.FIRST_COMPLETED)

                returncode = wait_task.result()
                log.debug("job %d exit code: %r", info.job_id, returncode)
                result = result or (JobResult.SUCCESS if returncode==0 else JobResult.ERROR)
            except:
                # note: we do not cancel the log task on normal termination
                # (we let it finish reading the logs but in the background)
                log_task.cancel()
                raise
            finally:
                wait_task.cancel()
                timeout_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    yield from wait_task

                if result is not None:
                    ses = self.ctrl.session
                    with ses.begin():
                        ses.execute("UPDATE dj_jobs SET result=%d WHERE id=%d AND result=%d" %
                                (int(result), info.job_id, int(JobResult.NONE)))

        # remove container
        yield from self.run_in_executor(self._remove_job, info, lock=False)

    # task that streams the logs from the docker engine into the redis database
    async def _log_task(self, info):

        try:
            offset = 0
            log_key = REDIS_KEY_JOB_LOG % info.job_id
            notif_msg = REDIS_MESSAGE_JOB_UPDATED % info.job_id

            timeout = 3600 if info.timeout is None else (info.timeout + 60)
            if timeout <= 0:
                timeout = 600 
            log.debug("job %d: start log task (timeout %.1f seconds)",
                    info.job_id, timeout)
            async with info.client.aiohttp_session.get(
                    "%s/containers/%s/logs" % (info.client.aiohttp_url, info.ctr_id),
                    params={"follow": "1", "stdout": "1", "stderr": "1"},
                    timeout=timeout) as rep:
                rep.raise_for_status()
                while True:
                    # read the header (8-byte) and extract the chunk size
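                    # (multiplexed docker stream frame: byte 0 = stream type,
                    #  bytes 1-3 = padding, bytes 4-7 = big-endian payload size,
                    #  hence the struct.unpack("!L", hdr[4:]) below)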
                    hdr = await rep.content.read(8)
                    if not hdr:
                        break
                    size, = struct.unpack("!L", hdr[4:])

                    # read the chunk
                    data = await rep.content.read(size)

                    # store it in redis
                    await self.ctrl.redis_client.setrange(log_key, offset, data)
                    await self.ctrl.redis_client.publish(REDIS_CHANNEL_AIO, notif_msg)
                    offset += len(data)

                # send eof
                await self.ctrl.redis_client.setrange(log_key, offset, b"\x04")
                await self.ctrl.redis_client.publish(REDIS_CHANNEL_AIO, notif_msg)
        except Exception as e:
            # NOTE: if the redis server goes down, then the log_task is totally
            # interrupted
            log.exception("job %d: exception in log task", info.job_id)
        finally:
            log.debug("job %d: log task terminated (%r bytes logged)",
                    info.job_id, offset)
            await self.ctrl.redis_client.expire(log_key, 86400)


    @asyncio.coroutine
    def _process(self, job_id, reset, rescheduled):
        ctrl = self.ctrl
        ses  = ctrl.session
        log.debug("process job id %d", job_id)

        with ses.begin():
            # query db
            job = ses.query(Job).filter_by(id=job_id).first()
            if job is None:     # pragma: nocover
                # unknown job
                log.warning("unknown job id %d", job_id)
                return

            state = JobState(job.state)

            if job.webapp is None:
                log.error("job %d: webapp id %r not found", job_id, job.webapp_id)
                if state == JobState.WAITING:       # pragma: nobranch
                    job.state  = int(JobState.DONE)
                    job.result = int(JobResult.ERROR)
                    job.exec_time = 0
                # TODO report error to the user ?
                return

            docker_name = job.webapp.docker_name
            
            info = self.JobInfo()
            info.job_id     = job_id
            info.ctr_id     = None
            info.node_id    = None
            info.version    = job.version
            info.ctr_name   = ctrl.gen_job_name(job)
            info.timeout    = job.queue.timeout

            # NOTE: .cpu .mem are the amount of cpu/mem requested when creating
            # a new job. They do not apply to already created jobs (by a
            # previous instance of the controller)
            info.cpu        = ctrl.cpu_shares
            info.mem        = job.webapp.memory_limit


            if job.version == "sandbox":
                info.client  = ctrl.sandbox
            else:
                info.client  = ctrl.swarm


            if state == JobState.WAITING:
                # job is not yet started
                if job.container_id is not None:
                    log.warning("job %d is in state WAITING but already has a container id: %r (will be ignored)",
                            job.id, job.container_id)

                if job.version == "sandbox":
                    # to be run in the sandbox
                    info.ver_id = None
                else:
                    # to be run in the swarm

                    # Find the wanted WebappVersion
                    #TODO: replace version_id with webapp_version_id
                    ver = ses.query(WebappVersion).filter_by(
                            webapp_id = job.webapp_id,
                            number    = job.version).filter(
                                WebappVersion.state.in_((
                                    int(VersionState.COMMITTED),
                                    int(VersionState.READY)))
                            ).order_by(
                                    WebappVersion.state.desc(),
                                    WebappVersion.id.desc()).first()
                    if ver is None:
                        log.error("job %d: webapp %r version %r not found",
                                job_id, job.webapp.docker_name, job.version)

                        job.state = int(JobState.DONE)
                        # TODO report error to the user
                        job.result= int(JobResult.ERROR)
                        return
                    info.ver_id = ver.id

            elif state in (JobState.RUNNING, JobState.ABORTING):     # pragma: nobranch
                # job is already started

                # we do not care about the actual version_id *but* we need to
                # know whether we are in the swarm or in the sandbox
                info.ver_id = None if job.version == "sandbox" else -1

                # inspect the container so as to:
                # - validate and store its id
                # - adjust the timeout if necessary
                if job.container_id is None:
                    # pre-migration jobs
                    # FIXME: to be removed after migration (the container id is now stored in the db at creation time)
                    try:
                        js = info.client.inspect_container("%s-job-%d-%s"
                                % (ctrl.env, job.id, job.webapp.docker_name))
                    except docker.errors.NotFound:
                        js = None

                else:
                    # check the presence of the container and validate its id against its name
                    js = ctrl.inspect_job_container(info.client, job)

                if js is not None:
                    # save the id
                    info.ctr_id = js["Id"]

                    # adjust the timeout
                    if info.timeout is not None:
                        uptime = (datetime.datetime.now(datetime.timezone.utc)
                                - iso8601.parse_date(js["State"]["StartedAt"])).total_seconds()
                        info.timeout -= uptime
                        log.debug("job %d uptime: %.1fs, adjusted timeout is: %.1f",
                                info.job_id, uptime, info.timeout)
            else:
                # unexpected state
                if state != JobState.DONE:
                    log.warning("job id %d is in unexpected state %s", job_id, state.name)
                return


        if state == JobState.WAITING:
            # job is not yet started
            yield from self._notif_job_state(info, "WAITING")

            # pull the image to the swarm
            if info.ver_id is not None:
                # NOTE: race condition: will fail if ver.state==sandbox
                #  jobs must be submitted after the image is committed
                yield from ctrl.image_manager.pull(info.ver_id, swarm=True)

            # request a slot from the shared swarm
            with info.client.request_slot(info.ctr_name, info.cpu or 0, info.mem or 0):
                info.node_id = yield from info.client.wait_slot(info.ctr_name)
                yield from self.run_in_executor(self._create_job, info, lock=False)
            yield from self._notif_job_state(info, "RUNNING")
            yield from self._finish_job(info, reset)

        elif state in (JobState.RUNNING, JobState.ABORTING): # pragma: nobranch
            # the job is already running
            # -> wait for its termination
            yield from self._finish_job(info, reset)

        yield from self._notif_job_state(info, "DONE")

    # send a notification to the aio frontend when the job state is changed
    # 
    # this function never fails (redis errors are caught and written to the
    # logs)
    async def _notif_job_state(self, info, state):
        try:
            log.info("notify the frontend: job %d state is now %r",
                    info.job_id, state)
            await self.ctrl.redis_client.setex(REDIS_KEY_JOB_STATE % info.job_id,
                    86400, state)
            await self.ctrl.redis_client.publish(REDIS_CHANNEL_AIO,
                    REDIS_MESSAGE_JOB_UPDATED % info.job_id)
        except asyncio.CancelledError:
            pass
        except Exception as e:
            log.error("redis notification failed for job %d (%e)",
                    info.job_id, e)

# NOTE: for the push/pull managers, interruptible=True guarantees that the
#   managers terminate immediately; however, it cannot guarantee that the
#   process will terminate immediately because the ThreadPoolExecutor installs
#   an atexit handler that joins all the background threads.
#
#   Anyway this is not a big issue since all pending push/pull raise
#   ShuttingDown immediately, thus we won't end up with a sandbox/job
#   in an inconsistent state when SIGKILL arrives.
#

class PullManager(Manager):
    def __init__(self, nb_threads, client, name, *, auth_config=None):
        super().__init__(nb_threads, interruptible=True)
        self.client    = client
        self.name      = name
        self.auth_config = auth_config

    @asyncio.coroutine
    def _process(self, img, reset, rescheduled):
        image, version = img
        log.info("pull to the %-10s %s:%s", self.name, image, version)
        return (yield from self.run_in_executor(lambda:
                self.client.pull(image, version, auth_config=self.auth_config)))


class PushManager(Manager):
    def __init__(self, nb_threads, ctrl, *, auth_config=None):
        super().__init__(nb_threads, interruptible=True)
        self.ctrl = ctrl
        self.auth_config = auth_config

    @asyncio.coroutine
    def _process(self, version_id, reset, rescheduled):
        ses = self.ctrl.session

        with report_error("unable to push version id %d", version_id):

            with ses.begin():
                # get the version object and check its state
                version = ses.query(WebappVersion).filter_by(id=version_id).one()
                if version.state != VersionState.COMMITTED:
                    if version.state in (VersionState.READY, VersionState.REPLACED):
                        # already pushed
                        return
                    if version.state == VersionState.SANDBOX:
                        raise Error("unable to push (image not yet committed)")
                    raise Error("unable to push (invalid state: %s)" % version.state)

                # ensure that there is no other version with the same number in the pipeline
                # (to avoid a race condition)
                others = (ses.query(WebappVersion.id)
                        .filter_by(webapp_id=version.webapp_id, number=version.number)
                        .filter(WebappVersion.id     != version.id)
                        .filter(WebappVersion.state.in_((int(VersionState.SANDBOX), int(VersionState.COMMITTED)))))
                if others.count():
                    raise Error("unable to push (there are other pushable versions with the same number: %s)" % (
                        " ".join(map(str, itertools.chain(*others)))))

                image = self.ctrl.gen_image_name(version.webapp)
                tag   = version.number

            log.info("push from the %-8s %s:%s", "sandbox", image, tag)
            yield from self.run_in_executor(lambda:
                    docker_check_error(self.ctrl.sandbox.push, image, tag,
                        auth_config=self.auth_config))

            reset()

            with ses.begin():
                version = ses.query(WebappVersion).filter_by(id=version_id).one()
                prev = ses.query(WebappVersion).filter_by(
                        webapp_id = version.webapp_id,
                        number    = version.number,
                        state     = int(VersionState.READY)).scalar()
                log.debug("prev version id %r", (prev.id if prev else None))

                if prev is None:
                    # this is a new version
                    version.state = int(VersionState.READY)
                else:
                    # overwrite an existing version
                    for key in "updated_at", "changelog", "published":
                        setattr(prev, key, getattr(version, key))
            
                    # mark this version as replaced
                    version.state = int(VersionState.REPLACED)
                ses.add(version)

class ImageManager:
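    """Manage the transfer of webapp images between the sandbox, the registry and the swarm

    It owns one PushManager (sandbox -> registry) and two PullManagers
    (registry -> sandbox and registry -> swarm).
    """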

    def __init__(self, ctrl,
            nb_push_sandbox = NB_PUSH_SANDBOX,
            nb_pull_sandbox = NB_PULL_SANDBOX,
            nb_pull_swarm   = NB_PULL_SWARM,
            *, auth_config=None):

        self.ctrl = ctrl

        self.sandbox_push_manager = PushManager(nb_push_sandbox, ctrl,
                auth_config=auth_config)
        self.sandbox_pull_manager = PullManager(nb_pull_sandbox, ctrl.sandbox, "sandbox",
                auth_config=auth_config)
        self.swarm_pull_manager   = PullManager(nb_pull_swarm,   ctrl.swarm,   "swarm",
                auth_config=auth_config)

    # return a future
    @auto_create_task
    @asyncio.coroutine
    def pull(self, version_id: int, *, swarm=False):
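        """Pull a webapp version into the sandbox or into the swarm

        When pulling to the swarm, a version that is still only COMMITTED is
        first pushed to the registry. Pulling a COMMITTED version to the
        sandbox is a no-op (the image is already present there).
        """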
        with report_error("unable to pull version id %d to %s", version_id,
                ("swarm" if swarm else "sandbox")):

            ses = self.ctrl.session
            with ses.begin():
                # get the version object and check its state
                version = ses.query(WebappVersion).filter_by(id=version_id).one()

                image = self.ctrl.gen_image_name(version.webapp)
                tag   = version.number


            if swarm:
                # pull to the swarm

                if version.state == VersionState.COMMITTED:
                    # must be pushed to the registry first
                    yield from self.push(version_id)
                    with ses.begin():
                        version = ses.query(WebappVersion).filter_by(id=version_id).one()

                if version.state not in (VersionState.READY, VersionState.REPLACED):
                    raise Error("bad version state: %s" % version.state)

                yield from self.swarm_pull_manager.process((image, tag))

            else:
                # pull to the sandbox
                if version.state == VersionState.COMMITTED:
                    # do not pull!
                    return

                if version.state not in (VersionState.READY, VersionState.REPLACED):
                    raise Error("bad version state: %s" % version.state)

                yield from self.sandbox_pull_manager.process((image, tag))


    # return a future
    def push(self, version_id: int):
        return self.sandbox_push_manager.process(version_id)

    @asyncio.coroutine
    def shutdown(self, **kw):
        yield from asyncio.gather(
                self.sandbox_pull_manager.shutdown(),
                self.sandbox_push_manager.shutdown(),
                self.swarm_pull_manager.shutdown(),
                return_exceptions=True)


class DockerController:
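    """Main controller

    Owns the docker clients (sandbox and swarm), the sqlalchemy session
    factory, the redis client, and the image/sandbox/job managers.
    """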
    def __init__(self, sandbox_host, swarm_host, mysql_host,
            registry, env, datastore_path, sandbox_path,
            toolbox_path, sandbox_network, redis_host,
            config_file="/vol/ro/config.yml",
            ):

        # parse the config file
        with config_reader.ConfigReader(
                open(config_file) if os.path.exists(config_file) else "{}",
                logger = log
                ) as cfg:

            def get_bytes(cfg, key):
                txt = cfg.get(key, type=str)
                if txt is not None:
                    try:
                        return docker.utils.parse_bytes(txt)
                    except Exception as e:
                        raise ValueError("%s: %s" % (cfg.path(key), e))
            if cfg is None:
                cfg = config_reader.Mapping({})

            self.cpu_shares         = cfg.get("cpus", None, int)

            dct = cfg.get("registry_auth", None, dict)
            auth_config = {
                    "username": dct.get("username", str, required=True),
                    "password": dct.get("password", str, required=True),
                    } if dct else None

            self.sandbox            = SharedSwarmClient(sandbox_host, config=cfg.get("sandbox", {}, dict), alias="sandbox")
            self.sandbox.aiohttp_session, self.sandbox.aiohttp_url = (
                    make_aiohttp_client(sandbox_host))
            if sandbox_host == swarm_host:
                self.swarm          = self.sandbox
            else:
                self.swarm          = SharedSwarmClient(swarm_host, config=cfg.get("swarm", {}, dict), alias="swarm")
                self.swarm.aiohttp_session, self.swarm.aiohttp_url = (
                        make_aiohttp_client(swarm_host))

        self._db_sessionmaker = sqlalchemy.orm.scoping.scoped_session(
                connect_db(mysql_host))
        self._thread_local = threading.local()

        self.redis_host = redis_host
        self.redis_client = None

        self.image_manager   = ImageManager(self, auth_config=auth_config)
        self.sandbox_manager = SandboxManager(self)
        self.job_manager     = JobManager(self)


        self.registry       = registry
        self.env            = env
        self.datastore_path = datastore_path
        self.sandbox_path   = sandbox_path
        self.toolbox_path   = toolbox_path
        self.sandbox_network= sandbox_network

        self._task               = None
        self._shutdown_requested = None

        img = "busybox:latest"
        try:
            self.sandbox.inspect_image(img)
        except docker.errors.NotFound:
            log.info("pulling docker image %s", img)
            self.sandbox.pull(img)

    def gen_sandbox_name(self, webapp):
        return "%s-sandbox-%s" % (self.env, webapp.docker_name)

    def gen_image_name(self, webapp):
        return "%s/%s" % (self.registry, webapp.docker_name)

    def gen_job_name(self, job):
        return "%s-job-%s-%d-%s" % (self.env, job.queue.name, job.id, job.webapp.docker_name)

    def gen_job_path(self, job):
        return os.path.join(self.datastore_path, str(job.id))

    def gen_factory_name(self, docker_os):
        # NOTE: factory names now refer to an image from the official docker
        # registry. To enforce this (and to prevent a user from using an allgo
        # image as the docker_os), we explicitly prepend "registry-1.docker.io"
        # to the repository name. Once the registry authentication is clarified
        # and correctly secured, we will be able to remove this restriction and
        # allow using any docker image.
        #
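        # For instance (illustrative): a docker_os named "debian" maps to
        # "registry-1.docker.io/library/debian", while "nvidia/cuda" maps to
        # "registry-1.docker.io/nvidia/cuda".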
        repo = docker_os.docker_name
        if "/" not in repo:
            repo = "library/" + repo
        return "registry-1.docker.io/"+ repo

    def inspect_job_container(self, client, job):
        """inspect the underlying container of a job + safety checks
        
        The purpose of the validation check is to prevent a DoS attack on an
        arbitrary container. The job.container_id shall really be the id of the
        container and its name shall start with "ENV-job".

        return:
         - the result of client.inspect_container() if ok
         - None if the container is not found or if the container name/id is not valid
        """
        assert job.container_id is not None
        cid =  job.container_id
        try:
            ctr = client.inspect_container(cid)
        except docker.errors.NotFound:
            return None

        if cid != ctr["Id"]:
            log.error("job %d references container %r which resolves to a different container id: %r",
                    job.id, cid, ctr["Id"])
        elif not re.match(r"/%s-job-[^/]+\Z" % self.env, ctr["Name"]):
            log.error("job %d references container %r whose name is invalid: %r",
                    job.id, cid, ctr["Name"])
        else:
            return ctr
        return None


    def check_host_path(self, funcname, path, *, nolink=True):
        """Validate a path before using it as an external volume in docker

        - ensure the path is canonical:
            - it is absolute
            - it does not contain any '..' parts
            - it does not contain any symbolic link
        - ensure the path is of the right type (call os.path.`funcname`())
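
        Example (illustrative): self.check_host_path("isdir", self.datastore_path)
        raises Error unless the datastore directory is an absolute, canonical,
        symlink-free path that exists as a directory under /vol/host.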
        """

        if not os.path.isabs(path):
            raise Error("host path %r is not absolute" % path)

        if path != os.path.normpath(path):
            raise Error("host path %r is not canonical (os.path.normpath())" % path)

        ctrpath = "/vol/host" + path
        log.debug("ctrpath %r", ctrpath)

        
        func = getattr(os.path, funcname)
        if nolink and os.path.realpath(ctrpath) != os.path.normpath(ctrpath):
            raise Error("host path %r contains a symbolic link" % path)
        
        if not func(ctrpath):
            raise Error("host path %r not found (os.path.%s())" % (path, funcname))


    @property
    def session(self):
        """Return the thread-local sqlalchemy session
        
        WARNING: in async functions, db transactions must not span any
                 'yield from' statement (to avoid interleaving)
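
        A safe pattern (illustrative; model and coroutine names are placeholders):

            ses = self.session
            with ses.begin():
                obj = ses.query(SomeModel).one()   # no 'yield from' inside the block
            yield from do_some_io()                # suspend only after the commit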
        """
        return self._db_sessionmaker()


    @asyncio.coroutine
    def db_startup(self):
        log.info("wait until the db server is ready")

        ses = self.session
        while True:
            with contextlib.suppress(sqlalchemy.exc.OperationalError):
                log.debug("db ping")
                with ses.begin():
                    ses.execute("SELECT 1")
                break

            with contextlib.suppress(asyncio.TimeoutError):
                yield from asyncio.wait_for(asyncio.shield(self._shutdown_requested), 10)
                return
        
        log.info("db server ready")

        self.check_db(startup=True)

    def check_db(self, *, startup=False):
        log.debug("check_db(startup=%r)", startup)
        try:
            ses = self.session
            with ses.begin():
                if startup:
                    ses.execute("DELETE FROM dj_webapp_versions WHERE state=%d"
                            % VersionState.REPLACED)
                    for version_id, in ses.execute(
                            "SELECT id FROM dj_webapp_versions WHERE state=%d"
                            % VersionState.COMMITTED).fetchall():
                        self.image_manager.push(version_id)

                for webapp_id, in ses.execute("""SELECT dj_webapps.id FROM dj_webapps
                        LEFT JOIN dj_webapp_versions ON dj_webapps.id=dj_webapp_versions.webapp_id
                        WHERE sandbox_state IN (%d,%d) OR state=%d
                        GROUP BY dj_webapps.id""" % (
                        SandboxState.STARTING, SandboxState.STOPPING, VersionState.SANDBOX)).fetchall():

                    self.sandbox_manager.process(webapp_id)

                for job_id, in ses.query(Job.id).filter(Job.state.in_(
                        (JobState.WAITING.value, JobState.RUNNING.value, JobState.ABORTING.value))
                        ).order_by(Job.state):
                    log.debug("schedule job %d", job_id)
                    self.job_manager.process(job_id)

                # TODO schedule the push of images versions in state 'committed'
        except sqlalchemy.exc.OperationalError as e:
            log.error("db error %s", e.orig.args)
            return

    async def periodic_db_check_task(self):
        """Periodic db check
        
        This task is theoretically not needed if we take for granted that the
        redis notification channel works properly.
        """
        while True:
            try:
                await asyncio.sleep(DB_CHECK_PERIOD)
                self.check_db()
            except OSError as e:
                log.error("I/O error when polling the db (%s)", e)
            except asyncio.CancelledError:
                log.info("db periodic check task terminated")
                return
            except Exception:
                log.exception("error in the periodic db check task")


    async def redis_notification_listener_task(self):
        """Task listening for redis notifications
        
        - subscribes to REDIS_CHANNEL_CONTROLLER
        - automatically reconnects to the server (with rate limiting)
        """

        async for _ in RateLimiter(60):
            try:
                # create redis connection and subscribe to the notification channel
                conn = await aioredis.create_redis((self.redis_host, 6379))
                sub, = await conn.subscribe(REDIS_CHANNEL_CONTROLLER)
                log.info("subscribed to redis pub/sub channel %r" % REDIS_CHANNEL_CONTROLLER)

                # force a db check (because we may have missed some notifications)
                self.check_db()

                async for msg in sub.iter():
                    log.info("redis notification: %r", msg)
                    try:
                        item_type, item_id_str = msg.split(b":")
                        item_id = int(item_id_str)
                    except ValueError as e:
                        log.warning("ignored malformatted notification: %r (%s)", msg, e)
                        continue

                    if item_type == b"job":
                        self.job_manager.process(item_id)
                    elif item_type == b"sandbox":
                        self.sandbox_manager.process(item_id)
                    else:
                        log.warning("ignored notification for unknown item %r", msg)

            except OSError as e:
                log.error("I/O error in the redis listener (%s)", e)
            except asyncio.CancelledError:
                log.info("notification task terminated")
                return
            except Exception:
                log.exception("error in the redis notification loop")


    def shutdown(self):
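        """Request a graceful shutdown (idempotent)

        Return the main task so that callers can wait for its completion.
        """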
        if not self._shutdown_requested.done():
            self._shutdown_requested.set_result(None)
        return self._task


    @asyncio.coroutine
    def _run(self):
        assert self._shutdown_requested is None

        try:
            loop = asyncio.get_event_loop()

            self._shutdown_requested = asyncio.Future()

            loop.add_signal_handler(signal.SIGTERM, self.shutdown)
            loop.add_signal_handler(signal.SIGINT,  self.shutdown)

            yield from self.db_startup()
            tasks = []
            try:
                # note: this call never fails (even if the redis server is down)
                self.redis_client = yield from aioredis.create_reconnecting_redis(
                        (self.redis_host, 6379))

                # start the tasks for receiving notifications
                #  - from the redis PUBSUB channel
                tasks.append(asyncio.ensure_future(self.redis_notification_listener_task()))
                #  - additional periodic check of the db (in case some redis
                #  notifications are missed)
                tasks.append(asyncio.ensure_future(self.periodic_db_check_task()))

                yield from self._shutdown_requested
            finally:
                # graceful shutdown

                for task in tasks:
                    task.cancel()
                self.swarm.shutdown()
                self.sandbox.shutdown()

                # terminate all pending tasks
                yield from asyncio.gather(
                        self.image_manager.shutdown(),
                        self.sandbox_manager.shutdown(),
                        self.job_manager.shutdown(),
                        *tasks,
                        return_exceptions=True)
        finally:
            self.swarm.shutdown(wait=True)
            self.sandbox.shutdown(wait=True)

    def run(self):
        asyncio.get_event_loop().run_until_complete(self._run())