Input/Output
The I/O package handles calibration result serialization, detection loading, and video processing.
Serialization
Save and load calibration results.
- aquacal.io.serialization.save_calibration(result, path)
Save calibration result to JSON file.
- Parameters:
result (CalibrationResult) – Complete calibration result to save
- Raises:
OSError – If file cannot be written
- Return type:
None
Example
>>> save_calibration(result, "calibration.json")
- aquacal.io.serialization.load_calibration(path)
Load calibration result from JSON file.
- Parameters:
- Returns:
CalibrationResult object
- Raises:
FileNotFoundError – If file does not exist
ValueError – If file format is invalid or version mismatch
- Return type:
CalibrationResult
Example
>>> result = load_calibration("calibration.json")
>>> print(result.diagnostics.reprojection_error_rms)
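A caller can handle the two documented exceptions separately. The sketch below is not part of AquaCal: `load_or_default` is a hypothetical helper, and the loader is passed in as an argument so the snippet runs without the package installed. In practice `load_calibration` would presumably be passed as `loader`.

```python
from typing import Any, Callable, Optional


def load_or_default(
    loader: Callable[[str], Any],
    path: str,
    default: Optional[Any] = None,
) -> Any:
    """Call loader(path) and return `default` if the file is missing.

    Hypothetical helper: `loader` stands in for load_calibration.
    """
    try:
        return loader(path)
    except FileNotFoundError:
        # Documented case: the file does not exist.
        return default
    except ValueError as exc:
        # Documented case: invalid format or version mismatch.
        # Re-raise with the path attached rather than hiding a corrupt file.
        raise ValueError(f"Cannot load calibration from {path!r}: {exc}") from exc
```

Returning a default only for the missing-file case is a design choice of this sketch. A file that exists but fails validation usually indicates a problem worth surfacing.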
Detection Loading
ChArUco detection wrapper.
- aquacal.io.detection.detect_charuco(image, board, camera_matrix=None, dist_coeffs=None)
Detect ChArUco corners in a single image.
Uses OpenCV 4.6+ ArUco API. Converts BGR to grayscale internally if needed.
- Parameters:
image (ndarray[tuple[int, ...], dtype[uint8]]) – Grayscale (H, W) or BGR (H, W, 3) image as uint8
board (BoardGeometry) – Board geometry (provides OpenCV CharucoBoard)
camera_matrix (ndarray[tuple[int, ...], dtype[float64]] | None) – Optional 3x3 intrinsic matrix for corner refinement
dist_coeffs (ndarray[tuple[int, ...], dtype[float64]] | None) – Optional distortion coefficients for corner refinement
- Returns:
Detection object containing corner_ids and corners_2d, or None if no corners detected.
- Return type:
Detection | None
Example
>>> image = cv2.imread('calibration_frame.png')
>>> board = BoardGeometry(config)
>>> detection = detect_charuco(image, board)
>>> if detection is not None:
...     print(f"Found {detection.num_corners} corners")
- aquacal.io.detection.detect_all_frames(video_paths, board, intrinsics=None, min_corners=4, frame_step=1, progress_callback=None)
Detect ChArUco corners in all frames of a synchronized frame source.
Iterates through all cameras at each frame index, detects ChArUco corners, and organizes results into a DetectionResult.
Supports both video files (VideoSet) and image directories (ImageSet). When a dict of paths is passed, the source type is detected automatically.
- Parameters:
video_paths (dict[str, str] | FrameSet) – Dict mapping camera_name to path (video file or image directory), or a FrameSet implementation (VideoSet/ImageSet). If a dict is passed, the frame source is auto-detected.
board (BoardGeometry) – Board geometry
intrinsics (dict[str, tuple[ndarray[tuple[int, ...], dtype[float64]], ndarray[tuple[int, ...], dtype[float64]]]] | None) – Optional dict mapping camera_name to (K, dist_coeffs) tuple. Used for corner refinement. Cameras not in dict use None.
min_corners (int) – Minimum corners required to keep a detection (default 4)
frame_step (int) – Process every Nth frame (default 1 = all frames)
progress_callback (Callable[[int, int], None] | None) – Optional callback(current_frame, total_frames) called after processing each frame
- Returns:
DetectionResult containing all valid detections organized by frame and camera.
- Return type:
DetectionResult
Example
>>> # With video files
>>> paths = {'cam0': 'video0.mp4', 'cam1': 'video1.mp4'}
>>> result = detect_all_frames(paths, board, min_corners=8, frame_step=5)
>>>
>>> # With image directories
>>> paths = {'cam0': 'data/cam0/', 'cam1': 'data/cam1/'}
>>> result = detect_all_frames(paths, board, min_corners=8)
>>>
>>> usable = result.get_frames_with_min_cameras(2)
>>> print(f"Found {len(usable)} frames with 2+ cameras")
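The `progress_callback` parameter accepts any callable taking `(current_frame, total_frames)`. The sketch below shows one possible callback factory. `make_progress_logger`, `every`, and `sink` are hypothetical names, not part of AquaCal. The source does not say whether `current_frame` is a 0-based index or a 1-based count; the sketch assumes the value reaches `total_frames` on the last call.

```python
from typing import Callable


def make_progress_logger(
    every: int = 100,
    sink: Callable[[str], None] = print,
) -> Callable[[int, int], None]:
    """Build a callback(current_frame, total_frames) that reports periodically.

    Hypothetical helper: `every` and `sink` are illustrative names.
    """

    def callback(current_frame: int, total_frames: int) -> None:
        # Report on every `every`-th frame and on the final frame.
        is_last = current_frame >= total_frames
        if current_frame % every == 0 or is_last:
            percent = 100.0 * current_frame / max(total_frames, 1)
            sink(f"{current_frame}/{total_frames} frames ({percent:.0f}%)")

    return callback
```

Under these assumptions, passing `progress_callback=make_progress_logger(every=50)` to `detect_all_frames` would print a status line every 50 frames instead of after every frame.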
Video Processing
Video loading and synchronized frame extraction.
- class aquacal.io.video.VideoSet(video_paths)
Bases: object
Manages multiple synchronized video files.
Videos are assumed to be temporally synchronized (frame 0 in all videos corresponds to the same moment in time). Videos may have different total frame counts; the synchronized length is the minimum across all videos.
Supports context manager protocol for automatic resource cleanup.
Example
>>> paths = {'cam0': 'video0.mp4', 'cam1': 'video1.mp4'}
>>> with VideoSet(paths) as videos:
...     print(f"Frame count: {videos.frame_count}")
...     for idx, frames in videos.iterate_frames(step=10):
...         # frames is dict[str, NDArray | None]
...         process(frames)
- video_paths
Dict mapping camera names to video file paths.
- property frame_count: int
Synchronized frame count (minimum across all videos).
Opens videos if not already open (to read frame counts).
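The minimum-across-videos rule can be illustrated in isolation. This is a sketch of the stated rule only, not VideoSet's actual code. `synchronized_frame_count` is a hypothetical function, and raising on empty input is an assumption of the sketch.

```python
from typing import Dict


def synchronized_frame_count(per_video_counts: Dict[str, int]) -> int:
    """Return the number of frame indices that exist in every video."""
    if not per_video_counts:
        # Assumption of this sketch: an empty set of videos is an error.
        raise ValueError("at least one video is required")
    # Frames past the shortest video have no counterpart in that video,
    # so the synchronized length is the minimum of the individual counts.
    return min(per_video_counts.values())
```

For example, with per-camera counts of 1200 and 1187 frames, only indices 0 through 1186 are available as synchronized frames.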
- open()
Open all video captures.
Called automatically on first frame access. Safe to call multiple times.
- Raises:
RuntimeError – If any video file cannot be opened by OpenCV.
- Return type:
None
- close()
Release all video captures.
Safe to call multiple times or when already closed.
- Return type:
None
- get_frame(frame_idx)
Get a single synchronized frame from all cameras.
Opens videos if not already open.
- Parameters:
frame_idx (int) – Frame index (0-based). Must be < frame_count.
- Returns:
Dict mapping camera_name to BGR image (H, W, 3) as uint8. Value is None if that camera’s frame could not be read.
- Raises:
IndexError – If frame_idx < 0 or frame_idx >= frame_count.
- Return type:
dict[str, ndarray[tuple[int, …], dtype[uint8]] | None]
Note
This method seeks to the requested frame, which may be slow for non-sequential access. For sequential iteration, use iterate_frames().
- iterate_frames(start=0, stop=None, step=1)
Iterate over synchronized frames.
Opens videos if not already open. Frames are read sequentially for efficiency (no seeking when step=1).
- Parameters:
- Yields:
Tuple of (frame_idx, frame_dict) where frame_dict maps camera_name to BGR image (H, W, 3) as uint8, or None if read failed.
- Raises:
ValueError – If start < 0, stop < start, or step < 1.
- Return type:
Iterator[tuple[int, dict[str, ndarray[tuple[int, …], dtype[uint8]] | None]]]
Example
>>> with VideoSet(paths) as videos:
...     for idx, frames in videos.iterate_frames(step=5):
...         for cam, img in frames.items():
...             if img is not None:
...                 cv2.imwrite(f'{cam}_{idx}.png', img)
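To see which frame indices a given `start`/`stop`/`step` combination would visit, the validation rule documented under Raises can be sketched on its own. `frame_indices` is a hypothetical helper, not part of VideoSet. The sketch assumes that `stop=None` means "up to the synchronized frame count" and that `stop` is exclusive and clamped to that count; the source states neither.

```python
from typing import Optional


def frame_indices(
    frame_count: int,
    start: int = 0,
    stop: Optional[int] = None,
    step: int = 1,
) -> range:
    """Return the frame indices a start/stop/step iteration would visit."""
    if stop is None:
        # Assumption: None means "iterate to the end".
        stop = frame_count
    # Same conditions as the documented ValueError.
    if start < 0 or stop < start or step < 1:
        raise ValueError("require start >= 0, stop >= start, and step >= 1")
    # Assumption: stop is exclusive and never exceeds the synchronized length.
    return range(start, min(stop, frame_count), step)
```

With a 10-frame set, `list(frame_indices(10, step=5))` gives `[0, 5]`, matching the indices one would expect `iterate_frames(step=5)` to yield under these assumptions.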