# Copyright CNRS/Inria/UNS # Contributor(s): Eric Debreuve (since 2019), Morgane Nadal (2020) # # eric.debreuve@cnrs.fr # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. # Time profiling: # python -m cProfile -o runtime/profiling.log -s name main.py # Memory profiling: # python -m memory_profiler main.py # or # mprof run main.py # mprof plot # import os as os_ import sys as sstm from datetime import datetime from pathlib import Path as path_t from typing import Any, Sequence import better_exceptions as xcpt # import imageio as io_ import matplotlib.pyplot as pl_ # import networkx as nx_ import numpy as nmpy import pandas as pd_ import skimage.draw as sidw import skimage.exposure as ex_ import skimage.measure as ms_ import skimage.morphology as mp_ from logger_36 import LOGGER from skl_graph import SKLMapFromObjectMap from skl_graph.task.check.graph import Issues as GraphIssues from skl_graph.task.plot.graph import Plot as PlotSKLGraph from skl_graph.type.constant import SKL_GRAPH from skl_graph.type.edge import edge_t import brick.feature.cell as ge_ import brick.io.console.feedback import brick.io.screen.feedback as fb_ import brick.io.screen.plot as po_ import brick.io.screen.type.soma_validation as smvl import brick.task.check as in_ import brick.task.dijkstra as dk_ import brick.task.img_processing import brick.type.connection as cn_ from brick.io.storage.parameters import NutrimorphArguments from brick.task.img_processing import IntensityNormalizedImage from brick.task.unit import FindVoxelDimensionInMicron, ToPixel from brick.type.extension import extension_t from brick.type.graph import skl_graph_t from brick.type.soma import soma_t from im_tools_36.input import ImageVolumeOrSequence xcpt.hook() array_t = nmpy.ndarray _CSV_FEATURE_FILE = "features.csv" def NutriMorph( connection_cfg: dict[str, Any], extension_cfg: dict[str, Any], feature_cfg: dict[str, Any], frangi_cfg: dict[str, Any], input_cfg: dict[str, Any], output_cfg: dict[str, Any], soma_cfg: dict[str, Any], workflow_cfg: dict[str, Any], /, ) -> pd_.DataFrame: """""" LOGGER.info("STARTED") # --- Images LOGGER.info(f"Image {input_cfg['data_path']}") image = ImageVolumeOrSequence(input_cfg["data_path"]) # io_.volread for imageio # [:, 1400:, 1200:] # [:, 800:, 300:] # [:, 600:, 500:] # [:, 800:, 500:] image = image[:, 800:, 500:] img_shape = image.shape # TODO: Do something more general; Here, too specific to one microscope metadata. input_cfg["voxel_size_in_micron"] = FindVoxelDimensionInMicron( input_cfg["data_path"], voxel_size_in_micron=input_cfg["voxel_size_in_micron"] ) image = in_.ImageVerification(image, input_cfg["channel"]) # iv_.image_verification(image, channel) # -> PySide2 user interface # TODO: must return the modified image! # /!\ conflicts between some versions of PySide2 and Python3 img_shape_as_str = str(img_shape)[1:-1].replace(", ", "x") LOGGER.info( f"IMAGE: s.{img_shape_as_str} t.{image.dtype} m.{nmpy.amin(image)} M.{nmpy.amax(image)}" ) image_for_soma = IntensityNormalizedImage(image) image_for_ext = image.copy() LOGGER.info( f"NRM-IMG: t.{image_for_soma.dtype} m.{nmpy.amin(image_for_soma):.2f} M.{nmpy.amax(image_for_soma):.2f}" ) # --- Initialization som_nfo = {} ext_nfo = {} axes = {} # --- Somas LOGGER.info("SOMA DETECTION") soma_cfg["soma_min_area"] = ToPixel( soma_cfg["soma_min_area"], input_cfg["voxel_size_in_micron"], dimension=(0, 1, 2), ) soma_cfg["soma_max_area"] = ToPixel( soma_cfg["soma_max_area"], input_cfg["voxel_size_in_micron"], dimension=(0, 1, 2), ) soma_selem_c = mp_.disk( ToPixel(soma_cfg["soma_selem_micron"], input_cfg["voxel_size_in_micron"]) ) LOGGER.info("Soma maps...") som_nfo["map"], som_nfo["lmp"], som_nfo["map_w_large"], n_somas = soma_t.Maps( image_for_soma, soma_cfg["soma_low"], soma_cfg["soma_high"], soma_selem_c, soma_cfg["soma_min_area"], soma_cfg["soma_max_area"], ) if workflow_cfg["interactive"]: window = smvl.soma_validation_window_t( image_for_soma, som_nfo["lmp"], mip_axis=0, ) som_nfo["lmp"], n_somas = window.LaunchValidation() # TODO: som_nfo["map"] and som_nfo["map_w_large"] become out-of-sync. Is it a problem? if n_somas == 0: if workflow_cfg["interactive"]: LOGGER.info(f"Validated somas: {n_somas}") else: LOGGER.info(f"Detected somas: {n_somas}") sstm.exit(0) LOGGER.info("Distance and influence map creation...") # For soma<->extension and extension<->extension som_nfo["dist_to_closest"], som_nfo["influence_map"] = soma_t.InfluenceMaps( som_nfo["lmp"] ) LOGGER.info(f"Somas creation ({n_somas})...") somas = tuple( soma_t.NewFromMap(som_nfo["lmp"], uid) for uid in range(1, n_somas + 1) ) LOGGER.info(f"END OF SOMA DETECTION STEP. Number of somas = {n_somas}") if workflow_cfg["with_plot"]: fb_.PlotSomas(somas, som_nfo, axes) # --- Extensions LOGGER.info("EXTENSION DETECTION") extension_cfg["ext_min_area"] = ToPixel( extension_cfg["ext_min_area"], input_cfg["voxel_size_in_micron"], dimension=(0, 1, 2), ) ext_selem_pixel_c = mp_.disk( ToPixel(extension_cfg["ext_selem_micron"], input_cfg["voxel_size_in_micron"]) ) # Use som_nfo["map_w_large"] instead of som_nfo["map"] to take into account the aggregation of cells detected as one # big soma to avoid extension creation there image_for_ext[som_nfo["map_w_large"]] = 0.0 LOGGER.info("Frangi Enhancement...") enhanced_ext, ext_scales = extension_t.EnhancedForDetection( image_for_ext, frangi_cfg["scale_range"], frangi_cfg["scale_step"], frangi_cfg["alpha"], frangi_cfg["beta"], frangi_cfg["frangi_c"], frangi_cfg["bright_on_dark"], frangi_cfg["method"], frangi_cfg["diff_mode"], in_parallel=workflow_cfg["in_parallel"], ) LOGGER.info("Enhancing contrast...") if extension_cfg["adapt_hist_equalization"]: enhanced_ext = ex_.equalize_adapthist(enhanced_ext) else: enhanced_ext = ex_.equalize_hist(enhanced_ext) enhanced_ext = ex_.rescale_intensity(enhanced_ext, out_range=(0, 255)) LOGGER.info("Coarse extension map...") ext_nfo["coarse_map"] = extension_t.CoarseMap( enhanced_ext, extension_cfg["ext_low"], extension_cfg["ext_high"], ext_selem_pixel_c, extension_cfg["ext_min_area"], ) del enhanced_ext _, n_coarse_extensions = ms_.label(ext_nfo["coarse_map"], return_num=True) LOGGER.info(f"Coarse extensions: {n_coarse_extensions}") LOGGER.info("Skeletonization...") ext_nfo["map"] = extension_t.FineMapFromCoarseMap( ext_nfo["coarse_map"], soma=som_nfo["map"] ) LOGGER.info("Map Labelling...") ext_nfo["lmp"], n_extensions = ms_.label(ext_nfo["map"], return_num=True) LOGGER.info(f"Extension Creation ({n_extensions})...") extensions = tuple( extension_t.NewFromMap(ext_nfo["lmp"], ext_scales, uid) for uid in range(1, n_extensions + 1) ) LOGGER.info(f"Number of extensions = {n_extensions}") if n_extensions == 0: sstm.exit(0) LOGGER.info("Global end-point map for extensions...") glob_ext_ep_map = extension_t.EndPointMap(ext_nfo["map"]) all_end_points = list(zip(*glob_ext_ep_map.nonzero())) LOGGER.info("END OF EXTENSION DETECTION STEP") if workflow_cfg["with_plot"]: fb_.PlotExtensions(extensions, ext_nfo, img_shape) # -- Preparation for Connections LOGGER.info("DIJKSTRA COSTS") # Calculate the cost of each pixel and dilate the existing extensions to avoid tangency of connections to extensions dijkstra_costs = dk_.DijkstraCosts(image, som_nfo["map"], ext_nfo["map"]) if connection_cfg["dilatation_erosion"]: for extension in extensions: cn_.Dilate(extension.sites, dijkstra_costs) # -- Soma-Extention LOGGER.info("SOMA <-> EXTENSION") connection_cfg["max_straight_sq_dist"] = ToPixel( connection_cfg["max_straight_sq_dist"], input_cfg["voxel_size_in_micron"] ) connection_cfg["max_weighted_length"] = ToPixel( connection_cfg["max_weighted_length"], input_cfg["voxel_size_in_micron"] ) LOGGER.info("Look for candidate connections...") candidate_conn_nfo = cn_.CandidateConnections( somas, som_nfo["influence_map"], som_nfo["dist_to_closest"], extensions, connection_cfg["max_straight_sq_dist"], ) # For each candidate, verify that it is valid based on the shortest path and the maximum allowed straight distance for ep_idx, soma, extension, end_point in candidate_conn_nfo: if extension.is_unconnected: LOGGER.info(f" Soma.{soma.uid} <-?-> Ext.{extension.uid}({ep_idx})") if connection_cfg["dilatation_erosion"]: # Erode the end_point of the extension in the costs map cn_.Erode( (end_point,), dijkstra_costs, extension, extensions, image, all_end_points=all_end_points, ) path, length = cn_.ShortestPathFromToN( image=image, point=end_point, extension=extension, costs=dijkstra_costs, candidate_points_fct=soma.ContourPointsCloseTo, max_straight_sq_dist=connection_cfg["max_straight_sq_dist"], ) if path.__len__() > 0: if length <= connection_cfg["max_weighted_length"]: cn_.ValidateConnection( soma, extension, end_point, path, dijkstra_costs, all_ep=all_end_points, ) if connection_cfg["dilatation_erosion"]: cn_.Dilate(extension.sites, dijkstra_costs) LOGGER.info(" : Made") else: if connection_cfg["dilatation_erosion"]: cn_.Dilate(extension.sites, dijkstra_costs) LOGGER.info( f" : Cancelled: Too long: {length} > {connection_cfg['max_weighted_length_c']}" ) else: if connection_cfg["dilatation_erosion"]: cn_.Dilate(extension.sites, dijkstra_costs) LOGGER.info(" : Cancelled: No found path") brick.io.console.feedback.PrintConnectedExtensions(extensions) LOGGER.info("END OF SOMA <-> EXTENSION STEP") if workflow_cfg["with_plot"]: fb_.PlotSomasWithExtensions(somas, som_nfo, "all") # -- Extention-Extention LOGGER.info("EXTENSION <-> EXTENSION") should_look_for_connections = True while should_look_for_connections: if connection_cfg["dilatation_erosion"]: for soma in somas: # Update the costs by dilating everything again plus the contour points of the soma cn_.Dilate(soma.contour_points, dijkstra_costs) # Below probably not necessary but security for extension in soma.extensions: cn_.Dilate(extension.sites, dijkstra_costs) som_nfo["soma_w_ext_lmp"] = soma_t.SomasLMap(somas) som_nfo["dist_to_closest"], som_nfo["influence_map"] = soma_t.InfluenceMaps( som_nfo["soma_w_ext_lmp"] ) candidate_conn_nfo = cn_.CandidateConnections( somas, som_nfo["influence_map"], som_nfo["dist_to_closest"], extensions, connection_cfg["max_straight_sq_dist"], ) should_look_for_connections = False for ep_idx, soma, extension, end_point in candidate_conn_nfo: if extension.is_unconnected: LOGGER.info(f" Soma.{soma.uid} <-?-> Ext.{extension.uid}({ep_idx})") if connection_cfg["dilatation_erosion"]: # Erode the end_point of the extension in the costs map cn_.Erode( (end_point,), dijkstra_costs, extension, extensions, image, all_end_points=all_end_points, ) path, length = cn_.ShortestPathFromToN( image=image, point=end_point, extension=extension, costs=dijkstra_costs, candidate_points_fct=soma.ExtensionPointsCloseTo, extensions=extensions, all_end_points=all_end_points, max_straight_sq_dist=connection_cfg["max_straight_sq_dist"], erode_path=connection_cfg["dilatation_erosion"], ) if path.__len__() > 0: if length <= connection_cfg["max_weighted_length"]: # Verify that the last element of the connexion path is contained in the extension to be # connected. If so, returns the extensions containing the path last site. tgt_extenstion = extension_t.ExtensionContainingSite( extensions, path[-1] ) cn_.ValidateConnection( tgt_extenstion, extension, end_point, path, dijkstra_costs, all_ep=all_end_points, ) if connection_cfg["dilatation_erosion"]: cn_.Dilate(extension.sites, dijkstra_costs) should_look_for_connections = True LOGGER.info(" : Made") else: if connection_cfg["dilatation_erosion"]: cn_.Dilate(extension.sites, dijkstra_costs) LOGGER.info( f" : Cancelled: Too long: {length} > {connection_cfg['max_weighted_length_c']}" ) else: if connection_cfg["dilatation_erosion"]: cn_.Dilate(extension.sites, dijkstra_costs) LOGGER.info(" : Cancelled: No found path") # -- Create an extension map containing all the ext + connexions, without the somas. Used for the graphs extraction. ext_nfo["lmp_soma"] = som_nfo["soma_w_ext_lmp"].copy() ext_nfo["lmp_soma"][som_nfo["map"] > 0] = 0 brick.io.console.feedback.PrintConnectedExtensions(extensions) LOGGER.info("END OF EXTENSION <-> EXTENSION STEP") if workflow_cfg["with_plot"]: fb_.PlotSomasWithExtensions(somas, som_nfo, "with_ext_of_ext") # -- Summary for info, name in ((som_nfo, "som_nfo"), (ext_nfo, "ext_nfo")): for key, value in info.items(): if isinstance(value, array_t): LOGGER.info( f"{name}[{key}]: {value.dtype.name}x{value.shape}={value.nbytes}B" ) LOGGER.info("\n") for soma in somas: LOGGER.info(soma) LOGGER.info("END OF IMAGE PROCESSING STEPS") if workflow_cfg["with_plot"]: pl_.show() if output_cfg["save_images"] is not None: po_.MaximumIntensityProjectionZ( som_nfo["soma_w_ext_lmp"], block=False, output_image_file_name=str( output_cfg["save_images"] / f"MIP_{input_cfg['data_path'].stem}.png" ), ) # --- Extract all the extensions of all somas as a graph LOGGER.info("GRAPH EXTRACTION") for soma in somas: LOGGER.info(f"Soma {soma.uid}") ext_map, width_map = SKLMapFromObjectMap( ext_nfo["lmp_soma"] == soma.uid, with_width=True ) soma.skl_graph = skl_graph_t.NewFromSKLMap(ext_map, width_map=width_map) # --- Find the root of the {ext+conn} graphs # Roots are the nodes of degree 1 that are to be linked to the soma soma.graph_roots = ge_.FindGraphsRootWithNodes(soma) # TODO: the update of this code to link soma nodes to extension is a WIP # Add a node "soma" and link it to the root nodes. /!\ Put the coordinates # inside an iterable (e.g., a tuple), otherwise nmpy.array creates a # 0-dimensional array, which is invalid for branch node sites. soma_uid = soma.skl_graph.AddBranchNode( tuple(nmpy.array((int(round(_elm)),)) for _elm in soma.centroid), width_map=width_map, ) soma.skl_graph.nodes[soma_uid]["soma"] = True soma.skl_graph.nodes[soma_uid]["soma_nfo"] = soma for node in soma.graph_roots.values(): sites = sidw.line_nd( soma.skl_graph.nodes[soma_uid][SKL_GRAPH].position, soma.skl_graph.nodes[node][SKL_GRAPH].position, endpoint=True, ) adjacent_node_uids = (soma_uid, node) edge = edge_t.NewWithDetails( sites, True, adjacent_node_uids, width_map=width_map, ) soma.skl_graph.AddEdge(edge, adjacent_node_uids) # The ", edge.uid" at the end of indexing is due to the Multigraph nature of soma soma.skl_graph.edges[ adjacent_node_uids[0], adjacent_node_uids[1], edge.uid ]["root"] = True issues = GraphIssues(soma.skl_graph) if issues.__len__() > 0: LOGGER.info(f"Soma {soma.uid} - Graph Issues") LOGGER.info("\n".join(issues)) if output_cfg["save_images"] is not None: output_path = str( output_cfg["save_images"] / f"graph_{input_cfg['data_path'].stem}_soma{soma.uid}.png" ) from json_any.task.storage import StoreAsJSON from skl_graph.task.storage import DescriptionForJSON json_path = path_t(output_path.replace(".", "-")).with_suffix(".json.bz2") print(f">>> Saving Graph in {json_path}") history = StoreAsJSON( soma.skl_graph, json_path, descriptors={"": DescriptionForJSON}, should_continue_on_error=True, ) if isinstance(history, Sequence): LOGGER.info("\n".join(history)) print("<<< Saving Done") soma.skl_graph.SetPlotStyles(label=False) figure, axes = PlotSKLGraph( soma.skl_graph, should_block=False, should_return_axes=True, should_return_figure=True, ) axes.set_title(f"{soma.uid} - {soma_uid}") figure.savefig(output_path) if workflow_cfg["with_plot"]: pl_.show() else: pl_.close() LOGGER.info("END OF GRAPH EXTRACTION STEP") # --- Extract features LOGGER.info("FEATURE EXTRACTION") if feature_cfg["hist_bins_borders_length"] is None: bins_length = nmpy.linspace( feature_cfg["hist_min_length"], feature_cfg["hist_min_length"] + feature_cfg["hist_step_length"] * feature_cfg["number_of_bins_length"], num=feature_cfg["number_of_bins_length"], ) bins_length[-1] = nmpy.inf else: bins_length = nmpy.array(feature_cfg["hist_bins_borders_length"]) bins_length[-1] = nmpy.inf if feature_cfg["hist_bins_borders_curvature"] is None: bins_curvature = nmpy.linspace( feature_cfg["hist_min_curvature"], feature_cfg["hist_min_curvature"] + feature_cfg["hist_step_curvature"] * feature_cfg["number_of_bins_curvature"], num=feature_cfg["number_of_bins_curvature"], ) bins_curvature[-1] = nmpy.inf else: bins_curvature = nmpy.array(feature_cfg["hist_bins_borders_curvature"]) bins_curvature[-1] = nmpy.inf features_df = ge_.ExtractFeaturesInDF( input_cfg["data_path"].stem, input_cfg["condition"], input_cfg["duration"], somas, input_cfg["voxel_size_in_micron"], bins_length, bins_curvature, ext_scales, ) if output_cfg["save_csv"] is None: features_df.to_csv( str(input_cfg["data_path"].parent / f"{input_cfg['data_path'].stem}.csv") ) else: features_df.to_csv( str(output_cfg["save_csv"] / f"{input_cfg['data_path'].stem}.csv") ) LOGGER.info("END OF FEATURE EXTRACTION STEP") return features_df def Main(): """""" if sstm.argv.__len__() < 2: print("Missing parameter file argument") sstm.exit(0) argument = path_t(sstm.argv[1]) if not (argument.is_file() and (argument.suffix.lower() == ".ini")): print("Wrong parameter file path or type, or parameter file unreadable") sstm.exit(0) config = NutrimorphArguments(argument) ( connection_cfg, extension_cfg, feature_cfg, frangi_cfg, input_cfg, output_cfg, soma_cfg, workflow_cfg, ) = config base_folder = ( output_cfg["save_images"] / input_cfg["data_path"].stem / str(datetime.now()).replace(" ", "_").replace(":", "-").replace(".", "-") ) output_cfg["save_images"] = base_folder / "image" output_cfg["save_csv"] = base_folder / "features" output_cfg["save_features_analysis"] = base_folder / "analysis" for path in ( output_cfg["save_images"], output_cfg["save_csv"], output_cfg["save_features_analysis"], ): path.mkdir(parents=True, exist_ok=True) if input_cfg["data_path"].is_file(): LOGGER.info( "WARNING: Will not perform features analysis on a single image.\n" "For features analysis, give a directory path" ) _ = NutriMorph( connection_cfg, extension_cfg, feature_cfg, frangi_cfg, input_cfg, output_cfg, soma_cfg, workflow_cfg, ) elif input_cfg["data_path"].is_dir(): paths = sorted(input_cfg["data_path"].glob("**/*.tif")) concatenated_features_df = pd_.DataFrame() for path in paths: if path.is_file(): input_cfg["data_path"] = path features_df = NutriMorph( connection_cfg.copy(), extension_cfg.copy(), feature_cfg.copy(), frangi_cfg.copy(), input_cfg.copy(), output_cfg.copy(), soma_cfg.copy(), workflow_cfg.copy(), ) concatenated_features_df = pd_.concat( (concatenated_features_df, features_df) ) # If some rows (i.e. somas) have NaN, drop them (due to best fitting ellipsoid algo (JTJ is singular due # to convex hull being flat)). concatenated_features_df.dropna(inplace=True) LOGGER.info(f"Features: New={features_df.shape}, So far={concatenated_features_df.shape}.") concatenated_features_df.to_csv(output_cfg["save_csv"] / _CSV_FEATURE_FILE) # Clustering with this df and module features_analysis.py if output_cfg["statistical_analysis"]: from brick.feature.analysis import Main as Analysis path_1 = output_cfg["save_csv"] path_2 = output_cfg["save_features_analysis"] Analysis(str(path_1 / _CSV_FEATURE_FILE), path_2) # os_.system( # str(path_t(__file__).parent / "brick" / "feature" / "analysis.py") # + " " # + str(path_1 / _CSV_FEATURE_FILE) # + f" {path_2}" # ) else: path = input_cfg["data_path"] raise ImportError(f"{path}: Not a valid data path!") if __name__ == "__main__": # Main()