Commit 1406badf authored by VIGNET Pierre

[cmd] black; PEP compliance

parent f7e5a2a5
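The changes below are consistent with black's default style (strings normalized to double quotes, lines wrapped at 88 characters, long calls exploded with trailing commas); the exact invocation is not recorded here, but was presumably something along the lines of "black cadbiom_cmd/".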
 from pkg_resources import get_distribution

-__version__ = get_distribution('cadbiom_cmd').version
+__version__ = get_distribution("cadbiom_cmd").version
@@ -181,6 +181,7 @@ def queries_2_clustermap(args):
     """
     # Module import
     import queries_2_clustermap

+    # output, path
     queries_2_clustermap.queries_2_clustermap(**args)
@@ -344,21 +345,25 @@ class ReadableDir(argparse.Action):
         prospective_dir = values
         if not os.path.isdir(prospective_dir):
-            LOGGER.error("readable_dir:<{}> is not a valid path".format(prospective_dir))
+            LOGGER.error(
+                "readable_dir:<{}> is not a valid path".format(prospective_dir)
+            )
             exit()
         if os.access(prospective_dir, os.R_OK):
             setattr(namespace, self.dest, prospective_dir)
         else:
-            LOGGER.error("readable_dir:<{}> is not a readable dir".format(prospective_dir))
+            LOGGER.error(
+                "readable_dir:<{}> is not a readable dir".format(prospective_dir)
+            )
             exit()


 class CustomFormatter(
-    argparse.ArgumentDefaultsHelpFormatter,
-    argparse.RawDescriptionHelpFormatter
+    argparse.ArgumentDefaultsHelpFormatter, argparse.RawDescriptionHelpFormatter
 ):
     """Formatter used to print default values AND do not format raw text"""
     pass
@@ -683,9 +688,7 @@ def main():
     # Solution file (mac.txt)
     # Output (csv)
     parser_merge_macs = subparsers.add_parser(
-        "merge_macs",
-        help=merge_macs.__doc__,
-        formatter_class=custom_formatter,
+        "merge_macs", help=merge_macs.__doc__, formatter_class=custom_formatter
     )
     parser_merge_macs.add_argument("solutions_directory", nargs="?", default="result/")
     parser_merge_macs.add_argument(
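For context, a minimal sketch of how ReadableDir and CustomFormatter above plug into argparse (the wiring is assumed; the real main() routes them through subparsers):

    parser = argparse.ArgumentParser(formatter_class=CustomFormatter)
    # ReadableDir validates the directory at parse time and exits on failure
    parser.add_argument("solutions_directory", nargs="?", default="result/",
                        action=ReadableDir)
    args = parser.parse_args()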
@@ -98,11 +98,18 @@ def get_solutions_and_related_places_from_file(file_path):
         # yield (tuple(solution['solution'].split(' ')), places)
         yield (
-            tuple(solution['solution'].split(' ')),
-            set(it.chain(*((transition['ext'], transition['ori'])
-                           for step in solution['steps']
-                           for event in step
-                           for transition in event['transitions']))))
+            tuple(solution["solution"].split(" ")),
+            set(
+                it.chain(
+                    *(
+                        (transition["ext"], transition["ori"])
+                        for step in solution["steps"]
+                        for event in step
+                        for transition in event["transitions"]
+                    )
+                )
+            ),
+        )


 def get_solutions_and_related_places(path):
@@ -135,12 +142,10 @@ def get_solutions_and_related_places(path):
     elif os.path.isdir(path):
         # The given path is a directory
-        path = path if path[-1] == '/' \
-            else path + '/'
+        path = path if path[-1] == "/" else path + "/"

         file_number = 0
-        for file_number, file_path in \
-            enumerate(glob.glob(path + '*_decomp.json'), 1):
+        for file_number, file_path in enumerate(glob.glob(path + "*_decomp.json"), 1):
             for sol_places in get_solutions_and_related_places_from_file(file_path):
                 yield sol_places
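A minimal consumption sketch (hypothetical path; the generator yields one pair per solution found in the *_decomp.json files):

    for solution, places in get_solutions_and_related_places("result/"):
        # solution: tuple of frontier places; places: every place reached by
        # the trajectories (origins and extremities of fired transitions)
        print(len(solution), len(places))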
@@ -205,17 +210,19 @@ def filter_trajectories(trajectories, molecules_of_interest):
                 # Associate the entity of interest with the frontier places and
                 # count their occurrences.
                 # The count is kept in order to weight the edges of the graph.
-                binary_interactions[searched_molec] += Counter(sol)#trajectory_places
+                binary_interactions[searched_molec] += Counter(sol)  # trajectory_places
                 entity_found = True
         return entity_found

     binary_interactions = defaultdict(Counter)

     # At least one entity of interest is found in the trajectory
     # => we keep the solution
-    filtered_macs = tuple(sol for sol, trajectory_places in trajectories
-                          if entity_is_in_trajectory(sol, trajectory_places))
+    filtered_macs = tuple(
+        sol
+        for sol, trajectory_places in trajectories
+        if entity_is_in_trajectory(sol, trajectory_places)
+    )

     assert filtered_macs, "No molecule of interest was found."
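For orientation, a hedged usage sketch; the return value is inferred from the call build_interactions(*filter_trajectories(...)) further below:

    filtered_macs, binary_interactions = filter_trajectories(
        get_solutions_and_related_places("result/"),  # hypothetical path
        {"TGFB1"},  # hypothetical molecules of interest
    )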
@@ -273,15 +280,17 @@ def build_interactions(filtered_macs, binary_interactions):
     for frontier_places in filtered_macs:
         # Extract genes from the frontier places
-        genes = {frontier_place for frontier_place in frontier_places
-                 if '_gene' in frontier_place}
+        genes = {
+            frontier_place
+            for frontier_place in frontier_places
+            if "_gene" in frontier_place
+        }
         # Stimuli are the other places
         stimuli = set(frontier_places) - genes

         all_genes.update(genes)
         all_stimuli.update(stimuli)

         # For every solution composed of frontier places,
         # compute binary interactions between:
         # genes and stimuli
@@ -338,12 +347,15 @@ def build_interactions(filtered_macs, binary_interactions):
         # Get occurrences for each binary interaction (molecule, frontier_place)
-        return {molec_place: places_occurrences[molec_place[1]]
-                for molec_place in it.product([molecule_of_interest], frontier_places)}
+        return {
+            molec_place: places_occurrences[molec_place[1]]
+            for molec_place in it.product([molecule_of_interest], frontier_places)
+        }

-    g = (make_molec_stimuli_interactions(*data)
-         for data in binary_interactions.iteritems())
+    g = (
+        make_molec_stimuli_interactions(*data)
+        for data in binary_interactions.iteritems()
+    )

     # warning: this won't work if the molecule of interest is in the conditions
     # and not in the places of the trajectories...
     # => as if the molecule had not been found
@@ -415,11 +427,11 @@ def build_graph(output_dir, all_genes, all_stimuli, genes_interactions,
     def colormap(node):
         """Get color assigned to the given node depending on its group"""
         if node in all_genes:
-            return 'red'
+            return "red"
         elif node in all_stimuli:
-            return 'blue'
+            return "blue"
         # Molecule of interest
-        return 'yellow'
+        return "yellow"

     G = nx.DiGraph()
@@ -434,17 +446,16 @@ def build_graph(output_dir, all_genes, all_stimuli, genes_interactions,
     # Add all edges
     # PS Syntax: G.add_weighted_edges_from([(0,1,3.0),(1,2,7.5)],color='red')
     # Interactions between genes in the same solution
-    G.add_weighted_edges_from(
-        build_edge_data(genes_interactions), color='red')
+    G.add_weighted_edges_from(build_edge_data(genes_interactions), color="red")
     # Interactions between stimuli in the same solution
-    #G.add_weighted_edges_from(build_edge_data(stimulis_interactions), color='blue')
+    # G.add_weighted_edges_from(build_edge_data(stimulis_interactions), color='blue')
     # Interactions between genes and stimuli in the same solution
-    G.add_weighted_edges_from(
-        build_edge_data(genes_stimuli_interactions), color='red')
+    G.add_weighted_edges_from(build_edge_data(genes_stimuli_interactions), color="red")
     # Interactions between molecules of interest and frontier places
     # that are not genes in trajectories
     G.add_weighted_edges_from(
-        build_edge_data(molecule_stimuli_interactions), color='yellow')
+        build_edge_data(molecule_stimuli_interactions), color="yellow"
+    )

     # Write graph
     nx.write_graphml(G, output_dir + "interaction_graph.graphml")
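Since the graph is serialized to GraphML, it can be reloaded later for analysis; a sketch with networkx (output directory assumed):

    import networkx as nx

    G = nx.read_graphml("graphs/interaction_graph.graphml")  # hypothetical dir
    for ori, ext, data in G.edges(data=True):
        # weight/color edge attributes set by add_weighted_edges_from() above
        print(ori, ext, data.get("weight"), data.get("color"))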
@@ -478,12 +489,9 @@ def json_2_interaction_graph(output_dir, molecules_of_interest, path):
         output_dir,
         *build_interactions(
             *filter_trajectories(
-                get_solutions_and_related_places(path),
-                molecules_of_interest
+                get_solutions_and_related_places(path), molecules_of_interest
             )
         )
     )

     LOGGER.info("Graph generated in %s", time.time() - chrono_start)
@@ -41,24 +41,31 @@ import networkx.algorithms.isomorphism as iso
 # Library imports
 from cadbiom.models.biosignal.sig_expr import *
-from cadbiom.models.guard_transitions.translators.chart_xml \
-    import MakeModelFromXmlFile
+from cadbiom.models.guard_transitions.translators.chart_xml import MakeModelFromXmlFile
 from cadbiom.models.guard_transitions.analyser.static_analysis import StaticAnalyzer
 from tools.models import Reporter
-from tools.models import get_transitions, \
-    get_frontier_places, \
-    get_model_identifier_mapping
+from tools.models import (
+    get_transitions,
+    get_frontier_places,
+    get_model_identifier_mapping,
+)
 from tools.models import get_places_data
-from tools.graphs import build_graph, get_json_graph, export_graph, get_solutions_graph_data
+from tools.graphs import (
+    build_graph,
+    get_json_graph,
+    export_graph,
+    get_solutions_graph_data,
+)

 import cadbiom.commons as cm

 LOGGER = cm.logger()


-def graph_isomorph_test(model_file_1, model_file_2, output_dir='graphs/',
-                        make_graphs=False, make_json=False):
+def graph_isomorph_test(
+    model_file_1, model_file_2, output_dir="graphs/", make_graphs=False, make_json=False
+):
     """Entry point for model consistency checking.

     This function checks if the graphs based on the two given models have
@@ -135,19 +142,18 @@ def graph_isomorph_test(model_file_1, model_file_2, output_dir='graphs/',
     G2 = res_2[0]

     # Checking
-    nm = iso.categorical_node_match('color', 'grey')
-    em = iso.categorical_edge_match('color', '')
-    check_state = \
-        {
-            'topology': nx.is_isomorphic(G1, G2),
-            'nodes': nx.is_isomorphic(G1, G2, node_match=nm),
-            'edges': nx.is_isomorphic(G1, G2, edge_match=em),
-        }
+    nm = iso.categorical_node_match("color", "grey")
+    em = iso.categorical_edge_match("color", "")
+    check_state = {
+        "topology": nx.is_isomorphic(G1, G2),
+        "nodes": nx.is_isomorphic(G1, G2, node_match=nm),
+        "edges": nx.is_isomorphic(G1, G2, edge_match=em),
+    }

-    LOGGER.info("Topology checking: %s", check_state['topology'])
-    LOGGER.info("Nodes checking: %s", check_state['nodes'])
-    LOGGER.info("Edges checking: %s", check_state['edges'])
+    LOGGER.info("Topology checking: %s", check_state["topology"])
+    LOGGER.info("Nodes checking: %s", check_state["nodes"])
+    LOGGER.info("Edges checking: %s", check_state["edges"])

     # Draw graph
     if make_graphs:
@@ -156,8 +162,8 @@ def graph_isomorph_test(model_file_1, model_file_2, output_dir='graphs/',
     # Export to JSON file
     if make_json:
-        with open(output_dir + "graph_isomorphic_test.json", 'w') as fd:
-            fd.write(json.dumps(check_state, sort_keys=True, indent=4) + '\n')
+        with open(output_dir + "graph_isomorphic_test.json", "w") as fd:
+            fd.write(json.dumps(check_state, sort_keys=True, indent=4) + "\n")

     return check_state
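Usage sketch (hypothetical model files); the returned dictionary carries the three booleans logged above:

    check_state = graph_isomorph_test(
        "model_v1.bcx", "model_v2.bcx", output_dir="graphs/", make_json=True
    )
    if not check_state["topology"]:
        LOGGER.error("The two models do not share the same graph topology")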
@@ -243,12 +249,12 @@ def low_graph_info(model_file, graph_data=False, centralities=False):
         return res_1, front_places

     info = {
-        'modelFile': model_file,
-        'modelName': model.name,
-        'events:': len(transitions_1),  # One event can have multiple transitions
-        'entities': len(all_places_1),  # places
-        'boundaries': len(front_places),  # frontier places
-        'transitions': len(model.transition_list),
+        "modelFile": model_file,
+        "modelName": model.name,
+        "events:": len(transitions_1),  # One event can have multiple transitions
+        "entities": len(all_places_1),  # places
+        "boundaries": len(front_places),  # frontier places
+        "transitions": len(model.transition_list),
     }

     get_solutions_graph_data(G, info, centralities)
@@ -258,8 +264,9 @@ def low_graph_info(model_file, graph_data=False, centralities=False):
     return res_1, front_places, info


-def low_model_info(model_file,
-                   all_entities=False, boundaries=False, genes=False, smallmolecules=False):
+def low_model_info(
+    model_file, all_entities=False, boundaries=False, genes=False, smallmolecules=False
+):
     """Low level function for :meth:`~cadbiom_cmd.models.model_info`.

     Get JSON data with information about the model and its entities.
@@ -347,12 +354,12 @@ def low_model_info(model_file,
     # Basic information
     info = {
-        'modelFile': model_file,
-        'modelName': model.name,
-        'events:': len(transitions_1),  # One event can have multiple transitions
-        'entities': len(all_places_1),  # places
-        'boundaries': len(front_places),  # frontier places
-        'transitions': len(model.transition_list),
+        "modelFile": model_file,
+        "modelName": model.name,
+        "events:": len(transitions_1),  # One event can have multiple transitions
+        "entities": len(all_places_1),  # places
+        "boundaries": len(front_places),  # frontier places
+        "transitions": len(model.transition_list),
     }

     # Complete the data with StaticAnalysis
@@ -371,24 +378,26 @@ def low_model_info(model_file,
     # Filter places
     if all_entities:
-        info['entitiesData'] = get_places_data(all_places_1, model)
+        info["entitiesData"] = get_places_data(all_places_1, model)

     if boundaries:
-        info['entitiesData'] = get_places_data(front_places, model)
+        info["entitiesData"] = get_places_data(front_places, model)

     if genes:
-        g = (place_name for place_name in all_places_1 if '_gene' in place_name)
-        info['entitiesData'] = get_places_data(g, model)
+        g = (place_name for place_name in all_places_1 if "_gene" in place_name)
+        info["entitiesData"] = get_places_data(g, model)

     if smallmolecules:
         # Filter on entityTypes
-        info['entitiesData'] = \
-            [data for data in get_places_data(all_places_1, model)
-             if data["entityType"] == "SmallMolecule"]
+        info["entitiesData"] = [
+            data
+            for data in get_places_data(all_places_1, model)
+            if data["entityType"] == "SmallMolecule"
+        ]

     # Edit places and insert immediate successors... It is ugly but requested...
     # Another request that brings overhead for nothing...
-    for place in info['entitiesData']:
+    for place in info["entitiesData"]:
         place["immediateSuccessors"] = places_successors[place["cadbiomName"]]

     return info
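Usage sketch for the filters above (hypothetical model file):

    info = low_model_info("model.bcx", smallmolecules=True)
    # Entities filtered on entityType == "SmallMolecule", each completed with
    # its immediate successors
    print(info["modelName"], len(info["entitiesData"]))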
@@ -404,30 +413,30 @@ def model_identifier_mapping(model_file, *args, **kwargs):
     :type external_file: <str>
     :type external_identifiers: <list>
     """
-    if kwargs.get('external_file', None):
-        with open(kwargs['external_file'], 'r') as f_d:
-            external_identifiers = set(line.strip('\n').strip('\r') for line in f_d)
+    if kwargs.get("external_file", None):
+        with open(kwargs["external_file"], "r") as f_d:
+            external_identifiers = set(line.strip("\n").strip("\r") for line in f_d)
     else:
-        external_identifiers = set(kwargs['external_identifiers'])
+        external_identifiers = set(kwargs["external_identifiers"])

     mapping = get_model_identifier_mapping(model_file, external_identifiers)

     # Make CSV file
-    with open("mapping.csv", 'w') as csvfile:
-        writer = csv.writer(csvfile, delimiter=str(';'))
+    with open("mapping.csv", "w") as csvfile:
+        writer = csv.writer(csvfile, delimiter=str(";"))

         # Header
         writer.writerow(["external identifiers", "cadbiom identifiers"])

         # Join multiple Cadbiom names with a |
-        g = ((external_id, "|".join(cadbiom_names))
-             for external_id, cadbiom_names in mapping.iteritems())
+        g = (
+            (external_id, "|".join(cadbiom_names))
+            for external_id, cadbiom_names in mapping.iteritems()
+        )
         writer.writerows(g)
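Reading the produced mapping back is straightforward; a sketch assuming the header written above:

    import csv

    with open("mapping.csv") as f_d:
        reader = csv.DictReader(f_d, delimiter=str(";"))
        for row in reader:
            # Multiple Cadbiom names are joined with a "|"
            cadbiom_names = row["cadbiom identifiers"].split("|")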


-def model_graph(model_file, output_dir='./graphs/',
-                centralities=False,
-                **kwargs):
+def model_graph(model_file, output_dir="./graphs/", centralities=False, **kwargs):
     """Get quick information and make a graph based on the model.

     :param model_file: File for the '.bcx' model.
@@ -447,9 +456,9 @@ def model_graph(model_file, output_dir='./graphs/',
     """
     # Bind arguments to avoid overwriting previous imports
-    make_json = kwargs['json']
-    make_graph = kwargs['graph']
-    make_json_graph = kwargs['json_graph']
+    make_json = kwargs["json"]
+    make_graph = kwargs["graph"]
+    make_json_graph = kwargs["json_graph"]

     # If json is not set, remove centralities parameter (time consuming)
     if not make_json:
@@ -463,14 +472,17 @@ def model_graph(model_file, output_dir='./graphs/',
     if make_json_graph:
         # Pass a Networkx graph and get dictionary
         json_data = get_json_graph(res_1[0])
-        with open(output_dir + "graph.json", 'w') as f_d:
+        with open(output_dir + "graph.json", "w") as f_d:
             f_d.write(json.dumps(json_data, indent=2))

     # Draw graph
     if make_graph:
-        export_graph(output_dir, front_places,
-                     urllib_quote(model_graph_info['modelName'], safe=''),
-                     *res_1)
+        export_graph(
+            output_dir,
+            front_places,
+            urllib_quote(model_graph_info["modelName"], safe=""),
+            *res_1
+        )

     # Export to json file
     if make_json:
@@ -513,19 +525,21 @@ def model_info(model_file, output_dir='./',
     """
     # Bind arguments to avoid overwriting previous imports
-    make_json = kwargs['json']
-    make_csv = kwargs['csv']
+    make_json = kwargs["json"]
+    make_csv = kwargs["csv"]
     if not (make_json and make_csv):
         default = True

     def dump_places_to_csv(entities_data, output_filename):
         """Write information about places in the model to a csv file."""
-        with open(output_filename, 'w') as csvfile:
+        with open(output_filename, "w") as csvfile:
             # Get all database names
-            database_names = \
-                {db_name for place in entities_data
-                 for db_name in place.get('xrefs', dict()).iterkeys()}
+            database_names = {
+                db_name
+                for place in entities_data
+                for db_name in place.get("xrefs", dict()).iterkeys()
+            }

             # Write headers
             fieldnames = (
@@ -536,7 +550,7 @@ def model_info(model_file, output_dir='./',
             writer = csv.DictWriter(
                 csvfile,
                 fieldnames=fieldnames,
-                extrasaction='ignore',  # Ignore keys not found in fieldnames (xrefs)
+                extrasaction="ignore",  # Ignore keys not found in fieldnames (xrefs)
             )
             writer.writeheader()
@@ -546,10 +560,14 @@ def model_info(model_file, output_dir='./',
                 # Join names with a pipe...
                 # Handle escaped unicode characters in model
                 # Ex: \u03b2-catenin => β-Catenin
-                temp_place['names'] = "|".join(place.get('names', list())).encode("utf-8")
-                temp_place['immediateSuccessors'] = "|".join(place["immediateSuccessors"]).encode("utf-8")
+                temp_place["names"] = "|".join(place.get("names", list())).encode(
+                    "utf-8"
+                )
+                temp_place["immediateSuccessors"] = "|".join(
+                    place["immediateSuccessors"]
+                ).encode("utf-8")
                 # Join xrefs ids with a pipe...
-                for db_name, db_ids in place.get('xrefs', dict()).iteritems():
+                for db_name, db_ids in place.get("xrefs", dict()).iteritems():
                     temp_place[db_name] = "|".join(db_ids).encode("utf-8")

                 writer.writerow(temp_place)
@@ -576,8 +594,7 @@ def model_info(model_file, output_dir='./',
         return

     model_info = low_model_info(
-        model_file,
-        all_entities, boundaries, genes, smallmolecules
+        model_file, all_entities, boundaries, genes, smallmolecules
     )

     # Export to csv file
@@ -28,7 +28,8 @@ import glob
 import csv
 import itertools as it

-def merge_macs_to_csv(directory, output_dir, csvfile='merged_macs.csv'):
+
+def merge_macs_to_csv(directory, output_dir, csvfile="merged_macs.csv"):
     """Merge \*mac.txt files from a directory to a csv file.

     :Structure of the CSV file:
@@ -38,30 +39,30 @@ def merge_macs_to_csv(directory, output_dir, csvfile='merged_macs.csv'):
     """
     # Add dir separator to the end if not present
-    directory = directory + '/' if directory[-1] != '/' else directory
+    directory = directory + "/" if directory[-1] != "/" else directory

     csv_data = list()
     # Read all files in the given directory
-    for filename in glob.glob(directory + '*_mac.txt'):
-        #print(filename)
+    for filename in glob.glob(directory + "*_mac.txt"):
+        # print(filename)

         # Extract the formula from the filename
         # ex:
         # ['./result/model_name', 'TGFB1', 'mac.txt']
-        formula = ''.join(filename.split('_')[1:-1])
+        formula = "".join(filename.split("_")[1:-1])

         # Read the content of the mac file & memorize this content
         with open(filename) as f_d:
             # Add the formula column before each mac in the future csv file
-            csv_data.append([[formula] + [line.rstrip('\n')] for line in f_d])
+            csv_data.append([[formula] + [line.rstrip("\n")] for line in f_d])

     # Write the final csv
-    with open(output_dir + csvfile, 'w') as f_d:
-        writer = csv.writer(f_d, delimiter=str(';'))
+    with open(output_dir + csvfile, "w") as f_d:
+        writer = csv.writer(f_d, delimiter=str(";"))
         writer.writerows(it.chain(*csv_data))


 if __name__ == "__main__":
-    merge_macs_to_csv('result')
+    merge_macs_to_csv("result")
@@ -148,7 +148,7 @@ def get_dimacs_start_properties(mcla, previous_frontier_places):
     """
     dimacs_start = [
         [-mcla.unfolder.var_dimacs_code(place) for place in frontier_places]
-            for frontier_places in previous_frontier_places
+        for frontier_places in previous_frontier_places
     ]
     return dimacs_start
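The negated literals form blocking clauses: each previously found frontier set is ruled out by requiring at least one of its places to be false. A toy illustration with hypothetical DIMACS codes:

    var_dimacs_codes = {"A": 1, "B": 2, "C": 3}  # hypothetical place -> variable map
    previous_frontier_places = [["A", "B"], ["A", "C"]]
    dimacs_start = [
        [-var_dimacs_codes[place] for place in frontier_places]
        for frontier_places in previous_frontier_places
    ]
    # [[-1, -2], [-1, -3]]: the clause [-1, -2] reads "not A or not B", so the
    # solver can never return the exact frontier set {A, B} again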
@@ -100,14 +100,17 @@ def sort_solutions_in_file(filepath):
     solutions = dict()

-    with open(filepath, 'r+') as f_d:
+    with open(filepath, "r+") as f_d:
         # Get old line as key and ordered line as value
         for line, stripped_line in get_solutions(f_d):
             # Sort in lower case, remove ' ' empty elements
-            solutions[line] = \
-                " ".join(sorted([place for place in stripped_line.split(' ')
-                                 if place != ' '], key=lambda s: s.lower()))
+            solutions[line] = " ".join(
+                sorted(
+                    [place for place in stripped_line.split(" ") if place != " "],
+                    key=lambda s: s.lower(),
+                )
+            )

         # Rewind the whole file
         f_d.seek(0)
@@ -146,8 +149,8 @@ def solutions_sort(path):
     if os.path.isdir(path):
         # Recursive search of *mac* files
         # (mac.txt, mac_complete.txt, mac_step.txt)
-        path = path if path[-1] == '/' else path + '/'
+        path = path if path[-1] == "/" else path + "/"