Commit d632a296 authored by BAIRE Anthony's avatar BAIRE Anthony
Browse files

allow users to abort their jobs

fix #125
parent 2cfd30b8
......@@ -1109,10 +1109,41 @@ class JobManager(Manager):
@asyncio.coroutine
def _finish_job(self, info):
# wait for container to terminate
def _finish_job(self, info, reset, rescheduled):
# wait for container termination (if running)
if info.ctr_id is not None:
yield from info.client.wait_async(info.ctr_id)
wait_task = asyncio.async(info.client.wait_async(info.ctr_id))
try:
while not wait_task.done():
# we must ensure that the job is not aborted while we are
# waiting
#
# thus we check the state in the db and monitor the
# 'rescheduled' future
rescheduled = reset()
ses = self.ctrl.session
with ses.begin():
state, = ses.query(Job.state).filter_by(id=info.job_id).one()
if state == JobState.ABORTING:
def kill(sig):
log.info("abort job %d (signal %d)", info.job_id, sig)
with docker_warning("unable to kill container %r", info.ctr_id):
info.client.kill(info.ctr_id, sig)
try:
# graceful kill
kill(signal.SIGTERM)
yield from asyncio.wait_for(wait_task, timeout=5)
except asyncio.TimeoutError:
# hard kill (after 5 seconds)
kill(signal.SIGKILL)
yield from asyncio.wait((wait_task, rescheduled),
return_when=asyncio.FIRST_COMPLETED)
finally:
wait_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
yield from wait_task
# remove container
yield from self.run_in_executor(self._remove_job, info, lock=False)
......@@ -1195,7 +1226,7 @@ class JobManager(Manager):
return
info.ver_id = ver.id
elif state == JobState.RUNNING: # pragma: nobranch
elif state in (JobState.RUNNING, JobState.ABORTING): # pragma: nobranch
# job is already started
# we do not care about the actual version_id *but* we need to
......@@ -1234,12 +1265,12 @@ class JobManager(Manager):
with info.client.request_slot(info.ctr_name, info.cpu or 0, info.mem or 0):
info.node_id = yield from info.client.wait_slot(info.ctr_name)
yield from self.run_in_executor(self._create_job, info, lock=False)
yield from self._finish_job(info)
yield from self._finish_job(info, reset, rescheduled)
elif state == JobState.RUNNING: # pragma: nobranch
elif state in (JobState.RUNNING, JobState.ABORTING): # pragma: nobranch
# the job is already running
# -> wait for its termination
yield from self._finish_job(info)
yield from self._finish_job(info, reset, rescheduled)
# NOTE: for the push/pull managers, interruptible=True guarantees that the
# managers terminate immediately, however it cannot guarantee that the
......@@ -1597,7 +1628,7 @@ class DockerController:
self.sandbox_manager.process(webapp_id)
for job_id, in ses.query(Job.id).filter(Job.state.in_(
(JobState.WAITING.value, JobState.RUNNING.value))
(JobState.WAITING.value, JobState.RUNNING.value, JobState.ABORTING.value))
).order_by(Job.state):
log.debug("schedule job %d", job_id)
self.job_manager.process(job_id)
......
......@@ -34,6 +34,8 @@ class JobState(enum.IntEnum):
RUNNING = 2
DONE = 3
ARCHIVED = 4
DELETED = 5
ABORTING = 6
class Webapp(Base):
......
......@@ -1247,15 +1247,50 @@ class ControllerTestCase(unittest.TestCase):
self.assertEqual(job.exec_time, 0)
@with_db
def test_job_abort(self, ses, app):
with preamble():
self.add_dummy_version(app, "1.0")
with part("case 1: graceful exit (SIGTERM) works)"):
job = self.create_job(app, "1.0", 'echo foo ; hnd() { echo exiting ; sleep 1 ; kill %1 ; exit ; } ; trap hnd TERM ; sleep 3600 & wait ; echo bar')
with self.check_job_transition(job, J.WAITING, J.RUNNING,):
self.notify()
with ses.begin():
job.state = int(J.ABORTING)
with self.check_job_transition(job, J.ABORTING, J.DONE):
self.notify()
self.check_job_output(job, "foo\n\n==== ALLGO JOB ABORT ====\nexiting\n")
self.assertEqual(job.exec_time, 1)
with part("case 2: graceful exit fails -> fallback to SIGKILL"):
job = self.create_job(app, "1.0", 'echo foo ; hnd() { echo not exiting ; sleep 1 ; } ; trap hnd TERM ; sleep 3600 & wait ; wait ; echo bar', access_token="456789")
with self.check_job_transition(job, J.WAITING, J.RUNNING,):
self.notify()
with ses.begin():
job.state = int(J.ABORTING)
with self.check_job_transition(job, J.ABORTING, J.DONE):
self.notify()
self.check_job_output(job, "foo\n\n==== ALLGO JOB ABORT ====\nnot exiting\n")
self.assertEqual(job.exec_time, 5)
@with_db
def test_job_controller_interrupted(self, ses, app):
with preamble():
self.add_dummy_version(app, "1.0")
def test(remove=False, sema_lock=False):
job = self.create_job(app, "1.0", 'sleep .2 ; echo "Hello World!"',
files = {"foo": "this is foo\n"}, access_token = str(12345 + remove - sema_lock))
def test(remove=False, sema_lock=False, abort=False):
job = self.create_job(app, "1.0", 'sleep .3 ; echo "Hello World!"',
files = {"foo": "this is foo\n"},
access_token = str("".join(map(str, (12345, remove, sema_lock, abort)))))
with mock.patch("shared_swarm.Client.wait_async", side_effect=controller.ShuttingDown):
......@@ -1268,17 +1303,23 @@ class ControllerTestCase(unittest.TestCase):
self.dk.remove_container(ctr, force=True)
if sema_lock:
self.ctrl.job_manager._semaphore._value = -10 # negative value to avoid race condition
if abort:
with ses.begin():
job.state = int(J.ABORTING)
time.sleep(.1) # because JobManager reschedules the task
with self.check_job_transition(job, J.RUNNING, J.DONE):
with self.check_job_transition(job, job.state, J.DONE):
self.notify()
if not remove:
if abort:
self.check_job_output(job, "\n==== ALLGO JOB ABORT ====\n")
elif not remove:
self.check_job_output(job, "Hello World!\n")
self.assertEqual(job.exec_time, 0)
self.assertRaises(docker.errors.NotFound, self.dk.inspect_container, ctr)
with part("case 1: normal"):
test()
......@@ -1288,6 +1329,9 @@ class ControllerTestCase(unittest.TestCase):
with part("case 3: JobManager semaphore locked"):
test(sema_lock=True)
with part("case 4: job aborted"):
test(abort=True)
@with_db
def test_job_from_sandbox(self, ses, app):
with preamble():
......
class JobsController < ApplicationController
before_action :set_job, only: [:show, :edit, :upload, :destroy, :update, :destroy_file, :attached_files]
before_action :set_job, only: [:show, :edit, :upload, :destroy, :update, :destroy_file, :attached_files, :abort]
# FIXME: should allow creating a job using the webapp access_token
before_action :authenticate_user!
#skip_before_action :verify_authenticity_token
......@@ -145,6 +145,15 @@ class JobsController < ApplicationController
end
end
# POST /jobs/1/abort
def abort
@job.abort
respond_to do |format|
format.html { redirect_to @job }
format.json { head :no_content }
end
end
# DELETE /jobs/1
# DELETE /jobs/1.json
def destroy
......
......@@ -27,6 +27,9 @@ class Job < ActiveRecord::Base
# the job is running
RUNNING: 2,
# the job is being aborted
ABORTING: 6,
# the job is terminated
DONE: 3,
......@@ -65,7 +68,7 @@ class Job < ActiveRecord::Base
}, allow_blank: true
def notify_controller
if self.state == "WAITING"
if self.state == "WAITING" or self.state == "ABORTING"
DockerAdapter.notify
end
end
......@@ -138,6 +141,24 @@ class Job < ActiveRecord::Base
end
end
# abort a running job
def abort
case state
when "RUNNING"
# *atomically* set the job state to ABORTING
Job.where(id:910, state:"RUNNING").update_all(state: "ABORTING")
self.update! state: "ABORTING"
return
when "NEW"
when "WAITING"
else
# nop
return
end
raise "job is not yet started"
end
# remove a job
#
# For the moment, statistics are derived from the DB. So we never destroy a
......
......@@ -20,7 +20,7 @@
<th>Status</th>
<th>Date</th>
<th>Parameters</th>
<th>Actions</th>
<th style="text-align: center">Actions</th>
</tr>
</thead>
......@@ -40,7 +40,12 @@
<%= job.param.truncate(30) %>
<% end %>
</td>
<td><%= button_to 'Destroy', job, remote: true, method: :delete, data: {
<td style="display: flex; justify-content: center">
<%= button_to 'Abort', job_abort_path(job), remote: true, method: :post, data: {
confirm: "This will abort job #{job.id}\n\nAre you sure?"
}, class: 'btn btn-danger', style: "#{job.state=="RUNNING" ? '' : 'visibility: hidden'}" %>
&nbsp;&nbsp;
<%= button_to 'Destroy', job, remote: true, method: :delete, data: {
confirm: "This will permanently destroy job #{job.id}\n\nAre you sure?"
}, disabled: (job.state == "RUNNING" or job.state == "WAITING"), class: 'btn btn-danger' %></td>
</tr>
......
......@@ -34,15 +34,22 @@
<div class="col-md-6">
<% msg = case @job.state
when "NEW"
destroyable = true
"Job not yet submitted"
when "WAITING"
destroyable = true
"Job queued"
when "RUNNING"
destroyable = false
"Work in progress, estimated time #{@job.webapp.exec_time}s"
# FIXME: estimated should take in to account:
# - the elapsed time (since the job was started)
# - the waiting queue
when "ABORTING"
destroyable = false
"Abort in progress"
else
destroyable = true
nil
end %>
<% if msg %>
......@@ -55,14 +62,20 @@
<% end %>
</div>
<div class="col-md-6" style="display: flex; justify-content: flex-end;">
<% if @job.state == "RUNNING" %>
<%= button_to 'Abort', job_abort_path(@job), remote: true, method: :post, data: {
confirm: "This will abort job #{@job.id}\n\nAre you sure?"
}, class: 'btn btn-danger' %>
&nbsp;&nbsp;
<% end %>
<%= button_to 'Destroy', @job, remote: true, method: :delete, data: {
confirm: "This will permanently destroy job #{@job.id}\n\nAre you sure?"
}, disabled: (not destroyable), class: 'btn btn-danger' %>
&nbsp;&nbsp;
<%= mail_to @job.webapp.contact , "Report a problem" , class: 'btn btn-info' , subject: "problem with app #{@job.webapp.name} , the job numer #{@job.id} failed" , body: "Hi ! \n \nI contact you regarding the following job : \n#{ job_url} \nThat has been subjected to errors .
\n<<< your message here >>>
"%>
&nbsp;&nbsp;
<%= button_to 'Destroy', @job, remote: true, method: :delete, data: {
confirm: "This will permanently destroy job #{@job.id}\n\nAre you sure?"
}, disabled: (@job.state == "RUNNING" or @job.state == "WAITING"), class: 'btn btn-danger' %>
</div>
</div>
......
......@@ -59,6 +59,7 @@ Allgo::Application.routes.draw do
resources :jobs
patch 'upload', to: 'jobs#upload'
post 'upload', to: 'jobs#upload'
post 'jobs/:id/abort', to: 'jobs#abort', as: 'job_abort'
get 'job/inside_creation', to: 'jobs#inside_creation'
get 'job/:id/destroy_file', to: 'jobs#destroy_file'
get 'job/:id/attached_files', to: 'jobs#attached_files'
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment