# -*- coding: utf-8 -*-
# Copyright (C) 2017 IRISA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# The original code contained here was initially developed by:
#
# Pierre Vignet.
# IRISA
# Dyliss team
# IRISA Campus de Beaulieu
# 35042 RENNES Cedex, FRANCE
"""Handle generated files
This module provides functions to analyze the output files of Cadbiom.
Entry points:
* :meth:`~cadbiom_cmd.solution_sort.solutions_2_json`
* :meth:`~cadbiom_cmd.solution_sort.solutions_2_graphs`
* :meth:`~cadbiom_cmd.solution_sort.solutions_2_common_graph`
:Example of the content of a complete solution file:
.. code::
Bx Ax
% h2 h00
% h3
% h0 h1
% hlast
Bx Ax
% h2
% h3 h00
% h0 h1
%
% hlast
Bx Ax
% h2
% h3 h00
% h0 h1
% hlast
%
%
Bx Ax
% h2 h00
% h3
% h0 h1
% hlast
%
%
%
"""
from __future__ import unicode_literals
from __future__ import print_function
# Standard imports
from collections import defaultdict, Counter
import itertools as it
import json
import os
import glob
import csv
import networkx as nx
# Library imports
from tools.solutions import get_solutions
from tools.models import get_transitions_from_model_file
from tools.solutions import load_solutions, convert_solutions_to_json, \
get_query_from_filename, get_mac_lines
from tools.graphs import export_graph, build_graph
import cadbiom.commons as cm
LOGGER = cm.logger()
## Sort functions ##############################################################
def sort_solutions_in_file(filepath):
    """Sort all solutions in the given file in alphabetical order.

    The places of each solution line returned by ``get_solutions`` are sorted
    case-insensitively and joined with a single space; empty elements
    (produced by consecutive spaces) are dropped.

    .. warning:: The file is modified in place.

    .. note:: The substitution is a plain text replacement on the whole
        content of the file: every occurrence of an original solution line
        is replaced by its sorted version.

    :param filepath: Filepath to be opened and in which solutions will be sorted.
    :type filepath: <str>
    """
    solutions = dict()

    with open(filepath, 'r+') as f_d:

        # Get old line as key and ordered line as value
        for line, stripped_line in get_solutions(f_d):
            # Sort in lower case, remove empty elements:
            # split(' ') yields '' (never ' ') for consecutive spaces,
            # so the elements are filtered on their truthiness.
            places = [place for place in stripped_line.split(' ') if place]
            solutions[line] = " ".join(sorted(places, key=lambda s: s.lower()))

        # Rewind the whole file
        f_d.seek(0)

        # Load all the content
        file_text = f_d.read()

        # Replace old sols with the new ones
        for original_sol, sorted_sol in solutions.items():
            file_text = file_text.replace(original_sol, sorted_sol)

        # Rewind the whole file
        f_d.seek(0)

        # Write all text in the current opened file
        f_d.write(file_text)

        # The new text can be shorter than the previous content (empty
        # elements are dropped): remove the leftover bytes after the cursor.
        f_d.truncate()
def solutions_sort(path):
    """Entry point for sorting solutions.

    Read a solution(s) file(s) (``*mac*`` files) and sort all
    frontier places/boundaries in alphabetical order.

    This function tests if the given path is a directory or a file.
    For a directory, only the files matching ``*mac*`` directly inside it
    are processed (the search is not recursive).

    .. warning:: The files will be modified in place.

    :param path: Filepath or directory path containing Cadbiom solutions.
    :type path: <str>
    :raises AssertionError: If the path is neither a file nor a directory.
    """
    # Check valid input file/directory
    assert os.path.isfile(path) or os.path.isdir(path)

    if os.path.isdir(path):
        # Search of *mac* files in the directory
        # (mac.txt, mac_complete.txt, mac_step.txt)
        # os.path.join handles the presence or absence of a trailing '/'
        for solution_file in glob.glob(os.path.join(path, '*mac*')):
            sort_solutions_in_file(solution_file)
    else:
        sort_solutions_in_file(path)
## Conversion functions ########################################################
def solutions_2_json(output_dir, model_file, path, conditions=True):
    """Entry point for solutions_2_json

    Create a JSON formated file containing all data from complete MAC files
    (``*mac_complete`` files). The file will contain frontier places/boundaries
    and decompiled steps with their respective events for each solution.

    This is a function to quickly search all transition attributes involved
    in a solution.

    This function tests if the given path is a directory or a file.

    :param output_dir: Output path; the name of the JSON file is directly
        appended to it.
    :param model_file: Filepath of the model.
    :param path: Filepath/directory of a complete solution file.
    :param conditions: (Optional) If False, conditions of transitions will not
        be present in the JSON file. This allows to have only places/entities
        used inside trajectories; thus, inhibitors are avoided.
    :type output_dir: <str>
    :type model_file: <str>
    :type path: <str>
    :type conditions: <boolean>
    """
    def decompile_to_json(solution_file):
        """Decompile the given solution file and dump the result in JSON"""
        decompiled = convert_solutions_to_json(
            load_solutions(solution_file),
            model_transitions,
            conditions=conditions,
        )
        # The JSON file is named after the solution file (without extension)
        # with a '_decomp' suffix
        basename = os.path.basename(os.path.splitext(solution_file)[0])
        with open(output_dir + basename + '_decomp.json', 'w') as json_file:
            json.dump(decompiled, json_file, sort_keys=True, indent=2)

    # Only existing files or directories are accepted
    assert os.path.isfile(path) or os.path.isdir(path)

    # Transitions are loaded once, and shared by all the solution files
    model_transitions = get_transitions_from_model_file(model_file)

    if os.path.isfile(path):
        # Single solution file
        decompile_to_json(path)

    elif os.path.isdir(path):
        # Directory: all the complete solution files are decompiled
        if not path.endswith('/'):
            path += '/'

        solution_files = glob.glob(path + '*mac_complete.txt')
        for solution_file in solution_files:
            decompile_to_json(solution_file)

        file_number = len(solution_files)
        LOGGER.info("Files processed: %s", file_number)
        assert file_number != 0, "No *mac_complete.txt files found!"
def solutions_2_graphs(output_dir, model_file, path):
    """Entry point for solutions_2_graphs

    Create GraphML formated files containing a representation of the
    trajectories for each solution in complete MAC files
    (``*mac_complete`` files).

    This is a function to visualize paths taken by the solver from the
    boundaries to the entities of interest.

    This function tests if the given path is a directory or a file.

    :param output_dir: Output path.
    :param model_file: Filepath of the model.
    :param path: Filepath/directory of a/many complete solutions files.
    :type output_dir: <str>
    :type model_file: <str>
    :type path: <str>
    """
    # Only existing files or directories are accepted
    assert os.path.isfile(path) or os.path.isdir(path)

    # Transitions are loaded once, and shared by all the solution files
    model_transitions = get_transitions_from_model_file(model_file)

    if os.path.isfile(path):
        # Single solution file
        convert_solution_file_to_graphs(
            output_dir, load_solutions(path), model_transitions
        )

    elif os.path.isdir(path):
        # Directory: all the complete solution files are converted
        directory = path if path.endswith('/') else path + '/'

        processed = 0
        for solution_file in glob.glob(directory + '*mac_complete.txt'):
            convert_solution_file_to_graphs(
                output_dir, load_solutions(solution_file), model_transitions
            )
            processed += 1

        LOGGER.info("Files processed: %s", processed)
        assert processed != 0, "No *mac_complete.txt files found!"
def convert_solution_file_to_graphs(output_dir, sol_steps, transitions):
    """Build and write graphs based on the given solutions

    Each solution is composed of a set of frontier places and steps,
    themselves composed of events.
    We construct a graph based on the transitions that occur in the composition
    of the events of the given solution.

    :param output_dir: Output path.
    :param sol_steps: A generator of tuples of "frontier places" and a list of
        events in each step.

        :Example:

        .. code-block:: python

            ("Bx Ax", [['h2', 'h00'], ['h3'], ['h0', 'h1'], ['hlast']])

    :param transitions: A dictionnary of events as keys, and transitions as
        values. Since many transitions can define an event, values are lists.
        Each transition is a tuple with: origin node, final node, attributes
        like label and condition.

        :Example:

        .. code-block:: python

            {'h00': [('Ax', 'n1', {'label': 'h00[]'}),]}

    :type output_dir: <str>
    :type sol_steps: <generator <tuple <str>, <list>>>
    :type transitions: <dict <str>: <list <tuple <str>, <str>, <dict>>>>
    """
    for position, (frontier_places, steps) in enumerate(sol_steps):
        # Only the first element returned by build_graph() (the graph itself)
        # is used here; build_graph() returns:
        # G, transition_nodes, all_nodes, edges_in_cond, edges
        graph = build_graph(frontier_places, steps, transitions)[0]

        # The position of the solution in the file is given to export_graph()
        # in order to keep the order of appearance in the exported files
        export_graph(output_dir, frontier_places, position, graph)
def solutions_2_common_graph(output_dir, model_file, path):
    """Entry point for solutions_2_common_graph

    Create a GraphML formated file containing a unique representation of
    **all** trajectories corresponding to all solutions in each complete MAC
    file (``*mac_complete`` files).

    This is a function to visualize paths taken by the solver from the
    boundaries to the entities of interest.

    This function tests if the given path is a directory or a file.

    :param output_dir: Output path.
    :param model_file: Filepath of the model.
    :param path: Filepath/directory of a/many complete solutions files.
    :type output_dir: <str>
    :type model_file: <str>
    :type path: <str>
    """
    def iter_graphs(solutions):
        """Lazily build the graph of each given solution.

        .. note:: See the doc of a similar function
            :meth:`~cadbiom_cmd.solution_sort.convert_solution_file_to_graphs`.
        """
        # Only the first element returned by build_graph() (the graph itself)
        # is kept; build_graph() returns:
        # G, transition_nodes, all_nodes, edges_in_cond, edges
        return (
            build_graph(frontier_places, steps, model_transitions)[0]
            for frontier_places, steps in solutions
        )

    # Only existing files or directories are accepted
    assert os.path.isfile(path) or os.path.isdir(path)

    # Transitions are loaded once, and shared by all the solution files
    model_transitions = get_transitions_from_model_file(model_file)

    if os.path.isfile(path):
        # Single solution file
        solutions = load_solutions(path)
        # The query string comes from the name of the solution file
        query = get_query_from_filename(model_file, path)
        # One merged graph is written for the whole file
        export_graph(output_dir, query, '', merge_graphs(iter_graphs(solutions)))

    elif os.path.isdir(path):
        # Directory: one merged graph per complete solution file
        directory = path if path.endswith('/') else path + '/'

        processed = 0
        for solution_file in glob.glob(directory + '*mac_complete.txt'):
            # The query string comes from the name of the solution file
            query = get_query_from_filename(model_file, solution_file)
            LOGGER.info("Processing %s query...", query)

            solutions = load_solutions(solution_file)
            export_graph(
                output_dir, query, '', merge_graphs(iter_graphs(solutions))
            )
            processed += 1

        LOGGER.info("Files processed: %s", processed)
        assert processed != 0, "No *mac_complete.txt files found!"
