diff --git a/dijkstra_1_to_n.py b/dijkstra_1_to_n.py deleted file mode 100644 index 2a6142a259458b886816281fb88e2d7c98ee3b18..0000000000000000000000000000000000000000 --- a/dijkstra_1_to_n.py +++ /dev/null @@ -1,344 +0,0 @@ -""" -Dijkstra Shortest Weighted Path from one image/volume pixel/voxel to one or more image/volume pixel(s)/voxel(s) - Graph nodes = pixels/voxels - Graph edges = pixel/voxel neighborhood relationships - Edge weights = computed from pixel/voxel values typically - -Adapted from material of a Marc Pegon course: - https://www.ljll.math.upmc.fr/pegon/teaching.html - https://www.ljll.math.upmc.fr/pegon/documents/BCPST/TP06_src.tar.gz -""" -from type import array_t, number_h, site_h, site_path_h - -import heapq as hp_ -from typing import Any, Iterator, Sequence, Tuple, Union - -import numpy as np_ -import scipy.ndimage.morphology as mp_ - - -# Slightly slower alternative -# import scipy.ndimage as im_ -# import skimage.draw as dw_ - - -def DijkstraShortestPath( - costs: array_t, - from_point: site_h, - to_point: Union[site_h, Sequence[site_h]], - limit_to_sphere: bool = True, - constrain_direction: bool = True, -) -> Tuple[site_path_h, float]: - # - if isinstance(to_point[0], Sequence): - to_points = list(to_point) - else: - to_points = [to_point] - - if limit_to_sphere: - costs = SphereLimitedCosts(costs, from_point, to_points) - - if costs.ndim == 2: - if constrain_direction: - allowed_directions, dir_lengths = FilteredDirections( - directions_2d_c, lengths_2d_c, from_point, to_points - ) - else: - allowed_directions, dir_lengths = directions_2d_c, lengths_2d_c - elif costs.ndim == 3: - if constrain_direction: - allowed_directions, dir_lengths = FilteredDirections( - directions_3d_c, lengths_3d_c, from_point, to_points - ) - else: - allowed_directions, dir_lengths = directions_3d_c, lengths_3d_c - else: - raise ValueError(f"Cost matrix has {costs.ndim} dimension(s); Expecting 2 or 3") - - priority_queue = priority_queue_t() - priority_queue.Insert(0, from_point) - min_distance_to = {from_point: 0} - predecessor_of = {} - visited_nodes = set() - - while True: - node_info = priority_queue.Pop() - if node_info is None: - break # Empty queue: full graph traversal did not allow to reach all to_points - - distance, node = node_info - if node in to_points: - to_points.remove(node) - if to_points.__len__() > 0: - continue - else: - break - - if node not in visited_nodes: - visited_nodes.add(node) - for successor, edge_length in OutgoingEdges( - node, allowed_directions, dir_lengths, costs - ): - successor = tuple(successor) - next_distance = distance + edge_length - min_distance = min_distance_to.get(successor) - if (min_distance is None) or (next_distance < min_distance): - min_distance_to[successor] = next_distance - predecessor_of[successor] = node - priority_queue.Insert(next_distance, successor) - - if isinstance(to_point[0], Sequence): - to_points = list(to_point) - else: - to_points = [to_point] - - min_distance = np_.inf - closest_node = None - for current_to_point in to_points: - distance = min_distance_to.get(current_to_point) - if (distance is not None) and (distance < min_distance): - min_distance = distance - closest_node = current_to_point - - path = [] - distance = min_distance_to.get(closest_node) if closest_node is not None else None - if distance is not None: - node = closest_node - while node is not None: - path.append(node) - node = predecessor_of.get(node) - - return tuple(reversed(path)), distance - - -def OutgoingEdges( - node_coords: site_h, - allowed_directions: array_t, - dir_lengths: array_t, - costs: array_t, -) -> Iterator: - # - neighbors = allowed_directions + np_.array( - node_coords, dtype=allowed_directions.dtype - ) - n_dims = node_coords.__len__() - - inside = np_.all(neighbors >= 0, axis=1) - for c_idx in range(n_dims): - np_.logical_and( - inside, neighbors[:, c_idx] <= costs.shape[c_idx] - 1, out=inside - ) - neighbors = neighbors[inside, :] - dir_lengths = dir_lengths[inside] - - if n_dims == 2: - neighbors_costs = costs[(neighbors[:, 0], neighbors[:, 1])] - else: - neighbors_costs = costs[(neighbors[:, 0], neighbors[:, 1], neighbors[:, 2])] - # For any n_dims: neighbors_costs = costs[tuple(zip(*neighbors.tolist()))] - - allowed_nodes = np_.isfinite(neighbors_costs) # Excludes inf and nan - neighbors = neighbors[allowed_nodes, :] - neighbors_costs = neighbors_costs[allowed_nodes] * dir_lengths[allowed_nodes] - - return zip(neighbors.tolist(), neighbors_costs) - - -directions_2d_c = np_.array( - tuple((i, j) for i in (-1, 0, 1) for j in (-1, 0, 1) if i != 0 or j != 0), - dtype=np_.int16, -) -lengths_2d_c = np_.linalg.norm(directions_2d_c, axis=1) - -directions_3d_c = np_.array( - tuple( - (i, j, k) - for i in (-1, 0, 1) - for j in (-1, 0, 1) - for k in (-1, 0, 1) - if i != 0 or j != 0 or k != 0 - ), - dtype=np_.int16, -) -lengths_3d_c = np_.linalg.norm(directions_3d_c, axis=1) - - -def FilteredDirections( - all_directions: array_t, - all_lengths: array_t, - from_point: site_h, - to_points: Sequence[site_h], -) -> Tuple[array_t, array_t]: - # - n_dims = from_point.__len__() - n_to_points = to_points.__len__() - - straight_lines = np_.empty((n_dims, n_to_points), dtype=np_.float) - for p_idx, to_point in enumerate(to_points): - for c_idx in range(n_dims): - straight_lines[c_idx, p_idx] = to_point[c_idx] - from_point[c_idx] - - inner_prods = all_directions @ straight_lines - valid_directions = np_.any(inner_prods >= 0, axis=1) - - return all_directions[valid_directions, :], all_lengths[valid_directions] - - -def SphereLimitedCosts( - costs: array_t, from_point: site_h, to_points: Sequence[site_h] -) -> array_t: - # - valid_sites = np_.zeros_like(costs, dtype=np_.bool) - center_map = np_.ones_like(costs, dtype=np_.int8) - distances = np_.empty_like(costs, dtype=np_.float) - - to_points_as_array = np_.array(to_points) - centers = np_.around(0.5 * (from_point + to_points_as_array)).astype(np_.int64) - centers = tuple(tuple(center) for center in centers) - - n_dims = from_point.__len__() - - for p_idx, to_point in enumerate(to_points): - sq_radius = max( - (np_.subtract(centers[p_idx], from_point) ** 2).sum(), - (np_.subtract(centers[p_idx], to_point) ** 2).sum(), - ) - radius = np_.sqrt(sq_radius).astype(np_.int64) + 1 - # Note the +1 in slices ends to account for right-open ranginess - bbox = tuple( - slice( - max(centers[p_idx][c_idx] - radius, 0), - min(centers[p_idx][c_idx] + radius + 1, distances.shape[c_idx]), - ) - for c_idx in range(n_dims) - ) - if p_idx > 0: - center_map[centers[p_idx - 1]] = 1 - center_map[centers[p_idx]] = 0 - mp_.distance_transform_edt(center_map[bbox], distances=distances[bbox]) - distance_thr = max(distances[from_point], distances[to_point]) - - valid_sites[bbox][distances[bbox] <= distance_thr] = True - # Slightly slower alternative - # if n_dims == 2: - # valid_coords = dw_.circle( - # *centers[p_idx], radius, shape=valid_sites.shape - # ) - # else: - # local_shape = tuple(slc.stop - slc.start for slc in bbox) - # local_center = tuple(centers[p_idx][c_idx] - slc.start for c_idx, slc in enumerate(bbox)) - # valid_coords = __SphereCoords__(*local_center, radius, shape=local_shape) - # valid_coords = tuple(valid_coords[c_idx] + slc.start for c_idx, slc in enumerate(bbox)) - # valid_sites[valid_coords] = True - - local_cost = costs.copy() - local_cost[np_.logical_not(valid_sites)] = np_.inf - - return local_cost - - -# Slightly slower alternative -# def __SphereCoords__( -# row: int, col: int, dep: int, radius: int, shape: Tuple[int, int, int] -# ) -> np_array_picker_h: -# # -# sphere = np_.zeros(shape, dtype=np_.bool) -# # dw_.ellipsoid leaves a one pixel margin around the ellipse, hence [1:-1, 1:-1, 1:-1] -# ellipse = dw_.ellipsoid(radius, radius, radius)[1:-1, 1:-1, 1:-1] -# sp_slices = tuple( -# slice(0, min(sphere.shape[idx_], ellipse.shape[idx_])) for idx_ in (0, 1, 2) -# ) -# sphere[sp_slices] = ellipse[sp_slices] -# -# sphere = im_.shift( -# sphere, (row - radius, col - radius, dep - radius), order=0, prefilter=False -# ) -# -# return sphere.nonzero() - - -class priority_queue_t: - # - def __init__(self): - # - self.heap = [] - self.counter = 0 - self.entry_finder = {} - self.REMOVED = "<remove_marker>" - - def Insert(self, priority: number_h, task: Any) -> None: - """ - Insert a new task with some priority, or update the priority of an existing task - """ - if task in self.entry_finder: - self.Delete(task) - self.counter += 1 - entry = [priority, self.counter, task] - self.entry_finder[task] = entry - hp_.heappush(self.heap, entry) - - def Pop(self) -> Tuple[number_h, Any]: - """ - Return (priority, task) for the task of minimum priority, or None if queue is empty - """ - while self.heap: - priority, count, task = hp_.heappop(self.heap) - if task is not self.REMOVED: - del self.entry_finder[task] - return priority, task - - def Delete(self, task: Any) -> None: - # - entry = self.entry_finder.pop(task) - entry[-1] = self.REMOVED - - -if __name__ == "__main__": - # - import matplotlib.pyplot as pl_ - import skimage.data as dt_ - - n_dims_ = 2 # 2 or 3 - - image = dt_.astronaut() - - if n_dims_ == 2: - from_point_, to_point_ = (70, 10), [(145, 260), (150, 260), (155, 260)] - costs_ = 1.0 / (np_.sum(image, axis=2) + 1.0) ** 2 - else: - from_point_, to_point_ = ( - (70, 10, 0), - [(145, 260, 2), (150, 260, 1), (155, 260, 0)], - ) - costs_ = 1.0 / (image + 1) ** 2 - - path_, distance_ = DijkstraShortestPath( - costs_, from_point_, to_point_, limit_to_sphere=False - ) - - print(distance_, path_[-1]) - for row, col, *dep in path_: - image[row, col, :] = (255, 0, 0) - pl_.imshow(image) - pl_.show() - - image = image[:, :, 0] - image.fill(0) - image[120:200, 50:70] = 1 - image[125:200, 50:80] = 1 - image[200:220, 50:120] = 1 - image[120:220, 120:140] = 1 - - from_point_, to_point_ = (120, 69), (120, 120) - costs_ = 1.0 / (image + 1.0e-10) - - path_, distance_ = DijkstraShortestPath( - costs_, from_point_, to_point_, limit_to_sphere=False - ) - c_path_, c_distance_ = DijkstraShortestPath(costs_, from_point_, to_point_) - - print(distance_, c_distance_) - image[tuple(zip(*path_))] += 2 - image[tuple(zip(*c_path_))] += 4 - pl_.imshow(image) - pl_.show() diff --git a/dijkstra_1_to_n.py b/dijkstra_1_to_n.py new file mode 120000 index 0000000000000000000000000000000000000000..c28f560f82fc17f05e899094160a79e5174f378a --- /dev/null +++ b/dijkstra_1_to_n.py @@ -0,0 +1 @@ +../../../../brick/def/dijkstra-img/dijkstra_1_to_n.py \ No newline at end of file diff --git a/extension.py b/extension.py index 4d935c6245382b4824d8a1cfaa69d0b213afe404..c0b12719119ee42ff01a2dd181dd62e97293cb58 100644 --- a/extension.py +++ b/extension.py @@ -1,6 +1,7 @@ from __future__ import annotations import frangi_3d as fg_ +from glial_cmp import glial_cmp_t import map_labeling as ml_ from type import array_t, site_h @@ -16,35 +17,24 @@ from scipy import ndimage as im_ min_area_c = 100 -class extension_t: +class extension_t(glial_cmp_t): # # bmp=boolean map # lmp=labeled map (intXX or uintXX array) # map=extension map (map=binary, int8 or uint8 array)) # ep_=end point # soma_uid: connected to a soma somewhere upstream (as opposed to downstream extensions) - # extension_uids: downstream (as opposed to being upstreamed connected) + # extensions: downstream (as opposed to being upstreamed connected) # - __slots__ = ( - "uid", - "sites", - "scales", - "end_points", - "ep_closest_somas", - "soma_uid", - "extension_uids", - "__cache__", - ) + __slots__ = ("scales", "end_points", "ep_closest_somas", "soma_uid", "__cache__") def __init__(self): # - self.uid = None - self.sites = None + super().__init__() self.scales = None self.end_points = None self.ep_closest_somas = None self.soma_uid = None - self.extension_uids = None self.__cache__ = None @classmethod @@ -54,12 +44,9 @@ class extension_t: # instance = cls() - # sites: might contain voxels that could be removed w/o breaking connectivity - instance.uid = uid - instance.sites = (lmp == uid).nonzero() + instance.InitializeFromMaps(lmp, uid) instance.scales = scales[instance.sites] instance.end_points = (end_point_lmp == uid).nonzero() - instance.extension_uids = [] instance.__cache__ = {} return instance @@ -88,9 +75,37 @@ class extension_t: return () - def Extend(self, extensions: Sequence[extension_t], costs: array_t) -> None: + def BackReferenceSoma(self, glial_cmp: glial_cmp_t) -> None: # - print(f"{__name__}.py: {self.Extend.__name__}: To be implemented") + if isinstance(glial_cmp, extension_t): + self.soma_uid = glial_cmp.soma_uid + else: + self.soma_uid = glial_cmp.uid + + def __str__(self) -> str: + # + if self.extensions is None: + n_extensions = 0 + else: + n_extensions = self.extensions.__len__() + + return ( + f"Ext.{self.uid}, " + f"sites={self.sites[0].__len__()}, " + f"endpoints={self.end_points[0].__len__()}, " + f"closest soma(s)={np_.unique(self.ep_closest_somas).__str__()[1:-1]}, " + f"soma={self.soma_uid}, " + f"extensions={n_extensions}" + ) + + @staticmethod + def ExtensionWithSite(extensions: Sequence[extension_t], site: site_h): + # + for extension in extensions: + if site in tuple(zip(*extension.sites)): + return extension + + return None @staticmethod def EnhancedForDetection( @@ -196,7 +211,7 @@ def __HysterisisImage__(image: array_t, low: float, high: float) -> array_t: def __MorphologicalCleaning__(image: array_t) -> array_t: - + # result = image.copy() selem = mp_.disk(1) diff --git a/glial_cmp.py b/glial_cmp.py new file mode 100644 index 0000000000000000000000000000000000000000..bf512b44106bde69e892507f1de21441130fe239 --- /dev/null +++ b/glial_cmp.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +from type import array_t, site_path_h + +import numpy as np_ + + +class glial_cmp_t: + # + __slots__ = ("uid", "sites", "connection_path", "extensions", "img_shape") + + def __init__(self): + # + self.uid = None + self.sites = None + self.connection_path = None + self.extensions = None + self.img_shape = None + + def InitializeFromMaps(self, lmp: array_t, uid: int) -> None: + # + self.uid = uid + # sites: might contain voxels that could be removed w/o breaking connectivity + self.sites = (lmp == uid).nonzero() + self.connection_path = {} + self.extensions = [] + self.img_shape = lmp.shape + + def ExtendWith( + self, extension: glial_cmp_t, through: site_path_h, costs: array_t + ) -> None: + # + connection_path = tuple(zip(*through[1:-1])) + if connection_path.__len__() == 0: + connection_path = None + extension_path = extension.sites + + self.connection_path[extension.uid] = connection_path + self.extensions.append(extension) + + extension.BackReferenceSoma(self) + + # TODO: Ideally, these paths should be dilated + # but in ext-ext connections, there must not be dilation around the current ext + # (current ext plays the role of a soma in soma-ext step) + if connection_path is not None: + costs[connection_path] = np_.inf + costs[extension_path] = np_.inf + + def BackReferenceSoma(self, glial_cmp: glial_cmp_t): + # + raise NotImplementedError diff --git a/main.py b/main.py index e95cf976f0f97206de7d3c8ece2a640864d45eac..0742fae95413d41d1b147743ea2d520f91b1af35 100644 --- a/main.py +++ b/main.py @@ -2,16 +2,18 @@ import extension as ext_ import plot as ot_ import soma as soma_ +import itertools as it_ +import time as tm_ + import matplotlib.pyplot as pl_ import numpy as np_ import skimage.color as cl_ import skimage.io as io_ import skimage.measure as ms_ import skimage.morphology as mp_ -import time as tm_ -print(f"STARTED: {tm_.strftime('%a, %b %d %Y @ %H:%M:%S')}\n") +print(f"STARTED: {tm_.strftime('%a, %b %d %Y @ %H:%M:%S')}") start_time = tm_.time() @@ -31,6 +33,11 @@ ext_high = 0.6 # high_ext = 8.0e-3 soma_selem = mp_.disk(2) +max_straight_sq_dist = 30 ** 2 +max_weighted_length = 20.0 + +min_area_c = 1000 + # --- Images image = io_.imread(data_path) @@ -56,10 +63,10 @@ axes = None # --- Somas if "soma" in run: - print("--- Soma Detection") + print("\n--- Soma Detection") som_nfo["map"] = soma_t.Map(image_for_soma, soma_low, soma_high, soma_selem) - som_nfo["map"] = soma_t.FilteredMap(som_nfo["map"]) + som_nfo["map"] = soma_t.FilteredMap(som_nfo["map"], min_area_c) som_nfo["contour_map"] = soma_t.ContourMap(som_nfo["map"]) som_nfo["lmp"], n_somas = ms_.label(som_nfo["map"], return_num=True) @@ -89,7 +96,7 @@ if "soma" in run: # -- Extentions if "extension" in run: - print("--- Extension Detection") + print("\n--- Extension Detection") enhanced_ext, ext_scales = extension_t.EnhancedForDetection(image_for_ext) ext_nfo["coarse_map"] = extension_t.CoarseMap(enhanced_ext, ext_low, ext_high) @@ -102,7 +109,9 @@ if "extension" in run: ext_nfo["end_point_lmp"] = ext_nfo["end_point_map"] * ext_nfo["lmp"] extensions = tuple( - extension_t().FromMaps(ext_nfo["lmp"], ext_nfo["end_point_lmp"], ext_scales, uid) + extension_t().FromMaps( + ext_nfo["lmp"], ext_nfo["end_point_lmp"], ext_scales, uid + ) for uid in range(1, n_extensions + 1) ) for extension in extensions: @@ -121,41 +130,108 @@ if "extension" in run: # -- Soma-Extention if "som-ext" in run: - print("--- Soma <-> Extension") + print("\n--- Soma <-> Extension") # TODO: Ideally, the extension part should be dilated # but in ext-ext connections, there must not be dilation around the current ext # (current ext plays the role of a soma in soma-ext step) costs[np_.logical_or(som_nfo["map"] > 0, ext_nfo["map"] > 0)] = np_.inf - for soma in somas: - soma.Extend(extensions, som_nfo["dist_to_closest"], costs) + candidate_conn_nfo = [] # conn=connection + for soma, extension in it_.product(somas, extensions): + new_candidates = extension.EndPointsForSoma(soma.uid) + candidate_conn_nfo.extend( + (soma, extension, end_point) for end_point in new_candidates + ) + candidate_conn_nfo.sort(key=lambda elm: som_nfo["dist_to_closest"][elm[2]]) + + for c_idx, (soma, extension, end_point) in enumerate(candidate_conn_nfo): + print(f" Soma.{soma.uid} <-?-> Ext.{extension.uid}({c_idx})", end="") + path, length = soma_t.ShortestPathFrom( + end_point, + costs, + soma.ContourPointsCloseTo, + max_straight_sq_dist=max_straight_sq_dist, + ) + if length <= max_weighted_length: + soma.ExtendWith(extension, path, costs) + print(": Made") + else: + print("") + som_nfo["soma_w_ext_lmp"] = soma_t.SomasWithExtensionsLMap(somas) + + # for soma in somas: + # soma.Extend(extensions, som_nfo["dist_to_closest"], costs) connected_ext_uids = tuple( extension.uid for extension in extensions if extension.soma_uid is not None ) - print(f" Connected Ext = {connected_ext_uids.__len__()}" - f"/{extensions.__len__()}\n" - f" {connected_ext_uids}") + print( + f" Connected Ext = {connected_ext_uids.__len__()}" + f"/{extensions.__len__()}\n" + f" {connected_ext_uids}" + ) if with_plot: - for soma in somas: - if not soma.has_extensions: - continue - - _ = ot_.PlotSomaWithExtensions(soma, som_nfo["lmp"], extensions) - pl_.title(f"Soma.{soma.uid} + Ext.{soma.extension_uids}") - soma_w_ext_lmp = soma_t.SomasWithExtensionsLMap(somas, som_nfo["lmp"], extensions) - pl_.matshow(soma_w_ext_lmp.max(axis=0)), pl_.title("Somas + Extensions") + # Somas without extensions have already been plotted + for soma in filter(lambda elm: elm.has_extensions, somas): + _ = ot_.PlotSomaWithExtensions(soma, som_nfo["lmp"]) + pl_.title( + f"Soma.{soma.uid} + Ext.{tuple(ext.uid for ext in soma.extensions)}" + ) + pl_.matshow(som_nfo["soma_w_ext_lmp"].max(axis=0)), pl_.title( + "Somas + Extensions" + ) # -- Extention-Extention if "ext-ext" in run: - print("--- Extension <-> Extension") - - for extension in extensions: - extension.Extend(extensions, costs) - + print("\n--- Extension <-> Extension") + + should_look_for_connections = True + while should_look_for_connections: + som_nfo["soma_w_ext_lmp"] = soma_t.SomasWithExtensionsLMap(somas) + som_nfo["dist_to_closest"], som_nfo["influence_map"] = soma_t.InfluenceMaps( + som_nfo["soma_w_ext_lmp"] + ) + unconnected_extensions = list( + filter(lambda ext: ext.soma_uid is None, extensions) + ) + for extension in unconnected_extensions: + extension.CaptureClosestSomas(som_nfo["influence_map"]) + + candidate_conn_nfo = [] # conn=connection + for soma, extension in it_.product(somas, unconnected_extensions): + new_candidates = extension.EndPointsForSoma(soma.uid) + candidate_conn_nfo.extend( + (soma, extension, end_point) for end_point in new_candidates + ) + candidate_conn_nfo.sort(key=lambda elm: som_nfo["dist_to_closest"][elm[2]]) + + should_look_for_connections = False + for c_idx, (soma, extension, end_point) in enumerate(candidate_conn_nfo): + print(f" Soma.{soma.uid} <-?-> Ext.{extension.uid}({c_idx})", end="") + path, length = soma_t.ShortestPathFrom( + end_point, + costs, + soma.ExtensionPointsCloseTo, + max_straight_sq_dist=max_straight_sq_dist, + ) + if length <= max_weighted_length: + tgt_extenstion = extension_t.ExtensionWithSite(extensions, path[-1]) + tgt_extenstion.ExtendWith(extension, path, costs) + should_look_for_connections = True + print(": Made") + else: + print("") + + +# -- Summary +print("\n") +for soma in somas: + print(soma) +for extension in extensions: + print(extension) elapsed_time = tm_.gmtime(tm_.time() - start_time) print(f"\nElapsed Time={tm_.strftime('%Hh %Mm %Ss', elapsed_time)}") diff --git a/plot.py b/plot.py index 73122b6cd65d8f647da373763f7f1a90e2604ba7..17808c84f4f329fa78ebfe430288af5c1ce31d65 100644 --- a/plot.py +++ b/plot.py @@ -122,9 +122,7 @@ def PlotExtensions( return axes -def PlotSomaWithExtensions( - soma: soma_t, soma_lmp: array_t, extensions: Sequence[extension_t], axes=None -) -> Optional[Any]: +def PlotSomaWithExtensions(soma: soma_t, soma_lmp: array_t, axes=None) -> Optional[Any]: # shape = soma_lmp.shape depth_factor, depth_limit = __DepthFactorAndLimit__(shape) @@ -133,11 +131,16 @@ def PlotSomaWithExtensions( _, axes = __FigAndAxes__(shape, depth_limit) PlotLMap(soma_lmp, labels=soma.uid, axes=axes) - for ext_uid in soma.extension_uids: - PlotConnection(soma.connection_path[ext_uid], soma.uid, shape, axes=axes) - for extension in extensions: - if extension.uid == ext_uid: - PlotExtensions(extension, shape, axes=axes) + for connection_path in filter( + lambda path: path is not None, soma.connection_path.values() + ): + PlotConnection(connection_path, soma.uid, shape, axes=axes) + for extension in soma.Extensions(): + for connection_path in filter( + lambda path: path is not None, extension.connection_path.values() + ): + PlotConnection(connection_path, soma.uid, shape, axes=axes) + PlotExtensions(extension, shape, axes=axes) pl_.tight_layout() diff --git a/soma.py b/soma.py index 5ca6322adf50060325ecc6f5a95817a81e4c2af1..37c7b3076a2c9fe4372acab7ba4d751100e8d1d5 100644 --- a/soma.py +++ b/soma.py @@ -1,12 +1,14 @@ from __future__ import annotations import dijkstra_1_to_n as dk_ +from glial_cmp import glial_cmp_t import map_labeling as ml_ from extension import extension_t -from type import array_t, py_array_picker_h, site_h +from type import array_t, py_array_picker_h, site_h, site_path_h from collections import namedtuple as namedtuple_t -from typing import Optional, Sequence, Tuple +import sys as sy_ +from typing import Callable, Optional, Sequence, Tuple import numpy as np_ import scipy.ndimage as im_ @@ -15,55 +17,55 @@ import skimage.measure as ms_ import skimage.morphology as mp_ -som_ext_path_t = namedtuple_t("som_ext_path_t", "extension length sites") +som_ext_path_t = namedtuple_t("som_ext_path_t", "extension length path idx") -max_straight_sq_dist = 30 ** 2 -max_weighted_dist = 20.0 - - -min_area_c = 1000 - - -class soma_t: +class soma_t(glial_cmp_t): # # bmp=boolean map # lmp=labeled map (intXX or uintXX array) # map=extension map (map=binary, int8 or uint8 array)) # - __slots__ = ( - "uid", - "lmp_ref", - "contour_points", - "connection_path", - "extension_uids", - ) + __slots__ = ("contour_points",) def __init__(self): # - self.uid = None - self.lmp_ref = None + super().__init__() self.contour_points = None - self.connection_path = None - self.extension_uids = None @classmethod def FromMaps(cls, lmp: array_t, contour_lmp: array_t, uid: int) -> soma_t: # instance = cls() - instance.uid = uid - instance.lmp_ref = lmp + instance.InitializeFromMaps(lmp, uid) instance.contour_points = tuple(zip(*(contour_lmp == uid).nonzero())) - instance.connection_path = {} - instance.extension_uids = [] return instance @property def has_extensions(self) -> bool: # - return self.extension_uids.__len__() > 0 + return self.extensions.__len__() > 0 + + def Extensions(self, max_level: int = sy_.maxsize) -> Tuple[extension_t]: + # + # max_level=1: directly connected extensions + # ... + soma_ext = self.extensions.copy() + + current_level_idx = 0 + for level in range(2, max_level + 1): + next_level_idx = soma_ext.__len__() + if next_level_idx == current_level_idx: + break + + for extension in soma_ext[current_level_idx:]: + soma_ext.extend(extension.extensions) + + current_level_idx = next_level_idx + + return tuple(soma_ext) def ContourPointsCloseTo( self, point: site_h, max_distance: float @@ -83,69 +85,105 @@ class soma_t: return points, points_as_picker - def Extend( - self, extensions: Sequence[extension_t], dist_to_soma: array_t, costs: array_t - ) -> None: + def ExtensionPointsCloseTo( + self, point: site_h, max_distance: float + ) -> Tuple[Optional[Tuple[site_h, ...]], Optional[py_array_picker_h]]: # - candidate_ext_eps = [] # eps=end points - for extension in extensions: - new_candidates = extension.EndPointsForSoma(self.uid) - candidate_ext_eps.extend( - (end_point, extension) for end_point in new_candidates + points = [] + for extension in self.Extensions(): + ext_sites = tuple(zip(*extension.sites)) + points.extend( + ext_point + for ext_point in ext_sites + if (np_.subtract(point, ext_point) ** 2).sum() <= max_distance ) - candidate_ext_eps.sort(key=lambda elm: dist_to_soma[elm[0]]) - - while True: - som_ext_paths = [] - for ep_idx, (ext_end_point, extension) in enumerate(candidate_ext_eps): - if extension.soma_uid is not None: - continue - - close_contour_points, contour_indexing = self.ContourPointsCloseTo( - ext_end_point, max_straight_sq_dist - ) - if close_contour_points is None: - continue - - costs[ext_end_point] = 0.0 - costs[contour_indexing] = 0.0 - print(f" Soma.{self.uid} <-?-> Ext.{extension.uid}.{ep_idx}") - sites, length = dk_.DijkstraShortestPath( - costs, ext_end_point, close_contour_points - ) - costs[ext_end_point] = np_.inf - costs[contour_indexing] = np_.inf - if length <= max_weighted_dist: - som_ext_paths.append( - som_ext_path_t(extension=extension, length=length, sites=sites) - ) - if sites.__len__() == 2: - break - - if som_ext_paths.__len__() > 0: - som_ext_paths.sort(key=lambda path: path.length) - shorest_path = som_ext_paths[0] - connection_path = tuple(zip(*shorest_path.sites[1:-1])) - if connection_path.__len__() == 0: - connection_path = None - - closest_extension = shorest_path.extension - closest_extension.soma_uid = self.uid - extension_path = closest_extension.sites - - self.connection_path[closest_extension.uid] = connection_path - self.extension_uids.append(closest_extension.uid) - - # TODO: Ideally, these paths should be dilated - # but in ext-ext connections, there must not be dilation around the current ext - # (current ext plays the role of a soma in soma-ext step) - if connection_path is not None: - costs[connection_path] = np_.inf - costs[extension_path] = np_.inf - - print(f" => {self.uid} <-> {closest_extension.uid}") - else: - break + points = tuple(points) + + if points.__len__() > 0: + points_as_picker = tuple(zip(*points)) + else: + points = None + points_as_picker = None + + return points, points_as_picker + + # def Extend( + # self, extensions: Sequence[extension_t], dist_to_soma: array_t, costs: array_t + # ) -> None: + # # + # candidate_ext_nfo = [] # eps=end points + # for extension in extensions: + # new_candidates = extension.EndPointsForSoma(self.uid) + # candidate_ext_nfo.extend( + # (end_point, extension) for end_point in new_candidates + # ) + # candidate_ext_nfo.sort(key=lambda elm: dist_to_soma[elm[0]]) + # unconnected_candidates = list( + # filter(lambda elm: elm[1].soma_uid is None, candidate_ext_nfo) + # ) + # + # should_look_for_connections = True + # while should_look_for_connections: + # som_ext_path_nfos = [] + # for ep_idx, (ext_end_point, extension) in enumerate(unconnected_candidates): + # path, length = self.ShortestPathFrom( + # extension.uid, ext_end_point, ep_idx, costs + # ) + # if length <= max_weighted_dist: + # som_ext_path_nfos.append( + # som_ext_path_t( + # extension=extension, length=length, path=path, idx=ep_idx + # ) + # ) + # if path.__len__() == 2: + # break + # + # if som_ext_path_nfos.__len__() > 0: + # som_ext_path_nfos.sort(key=lambda nfo: nfo.length) + # shorest_path = som_ext_path_nfos[0] + # self.ExtendWith(shorest_path.extension, shorest_path.path, costs) + # del unconnected_candidates[shorest_path.idx] + # else: + # should_look_for_connections = False + + @staticmethod + def ShortestPathFrom( + point: site_h, + costs: array_t, + soma_candidate_points_fct: Callable, + max_straight_sq_dist: float = np_.inf, + ) -> Tuple[site_path_h, float]: + # + soma_candidate_points, candidate_indexing = soma_candidate_points_fct( + point, max_straight_sq_dist + ) + if soma_candidate_points is None: + return (), np_.inf + + costs[point] = 0.0 + costs[candidate_indexing] = 0.0 + path, length = dk_.DijkstraShortestPath(costs, point, soma_candidate_points) + costs[point] = np_.inf + costs[candidate_indexing] = np_.inf + + return path, length + + def BackReferenceSoma(self, glial_cmp: glial_cmp_t): + # + raise NotImplementedError + + def __str__(self) -> str: + # + if self.extensions is None: + n_extensions = 0 + else: + n_extensions = self.extensions.__len__() + + return ( + f"Soma.{self.uid}, " + f"contour points={self.contour_points.__len__()}, " + f"extensions={n_extensions}" + ) @staticmethod def Map(image: array_t, low: float, high: float, selem: array_t) -> array_t: @@ -170,13 +208,13 @@ class soma_t: return result @staticmethod - def FilteredMap(map_: array_t) -> array_t: + def FilteredMap(map_: array_t, min_area: int) -> array_t: result = map_.copy() lmp = ms_.label(map_) for region in ms_.regionprops(lmp): - if region.area <= min_area_c: + if region.area <= min_area: region_sites = (lmp == region.label).nonzero() result[region_sites] = 0 @@ -213,25 +251,31 @@ class soma_t: @staticmethod def SomasWithExtensionsLMap( - somas: Sequence[soma_t], soma_lmp: array_t, extensions: Sequence[extension_t] + somas: Sequence[soma_t] ) -> array_t: # - result = soma_lmp.copy() + shape = somas[0].img_shape + result = np_.zeros(shape, dtype=np_.int64) for soma in somas: - for ext_uid in soma.extension_uids: - connection_path = soma.connection_path[ext_uid] - if connection_path is not None: + result[soma.sites] = soma.uid + + for connection_path in filter( + lambda path: path is not None, soma.connection_path.values() + ): + result[connection_path] = soma.uid + for extension in soma.Extensions(): + for connection_path in filter( + lambda path: path is not None, extension.connection_path.values() + ): result[connection_path] = soma.uid - for extension in extensions: - if extension.uid == ext_uid: - result[extension.sites] = soma.uid + result[extension.sites] = soma.uid return result def NormalizedImage(image: array_t) -> array_t: - + # nonextreme_values = image[np_.logical_and(image > 0.0, image < image.max())] nonextreme_avg = np_.mean(nonextreme_values) result = image / nonextreme_avg