# Copyright CNRS/Inria/UNS # Contributor(s): Eric Debreuve (since 2019), Morgane Nadal (2020) # # eric.debreuve@cnrs.fr # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. # Time profiling: # python -m cProfile -o runtime/profiling.log -s name main.py # Memory profiling: # python -m memory_profiler main.py # or # mprof run main.py # mprof plot import brick.component.connection as cn_ import brick.component.extension as xt_ import brick.component.soma as sm_ import brick.general.feedback as fb_ import brick.processing.input as in_ import brick.processing.plot as po_ # import brick.processing.image_verification as iv_ # from main_prm import * from sklgraph.skl_fgraph import skl_graph_t from sklgraph.skl_graph import plot_mode_e from sklgraph.skl_map import skl_map_t import os as os_ import sys as sy_ import time as tm_ # import tracemalloc as tr_ import matplotlib.pyplot as pl_ import numpy as np_ import skimage.io as io_ # from skimage.segmentation import relabel_sequential import skimage.morphology as mp_ import skimage.measure as ms_ import networkx as nx_ print(sy_.argv, sy_.argv.__len__()) if sy_.argv.__len__() < 2: print("Missing parameter file argument") sy_.exit(0) if not (os_.path.isfile(sy_.argv[1]) and os_.access(sy_.argv[1], os_.R_OK)): print("Wrong parameter file path or parameter file unreadable") sy_.exit(0) data_path = None channel = None size_voxel_in_micron = None soma_low_c = None soma_high_c = None soma_selem_micron_c = None soma_min_area_c = None ext_low_c = None ext_high_c = None ext_selem_micron_c = None ext_min_area_c = None max_straight_sq_dist_c = None max_weighted_length_c = None scale_range = None scale_step = None alpha = None beta = None frangi_c = None bright_on_dark = None method = None with_plot = None in_parallel = None exec(open(sy_.argv[1]).read()) # Only with parameters.py (older versions) or search_parameters.py (stable version) soma_t = sm_.soma_t extension_t = xt_.extension_t print(f"STARTED: {tm_.strftime('%a, %b %d %Y @ %H:%M:%S')}") start_time = tm_.time() # tr_.start() # --- Images # Find the dimension of the image voxels in micron size_voxel_in_micron = in_.FindVoxelDimensionInMicron(data_path, size_voxel_in_micron=size_voxel_in_micron) image = io_.imread(data_path) # Image size verification - simple version without user interface image = in_.ImageVerification(image, channel) # iv_.image_verification(image, channel) # -> PySide2 interface # TODO: must return the modified image! # /!\ conflicts between some versions of PySide2 and Python3 image = image[:, 800:, 800:] # 512 # 562 # Just for development img_shape = image.shape # pl_.matshow(image[image.shape[0] // 2, :, :]) # pl_.show() # # max_img = image.max() # image.fill(image.min()) # image[2:3,10:11,:] = max_img # print(f"IMAGE: s.{img_shape} t.{image.dtype} m.{image.min()} M.{image.max()}") # Intensity relative normalization (between 0 and 1). # TODO: change Frangi parameters to take the normalization btw 0 and 1 into account ? # image_for_soma = in_.IntensityNormalizedImage(image) # image_for_ext = in_.IntensityNormalizedImage(image) # This normalization is essential for the detection of extension and the connexions to the somas! image_for_soma = in_.NormalizedImage(image) image_for_ext = in_.NormalizedImage(image) print(f"NRM-IMG: t.{image_for_soma.dtype} m.{image_for_soma.min():.2f} M.{image_for_soma.max():.2f}") # --- Initialization som_nfo = {} ext_nfo = {} axes = {} # n_somas = 0 # n_extensions = 0 # somas = None # Tuple of soma objects # extensions = None # Tuple of extension objects # --- Somas print("\n--- Soma Detection") soma_min_area_c = in_.ToPixel(soma_min_area_c, size_voxel_in_micron, dimension=(0, 1)) soma_selem_c = mp_.disk(in_.ToPixel(soma_selem_micron_c, size_voxel_in_micron)) som_nfo["map"] = soma_t.Map(image_for_soma, soma_low_c, soma_high_c, soma_selem_c) som_nfo["map"], som_lmp = soma_t.FilteredMap(som_nfo["map"], soma_min_area_c) som_nfo["lmp"], n_somas = ms_.label(som_nfo["map"], return_num=True) # som_nfo["lmp"] = relabel_sequential(som_lmp)[0] # Use relabel instead of label to optimize the algorithm. # n_somas = som_nfo["lmp"].max() som_nfo["dist_to_closest"], som_nfo["influence_map"] = soma_t.InfluenceMaps( som_nfo["lmp"] ) somas = tuple( soma_t().FromMap(som_nfo["lmp"], uid) for uid in range(1, n_somas + 1) ) print(f" n = {n_somas}") elapsed_time = tm_.gmtime(tm_.time() - start_time) print(f"\nElapsed Time={tm_.strftime('%Hh %Mm %Ss', elapsed_time)}") if with_plot: fb_.PlotSomas(somas, som_nfo, axes) # -- Extentions print("\n--- Extension Detection") ext_min_area_c = in_.ToPixel(ext_min_area_c, size_voxel_in_micron, dimension=(0, 1)) ext_selem_pixel_c = mp_.disk(in_.ToPixel(ext_selem_micron_c, size_voxel_in_micron)) scale_range_pixel = [] for value in scale_range: value_in_pixel = in_.ToPixel(value, size_voxel_in_micron, decimals=1) scale_range_pixel.append(value_in_pixel) scale_range = tuple(scale_range_pixel) scale_step = in_.ToPixel(scale_step, size_voxel_in_micron, decimals=1) alpha = in_.ToPixel(alpha, size_voxel_in_micron, decimals=1) beta = in_.ToPixel(beta, size_voxel_in_micron, decimals=1) frangi_c = in_.ToPixel(frangi_c, size_voxel_in_micron) enhanced_ext, ext_scales = extension_t.EnhancedForDetection( image_for_ext, scale_range, scale_step, alpha, beta, frangi_c, bright_on_dark, method, in_parallel=in_parallel) ext_nfo["coarse_map"] = extension_t.CoarseMap(enhanced_ext, ext_low_c, ext_high_c, ext_selem_pixel_c) ext_nfo["coarse_map"], ext_lmp = extension_t.FilteredCoarseMap(ext_nfo["coarse_map"], ext_min_area_c) ext_nfo["map"] = extension_t.FineMapFromCoarseMap(ext_nfo["coarse_map"]) ext_nfo["map"][som_nfo["map"] > 0] = 0 ext_nfo["lmp"], n_extensions = ms_.label(ext_nfo["map"], return_num=True) # Use relabel instead of label to optimize the algorithm. BUT PROBLEM WITH THE NUMBER OF EXTENSIONS DETECTED ! # ext_nfo["lmp"] = relabel_sequential(ext_lmp)[0] # n_extensions = ext_nfo["lmp"].max() extensions = tuple( extension_t().FromMap(ext_nfo["lmp"], ext_scales, uid) for uid in range(1, n_extensions + 1) ) print(f" n = {n_extensions}") elapsed_time = tm_.gmtime(tm_.time() - start_time) print(f"\nElapsed Time={tm_.strftime('%Hh %Mm %Ss', elapsed_time)}") if with_plot: fb_.PlotExtensions(extensions, ext_nfo, img_shape) # -- Preparation for Connections dijkstra_costs = in_.DijkstraCosts(image, som_nfo["map"], ext_nfo["map"]) # -- Soma-Extention print("\n--- Soma <-> Extension") max_straight_sq_dist_c = in_.ToPixel(max_straight_sq_dist_c, size_voxel_in_micron) max_weighted_length_c = in_.ToPixel(max_weighted_length_c, size_voxel_in_micron) candidate_conn_nfo = cn_.CandidateConnections( somas, som_nfo["influence_map"], som_nfo["dist_to_closest"], extensions, max_straight_sq_dist_c, ) for ep_idx, soma, extension, end_point in candidate_conn_nfo: if extension.is_unconnected: print(f" Soma.{soma.uid} <-?-> Ext.{extension.uid}({ep_idx})", end="") path, length = cn_.ShortestPathFromToN( end_point, dijkstra_costs, soma.ContourPointsCloseTo, max_straight_sq_dist=max_straight_sq_dist_c, ) if length <= max_weighted_length_c: cn_.ValidateConnection(soma, extension, path, dijkstra_costs) print(": Made") else: print("") # for soma in somas: # soma.Extend(extensions, som_nfo["dist_to_closest"], dijkstra_costs) fb_.PrintConnectedExtensions(extensions) elapsed_time = tm_.gmtime(tm_.time() - start_time) print(f"\nElapsed Time={tm_.strftime('%Hh %Mm %Ss', elapsed_time)}") if with_plot: fb_.PlotSomasWithExtensions(somas, som_nfo, "all") # -- Extention-Extention print("\n--- Extension <-> Extension") should_look_for_connections = True while should_look_for_connections: som_nfo["soma_w_ext_lmp"] = soma_t.SomasLMap(somas) som_nfo["dist_to_closest"], som_nfo["influence_map"] = soma_t.InfluenceMaps( som_nfo["soma_w_ext_lmp"] ) candidate_conn_nfo = cn_.CandidateConnections( somas, som_nfo["influence_map"], som_nfo["dist_to_closest"], extensions, max_straight_sq_dist_c, ) should_look_for_connections = False for ep_idx, soma, extension, end_point in candidate_conn_nfo: if extension.is_unconnected: print(f" Soma.{soma.uid} <-?-> Ext.{extension.uid}({ep_idx})", end="") path, length = cn_.ShortestPathFromToN( end_point, dijkstra_costs, soma.ExtensionPointsCloseTo, max_straight_sq_dist=max_straight_sq_dist_c, ) if length <= max_weighted_length_c: tgt_extenstion = extension_t.ExtensionContainingSite(extensions, path[-1]) cn_.ValidateConnection(tgt_extenstion, extension, path, dijkstra_costs) should_look_for_connections = True print(": Made") else: print("") fb_.PrintConnectedExtensions(extensions) elapsed_time = tm_.gmtime(tm_.time() - start_time) print(f"\nElapsed Time={tm_.strftime('%Hh %Mm %Ss', elapsed_time)}") if with_plot: fb_.PlotSomasWithExtensions(somas, som_nfo, "with_ext_of_ext") # -- Summary print("\n") for soma in somas: print(soma) # for extension in extensions: # print(extension) # snapshot = tr_.take_snapshot() # top_file_stats = snapshot.statistics('lineno') # print("Memory Profiling: Top 10 FILES") # for stat in top_file_stats[:10]: # print(stat) # top_block_stats = snapshot.statistics('traceback') # top_block_stats = top_block_stats[0] # print(f"Memory Profiling: {top_block_stats.count} memory blocks: {top_block_stats.size / 1024:.1f} KiB") # for line in top_block_stats.traceback.format(): # print(line) elapsed_time = tm_.gmtime(tm_.time() - start_time) print(f"\nElapsed Time={tm_.strftime('%Hh %Mm %Ss', elapsed_time)}") print(f"DONE: {tm_.strftime('%a, %b %d %Y @ %H:%M:%S')}") if with_plot: pl_.show() po_.MaximumIntensityProjectionZ(som_nfo['soma_w_ext_lmp']) # # Skeletonize the somas to extract the graph of the glial cells # space_dim = 3 # array_shape = (80, 80, 50) # threshold_fct = lambda img: 0.75 * image.max() # struct_elm_shape = (3, 3, 3) # # soma_w_ext_map = som_nfo['soma_w_ext_lmp'] # skeleton = skl_map_t.FromShapeMap(soma_w_ext_map, store_widths=True) # skel_graph = skl_graph_t.FromSkeleton(skeleton) # # # --- Some info about the skeleton graph # print( # f"Obj map area={np_.count_nonzero(soma_w_ext_map)}\n\n" # f"Validity={skel_graph.is_valid}\n\n" # f"N nodes={skel_graph.n_nodes}\n" # f"N edges={skel_graph.n_edges}\n" # f"Highest degree={skel_graph.highest_degree}/{skel_graph.highest_degree_w_nodes}\n\n" # f"Length={skel_graph.length}<-{skel_graph.edge_lengths}\n" # f"Width=Hom.{skel_graph.reduced_width()}/Het.{skel_graph.heterogeneous_reduced_width()}<-{skel_graph.edge_reduced_widths()}\n" # f"Area as WxL={skel_graph.reduced_width() * skel_graph.length}\n" # f"Area as WW Length={skel_graph.ww_length}<-{skel_graph.edge_ww_lengths}\n") # # # --- Checking correctness of skeleton graph via various rebuilt maps # rebuilt_skel = skel_graph.RebuiltSkeletonMap() # part_map_for_check = skeleton.PartMap() # # Keep next line before its next one # part_map_for_check[part_map_for_check == skeleton.invalid_n_neighbors] = 0 # part_map_for_check[part_map_for_check > 3] = 3 # # print( # f"Rebuilt skeleton = Original one? Part map? " # f"{np_.array_equal(rebuilt_skel > 0, skeleton.map)} " # f"{np_.array_equal(rebuilt_skel, part_map_for_check)}") # # if space_dim == 3: # rebuilt_skel = rebuilt_skel.max(axis=2) # ## --- Checking correctness of skeleton graph via rebuilt object map # soma_w_ext_map = soma_w_ext_map.astype(np_.uint8) # rebuilt_map = skel_graph.RebuiltObjectMap() # # if space_dim == 3: # soma_w_ext_map = soma_w_ext_map.max(axis=2) # rebuilt_map = rebuilt_map.max(axis=2) # # # --- Show various images and plot the skeleton graph # ___, axes = pl_.subplots(1, 3, figsize=(15,15)) # axes[0].matshow(soma_w_ext_map) # axes[1].matshow(rebuilt_map) # axes[2].matshow(soma_w_ext_map - rebuilt_map) # axes[0].set_title("Object") # axes[1].set_title("Rebuilt") # axes[2].set_title("Object - Rebuilt") # for axis in axes: # axis.set_axis_off() # pl_.tight_layout(pad=0.3) # # ___, axes = pl_.subplots(1, 3) # axes[0].matshow(rebuilt_skel + soma_w_ext_map) # axes[1].matshow(rebuilt_skel + rebuilt_map) # axes[2].matshow(rebuilt_skel) # axes[0].set_title("Rebuilt Skel Over Obj") # axes[1].set_title("Rebuilt Skel Over Rebuilt Obj") # axes[2].set_title("Rebuilt Skel") # for axis in axes: # axis.set_axis_off() # pl_.tight_layout(pad=0.3) # # if space_dim == 2: # skel_graph.Plot(mode=plot_mode_e.Networkx, should_block=False) # skel_graph.Plot(mode=plot_mode_e.Graphviz, should_block=False) # # skel_graph.Plot(mode=plot_mode_e.SKL_Curve, w_directions=True, should_block=False) # skel_graph.Plot(should_block=False) # # pl_.show()