Mentions légales du service

Skip to content
Snippets Groups Projects
nutrimorph.py 11.86 KiB
# Copyright CNRS/Inria/UNS
# Contributor(s): Eric Debreuve (since 2019), Morgane Nadal (2020)
#
# eric.debreuve@cnrs.fr
#
# This software is governed by the CeCILL  license under French law and
# abiding by the rules of distribution of free software.  You can  use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and,  more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.

# Time profiling:
#     python -m cProfile -o runtime/profiling.log -s name main.py
# Memory profiling:
#     python -m memory_profiler main.py
#     or
#     mprof run main.py
#     mprof plot

import brick.component.connection as cn_
import brick.component.extension as xt_
import brick.component.soma as sm_
import brick.general.feedback as fb_
import brick.processing.input as in_
# import brick.processing.image_verification as iv_
# from main_prm import *

import os as os_
import sys as sy_
import time as tm_
# import tracemalloc as tr_

import matplotlib.pyplot as pl_
import numpy as np_
import skimage.io as io_
from skimage.segmentation import relabel_sequential
import skimage.morphology as mp_
import skimage.measure as ms_

import configparser

print(sy_.argv, sy_.argv.__len__())

if sy_.argv.__len__() < 2:
    print("Missing parameter file argument")
    sy_.exit(0)
if not (os_.path.isfile(sy_.argv[1]) and os_.access(sy_.argv[1], os_.R_OK)):
    print("Wrong parameter file path or parameter file unreadable")
    sy_.exit(0)


# data_path = None
# channel = None
# size_voxel_in_micron = None
# soma_low_c = None
# soma_high_c = None
# soma_selem_micron_c = None
# soma_min_area_c = None
# ext_low_c = None
# ext_high_c = None
# ext_selem_micron_c = None
# ext_min_area_c = None
# max_straight_sq_dist_c = None
# max_weighted_length_c = None
# with_plot = True
# in_parallel = None


# --Parameters from .INI file
parameters = configparser.ConfigParser(allow_no_value=True)
parameters.read('parameters.ini')


def IfNotFloat(section: str, key: str) -> float:
    try:
        value = parameters.getfloat(section, key)
        return value
    except:
        return eval(parameters.get(section, key))


# [Input]
data_path = parameters['Input']['data_path']
channel = parameters['Input']['channel']

size_voxel_in_micron = parameters['Input']['size_voxel_in_micron']
if size_voxel_in_micron is not None:
    size_voxel_in_micron = size_voxel_in_micron.split(',')
    size_voxel_in_micron = list(float(size_voxel_in_micron[i]) for i in range(len(size_voxel_in_micron)))

# [Somas]
soma_low_c = IfNotFloat('Somas', 'soma_low_c')
soma_high_c = IfNotFloat('Somas', 'soma_high_c')
soma_selem_micron_c = IfNotFloat('Somas', 'soma_selem_micron_c')
soma_min_area_c = IfNotFloat('Somas', 'soma_min_area_c')

# [Extensions]
ext_low_c = IfNotFloat('Extensions', 'ext_low_c')
ext_high_c = IfNotFloat('Extensions', 'ext_high_c')
ext_selem_micron_c = IfNotFloat('Extensions', 'ext_selem_micron_c')
ext_min_area_c = IfNotFloat('Extensions', 'ext_min_area_c')

# [Connexions]
max_straight_sq_dist_c = IfNotFloat('Connexions', 'max_straight_sq_dist_c')
max_weighted_length_c = IfNotFloat('Connexions', 'max_weighted_length_c')

# [Program running]
with_plot = parameters.getboolean('Program Running', 'with_plot')
in_parallel = parameters.getboolean('Program Running', 'in_parallel')


# exec(open(sy_.argv[1]).read())  # Only with parameters.py


soma_t = sm_.soma_t
extension_t = xt_.extension_t

print(f"STARTED: {tm_.strftime('%a, %b %d %Y @ %H:%M:%S')}")
start_time = tm_.time()

# tr_.start()


# --- Images
# Find the dimension of the image voxels in micron
size_voxel_in_micron = in_.FindVoxelDimensionInMicron(data_path)

image = io_.imread(data_path)

# Image size verification - simple version without user interface
image = in_.ImageVerification(image, channel)
# iv_.image_verification(image, channel)  # -> PySide2 interface  # TODO: must return the modified image!
# /!\ conflicts between some versions of PySide2 and Python3

image = image[:, 512:, 512:]  # 562 # Just for development
img_shape = image.shape
# pl_.matshow(image[image.shape[0] // 2, :, :])
# pl_.show()
#
# max_img = image.max()
# image.fill(image.min())
# image[2:3,10:11,:] = max_img
#
print(f"IMAGE: s.{img_shape} t.{image.dtype} m.{image.min()} M.{image.max()}")

# Intensity relative normalization (between 0 and 1).
# TODO: change Frangi to take the normalization btw 0 and 1 into account
# image_for_soma = in_.IntensityNormalizedImage(image)
# image_for_ext = in_.IntensityNormalizedImage(image)
# image_for_soma = in_.NormalizedImage(image)
# image_for_ext = in_.NormalizedImage(image)
image_for_soma = image
image_for_ext = image
print(f"NRM-IMG: t.{image_for_soma.dtype} m.{image_for_soma.min():.2f} M.{image_for_soma.max():.2f}")

# --- Initialization
som_nfo = {}
ext_nfo = {}
axes = {}

# n_somas = 0
# n_extensions = 0
# somas = None  # Tuple of soma objects
# extensions = None  # Tuple of extension objects


# --- Somas
print("\n--- Soma Detection")

soma_min_area_c = in_.ToPixel(soma_min_area_c, size_voxel_in_micron, dimension=(0, 1))
soma_selem_c = mp_.disk(in_.ToPixel(soma_selem_micron_c, size_voxel_in_micron))

som_nfo["map"] = soma_t.Map(image_for_soma, soma_low_c, soma_high_c, soma_selem_c)
som_nfo["map"], som_lmp = soma_t.FilteredMap(som_nfo["map"], soma_min_area_c)
som_nfo["lmp"], n_somas = ms_.label(som_nfo["map"], return_num=True)
# som_nfo["lmp"] = relabel_sequential(som_lmp)[0]  # Use relabel instead of label to optimize the algorithm.
# n_somas = som_nfo["lmp"].max()

som_nfo["dist_to_closest"], som_nfo["influence_map"] = soma_t.InfluenceMaps(
    som_nfo["lmp"]
)

somas = tuple(
    soma_t().FromMap(som_nfo["lmp"], uid) for uid in range(1, n_somas + 1)
)

print(f"    n = {n_somas}")
elapsed_time = tm_.gmtime(tm_.time() - start_time)
print(f"\nElapsed Time={tm_.strftime('%Hh %Mm %Ss', elapsed_time)}")
if with_plot:
    fb_.PlotSomas(somas, som_nfo, axes)

# -- Extentions
print("\n--- Extension Detection")

ext_min_area_c = in_.ToPixel(ext_min_area_c, size_voxel_in_micron, dimension=(0, 1))
ext_selem_pixel_c = mp_.disk(in_.ToPixel(ext_selem_micron_c, size_voxel_in_micron))

enhanced_ext, ext_scales = extension_t.EnhancedForDetection(image_for_ext, in_parallel=in_parallel)
ext_nfo["coarse_map"] = extension_t.CoarseMap(enhanced_ext, ext_low_c, ext_high_c, ext_selem_pixel_c)
ext_nfo["coarse_map"], ext_lmp = extension_t.FilteredCoarseMap(ext_nfo["coarse_map"], ext_min_area_c)
ext_nfo["map"] = extension_t.FineMapFromCoarseMap(ext_nfo["coarse_map"])
ext_nfo["map"][som_nfo["map"] > 0] = 0
ext_nfo["lmp"], n_extensions = ms_.label(ext_nfo["map"], return_num=True)

# Use relabel instead of label to optimize the algorithm. BUT PROBLEM WITH THE NUMBER OF EXTENSIONS DETECTED !
# ext_nfo["lmp"] = relabel_sequential(ext_lmp)[0]
# n_extensions = ext_nfo["lmp"].max()

extensions = tuple(
    extension_t().FromMap(ext_nfo["lmp"], ext_scales, uid)
    for uid in range(1, n_extensions + 1)
)

print(f"    n = {n_extensions}")
elapsed_time = tm_.gmtime(tm_.time() - start_time)
print(f"\nElapsed Time={tm_.strftime('%Hh %Mm %Ss', elapsed_time)}")
if with_plot:
    fb_.PlotExtensions(extensions, ext_nfo, img_shape)

# -- Preparation for Connections
dijkstra_costs = in_.DijkstraCosts(image, som_nfo["map"], ext_nfo["map"])

# -- Soma-Extention
print("\n--- Soma <-> Extension")

max_straight_sq_dist_c = in_.ToPixel(max_straight_sq_dist_c, size_voxel_in_micron)
max_weighted_length_c = in_.ToPixel(max_weighted_length_c, size_voxel_in_micron)

candidate_conn_nfo = cn_.CandidateConnections(
    somas,
    som_nfo["influence_map"],
    som_nfo["dist_to_closest"],
    extensions,
    max_straight_sq_dist_c,
)

for ep_idx, soma, extension, end_point in candidate_conn_nfo:
    if extension.is_unconnected:
        print(f"    Soma.{soma.uid} <-?-> Ext.{extension.uid}({ep_idx})", end="")
        path, length = cn_.ShortestPathFromToN(
            end_point,
            dijkstra_costs,
            soma.ContourPointsCloseTo,
            max_straight_sq_dist=max_straight_sq_dist_c,
        )
        if length <= max_weighted_length_c:
            cn_.ValidateConnection(soma, extension, path, dijkstra_costs)
            print(": Made")
        else:
            print("")

# for soma in somas:
#     soma.Extend(extensions, som_nfo["dist_to_closest"], dijkstra_costs)

fb_.PrintConnectedExtensions(extensions)
elapsed_time = tm_.gmtime(tm_.time() - start_time)
print(f"\nElapsed Time={tm_.strftime('%Hh %Mm %Ss', elapsed_time)}")
if with_plot:
    fb_.PlotSomasWithExtensions(somas, som_nfo, "all")

# -- Extention-Extention
print("\n--- Extension <-> Extension")

should_look_for_connections = True
while should_look_for_connections:
    som_nfo["soma_w_ext_lmp"] = soma_t.SomasLMap(somas)
    som_nfo["dist_to_closest"], som_nfo["influence_map"] = soma_t.InfluenceMaps(
        som_nfo["soma_w_ext_lmp"]
    )

    candidate_conn_nfo = cn_.CandidateConnections(
        somas,
        som_nfo["influence_map"],
        som_nfo["dist_to_closest"],
        extensions,
        max_straight_sq_dist_c,
    )

    should_look_for_connections = False
    for ep_idx, soma, extension, end_point in candidate_conn_nfo:
        if extension.is_unconnected:
            print(f"    Soma.{soma.uid} <-?-> Ext.{extension.uid}({ep_idx})", end="")
            path, length = cn_.ShortestPathFromToN(
                end_point,
                dijkstra_costs,
                soma.ExtensionPointsCloseTo,
                max_straight_sq_dist=max_straight_sq_dist_c,
            )
            if length <= max_weighted_length_c:
                tgt_extenstion = extension_t.ExtensionContainingSite(extensions, path[-1])
                cn_.ValidateConnection(tgt_extenstion, extension, path, dijkstra_costs)
                should_look_for_connections = True
                print(": Made")
            else:
                print("")

fb_.PrintConnectedExtensions(extensions)
elapsed_time = tm_.gmtime(tm_.time() - start_time)
print(f"\nElapsed Time={tm_.strftime('%Hh %Mm %Ss', elapsed_time)}")
if with_plot:
    fb_.PlotSomasWithExtensions(somas, som_nfo, "with_ext_of_ext")

# -- Summary
print("\n")
for soma in somas:
    print(soma)
# for extension in extensions:
#     print(extension)

# snapshot = tr_.take_snapshot()
# top_file_stats = snapshot.statistics('lineno')
# print("Memory Profiling: Top 10 FILES")
# for stat in top_file_stats[:10]:
#     print(stat)
# top_block_stats = snapshot.statistics('traceback')
# top_block_stats = top_block_stats[0]
# print(f"Memory Profiling: {top_block_stats.count} memory blocks: {top_block_stats.size / 1024:.1f} KiB")
# for line in top_block_stats.traceback.format():
#     print(line)

elapsed_time = tm_.gmtime(tm_.time() - start_time)
print(f"\nElapsed Time={tm_.strftime('%Hh %Mm %Ss', elapsed_time)}")
print(f"DONE: {tm_.strftime('%a, %b %d %Y @ %H:%M:%S')}")

if with_plot:
    pl_.show()

# graph = skl_graph_t.FromSkeleton(soma_map)  # Travailler apd squelettes (soma pas sous fore de sk)