Commit ef67c734 authored by Lucas Bourneuf's avatar Lucas Bourneuf

first working implementation for CLI only

embeds lots of code from initial implementation of biseau,
including all scripts
include README.mkd
exclude Makefile
recursive-include biseau *.py
recursive-include scripts *.json
recursive-include scripts *.lp
recursive-include scripts *.py
prune test
prune doc
prune out
python -m biseau scripts/example.lp scripts/black_theme.json -o out/out.png
t: test
python -m pytest biseau test --ignore=venv --doctest-module
python -c "import configparser; c = configparser.ConfigParser();'setup.cfg'); print(c['options']['install_requires'])" | xargs pip install -U
python install
# Biseau
__B__uild by __I__nput __S__ome __E__asy __A__SP from __U__sers.
Helps the exploration of formal structures by using ASP as a DSL for drawing graphs.
## Principles
Now, you may want to read about:
- [visualization details](doc/user-doc.mkd#visualization-principles), that explains the theory behind the visualization itself.
- [visualization DSL](doc/user-doc.mkd#asp-to-dot), that explains which atoms produces which effect in visualization.
- [scripting](doc/user-doc.mkd#scripting), that allow you to implement new behavior for biseau.
## Installation
See [user documentation](doc/user-doc.mkd#installation) for details,
but in short, you need [clingo](,
[graphviz]( and `make install-deps`.
## Usage
See [user documentation](doc/user-doc.mkd#basic-usage) for details.
See [`Makefile`](Makefile) for various recipes.
Running biseau can be done using the following commands:
python -m biseau gui # run the Graphical User Interface
python -m biseau # run the Command Line Interface
python -m biseau --help # get help for the CLI
from .module_loader import Script
from .core import single_image_from_filenames
__version__ = '0.0.1'
"""Entry point for package
import os
import argparse
from . import core
def parse_cli(args:iter=None) -> dict:
# main parser
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('infiles', type=str, nargs='+', metavar='MODULE',
default=[], help='files containing ASP or Python code')
parser.add_argument('--outfile', '-o', type=str, default='out.png',
help="output file. Will be overwritten with png data. Can be templated with '{model_number}'")
parser.add_argument('--dotfile', '-d', type=str, default=None,
help="output file. Will be overwritten with dot data. Can be templated with '{model_number}'")
# flags
parser.add_argument('--flag-example', action='store_true',
help="Do nothing currently")
return parser.parse_args(args)
if __name__ == '__main__':
args = parse_cli()
# encoding: utf8
"""The core part of visualization, compiling ASP into dot.
import textwrap as textwrap_module
from collections import namedtuple, defaultdict
from biseau import utils
RANK_TYPES = {'same', 'min', 'source', 'max', 'sink'}
DOTABLE_PREDICATES = {'link', 'color', 'shape', 'label', 'annot', 'dot_property', 'obj_property', 'textwrap', 'rank'}
VisualConfig = namedtuple('VisualConfig', 'arcs, properties, upper_annotations, lower_annotations, global_properties, ranks')
arcs -- iterable of 2-uplet (source's uid, target's uid)
properties -- map uid -> (field -> value) and (uid, uid) -> (field -> value)
upper_annotations -- map uid -> {field -> value} specialized for annotations
lower_annotations -- map uid -> {field -> value} specialized for annotations
global_properties -- map object -> (field -> value), with object in (graph, edge, node)
ranks -- map rank-type -> iterable of sets of node
Properties are mapping directly dot properties to nodes (single uid)
or edges (two uid). This allow user to build very precisely the output dot.
def visual_config_from_asp(asp_models:iter, annotation_sep:str=' ') -> [VisualConfig]:
"""Yield VisualConfig instances initialized according to rules
found in each given asp models.
asp_models -- iterable of clyngor.Answers instances
annotation_sep -- separator between each annotation content on same node
base_atoms = defaultdict(set) # predicate -> {args}
for model in asp_models.by_predicate:
viz_atoms = defaultdict(lambda: defaultdict(set)) # level -> {predicate: {args}}
for viz in model.get('viz', ()):
if len(viz) == 1:
pred, args = viz[0]
elif len(viz) == 2: # the first param is here to define the level
pred, args = viz[1]
# put the predicate not in viz() in the base level
for predicate in DOTABLE_PREDICATES:
base_atoms[predicate] |= frozenset(model.get(predicate, ()))
if viz_atoms:
for level, atoms in viz_atoms.items():
yield visual_config_from_atoms(atoms, base_atoms, annotation_sep)
else: # no viz atoms
yield visual_config_from_atoms({}, base_atoms, annotation_sep)
def visual_config_from_atoms(atoms:dict, base_atoms:dict,
annotation_sep:str) -> VisualConfig:
arcs = []
upper_annotations = defaultdict(lambda: defaultdict(set))
lower_annotations = defaultdict(lambda: defaultdict(set))
properties = defaultdict(lambda: defaultdict(set)) # node -> (property -> {value})
global_properties = defaultdict(lambda: defaultdict(set)) # dot object -> (property -> value)
ranks = defaultdict(set) # rank-type -> {node}
max_label_width = {} # object: maximal text width
def get_atoms_of_predicate(predicate:str):
assert predicate in DOTABLE_PREDICATES, predicate
yield from base_atoms.get(predicate, ())
yield from atoms.get(predicate, ())
def get_uid_from_atom(atom:str or tuple):
if isinstance(atom, str):
return atom
elif isinstance(atom, tuple):
if len(atom) == 2: # a regular atom
if len(atom[1]) == 0:
return atom[0]
else: # atom with args
return '{}({})'.format(atom[0], ','.join(map(get_uid_from_atom, atom[1])))
raise ValueError("Malformed node uid found: " + str(atom))
for link in get_atoms_of_predicate('link'):
if len(link) == 2:
arcs.append(tuple(map(get_uid_from_atom, link)))
for args in get_atoms_of_predicate('textwrap'):
if len(args) == 1: # global value
max_label_width[None] = int(args[0])
elif len(args) == 2:
node, value = args
max_label_width[node] = int(value)
elif len(args) == 3:
src, trg, value = args
max_label_width[src, trg] = int(value)
for annotation in get_atoms_of_predicate('annot'):
if len(annotation) == 3:
type, node, content = annotation
node = get_uid_from_atom(node)
if type == 'upper':
elif type == 'lower':
elif type == 'label':
print('Unknow annotation type: {}'.format(type))
elif len(annotation) == 4: # other field
type, node, field, content = annotation
node = get_uid_from_atom(node)
if type == 'upper':
elif type == 'lower':
for property in get_atoms_of_predicate('dot_property'):
if len(property) == 3: # it's for node
node, field, value = property
node = get_uid_from_atom(node)
elif len(property) == 4: # it's for edges
src, trg, field, value = property
src, trg = map(get_uid_from_atom, (src, trg))
properties[src, trg][field.strip('"')].add(value.strip('"'))
for ranking in get_atoms_of_predicate('rank'):
if len(ranking) == 2: # rank, node
ranktype, node = ranking
if ranktype not in RANK_TYPES:
print("WARNING: atom rank({},{}) describe a rank with unknow "
"type {}. Expected types: {}."
"".format(*ranking, ranktype, ', '.join(RANK_TYPES)))
for colored in get_atoms_of_predicate('color'):
if len(colored) == 2: # node
node, color = colored
node = get_uid_from_atom(node)
elif len(colored) == 3: # edge
src, trg, color = colored
src, trg = map(get_uid_from_atom, (src, trg))
properties[src, trg]['color'].add(color) # fillcolor do not exists for edges
for shaped in get_atoms_of_predicate('shape'):
if len(shaped) == 2: # node
node, shape = shaped
node = get_uid_from_atom(node)
for labeled in get_atoms_of_predicate('label'):
if len(labeled) == 2: # node
node, label = labeled
node = get_uid_from_atom(node)
elif len(labeled) == 3: # edge
src, trg, label = labeled
src, trg = map(get_uid_from_atom, (src, trg))
properties[src, trg]['label'].add(label)
for property in get_atoms_of_predicate('obj_property'):
if len(property) == 3:
obj, field, value = map(lambda s:str.strip(s, '"'), property)
if obj not in {'graph', 'edge', 'node'}:
print('WARNING: object property {} is unexpected, and may '
'lead to error in generation.'.format(obj))
if field in global_properties:
print('WARNING: object property {} set multiple times with {} replacing {}.'
''.format(field, value, graph_properties[field]))
global_properties[obj.strip('"')][field] = value
# posttreat the data for later use
arcs = tuple(arcs)
nodes = frozenset(properties.keys())
def treat_texts(texts:iter, node, max_label_width=max_label_width) -> str:
ret = annotation_sep.join(map(get_uid_from_atom, texts)).strip('"')
text_width = max_label_width.get(node, max_label_width.get(None))
if text_width:
ret = textwrap_module.fill(ret, width=int(text_width))
return ret
for node in upper_annotations:
props = upper_annotations[node]
props.setdefault('color', 'transparent')
props.setdefault('labelangle', '90')
for key in props:
if not isinstance(props[key], str):
props[key] = treat_texts(props[key], (node, node))
for node in lower_annotations:
props = lower_annotations[node]
props.setdefault('color', 'transparent')
props.setdefault('labelangle', '270')
for key in props:
if not isinstance(props[key], str):
props[key] = treat_texts(props[key], (node, node))
for key in properties:
if 'color' in properties[key]:
properties[key]['color'] = utils.color_from_colors(properties[key]['color'])
except ValueError: # invalid color
properties[key]['color'] = next(iter(properties[key]['color']))
if 'fillcolor' in properties[key]:
properties[key]['fillcolor'] = utils.color_from_colors(properties[key]['fillcolor'])
except ValueError: # invalid color
properties[key]['fillcolor'] = next(iter(properties[key]['fillcolor']))
for field in properties[key]:
if field not in {'color', 'fillcolor'}:
properties[key][field] = treat_texts(properties[key][field], key)
return VisualConfig(
arcs, properties, upper_annotations, lower_annotations,
global_properties, ranks
"""Core functions implementing the main behaviors.
Call example in main.
import os
import clyngor
import tempfile
from PIL import Image
from . import utils
from . import Script
from . import asp_to_dot
from . import dot_writer
from . import module_loader
EXT_TO_TYPE = utils.reverse_dict({
'Python': {'.py'},
'ASP': {'.lp'},
'json/ASP': {'.json'},
}, multiple_values=True, aggregator=lambda x: next(iter(x)))
LOADABLE = {'Python', 'ASP', 'json/ASP'}
def single_image_from_filenames(fnames:[str], outfile:str=None, dotfile:str=None, return_image:bool=True) -> Image or None:
pipeline = build_pipeline(fnames)
final_context = run(pipeline)
return compile_to_single_image(final_context, outfile=outfile, dotfile=dotfile, return_image=return_image)
def build_pipeline(fnames:[str]) -> [Script]:
"Yield scripts found in given filenames"
for fname in fnames:
ext = os.path.splitext(fname)[1]
ftype = EXT_TO_TYPE.get(ext, 'unknow type')
if ftype not in LOADABLE:
raise ValueError(f"The type '{ftype}' can't be loaded")
yield from module_loader.build_scripts_from_file(fname)
def run(scripts:[Script], initial_context:str='') -> str:
context = initial_context
for script in scripts:
if script.erase_context:
context = script.run_on(context)
context += '\n' + script.run_on(context)
return context
def compile_to_single_image(context:str, outfile:str=None, dotfile:str=None, return_image:bool=True) -> Image or None:
"Return a pillow.Image object, or write it to outfile if given"
configs = asp_to_dot.visual_config_from_asp(
dot = dot_writer.one_graph_from_configs(configs)
del_outfile = False
if outfile is None:
with tempfile.NamedTemporaryFile(delete=False) as fd:
outfile =
del_outfile = True
dot = dot_writer.dot_to_png(dot, outfile, dotfile=dotfile)
if return_image:
img =
if del_outfile:
return img
# encoding: utf8
"""Routines manipulating the dot.
import pydot
from .asp_to_dot import VisualConfig
def _dot_from_properties(properties:dict or None, prefix:str=' ') -> str:
"""Return a dot '[]' expression where given properties are represented.
If given properties are None, empty string will be output.
if properties:
content = ' '.join(
'{}="{}"'.format(field, value.replace('"', r'\"'))
for field, value in properties.items()
return prefix + '[' + content + ']'
return ''
def multiple_graphs_from_configs(visual_configs:[VisualConfig]) -> iter:
"""Yield lines of dot describing the given VisualConfig instances.
Produce one graph per VisualConfig.
See function counterpart, one_graph_from_configs.
for visual_config in visual_configs:
yield 'Digraph biseau_graph {\n'
yield from _from_config(visual_config)
yield '}\n\n\n'
def one_graph_from_configs(visual_configs:[VisualConfig]) -> iter:
"""Yield lines of dot describing the given VisualConfig instances.
Produce only one graph,
using the union of visual_configs to implement the view.
yield 'Digraph biseau_graph {\n'
for visual_config in visual_configs:
yield from _from_config(visual_config)
yield '}'
def _from_config(visual_config:VisualConfig) -> iter:
"""Yield lines of dot's graph describing the given VisualConfig instance.
arcs, properties, upper_annotations, lower_annotations, globals_props, ranks = visual_config
for object, props in globals_props.items():
yield '\t{}{};\n'.format(object, _dot_from_properties(props))
if 'node' not in globals_props:
yield '\tnode [shape=ellipse style=filled width=.25]\n'
if 'edge' not in globals_props:
yield '\tedge [arrowhead=none labeldistance=1.5 minlen=2]\n'
treated_nodes = set() # contains nodes already treated
for source, target in arcs:
for node in source, target:
if node not in treated_nodes:
node_dot_props = _dot_from_properties(properties.get(node))
if node_dot_props:
yield '\t{n}{d}\n'.format(n=node, d=node_dot_props)
if lower_annotations.get(node):
yield '\t{n} -> {n} {d}\n'.format(
n=node, d=_dot_from_properties(lower_annotations[node]))
if upper_annotations.get(node):
yield '\t{n} -> {n} {d}\n'.format(
n=node, d=_dot_from_properties(upper_annotations[node]))
yield '\t{}->{}{}\n'.format(source, target, _dot_from_properties(properties.get((source, target))))
# build ranks
for ranktype, nodes in ranks.items():
yield '\t{{rank={}; {}}}\n'.format(ranktype, ';'.join(nodes))
def dot_to_png(dot_lines:iter, outfile:str, dotfile:str=DEFAULT_DOT_FILE,
"""Write in outfile a png render of the graph described in given dot lines"""
dot_lines = ''.join(dot_lines)
if dotfile:
with open(dotfile, 'w') as fd:
graphs = iter(pydot.graph_from_dot_data(dot_lines))
for graph in graphs:
with open(outfile, 'wb') as fd:
fd.write(graph.create(prog=prog, format='png'))
# TODO: handle multiple graphs (see #10)
for graph in graphs:
print('WARNING: an additionnal graph as been found in the dot file.'
'It will be ignored.')
# encoding: utf8
"""Load and validate a python module received as a filename.
import os
import re
import glob
import json
import inspect
import textwrap
import importlib
import itertools
import traceback
from functools import partial
from collections import namedtuple, defaultdict
import clyngor
from biseau import utils
DEFAULT_DOC = 'NO SCRIPT DOC PROVIDED. Fix this by writing a module documentation inside script definition.'
RETURNS_TYPES = {iter, str}
OPTIONS_TYPES = {int, float, bool, str, list, open, partial}
TYPE_DEFAULT = {int: 0, float: 0., bool: False, str: '', list: (), open: None, partial: None}
REGEX_OPTION_DESC = re.compile(r'([a-zA-Z0-9_]+)\s*--\s*(.+)$')
class ScriptError(ValueError):
Script = namedtuple('Script', 'name, tags, description, module, run_on, options, input_mode, incompatible, active_by_default, spec_inputs, spec_outputs, inputs, outputs, source_view, disabled, erase_context')
# name -- human readable name
# tags -- set of tags identifying the script
# description -- human readable and high level description of the script
# module -- reference to the module itself
# run_on -- function in module to call on context
# options -- list of (name, type, default, description) describing each option
# input_mode -- define if run_on must receive the context or the resulting ASP models
# incompatible -- list of incompatibles modules
# active_by_default -- true if the script must be activated at start
# spec_inputs -- function in module to call to get the inputs knowing the parameters
# spec_outputs -- function in module to call to get the outputs knowing the parameters
# inputs -- function in module to call to get all possible inputs
# outputs -- function in module to call to get all possible outputs
# source_view -- None or human readable representation of module's source code
# disabled -- true if the script must be ignored
# erase_context -- true if the script erase the context (default: false, context is kept)
def gen_scripts_in_dir(dirname:str, extensions:[str]=('py', 'lp', 'json'),
filter_prefixes:[str]='_') -> (str, str):
yield from (
for fname in map(os.path.basename, glob.glob('{}/*.{{{}}}'.format(dirname, ','.join(extensions))))
if not fname.startswith(filter_prefixes)
def build_scripts_from_file(fname:str) -> [Script]:
name, ext = os.path.splitext(fname)
if ext == '.json':
yield from build_scripts_from_json_file(fname)
elif ext == '.py':
script = build_python_script_from_name(name)
if script.disabled:
if isinstance(script.disabled, str):
print('SCRIPT {} DISABLED:', script.disabled)
yield script
except ScriptError as err:
print('SCRIPT ERROR:', str(err))
elif ext == '.lp':
yield build_asp_script_from_name(fname)
def build_scripts_from_dir(base_dir:str='scripts') -> iter:
scripts = gen_scripts_in_dir(base_dir)
yield from map(build_scripts_from_file, scripts)
def merge_scripts_lists(*scripts_lists:iter) -> iter:
"""Yield scripts, ordered according to their dependancies"""
yield from sort_scripts_per_dependancies(itertools.chain.from_iterable(scripts_lists))
def sort_scripts_per_dependancies(scripts:iter) -> iter:
"""Topological sort of scripts based on their inputs/outputs.
Do not handle scripts interdependancies.
scripts = tuple(scripts)
inputs = {script: frozenset(script.inputs) for script in scripts}
outputs = {script: frozenset(script.outputs) for script in scripts}
yield from topological_sort_by_io(inputs, outputs)
def topological_sort_by_io(inputs:dict, outputs:dict) -> iter:
"""Yield keys of inputs and outputs so that a value yielded after another
is either in need of the previous's outputs, or unrelated.
inputs -- mapping {value: {input}}
outputs -- mapping {value: {output}}
# decide {pred: {succs}} for scripts
topology = defaultdict(set)
for script, input in inputs.items():
topology[script] # just ensure there is one
for maybe_pred, output in outputs.items():
if input & output:
successors = frozenset(itertools.chain.from_iterable(topology.values()))
sources = {script for script in topology if script not in successors}
# compute source, and decide a path
prev_len = None
while topology: # while catch cycles
while len(topology) != prev_len:
prev_len = len(topology)
yield from sources
topology = {script: {succ for succ in succs if succ not in sources}
for script, succs in topology.items()
if script not in sources}
successors = frozenset(itertools.chain.from_iterable(topology.values()))
sources = {script for script in topology if script not in successors}
if topology: # there is at least one cycle
# take a predecessor, say it is a source