Commit 01dd48e6 authored by BAIRE Anthony's avatar BAIRE Anthony
Browse files

Use the redis db to trigger controller actions

This commit removes the old notification channel (socket listening
on port 4567), and uses the redis channel 'notify:controller' instead.

The django job creation views are updated accordingly.
parent 1bb4acf4
Pipeline #30400 failed with stage
in 41 seconds
......@@ -16,9 +16,7 @@ COPY docker-controller *.py install-examples config-example.yml /opt/allgo-docke
WORKDIR /opt/allgo-docker
CMD ["/opt/allgo-docker/docker-controller"]
EXPOSE 4567
ENV PORT="4567" \
ENV="" \
ENV ENV="" \
REGISTRY="" \
DATASTORE_PATH="/data/{ENV}/django/rw/datastore" \
SANDBOX_PATH="/data/{ENV}/ssh/cache/sandbox" \
......
......@@ -14,7 +14,6 @@ import re
import os
import shlex
import signal
import socket
import struct
import sys
import time
......@@ -78,6 +77,9 @@ REDIS_MESSAGE_WEBAPP_UPDATED = "webapp:%d"
##################################################
# interval (in seconds) for polling the db
DB_CHECK_PERIOD = 3600
log = logging.getLogger("controller")
assert MySQLdb.threadsafety >= 1
......@@ -86,6 +88,36 @@ assert MySQLdb.threadsafety >= 1
class Error(Exception):
pass
class RateLimiter:
    """Asynchronous iterator for rate limiting.

    Guarantees that each iteration of the ``async for`` loop lasts at
    least ``period`` seconds (sleeping to fill the remaining time if the
    body finished early). The first iteration starts immediately.

    usage:

        async for _ in RateLimiter(60):
            ...
    """
    def __init__(self, period):
        assert period > 0
        self.period = period
        # pretend the previous iteration ended one full period ago, so
        # that the very first iteration is never delayed
        self._t0 = time.monotonic() - period

    def __aiter__(self):
        return self

    async def __anext__(self):
        now = time.monotonic()
        remaining = self.period - (now - self._t0)
        if remaining > 0:
            log.debug("rate_limit: sleep %f seconds", remaining)
            await asyncio.sleep(remaining)
            self._t0 = now + remaining
        else:
            self._t0 = now
