"""End-to-end calibration pipeline orchestration."""
from __future__ import annotations
import contextlib
import hashlib
import importlib.metadata
import random
import time
from collections.abc import Iterator
from datetime import datetime
from pathlib import Path
import cv2
import numpy as np
import yaml
from aquacal.calibration.extrinsics import build_pose_graph, estimate_extrinsics
from aquacal.calibration.interface_estimation import (
_compute_initial_board_poses,
optimize_interface,
register_auxiliary_camera,
)
from aquacal.calibration.intrinsics import calibrate_intrinsics_all
from aquacal.calibration.refinement import joint_refinement
from aquacal.config.schema import (
BoardConfig,
BoardPose,
CalibrationConfig,
CalibrationMetadata,
CalibrationResult,
CameraCalibration,
CameraExtrinsics,
CameraIntrinsics,
DetectionResult,
DiagnosticsData,
FrameDetections,
InterfaceParams,
)
from aquacal.core.board import BoardGeometry
from aquacal.io.detection import detect_all_frames
from aquacal.io.serialization import save_calibration
from aquacal.utils.transforms import matrix_to_rvec
from aquacal.validation.diagnostics import (
generate_diagnostic_report,
save_diagnostic_report,
)
from aquacal.validation.reconstruction import compute_3d_distance_errors
from aquacal.validation.reprojection import compute_reprojection_errors
[docs]
def calibrate_from_detections(
detections: DetectionResult,
intrinsics: dict[str, CameraIntrinsics],
board: BoardGeometry,
*,
reference_camera: str | None = None,
n_air: float = 1.0,
n_water: float = 1.333,
loss: str = "huber",
loss_scale: float = 1.0,
min_corners: int = 4,
verbose: int = 0,
) -> tuple[CalibrationResult, dict[int, BoardPose]]:
"""Run Stages 2-3 on pre-computed detections and return a CalibrationResult.
This is a high-level convenience function that takes detections and intrinsics
(typically from synthetic data or a previous detection step) and runs the
extrinsic initialization (Stage 2) and joint refractive optimization (Stage 3).
Args:
detections: Detected corners across all cameras and frames.
intrinsics: Per-camera intrinsic parameters.
board: Board geometry used for detection.
reference_camera: Name of the reference camera (identity extrinsics).
Defaults to the first camera in sorted order.
n_air: Refractive index of air.
n_water: Refractive index of water.
loss: Robust loss function for Stage 3 ('huber', 'cauchy', etc.).
loss_scale: Scale parameter for the robust loss.
min_corners: Minimum corners per detection to use.
verbose: Verbosity level (0=silent, 1=summary, 2=per-iteration).
Returns:
Tuple of (CalibrationResult, board_poses) where board_poses maps
frame index to the optimized BoardPose.
Example:
>>> from aquacal.datasets import create_scenario, generate_synthetic_detections
>>> from aquacal.calibration import calibrate_from_detections
>>> scenario = create_scenario("minimal")
>>> board = BoardGeometry(scenario.board_config)
>>> detections = generate_synthetic_detections(
... scenario.intrinsics, scenario.extrinsics, scenario.water_zs,
... board, scenario.board_poses, noise_std=scenario.noise_std,
... )
>>> result, poses = calibrate_from_detections(
... detections, scenario.intrinsics, board,
... )
>>> print(f"RMS: {result.diagnostics.reprojection_error_rms:.3f} px")
"""
camera_names = sorted(intrinsics.keys())
if reference_camera is None:
reference_camera = camera_names[0]
interface_normal = np.array([0.0, 0.0, -1.0], dtype=np.float64)
# Stage 2: Extrinsic initialization
pose_graph = build_pose_graph(detections, min_cameras=2)
initial_extrinsics = estimate_extrinsics(
pose_graph,
intrinsics,
board,
reference_camera,
)
# Stage 3: Joint refractive optimization
opt_extrinsics, opt_distances, opt_poses_list, rms = optimize_interface(
detections=detections,
intrinsics=intrinsics,
initial_extrinsics=initial_extrinsics,
board=board,
reference_camera=reference_camera,
interface_normal=interface_normal,
n_air=n_air,
n_water=n_water,
loss=loss,
loss_scale=loss_scale,
min_corners=min_corners,
verbose=verbose,
)
board_poses = {bp.frame_idx: bp for bp in opt_poses_list}
# Build CalibrationResult
interface_params = InterfaceParams(
normal=interface_normal,
n_air=n_air,
n_water=n_water,
)
result = _build_calibration_result(
intrinsics=intrinsics,
extrinsics=opt_extrinsics,
water_z_values=opt_distances,
board_config=board.config,
interface_params=interface_params,
diagnostics=DiagnosticsData(
reprojection_error_rms=rms,
reprojection_error_per_camera={},
validation_3d_error_mean=0.0,
validation_3d_error_std=0.0,
),
metadata=CalibrationMetadata(
calibration_date=datetime.now().isoformat(),
software_version=importlib.metadata.version("aquacal"),
config_hash="",
num_frames_used=len(board_poses),
num_frames_holdout=0,
),
)
# Compute per-camera reprojection errors
reproj = compute_reprojection_errors(result, detections, board_poses)
result.diagnostics.reprojection_error_rms = reproj.rms
result.diagnostics.reprojection_error_per_camera = reproj.per_camera
return result, board_poses
[docs]
def load_config(config_path: str | Path) -> CalibrationConfig:
"""
Load calibration configuration from YAML file.
Args:
config_path: Path to config.yaml file
Returns:
CalibrationConfig populated from file
Raises:
FileNotFoundError: If config file doesn't exist
ValueError: If config is invalid or missing required fields
"""
config_path = Path(config_path)
if not config_path.exists():
raise FileNotFoundError(f"Config file not found: {config_path}")
with open(config_path, "r") as f:
data = yaml.safe_load(f)
# Validate required sections
required = ["board", "cameras", "paths"]
for key in required:
if key not in data:
raise ValueError(f"Missing required config section: {key}")
# Build BoardConfig (extrinsic/underwater board)
board_data = data["board"]
board = BoardConfig(
squares_x=board_data["squares_x"],
squares_y=board_data["squares_y"],
square_size=board_data["square_size"],
marker_size=board_data["marker_size"],
dictionary=board_data.get("dictionary", "DICT_4X4_50"),
legacy_pattern=board_data.get("legacy_pattern", False),
)
# Build optional intrinsic BoardConfig (if provided)
intrinsic_board = None
if "intrinsic_board" in data:
intrinsic_data = data["intrinsic_board"]
intrinsic_board = BoardConfig(
squares_x=intrinsic_data["squares_x"],
squares_y=intrinsic_data["squares_y"],
square_size=intrinsic_data["square_size"],
marker_size=intrinsic_data["marker_size"],
dictionary=intrinsic_data.get("dictionary", "DICT_4X4_50"),
legacy_pattern=intrinsic_data.get("legacy_pattern", False),
)
# Build paths
paths = data["paths"]
intrinsic_paths = {k: Path(v) for k, v in paths["intrinsic_videos"].items()}
extrinsic_paths = {k: Path(v) for k, v in paths["extrinsic_videos"].items()}
output_dir = Path(paths["output_dir"])
# Auxiliary cameras (parsed early since initial_water_z references it)
auxiliary_cameras = data.get("auxiliary_cameras", [])
if auxiliary_cameras:
overlap = set(data["cameras"]) & set(auxiliary_cameras)
if overlap:
raise ValueError(
f"auxiliary_cameras must not overlap with cameras. "
f"Overlap: {sorted(overlap)}"
)
for aux_cam in auxiliary_cameras:
if aux_cam not in intrinsic_paths:
raise ValueError(
f"Auxiliary camera '{aux_cam}' missing from paths.intrinsic_videos"
)
if aux_cam not in extrinsic_paths:
raise ValueError(
f"Auxiliary camera '{aux_cam}' missing from paths.extrinsic_videos"
)
# Interface settings
interface = data.get("interface", {})
n_air = interface.get("n_air", 1.0)
n_water = interface.get("n_water", 1.333)
normal_fixed = interface.get("normal_fixed", True)
# Parse initial_water_z (optional) with backward compatibility
initial_water_z = None
if "initial_distances" in interface:
import warnings
warnings.warn(
"Config field 'initial_distances' is deprecated. Use 'initial_water_z' instead.",
UserWarning,
stacklevel=2,
)
raw_distances = interface["initial_distances"]
# Handle scalar format (apply to all cameras including auxiliary)
if isinstance(raw_distances, (int, float)):
if raw_distances <= 0:
raise ValueError(
f"initial_water_z must be positive, got {raw_distances}"
)
initial_water_z = {
cam: float(raw_distances) for cam in data["cameras"] + auxiliary_cameras
}
# Handle dict format (per-camera)
elif isinstance(raw_distances, dict):
# Validate all cameras are covered
missing_cameras = set(data["cameras"]) - set(raw_distances.keys())
if missing_cameras:
raise ValueError(
f"initial_water_z dict must cover all cameras. "
f"Missing: {sorted(missing_cameras)}"
)
# Validate all distances are positive
for cam, dist in raw_distances.items():
if dist <= 0:
raise ValueError(
f"initial_water_z['{cam}'] must be positive, got {dist}"
)
# Warn about extra cameras (not in cameras or auxiliary list)
extra_cameras = (
set(raw_distances.keys())
- set(data["cameras"])
- set(auxiliary_cameras)
)
if extra_cameras:
import sys
print(
f"Warning: initial_water_z contains cameras not in cameras list: "
f"{sorted(extra_cameras)}",
file=sys.stderr,
)
initial_water_z = {k: float(v) for k, v in raw_distances.items()}
else:
raise ValueError(
f"initial_water_z must be a number or dict, got {type(raw_distances).__name__}"
)
elif "initial_water_z" in interface:
raw_distances = interface["initial_water_z"]
# Handle scalar format (apply to all cameras including auxiliary)
if isinstance(raw_distances, (int, float)):
if raw_distances <= 0:
raise ValueError(
f"initial_water_z must be positive, got {raw_distances}"
)
initial_water_z = {
cam: float(raw_distances) for cam in data["cameras"] + auxiliary_cameras
}
# Handle dict format (per-camera)
elif isinstance(raw_distances, dict):
# Validate all cameras are covered
missing_cameras = set(data["cameras"]) - set(raw_distances.keys())
if missing_cameras:
raise ValueError(
f"initial_water_z dict must cover all cameras. "
f"Missing: {sorted(missing_cameras)}"
)
# Validate all distances are positive
for cam, dist in raw_distances.items():
if dist <= 0:
raise ValueError(
f"initial_water_z['{cam}'] must be positive, got {dist}"
)
# Warn about extra cameras (not in cameras or auxiliary list)
extra_cameras = (
set(raw_distances.keys())
- set(data["cameras"])
- set(auxiliary_cameras)
)
if extra_cameras:
import sys
print(
f"Warning: initial_water_z contains cameras not in cameras list: "
f"{sorted(extra_cameras)}",
file=sys.stderr,
)
initial_water_z = {k: float(v) for k, v in raw_distances.items()}
else:
raise ValueError(
f"initial_water_z must be a number or dict, got {type(raw_distances).__name__}"
)
# Optimization settings
opt = data.get("optimization", {})
robust_loss = opt.get("robust_loss", "huber")
loss_scale = opt.get("loss_scale", 1.0)
max_cal_frames_raw = opt.get("max_calibration_frames", None)
max_cal_frames = int(max_cal_frames_raw) if max_cal_frames_raw is not None else None
refine_intrinsics = opt.get("refine_intrinsics", False)
refine_auxiliary_intrinsics = opt.get("refine_auxiliary_intrinsics", False)
# Detection settings
det = data.get("detection", {})
min_corners = det.get("min_corners", 8)
min_cameras = det.get("min_cameras", 2)
frame_step = det.get("frame_step", 1)
# Camera model settings
rational_model_cameras = data.get("rational_model_cameras", [])
fisheye_cameras = data.get("fisheye_cameras", [])
# Validate fisheye_cameras: must be subset of auxiliary_cameras
if fisheye_cameras:
non_aux = set(fisheye_cameras) - set(auxiliary_cameras)
if non_aux:
raise ValueError(
f"fisheye_cameras must be a subset of auxiliary_cameras. "
f"Not in auxiliary_cameras: {sorted(non_aux)}"
)
# Validate no overlap with rational_model_cameras
overlap = set(fisheye_cameras) & set(rational_model_cameras)
if overlap:
raise ValueError(
f"fisheye_cameras and rational_model_cameras must be disjoint. "
f"Overlap: {sorted(overlap)}"
)
# Validation settings
val = data.get("validation", {})
holdout_fraction = val.get("holdout_fraction", 0.2)
save_detailed = val.get("save_detailed_residuals", True)
return CalibrationConfig(
board=board,
camera_names=data["cameras"],
intrinsic_video_paths=intrinsic_paths,
extrinsic_video_paths=extrinsic_paths,
output_dir=output_dir,
intrinsic_board=intrinsic_board,
n_air=n_air,
n_water=n_water,
interface_normal_fixed=normal_fixed,
robust_loss=robust_loss,
loss_scale=loss_scale,
min_corners_per_frame=min_corners,
min_cameras_per_frame=min_cameras,
frame_step=frame_step,
holdout_fraction=holdout_fraction,
max_calibration_frames=max_cal_frames,
refine_intrinsics=refine_intrinsics,
refine_auxiliary_intrinsics=refine_auxiliary_intrinsics,
save_detailed_residuals=save_detailed,
initial_water_z=initial_water_z,
rational_model_cameras=rational_model_cameras,
auxiliary_cameras=auxiliary_cameras,
fisheye_cameras=fisheye_cameras,
)
[docs]
def split_detections(
detections: DetectionResult,
holdout_fraction: float,
seed: int = 42,
) -> tuple[DetectionResult, DetectionResult]:
"""
Split detections into calibration and validation sets.
Randomly assigns entire frames to either set (not individual detections).
Args:
detections: Full detection result
holdout_fraction: Fraction of frames for validation (0.0 to 1.0)
seed: Random seed for reproducibility
Returns:
Tuple of (calibration_detections, validation_detections)
"""
frame_indices = list(detections.frames.keys())
rng = random.Random(seed)
rng.shuffle(frame_indices)
n_holdout = int(len(frame_indices) * holdout_fraction)
holdout_indices = set(frame_indices[:n_holdout])
calibration_indices = set(frame_indices[n_holdout:])
cal_frames = {idx: detections.frames[idx] for idx in calibration_indices}
val_frames = {idx: detections.frames[idx] for idx in holdout_indices}
cal_detections = DetectionResult(
frames=cal_frames,
camera_names=detections.camera_names,
total_frames=len(cal_frames),
)
val_detections = DetectionResult(
frames=val_frames,
camera_names=detections.camera_names,
total_frames=len(val_frames),
)
return cal_detections, val_detections
def _subsample_detections(
detections: DetectionResult,
max_frames: int,
) -> DetectionResult:
"""Uniformly subsample detection frames to at most max_frames.
Selects frames at uniform temporal intervals from the sorted frame indices,
preserving the first and last frames.
Args:
detections: Full detection result
max_frames: Maximum number of frames to keep
Returns:
New DetectionResult with at most max_frames frames
"""
frame_indices = sorted(detections.frames.keys())
if len(frame_indices) <= max_frames:
return detections
# Uniform selection: np.linspace to pick evenly spaced indices
selected_positions = np.round(
np.linspace(0, len(frame_indices) - 1, max_frames)
).astype(int)
selected_frames = {frame_indices[i] for i in selected_positions}
return DetectionResult(
frames={k: v for k, v in detections.frames.items() if k in selected_frames},
camera_names=detections.camera_names,
total_frames=detections.total_frames,
)
def _save_board_reference_images(
board: BoardGeometry,
intrinsic_board: BoardGeometry | None,
output_dir: Path,
) -> None:
"""
Save reference PNG images of configured board(s) for visual verification.
Generates grayscale ChArUco board images at 800x600 resolution with 50px
margin. Saves extrinsic board always; saves intrinsic board only if it
differs from extrinsic board.
Args:
board: Extrinsic board geometry
intrinsic_board: Intrinsic board geometry (may be same as board)
output_dir: Directory to save images
"""
# Generate and save extrinsic board image
cv_board = board.get_opencv_board()
board_img = cv_board.generateImage((800, 600), marginSize=50)
cv2.imwrite(str(output_dir / "board_extrinsic.png"), board_img)
# Save intrinsic board image only if it differs from extrinsic board
if intrinsic_board is not board:
cv_intr_board = intrinsic_board.get_opencv_board()
intr_img = cv_intr_board.generateImage((800, 600), marginSize=50)
cv2.imwrite(str(output_dir / "board_intrinsic.png"), intr_img)
[docs]
def run_calibration(
config_path: str | Path, verbose: bool = False
) -> CalibrationResult:
"""
Run complete calibration pipeline from config file.
Loads configuration from YAML and delegates to run_calibration_from_config().
Args:
config_path: Path to config.yaml file
verbose: If True, enable per-iteration progress output from optimizers
Returns:
Complete CalibrationResult
Raises:
FileNotFoundError: If config or video files not found
CalibrationError: If any calibration stage fails
Example:
>>> from aquacal import run_calibration
>>> result = run_calibration("config.yaml", verbose=True)
>>> print(f"Calibrated {len(result.cameras)} cameras")
>>> print(f"Water surface at Z = {result.cameras['cam0'].water_z:.3f} m")
Note:
For details on the optimizer pipeline, see the
:doc:`Optimizer Guide </guide/optimizer>` guide.
"""
config = load_config(config_path)
return run_calibration_from_config(config, verbose=verbose)
@contextlib.contextmanager
def _time_stage(timings: dict[str, float], key: str) -> Iterator[None]:
"""Record elapsed wall time of the wrapped block into ``timings[key]``."""
t0 = time.perf_counter()
try:
yield
finally:
timings[key] = time.perf_counter() - t0
[docs]
def run_calibration_from_config(
config: CalibrationConfig, verbose: bool = False
) -> CalibrationResult:
"""
Run complete calibration pipeline from configuration object.
Pipeline stages:
1. Detect ChArUco in intrinsic (in-air) videos
2. Run Stage 1: Intrinsic calibration
3. Detect ChArUco in extrinsic (underwater) videos
4. Split underwater detections into calibration/validation sets
5. Run Stage 2: Extrinsic initialization
6. Run Stage 3: Interface and pose optimization
7. Optionally run Stage 4: Joint refinement
8. Run validation on held-out data
9. Generate and save diagnostics
10. Save final calibration result
Args:
config: Complete calibration configuration
verbose: If True, enable per-iteration progress output from optimizers
Returns:
CalibrationResult with all calibrations and diagnostics
Raises:
CalibrationError: If any stage fails
InsufficientDataError: If not enough detections
ConnectivityError: If pose graph is disconnected
"""
board = BoardGeometry(config.board)
# Intrinsic board: use separate board if provided, else fall back to extrinsic board
intrinsic_board = (
BoardGeometry(config.intrinsic_board) if config.intrinsic_board else board
)
# Create output directory
config.output_dir.mkdir(parents=True, exist_ok=True)
# Accumulator for per-stage wall-clock timings (seconds). Skipped stages
# are simply absent from this dict.
timings: dict[str, float] = {}
# Save board reference images for visual verification
_save_board_reference_images(board, intrinsic_board, config.output_dir)
print("=" * 60)
print("AquaCal Calibration Pipeline")
print("=" * 60)
# --- Stage 1: Intrinsic Calibration ---
print("\n[Stage 1] Intrinsic calibration (in-air)...")
with _time_stage(timings, "stage1_intrinsics"):
intrinsics_results = calibrate_intrinsics_all(
video_paths={k: str(v) for k, v in config.intrinsic_video_paths.items()},
board=intrinsic_board,
min_corners=config.min_corners_per_frame,
frame_step=config.frame_step,
rational_model_cameras=config.rational_model_cameras or None,
fisheye_cameras=config.fisheye_cameras or None,
progress_callback=lambda name, cur, total: print(
f" Calibrating {name} ({cur}/{total})..."
),
)
# Extract just intrinsics from (intrinsics, error) tuples
intrinsics = {name: result[0] for name, result in intrinsics_results.items()}
for name, (_, rms) in intrinsics_results.items():
print(f" {name}: RMS {rms:.3f} px")
print(f" Calibrated {len(intrinsics)} cameras")
# Validate intrinsics
from aquacal.calibration.intrinsics import validate_intrinsics
for name, (intr, _) in intrinsics_results.items():
warnings = validate_intrinsics(intr, camera_name=name)
for w in warnings:
print(f" WARNING: {w}")
# --- Detect in underwater videos ---
print("\n[Detection] Detecting ChArUco in underwater videos...")
def _detection_progress(current: int, total: int) -> None:
"""Print detection progress at ~10% intervals."""
if total > 0 and (current % max(1, total // 10) == 0 or current == total):
print(f" Frame {current}/{total} ({100 * current // total}%)")
with _time_stage(timings, "detection_underwater"):
all_detections = detect_all_frames(
video_paths={k: str(v) for k, v in config.extrinsic_video_paths.items()},
board=board,
intrinsics={k: (v.K, v.dist_coeffs) for k, v in intrinsics.items()},
min_corners=config.min_corners_per_frame,
frame_step=config.frame_step,
progress_callback=_detection_progress,
)
usable_frames = all_detections.get_frames_with_min_cameras(
config.min_cameras_per_frame
)
print(f" Found {len(usable_frames)} usable frames")
# --- Split calibration/validation ---
print(f"\n[Split] Holdout fraction: {config.holdout_fraction}")
cal_detections, val_detections = split_detections(
all_detections, config.holdout_fraction
)
print(f" Calibration frames: {len(cal_detections.frames)}")
print(f" Validation frames: {len(val_detections.frames)}")
# --- Filter to primary cameras for Stages 2-3 ---
primary_camera_set = set(config.camera_names)
primary_intrinsics = {
k: v for k, v in intrinsics.items() if k in primary_camera_set
}
# Filter detection frames to primary cameras only
primary_cal_frames = {}
for frame_idx, frame_det in cal_detections.frames.items():
primary_dets = {
k: v for k, v in frame_det.detections.items() if k in primary_camera_set
}
if primary_dets:
primary_cal_frames[frame_idx] = FrameDetections(
frame_idx=frame_idx, detections=primary_dets
)
primary_cal_detections = DetectionResult(
frames=primary_cal_frames,
camera_names=config.camera_names,
total_frames=cal_detections.total_frames,
)
# --- Stage 2: Extrinsic Initialization ---
print("\n[Stage 2] Extrinsic initialization...")
reference_camera = config.camera_names[0]
interface_normal = np.array([0.0, 0.0, -1.0], dtype=np.float64)
with _time_stage(timings, "stage2_extrinsic_init"):
pose_graph = build_pose_graph(
primary_cal_detections, config.min_cameras_per_frame
)
extrinsics = estimate_extrinsics(
pose_graph,
primary_intrinsics,
board,
reference_camera,
water_zs=config.initial_water_z,
interface_normal=interface_normal,
n_air=config.n_air,
n_water=config.n_water,
progress_callback=lambda cam, cur, total: print(
" Averaging poses..."
if cam == "_averaging"
else f" Located {cam} ({cur}/{total})"
),
)
print(f" Initialized {len(extrinsics)} camera poses")
# Build initial CalibrationResult for saving and visualization
initial_interface_dists = {}
for cam_name in extrinsics:
if config.initial_water_z is not None:
initial_interface_dists[cam_name] = config.initial_water_z.get(
cam_name, 0.15
)
else:
initial_interface_dists[cam_name] = 0.15
initial_result = _build_calibration_result(
intrinsics=primary_intrinsics,
extrinsics=extrinsics,
water_z_values=initial_interface_dists,
board_config=config.board,
interface_params=InterfaceParams(
normal=interface_normal,
n_air=config.n_air,
n_water=config.n_water,
),
diagnostics=DiagnosticsData(
reprojection_error_rms=0.0,
reprojection_error_per_camera={},
validation_3d_error_mean=0.0,
validation_3d_error_std=0.0,
),
metadata=CalibrationMetadata(
calibration_date=datetime.now().isoformat(),
software_version=importlib.metadata.version("aquacal"),
config_hash=_compute_config_hash(config),
num_frames_used=0,
num_frames_holdout=0,
),
)
# Save pre-optimization calibration
save_calibration(initial_result, config.output_dir / "calibration_initial.json")
print(" Saved calibration_initial.json")
# Save initial camera rig visualization (pre-optimization)
try:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from aquacal.validation.diagnostics import plot_camera_rig
fig = plot_camera_rig(
initial_result,
title="Stage 2: Initial Camera Positions (pre-optimization)",
)
fig.savefig(
str(config.output_dir / "camera_rig_initial.png"),
dpi=150,
bbox_inches="tight",
)
plt.close(fig)
print(" Saved camera_rig_initial.png")
except Exception as e:
print(f" Warning: Could not save camera_rig_initial.png: {e}")
# --- Subsample for optimization if configured ---
optim_detections = primary_cal_detections
if (
config.max_calibration_frames is not None
and len(primary_cal_detections.frames) > config.max_calibration_frames
):
optim_detections = _subsample_detections(
primary_cal_detections, config.max_calibration_frames
)
print(
f"\n[Frame Selection] Subsampled {len(primary_cal_detections.frames)} -> {len(optim_detections.frames)} frames for optimization"
)
# --- Stage 3: Interface Optimization ---
print("\n[Stage 3] Interface and pose optimization...")
t0 = time.perf_counter()
stage3_extrinsics, stage3_distances, stage3_poses, stage3_rms = optimize_interface(
detections=optim_detections,
intrinsics=primary_intrinsics,
initial_extrinsics=extrinsics,
board=board,
reference_camera=reference_camera,
initial_water_zs=config.initial_water_z,
interface_normal=interface_normal,
n_air=config.n_air,
n_water=config.n_water,
loss=config.robust_loss,
loss_scale=config.loss_scale,
min_corners=config.min_corners_per_frame,
verbose=2 if verbose else 1,
normal_fixed=config.interface_normal_fixed,
)
elapsed = time.perf_counter() - t0
timings["stage3_interface_optimization"] = elapsed
print(f" Stage 3 RMS: {stage3_rms:.3f} pixels ({elapsed:.1f}s)")
if not config.interface_normal_fixed:
ref_R = stage3_extrinsics[reference_camera].R
ref_rvec = matrix_to_rvec(ref_R)
tilt_deg = np.degrees(np.linalg.norm(ref_rvec[:2]))
print(f" Estimated reference camera tilt: {tilt_deg:.2f} degrees")
# Water surface and camera heights
# Get water_z from any camera (all have same value after optimization)
water_z = list(stage3_distances.values())[0]
print(f" Water surface Z: {water_z:.4f} m")
print(" Camera heights above water (h_c):")
heights = []
for cam_name in sorted(stage3_extrinsics.keys()):
C = stage3_extrinsics[cam_name].C
cam_z = C[2]
h_c = water_z - cam_z # camera-to-water vertical distance
heights.append(h_c)
print(f" {cam_name}: cam_z={cam_z:.4f} h_c={h_c:.4f}")
heights = np.array(heights)
print(f" Camera height spread: {np.ptp(heights):.4f} m")
# --- Stage 4: Optional Joint Refinement ---
refine_intrinsics = config.refine_intrinsics
if refine_intrinsics:
print("\n[Stage 4] Joint refinement with intrinsics...")
stage3_result = (stage3_extrinsics, stage3_distances, stage3_poses, stage3_rms)
t0 = time.perf_counter()
(
final_extrinsics,
final_distances,
final_poses,
final_intrinsics,
final_rms,
) = joint_refinement(
stage3_result=stage3_result,
detections=optim_detections,
intrinsics=primary_intrinsics,
board=board,
reference_camera=reference_camera,
refine_intrinsics=True,
interface_normal=interface_normal,
n_air=config.n_air,
n_water=config.n_water,
loss=config.robust_loss,
loss_scale=config.loss_scale,
verbose=2 if verbose else 1,
normal_fixed=config.interface_normal_fixed,
)
elapsed = time.perf_counter() - t0
timings["stage4_joint_refinement"] = elapsed
print(f" Stage 4 RMS: {final_rms:.3f} pixels ({elapsed:.1f}s)")
# Water surface and camera heights after refinement
water_z_final = list(final_distances.values())[0]
print(f" Water surface Z (after refinement): {water_z_final:.4f} m")
print(" Camera heights above water (h_c):")
heights_final = []
for cam_name in sorted(final_extrinsics.keys()):
C = final_extrinsics[cam_name].C
cam_z = C[2]
h_c = water_z_final - cam_z
heights_final.append(h_c)
print(f" {cam_name}: cam_z={cam_z:.4f} h_c={h_c:.4f}")
heights_final = np.array(heights_final)
print(f" Camera height spread: {np.ptp(heights_final):.4f} m")
else:
print("\n[Stage 4] Skipped (refine_intrinsics=False)")
final_extrinsics = stage3_extrinsics
final_distances = stage3_distances
final_poses = stage3_poses
final_intrinsics = primary_intrinsics
final_rms = stage3_rms
# Convert poses list to dict
board_poses_dict = {bp.frame_idx: bp for bp in final_poses}
# --- Stage 3b/4b: Register Auxiliary Cameras ---
aux_extrinsics = {}
aux_distances = {}
if config.auxiliary_cameras:
stage_label = "Stage 4b" if config.refine_auxiliary_intrinsics else "Stage 3b"
print(
f"\n[{stage_label}] Registering {len(config.auxiliary_cameras)} auxiliary camera(s)..."
)
# Derive water_z from Stage 3 output (reference camera has C_z = 0)
water_z = float(final_distances[reference_camera])
# Time the full auxiliary registration loop as a single stage.
with _time_stage(timings, "auxiliary_registration"):
for aux_cam in config.auxiliary_cameras:
# Count observations
n_frames = 0
n_corners = 0
for frame_idx, frame_det in all_detections.frames.items():
if (
aux_cam in frame_det.detections
and frame_idx in board_poses_dict
):
n_frames += 1
n_corners += frame_det.detections[aux_cam].num_corners
print(f" {aux_cam}: {n_frames} frames, {n_corners} corners")
try:
result = register_auxiliary_camera(
camera_name=aux_cam,
intrinsics=intrinsics[aux_cam],
detections=all_detections,
board_poses=board_poses_dict,
board=board,
water_z=water_z,
interface_normal=interface_normal,
n_air=config.n_air,
n_water=config.n_water,
refine_intrinsics=config.refine_auxiliary_intrinsics,
verbose=2 if verbose else 1,
)
# Handle variable-length return
if config.refine_auxiliary_intrinsics:
aux_ext, aux_dist, aux_rms, aux_intr = result
intrinsics[aux_cam] = aux_intr
print(
f" {aux_cam}: RMS {aux_rms:.2f} px, interface_d={aux_dist:.4f}m (intrinsics refined)"
)
else:
aux_ext, aux_dist, aux_rms = result
print(
f" {aux_cam}: RMS {aux_rms:.2f} px, interface_d={aux_dist:.4f}m"
)
aux_extrinsics[aux_cam] = aux_ext
aux_distances[aux_cam] = aux_dist
except Exception as e:
print(f" {aux_cam}: FAILED - {e}")
# Merge auxiliary cameras into working dicts so validation includes them
if aux_extrinsics:
final_extrinsics.update(aux_extrinsics)
final_distances.update(aux_distances)
final_intrinsics.update({cam: intrinsics[cam] for cam in aux_extrinsics})
# Estimate board poses for validation frames
print("\n[Validation] Estimating board poses for held-out frames...")
_validation_t0 = time.perf_counter()
val_initial_poses = _compute_initial_board_poses(
val_detections,
final_intrinsics,
final_extrinsics,
board,
min_corners=config.min_corners_per_frame,
n_water=config.n_water,
)
val_refined_poses = _estimate_validation_poses(
val_detections,
val_initial_poses,
final_intrinsics,
final_extrinsics,
final_distances,
board,
interface_normal,
config.n_air,
config.n_water,
)
board_poses_dict.update(val_refined_poses)
print(f" Estimated {len(val_refined_poses)} validation frame poses")
# --- Validation ---
print("\n[Validation] Computing errors on held-out data...")
# Build temporary CalibrationResult for validation functions
interface_params = InterfaceParams(
normal=interface_normal,
n_air=config.n_air,
n_water=config.n_water,
)
# Determine primary and auxiliary cameras
aux_cam_names = set(config.auxiliary_cameras) if config.auxiliary_cameras else set()
primary_cam_names = set(final_intrinsics.keys()) - aux_cam_names
# Build full result with all cameras (for board pose estimation and plots)
temp_result = _build_calibration_result(
intrinsics=final_intrinsics,
extrinsics=final_extrinsics,
water_z_values=final_distances,
board_config=config.board,
interface_params=interface_params,
diagnostics=DiagnosticsData(
reprojection_error_rms=0.0,
reprojection_error_per_camera={},
validation_3d_error_mean=0.0,
validation_3d_error_std=0.0,
),
metadata=CalibrationMetadata(
calibration_date="",
software_version="",
config_hash="",
num_frames_used=0,
num_frames_holdout=0,
),
auxiliary_cameras=aux_cam_names,
)
# --- Primary camera validation ---
primary_result = _filter_cameras(temp_result, primary_cam_names)
# Reprojection errors on validation set (primary cameras only)
primary_reproj = compute_reprojection_errors(
primary_result, val_detections, board_poses_dict
)
# 3D reconstruction errors (primary cameras only)
primary_3d = compute_3d_distance_errors(
primary_result, val_detections, board, include_spatial=True
)
# Save spatial measurements if available
if primary_3d.spatial is not None and len(primary_3d.spatial.positions) > 0:
from aquacal.validation.reconstruction import save_spatial_measurements
spatial_csv_path = config.output_dir / "spatial_measurements.csv"
save_spatial_measurements(primary_3d.spatial, spatial_csv_path)
# Print primary camera metrics
if np.isnan(primary_reproj.rms):
print(" Primary cameras:")
print(" Reprojection RMS: N/A (no valid observations)")
else:
print(" Primary cameras:")
print(f" Reprojection RMS: {primary_reproj.rms:.3f} pixels")
if np.isnan(primary_3d.mean):
print(" 3D distance error: N/A (no valid comparisons)")
else:
print(
f" 3D distance error: MAE {primary_3d.mean * 1000:.2f} mm, "
f"RMSE {primary_3d.rmse * 1000:.2f} mm "
f"({primary_3d.percent_error:.1f}% of square size)"
)
if abs(primary_3d.signed_mean) > 0.0005: # > 0.5mm bias
sign = "+" if primary_3d.signed_mean > 0 else ""
bias_type = (
"overestimate" if primary_3d.signed_mean > 0 else "underestimate"
)
print(
f" Scale bias: {sign}{primary_3d.signed_mean * 1000:.2f} mm ({bias_type})"
)
# --- Auxiliary camera validation (if any) ---
aux_reproj = None
if aux_cam_names:
aux_result = _filter_cameras(temp_result, aux_cam_names)
aux_reproj = compute_reprojection_errors(
aux_result, val_detections, board_poses_dict
)
print(" Auxiliary cameras:")
for cam_name in sorted(aux_cam_names):
if cam_name in aux_reproj.per_camera:
rms = aux_reproj.per_camera[cam_name]
print(f" {cam_name}: RMS {rms:.3f} pixels")
else:
print(f" {cam_name}: RMS N/A (no valid observations)")
# Store primary metrics for later use
reproj_errors = primary_reproj
reconstruction_errors = primary_3d
timings["validation"] = time.perf_counter() - _validation_t0
# --- Generate Diagnostics ---
print("\n[Diagnostics] Generating report...")
with _time_stage(timings, "diagnostics_generate"):
diagnostic_report = generate_diagnostic_report(
calibration=primary_result, # Use primary-only for summary stats
detections=val_detections,
board_poses=board_poses_dict,
reprojection_errors=reproj_errors,
reconstruction_errors=reconstruction_errors,
board=board,
auxiliary_reprojection=aux_reproj,
)
# Build timings payload AFTER all timed regions and BEFORE the save call
# (the save itself is intentionally not in the timing block).
timings_payload = {
"seconds_per_stage": dict(timings),
"total_seconds": float(sum(timings.values())),
}
# Save diagnostics (uses full temp_result for plots, but report has primary-only stats)
save_diagnostic_report(
diagnostic_report,
temp_result, # Full result for plots
val_detections,
config.output_dir,
save_images=True,
auxiliary_reprojection=aux_reproj,
timings=timings_payload,
)
print(f" Saved diagnostics to {config.output_dir}")
# --- Build Final Result ---
# Merge primary + auxiliary per-camera errors so all cameras appear in diagnostics
all_per_camera = dict(reproj_errors.per_camera)
if aux_reproj is not None:
all_per_camera.update(aux_reproj.per_camera)
# Merge primary + auxiliary residuals and camera labels
if config.save_detailed_residuals:
all_residuals = reproj_errors.residuals
all_labels = (
reproj_errors.camera_labels.tolist()
if reproj_errors.camera_labels is not None
else []
)
if aux_reproj is not None and len(aux_reproj.residuals) > 0:
all_residuals = np.concatenate(
[reproj_errors.residuals, aux_reproj.residuals]
)
aux_labels = (
aux_reproj.camera_labels.tolist()
if aux_reproj.camera_labels is not None
else []
)
all_labels = all_labels + aux_labels
else:
all_residuals = None
all_labels = None
diagnostics = DiagnosticsData(
reprojection_error_rms=reproj_errors.rms,
reprojection_error_per_camera=all_per_camera,
validation_3d_error_mean=reconstruction_errors.mean,
validation_3d_error_std=reconstruction_errors.std,
per_corner_residuals=all_residuals,
per_corner_camera_labels=all_labels or None,
per_frame_errors=(
reproj_errors.per_frame if config.save_detailed_residuals else None
),
)
metadata = CalibrationMetadata(
calibration_date=datetime.now().isoformat(),
software_version=importlib.metadata.version("aquacal"),
config_hash=_compute_config_hash(config),
num_frames_used=len(optim_detections.frames),
num_frames_holdout=len(val_detections.frames),
)
result = _build_calibration_result(
intrinsics=final_intrinsics,
extrinsics=final_extrinsics,
water_z_values=final_distances,
board_config=config.board,
interface_params=interface_params,
diagnostics=diagnostics,
metadata=metadata,
auxiliary_cameras=set(config.auxiliary_cameras),
)
# --- Save Calibration ---
print("\n[Save] Saving calibration result...")
output_path = config.output_dir / "calibration.json"
save_calibration(result, output_path)
print(f" Saved to {output_path}")
print("\n" + "=" * 60)
print("Calibration complete!")
print(" Primary cameras:")
if np.isnan(reproj_errors.rms):
print(" Reprojection RMS: N/A")
else:
print(f" Reprojection RMS: {reproj_errors.rms:.3f} pixels")
if np.isnan(reconstruction_errors.mean):
print(" 3D error: N/A")
else:
print(
f" 3D error: MAE {reconstruction_errors.mean * 1000:.2f} mm, "
f"RMSE {reconstruction_errors.rmse * 1000:.2f} mm "
f"({reconstruction_errors.percent_error:.1f}%)"
)
if aux_cam_names:
print(" Auxiliary cameras:")
for cam_name in sorted(aux_cam_names):
if aux_reproj and cam_name in aux_reproj.per_camera:
rms = aux_reproj.per_camera[cam_name]
print(f" {cam_name}: RMS {rms:.3f} pixels")
print("=" * 60)
return result
def _estimate_validation_poses(
detections: DetectionResult,
initial_poses: dict[int, BoardPose],
intrinsics: dict[str, CameraIntrinsics],
extrinsics: dict[str, CameraExtrinsics],
water_z_values: dict[str, float],
board: BoardGeometry,
interface_normal: np.ndarray,
n_air: float,
n_water: float,
) -> dict[int, BoardPose]:
"""Refine board poses for validation frames via per-frame optimization.
For each frame, minimizes refractive reprojection error over the 6 pose
parameters (rvec, tvec) while holding all camera parameters fixed.
Args:
detections: Detection results for validation frames
initial_poses: PnP-initialized board poses
intrinsics: Per-camera intrinsics
extrinsics: Per-camera extrinsics
water_z_values: Per-camera interface distances
board: Board geometry
interface_normal: Interface normal vector
n_air: Refractive index of air
n_water: Refractive index of water
Returns:
Dict mapping frame_idx to refined BoardPose
"""
from scipy.optimize import least_squares
from aquacal.core.camera import create_camera
from aquacal.core.interface_model import Interface
from aquacal.core.refractive_geometry import refractive_project
refined_poses = {}
for frame_idx, initial_pose in initial_poses.items():
if frame_idx not in detections.frames:
continue
frame_det = detections.frames[frame_idx]
# Build cameras and interface objects
cameras = {}
for cam_name in frame_det.detections:
if cam_name not in intrinsics:
continue
cameras[cam_name] = create_camera(
cam_name, intrinsics[cam_name], extrinsics[cam_name]
)
interface = Interface(
normal=interface_normal,
camera_distances=water_z_values,
n_air=n_air,
n_water=n_water,
)
# Cost function: refractive reprojection residuals for this frame
def frame_residuals(params):
rvec = params[:3]
tvec = params[3:]
corners_3d = board.transform_corners(rvec, tvec)
residuals = []
for cam_name, det in frame_det.detections.items():
if cam_name not in cameras:
continue
camera = cameras[cam_name]
for i, corner_id in enumerate(det.corner_ids):
pt_3d = corners_3d[int(corner_id)]
projected = refractive_project(camera, interface, pt_3d)
if projected is not None:
residuals.append(det.corners_2d[i, 0] - projected[0])
residuals.append(det.corners_2d[i, 1] - projected[1])
else:
residuals.append(100.0)
residuals.append(100.0)
return residuals if residuals else [0.0, 0.0]
x0 = np.concatenate([initial_pose.rvec, initial_pose.tvec])
result = least_squares(frame_residuals, x0, method="lm", max_nfev=100)
refined_poses[frame_idx] = BoardPose(
frame_idx=frame_idx,
rvec=result.x[:3],
tvec=result.x[3:],
)
return refined_poses
def _filter_cameras(
result: CalibrationResult,
camera_names: set[str],
) -> CalibrationResult:
"""
Create a new CalibrationResult containing only the specified cameras.
Args:
result: Original CalibrationResult
camera_names: Set of camera names to include
Returns:
New CalibrationResult with filtered cameras
"""
filtered_cameras = {
name: calib for name, calib in result.cameras.items() if name in camera_names
}
return CalibrationResult(
cameras=filtered_cameras,
interface=result.interface,
board=result.board,
diagnostics=result.diagnostics,
metadata=result.metadata,
)
def _build_calibration_result(
intrinsics: dict[str, CameraIntrinsics],
extrinsics: dict[str, CameraExtrinsics],
water_z_values: dict[str, float],
board_config: BoardConfig,
interface_params: InterfaceParams,
diagnostics: DiagnosticsData,
metadata: CalibrationMetadata,
auxiliary_cameras: set[str] | None = None,
) -> CalibrationResult:
"""
Assemble final CalibrationResult from components.
Args:
intrinsics: Per-camera intrinsic parameters
extrinsics: Per-camera extrinsic parameters
water_z_values: Per-camera interface distances
board_config: Board configuration used
interface_params: Interface parameters (normal, refractive indices)
diagnostics: Validation diagnostics
metadata: Calibration metadata
auxiliary_cameras: Set of auxiliary camera names
Returns:
Complete CalibrationResult
"""
cameras = {}
for cam_name in intrinsics:
cameras[cam_name] = CameraCalibration(
name=cam_name,
intrinsics=intrinsics[cam_name],
extrinsics=extrinsics[cam_name],
water_z=water_z_values[cam_name],
is_auxiliary=cam_name in (auxiliary_cameras or set()),
)
return CalibrationResult(
cameras=cameras,
interface=interface_params,
board=board_config,
diagnostics=diagnostics,
metadata=metadata,
)
def _compute_config_hash(config: CalibrationConfig) -> str:
"""
Compute hash of configuration for reproducibility tracking.
Args:
config: Calibration configuration
Returns:
Hex string hash of configuration
"""
# Create deterministic string representation
hash_input = (
f"{config.board.squares_x},{config.board.squares_y},"
f"{config.board.square_size},{config.board.marker_size},"
f"{config.n_air},{config.n_water},"
f"{config.robust_loss},{config.loss_scale},"
f"{config.holdout_fraction}"
)
# Include intrinsic_board if provided
if config.intrinsic_board is not None:
hash_input += (
f",intrinsic:{config.intrinsic_board.squares_x},"
f"{config.intrinsic_board.squares_y},"
f"{config.intrinsic_board.square_size},"
f"{config.intrinsic_board.marker_size}"
)
# Include initial_water_z if provided
if config.initial_water_z is not None:
# Sort by camera name for deterministic hash
sorted_distances = sorted(config.initial_water_z.items())
distance_str = ",".join(f"{cam}:{dist}" for cam, dist in sorted_distances)
hash_input += f",init_dist:{distance_str}"
return hashlib.md5(hash_input.encode()).hexdigest()[:12]