Mentions légales du service

Skip to content
Snippets Groups Projects
frangi_3d.py 8.5 KiB
Newer Older
DEBREUVE Eric's avatar
DEBREUVE Eric committed
# Copyright CNRS/Inria/UNS
# Contributor(s): Eric Debreuve (since 2019)
#
# eric.debreuve@cnrs.fr
#
# This software is governed by the CeCILL  license under French law and
# abiding by the rules of distribution of free software.  You can  use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and,  more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.

DEBREUVE Eric's avatar
DEBREUVE Eric committed
from type import array_t

import multiprocessing as pl_
DEBREUVE Eric's avatar
DEBREUVE Eric committed
# from ctypes import c_bool as c_bool_t
from ctypes import c_float as c_float_t
from multiprocessing.sharedctypes import Array as shared_array_t
DEBREUVE Eric's avatar
DEBREUVE Eric committed
# from multiprocessing.sharedctypes import Value as shared_value_t
from typing import List, Tuple

import numpy as np_
from scipy import ndimage as im_


def FrangiEnhancement(
    img: array_t,
    scale_range: Tuple[float, float] = (1.0e-10, 10.0),
    scale_step: float = 2.0,
    alpha: float = 0.5,
    beta: float = 0.5,
    frangi_c: float = 500.0,
    bright_on_dark: bool = True,
    in_parallel: bool = False,
) -> Tuple[array_t, array_t]:
    #
    enhanced = None
    scale_map = None

    img = img.astype(np_.float32, copy=False)
    scales = np_.linspace(
        scale_range[0],
        scale_range[1],
        num=np_.ceil((scale_range[1] - scale_range[0]) / scale_step) + 1,
    )

    if in_parallel:
        # initializer = np_.zeros_like(img, dtype = c_float_t)
        # shared_not_init = shared_value_t(c_bool_t, True)
        # shared_enhanced = shared_array_t(c_float_t, img.size)
        # shared_scale_map = shared_array_t(c_float_t, img.size)
        # kwargs = {
        #     "alpha": alpha,
        #     "beta": beta,
        #     "frangi_c": frangi_c,
        #     "bright_on_dark": bright_on_dark,
        #     "not_init": shared_not_init,
        #     "enhanced": shared_enhanced,
        #     "scale_map": shared_scale_map,
        # }
        shared_enhanced_s = tuple(shared_array_t(c_float_t, img.size) for ___ in scales)
        processes = tuple(
            pl_.Process(
                target=FrangiEnhancementOneScale,
                args=(img, scale),
                kwargs={
                    "alpha": alpha,
                    "beta": beta,
                    "frangi_c": frangi_c,
                    "bright_on_dark": bright_on_dark,
                    "shared_enhanced": shared_enhanced,
                },
            )
            for scale, shared_enhanced in zip(scales, shared_enhanced_s)
        )
        for process in processes:
            process.start()
        for process in processes:
            process.join()
        # enhanced = np_.array(shared_enhanced).astype(np_.float32, copy = False).reshape(img.shape)
        # scale_map = np_.array(shared_scale_map).astype(np_.float32, copy = False).reshape(img.shape)
        enhanced = np_.array(shared_enhanced_s[0])
        scale_map = np_.full(img.size, scales[0], dtype=np_.float32)
        for s_idx, shared_enhanced in enumerate(shared_enhanced_s[1:], start=1):
            local_enhanced = np_.array(shared_enhanced)
            larger_map = local_enhanced > enhanced
            enhanced[larger_map] = local_enhanced[larger_map]
            scale_map[larger_map] = scales[s_idx]
        enhanced = enhanced.astype(np_.float32, copy=False).reshape(img.shape)
        scale_map = scale_map.astype(np_.float32, copy=False).reshape(img.shape)
    else:
        for s_idx, scale in enumerate(scales):
            local_enhanced = FrangiEnhancementOneScale(
                img,
                scale,
                alpha=alpha,
                beta=beta,
                frangi_c=frangi_c,
                bright_on_dark=bright_on_dark,
            )

            if s_idx > 0:
                larger_map = local_enhanced > enhanced
                enhanced[larger_map] = local_enhanced[larger_map]
                scale_map[larger_map] = scale
            else:
                enhanced = local_enhanced
                scale_map = np_.full(img.shape, scale, dtype=np_.float32)

    return enhanced, scale_map


def FrangiEnhancementOneScale(
    img: array_t,
    scale: float,
    alpha: float = 0.5,
    beta: float = 0.5,
    frangi_c: float = 500.0,
    bright_on_dark: bool = True,
    shared_enhanced=None,
    # not_init = None,
    # enhanced =None,
    # scale_map =None,
) -> array_t:
    #
    print(f"scale={scale}")

    Dxx, Dxy, Dxz, Dyy, Dyz, Dzz = HessianMatrix(img, scale)
    eig_val_1, eig_val_2, eig_val_3 = EigenValues(Dxx, Dxy, Dxz, Dyy, Dyz, Dzz)

    # Absolute plate, blob, and background maps
    abs_eig_val_1 = np_.fabs(eig_val_1)
    abs_eig_val_2 = np_.fabs(eig_val_2)
    abs_eig_val_3 = np_.fabs(eig_val_3)
    plate_map = CarefullDivision(abs_eig_val_2, abs_eig_val_3)
    blob_map = CarefullDivision(abs_eig_val_1, np_.sqrt(abs_eig_val_2 * abs_eig_val_3))
    bckgnd = np_.sqrt(abs_eig_val_1 ** 2 + abs_eig_val_2 ** 2 + abs_eig_val_3 ** 2)

    # Normalized plate, blob, and background maps
    plate_factor = -2.0 * alpha ** 2
    blob_factor = -2.0 * beta ** 2
    bckgnd_factor = -2.0 * frangi_c ** 2
    plate_map = 1.0 - np_.exp(plate_map ** 2 / plate_factor)
    blob_map = np_.exp(blob_map ** 2 / blob_factor)
    bckgnd = 1.0 - np_.exp(bckgnd ** 2 / bckgnd_factor)

    local_enhanced = plate_map * blob_map * bckgnd
    if bright_on_dark:
        local_enhanced[eig_val_2 > 0.0] = 0.0
        local_enhanced[eig_val_3 > 0.0] = 0.0
    else:
        local_enhanced[eig_val_2 < 0.0] = 0.0
        local_enhanced[eig_val_3 < 0.0] = 0.0
    local_enhanced[
        np_.logical_or(np_.isnan(local_enhanced), np_.isinf(local_enhanced))
    ] = 0.0

    # if enhanced is None:
    if shared_enhanced is None:
        return local_enhanced
    else:
        shared_enhanced[:] = local_enhanced.flatten()[:]


def HessianMatrix(
    img: array_t, scale: float
) -> Tuple[array_t, array_t, array_t, array_t, array_t, array_t]:
    #
    if scale > 0.0:
        img = scale ** 2 * im_.gaussian_filter(img, scale)
    else:
        img = scale ** 2 * img

    Dx, Dy, Dz = np_.gradient(img)

    Dxx, Dxy, Dxz = np_.gradient(Dx, axis=(0, 1, 2))
    Dyy, Dyz = np_.gradient(Dy, axis=(1, 2))
    Dzz = np_.gradient(Dz, axis=2)

    return Dxx, Dxy, Dxz, Dyy, Dyz, Dzz


def EigenValues(
    Dxx: array_t, Dxy: array_t, Dxz: array_t, Dyy: array_t, Dyz: array_t, Dzz: array_t
) -> List[array_t]:
    #
    hessians = np_.stack((Dxx, Dxy, Dxz, Dxy, Dyy, Dyz, Dxz, Dyz, Dzz), axis=-1)
    hessians = np_.reshape(hessians, (*Dxx.shape, 3, 3))

    eigenvalues = np_.linalg.eigvalsh(hessians)
    sorted_eigenvalues = SortedByAbsValues(eigenvalues, axis=-1)

    return [
        np_.squeeze(eigenvalue, axis=-1)
        for eigenvalue in np_.split(
            sorted_eigenvalues, sorted_eigenvalues.shape[-1], axis=-1
        )
    ]


def SortedByAbsValues(array: array_t, axis: int = 0) -> array_t:
    #
    index = list(np_.ix_(*[np_.arange(size) for size in array.shape]))
    index[axis] = np_.fabs(array).argsort(axis=axis)

    return array[tuple(index)]


def CarefullDivision(array_1: array_t, array_2: array_t) -> array_t:
    #
    null_map = array_2 == 0.0
    if null_map.any():
        denominator = np_.copy(array_2)
        denominator[null_map] = 1.0e-10
    else:
        denominator = array_2

    return array_1 / denominator