"""Stage 2: Extrinsic initialization via pose graph."""
from __future__ import annotations
import heapq
from collections import deque
from collections.abc import Callable
from dataclasses import dataclass, field
import cv2
import numpy as np
from numpy.typing import NDArray
from scipy.optimize import least_squares
from aquacal.config.schema import (
CameraExtrinsics,
CameraIntrinsics,
ConnectivityError,
DetectionResult,
)
from aquacal.core.board import BoardGeometry
from aquacal.core.camera import Camera
from aquacal.core.interface_model import Interface
from aquacal.core.refractive_geometry import refractive_project
from aquacal.utils.transforms import compose_poses, invert_pose, rvec_to_matrix
[docs]
@dataclass
class Observation:
"""A single camera's observation of the board in one frame."""
camera: str
frame_idx: int
corner_ids: NDArray[np.int32] # shape (N,)
corners_2d: NDArray[np.float64] # shape (N, 2)
[docs]
@dataclass
class PoseGraph:
"""
Graph connecting cameras through shared board observations.
Cameras connect indirectly: if cameras A and B both see the board in
frame F, they are linked through that frame's board pose node.
Attributes:
camera_names: List of all camera names in the graph
frame_indices: List of frame indices with 2+ camera observations
observations: All camera-board observations
adjacency: Dict mapping each node to its neighbors for connectivity analysis.
Node names: camera names (str) and frame indices prefixed with "f" (e.g., "f42")
"""
camera_names: list[str]
frame_indices: list[int]
observations: list[Observation]
adjacency: dict[str, set[str]] = field(default_factory=dict)
[docs]
def estimate_board_pose(
intrinsics: CameraIntrinsics,
corners_2d: NDArray[np.float64],
corner_ids: NDArray[np.int32],
board: BoardGeometry,
) -> tuple[NDArray[np.float64], NDArray[np.float64]] | None:
"""
Estimate board pose relative to camera using PnP.
Args:
intrinsics: Camera intrinsic parameters
corners_2d: Detected corner positions, shape (N, 2)
corner_ids: Corner IDs corresponding to corners_2d, shape (N,)
board: Board geometry for 3D corner positions
Returns:
Tuple of (rvec, tvec) representing board pose in camera frame,
or None if PnP fails (e.g., too few points).
- rvec: Rodrigues rotation vector, shape (3,)
- tvec: Translation vector, shape (3,)
Example:
>>> result = estimate_board_pose(intrinsics, corners, ids, board)
>>> if result is not None:
... rvec, tvec = result
"""
if len(corner_ids) < 4:
return None
# Get 3D points in board frame
object_points = board.get_corner_array(corner_ids).astype(np.float32)
image_points = corners_2d.astype(np.float32)
# Run PnP
success, rvec, tvec = cv2.solvePnP(
object_points,
image_points,
intrinsics.K.astype(np.float64),
intrinsics.dist_coeffs.astype(np.float64),
flags=cv2.SOLVEPNP_ITERATIVE,
)
if not success:
return None
return rvec.flatten().astype(np.float64), tvec.flatten().astype(np.float64)
[docs]
def refractive_solve_pnp(
intrinsics: CameraIntrinsics,
corners_2d: NDArray[np.float64],
corner_ids: NDArray[np.int32],
board: BoardGeometry,
water_z: float,
interface_normal: NDArray[np.float64] | None = None,
n_air: float = 1.0,
n_water: float = 1.333,
) -> tuple[NDArray[np.float64], NDArray[np.float64]] | None:
"""
Estimate board pose with refractive correction using LM refinement.
Uses standard solvePnP as initial guess, applies rough depth correction,
then refines by minimizing refractive reprojection error.
The key trick: use an identity-extrinsics camera so camera frame = world
frame. Board corners transformed by the candidate pose become "world points"
that get projected through the refractive interface.
Args:
intrinsics: Camera intrinsic parameters
corners_2d: Detected corner positions, shape (N, 2)
corner_ids: Corner IDs corresponding to corners_2d, shape (N,)
board: Board geometry for 3D corner positions
water_z: Distance from camera to water surface in meters
interface_normal: Interface normal vector. If None, uses [0, 0, -1].
n_air: Refractive index of air (default 1.0)
n_water: Refractive index of water (default 1.333)
Returns:
Tuple of (rvec, tvec) representing board pose in camera frame,
or None if PnP fails.
"""
# Get initial guess from standard PnP
result = estimate_board_pose(intrinsics, corners_2d, corner_ids, board)
if result is None:
return None
rvec_init, tvec_init = result
# Apply rough depth correction for refraction
tvec_init[2] *= n_water
if interface_normal is None:
interface_normal = np.array([0.0, 0.0, -1.0], dtype=np.float64)
# Set up identity-extrinsics camera (camera frame = world frame)
identity_ext = CameraExtrinsics(R=np.eye(3), t=np.zeros(3))
camera = Camera("_pnp", intrinsics, identity_ext)
interface = Interface(
normal=interface_normal,
camera_distances={"_pnp": water_z},
n_air=n_air,
n_water=n_water,
)
# Get 3D object points in board frame
object_points = board.get_corner_array(corner_ids).astype(np.float64)
image_points = corners_2d.astype(np.float64)
n_pts = len(object_points)
def residuals(x: NDArray[np.float64]) -> NDArray[np.float64]:
rvec = x[:3]
tvec = x[3:]
R = rvec_to_matrix(rvec)
# Board corners in camera frame (= world frame for identity camera)
pts_cam = (R @ object_points.T).T + tvec
resid = np.empty(n_pts * 2, dtype=np.float64)
for i, pt in enumerate(pts_cam):
projected = refractive_project(camera, interface, pt)
if projected is None:
resid[2 * i] = 100.0
resid[2 * i + 1] = 100.0
else:
resid[2 * i] = projected[0] - image_points[i, 0]
resid[2 * i + 1] = projected[1] - image_points[i, 1]
return resid
x0 = np.concatenate([rvec_init, tvec_init])
result_opt = least_squares(residuals, x0, method="lm", max_nfev=200)
rvec_out = result_opt.x[:3].astype(np.float64)
tvec_out = result_opt.x[3:].astype(np.float64)
return rvec_out, tvec_out
def _find_connected_components(
adjacency: dict[str, set[str]],
camera_names: list[str],
) -> list[set[str]]:
"""Find connected components in the pose graph using BFS."""
visited: set[str] = set()
components: list[set[str]] = []
for start in camera_names:
if start in visited:
continue
# BFS from this camera
component: set[str] = set()
queue = deque([start])
while queue:
node = queue.popleft()
if node in visited:
continue
visited.add(node)
component.add(node)
for neighbor in sorted(adjacency.get(node, [])):
if neighbor not in visited:
queue.append(neighbor)
components.append(component)
return components
[docs]
def build_pose_graph(
detections: DetectionResult,
min_cameras: int = 2,
) -> PoseGraph:
"""
Build pose graph from detection results.
Creates a bipartite graph where cameras and frames are nodes,
connected by observation edges. Only includes frames where
at least min_cameras see the board.
Args:
detections: Detection results from detect_all_frames
min_cameras: Minimum cameras per frame to include (default 2)
Returns:
PoseGraph with adjacency structure for connectivity analysis
Raises:
ConnectivityError: If the graph is not connected (some cameras
cannot be linked to others through shared observations).
Error message includes details about disconnected components.
Example:
>>> detections = detect_all_frames(videos, board)
>>> pose_graph = build_pose_graph(detections, min_cameras=2)
>>> print(f"Graph has {len(pose_graph.frame_indices)} usable frames")
"""
# Filter frames with enough cameras
usable_frames = detections.get_frames_with_min_cameras(min_cameras)
if not usable_frames:
raise ConnectivityError(
f"No frames with {min_cameras}+ cameras detecting the board"
)
# Collect observations and build adjacency
observations: list[Observation] = []
adjacency: dict[str, set[str]] = {}
camera_set: set[str] = set()
for frame_idx in usable_frames:
frame_node = f"f{frame_idx}"
if frame_node not in adjacency:
adjacency[frame_node] = set()
frame_det = detections.frames[frame_idx]
for cam_name, det in frame_det.detections.items():
camera_set.add(cam_name)
# Add observation
observations.append(
Observation(
camera=cam_name,
frame_idx=frame_idx,
corner_ids=det.corner_ids,
corners_2d=det.corners_2d,
)
)
# Build adjacency (bipartite: cameras <-> frames)
if cam_name not in adjacency:
adjacency[cam_name] = set()
adjacency[cam_name].add(frame_node)
adjacency[frame_node].add(cam_name)
camera_names = sorted(camera_set)
# Check connectivity
components = _find_connected_components(adjacency, camera_names)
if len(components) > 1:
# Format error message with component details
comp_strs = []
for i, comp in enumerate(components):
cameras_in_comp = [n for n in comp if not n.startswith("f")]
comp_strs.append(f"Component {i + 1}: {cameras_in_comp}")
raise ConnectivityError(
f"Pose graph is not connected. Found {len(components)} components:\n"
+ "\n".join(comp_strs)
)
return PoseGraph(
camera_names=camera_names,
frame_indices=sorted(usable_frames),
observations=observations,
adjacency=adjacency,
)
def _average_rotations(
rotations: list[NDArray[np.float64]],
weights: list[float],
) -> NDArray[np.float64]:
"""Compute weighted chordal L2 mean of rotation matrices.
Projects the weighted sum of rotation matrices back to SO(3) via SVD.
Args:
rotations: List of 3x3 rotation matrices
weights: List of non-negative weights (one per rotation)
Returns:
3x3 rotation matrix representing the weighted average
"""
M = np.zeros((3, 3), dtype=np.float64)
for R, w in zip(rotations, weights):
M += w * R
U, _, Vt = np.linalg.svd(M)
R_avg = U @ Vt
# Ensure proper rotation (det = +1)
if np.linalg.det(R_avg) < 0:
U[:, -1] *= -1
R_avg = U @ Vt
return R_avg
def _refine_poses_multi_frame(
camera_poses: dict[str, tuple[NDArray, NDArray]],
board_poses: dict[int, tuple[NDArray, NDArray]],
obs_index: dict[tuple[str, int], Observation],
pose_graph: PoseGraph,
intrinsics: dict[str, CameraIntrinsics],
board: BoardGeometry,
reference_camera: str,
water_zs: dict[str, float] | None = None,
interface_normal: NDArray[np.float64] | None = None,
n_air: float = 1.0,
n_water: float = 1.333,
) -> tuple[dict[str, tuple[NDArray, NDArray]], dict[int, tuple[NDArray, NDArray]]]:
"""Refine poses via one pass of multi-frame averaging.
Step A: For each frame, re-estimate board pose from all cameras that see it,
then average. Step B: For each non-reference camera, re-estimate camera pose
from all frames it sees, then average.
Args:
camera_poses: Dict of camera_name -> (R, t) world->camera
board_poses: Dict of frame_idx -> (R, t) board->world
obs_index: Dict of (camera_name, frame_idx) -> Observation
pose_graph: The pose graph
intrinsics: Per-camera intrinsics
board: Board geometry
reference_camera: Camera fixed at world origin
water_zs: Optional per-camera interface distances
interface_normal: Interface normal vector
n_air: Refractive index of air
n_water: Refractive index of water
Returns:
Tuple of (refined_camera_poses, refined_board_poses)
"""
refined_board_poses = dict(board_poses)
refined_camera_poses = dict(camera_poses)
# Step A: Refine board poses
for frame_idx in sorted(board_poses.keys()):
frame_node = f"f{frame_idx}"
cam_neighbors = sorted(pose_graph.adjacency.get(frame_node, []))
rotations = []
translations = []
weights = []
for cam_name in cam_neighbors:
if cam_name not in camera_poses:
continue
obs_maybe = obs_index.get((cam_name, frame_idx))
if obs_maybe is None:
continue
obs = obs_maybe
# PnP: board in camera frame
if water_zs is not None and cam_name in water_zs:
result = refractive_solve_pnp(
intrinsics[cam_name],
obs.corners_2d,
obs.corner_ids,
board,
water_zs[cam_name],
interface_normal,
n_air,
n_water,
)
else:
result = estimate_board_pose(
intrinsics[cam_name], obs.corners_2d, obs.corner_ids, board
)
if result is None:
continue
rvec_bc, tvec_bc = result
R_bc = cv2.Rodrigues(rvec_bc)[0].astype(np.float64)
# board_in_world = cam_in_world @ board_in_cam
cam_R, cam_t = camera_poses[cam_name]
R_cw, t_cw = invert_pose(cam_R, cam_t)
R_bw, t_bw = compose_poses(R_cw, t_cw, R_bc, tvec_bc)
rotations.append(R_bw)
translations.append(t_bw)
weights.append(float(len(obs.corner_ids)))
if len(rotations) >= 1:
R_avg = _average_rotations(rotations, weights)
w_total = sum(weights)
t_avg = sum(w * t for w, t in zip(weights, translations)) / w_total
refined_board_poses[frame_idx] = (R_avg, t_avg)
# Step B: Refine camera poses
for cam_name in sorted(camera_poses.keys()):
if cam_name == reference_camera:
continue
cam_neighbors = sorted(pose_graph.adjacency.get(cam_name, []))
rotations = []
translations = []
weights = []
for frame_node in cam_neighbors:
frame_idx = int(frame_node[1:])
if frame_idx not in refined_board_poses:
continue
obs_maybe = obs_index.get((cam_name, frame_idx))
if obs_maybe is None:
continue
obs = obs_maybe
# PnP: board in camera frame
if water_zs is not None and cam_name in water_zs:
result = refractive_solve_pnp(
intrinsics[cam_name],
obs.corners_2d,
obs.corner_ids,
board,
water_zs[cam_name],
interface_normal,
n_air,
n_water,
)
else:
result = estimate_board_pose(
intrinsics[cam_name], obs.corners_2d, obs.corner_ids, board
)
if result is None:
continue
rvec_bc, tvec_bc = result
R_bc = cv2.Rodrigues(rvec_bc)[0].astype(np.float64)
# world_in_cam = board_in_cam @ world_in_board
R_bw, t_bw = refined_board_poses[frame_idx]
R_wb, t_wb = invert_pose(R_bw, t_bw)
R_wc, t_wc = compose_poses(R_bc, tvec_bc, R_wb, t_wb)
rotations.append(R_wc)
translations.append(t_wc)
weights.append(float(len(obs.corner_ids)))
if len(rotations) >= 1:
R_avg = _average_rotations(rotations, weights)
w_total = sum(weights)
t_avg = sum(w * t for w, t in zip(weights, translations)) / w_total
refined_camera_poses[cam_name] = (R_avg, t_avg)
return refined_camera_poses, refined_board_poses
[docs]
def estimate_extrinsics(
pose_graph: PoseGraph,
intrinsics: dict[str, CameraIntrinsics],
board: BoardGeometry,
reference_camera: str | None = None,
water_zs: dict[str, float] | None = None,
interface_normal: NDArray[np.float64] | None = None,
n_air: float = 1.0,
n_water: float = 1.333,
progress_callback: Callable[[str, int, int], None] | None = None,
) -> dict[str, CameraExtrinsics]:
"""
Estimate camera extrinsics by chaining poses through the graph.
Uses BFS traversal from reference camera, computing each camera's
pose relative to world frame (centered at reference camera).
The algorithm fixes the reference camera at world origin (R=I, t=0),
then runs BFS through the pose graph. When visiting a frame node from
a known camera, it computes the board pose via PnP. When visiting a
camera node from a known frame, it computes the camera pose via PnP
and inversion.
Args:
pose_graph: Pose graph from build_pose_graph
intrinsics: Dict mapping camera names to intrinsics
board: Board geometry
reference_camera: Camera to place at world origin.
If None, uses first camera name (sorted).
water_zs: Optional dict mapping camera names to interface
distances in meters. When provided, uses refractive PnP for
cameras with known distances.
interface_normal: Interface normal vector. If None, uses [0, 0, -1].
n_air: Refractive index of air (default 1.0)
n_water: Refractive index of water (default 1.333)
progress_callback: Optional callback(camera_name, cameras_located, total_cameras)
called after each camera is located during BFS traversal
Returns:
Dict mapping camera names to CameraExtrinsics.
Reference camera has R=I, t=0.
Raises:
ValueError: If reference_camera not in pose_graph
ValueError: If intrinsics missing for any camera in graph
Example:
>>> extrinsics = estimate_extrinsics(pose_graph, intrinsics, board)
>>> cam0_pose = extrinsics['cam0']
>>> print(f"cam0 at world origin: {np.allclose(cam0_pose.t, 0)}")
"""
# Validate reference camera
if reference_camera is None:
reference_camera = pose_graph.camera_names[0]
elif reference_camera not in pose_graph.camera_names:
raise ValueError(f"Reference camera '{reference_camera}' not in pose graph")
# Validate intrinsics
for cam in pose_graph.camera_names:
if cam not in intrinsics:
raise ValueError(f"Missing intrinsics for camera '{cam}'")
# Index observations by (camera, frame) for quick lookup
obs_index: dict[tuple[str, int], Observation] = {}
for obs in pose_graph.observations:
obs_index[(obs.camera, obs.frame_idx)] = obs
# Initialize poses
# camera_poses: R, t transforming world -> camera
# board_poses: R, t transforming board -> world (for each frame)
camera_poses: dict[str, tuple[NDArray, NDArray]] = {}
board_poses: dict[int, tuple[NDArray, NDArray]] = {}
# Reference camera at origin
camera_poses[reference_camera] = (
np.eye(3, dtype=np.float64),
np.zeros(3, dtype=np.float64),
)
# Report progress for reference camera
total_cameras = len(pose_graph.camera_names)
if progress_callback is not None:
progress_callback(reference_camera, 1, total_cameras)
# Priority BFS to propagate poses, ordered by corner count (highest first)
# Heap entries: (-num_corners, node_name) — negative for max-heap behavior
visited_cameras: set[str] = {reference_camera}
visited_frames: set[int] = set()
# Seed heap with reference camera (max priority)
heap: list[tuple[int, str]] = [(0, reference_camera)]
while heap:
_priority, node = heapq.heappop(heap)
if not node.startswith("f"):
# Camera node - propagate to connected frames
cam_name = node
if cam_name not in camera_poses:
continue
cam_R, cam_t = camera_poses[cam_name]
# Score each neighboring frame by corner count, pick best first
frame_neighbors = []
for neighbor in pose_graph.adjacency.get(cam_name, []):
frame_idx = int(neighbor[1:]) # "f42" -> 42
if frame_idx in visited_frames:
continue
obs_maybe = obs_index.get((cam_name, frame_idx))
if obs_maybe is None:
continue
frame_neighbors.append((frame_idx, obs_maybe))
for frame_idx, obs in frame_neighbors:
if frame_idx in visited_frames:
continue
if water_zs is not None and cam_name in water_zs:
result = refractive_solve_pnp(
intrinsics[cam_name],
obs.corners_2d,
obs.corner_ids,
board,
water_zs[cam_name],
interface_normal,
n_air,
n_water,
)
else:
result = estimate_board_pose(
intrinsics[cam_name], obs.corners_2d, obs.corner_ids, board
)
if result is None:
continue
rvec_bc, tvec_bc = result # board in camera frame
R_bc: NDArray[np.float64] = cv2.Rodrigues(rvec_bc)[0].astype(np.float64)
# board_in_world = cam_in_world @ board_in_cam
R_cw, t_cw = invert_pose(cam_R, cam_t) # camera in world
R_bw, t_bw = compose_poses(R_cw, t_cw, R_bc, tvec_bc) # board in world
board_poses[frame_idx] = (R_bw, t_bw)
visited_frames.add(frame_idx)
# Priority: negative corner count for max-heap; tie-break by name
heapq.heappush(heap, (-len(obs.corner_ids), f"f{frame_idx}"))
else:
# Frame node - propagate to connected cameras
frame_idx = int(node[1:])
if frame_idx not in board_poses:
continue
R_bw, t_bw = board_poses[frame_idx]
# Score each neighboring camera by corner count
cam_neighbors = []
for neighbor in sorted(pose_graph.adjacency.get(node, [])):
cam_name = neighbor
if cam_name in visited_cameras:
continue
obs_maybe = obs_index.get((cam_name, frame_idx))
if obs_maybe is None:
continue
cam_neighbors.append((cam_name, obs_maybe))
for cam_name, obs in cam_neighbors:
if cam_name in visited_cameras:
continue
if water_zs is not None and cam_name in water_zs:
result = refractive_solve_pnp(
intrinsics[cam_name],
obs.corners_2d,
obs.corner_ids,
board,
water_zs[cam_name],
interface_normal,
n_air,
n_water,
)
else:
result = estimate_board_pose(
intrinsics[cam_name], obs.corners_2d, obs.corner_ids, board
)
if result is None:
continue
rvec_bc, tvec_bc = result # board in camera frame
R_bc = cv2.Rodrigues(rvec_bc)[0].astype(np.float64)
# world_in_cam = board_in_cam @ world_in_board
R_wb, t_wb = invert_pose(R_bw, t_bw)
R_wc, t_wc = compose_poses(R_bc, tvec_bc, R_wb, t_wb)
camera_poses[cam_name] = (R_wc, t_wc)
visited_cameras.add(cam_name)
if progress_callback is not None:
progress_callback(cam_name, len(visited_cameras), total_cameras)
heapq.heappush(heap, (-len(obs.corner_ids), cam_name))
# Multi-frame averaging refinement pass
if progress_callback is not None:
progress_callback("_averaging", len(visited_cameras), total_cameras)
camera_poses, board_poses = _refine_poses_multi_frame(
camera_poses,
board_poses,
obs_index,
pose_graph,
intrinsics,
board,
reference_camera,
water_zs,
interface_normal,
n_air,
n_water,
)
# Convert to CameraExtrinsics
extrinsics_result: dict[str, CameraExtrinsics] = {}
for cam_name in pose_graph.camera_names:
if cam_name not in camera_poses:
raise ValueError(f"Could not determine pose for camera '{cam_name}'")
R, t = camera_poses[cam_name]
extrinsics_result[cam_name] = CameraExtrinsics(R=R, t=t)
return extrinsics_result