#!/usr/bin/python3

import asyncio
import collections
from concurrent.futures import ThreadPoolExecutor
import contextlib
import datetime
import enum
import itertools
import json
import logging
import re
import os
import shlex
import signal
import struct
import sys
import time
import threading
import traceback

import aiohttp
import aioredis
import docker
import iso8601
import MySQLdb
from sqlalchemy import desc
import sqlalchemy.orm.scoping
import yaml

import config_reader
from database import *
from shared_swarm import SharedSwarmClient, ShuttingDown

HOST_PATH="/vol/host/"

##################################################
# default number of concurrent tasks

# pushing images from the sandbox to the registry
NB_PUSH_SANDBOX = 2

# pulling images from the registry to the sandbox
NB_PULL_SANDBOX = 2

# pulling images from the registry to the swarm
NB_PULL_SWARM = 2

# sandbox actions (start, stop, commit, ...)
NB_SANDBOX_TASKS = 4

# jobs
NB_JOB_TASKS = 4

# default thread pool (used by most of the tasks)
default_executor = ThreadPoolExecutor(10)

##################################################
# redis keys

# job log
REDIS_KEY_JOB_LOG = "log:job:%d"
REDIS_KEY_JOB_STATE = "state:job:%d"

# pubsub channels for waking up allgo.aio (frontend) and the controller
# (ourselves)
REDIS_CHANNEL_AIO = "notify:aio"
REDIS_CHANNEL_CONTROLLER = "notify:controller"

# pubsub messages
REDIS_MESSAGE_JOB_UPDATED = "job:%d"
REDIS_MESSAGE_WEBAPP_UPDATED = "webapp:%d"

##################################################
# interval (in seconds) for polling the db
DB_CHECK_PERIOD = 3600

log = logging.getLogger("controller")

assert MySQLdb.threadsafety >= 1


class Error(Exception):
    pass


class RateLimiter:
    """Generator for rate limiting

    This asynchronous iterator ensures we spend at least `period` seconds in
    each iteration.

    usage:
        async for _ in RateLimiter(60):
            ...
    """
    def __init__(self, period):
        assert period > 0
        self.period = period
        self._t0 = time.monotonic() - period

    def __aiter__(self):
        return self

    async def __anext__(self):
        t1 = time.monotonic()
        delay = self._t0 - t1 + self.period
        if delay > 0:
            log.debug("rate_limit: sleep %f seconds", delay)
            await asyncio.sleep(delay)
            self._t0 = t1 + delay
        else:
            self._t0 = t1
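
# Illustrative sketch (not used by the controller itself): RateLimiter is
# intended for retry/reconnect loops, so that a failing body is retried at
# most once per `period` seconds. `do_connect` is a hypothetical coroutine
# passed by the caller. The real use of this idiom is in
# DockerController.redis_notification_listener_task below.
async def _example_reconnect_loop(do_connect, period=60):
    async for _ in RateLimiter(period):
        try:
            await do_connect()
        except OSError as e:
            log.error("connection failed (%s), will retry", e)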

def docker_check_error(func, *k, **kw):
    """Wrapper for docker-py methods that produce a stream

    Methods producing a stream (eg: push, build) do not report all errors by
    raising exceptions. Some errors are reported later in the stream.

    This function parses the stream and raises Error() if needed.
    """
    for elem in func(*k, stream=True, **kw):
        js = json.loads(elem.decode())
        if "error" in js:
            raise Error("push error: " + js["error"])


@contextlib.contextmanager
def docker_warning(msg, *k, ignore=None):
    """Catch docker errors and issue a warning instead"""
    try:
        yield
    except docker.errors.APIError as e:
        if ignore is None or not isinstance(e, ignore):
            k += e,
            log.warning(msg + " (%s)", *k)


@contextlib.contextmanager
def report_error(fmt, *k):
    """Context manager for logging exceptions

    This function logs exceptions (when leaving the context) with log.error()
    (if the exception inherits from Error) or log.exception() otherwise.

    The log message is prepended with the string generated by:
        fmt % k
    """
    try:
        yield
    except Exception as e:
        msg = fmt % k
        log_func = log.error if isinstance(e, Error) else log.exception
        log_func("%s (%s)", msg, traceback.format_exception_only(type(e), e)[-1].strip())
        raise


def disable_future_warning(fut):
    """Add a dummy callback to a future to prevent asyncio warnings

    Return: the future

    The asyncio module logs a warning message when the result of a future is
    not used. This function installs a dummy callback on the future so as to
    avoid this warning. This is useful for futures whose result *may* be
    ignored by the application.
    """
    fut.add_done_callback(lambda f: f.exception())
    return fut


def auto_create_task(func):
    """Decorator for forcing the creation of a task when a coroutine function is called

    Return a wrapper that calls the function, creates the task on the fly and
    returns it. It also installs a no-op callback to avoid warnings in case
    the result is not used.
    """
    assert asyncio.iscoroutinefunction(func)
    return (lambda *k, **kw: disable_future_warning(asyncio.async(func(*k, **kw))))


def cascade_future(src, dst):
    """Propagate the result of a future to another future

    This function installs a callback on the future `src` that propagates its
    result to the future `dst`.
    """
    def callback(fut):
        ex = fut.exception()
        if ex is None:
            dst.set_result(fut.result())
        else:
            dst.set_exception(ex)

    if src.done():
        callback(src)
    else:
        src.add_done_callback(callback)


async def _make_aiohttp_client(host):
    """Create an aiohttp session for connecting to a docker engine

    `host` is a docker host url (DOCKER_HOST); it may be either a `tcp://` or
    a `unix://` url

    Return a tuple `(session, url)` where `session` is an aiohttp client
    session and `url` is the http(s) url of the docker engine
    """
    url = docker.utils.parse_host(host)
    mo = re.match(r"http\+unix:/+(/.*)", url)
    if mo is None:
        # HTTP over TCP
        connector = None
    else:
        # HTTP over unix socket
        url = "http://DUMMY.localhost"
        path = mo.group(1)
        connector = aiohttp.UnixConnector(path)
    session = aiohttp.ClientSession(connector=connector)
    return session, url


def make_aiohttp_client(host):
    return asyncio.get_event_loop().run_until_complete(_make_aiohttp_client(host))
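
# Illustrative sketch (not used by the controller itself): @auto_create_task
# makes a coroutine function return an already-scheduled task whose result may
# safely be ignored (no "exception was never retrieved" warning).
# ImageManager.pull below relies on exactly this pattern. Calling
# _example_fire_and_forget(1.0) from running code immediately schedules the
# task and returns it.
@auto_create_task
@asyncio.coroutine
def _example_fire_and_forget(delay):
    # dummy payload: just wait a bit
    yield from asyncio.sleep(delay)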

class Manager:
    """A class for scheduling asynchronous jobs on a collection of keys

    (the one-line summary is not very helpful, sorry)

    Job submission

        The job is scheduled on key KEY by calling .process(KEY), the function
        returns an asyncio.Future object (which will receive the result)

        If .process(KEY) is called while a job is already running for KEY,
        then the manager arranges the job to be run a second time in a row for
        the same key

    Job implementation

        This is an abstract class. The job shall be implemented as the
        coroutine named ._process() in the inherited class.

        The manager guarantees that ._process() cannot be called multiple
        times concurrently for the same key

        In case a shutdown is requested (see .shutdown()):
        - all jobs that are not yet started are cancelled
        - running jobs continue until they return or until they try to
          acquire the internal semaphore (which raises ShuttingDown())

    Concurrency

        The job tasks are all started immediately (whatever the number of
        existing jobs)

        It is possible to limit the concurrency to a maximum number of
        parallel tasks. Manager provides an internal semaphore (the number of
        tokens is set in .__init__())

        The semaphore can be locked in two ways:
        - by locking the manager:

            with (yield from self):
                ... concurrency limited to `nb_tokens` tasks ...

        - by calling .run_in_executor (this is for running non-async code):

            yield from self.run_in_executor(func, args...)

    Shutdown

        When .shutdown() is called the Manager ensures that all jobs are
        properly terminated
        - it cancels all jobs that are not yet started
        - it prevents starting new jobs
        - it lets pending jobs keep running, but interrupts them when they
          try to acquire the internal semaphore

        All cancelled/interrupted tasks have their future raise ShuttingDown().

        .shutdown() returns after all pending jobs are terminated.

    Thread safety

        - Manager is *not* thread safe, all public methods must be called from
          the same thread
    """
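    # Illustrative sketch (not part of the real code): a minimal subclass only
    # has to implement ._process(); callers then submit work with .process(key).
    #
    #   class PrinterManager(Manager):
    #       @asyncio.coroutine
    #       def _process(self, key, reset, rescheduled):
    #           with (yield from self):           # respect the concurrency limit
    #               log.info("processing key %r", key)
    #
    #   mgr = PrinterManager(nb_tokens=2)
    #   fut = mgr.process("some-key")             # returns an asyncio.Future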
    class _Handle:
        __slots__ = "key", "cur", "nxt", "rescheduled"

    def __init__(self, nb_tokens=1, *, executor = default_executor, interruptible = False):
        # {key: _Handle}
        self._handles = {}
        self._semaphore = asyncio.Semaphore(nb_tokens)
        self._shutdown = asyncio.Future()
        self._executor = executor
        self._interruptible = interruptible

    def _create_task(self, hnd):
        assert hnd.nxt is None

        def reset():
            assert not hnd.cur.done()
            nxt = hnd.nxt
            if nxt is not None:
                cascade_future(hnd.cur, nxt)
                hnd.nxt = None
            # recreate the 'rescheduled' future and return it
            hnd.rescheduled = disable_future_warning(asyncio.Future())
            return hnd.rescheduled

        hnd.rescheduled = disable_future_warning(asyncio.Future())
        hnd.cur = asyncio.async(self._process(hnd.key, reset, hnd.rescheduled))
        hnd.cur.add_done_callback(lambda fut: self._done(hnd))
        log.debug("task scheduled %r %r", self, hnd.key)

        return hnd.cur

    def process(self, key):
        """Schedule the job to be run on key `key`

        returns an asyncio.Future (that will provide the result of
        ._process())
        """
        if self._shutdown.done():
            return self._shutdown

        hnd = self._handles.get(key)
        if hnd is None:
            # create handle
            self._handles[key] = hnd = self._Handle()
            hnd.key = key
            hnd.cur = None
            hnd.nxt = None
            hnd.rescheduled = None

        if hnd.cur is None:
            # create task
            return self._create_task(hnd)
        else:
            # reschedule task
            if hnd.nxt is None:
                hnd.nxt = asyncio.Future()
            if not hnd.rescheduled.done():
                hnd.rescheduled.set_result(None)
            return hnd.nxt

    def _done(self, hnd):
        assert hnd is self._handles.get(hnd.key)
        assert hnd.cur.done()

        try:
            hnd.cur.result()
        except ShuttingDown:
            pass
        except Exception:
            log.exception("task %r %r unhandled exception", self, hnd.key)

        nxt = hnd.nxt
        hnd.cur = hnd.nxt = None
        if nxt is None:
            del self._handles[hnd.key]
        else:
            cascade_future(self._create_task(hnd), nxt)

    @asyncio.coroutine
    def __iter__(self):
        """Coroutine for locking the internal semaphore

        Usage:
            with (yield from manager):
                ...

        Warning: after a shutdown is initiated, this function will always
        raise ShuttingDown()
        """
        ctx = yield from iter(self._semaphore)
        if self._shutdown.done():
            with ctx:
                raise ShuttingDown()
        return ctx

    @asyncio.coroutine
    def run_in_executor(self, *k, lock=True):
        """Run a function in a separate thread (with limited concurrency)

        This function locks the internal semaphore and runs the provided
        function call in a separate thread (using the executor)
        """
        def run():
            coro = asyncio.get_event_loop().run_in_executor(self._executor, *k)
            if self._interruptible:
                return next(asyncio.as_completed((coro, self._shutdown)))
            else:
                return coro

        if lock:
            with (yield from self):
                return (yield from run())
        else:
            return (yield from run())

    @asyncio.coroutine
    def _process(self, key, reset, rescheduled):
        """Actual implementation of the job (to be reimplemented in inherited classes)

        The Manager class guarantees that this function cannot be called
        multiple times concurrently on the same key (in case the same key is
        submitted multiple times, the Manager will call this function a
        second time after it has terminated).

        `rescheduled` is a future whose result is set when the job is being
        rescheduled (if process(key) is called before _process(key ...)
        terminates).

        `reset` is a function that may be called to reset the 'dirty' state of
        this key (this is to avoid calling ._process() a second time if not
        necessary); in that case, a new `rescheduled` future is returned.
        """
        raise NotImplementedError()

    @asyncio.coroutine
    def shutdown(self):
        """Initiate a graceful shutdown

        This coroutine terminates once all tasks are properly terminated.
        """
        exc = ShuttingDown()
        self._shutdown.set_exception(exc)
        self._shutdown.exception()  # to avoid asyncio warnings

        # cancel all 'next' tasks
        for hnd in self._handles.values():
            if hnd.nxt is not None:
                hnd.nxt.set_exception(exc)
                hnd.nxt = None

        if not self._interruptible:
            yield from asyncio.gather(
                *(h.cur for h in self._handles.values() if h.cur is not None),
                return_exceptions=True)


class SandboxManager(Manager):
    """Manager for sandbox operations

    This manager handles all sandbox operations (start, stop, commit).

    Operations are requested asynchronously in the database:
    - Webapp.sandbox_state=starting (for starting a sandbox)
    - Webapp.sandbox_state=stopping (for stopping a sandbox)
    - WebappVersion with state=sandbox (for committing a new image)

    All state changes on Webapp.sandbox_state are atomic (e.g.: if the user
    requests a stop while the sandbox is starting, then the manager finishes
    starting the sandbox but does not update the state, and it runs
    immediately again to stop the sandbox)

    Whatever the value of Webapp.sandbox_state, the manager first examines
    commit requests and performs the commit if requested.

    If the container already exists before starting the webapp (not possible
    in normal operations), then a recovery image is committed first.

    When a commit is successful, the image manager is notified so that it
    pushes the image to the registry and (if it is not a recovery image)
    pulls it to the swarm (because this image will very likely be used soon).
State changes: - sandbox start: starting->running (normal case) starting->start_error (error case) - sandbox stop: stopping->idle (normal case) stopping->stop_error (error case) - image commit: sandbox->committed (normal case) sandbox->error (error case) (none)->committed (recovery version) """ def __init__(self, ctrl, nb_threads = NB_SANDBOX_TASKS): super().__init__(nb_threads) self.ctrl = ctrl def inspect_sandbox(self, webapp): try: return self.ctrl.sandbox.inspect_container( self.ctrl.gen_sandbox_name(webapp)) except docker.errors.NotFound: return None @staticmethod def filter_commit_version(query, webapp_id): """Narrow a WebappVersion query to select the candidate versions to be committed""" return (query. filter_by(webapp_id=webapp_id, state = int(VersionState.SANDBOX)) ) def _start(self, webapp, version): """Start a webapp sandbox (to be executed in a thread pool) """ ctrl = self.ctrl ses = ctrl.session # prepare sandbox parameters # docker image if version is None: image = "%s:%s" % (ctrl.gen_factory_name(webapp.docker_os), webapp.docker_os.version) else: image = "%s:%s" % (webapp.image_name, version.number) log.debug("sandbox %r: using image %r", webapp.docker_name, image) # safety checks # (because docker_name is used it the paths of the external volumes if ("/" in webapp.docker_name) or (webapp.docker_name in ("", ".", "..")): raise Error("malformatted docker_name") uid = webapp.id + 2000 if uid < 2000: # just for safety raise Error("bad webapp id") # remove stale container (if any) if self.inspect_sandbox(webapp) is not None: self._stop(webapp) container = webapp.sandbox_name try: # prepare the sandbox # (create ssh keys) ctrl.check_host_path("isdir", ctrl.toolbox_path) ctrl.check_host_path("isdir", ctrl.sandbox_path) ctrl.sandbox.create_container("busybox:latest", name=container, command = ["/bin/sh", "-c", """ set -ex export PATH="$PATH:/.toolbox/bin" # clean sandbox dir rm -rf {sbx} # create dirs for dir in {sbx} {etc} {run} do mkdir -p ${{dir}} chown {uid}:65534 ${{dir}} chmod 0700 ${{dir}} done # xauth file touch {run}/XAuthority chown {uid}:65534 {run}/XAuthority chmod 0600 {run}/XAuthority # generate ssh keys (for type in ecdsa ed25519 rsa do key={etc}/ssh_host_${{type}}_key [ -f $key ] || ssh-keygen -N '' -f $key -t $type >&2 echo -n '{hostname}. 
' | cat - ${{key}}.pub done) > {etc}/ssh_known_hosts # known_host file for allgo-shell ssh-keygen -H -f {etc}/ssh_known_hosts chmod 0644 {etc}/ssh_known_hosts rm -f {etc}/ssh_known_hosts.old # authentication key for allgo-shell rm -f {etc}/identity ssh-keygen -N '' -f {etc}/identity chown {uid}:65534 {etc}/identity # forced shell for the sshd config cat > {etc}/shell <<EOF #!/bin/sh export PATH="\$PATH:/.toolbox/bin" uid=\`id -u\` shell="\`getent passwd \$uid 2>/dev/null| cut -d : -f 7\`" if [ -z "\$shell" ] ; then shell=/bin/sh fi if [ -n "\$SSH_ORIGINAL_COMMAND" ] ; then exec "\$shell" -c "\$SSH_ORIGINAL_COMMAND" else exec "\$shell" fi EOF chmod 755 {etc}/shell # sshd config cat > {etc}/sshd_config <<EOF Port 22 Protocol 2 # turned off because it requires creating a 'sshd' user inside the sandbox UsePrivilegeSeparation no StrictModes no ForceCommand /.sandbox/etc/ssh/shell PermitRootLogin without-password PubkeyAuthentication yes AuthorizedKeysFile /.sandbox/etc/ssh/identity.pub .ssh/authorized_keys .ssh/authorized_keys2 ChallengeResponseAuthentication no PasswordAuthentication no X11Forwarding yes X11DisplayOffset 10 PrintMotd no PrintLastLog no TCPKeepAlive yes # Allow client to pass locale environment variables AcceptEnv LANG LC_* Subsystem sftp internal-sftp UsePAM no EOF """.format(uid=uid, hostname = "%s-sandbox-%s" % (ctrl.env, webapp.docker_name), sbx = "/mnt/%s" % webapp.docker_name, etc = "/mnt/%s/etc/ssh" % webapp.docker_name, run = "/mnt/%s/run" % webapp.docker_name, )], host_config = ctrl.sandbox.create_host_config( binds = { ctrl.sandbox_path: {"bind": "/mnt"}, ctrl.toolbox_path: {"bind": "/.toolbox", "mode": "ro"}, })) ctrl.sandbox.start(container) if ctrl.sandbox.wait(container): log.debug("sandbox %s output:\n%s", webapp.docker_name, ctrl.sandbox.logs(container).decode(errors="replace")) raise Error("sandbox preparation failed") ctrl.sandbox.remove_container(container) # create and start the sandbox etc_dir = os.path.join(ctrl.sandbox_path, webapp.docker_name, "etc") run_dir = os.path.join(ctrl.sandbox_path, webapp.docker_name, "run") ctrl.check_host_path("isdir", etc_dir) ctrl.check_host_path("isdir", run_dir) if version is None and webapp.entrypoint: # prepend instructions to initialise a dummy entrypoint dn, bn = os.path.split(webapp.entrypoint) # FIXME: do nothing if entrypoint already exists prepare = """ {mkdir} test -f {entrypoint} || cat > {entrypoint} <<EOF #!/bin/sh echo echo "This is app '{name}' called with parameters '\$@'" echo echo "The workdir contains:" ls -l EOF chmod 0755 -- {entrypoint} """.format( entrypoint = shlex.quote(webapp.entrypoint), name = webapp.docker_name, mkdir = (("mkdir -p -- %s" % shlex.quote(dn)) if dn else "")) else: prepare = "" command = ["/bin/sh", "-c", """ set -x export PATH="$PATH:/.toolbox/bin" {prepare} # xauth file (needed for X11 forwarding) touch /root/.Xauthority chmod 600 /root/.Xauthority exec /.toolbox/bin/sshd -D """.format(prepare=prepare)] ctrl.sandbox.create_container(image, name=container, hostname=container, command = command, host_config = ctrl.sandbox.create_host_config( binds = { etc_dir: {"bind": "/.sandbox/etc", "mode": "ro"}, run_dir: {"bind": "/.sandbox/run", "mode": "rw"}, ctrl.toolbox_path: {"bind": "/.toolbox", "mode": "ro"}, }, # TODO: maybe drop other caps cap_drop = ["NET_RAW"], restart_policy = {"Name": "unless-stopped"}, network_mode = ctrl.sandbox_network, )) ctrl.sandbox.start(container) except: with docker_warning("cleanup error: unable to remove container %r", container, 
ignore=docker.errors.NotFound): ctrl.sandbox.remove_container(container, force=True) raise def _commit(self, webapp, versions): """Commit a webapp sandbox (to be executed in a thread pool) The image version is looked up in webapp_versions (where state==sandbox). In case of any error, a recovery version is committed instead (to avoid loosing the work done inside the sandbox) and the candidates are put in error state. """ ctrl = self.ctrl ses = ctrl.session # pre-commit checks # - ensure that there is exactly one candidate webapp_version with # state=sandbox # - otherwise: # - put all candidates in error state # - create a recovery version # version be committed version = None # error msg (if any) error = None # version ids to be recovered recover = () if len(versions) == 1: # normal case (sandbox commit) version = versions[0] if not version.number: error = "empty version number" recover = version.id, elif not versions: # sandbox rollback (when user drops a sandbox without committing a new image) error = "dangling sandbox" else: # multiple candidates (should never happen) error = "multiple candidate versions (%s)" % ( ", ".join(map(repr, sorted(v.number for v in versions)))) recover = tuple(v.id for v in versions) # TODO: make 'sandbox' a reserved name if error: changelog = "pre-commit error: " + error log.error("sandbox %r version id %r: %s", webapp.docker_name, recover, changelog) with ses.begin(): # put all candidates in 'error state' if recover: ses.execute('''UPDATE dj_webapp_versions SET changelog=CONCAT(changelog, " [", :changelog, "]"), state=:state WHERE id IN :ids''', dict(changelog=changelog, ids=recover, state=int(VersionState.ERROR))) # create a recovery version version = WebappVersion( webapp_id = webapp.id, number = time.strftime("recovery-%Y%m%d-%H%M%S"), changelog = changelog, published = False, state = int(VersionState.SANDBOX)) ses.add(version) ses.refresh(version) ses.expunge(version) assert version is not None # commit the docker image log.debug("dicts %r %r", webapp.__dict__, version.__dict__) log.info("commit sandbox %r version %r", webapp.docker_name, version.number) container = webapp.sandbox_name next_state = image_size = None try: # stop the container (if stopping or if creating a new sandbox) if webapp.sandbox_state in (SandboxState.STOPPING, SandboxState.STARTING): ctrl.sandbox.stop(container) ctrl.sandbox.wait(container) # commit cid = ctrl.sandbox.commit(container, webapp.image_name, version.number) next_state = VersionState.COMMITTED image_size = ctrl.sandbox.inspect_image(cid)["Size"] return version, error except docker.errors.NotFound: error = "commit error: container not found %r" % container log.error("%s", error) next_state = VersionState.ERROR image_size = 0 ses.execute('''UPDATE dj_webapp_versions SET changelog=CONCAT(changelog, " [commit error: sandbox is down]") WHERE id=%d''' % version.id) # here we do not propagate the error to allow starting/stopping the # sandbox immediately (without going through sandbox_state=:error) return None, error except Exception as e: log.exception("sandbox %r version %r: unexpected commit error (sandbox may still be recovered)", webapp.docker_name, version.number) raise finally: # NOTE: if anything unexpected happens, the version is # left in state 'sandbox' and we propagate the exception to # ensure the work done inside the sandbox is not dropped # and the sandbox is pute in 'error' state # # The error will not be reported to the user. 
If this is an issue, # then the solution would be to create another error state (to be # used when the sandbox is still there). # if next_state is not None: with ses.begin(): ses.execute("UPDATE dj_webapp_versions SET state=%d, docker_image_size=%d WHERE id=%d" % (next_state, image_size, version.id)) @asyncio.coroutine def _manage_commit(self, webapp, versions, *, force=False): """Manage sandbox commit (if needed) and notify the image manager The commit is performed if one of these conditions is fulfilled: - a commit was requested (there is at least one WebappVersion entry with state=sandbox for this app) - a docker container exists (for this sandbox) and force is true """ if not (versions or (force and self.inspect_sandbox(webapp) is not None)): return # do the commit version, error = yield from self.run_in_executor(self._commit, webapp, versions) # trigger push/pull operations (but do not wait) if version is not None: # push to the registry self.ctrl.image_manager.push(version.id) if not error: # preemptive pull to the swarm # (the image may be needed soon) self.ctrl.image_manager.pull(version.id, swarm=True) def _stop(self, webapp): """Stop a webapp sandbox (to be executed in a thread pool) """ try: # FIXME: remove volumes (v=True) too ? self.ctrl.sandbox.remove_container(webapp.sandbox_name, force=True) except docker.errors.NotFound: pass @asyncio.coroutine def _process(self, webapp_id, reset, rescheduled): ctrl = self.ctrl log.debug("process sandbox %d", webapp_id) ses = ctrl.session with ses.begin(): # current state of the sandbox + load docker os webapp = ses.query(Webapp).filter_by(id=webapp_id).one() webapp.docker_os # version to be started sandbox_version = webapp.sandbox_version # requested commits commit_versions = self.filter_commit_version(ses.query(WebappVersion), webapp_id).all() ses.expunge_all() # docker name of the sandbox & image webapp.sandbox_name = ctrl.gen_sandbox_name(webapp) webapp.image_name = ctrl.gen_image_name(webapp) phase = "inspect" next_state = fail_state = None try: if webapp.sandbox_state == SandboxState.STARTING: # start the sandbox phase = "start" next_state = SandboxState.RUNNING fail_state = SandboxState.START_ERROR # commit (if a sandbox exists) yield from self._manage_commit(webapp, commit_versions, force=True) if sandbox_version is not None: # ensure version belongs to this application if sandbox_version.webapp_id != webapp.id: raise Error("invalid version id %d (belongs to webapp %d)" % ( sandbox_version.id, sandbox_version.webapp_id)) # pull requested image yield from ctrl.image_manager.pull(sandbox_version.id) else: # pull image yield from ctrl.image_manager.sandbox_pull_manager.process(( ctrl.gen_factory_name(webapp.docker_os), webapp.docker_os.version)) # start sandbox yield from self.run_in_executor(self._start, webapp, sandbox_version) elif webapp.sandbox_state == SandboxState.STOPPING: # stop the sandbox phase = "stop" next_state = SandboxState.IDLE fail_state = SandboxState.STOP_ERROR # commit (if requested) yield from self._manage_commit(webapp, commit_versions) yield from self.run_in_executor(self._stop, webapp) else: # commit (if requested) phase = "commit" yield from self._manage_commit(webapp, commit_versions) except ShuttingDown: next_state = None log.info("sandbox %r %s aborted (controller shutdown)", webapp.docker_name, phase) except BaseException as e: next_state = fail_state log_func = log.error if isinstance(e, (docker.errors.APIError, Error)) else log.exception log_func ("sandbox %r %s error (%s)", webapp.docker_name, phase, 
traceback.format_exception_only(type(e), e)[-1].strip()) finally: if next_state is not None: # atomically update the sandbox state in the db # (in case another action is requested during the process, eg: the user # stops the sandbox while it is not fully started) log.info("sandbox %r is now in state %r", webapp.docker_name, next_state.name) with ses.begin(): ses.execute("UPDATE dj_webapps SET sandbox_state=%d WHERE id=%d AND sandbox_state=%d" % (next_state, webapp_id, webapp.sandbox_state)) log.debug("done sandbox %d", webapp_id) class JobManager(Manager): class JobInfo: __slots__ = "job_id", "ver_id", "ctr_id", "version", "ctr_name", "client", "cpu", "mem", "node_id", "timeout" def __init__(self, ctrl): super().__init__(0) self.ctrl = ctrl @asyncio.coroutine def __iter__(self): raise NotImplementedError() def _create_job(self, info): ctrl = self.ctrl ses = ctrl.session tmp_img = None assert info.ctr_id is None try: with ses.begin(): job = ses.query(Job).filter_by(id=info.job_id).one() webapp = job.webapp log.info("start job %d (%s:%s)", info.job_id, webapp.docker_name, info.version) job.state = int(JobState.RUNNING) # pragma: nobranch (TODO: remove (coverage bug)) repo = ctrl.gen_image_name(webapp) image = "%s:%s" % (repo, info.version) job_path = ctrl.gen_job_path(job) log.debug("job.path: %r", job_path) if info.ver_id is None: assert info.version == "sandbox" image = tmp_img = info.client.commit(ctrl.gen_sandbox_name(webapp), repo, info.version)["Id"] # TODO use another workdir # TODO use another uid ctrl.check_host_path("isdir", job_path) hc = ctrl.sandbox.create_host_config( binds = {job_path: {"bind": "/tmp"}}, cap_drop = ["all"], # FIXME: CAP_DAC_OVERRIDE needed because all nfs files have uid,gid=1000,1000 cap_add = ["dac_override"], cpu_quota = (None if info.cpu is None else (info.cpu * 1024)), cpu_period = (None if info.cpu is None else 1024), # cpu_shares = info.cpu, mem_limit = info.mem, ) # NOTE: cpu_shares has a different meaining in docker swarm and docker engine # - swarm: nb of cpus # - engine: 1/1024 share of the total cpu resouces of the machine # engine requires cpu_share>1 if ctrl.cpu_shares: # TODO: upgrade docker-py (and use create_host_config) hc["CpuShares"] = info.cpu log.debug("host_config %r", hc) info.ctr_id = info.client.create_container(image, name=info.ctr_name, working_dir = "/tmp", # NOTE: the command line is a little complex, but this is # to ensure that (TODO write tests for this): # - no output is lost (we go though a pipe in case the # app has multiple processes writing to stdout/stderr # concurrently) # FIXME: maybe ">>allgo.log 2>&1" is sufficent # - we get the exit code of the app (not the exit code of # cat) # - SIGTERM & SIGALRM are forwarded to the process (and # we call wait again becauce of EINTR) # - we have no unusual dependencies (only sh, cat, trap, # kill and mkfifo) # - we display a warning if the memory limit was reached # during the job command = ["/bin/sh", "-c", """ pid= interrupted= sighnd() {{ (echo echo "==== ALLGO JOB $2 ====" kill "-$1" "$pid") 2>&1 | tee -a allgo.log trap '' TERM ALRM interrupted=1 }} trap "sighnd TERM ABORT" TERM trap "sighnd ALRM TIMEOUT" ALRM fifo=/.allgo.fifo.{job_id} mkfifo "$fifo" 2>&1 | tee -a allgo.log || exit $? exec cat <"$fifo" | tee -a allgo.log & exec "$@" >"$fifo" 2>&1 & pid=$! wait %2 code=$? if [ -n "$interrupted" ] ; then wait %2 code=$? 
fi wait %1 trap '' TERM ALRM [ -n "$interrupted" ] || ( echo if [ $code -eq 0 ] ; then echo "==== ALLGO JOB SUCCESS ====" else echo "==== ALLGO JOB ERROR ====" echo "process exited with code $code" fi failcnt="`cat /sys/fs/cgroup/memory/memory.failcnt`" if [ "$failcnt" -ne 0 ] ; then echo "WARNING: memory limit was reached (memory.failcnt=$failcnt)" fi ) | tee -a allgo.log exit $code """.format(job_id=job.id), "job%d" % job.id, webapp.entrypoint] + shlex.split(job.param), labels = {"allgo.tmp_img": tmp_img or ""}, environment=["constraint:node==" + info.node_id], host_config = hc)["Id"] info.client.start(info.ctr_id) with ses.begin(): # save the container_id into the db job.container_id = info.ctr_id except: #TODO introduce a state JobState.ERROR self._remove_job(info, tmp_img=tmp_img) raise def _remove_job(self, info, *, tmp_img=None): ses = self.ctrl.session # TODO: report launch errors to the user # TODO: report exit code to the user # TODO: use another uid def parse_docker_timestamp(value): return datetime.datetime.strptime( # limit the precision to the microsecond # (othewise strptime fails) re.sub(r"(\.\d{,6})\d*Z$", r"\1Z", value), # iso8601 format "%Y-%m-%dT%H:%M:%S.%fZ") with ses.begin(): job = ses.query(Job).filter_by(id=info.job_id).one() exec_time = 0.0 if info.ctr_id is not None: try: js = info.client.inspect_container(info.ctr_id) except docker.errors.NotFound: pass else: started_at = js["State"].get("StartedAt", "0001-") finished_at = js["State"].get("FinishedAt") # default docker date is '0001-01-01T00:00:00Z' if not started_at.startswith("0001-"): try: exec_time =( parse_docker_timestamp(finished_at) - parse_docker_timestamp(started_at) ).total_seconds() except Exception: # pragma: nocover log.exception("job %d: unable to compute exec time", info.job_id) if tmp_img is None: tmp_img = js["Config"]["Labels"].get("allgo.tmp_img") or None with docker_warning("job %d: cleanup error: unable to remove container", info.job_id): info.client.remove_container(info.ctr_id) if tmp_img is not None: with docker_warning("job %d: cleanup error: unable to remove tmp image", info.job_id): info.client.remove_image(tmp_img) job.exec_time = exec_time job.state = int(JobState.DONE) job.container_id = None if job.result == JobResult.NONE: # FIXME: maybe we should have a 'unknown' result log.warning("job %d has not result, will fallback to 'ERROR'", info.job_id) job.result = int(JobResult.ERROR) log.info("done job %d (result=%s, duration=%fs)", info.job_id, JobResult(job.result).name, exec_time) @asyncio.coroutine def _finish_job(self, info, reset): # wait for container termination (if running) if info.ctr_id is not None: def kill(sig): log.debug("kill job %d (signal %d)", info.job_id, sig) with docker_warning("unable to kill container %r", info.ctr_id): info.client.kill(info.ctr_id, sig) @asyncio.coroutine def stop(sig, reason): log.info("stop job %d (%s)", info.job_id, reason) try: # graceful kill kill(sig) yield from asyncio.wait_for(asyncio.shield(wait_task), timeout=5) except asyncio.TimeoutError: # hard kill (after 5 seconds) kill(signal.SIGKILL) log_task = asyncio.ensure_future(self._log_task(info)) wait_task = asyncio.async(info.client.wait_async(info.ctr_id)) timeout_task = (asyncio.Future() if info.timeout is None else asyncio.async(asyncio.sleep(info.timeout))) result = None try: rescheduled = None while not wait_task.done(): # we must ensure that the job is not aborted or if the # timeout expires if rescheduled is None or rescheduled.done(): # check the job state in the db rescheduled 
= reset() ses = self.ctrl.session with ses.begin(): state, = ses.query(Job.state).filter_by(id=info.job_id).one() if state == JobState.ABORTING: yield from self._notif_job_state(info, "ABORTING") yield from stop(signal.SIGTERM, "user abort") result = result or JobResult.ABORTED elif timeout_task.done(): # timeout ! yield from stop(signal.SIGALRM, "timeout") result = result or JobResult.TIMEOUT yield from asyncio.wait((wait_task, timeout_task, rescheduled), return_when=asyncio.FIRST_COMPLETED) returncode = wait_task.result() log.debug("job %d exit code: %r", info.job_id, returncode) result = result or (JobResult.SUCCESS if returncode==0 else JobResult.ERROR) except: # note: we do not cancel the log task on normal termination # (we let it finish reading the logs but in the background) log_task.cancel() raise finally: wait_task.cancel() timeout_task.cancel() with contextlib.suppress(asyncio.CancelledError): yield from wait_task if result is not None: ses = self.ctrl.session with ses.begin(): ses.execute("UPDATE dj_jobs SET result=%d WHERE id=%d AND result=%d" % (int(result), info.job_id, int(JobResult.NONE))) # remove container yield from self.run_in_executor(self._remove_job, info, lock=False) # task that streams the logs from the docker engine into the redis database async def _log_task(self, info): try: offset = 0 log_key = REDIS_KEY_JOB_LOG % info.job_id notif_msg = REDIS_MESSAGE_JOB_UPDATED % info.job_id timeout = 3600 if info.timeout is None else (info.timeout + 60) if timeout <= 0: timeout = 600 log.debug("job %d: start log task (timeout %.1f seconds)", info.job_id, timeout) async with info.client.aiohttp_session.get( "%s/containers/%s/logs" % (info.client.aiohttp_url, info.ctr_id), params={"follow": "1", "stdout": "1", "stderr": "1"}, timeout=timeout) as rep: rep.raise_for_status() while True: # read the header (8-byte) and extract the chunk size hdr = await rep.content.read(8) if not hdr: break size, = struct.unpack("!L", hdr[4:]) #read the chunk data = await rep.content.read(size) # store it in redis await self.ctrl.redis_client.setrange(log_key, offset, data) await self.ctrl.redis_client.publish(REDIS_CHANNEL_AIO, notif_msg) offset += len(data) # send eof await self.ctrl.redis_client.setrange(log_key, offset, b"\x04") await self.ctrl.redis_client.publish(REDIS_CHANNEL_AIO, notif_msg) except Exception as e: # NOTE: if the redis server goes down, then the log_task is totally # interrupted log.exception("job %d: exception in log task", info.job_id) finally: log.debug("job %d: log task terminated (%r bytes logged)", info.job_id, offset) await self.ctrl.redis_client.expire(log_key, 86400) @asyncio.coroutine def _process(self, job_id, reset, rescheduled): ctrl = self.ctrl ses = ctrl.session log.debug("process job id %d", job_id) with ses.begin(): # query db job = ses.query(Job).filter_by(id=job_id).first() if job is None: # pragma: nocover # unknown job log.warning("unknown job id %d", job_id) return state = JobState(job.state) if job.webapp is None: log.error("job %d: webapp id %r not found", job_id, job.webapp_id) if state == JobState.WAITING: # pragma: nobranch job.state = int(JobState.DONE) job.result = int(JobResult.ERROR) job.exec_time = 0 # TODO report error to the user ? return docker_name = job.webapp.docker_name info = self.JobInfo() info.job_id = job_id info.ctr_id = None info.node_id = None info.version = job.version info.ctr_name = ctrl.gen_job_name(job) info.timeout = job.queue.timeout # NOTE: .cpu .mem are the amount of cpu/mem requested when creating # a new job. 
They do not apply to already created jobs (by a # previous instance of the controller) info.cpu = ctrl.cpu_shares info.mem = job.webapp.memory_limit if job.version == "sandbox": info.client = ctrl.sandbox else: info.client = ctrl.swarm if state == JobState.WAITING: # job is not yet started if job.container_id is not None: log.warning("job %d is in state WAITING but already has a container id: %r (will be ignored)", job.id, job.container_id) if job.version == "sandbox": # to be run in the sandbox info.ver_id = None else: # to be run in the swarm # Find the wanted WebappVersion #TODO: replace version_id with webapp_version_id ver = ses.query(WebappVersion).filter_by( webapp_id = job.webapp_id, number = job.version).filter( WebappVersion.state.in_(( int(VersionState.COMMITTED), int(VersionState.READY))) ).order_by( WebappVersion.state.desc(), WebappVersion.id.desc()).first() if ver is None: log.error("job %d: webapp %r version %r not found", job_id, job.webapp.docker_name, job.version) job.state = int(JobState.DONE) # TODO report error to the user job.result= int(JobResult.ERROR) return info.ver_id = ver.id elif state in (JobState.RUNNING, JobState.ABORTING): # pragma: nobranch # job is already started # we do not care about the actual version_id *but* we need to # know whether we are in the swarm or in the sandbox info.ver_id = None if job.version == "sandbox" else -1 # inspect the container so as to: # - validate and store its id # - adjust the timeout if necessary if job.container_id is None: # pre-migration jobs # FIXME: to be removed after migration (the container id is now stored in the db at creation time) try: js = info.client.inspect_container("%s-job-%d-%s" % (ctrl.env, job.id, job.webapp.docker_name)) except docker.errors.NotFound: js = None else: # check the presence of the container and validates its id against its name js = ctrl.inspect_job_container(info.client, job) if js is not None: # save the id info.ctr_id = js["Id"] # adjust the timeout if info.timeout is not None: uptime = (datetime.datetime.now(datetime.timezone.utc) - iso8601.parse_date(js["State"]["StartedAt"])).seconds info.timeout -= uptime log.debug("job %d uptime: %.1fs, adjusted timeout is: %.1f", info.job_id, uptime, info.timeout) else: # unexpected state if state != JobState.DONE: log.warning("job id %d is in unexpected state %s", job_id, state.name) return if state == JobState.WAITING: # job is not yet started yield from self._notif_job_state(info, "WAITING") # pull the image to the swarm if info.ver_id is not None: # NOTE: race condition: will fail if ver.state==sandbox # jobs must be submitted after the image is committed yield from ctrl.image_manager.pull(info.ver_id, swarm=True) # request a slot from the shared swarm with info.client.request_slot(info.ctr_name, info.cpu or 0, info.mem or 0): info.node_id = yield from info.client.wait_slot(info.ctr_name) yield from self.run_in_executor(self._create_job, info, lock=False) yield from self._notif_job_state(info, "RUNNING") yield from self._finish_job(info, reset) elif state in (JobState.RUNNING, JobState.ABORTING): # pragma: nobranch # the job is already running # -> wait for its termination yield from self._finish_job(info, reset) yield from self._notif_job_state(info, "DONE") # send a notification to the aio frontend when the job state is changed # # this function never fail (redis errors are caught and written in the # logs) async def _notif_job_state(self, info, state): try: log.info("notify the frontend: job %d state is now %r", info.job_id, state) await 
self.ctrl.redis_client.setex(REDIS_KEY_JOB_STATE % info.job_id, 86400, state) await self.ctrl.redis_client.publish(REDIS_CHANNEL_AIO, REDIS_MESSAGE_JOB_UPDATED % info.job_id) except asyncio.CancelledError: pass except Exception as e: log.error("redis notification failed for job %d (%e)", info.job_id, e) # NOTE: for the push/pull managers, interruptible=True guarantees that the # managers terminate immediately, however it cannot guarantee that the # process will terminate immediately because the ThreadPoolExecuter installs # a atexit handler that joins all the background threads. # # Anyway this is not a big issue since all pending push/pull raise # ShuttingDown immediately, thus we won't end up with a sandbox/job # in an inconsistent state when SIGKILL arrives. # class PullManager(Manager): def __init__(self, nb_threads, client, name, *, auth_config=None): super().__init__(nb_threads, interruptible=True) self.client = client self.name = name self.auth_config = auth_config @asyncio.coroutine def _process(self, img, reset, rescheduled): image, version = img log.info("pull to the %-10s %s:%s", self.name, image, version) return self.run_in_executor(lambda: self.client.pull(image, version, auth_config=self.auth_config)) class PushManager(Manager): def __init__(self, nb_threads, ctrl, *, auth_config=None): super().__init__(nb_threads, interruptible=True) self.ctrl = ctrl self.auth_config = auth_config @asyncio.coroutine def _process(self, version_id, reset, rescheduled): ses = self.ctrl.session with report_error("unable to push version id %d", version_id): with ses.begin(): # get the version object and check its state version = ses.query(WebappVersion).filter_by(id=version_id).one() if version.state != VersionState.COMMITTED: if version.state in (VersionState.READY, VersionState.REPLACED): # already pushed return if version.state == VersionState.SANDBOX: raise Error("unable to push (image not yet committed)") raise Error("unable to push (invalid state: %s)" % version.state) # ensure that there is no other version with the same number in the pipeline # (to avoid a race condition) others = (ses.query(WebappVersion.id) .filter_by(webapp_id=version.webapp_id, number=version.number) .filter(WebappVersion.id != version.id) .filter(WebappVersion.state.in_((int(VersionState.SANDBOX), int(VersionState.COMMITTED))))) if others.count(): raise Error("unable to push (there are other pushable versions with the same number: %s)" % ( " ".join(map(str, itertools.chain(*others))))) image = self.ctrl.gen_image_name(version.webapp) tag = version.number log.info("push from the %-8s %s:%s", "sandbox", image, tag) yield from self.run_in_executor(docker_check_error, lambda: self.ctrl.sandbox.push(image, tag, auth_config=self.auth_config)) reset() with ses.begin(): version = ses.query(WebappVersion).filter_by(id=version_id).one() prev = ses.query(WebappVersion).filter_by( webapp_id = version.webapp_id, number = version.number, state = int(VersionState.READY)).scalar() log.debug("prev version id %r", (prev.id if prev else None)) if prev is None: # this is a new version version.state = int(VersionState.READY) else: # overwrite an existing version for key in "updated_at", "changelog", "published": setattr(prev, key, getattr(version, key)) # mark this version as replaced version.state = int(VersionState.REPLACED) ses.add(version) class ImageManager: def __init__(self, ctrl, nb_push_sandbox = NB_PUSH_SANDBOX, nb_pull_sandbox = NB_PULL_SANDBOX, nb_pull_swarm = NB_PULL_SWARM, *, auth_config=None): self.ctrl = ctrl 
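        # three single-purpose managers:
        #  - push: sandbox engine -> registry
        #  - pull: registry -> sandbox engine
        #  - pull: registry -> swarm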
self.sandbox_push_manager = PushManager(nb_push_sandbox, ctrl, auth_config=auth_config) self.sandbox_pull_manager = PullManager(nb_pull_sandbox, ctrl.sandbox, "sandbox", auth_config=auth_config) self.swarm_pull_manager = PullManager(nb_pull_swarm, ctrl.swarm, "swarm", auth_config=auth_config) # return a future @auto_create_task @asyncio.coroutine def pull(self, version_id: int, *, swarm=False): with report_error("unable to pull version id %d to %s", version_id, ("swarm" if swarm else "sandbox")): ses = self.ctrl.session with ses.begin(): # get the version object and check its state version = ses.query(WebappVersion).filter_by(id=version_id).one() image = self.ctrl.gen_image_name(version.webapp) tag = version.number if swarm: # pull to the swarm if version.state == VersionState.COMMITTED: # must be pushed to the registry first yield from self.push(version_id) with ses.begin(): version = ses.query(WebappVersion).filter_by(id=version_id).one() if version.state not in (VersionState.READY, VersionState.REPLACED): raise Error("bad version state: %s" % version.state) yield from self.swarm_pull_manager.process((image, tag)) else: # pull to the sandbox if version.state == VersionState.COMMITTED: # do not pull! return if version.state not in (VersionState.READY, VersionState.REPLACED): raise Error("bad version state: %s" % version.state) yield from self.sandbox_pull_manager.process((image, tag)) # return a future def push(self, version_id: int): return self.sandbox_push_manager.process(version_id) @asyncio.coroutine def shutdown(self, **kw): yield from asyncio.gather( self.sandbox_pull_manager.shutdown(), self.sandbox_push_manager.shutdown(), self.swarm_pull_manager.shutdown(), return_exceptions=True) class DockerController: def __init__(self, sandbox_host, swarm_host, mysql_host, registry, env, datastore_path, sandbox_path, toolbox_path, sandbox_network, redis_host, config_file="/vol/ro/config.yml", ): # parse the config file with config_reader.ConfigReader( open(config_file) if os.path.exists(config_file) else "{}", logger = log ) as cfg: def get_bytes(cfg, key): txt = cfg.get(key, type=str) if txt is not None: try: docker.utils.parse_bytes(txt) except Exception as e: raise ValueError("%s: %s" % (cfg.path(key), e)) if cfg is None: cfg = config_reader.Mapping({}) self.cpu_shares = cfg.get("cpus", None, int) dct = cfg.get("registry_auth", None, dict) auth_config = { "username": dct.get("username", str, required=True), "password": dct.get("password", str, required=True), } if dct else None self.sandbox = SharedSwarmClient(sandbox_host, config=cfg.get("sandbox", {}, dict), alias="sandbox") self.sandbox.aiohttp_session, self.sandbox.aiohttp_url = ( make_aiohttp_client(sandbox_host)) if sandbox_host == swarm_host: self.swarm = self.sandbox else: self.swarm = SharedSwarmClient(swarm_host, config=cfg.get("swarm", {}, dict), alias="swarm") self.swarm.aiohttp_session, self.swarm.aiohttp_url = ( make_aiohttp_client(swarm_host)) self._db_sessionmaker = sqlalchemy.orm.scoping.scoped_session( connect_db(mysql_host)) self._thread_local = threading.local() self.redis_host = redis_host self.redis_client = None self.image_manager = ImageManager(self, auth_config=auth_config) self.sandbox_manager = SandboxManager(self) self.job_manager = JobManager(self) self.registry = registry self.env = env self.datastore_path = datastore_path self.sandbox_path = sandbox_path self.toolbox_path = toolbox_path self.sandbox_network= sandbox_network self._task = None self._shutdown_requested = None img = "busybox:latest" try: 
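            # make sure the busybox image is available on the sandbox engine:
            # SandboxManager._start uses it to prepare the sandbox directories
            # (inspect_image() raises NotFound when the image is absent)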
self.sandbox.inspect_image(img) except docker.errors.NotFound: log.info("pulling docker image %s", img) self.sandbox.pull(img) def gen_sandbox_name(self, webapp): return "%s-sandbox-%s" % (self.env, webapp.docker_name) def gen_image_name(self, webapp): return "%s/%s" % (self.registry, webapp.docker_name) def gen_job_name(self, job): return "%s-job-%s-%d-%s" % (self.env, job.queue.name, job.id, job.webapp.docker_name) def gen_job_path(self, job): return os.path.join(self.datastore_path, str(job.id)) def gen_factory_name(self, docker_os): # NOTE: factory names now refer to an image from the official docker # registry. To enforce this (and avoid a user using an allgo image as # the docker_os), we explicitely prepend "registry-1.docker.io" in the # repository name. Once the registry authentication is clarified and # correctly secured we will be able to remove this restriction and # allow using any docker images). # repo = docker_os.docker_name if "/" not in repo: repo = "library/" + repo return "registry-1.docker.io/"+ repo def inspect_job_container(self, client, job): """inspect the underlying container of a job + safety checks The purpose of the validation check is to prevent a DoS attack on an arbitrary container. The job.container_id shall really be the id of the container and its name shall start with "ENV-job". return: - the result of client.inspect_container() if ok - None if the container is not found or if the container name/id is not valid """ assert job.container_id is not None cid = job.container_id try: ctr = client.inspect_container(cid) except docker.errors.NotFound: return None if cid != ctr["Id"]: log.error("job %d references container %r which resolves to a different container id: %r", job.id, cid, ctr["Id"]) elif not re.match(r"/%s-job-[^/]+\Z" % self.env, ctr["Name"]): log.error("job %d references container %r whose name is invalid: %r", job.id, cid, ctr["Name"]) else: return ctr return None def check_host_path(self, funcname, path, *, nolink=True): """Validate a path before using it as an external volume in docker - ensure the path is canonical: - it is absolute - it does not contain any '..' 
parts - it does not contain any symbolic link - ensure the path is of the right type (call os.path.`funcname`()) """ if not os.path.isabs(path): raise Error("host path %r is not absolute" % path) if path != os.path.normpath(path): raise Error("host path %r is not canonical (os.path.normpath())" % path) ctrpath = "/vol/host" + path log.debug("ctrpath %r", ctrpath) func = getattr(os.path, funcname) if nolink and os.path.realpath(ctrpath) != os.path.normpath(ctrpath): raise Error("host path %r contains a symbolic link" % path) if not func(ctrpath): raise Error("host path %r not found (os.path.%s())" % (path, funcname)) @property def session(self): """Return the thread-local sqlalchemy session WARNING: in async functions, db transaction must not span over any 'yield from' statements (to avoid interleaving) """ return self._db_sessionmaker() @asyncio.coroutine def db_startup(self): log.info("wait until the db server is ready") ses = self.session while True: with contextlib.suppress(sqlalchemy.exc.OperationalError): log.debug("db ping") with ses.begin(): ses.execute("SELECT 1") break with contextlib.suppress(asyncio.TimeoutError): yield from asyncio.wait_for(asyncio.shield(self._shutdown_requested), 10) return log.info("db server ready") self.check_db(startup=True) def check_db(self, *, startup=False): log.debug("check_db(startup=%r)", startup) try: ses = self.session with ses.begin(): if startup: ses.execute("DELETE FROM dj_webapp_versions WHERE state=%d" % VersionState.REPLACED) for version_id, in ses.execute( "SELECT id FROM dj_webapp_versions WHERE state=%d" % VersionState.COMMITTED).fetchall(): self.image_manager.push(version_id) for webapp_id, in ses.execute("""SELECT dj_webapps.id FROM dj_webapps LEFT JOIN dj_webapp_versions ON dj_webapps.id=dj_webapp_versions.webapp_id WHERE sandbox_state IN (%d,%d) OR state=%d GROUP BY dj_webapps.id""" % ( SandboxState.STARTING, SandboxState.STOPPING, VersionState.SANDBOX)).fetchall(): self.sandbox_manager.process(webapp_id) for job_id, in ses.query(Job.id).filter(Job.state.in_( (JobState.WAITING.value, JobState.RUNNING.value, JobState.ABORTING.value)) ).order_by(Job.state): log.debug("schedule job %d", job_id) self.job_manager.process(job_id) # TODO schedule the push of images versions in state 'committed' except sqlalchemy.exc.OperationalError as e: log.error("db error %s", e.orig.args) return async def periodic_db_check_task(self): """Periodic db check This task is theretically not needed if we take for granted that the redis notification channel works properly. 
""" while True: try: await asyncio.sleep(DB_CHECK_PERIOD) self.check_db() except OSError as e: log.error("I/O error when polling the db (%s)", e) except asyncio.CancelledError: log.info("db periodic check task terminated") return except Exception: log.exception("error in the periodic db check task") async def redis_notification_listener_task(self): """Task listening for redis notifications - subscribes to REDIS_CHANNEL_CONTROLLER - automatically reconnects to the server (with rate limiting) """ async for _ in RateLimiter(60): try: # create redis connection and subscribe to the notification channel conn = await aioredis.create_redis((self.redis_host, 6379)) sub, = await conn.subscribe(REDIS_CHANNEL_CONTROLLER) log.info("subscribed to redis pub/sub channel %r" % REDIS_CHANNEL_CONTROLLER) # force a db check (because we may have missed some notifications) self.check_db() async for msg in sub.iter(): log.info("redis notification: %r", msg) try: item_type, item_id_str = msg.split(b":") item_id = int(item_id_str) except ValueError as e: log.warning("ignored malformatted notification: %r (%s)", msg, e) continue if item_type == b"job": self.job_manager.process(item_id) elif item_type == b"sandbox": self.sandbox_manager.process(item_id) else: log.warning("ignored notification for unknown item %r", msg) except OSError as e: log.error("I/O error in the redis listener (%s)", e) except asyncio.CancelledError: log.info("notification task terminated") return except Exception: log.exception("error in the redis notification loop") def shutdown(self): if not self._shutdown_requested.done(): self._shutdown_requested.set_result(None) return self._task @asyncio.coroutine def _run(self): assert self._shutdown_requested is None try: loop = asyncio.get_event_loop() self._shutdown_requested = asyncio.Future() loop.add_signal_handler(signal.SIGTERM, self.shutdown) loop.add_signal_handler(signal.SIGINT, self.shutdown) yield from self.db_startup() tasks = [] try: # note: this call never fails (even if the redis server is down) self.redis_client = yield from aioredis.create_reconnecting_redis( (self.redis_host, 6379)) # start the tasks for receiving notificatiosn # - from the redis PUBSUB channel tasks.append(asyncio.ensure_future(self.redis_notification_listener_task())) # - additional periodic check of the db (in case some redis # notifications are missed) tasks.append(asyncio.ensure_future(self.periodic_db_check_task())) yield from self._shutdown_requested finally: # graceful shutdown for task in tasks: task.cancel() self.swarm.shutdown() self.sandbox.shutdown() # terminate all pending tasks yield from asyncio.gather( self.image_manager.shutdown(), self.sandbox_manager.shutdown(), self.job_manager.shutdown(), *tasks, return_exceptions=True) finally: self.swarm.shutdown(wait=True) self.sandbox.shutdown(wait=True) def run(self): asyncio.get_event_loop().run_until_complete(self._run())