module_loader.py 14 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
# encoding: utf8
"""Load and validate a python module received as a filename.

"""
import os
import re
import glob
import json
import inspect
import textwrap
import importlib
import itertools
from functools import partial
14
from collections import defaultdict
15 16 17

import clyngor
from biseau import utils
18 19
from biseau import script as script_functions
from biseau.script import Script, Module
20 21


Lucas Bourneuf's avatar
Lucas Bourneuf committed
22
# Description given to JSON-defined scripts that do not provide their own.
DEFAULT_DOC = 'NO SCRIPT DOC PROVIDED.\nFix this by writing a module documentation inside script definition.'
# NOTE(review): presumably the accepted return kinds of a run_on function;
#  not referenced by the visible code -- confirm usage before relying on it.
RETURNS_TYPES = {iter, str}
# Annotations accepted on the keyword-only parameters (options) of run_on.
OPTIONS_TYPES = {int, float, bool, str, open, (open, 'r'), (open, 'w')}
# Default value of an option, by annotation, when run_on does not provide one.
TYPE_DEFAULT = {int: 0, float: 0., bool: False, str: '', open: None}
# Matches an option description line of a docstring: `name -- description`.
REGEX_OPTION_DESC = re.compile(r'([a-zA-Z0-9_]+)\s*--\s*(.+)$')

class ScriptError(ValueError):
    """Error raised when a module or file cannot be turned into a valid script."""


32
def gen_files_in_dir(dirname:str, extensions:[str]=('py', 'lp', 'json'),
                     filter_prefixes:[str]='_') -> [str]:
    """Yield candidate scripts in given dir, based on file extension.

    dirname -- directory to look into (not recursive)
    extensions -- file extensions (without the dot) of candidate scripts
    filter_prefixes -- a prefix, or an iterable of prefixes ; files whose name
                       starts with one of them are ignored. A falsy value
                       disables the filtering.

    Yielded values are file basenames, i.e. without the directory part.

    """
    # str.startswith accepts a string or a tuple of strings, nothing else
    if filter_prefixes and not isinstance(filter_prefixes, str):
        filter_prefixes = tuple(filter_prefixes)
    for extension in extensions:
        # glob does not support the shell brace expansion `*.{py,lp}`,
        #  hence one pattern per extension.
        pattern = os.path.join(glob.escape(dirname), '*.' + extension)
        for path in glob.glob(pattern):
            fname = os.path.basename(path)
            if filter_prefixes and fname.startswith(filter_prefixes):
                continue
            yield fname


42 43 44 45 46
def build_scripts_from_dir(dirname:str='scripts', options:dict={}) -> [Script]:
    """Yield every script built from the candidate files of given directory.

    dirname -- directory to explore (its subdirectories are not visited)
    options -- forwarded as-is to build_scripts_from_file

    NOTE(review): gen_files_in_dir yields file basenames, so the names given
    to build_scripts_from_file do not include dirname -- confirm that this
    is the intended behavior.

    """
    candidates = gen_files_in_dir(dirname)
    for candidate in candidates:
        yield from build_scripts_from_file(candidate, options)

47
def build_scripts_from_file(fname:str, options:dict={}) -> [Script]:
    """Yield all scripts found in given file (note that only JSON files
    can define multiple scripts).

    fname -- name of the file to load ; its extension decides the loader.
             A name starting with 'ASP' (case insensitive) and without any
             known extension means that options holds inline ASP code.
    options -- only used for inline ASP code, where it must be the source
               code itself (str), or a list/tuple of source codes.

    Unrecognized files, invalid python scripts and unhandled inline ASP
    payloads are reported on stdout, and yield nothing.

    """
    name, ext = os.path.splitext(fname)
    if ext == '.json':
        yield from build_scripts_from_json_file(fname)
    elif ext == '.py':
        try:
            # python scripts are imported by name, not by filename
            script = build_python_script_from_name(name)
            yield script
        except ScriptError as err:
            print('SCRIPT ERROR:', str(err))
    elif ext == '.lp':
        yield build_asp_script_from_name(fname)
    elif fname.upper().startswith('ASP'):
        if isinstance(options, (list, tuple, str)):
            yield from build_scripts_from_asp_code(options)
        else:
            print(f"Unknow type for ASP code: {type(options)}")
    else:
        print(f"WARNING file '{fname}' was not recognized")
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166


def merge_scripts_lists(*scripts_lists:iter) -> iter:
    """Yield the scripts of all given iterables, ordered according
    to their dependancies.

    scripts_lists -- any number of iterables of scripts

    """
    every_script = itertools.chain(*scripts_lists)
    yield from sort_scripts_per_dependancies(every_script)


