#!/usr/bin/python3 import asyncio import collections from concurrent.futures import ThreadPoolExecutor import contextlib import datetime import enum import itertools import json import logging import re import os import shlex import signal import socket import sys import time import threading import traceback import docker import MySQLdb from sqlalchemy import desc import sqlalchemy.orm.scoping from database import * HOST_PATH="/vol/host/" ################################################## # default number of concurrent tasks # pushing images from the sandbox to the registry NB_PUSH_SANDBOX = 2 # pulling images from the registry to the sandbox NB_PULL_SANDBOX = 2 # pulling images from the registry to the swarm NB_PULL_SWARM = 2 # sandbox actions (start, stop, commit, ...) NB_SANDBOX_TASKS = 4 # jobs NB_JOB_TASKS = 4 # default thread pool (used by most of the tasks) default_executor = ThreadPoolExecutor(10) ################################################## log = logging.getLogger("controller") assert MySQLdb.threadsafety >= 1 class Error(Exception): pass class ShuttingDown(Exception): pass def docker_check_error(func, *k, **kw): """Wrapper for docker-py methods that produce a stream Methods producing a stram (eg: push, build) do not report all errors by raising exceptions. Some errors are reported later in the stream. This function parses the stream and raise Error() if needed. """ for elem in func(*k, stream=True, **kw): js = json.loads(elem.decode()) if "error" in js: raise Error("push error: " + js["error"]) @contextlib.contextmanager def docker_warning(msg, *k, ignore=None): """Catch docker errors and issue a warning instead""" try: yield except docker.errors.APIError as e: if ignore is None or not isinstance(e, ignore): k += e, log.warning(msg + " (%s)", *k) @contextlib.contextmanager def report_error(fmt, *k): """Context manager for logging exceptions This function logs exceptions (when leaving the context) with log.error() (if the exception inherit from Error) or log.exception() otherwise. The log message is prepended with the string generated by: fmt % k """ try: yield except Exception as e: msg = fmt % k log_func = log.error if isinstance(e, Error) else log.exception log_func("%s (%s)", msg, traceback.format_exception_only(type(e), e)[-1].strip()) raise def rate_limit(period): """Generator for rate limiting This function ensures we spend at least `period` seconds for each iteration """ assert period > 0 t0 = time.monotonic() while True: yield t1 = time.monotonic() delay = t0 - t1 + period if delay > 0: log.debug("rate_limit: sleep %f seconds", delay) time.sleep(delay) t0 = t1 + delay else: t0 = t1 def auto_create_task(func): """Decorator for forcing the creation of a task when a coroutine function is called Return a wrappers that calls the function, create the task on the fly and return it. Also it installs a no-op callback to avoid warnings in case the result is not used. """ assert asyncio.iscoroutinefunction(func) def wrapper(*k, **kw): tsk = asyncio.async(func(*k, **kw)) # ignore warning about result not used tsk.add_done_callback(lambda f: f.exception()) return tsk return wrapper def cascade_future(src, dst): """propagate the result of a future to another future This function installs a callback to the future `src`, that propagates its result to the future `dst`. """ def callback(fut): ex = fut.exception() if ex is None: dst.set_result(fut.result()) else: dst.set_exception(ex) if src.done(): callback(src) else: src.add_done_callback(callback) class Manager: """A class for scheduling asynchronous jobs on a collection of keys (the one-line summary is not very helpful, sorry) Job submission The job is scheduled on key KEY by calling .process(KEY), the function returns an asyncio.Future object (which will receive the result) If .process(KEY) is called while a job is already running for KEY, then the manager arranges the job to be run a second time in a row for the same key Job implementation This is an abstract class. The job shall be implemented as the coroutine named ._process() in the inherited class. The manager guarantees that ._process() cannot be called multiple times concurrently for the same key In case a shutdown is requested (see .shutdown()): - all jobs that are not yet started are cancelled - running jobs continue until they return or until they try to acquire the internal semaphore (which raises ShuttingDown()) Concurrency The job tasks are all started immediately (whatever the number of existing jobs) It is possible to limit the concurrency to a maximum number of parallel tasks. Manager provides an internal semaphore (the number of tokens is set in .__init__()) The semaphore can be locked in two ways: - by locking the manager: with (yield from self): ... concurrency limited to `nb_tokens` tasks ... - by calling .run_in_executor (this is for running non-async code): yield from self.run_in_executor(func, args...) Shutdown When .shutdown() is called the Manager ensures that all jobs are properly terminated - it cancels all jobs that are not yet started - it prevents starting new jobs - it lets running pending jobs, but interrupts them if they when ther try to acquire the internal semaphore All cancel/interrupted tasks have their future raise ShuttingDown(). .shutdown() returns after all pending jobs are terminated. Thread safety - Manager is *not* thread safe, all public methods must be called from the same thread """ class _Handle: __slots__ = "key", "cur", "nxt" def __init__(self, nb_tokens=1, *, executor = default_executor, interruptible = False): # {key: _TaskHandler} self._handles = {} self._semaphore = asyncio.Semaphore(nb_tokens) self._shutdown = asyncio.Future() self._executor = executor self._interruptible = interruptible def _create_task(self, hnd): assert hnd.nxt is None def reset(): assert not hnd.cur.done() nxt = hnd.nxt if nxt is not None: cascade_future(hnd.cur, nxt) hnd.nxt = None hnd.cur = asyncio.async(self._process(hnd.key, reset)) hnd.cur.add_done_callback(lambda fut: self._done(hnd)) log.debug("task scheduled %r %r", self, hnd.key) return hnd.cur def process(self, key): """Schedule the job to be run on key `key` returns an asyncio.Future (that will provide the result of ._process()) """ if self._shutdown.done(): return self._shutdown hnd = self._handles.get(key) if hnd is None: # create handle self._handles[key] = hnd = self._Handle() hnd.key = key hnd.cur = None hnd.nxt = None if hnd.cur is None: # create task return self._create_task(hnd) else: # reschedule task if hnd.nxt is None: hnd.nxt = asyncio.Future() return hnd.nxt def _done(self, hnd): assert hnd is self._handles.get(hnd.key) assert hnd.cur.done() try: hnd.cur.result() except ShuttingDown: pass except Exception: log.exception("task %r %r unhandled exception", self, hnd.key) nxt = hnd.nxt hnd.cur = hnd.nxt = None if nxt is None: del self._handles[hnd.key] else: cascade_future(self._create_task(hnd), nxt) @asyncio.coroutine def __iter__(self): """Coroutine for locking the internal semaphore Usage: with (yield from manager): ... Warning: after a shutdown is initiated, this function will always raise ShuttingDown() """ ctx = yield from iter(self._semaphore) if self._shutdown.done(): with ctx: raise ShuttingDown() return ctx @asyncio.coroutine def run_in_executor(self, *k, lock=True): """Run a function in a separate thread (with limited concurrency) This function locks the internal semaphore and runs the provided functionc call in a separate thread (using the executor) """ def run(): coro = asyncio.get_event_loop().run_in_executor(self._executor, *k) if self._interruptible: return next(asyncio.as_completed((coro, self._shutdown))) else: return coro if lock: with (yield from self): return (yield from run()) else: return (yield from run()) @asyncio.coroutine def _process(self, key, reset): """Actual implementation of the job (to be reimplemented in inherited classes) The Manager class guarantees that this function cannot be called multiple times concurrently on the same key (in case the same key is submitted multiple times, they Manager will call this function a second time after it has terminated). `reset` is a function that may be called to reset the 'dirty' state of this key (this is to avoid calling ._process() a second time if not necessary) """ raise NotImplementedError() @asyncio.coroutine def shutdown(self): """Initiate a graceful shutdown This coroutine terminates once all tasks are properly terminated. """ exc = ShuttingDown() self._shutdown.set_exception(exc) self._shutdown.exception() # to avoid asyncio warnings # cancel all 'next' tasks for hnd in self._handles.values(): if hnd.nxt is not None: hnd.nxt.set_exception(exc) hnd.nxt = None if not self._interruptible: yield from asyncio.gather( *(h.cur for h in self._handles.values() if h.cur is not None), return_exceptions=True) class SandboxManager(Manager): """Manager for sandbox operation This manager handles all sandbox operations (start, stop, commit). Operations are requested asynchronously in the database: - Webapp.sandbox_state=starting (for starting a sandbox) - Webapp.sandbox_state=stopping (for stopping a sandbox) - WebappVersion with state=sandbox (for committing a new image) All state changes on Webapp.sandbox_state are atomic (e.g: if the user requests a stop while the sandbox is starting then the manager finishes with starting the sandbox but does not update the state, and it runs immediately again to stop the sandbox) Whatever is the value of Webapp.sandbox_state, the manager first examines commit requests and makes the commit if requested. If the container already exists before starting the webapp (not possible in normal operations), then a recovery image is committed first. When a commit is successful. The image manager is notified for pushing the image to the registry and (if it is not a recovery image) to pull it to the swarm (because this image will very likely be used soon). State changes: - sandbox start: starting->running (normal case) starting->start_error (error case) - sandbox stop: stopping->idle (normal case) stopping->stop_error (error case) - image commit: sandbox->committed (normal case) sandbox->error (error case) (none)->committed (recovery version) """ def __init__(self, ctrl, nb_threads = NB_SANDBOX_TASKS): super().__init__(nb_threads) self.ctrl = ctrl def inspect_sandbox(self, webapp): try: return self.ctrl.sandbox.inspect_container( self.ctrl.gen_sandbox_name(webapp)) except docker.errors.NotFound: return None @staticmethod def filter_commit_version(query, webapp_id): """Narrow a WebappVersion query to select the candidate versions to be committed""" return (query. filter_by(webapp_id=webapp_id, state = int(VersionState.SANDBOX)) ) def _start(self, webapp, version): """Start a webapp sandbox (to be executed in a thread pool) """ ctrl = self.ctrl ses = ctrl.session # prepare sandbox parameters # docker image if version is None: image = "%s/factory/%s:%s" % (self.ctrl.registry, webapp.docker_os.docker_name, webapp.docker_os.version) else: image = "%s:%s" % (webapp.image_name, version.number) log.debug("sandbox %r: using image %r", webapp.docker_name, image) # safety checks # (because docker_name is used it the paths of the external volumes if ("/" in webapp.docker_name) or (webapp.docker_name in ("", ".", "..")): raise Error("malformatted docker_name") uid = webapp.id + 2000 if uid < 2000: # just for safety raise Error("bad webapp id") # remove stale container (if any) if self.inspect_sandbox(webapp) is not None: self._stop(webapp) container = webapp.sandbox_name try: # prepare the sandbox # (create ssh keys) ctrl.check_host_path("isdir", ctrl.toolbox_path) ctrl.check_host_path("isdir", ctrl.sandbox_path) ctrl.sandbox.create_container("busybox:latest", name=container, command = ["/bin/sh", "-c", """ set -ex export PATH="$PATH:/.toolbox/bin" # clean sandbox dir rm -rf {sbx} # create dirs for dir in {sbx} {etc} {run} do mkdir -p ${{dir}} chown {uid}:65534 ${{dir}} chmod 0700 ${{dir}} done # xauth file touch {run}/XAuthority chown {uid}:65534 {run}/XAuthority chmod 0600 {run}/XAuthority # generate ssh keys (for type in ecdsa ed25519 rsa do key={etc}/ssh_host_${{type}}_key [ -f $key ] || ssh-keygen -N '' -f $key -t $type >&2 echo -n '{hostname}. ' | cat - ${{key}}.pub done) > {etc}/ssh_known_hosts # known_host file for allgo-shell ssh-keygen -H -f {etc}/ssh_known_hosts chmod 0644 {etc}/ssh_known_hosts rm -f {etc}/ssh_known_hosts.old # authentication key for allgo-shell rm -f {etc}/identity ssh-keygen -N '' -f {etc}/identity chown {uid}:65534 {etc}/identity # forced shell for the sshd config cat > {etc}/shell < {etc}/sshd_config < {entrypoint} < all pending .wait() are inturrepted with ShuttingDown -> all future .wait() calls will immediately raise ShuttingDown """ if not self._shutdown: self._shutdown = True for fut in self._futures.values(): if not fut.done(): # pragma: nobranch fut.set_exception(ShuttingDown()) @asyncio.coroutine def wait(self, container_id): """Wait for the termination of a container Notes: - `container_id` *must* be the full container id (64 digits) - the class support only one concurrent waiter for each container id """ assert re.match(r"[0-9a-f]{64}\Z", container_id), "bad container id" if container_id in self._futures: raise RuntimeError("there is already a watch for this container") if self._shutdown: raise ShuttingDown() self._futures[container_id] = fut = asyncio.Future() try: if self._client.inspect_container(container_id)["State"]["Running"]: log.debug("wait for container: %s", container_id) yield from fut log.debug("container terminated: %s", container_id) except docker.errors.NotFound: pass finally: del self._futures[container_id] class JobManager(Manager): class JobInfo: __slots__ = "job_id", "ver_id", "ctr_id", "version", "ctr_name", "client", "watcher" def __init__(self, ctrl, nb_jobs=NB_JOB_TASKS): super().__init__(nb_jobs) self.ctrl = ctrl def _create_job(self, info): ctrl = self.ctrl ses = ctrl.session tmp_img = None assert info.ctr_id is None try: with ses.begin(): job = ses.query(Job).filter_by(id=info.job_id).one() webapp = job.webapp log.info("start job %d (%s:%s)", info.job_id, webapp.docker_name, info.version) job.state = int(JobState.RUNNING) # pragma: nobranch (TODO: remove (coverage bug)) repo = ctrl.gen_image_name(webapp) image = "%s:%s" % (repo, info.version) job_path = ctrl.gen_job_path(job) log.debug("job.path: %r", job_path) if info.ver_id is None: assert info.version == "sandbox" image = tmp_img = info.client.commit(ctrl.gen_sandbox_name(webapp), repo, info.version)["Id"] # TODO use another workdir # TODO use another uid ctrl.check_host_path("isdir", job_path) hc = ctrl.sandbox.create_host_config( binds = {job_path: {"bind": "/tmp"}}, cap_drop = ["all"], # FIXME: CAP_DAC_OVERRIDE needed because all nfs files have uid,gid=1000,1000 cap_add = ["dac_override"], cpu_period = ctrl.cpu_period, cpu_quota = ctrl.cpu_quota, # mem_reservation = ctrl.mem_soft_limit, mem_limit = ctrl.mem_hard_limit, ) if ctrl.mem_soft_limit: # TODO: upgrade docker-py (and use create_host_config) hc["MemoryReservation"] = ctrl.mem_soft_limit log.debug("host_config %r", hc) info.ctr_id = info.client.create_container(image, name=info.ctr_name, working_dir = "/tmp", # NOTE: the command line is a little complex, but this is # to ensure that (TODO write tests for this): # - no output is lost (we go though a pipe in case the # app has multiple processes writing to stdout/stderr # concurrently) # - we get the exit code of the app (not the exit code of # cat) # - we are failsafe (if fifo creation fails then the app # is run anyway, with the exit code of cat) # - we have no unusual dependencies (only sh, cat and # mkfifo) command = ["/bin/sh", "-c", """ fifo=/.allgo.fifo.{job_id} if mkfifo "$fifo" then exec cat <"$fifo" >allgo.log & exec "$@" >"$fifo" 2>&1 & wait %1 wait %2 rm "$fifo" else "$@" 2>&1 | cat >allgo.log fi failcnt="`cat /sys/fs/cgroup/memory/memory.failcnt`" if [ "$failcnt" -ne 0 ] ; then echo "WARNING: out of memory (memory.failcnt=$failcnt)" >>allgo.log fi """.format(job_id=job.id), "job%d" % job.id, webapp.entrypoint] + shlex.split(job.param), labels = {"allgo.tmp_img": tmp_img or ""}, host_config = hc)["Id"] info.client.start(info.ctr_id) except: #TODO introduce a state JobState.ERROR self._remove_job(info, tmp_img=tmp_img) raise def _remove_job(self, info, *, tmp_img=None): ses = self.ctrl.session # TODO: report launch errors to the user # TODO: report exit code to the user # TODO: use another uid def parse_docker_timestamp(value): return datetime.datetime.strptime( # limit the precision to the microsecond # (othewise strptime fails) re.sub(r"(\.\d{,6})\d*Z$", r"\1Z", value), # iso8601 format "%Y-%m-%dT%H:%M:%S.%fZ") with ses.begin(): job = ses.query(Job).filter_by(id=info.job_id).one() exec_time = 0.0 if info.ctr_id is not None: try: js = info.client.inspect_container(info.ctr_id) except docker.errors.NotFound: pass else: started_at = js["State"].get("StartedAt", "0001-") finished_at = js["State"].get("FinishedAt") # default docker date is '0001-01-01T00:00:00Z' if not started_at.startswith("0001-"): try: exec_time =( parse_docker_timestamp(finished_at) - parse_docker_timestamp(started_at) ).total_seconds() except Exception: # pragma: nocover log.exception("job %d: unable to compute exec time", info.job_id) if tmp_img is None: tmp_img = js["Config"]["Labels"].get("allgo.tmp_img") or None with docker_warning("job %d: cleanup error: unable to remove container", info.job_id): info.client.remove_container(info.ctr_id) if tmp_img is not None: with docker_warning("job %d: cleanup error: unable to remove tmp image", info.job_id): info.client.remove_image(tmp_img) job.exec_time = exec_time job.state = int(JobState.DONE) log.info("stop job %d (duration %fs)", info.job_id, exec_time) @asyncio.coroutine def _finish_job(self, info): # wait for container to terminate if info.ctr_id is not None: yield from info.watcher.wait(info.ctr_id) # remove container yield from self.run_in_executor(self._remove_job, info, lock=False) @asyncio.coroutine def _process(self, job_id, reset): ctrl = self.ctrl ses = ctrl.session log.debug("process job id %d", job_id) with ses.begin(): # query db job = ses.query(Job).filter_by(id=job_id).first() if job is None: # pragma: nocover # unknown job log.warning("unknown job id %d", job_id) return state = JobState(job.state) if job.webapp is None: log.error("job %d: webapp id %r not found", job_id, job.webapp_id) if state == JobState.WAITING: # pragma: nobranch job.state = int(JobState.DONE) job.exec_time = 0 # TODO report error to the user ? return info = self.JobInfo() info.job_id = job_id info.ctr_id = None info.version = job.version info.ctr_name = ctrl.gen_job_name(job) if job.version == "sandbox": info.client = ctrl.sandbox info.watcher = ctrl.sandbox_watcher else: info.client = ctrl.swarm info.watcher = ctrl.swarm_watcher if state == JobState.WAITING: # job is not yet started if job.version == "sandbox": # to be run in the sandbox info.ver_id = None else: # to be run in the swarm #TODO: replace version_id with webapp_version_id ver = ses.query(WebappVersion).filter_by( webapp_id = job.webapp_id, number = job.version).filter( WebappVersion.state.in_(( int(VersionState.COMMITTED), int(VersionState.READY))) ).order_by( WebappVersion.state.desc(), WebappVersion.id.desc()).first() if ver is None: log.error("job %d: webapp %r version %r not found", job_id, job.webapp.docker_name, job.version) job.state = int(JobState.DONE) # TODO report error to the user return info.ver_id = ver.id elif state == JobState.RUNNING: # pragma: nobranch # job is already started # we do not care about the actual version_id *but* we need to # know whether we are in the swarm or in the sandbox info.ver_id = None if job.version == "sandbox" else -1 # look up container id try: info.ctr_id = info.client.inspect_container(info.ctr_name)["Id"] except docker.errors.NotFound: info.ctr_id = None else: # pragma: nocover # unexpected state log.warning("job id %d is in unexpected state %s", job_id, state.name) return if state == JobState.WAITING: # job is not yet started # pull the image to the swarm if info.ver_id is not None: # NOTE: race condition: will fail if ver.state==sandbox # jobs must be submitted after the image is committed yield from ctrl.image_manager.pull(info.ver_id, swarm=True) # lock the semaphore (to limit the total number of jobs) with (yield from self): yield from self.run_in_executor(self._create_job, info, lock=False) yield from self._finish_job(info) elif state == JobState.RUNNING: # pragma: nobranch # the job is already running # -> wait for its termination try: # try to lock the semaphore semctx = yield from asyncio.wait_for(iter(self), 0) except asyncio.TimeoutError: # no available tokens # (this may happen if MAX_JOBS is reduced across a restart) # -> wait for the job anyway yield from self._finish_job(info) else: # normal case with semctx: yield from self._finish_job(info) # NOTE: for the push/pull managers, interruptible=True guarantees that the # managers terminate immediately, however it cannot guarantee that the # process will terminate immediately because the ThreadPoolExecuter installs # a atexit handler that joins all the background threads. # # Anyway this is not a big issue since all pending push/pull raise # ShuttingDown immediately, thus we won't end up with a sandbox/job # in an inconsistent state when SIGKILL arrives. # class PullManager(Manager): def __init__(self, nb_threads, client, name): super().__init__(nb_threads, interruptible=True) self.client = client self.name = name @asyncio.coroutine def _process(self, img, reset): image, version = img log.info("pull to the %-10s %s:%s", self.name, image, version) return self.run_in_executor(self.client.pull, image, version) class PushManager(Manager): def __init__(self, nb_threads, ctrl): super().__init__(nb_threads, interruptible=True) self.ctrl = ctrl @asyncio.coroutine def _process(self, version_id, reset): ses = self.ctrl.session with report_error("unable to push version id %d", version_id): with ses.begin(): # get the version object and check its state version = ses.query(WebappVersion).filter_by(id=version_id).one() if version.state != VersionState.COMMITTED: if version.state in (VersionState.READY, VersionState.REPLACED): # already pushed return if version.state == VersionState.SANDBOX: raise Error("unable to push (image not yet committed)") raise Error("unable to push (invalid state: %s)" % version.state) # ensure that there is no other version with the same number in the pipeline # (to avoid a race condition) others = (ses.query(WebappVersion.id) .filter_by(webapp_id=version.webapp_id, number=version.number) .filter(WebappVersion.id != version.id) .filter(WebappVersion.state.in_((int(VersionState.SANDBOX), int(VersionState.COMMITTED))))) if others.count(): raise Error("unable to push (there are other pushable versions with the same number: %s)" % ( " ".join(map(str, itertools.chain(*others))))) image = self.ctrl.gen_image_name(version.webapp) tag = version.number log.info("push from the %-8s %s:%s", "sandbox", image, tag) yield from self.run_in_executor(docker_check_error, self.ctrl.sandbox.push, image, tag) reset() with ses.begin(): version = ses.query(WebappVersion).filter_by(id=version_id).one() prev = ses.query(WebappVersion).filter_by( webapp_id = version.webapp_id, number = version.number, state = int(VersionState.READY)).scalar() log.debug("prev version id %r", (prev.id if prev else None)) if prev is None: # this is a new version version.state = int(VersionState.READY) else: # overwrite an existing version for key in "updated_at", "changelog", "published": setattr(prev, key, getattr(version, key)) # mark this version as replaced version.state = int(VersionState.REPLACED) ses.add(version) class ImageManager: def __init__(self, ctrl, nb_push_sandbox = NB_PUSH_SANDBOX, nb_pull_sandbox = NB_PULL_SANDBOX, nb_pull_swarm = NB_PULL_SWARM): self.ctrl = ctrl self.sandbox_push_manager = PushManager(nb_push_sandbox, ctrl) self.sandbox_pull_manager = PullManager(nb_pull_sandbox, ctrl.sandbox, "sandbox") self.swarm_pull_manager = PullManager(nb_pull_swarm, ctrl.swarm, "swarm") # return a future @auto_create_task @asyncio.coroutine def pull(self, version_id: int, *, swarm=False): with report_error("unable to pull version id %d to %s", version_id, ("swarm" if swarm else "sandbox")): ses = self.ctrl.session with ses.begin(): # get the version object and check its state version = ses.query(WebappVersion).filter_by(id=version_id).one() image = self.ctrl.gen_image_name(version.webapp) tag = version.number if swarm: # pull to the swarm if version.state == VersionState.COMMITTED: # must be pushed to the registry first yield from self.push(version_id) with ses.begin(): version = ses.query(WebappVersion).filter_by(id=version_id).one() if version.state not in (VersionState.READY, VersionState.REPLACED): raise Error("bad version state: %s" % version.state) yield from self.swarm_pull_manager.process((image, tag)) else: # pull to the sandbox if version.state == VersionState.COMMITTED: # do not pull! return if version.state not in (VersionState.READY, VersionState.REPLACED): raise Error("bad version state: %s" % version.state) yield from self.sandbox_pull_manager.process((image, tag)) # return a future def push(self, version_id: int): return self.sandbox_push_manager.process(version_id) @asyncio.coroutine def shutdown(self, **kw): yield from asyncio.gather( self.sandbox_pull_manager.shutdown(), self.sandbox_push_manager.shutdown(), self.swarm_pull_manager.shutdown(), return_exceptions=True) class DockerController: def __init__(self, sandbox_host, swarm_host, mysql_host, port, registry, env, datastore_path, sandbox_path, toolbox_path, max_jobs, sandbox_network, cpus, mem_soft_limit, mem_hard_limit): self.sandbox = docker.Client(sandbox_host) self.sandbox_watcher = DockerWatcher(self.sandbox) if sandbox_host == swarm_host: self.swarm = self.sandbox self.swarm_watcher = self.sandbox_watcher else: self.swarm = docker.Client(swarm_host) self.swarm_watcher = DockerWatcher(self.swarm) self._db_sessionmaker = sqlalchemy.orm.scoping.scoped_session( connect_db(mysql_host)) self._thread_local = threading.local() self.sock = socket.socket() self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sock.bind(("0.0.0.0", port)) self.sock.listen(32) self.sock.setblocking(False) self.image_manager = ImageManager(self) self.sandbox_manager = SandboxManager(self) self.job_manager = JobManager(self, max_jobs) self.cpu_quota = None if cpus is None else int(cpus * 100000) self.cpu_period = None if cpus is None else 100000 self.mem_soft_limit = None if mem_soft_limit is None else docker.utils.parse_bytes(mem_soft_limit) self.mem_hard_limit = None if mem_hard_limit is None else docker.utils.parse_bytes(mem_hard_limit) self.registry = registry self.env = env self.datastore_path = datastore_path self.sandbox_path = sandbox_path self.toolbox_path = toolbox_path self.sandbox_network= sandbox_network self._task = None self._shutdown_requested = None img = "busybox:latest" try: self.sandbox.inspect_image(img) except docker.errors.NotFound: log.info("pulling docker image %s", img) self.sandbox.pull(img) def gen_sandbox_name(self, webapp): return "%s-sandbox-%s" % (self.env, webapp.docker_name) def gen_image_name(self, webapp): return "%s/webapp/%s" % (self.registry, webapp.docker_name) def gen_job_name(self, job): return "%s-job-%d-%s" % (self.env, job.id, job.webapp.docker_name) def gen_job_path(self, job): return os.path.join(self.datastore_path, str(job.user_id), str(job.webapp_id), job.access_token) def check_host_path(self, funcname, path, *, nolink=True): """Validate a path before using it as an external volume in docker - ensure the path is canonical: - it is absolute - it does not contain any '..' parts - it does not contain any symbolic link - ensure the path is of the right type (call os.path.`funcname`()) """ if not os.path.isabs(path): raise Error("host path %r is not absolute" % path) if path != os.path.normpath(path): raise Error("host path %r is not canonical (os.path.normpath())" % path) ctrpath = "/vol/host" + path log.debug("ctrpath %r", ctrpath) func = getattr(os.path, funcname) if nolink and os.path.realpath(ctrpath) != os.path.normpath(ctrpath): raise Error("host path %r contains a symbolic link" % path) if not func(ctrpath): raise Error("host path %r not found (os.path.%s())" % (path, funcname)) @property def session(self): """Return the thread-local sqlalchemy session WARNING: in async functions, db transaction must not span over any 'yield from' statements (to avoid interleaving) """ return self._db_sessionmaker() def _sock_callback(self): try: while True: self.sock.accept()[0].close() except BlockingIOError: pass self.check_db() @asyncio.coroutine def db_startup(self): log.info("wait until the db server is ready") ses = self.session while True: with contextlib.suppress(sqlalchemy.exc.OperationalError): log.debug("db ping") with ses.begin(): ses.execute("SELECT 1") break with contextlib.suppress(asyncio.TimeoutError): yield from asyncio.wait_for(asyncio.shield(self._shutdown_requested), 10) return log.info("db server ready") self.check_db(startup=True) def check_db(self, *, startup=False): log.debug("check_db(startup=%r)", startup) try: ses = self.session with ses.begin(): if startup: ses.execute("DELETE FROM webapp_versions WHERE state=%d" % VersionState.REPLACED) for version_id, in ses.execute( "SELECT id FROM webapp_versions WHERE state=%d" % VersionState.COMMITTED).fetchall(): self.image_manager.push(version_id) for webapp_id, in ses.execute("""SELECT webapps.id FROM webapps LEFT JOIN webapp_versions ON webapps.id=webapp_versions.webapp_id WHERE sandbox_state IN (%d,%d) OR state=%d GROUP BY webapps.id""" % ( SandboxState.STARTING, SandboxState.STOPPING, VersionState.SANDBOX)).fetchall(): self.sandbox_manager.process(webapp_id) for job_id, in ses.query(Job.id).filter(Job.state.in_( (JobState.WAITING.value, JobState.RUNNING.value)) ).order_by(Job.state): log.debug("schedule job %d", job_id) self.job_manager.process(job_id) # TODO schedule the push of images versions in state 'committed' except sqlalchemy.exc.OperationalError as e: log.error("db error %s", e.orig.args) return def shutdown(self): if not self._shutdown_requested.done(): self._shutdown_requested.set_result(None) return self._task @asyncio.coroutine def _run(self): assert self._shutdown_requested is None try: loop = asyncio.get_event_loop() self._shutdown_requested = asyncio.Future() loop.add_signal_handler(signal.SIGTERM, self.shutdown) loop.add_signal_handler(signal.SIGINT, self.shutdown) loop.add_reader(self.sock, self._sock_callback) yield from self.db_startup() try: yield from self._shutdown_requested finally: # graceful shutdown # close socket (to disable incoming notifications) self.sock.close() # close the watchers self.sandbox_watcher.shutdown() self.swarm_watcher.shutdown() # terminate all pending tasks yield from asyncio.gather( self.image_manager.shutdown(), self.sandbox_manager.shutdown(), self.job_manager.shutdown(), return_exceptions=True) finally: self.sock.close() self.sandbox.close() self.swarm.close() def run(self): asyncio.get_event_loop().run_until_complete(self._run())