Commit 07fefb25 authored by BAIRE Anthony's avatar BAIRE Anthony
Browse files

report job result to the user

fix #96
parent c4479d1d
......@@ -1025,11 +1025,22 @@ class JobManager(Manager):
code=$?
fi
wait %1
failcnt="`cat /sys/fs/cgroup/memory/memory.failcnt`"
if [ "$failcnt" -ne 0 ] ; then
echo "WARNING: out of memory (memory.failcnt=$failcnt)" >>allgo.log
fi
trap '' TERM ALRM
[ -n "$interrupted" ] || (
echo
if [ $code -eq 0 ] ; then
echo "==== ALLGO JOB SUCCESS ===="
else
echo "==== ALLGO JOB ERROR ===="
echo "process exited with code $code"
fi
failcnt="`cat /sys/fs/cgroup/memory/memory.failcnt`"
if [ "$failcnt" -ne 0 ] ; then
echo "WARNING: out of memory (memory.failcnt=$failcnt)"
fi
) >>allgo.log
exit $code
""".format(job_id=job.id),
......@@ -1100,8 +1111,12 @@ class JobManager(Manager):
job.exec_time = exec_time
job.state = int(JobState.DONE)
job.container_id = None
if job.result == JobResult.NONE:
# FIXME: maybe we should have a 'unknown' result
log.warning("job %d has not result, will fallback to 'ERROR'", info.job_id)
job.result = int(JobResult.ERROR)
log.info("done job %d (duration %fs)", info.job_id, exec_time)
log.info("done job %d (result=%s, duration=%fs)", info.job_id, JobResult(job.result).name, exec_time)
@asyncio.coroutine
......@@ -1124,10 +1139,10 @@ class JobManager(Manager):
# hard kill (after 5 seconds)
kill(signal.SIGKILL)
wait_task = asyncio.async(info.client.wait_async(info.ctr_id))
timeout_task = (asyncio.Future() if info.timeout is None
else asyncio.async(asyncio.sleep(info.timeout)))
result = None
try:
rescheduled = None
while not wait_task.done():
......@@ -1142,19 +1157,32 @@ class JobManager(Manager):
state, = ses.query(Job.state).filter_by(id=info.job_id).one()
if state == JobState.ABORTING:
yield from stop(signal.SIGTERM, "user abort")
result = result or JobResult.ABORTED
elif timeout_task.done():
# timeout !
yield from stop(signal.SIGALRM, "timeout")
result = result or JobResult.TIMEOUT
yield from asyncio.wait((wait_task, timeout_task, rescheduled),
return_when=asyncio.FIRST_COMPLETED)
returncode = wait_task.result()
log.debug("job %d exit code: %r", info.job_id, returncode)
result = result or (JobResult.SUCCESS if returncode==0 else JobResult.ERROR)
finally:
wait_task.cancel()
timeout_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
yield from wait_task
if result is not None:
ses = self.ctrl.session
with ses.begin():
ses.execute("UPDATE jobs SET result=%d WHERE id=%d AND result=%d" %
(int(result), info.job_id, int(JobResult.NONE)))
# remove container
yield from self.run_in_executor(self._remove_job, info, lock=False)
......@@ -1177,7 +1205,8 @@ class JobManager(Manager):
if job.webapp is None:
log.error("job %d: webapp id %r not found", job_id, job.webapp_id)
if state == JobState.WAITING: # pragma: nobranch
job.state = int(JobState.DONE)
job.state = int(JobState.DONE)
job.result = int(JobResult.ERROR)
job.exec_time = 0
# TODO report error to the user ?
return
......@@ -1234,6 +1263,7 @@ class JobManager(Manager):
job.state = int(JobState.DONE)
# TODO report error to the user
job.result= int(JobResult.ERROR)
return
info.ver_id = ver.id
......
......@@ -7,7 +7,7 @@ from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, Table, Text, Boolean, DateTime
from sqlalchemy.orm import relationship, sessionmaker
__all__ = "Webapp", "WebappVersion", "DockerOs", "Job", "connect_db", "SandboxState", "VersionState", "JobState", "JobQueue"
__all__ = "Webapp", "WebappVersion", "DockerOs", "Job", "connect_db", "SandboxState", "VersionState", "JobState", "JobQueue", "JobResult"
Base = declarative_base()
......@@ -37,6 +37,14 @@ class JobState(enum.IntEnum):
DELETED = 5
ABORTING = 6
class JobResult(enum.IntEnum):
NONE = 0
SUCCESS = 1
ERROR = 2
ABORTED = 3
TIMEOUT = 4
DONE = 5
class Webapp(Base):
__tablename__ = "webapps"
......@@ -109,6 +117,7 @@ class Job(Base):
webapp_id = Column(Integer, ForeignKey('webapps.id'))
user_id = Column(Integer)
state = Column(Integer)
result = Column(Integer)
param = Column(String)
version = Column(String)
exec_time = Column(Integer)
......
......@@ -532,7 +532,8 @@ class ControllerTestCase(unittest.TestCase):
with ses.begin():
job = Job(webapp=app, user_id=user_id, state=int(state),
param="-c " + shlex.quote(command), version=version, exec_time=None,
access_token=access_token, queue_id=queue_id)
access_token=access_token, queue_id=queue_id,
result=int(JobResult.NONE))
ses.add(job)
path = self.job_dir(job)
......@@ -1163,12 +1164,33 @@ class ControllerTestCase(unittest.TestCase):
with self.check_job_transition(job, J.WAITING, J.DONE, ignore_state=J.RUNNING):
self.notify()
self.check_job_output(job, "coin coin\n")
self.check_job_output(job, "coin coin\n\n==== ALLGO JOB SUCCESS ====\n")
self.check_job_output(job, "Hello World!\n", filename="hello")
self.check_job_output(job, "this is foo\n", filename="bar")
self.assertEqual(job.exec_time, 0)
@with_db
def test_job_exit_code(self, ses, app):
with preamble():
self.add_dummy_version(app, "1.0")
with part("case 1: exit(0)"):
job = self.create_job(app, "1.0", 'echo ok')
with self.check_job_transition(job, J.WAITING, J.DONE, ignore_state=J.RUNNING):
self.notify()
self.check_job_output(job, "ok\n\n==== ALLGO JOB SUCCESS ====\n")
self.assertEqual(job.result, JobResult.SUCCESS)
with part("case 1: exit(24)"):
job = self.create_job(app, "1.0", 'echo bad; exit 24', access_token="4567")
with self.check_job_transition(job, J.WAITING, J.DONE, ignore_state=J.RUNNING):
self.notify()
self.check_job_output(job, "bad\n\n==== ALLGO JOB ERROR ====\nprocess exited with code 24\n")
self.assertEqual(job.result, JobResult.ERROR)
@with_db
def test_job_alt_queue(self, ses, app):
with preamble():
......@@ -1204,6 +1226,7 @@ class ControllerTestCase(unittest.TestCase):
self.notify()
self.assertEqual(job.exec_time, 1)
self.assertEqual(job.result, JobResult.SUCCESS)
with part("case 2: container already removed"):
job = self.create_job(app, "1.0", '', state=J.RUNNING, access_token="456789")
......@@ -1213,6 +1236,7 @@ class ControllerTestCase(unittest.TestCase):
self.notify()
self.assertEqual(job.exec_time, 0)
self.assertEqual(job.result, JobResult.ERROR)
@with_db
def test_job_start_error(self, ses, app):
......@@ -1226,6 +1250,7 @@ class ControllerTestCase(unittest.TestCase):
self.assertRaises(docker.errors.NotFound, self.dk.inspect_container, self.ctrl.gen_job_name(job))
self.assertEqual(job.exec_time, 0)
self.assertEqual(job.result, JobResult.ERROR)
@with_db
......@@ -1239,6 +1264,7 @@ class ControllerTestCase(unittest.TestCase):
with self.check_log("error", r"webapp id \d+ not found"), \
self.check_job_transition(job, J.WAITING, J.DONE, timeout=1):
self.notify()
self.assertEqual(job.result, JobResult.ERROR)
@with_db
def test_job_start_unknown_version(self, ses, app):
......@@ -1249,6 +1275,7 @@ class ControllerTestCase(unittest.TestCase):
with self.check_log("error", "webapp 'test-app' version '1.1' not found"), \
self.check_job_transition(job, J.WAITING, J.DONE, timeout=1):
self.notify()
self.assertEqual(job.result, JobResult.ERROR)
@with_db
def test_job_remove_error_already_removed(self, ses, app):
......@@ -1265,6 +1292,7 @@ class ControllerTestCase(unittest.TestCase):
self.assertRaises(docker.errors.NotFound, self.dk.inspect_container, self.ctrl.gen_job_name(job))
self.assertEqual(job.exec_time, 0)
self.assertEqual(job.result, JobResult.ERROR)
@with_db
......@@ -1285,6 +1313,7 @@ class ControllerTestCase(unittest.TestCase):
self.check_job_output(job, "foo\n\n==== ALLGO JOB ABORT ====\nexiting\n")
self.assertEqual(job.exec_time, 1)
self.assertEqual(job.result, JobResult.ABORTED)
with part("case 2: graceful exit fails -> fallback to SIGKILL"):
job = self.create_job(app, "1.0", 'echo foo ; hnd() { echo not exiting ; sleep 1 ; } ; trap hnd TERM ; sleep 3600 & wait ; wait ; echo bar', access_token="456789")
......@@ -1299,6 +1328,7 @@ class ControllerTestCase(unittest.TestCase):
self.check_job_output(job, "foo\n\n==== ALLGO JOB ABORT ====\nnot exiting\n")
self.assertEqual(job.exec_time, 5)
self.assertEqual(job.result, JobResult.ABORTED)
@with_db
def test_job_timeout(self, ses, app):
......@@ -1317,6 +1347,7 @@ class ControllerTestCase(unittest.TestCase):
self.check_job_output(job, "foo\n\n==== ALLGO JOB TIMEOUT ====\nexiting\n")
self.assertEqual(job.exec_time, 3)
self.assertEqual(job.result, JobResult.TIMEOUT)
with part("case 2: graceful exit fails -> fallback to SIGKILL"):
job = self.create_job(app, "1.0", 'echo foo ; hnd() { echo not exiting ; } ; trap hnd ALRM ; sleep 3600 & wait ; wait ; echo bar',
......@@ -1327,6 +1358,7 @@ class ControllerTestCase(unittest.TestCase):
self.check_job_output(job, "foo\n\n==== ALLGO JOB TIMEOUT ====\nnot exiting\n")
self.assertEqual(job.exec_time, 7)
self.assertEqual(job.result, JobResult.TIMEOUT)
@with_db
def test_job_controller_interrupted(self, ses, app):
......@@ -1337,10 +1369,15 @@ class ControllerTestCase(unittest.TestCase):
ses.add(queue)
def test(remove=False, sema_lock=False, abort=False, timeout=False):
token = str("".join(map(str, (12345, remove, sema_lock, abort, timeout))))
def test(remove=False, sema_lock=False, abort=False, timeout=False, error=False):
token = str("".join(map(str, (12345, remove, sema_lock, abort, timeout, error))))
append = " ; trap 'kill $pid' ALRM TERM ; sleep 10 & pid=$! ; while ! wait ; do true ; done" if (abort or timeout) else ""
if abort or timeout:
append = " ; trap 'kill $pid' ALRM TERM ; sleep 10 & pid=$! ; while ! wait ; do true ; done"
elif error:
append = " ; exit 17"
else:
append = ""
queue_id = queue.id if timeout else 1
job = self.create_job(app, "1.0", 'sleep .4 ; echo "Hello World!"' + append,
access_token = token, queue_id=queue_id)
......@@ -1370,28 +1407,40 @@ class ControllerTestCase(unittest.TestCase):
if abort:
self.check_job_output(job, "\n==== ALLGO JOB ABORT ====\n")
self.assertEqual(job.exec_time, 0)
self.assertEqual(job.result, JobResult.ABORTED)
elif timeout:
self.check_job_output(job, "Hello World!\n\n==== ALLGO JOB TIMEOUT ====\n")
self.assertEqual(job.exec_time, 3)
elif not remove:
self.check_job_output(job, "Hello World!\n")
self.assertEqual(job.result, JobResult.TIMEOUT)
elif remove:
self.assertEqual(job.result, JobResult.ERROR)
elif error:
self.check_job_output(job, "Hello World!\n\n==== ALLGO JOB ERROR ====\nprocess exited with code 17\n")
self.assertEqual(job.exec_time, 0)
self.assertEqual(job.result, JobResult.ERROR)
else:
self.check_job_output(job, "Hello World!\n\n==== ALLGO JOB SUCCESS ====\n")
self.assertEqual(job.exec_time, 0)
self.assertEqual(job.result, JobResult.SUCCESS)
self.assertRaises(docker.errors.NotFound, self.dk.inspect_container, ctr)
with part("case 1: normal"):
test()
with part("case 2: container removed"):
with part("case 2: non-zero exit"):
test(error=True)
with part("case 3: container removed"):
test(remove=True)
with part("case 3: JobManager semaphore locked"):
with part("case 4: JobManager semaphore locked"):
test(sema_lock=True)
with part("case 4: job aborted"):
with part("case 5: job aborted"):
test(abort=True)
with part("case 5: job timeout"):
with part("case 6: job timeout"):
test(timeout=True)
@with_db
......@@ -1418,7 +1467,8 @@ class ControllerTestCase(unittest.TestCase):
with self.check_job_transition(job, J.WAITING, J.DONE, ignore_state=J.RUNNING):
self.notify()
self.check_job_output(job, "foo\n")
self.check_job_output(job, "foo\n\n==== ALLGO JOB SUCCESS ====\n")
self.assertEqual(job.result, JobResult.SUCCESS)
self.assertListEqual(images, self.dk.images(), msg="tmp img must not be created")
......@@ -1427,7 +1477,8 @@ class ControllerTestCase(unittest.TestCase):
with self.check_job_transition(job, J.WAITING, J.DONE, ignore_state=J.RUNNING):
self.notify()
self.check_job_output(job, "bar\n")
self.check_job_output(job, "bar\n\n==== ALLGO JOB SUCCESS ====\n")
self.assertEqual(job.result, JobResult.SUCCESS)
self.assertListEqual(images, self.dk.images(), msg="tmp img must be removed")
......@@ -1438,6 +1489,7 @@ class ControllerTestCase(unittest.TestCase):
self.notify()
self.check_job_output(job, None)
self.assertEqual(job.result, JobResult.ERROR)
self.assertListEqual(images, self.dk.images(), msg="tmp img must be removed")
......@@ -1448,6 +1500,7 @@ class ControllerTestCase(unittest.TestCase):
self.notify()
self.check_job_output(job, None)
self.assertEqual(job.result, JobResult.ERROR)
self.assertListEqual(images, self.dk.images(), msg="tmp img must be removed")
......@@ -1470,7 +1523,8 @@ class ControllerTestCase(unittest.TestCase):
with self.check_job_transition(job, J.RUNNING, J.DONE):
self.notify()
self.check_job_output(job, "hello\n")
self.check_job_output(job, "hello\n\n==== ALLGO JOB SUCCESS ====\n")
self.assertEqual(job.result, JobResult.SUCCESS)
self.assertListEqual(images, self.dk.images(), msg="tmp img must be removed")
......@@ -1555,7 +1609,7 @@ class ControllerTestCase(unittest.TestCase):
with self.check_job_transition(job, J.WAITING, J.DONE, ignore_state=J.RUNNING):
self.notify()
self.check_job_output(job, rnd+"foo\n")
self.check_job_output(job, rnd+"foo\n\n==== ALLGO JOB SUCCESS ====\n")
# wait until the image is replaced
wait_until(lambda: get_version(ver_id).state == V.REPLACED, timeout=10)
......@@ -1575,7 +1629,7 @@ class ControllerTestCase(unittest.TestCase):
with self.check_job_transition(job, J.WAITING, J.DONE, ignore_state=J.RUNNING):
self.notify()
self.check_job_output(job, rnd+"bar1\n")
self.check_job_output(job, rnd+"bar1\n\n==== ALLGO JOB SUCCESS ====\n")
# wait until the image is replaced
wait_until(lambda: get_version(ver_id).state == V.REPLACED, timeout=10)
......@@ -1600,7 +1654,7 @@ class ControllerTestCase(unittest.TestCase):
with self.check_job_transition(job, J.WAITING, J.DONE, ignore_state=J.RUNNING):
pass
self.check_job_output(job, rnd+"bar3\n")
self.check_job_output(job, rnd+"bar3\n\n==== ALLGO JOB SUCCESS ====\n")
# wait until the image is replaced
wait_until(lambda: get_version(ver_id).state == V.READY, timeout=10)
......
......@@ -43,6 +43,26 @@ class Job < ActiveRecord::Base
DELETED: 5,
}
enum result: {
# no result yet
NONE: 0,
# job finished successfully (exit 0)
SUCCESS: 1,
# job finished with an error (non zero exit)
ERROR: 2,
# job was aborted by the user
ABORTED: 3,
# job timed out
TIMEOUT: 4,
# job done (only for pre-migration jobs, because we do not now the outcome)
# (small case because it conflicts with state.DONE)
done: 5,
}
scope :not_archived, -> { where.not(state: "ARCHIVED") }
......@@ -86,6 +106,12 @@ class Job < ActiveRecord::Base
return "#{self.user_id}/#{self.webapp_id}/#{self.access_token}"
end
# status of the job
# return either .state or .result (which one is more useful)
def status
((DONE? or ARCHIVED?) ? result : state).downcase
end
def make_directory
FileUtils.mkpath(self.path)
end
......
......@@ -66,7 +66,7 @@
<td><%= link_to job.id, job, data: {main: 1} %></td>
<td><%= link_to job.webapp.name, job.webapp %></td>
<td><%= File.basename(job.job_uploads.first.job_file_file_name).truncate(30) if !job.job_uploads.first.nil? %></td>
<td><%= job.state.downcase %></td>
<td><%= job.status %></td>
<td><%= job.updated_at.strftime("%e/%m/%Y, %H:%M") %></td> <!-- TODO changer en fonction de la localisation -->
<td>
<% if job.param %>
......@@ -103,7 +103,7 @@
<td><%= link_to job.id, job, data: {main: 1} %></td>
<td><%= link_to job.webapp.name, job.webapp %></td>
<td><%= File.basename(job.job_uploads.first.job_file_file_name).truncate(30) if !job.job_uploads.first.nil? %></td>
<td><%= job.state.downcase %></td>
<td><%= job.status %></td>
<td><%= job.updated_at.strftime("%e/%m/%Y, %H:%M") %></td> <!-- TODO changer en fonction de la localisation -->
<td>
<% if job.param %>
......
......@@ -34,7 +34,7 @@
<td><%= link_to job.id, job, data: {main: 1} %></td>
<td><%= link_to job.webapp.name, job.webapp %></td>
<td><%= File.basename(job.job_uploads.first.job_file_file_name).truncate(30) if !job.job_uploads.first.nil? %></td>
<td><%= job.state.downcase %></td>
<td><%= job.status %></td>
<td><%= job.queue.name %></td>
<td><%= job.updated_at.strftime("%e/%m/%Y, %H:%M") %></td> <!-- TODO changer en fonction de la localisation -->
<td>
......
......@@ -15,7 +15,7 @@
<h3>App: </h3><p class="lead"><%= link_to @job.webapp.name, @job.webapp %></p>
<h3>Version: </h3><%= @job.version %> <br /><br />
<h3>Parameters: </h3><%= @job.param %> <br /><br />
<h3>Status: </h3><span id="job_state"><%= @job.state.downcase %></span> <br /><br />
<h3>Status: </h3><span id="job_state"><%= @job.status %></span> <br /><br />
<h3>Queue: </h3><%= @job.queue.name_with_timeout %> <br /><br />
<span id="zip">
<% if @job.state == "DONE" %>
......
class AddJobResult < ActiveRecord::Migration[5.0]
def up
add_column :jobs, :result, :integer, default: 0, null: false
execute 'UPDATE jobs SET result=5 WHERE state IN (3, 4);'
end
def down
remove_column :jobs, :result
end
end
......@@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20171116132928) do
ActiveRecord::Schema.define(version: 20171116161215) do
create_table "datasets", force: :cascade, options: "ENGINE=InnoDB DEFAULT CHARSET=utf8" do |t|
t.string "name", null: false
......@@ -61,6 +61,7 @@ ActiveRecord::Schema.define(version: 20171116132928) do
t.integer "state", default: 0, null: false
t.string "container_id", limit: 64
t.integer "queue_id", null: false
t.integer "result", default: 0, null: false
end
create_table "quotas", force: :cascade, options: "ENGINE=InnoDB DEFAULT CHARSET=utf8" do |t|
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment