Source code for aquacal.datasets.synthetic

"""Synthetic data generation for testing and validation.

This module provides functions to generate synthetic calibration data with known
ground truth. The main entry point is ``create_scenario()`` which returns
predefined test scenarios with complete ground truth.
"""

from __future__ import annotations

from dataclasses import dataclass

import numpy as np
from numpy.typing import NDArray

from aquacal.config.schema import (
    BoardConfig,
    BoardPose,
    CalibrationResult,
    CameraExtrinsics,
    CameraIntrinsics,
    Detection,
    DetectionResult,
    FrameDetections,
)
from aquacal.core.board import BoardGeometry
from aquacal.core.camera import Camera
from aquacal.core.interface_model import Interface
from aquacal.core.refractive_geometry import refractive_project


[docs] @dataclass class SyntheticScenario: """Complete synthetic test scenario with ground truth. Attributes: name: Scenario name board_config: ChArUco board specification intrinsics: Per-camera intrinsics extrinsics: Per-camera extrinsics water_zs: Per-camera interface distances (Z-coordinate of water surface) board_poses: List of board poses for all frames noise_std: Gaussian noise standard deviation applied to detections (pixels) description: Human-readable description images: Optional dict of rendered images (camera_name -> frame_idx -> image) """ name: str board_config: BoardConfig intrinsics: dict[str, CameraIntrinsics] extrinsics: dict[str, CameraExtrinsics] water_zs: dict[str, float] board_poses: list[BoardPose] noise_std: float description: str images: dict[str, dict[int, NDArray]] | None = None
[docs] def generate_camera_intrinsics( image_size: tuple[int, int] = (1920, 1080), fov_horizontal_deg: float = 60.0, principal_point_offset: tuple[float, float] = (0.0, 0.0), distortion_k1: float = 0.0, distortion_k2: float = 0.0, ) -> CameraIntrinsics: """ Generate camera intrinsics with specified parameters. Args: image_size: (width, height) in pixels fov_horizontal_deg: Horizontal field of view in degrees principal_point_offset: Offset from image center (pixels) distortion_k1: First radial distortion coefficient distortion_k2: Second radial distortion coefficient Returns: CameraIntrinsics with computed K matrix and distortion """ width, height = image_size # Compute focal length from horizontal FOV # fov = 2 * atan(width / (2 * fx)) # => fx = width / (2 * tan(fov/2)) fov_rad = np.deg2rad(fov_horizontal_deg) fx = width / (2 * np.tan(fov_rad / 2)) fy = fx # Square pixels # Principal point at image center plus offset cx = width / 2 + principal_point_offset[0] cy = height / 2 + principal_point_offset[1] K = np.array( [ [fx, 0.0, cx], [0.0, fy, cy], [0.0, 0.0, 1.0], ], dtype=np.float64, ) # Distortion coefficients: [k1, k2, p1, p2, k3] dist_coeffs = np.array( [distortion_k1, distortion_k2, 0.0, 0.0, 0.0], dtype=np.float64 ) return CameraIntrinsics(K=K, dist_coeffs=dist_coeffs, image_size=image_size)
def _rotation_z(angle: float) -> NDArray[np.float64]: """Create rotation matrix for rotation around Z axis.""" c, s = np.cos(angle), np.sin(angle) return np.array( [ [c, -s, 0], [s, c, 0], [0, 0, 1], ], dtype=np.float64, )
[docs] def generate_camera_array( n_cameras: int, layout: str = "grid", spacing: float = 0.1, height_above_water: float = 0.15, height_variation: float = 0.005, image_size: tuple[int, int] = (1920, 1080), fov_deg: float = 60.0, seed: int = 42, ) -> tuple[dict[str, CameraIntrinsics], dict[str, CameraExtrinsics], dict[str, float]]: """ Generate a realistic camera array with known ground truth. Args: n_cameras: Number of cameras (2-14) layout: Camera arrangement - "grid", "line", or "ring" spacing: Distance between adjacent cameras (meters) height_above_water: Mean interface distance (meters) height_variation: Std dev of per-camera height variation (meters) image_size: Image dimensions (width, height) fov_deg: Horizontal field of view seed: Random seed for reproducibility Returns: Tuple of (intrinsics, extrinsics, water_zs) dicts keyed by camera name. Camera "cam0" is always the reference camera at origin with identity rotation. """ rng = np.random.default_rng(seed) intrinsics: dict[str, CameraIntrinsics] = {} extrinsics: dict[str, CameraExtrinsics] = {} distances: dict[str, float] = {} # Generate camera positions based on layout positions: list[NDArray[np.float64]] = [] if layout == "grid": # Arrange in rough square grid side = int(np.ceil(np.sqrt(n_cameras))) for i in range(n_cameras): row, col = i // side, i % side x = (col - (side - 1) / 2) * spacing y = (row - (side - 1) / 2) * spacing positions.append(np.array([x, y, 0.0], dtype=np.float64)) elif layout == "line": for i in range(n_cameras): positions.append(np.array([i * spacing, 0.0, 0.0], dtype=np.float64)) elif layout == "ring": angles = np.linspace(0, 2 * np.pi, n_cameras, endpoint=False) radius = spacing * n_cameras / (2 * np.pi) for a in angles: positions.append( np.array( [radius * np.cos(a), radius * np.sin(a), 0.0], dtype=np.float64 ) ) else: raise ValueError(f"Unknown layout: {layout}") # Center positions so cam0 is at origin offset = positions[0].copy() positions = [p - offset for p in positions] for i, pos in enumerate(positions): cam_name = f"cam{i}" # Intrinsics: same for all cameras intrinsics[cam_name] = generate_camera_intrinsics( image_size=image_size, fov_horizontal_deg=fov_deg, ) # Extrinsics: cameras look straight down # Add small random roll for realism (but not for reference camera) if i == 0: roll = 0.0 else: roll = rng.uniform(-0.1, 0.1) # radians R = _rotation_z(roll) t = -R @ pos extrinsics[cam_name] = CameraExtrinsics(R=R, t=t) # Interface distance with small variation if i == 0: dist = height_above_water # Reference camera: exact height else: dist = height_above_water + rng.normal(0, height_variation) distances[cam_name] = dist return intrinsics, extrinsics, distances
[docs] def generate_real_rig_array() -> tuple[ dict[str, CameraIntrinsics], dict[str, CameraExtrinsics], dict[str, float] ]: """Generate camera array matching the real-world 12-camera rig. Geometry is derived from an actual calibration of the AquaCal hardware rig (12 cameras, e3v8250 excluded) with the following idealizations applied: - Common intrinsics: focal length, principal point, and distortion are averaged across all 12 cameras. - All cameras placed at Z = 0 (average real Z ≈ 0). - All optical axes aligned to world +Z (looking straight down); real cameras deviate < 5 deg. - XY positions preserved from the real calibration. - Common ``water_z = 1.031 m`` (the calibrated value). Returns: Tuple of ``(intrinsics, extrinsics, water_zs)`` dicts keyed by camera name (cam0 … cam11). """ IMAGE_SIZE = (1600, 1200) WATER_Z = 1.031 # Averaged intrinsics across 12 real cameras K = np.array( [ [1587.79, 0.0, 780.22], [0.0, 1588.34, 601.74], [0.0, 0.0, 1.0], ], dtype=np.float64, ) dist_coeffs = np.array( [-0.5022, 0.2968, 0.0006, 0.0025, -0.0552], dtype=np.float64, ) common_intrinsics = CameraIntrinsics( K=K, dist_coeffs=dist_coeffs, image_size=IMAGE_SIZE ) # XY positions from real calibration (world frame, Z forced to 0). # Derived from C = -R^T @ t for each camera in calibration.json, # excluding e3v8250. Ordered CCW from the reference camera (cam0) # when viewed from above, so cam indices trace a spatial loop around # the rig. _POSITIONS_XY: list[tuple[float, float]] = [ (0.0000, 0.0000), # cam0 (reference, e3v829d) (0.2080, 0.2419), # cam1 (e3v832e) (0.3353, 0.5730), # cam2 (e3v82f9) (0.2227, 0.8684), # cam3 (e3v83ef) (0.0039, 1.1490), # cam4 (e3v83ee) (-0.3363, 1.1930), # cam5 (e3v83e9) (-0.6801, 1.1523), # cam6 (e3v83f1) (-0.8868, 0.8828), # cam7 (e3v83eb) (-1.0023, 0.5654), # cam8 (e3v83f0) (-0.8949, 0.2677), # cam9 (e3v831e) (-0.6639, 0.0038), # cam10 (e3v8334) (-0.3364, -0.0573), # cam11 (e3v82e0) ] intrinsics: dict[str, CameraIntrinsics] = {} extrinsics: dict[str, CameraExtrinsics] = {} water_zs: dict[str, float] = {} R_identity = np.eye(3, dtype=np.float64) for i, (cx, cy) in enumerate(_POSITIONS_XY): cam_name = f"cam{i}" intrinsics[cam_name] = common_intrinsics t = np.array([-cx, -cy, 0.0], dtype=np.float64) extrinsics[cam_name] = CameraExtrinsics(R=R_identity.copy(), t=t) water_zs[cam_name] = WATER_Z return intrinsics, extrinsics, water_zs
[docs] def generate_board_trajectory( n_frames: int, camera_positions: dict[str, NDArray[np.float64]], water_zs: dict[str, float], depth_range: tuple[float, float] = (0.3, 0.6), xy_extent: float = 0.15, rotation_range_deg: float = 15.0, min_cameras_per_frame: int = 2, seed: int = 42, ) -> list[BoardPose]: """ Generate board poses ensuring pose graph connectivity. Creates a trajectory that ensures: - Each frame is visible by at least min_cameras_per_frame cameras - The pose graph is connected (can chain from reference to all cameras) - Board stays within reasonable depth range underwater Args: n_frames: Number of frames to generate camera_positions: Dict of camera center positions (from extrinsics) water_zs: Per-camera interface distances depth_range: (min_z, max_z) for board center in world coords xy_extent: Maximum XY offset from origin rotation_range_deg: Maximum board tilt from horizontal min_cameras_per_frame: Minimum cameras that must see board seed: Random seed Returns: List of BoardPose objects with frame indices 0 to n_frames-1 """ rng = np.random.default_rng(seed) poses: list[BoardPose] = [] for frame_idx in range(n_frames): # Position: random within extent, random depth x = rng.uniform(-xy_extent, xy_extent) y = rng.uniform(-xy_extent, xy_extent) z = rng.uniform(depth_range[0], depth_range[1]) tvec = np.array([x, y, z], dtype=np.float64) # Rotation: small tilts, full in-plane rotation max_tilt = np.deg2rad(rotation_range_deg) rx = rng.uniform(-max_tilt, max_tilt) ry = rng.uniform(-max_tilt, max_tilt) rz = rng.uniform(-np.pi, np.pi) rvec = np.array([rx, ry, rz], dtype=np.float64) poses.append(BoardPose(frame_idx=frame_idx, rvec=rvec, tvec=tvec)) return poses
[docs] def generate_real_rig_trajectory( n_frames: int = 100, depth_range: tuple[float, float] = (1.1, 2.0), seed: int = 42, ) -> list[BoardPose]: """Generate board trajectory appropriate for the real rig geometry. The real rig has cameras at Z ≈ 0 with water surface at Z ≈ 1.03 m, so the board should be below the water surface (default 1.1–2.0 m, i.e. ~70–970 mm below the surface). Trajectory covers the full field of view: - Positions sweep across the ~1.3 × 1.2 m footprint of the camera array - Ensures connectivity by visiting regions seen by multiple cameras Args: n_frames: Number of frames to generate depth_range: (min_z, max_z) for board center in world coords seed: Random seed Returns: List of BoardPose objects """ rng = np.random.default_rng(seed) # The rig spans ~1.3m in X (-1.00 to +0.34) and ~1.2m in Y (0 to 1.19). # Center of the rig footprint is approximately (-0.34, 0.55). # Board should move throughout this area to ensure all cameras see it. X_CENTER, Y_CENTER = -0.34, 0.55 XY_EXTENT = 0.7 # +/-700mm from center to ensure full coverage ROTATION_RANGE_DEG = 20.0 poses: list[BoardPose] = [] for frame_idx in range(n_frames): # Position: random within footprint, random depth x = X_CENTER + rng.uniform(-XY_EXTENT, XY_EXTENT) y = Y_CENTER + rng.uniform(-XY_EXTENT, XY_EXTENT) z = rng.uniform(depth_range[0], depth_range[1]) tvec = np.array([x, y, z], dtype=np.float64) # Rotation: small tilts, full in-plane rotation max_tilt = np.deg2rad(ROTATION_RANGE_DEG) rx = rng.uniform(-max_tilt, max_tilt) ry = rng.uniform(-max_tilt, max_tilt) rz = rng.uniform(-np.pi, np.pi) rvec = np.array([rx, ry, rz], dtype=np.float64) poses.append(BoardPose(frame_idx=frame_idx, rvec=rvec, tvec=tvec)) return poses
[docs] def generate_dense_xy_grid( depth: float, n_grid: int = 7, xy_extent: float = 0.5, xy_center: tuple[float, float] = (0.0, 0.0), tilt_deg: float = 3.0, frame_offset: int = 0, seed: int = 42, ) -> list[BoardPose]: """Generate board poses at a regular XY grid at a fixed depth. Used for dense spatial coverage in reconstruction evaluation and heatmaps. Each grid position has a small random tilt and random in-plane rotation. Args: depth: Z coordinate for all board poses (meters) n_grid: Number of grid positions per axis (total poses = n_grid^2) xy_extent: Grid spans from -xy_extent to +xy_extent around xy_center in X and Y (meters) xy_center: (x, y) center of the grid (meters). Should match the centroid of the camera array for best coverage. tilt_deg: Maximum random tilt from horizontal (degrees) frame_offset: Starting frame index (default 0) seed: Random seed for reproducible tilts and rotations Returns: List of n_grid^2 BoardPose objects with frame indices starting from frame_offset. """ rng = np.random.default_rng(seed) # Generate grid positions centered on xy_center cx, cy = xy_center x_values = np.linspace(cx - xy_extent, cx + xy_extent, n_grid) y_values = np.linspace(cy - xy_extent, cy + xy_extent, n_grid) poses: list[BoardPose] = [] frame_idx = frame_offset for x in x_values: for y in y_values: tvec = np.array([x, y, depth], dtype=np.float64) # Small random tilt + random in-plane rotation max_tilt = np.deg2rad(tilt_deg) rx = rng.uniform(-max_tilt, max_tilt) ry = rng.uniform(-max_tilt, max_tilt) rz = rng.uniform(-np.pi, np.pi) rvec = np.array([rx, ry, rz], dtype=np.float64) poses.append(BoardPose(frame_idx=frame_idx, rvec=rvec, tvec=tvec)) frame_idx += 1 return poses
[docs] def generate_synthetic_detections( intrinsics: dict[str, CameraIntrinsics], extrinsics: dict[str, CameraExtrinsics], water_zs: dict[str, float], board: BoardGeometry, board_poses: list[BoardPose], noise_std: float = 0.0, min_corners: int = 8, seed: int = 42, ) -> DetectionResult: """ Generate synthetic detections by projecting through refractive interface. For each board pose and camera: 1. Transform board corners to world coordinates 2. Project each corner through refractive interface 3. Add Gaussian noise to pixel coordinates 4. Filter corners outside image bounds 5. Only include camera if >= min_corners visible Args: intrinsics: Per-camera intrinsics extrinsics: Per-camera extrinsics water_zs: Per-camera interface distances board: Board geometry board_poses: List of board poses noise_std: Gaussian noise standard deviation (pixels) min_corners: Minimum corners for valid detection seed: Random seed for noise Returns: DetectionResult matching format from real detection pipeline """ rng = np.random.default_rng(seed) interface_normal = np.array([0.0, 0.0, -1.0], dtype=np.float64) frames: dict[int, FrameDetections] = {} for bp in board_poses: corners_3d = board.transform_corners(bp.rvec, bp.tvec) detections_dict: dict[str, Detection] = {} for cam_name in intrinsics: camera = Camera(cam_name, intrinsics[cam_name], extrinsics[cam_name]) interface = Interface( normal=interface_normal, camera_distances={cam_name: water_zs[cam_name]}, ) corner_ids: list[int] = [] corners_2d: list[NDArray[np.float64]] = [] for corner_id in range(board.num_corners): point_3d = corners_3d[corner_id] projected = refractive_project(camera, interface, point_3d) if projected is not None: # Check if within image bounds w, h = intrinsics[cam_name].image_size if 0 <= projected[0] < w and 0 <= projected[1] < h: corner_ids.append(corner_id) px = projected.copy() if noise_std > 0: px += rng.normal(0, noise_std, 2) corners_2d.append(px) if len(corner_ids) >= min_corners: detections_dict[cam_name] = Detection( corner_ids=np.array(corner_ids, dtype=np.int32), corners_2d=np.array(corners_2d, dtype=np.float64), ) if detections_dict: frames[bp.frame_idx] = FrameDetections( frame_idx=bp.frame_idx, detections=detections_dict, ) return DetectionResult( frames=frames, camera_names=list(intrinsics.keys()), total_frames=len(board_poses), )
[docs] def compute_calibration_errors( result: CalibrationResult, ground_truth: SyntheticScenario, ) -> dict[str, float]: """ Compare calibration result to ground truth. Computes: - focal_length_error_percent: Max relative error in fx, fy - principal_point_error_px: Max error in cx, cy - rotation_error_deg: Max rotation error across cameras - translation_error_mm: Max translation error across cameras - water_z_error_mm: Max interface distance error Args: result: Calibration result from pipeline ground_truth: Synthetic scenario with known truth Returns: Dict of error metrics """ max_focal_error_pct = 0.0 max_pp_error_px = 0.0 max_rotation_error_deg = 0.0 max_translation_error_mm = 0.0 max_interface_error_mm = 0.0 for cam_name in ground_truth.intrinsics: if cam_name not in result.cameras: continue gt_intr = ground_truth.intrinsics[cam_name] gt_extr = ground_truth.extrinsics[cam_name] gt_dist = ground_truth.water_zs[cam_name] cal = result.cameras[cam_name] cal_intr = cal.intrinsics cal_extr = cal.extrinsics cal_dist = cal.water_z # Focal length error (relative) fx_gt, fy_gt = gt_intr.K[0, 0], gt_intr.K[1, 1] fx_cal, fy_cal = cal_intr.K[0, 0], cal_intr.K[1, 1] fx_err = abs(fx_cal - fx_gt) / fx_gt * 100 fy_err = abs(fy_cal - fy_gt) / fy_gt * 100 max_focal_error_pct = max(max_focal_error_pct, fx_err, fy_err) # Principal point error (absolute, pixels) cx_gt, cy_gt = gt_intr.K[0, 2], gt_intr.K[1, 2] cx_cal, cy_cal = cal_intr.K[0, 2], cal_intr.K[1, 2] pp_err = np.sqrt((cx_cal - cx_gt) ** 2 + (cy_cal - cy_gt) ** 2) max_pp_error_px = max(max_pp_error_px, pp_err) # Rotation error # Compute relative rotation: R_err = R_cal @ R_gt.T R_err = cal_extr.R @ gt_extr.R.T # Rotation angle from rotation matrix: angle = arccos((trace(R) - 1) / 2) trace = np.trace(R_err) # Clamp to valid range for arccos cos_angle = np.clip((trace - 1) / 2, -1.0, 1.0) angle_rad = np.arccos(cos_angle) angle_deg = np.rad2deg(angle_rad) max_rotation_error_deg = max(max_rotation_error_deg, angle_deg) # Translation error (in mm) # Camera center position difference C_gt = gt_extr.C C_cal = cal_extr.C trans_err_mm = np.linalg.norm(C_cal - C_gt) * 1000 max_translation_error_mm = max(max_translation_error_mm, trans_err_mm) # Interface distance error (in mm) dist_err_mm = abs(cal_dist - gt_dist) * 1000 max_interface_error_mm = max(max_interface_error_mm, dist_err_mm) return { "focal_length_error_percent": max_focal_error_pct, "principal_point_error_px": max_pp_error_px, "rotation_error_deg": max_rotation_error_deg, "translation_error_mm": max_translation_error_mm, "water_z_error_mm": max_interface_error_mm, }
[docs] def create_scenario(name: str, seed: int = 42) -> SyntheticScenario: """Create a predefined test scenario with complete ground truth. Available scenarios: - ``'ideal'``: 4 cameras, 20 frames, 0 noise — verify math correctness - ``'minimal'``: 2 cameras, 10 frames, 0.3 px noise — edge case - ``'realistic'``: 12 cameras matching actual hardware, 30 frames, 0.5 px noise All presets use the same ChArUco board (12x9 squares, 60 mm square size, 45 mm marker size, DICT_5X5_100). Args: name: Scenario name (``'ideal'``, ``'minimal'``, or ``'realistic'``) seed: Random seed for reproducibility Returns: SyntheticScenario with complete ground truth (intrinsics, extrinsics, interface distances, board poses). Raises: ValueError: If scenario name is not recognized. Examples: >>> from aquacal.datasets import create_scenario >>> scenario = create_scenario('ideal') >>> print(f"{len(scenario.intrinsics)} cameras, {len(scenario.board_poses)} frames") 4 cameras, 20 frames >>> >>> scenario = create_scenario('realistic') >>> print(f"{len(scenario.intrinsics)} cameras") 12 cameras """ # Common board config (matches real hardware) default_board = BoardConfig( squares_x=12, squares_y=9, square_size=0.060, marker_size=0.045, dictionary="DICT_5X5_100", ) if name == "ideal": intrinsics, extrinsics, distances = generate_camera_array( n_cameras=4, layout="grid", spacing=0.1, height_above_water=0.15, height_variation=0.0, seed=seed, ) camera_positions = {cam: ext.C for cam, ext in extrinsics.items()} board_poses = generate_board_trajectory( n_frames=20, camera_positions=camera_positions, water_zs=distances, depth_range=(0.25, 0.45), xy_extent=0.08, seed=seed, ) return SyntheticScenario( name="ideal", board_config=default_board, intrinsics=intrinsics, extrinsics=extrinsics, water_zs=distances, board_poses=board_poses, noise_std=0.0, description="Ideal conditions: 4 cameras, 20 frames, 0 noise", ) elif name == "minimal": intrinsics, extrinsics, distances = generate_camera_array( n_cameras=2, layout="line", spacing=0.15, height_above_water=0.15, height_variation=0.003, seed=seed, ) camera_positions = {cam: ext.C for cam, ext in extrinsics.items()} board_poses = generate_board_trajectory( n_frames=10, camera_positions=camera_positions, water_zs=distances, depth_range=(0.25, 0.40), xy_extent=0.06, seed=seed, ) return SyntheticScenario( name="minimal", board_config=default_board, intrinsics=intrinsics, extrinsics=extrinsics, water_zs=distances, board_poses=board_poses, noise_std=0.3, description="Minimal scenario: 2 cameras, 10 frames, 0.3px noise", ) elif name == "realistic": intrinsics, extrinsics, distances = generate_real_rig_array() board_poses = generate_real_rig_trajectory( n_frames=30, depth_range=(1.1, 2.0), seed=seed, ) return SyntheticScenario( name="realistic", board_config=default_board, intrinsics=intrinsics, extrinsics=extrinsics, water_zs=distances, board_poses=board_poses, noise_std=0.5, description="12-camera rig matching real hardware (idealized geometry)", ) valid_names = ["ideal", "minimal", "realistic"] raise ValueError(f"Unknown scenario: '{name}'. Valid names: {valid_names}")