Commit b63a017b authored by BAIRE Anthony's avatar BAIRE Anthony
Browse files

refactor the job controller to allow restarting the controller during jobs

parent 2bc17331
......@@ -5,6 +5,7 @@ import asyncio
import collections
from concurrent.futures import ThreadPoolExecutor
import contextlib
import datetime
import enum
import itertools
import json
......@@ -96,6 +97,25 @@ def report_error(fmt, *k):
traceback.format_exception_only(type(e), e)[-1].strip())
raise
def rate_limit(period):
    """Rate-limiting generator.

    Each call to ``next()`` (after the first) returns only once at least
    `period` seconds have elapsed since the previous iteration, sleeping
    if necessary to enforce the pace.
    """
    assert period > 0
    last = time.monotonic()
    while True:
        yield
        now = time.monotonic()
        remaining = last + period - now
        if remaining > 0:
            # pace ourselves: schedule the next deadline from the previous one
            time.sleep(remaining)
            last = now + remaining
        else:
            # already late: restart the clock from now
            last = now
def auto_create_task(func):
"""Decorator for forcing the creation of a task when a coroutine function is called
......@@ -254,6 +274,8 @@ class Manager:
try:
hnd.cur.result()
except ShuttingDown:
pass
except Exception:
log.exception("task %r %r unhandled exception", self, hnd.key)
......@@ -285,14 +307,18 @@ class Manager:
@asyncio.coroutine
def run_in_executor(self, *k, lock=True):
    """Run a function in a separate thread (with limited concurrency)

    This function locks the internal semaphore and runs the provided
    function call in a separate thread (using the executor)

    If `lock` is false, the semaphore is not acquired and the call is
    submitted to the executor immediately (used by callers that already
    hold a token or must not block on it).
    """
    # NOTE: the diff residue duplicated the pre-refactor signature/body
    # here; this is the post-refactor version with the optional `lock`.
    run = asyncio.get_event_loop().run_in_executor
    if lock:
        with (yield from self):
            return (yield from run(self._executor, *k))
    else:
        return (yield from run(self._executor, *k))
def _process(self, key, reset):
......@@ -748,52 +774,120 @@ EOF
log.debug("done sandbox %d", webapp_id)
class DockerWatcher:
    """Watch a docker daemon and report container termination.

    A daemon thread listens to the docker event stream ("die" events) and
    forwards each event to the asyncio event loop, which resolves the
    future registered by wait() for that container (if any).
    """

    def __init__(self, client):
        # docker client, used for the event stream and container inspection
        self._client = client
        # {container_id: future}
        self._futures = {}
        # loop captured at creation time: _event() must run in this loop
        self._loop = asyncio.get_event_loop()
        self._shutdown = False
        # daemon thread so a blocked events() call cannot prevent
        # interpreter exit
        self._thread = threading.Thread(target=self._thread_func)
        self._thread.daemon = True
        self._thread.start()

    def _thread_func(self):
        """Background thread: forward docker "die" events to the loop."""
        # limit docker client requests to one per minute
        # (in case the docker daemon has errors)
        limiter = rate_limit(60)
        log.debug("watcher started")
        while not self._shutdown:
            next(limiter)
            try:
                for event in self._client.events(filters={"event": "die"}):
                    log.debug("event %r", event)
                    # the futures dict must only be touched from the loop
                    # thread, hence call_soon_threadsafe
                    self._loop.call_soon_threadsafe(self._event, event)
            except Exception:
                log.exception("docker watcher exception")

    def _event(self, event_bytes):
        """Decode one raw docker event and resolve the matching future.

        Runs in the event loop thread (scheduled by _thread_func).
        """
        event = json.loads(event_bytes.decode())
        if event["status"] == "die":
            cid = event["id"]
            fut = self._futures.get(cid)
            if fut is not None and not fut.done():
                fut.set_result(None)

    def shutdown(self):
        """Stop watching and fail all pending wait() calls with ShuttingDown."""
        if not self._shutdown:
            self._shutdown = True
            for fut in self._futures.values():
                if not fut.done():
                    fut.set_exception(ShuttingDown())

    @asyncio.coroutine
    def wait(self, container_id):
        """Coroutine: return when the given container has terminated.

        `container_id` must be a full 64-hexdigit container id. Returns
        immediately if the container is not running or does not exist.
        Raises ShuttingDown if the watcher is (or becomes) shut down, and
        RuntimeError if a watch is already registered for this container.
        """
        log.debug("container id %s", container_id)
        assert re.match(r"[0-9a-f]{64}\Z", container_id), "bad container id"
        if container_id in self._futures:
            raise RuntimeError("there is already a watch for this container")
        if self._shutdown:
            raise ShuttingDown()
        self._futures[container_id] = fut = asyncio.Future()
        try:
            # register first, then inspect: if the container is still
            # running we wait for its "die" event, otherwise return now
            # NOTE(review): inspect_container is a blocking call made from
            # the event loop thread — confirm this is acceptable here
            if self._client.inspect_container(container_id)["State"]["Running"]:
                log.debug("wait for container: %s", container_id)
                yield from fut
                log.debug("container terminated: %s", container_id)
        except docker.errors.NotFound:
            pass
        finally:
            del self._futures[container_id]
class JobManager(Manager):
class JobInfo:
    """Mutable record describing one job being processed.

    Fields: job_id (db id of the Job row), ver_id (db id of the
    WebappVersion; None for the sandbox, -1 when not relevant), ctr_id
    (docker container id or None), version (image tag, e.g. "sandbox"),
    ctr_name (docker container name), client (docker client to use),
    watcher (DockerWatcher attached to that client).
    """
    __slots__ = "job_id", "ver_id", "ctr_id", "version", "ctr_name", "client", "watcher"
def __init__(self, ctrl, nb_jobs=NB_JOB_TASKS):
super().__init__(nb_jobs)
self.ctrl = ctrl
def _run_job(self, job_id, ver_id):
def _create_job(self, info):
ctrl = self.ctrl
ses = ctrl.session
ctr = tmp_img = start_time = None
tmp_img = None
assert info.ctr_id is None
try:
with ses.begin():
job = ses.query(Job).filter_by(id=job_id).one()
job = ses.query(Job).filter_by(id=info.job_id).one()
webapp = job.webapp
if ver_id is None:
tag = "sandbox"
else:
ver = ses.query(WebappVersion).filter_by(id=ver_id).one()
tag = ver.number
log.info("start job %d (%s:%s)",
job_id, webapp.docker_name, tag)
info.job_id, webapp.docker_name, info.version)
start_time = time.time()
job.state = int(JobState.RUNNING)
repo = ctrl.gen_image_name(webapp)
image = "%s:%s" % (repo, tag)
image = "%s:%s" % (repo, info.version)
job_path = ctrl.gen_job_path(job)
log.debug("job.path: %r", job_path)
if ver_id is None:
client = ctrl.sandbox
image = tmp_img = client.commit(ctrl.gen_sandbox_name(webapp), repo, tag)["Id"]
else:
client = ctrl.swarm
if info.ver_id is None:
assert info.version == "sandbox"
image = tmp_img = info.client.commit(ctrl.gen_sandbox_name(webapp), repo, info.version)["Id"]
# TODO use another workdir
# TODO use another uid
ctrl.check_host_path("isdir", job_path)
ctr = client.create_container(image, name=ctrl.gen_job_name(job),
info.ctr_id = info.client.create_container(image, name=info.ctr_name,
working_dir = "/tmp", # TODO use another dir
# TODO parse quotes in job.param (instead of a rough .split())
#
......@@ -823,91 +917,193 @@ class JobManager(Manager):
""",
"job%d" % job.id, webapp.entrypoint] + job.param.split(),
labels = {"allgo.tmp_img": tmp_img or ""},
host_config = ctrl.sandbox.create_host_config(
binds = {job_path: {"bind": "/tmp"}},
cap_drop = ["all"],
# FIXME: CAP_DAC_OVERRIDE needed because all nfs files have uid,gid=1000,1000
cap_add = ["dac_override"],
))
client.start(ctr)
# TODO report exit code to the user
client.wait(ctr)
))["Id"]
info.client.start(info.ctr_id)
except:
self._remove_job(info, tmp_img=tmp_img)
raise
finally:
# FIXME: if interrupted (power down) here, the job will remain in
# state RUNNING forever ==> do some periodic monitoring
stop_time = time.time()
def _remove_job(self, info, *, tmp_img=None):
ses = self.ctrl.session
log.info("stop job %d (%s:%s)",
job_id, webapp.docker_name, tag)
# TODO: report launch errors to the user
# TODO: report exit code to the user
# TODO: use another uid
with ses.begin():
job.state = int(JobState.DONE)
def parse_docker_timestamp(value):
    """Parse an ISO-8601 timestamp string as reported by the docker daemon."""
    # strptime's %f accepts at most 6 fractional digits, so first truncate
    # docker's sub-microsecond precision down to microseconds (otherwise
    # strptime fails)
    truncated = re.sub(r"(\.\d{,6})\d*Z$", r"\1Z", value)
    return datetime.datetime.strptime(truncated, "%Y-%m-%dT%H:%M:%S.%fZ")
if start_time is not None:
job.exec_time = int(stop_time - start_time)
webapp.exec_time = (webapp.exec_time + job.exec_time) // 2
with ses.begin():
job = ses.query(Job).filter_by(id=info.job_id).one()
exec_time = 0.0
if info.ctr_id is not None:
try:
js = info.client.inspect_container(info.ctr_id)
except docker.errors.NotFound:
pass
else:
started_at = js["State"].get("StartedAt", "0001-")
finished_at = js["State"].get("FinishedAt")
# default docker date is '0001-01-01T00:00:00Z'
if not started_at.startswith("0001-"):
try:
exec_time =( parse_docker_timestamp(finished_at)
- parse_docker_timestamp(started_at)
).total_seconds()
except Exception:
log.exception("job %d: unable to compute exec time" % info.job_id)
if ctr is not None:
with docker_warning("cleanup error: unable to remove container %r", ctr,
ignore=docker.errors.NotFound):
if tmp_img is None:
tmp_img = js["Config"]["Labels"].get("allgo.tmp_img")
client.remove_container(ctr, force=True)
with docker_warning("job %d: cleanup error: unable to remove container", info.job_id):
info.client.remove_container(info.ctr_id)
if tmp_img is not None:
with docker_warning("cleanup error: unable to remove image %r", ctr,
ignore=docker.errors.NotFound):
with docker_warning("job %d: cleanup error: unable to remove tmp image", info.job_id):
info.client.remove_image(tmp_img)
job.exec_time = exec_time
job.state = int(JobState.DONE)
log.info("stop job %d (duration %fs)", info.job_id, exec_time)
@asyncio.coroutine
def _finish_job(self, info):
    """Coroutine: wait for the job container to die, then clean it up.

    Uses the DockerWatcher attached to the job's docker client, then
    runs the blocking _remove_job in the executor (lock=False because
    the caller manages the job-slot semaphore itself).
    """
    # NOTE: dropped the stray pre-refactor line `client.remove_image(...)`
    # left over from the diff; image removal is handled in _remove_job.

    # wait for container to terminate
    if info.ctr_id is not None:
        yield from info.watcher.wait(info.ctr_id)

    # remove container
    yield from self.run_in_executor(self._remove_job, info, lock=False)
