-
DEBREUVE Eric authoredDEBREUVE Eric authored
nutrimorph.py 22.71 KiB
# Copyright CNRS/Inria/UNS
# Contributor(s): Eric Debreuve (since 2019), Morgane Nadal (2020)
#
# eric.debreuve@cnrs.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software. You can use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
# Time profiling:
# python -m cProfile -o runtime/profiling.log -s name main.py
# Memory profiling:
# python -m memory_profiler main.py
# or
# mprof run main.py
# mprof plot
import os as os_
import sys as sy_
from pathlib import Path as path_t
import better_exceptions as xcpt
import imageio as io_
import matplotlib.pyplot as pl_
import networkx as nx_
import numpy as nmpy
import pandas as pd_
import skimage.draw as sidw
import skimage.exposure as ex_
import skimage.measure as ms_
import skimage.morphology as mp_
from logger_36 import LOGGER, AddFileHandler, SetShowMemoryUsage
from logger_36.main import TimeStamp
from skl_graph.type.edge import edge_t
from skl_graph import SKLMapFromObjectMap
from brick.general.graph import skl_graph_t
import brick.component.connection as cn_
import brick.component.extension as xt_
import brick.component.soma as sm_
import brick.feature.graph as ge_
import brick.image.input as in_
import brick.image.unit
import brick.io.console.feedback
import brick.io.screen.feedback as fb_
import brick.io.screen.plot as po_
import brick.io.screen.soma_validation as smvl
import brick.processing.dijkstra as dk_
from brick.io.storage.parameters import NutrimorphArguments
xcpt.hook()
def NutriMorph(
data_path: str | path_t,
save_images,
save_csv,
channel=None,
dilatation_erosion=None,
size_voxel_in_micron=None,
soma_low_c=None,
soma_high_c=None,
soma_selem_micron_c=None,
soma_min_area_c=None,
soma_max_area_c=None,
adapt_hist_equalization=None,
ext_low_c=None,
ext_high_c=None,
ext_selem_micron_c=None,
ext_min_area_c=None,
max_straight_sq_dist_c=None,
max_weighted_length_c=None,
scale_range=None,
scale_step=None,
alpha=None,
beta=None,
frangi_c=None,
diff_mode=None,
bright_on_dark=None,
method=None,
hist_min_length=None,
hist_step_length=None,
number_of_bins_length=None,
hist_bins_borders_length=None,
hist_min_curvature=None,
hist_step_curvature=None,
number_of_bins_curvature=None,
hist_bins_borders_curvature=None,
with_plot=None,
interactive=None,
in_parallel=None,
) -> pd_.DataFrame():
""""""
soma_t = sm_.soma_t
extension_t = xt_.extension_t
LOGGER.info("STARTED")
# TODO add proper warning with warn module
# --- Images
if isinstance(data_path, str):
data_path = path_t(data_path)
LOGGER.info(f"Image {data_path.name} in {data_path.parent}")
image = io_.volread(data_path)
image = in_.ImageVerification(image, channel)
# iv_.image_verification(image, channel) # -> PySide2 user interface # TODO: must return the modified image!
# /!\ conflicts between some versions of PySide2 and Python3
image = image[:, 800:, 500:] # [:, 800:, 300:] # [:, 600:, 500:]
img_shape = image.shape
img_shape_as_str = str(img_shape)[1:-1].replace(", ", "x")
LOGGER.info(
f"IMAGE: s.{img_shape_as_str} t.{image.dtype} m.{nmpy.amin(image)} M.{nmpy.amax(image)}"
)
image_for_soma = in_.IntensityNormalizedImage(image)
image_for_ext = image.copy()
LOGGER.info(
f"NRM-IMG: t.{image_for_soma.dtype} m.{nmpy.amin(image_for_soma):.2f} M.{nmpy.amax(image_for_soma):.2f}"
)
# --- Initialization
som_nfo = {}
ext_nfo = {}
axes = {}
# TODO do something more general, here too specific to one microscope metadata
size_voxel_in_micron = brick.image.unit.FindVoxelDimensionInMicron(
str(data_path), size_voxel_in_micron=size_voxel_in_micron
)
# --- Somas
LOGGER.info("SOMA DETECTION")
soma_min_area_c = brick.image.unit.ToPixel(
soma_min_area_c, size_voxel_in_micron, dimension=(0, 1, 2)
)
soma_max_area_c = brick.image.unit.ToPixel(
soma_max_area_c, size_voxel_in_micron, dimension=(0, 1, 2)
)
soma_selem_c = mp_.disk(
brick.image.unit.ToPixel(soma_selem_micron_c, size_voxel_in_micron)
)
LOGGER.info("Hysteresis thresholding...")
som_nfo["map_small"] = soma_t.Map(
image_for_soma, soma_low_c, soma_high_c, soma_selem_c
)
LOGGER.info("Morphological cleaning...")
som_nfo["map_small"], som_lmp_small = soma_t.DeleteSmallSomas(
som_nfo["map_small"], soma_min_area_c
)
LOGGER.info("Large somas deletion...")
# Deletion of too big elements: probably aggregation of cells
# Separated from the deletion of small elements to avoid extensions to be detected where too big somas were just deleted
som_nfo["map"], som_lmp = soma_t.DeleteBigSomas(
som_nfo["map_small"], som_lmp_small, soma_max_area_c
)
LOGGER.info("Somas labelling...")
som_nfo["lmp"], n_somas = ms_.label(som_nfo["map"], return_num=True)
# Use relabel instead of label to optimize the algorithm. /!\ But BUG.
# som_nfo["lmp"] = relabel_sequential(som_lmp)[0]
# n_somas = som_nfo["lmp"].max()
LOGGER.info("Distance and influence map creation...")
# For soma<->extension and extension<->extension
som_nfo["dist_to_closest"], som_nfo["influence_map"] = soma_t.InfluenceMaps(
som_nfo["lmp"]
)
LOGGER.info("Somas creation...")
somas = tuple(
soma_t().FromMap(som_nfo["lmp"], uid) for uid in range(1, n_somas + 1)
)
LOGGER.info(f"END OF SOMA DETECTION STEP. Number of somas = {n_somas}")
if with_plot:
fb_.PlotSomas(somas, som_nfo, axes)
elif interactive:
window = smvl.soma_validation_window_t(
image_for_soma,
som_nfo["lmp"],
mip_axis=0,
)
n_somas = window.LaunchValidation()
LOGGER.info(f"Validated Somas: {n_somas}")
# --- Extensions
LOGGER.info("EXTENSION DETECTION")
ext_min_area_c = brick.image.unit.ToPixel(
ext_min_area_c, size_voxel_in_micron, dimension=(0, 1, 2)
)
if ext_selem_micron_c == 0:
ext_selem_pixel_c = None
else:
ext_selem_pixel_c = mp_.disk(
brick.image.unit.ToPixel(ext_selem_micron_c, size_voxel_in_micron)
)
# TODO: is the following comment a TODO?
# Take into account the aggregation of cells detected as one big soma to avoid extension creation there
image_for_ext[som_nfo["map_small"] > 0] = 0
LOGGER.info("Frangi Enhancement...")
enhanced_ext, ext_scales = extension_t.EnhancedForDetection(
image_for_ext,
scale_range,
scale_step,
alpha,
beta,
frangi_c,
bright_on_dark,
method,
diff_mode,
in_parallel=in_parallel,
)
LOGGER.info("Enhancing contrast...")
if adapt_hist_equalization:
enhanced_ext = ex_.equalize_adapthist(enhanced_ext)
else:
enhanced_ext = ex_.equalize_hist(enhanced_ext)
enhanced_ext = ex_.rescale_intensity(enhanced_ext, out_range=(0, 255))
LOGGER.info("Hysteresis Thresholding...")
ext_nfo["coarse_map"] = extension_t.CoarseMap(
enhanced_ext, ext_low_c, ext_high_c, ext_selem_pixel_c
)
LOGGER.info("Morphological cleaning...") # Deletion of too small elements
ext_nfo["coarse_map"], ext_lmp = extension_t.FilteredCoarseMap(
ext_nfo["coarse_map"], ext_min_area_c
)
LOGGER.info("Skeletonization...")
ext_nfo["map"] = extension_t.FineMapFromCoarseMap(ext_nfo["coarse_map"])
ext_nfo["map"][som_nfo["map"] > 0] = 0
LOGGER.info("Map Labelling...")
ext_nfo["lmp"], n_extensions = ms_.label(ext_nfo["map"], return_num=True)
# Use relabel instead of label to optimize the algorithm. BUT PROBLEM WITH THE NUMBER OF EXTENSIONS DETECTED !
# ext_nfo["lmp"] = relabel_sequential(ext_lmp)[0]
# n_extensions = ext_nfo["lmp"].max()
LOGGER.info("Extension Creation...")
extensions = tuple(
extension_t().FromMap(ext_nfo["lmp"], ext_scales, uid)
for uid in range(1, n_extensions + 1)
)
LOGGER.info(f"Number of extensions = {n_extensions}")
LOGGER.info("Global end-point map for extensions...")
glob_ext_ep_map = xt_.EndPointMap(ext_nfo["lmp"] > 0)
all_end_points = list(zip(*glob_ext_ep_map.nonzero()))
LOGGER.info("END OF EXTENSION DETECTION STEP")
if with_plot:
fb_.PlotExtensions(extensions, ext_nfo, img_shape)
# -- Preparation for Connections
LOGGER.info("DIJKSTRA COSTS")
# Calculate the cost of each pixel and dilate the existing extensions to avoid tangency of connections to extensions
dijkstra_costs = dk_.DijkstraCosts(image, som_nfo["map"], ext_nfo["map"])
if dilatation_erosion:
for extension in extensions:
cn_.Dilate(extension.sites, dijkstra_costs)
# -- Soma-Extention
LOGGER.info("SOMA <-> EXTENSION")
max_straight_sq_dist_c = brick.image.unit.ToPixel(
max_straight_sq_dist_c, size_voxel_in_micron
)
max_weighted_length_c = brick.image.unit.ToPixel(
max_weighted_length_c, size_voxel_in_micron
)
LOGGER.info("Look for candidate connections...")
candidate_conn_nfo = cn_.CandidateConnections(
somas,
som_nfo["influence_map"],
som_nfo["dist_to_closest"],
extensions,
max_straight_sq_dist_c,
)
# For each candidate, verify that it is valid based on the shortest path and the maximum allowed straight distance
for ep_idx, soma, extension, end_point in candidate_conn_nfo:
if extension.is_unconnected:
LOGGER.info(f" Soma.{soma.uid} <-?-> Ext.{extension.uid}({ep_idx})")
if dilatation_erosion:
# Erode the end_point of the extension in the costs map
cn_.Erode(
(end_point,),
dijkstra_costs,
extension,
extensions,
image,
all_end_points=all_end_points,
)
path, length = cn_.ShortestPathFromToN(
image=image,
point=end_point,
extension=extension,
costs=dijkstra_costs,
candidate_points_fct=soma.ContourPointsCloseTo,
max_straight_sq_dist=max_straight_sq_dist_c,
)
if path.__len__() > 0:
if length <= max_weighted_length_c:
cn_.ValidateConnection(
soma,
extension,
end_point,
path,
dijkstra_costs,
all_ep=all_end_points,
)
if dilatation_erosion:
cn_.Dilate(extension.sites, dijkstra_costs)
LOGGER.info(" : Made")
else:
if dilatation_erosion:
cn_.Dilate(extension.sites, dijkstra_costs)
LOGGER.info(
f" : Cancelled: Too long: {length} > {max_weighted_length_c}"
)
else:
if dilatation_erosion:
cn_.Dilate(extension.sites, dijkstra_costs)
LOGGER.info(" : Cancelled: No found path")
brick.io.console.feedback.PrintConnectedExtensions(extensions)
LOGGER.info("END OF SOMA <-> EXTENSION STEP")
if with_plot:
fb_.PlotSomasWithExtensions(somas, som_nfo, "all")
# -- Extention-Extention
LOGGER.info("EXTENSION <-> EXTENSION")
should_look_for_connections = True
while should_look_for_connections:
if dilatation_erosion:
for soma in somas:
# Update the costs by dilating everything again plus the contour points of the soma
cn_.Dilate(soma.contour_points, dijkstra_costs)
# Below probably not necessary but security
for extension in soma.extensions:
cn_.Dilate(extension.sites, dijkstra_costs)
som_nfo["soma_w_ext_lmp"] = soma_t.SomasLMap(somas)
som_nfo["dist_to_closest"], som_nfo["influence_map"] = soma_t.InfluenceMaps(
som_nfo["soma_w_ext_lmp"]
)
candidate_conn_nfo = cn_.CandidateConnections(
somas,
som_nfo["influence_map"],
som_nfo["dist_to_closest"],
extensions,
max_straight_sq_dist_c,
)
should_look_for_connections = False
for ep_idx, soma, extension, end_point in candidate_conn_nfo:
if extension.is_unconnected:
LOGGER.info(f" Soma.{soma.uid} <-?-> Ext.{extension.uid}({ep_idx})")
if dilatation_erosion:
# Erode the end_point of the extension in the costs map
cn_.Erode(
(end_point,),
dijkstra_costs,
extension,
extensions,
image,
all_end_points=all_end_points,
)
path, length = cn_.ShortestPathFromToN(
image=image,
point=end_point,
extension=extension,
costs=dijkstra_costs,
candidate_points_fct=soma.ExtensionPointsCloseTo,
extensions=extensions,
all_end_points=all_end_points,
max_straight_sq_dist=max_straight_sq_dist_c,
erode_path=dilatation_erosion,
)
if path.__len__() > 0:
if length <= max_weighted_length_c:
# Verify that the last element of the connexion path is contained in the extension to be
# connected. If so, returns the extensions containing the path last site.
tgt_extenstion = extension_t.ExtensionContainingSite(
extensions, path[-1]
)
cn_.ValidateConnection(
tgt_extenstion,
extension,
end_point,
path,
dijkstra_costs,
all_ep=all_end_points,
)
if dilatation_erosion:
cn_.Dilate(extension.sites, dijkstra_costs)
should_look_for_connections = True
LOGGER.info(" : Made")
else:
if dilatation_erosion:
cn_.Dilate(extension.sites, dijkstra_costs)
LOGGER.info(
f" : Cancelled: Too long: {length} > {max_weighted_length_c}"
)
else:
if dilatation_erosion:
cn_.Dilate(extension.sites, dijkstra_costs)
LOGGER.info(" : Cancelled: No found path")
# -- Create an extension map containing all the ext + connexions, without the somas. Used for the graphs extraction.
ext_nfo["lmp_soma"] = som_nfo["soma_w_ext_lmp"].copy()
ext_nfo["lmp_soma"][som_nfo["map"] > 0] = 0
brick.io.console.feedback.PrintConnectedExtensions(extensions)
LOGGER.info("END OF EXTENSION <-> EXTENSION STEP")
if with_plot:
fb_.PlotSomasWithExtensions(somas, som_nfo, "with_ext_of_ext")
# -- Summary
LOGGER.info("\n")
for soma in somas:
LOGGER.info(soma)
LOGGER.info("END OF IMAGE PROCESSING STEPS")
if with_plot:
pl_.show()
if save_images is not None:
po_.MaximumIntensityProjectionZ(
som_nfo["soma_w_ext_lmp"],
block=False,
output_image_file_name=str(
path_t(save_images) / f"MIP_{data_path.stem}.png"
),
)
# --- Extract all the extensions of all somas as a graph
LOGGER.info("GRAPH EXTRACTION")
for soma in somas:
LOGGER.info(f"Soma {soma.uid}")
ext_map, width_map = SKLMapFromObjectMap(
ext_nfo["lmp_soma"] == soma.uid, with_width=True
)
soma.skl_graph = skl_graph_t.NewFromSKLMap(ext_map, width_map=width_map)
# --- Find the root of the {ext+conn} graphs
# Roots are the nodes of degree 1 that are to be linked to the soma
soma.graph_roots = ge_.FindGraphsRootWithNodes(soma)
# TODO: the update of this code to link soma nodes to extension is a WIP
# Add a node "soma" and link it to the root nodes
soma_uid = soma.skl_graph.AddBranchNode(
tuple(nmpy.array(int(round(_elm))) for _elm in soma.centroid),
width_map=width_map,
)
soma.skl_graph.nodes[soma_uid]["soma"] = True
soma.skl_graph.nodes[soma_uid]["soma_nfo"] = soma
for node in soma.graph_roots.values():
sites = sidw.line_nd(
soma.skl_graph.nodes[soma_uid]["details"].position,
soma.skl_graph.nodes[node]["details"].position,
)
adjacent_node_uids = (soma_uid, node)
edge = edge_t.NewWithDetails(
sites,
adjacent_node_uids,
width_map=width_map,
)
soma.skl_graph.AddEdge(edge, adjacent_node_uids)
# The ", edge.uid" at the end of indexing is due to the Multigraph nature of soma
soma.skl_graph.edges[adjacent_node_uids[0], adjacent_node_uids[1], edge.uid][
"root"
] = True
if save_images is not None:
nx_.draw_networkx(soma.skl_graph)
pl_.savefig(
str(path_t(save_images) / f"graph_{data_path.stem}_soma{soma.uid}.png")
)
# pl_.show(block=True)
pl_.close()
LOGGER.info("END OF GRAPH EXTRACTION STEP")
# --- Extract features
LOGGER.info("FEATURE EXTRACTION")
if hist_bins_borders_length is None:
number_of_bins_length = int(number_of_bins_length)
bins_length = nmpy.linspace(
hist_min_length,
hist_min_length + hist_step_length * number_of_bins_length,
num=number_of_bins_length,
)
bins_length[-1] = nmpy.inf
else:
bins_length = nmpy.array(hist_bins_borders_length)
bins_length[-1] = nmpy.inf
if hist_bins_borders_curvature is None:
number_of_bins_curvature = int(number_of_bins_curvature)
bins_curvature = nmpy.linspace(
hist_min_curvature,
hist_min_curvature + hist_step_curvature * number_of_bins_curvature,
num=number_of_bins_curvature,
)
bins_curvature[-1] = nmpy.inf
else:
bins_curvature = nmpy.array(hist_bins_borders_curvature)
bins_curvature[-1] = nmpy.inf
features_df = ge_.ExtractFeaturesInDF(
data_path.stem,
somas,
size_voxel_in_micron,
bins_length,
bins_curvature,
ext_scales,
)
if save_csv is not None:
features_df.to_csv(str(path_t(save_csv) / f"{data_path.stem}.csv"))
else:
features_df.to_csv(str(path_t(data_path.parent) / f"{data_path.stem}.csv"))
LOGGER.info("END OF FEATURE EXTRACTION STEP")
return features_df
def Main():
""""""
if sy_.argv.__len__() < 2:
print("Missing parameter file argument")
sy_.exit(0)
argument = sy_.argv[1]
if not (
os_.path.isfile(argument)
and os_.access(argument, os_.R_OK)
and (os_.path.splitext(argument)[1].lower() == ".ini")
):
print("Wrong parameter file path or type, or parameter file unreadable")
sy_.exit(0)
(
data_path,
save_images,
save_csv,
save_features_analysis,
statistical_analysis,
arguments,
) = NutrimorphArguments(argument)
data_path = path_t(data_path)
SetShowMemoryUsage(True)
AddFileHandler(path_t(save_images).parent / f"{TimeStamp()}-info-warning-error.txt")
if data_path.is_file():
LOGGER.info(
"WARNING: Will not perform features analysis on a single image.\n"
"For features analysis, give a directory path.\n"
)
_ = NutriMorph(data_path, save_images, save_csv, **arguments)
elif data_path.is_dir():
concatenated_features_df = pd_.DataFrame()
for path in data_path.glob("**/*.tif"):
if path.is_file():
features_df = NutriMorph(str(path), save_images, save_csv, **arguments)
concatenated_features_df = concatenated_features_df.append(features_df)
# If some rows (i.e. somas) have NaN, drop them (due to best fitting ellipsoid algo (JTJ is singular due
# to convex hull being flat)).
concatenated_features_df = concatenated_features_df.dropna()
if save_csv is None:
concatenated_features_df.to_csv(
str(path_t(data_path.parent) / "features.csv")
)
else:
concatenated_features_df.to_csv(str(path_t(save_csv) / "features.csv"))
# Clustering with this df and module features_analysis.py
if statistical_analysis:
os_.system(
f"feature_analysis.py {save_csv}/features.csv {save_features_analysis}"
)
else:
raise ImportError(f"{data_path}: Not a valid data path!")
print("\n\n--- NOT IMPORTING MEMORY PROFILING ---\n\n")
# import brick.auto_decorator as dctr
# dctr.MemoryProfileCallables(__name__)
if __name__ == "__main__":
#
Main()