From b3e8f04f2482dd47eecbc323f3d85b09ae9f8995 Mon Sep 17 00:00:00 2001 From: Luca Deininger Date: Thu, 30 Apr 2026 15:53:07 +0200 Subject: [PATCH 1/3] quality control vessel airways in viewer --- src/skeleplex/app/_curate.py | 416 +++++++++++++++++++++++- src/skeleplex/app/_utils.py | 23 +- src/skeleplex/app/_viewer_controller.py | 16 +- src/skeleplex/app/qt/app_controls.py | 23 ++ 4 files changed, 465 insertions(+), 13 deletions(-) diff --git a/src/skeleplex/app/_curate.py b/src/skeleplex/app/_curate.py index 23c4578..929a7eb 100644 --- a/src/skeleplex/app/_curate.py +++ b/src/skeleplex/app/_curate.py @@ -1,7 +1,10 @@ +import json import numbers +import re from collections import deque from copy import deepcopy from io import BytesIO +from pathlib import Path from typing import TYPE_CHECKING, Annotated, Any import matplotlib as mpl @@ -9,9 +12,11 @@ import networkx as nx import numpy as np from magicgui import magicgui -from qtpy.QtCore import QByteArray +from qtpy.QtCore import QByteArray, Qt from qtpy.QtGui import QPixmap from qtpy.QtWidgets import ( + QCheckBox, + QHBoxLayout, QLabel, QLineEdit, QPushButton, @@ -246,7 +251,9 @@ def delete_edge( # delete the edge from the skeleton graph for edge in edges: - delete_edge(skeleton_graph=self._data.skeleton_graph, edge=edge, force=force) + delete_edge( + skeleton_graph=self._data.skeleton_graph, edge=edge, force=force + ) if redraw: # redraw the graph @@ -517,7 +524,7 @@ def preview_split(): point_size = np.max((np.max(viewer.data.node_coordinates) * 0.01, 50)) split_edge_widget.point_visual, split_edge_widget.point_store = viewer.add_points( point_size=point_size - ) + ) split_edge_widget.point_visual.appearance.visible = False return split_edge_widget @@ -623,7 +630,7 @@ def get_min_max_values(self, edge_attribute: str): if isinstance(value, numbers.Number) and not np.isnan(value) ] if not values: - return 0,0 + return 0, 0 return min(values), max(values) def _on_run_clicked(self): @@ -741,9 +748,7 @@ def get_reachable_edges( nodes_v.add(v) nearby_nodes = nodes_u | nodes_v subgraph = graph.subgraph(nearby_nodes) - reachable = {(a, b) for a, b in subgraph.edges()} | { - (b, a) for a, b in subgraph.edges() - } + reachable = set(subgraph.edges()) | {(b, a) for a, b in subgraph.edges()} return reachable @@ -982,3 +987,400 @@ def filter_edges_by_attribute( default_color=np.array([0, 0, 0, 1], dtype=np.float32), ) viewer.data.edge_colormap = edge_colormap + + +def _parse_edge_tuple(s) -> tuple[int, int]: + """Parse a single edge into a (int, int) tuple. + + Accepts a list/array like [1, 2] or a string like '(1, 2)'. + """ + if isinstance(s, list | tuple): + val = s + else: + val = ast.literal_eval(str(s)) + if len(val) != 2: + raise ValueError(f"Edge must have exactly 2 nodes, got: {s!r}") + return (int(val[0]), int(val[1])) + + +def _parse_key_edges(key_str: str) -> list[tuple[int, int]]: + """Parse a JSON key string into one or more (int, int) edge tuples. + + Handles both a single edge '(1, 2)' and compound keys like + '(1881529, 1958293)-(1907667, 1966453)' where two edges are + joined by a dash. + """ + matches = re.findall(r"\((\d+),\s*(\d+)\)", key_str) + if not matches: + raise ValueError(f"Cannot parse edge key: {key_str!r}") + return [(int(u), int(v)) for u, v in matches] + + +def _find_shortest_path_edges( + graph: nx.Graph, + edge1: tuple[int, int], + edge2: tuple[int, int], +) -> list[tuple[int, int]]: + """Return the edges on the shortest path connecting two edges. + + Tries all four endpoint combinations of edge1 and edge2 and picks the + shortest path found. The input edges themselves are excluded from the + result. Returns an empty list when no path exists. + """ + u1, v1 = edge1 + u2, v2 = edge2 + + best_path: list[int] | None = None + for src, dst in [(u1, u2), (u1, v2), (v1, u2), (v1, v2)]: + try: + path = nx.shortest_path(graph, src, dst) + if best_path is None or len(path) < len(best_path): + best_path = path + except (nx.NetworkXNoPath, nx.NodeNotFound): + pass + + if best_path is None: + return [] + + search_edges = {tuple(sorted(edge1)), tuple(sorted(edge2))} + return [ + (best_path[i], best_path[i + 1]) + for i in range(len(best_path) - 1) + if tuple(sorted((best_path[i], best_path[i + 1]))) not in search_edges + ] + + +class EdgeColoringNavigatorWidget(QWidget): + """Auxiliary widget for navigating through edge coloring entries from a JSON file. + + The JSON file keys identify pairs of red edges (e.g. + '(1881529, 1958293)-(1907667, 1966453)'). When an entry is displayed, + the shortest path between the two red edges is computed live on the + current graph and shown in green. The path is recomputed automatically + after any graph change (e.g. edge deletion) and on every navigation step. + + JSON format example:: + + {"(1881529, 1958293)-(1907667, 1966453)": []} + + The value list is ignored; only the key is used. + + Parameters + ---------- + viewer : SkelePlexApp + The SkelePlex application instance. + edge_coloring_path : Path | None + Optional path to a JSON file to load immediately on construction. + """ + + _RED_COLOR = np.array([1.0, 0.0, 0.0, 1.0], dtype=np.float32) + _GREEN_COLOR = np.array([0.0, 0.8, 0.0, 1.0], dtype=np.float32) + _DEFAULT_BLUE = np.array([0.0, 0.0, 1.0, 1.0], dtype=np.float32) + + def __init__(self, viewer, edge_coloring_path: Path | None = None) -> None: + super().__init__() + self.viewer = viewer + + # Each entry is just the list of red edges parsed from the JSON key. + self._entries: list[list[tuple[int, int]]] = [] + self._current_index: int = -1 + # Guard against the colormap-setter's own events.data triggering us. + self._applying: bool = False + # False = all-blue context view; True = red/green highlight view. + self._coloring_active: bool = False + # Center of the last auto-computed bounding box; used when user resizes. + self._bb_center: np.ndarray | None = None + + self._prev_button = QPushButton("< Prev") + self._next_button = QPushButton("Next >") + self._entry_label = QLabel("0/0") + self._entry_label.setAlignment(Qt.AlignCenter) + + nav_layout = QHBoxLayout() + nav_layout.addWidget(self._prev_button) + nav_layout.addWidget(self._entry_label) + nav_layout.addWidget(self._next_button) + + self._bb_width_spinbox = QSpinBox() + self._bb_width_spinbox.setMinimum(0) + self._bb_width_spinbox.setMaximum(999999) + self._bb_width_spinbox.setSpecialValueText("—") + + self._update_view_button = QPushButton("Update") + + bb_layout = QHBoxLayout() + bb_layout.addWidget(QLabel("BB size:")) + bb_layout.addWidget(self._bb_width_spinbox) + bb_layout.addWidget(self._update_view_button) + + self._render_seg_checkbox = QCheckBox("Render segmentation") + + self._toggle_button = QPushButton("Show coloring") + self._reset_button = QPushButton("Reset (full view)") + self._status_label = QLabel("") + self._status_label.setWordWrap(True) + + self._prev_button.clicked.connect(self._on_prev_clicked) + self._next_button.clicked.connect(self._on_next_clicked) + self._toggle_button.clicked.connect(self._on_toggle_clicked) + self._reset_button.clicked.connect(self._on_reset_clicked) + self._render_seg_checkbox.stateChanged.connect(self._on_render_seg_changed) + self._update_view_button.clicked.connect(self._on_update_view_clicked) + + layout = QVBoxLayout() + layout.addLayout(nav_layout) + layout.addLayout(bb_layout) + layout.addWidget(self._render_seg_checkbox) + layout.addWidget(self._toggle_button) + layout.addWidget(self._reset_button) + layout.addWidget(self._status_label) + self.setLayout(layout) + + viewer.add_auxiliary_widget(self, name="Edge Coloring Navigator") + + # Recompute the shortest path whenever the graph changes (e.g. after + # an edge deletion). The _applying flag prevents the colormap setter's + # own events.data signal from feeding back here. + viewer.data.events.data.connect(self._on_graph_changed) + + if edge_coloring_path is not None: + self.load_entries(edge_coloring_path) + + def load_entries(self, path: Path) -> None: + """Load red-edge pairs from a JSON file and show the first entry. + + Supports two formats: + + - List format (preferred):: + + [[[u1, v1], [u2, v2]], [[u3, v3], [u4, v4]], ...] + + - Dict format (legacy):: + + {"(u1, v1)-(u2, v2)": [...], ...} + """ + try: + with open(path) as f: + data = json.load(f) + except FileNotFoundError: + self._set_status(f"File not found: {path}") + return + except json.JSONDecodeError as e: + self._set_status(f"JSON parse error: {e}") + return + + entries = [] + try: + if isinstance(data, list): + for item in data: + red_edges = [_parse_edge_tuple(e) for e in item] + entries.append(red_edges) + else: + for key_str in data.keys(): + red_edges = _parse_key_edges(key_str) + entries.append(red_edges) + except Exception as e: + self._set_status(f"Error parsing entries: {e}") + return + + self._entries = entries + self._current_index = 0 if entries else -1 + self._update_label() + + if entries: + self._apply_current_entry() + else: + self._set_status("No entries found in file.") + + def _on_file_path_selected(self, path: "Path | None") -> None: + """Called when the left-panel file picker selects a new file.""" + if path is not None: + self.load_entries(path) + + def _on_graph_changed(self) -> None: + """Recompute the shortest path after any graph structural change.""" + if self._applying: + return + if self._current_index >= 0 and self._entries: + self._apply_current_entry() + + def _on_prev_clicked(self) -> None: + if not self._entries or self._current_index <= 0: + return + self._current_index -= 1 + self._update_label() + self._apply_current_entry() + + def _on_next_clicked(self) -> None: + if not self._entries or self._current_index >= len(self._entries) - 1: + return + self._current_index += 1 + self._update_label() + self._apply_current_entry() + + def _on_toggle_clicked(self) -> None: + self._coloring_active = not self._coloring_active + self._toggle_button.setText( + "Show all blue" if self._coloring_active else "Show coloring" + ) + current_size = self._bb_width_spinbox.value() + self._apply_current_entry( + bb_size_override=current_size if current_size > 0 else None + ) + + def _on_reset_clicked(self) -> None: + self._coloring_active = False + self._bb_center = None + self._toggle_button.setText("Show coloring") + self._bb_width_spinbox.blockSignals(True) + self._bb_width_spinbox.setValue(0) + self._bb_width_spinbox.blockSignals(False) + self.viewer.data.skeleton_view.mode = "all" + self.viewer.data.segmentation_view.mode = "none" + self.viewer.data.edge_colormap = EdgeColormap.from_arrays( + colormap={}, + default_color=self._DEFAULT_BLUE, + ) + self._set_status("") + + def _on_update_view_clicked(self) -> None: + custom_size = self._bb_width_spinbox.value() + self._apply_current_entry( + bb_size_override=custom_size if custom_size > 0 else None + ) + + def _on_render_seg_changed(self) -> None: + if self._bb_center is None: + return + current_size = self._bb_width_spinbox.value() + if current_size <= 0: + return + half = current_size / 2.0 + self._apply_bounding_box(self._bb_center - half, self._bb_center + half) + + def _apply_bounding_box(self, bb_min: np.ndarray, bb_max: np.ndarray) -> None: + skel_view = self.viewer.data.skeleton_view + skel_view.bounding_box._min_coordinate = bb_min + skel_view.bounding_box._max_coordinate = bb_max + skel_view.mode = "bounding_box" + + if ( + self._render_seg_checkbox.isChecked() + and self.viewer.data._segmentation is not None + ): + seg_view = self.viewer.data.segmentation_view + seg_view.bounding_box._min_coordinate = bb_min + seg_view.bounding_box._max_coordinate = bb_max + seg_view.mode = "bounding_box" + else: + self.viewer.data.segmentation_view.mode = "none" + + def _update_label(self) -> None: + n = len(self._entries) + if n == 0: + self._entry_label.setText("0/0") + else: + self._entry_label.setText(f"{self._current_index + 1}/{n}") + + def _apply_current_entry(self, bb_size_override: int | None = None) -> None: + if not self._entries or self._current_index < 0: + return + + red_edges = self._entries[self._current_index] + graph = self.viewer.data.skeleton_graph + if graph is None: + self._set_status("No skeleton graph loaded.") + return + + warnings = [] + color_dict: dict[tuple[int, int], np.ndarray] = {} + visible_nodes: set[int] = set() + + missing_red = [] + for r_edge in red_edges: + if not graph.graph.has_edge(*r_edge): + missing_red.append(r_edge) + else: + # Add both directions so the colormap lookup succeeds regardless + # of the canonical edge direction stored in edge_splines. + color_dict[r_edge] = self._RED_COLOR.copy() + color_dict[(r_edge[1], r_edge[0])] = self._RED_COLOR.copy() + visible_nodes.update(r_edge) + if missing_red: + warnings.append(f"Red edges not found (deleted?): {missing_red}") + + # Compute shortest path live between the first two red edges. + green_edges: list[tuple[int, int]] = [] + if len(red_edges) >= 2: + green_edges = _find_shortest_path_edges( + graph.graph, red_edges[0], red_edges[1] + ) + if not green_edges and not missing_red: + warnings.append("No path found between the two red edges.") + for g_edge in green_edges: + color_dict[g_edge] = self._GREEN_COLOR.copy() + color_dict[(g_edge[1], g_edge[0])] = self._GREEN_COLOR.copy() + visible_nodes.update(g_edge) + + # Zoom to a bounding box that covers all visible nodes with a margin. + # Use the full (unfiltered) data arrays for the coordinate lookup. + all_node_keys = self.viewer.data.node_keys + all_node_coords = self.viewer.data.node_coordinates + bb_applied = False + if all_node_keys is not None and all_node_coords is not None and visible_nodes: + mask = np.isin(all_node_keys, list(visible_nodes)) + visible_coords = all_node_coords[mask] + if len(visible_coords) > 0: + bb_min_tight = visible_coords.min(axis=0) + bb_max_tight = visible_coords.max(axis=0) + margin = np.maximum((bb_max_tight - bb_min_tight) * 0.2, 50.0) + bb_min_auto = bb_min_tight - margin + bb_max_auto = bb_max_tight + margin + self._bb_center = (bb_min_auto + bb_max_auto) / 2 + if bb_size_override is not None and bb_size_override > 0: + half = bb_size_override / 2.0 + bb_min = self._bb_center - half + bb_max = self._bb_center + half + bb_width = bb_size_override + else: + bb_min = bb_min_auto + bb_max = bb_max_auto + bb_width = round(float(np.max(bb_max - bb_min))) + self._bb_width_spinbox.blockSignals(True) + self._bb_width_spinbox.setValue(bb_width) + self._bb_width_spinbox.blockSignals(False) + self._apply_bounding_box(bb_min, bb_max) + bb_applied = True + + if not bb_applied: + self._bb_center = None + self._bb_width_spinbox.blockSignals(True) + self._bb_width_spinbox.setValue(0) + self._bb_width_spinbox.blockSignals(False) + self.viewer.data.segmentation_view.mode = "none" + + # Guard: the colormap setter emits events.data which would re-enter + # here via _on_graph_changed without this flag. + self._applying = True + try: + self.viewer.data.edge_colormap = EdgeColormap.from_arrays( + colormap=color_dict if self._coloring_active else {}, + default_color=self._DEFAULT_BLUE, + ) + finally: + self._applying = False + + red_str = ", ".join(str(e) for e in red_edges) + path_str = " → ".join(str(e) for e in green_edges) if green_edges else "none" + status = ( + f"Entry {self._current_index + 1}/{len(self._entries)}: " + f"{len(red_edges)} red, {len(green_edges)} green edge(s)\n" + f"Red: {red_str}\n" + f"Path: {path_str}" + ) + if warnings: + status += "\n" + "; ".join(warnings) + self._set_status(status) + + def _set_status(self, msg: str) -> None: + self._status_label.setText(msg) diff --git a/src/skeleplex/app/_utils.py b/src/skeleplex/app/_utils.py index 4758e56..9cf49c2 100644 --- a/src/skeleplex/app/_utils.py +++ b/src/skeleplex/app/_utils.py @@ -13,9 +13,11 @@ SkeletonDataPaths, SkeletonGraphFile, ) -from skeleplex.app._curate import (ChangeBranchColorWidget, - make_split_edge_widget, - RenderReachableEdgesWidget +from skeleplex.app._curate import ( + ChangeBranchColorWidget, + EdgeColoringNavigatorWidget, + RenderReachableEdgesWidget, + make_split_edge_widget, ) # store reference to QApplication to prevent garbage collection @@ -27,6 +29,7 @@ def view_skeleton( segmentation_path: str | None = None, segmentation_voxel_size_um: tuple[float, float, float] = (1, 1, 1), launch_widgets: bool = True, + edge_coloring_path: str | None = None, ): """Launch the skeleton viewer application. @@ -42,6 +45,12 @@ def view_skeleton( launch_widgets : bool, optional Whether to launch the auxiliary widgets for curation. Defaults to True. + edge_coloring_path : str | None + Path to a JSON file for edge coloring navigation. + The JSON maps red-edge keys (e.g. '(1, 2)') to lists of green-edge + keys. The Edge Coloring Navigator widget allows stepping through + entries. The file can also be loaded interactively via the Data Stores + panel in the GUI. Returns ------- @@ -124,6 +133,14 @@ def view_skeleton( viewer.add_auxiliary_widget(split_edge_widget.native, name="Split edge") RenderReachableEdgesWidget(viewer) + coloring_path = Path(edge_coloring_path) if edge_coloring_path else None + edge_coloring_widget = EdgeColoringNavigatorWidget( + viewer, edge_coloring_path=coloring_path + ) + viewer._main_window.app_controls.widget().load_edge_coloring_group_box.load_widget.called.connect( + edge_coloring_widget._on_file_path_selected + ) + except Exception as e: print(f"Error launching widgets: {e}") return viewer diff --git a/src/skeleplex/app/_viewer_controller.py b/src/skeleplex/app/_viewer_controller.py index 052f1ac..f013cec 100644 --- a/src/skeleplex/app/_viewer_controller.py +++ b/src/skeleplex/app/_viewer_controller.py @@ -186,7 +186,9 @@ def update_skeleton_geometry( if self._skeleton.node_highlight_visual is None: # make the highlight points material highlight_points_material_3d = PointsUniformAppearance( - size=sizes["node_highlight"], color=(0, 1, 0, 1), size_coordinate_space="data" + size=sizes["node_highlight"], + color=(0, 1, 0, 1), + size_coordinate_space="data", ) # make the highlight points model @@ -235,11 +237,15 @@ def update_skeleton_geometry( else: self._skeleton.node_visual.appearance.size = sizes["node"] if self._skeleton.node_highlight_visual is not None: - self._skeleton.node_highlight_visual.appearance.size = sizes["node_highlight"] + self._skeleton.node_highlight_visual.appearance.size = sizes[ + "node_highlight" + ] if self._skeleton.edges_visual is not None: self._skeleton.edges_visual.appearance.size = sizes["edge"] if self._skeleton.edge_highlight_visual is not None: - self._skeleton.edge_highlight_visual.appearance.size = sizes["edge_highlight"] + self._skeleton.edge_highlight_visual.appearance.size = sizes[ + "edge_highlight" + ] # reslice the scene self._backend.reslice_scene(scene_id=self.scene_id) @@ -260,6 +266,9 @@ def update_segmentation_image( The 4x4 affine transform to apply to the segmentation image. """ if image is None: + if self._segmentation.visual is not None: + self._segmentation.visual.appearance.visible = False + self._backend.reslice_scene(scene_id=self.scene_id) return # make the segmentation data store @@ -299,6 +308,7 @@ def update_segmentation_image( else: self._segmentation.visual.transform = transform + self._segmentation.visual.appearance.visible = True # reslice the scene self._backend.reslice_scene(scene_id=self.scene_id) diff --git a/src/skeleplex/app/qt/app_controls.py b/src/skeleplex/app/qt/app_controls.py index e6e2a8f..9913e73 100644 --- a/src/skeleplex/app/qt/app_controls.py +++ b/src/skeleplex/app/qt/app_controls.py @@ -482,6 +482,27 @@ def _load_segmentation_gui( ) +class LoadEdgeColoringGroupBox(QGroupBox): + """A widget for loading an edge coloring JSON file.""" + + def __init__(self, parent=None): + super().__init__(title="Edge Coloring", parent=parent) + + self.load_widget = magicgui(self._load_edge_coloring_gui) + + layout = QVBoxLayout() + layout.addWidget(self.load_widget.native) + self.setLayout(layout) + + self.setStyleSheet(GROUP_BOX_STYLE) + + def _load_edge_coloring_gui( + self, + json_path: Path | None = None, + ) -> Path | None: + return json_path + + class AppControlsWidget(QWidget): """A widget for the application controls. @@ -494,11 +515,13 @@ def __init__(self, parent: QWidget | None = None): # make the widgets for loading data self.load_skeleton_group_box = LoadSkeletonDataGroupBox(parent=self) self.load_segmentation_group_box = LoadSegmentationDataGroupBox(parent=self) + self.load_edge_coloring_group_box = LoadEdgeColoringGroupBox(parent=self) stores_box = FlatVGroupBox( "Data Stores", accent_color="#b7e2d8", collapsible=True, parent=self ) stores_box.add_widget(self.load_skeleton_group_box) stores_box.add_widget(self.load_segmentation_group_box) + stores_box.add_widget(self.load_edge_coloring_group_box) # widget for selecting the skeleton data view self.skeleton_view_box = SkeletonDataViewWidget( From 8e5f71990c333c620b80af601ec08597e36b297f Mon Sep 17 00:00:00 2001 From: Luca Deininger Date: Tue, 5 May 2026 10:47:42 +0200 Subject: [PATCH 2/3] script to find candidate vessel/airway edge pairs for QC --- ...extract_candidate_vessel_airway_regions.py | 179 ++++++++++++++++++ 1 file changed, 179 insertions(+) create mode 100644 scripts/extract_candidate_vessel_airway_regions.py diff --git a/scripts/extract_candidate_vessel_airway_regions.py b/scripts/extract_candidate_vessel_airway_regions.py new file mode 100644 index 0000000..f8b269a --- /dev/null +++ b/scripts/extract_candidate_vessel_airway_regions.py @@ -0,0 +1,179 @@ +"""Find candidate vessel/airway edge pairs for QC. + +Outputs a JSON file consumable by the EdgeColoringNavigatorWidget: + [ [[u1, v1], [u2, v2]], ... ] +""" + +import json +import random +from pathlib import Path + +import numpy as np +from scipy.spatial import KDTree + +from skeleplex.graph.constants import EDGE_COORDINATES_KEY, EDGE_SPLINE_KEY +from skeleplex.graph.skeleton_graph import SkeletonGraph + +# ── Paths ───────────────────────────────────────────────────────────────────── +GRAPH_PATH = Path( + "/local1/dluca/lung/20260429_graph_seg/" + "LADAF-2021-17-left-v9_graph_break_detection_on_scales_curated.json" +) +OUTPUT_PATH = Path("candidate_vessel_airway_pairs.json") + +# ── Parameters ──────────────────────────────────────────────────────────────── +SUBSET_SIZE = None # None = use all edges +N_PAIRS = None # None = no limit +MAX_DIST_UM = 2000 # maximum distance between parallel edges in µm +MAX_ANGLE = 30 # maximum angle of parallel edges in degrees +MIN_LENGTH = 3500 # minimum length of parallel edges in µm +RANDOM_SEED = 42 # for subsampling +# ────────────────────────────────────────────────────────────────────────────── + + +def _get_edge_data(g, u, v): + """Return the attribute dict for edge (u, v), handling multigraph keys.""" + return g[u][v][next(iter(g[u][v]))] + + +def _get_edge_direction(g, u, v): + """Return the unit direction vector of edge (u, v) from start to end.""" + data = _get_edge_data(g, u, v) + if EDGE_COORDINATES_KEY in data: + coords = data[EDGE_COORDINATES_KEY] + direction = coords[-1] - coords[0] + elif EDGE_SPLINE_KEY in data: + pts = np.array(data[EDGE_SPLINE_KEY].eval(np.array([0.01, 0.99]))) + direction = pts[-1] - pts[0] + else: + raise KeyError(f"Edge ({u},{v}) has neither path nor spline.") + return direction / np.linalg.norm(direction) + + +def _get_edge_midpoint(g, u, v): + """Return the 3-D midpoint coordinate of edge (u, v).""" + data = _get_edge_data(g, u, v) + if EDGE_COORDINATES_KEY in data: + coords = data[EDGE_COORDINATES_KEY] + return coords[len(coords) // 2] + elif EDGE_SPLINE_KEY in data: + return np.array(data[EDGE_SPLINE_KEY].eval(np.array([0.5])))[0] + else: + raise KeyError(f"Edge ({u},{v}) has neither path nor spline.") + + +def _get_edge_length(g, u, v): + """Return the arc length of edge (u, v) in the same units as the coordinates.""" + data = _get_edge_data(g, u, v) + if EDGE_COORDINATES_KEY in data: + coords = data[EDGE_COORDINATES_KEY] + elif EDGE_SPLINE_KEY in data: + coords = np.array(data[EDGE_SPLINE_KEY].eval(np.linspace(0.01, 0.99, 20))) + else: + raise KeyError(f"Edge ({u},{v}) has neither path nor spline.") + return float(np.sum(np.linalg.norm(np.diff(coords, axis=0), axis=1))) + + +def _angle_between_edges(g, edge1, edge2): + """Return the acute angle in degrees between the directions of two edges.""" + d1 = _get_edge_direction(g, *edge1) + d2 = _get_edge_direction(g, *edge2) + cos_angle = np.clip(np.dot(d1, d2), -1.0, 1.0) + angle = np.degrees(np.arccos(cos_angle)) + return min(angle, 180 - angle) + + +def find_candidate_pairs( + g, + *, + subset_size=None, + n_pairs=1000, + max_dist_um=2000, + max_angle=30, + min_length=3500, + random_seed=42, +): + """Return edge pairs that are spatially close and nearly parallel. + + Returns + ------- + list of (edge1, edge2, angle_deg, dist_um) + """ + all_edges = list(g.edges()) + print(f"Total edges: {len(all_edges)}") + + random.seed(random_seed) + pool = ( + all_edges + if subset_size is None + else random.sample(all_edges, min(subset_size, len(all_edges))) + ) + + midpoints = {} + for u, v in pool: + try: + if _get_edge_length(g, u, v) >= min_length: + midpoints[(u, v)] = _get_edge_midpoint(g, u, v) + except Exception: + pass + + valid_edges = list(midpoints.keys()) + coords_arr = np.array([midpoints[e] for e in valid_edges]) + print(f"Edges passing length filter ({min_length} µm): {len(valid_edges)}") + + tree = KDTree(coords_arr) + seen = set() + pairs = [] + for i, edge1 in enumerate(valid_edges): + dists, nbrs = tree.query(coords_arr[i], k=2) + if dists[1] >= max_dist_um: + continue + edge2 = valid_edges[nbrs[1]] + key = frozenset([edge1, edge2]) + if key in seen: + continue + seen.add(key) + try: + angle = _angle_between_edges(g, edge1, edge2) + if angle <= max_angle: + pairs.append((edge1, edge2, angle, dists[1])) + except Exception: + pass + if n_pairs is not None and len(pairs) >= n_pairs: + break + + print( + f"Found {len(pairs)} pairs within {max_dist_um} µm " f"and angle ≤ {max_angle}°" + ) + return pairs + + +def main(): + """Load skeleton graph, find candidate edge pairs, and save results as JSON.""" + print(f"Loading graph from {GRAPH_PATH} ...") + skeleton = SkeletonGraph.from_json_file(GRAPH_PATH) + g = skeleton.graph + + pairs = find_candidate_pairs( + g, + subset_size=SUBSET_SIZE, + n_pairs=N_PAIRS, + max_dist_um=MAX_DIST_UM, + max_angle=MAX_ANGLE, + min_length=MIN_LENGTH, + random_seed=RANDOM_SEED, + ) + + print(f"\n{'Edge 1':<28} {'Edge 2':<28} {'Angle':>8} {'Distance':>12}") + print("-" * 82) + for edge1, edge2, angle, dist in pairs: + print(f" {edge1!s:<26} {edge2!s:<26} {angle:>7.2f}° {dist:>10.1f} µm") + + output = [[list(edge1), list(edge2)] for edge1, edge2, *_ in pairs] + with open(OUTPUT_PATH, "w") as f: + json.dump(output, f) + print(f"\nSaved {len(output)} pairs → {OUTPUT_PATH}") + + +if __name__ == "__main__": + main() From 16745202f8deb82368c66b3b092004388e5b0e87 Mon Sep 17 00:00:00 2001 From: Luca Deininger Date: Tue, 5 May 2026 11:28:31 +0200 Subject: [PATCH 3/3] shortest path recalculation & re-rendering after edge deletion --- src/skeleplex/app/_curate.py | 60 +++++++++++++----------------------- 1 file changed, 21 insertions(+), 39 deletions(-) diff --git a/src/skeleplex/app/_curate.py b/src/skeleplex/app/_curate.py index 929a7eb..00609a3 100644 --- a/src/skeleplex/app/_curate.py +++ b/src/skeleplex/app/_curate.py @@ -1117,8 +1117,6 @@ def __init__(self, viewer, edge_coloring_path: Path | None = None) -> None: self._toggle_button = QPushButton("Show coloring") self._reset_button = QPushButton("Reset (full view)") - self._status_label = QLabel("") - self._status_label.setWordWrap(True) self._prev_button.clicked.connect(self._on_prev_clicked) self._next_button.clicked.connect(self._on_next_clicked) @@ -1133,15 +1131,15 @@ def __init__(self, viewer, edge_coloring_path: Path | None = None) -> None: layout.addWidget(self._render_seg_checkbox) layout.addWidget(self._toggle_button) layout.addWidget(self._reset_button) - layout.addWidget(self._status_label) self.setLayout(layout) viewer.add_auxiliary_widget(self, name="Edge Coloring Navigator") # Recompute the shortest path whenever the graph changes (e.g. after - # an edge deletion). The _applying flag prevents the colormap setter's - # own events.data signal from feeding back here. - viewer.data.events.data.connect(self._on_graph_changed) + # an edge deletion). skeleton_view.events.data is emitted by + # skeleton_view.update(), which is called at the end of + # _update_and_request_redraw() after every structural edit. + viewer.data.skeleton_view.events.data.connect(self._on_graph_changed) if edge_coloring_path is not None: self.load_entries(edge_coloring_path) @@ -1202,7 +1200,10 @@ def _on_graph_changed(self) -> None: if self._applying: return if self._current_index >= 0 and self._entries: - self._apply_current_entry() + current_size = self._bb_width_spinbox.value() + self._apply_current_entry( + bb_size_override=current_size if current_size > 0 else None + ) def _on_prev_clicked(self) -> None: if not self._entries or self._current_index <= 0: @@ -1285,38 +1286,37 @@ def _update_label(self) -> None: def _apply_current_entry(self, bb_size_override: int | None = None) -> None: if not self._entries or self._current_index < 0: return + if self._applying: + return + self._applying = True + try: + self._do_apply_current_entry(bb_size_override) + finally: + self._applying = False + def _do_apply_current_entry(self, bb_size_override: int | None = None) -> None: red_edges = self._entries[self._current_index] graph = self.viewer.data.skeleton_graph if graph is None: self._set_status("No skeleton graph loaded.") return - warnings = [] color_dict: dict[tuple[int, int], np.ndarray] = {} visible_nodes: set[int] = set() - missing_red = [] for r_edge in red_edges: - if not graph.graph.has_edge(*r_edge): - missing_red.append(r_edge) - else: + if graph.graph.has_edge(*r_edge): # Add both directions so the colormap lookup succeeds regardless # of the canonical edge direction stored in edge_splines. color_dict[r_edge] = self._RED_COLOR.copy() color_dict[(r_edge[1], r_edge[0])] = self._RED_COLOR.copy() visible_nodes.update(r_edge) - if missing_red: - warnings.append(f"Red edges not found (deleted?): {missing_red}") # Compute shortest path live between the first two red edges. - green_edges: list[tuple[int, int]] = [] if len(red_edges) >= 2: green_edges = _find_shortest_path_edges( graph.graph, red_edges[0], red_edges[1] ) - if not green_edges and not missing_red: - warnings.append("No path found between the two red edges.") for g_edge in green_edges: color_dict[g_edge] = self._GREEN_COLOR.copy() color_dict[(g_edge[1], g_edge[0])] = self._GREEN_COLOR.copy() @@ -1359,28 +1359,10 @@ def _apply_current_entry(self, bb_size_override: int | None = None) -> None: self._bb_width_spinbox.blockSignals(False) self.viewer.data.segmentation_view.mode = "none" - # Guard: the colormap setter emits events.data which would re-enter - # here via _on_graph_changed without this flag. - self._applying = True - try: - self.viewer.data.edge_colormap = EdgeColormap.from_arrays( - colormap=color_dict if self._coloring_active else {}, - default_color=self._DEFAULT_BLUE, - ) - finally: - self._applying = False - - red_str = ", ".join(str(e) for e in red_edges) - path_str = " → ".join(str(e) for e in green_edges) if green_edges else "none" - status = ( - f"Entry {self._current_index + 1}/{len(self._entries)}: " - f"{len(red_edges)} red, {len(green_edges)} green edge(s)\n" - f"Red: {red_str}\n" - f"Path: {path_str}" + self.viewer.data.edge_colormap = EdgeColormap.from_arrays( + colormap=color_dict if self._coloring_active else {}, + default_color=self._DEFAULT_BLUE, ) - if warnings: - status += "\n" + "; ".join(warnings) - self._set_status(status) def _set_status(self, msg: str) -> None: - self._status_label.setText(msg) + pass