Commit 0e301e74 authored by BAIRE Anthony

refactor the management of swarm/sandbox resources

- add SwarmAbstractionClient: a class that extends docker.Client and
  hides the API differences between the docker remote API and the
  swarm API. Thus a single docker engine can be used like a swarm

- add SharedSwarmClient: a class that extends SwarmAbstractionClient,
  monitors the swarm health and its resources (cpu/mem), and manages
  the resource allocation.
  - resources are partitioned into groups (to allow reserving resources
    for higher-priority jobs)
  - two SharedSwarmClient instances can work together over TCP in a
    master/slave configuration (so that the production and qualification
    platforms can use the same swarm without any interference)

- the controller is modified to:
  - use SharedSwarmClient to:
    - wait for the end of a job (in place of DockerWatcher)
    - manage resource reservations (LONG_APPS vs. BIGMEM_APPS vs. normal
      apps) and monitor swarm health (fix #124)
    - NOTE: the resources of the swarm and of the sandbox are now managed
      separately (2 instances of SharedSwarmClient), whereas they were
      managed globally before (this was suboptimal)
  - rely on SwarmAbstractionClient to compute the cpu quotas
  - store the container_id of jobs in the DB (fix #128); this is a
    prerequisite for renaming apps in the future
  - store the class of the job (normal vs. long app) in the container
    name (for the resource management with SharedSwarmClient)
  - read the configuration from a yaml file (/vol/ro/config.yml) for:
    - cpu/mem quotas
    - swarm resources allocation policy
    - master/slave configuration
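
For reference, a minimal sketch of how the controller might read this YAML file (the key names come from config-example.yml below; the loader itself is illustrative and is not the actual config_reader module):

import yaml

CONFIG_PATH = "/vol/ro/config.yml"   # installed from config-example.yml if missing

def load_config(path=CONFIG_PATH):
    # illustrative loader, not the real config_reader
    with open(path) as f:
        cfg = yaml.safe_load(f) or {}
    return {
        # job limits (all optional, default: no limit)
        "cpus":           cfg.get("cpus"),
        "mem_soft_limit": cfg.get("mem_soft_limit"),
        "mem_hard_limit": cfg.get("mem_hard_limit"),
        "bigmem_apps":    cfg.get("bigmem_apps", []),
        "long_apps":      cfg.get("long_apps", []),
        # resource allocation policy for the two docker hosts
        # (each has a master/slave role and, for a master, a 'reserve' list)
        "swarm":          cfg.get("swarm",   {"role": "master", "reserve": []}),
        "sandbox":        cfg.get("sandbox", {"role": "master", "reserve": []}),
    }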
parent cfa6bb36
......@@ -3,13 +3,15 @@ FROM allgo/base-debian
RUN echo deb http://miroir.irisa.fr/debian/ jessie-backports main >> /etc/apt/sources.list && apt-getq update
RUN apt-getq install python3-websocket python3-six python3-requests \
python3-mysqldb python3-sqlalchemy python3-fasteners \
python3-nose python3-coverage libjs-jquery
python3-nose python3-coverage libjs-jquery python3-yaml \
python3-termcolor
COPY *.deb /tmp/
RUN dpkg -i /tmp/*.deb
COPY dk/. /dk/
RUN mkdir -p /opt/allgo-docker
COPY docker-controller controller.py database.py /opt/allgo-docker/
COPY docker-controller *.py install-examples config-example.yml /opt/allgo-docker/
RUN ln -s /vol/host/run/docker.sock /run/docker.sock
......@@ -20,14 +22,6 @@ EXPOSE 4567
ENV PORT="4567" \
ENV="" \
REGISTRY="" \
MAX_JOBS="4" \
MAX_BIGMEM_JOBS="2" \
MAX_LONG_JOBS="2" \
BIGMEM_APPS="" \
LONG_APPS="" \
CPUS="" \
MEM_SOFT_LIMIT="" \
MEM_HARD_LIMIT="" \
DATASTORE_PATH="/data/{ENV}/rails/rw/datastore" \
SANDBOX_PATH="/data/{ENV}/ssh/cache/sandbox" \
TOOLBOX_PATH="/data/{ENV}/toolbox/cache" \
......@@ -36,3 +30,4 @@ ENV PORT="4567" \
SWARM_HOST="unix:///run/docker.sock" \
MYSQL_HOST="{ENV}-mysql"
LABEL dk.migrate_always=1
---
# ############ jobs configuration ################
#
# # number of cpus allocated to each job (default: no limit)
# cpus: 1
#
# # soft memory limit (default: no limit)
# mem_soft_limit: "1G"
#
# # hard memory limit (default: no limit)
# mem_hard_limit: "2g"
#
#
# # list of applications that require more memory (default: [])
# bigmem_apps: ["app1", "app42"]
#
# # hard memory limit for bigmem_apps (default: no limit)
# bigmem_hard_limit: "8G"
#
#
# # list of apps known to have long jobs (default: [])
# long_apps: ["app999"]
#
#
# ############ swarm and sandbox configuration ################
# #
# # This part configures resource reservations for the given docker hosts
# # (either a swarm or a single docker engine)
# #
# # The controller may behave either as a master or as a slave (a slave relies
# # on another controller to manage the resource allocations)
#
# # #### master role ####
# # The 'reserve' section reserves groups of resources (cpu+ram) on the swarm for
# # containers that match the pattern given in the 'match' attribute. All the
# # remaining resources (not allocated by the 'reserve' section) are allocated to
# # a default group that matches any container.
# #
# # When a new job is started, the controller allocates a slot in the first group
# # that matches the container name *and* that has enough free resources.
# #
# # The resources (cpu/ram) can be specified as a scalar and/or percentage value.
# # If both are provided, then the maximum value will be used.
# #
# # The percentages are expressed relative to the total resources available in
# # the swarm. If the swarm size changes, then the allocated resources are
# # recomputed.
#
# swarm:
# # controller role: master/slave (default: master)
# role: master
# # TCP port on which to accept requests from slaves (default: none)
# listen: 4568
#
# reserve:
# # reserve 1 cpu (or 25% of total cpus) + 1G ram (or 25% of total ram)
# # for the containers with a name starting with "high-"
# - match: [ name: "^high-"]
# cpu: "1 25%"
# mem: "1G 25%"
#
# # reserve 1 cpu (or 50% of total cpus) + 1G ram (or 50% of total ram)
# # for the containers with a name starting with "high-" or "medium-"
# - match: [ { name: "^high-"}, {name: "^medium-"} ]
# cpu: "1 50%"
# mem: "1G 50%"
#
# sandbox:
# role: master
# listen: 4569
# reserve: []
#
#
# # #### slave role ####
# swarm:
# # slave role
# role: slave
# # TCP host/port to contact the controller
# connect: "prod-controller:4568"
#
# sandbox:
# role: slave
# connect: "prod-controller:4569"
#
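As a worked illustration of the two rules above (pick the first matching group with enough headroom; resolve each "scalar and/or percentage" value as the maximum of the two), here is a hypothetical sketch; the function names and the group dict layout (free_cpu/free_mem) are made up and do not mirror the actual SharedSwarmClient internals:

import re

_UNITS = {"k": 2**10, "m": 2**20, "g": 2**30}

def resolve_reservation(spec, total):
    """Resolve a value like "1 25%" or "1G 25%" against the swarm total.
    When both an absolute value and a percentage are given, keep the larger."""
    values = []
    for token in spec.split():
        if token.endswith("%"):
            values.append(float(token[:-1]) / 100 * total)
        elif token[-1].lower() in _UNITS:
            values.append(float(token[:-1]) * _UNITS[token[-1].lower()])
        else:
            values.append(float(token))
    return max(values) if values else 0.0

# e.g. on a 16-cpu / 64 GiB swarm:
#   resolve_reservation("1 25%", 16)          -> 4.0 cpus
#   resolve_reservation("1G 25%", 64 * 2**30) -> 16 GiB

def pick_group(container_name, groups, job_cpu, job_mem):
    """Return the first group whose 'match' patterns hit the container name and
    which still has enough free cpu/mem; the last group is the implicit default
    that matches every container."""
    for grp in groups:
        if any(re.search(m["name"], container_name) for m in grp["match"]) \
                and grp["free_cpu"] >= job_cpu and grp["free_mem"] >= job_mem:
            return grp
    return None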
#!/bin/sh
./install-examples
#!/bin/sh
./install-examples
......@@ -9,8 +9,10 @@ import sys
import fasteners
import config_reader
import controller
log = controller.log
def die(msg, *k):
......@@ -76,12 +78,6 @@ def main():
log_var(desc, result)
return result
def cfg_list(name, desc):
with get_envvar(name) as val:
result = val.split()
log_var(desc, result)
return result
with get_envvar("DEBUG") as val:
debug = bool(int(val or 0))
......@@ -121,25 +117,6 @@ def main():
toolbox_path = parse_path(val.format(ENV=env))
log.info("toolbox path %s", toolbox_path)
max_jobs = cfg_positive_int("MAX_JOBS", "max concurrent jobs")
max_long_jobs = cfg_positive_int("MAX_LONG_JOBS", "max long jobs")
max_bigmem_jobs = cfg_positive_int("MAX_BIGMEM_JOBS", "max bigmem jobs")
bigmem_apps = cfg_list ("BIGMEM_APPS", "bigmem apps")
long_apps = cfg_list ("LONG_APPS", "long apps")
with get_envvar("CPUS") as val:
cpus = float(val) if val else None
log.info("cpus %s", cpus)
with get_envvar("MEM_SOFT_LIMIT") as val:
mem_soft_limit = val or None
log.info("memory soft limit %s", mem_soft_limit)
with get_envvar("MEM_HARD_LIMIT") as val:
mem_hard_limit = val or None
log.info("memory hard limit %s", mem_hard_limit)
with get_envvar("SANDBOX_NETWORK") as val:
re.match(r"\A[\w]+[\w. _-]*[\w]+\Z", val)
sandbox_network = val
......@@ -164,10 +141,11 @@ def main():
console_handler.setLevel(logging.WARNING)
return controller.DockerController(docker_host, swarm_host, mysql_host,
port, registry, env, datastore_path, sandbox_path, toolbox_path,
max_jobs, sandbox_network, cpus, mem_soft_limit, mem_hard_limit,
max_long_jobs,long_apps, max_bigmem_jobs, bigmem_apps,
).run()
port, registry, env, datastore_path, sandbox_path,
toolbox_path, sandbox_network).run()
except config_reader.ConfigError:
log.critical("bad config")
sys.exit(1)
finally:
lock.release()
......
#!/bin/sh
CONFIG="/vol/ro/config.yml"
set -e
[ -e "$CONFIG" ] || (set -x ; install -m 0640 config-example.yml "$CONFIG" )
import json
import logging
import docker
#TODO patch .containers()/.inspect_container() to prepend node name in the container names
_LOCAL_ID = '0000:0000:0000:0000:0000:0000:0000:0000:0000:0000:0000:0000'
_LOCAL_NAME = "local"
log = logging.getLogger("swarm_abstraction")
class SwarmAbstractionClient(docker.Client):
    """Wrapper for docker.Client to make a single docker engine look like a swarm with a single node

    The swarm API is an extension of the docker remote API, but with some
    incompatibilities:
    https://docs.docker.com/v1.9/swarm/api/swarm-api/#endpoints-which-behave-differently

    The purpose of this class is to normalise the value of the CpuShares
    parameter (so that 1 cpu share == 1 CPU), so that users of the class do
    not have to care whether they are talking to a swarm or to a single daemon.
    """

    def __getattr__(self, name):
        if name == "_SwarmAbstractionClient__cpu_shares_multiple":
            # lazily discover the number of CpuShares per CPU
            # - 1 if the server is a swarm
            # - 1024/NCPU if the server is a docker engine
            info = super().info()
            if info["ServerVersion"].startswith("swarm/"):
                log.info("docker host %r is a swarm", self.base_url)
                value = 1
            else:
                log.info("docker host %r is a docker engine", self.base_url)
                value = 1024 // info["NCPU"]
            setattr(self, name, value)
            return value
        else:
            s = super()
            return getattr(s, "__getattr__", s.__getattribute__)(name)

    def inspect_container(self, *k, **kw):
        result = super().inspect_container(*k, **kw)
        if self.__cpu_shares_multiple != 1:
            # docker engine case
            # add dummy Node info
            result["Node"] = {"ID": _LOCAL_ID}
            # normalise CpuShares
            result["HostConfig"]["CpuShares"] //= self.__cpu_shares_multiple
        else:
            # swarm case
            # normalise CpuShares (yeah, swarm does not do it...)
            result["HostConfig"]["CpuShares"] //= (1024 // result["Node"]["Cpus"])
        return result

    def create_container(self, *k, **kw):
        if self.__cpu_shares_multiple != 1:
            # normalise CpuShares
            hc = kw.get("host_config")
            if hc and "CpuShares" in hc:
                hc["CpuShares"] *= self.__cpu_shares_multiple

            # check and remove the node constraint (if any)
            env = kw.get("environment")
            if isinstance(env, list):
                for i in range(len(env)-1, -1, -1):
                    item = env[i]
                    if item.startswith("constraint:node=="):
                        if item != "constraint:node==" + _LOCAL_ID:
                            raise ValueError(item)
                        env.pop(i)
        return super().create_container(*k, **kw)

    def update_container(self, *k, cpu_shares=None, **kw):
        # normalise CpuShares
        if cpu_shares is not None:
            cpu_shares *= self.__cpu_shares_multiple
        return super().update_container(*k, cpu_shares=cpu_shares, **kw)

    def containers(self, *k, **kw):
        lst = super().containers(*k, **kw)
        if self.__cpu_shares_multiple != 1:
            # normalise container names
            for ctr in lst:
                ctr["Names"] = ["/%s%s" % (_LOCAL_NAME, x) for x in ctr["Names"]]
        return lst

    def info(self, *k, **kw):
        info = super().info(*k, **kw)
        if self.__cpu_shares_multiple != 1:
            # show the local node
            info.setdefault("DriverStatus", [])
            info["DriverStatus"] += [
                [_LOCAL_NAME, '127.0.0.1:0'],
                [' └ ID', _LOCAL_ID],
                [' └ Status', 'Healthy'],
                [' └ Reserved CPUs', '0 / %d' % info["NCPU"]],
                [' └ Reserved Memory', '0 KiB / %d KiB' % (info["MemTotal"] // 1024)]]
        return info


Client = SwarmAbstractionClient
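
Taken together, these overrides let the caller express CpuShares in whole CPUs and always see a Node entry in inspect results, whichever backend is behind base_url. A short illustrative usage (the socket path, image and values are made up, and host_config is passed as a plain dict for brevity):

client = SwarmAbstractionClient(base_url="unix:///run/docker.sock")

# the caller always expresses CpuShares as a number of whole CPUs
ctr = client.create_container("busybox", command="sleep 60",
                              host_config={"CpuShares": 2})
# on an 8-core docker engine __cpu_shares_multiple == 1024 // 8 == 128, so the
# engine actually receives CpuShares=256; against a real swarm the value is
# forwarded unchanged

info = client.inspect_container(ctr)
info["HostConfig"]["CpuShares"]   # -> 2 on both backends (normalised back)
info["Node"]["ID"]                # always present (a dummy id on a plain engine)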
---
reserve:
# first group: high priority (h/H)
- match: [ {name: "^h", tag: "ast" }, { name: "/H" }]
cpu: "1 25%"
mem: "1G 25%"
# second group: high + normal priority (h/H/n)
- match: [ name: "^[hHn]" ]
cpu: "1 25%"
mem: "1G 25%"
......@@ -173,7 +173,6 @@ class ControllerTestCase(unittest.TestCase):
sandbox_path = SANDBOX_HOST_PATH,
sandbox_network= SANDBOX_NETWORK,
toolbox_path = TOOLBOX_HOST_PATH,
max_jobs = 4,
)
@classmethod
......@@ -395,7 +394,7 @@ class ControllerTestCase(unittest.TestCase):
def emit(self, record):
nonlocal found
txt = record.msg % record.args
print((txt, record.levelno, regex, levelno, re.search(regex, txt)))
#print((txt, record.levelno, regex, levelno, re.search(regex, txt)))
if record.levelno == levelno and re.search(regex, txt):
found = True
hnd = Hnd()
......@@ -1166,6 +1165,35 @@ class ControllerTestCase(unittest.TestCase):
self.assertEqual(job.exec_time, 0)
#TODO: remove after migration (now container id is stored as Job.container_id at creation time)
@with_db
def test_job_with_no_container_id(self, ses, app):
with preamble():
self.add_dummy_version(app, "1.0")
with part("case 1: container still present"):
job = self.create_job(app, "1.0", '', state=J.RUNNING)
# create the container manually
cid = self.dk.create_container("%s/webapp/%s:1.0" % (REGISTRY, app.docker_name),
command="/bin/sh -c 'sleep 1;echo coin coin'",
name="%s-job-%d-%s" % (ENV, job.id, app.docker_name))
self.dk.start(cid)
with self.check_job_transition(job, J.RUNNING, J.DONE):
self.notify()
self.assertEqual(job.exec_time, 1)
with part("case 2: container already removed"):
job = self.create_job(app, "1.0", '', state=J.RUNNING, access_token="456789")
# do not create the container
with self.check_job_transition(job, J.RUNNING, J.DONE):
self.notify()
self.assertEqual(job.exec_time, 0)
@with_db
def test_job_start_error(self, ses, app):
with preamble():
......@@ -1230,11 +1258,11 @@ class ControllerTestCase(unittest.TestCase):
files = {"foo": "this is foo\n"}, access_token = str(12345 + remove - sema_lock))
with mock.patch("controller.DockerWatcher.wait", side_effect=controller.ShuttingDown):
with mock.patch("shared_swarm.Client.wait_async", side_effect=controller.ShuttingDown):
with self.check_job_transition(job, J.WAITING, J.RUNNING):
self.notify()
ctr = "%s-job-%d-test-app" % (ENV, job.id)
ctr = "%s-job-norm-%d-test-app" % (ENV, job.id)
self.wait_created(ctr)
if remove:
self.dk.remove_container(ctr, force=True)
......@@ -1325,11 +1353,11 @@ class ControllerTestCase(unittest.TestCase):
with part("case 5: sandbox job interrupted"):
job = self.create_job(app, "sandbox", access_token="785469")
with mock.patch("controller.DockerWatcher.wait", side_effect=controller.ShuttingDown):
with mock.patch("shared_swarm.Client.wait_async", side_effect=controller.ShuttingDown):
with self.check_job_transition(job, J.WAITING, J.RUNNING):
self.notify()
ctr = "%s-job-%d-test-app" % (ENV, job.id)
ctr = "%s-job-norm-%d-test-app" % (ENV, job.id)
self.wait_created(ctr)
time.sleep(.1) # because JobManager reschedules the task
......@@ -1880,108 +1908,6 @@ class ManagerTestCase(unittest.TestCase):
if not fut.done():
fut.set_exception(Exception("CANCELLED"))
class DockerWatcherTestCase(unittest.TestCase):
@as_coroutine
@asyncio.coroutine
def test_watcher(self):
# cid -> bool
done = {}
ev_queue = queue.Queue()
def report(cid, status="die"):
if cid == "EOF" or isinstance(cid, Exception):
ev_queue.put(cid)
else:
if status == "die":
done[cid] = True
ev_queue.put(('{"status":"%s", "id":"%s"}' % (status, cid)).encode())
class Client:
def events(self, filters=None):
assert filters == {"event": "die"}
while True:
ev = ev_queue.get()
if ev == "EOF":
break
if isinstance(ev, Exception):
raise ev
yield ev
def inspect_container(self, cid):
if cid == id0:
with mock.patch("docker.errors.APIError.__init__", return_value=None):
raise NotFound()
return {"State": {"Running": not done.get(cid)}}
# disable the rate limiter (i don't want the test to span over 1min)
with mock.patch("controller.rate_limit", return_value=itertools.repeat(None)) as m:
client = Client()
watcher = controller.DockerWatcher(client)
assert m.called_with(60)
id0= 64 * "0"
id1= 64 * "1"
id2= 64 * "2"
id3= 64 * "3"
id4= 64 * "4"
# container running
with self.assertRaises(asyncio.TimeoutError):
yield from asyncio.wait_for(watcher.wait(id1), 0.1)
# noop events
report(id1, "pouet")
report(id2)
with self.assertRaises(asyncio.TimeoutError):
yield from asyncio.wait_for(watcher.wait(id1), 0.1)
# listening thread EOF
report("EOF")
yield from asyncio.sleep(.1)
# listening thread error
with mock.patch("controller.log.exception") as mexc:
report(RuntimeError("blah"))
yield from asyncio.sleep(.15)
mexc.assert_called_with('docker watcher exception')
# event already watched (not supported by design)
with self.assertRaises(RuntimeError):
fut1 = asyncio.async(watcher.wait(id1))
fut2 = asyncio.async(watcher.wait(id1))
yield from asyncio.gather(fut1, fut2)
fut1.cancel()
fut2.cancel()
# container already terminated
done[id2] = True
self.assertIs((yield from asyncio.wait_for(watcher.wait(id2), 0.1)), None)
# id terminating
fut1 = asyncio.async(watcher.wait(id1))
yield from asyncio.sleep(0.1)
report(id1)
self.assertIs((yield from asyncio.wait_for(fut1, 0.1)), None)
# unknown container
yield from asyncio.wait_for(watcher.wait(id0), 0.1)
# shutdown
for i in range(3):
fut3 = asyncio.async(watcher.wait(id3))
yield from asyncio.sleep(0.1)
watcher.shutdown()
with self.assertRaises(controller.ShuttingDown):
yield from asyncio.wait_for(fut3, 0.1)
with self.assertRaises(controller.ShuttingDown):
yield from asyncio.wait_for(watcher.wait(id4), 0.1)
log.error("evueu .get %r", ev_queue.get)
# listening thread next event (to terminate the listening loop)
report(id1)
yield from asyncio.sleep(.1)
class UtilsTestCase(unittest.TestCase):
......@@ -2030,29 +1956,6 @@ class UtilsTestCase(unittest.TestCase):
m.assert_called_with()
def test_rate_limit(self):
def assert_elapsed(expected, func, *k):
t0 = time.monotonic()
func(*k)
t1 = time.monotonic()
self.assertAlmostEqual(t1-t0, expected, 2)
limiter=controller.rate_limit(0.1)
assert_elapsed(0, next, limiter)
assert_elapsed(0.1, next, limiter)
time.sleep(.02)
assert_elapsed(0.08, next, limiter)
time.sleep(.1)
assert_elapsed(0, next, limiter)
time.sleep(.07)
assert_elapsed(0.03, next, limiter)
time.sleep(.12)
assert_elapsed(0, next, limiter)
@mock.patch("controller.log")
def test_report_error(self, mlog):
merr = mlog.error
......
import json
import queue
import unittest
from unittest import mock
import docker
import swarm_abstraction
from swarm_abstraction import Client
_LOCAL_ID = '0000:0000:0000:0000:0000:0000:0000:0000:0000:0000:0000:0000'
_LOCAL_NAME = "local"
def fpatch(*k, **kw):
"""mock patch a single function"""
kw.setdefault("spec", ("__call__",))
return mock.patch(*k, **kw)
class SwarmAbstractionClientTestCase(unittest.TestCase):
def new_docker_client(self, ncpus=8, *, expect_cpu_shares_multiple=None, **kw):
with mock.patch("docker.Client.info", spec=["__call__"]) as m:
client = Client(**kw)
m.return_value = {
'ServerVersion': '1.13.0',
'NCPU': ncpus,
}
if expect_cpu_shares_multiple is None:
client._SwarmAbstractionClient__cpu_shares_multiple
else:
# test the discovery of the docker engine
self.assertEqual(client._SwarmAbstractionClient__cpu_shares_multiple, expect_cpu_shares_multiple)
self.assertTrue(m.called)
# ensure it is a lazy discovery
m.reset_mock()