Commit 72b97acd authored by BAIRE Anthony's avatar BAIRE Anthony
Browse files

add job queues

parent d632a296
......@@ -18,10 +18,6 @@
# bigmem_harg_limit: "8G"
#
#
# # list of apps known to have long jobs (default: [])
# long_apps: ["app999"]
#
#
# ############ swarm and sandbox configuration ################
# #
# # This part configures resource reservations for the given docker hosts
......
......@@ -1449,7 +1449,6 @@ class DockerController:
self.mem_hard_limit = cfg.get("mem_hard_limit", None, str, cast=docker.utils.parse_bytes)
self.bigmem_hard_limit = cfg.get("bigmem_hard_limit", None, str, cast=docker.utils.parse_bytes)
self.long_apps = cfg.get("long_apps", [], list)
bigmem_apps = cfg.get("bigmem_apps", [], list)
self.sandbox = SharedSwarmClient(sandbox_host, config=cfg.get("sandbox", {}, dict), alias="sandbox")
......@@ -1498,10 +1497,7 @@ class DockerController:
return "%s/webapp/%s" % (self.registry, webapp.docker_name)
def gen_job_name(self, job):
# prefix the name of the container with "long" if the webapp is listed in LONG_APPS
kind = "long" if job.webapp.docker_name in self.long_apps else "norm"
return "%s-job-%s-%d-%s" % (self.env, kind, job.id, job.webapp.docker_name)
return "%s-job-%s-%d-%s" % (self.env, job.queue.name, job.id, job.webapp.docker_name)
def gen_job_path(self, job):
return os.path.join(self.datastore_path, str(job.user_id),
......
......@@ -7,7 +7,7 @@ from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, Table, Text, Boolean, DateTime
from sqlalchemy.orm import relationship, sessionmaker
__all__ = "Webapp", "WebappVersion", "DockerOs", "Job", "connect_db", "SandboxState", "VersionState", "JobState"
__all__ = "Webapp", "WebappVersion", "DockerOs", "Job", "connect_db", "SandboxState", "VersionState", "JobState", "JobQueue"
Base = declarative_base()
......@@ -94,6 +94,13 @@ class DockerOs(Base):
"%s=%r" % (k, getattr(self, k)) for k in (
"id", "docker_name", "version"))))
class JobQueue(Base):
__tablename__ = "job_queues"
id = Column(Integer, primary_key=True)
name = Column(String)
timeout = Column(Integer)
class Job(Base):
__tablename__ = "jobs"
......@@ -106,9 +113,11 @@ class Job(Base):
exec_time = Column(Integer)
access_token = Column(String)
container_id = Column(String(64))
queue_id = Column(Integer, ForeignKey('job_queues.id'))
webapp = relationship("Webapp")
queue = relationship("JobQueue")
def __repr__(self):
return "%s(%s)" % (type(self).__name__, ", ".join((
......
......@@ -230,6 +230,10 @@ class ControllerTestCase(unittest.TestCase):
if (img.docker_name or "").startswith("test-"):
cls.session.delete(img)
for queue in cls.session.query(JobQueue):
if queue.name.startswith("test_"):
cls.session.delete(queue)
if not full:
# seed the db
deb = DockerOs(docker_name="test-busybox", version="latest")
......@@ -523,12 +527,12 @@ class ControllerTestCase(unittest.TestCase):
str(job.webapp_id), job.access_token)
def create_job(self, app, version, command="", *, state=J.WAITING,
user_id=0, access_token="0123456789", files={}):
user_id=0, access_token="0123456789", files={}, queue_id=1):
ses = self.session
with ses.begin():
job = Job(webapp=app, user_id=user_id, state=int(state),
param="-c " + shlex.quote(command), version=version, exec_time=None,
access_token=access_token)
access_token=access_token, queue_id=queue_id)
ses.add(job)
path = self.job_dir(job)
......@@ -1165,6 +1169,22 @@ class ControllerTestCase(unittest.TestCase):
self.assertEqual(job.exec_time, 0)
@with_db
def test_job_alt_queue(self, ses, app):
with preamble():
self.add_dummy_version(app, "1.0")
with ses.begin():
queue = JobQueue(name="test_alt_queue", timeout=42)
ses.add(queue)
job1 = self.create_job(app, "1.0")
job2 = self.create_job(app, "1.0", queue_id=queue.id, access_token="123456")
self.assertEqual(self.ctrl.gen_job_name(job1), "test-job-default-%d-test-app" % job1.id)
self.assertEqual(self.ctrl.gen_job_name(job2), "test-job-test_alt_queue-%d-test-app" % job2.id)
#TODO: remove after migration (now container id is stored as Job.container_id at creation time)
@with_db
def test_job_with_no_container_id(self, ses, app):
......@@ -1297,7 +1317,7 @@ class ControllerTestCase(unittest.TestCase):
with self.check_job_transition(job, J.WAITING, J.RUNNING):
self.notify()
ctr = "%s-job-norm-%d-test-app" % (ENV, job.id)
ctr = "%s-job-default-%d-test-app" % (ENV, job.id)
self.wait_created(ctr)
if remove:
self.dk.remove_container(ctr, force=True)
......@@ -1401,7 +1421,7 @@ class ControllerTestCase(unittest.TestCase):
with self.check_job_transition(job, J.WAITING, J.RUNNING):
self.notify()
ctr = "%s-job-norm-%d-test-app" % (ENV, job.id)
ctr = "%s-job-default-%d-test-app" % (ENV, job.id)
self.wait_created(ctr)
time.sleep(.1) # because JobManager reschedules the task
......
......@@ -12,6 +12,14 @@ class Api::V1::JobsController < Api::V1::ApiController
#POST /jobs.json
def create
jp = job_params
# lookup webapp
webapp = Webapp.find jp[:webapp_id]
# lookup job queue (by name)
queue = jp.extract!(:queue)[:queue]
jp[:queue_id] = queue ? JobQueue.find_by!(name: queue).id : webapp.default_job_queue_id
#TODO: multi file_url by api
if params[:files].nil? and jp[:dataset].nil? and jp[:file_url].nil? and jp[:param].nil? #no files, nothing
#render text: "you forgot to use a input file"
......@@ -76,7 +84,7 @@ class Api::V1::JobsController < Api::V1::ApiController
private
def job_params
params.require(:job).permit(:webapp_id, :dataset, :datafile, :file, :param, :status, :datasize, :file_url, webapp_attributes: [:name, :id], :files_attributes => [])
params.require(:job).permit(:webapp_id, :dataset, :datafile, :file, :param, :status, :datasize, :file_url, :queue, webapp_attributes: [:name, :id], :files_attributes => [])
end
def files_list
......
......@@ -236,7 +236,7 @@ class JobsController < ApplicationController
# Never trust parameters from the scary internet, only allow the white list through.
#for params name job[:bar] where params.require(:foo).permit(:bar)
def job_params
params.require(:job).permit(:id, :webapp_id, :datafile, :param, :state, :filename, :file, :version, :datasize, :file_url,
params.require(:job).permit(:id, :webapp_id, :datafile, :param, :state, :filename, :file, :version, :datasize, :file_url, :queue_id,
:dataset, webapp_attributes: [:name, :id])
end
......
......@@ -36,7 +36,7 @@ class WebappsController < ApplicationController
def show
if current_user
@webapp_jobs = @webapp.jobs.all
@job = Job.new(webapp_id: @webapp.id)
@job = Job.new(webapp_id: @webapp.id, queue_id: @webapp.default_job_queue_id)
@datasets = Dataset.where(user_id: current_user.id)
@q = Quota.init(current_user.id,@webapp.id)
@q_quantity = @q.quantity
......@@ -375,7 +375,7 @@ class WebappsController < ApplicationController
def webapp_params
# FIXME: do not permit to update user_id
params.require(:webapp).permit(:id, :name, :description, :contact, :entrypoint, :logo, :tag_list, :user_id, :default_quota, :source_file, :private,
:sandbox_state, :docker_os_id, :from_showapps, :readme, :input_demo, :output_demo, :update_demos, :update_parameters, :version,
:sandbox_state, :docker_os_id, :from_showapps, :readme, :input_demo, :output_demo, :update_demos, :update_parameters, :version, :default_job_queue_id,
webapp_parameters_attributes: [:id, :name, :value, :detail],
#webapp_demos_attributes: [:id, :name, :extension, :file_type],
webapp_versions_attributes: [:id, :number, :changelog] )
......
class ApplicationRecord < ActiveRecord::Base
self.abstract_class = true
end
......@@ -10,6 +10,8 @@ class Job < ActiveRecord::Base
has_many :job_uploads
accepts_nested_attributes_for :job_uploads
#inverse_of
belongs_to :queue, class_name: JobQueue
require 'fileutils'
before_create :generate_access_token
......@@ -89,7 +91,8 @@ class Job < ActiveRecord::Base
end
def self.create_in_ajax(webapp_id,user_id)
job = self.new(webapp_id: webapp_id, user_id: user_id)
job = self.new(webapp_id: webapp_id, user_id: user_id,
queue_id: Webapp.find(webapp_id).default_job_queue_id)
job.make_directory
job.datasize = 0
job.state = "NEW"
......
class JobQueue < ApplicationRecord
validates :name, uniqueness: true, format: {with: /\A[A-Za-z0-9_]+\z/}
validates_uniqueness_of :is_default, conditions: -> { where is_default: true }
validates_numericality_of :timeout, greater_than: 0, allow_nil: true
default_scope { order(:timeout) }
def self.default
where(is_default: true).first
end
def human_timeout
if timeout
d = (timeout / 1.day).days
h = (timeout % 1.day / 1.hour).hours
m = (timeout % 1.hour / 1.minute).minutes
s = (timeout % 1.minute).seconds
[d,h,m,s].select {|x| x!=0}.map{|x| x.inspect}.join(", ")
end
end
def name_with_timeout
"#{name} (#{timeout ? "<#{human_timeout}" : "no limit"})"
end
end
......@@ -42,6 +42,9 @@ class Webapp < ActiveRecord::Base
# (nil for using the original docker_os)
belongs_to :sandbox_version, class_name: "WebappVersion"
# default queue for the jobs created for this webapp
belongs_to :default_job_queue, class_name: JobQueue
#paperclip module, for logo
has_attached_file :logo,
......
......@@ -49,6 +49,12 @@
<span class="help-block">It's the last one by default</span>
</div>
</div>
<div class="row">
<%= f.label :queue_id, "Queue: ", class: "control-label col-md-2 col-md-offset-1 lead" %>
<div class="col-md-3">
<%= f.collection_select :queue_id, JobQueue.all, :id, :name_with_timeout, {}, {class: "form-control"} %>
</div>
</div>
<div class="row">
<%= f.label :param, "Parameters: ", class: "control-label col-md-2 col-md-offset-1 lead" %>
<div class="col-md-9">
......
......@@ -18,6 +18,7 @@
<th>App</th>
<th>Files</th>
<th>Status</th>
<th>Queue</th>
<th>Date</th>
<th>Parameters</th>
<th style="text-align: center">Actions</th>
......@@ -34,6 +35,7 @@
<td><%= link_to job.webapp.name, job.webapp %></td>
<td><%= File.basename(job.job_uploads.first.job_file_file_name).truncate(30) if !job.job_uploads.first.nil? %></td>
<td><%= job.state.downcase %></td>
<td><%= job.queue.name %></td>
<td><%= job.updated_at.strftime("%e/%m/%Y, %H:%M") %></td> <!-- TODO changer en fonction de la localisation -->
<td>
<% if job.param %>
......
......@@ -15,7 +15,8 @@
<h3>App: </h3><p class="lead"><%= link_to @job.webapp.name, @job.webapp %></p>
<h3>Version: </h3><%= @job.version %> <br /><br />
<h3>Parameters: </h3><%= @job.param %> <br /><br />
<h3>Status: </h3><span id="job_state"><%= @job.state.downcase %></span> <br /><br />
<h3>Status: </h3><span id="job_state"><%= @job.state.downcase %></span> <br /><br />
<h3>Queue: </h3><%= @job.queue.name_with_timeout %> <br /><br />
<span id="zip">
<% if @job.state == "DONE" %>
<h3>Download zip: </h3>
......
......@@ -8,7 +8,8 @@
<pre><%="
curl -H 'Authorization: Token token=<your_private_token>' -X POST
-F job[webapp_id]=#{@webapp.id}
-F job[param]=""
-F job[param]=\"\"
-F job[queue]=standard
-F files[0]=@test.txt
-F files[1]=@test2.csv
-F job[file_url]=<my_file_url>
......
......@@ -37,6 +37,12 @@
<span class="help-block">Which file will we have to launch? <br />If empty it will be /home/allgo/< your_app_name >.sh </span>
</div>
<div class="form-group">
<%= f.label :default_queue, class: "col-sm-2 control-label" %>
<div class="col-sm-4"><%= f.collection_select :default_job_queue_id, JobQueue.all, :id, :name_with_timeout, {}, {class: "form-control"} %></div>
<span class="help-block"></span>
</div>
<div class="form-group">
<%= f.label :tag_list, "Tags, separated by commas", class: "col-sm-2 control-label" %>
<div class="col-sm-5"><%= f.text_field :tag_list, class: "form-control",
......
class CreateJobQueues < ActiveRecord::Migration[5.0]
def up
create_table :job_queues do |t|
t.string :name, null: false
t.integer :timeout
t.boolean :is_default, null: false, default: false
t.timestamps
end
add_column :jobs, :queue_id, :integer, null: false
add_column :webapps, :default_job_queue_id, :integer, null: false
execute('INSERT INTO job_queues (id, name, is_default, created_at, updated_at)
VALUES (1, "default", 1, now(), now());')
execute('UPDATE jobs SET queue_id=1')
execute('UPDATE webapps SET default_job_queue_id=1')
end
def down
remove_column :webapps, :default_job_queue_id
remove_column :jobs, :queue_id
drop_table :job_queues
end
end
......@@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.
ActiveRecord::Schema.define(version: 20171031145600) do
ActiveRecord::Schema.define(version: 20171115133031) do
create_table "datasets", force: :cascade, options: "ENGINE=InnoDB DEFAULT CHARSET=utf8" do |t|
t.string "name", null: false
......@@ -28,6 +28,14 @@ ActiveRecord::Schema.define(version: 20171031145600) do
t.string "docker_name"
end
create_table "job_queues", force: :cascade, options: "ENGINE=InnoDB DEFAULT CHARSET=utf8" do |t|
t.string "name", null: false
t.integer "timeout"
t.boolean "is_default", default: false, null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
end
create_table "job_uploads", force: :cascade, options: "ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci" do |t|
t.string "job_file_file_name"
t.string "job_file_content_type"
......@@ -52,6 +60,7 @@ ActiveRecord::Schema.define(version: 20171031145600) do
t.string "access_token"
t.integer "state", default: 0, null: false
t.string "container_id", limit: 64
t.integer "queue_id", null: false
end
create_table "quotas", force: :cascade, options: "ENGINE=InnoDB DEFAULT CHARSET=utf8" do |t|
......@@ -142,7 +151,7 @@ ActiveRecord::Schema.define(version: 20171031145600) do
create_table "webapps", force: :cascade, options: "ENGINE=InnoDB DEFAULT CHARSET=utf8" do |t|
t.string "name"
t.text "description", limit: 65535
t.text "description", limit: 65535
t.string "contact"
t.datetime "created_at"
t.datetime "updated_at"
......@@ -154,13 +163,14 @@ ActiveRecord::Schema.define(version: 20171031145600) do
t.integer "default_quota"
t.integer "docker_os_id"
t.string "docker_name"
t.boolean "readme", default: false
t.boolean "readme", default: false
t.string "entrypoint"
t.integer "exec_time"
t.boolean "private"
t.string "access_token"
t.integer "sandbox_state", default: 0, null: false
t.integer "sandbox_state", default: 0, null: false
t.integer "sandbox_version_id"
t.integer "default_job_queue_id", null: false
t.index ["docker_os_id"], name: "index_webapps_on_docker_os_id", using: :btree
t.index ["name"], name: "index_webapps_on_name", unique: true, using: :btree
t.index ["user_id"], name: "index_webapps_on_user_id", using: :btree
......
......@@ -17,6 +17,8 @@ DockerOs.create([
{ name: "ubuntu", docker_name: "ubuntu", version: "14.04"},
])
JobQueue.create!(name: "default", is_default: true)
if Rails.env.development?
now = Time.zone.now
......
FactoryGirl.define do
factory :job_queue do
name "MyString"
timeout 1
end
end
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment