Mentions légales du service

Skip to content
Snippets Groups Projects
frangi_3d.py 8.28 KiB
Newer Older
DEBREUVE Eric's avatar
DEBREUVE Eric committed
# Copyright CNRS/Inria/UNS
# Contributor(s): Eric Debreuve (since 2019)
#
# eric.debreuve@cnrs.fr
#
# This software is governed by the CeCILL  license under French law and
# abiding by the rules of distribution of free software.  You can  use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and,  more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.

DEBREUVE Eric's avatar
DEBREUVE Eric committed
from type import array_t

import multiprocessing as pl_
from ctypes import c_float as c_float_t
from multiprocessing.sharedctypes import Array as shared_array_t
from typing import Optional, Tuple
DEBREUVE Eric's avatar
DEBREUVE Eric committed

import numpy as np_
from scipy import ndimage as im_


_hessian_matrices = None


DEBREUVE Eric's avatar
DEBREUVE Eric committed
def FrangiEnhancement(
    img: array_t,
    scale_range: Tuple[float, float],
DEBREUVE Eric's avatar
DEBREUVE Eric committed
    scale_step: float = 2.0,
    alpha: float = 0.5,
    beta: float = 0.5,
    frangi_c: float = 500.0,
    bright_on_dark: bool = True,
    in_parallel: bool = False,
) -> Tuple[array_t, array_t]:
    #
    global _hessian_matrices
DEBREUVE Eric's avatar
DEBREUVE Eric committed

    img = img.astype(np_.float32, copy=False)
    scales = np_.linspace(
        scale_range[0],
        scale_range[1],
        num=np_.ceil((scale_range[1] - scale_range[0]) / scale_step) + 1,
    )

    if in_parallel and (pl_.get_start_method(allow_none=False) == "fork"):
        _hessian_matrices = None
        FrangiEnhancement_fct = _FrangiEnhancementParallel
    else:
        _hessian_matrices = np_.empty((*img.shape, 3, 3), dtype=np_.float32)
        FrangiEnhancement_fct = _FrangiEnhancementSequential

    enhanced, scale_map = FrangiEnhancement_fct(
        img,
        scales,
        alpha=alpha,
        beta=beta,
        frangi_c=frangi_c,
        bright_on_dark=bright_on_dark,
    )

    _hessian_matrices = None

    return enhanced, scale_map


def _FrangiEnhancementSequential(
    img: array_t,
    scales: array_t,
    alpha: float = 0.5,
    beta: float = 0.5,
    frangi_c: float = 500.0,
    bright_on_dark: bool = True,
) -> Tuple[array_t, array_t]:
    #
    enhanced = None
    scale_map = None

    for s_idx, scale in enumerate(scales):
        local_enhanced = _FrangiEnhancementOneScale(
            img,
            scale,
            alpha=alpha,
            beta=beta,
            frangi_c=frangi_c,
            bright_on_dark=bright_on_dark,
DEBREUVE Eric's avatar
DEBREUVE Eric committed
        )
DEBREUVE Eric's avatar
DEBREUVE Eric committed
            larger_map = local_enhanced > enhanced
            enhanced[larger_map] = local_enhanced[larger_map]
            scale_map[larger_map] = scale
        else:
            enhanced = local_enhanced
            scale_map = np_.full(img.shape, scale, dtype=np_.float32)

    return enhanced, scale_map


def _FrangiEnhancementParallel(
    img: array_t,
    scales: array_t,
    alpha: float = 0.5,
    beta: float = 0.5,
    frangi_c: float = 500.0,
    bright_on_dark: bool = True,
) -> Tuple[array_t, array_t]:
    #
    fixed_args = {
        "alpha": alpha,
        "beta": beta,
        "frangi_c": frangi_c,
        "bright_on_dark": bright_on_dark,
    }

    shared_enhanced_s = tuple(shared_array_t(c_float_t, img.size) for ___ in scales)
    processes = tuple(
        pl_.Process(
            target=_FrangiEnhancementOneScale,
            args=(img, scale),
            kwargs={**fixed_args, "shared_enhanced": shared_enhanced,},
        )
        for scale, shared_enhanced in zip(scales, shared_enhanced_s)
    )

    for process in processes:
        process.start()
    for process in processes:
        process.join()

    enhanced = np_.array(shared_enhanced_s[0])
    scale_map = np_.full(img.size, scales[0], dtype=np_.float32)
    for s_idx, shared_enhanced in enumerate(shared_enhanced_s[1:], start=1):
        local_enhanced = np_.array(shared_enhanced)
        larger_map = local_enhanced > enhanced
        enhanced[larger_map] = local_enhanced[larger_map]
        scale_map[larger_map] = scales[s_idx]

    enhanced = enhanced.astype(np_.float32, copy=False).reshape(img.shape)
    scale_map = scale_map.astype(np_.float32, copy=False).reshape(img.shape)
DEBREUVE Eric's avatar
DEBREUVE Eric committed

    return enhanced, scale_map