@asyncio.coroutine
def _process(self, job_id, reset):
ctrl = self.ctrl
ses = ctrl.session
log.debug("process job id %d", job_id)
with ses.begin():
# query db
job = ses.query(Job).filter_by(id=job_id).first()
if job is None or job.state != JobState.WAITING:
if job is None:
# unknown job
log.warning("unknown job id %d", job_id)
return
info = self.JobInfo()
info.job_id = job_id
info.ctr_id = None
info.version = job.version
info.ctr_name = ctrl.gen_job_name(job)
if job.webapp is None:
log.error("job %d: webapp id %r not found", job_id, job.webapp_id)
job.state = JobState.DONE
# TODO report error to the user
return
# select version
if job.version == "sandbox":
ver_id = None
info.client = ctrl.sandbox
info.watcher = ctrl.sandbox_watcher
else:
#TODO: replace version_id with webapp_version_id
ver = ses.query(WebappVersion).filter_by(
webapp_id = job.webapp_id,
number = job.version).filter(
WebappVersion.state != VersionState.error
).order_by(WebappVersion.id.desc()).first()
if ver is None:
log.error("job %d: webapp %r version %r not found",
job_id, job.webapp.docker_name, job.version)
job.state = int(JobState.DONE)
info.client = ctrl.swarm
info.watcher = ctrl.swarm_watcher
state = JobState(job.state)
if state == JobState.WAITING:
# job is not yet started
if job.webapp is None:
log.error("job %d: webapp id %r not found", job_id, job.webapp_id)
job.state = JobState.DONE
# TODO report error to the user
return
ver_id = ver.id
if ver_id is not None:
# pull image to the swarm
# FIXME: race condition: will fail if ver.state==sandbox
yield from ctrl.image_manager.pull(ver_id, swarm=True)
if job.version == "sandbox":
# to be run in the sandbox
info.ver_id = None
else:
# to be run in the swarm
#TODO: replace version_id with webapp_version_id
ver = ses.query(WebappVersion).filter_by(
webapp_id = job.webapp_id,
number = job.version).filter(
WebappVersion.state != VersionState.error
).order_by(WebappVersion.id.desc()).first()
if ver is None:
log.error("job %d: webapp %r version %r not found",
job_id, job.webapp.docker_name, job.version)
job.state = int(JobState.DONE)
# TODO report error to the user
return
info.ver_id = ver.id
elif state == JobState.RUNNING:
# job is already started
# we do not care about the actual version_id *but* we need to
# know whether we are in the swarm or in the sandbox
info.ver_id = None if job.version == "sandbox" else -1
# look up container id
try:
info.ctr_id = info.client.inspect_container(info.ctr_name)["Id"]
except docker.errors.NotFound:
info.ctr_id = None
else:
# unexpected state
log.warning("job id %d is in unexpected state %s", job_id, state.name)
return
# run job
yield from self.run_in_executor(self._run_job, job_id, ver_id)
if state == JobState.WAITING:
# job is not yet started
# pull the image to the swarm
if info.ver_id is not None:
# FIXME: race condition: will fail if ver.state==sandbox
yield from ctrl.image_manager.pull(info.ver_id, swarm=True)
# lock the semaphore (to limit the total number of jobs)
with (yield from self):
yield from self.run_in_executor(self._create_job, info, lock=False)
yield from self._finish_job(info)
elif state == JobState.RUNNING:
# the job is already running
# -> wait for its termination
try:
# try to lock the semaphore
semctx = yield from asyncio.wait_for(iter(self), 0)
except asyncio.TimeoutError:
# no available tokens
# (this may happen if MAX_JOBS is reduced across a restart)
# -> wait for the job anyway
yield from self._finish_job(info)
else:
# normal case
with semctx:
yield from self._finish_job(info)
class PullManager(Manager):
......@@ -1051,8 +1247,14 @@ class DockerController:
port, registry, env, datastore_path, sandbox_path,
max_jobs):
self.sandbox = docker.Client(sandbox_host)
self.swarm = self.sandbox if sandbox_host == swarm_host else docker.Client(swarm_host)
self.sandbox = docker.Client(sandbox_host)
self.sandbox_watcher = DockerWatcher(self.sandbox)
if sandbox_host == swarm_host:
self.swarm = self.sandbox
self.swarm_watcher = self.sandbox_watcher
else:
self.swarm = docker.Client(swarm_host)
self.swarm_watcher = DockerWatcher(self.swarm)
self._db_sessionmaker = sqlalchemy.orm.scoping.scoped_session(
connect_db(mysql_host))
......@@ -1159,7 +1361,9 @@ class DockerController:
self.sandbox_manager.process(webapp_id)
for job_id, in ses.query(Job.id).filter_by(state = JobState.WAITING.value):
for job_id, in ses.query(Job.id).filter(Job.state.in_(
(JobState.WAITING.value, JobState.RUNNING.value))
).order_by(Job.state):
log.debug("schedule job %d", job_id)
self.job_manager.process(job_id)
......@@ -1196,6 +1400,10 @@ class DockerController:
# close socket (to disable incoming notifications)
self.sock.close()
# close the watchers
self.sandbox_watcher.shutdown()
self.swarm_watcher.shutdown()
# terminate all pending tasks
yield from asyncio.gather(
self.image_manager.shutdown(),
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment