module_loader.py 13.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
# encoding: utf8
"""Load and validate a python module received as a filename.

"""
import os
import re
import glob
import json
import inspect
import textwrap
import importlib
import itertools
import traceback
from functools import partial
15
from collections import defaultdict
16 17 18

import clyngor
from biseau import utils
19 20
from biseau import script as script_functions
from biseau.script import Script, Module
21 22


Lucas Bourneuf's avatar
Lucas Bourneuf committed
23
# Description given to a JSON-defined script that has no 'description' field.
DEFAULT_DOC = 'NO SCRIPT DOC PROVIDED.\nFix this by writing a module documentation inside script definition.'
24 25 26 27 28 29 30 31 32
# NOTE(review): not referenced in the visible part of this module ; presumably
#  the accepted output kinds of a run_on function. To be confirmed.
RETURNS_TYPES = {iter, str}
# Annotations accepted on the keyword-only parameters (options) of a run_on
#  function. Instances of functools.partial are also accepted by the validation.
OPTIONS_TYPES = {int, float, bool, str, list, open, partial}
# Default value of an option, by annotation, used when the run_on function
#  does not give any default to the parameter.
TYPE_DEFAULT = {int: 0, float: 0., bool: False, str: '', list: (), open: None, partial: None}
# Line of a run_on docstring describing an option: 'option_name -- description'
REGEX_OPTION_DESC = re.compile(r'([a-zA-Z0-9_]+)\s*--\s*(.+)$')

class ScriptError(ValueError):
    """Error raised when a script cannot be built from its definition."""


33
def gen_files_in_dir(dirname:str, extensions:[str]=('py', 'lp', 'json'),
                     filter_prefixes:[str]='_') -> (str, str):
    """Yield candidate scripts in given dir, based on file extension.

    dirname -- the directory to look into (not recursive)
    extensions -- the accepted file extensions, without the leading dot
    filter_prefixes -- prefix, or iterable of prefixes, of the file names
                       to ignore. Any falsy value disables the filtering.

    Yielded values are the base names of the files, without the directory.

    """
    # str.startswith only accepts a string or a tuple of strings
    if filter_prefixes and not isinstance(filter_prefixes, str):
        filter_prefixes = tuple(filter_prefixes)
    # glob does not expand braces ('*.{py,lp,json}' would be taken literally),
    #  so one pattern is needed for each extension
    for extension in extensions:
        for path in glob.glob(os.path.join(dirname, '*.' + extension)):
            fname = os.path.basename(path)
            if filter_prefixes and fname.startswith(filter_prefixes):
                continue
            yield fname


43 44 45 46 47
def build_scripts_from_dir(dirname:str='scripts', options:dict={}) -> [Script]:
    """Yield every script built from the candidate files of given directory.

    dirname -- the directory to look into (sub-directories are not explored)
    options -- forwarded as-is to build_scripts_from_file for each file

    """
    candidates = gen_files_in_dir(dirname)
    for candidate in candidates:
        for script in build_scripts_from_file(candidate, options):
            yield script

48
def build_scripts_from_file(fname:str, options:dict={}) -> [Script]:
    """Yield all scripts found in given file (note that only JSON files
    can define multiple scripts).

    fname -- the file to load. Its extension decides the loader to use:
             '.json', '.py' or '.lp'. Any other name starting with 'ASP'
             (case insensitive) means that options holds inline ASP code.
    options -- only used for inline ASP code: the source code (str),
               or a list/tuple of source codes.

    Invalid or disabled python scripts, unhandled inline ASP code and
    unrecognized files are reported on standard output ; nothing is
    yielded for them.

    """
    name, ext = os.path.splitext(fname)
    if ext == '.json':
        yield from build_scripts_from_json_file(fname)
    elif ext == '.py':
        try:
            script = build_python_script_from_name(name)
            if script.disabled:
                # a string gives the reason ; any other truthy value
                #  silently disables the script
                if isinstance(script.disabled, str):
                    print('SCRIPT {} DISABLED:'.format(name), script.disabled)
            else:
                yield script
        except ScriptError as err:
            print('SCRIPT ERROR:', str(err))
    elif ext == '.lp':
        yield build_asp_script_from_name(fname)
    elif fname.upper().startswith('ASP'):
        if isinstance(options, (list, tuple, str)):
            yield from build_scripts_from_asp_code(options)
        else:
            print(f"Unknow type for ASP code: {type(options)}")
    else:
        print(f"WARNING file '{fname}' was not recognized")
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171


def merge_scripts_lists(*scripts_lists:iter) -> iter:
    """Yield the scripts of all given iterables, ordered according
    to their dependancies.

    """
    all_scripts = itertools.chain(*scripts_lists)
    for script in sort_scripts_per_dependancies(all_scripts):
        yield script


def sort_scripts_per_dependancies(scripts:iter) -> iter:
    """Yield given scripts in a topological order computed from
    their inputs and outputs attributes.

    Do not handle scripts interdependancies.

    """
    pool = tuple(scripts)  # scripts may be a one-shot iterator
    needed = {}
    for script in pool:
        needed[script] = frozenset(script.inputs)
    produced = {}
    for script in pool:
        produced[script] = frozenset(script.outputs)
    for script in topological_sort_by_io(needed, produced):
        yield script


def _sources_of(graph:dict) -> set:
    """Return the keys of given {pred: {succs}} mapping that are
    the successor of no key."""
    successors = frozenset(itertools.chain.from_iterable(graph.values()))
    return {node for node in graph if node not in successors}


def topological_sort_by_io(inputs:dict, outputs:dict) -> iter:
    """Yield keys of inputs and outputs so that a value yielded after another
    is either in need of the previous's outputs, or unrelated.

    inputs -- mapping {value: {input}}
    outputs -- mapping {value: {output}}

    When only cycles remain, one of the remaining values is arbitrarily
    considered as a source, and the process goes on.

    """
    # build the mapping {producer: {consumers}}
    graph = defaultdict(set)
    for consumer, needed in inputs.items():
        graph[consumer]  # the access creates the entry if missing
        for producer, produced in outputs.items():
            if needed & produced:
                graph[producer].add(consumer)
    ready = _sources_of(graph)
    known_size = None
    while graph:  # run again each time a cycle is broken
        # yield and remove the sources until the graph stops shrinking
        while len(graph) != known_size:
            known_size = len(graph)
            yield from ready
            graph = {
                node: {succ for succ in succs if succ not in ready}
                for node, succs in graph.items()
                if node not in ready
            }
            ready = _sources_of(graph)
        if graph:  # only cycles remain: force one node to be a source
            ready = {next(iter(graph))}
            known_size = None


def build_python_script_from_name(module_name) -> Script:
    """Return the Script built from the python module of given name.

    module_name -- the module to import ; slashes are replaced by dots
                   to get the import path.

    """
    dotted_path = '.'.join(module_name.split('/'))
    # The module is reloaded right after the import because, according to the
    #  original note, build_script_from_module modifies the module itself.
    fresh_module = importlib.reload(importlib.import_module(dotted_path))
    return build_script_from_module(fresh_module)


def build_asp_script_from_name(fname:str) -> Script:
    """Return the Script built from the ASP source code found in given file.

    fname -- path to the file containing the ASP source code

    The script name is the base name of the file, without extension, and
    with underscores replaced by spaces. The script description is made of
    the first lines of the file, as long as they start with '% '.

    """
    with open(fname) as fd:
        asp_code = fd.read()
    # str.replace returns a new string: the result must be kept
    name = os.path.splitext(os.path.basename(fname))[0].replace('_', ' ')
    # Leading comment block. Lines are split without their line break,
    #  so joining them does not double the line breaks.
    description = '\n'.join(
        line[2:] for line in itertools.takewhile(
            lambda line: line.startswith('% '), asp_code.split('\n'))
    )
    # reuse the json interface
    return build_script_from_json({
        'name': name,
        'ASP': asp_code,
        'description': description,
        'inputs': [],
        'outputs': [],  # TODO: search for #show's in the file
    })


def build_scripts_from_json_file(fname:str) -> [Script]:
    """Yield the Script instances described by given JSON file.

    The top-level JSON value is either an object (one script)
    or an array of objects (multiple scripts).

    Raise ScriptError for any other top-level value.

    """
    with open(fname) as fd:
        data = json.load(fd)
    if isinstance(data, dict):  # only one
        payloads = [data]
    elif isinstance(data, list):  # multiple scripts
        payloads = data
    else:
        raise ScriptError("Given json file {} is not correctly formatted. "
                          "First object should be a list or a dict, not a {}"
                          "".format(fname, type(data)))
    for payload in payloads:
        yield build_script_from_json(payload)


172 173 174 175 176 177 178 179 180
def build_scripts_from_asp_code(data:str or list) -> [Script]:
    """Yield one Script instance for each given ASP source code.

    data -- one source code (str), or a (possibly nested) list/tuple of them

    Raise ScriptError if data is of any other type.

    """
    if isinstance(data, str):  # only one
        yield build_script_from_json({
            'name': 'inline ASP code',
            'ASP': data,
            'language': 'asp',
            'description': 'inline ASP code',
            'inputs': [],
            'outputs': [],  # TODO: search for #show's in the file
        })
        return
    if not isinstance(data, (tuple, list)):
        raise ScriptError(f"Given ASP source of type {type(data)} can't be handled.")
    # multiple scripts
    for source in data:
        yield from build_scripts_from_asp_code(source)

189

190 191 192 193 194
def build_script_from_json(module_def:dict) -> Script:
    """From given JSON build a Script instance.

    module_def -- mapping describing the script. Optional fields are 'inputs',
                  'outputs', 'name', 'tags' and 'description'. The source
                  code must be given by one of the fields 'ASP file', 'ASP',
                  'python file' or 'python' ; if several of them are present,
                  the first one in that order is used.

    Raise ScriptError (a ValueError) if no code field is found.

    """
    module = Module()

    # I/O
    module.INPUTS = frozenset(module_def.get('inputs', ()))
    module.OUTPUTS = frozenset(module_def.get('outputs', ()))

    # Fields
    module.NAME = module_def.get('name', 'unamed script')
    if 'tags' in module_def:
        module.TAGS = frozenset(module_def['tags'])
    module.__doc__ = module_def.get('description', DEFAULT_DOC)

    # Source code and language: (code field, language), by priority
    code_fields = (
        ('ASP file', 'asp file'),
        ('ASP', 'asp'),
        ('python file', 'python file'),
        ('python', 'python'),
    )
    for field, language in code_fields:
        if field in module_def:
            module.language = language
            module.source_code = module_def[field]
            break
    else:  # no code field found
        # ScriptError is a ValueError: callers catching the latter still work
        raise ScriptError(f"JSON script {module.NAME} do not have any code field ('ASP' "
                          "or 'ASP file' for instance). If this script was "
                          "generated with Biseau, it's possible that you're "
                          "using an older version than the script creator."
                          "")
    return build_script_from_module(module)


225
def build_script_from_module(module, *, defaults:dict={}) -> Script or ScriptError:
    """Low level function. Expect module to be a python module, or a namespace
    emulating one.

    Will try hard to invalidate given module. If it seems valid, return
    a Script instance describing and referencing the module.

    module -- the python module, or namespace, to build the script from
    defaults -- mapping from (name, tags, erase_context) to a
                default value to use if the module does not provide it.

    If the module provides a run_on attribute, it must be a generator
    function, or a function whose return annotation is str or missing.
    Its first positional parameter must be named 'context' or 'models',
    and its keyword-only parameters are the options of the script: their
    annotation must be in OPTIONS_TYPES, or be a functools.partial instance.

    Raise ScriptError if the module is not a valid script.

    """
    # hasattr would accept a module without docstring, since __doc__ is then None
    if getattr(module, '__doc__', None) is None:
        bad_script_error(module, "Docstring (description) is missing")

    if hasattr(module, 'run_on'):
        run_on = module.run_on
        run_on_func = {'run_on': run_on}
        run_on_type_msg = ("run_on object must be a generator of string"
                           " or a function returning a string, not a {}"
                           "".format(type(run_on)))
        try:
            args = inspect.getfullargspec(run_on)
        except TypeError:  # run_on is not even a callable
            bad_script_error(module, run_on_type_msg)

        # Return type
        returns_str = args.annotations.get('return', str) == str
        if not (inspect.isgeneratorfunction(run_on)
                or (inspect.isfunction(run_on) and returns_str)):
            bad_script_error(module, run_on_type_msg)

        # Input mode
        if not args.args:
            bad_script_error(module, "run_on first arg must be either 'context' or"
                             " 'models', but run_on do not have any positional arg")
        first_arg = args.args[0]
        if first_arg == 'context':
            input_mode = str
        elif first_arg == 'models':
            input_mode = iter
        else:
            bad_script_error(module, "run_on first arg must be either 'context' or"
                             " 'models', not a {}".format(first_arg))

        # detect options
        options = []  # list of (arg name, arg type, default)
        # kwonlydefaults is None when no keyword-only parameter has a default
        kwonlydefaults = args.kwonlydefaults or {}
        for arg in args.kwonlyargs:
            argtype = args.annotations.get(arg)
            try:
                valid_type = argtype in OPTIONS_TYPES or isinstance(argtype, partial)
            except TypeError:  # unhashable annotation, like [str]
                valid_type = False
            if not valid_type:
                bad_script_error(module, "Option {} do not have a valid annotation "
                                 "({}). Only {} are accepted"
                                 "".format(arg, argtype, ', '.join(map(str, OPTIONS_TYPES))))
            default = kwonlydefaults.get(arg, TYPE_DEFAULT.get(argtype))
            options.append((arg, argtype, default))
        default_options = {arg: default for arg, _, default in options}

        # add the descriptions to options
        options_descriptions = options_description_from_module(
            module, frozenset(default_options.keys()))
        options = tuple((arg, argtype, default, options_descriptions.get(arg, ''))
                        for arg, argtype, default in options)
        # TODO: detect non keyword only parameters, and check their validity.

    else:  # no run_on function… the source code and the language will be enough, we hope…
        options, default_options = (), {}
        input_mode = str
        run_on_func = {}

    # source code data
    source_code = getattr(module, 'source_code', None)
    language = getattr(module, 'language', 'python')

    # detect trivias
    tags = frozenset(getattr(module, 'TAGS', defaults.get('tags', {'undefined'})))
    doc = '\n'.join(textwrap.wrap(textwrap.dedent(module.__doc__).strip()))

    # build and return the Script instance
    return Script(
        name=getattr(module, 'NAME', defaults.get('name', 'unamed module')),
        description=doc,
        tags=tags,
        module=module,
        options=tuple(options),
        options_values={},
        input_mode=input_mode,
        incompatible=frozenset(getattr(module, 'INCOMPATIBLE', ())),
        **_build_and_validate_io(module, default_options),
        source_code=source_code,
        language=language,
        erase_context=bool(getattr(module, 'ERASE_CONTEXT', defaults.get('erase_context', False))),
        **run_on_func
    )


def bad_script_error(script, msg:str):
    """Raise a ScriptError telling that given script is invalid.

    script -- the invalid object, formatted as-is in the error message
    msg -- the reason of the invalidity

    """
    template = "Module {} is not a valid script. {}."
    raise ScriptError(template.format(script, msg))


def options_description_from_module(module, options, regex=REGEX_OPTION_DESC) -> dict:
    """Return the mapping {option: description} found in the docstring
    of the run_on function of given module.

    module -- the object providing the run_on function
    options -- container of the option names to look for
    regex -- compiled regex matching a stripped line of the docstring,
             with two groups: the option name and its description

    If an option is described multiple times, the last description is kept.

    """
    docstring = module.run_on.__doc__
    if not docstring:
        return {}
    matches = (regex.fullmatch(line.strip()) for line in docstring.splitlines(False))
    described = (match.groups() for match in matches if match)
    return {name: desc.strip() for name, desc in described if name in options}


def _build_and_validate_io(module, default_options:dict={}) -> {str: callable}:
    """Return the spec_inputs, spec_outputs, inputs and outputs functions
    built from given module.

    module -- the module containing the things
    default_options -- currently not used by the function

    return -- the dict {field name: function}, usable directly to create
    a Script instance.

    The inputs and outputs functions return the INPUTS and OUTPUTS attributes
    of the module, as frozensets (empty if the attribute is missing).
    The spec_* ones are the inputs and outputs attributes of the module,
    if any, or the very same functions otherwise.

    """
    declared_in = lambda *_, **__: frozenset(getattr(module, 'INPUTS', ()))
    declared_out = lambda *_, **__: frozenset(getattr(module, 'OUTPUTS', ()))
    fields = {  # field name: field value
        'spec_inputs': getattr(module, 'inputs', declared_in),
        'spec_outputs': getattr(module, 'outputs', declared_out),
        'inputs': declared_in,
        'outputs': declared_out,
    }

    # Verify that they are functions, and well behaved
    for field, func in fields.items():
        if not callable(func):
            bad_script_error(module, 'Attribute {} is not a function'.format(func))
        if field.startswith('spec_'):
            continue  # provided by the module: not called here
        retvalue = func()
        if not isinstance(retvalue, (set, frozenset)):
            bad_script_error(module, "Function {} should return a (frozen)set, "
                             "not {}".format(func.__name__, type(retvalue)))
    return fields