module_loader.py 14.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
# encoding: utf8
"""Load and validate a python module received as a filename.

"""
import os
import re
import glob
import json
import inspect
import textwrap
import importlib
import itertools
from functools import partial
14
from collections import defaultdict
15 16 17

import clyngor
from biseau import utils
18 19
from biseau import script as script_functions
from biseau.script import Script, Module
20 21


Lucas Bourneuf's avatar
Lucas Bourneuf committed
22
# Description given to a script built from a JSON definition that has
#  no 'description' field (see build_script_from_json).
DEFAULT_DOC = 'NO SCRIPT DOC PROVIDED.\nFix this by writing a module documentation inside script definition.'
23
# NOTE(review): not referenced in the visible part of this file.
#  Presumably the kinds of value a run_on function may produce
#  (an iterable of strings, or a single string) -- confirm before use.
RETURNS_TYPES = {iter, str}
24 25
# Annotations accepted on the keyword-only parameters (options) of run_on,
#  when the annotation is not a group (tuple, list, set or frozenset).
# NOTE(review): (open, 'r') and (open, 'w') are tuples, so
#  build_script_from_module classifies them as groups before ever looking
#  them up in this set -- confirm that this is the intended behavior.
OPTIONS_TYPES = {int, float, bool, str, open, (open, 'r'), (open, 'w')}
# Fallback default value of an option, by annotation, used when the
#  signature of run_on does not provide a default for it.
TYPE_DEFAULT = {int: 0, float: 0., bool: False, str: '', open: None}
26 27 28 29 30 31
# Matches a line of the form `option_name -- description`.
#  Group 1 is the option name, group 2 the description.
#  Used by options_description_from_module on each stripped docstring line.
REGEX_OPTION_DESC = re.compile(r'([a-zA-Z0-9_]+)\s*--\s*(.+)$')

class ScriptError(ValueError):
    """Raised when a file or a module cannot be turned into a valid script.

    Subclass of ValueError, so it is also caught by handlers of ValueError.

    """


32
def gen_files_in_dir(dirname:str, extensions:[str]=('py', 'lp', 'json'),
                     filter_prefixes:[str]='_') -> (str, str):
    """Yield candidate scripts in given dir, based on file extension.

    dirname -- directory to look into (not recursive)
    extensions -- file extensions to keep, without the leading dot
    filter_prefixes -- a prefix, or an iterable of prefixes ; files whose
                       name starts with one of them are ignored. Any falsy
                       value disables the filtering.

    Yield the name of each matching file, WITHOUT its directory.
    For each extension, names are yielded in sorted order.

    """
    # str.startswith accepts a string or a tuple of strings, nothing else
    if filter_prefixes and not isinstance(filter_prefixes, str):
        filter_prefixes = tuple(filter_prefixes)
    # The glob module does not implement the shell brace expansion
    #  ({py,lp,json}), so one pattern per extension is needed.
    for extension in extensions:
        pattern = os.path.join(dirname, '*.' + extension)
        for path in sorted(glob.glob(pattern)):
            fname = os.path.basename(path)
            if filter_prefixes and fname.startswith(filter_prefixes):
                continue
            yield fname


42 43 44 45 46
def build_scripts_from_dir(dirname:str='scripts', options:dict={}) -> [Script]:
    """Yield all scripts found in given directory (not recursive).

    dirname -- the directory to search for script files
    options -- forwarded as-is to build_scripts_from_file (never modified)

    """
    for file in gen_files_in_dir(dirname):
        # gen_files_in_dir yields bare file names: without the directory,
        #  the file can only be opened when the working directory happens
        #  to be dirname. Names already carrying a directory are kept as is.
        if not os.path.dirname(file):
            file = os.path.normpath(os.path.join(dirname, file))
        yield from build_scripts_from_file(file, options)

47
def build_scripts_from_file(fname:str, options:dict={}) -> [Script]:
    """Yield all scripts found in given file.

    The builder is chosen from the file extension: .json, .py or .lp.
    Only JSON files can define multiple scripts.
    A name without one of these extensions, but starting with 'ASP'
    (case insensitive), means that `options` holds the ASP source code itself.

    fname -- the file to load
    options -- only used in the 'ASP' case, as a str, list or tuple of source code

    """
    name, ext = os.path.splitext(fname)

    if ext == '.json':
        yield from build_scripts_from_json_file(fname)
        return

    if ext == '.lp':
        yield build_asp_script_from_name(fname)
        return

    if ext == '.py':
        # an invalid python script is reported, not propagated
        try:
            yield build_python_script_from_name(name)
        except ScriptError as err:
            print('SCRIPT ERROR:', str(err))
        return

    # no known extension
    if not fname.upper().startswith('ASP'):
        print(f"WARNING file '{fname}' was not recognized")
    elif isinstance(options, (list, tuple, str)):
        yield from build_scripts_from_asp_code(options)
    else:
        print(f"Unknow type for ASP code: {type(options)}")
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122


