Source code for aquacal.io.serialization

"""Save and load calibration results."""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import numpy as np
from numpy.typing import NDArray

from aquacal.config.schema import (
    BoardConfig,
    CalibrationMetadata,
    CalibrationResult,
    CameraCalibration,
    CameraExtrinsics,
    CameraIntrinsics,
    DiagnosticsData,
    InterfaceParams,
)

# Current serialization format version
SERIALIZATION_VERSION = "1.0"


def _ndarray_to_list(arr: NDArray) -> list:
    """Convert numpy array to nested Python list."""
    return arr.tolist()


def _list_to_ndarray(lst: list, dtype: type = np.float64) -> NDArray:
    """Convert nested list to numpy array."""
    return np.array(lst, dtype=dtype)


def _serialize_camera_intrinsics(intrinsics: CameraIntrinsics) -> dict[str, Any]:
    """Serialize CameraIntrinsics to dict."""
    result = {
        "K": _ndarray_to_list(intrinsics.K),
        "dist_coeffs": _ndarray_to_list(intrinsics.dist_coeffs),
        "image_size": list(intrinsics.image_size),
    }
    if intrinsics.is_fisheye:
        result["is_fisheye"] = True
    return result


def _deserialize_camera_intrinsics(data: dict[str, Any]) -> CameraIntrinsics:
    """Deserialize dict to CameraIntrinsics."""
    return CameraIntrinsics(
        K=_list_to_ndarray(data["K"]),
        dist_coeffs=_list_to_ndarray(data["dist_coeffs"]),
        image_size=tuple(data["image_size"]),
        is_fisheye=data.get("is_fisheye", False),
    )


def _serialize_camera_extrinsics(extrinsics: CameraExtrinsics) -> dict[str, Any]:
    """Serialize CameraExtrinsics to dict."""
    return {
        "R": _ndarray_to_list(extrinsics.R),
        "t": _ndarray_to_list(extrinsics.t),
    }


def _deserialize_camera_extrinsics(data: dict[str, Any]) -> CameraExtrinsics:
    """Deserialize dict to CameraExtrinsics."""
    return CameraExtrinsics(
        R=_list_to_ndarray(data["R"]),
        t=_list_to_ndarray(data["t"]),
    )


def _serialize_camera_calibration(cam: CameraCalibration) -> dict[str, Any]:
    """Serialize CameraCalibration to dict."""
    result = {
        "name": cam.name,
        "intrinsics": _serialize_camera_intrinsics(cam.intrinsics),
        "extrinsics": _serialize_camera_extrinsics(cam.extrinsics),
        "water_z": cam.water_z,
    }
    if cam.is_auxiliary:
        result["is_auxiliary"] = True
    return result


def _deserialize_camera_calibration(data: dict[str, Any]) -> CameraCalibration:
    """Deserialize dict to CameraCalibration.

    Supports backward compatibility: accepts both 'water_z' (new) and
    'interface_distance' (legacy).
    """
    # Backward compatibility: accept both water_z and interface_distance
    if "water_z" in data:
        water_z = data["water_z"]
    elif "interface_distance" in data:
        water_z = data["interface_distance"]
    else:
        raise ValueError(
            "Missing 'water_z' or 'interface_distance' field in camera calibration"
        )

    return CameraCalibration(
        name=data["name"],
        intrinsics=_deserialize_camera_intrinsics(data["intrinsics"]),
        extrinsics=_deserialize_camera_extrinsics(data["extrinsics"]),
        water_z=water_z,
        is_auxiliary=data.get("is_auxiliary", False),
    )


def _serialize_interface_params(interface: InterfaceParams) -> dict[str, Any]:
    """Serialize InterfaceParams to dict."""
    return {
        "normal": _ndarray_to_list(interface.normal),
        "n_air": interface.n_air,
        "n_water": interface.n_water,
    }


def _deserialize_interface_params(data: dict[str, Any]) -> InterfaceParams:
    """Deserialize dict to InterfaceParams."""
    return InterfaceParams(
        normal=_list_to_ndarray(data["normal"]),
        n_air=data["n_air"],
        n_water=data["n_water"],
    )


def _serialize_board_config(board: BoardConfig) -> dict[str, Any]:
    """Serialize BoardConfig to dict."""
    return {
        "squares_x": board.squares_x,
        "squares_y": board.squares_y,
        "square_size": board.square_size,
        "marker_size": board.marker_size,
        "dictionary": board.dictionary,
    }


def _deserialize_board_config(data: dict[str, Any]) -> BoardConfig:
    """Deserialize dict to BoardConfig."""
    return BoardConfig(
        squares_x=data["squares_x"],
        squares_y=data["squares_y"],
        square_size=data["square_size"],
        marker_size=data["marker_size"],
        dictionary=data["dictionary"],
    )


def _serialize_diagnostics(diag: DiagnosticsData) -> dict[str, Any]:
    """Serialize DiagnosticsData to dict. Omits None fields."""
    result = {
        "reprojection_error_rms": diag.reprojection_error_rms,
        "reprojection_error_per_camera": diag.reprojection_error_per_camera,
        "validation_3d_error_mean": diag.validation_3d_error_mean,
        "validation_3d_error_std": diag.validation_3d_error_std,
    }
    # Only include optional fields if not None
    if diag.per_corner_residuals is not None:
        result["per_corner_residuals"] = _ndarray_to_list(diag.per_corner_residuals)
    if diag.per_corner_camera_labels is not None:
        result["per_corner_camera_labels"] = diag.per_corner_camera_labels
    if diag.per_frame_errors is not None:
        # Convert int keys to strings for JSON compatibility
        result["per_frame_errors"] = {
            str(k): v for k, v in diag.per_frame_errors.items()
        }
    return result


def _deserialize_diagnostics(data: dict[str, Any]) -> DiagnosticsData:
    """Deserialize dict to DiagnosticsData."""
    per_corner_residuals = None
    if "per_corner_residuals" in data:
        per_corner_residuals = _list_to_ndarray(data["per_corner_residuals"])

    per_corner_camera_labels = data.get("per_corner_camera_labels")

    per_frame_errors = None
    if "per_frame_errors" in data:
        # Convert string keys back to int
        per_frame_errors = {int(k): v for k, v in data["per_frame_errors"].items()}

    return DiagnosticsData(
        reprojection_error_rms=data["reprojection_error_rms"],
        reprojection_error_per_camera=data["reprojection_error_per_camera"],
        validation_3d_error_mean=data["validation_3d_error_mean"],
        validation_3d_error_std=data["validation_3d_error_std"],
        per_corner_residuals=per_corner_residuals,
        per_corner_camera_labels=per_corner_camera_labels,
        per_frame_errors=per_frame_errors,
    )


def _serialize_metadata(meta: CalibrationMetadata) -> dict[str, Any]:
    """Serialize CalibrationMetadata to dict."""
    return {
        "calibration_date": meta.calibration_date,
        "software_version": meta.software_version,
        "config_hash": meta.config_hash,
        "num_frames_used": meta.num_frames_used,
        "num_frames_holdout": meta.num_frames_holdout,
    }


def _deserialize_metadata(data: dict[str, Any]) -> CalibrationMetadata:
    """Deserialize dict to CalibrationMetadata."""
    return CalibrationMetadata(
        calibration_date=data["calibration_date"],
        software_version=data["software_version"],
        config_hash=data["config_hash"],
        num_frames_used=data["num_frames_used"],
        num_frames_holdout=data["num_frames_holdout"],
    )


[docs] def save_calibration(result: CalibrationResult, path: str | Path) -> None: """ Save calibration result to JSON file. Args: result: Complete calibration result to save path: Output file path (should end in .json) Raises: OSError: If file cannot be written Example: >>> save_calibration(result, "calibration.json") """ data = { "version": SERIALIZATION_VERSION, "cameras": { name: _serialize_camera_calibration(cam) for name, cam in result.cameras.items() }, "interface": _serialize_interface_params(result.interface), "board": _serialize_board_config(result.board), "diagnostics": _serialize_diagnostics(result.diagnostics), "metadata": _serialize_metadata(result.metadata), } path = Path(path) with open(path, "w") as f: json.dump(data, f, indent=2)
[docs] def load_calibration(path: str | Path) -> CalibrationResult: """ Load calibration result from JSON file. Args: path: Path to calibration JSON file Returns: CalibrationResult object Raises: FileNotFoundError: If file does not exist ValueError: If file format is invalid or version mismatch Example: >>> result = load_calibration("calibration.json") >>> print(result.diagnostics.reprojection_error_rms) """ path = Path(path) if not path.exists(): raise FileNotFoundError(f"Calibration file not found: {path}") with open(path, "r") as f: data = json.load(f) # Version check version = data.get("version") if version != SERIALIZATION_VERSION: raise ValueError( f"Unsupported calibration file version: {version}. " f"Expected: {SERIALIZATION_VERSION}" ) return CalibrationResult( cameras={ name: _deserialize_camera_calibration(cam_data) for name, cam_data in data["cameras"].items() }, interface=_deserialize_interface_params(data["interface"]), board=_deserialize_board_config(data["board"]), diagnostics=_deserialize_diagnostics(data["diagnostics"]), metadata=_deserialize_metadata(data["metadata"]), )