Commit c11f9a6b authored by BERJON Matthieu's avatar BERJON Matthieu
Browse files

Merge branch '217-include-the-webappversion-id-in-docker-image-tags' into 'django'

Resolve "include the WebappVersion.id in docker image tags"

Closes #265 and #217

See merge request !118
parents b2d1a33b ade74d2d
Pipeline #40170 failed with stage
in 1 minute and 17 seconds
......@@ -537,7 +537,7 @@ class SandboxManager(Manager):
state = int(VersionState.SANDBOX))
)
def _start(self, webapp, version):
def _start(self, webapp, image_tag):
"""Start a webapp sandbox
(to be executed in a thread pool)
......@@ -547,14 +547,7 @@ class SandboxManager(Manager):
ses = ctrl.session
# prepare sandbox parameters
# docker image
if version is None:
image = "%s:%s" % (ctrl.gen_factory_name(webapp.docker_os),
webapp.docker_os.version)
else:
image = "%s:%s" % (webapp.image_name, version.number)
image = ":".join(image_tag)
log.debug("sandbox %r: using image %r", webapp.docker_name, image)
# safety checks
......@@ -698,7 +691,7 @@ EOF
ctrl.check_host_path("isdir", etc_dir)
ctrl.check_host_path("isdir", run_dir)
if version is None and webapp.entrypoint:
if webapp.sandbox_version_id is None and webapp.entrypoint:
# prepend instructions to initialise a dummy entrypoint
dn, bn = os.path.split(webapp.entrypoint)
# FIXME: do nothing if entrypoint already exists
......@@ -802,7 +795,6 @@ exec /.toolbox/bin/sshd -D
", ".join(map(repr, sorted(v.number for v in versions))))
recover = tuple(v.id for v in versions)
# TODO: make 'sandbox' a reserved name
if error:
description = "pre-commit error: " + error
......@@ -824,15 +816,19 @@ exec /.toolbox/bin/sshd -D
published = False,
state = int(VersionState.SANDBOX))
ses.add(version)
version.webapp
ses.refresh(version)
ses.expunge(version)
ses.expunge_all()
assert version is not None
# commit the docker image
image, tag = ctrl.gen_image_tag(version, webapp)
log.debug("dicts %r %r", webapp.__dict__, version.__dict__)
log.info("commit sandbox %r version %r", webapp.docker_name, version.number)
log.info("commit sandbox %r version %r (%s)",
webapp.docker_name, version.number, tag)
container = webapp.sandbox_name
next_state = image_size = None
......@@ -843,7 +839,7 @@ exec /.toolbox/bin/sshd -D
ctrl.sandbox.wait(container)
# commit
cid = ctrl.sandbox.commit(container, webapp.image_name, version.number)
cid = ctrl.sandbox.commit(container, image, tag)
next_state = VersionState.COMMITTED
image_size = ctrl.sandbox.inspect_image(cid)["Size"]
......@@ -931,7 +927,7 @@ exec /.toolbox/bin/sshd -D
ses = ctrl.session
with ses.begin():
# current state of the sandbox + load docker os
# current state of the sandbox + load docker_os
webapp = ses.query(Webapp).filter_by(id=webapp_id).one()
webapp.docker_os
......@@ -945,7 +941,6 @@ exec /.toolbox/bin/sshd -D
# docker name of the sandbox & image
webapp.sandbox_name = ctrl.gen_sandbox_name(webapp)
webapp.image_name = ctrl.gen_image_name(webapp)
phase = "inspect"
next_state = fail_state = None
......@@ -965,16 +960,19 @@ exec /.toolbox/bin/sshd -D
raise Error("invalid version id %d (belongs to webapp %d)" % (
sandbox_version.id, sandbox_version.webapp_id))
image_tag = ctrl.gen_image_tag(sandbox_version, webapp)
# pull requested image
yield from ctrl.image_manager.pull(sandbox_version.id)
else:
image_tag = (ctrl.gen_factory_name(webapp.docker_os),
webapp.docker_os.version)
# pull image
yield from ctrl.image_manager.sandbox_pull_manager.process((
ctrl.gen_factory_name(webapp.docker_os),
webapp.docker_os.version))
yield from ctrl.image_manager.sandbox_pull_manager.process(image_tag)
# start sandbox
yield from self.run_in_executor(self._start, webapp, sandbox_version)
yield from self.run_in_executor(self._start, webapp, image_tag)
elif webapp.sandbox_state == SandboxState.STOPPING:
# stop the sandbox
......@@ -1056,18 +1054,24 @@ class JobManager(Manager):
job = ses.query(Job).filter_by(id=info.job_id).one()
webapp = job.webapp
log.info("start job %d (%s:%s)",
info.job_id, webapp.docker_name, info.version)
repo = ctrl.gen_image_name(webapp)
image = "%s:%s" % (repo, info.version)
if info.ver_id is None:
image = None
image_desc = ""
else:
version = ses.query(WebappVersion).filter_by(id=info.ver_id).one()
repo, tag = ctrl.gen_image_tag(version)
image = "%s:%s" % (repo, tag)
image_desc = ", image=" + tag
log.info("start job %d (%s:%s%s)", info.job_id,
webapp.docker_name, info.version, image_desc)
job_path = ctrl.gen_job_path(job)
log.debug("job.path: %r", job_path)
if info.ver_id is None:
assert info.version == "sandbox"
image = tmp_img = info.client.commit(ctrl.gen_sandbox_name(webapp), repo, info.version)["Id"]
image = tmp_img = info.client.commit(ctrl.gen_sandbox_name(webapp))["Id"]
# TODO use another workdir
# TODO use another uid
......@@ -1572,18 +1576,8 @@ class PushManager(Manager):
raise Error("unable to push (image not yet committed)")
raise Error("unable to push (invalid state: %s)" % version.state)
# ensure that there is no other version with the same number in the pipeline
# (to avoid a race condition)
others = (ses.query(WebappVersion.id)
.filter_by(webapp_id=version.webapp_id, number=version.number)
.filter(WebappVersion.id != version.id)
.filter(WebappVersion.state.in_((int(VersionState.SANDBOX), int(VersionState.COMMITTED)))))
if others.count():
raise Error("unable to push (there are other pushable versions with the same number: %s)" % (
" ".join(map(str, itertools.chain(*others)))))
image = self.ctrl.gen_image_name(version.webapp)
tag = version.number
image, tag = self.ctrl.gen_image_tag(version)
ses.expunge(version)
log.info("push from the %-8s %s:%s", "sandbox", image, tag)
# FIXME: docker-py folks are morons in 1.9.0 they added the
......@@ -1607,24 +1601,33 @@ class PushManager(Manager):
reset()
with ses.begin():
version = ses.query(WebappVersion).filter_by(id=version_id).one()
prev = ses.query(WebappVersion).filter_by(
webapp_id = version.webapp_id,
number = version.number,
state = int(VersionState.READY)).scalar()
log.debug("prev version id %r", (prev.id if prev else None))
if prev is None:
# this is a new version
version.state = int(VersionState.READY)
else:
# overwrite an existing version
for key in "updated_at", "description", "published":
setattr(prev, key, getattr(version, key))
# mark this version as replaced
version.state = int(VersionState.REPLACED)
ses.add(version)
# Now that the push is completed, we need to switch the state to READY
#
# However since push is performed asynchronously and since we
# may be replacing a version that already exists, when may have
# multiple versions competing for the READY state.
#
# We set the READY state to the version with the highest id
# (the one which was committed/pushed last) and put all other
# versions in the REPLACED state.
# select and lock all candidate versions
versions = ses.query(WebappVersion).filter_by(id=version.id).union(
ses.query(WebappVersion).filter_by(
webapp_id = version.webapp_id,
number = version.number,
state = int(VersionState.READY))
).with_for_update().all()
# set the latest one to READY and the others to REPLACED
latest_id = max(v.id for v in versions)
for v in versions:
new_state = (VersionState.READY if v.id == latest_id else
VersionState.REPLACED)
log.debug("version id %d: %s -> %s", v.id,
VersionState(v.state).name, new_state)
v.state = int(new_state)
class ImageManager:
......@@ -1655,8 +1658,7 @@ class ImageManager:
# get the version object and check its state
version = ses.query(WebappVersion).filter_by(id=version_id).one()
image = self.ctrl.gen_image_name(version.webapp)
tag = version.number
image, tag = self.ctrl.gen_image_tag(version)
if swarm:
......@@ -1676,7 +1678,7 @@ class ImageManager:
else:
# pull to the sandbox
if version.state == VersionState.COMMITTED:
# do not pull!
# nothing to do
return
if version.state not in (VersionState.READY, VersionState.REPLACED):
......@@ -1771,8 +1773,17 @@ class DockerController:
def gen_sandbox_name(self, webapp):
return "%s-sandbox-%s" % (self.env, webapp.docker_name)
def gen_image_name(self, webapp):
return "%s/%s" % (self.registry, webapp.docker_name)
def gen_image_tag(self, webapp_version, webapp=None):
"""Get the docker image for a given WebappVersion
return a tuple (REPO, TAG)
"""
if webapp is None:
webapp = webapp_version.webapp
else:
assert webapp.id == webapp_version.webapp_id
return ("%s/%s" % (self.registry, webapp.docker_name),
"id%d" % webapp_version.id)
def gen_job_name(self, job):
return "%s-job-%s-%d-%s" % (self.env, job.queue.name, job.id, job.webapp.docker_name)
......@@ -1886,8 +1897,6 @@ class DockerController:
ses = self.session
with ses.begin():
if startup:
ses.execute("DELETE FROM dj_webapp_versions WHERE state=%d"
% VersionState.REPLACED)
for version_id, in ses.execute(
"SELECT id FROM dj_webapp_versions WHERE state=%d"
% VersionState.COMMITTED).fetchall():
......
......@@ -26,6 +26,7 @@ class VersionState(enum.IntEnum):
READY = 2
ERROR = 3
REPLACED = 4
USER = 5
class JobState(enum.IntEnum):
......
......@@ -482,7 +482,11 @@ class AllgoAio:
# NOTE: the registry endpoint also supports the DELETE method to delete the
# image. Since the deletion of a WebappVersion is not yet implemented, we
# do not allow the delete method here
if request.method not in ("HEAD", "GET", "PUT"):
if request.method in ("HEAD", "GET"):
action = "pull"
elif request.method == "PUT":
action = "push"
else:
# method not allowed
return ErrorResponse(status=405)
......@@ -492,40 +496,42 @@ class AllgoAio:
repo = request.match_info["repo"]
tag = request.match_info["tag"]
# prepare the forwarded HTTP request to the registry
forwarded_request = self.http_client.request(request.method,
config.env.ALLGO_REGISTRY_PRIVATE_URL + request.path,
headers=headers, data=await request.read())
# call django's pre hook
async with self.django_request("POST", "/jwt/pre-"+action,
headers=headers, params={"repo": repo, "tag": tag},
) as django_reply:
if request.method != "PUT":
# GET, HEAD
async with forwarded_request as registry_reply:
return await forward_response(registry_reply)
else:
# PUT
if not is_ok(django_reply):
return await forward_response(django_reply)
version_id = int(await django_reply.read())
# call django's pre-push hook
async with self.django_request("POST", "/jwt/pre-push",
headers=headers, params={"repo": repo, "tag": tag},
) as django_reply:
cleanup = version_id if action == "push" else None
if not is_ok(django_reply):
return await forward_response(django_reply)
webapp_id = int(await django_reply.read())
# forward the HTTP request to the registry
real_url = "%s/v2/%s/manifests/id%d" % (
config.env.ALLGO_REGISTRY_PRIVATE_URL, repo, version_id)
async with self.http_client.request(request.method, real_url,
headers=headers, data=await request.read()) as registry_reply:
if action == "pull":
# pull
# -> just forward the reply
return await forward_response(registry_reply)
else:
# push
# -> call the post-push hook to update the db
# forward the registry request
async with forwarded_request as registry_reply:
if not is_ok(registry_reply):
return await forward_response(registry_reply)
registry_success = int(is_ok(registry_reply))
# call django's post-push hook
async with self.django_request("POST", "/jwt/post-push",
headers=headers, params={"webapp_id": str(webapp_id), "tag": tag},
headers=headers, params={"version_id": str(version_id),
"success": str(registry_success)},
) as django_reply:
if is_ok(django_reply):
return await forward_response(registry_reply)
else:
if registry_success and not is_ok(django_reply):
return await forward_response(django_reply)
else:
return await forward_response(registry_reply)
async def handle_job_events(self, request):
......
......@@ -116,12 +116,13 @@ class Webapp(TimeStampModel):
app belonging to one or more users.
"""
IDLE = 0
RUNNING = 1
STARTING = 2
START_ERROR = 3
STOPPING = 4
STOP_ERROR = 5
#### Sandbox States ####
IDLE = 0 # no active sandbox
RUNNING = 1 # the sandbox is running
STARTING = 2 # the sandbox is starting
START_ERROR = 3 # the sandbox failed to start
STOPPING = 4 # the sandbox is stopping
STOP_ERROR = 5 # the sandbox failed to stop
SANDBOX_STATE_CHOICES = (
(IDLE, 'IDLE'),
......@@ -131,6 +132,37 @@ class Webapp(TimeStampModel):
(STOPPING, 'STOPPING'),
(STOP_ERROR, 'STOP_ERROR'),
)
# Notes about sandbox states
#
# - Possible state transitions
# - by django:
# (any) -> STARTING (sandbox start order)
# (any) -> STOPPING (sandbox stop order)
#
# - by the controller:
# STARTING -> RUNNING (successfully started)
# STARTING -> START_ERROR (start error)
# STOPPING -> IDLE (successfully stopped)
# STOPPING -> STOP_ERROR (stop error)
#
# - Both django and the controller are allowed to change the state, but
# django has the priority. Transitions by the controller must be performed
# *atomically* (compare and swap).
#
# - In the STARTING state, the docker image for creating the sandbox is
# specified by the 'sandbox_version_id' field (if not NULL), or by the
# 'docker_os_id' (if sandbox_version_id is NULL).
#
# - In the STOPPING state, sandbox_version_id must be reset to NULL (to
# avoid later issues with foreign key constraints). If a new version has
# to be committed before destroying the sandbox, django may create a new
# WebappVersion entry in the SANDBOX state.
#
# - Errors (in states START_ERROR and STOP_ERROR) are generally
# recoverable. Django can switch back to STARTING/STOPPING for retrying
# the operation.
#
# Fields
name = models.CharField(unique=True, max_length=255)
......@@ -265,11 +297,28 @@ class WebappVersion(TimeStampModel):
Version of a given webapp
"""
SANDBOX = 0
COMMITTED = 1
READY = 2
ERROR = 3
REPLACED = 4
#### WebappVersion States ####
SANDBOX = 0 # this version not yet committed
# - it is the current content of the sandbox
# - there may be at most one version in SANDBOX state
# per webapp
COMMITTED = 1 # the docker image is committed but not yet pushed to the registry
READY = 2 # the docker image is on the registry
ERROR = 3 # an error occurred during the creation of this version
# - this version is not useable
# - a separate 'recovery' version may have been
# committed by the controller
REPLACED = 4 # this version was replaced (by another READY version)
# - it is assumed to be deleted
# - it is no longer visible by the user
# - it may still be in use by a job or a sandbox
USER = 5 # this version is being pushed directly by the user
SANDBOX_STATE_CHOICES = (
(SANDBOX, 'SANDBOX'),
......@@ -277,7 +326,40 @@ class WebappVersion(TimeStampModel):
(READY, 'READY'),
(ERROR, 'ERROR'),
(REPLACED, 'REPLACED'),
(USER, 'USER'),
)
# Notes about WebappVersion states
#
# - Allowed state changes:
# - by django:
# (none) -> SANDBOX
# (none) -> USER
# USER -> READY,ERROR
# READY -> REPLACED
# - by the controller:
# SANDBOX -> COMMITTED,ERROR
# COMMITTED -> READY,REPLACED
# READY -> REPLACED
#
# - As soon as the version is in the SANDBOX, the version is considered to
# exist and to be usable for launching a job or a sandbox.
#
# - The process for pushing images is completely asynchronous, consequently
# if a version is uptated multiples times quickly, then the images may
# not reach the READY state in the same order. If multiple images with
# the same version number reach the READY state, then only the higest id
# is kept at the READY state, older versions are switched to REPLACED.
# Note: switching to the READY/REPLACING state must be done atomically
# (by locking the db rows) because it may be done by django (when
# pushing) or the controller (when committing a sandbox).
#
# - The docker images are named as <REPO>/<Webapp.docker_name>:id<WebappVersion.id>
# When a version is replaced by the user, under the hood, a separate
# WebappVersion entry is created with a different id. Docker images are never
# overwritten.
#
# Fields
number = models.CharField(max_length=255, validators=[
docker_tag_validator,
......@@ -290,7 +372,6 @@ class WebappVersion(TimeStampModel):
docker_image_size = models.FloatField(blank=True, null=True)
state = models.IntegerField(choices=SANDBOX_STATE_CHOICES)
published = models.BooleanField()
url = models.URLField(blank=True, null=True)
webapp = models.ForeignKey('Webapp', on_delete=models.CASCADE, related_name="webapp")
......
......@@ -332,6 +332,11 @@ class WebappSandboxPanel(LoginRequiredMixin, TemplateView):
webapp = self.get_object()
action = request.POST["action"]
def stop_sandbox():
webapp.sandbox_state = Webapp.STOPPING
webapp.sandbox_version_id = None
webapp.save()
log.info("action %r", request.POST["action"])
if action == "start":
if webapp.sandbox_state != Webapp.IDLE:
......@@ -356,14 +361,20 @@ class WebappSandboxPanel(LoginRequiredMixin, TemplateView):
"unable to commit sandbox %r because it is not running"
% webapp.name)
else:
# query previous active versions of this webapp
previous = WebappVersion.objects.filter(webapp=webapp,
state__in = (WebappVersion.READY, WebappVersion.COMMITTED))
extra = {}
if request.POST["version-action"] == "replace-version":
number = request.POST["version-select"]
# keep the previous 'created_at' timestamp when replacing an image
extra["created_at"] = getattr(previous.filter(number=number).first(), "created_at")
else:
number = request.POST["version-new"]
# ensure that the version does not already exist
if WebappVersion.objects.filter(webapp=webapp, number=number,
state__in = (WebappVersion.READY, WebappVersion.COMMITTED)
).exists():
# ensure that this version number does not already exist
if previous.filter(number=number).exists():
messages.error(request, "unable to commit because version %r already exists"
" (if you want to overwrite this version, then use"
" 'replace version' instead)" % number)
......@@ -375,17 +386,15 @@ class WebappSandboxPanel(LoginRequiredMixin, TemplateView):
state=WebappVersion.SANDBOX,
published=True,
description=request.POST["description"],
url="http://WTF")
webapp.sandbox_state = Webapp.STOPPING;
webapp.save()
**extra)
stop_sandbox()
messages.success(request, "committing sandbox %r version %r"
% (webapp.name, number))
elif action == "rollback":
if webapp.sandbox_state == Webapp.RUNNING:
webapp.sandbox_state = Webapp.STOPPING
webapp.save()
stop_sandbox()
messages.success(request, "rolling back sandbox %r" % webapp.name)
else:
messages.error(request, "unable to roll back, sandbox %r is not running"
......@@ -393,8 +402,7 @@ class WebappSandboxPanel(LoginRequiredMixin, TemplateView):
elif action == "abort":
if webapp.sandbox_state == Webapp.START_ERROR:
webapp.sandbox_state = Webapp.STOPPING
webapp.save()
stop_sandbox()
messages.success(request, "reset sandbox %r" % webapp.name)
elif action == "retry":
......@@ -403,8 +411,7 @@ class WebappSandboxPanel(LoginRequiredMixin, TemplateView):
webapp.save()
messages.success(request, "starting sandbox %r" % webapp.name)
elif webapp.sandbox_state == Webapp.STOP_ERROR:
webapp.sandbox_state = Webapp.STOPPING
webapp.save()
stop_sandbox()
messages.success(request, "stopping sandbox %r" % webapp.name)
log.debug("new sandbox state: %r -> %r",
......
......@@ -7,7 +7,7 @@ urlpatterns = [
url(r'^jwt/auth$', views.jwt_auth, name="jwt_auth"), # REGISTRY_AUTH_TOKEN_REALM for docker registry
# hooks for registry pull/push for image manifests
url(r'^jwt/pre-push$', views.pre_push, name="pre_push"),
url(r'^jwt/pre-(push|pull)$', views.pre_pushpull, name="pre_pushpull"),
url(r'^jwt/post-push$', views.post_push, name="post_push"),
# default catch-all route for docker registry urls (normally unused because
......
......@@ -28,16 +28,17 @@ def _read_controller_token():
log.warning("failed to get the controller token at %r (%s)", path, e)
CONTROLLER_TOKEN = _read_controller_token()
@csrf_exempt
def pre_push(request):
"""pre-push hook for image manifests
def pre_pushpull(request, action):
"""pre-hook for pushing/pulling image manifests
This endpoint is called by allgo.aio before pushing an image to the
This endpoint is called by allgo.aio before pushing/pulling an image to the
registry.
it returns a 200 response with the webapp_id as body if successful
it returns a 200 response with the WebappVersion.id in the body if
successful
"""
if request.META.get("HTTP_X_ORIGIN") != "aio":
# this endpoint is only usable by allgo.aio
return HttpResponse(status=404)
......@@ -53,18 +54,28 @@ def pre_push(request):
except Webapp.DoesNotExist:
return JsonResponse({"error": "unknown repository"}, status=404)
# abort if we have a commit in progress in the controller
# NOTE: there is still a (very narrow) conflict window if the commmit
# starts between this check and the result of this request
if WebappVersion.objects.filter(webapp=webapp, number=tag,
state__in=(WebappVersion.SANDBOX, WebappVersion.COMMITTED)
).exists():