def merge_scripts_lists(*scripts_lists:iter) -> iter:
    """Yield the scripts of all given iterables, ordered by
    sort_scripts_per_dependancies.

    scripts_lists -- any number of iterables of scripts

    """
    all_scripts = itertools.chain(*scripts_lists)
    yield from sort_scripts_per_dependancies(all_scripts)


def sort_scripts_per_dependancies(scripts:iter) -> iter:
    """Yield given scripts in the order computed by topological_sort_by_io
    from their `inputs` and `outputs` attributes.

    scripts -- iterable of hashable objects exposing iterable
               `inputs` and `outputs` attributes

    """
    scripts = tuple(scripts)  # iterated twice: must not be a one-shot iterator

    def collect(attribute:str) -> dict:
        "Return {script: frozenset of the values in script.<attribute>}"
        return {script: frozenset(getattr(script, attribute))
                for script in scripts}

    yield from topological_sort_by_io(collect('inputs'), collect('outputs'))


def topological_sort_by_io(inputs:dict, outputs:dict) -> iter:
    """Yield the keys of inputs and outputs, producers before consumers.

    A key P precedes a key S when the outputs of P share at least
    one element with the inputs of S.

    inputs -- mapping {key: set of inputs}
    outputs -- mapping {key: set of outputs}

    When only cycles remain, one key of the cycle is arbitrarily taken
    as a source, and the sort goes on.

    """
    def sources_of(graph:dict) -> set:
        "Return the nodes of graph that are the successor of no node"
        pointed = frozenset(itertools.chain.from_iterable(graph.values()))
        return {node for node in graph if node not in pointed}

    # build the graph {predecessor: {successors}}
    graph = defaultdict(set)
    for consumer, needed in inputs.items():
        graph[consumer]  # register the node, even if it has no successor
        for producer, produced in outputs.items():
            if needed & produced:
                graph[producer].add(consumer)
    ready = sources_of(graph)

    last_size = None
    while graph:  # one round per cycle to break
        # yield the sources and remove them, until the graph stops shrinking
        while len(graph) != last_size:
            last_size = len(graph)
            yield from ready
            graph = {
                node: {succ for succ in succs if succ not in ready}
                for node, succs in graph.items()
                if node not in ready
            }
            ready = sources_of(graph)
        if graph:  # remaining nodes are all part of (or behind) a cycle
            # break it by promoting the first remaining node as source
            ready = {next(iter(graph))}
            last_size = None


def build_python_script_from_name(module_name) -> Script:
    """Return the Script built from the python module of given name.

    module_name -- path to the module, without the .py extension,
                   with '/' as separator

    The module is imported when possible. If the import raises TypeError,
    the script is instead built from the 'python file' JSON interface.

    """
    dotted_name = module_name.replace('/', '.')
    try:
        module = importlib.import_module(dotted_name)
    except TypeError:
        module = None  # module_name is not importable directly

    if module is None:  # just load it savagely
        return build_script_from_json({'python file': module_name + '.py'})
    # Reload needed because the module itself is
    #  modified by build_script_from_module
    return build_script_from_module(importlib.reload(module))
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175


def build_asp_script_from_name(fname:str) -> 'Script':
    """Return the Script built from given file containing ASP source code.

    fname -- path to the ASP file

    The script name is the file name, without directory nor extension,
    with underscores replaced by spaces.
    The description is made of the first lines of the file,
    as long as they start with '% ' ; that prefix is removed.

    """
    with open(fname) as fd:
        asp_code = fd.read()
    # str.replace returns a new string: the result must be kept
    name = os.path.splitext(os.path.basename(fname))[0].replace('_', ' ')
    # The header is taken from the code already read, line by line and
    #  without line terminators, so the join below does not double them.
    description = []
    for line in asp_code.split('\n'):
        if not line.startswith('% '):
            break
        description.append(line[2:])
    description = '\n'.join(description)
    # reuse the json interface
    return build_script_from_json({
        'name': name,
        'ASP': asp_code,
        'description': description,
        'inputs': [],
        'outputs': [],  # TODO: search for #show's in the file
    })


def build_scripts_from_json_file(fname:str) -> [Script]:
    """Yield the Script instances described in given JSON file.

    The top-level JSON value must be either an object, describing one
    script, or a list of such objects.

    Raise ScriptError if the top-level value is of any other type.

    """
    with open(fname) as fd:
        data = json.load(fd)

    if isinstance(data, dict):  # only one
        payloads = [data]
    elif isinstance(data, list):  # multiple scripts
        payloads = data
    else:
        raise ScriptError("Given json file {} is not correctly formatted. "
                          "First object should be a list or a dict, not a {}"
                          "".format(fname, type(data)))

    for payload in payloads:
        yield build_script_from_json(payload)


176 177 178 179 180 181 182 183 184
def build_scripts_from_asp_code(data:str or list) -> [Script]:
    """Yield one Script instance per ASP source code found in data.

    data -- ASP source code as a string, or a (possibly nested)
            list or tuple of such strings

    Raise ScriptError on a value of any other type.

    """
    if isinstance(data, str):  # only one
        yield build_script_from_json({
            'name': 'inline ASP code',
            'ASP': data,
            'language': 'asp',
            'description': 'inline ASP code',
            'inputs': [],
            'outputs': [],  # TODO: search for #show's in the file
        })
        return

    if not isinstance(data, (tuple, list)):
        raise ScriptError(f"Given ASP source of type {type(data)} can't be handled.")

    for source in data:  # multiple scripts
        yield from build_scripts_from_asp_code(source)

193

194 195 196 197 198
def build_script_from_json(module_def:dict) -> Script:
    """Return the Script instance described by given JSON object.

    module_def -- mapping with optional fields 'inputs', 'outputs', 'name',
                  'tags' and 'description', and at least one code field
                  among 'ASP file', 'ASP', 'python file' and 'python'.

    When many code fields are given, the first one in the above order is used.
    Raise ValueError if there is no code field.

    """
    module = Module()

    # I/O
    module.INPUTS = frozenset(module_def.get('inputs', ()))
    module.OUTPUTS = frozenset(module_def.get('outputs', ()))

    # Fields
    module.NAME = module_def.get('name', 'unamed script')
    if 'tags' in module_def:
        module.TAGS = frozenset(module_def['tags'])
    module.__doc__ = module_def.get('description', DEFAULT_DOC)

    # Source code: the language is the lowercased name of the field
    for code_field in ('ASP file', 'ASP', 'python file', 'python'):
        if code_field in module_def:
            module.language = code_field.lower()
            module.source_code = module_def[code_field]
            break
    else:  # no code field found
        raise ValueError(f"JSON script {module.NAME} do not have any code field ('ASP' "
                         "or 'ASP file' for instance). If this script was "
                         "generated with Biseau, it's possible that you're "
                         "using an older version than the script creator."
                         "")
    return build_script_from_module(module)


229
def build_script_from_module(module, *, defaults:dict={}) -> Script or ScriptError:
    """Low level function. Expect module to be a python module, or a namespace
    emulating one.

    Will try hard to invalidate given module. If it seems valid, return
    a Script instance describing and referencing the module.

    module -- the object to wrap. Read attributes are: __doc__ (mandatory),
              run_on, source_code, language, NAME, TAGS, INCOMPATIBLE,
              ERASE_CONTEXT, plus the ones read by _build_and_validate_io.
    defaults -- mapping from (name, tags, erase_context) to a
                default value to use if the module does not provide it.
                Never modified.

    Raise ScriptError if the module has no docstring, or if its run_on
    function has an unexpected signature.

    """
    # Modules and most objects always have a __doc__ attribute, set to None
    #  when no docstring was written: testing its existence is not enough.
    if getattr(module, '__doc__', None) is None:
        bad_script_error(module, "Docstring (description) is missing")

    if hasattr(module, 'run_on'):
        run_on_func = {'run_on': module.run_on}
        args = inspect.getfullargspec(module.run_on)

        # Return type: a generator, or a function returning a string
        #  (a function without return annotation is accepted)
        is_generator = inspect.isgeneratorfunction(module.run_on)
        returns_str = (inspect.isfunction(module.run_on)
                       and args.annotations.get('return', str) == str)
        if not (is_generator or returns_str):
            bad_script_error(module, "run_on object must be a generator of string"
                             " or a function returning a string, not a {}"
                             "".format(type(module.run_on)))

        # Input mode, given by the name of the first positional parameter
        #  (None if there is no positional parameter at all)
        first_arg = args.args[0] if args.args else None
        if first_arg == 'context':
            input_mode = str
        elif first_arg == 'models':
            input_mode = iter
        else:
            bad_script_error(module, "run_on first arg must be either 'context' or"
                             " 'models', not a {}".format(first_arg))

        # detect options: the keyword-only parameters of run_on
        # kwonlydefaults is None when no keyword-only parameter has a default
        kwonly_defaults = args.kwonlydefaults or {}
        options = []  # list of (arg name, arg type, default)
        for arg in args.kwonlyargs:
            argtype = args.annotations.get(arg)
            isgroup = isinstance(argtype, (tuple, list, set, frozenset))
            if not isgroup and argtype not in OPTIONS_TYPES:
                bad_script_error(module, "Option {} does not have a valid annotation "
                                 "({}). Only tuples, lists, (frozen)sets, and primitive types such as {} are accepted"
                                 "".format(arg, argtype, ', '.join(map(str, OPTIONS_TYPES))))
            if isgroup:  # pick an element or a subset of elements as default
                default = argtype[0] if isinstance(argtype, (tuple, list)) else frozenset()
            else:
                default = kwonly_defaults.get(arg, TYPE_DEFAULT.get(argtype))
            options.append((arg, argtype, default))
        default_options = {arg: default for arg, _, default in options}

        # add the descriptions to options
        options_descriptions = options_description_from_module(
            module, frozenset(default_options.keys()))
        options = tuple((arg, argtype, default, options_descriptions.get(arg, ''))
                        for arg, argtype, default in options)
        # TODO: detect non keyword only parameters, and check their validity.

    else:  # no run_on function… the source code and the language will be enough, we hope…
        options, default_options = (), {}
        input_mode = str
        run_on_func = {}

    # source code data
    source_code = getattr(module, 'source_code', None)
    language = getattr(module, 'language', 'python')

    # detect trivias
    tags = frozenset(getattr(module, 'TAGS', defaults.get('tags', {'undefined'})))
    doc = '\n'.join(textwrap.wrap(textwrap.dedent(module.__doc__).strip()))

    # build and return the Script instance
    return Script(
        name=getattr(module, 'NAME', defaults.get('name', 'unamed module')),
        description=doc,
        tags=tags,
        module=module,
        options=tuple(options),
        options_values={},
        input_mode=input_mode,
        incompatible=frozenset(getattr(module, 'INCOMPATIBLE', ())),
        **_build_and_validate_io(module, default_options),
        source_code=source_code,
        language=language,
        erase_context=bool(getattr(module, 'ERASE_CONTEXT', defaults.get('erase_context', False))),
        **run_on_func
    )


def bad_script_error(script, msg:str):
    """Raise a ScriptError about given script. Never returns.

    script -- the faulty object, formatted as is in the error message
    msg -- explanation of the problem, without the final dot

    """
    template = "Module {} is not a valid script. {}."
    raise ScriptError(template.format(script, msg))


def options_description_from_module(module, options, regex=REGEX_OPTION_DESC) -> dict:
    """Return the mapping {option name: description} found in the
    docstring of module.run_on.

    module -- object having a run_on attribute
    options -- container of the option names to look for
    regex -- compiled pattern that must match a whole stripped line,
             and capture the option name, then its description

    Return an empty dict if run_on has no docstring. If an option is
    described many times, the last description is kept.

    """
    docstring = module.run_on.__doc__
    if not docstring:
        return {}
    matches = (regex.fullmatch(line.strip()) for line in docstring.splitlines())
    found = {}  # option: description
    for match in filter(None, matches):  # lines that did not match are skipped
        name, desc = match.groups()
        if name in options:
            found[name] = desc.strip()
    return found


def _build_and_validate_io(module, default_options:dict={}) -> {str: callable}:
    """Return the spec_inputs, spec_outputs, inputs and outputs functions
    built from given module.

    module -- the object providing the INPUTS and OUTPUTS attributes,
              and optionally the `inputs` and `outputs` functions
    default_options -- currently unused

    return -- the dict {field name: function}, usable directly to create
    a Script instance.

    Raise ScriptError (through bad_script_error) if a field is not callable.

    """
    # Fallbacks, reading the attributes of the module at call time
    declared_inputs = lambda *_, **__: frozenset(getattr(module, 'INPUTS', ()))
    declared_outputs = lambda *_, **__: frozenset(getattr(module, 'OUTPUTS', ()))

    fields = {  # field name: field value
        'spec_inputs': getattr(module, 'inputs', declared_inputs),
        'spec_outputs': getattr(module, 'outputs', declared_outputs),
        'inputs': declared_inputs,
        'outputs': declared_outputs,
    }

    # Verify that they are functions, and well dev
    for field, func in fields.items():
        if not callable(func):
            bad_script_error(module, 'Attribute {} is not a function'.format(func))
        if field.startswith('spec_'):
            continue  # only the non-spec functions are called without argument
        retvalue = func()
        if not isinstance(retvalue, (set, frozenset)):
            bad_script_error(module, "Function {} should return a (frozen)set, "
                             "not {}".format(func.__name__, type(retvalue)))
    return fields