Mentions légales du service

Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • edebreuv/nutrimorph
  • monadal/nutrimorph
2 results
Show changes
Commits on Source (127)
Showing
with 4031 additions and 657 deletions
.idea/
.mypy_cache/
__pycache__/
.*/
_*/
__dll__/
__material__/
__runtime__/
__version__/
__when__.txt
*.so
*.prof
data/
documentation/
result/
mprofile_*.dat
brick/processing/frangi_c/obj/
brick/processing/frangi_c/src/original-formatting/
brick/processing/frangi_c/vesselness.so
brick/vesselness.so
__when__.txt
......@@ -29,37 +29,43 @@
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
from brick.general.type import array_t
import sys as sstm
from pathlib import Path as path_t
import numpy as np_
from brick.feature.analysis import Main as Analysis
# from brick.io.storage.parameters import NutrimorphArguments
_CSV_FEATURE_FILE = "/home/eric/Code/project/mno/nutrimorph/2-morgane/_result-SKIP/complete-20W/hypothalamus-priority/features.csv"
shifts_of_26_neighbors_c = tuple(
(i, j, k)
for i in (-1, 0, 1)
for j in (-1, 0, 1)
for k in (-1, 0, 1)
if i != 0 or j != 0 or k != 0 # take every direct neighbors except itself
)
# Must be positive and higher (strictly) than the max number of neighbors
invalid_n_neighbors_c = shifts_of_26_neighbors_c.__len__() + 1
def Main():
""""""
if sstm.argv.__len__() < 2:
print("Missing parameter file argument")
sstm.exit(0)
argument = path_t(sstm.argv[1])
if not (argument.is_file() and (argument.suffix.lower() == ".ini")):
print("Wrong parameter file path or type, or parameter file unreadable")
sstm.exit(0)
# config = NutrimorphArguments(argument)
# (
# *_,
# input_cfg,
# output_cfg,
# __,
# ___,
# ) = config
# base_folder = sorted((output_cfg["save_images"] / input_cfg["data_path"].stem).glob("2023*"))[-1]
# output_cfg["save_images"] = base_folder / "image"
# output_cfg["save_csv"] = base_folder / "features"
# output_cfg["save_features_analysis"] = base_folder / "analysis"
def PartLMap(map_: array_t) -> array_t:
#
"""
The part mask is labeled as follows: background=invalid_n_neighbors_c;
Pixels of the skeleton = number of neighboring pixels that belong to the skeleton
(as expected, isolated pixels receive 0).
"""
#
result = np_.array(map_, dtype=np_.int8)
result[result > 0] = 1 # binary map
padded_sm = np_.pad(map_ > 0, 1, "constant") # pad for voxels at the border --> map of booleans / binary mask
# path_1 = output_cfg["save_csv"]
# path_2 = output_cfg["save_features_analysis"]
# print(base_folder, path_1, path_2, sep="\n")
Analysis(_CSV_FEATURE_FILE, path_t(_CSV_FEATURE_FILE).parent / "analysis")#str(path_1 / _CSV_FEATURE_FILE), path_2)
for shifts in shifts_of_26_neighbors_c:
result += np_.roll(padded_sm, shifts, axis=(0, 1, 2))[1:-1, 1:-1, 1:-1]
result[map_ == 0] = invalid_n_neighbors_c + 1
return result - 1
if __name__ == "__main__":
#
Main()
import inspect as nspt
import pathlib
import sys as sstm
from types import FunctionType as function_t
from types import MethodType as method_t
import __main__ as main_package
from memory_profiler import profile as Profiled
def MemoryProfileCallables(module: str) -> None:
"""
module: typically, __name__
"""
module_name = module
module = sstm.modules[module]
functions = (
(_nme, _elm)
for _nme, _elm in nspt.getmembers(module)
if (isinstance(_elm, (function_t, method_t)) and _elm.__module__ == module_name)
)
memory_profile_report = open(
pathlib.Path(main_package.__file__).parent / "memory_profiler.txt", "w+"
)
profile = lambda _fct: Profiled(_fct, stream=memory_profile_report)
for name, function in functions:
setattr(module, name, profile(function))
# Copyright CNRS/Inria/UNS
# Contributor(s): Eric Debreuve (since 2019), Morgane Nadal (2020)
#
# eric.debreuve@cnrs.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software. You can use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
import brick.processing.dijkstra_1_to_n as dk_
from brick.component.extension import extension_t
from brick.component.glial_cmp import glial_cmp_t
from brick.component.soma import soma_t
from brick.general.type import array_t, site_h, site_path_h
import itertools as it_
from typing import Callable, Sequence, Tuple
import numpy as np_
import matplotlib.pyplot as pl_
import skimage
def CandidateConnections(
somas: Sequence[soma_t],
influence_map: array_t,
dist_to_closest: array_t,
extensions: Sequence[extension_t],
max_straight_sq_dist: float = np_.inf,
) -> list:
#
candidate_conn_nfo = [] # conn=connection
extensions = filter(lambda ext: ext.is_unconnected, extensions)
for soma, extension in it_.product(somas, extensions):
new_candidates = extension.EndPointsForSoma(soma.uid, influence_map)
candidate_conn_nfo.extend(
(ep_idx, soma, extension, end_point)
for ep_idx, end_point in enumerate(new_candidates)
if dist_to_closest[end_point] <= max_straight_sq_dist
)
candidate_conn_nfo.sort(key=lambda elm: dist_to_closest[elm[3]])
return candidate_conn_nfo
def ShortestPathFromToN(
point: site_h,
costs: array_t,
candidate_points_fct: Callable,
max_straight_sq_dist: float = np_.inf,
) -> Tuple[site_path_h, float]:
#
candidate_points, candidate_indexing = candidate_points_fct(
point, max_straight_sq_dist
)
if candidate_points is None:
return (), np_.inf
costs[point] = 0.0
costs[candidate_indexing] = 0.0
# pl_.imshow(costs[10,:,:])
# pl_.show(block=True)
path, length = dk_.DijkstraShortestPath(costs, point, candidate_points)
# print(skimage.graph.route_through_array(image, point, candidate_points))
# We could use here skimage.graph.route_through_array(array, start, end, fully_connected=True, geometric=True)
# -> (path: list, cost: float)
costs[point] = np_.inf
costs[candidate_indexing] = np_.inf
return path, length
def ValidateConnection(
glial_cmp: glial_cmp_t, extension: glial_cmp_t, end_point: tuple, ep_idx: int, dijkstra_path: site_path_h, costs: array_t
) -> None:
#
connection_path = tuple(zip(*dijkstra_path[1:-1]))
if connection_path.__len__() == 0:
connection_path = None
glial_cmp.connection_path[extension.uid] = connection_path
glial_cmp.extensions.append(extension)
extension.BackReferenceSoma(glial_cmp)
# Add the site of the connexion between the extension and the soma, for each soma and for ech extension
if type(glial_cmp) is soma_t: # restrain the verification to the soma <-> ext step
glial_cmp.ext_roots.append(end_point)
# TODO: delete one of this info (ep_idx or end_point) according to the code following and its needs
# TODO: Ideally, these paths should be dilated + put this outside
# but in ext-ext connections, there must not be dilation around the current ext
# (current ext plays the role of a soma in soma-ext step)
if connection_path is not None:
costs[connection_path] = np_.inf
This diff is collapsed.
This diff is collapsed.
......@@ -34,7 +34,6 @@ from skimage import measure
def Ext_parameters(ext_label, skeleton_label):
ext_p1 = []
ext_p2 = []
ext_p3 = []
......
......@@ -29,7 +29,7 @@
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
import numpy as np_
import numpy as nmpy
import skimage.measure as ms_
import sklearn.decomposition as dc_
......@@ -54,34 +54,34 @@ def Soma_parameters(image, final_soma_label):
# compute eccentricity using PCA
pca = dc_.PCA()
for l in np_.unique(soma_label):
for l in nmpy.unique(soma_label):
if l != 0:
region = np_.zeros([w, n, m])
loc = np_.where(soma_label == l)
region = nmpy.zeros([w, n, m])
loc = nmpy.where(soma_label == l)
region[loc] = image[loc]
loc = np_.asarray(loc)
loc = nmpy.asarray(loc)
# Estimation, calcul des composantes principales for each soma
C = pca.fit(loc).transform(loc)
egv1 = (C[0][:]).max()
egv2 = (C[1][:]).max()
egv3 = (C[2][:]).max()
egv_vect = np_.array([egv1, egv2, egv3])
egv_max = egv_vect.max()
egv_vect = nmpy.array([egv1, egv2, egv3])
egv_max = nmpy.amax(egv_vect)
ration1 = egv1 / egv_max
ration2 = egv2 / egv_max
ration3 = egv3 / egv_max
loc = np_.where(egv_vect == egv_max)
vect_ration = np_.array([ration1, ration2, ration3])
ration = np_.delete(vect_ration, loc)
loc = nmpy.where(egv_vect == egv_max)
vect_ration = nmpy.array([ration1, ration2, ration3])
ration = nmpy.delete(vect_ration, loc)
soma_p5.append(ration)
soma_p1 = np_.asarray(soma_p1)
soma_p2 = np_.asarray(soma_p2)
soma_p3 = np_.asarray(soma_p3)
soma_p4 = np_.asarray(soma_p4)
soma_p5 = np_.asarray(soma_p5)
soma_p1 = nmpy.asarray(soma_p1)
soma_p2 = nmpy.asarray(soma_p2)
soma_p3 = nmpy.asarray(soma_p3)
soma_p4 = nmpy.asarray(soma_p4)
soma_p5 = nmpy.asarray(soma_p5)
parameters_soma = np_.array(
parameters_soma = nmpy.array(
[soma_p1, soma_p2, soma_p3, soma_p4, soma_p5[:, 0], soma_p5[:, 1]]
)
parameters_soma = parameters_soma.T
......
# Copyright CNRS/Inria/UNS
# Contributor(s): Eric Debreuve (since 2018)
# Contributor(s): Eric Debreuve (since 2019), Morgane Nadal (2020)
#
# eric.debreuve@cnrs.fr
#
......@@ -29,31 +29,20 @@
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
from string import ascii_uppercase as ASCII_UPPERCASE_LIST
from string import digits as DIGIT_LIST
from typing import Union
from typing import Sequence
from logger_36 import LOGGER
coord_sep_c = "-"
alphabet_c = ASCII_UPPERCASE_LIST + DIGIT_LIST
from brick.type.extension import extension_t
def EncodedNumber(number: Union[int, str], base: int = 0) -> str:
def PrintConnectedExtensions(extensions: Sequence[extension_t]) -> None:
#
if isinstance(number, str):
number = int(number)
elif isinstance(number, int):
pass
else:
raise TypeError(f"{number}: Invalid type; Actual={type(number).__name__}, Expected=int, str")
if base == 0:
base = alphabet_c.__len__()
residual = number
encoding = []
while residual:
residual, remaining = divmod(residual, base)
encoding.append(alphabet_c[remaining])
return "".join(reversed(encoding))
ext_uids = tuple(
extension.uid for extension in extensions if extension.soma_uid is not None
)
LOGGER.info(
f" Connected Ext = {ext_uids.__len__()}"
f"/{extensions.__len__()}\n"
f" {ext_uids}"
)
......@@ -29,28 +29,16 @@
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
import brick.processing.plot as ot_
from brick.component.extension import extension_t
from brick.component.soma import soma_t
from typing import Sequence, Tuple
import matplotlib as mpl_
import matplotlib.pyplot as pl_
mpl_.use('TkAgg')
import brick.io.screen.plot as ot_
from brick.type.extension import extension_t
from brick.type.soma import soma_t
def PrintConnectedExtensions(extensions: Sequence[extension_t]) -> None:
#
ext_uids = tuple(
extension.uid for extension in extensions if extension.soma_uid is not None
)
print(
f" Connected Ext = {ext_uids.__len__()}"
f"/{extensions.__len__()}\n"
f" {ext_uids}"
)
# mpl_.use('TkAgg')
def PlotSomas(somas: Sequence[soma_t], som_nfo: dict, axes: dict) -> None:
......
......@@ -29,28 +29,36 @@
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
import brick.processing.dijkstra_1_to_n as dk_
from brick.component.extension import extension_t
from brick.component.soma import soma_t
from brick.general.type import array_t, py_array_picker_h
from typing import Any, Optional, Sequence, Tuple, Union
import dijkstra_img as dk_
import matplotlib.pyplot as pl_
import mpl_toolkits.mplot3d as p3_
import numpy as np_
import numpy as nmpy
import skimage.measure as ms_
def MaximumIntensityProjectionZ(img: array_t, cmap: str ='tab20', axis: int = 0, output_image_file_name: str = None) -> None:
""" Maximum Image Projection on the Z axis. """
from mpl_toolkits.mplot3d import Axes3D
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
from brick.type.base import array_t, py_array_picker_h
from brick.type.extension import extension_t
from brick.type.soma import soma_t
def MaximumIntensityProjectionZ(
img: array_t,
cmap: str = "tab20",
axis: int = 0,
block=True,
output_image_file_name: str = None,
) -> None:
"""Maximum Image Projection on the Z axis."""
#
xy = np_.amax(img, axis=axis)
xy = nmpy.amax(img, axis=axis)
pl_.imshow(xy, cmap=cmap)
pl_.show(block=True)
pl_.show(block=block)
if output_image_file_name is not None:
pl_.imsave(output_image_file_name, xy, cmap=cmap)
print('Image saved in', output_image_file_name)
pl_.close()
print("Image saved in", output_image_file_name)
colors_c = ("g", "r", "b", "m", "c", "y")
......@@ -58,7 +66,7 @@ mc_precision_c = 5 # mc=marching cubes
def PlotLMap(
lmp: array_t, axes=None, labels: Union[int, Tuple[int, ...]] = None
lmp: array_t, axes=None, labels: Union[int, Tuple[int, ...]] = None
) -> Optional[Any]:
#
depth_factor, depth_limit = __DepthFactorAndLimit__(lmp.shape)
......@@ -69,16 +77,16 @@ def PlotLMap(
if isinstance(labels, int):
labels = (labels,)
elif labels is None:
labels = range(1, lmp.max() + 1)
labels = range(1, nmpy.amax(lmp).item() + 1)
for label in labels:
try:
vertices, faces, _, _ = ms_.marching_cubes_lewiner(
lmp == label, 0.5, step_size=mc_precision_c
vertices, faces, *_ = ms_.marching_cubes_lewiner(
lmp == label, level=0.5, step_size=mc_precision_c
)
vertices[:, 0] *= depth_factor
triangles = vertices[faces]
mesh = p3_.art3d.Poly3DCollection(triangles)
mesh = Poly3DCollection(triangles)
mesh.set_facecolor(colors_c[label % colors_c.__len__()])
axes.add_collection3d(mesh)
except RuntimeError as exc:
......@@ -91,7 +99,7 @@ def PlotLMap(
def PlotConnection(
connection: py_array_picker_h, soma_uid: int, shape: Sequence[int], axes=None
connection: py_array_picker_h, soma_uid: int, shape: Sequence[int], axes=None
) -> Optional[Any]:
#
depth_factor, depth_limit = __DepthFactorAndLimit__(shape)
......@@ -102,7 +110,7 @@ def PlotConnection(
# TODO This test is put here but could be move outside this function
if connection is not None:
axes.plot(
depth_factor * np_.array(connection[0]),
depth_factor * nmpy.array(connection[0]),
*connection[1:],
colors_c[soma_uid % colors_c.__len__()],
)
......@@ -114,9 +122,9 @@ def PlotConnection(
def PlotExtensions(
extensions: Union[extension_t, Sequence[extension_t]],
shape: Sequence[int],
axes=None,
extensions: Union[extension_t, Sequence[extension_t]],
shape: Sequence[int],
axes=None,
) -> Optional[Any]:
#
depth_factor, depth_limit = __DepthFactorAndLimit__(shape)
......@@ -124,26 +132,25 @@ def PlotExtensions(
if new_axes:
_, axes = __FigAndAxes__(shape, depth_limit)
costs = np_.empty(shape, dtype=np_.float32)
costs = nmpy.empty(shape, dtype=nmpy.float32)
if isinstance(extensions, extension_t):
extensions = (extensions,)
for extension in extensions:
# Remove voxels that can be removed w/o breaking connectivity
costs.fill(np_.inf)
costs.fill(nmpy.inf)
costs[extension.sites] = 1
for src_ep_idx in range(extension.end_points_as_array.shape[1] - 1):
src_point = tuple(extension.end_points_as_array[:, src_ep_idx].tolist())
for tgt_ep_idx in range(
src_ep_idx + 1, extension.end_points_as_array.shape[1]
src_ep_idx + 1, extension.end_points_as_array.shape[1]
):
tgt_point = tuple(extension.end_points_as_array[:, tgt_ep_idx].tolist())
sites, _ = dk_.DijkstraShortestPath(
costs,
src_point,
tgt_point,
limit_to_sphere=False,
constrain_direction=False,
should_constrain_steps=False,
)
sites = tuple(zip(*sites))
......@@ -174,12 +181,12 @@ def PlotSomaWithExtensions(soma: soma_t, soma_lmp: array_t, axes=None) -> Option
PlotLMap(soma_lmp, labels=soma.uid, axes=axes)
for connection_path in filter(
lambda path: path is not None, soma.connection_path.values()
lambda path: path is not None, soma.connection_path.values()
):
PlotConnection(connection_path, soma.uid, shape, axes=axes)
for extension in soma.Extensions():
for connection_path in filter(
lambda path: path is not None, extension.connection_path.values()
lambda path: path is not None, extension.connection_path.values()
):
PlotConnection(connection_path, soma.uid, shape, axes=axes)
PlotExtensions(extension, shape, axes=axes)
......@@ -201,7 +208,7 @@ def __DepthFactorAndLimit__(shape: Sequence[int]) -> Tuple[float, int]:
def __FigAndAxes__(shape: Sequence[int], depth_limit: float) -> Tuple[Any, Any]:
#
fig = pl_.figure()
axes = fig.add_subplot(111, projection=p3_.Axes3D.name)
axes = fig.add_subplot(111, projection=Axes3D.name)
axes.set_xlabel(f"depth: {shape[0]}")
axes.set_ylabel(f"row: {shape[1]}")
......
......@@ -30,39 +30,50 @@
# knowledge of the CeCILL license and that you accept its terms.
import sys as sy_
from PySide2.QtGui import QImage, QPixmap
from PySide2.QtWidgets import *
import skimage.io as io_
import matplotlib as mpl_
import matplotlib.pyplot as pl_
import numpy as np_
import skimage.io as io_
from PySide6.QtWidgets import *
from brick.general.type import array_t
from brick.type.base import array_t
mpl_.use('TkAgg')
channel = 'G'
data_path = '././data/DIO_6H_6_1.70bis_2.2_3.tif'
image = io_.imread(data_path)
mpl_.use("TkAgg")
CHANNEL = "G"
DATA_PATH = "././data/DIO_6H_6_1.70bis_2.2_3.tif"
IMAGE = io_.imread(DATA_PATH)
# image = image[:,:,:,1]
class image_verification(QWidget):
def __init__(self, image: array_t = None, channel: str = None, parent=None):
super(image_verification, self).__init__(parent)
self.channel = channel
self.image = image
# Create widgets
# --Text
self.text0 = QLabel('The image input must be not constant.')
self.text1 = QLabel('The image has only one color channel.')
self.text2 = QLabel(f"The image has only 3 dimensions. However, a value for the 'channel' parameter"
f"is specified: {channel}. Continue with this image?")
self.text3 = QLabel(f"The image has multiple color channels. The channel {channel} is specified"
f" in the parameters. Continue with the resulting image?")
self.text4 = QLabel('The image has multiple color channels. Error in the value of the parameter channel.')
self.text5 = QLabel(f'The image dimensions are not correct: {image.ndim}, instead of 3 or 4.')
self.text0 = QLabel(parent=None, text="The image input must be not constant.")
self.text1 = QLabel(parent=None, text="The image has only one color channel.")
self.text2 = QLabel(
parent=None,
text=f"The image has only 3 dimensions. However, a value for the 'channel' parameter"
f"is specified: {channel}. Continue with this image?",
)
self.text3 = QLabel(
parent=None,
text=f"The image has multiple color channels. The channel {channel} is specified"
f" in the parameters. Continue with the resulting image?",
)
self.text4 = QLabel(
parent=None,
text="The image has multiple color channels. Error in the value of the parameter channel.",
)
self.text5 = QLabel(
parent=None,
text=f"The image dimensions are not correct: {image.ndim}, instead of 3 or 4.",
)
# --Buttons
self.button_quit = QPushButton("Quit and modify the parameters.")
self.button_continue = QPushButton("Continue.")
......@@ -79,7 +90,8 @@ class image_verification(QWidget):
def Continue(self):
self.close()
def Quit(self):
@staticmethod
def Quit():
sy_.exit(0)
def ReturnImage(self):
......@@ -87,7 +99,7 @@ class image_verification(QWidget):
# def PlotImage(self, image: array_t) -> None :
# im = image.copy()
# im = np_.rint(im).astype(np_.uint8)
# im = nmpy.rint(im).astype(nmpy.uint8)
# im = QImage(im, im.shape[1], im.shape[0], im.shape[1] * 3, QImage.Format_RGB888)
# pix = QPixmap.fromImage(im)
# img = QLabel(self)
......@@ -96,7 +108,7 @@ class image_verification(QWidget):
def ImVerif(self) -> array_t:
# The image must not be constant
if self.image.max() == self.image.min():
value_error = 'The image input must be not constant.'
value_error = "The image input must be not constant."
print(value_error)
layout = QVBoxLayout()
......@@ -106,15 +118,17 @@ class image_verification(QWidget):
# Verification of the dimension of the image and its coherence with the parameters (channel)
elif self.image.ndim == 3:
print('The image has only one color channel.')
pl_.imshow(self.image[10, :, :], cmap='gray')
print("The image has only one color channel.")
pl_.imshow(self.image[10, :, :], cmap="gray")
pl_.show(block=True)
text = self.text1
if channel is not None:
value_error = f'The image has only 3 dimensions. However, a value for the "channel" parameter is ' \
f'specified : {channel}. Give the channel the value None? '
if CHANNEL is not None:
value_error = (
f'The image has only 3 dimensions. However, a value for the "channel" parameter is '
f"specified : {CHANNEL}. Give the channel the value None? "
)
print(value_error)
text = self.text2
......@@ -126,20 +140,22 @@ class image_verification(QWidget):
self.setLayout(layout)
elif self.image.ndim == 4:
if channel == 'R' or channel == 'G' or channel == 'B':
if CHANNEL == "R" or CHANNEL == "G" or CHANNEL == "B":
# not changed into if --channel in 'RGB'-- because if channel='RG' => True.
value_error = f'The image has multiple color channels. The channel {channel} is specified in the ' \
f'parameters. '
value_error = (
f"The image has multiple color channels. The channel {CHANNEL} is specified in the "
f"parameters. "
)
print(value_error)
text = self.text3
self.image = self.image[:, :, :, 'RGB'.find(channel)]
pl_.imshow(self.image[10, :, :], cmap='gray')
self.image = self.image[:, :, :, "RGB".find(CHANNEL)]
pl_.imshow(self.image[10, :, :], cmap="gray")
pl_.show(block=True)
# The obtained image must not be constant
if self.image.max() == self.image.min():
value_error = 'The image input must be not constant.'
value_error = "The image input must be not constant."
print(value_error)
text = self.text0
......@@ -153,7 +169,9 @@ class image_verification(QWidget):
return self.image # TODO: How to return the image ??
else:
print('The image has multiple color channels. Error in the value of the parameter channel.')
print(
"The image has multiple color channels. Error in the value of the parameter channel."
)
layout = QVBoxLayout()
layout.addWidget(self.text4)
......@@ -161,7 +179,9 @@ class image_verification(QWidget):
self.setLayout(layout)
elif self.image.ndim != 4 and self.image.ndim != 3:
print(f'The image dimensions are not correct: {self.image.ndim}, instead of 3 or 4.')
print(
f"The image dimensions are not correct: {self.image.ndim}, instead of 3 or 4."
)
layout = QVBoxLayout()
layout.addWidget(self.text5)
......@@ -169,8 +189,8 @@ class image_verification(QWidget):
self.setLayout(layout)
if __name__ == '__main__':
app = QApplication(sy_.argv)
verif = image_verification(image, channel)
if __name__ == "__main__":
app = QApplication()
verif = image_verification(IMAGE, CHANNEL)
verif.show()
sy_.exit(app.exec_())
from __future__ import annotations
import tkinter as tknt
from typing import Any, Callable, Optional, Sequence, Tuple, Union
import numpy as nmpy
from brick.io.screen.type.tk_and_pil import (
ColoredVersion,
LabelColors,
ScaledVersion,
image_record_t,
)
array_t = nmpy.ndarray
class mip_widget_t(tknt.Label):
""""""
attributes = (
"mip_records",
"data_mode", # "continuous", "discrete"
"color_mode", # "gray", "color"
"gray_offset",
"colormap",
"colors",
"static",
"probe_able",
"resizeable",
"mip_axis_wgt",
"selected_mip_axis",
"probe_info_wgt",
"companion_mip",
"parent",
)
mip_records: Sequence[image_record_t]
data_mode: str
color_mode: str
gray_offset: int
colormap: str
colors: Sequence[Union[float, array_t]]
static: bool
probe_able: bool
resizeable: bool
mip_axis_wgt: tknt.Menubutton
selected_mip_axis: tknt.IntVar
probe_info_wgt: Optional[tknt.Widget]
companion_mip: Optional[mip_widget_t]
parent: Union[tknt.Widget, tknt.Tk]
def __init__(
self,
parent: Union[tknt.Widget, tknt.Tk],
/,
):
""""""
super().__init__(parent, borderwidth=0, padx=0, pady=0)
for attribute in mip_widget_t.attributes:
setattr(self, attribute, None)
@classmethod
def NewForImage(
cls,
image: array_t,
data_mode: str,
color_mode: str,
/,
*,
mip_axis: int = 0, # Do not use negative values
with_mip_selector: bool = False,
should_pack_selector: bool = True,
with_companion: mip_widget_t = None,
gray_offset: int = 0,
colormap: str = "viridis",
static: bool = False,
probe_able: bool = True,
resizeable: bool = True,
parent: Union[tknt.Widget, tknt.Tk] = None,
) -> mip_widget_t:
""""""
instance = cls(parent)
# Do not comment out; Used in eval(attribute) below
mip_records, colors = _AllMIPImages(
image,
data_mode,
color_mode,
gray_offset,
colormap,
probe_able,
resizeable,
parent,
)
if static and (with_mip_selector or (with_companion is not None)):
raise ValueError(
"MIP widget with MIP selector or companion cannot be static"
)
if with_mip_selector and (with_companion is not None):
raise ValueError(
"MIP widget cannot have both a MIP selector and a companion"
)
elif with_mip_selector:
if should_pack_selector:
selector_parent = instance
else:
selector_parent = parent
# Do not comment out; Used in eval(attribute)
mip_axis_wgt, selected_mip_axis = _MIPAxisSelectorWidget(
mip_axis,
instance.Update,
image.shape,
selector_parent,
)
elif with_companion is not None:
selected_mip_axis = with_companion.selected_mip_axis
_SetMIPAxisSelectorActions(
selected_mip_axis,
(instance.Update, with_companion.Update),
)
else:
raise ValueError(
"MIP widget must have either a MIP selector or a companion"
)
if with_mip_selector and should_pack_selector:
raise NotImplementedError("Packing MIP selector not currently implemented")
if resizeable:
# Note: Binding cannot be done before super init
instance.bind("<Configure>", instance._OnResize)
# --- Saving required variables as object attributes
for attribute in cls.attributes:
try:
value = eval(attribute)
except NameError:
value = None
if value is not None:
setattr(instance, attribute, value)
instance.Update()
return instance
def UpdateImage(self, image: array_t, /) -> None:
""""""
if self.static:
raise RuntimeError("Requesting the update of a static mip")
self.mip_records, self.colors = _AllMIPImages(
image,
self.data_mode,
self.color_mode,
self.gray_offset,
self.colormap,
self.probe_able,
self.resizeable,
self.parent,
)
self.Update()
def Update(self, *_, **__) -> None:
""""""
if self.selected_mip_axis is None:
return
mip_record = self.mip_records[self.selected_mip_axis.get()]
self.configure(image=mip_record.tk_version)
def AddProbe(
self, probe_info_wgt: tknt.Widget, /, *, for_event: str = "<Motion>"
) -> None:
"""
probe_info_wgt: Must have a text attribute updatable through the configure(text=...) method.
The widget constructor could have been declared with a probe info widget as a parameter. However, to avoid
requiring the probe info widget being created before the MIP widget, this 2-step option was chosen.
"""
if not self.probe_able:
raise ValueError(
'Adding a probe to a MIP widget instantiated with "probe_able" to False'
)
self.probe_info_wgt = probe_info_wgt
self.bind(for_event, self._DisplayInfo)
def _DisplayInfo(self, event: tknt.EventType.Motion, /) -> None:
""""""
value, row, col = self.ValueAndIndicesUnderPointer(event.y, event.x)
self.probe_info_wgt.configure(text=f"Label: {value} @ R={row}xC={col}")
def _OnResize(self, event: tknt.EventType.Configure, /) -> None:
""""""
mip_record = self.mip_records[self.selected_mip_axis.get()]
mip_record.Resize(event.width, event.height)
self.configure(image=mip_record.tk_version)
def ValueAndIndicesUnderPointer(
self, pointer_row: int, pointer_col: int, /
) -> Tuple[Union[int, float], int, int]:
""""""
mip_record = self.mip_records[self.selected_mip_axis.get()]
np_version = mip_record.np_version
shape = np_version.shape
row = int(round(shape[0] * pointer_row / self.winfo_height()))
col = int(round(shape[1] * pointer_col / self.winfo_width()))
try:
value = np_version[row, col]
except IndexError:
# If margins/paddings/borders are all set to zero, this should not happen
value = 0
return value, row, col
def _MIPAxisSelectorWidget(
current_axis: int,
Action: Union[Callable, Sequence[Callable]],
shape: Sequence[int],
parent: Union[tknt.Widget, tknt.Tk],
/,
) -> Tuple[tknt.Menubutton, tknt.IntVar]:
""""""
title = f"MIP Axis [{','.join(str(_lgt) for _lgt in shape)}]"
selector = tknt.Menubutton(parent, text=title, relief="raised")
selected_value = tknt.IntVar()
selected_value.set(current_axis)
menu = tknt.Menu(selector, tearoff=False)
entries = ("First dim", "Second dim", "Third dim")
for idx, entry in enumerate(entries):
menu.add_radiobutton(label=entry, value=idx, variable=selected_value)
menu.invoke(selected_value.get())
# Set action only after calling invoke to avoid redundant call at window creation
_SetMIPAxisSelectorActions(selected_value, Action)
selector["menu"] = menu
return selector, selected_value
def _SetMIPAxisSelectorActions(
mip_selector_variable: tknt.IntVar,
Action: Union[Callable, Sequence[Callable]],
) -> None:
""""""
if isinstance(Action, Callable):
Actions = (Action,)
else:
Actions = Action
def Callback(prm1: str, prm2: str, prm3: str, /) -> Any:
#
for OneAction in Actions:
OneAction(prm1, prm2, prm3)
mip_selector_variable.trace_add("write", Callback)
def _AllMIPImages(
image: array_t,
data_mode: str,
color_mode: str,
gray_offset: int,
colormap: str,
probe_able: bool,
resizeable: bool,
parent: Union[tknt.Widget, tknt.Tk],
/,
) -> Tuple[Sequence[image_record_t], Sequence[Union[float, array_t]]]:
""""""
if data_mode == "continuous":
colors = None
else:
colors = LabelColors(
image, color_mode, gray_offset=gray_offset, colormap=colormap
)
mip_records = [
_MIPImages(
image,
_axs,
colors,
parent,
)
for _axs in range(3)
]
for record in mip_records:
record.Optimize(probe_able, resizeable)
return mip_records, colors
def _MIPImages(
image: array_t,
mip_axis: int,
colors: Optional[Sequence[Union[float, array_t]]],
parent: Union[tknt.Widget, tknt.Tk],
/,
) -> image_record_t:
""""""
mip = nmpy.amax(image, axis=mip_axis)
if colors is None:
min_value = nmpy.amin(mip).item()
max_value = nmpy.amax(mip).item()
display_version = (255.0 / (max_value - min_value)) * (mip - min_value)
display_version = nmpy.around(display_version).astype(nmpy.uint8)
elif isinstance(colors[0], float):
display_version = ScaledVersion(mip, colors)
else:
display_version = ColoredVersion(mip, colors)
output = image_record_t(mip, display_version, parent)
return output
# Copyright CNRS/Inria/UNS
# Contributor(s): Eric Debreuve (since 2019)
#
# eric.debreuve@cnrs.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software. You can use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
from __future__ import annotations
import tkinter as tknt
from typing import Optional, Sequence
import numpy as nmpy
import skimage.segmentation as sisg
from brick.io.screen.type.mip_widget import mip_widget_t
from brick.io.screen.type.three_d_widget import three_d_widget_t
array_t = nmpy.ndarray
STATIC_ROW_MIN_HEIGHT = 30
INITIAL_RELATIVE_ISOVALUE = 0.6
class soma_validation_window_t:
""""""
__slots__ = (
"gfp",
"lmap",
"main_window",
"three_d_selector",
"gfp_wgt",
"gfp_3d_wgt",
"lmap_wgt",
"lmap_3d_wgt",
"cursor_nfo",
"invisible_widgets",
)
attributes = __slots__
gfp: array_t
lmap: array_t
main_window: tknt.Tk
three_d_selector: tknt.Checkbutton
gfp_wgt: mip_widget_t
gfp_3d_wgt: Optional[three_d_widget_t]
lmap_wgt: mip_widget_t
lmap_3d_wgt: Optional[three_d_widget_t]
cursor_nfo: tknt.Label
invisible_widgets: Sequence
def __init__(
self,
gfp: array_t,
lmap: array_t,
/,
*,
mip_axis: int = -1,
gray_offset: int = 0,
colormap: str = "viridis",
):
"""
with_cm: "plasma" and "viridis" seem to be good options
"""
exec(f"{'='.join(soma_validation_window_t.attributes)} = None")
main_window = tknt.Tk()
# ---- Creation of widgets
if mip_axis < 0:
mip_axis = gfp.ndim + mip_axis
gfp_wgt = mip_widget_t.NewForImage(
gfp,
"continuous",
"gray",
mip_axis=mip_axis,
with_mip_selector=True,
should_pack_selector=False,
gray_offset=gray_offset,
probe_able=False,
parent=main_window,
)
lmap_wgt = mip_widget_t.NewForImage(
lmap,
"discrete",
"color",
mip_axis=mip_axis,
with_companion=gfp_wgt,
colormap=colormap,
parent=main_window,
)
state_variable = tknt.IntVar()
Toggle3D = lambda *args, **kwargs: self._Toggle3D(state_variable)
three_d_selector = tknt.Checkbutton(
main_window, text="3D View", variable=state_variable, command=Toggle3D
)
cursor_nfo = tknt.Label(main_window, text="")
done_button = tknt.Button(main_window, text="Done", command=main_window.quit)
# --- Event management
lmap_wgt.AddProbe(cursor_nfo)
lmap_wgt.bind("<Button-1>", self._DeleteSoma)
# --- Widget placement in grid
next_available_row = 0
gfp_wgt.mip_axis_wgt.grid(row=next_available_row, column=0)
three_d_selector.grid(row=next_available_row, column=1)
next_available_row += 1
# sticky=... solves the super-slow-to-resize issue!
gfp_wgt.grid(
row=next_available_row, column=0, sticky=tknt.W + tknt.E + tknt.N + tknt.S
)
lmap_wgt.grid(
row=next_available_row, column=1, sticky=tknt.W + tknt.E + tknt.N + tknt.S
)
# Leave 2 rows free for isovalue slider and smoothing width slider
next_available_row += 3
cursor_nfo.grid(row=next_available_row, column=0)
done_button.grid(row=next_available_row, column=1)
next_available_row += 1
# --- Window resize management
n_grid_cols, n_grid_rows = main_window.grid_size()
for row in range(n_grid_rows):
if row == 1:
main_window.rowconfigure(1, weight=10)
else:
main_window.rowconfigure(row, weight=1, minsize=STATIC_ROW_MIN_HEIGHT)
for col in range(n_grid_cols):
main_window.columnconfigure(col, weight=1)
# --- Saving required variables as object attributes
for attribute in soma_validation_window_t.attributes:
setattr(self, attribute, eval(attribute))
self.invisible_widgets = ()
def LaunchValidation(self) -> tuple[array_t, int]:
""""""
self.main_window.mainloop()
self.main_window.destroy()
relabeled, *_ = sisg.relabel_sequential(self.lmap)
return relabeled, nmpy.amax(relabeled).item()
def _DeleteSoma(self, event: tknt.EventType.ButtonPress, /) -> None:
""""""
label, *_ = self.lmap_wgt.ValueAndIndicesUnderPointer(event.y, event.x)
if label > 0:
lmap = self.lmap
where_label = lmap == label
if nmpy.all(nmpy.logical_or(lmap == 0, where_label)):
return
lmap[where_label] = 0
self.lmap_wgt.UpdateImage(lmap)
if self.lmap_3d_wgt is not None:
self.lmap_3d_wgt.UpdateImage(lmap, colors=self.lmap_wgt.colors)
def _Toggle3D(self, state: tknt.IntVar, /) -> None:
""""""
three_d_state = state.get()
if three_d_state == 0:
soon_invisible_widgets = (
self.gfp_3d_wgt,
self.lmap_3d_wgt,
self.gfp_3d_wgt.isolevel_wgt,
self.gfp_3d_wgt.smoothing_width_wgt,
)
else:
soon_invisible_widgets = (
self.gfp_wgt,
self.lmap_wgt,
self.gfp_wgt.mip_axis_wgt,
)
for widget in soon_invisible_widgets:
widget.grid_remove()
if self.invisible_widgets.__len__() > 0:
for widget in self.invisible_widgets:
widget.grid()
else:
mip_axis = self.gfp_wgt.selected_mip_axis.get()
gfp_3d_wgt = three_d_widget_t.NewForImage(
self.gfp,
"continuous",
None,
self.main_window,
mip_axis=mip_axis,
should_pack_slider=False,
initial_relative_isovalue=INITIAL_RELATIVE_ISOVALUE,
)
lmap_3d_wgt = three_d_widget_t.NewForImage(
self.lmap,
"discrete",
self.lmap_wgt.colors,
self.main_window,
mip_axis=mip_axis,
)
gfp_3d_wgt.AddCompanionWidget(lmap_3d_wgt)
lmap_3d_wgt.AddCompanionWidget(gfp_3d_wgt)
gfp_3d_wgt.grid(row=1, column=0, sticky=tknt.W + tknt.E + tknt.N + tknt.S)
lmap_3d_wgt.grid(row=1, column=1, sticky=tknt.W + tknt.E + tknt.N + tknt.S)
gfp_3d_wgt.isolevel_wgt.grid(
row=2, column=0, sticky=tknt.W + tknt.E + tknt.N + tknt.S
)
gfp_3d_wgt.smoothing_width_wgt.grid(
row=3, column=0, sticky=tknt.W + tknt.E + tknt.N + tknt.S
)
self.gfp_3d_wgt = gfp_3d_wgt
self.lmap_3d_wgt = lmap_3d_wgt
self.invisible_widgets = soon_invisible_widgets
if three_d_state == 0:
self.cursor_nfo.configure(text="")
else:
self.cursor_nfo.configure(text="No MIP info in 3-D mode")
from typing import Optional, Tuple
import numpy as nmpy
from scipy import ndimage as spim
from skimage import filters as fltr
array_t = nmpy.ndarray
class image_record_t:
""""""
__slots__ = (
"original",
"resized",
"smoothed",
"mode", # "continuous", "discrete"
"zoom_factors",
"smoothing_width",
)
attributes = __slots__
original: array_t
resized: array_t
smoothed: array_t
mode: str
zoom_factors: Tuple[float, float, float]
smoothing_width: Optional[float]
IMAGE_DTYPE = nmpy.float32
MAX_LENGTH = 500
MIN_LENGTH_RATIO = 0.25
INITIAL_SMOOTHING_WIDTH = 1.0
defaults = {}
def __init__(
self,
image: array_t,
mode: str,
/,
):
""""""
exec(f"{'='.join(image_record_t.attributes)} = None")
for attribute in image_record_t.attributes:
if attribute in image_record_t.defaults:
setattr(self, attribute, image_record_t.defaults[attribute])
else:
setattr(self, attribute, eval(attribute))
self.Update(image, image_record_t.INITIAL_SMOOTHING_WIDTH)
def Update(self, image: array_t, smoothing_width: float, /) -> None:
""""""
self.smoothing_width = None # Ensures that new image will be smoothed even if smoothing width is the same
self.original = _Retyped(image)
self.SetResized()
self.SetSmoothed(smoothing_width)
def SetResized(self) -> None:
""""""
shape = self.original.shape
min_length = min(shape)
max_length = max(shape)
zoom_factors = (1.0, 1.0, 1.0)
if min_length < image_record_t.MIN_LENGTH_RATIO * max_length:
smallest_axis = shape.index(min_length)
zoom_factors = (
zoom_factors[:smallest_axis]
+ (image_record_t.MIN_LENGTH_RATIO * max_length / min_length,)
+ zoom_factors[(smallest_axis + 1) :]
)
if max_length > image_record_t.MAX_LENGTH:
scaling = image_record_t.MAX_LENGTH / max_length
zoom_factors = tuple(scaling * _zmf for _zmf in zoom_factors)
self.zoom_factors = zoom_factors
if any(_zmf != 1.0 for _zmf in zoom_factors):
if self.mode == "continuous":
order = 3
else:
order = 0
resized = spim.zoom(self.original, zoom_factors, order=order)
self.resized = _Retyped(resized)
else:
self.resized = self.original
def SetSmoothed(self, smoothing_width: float, /) -> None:
""""""
if self.mode == "continuous":
if smoothing_width != self.smoothing_width:
smoothed = fltr.gaussian(self.resized, smoothing_width)
self.smoothed = _Retyped(smoothed)
self.smoothing_width = smoothing_width
else:
self.smoothed = self.resized
def _Retyped(image: array_t) -> array_t:
""""""
return image.astype(image_record_t.IMAGE_DTYPE, copy=False)
This diff is collapsed.
import tkinter as tknt
from typing import Optional, Sequence, Tuple, Union
import numpy as nmpy
from matplotlib import cm as clmp
from PIL import Image as plim
from PIL import ImageTk as pltk
array_t = nmpy.ndarray
pil_image_t = plim.Image
tk_image_t = pltk.PhotoImage
class image_record_t:
""""""
__slots__ = (
"np_version",
"pil_version_original",
"pil_version",
"tk_version",
"parent",
)
attributes = __slots__
np_version: Optional[array_t]
pil_version_original: Optional[pil_image_t]
pil_version: pil_image_t
tk_version: tk_image_t
parent: Union[tknt.Widget, tknt.Tk]
def __init__(
self,
np_version: array_t,
display_version: array_t,
parent: Union[tknt.Widget, tknt.Tk],
/,
):
""""""
# Do not comment out; They are used in eval(attribute) below
tk_version, pil_version = TkAndPILImagesFromNumpyArray(display_version, parent)
pil_version_original = pil_version
# --- Saving required variables as object attributes
for attribute in image_record_t.attributes:
setattr(self, attribute, eval(attribute))
def Optimize(
self,
probe_able: bool,
resizeable: bool,
/,
) -> None:
""""""
if not probe_able:
self.np_version = None
if not resizeable:
self.pil_version_original = None
def Resize(self, width: int, height: int, /) -> None:
""""""
self.pil_version = self.pil_version_original.resize((width, height))
self.tk_version = pltk.PhotoImage(master=self.parent, image=self.pil_version)
def LabelColors(
image: array_t, mode: str, /, *, gray_offset: int = 0, colormap: str = "viridis"
) -> Sequence[Union[float, array_t]]:
"""
mode: "gray", "color"
gray_offset: Value of darkest non-background intensity
"""
# Keep max instead of unique.size to maintain colors in case of deletion
n_labels = int(nmpy.amax(image))
if mode == "gray":
output = nmpy.linspace(max(gray_offset, 1) / 255.0, 1.0, num=n_labels)
elif mode == "color":
LinearValueToRGB = clmp.get_cmap(colormap)
if n_labels > 1:
output = tuple(
nmpy.array(LinearValueToRGB((_lbl - 1.0) / (n_labels - 1.0))[:3])
for _lbl in range(1, n_labels + 1)
)
else:
output = (nmpy.array(LinearValueToRGB(1.0)[:3]),)
else:
raise ValueError(f'{mode}: Invalid mode; Expected="gray" or "color"')
return output
def ScaledVersion(image: array_t, gray_levels: Sequence[float], /) -> array_t:
""""""
output = nmpy.zeros(image.shape, dtype=nmpy.uint8)
for label, gray_level in enumerate(gray_levels, start=1):
output[image == label] = round(255.0 * gray_level)
return output
def ColoredVersion(image: array_t, colors: Sequence[array_t], /) -> array_t:
""""""
output = nmpy.zeros(image.shape + (3,), dtype=nmpy.uint8)
for label, color in enumerate(colors, start=1):
output[image == label] = nmpy.around(255.0 * color)
return output
def TkAndPILImagesFromNumpyArray(
array: array_t, parent: Union[tknt.Widget, tknt.Tk], /
) -> Tuple[tk_image_t, pil_image_t]:
""""""
pil_image = plim.fromarray(array)
tk_image = pltk.PhotoImage(master=parent, image=pil_image)
return tk_image, pil_image
......@@ -29,95 +29,106 @@
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
import ast as asgt
import configparser
# --Parameters from .INI file
parameters = configparser.ConfigParser(allow_no_value=True)
parameters.read('parameters.ini')
def IfNotFloat(section: str, key: str) -> float:
try:
value = parameters.getfloat(section, key)
return value
except:
return eval(parameters.get(section, key))
# [Input]
data_path = parameters['Input']['data_path']
channel = parameters['Input']['channel']
try:
size_voxel_in_micron = parameters['Input']['size_voxel_in_micron']
if size_voxel_in_micron is not None:
size_voxel_in_micron = size_voxel_in_micron.split(',')
size_voxel_in_micron = list(float(size_voxel_in_micron[i]) for i in range(len(size_voxel_in_micron)))
except:
raise ValueError('The voxel size parameter is not given correctly. Write X,Y,Z without () or [] or spaces. Do not '
'forget ":" between size_voxel_in_micron and its value')
# [Somas]
soma_low_c = IfNotFloat('Somas', 'soma_low_c')
soma_high_c = IfNotFloat('Somas', 'soma_high_c')
soma_selem_micron_c = IfNotFloat('Somas', 'soma_selem_micron_c')
soma_min_area_c = IfNotFloat('Somas', 'soma_min_area_c')
# [Extensions]
ext_low_c = IfNotFloat('Extensions', 'ext_low_c')
ext_high_c = IfNotFloat('Extensions', 'ext_high_c')
ext_selem_micron_c = IfNotFloat('Extensions', 'ext_selem_micron_c')
ext_min_area_c = IfNotFloat('Extensions', 'ext_min_area_c')
# [Connexions]
max_straight_sq_dist_c = IfNotFloat('Connexions', 'max_straight_sq_dist_c')
max_weighted_length_c = IfNotFloat('Connexions', 'max_weighted_length_c')
# [Frangi]
try:
scale_range = parameters['Frangi']['scale_range']
scale_range = scale_range.split(',')
scale_range = tuple(float(scale_range[i]) for i in range(len(scale_range)))
except:
raise ValueError('The scale range parameter is not given correctly. Write min,max without () or [] or spaces. '
'Do not forget ":" between scale_range and its value')
scale_step = IfNotFloat('Frangi', 'scale_step')
alpha = IfNotFloat('Frangi', 'alpha')
beta = IfNotFloat('Frangi', 'beta')
frangi_c = IfNotFloat('Frangi', 'frangi_c')
bright_on_dark = parameters.getboolean('Frangi', 'bright_on_dark')
method = parameters['Frangi']['method']
# [Program running]
with_plot = parameters.getboolean('Program Running', 'with_plot')
in_parallel = parameters.getboolean('Program Running', 'in_parallel')
# data_path = "./data/DIO_6H_6_1.70bis_2.2_3.tif"
# channel = 'G'
# size_voxel_in_micron = None # -> list [X,Y,Z]
# with_plot = False
#
# soma_low_c = 0.15
# soma_high_c = 0.7126
# ext_low_c = 0.2 #3 #1e-7 ##0.2 # 0.02 # 0.2 # ext_low_c = 9.0e-4
# ext_high_c = 0.6 #1e-10 #1.2e-7 ##0.6 # 0.04 # 0.6 # high_ext = 8.0e-3
#
# # soma_selem_c = mp_.disk(2)
# # ext_selem_c = mp_.disk(1)
# soma_selem_micron_c = 0.24050024 * 2
# ext_selem_micron_c = 0.24050024 * 1
#
# max_straight_sq_dist_c = (30 ** 2) * 0.24050024
# max_weighted_length_c = 20.0 * 0.24050024
#
# soma_min_area_c = 1000 * (0.24050024 ** 2)
# ext_min_area_c = 100 * (0.24050024 ** 2)
# # soma_min_area_c = 1000
# # ext_min_area_c = 100
#
# in_parallel = False
import pprint as pprt
from configparser import DEFAULTSECT as DEFAULT_SECTION
from pathlib import Path as path_t
from typing import Any
from logger_36 import LOGGER
from logger_36.handler import AddFileHandler
from logger_36.system import LogSystemDetails
from logger_36.time import TimeStamp
_BOOLEANS = (
("Connection", "dilatation_erosion"),
("Extension", "adapt_hist_equalization"),
("Frangi", "bright_on_dark"),
("Output", "statistical_analysis"),
("Workflow", "in_parallel"),
("Workflow", "interactive"),
("Workflow", "with_plot"),
)
_INTEGERS = (
("Feature", "number_of_bins_curvature"),
("Feature", "number_of_bins_length"),
)
_FLOATS = (
("Connection", "max_straight_sq_dist"),
("Connection", "max_weighted_length"),
("Extension", "ext_high"),
("Extension", "ext_low"),
("Extension", "ext_min_area"),
("Extension", "ext_selem_micron"),
("Feature", "hist_min_curvature"),
("Feature", "hist_min_length"),
("Feature", "hist_step_curvature"),
("Feature", "hist_step_length"),
("Frangi", "alpha"),
("Frangi", "beta"),
("Frangi", "frangi_c"),
("Frangi", "scale_step"),
("Soma", "soma_high"),
("Soma", "soma_low"),
("Soma", "soma_max_area"),
("Soma", "soma_min_area"),
("Soma", "soma_selem_micron"),
)
_PATHS = (
("Input", "data_path"),
("Output", "save_csv"),
("Output", "save_features_analysis"),
("Output", "save_images"),
)
_TO_BE_EVALUATED = (
("Feature", "hist_bins_borders_curvature"),
("Feature", "hist_bins_borders_length"),
("Frangi", "scale_range"),
("Input", "voxel_size_in_micron"),
)
def NutrimorphArguments(path: path_t, /) -> tuple[dict[str, Any], ...]:
""""""
config = configparser.ConfigParser(allow_no_value=True, inline_comment_prefixes="#")
config.read(path)
output: dict[str, dict[str, Any]] = dict(
(_sct, dict(_prm)) for _sct, _prm in config.items() if _sct != DEFAULT_SECTION
)
for parameters, getter in (
(_BOOLEANS, "getboolean"),
(_INTEGERS, "getint"),
(_FLOATS, "getfloat"),
(_PATHS, path_t),
(_TO_BE_EVALUATED, asgt.literal_eval),
):
if isinstance(getter, str):
Converted = getattr(config, getter)
else:
Converted = lambda _sct, _prm: getter(config.get(_sct, _prm))
for section, parameter in parameters:
if output[section][parameter] is not None:
output[section][parameter] = Converted(section, parameter)
base_folder = output["Output"]["save_images"] / output["Input"]["data_path"].stem
if base_folder.exists() and not base_folder.is_dir():
raise RuntimeError(f"{base_folder}: Exists and is not a folder.")
if not base_folder.exists():
base_folder.mkdir(parents=True)
AddFileHandler(
base_folder / f"info-warning-error-{TimeStamp()}.txt", show_memory_usage=True
)
LogSystemDetails()
LOGGER.info(pprt.pformat(output))
# Section order: 'Connection', 'Extension', 'Feature', 'Frangi', 'Input', 'Output', 'Soma', 'Workflow'
return tuple(output[_sct] for _sct in sorted(output.keys()))
# Copyright CNRS/Inria/UNS
# Contributor(s): Eric Debreuve (since 2019)
#
# eric.debreuve@cnrs.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software. You can use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
"""
Dijkstra Shortest Weighted Path from one image/volume pixel/voxel to one or more image/volume pixel(s)/voxel(s)
Graph nodes = pixels/voxels = sites
Graph edges = pixel/voxel neighborhood relationships
Edge weights = typically computed from pixel/voxel values
Adapted from material of a course by Marc Pegon:
https://www.ljll.math.upmc.fr/pegon/teaching.html
https://www.ljll.math.upmc.fr/pegon/documents/BCPST/TP06_src.tar.gz
https://www.researchgate.net/profile/Marc_Pegon
"""
import heapq as hp_
from typing import Final, Iterator, List, Optional, Sequence, Tuple, Union
import numpy as np_
import scipy.ndimage.morphology as mp_
# Slightly slower alternative (look for SSA below)
# import scipy.ndimage as im_
# import skimage.draw as dw_
number_h = Union[int, float]
tintegers_h = Tuple[int, ...] # tintegers=tuple of integers
site_h = tintegers_h
path_h = Tuple[site_h, ...]
path_nfo_h = Tuple[path_h, float]
array_t = np_.ndarray
def DijkstraShortestPath(
costs: array_t,
origin: site_h,
target: Union[site_h, Sequence[site_h]],
limit_to_sphere: bool = True,
constrain_direction: bool = True,
return_all: bool = False,
) -> Union[path_nfo_h, Tuple[path_nfo_h, ...]]:
#
costs, targets, valid_directions, dir_lengths = _DijkstraShortestPathPrologue(
costs, origin, target, limit_to_sphere, constrain_direction,
)
nearest_sites = nearest_site_queue_t()
nearest_sites.Insert(0, origin)
min_distance_to = {origin: 0.0}
predecessor_of = {}
visited_sites = set()
while True:
site_nfo = nearest_sites.Pop()
if site_nfo is None:
# Empty queue: full graph traversal did not allow to reach all targets
# This case is correctly dealt with in the following
break
distance, site = site_nfo
if site in targets:
targets.remove(site)
if targets.__len__() > 0:
continue
else:
break
if site not in visited_sites:
visited_sites.add(site)
for successor, edge_length in _OutgoingEdges(
site, valid_directions, dir_lengths, costs
):
successor = tuple(successor)
next_distance = distance + edge_length
min_distance = min_distance_to.get(successor)
if (min_distance is None) or (next_distance < min_distance):
min_distance_to[successor] = next_distance
predecessor_of[successor] = site
nearest_sites.Insert(next_distance, successor)
if isinstance(target[0], Sequence):
targets = tuple(target)
else:
targets = (target,)
if return_all:
all_paths = []
for one_target in targets:
distance = min_distance_to.get(one_target)
if distance is None:
all_paths.append(((), None))
else:
path = []
site = one_target
while site is not None:
path.append(site)
site = predecessor_of.get(site)
all_paths.append((tuple(reversed(path)), distance))
return tuple(all_paths)
else:
min_distance = np_.inf
closest_target = None
for one_target in targets:
distance = min_distance_to.get(one_target)
if (distance is not None) and (distance < min_distance):
min_distance = distance
closest_target = one_target
path = []
if closest_target is None:
distance = None
else:
distance = min_distance_to.get(closest_target)
site = closest_target
while site is not None:
path.append(site)
site = predecessor_of.get(site)
return tuple(reversed(path)), distance
def _DijkstraShortestPathPrologue(
costs: array_t,
origin: site_h,
target: Union[site_h, Sequence[site_h]],
limit_to_sphere: bool,
constrain_direction: bool,
) -> Tuple[array_t, List[site_h], array_t, array_t]:
#
if isinstance(target[0], Sequence):
targets = list(target)
else:
targets = [target]
if limit_to_sphere:
costs = _SphereLimitedCosts(costs, origin, targets)
if costs.ndim == 2:
if constrain_direction:
valid_directions, dir_lengths = _FilteredDirections(
DIRECTIONS_2D, LENGTHS_2D, origin, targets
)
else:
valid_directions, dir_lengths = DIRECTIONS_2D, LENGTHS_2D
elif costs.ndim == 3:
if constrain_direction:
valid_directions, dir_lengths = _FilteredDirections(
DIRECTIONS_3D, LENGTHS_3D, origin, targets
)
else:
valid_directions, dir_lengths = DIRECTIONS_3D, LENGTHS_3D
else:
raise ValueError(f"Cost matrix has {costs.ndim} dimension(s); Expecting 2 or 3")
return costs, targets, valid_directions, dir_lengths
def _OutgoingEdges(
site: site_h, valid_directions: array_t, dir_lengths: array_t, costs: array_t
) -> Iterator:
#
neighbors = valid_directions + np_.array(site, dtype=valid_directions.dtype)
n_dims = site.__len__()
inside = np_.all(neighbors >= 0, axis=1)
for c_idx in range(n_dims):
np_.logical_and(
inside, neighbors[:, c_idx] <= costs.shape[c_idx] - 1, out=inside
)
neighbors = neighbors[inside, :]
dir_lengths = dir_lengths[inside]
# For any n_dims: neighbors_costs = costs[tuple(zip(*neighbors.tolist()))]
if n_dims == 2:
neighbors_costs = costs[(neighbors[:, 0], neighbors[:, 1])]
else:
neighbors_costs = costs[(neighbors[:, 0], neighbors[:, 1], neighbors[:, 2])]
valid_sites = np_.isfinite(neighbors_costs) # Excludes inf and nan
neighbors = neighbors[valid_sites, :]
neighbors_costs = neighbors_costs[valid_sites] * dir_lengths[valid_sites]
return zip(neighbors.tolist(), neighbors_costs)
DIRECTIONS_2D: Final = np_.array(
tuple((i, j) for i in (-1, 0, 1) for j in (-1, 0, 1) if i != 0 or j != 0),
dtype=np_.int16,
)
DIRECTIONS_3D: Final = np_.array(
tuple(
(i, j, k)
for i in (-1, 0, 1)
for j in (-1, 0, 1)
for k in (-1, 0, 1)
if i != 0 or j != 0 or k != 0
),
dtype=np_.int16,
)
LENGTHS_2D: Final = np_.linalg.norm(DIRECTIONS_2D, axis=1)
LENGTHS_3D: Final = np_.linalg.norm(DIRECTIONS_3D, axis=1)
def _FilteredDirections(
all_directions: array_t,
all_lengths: array_t,
origin: site_h,
targets: Sequence[site_h],
) -> Tuple[array_t, array_t]:
#
n_dims = origin.__len__()
n_targets = targets.__len__()
straight_lines = np_.empty((n_dims, n_targets), dtype=np_.float64)
for p_idx, target in enumerate(targets):
for c_idx in range(n_dims):
straight_lines[c_idx, p_idx] = target[c_idx] - origin[c_idx]
inner_prods = all_directions @ straight_lines
valid_directions = np_.any(inner_prods >= 0, axis=1)
return all_directions[valid_directions, :], all_lengths[valid_directions]
def _SphereLimitedCosts(
costs: array_t, origin: site_h, targets: Sequence[site_h]
) -> array_t:
"""
Note: Un-numba-ble because of slice
"""
valid_sites = np_.zeros_like(costs, dtype=np_.bool)
center_map = np_.ones_like(costs, dtype=np_.uint8)
distances = np_.empty_like(costs, dtype=np_.float64)
targets_as_array = np_.array(targets)
centers = np_.around(0.5 * targets_as_array.__add__(origin)).astype(
np_.int64, copy=False
)
centers = tuple(tuple(center) for center in centers)
n_dims = origin.__len__()
for t_idx, target in enumerate(targets):
sq_radius = max(
(np_.subtract(centers[t_idx], origin) ** 2).sum(),
(np_.subtract(centers[t_idx], target) ** 2).sum(),
)
radius = np_.sqrt(sq_radius).astype(np_.int64, copy=False) + 1
# Note the +1 in slices ends to account for right-open ranginess
bbox = tuple(
slice(
max(centers[t_idx][c_idx] - radius, 0),
min(centers[t_idx][c_idx] + radius + 1, distances.shape[c_idx]),
)
for c_idx in range(n_dims)
)
if t_idx > 0:
center_map[centers[t_idx - 1]] = 1
center_map[centers[t_idx]] = 0
mp_.distance_transform_edt(center_map[bbox], distances=distances[bbox])
distance_thr = max(distances[origin], distances[target])
valid_sites[bbox][distances[bbox] <= distance_thr] = True
# # SSA
# if n_dims == 2:
# valid_coords = dw_.circle(
# *centers[t_idx], radius, shape=valid_sites.shape
# )
# else:
# local_shape = tuple(slc.stop - slc.start for slc in bbox)
# local_center = tuple(centers[t_idx][c_idx] - slc.start for c_idx, slc in enumerate(bbox))
# valid_coords = __SphereCoords__(*local_center, radius, shape=local_shape)
# valid_coords = tuple(valid_coords[c_idx] + slc.start for c_idx, slc in enumerate(bbox))
# valid_sites[valid_coords] = True
local_cost = costs.copy()
local_cost[np_.logical_not(valid_sites)] = np_.inf
return local_cost
class nearest_site_queue_t:
#
__slots__ = ("heap", "insertion_idx", "visited_sites")
def __init__(self):
#
self.heap = []
self.insertion_idx = 0
self.visited_sites = {}
def Insert(self, distance: number_h, site: site_h) -> None:
"""
Insert a new site with its distance, or update the distance of an existing site
"""
if site in self.visited_sites:
self._Delete(site)
self.insertion_idx += 1
site_nfo = [distance, self.insertion_idx, site]
self.visited_sites[site] = site_nfo
hp_.heappush(self.heap, site_nfo)
def Pop(self) -> Optional[Tuple[number_h, site_h]]:
"""
Return (distance, site) for the site of minimum distance, or None if queue is empty
"""
while self.heap:
distance, _, site = hp_.heappop(self.heap)
if site is not None:
del self.visited_sites[site]
return distance, site
return None
def _Delete(self, site: site_h) -> None:
#
site_nfo = self.visited_sites.pop(site)
site_nfo[-1] = None
# # SSA
# def __SphereCoords__(
# row: int, col: int, dep: int, radius: int, shape: Tuple[int, int, int]
# ) -> np_array_picker_h:
# #
# sphere = np_.zeros(shape, dtype=np_.bool)
# # dw_.ellipsoid leaves a one pixel margin around the ellipse, hence [1:-1, 1:-1, 1:-1]
# ellipse = dw_.ellipsoid(radius, radius, radius)[1:-1, 1:-1, 1:-1]
# sp_slices = tuple(
# slice(0, min(sphere.shape[idx_], ellipse.shape[idx_])) for idx_ in (0, 1, 2)
# )
# sphere[sp_slices] = ellipse[sp_slices]
#
# sphere = im_.shift(
# sphere, (row - radius, col - radius, dep - radius), order=0, prefilter=False
# )
#
# return sphere.nonzero()
File deleted