def docker_check_error(func, *k, **kw):
"""Wrapper for docker-py methods that produce a stream
......@@ -1607,7 +1639,7 @@ class ImageManager:
class DockerController:
def __init__(self, sandbox_host, swarm_host, mysql_host,
port, registry, env, datastore_path, sandbox_path,
registry, env, datastore_path, sandbox_path,
toolbox_path, sandbox_network, redis_host,
config_file="/vol/ro/config.yml",
):
......@@ -1653,12 +1685,6 @@ class DockerController:
self.redis_host = redis_host
self.redis_client = None
self.sock = socket.socket()
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind(("0.0.0.0", port))
self.sock.listen(32)
self.sock.setblocking(False)
self.image_manager = ImageManager(self, auth_config=auth_config)
self.sandbox_manager = SandboxManager(self)
self.job_manager = JobManager(self)
......@@ -1773,15 +1799,6 @@ class DockerController:
return self._db_sessionmaker()
def _sock_callback(self):
try:
while True:
self.sock.accept()[0].close()
except BlockingIOError:
pass
self.check_db()
@asyncio.coroutine
def db_startup(self):
log.info("wait until the db server is ready")
......@@ -1834,6 +1851,67 @@ class DockerController:
log.error("db error %s", e.orig.args)
return
async def periodic_db_check_task(self):
    """Periodic db check (safety net).

    This task is theoretically not needed if we take for granted that
    the redis notification channel works properly; it bounds the impact
    of a missed notification to at most DB_CHECK_PERIOD seconds.

    Runs forever until cancelled.
    """
    while True:
        try:
            # sleep first: a full check is already performed when the
            # redis listener (re)subscribes to the channel
            await asyncio.sleep(DB_CHECK_PERIOD)
            self.check_db()
        except OSError as e:
            log.error("I/O error when polling the db (%s)", e)
        except asyncio.CancelledError:
            # normal termination path (task.cancel() at shutdown)
            log.info("db periodic check task terminated")
            return
        except Exception:
            # keep the task alive on any unexpected error
            log.exception("error in the periodic db check task")
async def redis_notification_listener_task(self):
    """Task listening for redis notifications.

    - subscribes to REDIS_CHANNEL_CONTROLLER
    - automatically reconnects to the server (with rate limiting)

    Messages are expected to be formatted as b"<type>:<id>" (e.g.
    b"job:42"); job and sandbox notifications are dispatched to the
    relevant manager, anything else is logged and ignored.

    Runs forever until cancelled.
    """
    # rate-limit reconnection attempts to one per minute
    async for _ in RateLimiter(60):
        try:
            # create redis connection and subscribe to the notification channel
            conn = await aioredis.create_redis((self.redis_host, 6379))
            sub, = await conn.subscribe(REDIS_CHANNEL_CONTROLLER)
            # NOTE: use lazy %-args (consistent with the other log calls)
            # instead of eager "%" formatting
            log.info("subscribed to redis pub/sub channel %r", REDIS_CHANNEL_CONTROLLER)

            # force a db check (because we may have missed some notifications)
            self.check_db()

            async for msg in sub.iter():
                log.info("redis notification: %r", msg)
                try:
                    item_type, item_id_str = msg.split(b":")
                    item_id = int(item_id_str)
                except ValueError as e:
                    log.warning("ignored malformatted notification: %r (%s)", msg, e)
                    continue

                if item_type == b"job":
                    self.job_manager.process(item_id)
                elif item_type == b"sandbox":
                    self.sandbox_manager.process(item_id)
                else:
                    log.warning("ignored notification for unknown item %r", msg)
        except OSError as e:
            log.error("I/O error in the redis listener (%s)", e)
        except asyncio.CancelledError:
            # normal termination path (task.cancel() at shutdown)
            log.info("notification task terminated")
            return
        except Exception:
            # keep the listener alive (the RateLimiter spaces out retries)
            log.exception("error in the redis notification loop")
def shutdown(self):
    """Request a graceful shutdown of the controller (idempotent).

    Resolves the `_shutdown_requested` future, which unblocks the main
    run loop; subsequent calls after the future is done are no-ops.
    """
    if not self._shutdown_requested.done():
        self._shutdown_requested.set_result(None)
......@@ -1851,31 +1929,38 @@ class DockerController:
loop.add_signal_handler(signal.SIGTERM, self.shutdown)
loop.add_signal_handler(signal.SIGINT, self.shutdown)
loop.add_reader(self.sock, self._sock_callback)
yield from self.db_startup()
tasks = []
try:
# note: this call never fails (even if the redis server is down)
self.redis_client = yield from aioredis.create_reconnecting_redis(
(self.redis_host, 6379))
# start the tasks for receiving notifications
# - from the redis PUBSUB channel
tasks.append(asyncio.ensure_future(self.redis_notification_listener_task()))
# - additional periodic check of the db (in case some redis
# notifications are missed)
tasks.append(asyncio.ensure_future(self.periodic_db_check_task()))
yield from self._shutdown_requested
finally:
# graceful shutdown
for task in tasks:
task.cancel()
self.swarm.shutdown()
self.sandbox.shutdown()
# close socket (to disable incoming notifications)
self.sock.close()
# terminate all pending tasks
yield from asyncio.gather(
self.image_manager.shutdown(),
self.sandbox_manager.shutdown(),
self.job_manager.shutdown(),
*tasks,
return_exceptions=True)
finally:
self.sock.close()
self.swarm.shutdown(wait=True)
self.sandbox.shutdown(wait=True)
......
......@@ -97,10 +97,6 @@ def main():
log.info("log level: %s", logging.getLevelName(log_level))
with get_envvar("PORT") as val:
port = int(val)
log.info("listening on port: %d", port)
with get_envvar("ENV") as env:
re.match("[a-z][a-z0-9_-]*\Z", env).groups()
log.info("environment: %s", env)
......@@ -150,7 +146,7 @@ def main():
logging.INFO if args.verbose else logging.WARNING))
return controller.DockerController(docker_host, swarm_host, mysql_host,
port, registry, env, datastore_path, sandbox_path,
registry, env, datastore_path, sandbox_path,
toolbox_path, sandbox_network, redis_host).run()
except config_reader.ConfigError:
log.critical("bad config")
......
......@@ -6,6 +6,7 @@ import config.env
from django.http import HttpResponse, JsonResponse, FileResponse
from django.views.decorators.csrf import csrf_exempt
from main.models import Job, AllgoUser, Webapp, JobQueue
from main.helpers import notify_controller
log = logging.getLogger('allgo')
......@@ -106,6 +107,12 @@ def jobs(request):
with open(os.path.join(odir, name), 'wb') as fb:
for chunk in file_obj.chunks():
fb.write(chunk)
# start the job
job.state = Job.WAITING
job.save()
notify_controller(job)
return JsonResponse(job_response(job, request))
......
import base64
import hashlib
import os
import redis
import redis
from django.conf import settings
import config
from .models import Job
DEFAULT_ENTROPY = 32 # number of bytes to return by default
##################################################
# redis keys
# job log
REDIS_KEY_JOB_LOG = "log:job:%d"
REDIS_KEY_JOB_STATE = "state:job:%d"
# pubsub channels for waking up allgo.aio (frontend) and the controller
# (ourselves)
REDIS_CHANNEL_AIO = "notify:aio"
REDIS_CHANNEL_CONTROLLER = "notify:controller"
# pubsub messages
REDIS_MESSAGE_JOB_UPDATED = "job:%d"
REDIS_MESSAGE_WEBAPP_UPDATED = "webapp:%d"
##################################################
# global redis connection pool
_redis_connection_pool = None
def get_ssh_data(key):
"""
Return the fingerprint and comment of a given SSH key.
......@@ -58,3 +84,28 @@ def upload_data(file_obj, job_id):
for chunk in file_data.chunks():
destination.write(chunk)
def get_redis_connection():
    """Return a redis client backed by the shared module-level pool.

    The connection pool is created lazily on first use, targeting the
    host named by ALLGO_REDIS_HOST.
    """
    global _redis_connection_pool
    pool = _redis_connection_pool
    if pool is None:
        pool = redis.ConnectionPool(host=config.env.ALLGO_REDIS_HOST)
        _redis_connection_pool = pool
    return redis.Redis(connection_pool=pool)
