core.py 7.71 KB
Newer Older
1 2 3 4 5 6
"""Core functions implementing the main behaviors.

Call example in main.

"""
import os
Lucas Bourneuf's avatar
Lucas Bourneuf committed
7
import io
8 9
import time
import json
10 11 12
import clyngor
import tempfile
from PIL import Image
13
from collections import OrderedDict
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
from . import utils
from . import Script
from . import asp_to_dot
from . import dot_writer
from . import module_loader


EXT_TO_TYPE = utils.reverse_dict({
    'Python': {'.py'},
    'ASP': {'.lp'},
    'json/ASP': {'.json'},
}, multiple_values=True, aggregator=lambda x: next(iter(x)))
LOADABLE = {'Python', 'ASP', 'json/ASP'}


29
def single_image_from_filenames(fnames:[str], outfile:str=None, dotfile:str=None, nb_model:int=0, return_image:bool=True, dot_prog:str='dot', verbosity:int=0) -> Image or None:
30 31
    pipeline = build_pipeline(fnames, verbosity)
    final_context = run(pipeline, verbosity=verbosity)
32
    return compile_to_single_image(final_context, nb_model=nb_model, outfile=outfile, dotfile=dotfile, return_image=return_image, dot_prog=dot_prog, verbosity=verbosity)
33

34
def multiple_images_from_filenames(fnames:[str], outfile:str='out-{}.png', dotfile:str=None, nb_model:int=0, return_image:bool=True, dot_prog:str='dot', verbosity:int=0) -> Image or None:
35 36
    pipeline = build_pipeline(fnames, verbosity)
    final_context = run(pipeline, verbosity=verbosity)
37
    return tuple(compile_to_images(final_context, nb_model=nb_model, outfile_template=outfile, dotfile_template=dotfile, return_image=return_image, dot_prog=dot_prog, verbosity=verbosity))
38

39
def gif_from_filenames(fnames:[str], giffile:str=None, dotfile_template:str=None, duration:int=1000, loop:int=0, nb_model:int=0, dot_prog:str='dot', verbosity:int=0) -> bytes or str:
40 41 42
    """Make a gif, with each ASP model as an image. Save it in outfile and dotfile_template"""
    pipeline = build_pipeline(fnames, verbosity)
    final_context = run(pipeline, verbosity=verbosity)
43
    first, *lasts = compile_to_images(final_context, dotfile_template=dotfile_template, return_image=True, nb_model=nb_model, dot_prog=dot_prog, verbosity=verbosity)
Lucas Bourneuf's avatar
Lucas Bourneuf committed
44
    output = io.BytesIO() if giffile is None else giffile
45
    first.save(output, format='gif', append_images=lasts, duration=duration, loop=loop, save_all=True)
Lucas Bourneuf's avatar
Lucas Bourneuf committed
46
    return output.getvalue() if giffile is None else giffile
47 48


49
def build_pipeline(fnames:[str], verbosity:int=0) -> [Script]:
50 51
    "Yield scripts found in given filenames"
    for fname in fnames:
52 53 54
        if isinstance(fname, Script):
            yield fname
            continue
55 56 57
        ext = os.path.splitext(fname)[1]
        ftype = EXT_TO_TYPE.get(ext, 'unknow type')
        if ftype not in LOADABLE:
Lucas Bourneuf's avatar
Lucas Bourneuf committed
58
            raise ValueError(f"The type '{ftype}' (extension {ext}) can't be loaded")
59 60
        yield from module_loader.build_scripts_from_file(fname)

61 62 63 64
def build_pipeline_from_configfile(config:str, verbosity:int=0) -> [Script]:
    with open(config) as fd:
        configdata = json.loads(fd.read(), object_pairs_hook=OrderedDict)
    yield from build_pipeline_from_json(configdata)
65

66 67 68 69 70
def build_pipeline_from_json(jsondata:dict, verbosity:int=0) -> [Script]:
    if isinstance(jsondata, list):
        for item in jsondata:
            yield from build_pipeline_from_json(item)
    for name, options in jsondata.items():
71
        scripts = module_loader.build_scripts_from_file(name, options)
72
        for script in scripts:
73 74
            if isinstance(options, dict):
                script.options_values.update(options)
75 76 77 78 79 80
            yield script

build_pipeline.from_configfile = build_pipeline_from_configfile
build_pipeline.from_json = build_pipeline_from_json


81 82
def yield_run(scripts:[Script], initial_context:str='', verbosity:int=0) -> (str, float):
    "Yield context and duration found at each step of the running process"