def merge_graphs(graphs):
    """Merge graphs in the given iterable; count and add the weights to the
    edges of the final graph

    An edge seen for the first time is added with its attributes and a
    weight of 1; the weight is incremented each time the edge is found again
    in another graph.

    :param graphs: Networkx graph objects.
    :type graphs: <generator <networkx.classes.digraph.DiGraph>>
    :return: Networkx graph object.
    :rtype: <networkx.classes.digraph.DiGraph>
    """
    merged = nx.DiGraph()

    for graph in graphs:
        # Nodes of the current graph that are not yet in the merged graph
        unknown_nodes = set(graph.nodes_iter()).difference(merged.nodes_iter())
        if unknown_nodes:
            # Nodes are added with their attributes: tuples (node_name, attrs)
            merged.add_nodes_from(
                [(node, graph.node[node]) for node in unknown_nodes]
            )

        for source, target, attributes in graph.edges_iter(data=True):
            if not merged.has_edge(source, target):
                # First occurrence of the edge
                merged.add_edge(source, target, attr_dict=attributes, weight=1)
                continue
            # Edge already known: one more occurrence
            merged[source][target]['weight'] += 1

    return merged
## Matrices of occurrences #####################################################
def solutions_2_occcurrences_matrix(output_dir, model_file, path,
                                    transposed=False, normalized=False):
    """Entry point for solutions_2_occcurrences_matrix

    See :meth:`~cadbiom_cmd.solution_sort.occurrence_matrix`.

    :param output_dir: Output path; filenames are directly appended to it.
    :param model_file: Filepath of the model.
    :param path: Directory of many complete solutions files.
    :param transposed: (Optional) Transpose the final matrix (switch columns
        and rows). The transposed matrix is written in
        `occurrence_matrix_t.csv`.
    :param normalized: (Optional) Accepted but not used by this function.
    :type output_dir: <str>
    :type model_file: <str>
    :type path: <str>
    :type transposed: <boolean>
    :type normalized: <boolean>
    """
    # Only existing directories are accepted
    assert os.path.isdir(path)

    if not path.endswith('/'):
        path += '/'

    # Make matrix
    occurrence_matrix(output_dir, model_file, path)

    if not transposed:
        return

    matrix_file = output_dir + 'occurrence_matrix.csv'
    transposed_file = output_dir + 'occurrence_matrix_t.csv'
    transpose_csv(input_file=matrix_file, output_file=transposed_file)
def _count_frontier_places(model_file, path):
    """Count the occurrences of each frontier place in all `*mac.txt` files.

    :param model_file: Filepath of the model.
    :param path: Directory of solutions files; the glob pattern is directly
        appended to it.
    :type model_file: <str>
    :type path: <str>
    :return: A tuple with the matrix (keys: queries, values: occurrences of
        frontier places + number of solutions in the key "mac_number"), and
        the set of all frontier places found.
    :rtype: <tuple <defaultdict <Counter>>, <set>>
    :raises AssertionError: If no `*mac.txt` file is found.
    """
    # Key: Logical formula as input of Cadbiom
    # Value: Number of each place in all solutions of the current file
    matrix = defaultdict(Counter)
    # All frontier places in all mac files
    all_frontier_places = set()

    file_number = 0
    for file_number, filepath in enumerate(glob.glob(path + '*mac.txt'), 1):
        # Get query string from the name of the solution file
        # From: 'MODEL_NAME_PLACE1 and not PLACE2 and not PLACE3_mac.txt'
        # Get: 'PLACE1 and not PLACE2 and not PLACE3'
        query = get_query_from_filename(model_file, filepath)

        mac_number = 0
        for mac_number, mac_line in enumerate(get_mac_lines(filepath), 1):
            frontier_places = set(mac_line.split(' '))
            # Update set of all frontier places
            all_frontier_places.update(frontier_places)
            # Update counter of places => compute frequencies
            matrix[query] += Counter(frontier_places)

        # Set the mac_number for future standardization
        # (0 if the file doesn't contain any solution)
        matrix[query]["mac_number"] = mac_number

    LOGGER.info("Files processed: %s", file_number)
    assert file_number != 0, "No *mac.txt files found!"

    return matrix, all_frontier_places


def _write_occurrence_matrix(matrix_filepath, matrix, all_frontier_places):
    """Write the given matrix of occurrences in a semicolon separated csv file.

    .. warning:: A new key (the header of the first column) is added to every
        row of the given matrix.

    :param matrix_filepath: Filepath of the csv file.
    :param matrix: keys: queries, values: occurrences of frontier places.
    :param all_frontier_places: All the frontier places (columns of the csv).
    :type matrix_filepath: <str>
    :type matrix: <dict <Counter>>
    :type all_frontier_places: <set>
    """
    # columns: "patterns (number)/places (number);mac_number;frontier places"
    with open(matrix_filepath, 'w') as f_d:
        # Forge header
        header = "patterns ({})/places ({})".format(
            len(matrix),
            len(all_frontier_places),
        )

        writer = csv.DictWriter(
            f_d,
            delimiter=str(';'),
            restval=0,  # default value for frequency
            # Places are sorted to get a reproducible order of columns
            fieldnames=[header, "mac_number"] + sorted(all_frontier_places))
        writer.writeheader()

        # Add a last line in the csv: total of occurrences for each place
        global_frontier_counter = Counter()

        # The first column is composed of the query + the number of solutions
        # items() is available in Python 2 and 3 (iteritems() is not)
        for query, row in matrix.items():
            # Must be done before the insertion of the (textual) header value
            global_frontier_counter += row

            # PS: THIS modifies the matrix by adding a new key ('header')
            row[header] = "{} ({})".format(query, row["mac_number"])
            writer.writerow(row)

        # Total of occurrences at the end of the file
        global_frontier_counter[header] = "Total of occurrences"
        writer.writerow(global_frontier_counter)


def occurrence_matrix(output_dir, model_file, path,
                      matrix_filename='occurrence_matrix.csv'):
    """Make a matrix of occurrences for the solutions in the given path.

    - Compute occurrences of each place in all `mac.txt` files.
    - Save the matrix in csv format with the following columns:
        Fieldnames: "patterns (number)/places (number);mac_number;frontier places"
        Each request (pattern) is accompanied by the number of solutions found.
        The columns of frontier places are sorted in alphabetical order.
        A last line contains the total of occurrences of each place.

    :param output_dir: Output path; the filename is directly appended to it.
    :param model_file: Filepath of the model.
    :param path: Directory of many complete solutions files; the glob pattern
        `*mac.txt` is directly appended to it.
    :param matrix_filename: (Optional) Filename of the matrix file.
    :type output_dir: <str>
    :type model_file: <str>
    :type path: <str>
    :type matrix_filename: <str>
    :return: A dictionnary with the matrix object.
        keys: queries, values: occurrences of frontier places.
        Each row also contains the number of solutions ("mac_number") and
        the text of the first column of the csv file.
    :rtype: <dict <Counter>>
    :raises AssertionError: If no `*mac.txt` file is found in the path.
    """
    # Compute occurrences of each place in all mac files
    matrix, all_frontier_places = _count_frontier_places(model_file, path)

    # Save the matrix
    _write_occurrence_matrix(
        output_dir + matrix_filename, matrix, all_frontier_places
    )
    return matrix
def transpose_csv(input_file='occurrence_matrix.csv',
                  output_file='occurrence_matrix_t.csv'):
    """Useful function to transpose a csv file x,y => y,x

    .. note:: The csv file must be semicolon ';' separated.

    .. note:: Rows are truncated to the length of the shortest one:
        zip('ABCD', 'xy') --> Ax By

    :param input_file: Input file.
    :param output_file: Output file transposed.
    :type input_file: <str>
    :type output_file: <str>
    """
    # The whole input is loaded (and its descriptor closed) before the output
    # file is opened: input and output can thus be the same file.
    with open(input_file, "r") as in_fd:
        rows = list(csv.reader(in_fd, delimiter=str(';')))

    # Transpose file
    # The context manager guarantees that data is flushed and the file closed
    with open(output_file, "w") as out_fd:
        csv.writer(out_fd, delimiter=str(';')).writerows(zip(*rows))