def _FrangiEnhancementOneScale(
DEBREUVE Eric's avatar
DEBREUVE Eric committed
    img: array_t,
    scale: float,
    alpha: float = 0.5,
    beta: float = 0.5,
    frangi_c: float = 500.0,
    bright_on_dark: bool = True,
    shared_enhanced: shared_array_t = None,
) -> Optional[array_t]:
DEBREUVE Eric's avatar
DEBREUVE Eric committed
    #
    print(f"    scale={scale}")
DEBREUVE Eric's avatar
DEBREUVE Eric committed

    sorted_eigenvalues = _EigenValuesOfHessianMatrix(img, scale)
DEBREUVE Eric's avatar
DEBREUVE Eric committed

    # Absolute plate, blob, and background maps
    abs_eig_val = np_.fabs(sorted_eigenvalues)
    plate_map = _CarefullDivision(abs_eig_val[...,1], abs_eig_val[...,2])
    blob_map = _CarefullDivision(abs_eig_val[...,0], np_.sqrt(np_.prod(abs_eig_val[...,1:], axis = -1)))
    bckgnd = np_.sqrt(np_.sum(abs_eig_val ** 2, axis = -1))
DEBREUVE Eric's avatar
DEBREUVE Eric committed

    # Normalized plate, blob, and background maps
    plate_factor = -2.0 * alpha ** 2
    blob_factor = -2.0 * beta ** 2
    bckgnd_factor = -2.0 * frangi_c ** 2
    plate_map = 1.0 - np_.exp(plate_map ** 2 / plate_factor)
    blob_map = np_.exp(blob_map ** 2 / blob_factor)
    bckgnd = 1.0 - np_.exp(bckgnd ** 2 / bckgnd_factor)

    local_enhanced = plate_map * blob_map * bckgnd
    if bright_on_dark:
        eig_condition = np_.any(sorted_eigenvalues[...,1:] > 0.0, axis=-1)
DEBREUVE Eric's avatar
DEBREUVE Eric committed
    else:
        eig_condition = np_.any(sorted_eigenvalues[...,1:] < 0.0, axis=-1)
    invalid_condition = np_.logical_not(np_.isfinite(local_enhanced))
    local_enhanced[np_.logical_or(eig_condition, invalid_condition)] = 0.0
DEBREUVE Eric's avatar
DEBREUVE Eric committed

    if shared_enhanced is None:
        return local_enhanced
    else:
        shared_enhanced[:] = local_enhanced.flatten()[:]


def _EigenValuesOfHessianMatrix(img: array_t, scale: float) -> array_t:
DEBREUVE Eric's avatar
DEBREUVE Eric committed
    #
    global _hessian_matrices

DEBREUVE Eric's avatar
DEBREUVE Eric committed
    if scale > 0.0:
        img = scale ** 2 * im_.gaussian_filter(img, scale)
    else:
        img = scale ** 2 * img

    if _hessian_matrices is None:
        hessian_matrices = np_.empty((*img.shape, 3, 3), dtype=np_.float32)
    else:
        hessian_matrices = _hessian_matrices
DEBREUVE Eric's avatar
DEBREUVE Eric committed
    Dx, Dy, Dz = np_.gradient(img)
    (
        hessian_matrices[..., 0, 0],
        hessian_matrices[..., 0, 1],
        hessian_matrices[..., 0, 2],
    ) = np_.gradient(Dx)
    hessian_matrices[..., 1, 1], hessian_matrices[..., 1, 2] = np_.gradient(
        Dy, axis=(1, 2)
    )
    hessian_matrices[..., 2, 2] = np_.gradient(Dz, axis=2)
DEBREUVE Eric's avatar
DEBREUVE Eric committed

    hessian_matrices[..., 1, 0] = hessian_matrices[..., 0, 1]
    hessian_matrices[..., 2, 0] = hessian_matrices[..., 0, 2]
    hessian_matrices[..., 2, 1] = hessian_matrices[..., 1, 2]
DEBREUVE Eric's avatar
DEBREUVE Eric committed

    eigenvalues = np_.linalg.eigvalsh(hessian_matrices)
DEBREUVE Eric's avatar
DEBREUVE Eric committed

    # Sorted by abs value
    index = list(np_.ix_(*[np_.arange(size) for size in eigenvalues.shape]))
    index[-1] = np_.fabs(eigenvalues).argsort(axis=-1)
    sorted_eigenvalues = eigenvalues[tuple(index)]
DEBREUVE Eric's avatar
DEBREUVE Eric committed

    return sorted_eigenvalues
def _CarefullDivision(array_1: array_t, array_2: array_t) -> array_t:
DEBREUVE Eric's avatar
DEBREUVE Eric committed
    #
    null_map = array_2 == 0.0
    if null_map.any():
        denominator = array_2.copy()
DEBREUVE Eric's avatar
DEBREUVE Eric committed
        denominator[null_map] = 1.0e-10
    else:
        denominator = array_2

    return array_1 / denominator