Commit 1ee2389f authored by VIGNET Pierre's avatar VIGNET Pierre
Browse files

Move a lot of code related to model management to a separated module

parent baf74199
......@@ -68,7 +68,6 @@ import os
import glob
import csv
from urllib import quote as urllib_quote
from logging import DEBUG
# Remove matplotlib dependency
# It is used on demand during the drawing of a graph
try:
......@@ -77,227 +76,24 @@ except ImportError:
pass
# Library imports
from cadbiom.models.biosignal.translators.gt_visitors import compile_cond
from cadbiom.models.guard_transitions.analyser.ana_visitors import TableVisitor
from cadbiom.models.biosignal.sig_expr import *
from cadbiom.models.guard_transitions.analyser.ana_visitors import SigExpIdCollectVisitor
from cadbiom.models.guard_transitions.translators.chart_xml \
import MakeModelFromXmlFile
from cadbiom.models.guard_transitions.analyser.static_analysis import StaticAnalyzer
from tools.solutions import load_solutions, sol_digging
from tools.models import Reporter
from tools.models import get_transitions, get_transitions_from_file, get_places_data
from tools.models import get_transitions, \
get_transitions_from_model_file, \
get_frontier_places
from tools.models import get_places_data, parse_condition
import cadbiom.commons as cm
LOGGER = cm.logger()
def get_frontier_places(transitions, all_places):
"""Return frontier places of a model (deducted from its transitions and
from all places of the model).
.. note:: why we use all_places from the model instead of
(input_places - output_places) to get frontier places ?
Because some nodes are only in conditions and not in transitions.
If we don't do that, these nodes are missing when we compute
valid paths from conditions.
:param arg1: Model's transitions.
{u'h00': [('Ax', 'n1', {u'label': u'h00[]'}),]
:type arg1: <dict>
keys: names of events
values: list of transitions as tuples (with in/output, and label).
:return: Set of frontier places.
:rtype: <set>
"""
# Get transitions in events
g = tuple(trans for event in transitions.values() for trans in event)
# Get input nodes & output nodes
# input_places = {trans[0] for trans in g}
output_places = {trans[1] for trans in g}
# Get all places that are not in transitions in the "output" place
return set(all_places) - output_places
def rec(tree, inhibitors_nodes):
"""Recursive function to decompile conditions
:param tree:
:Example of tree argument:
.. code-block:: python
tree = ('H', 'v', (
('F', 'v', 'G'),
'^',
(
('A', 'v', 'B'),
'^',
('C', 'v', ('D', '^', 'E'))
)
))
"""
# print("TREE", tree, type(tree), dir(tree))
if isinstance(tree, str): # terminal node
path = [tree]
solutions = [path]
return solutions
if isinstance(tree, SigNotExpr):
LOGGER.debug("NOT OPERAND: {}, {}".\
format(
tree.operand,
type(tree.operand)
)
)
try:
current_inhibitors = get_places_from_condition(tree.operand.__str__())
inhibitors_nodes.update(current_inhibitors)
LOGGER.debug("INHIBITORS found: " + str(current_inhibitors))
path = [tree.operand.name]
solutions = [path]
return solutions
except AttributeError:
tree = tree.operand
if isinstance(tree, SigIdentExpr):
path = [tree.name]
solutions = [path]
return solutions
lch = tree.left_h
op = tree.operator
rch = tree.right_h
# print('OZCJSH:', lch, op, rch, sep='\t\t')
lpaths = rec(lch, inhibitors_nodes)
rpaths = rec(rch, inhibitors_nodes)
# print('VFUENK:', lpaths, rpaths)
if op == 'or': # or
# ret = [*lpaths, *rpaths]
ret = list(it.chain(lpaths, rpaths))
# print('RET:', ret)
return ret
else: # and
assert op == 'and'
# print(list(it.product(lpaths, rpaths)))
# raw_input('test')
ret = list(l + r for l, r in it.product(lpaths, rpaths))
# print('RET:', ret)
return ret
def get_places_from_condition(condition):
"""Parse condition string and return all places, regardless of operators.
.. note:: This function is only used to get all nodes in a condition when
we know they all are inhibitors nodes.
:param: Condition string.
:type: <str>
:return: Set of places.
:rtype: <set>
"""
replacement = ['and', 'or', 'not', '(', ')']
for chr in replacement:
condition = condition.replace(chr, ' ')
# Must be exempt of unauthorized chars
return {elem for elem in condition.split(' ')
if elem != ''}
def parse_condition(condition, all_nodes, inhibitors_nodes):
"""Return valid paths according the given logical formula and nodes.
"""
LOGGER.debug("CONDITION: " + condition)
# Error Reporter
err = Reporter()
tvi = TableVisitor(err)
# Link the lexer to the model allows to avoid error in Reporter
# like: "-> dec -> Undeclared event or state"
# In practice this is time consuming and useless for what we want to do
#parser = MakeModelFromXmlFile(BIO_MOLDELS_DIR + "Whole NCI-PID database translated into CADBIOM formalism(and).bcx")
#parser.get_model().accept(tvi)
symb_tab = tvi.tab_symb
# Get tree object from condition string
cond_sexpr = compile_cond(condition, symb_tab, err)
# Get all possible paths from the condition
possible_paths = rec(cond_sexpr, inhibitors_nodes)
# Prune possible paths according to:
# - Inhibitor nodes that must be removed because they will never
# be in the graph.
# - All nodes in transitions (ori -> ext) because we know all transitions
# in the graph, so we know which entities can be choosen to validate a path.
# - All frontier places, that are known entities that can be in conditions
# (not only in ori/ext) of transitions.
# So: authorized nodes = frontier_places + transition_nodes - inhibitor nodes
valid_paths = {tuple(path) for path in possible_paths
if (set(path) - inhibitors_nodes).issubset(all_nodes)}
# Debugging only
if LOGGER.getEffectiveLevel() == DEBUG:
LOGGER.debug("INHIBIT NODES: " + str(inhibitors_nodes))
LOGGER.debug("ALL NODES: " + str(all_nodes))
LOGGER.debug("POSSIBLE PATHS: " + str(possible_paths))
LOGGER.debug("VALID PATHS: " + str(valid_paths))
for path in possible_paths:
pruned_places = set(path) - inhibitors_nodes
isinsubset = pruned_places.issubset(all_nodes)
LOGGER.debug(
"PRUNED PATH: {}, VALID: {}".format(
pruned_places,
isinsubset
)
)
assert len(valid_paths) > 0, "No valid path for: " + str(condition)
if len(valid_paths) > 1:
LOGGER.warning("Multiple valid paths for: {}:\n{}".format(condition,
valid_paths))
return valid_paths
# condition expressions contains only node ident
icv = SigExpIdCollectVisitor()
lst1 = cond_sexpr.accept(icv)
print(cond_sexpr)
print(type(cond_sexpr))
print(dir(cond_sexpr))
print("LISTE", lst1)
# <class 'cadbiom.models.biosignal.sig_expr.SigSyncBinExpr'>
# 'accept', 'get_signals', 'get_ultimate_signals', 'is_bot', 'is_clock',
# 'is_const', 'is_const_false', 'is_ident', 'left_h', 'operator', 'right_h', 'test_equal']
print(cond_sexpr.get_signals())
# print(cond_sexpr.get_ultimate_signals())
print("LEFT", cond_sexpr.left_h)
print("OPERATOR", cond_sexpr.operator)
print("RIGHT", cond_sexpr.right_h)
# ret = treeToTab(cond_sexpr)
# [set([('((formule', True)])]
# print("treeToTab", ret)
# print(type(ret))
# print(dir(ret))
def build_graph(solution, steps, transitions):
"""Build a graph for the given solution.
......@@ -764,7 +560,7 @@ def sol_digging_main(output_dir, model_file, solution_path, conditions=True):
json.dump(decomp_solutions, f_d, sort_keys=True, indent=4)
# Get transitions from the model
model_transitions = get_transitions_from_file(model_file)
model_transitions = get_transitions_from_model_file(model_file)
if os.path.isfile(solution_path):
# The given path is a solution file
......@@ -806,7 +602,7 @@ def parse_trajectories_main(output_dir, model_file, solution_file):
process_solutions(
output_dir,
load_solutions(solution_file),
get_transitions_from_file(model_file)
get_transitions_from_model_file(model_file)
)
......@@ -1269,7 +1065,7 @@ if __name__ == "__main__":
exit()
process_solutions(load_solutions(LOG_DIR + "../run/Whole NCI-PID database translated into CADBIOM formalism(and)_SRP9_cam_complete.txt"),
get_transitions_from_file(BIO_MOLDELS_DIR + "Whole NCI-PID database translated into CADBIOM formalism(and).bcx"))
get_transitions_from_model_file(BIO_MOLDELS_DIR + "Whole NCI-PID database translated into CADBIOM formalism(and).bcx"))
exit()
......@@ -1289,17 +1085,17 @@ if __name__ == "__main__":
# exit()
sol_digging(load_solutions(LOG_DIR + "../run/pid_and_clock_no_perm_p21corrected_start_SRP9_complete.txt"),
get_transitions_from_file(BIO_MOLDELS_DIR + "Whole NCI-PID database translated into CADBIOM formalism(and).bcx"))
get_transitions_from_model_file(BIO_MOLDELS_DIR + "Whole NCI-PID database translated into CADBIOM formalism(and).bcx"))
exit()
sol_digging(load_solutions(LOG_DIR + "sols_new_solver.txt"),
get_transitions_from_file(BIO_MOLDELS_DIR + "mini_test_publi.bcx"))
get_transitions_from_model_file(BIO_MOLDELS_DIR + "mini_test_publi.bcx"))
exit()
process_solutions(load_solutions(LOG_DIR + "sols_new_solver.txt"),
get_transitions_from_file(BIO_MOLDELS_DIR + "mini_test_publi.bcx"))
get_transitions_from_model_file(BIO_MOLDELS_DIR + "mini_test_publi.bcx"))
exit()
# build_graph(load_solutions(LOG_DIR + "../run/pid_and_clock_SRP9_cam_complete_o.txt"),
# get_transitions_from_file(BIO_MOLDELS_DIR + "pid_and_clock.bcx"))
# get_transitions_from_model_file(BIO_MOLDELS_DIR + "pid_and_clock.bcx"))
process_solutions(load_solutions(LOG_DIR + "../run/pid_and_clock_SRP9_cam_complete.txt"),
get_transitions_from_file(BIO_MOLDELS_DIR + "pid_and_clock.bcx"))
get_transitions_from_model_file(BIO_MOLDELS_DIR + "pid_and_clock.bcx"))
......@@ -21,6 +21,14 @@
# Dyliss team
# IRISA Campus de Beaulieu
# 35042 RENNES Cedex, FRANCE
"""
This module groups functions directly related to the management and the
extraction of data of a Cadbiom model.
Here we find high-level functions to manage the logical formulas of the events
and conditions defining the transitions; as well as useful functions to manage
the entities, like to obtain their metadata or the frontier places of the model.
"""
from __future__ import unicode_literals
from __future__ import print_function
......@@ -28,12 +36,13 @@ from __future__ import print_function
from collections import defaultdict
import re
import json
import itertools as it
from logging import DEBUG
# Library imports
from cadbiom.models.guard_transitions.translators.chart_xml \
import MakeModelFromXmlFile
from cadbiom.models.biosignal.translators.gt_visitors import compile_event
from cadbiom.models.biosignal.translators.gt_visitors import compile_event, compile_cond
from cadbiom.models.biosignal.sig_expr import *
from cadbiom.models.guard_transitions.analyser.ana_visitors import TableVisitor
......@@ -62,7 +71,7 @@ class Reporter(object):
LOGGER.debug("\t" + self.mess + " -> " + e)
def get_transitions_from_file(model_file):
def get_transitions_from_model_file(model_file):
"""Get all transitions from a model file (bcx format).
:param: bcx file.
......@@ -172,6 +181,60 @@ def get_transitions(parser):
return dict(transitions)
def get_frontier_places(transitions, all_places):
"""Return frontier places of a model (deducted from its transitions and
from all places of the model).
.. note:: why we use all_places from the model instead of
(input_places - output_places) to get frontier places ?
Because some nodes are only in conditions and not in transitions.
If we don't do that, these nodes are missing when we compute
valid paths from conditions.
:param arg1: Model's transitions.
{u'h00': [('Ax', 'n1', {u'label': u'h00[]'}),]
:type arg1: <dict>
keys: names of events
values: list of transitions as tuples (with in/output, and label).
:return: Set of frontier places.
:rtype: <set>
"""
# Get transitions in events
g = tuple(trans for event in transitions.values() for trans in event)
# Get input nodes & output nodes
# input_places = {trans[0] for trans in g}
output_places = {trans[1] for trans in g}
# Get all places that are not in transitions in the "output" place
return set(all_places) - output_places
################################################################################
def get_places_from_condition(condition):
"""Parse condition string and return all places, regardless of operators.
.. note:: This function is only used to get all nodes in a condition when
we know they all are inhibitors nodes.
:param: Condition string.
:type: <str>
:return: Set of places.
:rtype: <set>
"""
replacement = ['and', 'or', 'not', '(', ')']
for chr in replacement:
condition = condition.replace(chr, ' ')
# Must be exempt of unauthorized chars
return {elem for elem in condition.split(' ')
if elem != ''}
def parse_event(event):
"""Decompile logical formula in event's name.
......@@ -240,6 +303,162 @@ def parse_event(event):
return eventToCondStr
def parse_condition(condition, all_nodes, inhibitors_nodes):
"""Return valid paths according the given logical formula and nodes.
"""
LOGGER.debug("CONDITION: " + condition)
# Error Reporter
err = Reporter()
tvi = TableVisitor(err)
# Link the lexer to the model allows to avoid error in Reporter
# like: "-> dec -> Undeclared event or state"
# In practice this is time consuming and useless for what we want to do
#parser = MakeModelFromXmlFile(BIO_MOLDELS_DIR + "Whole NCI-PID database translated into CADBIOM formalism(and).bcx")
#parser.get_model().accept(tvi)
symb_tab = tvi.tab_symb
# Get tree object from condition string
cond_sexpr = compile_cond(condition, symb_tab, err)
# Get all possible paths from the condition
possible_paths = rec(cond_sexpr, inhibitors_nodes)
# Prune possible paths according to:
# - Inhibitor nodes that must be removed because they will never
# be in the graph.
# - All nodes in transitions (ori -> ext) because we know all transitions
# in the graph, so we know which entities can be choosen to validate a path.
# - All frontier places, that are known entities that can be in conditions
# (not only in ori/ext) of transitions.
# So: authorized nodes = frontier_places + transition_nodes - inhibitor nodes
valid_paths = {tuple(path) for path in possible_paths
if (set(path) - inhibitors_nodes).issubset(all_nodes)}
# Debugging only
if LOGGER.getEffectiveLevel() == DEBUG:
LOGGER.debug("INHIBIT NODES: " + str(inhibitors_nodes))
LOGGER.debug("ALL NODES: " + str(all_nodes))
LOGGER.debug("POSSIBLE PATHS: " + str(possible_paths))
LOGGER.debug("VALID PATHS: " + str(valid_paths))
for path in possible_paths:
pruned_places = set(path) - inhibitors_nodes
isinsubset = pruned_places.issubset(all_nodes)
LOGGER.debug(
"PRUNED PATH: {}, VALID: {}".format(
pruned_places,
isinsubset
)
)
assert len(valid_paths) > 0, "No valid path for: " + str(condition)
if len(valid_paths) > 1:
LOGGER.warning("Multiple valid paths for: {}:\n{}".format(condition,
valid_paths))
return valid_paths
from cadbiom.models.guard_transitions.analyser.ana_visitors import SigExpIdCollectVisitor
# condition expressions contains only node ident
icv = SigExpIdCollectVisitor()
lst1 = cond_sexpr.accept(icv)
print(cond_sexpr)
print(type(cond_sexpr))
print(dir(cond_sexpr))
print("LISTE", lst1)
# <class 'cadbiom.models.biosignal.sig_expr.SigSyncBinExpr'>
# 'accept', 'get_signals', 'get_ultimate_signals', 'is_bot', 'is_clock',
# 'is_const', 'is_const_false', 'is_ident', 'left_h', 'operator', 'right_h', 'test_equal']
print(cond_sexpr.get_signals())
# print(cond_sexpr.get_ultimate_signals())
print("LEFT", cond_sexpr.left_h)
print("OPERATOR", cond_sexpr.operator)
print("RIGHT", cond_sexpr.right_h)
# ret = treeToTab(cond_sexpr)
# [set([('((formule', True)])]
# print("treeToTab", ret)
# print(type(ret))
# print(dir(ret))
def rec(tree, inhibitors_nodes):
"""Recursive function to decompile conditions
:param tree:
:Example of tree argument:
.. code-block:: python
tree = ('H', 'v', (
('F', 'v', 'G'),
'^',
(
('A', 'v', 'B'),
'^',
('C', 'v', ('D', '^', 'E'))
)
))
"""
# print("TREE", tree, type(tree), dir(tree))
if isinstance(tree, str): # terminal node
path = [tree]
solutions = [path]
return solutions
if isinstance(tree, SigNotExpr):
LOGGER.debug("NOT OPERAND: {}, {}".\
format(
tree.operand,
type(tree.operand)
)
)
try:
current_inhibitors = get_places_from_condition(tree.operand.__str__())
inhibitors_nodes.update(current_inhibitors)
LOGGER.debug("INHIBITORS found: " + str(current_inhibitors))
path = [tree.operand.name]
solutions = [path]
return solutions
except AttributeError:
tree = tree.operand
if isinstance(tree, SigIdentExpr):
path = [tree.name]
solutions = [path]
return solutions
lch = tree.left_h
op = tree.operator
rch = tree.right_h
# print('OZCJSH:', lch, op, rch, sep='\t\t')
lpaths = rec(lch, inhibitors_nodes)
rpaths = rec(rch, inhibitors_nodes)
# print('VFUENK:', lpaths, rpaths)
if op == 'or': # or
# ret = [*lpaths, *rpaths]
ret = list(it.chain(lpaths, rpaths))
# print('RET:', ret)
return ret
else: # and
assert op == 'and'
# print(list(it.product(lpaths, rpaths)))
# raw_input('test')
ret = list(l + r for l, r in it.product(lpaths, rpaths))
# print('RET:', ret)
return ret
################################################################################
def get_places_data(places, model):
"""Get a list of JSON data parsed from each given places in the model.
......
......@@ -31,7 +31,7 @@ def feed_conditions():
def test_rec_tree(feed_conditions):
from cadbiom_cmd.solution_repr import rec, Reporter
from cadbiom_cmd.tools.models import rec, Reporter
# Error Reporter
err = Reporter()
......@@ -56,7 +56,7 @@ def test_rec_tree(feed_conditions):
def test_parse_condition():
from cadbiom_cmd.solution_repr import parse_condition
from cadbiom_cmd.tools.models import parse_condition
condition = "((((((STAT4__dimer___JUN_Cbp_active_nucl)or(JUN_FOSNFAT1_c_4_active_nucl))or((STAT5__dimer___active_active_nucl and Elf1)))or(IL2_IL2R_active_intToMb))or(JUN_FOSNFAT1_c_4_active_nucl))or(NFAT1_FOXP3))or(JUN_FOSNFAT1_c_4_active_nucl)"
inhibitors = set()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment