Mentions légales du service

Skip to content
Snippets Groups Projects
dijkstra.py 12.7 KiB
Newer Older
DEBREUVE Eric's avatar
DEBREUVE Eric committed
# Copyright CNRS/Inria/UNS
# Contributor(s): Eric Debreuve (since 2019)
#
# eric.debreuve@cnrs.fr
#
# This software is governed by the CeCILL  license under French law and
# abiding by the rules of distribution of free software.  You can  use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and,  more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.

import numpy as nmpy
array_t = nmpy.ndarray
DEBREUVE Eric's avatar
DEBREUVE Eric committed


def DijkstraCosts(image: array_t, som_map: array_t, ext_map: array_t) -> array_t:
    """
    Gives the value inf if the voxel belongs to a soma or an extension.
    Otherwise, gives the value 1 / (voxel intensity + 1).

    The closer to cost 1, the less probable a connexion will take this path.
    The closer to cost 0.5, the more probable.
    """
    dijkstra_costs = 1.0 / (image + 1.0)
    dijkstra_costs[nmpy.logical_or(som_map > 0, ext_map > 0)] = nmpy.inf
DEBREUVE Eric's avatar
DEBREUVE Eric committed

    return dijkstra_costs


# def DijkstraShortestPath(
#     costs: array_t,
#     origin: site_h,
#     target: Union[site_h, Sequence[site_h]],
#     limit_to_sphere: bool = True,
#     constrain_direction: bool = True,
#     return_all: bool = False,
# ) -> Union[path_nfo_h, Tuple[path_nfo_h, ...]]:
#     """
#     Perform the Dijkstra shortest weighted path algorithm
#     """
#     #
#     costs, targets, valid_directions, dir_lengths = _DijkstraShortestPathPrologue(
#         costs,
#         origin,
#         target,
#         limit_to_sphere,
#         constrain_direction,
#     )
#
#     nearest_sites = nearest_site_queue_t()
#     nearest_sites.Insert(0, origin)
#     min_distance_to = {origin: 0.0}
#     predecessor_of = {}
#     visited_sites = set()
#
#     while True:
#         site_nfo = nearest_sites.Pop()
#         if site_nfo is None:
#             # Empty queue: full graph traversal did not allow to reach all targets
#             # This case is correctly dealt with in the following
#             break
#
#         distance, site = site_nfo
#         if site in targets:
#             targets.remove(site)
#             if targets.__len__() > 0:
#                 continue
#             else:
#                 break
#
#         if site not in visited_sites:
#             visited_sites.add(site)
#             for successor, edge_length in _OutgoingEdges(
#                 site, valid_directions, dir_lengths, costs
#             ):
#                 successor = tuple(successor)
#                 next_distance = distance + edge_length
#                 min_distance = min_distance_to.get(successor)
#                 if (min_distance is None) or (next_distance < min_distance):
#                     min_distance_to[successor] = next_distance
#                     predecessor_of[successor] = site
#                     nearest_sites.Insert(next_distance, successor)
#
#     if isinstance(target[0], Sequence):
#         targets = tuple(target)
#     else:
#         targets = (target,)
#
#     if return_all:
#         all_paths = []
#         for one_target in targets:
#             distance = min_distance_to.get(one_target)
#             if distance is None:
#                 all_paths.append(((), None))
#             else:
#                 path = []
#                 site = one_target
#                 while site is not None:
#                     path.append(site)
#                     site = predecessor_of.get(site)
#                 all_paths.append((tuple(reversed(path)), distance))
#
#         return tuple(all_paths)
#     else:
#         min_distance = nmpy.inf
#         closest_target = None
#         for one_target in targets:
#             distance = min_distance_to.get(one_target)
#             if (distance is not None) and (distance < min_distance):
#                 min_distance = distance
#                 closest_target = one_target
#
#         path = []
#         if closest_target is None:
#             distance = None
#         else:
#             distance = min_distance_to.get(closest_target)
#             site = closest_target
#             while site is not None:
#                 path.append(site)
#                 site = predecessor_of.get(site)
#
#         return tuple(reversed(path)), distance
#
#
# def _DijkstraShortestPathPrologue(
#     costs: array_t,
#     origin: site_h,
#     target: Union[site_h, Sequence[site_h]],
#     limit_to_sphere: bool,
#     constrain_direction: bool,
# ) -> Tuple[array_t, List[site_h], array_t, array_t]:
#     #
#     if isinstance(target[0], Sequence):
#         targets = list(target)
#     else:
#         targets = [target]
#
#     if limit_to_sphere:
#         costs = _SphereLimitedCosts(costs, origin, targets)
#
#     if costs.ndim == 2:
#         if constrain_direction:
#             valid_directions, dir_lengths = _FilteredDirections(
#                 DIRECTIONS_2D, LENGTHS_2D, origin, targets
#             )
#         else:
#             valid_directions, dir_lengths = DIRECTIONS_2D, LENGTHS_2D
#     elif costs.ndim == 3:
#         if constrain_direction:
#             valid_directions, dir_lengths = _FilteredDirections(
#                 DIRECTIONS_3D, LENGTHS_3D, origin, targets
#             )
#         else:
#             valid_directions, dir_lengths = DIRECTIONS_3D, LENGTHS_3D
#     else:
#         raise ValueError(f"Cost matrix has {costs.ndim} dimension(s); Expecting 2 or 3")
#
#     return costs, targets, valid_directions, dir_lengths
#
#
# def _OutgoingEdges(
#     site: site_h, valid_directions: array_t, dir_lengths: array_t, costs: array_t
# ) -> Iterator:
#     #
#     neighbors = valid_directions + nmpy.array(site, dtype=valid_directions.dtype)
#     n_dims = site.__len__()
#
#     inside = nmpy.all(neighbors >= 0, axis=1)
#     for c_idx in range(n_dims):
#         nmpy.logical_and(
#             inside, neighbors[:, c_idx] <= costs.shape[c_idx] - 1, out=inside
#         )
#     neighbors = neighbors[inside, :]
#     dir_lengths = dir_lengths[inside]
#
#     # For any n_dims: neighbors_costs = costs[tuple(zip(*neighbors.tolist()))]
#     if n_dims == 2:
#         neighbors_costs = costs[(neighbors[:, 0], neighbors[:, 1])]
#     else:
#         neighbors_costs = costs[(neighbors[:, 0], neighbors[:, 1], neighbors[:, 2])]
#
#     valid_sites = nmpy.isfinite(neighbors_costs)  # Excludes inf and nan
#     neighbors = neighbors[valid_sites, :]
#     neighbors_costs = neighbors_costs[valid_sites] * dir_lengths[valid_sites]
#
#     return zip(neighbors.tolist(), neighbors_costs)
#
#
# DIRECTIONS_2D: Final = nmpy.array(
#     tuple((i, j) for i in (-1, 0, 1) for j in (-1, 0, 1) if i != 0 or j != 0),
#     dtype=nmpy.int16,
# )
#
# DIRECTIONS_3D: Final = nmpy.array(
#     tuple(
#         (i, j, k)
#         for i in (-1, 0, 1)
#         for j in (-1, 0, 1)
#         for k in (-1, 0, 1)
#         if i != 0 or j != 0 or k != 0
#     ),
#     dtype=nmpy.int16,
# )
#
# LENGTHS_2D: Final = nmpy.linalg.norm(DIRECTIONS_2D, axis=1)
# LENGTHS_3D: Final = nmpy.linalg.norm(DIRECTIONS_3D, axis=1)
#
#
# def _FilteredDirections(
#     all_directions: array_t,
#     all_lengths: array_t,
#     origin: site_h,
#     targets: Sequence[site_h],
# ) -> Tuple[array_t, array_t]:
#     #
#     n_dims = origin.__len__()
#     n_targets = targets.__len__()
#
#     straight_lines = nmpy.empty((n_dims, n_targets), dtype=nmpy.float64)
#     for p_idx, target in enumerate(targets):
#         for c_idx in range(n_dims):
#             straight_lines[c_idx, p_idx] = target[c_idx] - origin[c_idx]
#
#     inner_prods = all_directions @ straight_lines
#     valid_directions = nmpy.any(inner_prods >= 0, axis=1)
#
#     return all_directions[valid_directions, :], all_lengths[valid_directions]
#
#
# def _SphereLimitedCosts(
#     costs: array_t, origin: site_h, targets: Sequence[site_h]
# ) -> array_t:
#     """
#     Note: Un-numba-ble because of slice
#     """
#     valid_sites = nmpy.zeros_like(costs, dtype=nmpy.bool_)
#     center_map = nmpy.ones_like(costs, dtype=nmpy.uint8)
#     distances = nmpy.empty_like(costs, dtype=nmpy.float64)
#
#     targets_as_array = nmpy.array(targets)
#     centers = nmpy.around(0.5 * targets_as_array.__add__(origin)).astype(
#         nmpy.int64, copy=False
#     )
#     centers = tuple(tuple(center) for center in centers)
#
#     n_dims = origin.__len__()
#
#     for t_idx, target in enumerate(targets):
#         sq_radius = max(
#             (nmpy.subtract(centers[t_idx], origin) ** 2).sum(),
#             (nmpy.subtract(centers[t_idx], target) ** 2).sum(),
#         )
#         radius = nmpy.sqrt(sq_radius).astype(nmpy.int64, copy=False) + 1
#         # Note the +1 in slices ends to account for right-open ranginess
#         bbox = tuple(
#             slice(
#                 max(centers[t_idx][c_idx] - radius, 0),
#                 min(centers[t_idx][c_idx] + radius + 1, distances.shape[c_idx]),
#             )
#             for c_idx in range(n_dims)
#         )
#         if t_idx > 0:
#             center_map[centers[t_idx - 1]] = 1
#         center_map[centers[t_idx]] = 0
#         mp_.distance_transform_edt(center_map[bbox], distances=distances[bbox])
#         distance_thr = max(distances[origin], distances[target])
#
#         valid_sites[bbox][distances[bbox] <= distance_thr] = True
#         # # SSA
#         # if n_dims == 2:
#         #     valid_coords = dw_.circle(
#         #         *centers[t_idx], radius, shape=valid_sites.shape
#         #     )
#         # else:
#         #     local_shape = tuple(slc.stop - slc.start for slc in bbox)
#         #     local_center = tuple(centers[t_idx][c_idx] - slc.start for c_idx, slc in enumerate(bbox))
#         #     valid_coords = __SphereCoords__(*local_center, radius, shape=local_shape)
#         #     valid_coords = tuple(valid_coords[c_idx] + slc.start for c_idx, slc in enumerate(bbox))
#         # valid_sites[valid_coords] = True
#
#     local_cost = costs.copy()
#     local_cost[nmpy.logical_not(valid_sites)] = nmpy.inf
#
#     return local_cost
#
#
# class nearest_site_queue_t:
#     #
#     __slots__ = ("heap", "insertion_idx", "visited_sites")
#
#     def __init__(self):
#         #
#         self.heap = []
#         self.insertion_idx = 0
#         self.visited_sites = {}
#
#     def Insert(self, distance: number_h, site: site_h) -> None:
#         """
#         Insert a new site with its distance, or update the distance of an existing site
#         """
#         if site in self.visited_sites:
#             self._Delete(site)
#         self.insertion_idx += 1
#         site_nfo = [distance, self.insertion_idx, site]
#         self.visited_sites[site] = site_nfo
#         hp_.heappush(self.heap, site_nfo)
#
#     def Pop(self) -> Optional[Tuple[number_h, site_h]]:
#         """
#         Return (distance, site) for the site of minimum distance, or None if queue is empty
#         """
#         while self.heap:
#             distance, _, site = hp_.heappop(self.heap)
#             if site is not None:
#                 del self.visited_sites[site]
#                 return distance, site
#
#         return None
#
#     def _Delete(self, site: site_h) -> None:
#         #
#         site_nfo = self.visited_sites.pop(site)
#         site_nfo[-1] = None
DEBREUVE Eric's avatar
DEBREUVE Eric committed


# # SSA
# def __SphereCoords__(
#     row: int, col: int, dep: int, radius: int, shape: Tuple[int, int, int]
# ) -> np_array_picker_h:
#     #
#     sphere = nmpy.zeros(shape, dtype=nmpy.bool_)
DEBREUVE Eric's avatar
DEBREUVE Eric committed
#     # dw_.ellipsoid leaves a one pixel margin around the ellipse, hence [1:-1, 1:-1, 1:-1]
#     ellipse = dw_.ellipsoid(radius, radius, radius)[1:-1, 1:-1, 1:-1]
#     sp_slices = tuple(
#         slice(0, min(sphere.shape[idx_], ellipse.shape[idx_])) for idx_ in (0, 1, 2)
#     )
#     sphere[sp_slices] = ellipse[sp_slices]
#
#     sphere = im_.shift(
#         sphere, (row - radius, col - radius, dep - radius), order=0, prefilter=False
#     )
#
#     return sphere.nonzero()