def notify_controller(obj):
    """Notify the controller that an entry was updated in the db.

    The notification is sent through the redis pubsub channel
    REDIS_CHANNEL_CONTROLLER. Only Job objects are supported; any other
    type raises TypeError.
    """
    conn = get_redis_connection()
    if not isinstance(obj, Job):
        raise TypeError(obj)
    conn.publish(REDIS_CHANNEL_CONTROLLER, REDIS_MESSAGE_JOB_UPDATED % obj.id)
......@@ -64,7 +64,7 @@ from .forms import (
WebappForm,
WebappSandboxForm,
)
from .helpers import get_ssh_data, upload_data
from .helpers import get_ssh_data, upload_data, notify_controller
from .templatetags.converters import status_icon
......@@ -564,7 +564,7 @@ class JobCreate(SuccessMessageMixin, LoginRequiredMixin, CreateView):
obj = form.save(commit=False)
obj.queue_id = form.cleaned_data.get('queue_id').id
obj.state = 0
obj.state = Job.NEW
obj.result = 0
obj.user_id = self.request.user.id
obj.webapp_id = webapp.id
......@@ -576,6 +576,11 @@ class JobCreate(SuccessMessageMixin, LoginRequiredMixin, CreateView):
latest_job = Job.objects.filter(user_id=self.request.user.id).order_by('-id')[0]
upload_data(self.request.FILES.getlist('files'), latest_job.id)
# start the job
obj.state = Job.WAITING
obj.save()
notify_controller(obj)
return super().form_valid(form)
def get_context_data(self, **kwargs):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment