-
DEBREUVE Eric authoredDEBREUVE Eric authored
dijkstra.py 12.74 KiB
# Copyright CNRS/Inria/UNS
# Contributor(s): Eric Debreuve (since 2019)
#
# eric.debreuve@cnrs.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software. You can use,
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info".
#
# As a counterpart to the access to the source code and rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty and the software's author, the holder of the
# economic rights, and the successive licensors have only limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading, using, modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean that it is complicated to manipulate, and that also
# therefore means that it is reserved for developers and experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and, more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
import numpy as nmpy
array_t = nmpy.ndarray
def DijkstraCosts(image: array_t, som_map: array_t, ext_map: array_t) -> array_t:
"""
Gives the value inf if the voxel belongs to a soma or an extension.
Otherwise, gives the value 1 / (voxel intensity + 1).
The closer to cost 1, the less probable a connexion will take this path.
The closer to cost 0.5, the more probable.
"""
dijkstra_costs = 1.0 / (image + 1.0)
dijkstra_costs[nmpy.logical_or(som_map > 0, ext_map > 0)] = nmpy.inf
return dijkstra_costs
# def DijkstraShortestPath(
# costs: array_t,
# origin: site_h,
# target: Union[site_h, Sequence[site_h]],
# limit_to_sphere: bool = True,
# constrain_direction: bool = True,
# return_all: bool = False,
# ) -> Union[path_nfo_h, Tuple[path_nfo_h, ...]]:
# """
# Perform the Dijkstra shortest weighted path algorithm
# """
# #
# costs, targets, valid_directions, dir_lengths = _DijkstraShortestPathPrologue(
# costs,
# origin,
# target,
# limit_to_sphere,
# constrain_direction,
# )
#
# nearest_sites = nearest_site_queue_t()
# nearest_sites.Insert(0, origin)
# min_distance_to = {origin: 0.0}
# predecessor_of = {}
# visited_sites = set()
#
# while True:
# site_nfo = nearest_sites.Pop()
# if site_nfo is None:
# # Empty queue: full graph traversal did not allow to reach all targets
# # This case is correctly dealt with in the following
# break
#
# distance, site = site_nfo
# if site in targets:
# targets.remove(site)
# if targets.__len__() > 0:
# continue
# else:
# break
#
# if site not in visited_sites:
# visited_sites.add(site)
# for successor, edge_length in _OutgoingEdges(
# site, valid_directions, dir_lengths, costs
# ):
# successor = tuple(successor)
# next_distance = distance + edge_length
# min_distance = min_distance_to.get(successor)
# if (min_distance is None) or (next_distance < min_distance):
# min_distance_to[successor] = next_distance
# predecessor_of[successor] = site
# nearest_sites.Insert(next_distance, successor)
#
# if isinstance(target[0], Sequence):
# targets = tuple(target)
# else:
# targets = (target,)
#
# if return_all:
# all_paths = []
# for one_target in targets:
# distance = min_distance_to.get(one_target)
# if distance is None:
# all_paths.append(((), None))
# else:
# path = []
# site = one_target
# while site is not None:
# path.append(site)
# site = predecessor_of.get(site)
# all_paths.append((tuple(reversed(path)), distance))
#
# return tuple(all_paths)
# else:
# min_distance = nmpy.inf
# closest_target = None
# for one_target in targets:
# distance = min_distance_to.get(one_target)
# if (distance is not None) and (distance < min_distance):
# min_distance = distance
# closest_target = one_target
#
# path = []
# if closest_target is None:
# distance = None
# else:
# distance = min_distance_to.get(closest_target)
# site = closest_target
# while site is not None:
# path.append(site)
# site = predecessor_of.get(site)
#
# return tuple(reversed(path)), distance
#
#
# def _DijkstraShortestPathPrologue(
# costs: array_t,
# origin: site_h,
# target: Union[site_h, Sequence[site_h]],
# limit_to_sphere: bool,
# constrain_direction: bool,
# ) -> Tuple[array_t, List[site_h], array_t, array_t]:
# #
# if isinstance(target[0], Sequence):
# targets = list(target)
# else:
# targets = [target]
#
# if limit_to_sphere:
# costs = _SphereLimitedCosts(costs, origin, targets)
#
# if costs.ndim == 2:
# if constrain_direction:
# valid_directions, dir_lengths = _FilteredDirections(
# DIRECTIONS_2D, LENGTHS_2D, origin, targets
# )
# else:
# valid_directions, dir_lengths = DIRECTIONS_2D, LENGTHS_2D
# elif costs.ndim == 3:
# if constrain_direction:
# valid_directions, dir_lengths = _FilteredDirections(
# DIRECTIONS_3D, LENGTHS_3D, origin, targets
# )
# else:
# valid_directions, dir_lengths = DIRECTIONS_3D, LENGTHS_3D
# else:
# raise ValueError(f"Cost matrix has {costs.ndim} dimension(s); Expecting 2 or 3")
#
# return costs, targets, valid_directions, dir_lengths
#
#
# def _OutgoingEdges(
# site: site_h, valid_directions: array_t, dir_lengths: array_t, costs: array_t
# ) -> Iterator:
# #
# neighbors = valid_directions + nmpy.array(site, dtype=valid_directions.dtype)
# n_dims = site.__len__()
#
# inside = nmpy.all(neighbors >= 0, axis=1)
# for c_idx in range(n_dims):
# nmpy.logical_and(
# inside, neighbors[:, c_idx] <= costs.shape[c_idx] - 1, out=inside
# )
# neighbors = neighbors[inside, :]
# dir_lengths = dir_lengths[inside]
#
# # For any n_dims: neighbors_costs = costs[tuple(zip(*neighbors.tolist()))]
# if n_dims == 2:
# neighbors_costs = costs[(neighbors[:, 0], neighbors[:, 1])]
# else:
# neighbors_costs = costs[(neighbors[:, 0], neighbors[:, 1], neighbors[:, 2])]
#
# valid_sites = nmpy.isfinite(neighbors_costs) # Excludes inf and nan
# neighbors = neighbors[valid_sites, :]
# neighbors_costs = neighbors_costs[valid_sites] * dir_lengths[valid_sites]
#
# return zip(neighbors.tolist(), neighbors_costs)
#
#
# DIRECTIONS_2D: Final = nmpy.array(
# tuple((i, j) for i in (-1, 0, 1) for j in (-1, 0, 1) if i != 0 or j != 0),
# dtype=nmpy.int16,
# )
#
# DIRECTIONS_3D: Final = nmpy.array(
# tuple(
# (i, j, k)
# for i in (-1, 0, 1)
# for j in (-1, 0, 1)
# for k in (-1, 0, 1)
# if i != 0 or j != 0 or k != 0
# ),
# dtype=nmpy.int16,
# )
#
# LENGTHS_2D: Final = nmpy.linalg.norm(DIRECTIONS_2D, axis=1)
# LENGTHS_3D: Final = nmpy.linalg.norm(DIRECTIONS_3D, axis=1)
#
#
# def _FilteredDirections(
# all_directions: array_t,
# all_lengths: array_t,
# origin: site_h,
# targets: Sequence[site_h],
# ) -> Tuple[array_t, array_t]:
# #
# n_dims = origin.__len__()
# n_targets = targets.__len__()
#
# straight_lines = nmpy.empty((n_dims, n_targets), dtype=nmpy.float64)
# for p_idx, target in enumerate(targets):
# for c_idx in range(n_dims):
# straight_lines[c_idx, p_idx] = target[c_idx] - origin[c_idx]
#
# inner_prods = all_directions @ straight_lines
# valid_directions = nmpy.any(inner_prods >= 0, axis=1)
#
# return all_directions[valid_directions, :], all_lengths[valid_directions]
#
#
# def _SphereLimitedCosts(
# costs: array_t, origin: site_h, targets: Sequence[site_h]
# ) -> array_t:
# """
# Note: Un-numba-ble because of slice
# """
# valid_sites = nmpy.zeros_like(costs, dtype=nmpy.bool_)
# center_map = nmpy.ones_like(costs, dtype=nmpy.uint8)
# distances = nmpy.empty_like(costs, dtype=nmpy.float64)
#
# targets_as_array = nmpy.array(targets)
# centers = nmpy.around(0.5 * targets_as_array.__add__(origin)).astype(
# nmpy.int64, copy=False
# )
# centers = tuple(tuple(center) for center in centers)
#
# n_dims = origin.__len__()
#
# for t_idx, target in enumerate(targets):
# sq_radius = max(
# (nmpy.subtract(centers[t_idx], origin) ** 2).sum(),
# (nmpy.subtract(centers[t_idx], target) ** 2).sum(),
# )
# radius = nmpy.sqrt(sq_radius).astype(nmpy.int64, copy=False) + 1
# # Note the +1 in slices ends to account for right-open ranginess
# bbox = tuple(
# slice(
# max(centers[t_idx][c_idx] - radius, 0),
# min(centers[t_idx][c_idx] + radius + 1, distances.shape[c_idx]),
# )
# for c_idx in range(n_dims)
# )
# if t_idx > 0:
# center_map[centers[t_idx - 1]] = 1
# center_map[centers[t_idx]] = 0
# mp_.distance_transform_edt(center_map[bbox], distances=distances[bbox])
# distance_thr = max(distances[origin], distances[target])
#
# valid_sites[bbox][distances[bbox] <= distance_thr] = True
# # # SSA
# # if n_dims == 2:
# # valid_coords = dw_.circle(
# # *centers[t_idx], radius, shape=valid_sites.shape
# # )
# # else:
# # local_shape = tuple(slc.stop - slc.start for slc in bbox)
# # local_center = tuple(centers[t_idx][c_idx] - slc.start for c_idx, slc in enumerate(bbox))
# # valid_coords = __SphereCoords__(*local_center, radius, shape=local_shape)
# # valid_coords = tuple(valid_coords[c_idx] + slc.start for c_idx, slc in enumerate(bbox))
# # valid_sites[valid_coords] = True
#
# local_cost = costs.copy()
# local_cost[nmpy.logical_not(valid_sites)] = nmpy.inf
#
# return local_cost
#
#
# class nearest_site_queue_t:
# #
# __slots__ = ("heap", "insertion_idx", "visited_sites")
#
# def __init__(self):
# #
# self.heap = []
# self.insertion_idx = 0
# self.visited_sites = {}
#
# def Insert(self, distance: number_h, site: site_h) -> None:
# """
# Insert a new site with its distance, or update the distance of an existing site
# """
# if site in self.visited_sites:
# self._Delete(site)
# self.insertion_idx += 1
# site_nfo = [distance, self.insertion_idx, site]
# self.visited_sites[site] = site_nfo
# hp_.heappush(self.heap, site_nfo)
#
# def Pop(self) -> Optional[Tuple[number_h, site_h]]:
# """
# Return (distance, site) for the site of minimum distance, or None if queue is empty
# """
# while self.heap:
# distance, _, site = hp_.heappop(self.heap)
# if site is not None:
# del self.visited_sites[site]
# return distance, site
#
# return None
#
# def _Delete(self, site: site_h) -> None:
# #
# site_nfo = self.visited_sites.pop(site)
# site_nfo[-1] = None
# # SSA
# def __SphereCoords__(
# row: int, col: int, dep: int, radius: int, shape: Tuple[int, int, int]
# ) -> np_array_picker_h:
# #
# sphere = nmpy.zeros(shape, dtype=nmpy.bool)
# # dw_.ellipsoid leaves a one pixel margin around the ellipse, hence [1:-1, 1:-1, 1:-1]
# ellipse = dw_.ellipsoid(radius, radius, radius)[1:-1, 1:-1, 1:-1]
# sp_slices = tuple(
# slice(0, min(sphere.shape[idx_], ellipse.shape[idx_])) for idx_ in (0, 1, 2)
# )
# sphere[sp_slices] = ellipse[sp_slices]
#
# sphere = im_.shift(
# sphere, (row - radius, col - radius, dep - radius), order=0, prefilter=False
# )
#
# return sphere.nonzero()