Commit 2bf5a74d authored by BAIRE Anthony's avatar BAIRE Anthony
Browse files

apply fixes suggested by pylint

parent 2967c955
Pipeline #134131 failed with stages
in 0 seconds
......@@ -2,7 +2,7 @@
# This module implement an auxiliary HTTP server for serving asynchronous
# requests for allgo.
# There are two purposes:
# - implement server push (using long-lived HTTP requests) for:
# - sending status updates for the jobs and sandboxes
......@@ -10,26 +10,26 @@
# - have a really async implementation for pushing image manifests into
# the registry (the preliminary implementation in
# 5451a6dfdb10a2d0875179d06b43c947ebcd37b4 was blocking)
# It is implemented with aiohttp.web (a lighweight HTTP framework,
# similar to what we can do with flask but asynchronously).
# The alternative would have been to use the django channels plugin, but:
# - it went through a major refactoring (v2) recently
# - it requires replacing unicorn with an ASGI server (daphne)
# - django-channels and daphne are not yet debian, and for the HTTP server
# i would prefer an implementation for which we have stable security
# updates
# (anyway this can be ported to django-channels later)
# The nginx config redirects the /aio/ path to this server (and the image
# manifests pushes too).
# The allgo.aio server interacts only with the nginx, reddis and django
# containers. It relies totally on the django server for authenticating
# the user and for accessing the mysql db (so there is no ORM).
# NOTE: in this design the django server has to trust blindly the requests
# coming from the allgo.aio server (for some endpoints). To prevent
# security issues, the nginx configuration is modified to set the header
......@@ -87,19 +87,19 @@ def daemonise(fork: bool, pidfile: str):
- write the pid ot 'pidfile'
def write_pidfile(pid):
with open(pidfile, "w") as pf:
pf.write("%d\n" % pid)
with open(pidfile, "w") as fp:
fp.write("%d\n" % pid)
if fork:
pid = os.fork()
if pid:
os._exit(0) # pylint: disable=protected-access
def try_set_result(fut: asyncio.Future, result):
"""set a asyncio.Future result
but do not raise an error if the result is already set
if fut.done():
......@@ -127,7 +127,7 @@ def prepare_headers(request: aiohttp.web.Request) -> dict:
val = request.headers.get(key);
val = request.headers.get(key)
if val is not None:
headers[key] = val
return headers
......@@ -166,7 +166,7 @@ class RateLimiter:
class JsonSeqStreamResponse(StreamResponse):
"""aiohttp response class for streaming json objects
The objects are streamed in the application/json-seq format (RFC 7464).
......@@ -302,12 +302,12 @@ class AllgoAio:
self.job_list_condition = asyncio.Condition()
# ---------- routes ---------- #
ar =
rtr =
ar.add_route("*", r"/v2/{repo:.*}/manifests/{tag}", self.handle_image_manifest)
ar.add_route("GET", r"/aio/jobs/{job_id:\d+}/events", self.handle_job_events)
ar.add_route("GET", r"/aio/jobs/events", self.handle_job_list_events)
ar.add_route("GET", r"/aio/apps/{docker_name}/events", self.handle_webapp_events)
rtr.add_route("*", r"/v2/{repo:.*}/manifests/{tag}", self.handle_image_manifest)
rtr.add_route("GET", r"/aio/jobs/{job_id:\d+}/events", self.handle_job_events)
rtr.add_route("GET", r"/aio/jobs/events", self.handle_job_list_events)
rtr.add_route("GET", r"/aio/apps/{docker_name}/events", self.handle_webapp_events)
self.handler =, self.port = bind
......@@ -320,7 +320,7 @@ class AllgoAio:
def django_request(self, method, path, *k, **kw):
"""Create a aiohttp request object to the django server
The purpose is just to insert self.django_url in front of the path
assert path.startswith("/")
......@@ -421,7 +421,7 @@ class AllgoAio:
async def redis_notification_listener_task(self):
"""Task for listening for redis notification
- subscribes to REDIS_CHANNEL_AIO
- automatically reconnects to the server (with rate limiting)
......@@ -431,7 +431,7 @@ class AllgoAio:
# create redis connection and subscribe to the notification channel
conn = await self.create_redis()
sub, = await conn.subscribe(REDIS_CHANNEL_AIO)"subscribed to redis pub/sub channel %r" % REDIS_CHANNEL_AIO)"subscribed to redis pub/sub channel %r", REDIS_CHANNEL_AIO)
async with self.job_states_create_lock:
for item_id in self.job_states:
......@@ -505,8 +505,6 @@ class AllgoAio:
return await forward_response(django_reply)
version_id = int(await
cleanup = version_id if action == "push" else None
# forward the HTTP request to the registry
real_url = "%s/v2/%s/manifests/id%d" % (
config.env.ALLGO_REGISTRY_PRIVATE_URL, repo, version_id)
......@@ -558,7 +556,6 @@ class AllgoAio:
return Response(status=400)
log_key = REDIS_KEY_JOB_LOG % job_id
state_key = REDIS_KEY_JOB_STATE % job_id
state = None
async with self.job_states_create_lock:
job = self.job_states[job_id]
......@@ -648,7 +645,7 @@ class AllgoAio:
related to a list of jobs.
The job ids are provided in the query string (parameter 'id')
Currently only one event is defined:
......@@ -822,5 +819,3 @@ class AllgoAio:
except Exception:
log.exception("exception in handle_webapp_events(webapp_id=%d)", webapp_id)
return rep
......@@ -4,13 +4,14 @@ import argparse
import asyncio
import logging
import logging.handlers
import re
import os
import signal
from . import AllgoAio
def bind(txt):
mo = re.match("(?:(.+):)?(\d+)\Z", txt)
mo = re.match(r"(?:(.+):)?(\d+)\Z", txt)
if mo is None:
raise ValueError()
host, port = mo.groups()
......@@ -18,9 +19,9 @@ def bind(txt):
host = ""
return host, int(port)
def init_logging(log_level, logdir):
def init_logging(level, logdir):
level = min(log_level, logging.INFO),
level = min(level, logging.INFO),
format = "%(asctime)s %(levelname)-8s %(name)-24s %(message)s",
datefmt = "%Y-%b-%2d %H:%M:%S",
......@@ -66,12 +67,10 @@ parser.add_argument("--pidfile", metavar="PATH", default="/run/",
args = parser.parse_args()
if args.debug:
log_level = logging.DEBUG
elif args.verbose == 1:
log_level = logging.INFO
log_level = logging.WARNING
log_level = (
logging.DEBUG if args.debug else
logging.INFO if args.verbose == 1 else
init_logging(log_level, args.logdir)
from django.http import HttpResponse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from main.helpers import get_request_user
from main.models import Job
......@@ -4,7 +4,6 @@ import os
import config.env
from django.core.validators import ValidationError
from django.http import JsonResponse
from django.shortcuts import redirect
from django.urls import reverse
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import View
......@@ -18,7 +17,7 @@ DATASTORE = config.env.ALLGO_DATASTORE
BUF_SIZE = 65536
def get_link(jobid, dir, filename, request):
def get_link(jobid, filename, request):
return '/'.join((get_base_url(request), "api/v1/datastore", str(jobid), filename))
......@@ -28,9 +27,9 @@ class APIJobView(JobAuthMixin, View):
job = Job.objects.get(id=pk)
files = {}
for f in os.listdir(job.data_dir):
if lookup_job_file(, f):
files[f] = get_link(, job.data_dir, f, request)
for filename in os.listdir(job.data_dir):
if lookup_job_file(, filename):
files[filename] = get_link(, filename, request)
response = { files,
......@@ -35,7 +35,7 @@ class WebappAdmin(admin.ModelAdmin):
class WebappParameterAdmin(admin.ModelAdmin):
list_display = ('id', 'get_webapp', 'name')
def get_webapp(self, obj):
......@@ -5,6 +5,4 @@ class MainConfig(AppConfig):
name = 'main'
def ready(self):
import main.signals
......@@ -2,7 +2,6 @@ from allauth.account.forms import SignupForm
from django import forms
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
from django.core.validators import RegexValidator
from django.utils.safestring import mark_safe
from taggit.forms import TagField
......@@ -16,7 +15,6 @@ from .models import (
from .validators import docker_name_validator
......@@ -45,7 +43,11 @@ class SSHForm(forms.ModelForm):
label="SSH Key",
help_text=mark_safe('Before you can add an SSH key you need to <a href="">generate one</a> or use an <a href="">existing key.</a>'),)
help_text=mark_safe('Before you can add an SSH key you need to '
'<a href="">'
'generate one</a> or use an '
'<a href="">'
'existing key.</a>'))
class Meta:
model = AllgoUser
......@@ -74,9 +76,14 @@ class JobForm(forms.ModelForm):
help_text=mark_safe('The <a href="">queue for scheduling your job</a>. Queues with shorter limit have a higher priority.'),
help_text=mark_safe('The '
'<a href="">'
'queue for scheduling your job</a>. '
'Queues with shorter limit have a higher priority.'),
param = forms.CharField(label='Parameters', label_suffix='', required=False, help_text='Enter the parameters you need or click on the "presets" button to select any predefined one.')
param = forms.CharField(label='Parameters', label_suffix='', required=False,
help_text='Enter the parameters you need or click on the "presets" button to select '
'any predefined one.')
webapp_parameters = forms.ModelChoiceField(
......@@ -114,7 +121,8 @@ class RunnerForm(forms.ModelForm):
self.request = kwargs.pop('request')
super(RunnerForm, self).__init__(*args, **kwargs)
self.fields['webapps'].queryset = Webapp.objects.filter(
self.fields['webapps'].choices = [(, for webapp in Webapp.objects.filter(]
self.fields['webapps'].choices = [(,
for webapp in Webapp.objects.filter(]
for field in self.fields.values():
field.error_messages = {'required':'The field "{fieldname}" is required'.format(
......@@ -129,7 +137,9 @@ class WebappForm(forms.ModelForm):
# Basic
name = forms.CharField(label="Application name", label_suffix="")
contact = forms.EmailField(label="Email contact", label_suffix="", required=False, help_text="By default this will be your personnal e-mail address. You may fill this field if you wish to use a different contact address.")
contact = forms.EmailField(label="Email contact", label_suffix="", required=False,
help_text="By default this will be your personnal e-mail address. "
"You may fill this field if you wish to use a different contact address.")
description = forms.CharField(widget=forms.Textarea, label="Description", label_suffix="")
private = forms.TypedChoiceField(
coerce=lambda x: x == 'True',
......@@ -137,7 +147,8 @@ class WebappForm(forms.ModelForm):
label="Private mode",
help_text='Private apps do not appears in the public list, but they are reachable by users knowing the url.',
help_text='Private apps do not appears in the public list, '
'but they are reachable by users knowing the url.',
# Advanced
......@@ -153,12 +164,18 @@ class WebappForm(forms.ModelForm):
job_queue = forms.ModelChoiceField(
label='Default job queue',
help_text=mark_safe('The default <a href="">queue for scheduling new jobs</a> using this app.'),
help_text=mark_safe('The default '
'<a href="">'
'queue for scheduling new jobs</a> using this app.'),
entrypoint = forms.CharField(label="Entrypoint", label_suffix="",
help_text=mark_safe('This is the <a href="">command executed when allgo runs a job</a> for this app.'),
help_text=mark_safe('This is the '
'<a href="">'
'command executed when allgo runs a job</a> for this app.'),
owner = forms.CharField(required=False, label="Owner", label_suffix='', help_text="Username of the new owner of the application. You will immediately loose access to the application.")
owner = forms.CharField(required=False, label="Owner", label_suffix='',
help_text="Username of the new owner of the application."
" You will immediately loose access to the application.")
tags = TagField(required=False, label_suffix='', help_text="Tags are separated by a comma.")
def __init__(self, *args, **kwargs):
......@@ -186,21 +203,11 @@ class WebappForm(forms.ModelForm):
class Meta:
model = Webapp
fields = ('name', 'description', 'contact', 'entrypoint', 'job_queue', 'private', 'docker_os', 'entrypoint', 'owner', 'tags')
class WebappSandboxForm(forms.ModelForm):
number = forms.CharField(label='Version', label_suffix='')
description = forms.CharField(label='Description', label_suffix='')
class Meta:
model = WebappVersion
fields = ('number', 'description')
fields = ('name', 'description', 'contact', 'entrypoint', 'job_queue',
'private', 'docker_os', 'entrypoint', 'owner', 'tags')
class WebappImportForm(forms.Form):
webapp_id = forms.IntegerField(label="Webapp ID", required=False)
docker_name = forms.CharField(label="Short name", required=False,
......@@ -136,7 +136,7 @@ def get_redis_connection():
def notify_controller(obj):
"""Notify the controller that an entry was updated in the db
The notification is sent through the redis pubsub channel
......@@ -151,13 +151,13 @@ def notify_controller(obj):
_ALLOWED_IP_NETWORKS = list(map(IPy.IP, config.env.ALLGO_ALLOWED_IP_ADMIN.split(",")))
def is_allowed_ip_admin(ip):
def is_allowed_ip_admin(ip_address):
"""Return true if admin actions are allowed from this IP address
The function return true if the provided ip address is included in at least
one network listed in ALLGO_ALLOWED_IP_ADMIN.
return any(ip in net for net in _ALLOWED_IP_NETWORKS)
return any(ip_address in net for net in _ALLOWED_IP_NETWORKS)
......@@ -200,7 +200,7 @@ def get_request_user(request):
# NOTE: we must NOT authenticate by cookie because the CORS
# configuration in the nginx.conf allows all origins
mo = re.match("Token token=(\S+)",
mo = re.match(r"Token token=(\S+)",
request.META.get('HTTP_AUTHORIZATION', ''))
if mo:
return getattr(
......@@ -221,6 +221,3 @@ def query_webapps_for_user(user):
# select webapps that are either public or owned by the user
return Webapp.objects.filter(Q(private=False) | Q(
......@@ -96,7 +96,7 @@ class Command(BaseCommand):
create_func = User.objects.create_superuser
create_func = User.objects.create_user
# create user (in table 'auth_user')
user = create_func(name, name, kw["password"])
from django.conf import settings
from django.contrib.auth.mixins import UserPassesTestMixin, LoginRequiredMixin
from django.core.exceptions import PermissionDenied
from django.http import HttpResponse, JsonResponse, Http404
from django.http import JsonResponse, Http404
from django.shortcuts import redirect
from .models import Job
......@@ -17,7 +16,7 @@ from .helpers import get_request_user
# force the validation of all addresses ?
class AllgoValidAccountMixin:
"""Common mixin for allgo accounts validation
An account is valid if:
- at least one of its email address is verified
- its owner agreed the latest ToS
......@@ -66,14 +65,14 @@ class UserAccessMixin(LoginRequiredMixin, AllgoValidAccountMixin):
class ProviderAccessMixin(LoginRequiredMixin, AllgoValidAccountMixin):
"""Mixin to be included in views that require provider-level access
(i.e. user allowed to create new web applications)
class AllAccessMixin(AllgoValidAccountMixin):
"""Mixin to be included in views usable by any user (registered or not)
Note: the purpose of using this mixin (rather that no mixin at all) is that
it ensures that the user registration is complete (email address
validated). Thus the user is invited to complete the registration before
......@@ -107,4 +106,3 @@ class JobAuthMixin(AllgoValidAccountMixin, UserPassesTestMixin):
if not self.raise_exception and self.request.path_info.startswith("/api/"):
return JsonResponse({"error": "401 Unauthorized"}, status=401)
return super().handle_no_permission()
......@@ -4,22 +4,18 @@ import os
from django.conf import settings
from django.contrib import auth
from django.contrib.auth.models import User, AnonymousUser
from django.core.validators import MinLengthValidator, MinValueValidator, \
from django.core.validators import MinValueValidator, RegexValidator
from django.db import models
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.utils.crypto import get_random_string
from taggit.managers import TaggableManager
from allauth.account.models import EmailAddress
from allauth.socialaccount.models import SocialAccount
from django.utils.crypto import get_random_string
from .validators import job_param_validator, docker_container_id_validator, \
docker_tag_validator, docker_name_validator, \
sshkey_validator, token_validator
import config.env
def generate_token(length=32):
......@@ -96,9 +92,6 @@ class AllgoUser(BaseModel):
def __str__(self):
return self.user.username
def getApp(self):
return [a.docker_name for a in Webapp.objects.filter(]
class DockerOs(BaseModel):
......@@ -160,7 +153,7 @@ class Webapp(TimeStampModel):
# Notes about sandbox states
# - Possible state transitions
......@@ -229,7 +222,8 @@ class Webapp(TimeStampModel):
access_token = models.CharField(max_length=255, blank=True, null=True,
sandbox_state = models.IntegerField(null=True, choices=SANDBOX_STATE_CHOICES, default=IDLE)
sandbox_version = models.ForeignKey('WebappVersion', null=True, blank=True, related_name='webappversions')
sandbox_version = models.ForeignKey('WebappVersion', null=True, blank=True,
notebook_gitrepo = models.CharField(max_length=255, blank=True, null=True)
memory_limit = models.BigIntegerField(null=True,
......@@ -268,7 +262,7 @@ class Webapp(TimeStampModel):
def is_pullable_by(self, actor, *, client_ip=None):
"""Return True if the given actor is allowed to pull an image of this webapp
`actor` may be a User, AllgoUser, Runner or None
`client_ip` is client IP address (used for limiting admin/open_bar
......@@ -287,7 +281,7 @@ class Webapp(TimeStampModel):
def is_pushable_by(self, actor):
"""Return True if the given actor is allowed to push an image of this webapp
`actor` may be a User, AllgoUser, Runner or None
......@@ -295,7 +289,7 @@ class Webapp(TimeStampModel):
def get_webapp_version(self, number=None):
"""Return the WebappVersion object to be used for running a job
'number' is the requested version (provided as a string). If no version
number is provided, then the function selects the most recent.
......@@ -396,14 +390,14 @@ class WebappVersion(TimeStampModel):
# When a version is replaced by the user, under the hood, a separate
# WebappVersion entry is created with a different id. Docker images are never
# overwritten.
# Fields
number = models.CharField(max_length=255, validators=[
# 'sandbox' is a reserved name because it is used for running jobs in
# sandboxes
RegexValidator("\Asandbox\Z", inverse_match=True,
RegexValidator(r"\Asandbox\Z", inverse_match=True,
message="This is a reserved name"),
description = models.CharField(max_length=255, blank=True)
......@@ -504,7 +498,7 @@ class Job(TimeStampModel):
# WAITING -> RUNNING (job start)
# ABORTING -> DONE (job terminated after user abort request)
# RUNNING -> DONE (job terminated, all other cases)
# - both django and the controller are allowed to change the state when the
# job is WAITING or RUNNING. In order to prevent race conditions, the
# following transitions *must* be performed atomically in the db (compare
......@@ -579,13 +573,13 @@ class Job(TimeStampModel):
def status(self):
"""Return a textual representation of the job status
The job status is what we display to the user (the 'state' and 'result'
fields are internal to allgo).
The status is the textual representation of:
- the 'result' field if the job is terminated (success, error, timeout
or aborted)
or aborted)
- the 'state' field otherwise (new, waiting, running, aborting)
......@@ -665,9 +659,9 @@ def save_user_profile(sender, instance, **kwargs):
def is_provider(email_addr: EmailAddress) -> bool:
def email_is_provider(email_addr: EmailAddress) -> bool:
"""Return true if the email address is in the list of domains allowed to provide applications
WARNING: the address may not be verified
......@@ -678,12 +672,12 @@ def is_provider(email_addr: EmailAddress) -> bool:
# malformatted email
return False
EmailAddress.add_to_class('is_provider', is_provider)
EmailAddress.add_to_class('is_provider', email_is_provider)
def email_addresses(user: User):
"""Get the email addresses associated to a user account
returns a db query of EmailAddress objects
return EmailAddress.objects.filter(user=user)
......@@ -692,17 +686,17 @@ auth.models.User.add_to_class('email_addresses', email_addresses)
def is_provider(user: User):
def user_is_provider(user: User):
"""Return true if the user has at least one email address in the allowed
developer domains