83 84 85 86
    if verbosity >= 1:
        scripts = tuple(scripts)
        print(f"RUNNING {len(scripts)} SCRIPTS…")
    run_start = time.time()
87
    context = initial_context
88 89 90 91 92 93 94 95 96 97 98 99 100 101
    for idx, script in enumerate(scripts, start=1):
        script_start = time.time()
        if script.input_mode is str:
            input_data = context
        else:  # need a solving
            assert script.input_mode is iter
            input_data = solve_context(context)
        if verbosity >= 2:
            print('RUN run_on…', end='', flush=True)
        new_context = script.run_on(input_data, **script.options_values)
        if verbosity >= 2:
            print('OK!')
        # if verbosity >= 3:
            # print('NEW CONTEXT:', new_context)
102
        if script.erase_context:
103
            context = new_context
104
        else:
105 106 107 108
            context += '\n' + new_context
        script_time = round(time.time() - script_start, 2)
        if verbosity >= 1:
            print(f"SCRIPT {idx}: {script.name} add {len(new_context.splitlines())} lines to the context in {script_time}s.")
109
        yield context, script_time
110 111 112
    run_time = round(time.time() - run_start, 2)
    if verbosity >= 1:
        print(f"RUN {len(context.splitlines())} lines built in {run_time}s.")
113 114 115


def run(scripts:[Script], initial_context:str='', verbosity:int=0) -> str:
116 117
    "Return the final context obtained by running given pipeline on initial_context"
    out = None
118
    for out, _ in yield_run(scripts, initial_context, verbosity):
119 120
        pass
    return out
121 122


123 124
def solve_context(context:str, *, nb_model:int=0) -> clyngor.Answers:
    return clyngor.solve(inline=context, nb_model=nb_model, options='--project').by_predicate.careful_parsing.int_not_parsed
125 126 127


def compile_to_single_image(context:str, outfile:str=None, dotfile:str=None,
128
                            nb_model:int=0, return_image:bool=True,
129
                            dot_prog:str='dot', verbosity:int=0) -> Image or None:
130 131
    "Return a pillow.Image object, or write it to outfile if given"
    configs = asp_to_dot.visual_config_from_asp(
132
        solve_context(context, nb_model=nb_model)
133 134 135 136 137 138 139
    )
    dot = dot_writer.one_graph_from_configs(configs)
    del_outfile = False
    if outfile is None:
        with tempfile.NamedTemporaryFile(delete=False) as fd:
            outfile = fd.name
        del_outfile = True
140
    dot = dot_writer.dot_to_png(dot, outfile, dotfile=dotfile, prog=dot_prog)
141 142 143 144 145
    if return_image:
        img = Image.open(outfile)
        if del_outfile:
            os.unlink(outfile)
        return img
146 147 148


def compile_to_images(context:str, outfile_template:str=None, dotfile_template:str=None,
149
                      nb_model:int=0, return_image:bool=True,
150
                      dot_prog:str='dot', verbosity:int=0) -> [Image]:
151 152 153 154 155 156 157 158 159 160
    """Yield pillow.Image objects, and write it to outfile if given

    outfile_template -- template name for png files, or None
    dotfile_template -- template name for dot files, or None

    """
    if outfile_template and '{}' not in outfile_template:
        raise ValueError("Outfile argument is not a valid template")
    if dotfile_template and '{}' not in dotfile_template:
        raise ValueError("Dotfile argument is not a valid template")
161
    dots = compile_context_to_dots(context, nb_model)
162 163 164 165 166 167 168 169 170
    for idx, dot in enumerate(dots, start=1):
        del_outfile = False
        if outfile_template is None:
            with tempfile.NamedTemporaryFile(delete=False) as fd:
                outfile = fd.name
            del_outfile = True
        else:
            outfile = outfile_template.format(idx)
        dotfile = dotfile_template.format(idx) if dotfile_template else None
171
        dot = dot_writer.dot_to_png(dot, outfile, dotfile=dotfile, prog=dot_prog)
172 173 174 175 176
        if return_image:
            img = Image.open(outfile)
            if del_outfile:
                os.unlink(outfile)
            yield img
177 178 179 180 181 182 183 184


def compile_context_to_dots(context:str, nb_model:int=0) -> [str]:
    "Yield a dot string for each found model"
    configs = asp_to_dot.visual_config_from_asp(
        solve_context(context, nb_model=nb_model)
    )
    yield from dot_writer.multiple_graphs_from_configs(configs)