#!/usr/bin/python3 import asyncio import collections from concurrent.futures import ThreadPoolExecutor import contextlib import enum import itertools import json import logging import re import os import signal import socket import sys import time import threading import traceback import docker import MySQLdb from sqlalchemy import desc import sqlalchemy.orm.scoping from database import * HOST_PATH="/vol/host/" ################################################## # default number of concurrent tasks # pushing images from the sandbox to the registry NB_PUSH_SANDBOX = 2 # pulling images from the registry to the sandbox NB_PULL_SANDBOX = 2 # pulling images from the registry to the swarm NB_PULL_SWARM = 2 # sandbox actions (start, stop, commit, ...) NB_SANDBOX_TASKS = 4 # jobs NB_JOB_TASKS = 4 # default thread pool (used by most of the tasks) default_executor = ThreadPoolExecutor(10) ################################################## log = logging.getLogger("controller") assert MySQLdb.threadsafety >= 1 class Error(Exception): pass class ShuttingDown(Exception): pass def docker_check_error(func, *k, **kw): """Wrapper for docker-py methods that produce a stream Methods producing a stram (eg: push, build) do not report all errors by raising exceptions. Some errors are reported later in the stream. This function parses the stream and raise Error() if needed. """ for elem in func(*k, stream=True, **kw): js = json.loads(elem.decode()) if "error" in js: raise Error("push error: " + js["error"]) @contextlib.contextmanager def docker_warning(msg, *k, ignore=None): """Catch docker errors and issue a warning instead""" try: yield except docker.errors.APIError as e: if ignore is None or not isinstance(e, ignore): k += e, log.warning(msg + " (%s)", *k) @contextlib.contextmanager def report_error(fmt, *k): try: yield except Exception as e: msg = fmt % k log_func = log.error if isinstance(e, Error) else log.exception log_func("%s (%s)", msg, traceback.format_exception_only(type(e), e)[-1].strip()) raise def auto_create_task(func): """Decorator for forcing the creation of a task when a coroutine function is called Return a wrappers that calls the function, create the task on the fly and return it. Also it installs a no-op callback to avoid warnings in case the result is not used. """ assert asyncio.iscoroutinefunction(func) def wrapper(*k, **kw): tsk = asyncio.async(func(*k, **kw)) # ignore warning about result not used tsk.add_done_callback(lambda f: f.exception()) return tsk return wrapper def cascade_future(src, dst): """propagate the result of a future to another future This function installs a callback to the future `src`, that propagates its result to the future `dst`. """ def callback(fut): ex = fut.exception() if ex is None: dst.set_result(fut.result()) else: dst.set_exception(ex) if src.done(): callback(src) else: src.add_done_callback(callback) class Manager: """A class for scheduling asynchronous jobs on a collection of keys (the one-line summary is not very helpful, sorry) Job submission The job is scheduled on key KEY by calling .process(KEY), the function returns an asyncio.Future object (which will receive the result) If .process(KEY) is called while a job is already running for KEY, then the manager arranges the job to be run a second time in a row for the same key Job implementation This is an abstract class. The job shall be implemented as the coroutine named ._process() in the inherited class. The manager guarantees that ._process() cannot be called multiple times concurrently for the same key In case a shutdown is requested (see .shutdown()): - all jobs that are not yet started are cancelled - running jobs continue until they return or until they try to acquire the internal semaphore (which raises ShuttingDown()) Concurrency The job tasks are all started immediately (whatever the number of existing jobs) It is possible to limit the concurrency to a maximum number of parallel tasks. Manager provides an internal semaphore (the number of tokens is set in .__init__()) The semaphore can be locked in two ways: - by locking the manager: with (yield from self): ... concurrency limited to `nb_tokens` tasks ... - by calling .run_in_executor (this is for running non-async code): yield from self.run_in_executor(func, args...) Shutdown When .shutdown() is called the Manager ensures that all jobs are properly terminated - it cancels all jobs that are not yet started - it prevents starting new jobs - it lets running pending jobs, but interrupts them if they when ther try to acquire the internal semaphore All cancel/interrupted tasks have their future raise ShuttingDown(). .shutdown() returns after all pending jobs are terminated. Thread safety - Manager is *not* thread safe, all public methods must be called from the same thread """ class _Handle: __slots__ = "key", "cur", "nxt" def __init__(self, nb_tokens=1, *, executor = default_executor): # {key: _TaskHandler} self._handles = {} self._semaphore = asyncio.Semaphore(nb_tokens) self._shutdown = None self._executor = executor def _create_task(self, hnd): assert hnd.nxt is None def reset(): assert not hnd.cur.done() nxt = hnd.nxt if nxt is not None: cascade_future(hnd.cur, nxt) hnd.nxt = None hnd.cur = asyncio.async(self._process(hnd.key, reset)) hnd.cur.add_done_callback(lambda fut: self._done(hnd)) log.debug("task scheduled %r %r", self, hnd.key) return hnd.cur def process(self, key): """Schedule the job to be run on key `key` returns an asyncio.Future (that will provide the result of ._process()) """ if self._shutdown is not None: return self._shutdown hnd = self._handles.get(key) if hnd is None: # create handle self._handles[key] = hnd = self._Handle() hnd.key = key hnd.cur = None hnd.nxt = None if hnd.cur is None: # create task return self._create_task(hnd) else: # reschedule task if hnd.nxt is None: hnd.nxt = asyncio.Future() return hnd.nxt def _done(self, hnd): assert hnd is self._handles.get(hnd.key) assert hnd.cur.done() try: hnd.cur.result() except Exception: log.exception("task %r %r unhandled exception", self, hnd.key) nxt = hnd.nxt hnd.cur = hnd.nxt = None if nxt is None: del self._handles[hnd.key] else: cascade_future(self._create_task(hnd), nxt) @asyncio.coroutine def __iter__(self): """Coroutine for locking the internal semaphore Usage: with (yield from manager): ... Warning: after a shutdown is initiated, this function will always raise ShuttingDown() """ ctx = yield from iter(self._semaphore) if self._shutdown: with ctx: raise ShuttingDown() return ctx @asyncio.coroutine def run_in_executor(self, *k): """Run a function in a separate thread (with limited concurrency) This function locks the internal semaphore and runs the provided functionc call in a separate thread (using the executor) """ with (yield from self): return (yield from asyncio.get_event_loop().run_in_executor(self._executor, *k)) def _process(self, key, reset): """Actual implementation of the job (to be reimplemented in inherited classes) The Manager class guarantees that this function cannot be called multiple times concurrently on the same key (in case the same key is submitted multiple times, they Manager will call this function a second time after it has terminated). `reset` is a function that may be called to reset the 'dirty' state of this key (this is to avoid calling ._process() a second time if not necessary) """ raise NotImplementedError() @asyncio.coroutine def shutdown(self): """Initiate a graceful shutdown This coroutine terminates once all tasks are properly terminated. """ self._shutdown = asyncio.Future() self._shutdown.set_exception(ShuttingDown()) self._shutdown.exception() # to avoid asyncio warnings # cancel all 'next' tasks for hnd in self._handles.values(): if hnd.nxt is not None: hnd.nxt.set_exception(self._shutdown) hnd.nxt = None yield from asyncio.gather(*(h.cur for h in self._handles.values() if h.cur is not None), return_exceptions=True) class SandboxManager(Manager): """Manager for sandbox operation This manager handles all sandbox operations (start, stop, commit). Operations are requested asynchronously in the database: - Webapp.sandbox_state=starting (for starting a sandbox) - Webapp.sandbox_state=stopping (for stopping a sandbox) - WebappVersion with state=sandbox (for committing a new image) All state changes on Webapp.sandbox_state are atomic (e.g: if the user requests a stop while the sandbox is starting then the manager finishes with starting the sandbox but does not update the state, and it runs immediately again to stop the sandbox) Whatever is the value of Webapp.sandbox_state, the manager first examines commit requests and makes the commit if requested. If the container already exists before starting the webapp (not possible in normal operations), then a recovery image is committed first. When a commit is successful. The image manager is notified for pushing the image to the registry and (if it is not a recovery image) to pull it to the swarm (because this image will very likely be used soon). State changes: - sandbox start: starting->running (normal case) starting->start_error (error case) - sandbox stop: stopping->idle (normal case) stopping->stop_error (error case) - image commit: sandbox->committed (normal case) sandbox->error (error case) (none)->committed (recovery version) """ def __init__(self, ctrl, nb_threads = NB_SANDBOX_TASKS): super().__init__(nb_threads) self.ctrl = ctrl def inspect_sandbox(self, webapp): try: return self.ctrl.sandbox.inspect_container( self.ctrl.gen_sandbox_name(webapp)) except docker.errors.NotFound: return None @staticmethod def filter_sandbox_version(query, webapp_id): """Narrow a WebappVersion query to filter the version that must be started in the sandbox""" return (query. # filter the right webapp filter_by(webapp_id=webapp_id) # keep only versions whose state is 'ready' or 'committed' .filter(WebappVersion.state.in_((int(VersionState.committed), int(VersionState.ready)))) # take the latest one .order_by(desc(WebappVersion.id)).limit(1)) @staticmethod def filter_commit_version(query, webapp_id): """Narrow a WebappVersion query to select the candidate versions to be committed""" return (query. filter_by(webapp_id=webapp_id, state = int(VersionState.sandbox)) ) def _start(self, webapp, version): """Start a webapp sandbox (to be executed in a thread pool) """ ctrl = self.ctrl ses = ctrl.session # prepare sandbox parameters # docker image if version is None: image = "%s/factory/%s:%s" % (self.ctrl.registry, webapp.docker_os.docker_name, webapp.docker_os.version) else: image = "%s:%s" % (webapp.image_name, version.number) log.debug("sandbox %r: using image %r", webapp.docker_name, image) # safety checks # (because docker_name is used it the paths of the external volumes if ("/" in webapp.docker_name) or (webapp.docker_name in ("", ".", "..")): raise Error("malformatted docker_name") pipebin = os.path.join(ctrl.sandbox_path, "bin") pipedir = os.path.join(ctrl.sandbox_path, "srvdir", webapp.docker_name) uid = webapp.id + 2000 if uid < 2000: # just for safety raise Error("bad webapp id") # remove stale container (if any) if self.inspect_sandbox(webapp) is not None: self._stop(webapp) container = webapp.sandbox_name try: # prepare the sandbox # (create files for pipesrv) ctrl.sandbox.create_container("busybox:latest", name=container, command = ["/bin/sh", "-c", "set -x ;rm -rf -- /.pipedir/* && chmod 0700 /.pipedir && mkfifo -m 0600 /.pipedir/srv && chown -R %d:65534 /.pipedir" % uid], host_config = ctrl.sandbox.create_host_config( binds = {pipedir: {"bind": "/.pipedir"}} )) ctrl.sandbox.start(container) if ctrl.sandbox.wait(container): raise Error("sandbox preparation failed") ctrl.sandbox.remove_container(container) # create and start the sandbox ctrl.check_host_path("isdir", pipedir) ctrl.check_host_path("isdir", pipebin) command = ["/.pipebin/pipesrv", "-d", "/.pipedir"] if version is None and webapp.entrypoint: # prepend instructions to initialise a dummy entrypoint dn, bn = os.path.split(webapp.entrypoint) command = ["/bin/sh", "-c", """ {mkdir} cat > {entrypoint!r} <allgo.log & exec "$@" >"$fifo" 2>&1 & rm "$fifo" wait %1 wait %2 else "$@" 2>&1 | cat >allgo.log fi """, "job%d" % job.id, webapp.entrypoint] + job.param.split(), host_config = ctrl.sandbox.create_host_config( binds = {job_path: {"bind": "/tmp"}}, cap_drop = ["all"], # FIXME: CAP_DAC_OVERRIDE needed because all nfs files have uid,gid=1000,1000 cap_add = ["dac_override"], )) client.start(ctr) # TODO report exit code to the user client.wait(ctr) finally: # FIXME: if interrupted (power down) here, the job will remain in # state RUNNING forever ==> do some periodic monitoring stop_time = time.time() log.info("stop job %d (%s:%s)", job_id, webapp.docker_name, tag) with ses.begin(): job.state = int(JobState.DONE) if start_time is not None: job.exec_time = int(stop_time - start_time) webapp.exec_time = (webapp.exec_time + job.exec_time) // 2 if ctr is not None: with docker_warning("cleanup error: unable to remove container %r", ctr, ignore=docker.errors.NotFound): client.remove_container(ctr, force=True) if tmp_img is not None: with docker_warning("cleanup error: unable to remove image %r", ctr, ignore=docker.errors.NotFound): client.remove_image(tmp_img,) @asyncio.coroutine def _process(self, job_id, reset): ctrl = self.ctrl ses = ctrl.session log.debug("process job id %d", job_id) with ses.begin(): # query db job = ses.query(Job).filter_by(id=job_id).first() if job is None or job.state != JobState.WAITING: return if job.webapp is None: log.error("job %d: webapp id %r not found", job_id, job.webapp_id) job.state = JobState.DONE # TODO report error to the user return # select version if job.version == "sandbox": ver_id = None else: #TODO: replace version_id with webapp_version_id ver = ses.query(WebappVersion).filter_by( webapp_id = job.webapp_id, number = job.version).filter( WebappVersion.state != VersionState.error ).order_by(WebappVersion.id.desc()).first() if ver is None: log.error("job %d: webapp %r version %r not found", job_id, job.webapp.docker_name, job.version) job.state = int(JobState.DONE) # TODO report error to the user return ver_id = ver.id if ver_id is not None: # pull image to the swarm # FIXME: race condition: will fail if ver.state==sandbox yield from ctrl.image_manager.pull(ver_id, swarm=True) # run job yield from self.run_in_executor(self._run_job, job_id, ver_id) class PullManager(Manager): def __init__(self, nb_threads, client, name): super().__init__(nb_threads) self.client = client self.name = name def _process(self, img, reset): image, version = img log.info("pull to the %-10s %s:%s", self.name, image, version) return self.run_in_executor(self.client.pull, image, version) class PushManager(Manager): def __init__(self, nb_threads, ctrl): super().__init__(nb_threads) self.ctrl = ctrl def _process(self, version_id, reset): ses = self.ctrl.session with report_error("unable to push version id %d", version_id): with ses.begin(): # get the version object and check its state version = ses.query(WebappVersion).filter_by(id=version_id).one() if version.state != VersionState.committed: if version.state == VersionState.ready: # already pushed return if version.state == VersionState.sandbox: raise Error("unable to push (image not yet committed)") raise Error("unable to push (invalid state: %s)" % version.state) # ensure that there is no other version with the same number in the pipeline # (to avoid a race condition) others = (ses.query(WebappVersion.id) .filter_by(webapp_id=version.webapp_id, number=version.number) .filter(WebappVersion.id != version.id) .filter(WebappVersion.state.in_((int(VersionState.sandbox), int(VersionState.committed))))) if others.count(): raise Error("unable to push (there are other pushable versions with the same number: %s)" % ( " ".join(map(str, itertools.chain(*others))))) image = self.ctrl.gen_image_name(version.webapp) tag = version.number log.info("push from the %-8s %s:%s", "sandbox", image, tag) yield from self.run_in_executor(docker_check_error, self.ctrl.sandbox.push, image, tag) with ses.begin(): prev = ses.query(WebappVersion).filter_by( webapp_id = version.webapp_id, number = version.number, state = int(VersionState.ready)).scalar() log.debug("prev version id %r", (prev.id if prev else None)) if prev is None: # this is a new version version.state = VersionState.ready else: # overwrite an existing version for key in "updated_at", "changelog", "published": setattr(prev, key, getattr(version, key)) ses.delete(version) class ImageManager: def __init__(self, ctrl, nb_push_sandbox = NB_PUSH_SANDBOX, nb_pull_sandbox = NB_PULL_SANDBOX, nb_pull_swarm = NB_PULL_SWARM): self.ctrl = ctrl self.sandbox_push_manager = PushManager(nb_push_sandbox, ctrl) self.sandbox_pull_manager = PullManager(nb_pull_sandbox, ctrl.sandbox, "sandbox") self.swarm_pull_manager = PullManager(nb_pull_swarm, ctrl.swarm, "swarm") # return a future @auto_create_task @asyncio.coroutine def pull(self, version_id: int, *, swarm=False): with report_error("unable to pull version id %d to %s", version_id, ("swarm" if swarm else "sandbox")): ses = self.ctrl.session with ses.begin(): # get the version object and check its state version = ses.query(WebappVersion).filter_by(id=version_id).one() image = self.ctrl.gen_image_name(version.webapp) tag = version.number if swarm: # pull to the swarm if version.state == VersionState.committed: # must be pushed to the registry first yield from self.push(version_id) with ses.begin(): version = ses.query(WebappVersion).filter_by(id=version_id).one() if version.state != VersionState.ready: raise Error("bad version state: %s" % version.state) yield from self.swarm_pull_manager.process((image, tag)) else: # pull to the sandbox if version.state == VersionState.committed: # do not pull! return if version.state != VersionState.ready: raise Error("bad version state: %s" % version.state) yield from self.sandbox_pull_manager.process((image, tag)) # return a future def push(self, version_id: int): return self.sandbox_push_manager.process(version_id) @asyncio.coroutine def shutdown(self, **kw): yield from asyncio.gather( self.sandbox_pull_manager.shutdown(), self.sandbox_push_manager.shutdown(), self.swarm_pull_manager.shutdown(), return_exceptions=True) class DockerController: def __init__(self, sandbox_host, swarm_host, mysql_host, port, registry, env, datastore_path, sandbox_path, max_jobs): self.sandbox = docker.Client(sandbox_host) self.swarm = self.sandbox if sandbox_host == swarm_host else docker.Client(swarm_host) self._db_sessionmaker = sqlalchemy.orm.scoping.scoped_session( connect_db(mysql_host)) self._thread_local = threading.local() self.sock = socket.socket() self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sock.bind(("0.0.0.0", port)) self.sock.listen(32) self.sock.setblocking(False) self.image_manager = ImageManager(self) self.sandbox_manager = SandboxManager(self) self.job_manager = JobManager(self, max_jobs) self.registry = registry self.env = env self.datastore_path = datastore_path self.sandbox_path = sandbox_path self._task = None self._shutdown_requested = None img = "busybox:latest" try: self.sandbox.inspect_image(img) except docker.errors.NotFound: log.info("pulling docker image %s", img) self.sandbox.pull(img) def gen_sandbox_name(self, webapp): return "%s-sandbox-%s" % (self.env, webapp.docker_name) def gen_image_name(self, webapp): return "%s/webapp/%s" % (self.registry, webapp.docker_name) def gen_job_name(self, job): return "%s-job-%d-%s" % (self.env, job.id, job.webapp.docker_name) def gen_job_path(self, job): assert self.datastore_path.endswith("/") return "%s%d/%d/%s" % ( self.datastore_path, job.user_id, job.webapp_id, job.access_token) def check_host_path(self, funcname, path, *, nolink=True): """Validate a path before using it as an external volume in docker - ensure the path is canonical: - it is absolute - it does not contain any '..' parts - it does not contain any symbolic link - ensure the path is of the right type (call os.path.`funcname`()) """ if not os.path.isabs(path): raise Error("host path %r is not absolute" % path) if path != os.path.normpath(path): raise Error("host path %r is not canonical (os.path.normpath())") ctrpath = "/vol/host" + path log.debug("ctrpath %r", ctrpath) func = getattr(os.path, funcname) if nolink and os.path.realpath(ctrpath) != os.path.normpath(ctrpath): raise Error("host path %r contains a symbolic link" % path) if not func(ctrpath): raise Error("host path %r not found (os.path.%s())" % (path, funcname)) @property def session(self): """Return the thread-local sqlalchemy session WARNING: in async functions, db transaction must not span over any 'yield from' statements (to avoid interleaving) """ return self._db_sessionmaker() def _sock_callback(self): try: while True: self.sock.accept()[0].close() except BlockingIOError: pass self.check_db() def check_db(self): log.debug("check_db") try: ses = self.session with ses.begin(): for webapp_id, in ses.execute("""SELECT webapps.id FROM webapps LEFT JOIN webapp_versions ON webapps.id=webapp_versions.webapp_id WHERE sandbox_state IN (%d,%d) OR state=%d GROUP BY webapps.id""" % ( SandboxState.starting, SandboxState.stopping, VersionState.sandbox)).fetchall(): self.sandbox_manager.process(webapp_id) for job_id, in ses.query(Job.id).filter_by(state = JobState.WAITING.value): log.debug("schedule job %d", job_id) self.job_manager.process(job_id) # TODO schedule the push of images versions in state 'committed' except sqlalchemy.exc.OperationalError as e: log.error("db error %s", e.orig.args) return def shutdown(self): if not self._shutdown_requested.done(): self._shutdown_requested.set_result(None) return self._task @asyncio.coroutine def _run(self): assert self._shutdown_requested is None try: loop = asyncio.get_event_loop() self._shutdown_requested = asyncio.Future() loop.add_signal_handler(signal.SIGTERM, self.shutdown) loop.add_signal_handler(signal.SIGINT, self.shutdown) loop.add_reader(self.sock, self._sock_callback) self.check_db() try: yield from self._shutdown_requested finally: # graceful shutdown # close socket (to disable incoming notifications) self.sock.close() # terminate all pending tasks yield from asyncio.gather( self.image_manager.shutdown(), self.sandbox_manager.shutdown(), self.job_manager.shutdown(), return_exceptions=True) finally: self.sock.close() self.sandbox.close() self.swarm.close() def run(self): asyncio.get_event_loop().run_until_complete(self._run())