Source code for aquacal.calibration.interface_estimation

"""Stage 3 refractive optimization for interface estimation.

This module implements joint optimization of camera extrinsics, per-camera
interface distances, and board poses using underwater ChArUco detections.
"""

import numpy as np
from numpy.typing import NDArray
from scipy.optimize import least_squares

from aquacal.calibration._optim_common import (
    build_bounds,
    build_jacobian_sparsity,
    compute_residuals,
    make_sparse_jacobian_func,
    pack_params,
    unpack_params,
)
from aquacal.calibration.extrinsics import _average_rotations, refractive_solve_pnp
from aquacal.config.schema import (
    BoardPose,
    CameraExtrinsics,
    CameraIntrinsics,
    ConvergenceError,
    DetectionResult,
    InsufficientDataError,
    Vec3,
)
from aquacal.core.board import BoardGeometry
from aquacal.utils.transforms import (
    compose_poses,
    invert_pose,
    matrix_to_rvec,
    rvec_to_matrix,
)


def _compute_initial_board_poses(
    detections: DetectionResult,
    intrinsics: dict[str, CameraIntrinsics],
    extrinsics: dict[str, CameraExtrinsics],
    board: BoardGeometry,
    min_corners: int = 4,
    water_zs: dict[str, float] | None = None,
    interface_normal: NDArray[np.float64] | None = None,
    n_air: float = 1.0,
    n_water: float = 1.333,
) -> dict[int, BoardPose]:
    """
    Compute initial board poses via refractive PnP for each frame.

    For each frame, uses the camera with the most detected corners to estimate
    the board pose using refractive_solve_pnp, which accounts for refraction
    at the air-water interface.

    Args:
        detections: Detection results
        intrinsics: Per-camera intrinsics
        extrinsics: Per-camera extrinsics (for transforming to world frame)
        board: Board geometry
        min_corners: Minimum corners required for PnP
        water_zs: Per-camera interface distances. If None,
            defaults to 0.15m for all cameras.
        interface_normal: Interface normal vector. If None, uses [0, 0, -1].
        n_air: Refractive index of air (default 1.0)
        n_water: Refractive index of water (default 1.333)

    Returns:
        Dict mapping frame_idx to BoardPose (in world frame)
    """
    if water_zs is None:
        water_zs = {cam: 0.15 for cam in intrinsics.keys()}

    board_poses = {}

    for frame_idx, frame_det in detections.frames.items():
        # Find camera with most corners
        best_cam = None
        best_count = 0
        for cam_name, det in frame_det.detections.items():
            if det.num_corners >= min_corners and det.num_corners > best_count:
                best_cam = cam_name
                best_count = det.num_corners

        if best_cam is None:
            continue

        det = frame_det.detections[best_cam]

        result = refractive_solve_pnp(
            intrinsics[best_cam],
            det.corners_2d,
            det.corner_ids,
            board,
            water_zs.get(best_cam, 0.15),
            interface_normal,
            n_air,
            n_water,
        )
        if result is None:
            continue

        rvec_bc, tvec_bc = result

        # Transform board pose from camera frame to world frame
        # board_in_world = camera_in_world @ board_in_camera
        ext = extrinsics[best_cam]
        R_cw, t_cw = invert_pose(ext.R, ext.t)  # camera in world
        R_bc = rvec_to_matrix(rvec_bc)
        R_bw, t_bw = compose_poses(R_cw, t_cw, R_bc, tvec_bc)
        rvec_bw = matrix_to_rvec(R_bw)

        board_poses[frame_idx] = BoardPose(
            frame_idx=frame_idx,
            rvec=rvec_bw,
            tvec=t_bw,
        )

    return board_poses


