Commit f19328b8 authored by BAIRE Anthony's avatar BAIRE Anthony
Browse files

implement job timeout

fix #121
parent 0a6a6ea4
......@@ -4,7 +4,7 @@ RUN echo deb http://miroir.irisa.fr/debian/ jessie-backports main >> /etc/apt/so
RUN apt-getq install python3-websocket python3-six python3-requests \
python3-mysqldb python3-sqlalchemy python3-fasteners \
python3-nose python3-coverage libjs-jquery python3-yaml \
python3-termcolor
python3-termcolor python3-iso8601
COPY *.deb /tmp/
RUN dpkg -i /tmp/*.deb
......
......@@ -21,6 +21,7 @@ import threading
import traceback
import docker
import iso8601
import MySQLdb
from sqlalchemy import desc
import sqlalchemy.orm.scoping
......@@ -920,7 +921,7 @@ exec /.toolbox/bin/sshd -D
class JobManager(Manager):
class JobInfo:
__slots__ = "job_id", "ver_id", "ctr_id", "version", "ctr_name", "client", "cpu", "mem", "node_id"
__slots__ = "job_id", "ver_id", "ctr_id", "version", "ctr_name", "client", "cpu", "mem", "node_id", "timeout"
def __init__(self, ctrl, bigmem_apps=()):
super().__init__(0)
......@@ -998,8 +999,8 @@ class JobManager(Manager):
# FIXME: maybe ">>allgo.log 2>&1" is sufficient
# - we get the exit code of the app (not the exit code of
# cat)
# - SIGTERM is forwarded to the process (and we call wait
a second time because of EINTR)
# - SIGTERM & SIGALRM are forwarded to the process (and
we call wait again because of EINTR)
# - we have no unusual dependencies (only sh, cat, trap,
# kill and mkfifo)
# - we display a warning if the memory limit was reached
......@@ -1009,12 +1010,13 @@ class JobManager(Manager):
interrupted=
sighnd() {{
(echo
echo '==== ALLGO JOB ABORT ===='
kill "$pid") >>allgo.log 2>&1
trap '' TERM
echo "==== ALLGO JOB $2 ===="
kill "-$1" "$pid") >>allgo.log 2>&1
trap '' TERM ALRM
interrupted=1
}}
trap sighnd TERM
trap "sighnd TERM ABORT" TERM
trap "sighnd ALRM TIMEOUT" ALRM
fifo=/.allgo.fifo.{job_id}
mkfifo "$fifo" 2>>allgo.log || exit $?
......@@ -1105,43 +1107,57 @@ class JobManager(Manager):
job.state = int(JobState.DONE)
job.container_id = None
log.info("stop job %d (duration %fs)", info.job_id, exec_time)
log.info("done job %d (duration %fs)", info.job_id, exec_time)
@asyncio.coroutine
def _finish_job(self, info, reset, rescheduled):
def _finish_job(self, info, reset):
# wait for container termination (if running)
if info.ctr_id is not None:
def kill(sig):
log.debug("kill job %d (signal %d)", info.job_id, sig)
with docker_warning("unable to kill container %r", info.ctr_id):
info.client.kill(info.ctr_id, sig)
@asyncio.coroutine
def stop(sig, reason):
log.info("stop job %d (%s)", info.job_id, reason)
try:
# graceful kill
kill(sig)
yield from asyncio.wait_for(wait_task, timeout=5)
except asyncio.TimeoutError:
# hard kill (after 5 seconds)
kill(signal.SIGKILL)
wait_task = asyncio.async(info.client.wait_async(info.ctr_id))
timeout_task = (asyncio.Future() if info.timeout is None
else asyncio.async(asyncio.sleep(info.timeout)))
try:
rescheduled = None
while not wait_task.done():
# we must ensure that the job is not aborted while we are
# waiting
#
# thus we check the state in the db and monitor the
# 'rescheduled' future
rescheduled = reset()
ses = self.ctrl.session
with ses.begin():
state, = ses.query(Job.state).filter_by(id=info.job_id).one()
if state == JobState.ABORTING:
def kill(sig):
log.info("abort job %d (signal %d)", info.job_id, sig)
with docker_warning("unable to kill container %r", info.ctr_id):
info.client.kill(info.ctr_id, sig)
try:
# graceful kill
kill(signal.SIGTERM)
yield from asyncio.wait_for(wait_task, timeout=5)
except asyncio.TimeoutError:
# hard kill (after 5 seconds)
kill(signal.SIGKILL)
yield from asyncio.wait((wait_task, rescheduled),
# we must check whether the job has been aborted
# or whether the timeout has expired
if rescheduled is None or rescheduled.done():
# check the job state in the db
rescheduled = reset()
ses = self.ctrl.session
with ses.begin():
state, = ses.query(Job.state).filter_by(id=info.job_id).one()
if state == JobState.ABORTING:
yield from stop(signal.SIGTERM, "user abort")
elif timeout_task.done():
# timeout !
yield from stop(signal.SIGALRM, "timeout")
yield from asyncio.wait((wait_task, timeout_task, rescheduled),
return_when=asyncio.FIRST_COMPLETED)
finally:
wait_task.cancel()
timeout_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
yield from wait_task
......@@ -1180,6 +1196,7 @@ class JobManager(Manager):
info.node_id = None
info.version = job.version
info.ctr_name = ctrl.gen_job_name(job)
info.timeout = job.queue.timeout
# NOTE: .cpu .mem are the amount of cpu/mem requested when creating
# a new job. They do not apply to already created jobs (by a
......@@ -1233,18 +1250,33 @@ class JobManager(Manager):
# know whether we are in the swarm or in the sandbox
info.ver_id = None if job.version == "sandbox" else -1
# inspect the container so as to:
# - validate and store its id
# - adjust the timeout if necessary
if job.container_id is None:
# look up container id (if it is not yet in the db)
# pre-migration jobs
# FIXME: to be removed after migration (the container id is now stored in the db at creation time)
try:
info.ctr_id = job.container_id = info.client.inspect_container(
"%s-job-%d-%s" % (ctrl.env, job.id, job.webapp.docker_name))["Id"]
js = info.client.inspect_container("%s-job-%d-%s"
% (ctrl.env, job.id, job.webapp.docker_name))
except docker.errors.NotFound:
pass
js = None
else:
# check the presence of the container and validate its id against its name
info.ctr_id = ctrl.check_job_container(info.client, job)
js = ctrl.inspect_job_container(info.client, job)
if js is not None:
# save the id
info.ctr_id = js["Id"]
# adjust the timeout
if info.timeout is not None:
uptime = (datetime.datetime.now(datetime.timezone.utc)
- iso8601.parse_date(js["State"]["StartedAt"])).seconds
info.timeout -= uptime
log.debug("job %d uptime: %.1fs, adjusted timeout is: %.1f",
info.job_id, uptime, info.timeout)
else:
# unexpected state
if state != JobState.DONE:
......@@ -1265,12 +1297,12 @@ class JobManager(Manager):
with info.client.request_slot(info.ctr_name, info.cpu or 0, info.mem or 0):
info.node_id = yield from info.client.wait_slot(info.ctr_name)
yield from self.run_in_executor(self._create_job, info, lock=False)
yield from self._finish_job(info, reset, rescheduled)
yield from self._finish_job(info, reset)
elif state in (JobState.RUNNING, JobState.ABORTING): # pragma: nobranch
# the job is already running
# -> wait for its termination
yield from self._finish_job(info, reset, rescheduled)
yield from self._finish_job(info, reset)
# NOTE: for the push/pull managers, interruptible=True guarantees that the
# managers terminate immediately, however it cannot guarantee that the
......@@ -1506,15 +1538,15 @@ class DockerController:
def gen_factory_name(self, docker_os):
return "%s/factory/%s" % (self.registry, docker_os.docker_name)
def check_job_container(self, client, job):
"""Check if the job has a container running and ensure that it was created by us
def inspect_job_container(self, client, job):
"""inspect the underlying container of a job + safety checks
The purpose of the validation check is to prevent a DoS attack on an
arbitrary container. The job.container_id shall really be the id of the
container and its name shall start with "ENV-job".
return
- the validated container id if ok
return:
- the result of client.inspect_container() if ok
- None if the container is not found or if the container name/id is not valid
"""
assert job.container_id is not None
......@@ -1531,7 +1563,7 @@ class DockerController:
log.error("job %d references container %r whose name is invalid: %r",
job.id, cid, ctr["Name"])
else:
return cid
return ctr
return None
......
......@@ -1300,24 +1300,56 @@ class ControllerTestCase(unittest.TestCase):
self.check_job_output(job, "foo\n\n==== ALLGO JOB ABORT ====\nnot exiting\n")
self.assertEqual(job.exec_time, 5)
@with_db
def test_job_timeout(self, ses, app):
with preamble():
self.add_dummy_version(app, "1.0")
with ses.begin():
queue = JobQueue(name="test_short_queue", timeout=2)
ses.add(queue)
with part("case 1: graceful exit (SIGALRM) works"):
job = self.create_job(app, "1.0", 'echo foo ; hnd() { echo exiting ; sleep 1 ; kill %1 ; exit ; } ; trap hnd ALRM ; sleep 3600 & wait ; echo bar',
queue_id=queue.id)
with self.check_job_transition(job, J.WAITING, J.DONE, ignore_state=J.RUNNING):
self.notify()
self.check_job_output(job, "foo\n\n==== ALLGO JOB TIMEOUT ====\nexiting\n")
self.assertEqual(job.exec_time, 3)
with part("case 2: graceful exit fails -> fallback to SIGKILL"):
job = self.create_job(app, "1.0", 'echo foo ; hnd() { echo not exiting ; } ; trap hnd ALRM ; sleep 3600 & wait ; wait ; echo bar',
queue_id=queue.id, access_token="456789")
with self.check_job_transition(job, J.WAITING, J.DONE, ignore_state=J.RUNNING):
self.notify()
self.check_job_output(job, "foo\n\n==== ALLGO JOB TIMEOUT ====\nnot exiting\n")
self.assertEqual(job.exec_time, 7)
@with_db
def test_job_controller_interrupted(self, ses, app):
with preamble():
self.add_dummy_version(app, "1.0")
with ses.begin():
queue = JobQueue(name="test_short_queue", timeout=3)
ses.add(queue)
def test(remove=False, sema_lock=False, abort=False):
job = self.create_job(app, "1.0", 'sleep .3 ; echo "Hello World!"',
files = {"foo": "this is foo\n"},
access_token = str("".join(map(str, (12345, remove, sema_lock, abort)))))
def test(remove=False, sema_lock=False, abort=False, timeout=False):
token = str("".join(map(str, (12345, remove, sema_lock, abort, timeout))))
append = " ; trap 'kill $pid' ALRM TERM ; sleep 10 & pid=$! ; while ! wait ; do true ; done" if (abort or timeout) else ""
queue_id = queue.id if timeout else 1
job = self.create_job(app, "1.0", 'sleep .4 ; echo "Hello World!"' + append,
access_token = token, queue_id=queue_id)
with mock.patch("shared_swarm.Client.wait_async", side_effect=controller.ShuttingDown):
with self.check_job_transition(job, J.WAITING, J.RUNNING):
self.notify()
ctr = "%s-job-default-%d-test-app" % (ENV, job.id)
ctr = "%s-job-%s-%d-test-app" % (ENV, job.queue.name, job.id)
self.wait_created(ctr)
if remove:
self.dk.remove_container(ctr, force=True)
......@@ -1326,17 +1358,24 @@ class ControllerTestCase(unittest.TestCase):
if abort:
with ses.begin():
job.state = int(J.ABORTING)
time.sleep(.1) # because JobManager reschedules the task
if timeout:
time.sleep(1)
else:
time.sleep(.1) # because JobManager reschedules the task
with self.check_job_transition(job, job.state, J.DONE):
self.notify()
if abort:
self.check_job_output(job, "\n==== ALLGO JOB ABORT ====\n")
self.assertEqual(job.exec_time, 0)
elif timeout:
self.check_job_output(job, "Hello World!\n\n==== ALLGO JOB TIMEOUT ====\n")
self.assertEqual(job.exec_time, 3)
elif not remove:
self.check_job_output(job, "Hello World!\n")
self.assertEqual(job.exec_time, 0)
self.assertEqual(job.exec_time, 0)
self.assertRaises(docker.errors.NotFound, self.dk.inspect_container, ctr)
......@@ -1352,6 +1391,9 @@ class ControllerTestCase(unittest.TestCase):
with part("case 4: job aborted"):
test(abort=True)
with part("case 5: job timeout"):
test(timeout=True)
@with_db
def test_job_from_sandbox(self, ses, app):
with preamble():
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment