Input/Output

The I/O package handles calibration result serialization, detection loading, and video processing.

Serialization

Save and load calibration results.

aquacal.io.serialization.save_calibration(result, path)[source]

Save calibration result to JSON file.

Parameters:
  • result (CalibrationResult) – Complete calibration result to save

  • path (str | Path) – Output file path (should end in .json)

Raises:

OSError – If file cannot be written

Return type:

None

Example

>>> save_calibration(result, "calibration.json")
aquacal.io.serialization.load_calibration(path)[source]

Load calibration result from JSON file.

Parameters:

path (str | Path) – Path to calibration JSON file

Returns:

CalibrationResult object

Raises:
Return type:

CalibrationResult

Example

>>> result = load_calibration("calibration.json")
>>> print(result.diagnostics.reprojection_error_rms)

Detection Loading

ChArUco detection wrapper.

aquacal.io.detection.detect_charuco(image, board, camera_matrix=None, dist_coeffs=None)[source]

Detect ChArUco corners in a single image.

Uses OpenCV 4.6+ ArUco API. Converts BGR to grayscale internally if needed.

Parameters:
  • image (ndarray[tuple[int, ...], dtype[uint8]]) – Grayscale (H, W) or BGR (H, W, 3) image as uint8

  • board (BoardGeometry) – Board geometry (provides OpenCV CharucoBoard)

  • camera_matrix (ndarray[tuple[int, ...], dtype[float64]] | None) – Optional 3x3 intrinsic matrix for corner refinement

  • dist_coeffs (ndarray[tuple[int, ...], dtype[float64]] | None) – Optional distortion coefficients for corner refinement

Returns:

Detection object containing corner_ids and corners_2d, or None if no corners detected.

Return type:

Detection | None

Example

>>> image = cv2.imread('calibration_frame.png')
>>> board = BoardGeometry(config)
>>> detection = detect_charuco(image, board)
>>> if detection is not None:
...     print(f"Found {detection.num_corners} corners")
aquacal.io.detection.detect_all_frames(video_paths, board, intrinsics=None, min_corners=4, frame_step=1, progress_callback=None)[source]

Detect ChArUco corners in all frames of synchronized frame source.

Iterates through all cameras at each frame index, detects ChArUco corners, and organizes results into a DetectionResult.

Supports both video files (VideoSet) and image directories (ImageSet) via automatic detection when dict of paths is passed.

Parameters:
  • video_paths (dict[str, str] | FrameSet) – Dict mapping camera_name to path (video file or image dir), or a FrameSet implementation (VideoSet/ImageSet). If dict is passed, frame source is auto-detected.

  • board (BoardGeometry) – Board geometry

  • intrinsics (dict[str, tuple[ndarray[tuple[int, ...], dtype[float64]], ndarray[tuple[int, ...], dtype[float64]]]] | None) – Optional dict mapping camera_name to (K, dist_coeffs) tuple. Used for corner refinement. Cameras not in dict use None.

  • min_corners (int) – Minimum corners required to keep a detection (default 4)

  • frame_step (int) – Process every Nth frame (default 1 = all frames)

  • progress_callback (Callable[[int, int], None] | None) – Optional callback(current_frame, total_frames) called after processing each frame

Returns:

DetectionResult containing all valid detections organized by frame and camera.

Return type:

DetectionResult

Example

>>> # With video files
>>> paths = {'cam0': 'video0.mp4', 'cam1': 'video1.mp4'}
>>> result = detect_all_frames(paths, board, min_corners=8, frame_step=5)
>>>
>>> # With image directories
>>> paths = {'cam0': 'data/cam0/', 'cam1': 'data/cam1/'}
>>> result = detect_all_frames(paths, board, min_corners=8)
>>>
>>> usable = result.get_frames_with_min_cameras(2)
>>> print(f"Found {len(usable)} frames with 2+ cameras")

Video Processing

Video loading and synchronized frame extraction.

class aquacal.io.video.VideoSet(video_paths)[source]

Bases: object

Manages multiple synchronized video files.

Videos are assumed to be temporally synchronized (frame 0 in all videos corresponds to the same moment in time). Videos may have different total frame counts; the synchronized length is the minimum across all videos.

Supports context manager protocol for automatic resource cleanup.

Example

>>> paths = {'cam0': 'video0.mp4', 'cam1': 'video1.mp4'}
>>> with VideoSet(paths) as videos:
...     print(f"Frame count: {videos.frame_count}")
...     for idx, frames in videos.iterate_frames(step=10):
...         # frames is dict[str, NDArray | None]
...         process(frames)
Parameters:

video_paths (dict[str, str])

video_paths

Dict mapping camera names to video file paths.

property camera_names: list[str]

List of camera names (sorted for deterministic ordering).

property frame_count: int

Synchronized frame count (minimum across all videos).

Opens videos if not already open (to read frame counts).

property is_open: bool

Whether video captures are currently open.

open()[source]

Open all video captures.

Called automatically on first frame access. Safe to call multiple times.

Raises:

RuntimeError – If any video file cannot be opened by OpenCV.

Return type:

None

close()[source]

Release all video captures.

Safe to call multiple times or when already closed.

Return type:

None

get_frame(frame_idx)[source]

Get a single synchronized frame from all cameras.

Opens videos if not already open.

Parameters:

frame_idx (int) – Frame index (0-based). Must be < frame_count.

Returns:

Dict mapping camera_name to BGR image (H, W, 3) as uint8. Value is None if that camera’s frame could not be read.

Raises:

IndexError – If frame_idx < 0 or frame_idx >= frame_count.

Return type:

dict[str, ndarray[tuple[int, …], dtype[uint8]] | None]

Note

This method seeks to the requested frame, which may be slow for non-sequential access. For sequential iteration, use iterate_frames().

iterate_frames(start=0, stop=None, step=1)[source]

Iterate over synchronized frames.

Opens videos if not already open. Frames are read sequentially for efficiency (no seeking when step=1).

Parameters:
  • start (int) – Starting frame index (inclusive). Default 0.

  • stop (int | None) – Ending frame index (exclusive). None means frame_count.

  • step (int) – Frame step. 1 = every frame, 2 = every other frame, etc. Must be >= 1.

Yields:

Tuple of (frame_idx, frame_dict) where frame_dict maps camera_name to BGR image (H, W, 3) as uint8, or None if read failed.

Raises:

ValueError – If start < 0, stop < start, or step < 1.

Return type:

Iterator[tuple[int, dict[str, ndarray[tuple[int, …], dtype[uint8]] | None]]]

Example

>>> with VideoSet(paths) as videos:
...     for idx, frames in videos.iterate_frames(step=5):
...         for cam, img in frames.items():
...             if img is not None:
...                 cv2.imwrite(f'{cam}_{idx}.png', img)