Attention une mise à jour du service Gitlab va être effectuée le mardi 30 novembre entre 17h30 et 18h00. Cette mise à jour va générer une interruption du service dont nous ne maîtrisons pas complètement la durée mais qui ne devrait pas excéder quelques minutes. Cette mise à jour intermédiaire en version 14.0.12 nous permettra de rapidement pouvoir mettre à votre disposition une version plus récente.

Commit d4ac4721 authored by BAIRE Anthony's avatar BAIRE Anthony
Browse files

Add a redis key to store job results

While this is not needed for the job_detail page (because we just reload
the page when the job is done), we will need this information for the
job_list page (because we do not want to reload the job_list each time a
job terminates)
parent fdaba1b8
......@@ -63,6 +63,7 @@ default_executor = ThreadPoolExecutor(10)
# job log
REDIS_KEY_JOB_LOG = "log:job:%d"
REDIS_KEY_JOB_STATE = "state:job:%d"
REDIS_KEY_JOB_RESULT = "result:job:%d"
# pubsub channels for waking up allgo.aio (frontend) and the controller
# (ourselves)
......@@ -1492,16 +1493,22 @@ class JobManager(Manager):
# -> wait for its termination
yield from self._finish_job(info, reset)
yield from self._notif_job_state(info, "DONE")
with ses.begin():
job = ses.query(Job).filter_by(id=job_id).first()
result = None if job is None else JobResult(job.result).name
yield from self._notif_job_state(info, "DONE", result)
# send a notification to the aio frontend when the job state is changed
#
# this function never fail (redis errors are caught and written in the
# logs)
async def _notif_job_state(self, info, state):
async def _notif_job_state(self, info, state, result=None):
try:
log.info("notify the frontend: job %d state is now %r",
info.job_id, state)
log.info("notify the frontend: job %d state is now %r with result %r",
info.job_id, state, result)
if result is not None:
await self.ctrl.redis_client.setex(REDIS_KEY_JOB_RESULT % info.job_id,
86400, result)
await self.ctrl.redis_client.setex(REDIS_KEY_JOB_STATE % info.job_id,
86400, state)
await self.ctrl.redis_client.publish(REDIS_CHANNEL_AIO,
......
......@@ -63,6 +63,7 @@ import config.env
# job log
REDIS_KEY_JOB_LOG = "log:job:%d"
REDIS_KEY_JOB_STATE = "state:job:%d"
REDIS_KEY_JOB_RESULT = "result:job:%d"
# pubsub channels for waking up allgo.aio (frontend) and the controller
......@@ -272,8 +273,11 @@ class AllgoAio:
# condition dict for job & webapp notifications
#
# key is the job_id/webapp_id
self.job_states = StatesDict(("state",))
self.webapp_states = StatesDict()
self.job_states = StatesDict(("state", "result"))
# mutex to be locked when adding new job_states entries
self.job_states_create_lock = asyncio.Lock()
# ---------- routes ---------- #
ar = self.app.router
......@@ -383,6 +387,10 @@ class AllgoAio:
# note: null state is possible if the redis entry does not
# exists yet
if new_state:
if new_state == b"DONE":
result = await self.redis_client.get(
REDIS_KEY_JOB_RESULT % job_id)
job.result = result.decode() if result else "NONE"
job.state = new_state.decode()
log.info("notify job cond %d", job_id)
......@@ -402,8 +410,10 @@ class AllgoAio:
sub, = await conn.subscribe(REDIS_CHANNEL_AIO)
log.info("subscribed to redis pub/sub channel %r" % REDIS_CHANNEL_AIO)
for item_id in list(self.job_states):
await self.update_job_state(item_id)
async with self.job_states_create_lock:
for item_id in self.job_states:
await self.update_job_state(item_id)
async for msg in sub.iter():
log.info("redis notification: %r", msg)
try:
......@@ -519,7 +529,8 @@ class AllgoAio:
log_key = REDIS_KEY_JOB_LOG % job_id
state_key = REDIS_KEY_JOB_STATE % job_id
state = None
job = self.job_states[job_id]
async with self.job_states_create_lock:
job = self.job_states[job_id]
FINAL_STATES = ("DONE", "ARCHIVED", "DELETED")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment