From 8da0bdb77c83df3cd3eb38ca3b1a966b5a08c96d Mon Sep 17 00:00:00 2001 From: LETORT Sebastien <sebastien.letort@irisa.fr> Date: Fri, 6 Dec 2019 11:34:22 +0100 Subject: [PATCH] Major update, the client has been rewritten. --- allgo/__init__.py | 271 +++++++++++++++++++++++++++++++--------------- setup.py | 16 +-- 2 files changed, 193 insertions(+), 94 deletions(-) diff --git a/allgo/__init__.py b/allgo/__init__.py index 6b7e252..47c197e 100644 --- a/allgo/__init__.py +++ b/allgo/__init__.py @@ -1,104 +1,201 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +""" +Python client for the Allgo API. + +This module provides a class that simplify the use of the Allgo API +""" + +# standard libs import logging import os -import time - -try: - from urllib.parse import urlparse, urlencode - from urllib.request import urlopen, Request - from urllib.error import HTTPError -except ImportError: - from urlparse import urlparse - from urllib import urlencode - from urllib2 import urlopen, Request, HTTPError +import sys + +# from json.decoder import JSONDecodeError # python3 +from os.path import expanduser + import requests -log = logging.getLogger('allgo') -__version__ = '0.1.11' +__version__ = '0.2.0' + +#: The default Allgo url used. +MAIN_INSTANCE_URL = "https://allgo18.inria.fr" + +#: The file containing the user token. +#: It is read in last resort to get a token. +TOKEN_FILE = os.path.join(expanduser("~"), '.allgo_token') + + +# =============================================================== +class AllgoError(Exception): + """Generic error generated by this module.""" + + +class TokenError(AllgoError): + """Raise TokenError when none is set.""" + + def __init__(self): + err_msg = "You must provide a token in parameter" + err_msg += " or define an environment variable 'ALLGO_TOKEN'" + err_msg += " or write your token in {}".format(TOKEN_FILE) + super(TokenError, self).__init__(err_msg) -def local_token(): - from os.path import expanduser - home = expanduser("~") - filetoken = os.path.join(home, '.allgo_token') - if os.path.exists(filetoken): - with open(filetoken) as f: +class StatusError(AllgoError): + """Raise StatusError when API does not return a 200 code. + """ + + def __init__(self, status_code, response, *args, **kwargs): + """TODO""" + self._status_code = status_code + self._response = response + AllgoError.__init__(self, *args, **kwargs) + + @property + def status_code(self): + return self._status_code + + @property + def msg(self): + return self._response.json()['error'] + + +def _local_token(): + """read and return the content of TOKEN_FILE or None.""" + if os.path.exists(TOKEN_FILE): + with open(TOKEN_FILE) as f: return f.read() + return None -class App: +# --------------------------------------------------------------- +class Client: """ - AllGo app submission object + manage connexions to the API and provide methods to deal with it. + """ - def __init__(self, name, token=None): + def __init__(self, token=None, allgo_url=MAIN_INSTANCE_URL, verify_tls=True): + """Build an object to manage the parameters of a user-connexion to an Allgo-instance. + """ - Constructor - :param name: name of the application in lower case - :param token: if not provided, we check ALLGO_TOKEN env variable and notebook parameters + self._allgo_url = allgo_url + self._verify_tls = verify_tls + self._token = token or os.getenv('ALLGO_TOKEN') or _local_token() + if None is self._token: + raise TokenError() + + @property + def token(self): + """return the user token for the querying all instance.""" + return self._token + + @property + def allgo_url(self): + """return the url of the querying allgo instance.""" + return self._allgo_url + + @property + def verify_tls(self): + """return the _verify_tls attribute.""" + return self._verify_tls + + def _request_api(self, method, url_end, **kwargs): + """Build and launch a http request on the Allgo instance API. """ - self.name = name - if token: - self.token = token - elif 'ALLGO_TOKEN' in os.environ.keys(): - self.token = os.environ.get('ALLGO_TOKEN') - elif local_token(): - self.token = local_token() - else: - err_msg = "You must provide a token in parameter" - err_msg += " or define an environment variable 'ALLGO_TOKEN'" - raise Exception(err_msg) - - def run(self, files, outputdir='.', params='', verify_tls=True): + headers = {'Authorization': 'Token token={}'.format(self.token)} + url = '{}/api/v1/{}'.format(self.allgo_url, url_end) + logging.debug("querying '{}' with method '{}'".format(url, method)) + logging.debug("and with data = {}".format(kwargs)) + + actions = { + 'post': requests.post, + 'get' : requests.get, + } + try: + resp = actions[method](url, headers=headers, verify=self.verify_tls, **kwargs) + logging.debug("raw API answer : {}".format(resp.content)) + if resp.status_code != requests.codes.ok: + logging.debug("req.status_code = {}".format(resp.status_code)) + raise StatusError(resp.status_code, resp) + return resp.json() + except ValueError: # pure python3 should use JSONDecodeError + # this happens when response content cannot be converted to json + # for example when downloading a file. + return resp + except KeyError as err: + msg = "method {} is not allowed to request the API.".format(method) + msg += "KeyError: {}".format(str(err)) + raise AllgoError(msg) + + def create_job(self, app, version=None, params='', files=None): + """Create a job for app, with params. """ - Submit the job - :param files: input files - :param outputdir: by default current directory - :param params: a string parameters see the application documentation - :param verify_tls: [True] the value is pass to the verify arg of requests.post - :return: + data = { + "job[webapp_name]": app, + "job[webapp_id]" : app, + "job[param]" : params, + "job[version]" : version + } + + file_dict = {} + if None is files: + files = [] + for i, file_ in enumerate(files): + key = "file[{}]".format(i) + file_dict[key] = open(file_, 'rb') + + return self._request_api('post', url_end='jobs', data=data, files=file_dict) + + def job_status(self, job_id): + """Get the status of a job as a dict. """ - headers = {'Authorization': 'Token token={}'.format(self.token)} - data = {"job[webapp_name]": self.name, - "job[webapp_id]": self.name, - "job[param]": params} - ALLGO_URL = os.environ.get('ALLGO_URL', "https://allgo.inria.fr") - url = '%s/api/v1/jobs' % ALLGO_URL - r = requests.post(url, headers=headers, files=files, data=data, verify=verify_tls) - r.raise_for_status() - r = r.json() - if 'id' in r.keys(): - jobid = r['id'] - else: - jobid = list(r.keys())[0] - results = None + url = 'jobs/{}'.format(job_id) + return self._request_api('get', url) + + def app_metrics(self, app_id, what, from_=None, to_=None, step=None): + """query the instance for some metrics. + + **Not yet implemented.** + + """ + raise NotImplementedError("metrics API has not been integrated yet.") + + # -------------------------------------------- + # -- Accessory methods, to ease user work. + # -------------------------------------------- + def run_job(self, app, version=None, params='', files=None, + sleep_duration=2, verbose=False): + """Create a job and wait for it to terminate. + """ + from time import sleep + # Note: states new, and deleted are almost impossible to get. + STATES = ['NEW', 'WAITING', 'RUNNING', 'ARCHIVED', 'DONE', 'DELETED', 'ABORTING'] + + out_dict = self.create_job(app, version=version, params=params, files=files) + job_id = out_dict['id'] + flags = { s.lower(): False for s in STATES } + while True: - url = '{}/api/v1/jobs/{}'.format(ALLGO_URL, jobid) - r = requests.get(url, headers=headers, verify=verify_tls) - r.raise_for_status() - results = r.json() - if 'status' in results.keys(): - status = results['status'] - else: - status = list(results.values())[0]['status'] - if status in ['created', 'waiting', 'running', 'in progress']: - log.info("wait for job %s in status %s", jobid, status) - time.sleep(2) - else: - break - - if status != 'done': - raise Exception('Job %s failed with status %s', (jobid, status)) - - elif status == 'done' and results: - if 'id' in results.keys(): - files = results[str(jobid)].items() - else: - files = results[str(jobid)]['files'].items() - for filename, url in files: - filepath = os.path.join(outputdir, filename) - with requests.get(url, headers=headers, verify=verify_tls, stream=True) as r: - r.raise_for_status() - with open(filepath, 'wb') as f: - for chunk in r.iter_content(chunk_size=8192): - if chunk: - f.write(chunk) + sleep(sleep_duration) + + out_dict = self.job_status(job_id) + status = out_dict['status'] + + if True is verbose: + if False is flags[status]: + print("\n{}\t".format(status), end='') + flags[status] = True + else: + print(".", end='') + sys.stdout.flush() + + if status == 'done': + if True is verbose: + print("") # to get a return line + out_dict['id'] = job_id + return out_dict + + # should never goes here + return out_dict + diff --git a/setup.py b/setup.py index d8a6efd..498b4d5 100644 --- a/setup.py +++ b/setup.py @@ -5,21 +5,23 @@ import allgo setup( name='allgo', version=allgo.__version__, - packages=find_packages(), - author="Sebastien Campion", - author_email="sebastien.campion@inria.fr", - description="AllGo client module", + install_requires=['requests'], + packages=find_packages(exclude=['doc', 'test*']), + author="Sebastien Letort", + author_email="sebastien.letort@inria.fr", + description="AllGo client-API module", long_description=open('README.md').read(), long_description_content_type='text/markdown', include_package_data=True, - url='https://gitlab.inria.fr/allgo/client', + url='https://gitlab.inria.fr/allgo/api-clients/python_client', classifiers=[ - "Programming Language :: Python", - "Development Status :: 1 - Planning", + "Programming Language :: Python :: 3", + "Development Status :: 3 - Alpha", "License :: OSI Approved", "Natural Language :: French", "Operating System :: OS Independent", "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3", "Topic :: Communications", ], license="AGPL", -- GitLab