Commit 9d0d1456 authored by VIGNET Pierre's avatar VIGNET Pierre
Browse files

First graph representation of trajectories

parent 94384154
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from __future__ import print_function
import datetime as dt
from collections import defaultdict
import networkx as nx
import itertools as it
import matplotlib.pyplot as plt
from cadbiom.models.guard_transitions.translators.chart_xml \
import MakeModelFromXmlFile
LOG_DIR = "./logs/"
BIO_MOLDELS_DIR = "./bio_models/"
GRAPHS_DIR = "./graphs/"
"""
Bx Ax
% h2 h00
% h3
% h0 h1
% hlast
Bx Ax
% h2
% h3 h00
% h0 h1
%
% hlast
Bx Ax
% h2
% h3 h00
% h0 h1
% hlast
%
%
Bx Ax
% h2 h00
% h3
% h0 h1
% hlast
%
%
%
"""
# 'not' = ' not' '(not' '"not'
def load_solutions(file):
"""Open a file with many solution/MACs.
:param: File name
:type: <str>
:return:A tuple of "frontier places" and a list of events in each step.
("Bx Ax", [[u'h2', u'h00'], [u'h3'], [u'h0', u'h1'], [u'hlast']])
:rtype: <tuple <str>, <list>>
"""
sol_steps = defaultdict(list)
sol = ""
with open(file, 'r') as fd:
for line in fd:
print("line:", line)
# Remove possible \t separator from first line (frontier solution)
line = line.rstrip('\n').rstrip('\t').replace('\t', ' ')
# TODO: remove last space ' ' => beware, may be informative...
# (start transitions have no event names: ori="__start__0")
line = line.rstrip(' ')
if line == '' or line[0] == '=':
# Skip blank lines and solution separator in some files (=====)
print('VIDE')
continue
elif line[0] != '%':
if sol == line:
# Same frontier places
yield sol, sol_steps[sol]
# reinit sol
sol_steps[sol] = list()
continue
elif sol == '':
# First sol
sol = line
else:
# Yield previous sol
yield sol, sol_steps[sol]
sol = line
elif line[0] == '%':
# Remove step with only "% "
step = line.lstrip('% ')
if step != '':
sol_steps[sol].append(step.split(' '))
# Yield last sol
yield sol, sol_steps[sol]
def get_transitions(file):
"""Get all transitions in a file model (bcx format).
:param: Model in bcx format.
:type: <str>
:return: A dictionnary of events as keys, and transitions as values.
Since many transitions can define an event, values are lists.
Each transition is a tuple with: origin node, final node, attributes
like label and condition.
{u'h00': [('Ax', 'n1', {u'label': u'h00[]'}),]
:rtype: <dict <list <tuple <str>, <str>, <dict <str>: <str>>>>
"""
parser = MakeModelFromXmlFile(file)
g = (trans for transition in parser.handler.top_pile.transitions
for trans in transition)
transitions = defaultdict(list)
for trans in g:
print("NEW trans", trans.event)
# Handle multiple transitions for 1 event
transitions[trans.event].append(
(
trans.ori.name, trans.ext.name,
{
'label': trans.event + '[' + trans.condition + ']',
'condition': trans.condition,
}
)
)
#print(transitions)
return transitions
def parse_condition(condition):
"""Parse condition string.
"""
replacement = ['and', 'or', 'not', '(', ')']
for chr in replacement:
condition = condition.replace(chr, ' ')
# Must be exempt of unauthorized chars
print("CONDITION", condition)
return [elem for elem in condition.split(' ')
if elem != '']
def build_graph(i, sol, steps, transitions):
"""Build a graph for the given solution.
- Get & make all needed edges
- Draw graph
- Save graph in svg/graphml
:param arg1: Frontier places.
:param arg2: List of steps (with events in each step).
:param arg3: A dictionnary of events as keys, and transitions as values
(see get_transitions()).
:type arg1: <str>
:type arg2: <list>
:type arg3: <dict <list <tuple <str>, <str>, <dict <str>: <str>>>>
"""
def filter_transitions(step_event):
""" Insert a transittion in a transition event if there is a condition.
=> Insert a node in a edge.
=> Link all nodes involved in the condition with this new node.
:param: A list of events (from a step in a solution).
[('Ax', 'n1', {u'label': u'h00[]'}),]
:type: <tuple>
:return: Fill lists of edges:
edges_with_cond: if transition has a condition
transition_nodes: add new nodes corresponding to transitions with
conditions.
edges_in_cond: Add all edges to nodes linked via the condition.
edges: transition untouched if there is no condition.
:rtype: None
"""
for trans in step_event:
attributes = trans[2]
ori = trans[0]
ext = trans[1]
event = attributes['label'].split('[')[0]
if attributes['condition'] != '':
# Add the transition as node
transition_nodes.append(
(
event,
{
'label': attributes['label'],
'name': attributes['label'],
'color': 'blue',
}
)
)
# Origin => transition node
edges_with_cond.append(
(
ori, event,
{
'label': ori + '-' + event,
}
)
)
# Transition node => ext
edges_with_cond.append(
(
event, ext,
{
'label': event + '-' + ext,
}
)
)
# Add all transitions to nodes linked via the condition
# TODO: detect inhibition/activation
nodes_in_condition = parse_condition(attributes['condition'])
for node in nodes_in_condition:
edges_in_cond.append(
(
node, event,
{
'label': node + '-' + event,
'color': 'blue',
}
)
)
else:
# Normal edges
edges.append(trans)
# return trans
print("BUILD GRAPH FOR SOL:", sol)
print("STEPS:", steps)
frontier_places = sol.split(' ')
edges_with_cond = list()
edges_in_cond = list()
transition_nodes = list()
edges = list()
# Get & make all needed edges
[filter_transitions(transitions[step_event]) for step_event in it.chain(*steps)]
for step_event in it.chain(*steps):
event = transitions[step_event]
# print(step_event)
if len(event) == 0:
print("ERROR for event: " + str(event) + '\n')
# TODO: handle this
# Add the transition as node
transition_nodes.append(
(
"ERROR: " + str(step_event),
{
'label': step_event,
'name': step_event,
'color': 'yellow',
}
)
)
else:
filter_transitions(event)
# edges = [edge for edge in g if edge is not None]
print("edges without cond", edges)
print("edges with cond", edges_with_cond)
print("transition nodes added", transition_nodes)
# Make Graph ###############################################################
G = nx.DiGraph()
# Add fontier places
G.add_nodes_from(frontier_places, node_color='red')
# Add all nodes
G.add_nodes_from(transition_nodes, color='blue')
# Node attribute ?
# print(G.node['h1'])
# Add all edges
G.add_edges_from(edges)
G.add_edges_from(edges_with_cond)
G.add_edges_from(edges_in_cond)
unzip = lambda l: list(zip(*l))
# Get a list of nodes (without dictionnary of attributes)
unziped_transition_nodes = unzip(transition_nodes)[0]
def color_map(node):
# print("color for:", node)
if node in frontier_places:
return 'red'
if node in unziped_transition_nodes:
return 'blue'
else:
return 'white'
# Drawing ##################################################################
# draw_circular(G, **kwargs) On a circle.
# draw_random(G, **kwargs) Uniformly at random in the unit square.
# draw_spectral(G, **kwargs) Eigenvectors of the graph Laplacian.
# draw_spring(G, **kwargs) Fruchterman-Reingold force-directed algorithm.
# draw_shell(G, **kwargs) Concentric circles.
# draw_graphviz(G[, prog]) Draw networkx graph with graphviz layout.
pos = nx.circular_layout(G)
# Legend of conditions in transition nodes
f = plt.figure(1)
ax = f.add_subplot(1,1,1)
text = '\n'.join(
node_dict['label'] for node_dict in unzip(transition_nodes)[1]
)
ax.text(0, 0, text, style='italic', fontsize=10,
bbox={'facecolor':'white', 'alpha':0.5, 'pad':10})
# Draw nodes:
# - red: frontier places (in solution variable),
# - white: middle edges,
# - blue: transition edges
colors = [color_map(node) for node in G.nodes_iter()]
nx.draw(G, pos=pos, with_labels=True,
node_color=colors, node_size=1000, alpha=0.5, ax=ax)
# Draw edges involved in transitions with conditions
nx.draw_networkx_edges(G, pos, edgelist=edges_in_cond,
edge_color='b', width=2, alpha=0.5)
# Draw labels for normal transitions (move pos to the end of the arrow)
# ex: [('Ax', 'n1', {u'condition': u'', u'label': u'h00[]'}),]
edges_labels = {(edge[0], edge[1]): edge[2]['label'] for edge in edges}
nx.draw_networkx_edge_labels(G, pos, edges_labels, label_pos=0.3)
# Save & show
date = dt.datetime.now().strftime("%H-%M-%S-%f")
plt.legend()
# plt.savefig(GRAPHS_DIR + date + '_' + sol[:75] + ".svg", format="svg")
# plt.show()
nx.write_graphml(G, GRAPHS_DIR + date + '_' + str(i) +'_' + sol[:75] + ".graphml")
#nx.write_gml(G, date + '_' + sol + ".gml")
def process_solutions(sol_steps, transitions):
"""Build a graph per solution"""
for i, (sol, steps) in enumerate(sol_steps):
build_graph(i, sol, steps, transitions)
def test_main():
"""Test"""
# chart_model.py
# chart_xml.py
parser = MakeModelFromXmlFile(BIO_MOLDELS_DIR + "mini_test_publi.bcx")
print(type(parser.parser))
print(dir(parser))
print("HANDLER")
print(dir(parser.handler))
print(dir(parser.parser))
print(dir(parser.get_model()))
print("ICI")
# print(parser.get_model().get_simple_node())
print(parser.handler.node_dict)
print(parser.handler.top_pile)
print(parser.handler.pile_dict)
print(parser.handler.transition.event)
print(type(parser.handler.top_pile))
transitions = dict()
for transition in parser.handler.top_pile.transitions:
# print(type(transition)) => list
for trans in transition:
# 'action', 'activated', 'clean', 'clock', 'condition', 'event',
# 'ext', 'ext_coord', 'fact_ids', 'get_influencing_places',
# 'get_key', 'is_me', 'macro_node', 'name', 'note', 'ori',
# 'ori_coord', 'remove', 'search_mark', 'selected', 'set_action',
# 'set_condition', 'set_event', 'set_name', 'set_note'
# {'name': '', 'clock': None, 'selected': False, 'activated': False,
# 'search_mark': False, 'note': '', 'ext': <cadbiom.models.guard_transitions.chart_model.CSimpleNode object at 0x7f391c7406d0>,
# 'ext_coord': 0.0, 'ori': <cadbiom.models.guard_transitions.chart_model.CSimpleNode object at 0x7f391c740650>,
# 'action': u'', 'macro_node': <cadbiom.models.guard_transitions.chart_model.CTopNode object at 0x7f391c7db490>,
# 'ori_coord': 0.0, 'event': u'h5', 'condition': u'', 'fact_ids': []}
# print(dir(trans))
print("NEW trans", trans.event)
# print(trans.__dict__)
# print(trans.name, trans.clock, trans.selected, trans.activated,
# trans.search_mark, trans.note, trans.ext, trans.ext_coord,
# trans.ori, trans.action, trans.macro_node, trans.ori_coord,
# trans.event, trans.condition, trans.fact_ids
# )
transitions[trans.event] = trans.condition
#print("ORI", trans.ori.__dict__)
#{'name': 'n4', 'yloc': 0.906099768906, 'selected': False,
#'father': <cadbiom.models.guard_transitions.chart_model.CTopNode object at 0x7f09eed91490>,
#'xloc': 0.292715433748, 'search_mark': False, 'was_activated': False,
#'incoming_trans': [<cadbiom.models.guard_transitions.chart_model.CTransition object at 0x7f09eecf67d0>],
#'model': <cadbiom.models.guard_transitions.chart_model.ChartModel object at 0x7f09ef2cf3d0>,
#'outgoing_trans': [<cadbiom.models.guard_transitions.chart_model.CTransition object at 0x7f09eecf6850>],
#'activated': False, 'hloc': 1.0}
print("ORI", trans.ori.name)
try:
print("ori INCO", [(tr.event, tr.condition) for tr in trans.ori.incoming_trans])
except: pass
try:
print("ori OUTGO", [(tr.event, tr.condition) for tr in trans.ori.outgoing_trans])
except: pass
print("EXT", trans.ext.name)
try:
print("ext INCO", [(tr.event, tr.condition) for tr in trans.ext.incoming_trans])
except: pass
try:
print("ext OUTGO", [(tr.event, tr.condition) for tr in trans.ext.outgoing_trans])
except: pass
print(transitions)
def sol_repr(sol_steps, transitions):
"""
"""
def write_transition_def(step_event):
"""Write each event on a new line"""
# Many transitions per event (ex: complex dissociation)
for trans in step_event:
# ori="JUN_nucl_gene" ext="JUN_nucl" event="_h_391"
line = 'ori="{}" ext="{}" event="{}" condition="{}"'.format(
trans[0],
trans[1],
trans[2]['label'].split('[')[0],
trans[2]['condition']
)
fd.write(line + '\n')
with open("output.txt", "w") as fd:
for sol, steps in sol_steps:
# Solution header
fd.write(sol + '\n')
for step in steps:
for event in step:
step_event = transitions[event]
# print(step_event)
if len(step_event) == 0:
fd.write("ERROR for event: " + event + '\n')
else:
write_transition_def(transitions[event])
# Step separation
fd.write('\n')
# Sol separation
fd.write('\n====================================================\n')
def main(model_file, solution_file):
"""
"""
process_solutions(
load_solutions(solution_file),
get_transitions(model_file)
)
if __name__ == "__main__":
process_solutions(load_solutions(LOG_DIR + "../run/Whole NCI-PID database translated into CADBIOM formalism(and)_SRP9_cam_complete.txt"),
get_transitions(BIO_MOLDELS_DIR + "Whole NCI-PID database translated into CADBIOM formalism(and).bcx"))
exit()
#sort_solutions("/media/DATA/Projets/dyliss_tgf/cadbiom/run/Whole NCI-PID database translated into CADBIOM formalism(and)_SRP9_cam_complete.txt")
#sort_solutions("/media/DATA/Projets/dyliss_tgf/cadbiom/run/Whole NCI-PID database translated into CADBIOM formalism(and)_SRP9_cam.txt")
#sort_solutions("/media/DATA/Projets/dyliss_tgf/cadbiom/run/pid_and_clock_no_perm_p21corrected_start_SRP9_complete.txt")
#exit()
# cond = "((((not(ATF2_JUND_macroH2A_nucl))or(Fra1_JUND_active_nucl))or(Fra1_JUN_active_nucl))or(TCF4_betacatenin_active_nucl))or(JUN_FOS_active_nucl)"
# ret = parse_condition(cond)
# print(ret)
# exit()
# sol_steps = load_solutions(LOG_DIR + "../run/pid_and_clock_SRP9_cam_complete_o.txt")
# g = [sol_step for sol_step in sol_steps]
# print(g)
# exit()
sol_repr(load_solutions(LOG_DIR + "../run/pid_and_clock_no_perm_p21corrected_start_SRP9_complete.txt"),
get_transitions(BIO_MOLDELS_DIR + "Whole NCI-PID database translated into CADBIOM formalism(and).bcx"))
exit()
sol_repr(load_solutions(LOG_DIR + "sols_new_solver.txt"),
get_transitions(BIO_MOLDELS_DIR + "mini_test_publi.bcx"))
exit()
process_solutions(load_solutions(LOG_DIR + "sols_new_solver.txt"),
get_transitions(BIO_MOLDELS_DIR + "mini_test_publi.bcx"))
exit()
# build_graph(load_solutions(LOG_DIR + "../run/pid_and_clock_SRP9_cam_complete_o.txt"),
# get_transitions(BIO_MOLDELS_DIR + "pid_and_clock.bcx"))
process_solutions(load_solutions(LOG_DIR + "../run/pid_and_clock_SRP9_cam_complete.txt"),
get_transitions(BIO_MOLDELS_DIR + "pid_and_clock.bcx"))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment