Commit c5cd2bc1 authored by BAIRE Anthony's avatar BAIRE Anthony

Add an auxiliary HTTP server (allgo.aio) for serving asynchronous requests

There are two purposes:
- implement server push (using long-lived HTTP requests) for:
    - sending status updates for the jobs and sandboxes
    - live-streaming of the job logs
- have a really async implementation for pushing image manifests into
  the registry (the preliminary implementation in
  5451a6df was blocking)

It is implemented with aiohttp.web (a lighweight HTTP framework,
similar to what we can do with flask but asynchronously).

The alternative would have been to use the django channels plugin, but:
- it went through a major refactoring (v2) recently
- it requires replacing unicorn with an ASGI server (daphne)
- django-channels and daphne are not yet debian, and for the HTTP server
  i would prefer an implementation for which we have stable security
  updates

(anyway this can be ported to django-channels later)

The nginx config redirects the /aio/ path to this server (and the image
manifests pushes too).

The allgo.aio server interacts only with the nginx, reddis and django
containers. It relies totally on the django server for authenticating
the user and for accessing the mysql db (so there is no ORM).

NOTE: in this design the django server has to trust blindly the requests
coming from the allgo.aio server (for some endpoints). To prevent
security issues, the nginx configuration is modified to set the header
'X-Origin: nginx'. Thus django knowns who he can trust.

This commits implements only the image pushs. Job updated and logs
streaming will come in a later pull request.
parent 880e1e2a
......@@ -8,7 +8,7 @@ RUN apt-getq update && apt-getq install \
python3-django python3-django-allauth python3-misaka \
nginx-light zip gcc python3-dev python3-pip python3-wheel python3-mysqldb \
python-mysqldb python3-crypto gunicorn3 python3-redis python-mysqldb \
python3-crypto python3-natsort supervisor
python3-crypto python3-natsort python3-aiohttp python3-aioredis supervisor
COPY requirements.txt /tmp/
RUN cd /tmp && pip3 install -r requirements.txt && rm requirements.txt
......
#!/usr/bin/env python3
#
# This module implement an auxiliary HTTP server for serving asynchronous
# requests for allgo.
#
# There are two purposes:
# - implement server push (using long-lived HTTP requests) for:
# - sending status updates for the jobs and sandboxes
# - live-streaming of the job logs
# - have a really async implementation for pushing image manifests into
# the registry (the preliminary implementation in
# 5451a6dfdb10a2d0875179d06b43c947ebcd37b4 was blocking)
#
# It is implemented with aiohttp.web (a lighweight HTTP framework,
# similar to what we can do with flask but asynchronously).
#
# The alternative would have been to use the django channels plugin, but:
# - it went through a major refactoring (v2) recently
# - it requires replacing unicorn with an ASGI server (daphne)
# - django-channels and daphne are not yet debian, and for the HTTP server
# i would prefer an implementation for which we have stable security
# updates
#
# (anyway this can be ported to django-channels later)
#
# The nginx config redirects the /aio/ path to this server (and the image
# manifests pushes too).
#
# The allgo.aio server interacts only with the nginx, reddis and django
# containers. It relies totally on the django server for authenticating
# the user and for accessing the mysql db (so there is no ORM).
#
# NOTE: in this design the django server has to trust blindly the requests
# coming from the allgo.aio server (for some endpoints). To prevent
# security issues, the nginx configuration is modified to set the header
# 'X-Origin: nginx'. Thus django knowns who he can trust.
import http
import logging
import os
import asyncio
import aiohttp
import aiohttp.web
from aiohttp.web import Response, StreamResponse
import config.env
# TODO: return error pages with some content
log = logging.getLogger("allgo-aio")
def daemonise(fork: bool, pidfile: str):
"""Demonise the process
- fork the process if 'fork' is true
- write the pid ot 'pidfile'
"""
def write_pidfile(pid):
with open(pidfile, "w") as pf:
pf.write("%d\n" % pid)
if fork:
pid = os.fork()
if pid:
write_pidfile(pid)
os._exit(0)
else:
write_pidfile(os.getpid())
def try_set_result(fut: asyncio.Future, result):
"""set a asyncio.Future result
but do not raise an error if the result is already set
"""
if fut.done():
return False
else:
fut.set_result(result)
return True
def prepare_headers(request: aiohttp.web.Request) -> dict:
"""Prepare the headers to be forwarded to django
This function reuses the headers from the incoming aiohttp.web request and
that are related to:
- authentication (Authorization, Cookie)
- reverse-proxy (X-Forwarded-*)
Also it sets "X-Origin: aio" to make clear that this request comes from
django.
"""
headers = {"X-Origin": "aio"}
for key in (
"Authorization",
"Cookie",
"User_agent",
"X-Forwarded-For",
"X-Forwarded-Host",
"X-Forwarded-Proto"):
val = request.headers.get(key);
if val is not None:
headers[key] = val
return headers
def is_ok(response: aiohttp.ClientResponse):
"""Return true if this HTTP response is successful"""
return 200 <= response.status < 300
async def forward_response(reply: aiohttp.ClientResponse) -> aiohttp.web.Response:
"""Forward a HTTP response to the client
This is used for forwarding django's HTTP responses to the client we are
serving.
"""
return Response(status=reply.status, body=await reply.read(), headers=(
(key, val) for key, val in reply.headers.items()
if key.lower() != "content-encoding"))
class ErrorResponse(Response):
"""Simple response classes for reporting errors
eg: `ErrorResponse(status=404)` is equivalent to:
`Response(status=404, text="404: Not Found")`
"""
def __init__(self, status):
super().__init__(status=status,
text="%d: %s" % (status, http.HTTPStatus(status).phrase))
class AllgoAio:
def __init__(self, bind, loop):
self.loop = loop
self.app = aiohttp.web.Application(loop=loop)
self.django_url = "http://127.0.0.1:8000"
# aiohttp client for making request to django/registry
self.http_client = None
# aiohttp server task
self.server = None
# ---------- routes ---------- #
ar = self.app.router
ar.add_route("*", r"/v2/{repo:.*}/manifests/{tag}", self.handle_image_manifest)
self.handler = self.app.make_handler()
self.host, self.port = bind
self._shutdown_requested = None
def shutdown(self):
log.info("shutdown requested")
try_set_result(self._shutdown_requested, None)
def django_request(self, method, path, *k, **kw):
"""Create a aiohttp request object to the django server
The purpose is just to insert self.django_url in front of the path
"""
assert path.startswith("/")
return self.http_client.request(method, self.django_url+path, *k, **kw)
async def run(self, fork, pidfile):
"""main task (run the server)"""
assert self._shutdown_requested is None, "run() must not be called multiple times"
self._shutdown_requested = asyncio.Future()
log.info("starting")
try:
# create the aiohttp client
self.http_client = aiohttp.ClientSession()
# start the aiohttp server
self.server = await self.loop.create_server(self.handler,
host=self.host, port=self.port)
log.info("listening on: %s:%d", self.host, self.port)
log.info("server ready")
daemonise(fork, pidfile)
await self._shutdown_requested
finally:
log.info("shutting down")
# FIXME: may need to interrupt the pending notifications
# stop the aiohttp server
if self.server is not None:
self.server.close()
await self.server.wait_closed()
await self.app.shutdown()
await self.handler.shutdown()
await self.app.cleanup()
if self.http_client is not None:
await self.http_client.close()
log.info("shutdown complete")
async def handle_image_manifest(self, request):
"""Registry endpoint for pushing/pulling image manifests
The nginx reverse proxy is configured to forward these requests through the
django server, so that:
- we are notified when a new image is pushed
- we can later implement fine-grained permissions (tag-based rather than
repository-based)
Note: the DB is transactionally updated before the client receives the 201
response
"""
# NOTE: the registry endpoint also supports the DELETE method to delete the
# image. Since the deletion of a WebappVersion is not yet implemented, we
# do not allow the delete method here
if request.method not in ("HEAD", "GET", "PUT"):
# method not allowed
return ErrorResponse(status=405)
headers = prepare_headers(request)
headers["Content-Type"] = request.content_type
repo = request.match_info["repo"]
tag = request.match_info["tag"]
# prepare the forwarded HTTP request to the registry
forwarded_request = self.http_client.request(request.method,
config.env.ALLGO_REGISTRY_PRIVATE_URL + request.path,
headers=headers, data=await request.read())
if request.method != "PUT":
# GET, HEAD
async with forwarded_request as registry_reply:
return await forward_response(registry_reply)
else:
# PUT
# call django's pre-push hook
async with self.django_request("POST", "/jwt/pre-push",
headers=headers, params={"repo": repo, "tag": tag},
) as django_reply:
if not is_ok(django_reply):
return await forward_response(django_reply)
webapp_id = int(await django_reply.read())
# forward the registry request
async with forwarded_request as registry_reply:
if not is_ok(registry_reply):
return await forward_response(registry_reply)
# call django's post-push hook
async with self.django_request("POST", "/jwt/post-push",
headers=headers, params={"webapp_id": str(webapp_id), "tag": tag},
) as django_reply:
if is_ok(django_reply):
return await forward_response(registry_reply)
else:
return await forward_response(django_reply)
#!/usr/bin/env python3
import argparse
import asyncio
import logging
import logging.handlers
import os
import signal
from . import AllgoAio
def bind(txt):
mo = re.match("(?:(.+):)?(\d+)\Z", txt)
if mo is None:
raise ValueError()
host, port = mo.groups()
if host is None:
host = "0.0.0.0"
return host, int(port)
def init_logging(log_level, logdir):
logging.basicConfig(
level = min(log_level, logging.INFO),
format = "%(asctime)s %(levelname)-8s %(name)-24s %(message)s",
datefmt = "%Y-%b-%2d %H:%M:%S",
)
console_handler = logging.root.handlers[0]
def add_handler(level, hnd):
hnd.setLevel(level)
hnd.setFormatter(console_handler.formatter)
logging.root.addHandler(hnd)
os.makedirs(logdir, exist_ok=True)
# normal logs
add_handler(logging.INFO, logging.handlers.TimedRotatingFileHandler(
os.path.join(logdir, "aio.log"),
when = "W6", # rotate every sunday
backupCount = 52)) # keep 1 year of logs
# debug logs
if log_level <= logging.DEBUG:
add_handler(logging.DEBUG, logging.handlers.TimedRotatingFileHandler(
os.path.join(logdir, "debug.log"),
when = "D", # rotate every day
backupCount = 7)) # keep a week of logs
return console_handler
parser = argparse.ArgumentParser("allgo.aio", "Allgo asyncio server")
parser.add_argument("-b", "--bind", metavar="HOST:PORT", type=bind,
default=("0.0.0.0", 8001),
help="bind TCP socket (default: 0.0.0.0:8001)")
parser.add_argument("-d", "--debug", action="store_true",
help="enable debugging")
parser.add_argument("-v", "--verbose", action="count",
help="increase verbosity (can be used multiple times)")
parser.add_argument("-l", "--logdir", metavar="PATH", default="/vol/log/aio",
help="log dir path (default: /vol/log/aio)")
parser.add_argument("--daemon", action="store_true",
help="daemonise after startup")
parser.add_argument("--pidfile", metavar="PATH", default="/run/aio.pid",
help="daemon pid file (default: /run/aio.pid)")
args = parser.parse_args()
if args.debug:
log_level = logging.DEBUG
elif args.verbose == 1:
log_level = logging.INFO
else:
log_level = logging.WARNING
init_logging(log_level, args.logdir)
loop = asyncio.get_event_loop()
app = AllgoAio(args.bind, loop)
try:
loop.add_signal_handler(signal.SIGINT, app.shutdown)
loop.add_signal_handler(signal.SIGTERM, app.shutdown)
#loop.add_signal_handler(signal.SIGHUP, app.reload)
loop.run_until_complete(app.run(args.daemon, args.pidfile))
finally:
loop.remove_signal_handler(signal.SIGINT)
loop.remove_signal_handler(signal.SIGTERM)
#loop.remove_signal_handler(signal.SIGHUP)
......@@ -6,10 +6,11 @@ app_name = 'jwt'
urlpatterns = [
url(r'^jwt/auth$', views.jwt_auth, name="jwt_auth"), # REGISTRY_AUTH_TOKEN_REALM for docker registry
# docker registry urls
# - image manifest push/pull
url(r'^v2/(?P<repo>.*)/manifests/(?P<tag>[^/]+)$', views.registry_manifest, name="registry_manifest"),
# - default catch-all route (normally unused because the reverse-proxy is
# expected to route them directly to the registry)
# hooks for registry pull/push for image manifests
url(r'^jwt/pre-push$', views.pre_push, name="pre_push"),
url(r'^jwt/post-push$', views.post_push, name="post_push"),
# default catch-all route for docker registry urls (normally unused because
# the reverse-proxy is expected to route them directly to the registry)
url(r'^v2/', views.registry_notfound),
]
......@@ -15,95 +15,73 @@ log = logging.getLogger('jwt')
# misconfiguration)
MIN_TOKEN_SIZE = 32
@csrf_exempt
def registry_manifest(request, repo, tag):
"""Registry endpoint for pushing/pulling image manifests
The nginx reverse proxy is configured to forward these requests through the
django server, so that:
- we are notified when a new image is pushed
- we can later implement fine-grained permissions (tag-based rather than
repository-based)
Note: the DB is transactionally updated before the client receives the 201
response
"""
@csrf_exempt
def pre_push(request):
"""pre-push hook for image manifests
This endpoint is called by allgo.aio before pushing an image to the
registry.
headers = {"Content-Type": request.content_type}
for key in (
"HTTP_AUTHORIZATION",
"HTTP_CONTENT_TYPE",
"HTTP_USER_AGENT",
"HTTP_X_FORWARDED_FOR",
"HTTP_X_FORWARDED_HOST",
"HTTP_X_FORWARDED_PROTO"):
val = request.META.get(key);
if val is not None:
headers[key[5:].replace("_", "-")] = val
# NOTE: the registry endpoint also supports the DELETE method to delete the
# image. Since the deletion of a WebappVersion is not yet implemented, we
# do not allow the delete method here
if request.method not in ("HEAD", "GET", "PUT"):
# method not allowed
it returns a 200 response with the webapp_id as body if successful
"""
if request.META.get("HTTP_X_ORIGIN") != "aio":
# this endpoint is only usable by allgo.aio
return HttpResponse(status=404)
if request.method != "POST":
return HttpResponse(status=405)
# forward the HTTP request to the registry
# FIXME: this function is a blocking, i do not know if there is a way of
# doing a request without blocking the thread
# FIXME: should allow tuning the timeout value
forward_request = lambda: requests.request(request.method,
config.env.ALLGO_REGISTRY_PRIVATE_URL + request.path,
timeout=.5, headers=headers, data=request.body)
repo = request.GET["repo"]
tag = request.GET["tag"]
try:
if request.method == "PUT":
# push
# find the relevant webapp+version
webapp = Webapp.objects.filter(docker_name=repo).first()
if webapp is None:
return JsonResponse({"error": "unknown repository"}, status=404)
# abort if we have a commit in progress in the controller
# NOTE: there is still a (very narrow) conflict window if the commmit
# starts between this check and the result of this request
if WebappVersion.objects.filter(webapp=webapp, number=tag,
state__in=(WebappVersion.SANDBOX, WebappVersion.COMMITTED)
).exists():
return JsonResponse({
"error": "there is a commit in progress for %s:%s"
% (repo, tag)}, status=409)
rep = forward_request()
if rep.ok:
if request.method == "PUT":
# Save the new app in the DB
WebappVersion.objects.update_or_create(
webapp=webapp, state=WebappVersion.READY, number=tag,
defaults={"published": True})
# TODO: notify the controller
# find the relevant webapp
webapp = Webapp.objects.get(docker_name=repo)
except Webapp.DoesNotExist:
return JsonResponse({"error": "unknown repository"}, status=404)
# abort if we have a commit in progress in the controller
# NOTE: there is still a (very narrow) conflict window if the commmit
# starts between this check and the result of this request
if WebappVersion.objects.filter(webapp=webapp, number=tag,
state__in=(WebappVersion.SANDBOX, WebappVersion.COMMITTED)
).exists():
return JsonResponse({
"error": "there is a commit in progress for %s:%s"
% (repo, tag)}, status=409)
# return the webapp id
return HttpResponse(str(webapp.id))
else:
# pull
rep = forward_request()
@csrf_exempt
def post_push(request):
"""post-push hook for image manifests
This endpoint is called by allgo.aio after pushing an image to the
registry, but before the result is forwarded to the client.
It is responsible of updating the database with the new webapp version.
"""
if request.META.get("HTTP_X_ORIGIN") != "aio":
# this endpoint is only usable by allgo.aio
return HttpResponse(status=404)
if request.method != "POST":
return HttpResponse(status=405)
except requests.ConnectTimeout:
return JsonResponse({"error": "registry not available"}, status=502)
webapp_id = int(request.GET["webapp_id"])
tag = request.GET["tag"]
# forward the registry reply to the client
if "Content-Encoding" in rep.headers:
del rep.headers["Content-Encoding"]
# Save the new app version in the DB
WebappVersion.objects.update_or_create(webapp_id=webapp_id, number=tag,
state=WebappVersion.READY, defaults={"published": True})
response = HttpResponse(rep.content, status=rep.status_code)
for key, val in rep.headers.items():
response[key] = val
return response
return HttpResponse(status=204)
@csrf_exempt
def registry_notfound(request):
"""Default endpoint for all other registry urls
"""Default endpoint for all registry urls
should never be served (if the reverse-proxy is well configured)
"""
......
......@@ -2,6 +2,10 @@ upstream django
{
server 127.0.0.1:8000 fail_timeout=0;
}
upstream aio
{
server 127.0.0.1:8001 fail_timeout=0;
}
server
{
......@@ -22,10 +26,18 @@ server
proxy_buffering off;
location ~ ^/v2/.*/manifests/[^/]*$ {
proxy_pass http://django;
proxy_pass http://aio;
}
}
# allgo async endpoints
location /aio/
{
proxy_pass http://aio/aio/;
proxy_redirect off;
proxy_buffering off;
}
# allgo endpoints
# - static files served directly by nginx
# - other requests forwarded to the django server
......@@ -43,5 +55,12 @@ server
{
proxy_redirect off;
proxy_pass http://django;
# header set to distinguish between requests going directly from nginx and
# requests going through aio
#
# This is a security feature. Django trusts this value (like the
# X-Forwarded-* headers), do not remove it !
proxy_set_header X-Origin "nginx";
}
}
#!/bin/sh
pid="`cat /run/nginx.pid 2>/dev/null || true`"
do_healthcheck()
{
name="$1"
pidfile="$2"
pid="`cat "$pidfile" 2>/dev/null || true`"
if [ -z "$pid" ] || [ ! -d "/proc/$pid/" ] ; then
echo "$name is down"
exit 1
fi
}
do_healthcheck nginx /run/nginx.pid
do_healthcheck allgo.aio /run/aio.pid
if [ -z "$pid" ] || [ ! -d "/proc/$pid/" ] ; then
echo "nginx is down"
exit 1
fi
......@@ -15,6 +15,7 @@ mkdir -p \
/vol/log/nginx \
/vol/log/django \
/vol/log/gunicorn \
/vol/log/aio \
/vol/cache/allgo \
/vol/cache/nginx
......@@ -38,6 +39,10 @@ fi
step "wait until mysql is ready"
wait-mysql
# start the asyncio server
step "start the allgo.aio daemon"
python3 -m allgo.aio --daemon
# start allgo
case "$ALLGO_HTTP_SERVER" in
django)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment