# Copyright CNRS/Inria/UNS # Contributor(s): Eric Debreuve (since 2019) # # eric.debreuve@cnrs.fr # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. """ Python version mostly follows: https://github.com/ellisdg/frangi3d """ import ctypes as ct_ import multiprocessing as pl_ import os.path as ph_ # Pathlib not necessary import warnings as wn_ from os import name as os_name_c from typing import Callable, Optional, Tuple # import itk as ik_ import numpy as np_ from multiprocessing.sharedctypes import Array as shared_array_t from scipy import ndimage as im_ import skimage.filters as fl_ _folder, _ = ph_.split(ph_.realpath(__file__)) if _folder.__len__() == 0: _folder = "." _extension_path = ph_.join(_folder, "frangi3-" + os_name_c + ".so") try: _c_extension = ct_.CDLL(_extension_path) _ComputeEnhancementInC = _c_extension.ComputeFrangiResponseFromRaw _ComputeEnhancementInC.argtypes = ( ct_.c_void_p, ct_.c_int, ct_.c_int, ct_.c_int, ct_.c_bool, ct_.c_void_p, ct_.c_void_p, ct_.c_float, ct_.c_float, ct_.c_float, ct_.c_float, ct_.c_float, ct_.c_float, ) _ComputeEnhancementInC.restype = None except Exception as exc: wn_.warn( f"{_extension_path}: Error when loading extension (see exception below)\n" f"{exc}\n" f"=> Falling back to Python implementation", RuntimeWarning, ) _ComputeEnhancementInC = None array_t = np_.ndarray h_matrices_fct_t = Callable[[array_t, array_t], None] enhancement_fct_t = Callable[ [ array_t, float, h_matrices_fct_t, float, float, float, bool, Optional[shared_array_t], ], Optional[array_t], ] _hessian_matrices = None def FrangiEnhancement( img: array_t, scale_range: Tuple[float, float], scale_step: float = 2.0, alpha: float = 0.5, beta: float = 0.5, frangi_c: float = 500.0, bright_on_dark: bool = True, in_parallel: bool = False, method: str = "python", # python, itk, c differentiation_mode: str = "direct", # direct, indirect ) -> Tuple[array_t, array_t]: # global _hessian_matrices if (method == "c") and (_ComputeEnhancementInC is None): method = "python" in_parallel = True differentiation_mode = "direct" if method == "c": img = img.astype(np_.float32, order="F", copy=False) enhanced = np_.empty_like(img) scale_map = np_.empty_like(img) height, width, depth = img.shape _ComputeEnhancementInC( img.ctypes.data, width, height, depth, bright_on_dark, enhanced.ctypes.data, scale_map.ctypes.data, scale_range[0], scale_range[1], scale_step, alpha, beta, frangi_c, ) # else: img = img.astype(np_.float32, copy=False) scales = np_.linspace( scale_range[0], scale_range[1], num=int(np_.around((scale_range[1] - scale_range[0]) / scale_step)) + 1, ) if in_parallel and (pl_.get_start_method(allow_none=False) == "fork"): # Do not share Hessian matrices storage across scales _hessian_matrices = None Enhancement_fct = _EnhancementWParallelScales else: # Do share Hessian matrices storage across scales _hessian_matrices = np_.empty((*img.shape, 3, 3), dtype=np_.float32) Enhancement_fct = _EnhancementWSequentialScales if method == "python": EnhancementOneScale_fct = _EnhancementOneScaleWPython elif method == "itk": # EnhancementOneScale_fct = _EnhancementOneScaleWITK raise ValueError(f"{method}: Computation method currently disabled") else: raise ValueError(f"{method}: Invalid computation method") if differentiation_mode == "direct": HessianMatrices_fct = _ComputeHessianMatricesDirectly elif differentiation_mode == "indirect": HessianMatrices_fct = _ComputeHessianMatricesBasedOnFirstOrder else: raise ValueError(f"{method}: Invalid differentiation mode") enhanced, scale_map = Enhancement_fct( img, scales, HessianMatrices_fct, alpha=alpha, beta=beta, frangi_c=frangi_c, bright_on_dark=bright_on_dark, method_fct=EnhancementOneScale_fct, ) _hessian_matrices = None return enhanced, scale_map def _EnhancementWSequentialScales( img: array_t, scales: array_t, HessianMatrices_fct: h_matrices_fct_t, alpha: float = 0.5, beta: float = 0.5, frangi_c: float = 500.0, bright_on_dark: bool = True, method_fct: enhancement_fct_t = None, ) -> Tuple[array_t, array_t]: # enhanced = None scale_map = None # local_enhanced = fl_.frangi( # img, # sigmas=scales, # alpha=alpha, # beta=beta, # gamma=frangi_c, # black_ridges=not bright_on_dark, # ) for s_idx, scale in enumerate(scales): local_enhanced = method_fct( img, scale, HessianMatrices_fct, alpha=alpha, beta=beta, frangi_c=frangi_c, bright_on_dark=bright_on_dark, ) # print(f"scale={scale}") if s_idx > 0: larger_map = local_enhanced > enhanced enhanced[larger_map] = local_enhanced[larger_map] scale_map[larger_map] = scale else: enhanced = local_enhanced scale_map = np_.full(img.shape, scale, dtype=np_.float32) return enhanced, scale_map def _EnhancementWParallelScales( img: array_t, scales: array_t, HessianMatrices_fct: h_matrices_fct_t, alpha: float = 0.5, beta: float = 0.5, frangi_c: float = 500.0, bright_on_dark: bool = True, method_fct: enhancement_fct_t = None, ) -> Tuple[array_t, array_t]: # fixed_args = { "alpha": alpha, "beta": beta, "frangi_c": frangi_c, "bright_on_dark": bright_on_dark, } shared_enhanced_s = tuple(shared_array_t(ct_.c_float, img.size) for ___ in scales) processes = tuple( pl_.Process( target=method_fct, args=(img, scale, HessianMatrices_fct), kwargs={**fixed_args, "shared_enhanced": shared_enhanced,}, ) for scale, shared_enhanced in zip(scales, shared_enhanced_s) ) for process in processes: process.start() for process in processes: process.join() enhanced = np_.array(shared_enhanced_s[0]) scale_map = np_.full(img.size, scales[0], dtype=np_.float32) for s_idx, shared_enhanced in enumerate(shared_enhanced_s[1:], start=1): local_enhanced = np_.array(shared_enhanced) larger_map = local_enhanced > enhanced enhanced[larger_map] = local_enhanced[larger_map] scale_map[larger_map] = scales[s_idx] enhanced = enhanced.astype(np_.float32, copy=False).reshape(img.shape) scale_map = scale_map.astype(np_.float32, copy=False).reshape(img.shape) return enhanced, scale_map def _EnhancementOneScaleWPython( img: array_t, scale: float, HessianMatrices_fct: h_matrices_fct_t, alpha: float = 0.5, beta: float = 0.5, frangi_c: float = 500.0, bright_on_dark: bool = True, shared_enhanced: shared_array_t = None, ) -> Optional[array_t]: # print(f" scale={scale}") sorted_eigenvalues = _EigenValuesOfHessianMatrices(img, scale, HessianMatrices_fct) abs_eig_val = np_.fabs(sorted_eigenvalues) # Absolute plate, blob, and background maps plate_map = _DivisionWithNaN(abs_eig_val[..., 1], abs_eig_val[..., 2]) blob_map = _DivisionWithNaN( abs_eig_val[..., 0], np_.sqrt(np_.prod(abs_eig_val[..., 1:], axis=-1)) ) bckgnd = np_.sqrt(np_.sum(abs_eig_val ** 2, axis=-1)) # Normalized plate, blob, and background maps plate_factor = -0.5 / alpha ** 2 blob_factor = -0.5 / beta ** 2 bckgnd_factor = -0.5 / frangi_c ** 2 plate_map = 1.0 - np_.exp(plate_factor * plate_map ** 2) blob_map = np_.exp(blob_factor * blob_map ** 2) bckgnd = 1.0 - np_.exp(bckgnd_factor * bckgnd ** 2) local_enhanced = plate_map * blob_map * bckgnd invalid_condition = np_.logical_not(np_.isfinite(local_enhanced)) if bright_on_dark: eig_condition = np_.any(sorted_eigenvalues[..., 1:] > 0.0, axis=-1) else: eig_condition = np_.any(sorted_eigenvalues[..., 1:] < 0.0, axis=-1) local_enhanced[np_.logical_or(eig_condition, invalid_condition)] = 0.0 if shared_enhanced is None: return local_enhanced else: shared_enhanced[:] = local_enhanced.flatten()[:] # def _EnhancementOneScaleWITK( # img: array_t, # scale: float, # _: h_matrices_fct_t, # alpha: float = 0.5, # beta: float = 2.0, # frangi_c: float = 500.0, # bright_on_dark: bool = True, # shared_enhanced: shared_array_t = None, # ) -> Optional[array_t]: # # # print(f" scale={scale}") # # if alpha >= beta: # raise ValueError( # f"Parameter alpha (passed {alpha}) must be strictly smaller than beta (passed {beta})" # ) # if not bright_on_dark: # raise ValueError( # "ITK version applies only to bright objects on dark background\n" # "For the dark-on-bright context, please invert intensity first" # ) # # img = ik_.image_view_from_array(img) # hessian_image = ik_.hessian_recursive_gaussian_image_filter(img, sigma=scale) # # vesselness_filter = ik_.Hessian3DToVesselnessMeasureImageFilter[ # ik_.ctype("float") # ].New() # vesselness_filter.SetInput(hessian_image) # vesselness_filter.SetAlpha1(alpha) # vesselness_filter.SetAlpha2(beta) # # # Do not view-version here since vesselness_filter is local # local_enhanced = ik_.array_from_image(vesselness_filter) # # if shared_enhanced is None: # return local_enhanced # else: # shared_enhanced[:] = local_enhanced.flatten()[:] def _ComputeHessianMatricesBasedOnFirstOrder( hessian_matrices: array_t, img: array_t ) -> None: # Dx, Dy, Dz = np_.gradient(img) ( hessian_matrices[..., 0, 0], hessian_matrices[..., 0, 1], hessian_matrices[..., 0, 2], ) = np_.gradient(Dx) hessian_matrices[..., 1, 1], hessian_matrices[..., 1, 2] = np_.gradient( Dy, axis=(1, 2) ) hessian_matrices[..., 2, 2] = np_.gradient(Dz, axis=2) def _ComputeHessianMatricesDirectly(hessian_matrices: array_t, img: array_t) -> None: # for axis in range(3): hessian_matrices[..., axis, axis] = ( np_.roll(img, 1, axis=axis) - 2.0 * img + np_.roll(img, -1, axis=axis) ) for axes in ((0, 1), (0, 2), (1, 2)): hessian_matrices[..., axes[0], axes[1]] = 0.25 * ( np_.roll(img, 1, axis=axes) + np_.roll(img, -1, axis=axes) - np_.roll(np_.roll(img, 1, axis=axes[0]), -1, axis=axes[1]) - np_.roll(np_.roll(img, -1, axis=axes[0]), 1, axis=axes[1]) ) def _EigenValuesOfHessianMatrices( img: array_t, scale: float, HessianMatrices_fct: h_matrices_fct_t ) -> array_t: # global _hessian_matrices if scale > 0.0: # img = scale ** 2 * im_.uniform_filter(img, size=2 * (int(np_.around(2.0*scale)) // 2) + 1) img = scale ** 2 * im_.gaussian_filter(img, scale) else: img = scale ** 2 * img if _hessian_matrices is None: # Private Hessian matrices storage (parallel version) hessian_matrices = np_.empty((*img.shape, 3, 3), dtype=np_.float32) else: # Shared Hessian matrices storage (sequential version) hessian_matrices = _hessian_matrices # HessianMatrices_fct(hessian_matrices, img) # hessian_matrices[..., 1, 0] = hessian_matrices[..., 0, 1] hessian_matrices[..., 2, 0] = hessian_matrices[..., 0, 2] hessian_matrices[..., 2, 1] = hessian_matrices[..., 1, 2] eigenvalues = np_.linalg.eigvalsh(hessian_matrices) # Sorted by abs value index = list(np_.ix_(*[np_.arange(size) for size in eigenvalues.shape])) index[-1] = np_.fabs(eigenvalues).argsort(axis=-1) sorted_eigenvalues = eigenvalues[tuple(index)] return sorted_eigenvalues def _DivisionWithNaN(array_1: array_t, array_2: array_t) -> array_t: # null_map = array_2 == 0.0 if null_map.any(): denominator = array_2.copy() denominator[null_map] = np_.nan else: denominator = array_2 return array_1 / denominator