core.py 6.12 KB
Newer Older
1 2 3 4 5 6
"""Core functions implementing the main behaviors.

Call example in main.

"""
import os
7 8
import time
import json
9 10 11
import clyngor
import tempfile
from PIL import Image
12
from collections import OrderedDict
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
from . import utils
from . import Script
from . import asp_to_dot
from . import dot_writer
from . import module_loader


EXT_TO_TYPE = utils.reverse_dict({
    'Python': {'.py'},
    'ASP': {'.lp'},
    'json/ASP': {'.json'},
}, multiple_values=True, aggregator=lambda x: next(iter(x)))
LOADABLE = {'Python', 'ASP', 'json/ASP'}


28 29 30 31
def single_image_from_filenames(fnames:[str], outfile:str=None, dotfile:str=None, return_image:bool=True, verbosity:int=0) -> Image or None:
    pipeline = build_pipeline(fnames, verbosity)
    final_context = run(pipeline, verbosity=verbosity)
    return compile_to_single_image(final_context, outfile=outfile, dotfile=dotfile, return_image=return_image, verbosity=verbosity)
32 33


34 35 36 37 38 39 40 41 42
def gif_from_filenames(fnames:[str], giffile:str, dotfile_template:str=None, duration:int=1000, verbosity:int=0) -> str:
    """Make a gif, with each ASP model as an image. Save it in outfile and dotfile_template"""
    pipeline = build_pipeline(fnames, verbosity)
    final_context = run(pipeline, verbosity=verbosity)
    first, *lasts = compile_to_images(final_context, dotfile_template=dotfile_template, return_image=True, verbosity=verbosity)
    first.save(giffile, save_all=True, append_images=lasts, duration=duration, loop=0)
    return giffile


43
def build_pipeline(fnames:[str], verbosity:int=0) -> [Script]:
44 45
    "Yield scripts found in given filenames"
    for fname in fnames:
46 47 48
        if isinstance(fname, Script):
            yield fname
            continue
49 50 51
        ext = os.path.splitext(fname)[1]
        ftype = EXT_TO_TYPE.get(ext, 'unknow type')
        if ftype not in LOADABLE:
Lucas Bourneuf's avatar
Lucas Bourneuf committed
52
            raise ValueError(f"The type '{ftype}' (extension {ext}) can't be loaded")
53 54
        yield from module_loader.build_scripts_from_file(fname)

55 56 57 58
def build_pipeline_from_configfile(config:str, verbosity:int=0) -> [Script]:
    with open(config) as fd:
        configdata = json.loads(fd.read(), object_pairs_hook=OrderedDict)
    yield from build_pipeline_from_json(configdata)
59

60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
def build_pipeline_from_json(jsondata:dict, verbosity:int=0) -> [Script]:
    if isinstance(jsondata, list):
        for item in jsondata:
            yield from build_pipeline_from_json(item)
    for name, options in jsondata.items():
        scripts = module_loader.build_scripts_from_file(name)
        for script in scripts:
            script.options_values.update(options)
            yield script

build_pipeline.from_configfile = build_pipeline_from_configfile
build_pipeline.from_json = build_pipeline_from_json


def run(scripts:[Script], initial_context:str='', verbosity:int=0) -> str:
    if verbosity >= 1:
        scripts = tuple(scripts)
        print(f"RUNNING {len(scripts)} SCRIPTS…")
    run_start = time.time()
79
    context = initial_context
80 81 82 83 84 85 86 87 88 89 90 91 92 93
    for idx, script in enumerate(scripts, start=1):
        script_start = time.time()
        if script.input_mode is str:
            input_data = context
        else:  # need a solving
            assert script.input_mode is iter
            input_data = solve_context(context)
        if verbosity >= 2:
            print('RUN run_on…', end='', flush=True)
        new_context = script.run_on(input_data, **script.options_values)
        if verbosity >= 2:
            print('OK!')
        # if verbosity >= 3:
            # print('NEW CONTEXT:', new_context)
94
        if script.erase_context:
95
            context = new_context
96
        else:
97 98 99 100 101 102 103
            context += '\n' + new_context
        script_time = round(time.time() - script_start, 2)
        if verbosity >= 1:
            print(f"SCRIPT {idx}: {script.name} add {len(new_context.splitlines())} lines to the context in {script_time}s.")
    run_time = round(time.time() - run_start, 2)
    if verbosity >= 1:
        print(f"RUN {len(context.splitlines())} lines built in {run_time}s.")
104 105 106
    return context


107 108 109 110 111 112
def solve_context(context:str) -> clyngor.Answers:
    return clyngor.solve(inline=context).by_predicate.careful_parsing.int_not_parsed


def compile_to_single_image(context:str, outfile:str=None, dotfile:str=None,
                            return_image:bool=True, verbosity:int=0) -> Image or None:
113 114
    "Return a pillow.Image object, or write it to outfile if given"
    configs = asp_to_dot.visual_config_from_asp(
115
        solve_context(context)
116 117 118 119 120 121 122 123 124 125 126 127 128
    )
    dot = dot_writer.one_graph_from_configs(configs)
    del_outfile = False
    if outfile is None:
        with tempfile.NamedTemporaryFile(delete=False) as fd:
            outfile = fd.name
        del_outfile = True
    dot = dot_writer.dot_to_png(dot, outfile, dotfile=dotfile)
    if return_image:
        img = Image.open(outfile)
        if del_outfile:
            os.unlink(outfile)
        return img
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161


def compile_to_images(context:str, outfile_template:str=None, dotfile_template:str=None,
                      return_image:bool=True, verbosity:int=0) -> [Image]:
    """Yield pillow.Image objects, and write it to outfile if given

    outfile_template -- template name for png files, or None
    dotfile_template -- template name for dot files, or None

    """
    configs = asp_to_dot.visual_config_from_asp(
        solve_context(context)
    )
    if outfile_template and '{}' not in outfile_template:
        raise ValueError("Outfile argument is not a valid template")
    if dotfile_template and '{}' not in dotfile_template:
        raise ValueError("Dotfile argument is not a valid template")
    dots = dot_writer.multiple_graphs_from_configs(configs)
    for idx, dot in enumerate(dots, start=1):
        del_outfile = False
        if outfile_template is None:
            with tempfile.NamedTemporaryFile(delete=False) as fd:
                outfile = fd.name
            del_outfile = True
        else:
            outfile = outfile_template.format(idx)
        dotfile = dotfile_template.format(idx) if dotfile_template else None
        dot = dot_writer.dot_to_png(dot, outfile, dotfile=dotfile)
        if return_image:
            img = Image.open(outfile)
            if del_outfile:
                os.unlink(outfile)
            yield img