def sort_scripts_per_dependancies(scripts:iter) -> iter:
    """Yield given scripts in a topological order computed from
    their `inputs` and `outputs` attributes.

    Do not handle scripts interdependancies.

    scripts -- iterable of hashable objects exposing `inputs` and `outputs`

    """
    pool = tuple(scripts)  # iterated twice, so a one-shot iterator won't do

    def frozen_attribute(attribute:str) -> dict:
        "Map each script of the pool to the frozenset of given attribute"
        return {script: frozenset(getattr(script, attribute)) for script in pool}

    needed = frozen_attribute('inputs')
    produced = frozen_attribute('outputs')
    yield from topological_sort_by_io(needed, produced)


def topological_sort_by_io(inputs:dict, outputs:dict) -> iter:
    """Yield the keys of inputs and outputs, each one coming after the keys
    whose outputs it needs. Unrelated keys come in no particular order.

    inputs -- mapping {value: {input}}
    outputs -- mapping {value: {output}}

    When a cycle prevents any further progress, one remaining key is
    arbitrarily considered as having no predecessor, and the walk goes on.

    """
    def without_predecessor(graph:dict) -> set:
        "Return the nodes of graph that are the successor of no node"
        targeted = frozenset(itertools.chain.from_iterable(graph.values()))
        return {node for node in graph if node not in targeted}

    # build the graph {predecessor: {successors}}
    graph = defaultdict(set)
    for consumer, needed in inputs.items():
        graph[consumer]  # a consumer is a node, even without any link
        for producer, produced in outputs.items():
            if needed & produced:
                graph[producer].add(consumer)
    ready = without_predecessor(graph)

    # peel the graph, one layer of predecessor-less nodes at a time
    known_size = None
    while graph:  # the outer loop is there to go past cycles
        while len(graph) != known_size:  # stops when no node was removed
            known_size = len(graph)
            yield from ready
            graph = {
                node: {succ for succ in succs if succ not in ready}
                for node, succs in graph.items()
                if node not in ready
            }
            ready = without_predecessor(graph)
        if graph:
            # remaining nodes all have a predecessor: there is at least
            #  one cycle. Break it by forcing a node to be the next one.
            ready = {next(iter(graph))}
            known_size = None


def build_python_script_from_name(module_name) -> Script:
    """Import the python module of given name, and build a Script from it.

    module_name -- module name or slash-separated path without extension,
                   e.g. `scripts/foo` is imported as `scripts.foo`

    """
    dotted_path = '.'.join(module_name.split('/'))
    # The reload is needed because the module itself is
    #  modified by build_script_from_module
    fresh_module = importlib.reload(importlib.import_module(dotted_path))
    return build_script_from_module(fresh_module)


def build_asp_script_from_name(fname:str) -> 'Script':
    """Return the Script built from the ASP source code found in given file.

    fname -- path to a file containing ASP source code

    The script name is the file basename, without its extension and with
    underscores replaced by spaces. The description is made of the first
    lines of the file, as long as they start with `% `.

    """
    with open(fname) as fd:
        asp_code = fd.read()
    # str.replace returns a new string: the result must be kept
    name = os.path.splitext(os.path.basename(fname))[0].replace('_', ' ')
    description = []
    for line in asp_code.split('\n'):
        if not line.startswith('% '):
            break  # end of the leading block of comments
        description.append(line[2:])
    # reuse the json interface
    return build_script_from_json({
        'name': name,
        'ASP': asp_code,
        'description': '\n'.join(description),
        'inputs': [],
        'outputs': [],  # TODO: search for #show's in the file
    })


def build_scripts_from_json_file(fname:str) -> [Script]:
    """Yield the Script instances described by given JSON file.

    fname -- path to a JSON file whose top-level object is either a dict
             (one script) or a list of dicts (multiple scripts)

    Raise ScriptError if the top-level object is of another type.

    """
    with open(fname) as fd:
        payload = json.load(fd)
    if isinstance(payload, dict):  # only one script
        yield build_script_from_json(payload)
        return
    if isinstance(payload, list):  # multiple scripts
        yield from map(build_script_from_json, payload)
        return
    raise ScriptError("Given json file {} is not correctly formatted. "
                      "First object should be a list or a dict, not a {}"
                      "".format(fname, type(payload)))


167 168 169 170 171 172 173 174 175
def build_scripts_from_asp_code(data:str or list) -> [Script]:
    """Yield one Script instance per given ASP source code.

    data -- an ASP source code (str), or a list/tuple of them, possibly nested

    Raise ScriptError on any other type of data.

    """
    if isinstance(data, (tuple, list)):  # multiple scripts
        for source in data:
            yield from build_scripts_from_asp_code(source)
    elif isinstance(data, str):  # only one
        yield build_script_from_json({
            'name': 'inline ASP code',
            'ASP': data,
            'language': 'asp',
            'description': 'inline ASP code',
            'inputs': [],
            'outputs': [],  # TODO: search for #show's in the file
        })
    else:
        raise ScriptError(f"Given ASP source of type {type(data)} can't be handled.")

184

185 186 187 188 189
def build_script_from_json(module_def:dict) -> Script:
    """From given JSON build a Script instance.

    module_def -- mapping describing the script. Read fields are:
        inputs, outputs -- iterables, empty by default
        name -- 'unamed script' by default
        tags -- optional iterable
        description -- DEFAULT_DOC by default
        'ASP file', 'ASP', 'python file' or 'python' -- the source code ;
            at least one is needed, and the first found in that order is used.

    Raise ScriptError (a ValueError) if no source code field is found.

    """
    module = Module()

    # I/O
    module.INPUTS = frozenset(module_def.get('inputs', ()))
    module.OUTPUTS = frozenset(module_def.get('outputs', ()))

    # Fields
    module.NAME = module_def.get('name', 'unamed script')
    if 'tags' in module_def:
        module.TAGS = frozenset(module_def['tags'])
    module.__doc__ = module_def.get('description', DEFAULT_DOC)

    # Source code and language: (field in JSON, language), by priority
    code_fields = (
        ('ASP file', 'asp file'),
        ('ASP', 'asp'),
        ('python file', 'python file'),
        ('python', 'python'),
    )
    for field, language in code_fields:
        if field in module_def:
            module.language = language
            module.source_code = module_def[field]
            break
    else:  # no source code field found
        raise ScriptError(f"JSON script {module.NAME} do not have any code field ('ASP' "
                          "or 'ASP file' for instance). If this script was "
                          "generated with Biseau, it's possible that you're "
                          "using an older version than the script creator."
                          "")
    return build_script_from_module(module)


220
def build_script_from_module(module, *, defaults:dict={}) -> Script or ScriptError:
    """Low level function. Expect module to be a python module, or a namespace
    emulating one.

    Will try hard to invalidate given module. If it seems valid, return
    a Script instance describing and referencing the module.

    module -- the object to inspect. Read attributes are __doc__ (needed),
              and, when available, run_on, source_code, language, NAME, TAGS,
              INCOMPATIBLE, ERASE_CONTEXT, INPUTS, OUTPUTS, inputs, outputs.
    defaults -- mapping from (name, tags, erase_context) to a
                default value to use if the module does not provide it.

    Raise ScriptError if the module is found invalid.

    """
    # a module without docstring has a __doc__ attribute set to None
    if getattr(module, '__doc__', None) is None:
        bad_script_error(module, "Docstring (description) is missing")

    if hasattr(module, 'run_on'):
        run_on_func = {'run_on': module.run_on}
        args = inspect.getfullargspec(module.run_on)

        # Return type
        is_generator = inspect.isgeneratorfunction(module.run_on)
        returns_str = (inspect.isfunction(module.run_on)
                       and args.annotations.get('return', str) == str)
        if not (is_generator or returns_str):
            bad_script_error(module, "run_on object must be a generator of string"
                             " or a function returning a string, not a {}"
                             "".format(type(module.run_on)))

        # Input mode, decided by the name of the first positional parameter
        first_arg = args.args[0] if args.args else None
        if first_arg == 'context':
            input_mode = str
        elif first_arg == 'models':
            input_mode = iter
        else:
            bad_script_error(module, "run_on first arg must be either 'context' or"
                             " 'models', not a {}".format(first_arg))

        # detect options, which are the keyword-only parameters
        # kwonlydefaults is None when no keyword-only parameter has a default
        kwonly_defaults = args.kwonlydefaults or {}
        options = []  # list of (arg name, arg type, default)
        for arg in args.kwonlyargs:
            argtype = args.annotations.get(arg)
            isgroup = isinstance(argtype, (tuple, list, set, frozenset))
            try:
                isvalid = isgroup or argtype in OPTIONS_TYPES
            except TypeError:  # unhashable annotation, a dict for instance
                isvalid = False
            if not isvalid:
                bad_script_error(module, "Option {} does not have a valid annotation "
                                 "({}). Only tuples, lists, (frozen)sets, and primitive types such as {} are accepted"
                                 "".format(arg, argtype, ', '.join(map(str, OPTIONS_TYPES))))
            if isgroup:  # pick an element or a subset of elements as default
                if isinstance(argtype, (tuple, list)):
                    if not argtype:
                        bad_script_error(module, "Option {} does not provide any "
                                         "value to choose from".format(arg))
                    default = argtype[0]
                else:
                    default = frozenset()
            else:
                default = kwonly_defaults.get(arg, TYPE_DEFAULT.get(argtype))
            options.append((arg, argtype, default))
        default_options = {arg: default for arg, _, default in options}

        # add the descriptions to options
        options_descriptions = options_description_from_module(
            module, frozenset(default_options.keys()))
        options = tuple((arg, argtype, default, options_descriptions.get(arg, ''))
                        for arg, argtype, default in options)
        # TODO: detect non keyword only parameters, and check their validity.

    else:  # no run_on function… the source code and the language will be enough, we hope…
        options, default_options = (), {}
        input_mode = str
        run_on_func = {}

    # source code data
    source_code = getattr(module, 'source_code', None)
    language = getattr(module, 'language', 'python')

    # detect trivias
    tags = frozenset(getattr(module, 'TAGS', defaults.get('tags', {'undefined'})))
    doc = '\n'.join(textwrap.wrap(textwrap.dedent(module.__doc__).strip()))

    # build and return the Script instance
    return Script(
        name=getattr(module, 'NAME', defaults.get('name', 'unamed module')),
        description=doc,
        tags=tags,
        module=module,
        options=tuple(options),
        options_values={},
        input_mode=input_mode,
        incompatible=frozenset(getattr(module, 'INCOMPATIBLE', ())),
        **_build_and_validate_io(module, default_options),
        source_code=source_code,
        language=language,
        erase_context=bool(getattr(module, 'ERASE_CONTEXT', defaults.get('erase_context', False))),
        **run_on_func
    )


def bad_script_error(script, msg:str):
    """Raise the ScriptError telling that given script is not valid.

    script -- the faulty object, shown in the error message
    msg -- the reason of the invalidity, without final dot

    """
    reason = "Module {} is not a valid script. {}.".format(script, msg)
    raise ScriptError(reason)


def options_description_from_module(module, options, regex=REGEX_OPTION_DESC) -> dict:
    """Return the mapping {option: description} found in the docstring
    of the run_on function of given module.

    module -- object having a run_on attribute
    options -- container of the option names to search for
    regex -- compiled regex with two groups (name, description), that is
             matched against each stripped line of the docstring

    """
    docstring = module.run_on.__doc__
    if not docstring:
        return {}
    found = {}  # option: description
    for raw_line in docstring.splitlines(False):
        hit = regex.fullmatch(raw_line.strip())
        if not hit:
            continue
        name, desc = hit.groups()
        if name in options:
            found[name] = desc.strip()
    return found


def _build_and_validate_io(module, default_options:dict={}) -> {str: callable}:
    """Build the spec_inputs, spec_outputs, inputs and outputs functions
    of given module, and verify them.

    module -- the module containing the things
    default_options -- the options to send to inputs and outputs functions
                       (not used by the current implementation)

    return -- the dict {field name: function}, usable directly to create
    a Script instance.

    `inputs` and `outputs` give the frozenset of module's INPUTS and OUTPUTS
    attributes, read at call time and considered empty if missing.
    `spec_inputs` and `spec_outputs` are the module's own `inputs` and
    `outputs` attributes, when it has them.

    """
    declared_inputs = lambda *_, **__: frozenset(getattr(module, 'INPUTS', ()))
    declared_outputs = lambda *_, **__: frozenset(getattr(module, 'OUTPUTS', ()))
    fields = {  # field name: field value
        'spec_inputs': getattr(module, 'inputs', declared_inputs),
        'spec_outputs': getattr(module, 'outputs', declared_outputs),
        'inputs': declared_inputs,
        'outputs': declared_outputs,
    }

    # Verify that their are functions, and well dev
    for field, func in fields.items():
        if not callable(func):
            bad_script_error(module, 'Attribute {} is not a function'.format(func))
        if field.startswith('spec_'):
            continue  # module-provided functions are not called here
        retvalue = func()
        if isinstance(retvalue, (set, frozenset)):
            continue
        bad_script_error(module, "Function {} should return a (frozen)set, "
                         "not {}".format(func.__name__, type(retvalue)))
    return fields