Commit 467fd671 authored by BAIRE Anthony

allow configuring restrictions for apps that use a lot of CPU or memory

parent f03a6e78
@@ -21,6 +21,10 @@ ENV PORT="4567" \
     ENV="" \
     REGISTRY="" \
     MAX_JOBS="4" \
+    MAX_BIGMEM_JOBS="2" \
+    MAX_LONG_JOBS="2" \
+    BIGMEM_APPS="" \
+    LONG_APPS="" \
     CPUS="" \
     MEM_SOFT_LIMIT="" \
     MEM_HARD_LIMIT="" \
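The new variables follow the existing MAX_JOBS pattern: BIGMEM_APPS and LONG_APPS hold whitespace-separated lists of webapp docker names, while MAX_BIGMEM_JOBS and MAX_LONG_JOBS cap how many jobs of those apps may run at once, on top of the global MAX_JOBS cap. A hypothetical configuration might look like the Python mapping below (the app names are made up; only the variable names and formats come from this commit):

    # Hypothetical values illustrating the expected formats.
    environment = {
        "MAX_JOBS":        "4",    # global cap on concurrent jobs
        "MAX_BIGMEM_JOBS": "2",    # cap for apps listed in BIGMEM_APPS
        "MAX_LONG_JOBS":   "2",    # cap for apps listed in LONG_APPS
        "BIGMEM_APPS": "genome-assembler image-denoiser",   # made-up docker names
        "LONG_APPS":   "batch-simulator",                    # made-up docker name
    }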
@@ -1006,10 +1006,68 @@ class JobManager(Manager):
     class JobInfo:
         __slots__ = "job_id", "ver_id", "ctr_id", "version", "ctr_name", "client", "watcher"
 
-    def __init__(self, ctrl, nb_jobs=NB_JOB_TASKS):
+    def __init__(self, ctrl, nb_jobs=NB_JOB_TASKS,
+                 nb_long_jobs  =NB_JOB_TASKS, long_apps  =(),
+                 nb_bigmem_jobs=NB_JOB_TASKS, bigmem_apps=()):
         super().__init__(nb_jobs)
         self.ctrl = ctrl
+        self.bigmem_apps = bigmem_apps
+        self.long_apps = long_apps
+        self.bigmem_semaphore = asyncio.Semaphore(nb_bigmem_jobs)
+        self.long_semaphore = asyncio.Semaphore(nb_long_jobs)
+
+    @asyncio.coroutine
+    def __iter__(self):
+        raise NotImplementedError()
+
+    @asyncio.coroutine
+    def lock_resources(self, docker_name, *, wait=True):
+        """Coroutine for locking the internal semaphores
+
+        Usage:
+            with (yield from self.lock_resources(docker_name)):
+                ...
+
+        Warning:
+            after a shutdown is initiated, this function will always raise
+            ShuttingDown()
+        """
+        if wait:
+            @asyncio.coroutine
+            def add_lock(stack, sem):
+                return stack.enter_context((yield from iter(sem)))
+        else:
+            @asyncio.coroutine
+            def add_lock(stack, sem):
+                try:
+                    return stack.enter_context(
+                            (yield from asyncio.wait_for(iter(sem), 0)))
+                except asyncio.TimeoutError:
+                    # no available tokens
+                    # (this may happen if MAX_JOBS is reduced across a restart)
+                    # -> continue anyway
+                    pass
+
+        stack = contextlib.ExitStack()
+
+        if docker_name in self.bigmem_apps:
+            yield from add_lock(stack, self.bigmem_semaphore)
+            log.debug("bigmem job (%s)", docker_name)
+
+        if docker_name in self.long_apps:
+            yield from add_lock(stack, self.long_semaphore)
+            log.debug("long job (%s)", docker_name)
+
+        yield from add_lock(stack, self._semaphore)
+
+        if self._shutdown.done():
+            with stack:
+                raise ShuttingDown()
+
+        return stack
+
     def _create_job(self, info):
         ctrl = self.ctrl
         ses = ctrl.session
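For readers less used to the pre-3.5 asyncio idioms above (@asyncio.coroutine, yield from, and taking a semaphore with "with (yield from sem)"), here is the same idea as a minimal sketch in modern async/await syntax; the function and parameter names and the AsyncExitStack structure are illustrative, not the project's actual API. A job whose app is flagged as bigmem or long must take a token from the corresponding dedicated semaphore before taking one from the global semaphore, so heavy apps are throttled independently of, and in addition to, MAX_JOBS:

    import asyncio
    import contextlib

    async def lock_resources(docker_name, *, global_sem, bigmem_sem, long_sem,
                             bigmem_apps=(), long_apps=()):
        """Return an AsyncExitStack holding every semaphore token this job needs."""
        stack = contextlib.AsyncExitStack()
        try:
            if docker_name in bigmem_apps:
                await stack.enter_async_context(bigmem_sem)   # MAX_BIGMEM_JOBS slot
            if docker_name in long_apps:
                await stack.enter_async_context(long_sem)     # MAX_LONG_JOBS slot
            await stack.enter_async_context(global_sem)       # MAX_JOBS slot
        except BaseException:
            await stack.aclose()   # release anything acquired so far
            raise
        return stack

    async def run_job(docker_name, **sems):
        async with await lock_resources(docker_name, **sems):
            ...  # create the container and wait for it to finish

Since every path also takes the global token, the effective concurrency for a bigmem app is min(MAX_JOBS, MAX_BIGMEM_JOBS); the dedicated semaphores only tighten the limit, they never bypass it.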
@@ -1189,6 +1247,8 @@ class JobManager(Manager):
         info.version = job.version
         info.ctr_name = ctrl.gen_job_name(job)
 
+        docker_name = job.webapp.docker_name
+
         if job.version == "sandbox":
             info.client = ctrl.sandbox
@@ -1254,7 +1314,7 @@ class JobManager(Manager):
             yield from ctrl.image_manager.pull(info.ver_id, swarm=True)
 
             # lock the semaphore (to limit the total number of jobs)
-            with (yield from self):
+            with (yield from self.lock_resources(docker_name)):
                 yield from self.run_in_executor(self._create_job, info, lock=False)
                 yield from self._finish_job(info)
@@ -1262,21 +1322,10 @@ class JobManager(Manager):
             # the job is already running
             # -> wait for its termination
-            try:
-                # try to lock the semaphore
-                semctx = yield from asyncio.wait_for(iter(self), 0)
-            except asyncio.TimeoutError:
-                # no available tokens
-                # (this may happen if MAX_JOBS is reduced across a restart)
-                # -> wait for the job anyway
+            # try to lock the semaphore
+            with (yield from self.lock_resources(docker_name, wait=False)):
                 yield from self._finish_job(info)
-            else:
-                # normal case
-                with semctx:
-                    yield from self._finish_job(info)
 
 # NOTE: for the push/pull managers, interruptible=True guarantees that the
 #       managers terminate immediately, however it cannot guarantee that the
 #       process will terminate immediately because the ThreadPoolExecuter installs
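The wait=False variant exists for jobs that are already running when the controller (re)starts: the manager should still consume a token if one is free, but it must not block or fail when MAX_JOBS was lowered across the restart. A minimal modern-asyncio sketch of this non-blocking acquisition (the helper name is hypothetical):

    import asyncio

    async def try_acquire(sem: asyncio.Semaphore) -> bool:
        """Take a token if one is free, without ever blocking."""
        if sem.locked():
            return False        # no token left -> the caller carries on without one
        await sem.acquire()     # completes immediately when a token is free
        return True

The check and the acquire run back to back with no await between them, so no other coroutine can take the token after locked() has returned False.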
@@ -1433,7 +1482,9 @@ class DockerController:
     def __init__(self, sandbox_host, swarm_host, mysql_host,
             port, registry, env, datastore_path, sandbox_path,
             toolbox_path, max_jobs, sandbox_network, cpus,
-            mem_soft_limit, mem_hard_limit):
+            mem_soft_limit, mem_hard_limit,
+            max_long_jobs, long_apps, max_bigmem_jobs, bigmem_apps,
+            ):
 
         self.sandbox = docker.Client(sandbox_host)
         self.sandbox_watcher = DockerWatcher(self.sandbox)
@@ -1448,6 +1499,7 @@ class DockerController:
                 connect_db(mysql_host))
         self._thread_local = threading.local()
 
         self.sock = socket.socket()
+        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
         self.sock.bind(("0.0.0.0", port))
@@ -1456,7 +1508,8 @@ class DockerController:
         self.image_manager = ImageManager(self)
         self.sandbox_manager = SandboxManager(self)
-        self.job_manager = JobManager(self, max_jobs)
+        self.job_manager = JobManager(self, max_jobs,
+                max_long_jobs, long_apps, max_bigmem_jobs, bigmem_apps)
 
         self.cpu_quota = None if cpus is None else int(cpus * 100000)
         self.cpu_period = None if cpus is None else 100000
@@ -65,6 +65,22 @@ def main():
             raise ValueError("path is not absolute")
         return "/" + path.strip("/")
 
+    def log_var(desc, val):
+        log.info("%-21s %r", desc, val)
+
+    def cfg_positive_int(name, desc):
+        with get_envvar(name) as val:
+            result = int(val)
+            if result <= 0:
+                raise ValueError("must provide a positive value")
+            log_var(desc, result)
+            return result
+
+    def cfg_list(name, desc):
+        with get_envvar(name) as val:
+            result = val.split()
+            log_var(desc, result)
+            return result
 
     with get_envvar("DEBUG") as val:
         debug = bool(int(val or 0))
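get_envvar() is not part of this diff; assuming it simply yields the raw value of the environment variable, the two new helpers behave as in this self-contained sketch (the values are made up; the cfg_* bodies are copied from the hunk above):

    import contextlib, logging, os

    log = logging.getLogger("config")

    @contextlib.contextmanager
    def get_envvar(name):              # assumed stand-in for the real helper
        yield os.environ.get(name, "")

    def log_var(desc, val):
        log.info("%-21s %r", desc, val)

    def cfg_positive_int(name, desc):
        with get_envvar(name) as val:
            result = int(val)
            if result <= 0:
                raise ValueError("must provide a positive value")
            log_var(desc, result)
            return result

    def cfg_list(name, desc):
        with get_envvar(name) as val:
            result = val.split()       # whitespace-separated docker names
            log_var(desc, result)
            return result

    os.environ.update({"MAX_LONG_JOBS": "2", "LONG_APPS": "app-alpha app-beta"})  # made-up values
    assert cfg_positive_int("MAX_LONG_JOBS", "max long jobs") == 2
    assert cfg_list("LONG_APPS", "long apps") == ["app-alpha", "app-beta"]

Note that, unlike the old inline parsing of MAX_JOBS, cfg_positive_int() no longer falls back to a default when the variable is empty; the defaults now live only in the Dockerfile (MAX_JOBS="4", MAX_BIGMEM_JOBS="2", MAX_LONG_JOBS="2").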
@@ -105,11 +121,11 @@ def main():
         toolbox_path = parse_path(val.format(ENV=env))
         log.info("toolbox path %s", toolbox_path)
 
-    with get_envvar("MAX_JOBS") as val:
-        max_jobs = int(val) if val else 4
-        if max_jobs <= 0:
-            raise ValueError("must provide a positive value")
-        log.info("max concurrent jobs %d", max_jobs)
+    max_jobs        = cfg_positive_int("MAX_JOBS", "max concurrent jobs")
+    max_long_jobs   = cfg_positive_int("MAX_LONG_JOBS", "max long jobs")
+    max_bigmem_jobs = cfg_positive_int("MAX_BIGMEM_JOBS", "max bigmem jobs")
+    bigmem_apps     = cfg_list        ("BIGMEM_APPS", "bigmem apps")
+    long_apps       = cfg_list        ("LONG_APPS", "long apps")
 
     with get_envvar("CPUS") as val:
         cpus = float(val) if val else None
@@ -150,6 +166,7 @@ def main():
         return controller.DockerController(docker_host, swarm_host, mysql_host,
                 port, registry, env, datastore_path, sandbox_path, toolbox_path,
                 max_jobs, sandbox_network, cpus, mem_soft_limit, mem_hard_limit,
+                max_long_jobs, long_apps, max_bigmem_jobs, bigmem_apps,
                 ).run()
     finally:
         lock.release()