Source code for aquacal.calibration.intrinsics

"""Stage 1: Per-camera intrinsic calibration."""

from __future__ import annotations

from pathlib import Path
from typing import Callable

import cv2
import numpy as np
from numpy.typing import NDArray

from aquacal.config.schema import CameraIntrinsics
from aquacal.core.board import BoardGeometry
from aquacal.io.detection import _create_frame_source, detect_charuco


[docs] def validate_intrinsics( intrinsics: CameraIntrinsics, camera_name: str = "", max_roundtrip_error_px: float = 1.0, expected_fx: float | None = None, fx_tolerance_fraction: float = 0.3, ) -> list[str]: """ Validate intrinsic calibration quality with post-calibration sanity checks. Detects bad intrinsic calibrations (broken undistortion, unstable distortion model, implausible focal length) before they cascade into wrong extrinsics. Args: intrinsics: The calibration result to validate camera_name: For warning messages (default "") max_roundtrip_error_px: Maximum acceptable undistortion roundtrip error (default 1.0 px) expected_fx: If provided, check that calibrated fx is within tolerance of this value (default None = skip check) fx_tolerance_fraction: Fractional tolerance for fx check (default 0.3 = 30%) Returns: List of warning strings (empty = all checks passed) Example: >>> warnings = validate_intrinsics(intrinsics, camera_name="cam0") >>> for w in warnings: ... print(f"WARNING: {w}") """ warnings = [] K = intrinsics.K dist_coeffs = intrinsics.dist_coeffs w, h = intrinsics.image_size is_fisheye = intrinsics.is_fisheye # --- Check 1: Undistortion roundtrip --- # Generate grid of pixel coordinates (exclude 5% border) border = 0.05 x_min, x_max = int(w * border), int(w * (1 - border)) y_min, y_max = int(h * border), int(h * (1 - border)) x_grid = np.linspace(x_min, x_max, 20) y_grid = np.linspace(y_min, y_max, 20) xx, yy = np.meshgrid(x_grid, y_grid) pixels = np.stack([xx.ravel(), yy.ravel()], axis=1).astype(np.float64) # Undistort pixels to get normalized coordinates pixels_reshaped = pixels.reshape(-1, 1, 2) if is_fisheye: D = dist_coeffs.reshape(4, 1) normalized = cv2.fisheye.undistortPoints(pixels_reshaped, K=K, D=D) else: normalized = cv2.undistortPoints(pixels_reshaped, K, dist_coeffs) normalized = normalized.reshape(-1, 2) # Re-project normalized coordinates back to pixels # Use identity R and t (points are already in camera frame) rvec = np.zeros(3, dtype=np.float64) tvec = np.zeros(3, dtype=np.float64) # Convert normalized coords to 3D points (z=1) points_3d = np.hstack([normalized, np.ones((len(normalized), 1))]) points_3d = points_3d.reshape(-1, 1, 3).astype(np.float64) if is_fisheye: reprojected, _ = cv2.fisheye.projectPoints(points_3d, rvec, tvec, K, D) else: reprojected, _ = cv2.projectPoints(points_3d, rvec, tvec, K, dist_coeffs) reprojected = reprojected.reshape(-1, 2) # Compute max absolute error errors = np.abs(pixels - reprojected) max_err = np.max(errors) if max_err > max_roundtrip_error_px: warnings.append( f"[{camera_name}] Undistortion roundtrip error {max_err:.1f} px " f"(threshold: {max_roundtrip_error_px} px). " f"Intrinsic calibration may be unreliable." ) # --- Check 2: Distortion monotonicity (pinhole only) --- if not is_fisheye: fx = K[0, 0] fy = K[1, 1] cx = K[0, 2] cy = K[1, 2] # Compute r_max = distance from principal point to farthest corner corners = np.array([[0, 0], [w, 0], [0, h], [w, h]], dtype=np.float64) # Convert to normalized coordinates r_values = [] for corner in corners: dx = (corner[0] - cx) / fx dy = (corner[1] - cy) / fy r = np.sqrt(dx**2 + dy**2) r_values.append(r) r_max = max(r_values) # Sample radii from 0 to r_max r_samples = np.linspace(0, r_max, 200) # Compute distortion factor at each radius k1, k2, p1, p2, k3 = ( dist_coeffs[:5] if len(dist_coeffs) >= 5 else (*dist_coeffs, *[0] * (5 - len(dist_coeffs))) ) # For 8-coeff rational model if len(dist_coeffs) == 8: k4, k5, k6 = dist_coeffs[5:8] numerator = 1 + k1 * r_samples**2 + k2 * r_samples**4 + k3 * r_samples**6 denominator = 1 + k4 * r_samples**2 + k5 * r_samples**4 + k6 * r_samples**6 distortion_factor = numerator / denominator else: # 5-coeff model distortion_factor = ( 1 + k1 * r_samples**2 + k2 * r_samples**4 + k3 * r_samples**6 ) # Check for negative values if np.any(distortion_factor < 0): idx_negative = np.where(distortion_factor < 0)[0][0] r_val = r_samples[idx_negative] warnings.append( f"[{camera_name}] Distortion model goes negative at r={r_val:.3f} " f"(image covers r_max={r_max:.3f}). Model is ill-conditioned." ) else: # Check for non-monotonicity (decreasing by more than 5%) diffs = np.diff(distortion_factor) decreases = diffs < 0 if np.any(decreases): # Check if any decrease is > 5% of previous value for i in range(len(diffs)): if diffs[i] < 0: pct_decrease = ( abs(diffs[i]) / abs(distortion_factor[i]) if distortion_factor[i] != 0 else 0 ) if pct_decrease > 0.05: r_val = r_samples[i + 1] warnings.append( f"[{camera_name}] Distortion model is non-monotonic at r={r_val:.3f} " f"(image covers r_max={r_max:.3f}). Model is ill-conditioned." ) break # --- Check 3: Focal length plausibility (optional) --- if expected_fx is not None: fx_actual = K[0, 0] fy_actual = K[1, 1] # Check fx diff_fx = abs(fx_actual - expected_fx) pct_fx = diff_fx / expected_fx if pct_fx > fx_tolerance_fraction: warnings.append( f"[{camera_name}] Calibrated fx={fx_actual:.1f} differs from expected " f"{expected_fx:.1f} by {pct_fx * 100:.0f}% (tolerance: {fx_tolerance_fraction * 100:.0f}%)." ) # Check fy similarly diff_fy = abs(fy_actual - expected_fx) pct_fy = diff_fy / expected_fx if pct_fy > fx_tolerance_fraction: warnings.append( f"[{camera_name}] Calibrated fy={fy_actual:.1f} differs from expected " f"{expected_fx:.1f} by {pct_fy * 100:.0f}% (tolerance: {fx_tolerance_fraction * 100:.0f}%)." ) return warnings
[docs] def calibrate_intrinsics_single( video_path: str | Path, board: BoardGeometry, max_frames: int = 100, min_corners: int = 8, frame_step: int = 1, rational_model: bool = False, fisheye: bool = False, ) -> tuple[CameraIntrinsics, float]: """ Calibrate intrinsics for a single camera from in-air video. Detects ChArUco corners in video frames, selects a subset with good spatial coverage, and runs OpenCV camera calibration. Args: video_path: Path to calibration video (in-air, no refraction) board: Board geometry max_frames: Maximum number of frames to use for calibration (default 100) min_corners: Minimum corners required per frame (default 8) frame_step: Process every Nth frame from video (default 1) rational_model: If True, use 8-coefficient rational distortion model instead of the standard 5-coefficient model. Use for wide-angle lenses. fisheye: If True, use equidistant fisheye calibration (cv2.fisheye.calibrate). Returns 4 distortion coefficients (k1-k4). Incompatible with rational_model. Returns: Tuple of (CameraIntrinsics, reprojection_error_rms): - CameraIntrinsics with K and dist_coeffs (5, 8, or 4 coefficients), and image_size - RMS reprojection error in pixels Raises: ValueError: If no valid frames found or calibration fails Example: >>> board = BoardGeometry(config) >>> intrinsics, error = calibrate_intrinsics_single("cam0_inair.mp4", board) >>> print(f"Reprojection error: {error:.3f} pixels") """ video_path = Path(video_path) # Collect detections from video all_detections: list[tuple[NDArray[np.int32], NDArray[np.float64]]] = [] image_size: tuple[int, int] | None = None with _create_frame_source({video_path.stem: str(video_path)}) as vs: for frame_idx, frames in vs.iterate_frames(step=frame_step): frame = frames[video_path.stem] if frame is None: continue # Capture image size from first frame if image_size is None: h, w = frame.shape[:2] image_size = (w, h) # OpenCV convention: (width, height) # Detect ChArUco corners detection = detect_charuco(frame, board) if detection is None or detection.num_corners < min_corners: continue # Skip collinear detections (degenerate for homography estimation) obj_pts = board.get_corner_array(detection.corner_ids) if np.linalg.matrix_rank(obj_pts[:, :2] - obj_pts[0, :2]) < 2: continue all_detections.append((detection.corner_ids, detection.corners_2d)) if not all_detections: raise ValueError( f"No ChArUco board detected in any frame of {video_path}. " f"Check that the video contains visible board frames and that " f"the board configuration (dictionary, dimensions) is correct." ) if image_size is None: raise ValueError(f"Could not determine image size from {video_path}") # Select frames for calibration (with spatial coverage) selected = _select_calibration_frames(all_detections, max_frames, image_size) if len(selected) < 4: raise ValueError( f"Insufficient frames for calibration: {len(selected)} (need at least 4)" ) if fisheye and rational_model: raise ValueError("fisheye and rational_model are mutually exclusive") # Prepare data for OpenCV calibration object_points = [] # 3D points in board frame image_points = [] # 2D points in image for corner_ids, corners_2d in selected: obj_pts = board.get_corner_array(corner_ids) object_points.append(obj_pts.astype(np.float32)) image_points.append(corners_2d.astype(np.float32)) if fisheye: # cv2.fisheye.calibrate requires (1, N, 3) and (1, N, 2) per image fisheye_obj_pts = [pts.reshape(1, -1, 3) for pts in object_points] fisheye_img_pts = [pts.reshape(1, -1, 2) for pts in image_points] K_init = np.eye(3, dtype=np.float64) D_init = np.zeros((4, 1), dtype=np.float64) fisheye_flags = ( cv2.fisheye.CALIB_RECOMPUTE_EXTRINSIC + cv2.fisheye.CALIB_CHECK_COND ) try: ret, K, D, rvecs, tvecs = cv2.fisheye.calibrate( fisheye_obj_pts, fisheye_img_pts, image_size, K_init, D_init, flags=fisheye_flags, ) except cv2.error as e: raise ValueError( f"Fisheye calibration failed (ill-conditioned problem). " f"Try providing more calibration frames with varied board poses. " f"OpenCV error: {e}" ) from e intrinsics = CameraIntrinsics( K=K.astype(np.float64), dist_coeffs=D.flatten().astype(np.float64), image_size=image_size, is_fisheye=True, ) return intrinsics, float(ret) # Run OpenCV calibration (pinhole model) flags = cv2.CALIB_RATIONAL_MODEL if rational_model else 0 ret, K, dist_coeffs, rvecs, tvecs = cv2.calibrateCamera( # type: ignore[call-overload] object_points, image_points, image_size, None, None, flags=flags, ) if not ret: raise ValueError("OpenCV calibration failed") intrinsics = CameraIntrinsics( K=K.astype(np.float64), dist_coeffs=dist_coeffs.flatten().astype(np.float64), image_size=image_size, ) # If the full model produces a bad undistortion roundtrip, retry with # progressively simpler distortion models (fix k3, then fix k3+k2). # With few calibration frames, higher-order radial coefficients overfit # and the distortion polynomial blows up outside the calibrated region. if not rational_model: simplification_flags = [ cv2.CALIB_FIX_K3, cv2.CALIB_FIX_K3 | cv2.CALIB_FIX_K2, ] for simp_flags in simplification_flags: warnings = validate_intrinsics(intrinsics) if not warnings: break ret_s, K_s, d_s, _, _ = cv2.calibrateCamera( # type: ignore[call-overload] object_points, image_points, image_size, None, None, flags=simp_flags, ) if ret_s: intrinsics = CameraIntrinsics( K=K_s.astype(np.float64), dist_coeffs=d_s.flatten().astype(np.float64), image_size=image_size, ) ret = ret_s return intrinsics, float(ret) # ret is RMS reprojection error
def _select_calibration_frames( detections: list[tuple[NDArray[np.int32], NDArray[np.float64]]], max_frames: int, image_size: tuple[int, int], ) -> list[tuple[NDArray[np.int32], NDArray[np.float64]]]: """ Select frames with good spatial coverage for calibration. Prioritizes frames where detected corners are spread across the image rather than clustered in one region. Args: detections: List of (corner_ids, corners_2d) tuples max_frames: Maximum number of frames to select image_size: Image dimensions (width, height) Returns: Selected subset of detections """ if len(detections) <= max_frames: return detections # Score each detection by spatial coverage # Use standard deviation of corner positions as coverage metric scores = [] for corner_ids, corners_2d in detections: # Normalize to [0, 1] range w, h = image_size normalized = corners_2d / np.array([w, h]) # Coverage score: higher std = better spread std_x = np.std(normalized[:, 0]) std_y = np.std(normalized[:, 1]) coverage_score = std_x + std_y # Also consider number of corners (more is better) corner_score = len(corner_ids) / 50.0 # Normalize roughly scores.append(coverage_score + corner_score) # Select top-scoring frames indices = np.argsort(scores)[::-1][:max_frames] return [detections[i] for i in indices]
[docs] def calibrate_intrinsics_all( video_paths: dict[str, str | Path], board: BoardGeometry, max_frames: int = 100, min_corners: int = 8, frame_step: int = 1, rational_model_cameras: list[str] | None = None, fisheye_cameras: list[str] | None = None, progress_callback: Callable[[str, int, int], None] | None = None, ) -> dict[str, tuple[CameraIntrinsics, float]]: """ Calibrate intrinsics for multiple cameras. Args: video_paths: Dict mapping camera_name to video file path board: Board geometry (same for all cameras) max_frames: Maximum frames per camera (default 100) min_corners: Minimum corners per frame (default 8) frame_step: Process every Nth frame (default 1) rational_model_cameras: List of camera names that should use the 8-coefficient rational distortion model (default None = all use 5-coeff) fisheye_cameras: List of camera names that should use the equidistant fisheye model (default None = none use fisheye) progress_callback: Optional callback(camera_name, current_cam, total_cams) Returns: Dict mapping camera_name to (CameraIntrinsics, reprojection_error_rms) Raises: ValueError: If calibration fails for any camera Example: >>> paths = {'cam0': 'cam0.mp4', 'cam1': 'cam1.mp4'} >>> board = BoardGeometry(config) >>> results = calibrate_intrinsics_all(paths, board) >>> for name, (intrinsics, error) in results.items(): ... print(f"{name}: error={error:.3f}px") """ results = {} camera_names = sorted(video_paths.keys()) total = len(camera_names) for idx, name in enumerate(camera_names): if progress_callback is not None: progress_callback(name, idx + 1, total) rational = rational_model_cameras is not None and name in rational_model_cameras use_fisheye = fisheye_cameras is not None and name in fisheye_cameras intrinsics, error = calibrate_intrinsics_single( video_paths[name], board, max_frames=max_frames, min_corners=min_corners, frame_step=frame_step, rational_model=rational, fisheye=use_fisheye, ) results[name] = (intrinsics, error) return results