# Copyright CNRS/Inria/UNS
# Contributor(s): Eric Debreuve (since 2019)
#
# eric.debreuve@cnrs.fr
#
# This software is governed by the CeCILL  license under French law and
# abiding by the rules of distribution of free software.  You can  use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and,  more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.

"""
Python version mostly follows: https://github.com/ellisdg/frangi3d
"""

import ctypes as ct_
import multiprocessing as pl_
import os.path as ph_  # Pathlib not necessary
import warnings as wn_
from os import name as os_name_c
from typing import Callable, Optional, Tuple

# import itk as ik_
import numpy as np_
from multiprocessing.sharedctypes import Array as shared_array_t
from scipy import ndimage as im_
import skimage.filters as fl_

_folder, _ = ph_.split(ph_.realpath(__file__))
if len(_folder) == 0:
    _folder = "."
_extension_path = ph_.join(_folder, "frangi3-" + os_name_c + ".so")
try:
    _c_extension = ct_.CDLL(_extension_path)
    _ComputeEnhancementInC = _c_extension.ComputeFrangiResponseFromRaw
    _ComputeEnhancementInC.argtypes = (
        ct_.c_void_p,  # img.ctypes.data
        ct_.c_int,  # width
        ct_.c_int,  # height
        ct_.c_int,  # depth
        ct_.c_bool,  # bright_on_dark
        ct_.c_void_p,  # enhanced.ctypes.data
        ct_.c_void_p,  # scale_map.ctypes.data
        ct_.c_float,  # scale_range[0]
        ct_.c_float,  # scale_range[1]
        ct_.c_float,  # scale_step
        ct_.c_float,  # alpha
        ct_.c_float,  # beta
        ct_.c_float,  # frangi_c
    )
    _ComputeEnhancementInC.restype = None
except Exception as exc:
    wn_.warn(
        f"{_extension_path}: Error when loading extension (see exception below)\n"
        f"{exc}\n"
        f"=> Falling back to Python implementation",
        RuntimeWarning,
    )
    _ComputeEnhancementInC = None


array_t = np_.ndarray
h_matrices_fct_t = Callable[[array_t, array_t], None]
enhancement_fct_t = Callable[
    [
        array_t,
        float,
        h_matrices_fct_t,
        float,
        float,
        float,
        bool,
        Optional[shared_array_t],
    ],
    Optional[array_t],
]


# Module-level scratch storage for the Hessian matrices: allocated once and
# reused across scales in the sequential path, left as None in the parallel
# path so that each per-scale computation allocates its own storage.
_hessian_matrices = None


def FrangiEnhancement(
    img: array_t,
    scale_range: Tuple[float, float],
    scale_step: float = 2.0,
    alpha: float = 0.5,
    beta: float = 0.5,
    frangi_c: float = 500.0,
    bright_on_dark: bool = True,
    in_parallel: bool = False,
    method: str = "python",  # python, itk, c
    differentiation_mode: str = "direct",  # direct, indirect
) -> Tuple[array_t, array_t]:
    #
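    # Dispatch between the C extension (if it loaded) and the Python path. In
    # the Python path, the multi-scale response is the per-voxel maximum over
    # scales, and scale_map records the scale at which that maximum was reached.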
    global _hessian_matrices

    if (method == "c") and (_ComputeEnhancementInC is None):
        method = "python"
        in_parallel = True
        differentiation_mode = "direct"

    if method == "c":
        img = img.astype(np_.float32, order="F", copy=False)
        enhanced = np_.empty_like(img)
        scale_map = np_.empty_like(img)
        height, width, depth = img.shape

        _ComputeEnhancementInC(
            img.ctypes.data,
            width,
            height,
            depth,
            bright_on_dark,
            enhanced.ctypes.data,
            scale_map.ctypes.data,
            scale_range[0],
            scale_range[1],
            scale_step,
            alpha,
            beta,
            frangi_c,
        )
    #
    else:
        img = img.astype(np_.float32, copy=False)
        scales = np_.linspace(
            scale_range[0],
            scale_range[1],
            num=int(np_.around((scale_range[1] - scale_range[0]) / scale_step)) + 1,
        )

        if in_parallel and (pl_.get_start_method(allow_none=False) == "fork"):
            # Do not share Hessian matrices storage across scales
            _hessian_matrices = None
            Enhancement_fct = _EnhancementWParallelScales
        else:
            # Do share Hessian matrices storage across scales
            _hessian_matrices = np_.empty((*img.shape, 3, 3), dtype=np_.float32)
            Enhancement_fct = _EnhancementWSequentialScales

        if method == "python":
            EnhancementOneScale_fct = _EnhancementOneScaleWPython
        elif method == "itk":
            # EnhancementOneScale_fct = _EnhancementOneScaleWITK
            raise ValueError(f"{method}: Computation method currently disabled")
        else:
            raise ValueError(f"{method}: Invalid computation method")
        if differentiation_mode == "direct":
            HessianMatrices_fct = _ComputeHessianMatricesDirectly
        elif differentiation_mode == "indirect":
            HessianMatrices_fct = _ComputeHessianMatricesBasedOnFirstOrder
        else:
            raise ValueError(f"{method}: Invalid differentiation mode")

        enhanced, scale_map = Enhancement_fct(
            img,
            scales,
            HessianMatrices_fct,
            alpha=alpha,
            beta=beta,
            frangi_c=frangi_c,
            bright_on_dark=bright_on_dark,
            method_fct=EnhancementOneScale_fct,
        )

        _hessian_matrices = None

    return enhanced, scale_map


def _EnhancementWSequentialScales(
    img: array_t,
    scales: array_t,
    HessianMatrices_fct: h_matrices_fct_t,
    alpha: float = 0.5,
    beta: float = 0.5,
    frangi_c: float = 500.0,
    bright_on_dark: bool = True,
    method_fct: Optional[enhancement_fct_t] = None,
) -> Tuple[array_t, array_t]:
    #
    enhanced = None
    scale_map = None

    # local_enhanced = fl_.frangi(
    #     img,
    #     sigmas=scales,
    #     alpha=alpha,
    #     beta=beta,
    #     gamma=frangi_c,
    #     black_ridges=not bright_on_dark,
    # )

    for s_idx, scale in enumerate(scales):
        local_enhanced = method_fct(
            img,
            scale,
            HessianMatrices_fct,
            alpha=alpha,
            beta=beta,
            frangi_c=frangi_c,
            bright_on_dark=bright_on_dark,
        )

        # print(f"scale={scale}")

        if s_idx > 0:
            larger_map = local_enhanced > enhanced
            enhanced[larger_map] = local_enhanced[larger_map]
            scale_map[larger_map] = scale
        else:
            enhanced = local_enhanced
            scale_map = np_.full(img.shape, scale, dtype=np_.float32)

    return enhanced, scale_map


def _EnhancementWParallelScales(
    img: array_t,
    scales: array_t,
    HessianMatrices_fct: h_matrices_fct_t,
    alpha: float = 0.5,
    beta: float = 0.5,
    frangi_c: float = 500.0,
    bright_on_dark: bool = True,
    method_fct: Optional[enhancement_fct_t] = None,
) -> Tuple[array_t, array_t]:
    #
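    # One worker process per scale: each process writes its single-scale
    # response into a shared ctypes float array, and the per-scale responses
    # are max-combined below once all processes have finished.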
    fixed_args = {
        "alpha": alpha,
        "beta": beta,
        "frangi_c": frangi_c,
        "bright_on_dark": bright_on_dark,
    }

    shared_enhanced_s = tuple(shared_array_t(ct_.c_float, img.size) for ___ in scales)
    processes = tuple(
        pl_.Process(
            target=method_fct,
            args=(img, scale, HessianMatrices_fct),
            kwargs={**fixed_args, "shared_enhanced": shared_enhanced,},
        )
        for scale, shared_enhanced in zip(scales, shared_enhanced_s)
    )

    for process in processes:
        process.start()
    for process in processes:
        process.join()

    enhanced = np_.array(shared_enhanced_s[0])
    scale_map = np_.full(img.size, scales[0], dtype=np_.float32)
    for s_idx, shared_enhanced in enumerate(shared_enhanced_s[1:], start=1):
        local_enhanced = np_.array(shared_enhanced)
        larger_map = local_enhanced > enhanced
        enhanced[larger_map] = local_enhanced[larger_map]
        scale_map[larger_map] = scales[s_idx]

    enhanced = enhanced.astype(np_.float32, copy=False).reshape(img.shape)
    scale_map = scale_map.astype(np_.float32, copy=False).reshape(img.shape)

    return enhanced, scale_map


def _EnhancementOneScaleWPython(
    img: array_t,
    scale: float,
    HessianMatrices_fct: h_matrices_fct_t,
    alpha: float = 0.5,
    beta: float = 0.5,
    frangi_c: float = 500.0,
    bright_on_dark: bool = True,
    shared_enhanced: Optional[shared_array_t] = None,
) -> Optional[array_t]:
    #
    print(f"    scale={scale}")

    sorted_eigenvalues = _EigenValuesOfHessianMatrices(img, scale, HessianMatrices_fct)
    abs_eig_val = np_.fabs(sorted_eigenvalues)
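
    # Vesselness measure as in Frangi et al. 1998 (up to implementation details):
    #     V = (1 - exp(-R_A^2 / (2 alpha^2)))
    #         * exp(-R_B^2 / (2 beta^2))
    #         * (1 - exp(-S^2 / (2 c^2)))
    # with R_A = |l2| / |l3| (plate vs. line), R_B = |l1| / sqrt(|l2 l3|) (blob),
    # and S = sqrt(l1^2 + l2^2 + l3^2) (second-order structureness), where
    # |l1| <= |l2| <= |l3| are the Hessian eigenvalues sorted by absolute value.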

    # Absolute plate, blob, and background maps
    plate_map = _DivisionWithNaN(abs_eig_val[..., 1], abs_eig_val[..., 2])
    blob_map = _DivisionWithNaN(
        abs_eig_val[..., 0], np_.sqrt(np_.prod(abs_eig_val[..., 1:], axis=-1))
    )
    bckgnd = np_.sqrt(np_.sum(abs_eig_val ** 2, axis=-1))

    # Normalized plate, blob, and background maps
    plate_factor = -0.5 / alpha ** 2
    blob_factor = -0.5 / beta ** 2
    bckgnd_factor = -0.5 / frangi_c ** 2
    plate_map = 1.0 - np_.exp(plate_factor * plate_map ** 2)
    blob_map = np_.exp(blob_factor * blob_map ** 2)
    bckgnd = 1.0 - np_.exp(bckgnd_factor * bckgnd ** 2)

    local_enhanced = plate_map * blob_map * bckgnd
    invalid_condition = np_.logical_not(np_.isfinite(local_enhanced))
    if bright_on_dark:
        eig_condition = np_.any(sorted_eigenvalues[..., 1:] > 0.0, axis=-1)
    else:
        eig_condition = np_.any(sorted_eigenvalues[..., 1:] < 0.0, axis=-1)
    local_enhanced[np_.logical_or(eig_condition, invalid_condition)] = 0.0

    if shared_enhanced is None:
        return local_enhanced
    else:
        shared_enhanced[:] = local_enhanced.flatten()[:]


# def _EnhancementOneScaleWITK(
#     img: array_t,
#     scale: float,
#     _: h_matrices_fct_t,
#     alpha: float = 0.5,
#     beta: float = 2.0,
#     frangi_c: float = 500.0,
#     bright_on_dark: bool = True,
#     shared_enhanced: shared_array_t = None,
# ) -> Optional[array_t]:
#     #
#     print(f"    scale={scale}")
#
#     if alpha >= beta:
#         raise ValueError(
#             f"Parameter alpha (passed {alpha}) must be strictly smaller than beta (passed {beta})"
#         )
#     if not bright_on_dark:
#         raise ValueError(
#             "ITK version applies only to bright objects on dark background\n"
#             "For the dark-on-bright context, please invert intensity first"
#         )
#
#     img = ik_.image_view_from_array(img)
#     hessian_image = ik_.hessian_recursive_gaussian_image_filter(img, sigma=scale)
#
#     vesselness_filter = ik_.Hessian3DToVesselnessMeasureImageFilter[
#         ik_.ctype("float")
#     ].New()
#     vesselness_filter.SetInput(hessian_image)
#     vesselness_filter.SetAlpha1(alpha)
#     vesselness_filter.SetAlpha2(beta)
#
#     # Do not view-version here since vesselness_filter is local
#     local_enhanced = ik_.array_from_image(vesselness_filter)
#
#     if shared_enhanced is None:
#         return local_enhanced
#     else:
#         shared_enhanced[:] = local_enhanced.flatten()[:]


def _ComputeHessianMatricesBasedOnFirstOrder(
    hessian_matrices: array_t, img: array_t
) -> None:
    #
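    # Hessian computed by applying np.gradient twice (first-order differences of
    # first-order differences); only the upper triangle is filled here, the
    # symmetric lower part is copied in _EigenValuesOfHessianMatrices.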
    Dx, Dy, Dz = np_.gradient(img)

    (
        hessian_matrices[..., 0, 0],
        hessian_matrices[..., 0, 1],
        hessian_matrices[..., 0, 2],
    ) = np_.gradient(Dx)

    hessian_matrices[..., 1, 1], hessian_matrices[..., 1, 2] = np_.gradient(
        Dy, axis=(1, 2)
    )

    hessian_matrices[..., 2, 2] = np_.gradient(Dz, axis=2)


def _ComputeHessianMatricesDirectly(hessian_matrices: array_t, img: array_t) -> None:
    #
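    # Second-order central differences implemented with np.roll, which wraps
    # around at the volume borders; only the upper triangle is filled here, the
    # symmetric lower part is copied in _EigenValuesOfHessianMatrices.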
    for axis in range(3):
        hessian_matrices[..., axis, axis] = (
            np_.roll(img, 1, axis=axis) - 2.0 * img + np_.roll(img, -1, axis=axis)
        )

    for axes in ((0, 1), (0, 2), (1, 2)):
        hessian_matrices[..., axes[0], axes[1]] = 0.25 * (
            np_.roll(img, 1, axis=axes)
            + np_.roll(img, -1, axis=axes)
            - np_.roll(np_.roll(img, 1, axis=axes[0]), -1, axis=axes[1])
            - np_.roll(np_.roll(img, -1, axis=axes[0]), 1, axis=axes[1])
        )


def _EigenValuesOfHessianMatrices(
    img: array_t, scale: float, HessianMatrices_fct: h_matrices_fct_t
) -> array_t:
    #
    global _hessian_matrices

    if scale > 0.0:
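        # Scale-normalized response: smooth at the current scale, then multiply
        # by scale ** 2, the usual normalization for second derivatives in
        # scale space, so that responses at different scales are comparable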
        # img = scale ** 2 * im_.uniform_filter(img, size=2 * (int(np_.around(2.0*scale)) // 2) + 1)
        img = scale ** 2 * im_.gaussian_filter(img, scale)
    else:
        img = scale ** 2 * img

    if _hessian_matrices is None:
        # Private Hessian matrices storage (parallel version)
        hessian_matrices = np_.empty((*img.shape, 3, 3), dtype=np_.float32)
    else:
        # Shared Hessian matrices storage (sequential version)
        hessian_matrices = _hessian_matrices
    #
    HessianMatrices_fct(hessian_matrices, img)
    #
    hessian_matrices[..., 1, 0] = hessian_matrices[..., 0, 1]
    hessian_matrices[..., 2, 0] = hessian_matrices[..., 0, 2]
    hessian_matrices[..., 2, 1] = hessian_matrices[..., 1, 2]

    eigenvalues = np_.linalg.eigvalsh(hessian_matrices)

    # Sort each voxel's eigenvalues by absolute value: argsort on the last axis
    # gives the per-voxel ordering, and np.ix_ supplies broadcastable index
    # arrays for the remaining axes
    index = list(np_.ix_(*[np_.arange(size) for size in eigenvalues.shape]))
    index[-1] = np_.fabs(eigenvalues).argsort(axis=-1)
    sorted_eigenvalues = eigenvalues[tuple(index)]

    return sorted_eigenvalues


def _DivisionWithNaN(array_1: array_t, array_2: array_t) -> array_t:
    #
    # Element-wise division that returns NaN where the denominator is zero; such
    # voxels are later zeroed by the isfinite test in _EnhancementOneScaleWPython
    null_map = array_2 == 0.0
    if null_map.any():
        denominator = array_2.copy()
        denominator[null_map] = np_.nan
    else:
        denominator = array_2

    return array_1 / denominator
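

# --- Minimal usage sketch (not part of the original module) ---
# Illustrates one way FrangiEnhancement might be called, on a small synthetic
# volume containing a bright axis-aligned tube on a dark background; the
# parameter values below are illustrative only, not recommendations.
if __name__ == "__main__":
    example_volume = np_.zeros((32, 32, 32), dtype=np_.float32)
    example_volume[14:18, 14:18, :] = 1.0  # Bright tube along the last axis

    example_enhanced, example_scale_map = FrangiEnhancement(
        example_volume,
        scale_range=(1.0, 3.0),
        scale_step=1.0,
        alpha=0.5,
        beta=0.5,
        frangi_c=500.0,
        bright_on_dark=True,
        in_parallel=False,
        method="python",
        differentiation_mode="direct",
    )
    print(example_enhanced.shape, float(example_enhanced.max()))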