[docs] def optimize_interface( detections: DetectionResult, intrinsics: dict[str, CameraIntrinsics], initial_extrinsics: dict[str, CameraExtrinsics], board: BoardGeometry, reference_camera: str, initial_water_zs: dict[str, float] | None = None, interface_normal: Vec3 | None = None, n_air: float = 1.0, n_water: float = 1.333, loss: str = "huber", loss_scale: float = 1.0, min_corners: int = 4, use_sparse_jacobian: bool = True, verbose: int = 1, normal_fixed: bool = True, ) -> tuple[dict[str, CameraExtrinsics], dict[str, float], list[BoardPose], float]: """ Jointly optimize camera extrinsics, interface distances, and board poses. This is Stage 3 of the calibration pipeline. It refines the initial estimates from Stage 2 by accounting for refraction at the air-water interface. Internally, a single global water_z parameter replaces N per-camera interface distances. Per-camera distances are derived as d_i = water_z - C_z_i, where C_z_i is the camera center's Z coordinate. This eliminates the degeneracy between camera height and interface distance by construction. Note: For details on the optimizer implementation and sparse Jacobian structure, see the :doc:`Optimizer Guide </guide/optimizer>` guide. Args: detections: Underwater ChArUco detections from detect_all_frames intrinsics: Per-camera intrinsic parameters (fixed during optimization) initial_extrinsics: Initial camera extrinsics from Stage 2 board: ChArUco board geometry reference_camera: Camera name to fix at origin (extrinsics not optimized) initial_water_zs: Optional initial distances per camera. If None, defaults to 0.15m for all cameras. interface_normal: Interface normal vector. If None, uses [0, 0, -1]. Normal is fixed during optimization. n_air: Refractive index of air (default 1.0) n_water: Refractive index of water (default 1.333) loss: Robust loss function ("linear", "huber", "soft_l1", "cauchy") loss_scale: Scale parameter for robust loss in pixels min_corners: Minimum corners per detection to include in optimization use_sparse_jacobian: Use sparse Jacobian structure (default True). Dramatically improves performance for large camera arrays. verbose: Verbosity level for scipy.optimize.least_squares (default 1). 0 = silent, 1 = one-line per iteration, 2 = full per-iteration report. normal_fixed: If False, estimate reference camera tilt (2 DOF) to account for non-perpendicular camera-to-water-surface alignment. Returns: Tuple of: - dict[str, CameraExtrinsics]: Optimized extrinsics for all cameras - dict[str, float]: Optimized interface distances per camera (derived from water_z) - list[BoardPose]: Optimized board poses for each frame used - float: Final RMS reprojection error in pixels Raises: InsufficientDataError: If no valid frames for optimization ConvergenceError: If optimization fails to converge ValueError: If reference_camera not in initial_extrinsics """ # Validate reference camera if reference_camera not in initial_extrinsics: raise ValueError( f"reference_camera '{reference_camera}' not in initial_extrinsics. " f"Available cameras: {list(initial_extrinsics.keys())}" ) # Set default interface normal if interface_normal is None: interface_normal = np.array([0.0, 0.0, -1.0], dtype=np.float64) else: interface_normal = np.asarray(interface_normal, dtype=np.float64) # Set default interface distances if initial_water_zs is None: initial_water_zs = {cam: 0.15 for cam in initial_extrinsics.keys()} # Create ordered lists for consistent parameter packing camera_order = sorted(initial_extrinsics.keys()) # Compute initial board poses initial_board_poses = _compute_initial_board_poses( detections, intrinsics, initial_extrinsics, board, min_corners, water_zs=initial_water_zs, interface_normal=interface_normal, n_air=n_air, n_water=n_water, ) # Filter to frames with valid board poses frame_order = sorted(initial_board_poses.keys()) # Check for sufficient data if len(frame_order) == 0: raise InsufficientDataError( "No valid frames for optimization. " f"Need at least {min_corners} corners per detection." ) # Compute initial water_z from reference camera # C_z_ref = 0 since reference camera is at origin, so water_z = d_ref initial_water_z = initial_water_zs[reference_camera] # Pack initial parameters initial_params = pack_params( initial_extrinsics, initial_water_z, initial_board_poses, reference_camera, camera_order, frame_order, normal_fixed=normal_fixed, ) # Build bounds lower, upper = build_bounds( camera_order, frame_order, reference_camera, normal_fixed=normal_fixed, ) # Reference extrinsics (fixed during optimization) reference_extrinsics = initial_extrinsics[reference_camera] # Build cost function args cost_args = ( detections, intrinsics, board, reference_camera, reference_extrinsics, interface_normal, n_air, n_water, camera_order, frame_order, min_corners, False, # refine_intrinsics normal_fixed, ) # Build sparse Jacobian if enabled jac = "2-point" if use_sparse_jacobian: jac_sparsity = build_jacobian_sparsity( detections, reference_camera, camera_order, frame_order, min_corners, normal_fixed=normal_fixed, ) jac = make_sparse_jacobian_func( compute_residuals, cost_args, jac_sparsity, (lower, upper), ) # Run optimization result = least_squares( compute_residuals, x0=initial_params, args=cost_args, method="trf", loss=loss, f_scale=loss_scale, bounds=(lower, upper), jac=jac, verbose=verbose, ) if result.status <= 0: raise ConvergenceError(f"Optimization failed: {result.message}") # Unpack optimized parameters opt_extrinsics, opt_distances, opt_board_poses, _ = unpack_params( result.x, reference_camera, reference_extrinsics, camera_order, frame_order, normal_fixed=normal_fixed, ) # Compute final RMS error rms_error = np.sqrt(np.mean(result.fun**2)) # Convert board_poses dict to list board_poses_list = [opt_board_poses[frame_idx] for frame_idx in frame_order] return opt_extrinsics, opt_distances, board_poses_list, rms_error
def _multi_frame_pnp_init( obs_frames: list[tuple[int, NDArray[np.int32], NDArray[np.float64]]], board_poses: dict[int, BoardPose], intrinsics: CameraIntrinsics, board: BoardGeometry, water_z: float, interface_normal: NDArray[np.float64], n_air: float, n_water: float, top_n: int = 10, outlier_threshold: float = 0.5, ) -> tuple[NDArray[np.float64], NDArray[np.float64]] | None: """Multi-frame PnP initialization for robust camera pose estimation. Runs refractive PnP on multiple frames, filters outliers based on C_z, and returns a weighted average pose. This is more robust than single-frame PnP for overhead cameras where depth is ill-conditioned. Args: obs_frames: List of (frame_idx, corner_ids, corners_2d) tuples board_poses: Fixed board poses from Stage 3 (frame_idx -> BoardPose) intrinsics: Camera intrinsic parameters board: Board geometry water_z: Global water surface Z from Stage 3 interface_normal: Interface normal vector n_air: Refractive index of air n_water: Refractive index of water top_n: Number of top frames to use (by corner count) outlier_threshold: Maximum deviation from median C_z in meters Returns: Tuple of (rvec, tvec) in world frame, or None if all frames fail """ # Sort frames by corner count and take top N sorted_frames = sorted(obs_frames, key=lambda x: len(x[1]), reverse=True) selected_frames = sorted_frames[:top_n] # Run PnP on each frame and collect results pnp_results = [] for frame_idx, corner_ids, corners_2d in selected_frames: pnp_result = refractive_solve_pnp( intrinsics, corners_2d, corner_ids, board, water_z, interface_normal, n_air, n_water, ) if pnp_result is None: continue rvec_bc, tvec_bc = pnp_result # Transform to world frame bp = board_poses[frame_idx] R_bw = rvec_to_matrix(bp.rvec) R_bc = rvec_to_matrix(rvec_bc) R_wb, t_wb = invert_pose(R_bw, bp.tvec) R_wc, t_wc = compose_poses(R_bc, tvec_bc, R_wb, t_wb) # Compute camera center in world frame C = -R_wc.T @ t_wc C_z = C[2] num_corners = len(corner_ids) pnp_results.append((R_wc, t_wc, C_z, num_corners)) if len(pnp_results) == 0: return None # Filter outliers based on C_z C_z_values = np.array([r[2] for r in pnp_results]) median_C_z = np.median(C_z_values) filtered_results = [] for R_wc, t_wc, C_z, num_corners in pnp_results: if abs(C_z - median_C_z) <= outlier_threshold: filtered_results.append((R_wc, t_wc, num_corners)) # Fallback: if all filtered out, use the one closest to median if len(filtered_results) == 0: closest_idx = np.argmin(np.abs(C_z_values - median_C_z)) R_wc, t_wc, C_z, num_corners = pnp_results[closest_idx] return matrix_to_rvec(R_wc), t_wc # Fallback: if only one survives, use it if len(filtered_results) == 1: R_wc, t_wc, _ = filtered_results[0] return matrix_to_rvec(R_wc), t_wc # Average rotations and translations, weighted by corner count rotations = [r[0] for r in filtered_results] translations = [r[1] for r in filtered_results] weights = [float(r[2]) for r in filtered_results] # Normalize weights total_weight = sum(weights) normalized_weights = [w / total_weight for w in weights] # Average rotation using weighted chordal L2 mean R_avg = _average_rotations(rotations, normalized_weights) # Average translation with same weights t_avg = np.zeros(3, dtype=np.float64) for t, w in zip(translations, normalized_weights): t_avg += w * t return matrix_to_rvec(R_avg), t_avg
[docs] def register_auxiliary_camera( camera_name: str, intrinsics: CameraIntrinsics, detections: DetectionResult, board_poses: dict[int, BoardPose], board: BoardGeometry, water_z: float, interface_normal: Vec3 | None = None, n_air: float = 1.0, n_water: float = 1.333, min_corners: int = 4, refine_intrinsics: bool = False, verbose: int = 0, ) -> ( tuple[CameraExtrinsics, float, float] | tuple[CameraExtrinsics, float, float, CameraIntrinsics] ): """Register a single auxiliary camera against fixed board poses. Estimates the camera's extrinsics by minimizing refractive reprojection error against known board poses from Stage 3. The interface distance is derived from the known global water_z and the camera's Z position: d = water_z - C_z. This is a 6-parameter problem (extrinsics only) with no degeneracy. Optionally refines intrinsics (fx, fy, cx, cy) for a 10-parameter optimization. Args: camera_name: Name of the auxiliary camera intrinsics: Camera intrinsic parameters (initial values, refined if refine_intrinsics=True) detections: Full detection results (must contain this camera's detections) board_poses: Fixed board poses from Stage 3 (frame_idx -> BoardPose) board: Board geometry water_z: Global water surface Z from Stage 3 (required) interface_normal: Interface normal (default [0, 0, -1]) n_air: Refractive index of air n_water: Refractive index of water min_corners: Minimum corners per detection refine_intrinsics: If True, also optimize fx, fy, cx, cy (10 params total). Distortion coefficients are kept fixed. verbose: Verbosity level Returns: When refine_intrinsics=False: Tuple of (extrinsics, water_z, rms_error) When refine_intrinsics=True: Tuple of (extrinsics, water_z, rms_error, refined_intrinsics) Raises: InsufficientDataError: If no usable frames found """ from aquacal.core.camera import create_camera from aquacal.core.interface_model import Interface from aquacal.core.refractive_geometry import refractive_project if interface_normal is None: interface_normal = np.array([0.0, 0.0, -1.0], dtype=np.float64) else: interface_normal = np.asarray(interface_normal, dtype=np.float64) # --- Step 1: Collect observations --- obs_frames = [] total_corners = 0 for frame_idx, frame_det in detections.frames.items(): if camera_name not in frame_det.detections: continue if frame_idx not in board_poses: continue det = frame_det.detections[camera_name] if det.num_corners < min_corners: continue obs_frames.append((frame_idx, det.corner_ids, det.corners_2d)) total_corners += det.num_corners if len(obs_frames) == 0: raise InsufficientDataError( f"No usable frames for auxiliary camera '{camera_name}'. " f"Need frames with both detections (>={min_corners} corners) " f"and existing board poses." ) # --- Step 2: Initial guess via multi-frame refractive PnP --- pnp_result = _multi_frame_pnp_init( obs_frames, board_poses, intrinsics, board, water_z, interface_normal, n_air, n_water, ) if pnp_result is None: raise InsufficientDataError( f"Multi-frame refractive PnP failed for auxiliary camera '{camera_name}'. " f"Tried {len(obs_frames)} frames but all PnP attempts failed." ) initial_rvec, initial_tvec = pnp_result # --- Step 3: Build parameter vector --- if refine_intrinsics: # [rvec(3), tvec(3), fx, fy, cx, cy] = 10 params x0 = np.concatenate( [ initial_rvec, initial_tvec, [ intrinsics.K[0, 0], intrinsics.K[1, 1], intrinsics.K[0, 2], intrinsics.K[1, 2], ], ] ) else: # [rvec(3), tvec(3)] = 6 params (extrinsics only) x0 = np.concatenate([initial_rvec, initial_tvec]) # --- Step 4: Residual function --- def residuals(x): rvec = x[:3] tvec = x[3:6] # Build camera intrinsics (refined or original) if refine_intrinsics: fx, fy, cx, cy = x[6], x[7], x[8], x[9] K_refined = intrinsics.K.copy() K_refined[0, 0] = fx K_refined[1, 1] = fy K_refined[0, 2] = cx K_refined[1, 2] = cy camera_intrinsics = CameraIntrinsics( K=K_refined, dist_coeffs=intrinsics.dist_coeffs, image_size=intrinsics.image_size, is_fisheye=intrinsics.is_fisheye, ) else: camera_intrinsics = intrinsics R = rvec_to_matrix(rvec) ext = CameraExtrinsics(R=R, t=tvec) # Interface distance is the water surface Z-coordinate iface_dist = water_z camera = create_camera(camera_name, camera_intrinsics, ext) interface = Interface( normal=interface_normal, camera_distances={camera_name: iface_dist}, n_air=n_air, n_water=n_water, ) resid = [] for frame_idx, corner_ids, corners_2d in obs_frames: bp = board_poses[frame_idx] corners_3d = board.transform_corners(bp.rvec, bp.tvec) for i, cid in enumerate(corner_ids): pt_3d = corners_3d[int(cid)] projected = refractive_project(camera, interface, pt_3d) if projected is not None: resid.append(projected[0] - corners_2d[i, 0]) resid.append(projected[1] - corners_2d[i, 1]) else: resid.append(100.0) resid.append(100.0) return np.array(resid, dtype=np.float64) # --- Step 5: Bounds --- if refine_intrinsics: # Add bounds for fx, fy, cx, cy fx0, fy0 = intrinsics.K[0, 0], intrinsics.K[1, 1] w, h = intrinsics.image_size lower = np.array([-np.inf] * 6 + [0.5 * fx0, 0.5 * fy0, 0.0, 0.0]) upper = np.array([np.inf] * 6 + [2.0 * fx0, 2.0 * fy0, float(w), float(h)]) else: # All unbounded for 6-param extrinsics lower = np.full(6, -np.inf) upper = np.full(6, np.inf) # --- Step 6: Optimize --- result = least_squares( residuals, x0=x0, method="trf", loss="huber", f_scale=1.0, bounds=(lower, upper), verbose=verbose, ) # --- Step 7: Extract results --- opt_rvec = result.x[:3] opt_tvec = result.x[3:6] opt_R = rvec_to_matrix(opt_rvec) opt_extrinsics = CameraExtrinsics(R=opt_R, t=opt_tvec) # Derive final interface distance (water surface Z-coordinate) opt_iface_dist = float(water_z) rms_error = float(np.sqrt(np.mean(result.fun**2))) if refine_intrinsics: # Extract refined intrinsics fx, fy, cx, cy = result.x[6], result.x[7], result.x[8], result.x[9] K_refined = intrinsics.K.copy() K_refined[0, 0] = fx K_refined[1, 1] = fy K_refined[0, 2] = cx K_refined[1, 2] = cy refined_intrinsics = CameraIntrinsics( K=K_refined, dist_coeffs=intrinsics.dist_coeffs, image_size=intrinsics.image_size, is_fisheye=intrinsics.is_fisheye, ) return opt_extrinsics, opt_iface_dist, rms_error, refined_intrinsics else: return opt_extrinsics, opt_iface_dist, rms_error