# Source code for vision3d.datasets.kitti

"""`KITTI 3D Object Detection <http://www.cvlibs.net/datasets/kitti/eval_object.php?obj_benchmark=3d>`_ Dataset."""

import csv
import io
import os
import urllib.request
import zipfile
from collections.abc import Iterable
from pathlib import Path
from typing import Any, ClassVar, override

import numpy as np
import torch
from torch import Tensor
from torch.utils.data import Dataset
from torchvision.datasets.utils import download_and_extract_archive
from torchvision.io import ImageReadMode, decode_image

from vision3d.datasets import FusionInputs, SampleTargets
from vision3d.tensors import (
    BoundingBox3DFormat,
    BoundingBoxes3D,
    CameraExtrinsics,
    CameraImages,
    CameraIntrinsics,
    PointCloud3D,
)


class Kitti3D(Dataset[tuple[FusionInputs, SampleTargets | None]]):
    """`KITTI 3D <http://www.cvlibs.net/datasets/kitti/eval_object.php?obj_benchmark=3d>`_ Dataset.

    Returns samples in **lidar frame** (X-forward, Y-left, Z-up), converting
    from KITTI's camera-frame annotations automatically.

    Args:
        root (str or ``pathlib.Path``): Root directory where data is
            downloaded to. Expects the following folder structure if
            download=False:

            .. code::

                <root>
                    └── Kitti3D/
                        └── raw/
                            ├── training/
                            |   ├── velodyne/
                            |   ├── label_2/
                            |   ├── calib/
                            |   └── image_2/
                            └── testing/
                                ├── velodyne/
                                ├── calib/
                                └── image_2/
        train (bool, optional): Use ``train`` split if true, else ``test``
            split. Defaults to ``True``.
        transforms (Callable, optional): A function/transform that takes
            input sample and its target as entry and returns a transformed
            version.
        download (bool, optional): If true, downloads the dataset from the
            internet and puts it in root directory. If dataset is already
            downloaded, it is not downloaded again.
        mini (bool, optional): If true, downloading fetches only the frames
            named in ``frames`` via HTTP range requests against the upstream
            archives instead of the full dataset. Has no effect when
            ``download`` is ``False``.
        frames: Frame indices to retrieve when ``mini`` is true. Each ``i``
            is formatted as ``f"{i:06d}"`` and pulled from the requested
            split; the iterable may be contiguous (``range(10)``), strided
            (``range(0, 7481, 700)``), or scattered (``[0, 23, 412]``).
            Defaults to ``range(10)``.

    Raises:
        RuntimeError: If the expected split folders are missing after the
            optional download step.
    """

    data_url: ClassVar[str] = "https://s3.eu-central-1.amazonaws.com/avg-kitti/"
    resources: ClassVar[list[str]] = [
        "data_object_velodyne.zip",
        "data_object_image_2.zip",
        "data_object_label_2.zip",
        "data_object_calib.zip",
    ]
    velodyne_dir_name: ClassVar[str] = "velodyne"
    image_dir_name: ClassVar[str] = "image_2"
    labels_dir_name: ClassVar[str] = "label_2"
    calib_dir_name: ClassVar[str] = "calib"

    # Single forward-facing left color camera ("P2" in KITTI's calibration).
    camera_names: ClassVar[tuple[str, ...]] = ("CAM_2",)
    camera_grid: ClassVar[tuple[tuple[int, ...], ...] | None] = None

    classes: ClassVar[tuple[str, ...]] = (
        "Car",
        "Pedestrian",
        "Cyclist",
        "Van",
        "Truck",
        "Person_sitting",
        "Tram",
        "Misc",
    )
    class_to_idx: ClassVar[dict[str, int]] = {name: i for i, name in enumerate(classes)}

    def __init__(
        self,
        root: str | os.PathLike[str],
        train: bool = True,
        transforms: Any | None = None,
        download: bool = False,
        mini: bool = False,
        frames: Iterable[int] = range(10),
    ) -> None:
        self.root = Path(root)
        self.train = train
        self.transforms = transforms
        self.mini = mini
        # Materialize once so a one-shot iterable can be reused later.
        self.frames = tuple(frames)
        self._location = "training" if train else "testing"

        if download:
            if mini:
                self._download_mini(self.frames)
            else:
                self.download()

        if not self._check_exists():
            raise RuntimeError(
                "Dataset not found. You may use download=True to download it."
            )

        # The velodyne scans define which frames exist; every other modality
        # is looked up by the same six-digit stem.
        velodyne_dir = self._raw_folder / self._location / self.velodyne_dir_name
        self._frame_ids = sorted(
            p.stem for p in velodyne_dir.iterdir() if p.suffix == ".bin"
        )

    def __len__(self) -> int:
        """Return the number of frames."""
        return len(self._frame_ids)

    @override
    def __getitem__(self, index: int) -> tuple[FusionInputs, SampleTargets | None]:
        """Load a single frame.

        Args:
            index (int): Index.

        Returns:
            Tuple of ``(inputs, targets)``.

            **inputs** is a dict with keys:

            - ``"points"``: :class:`PointCloud3D` in lidar frame ``[N, 4]``
              (x, y, z, intensity).
            - ``"images"``: :class:`CameraImages` ``[1, 3, H, W]`` (left camera).
            - ``"extrinsics"``: :class:`CameraExtrinsics` ``[1, 4, 4]``
              (lidar-to-camera).
            - ``"intrinsics"``: :class:`CameraIntrinsics` ``[1, 3, 3]``.

            **targets** is a dict (training) or None (testing) with keys:

            - ``"boxes"``: :class:`BoundingBoxes3D` in lidar frame, format
              ``XYZLWHY``.
            - ``"labels"``: :class:`~torch.Tensor` of class indices.

            Points are cropped to the camera field of view when the frame's
            image file exists. When it does not, ``"images"`` is a ``1x1``
            zero placeholder and the point cloud is returned uncropped.
        """
        frame_id = self._frame_ids[index]
        base = self._raw_folder / self._location

        points = self._load_velodyne(base, frame_id)
        calib = self._load_calib(base, frame_id)
        image = self._load_image(base, frame_id)
        img_h, img_w = image.shape[2], image.shape[3]

        # Filter points to camera FOV using K @ extrinsics[:3, :]. Skip the
        # crop when only the 1x1 placeholder image is available: projecting
        # into a 1x1 "image" would throw away essentially every point.
        if self._image_path(base, frame_id).exists():
            K = calib["intrinsics"][0]  # [3, 3]
            ext = calib["extrinsics"][0]  # [4, 4]
            lidar_to_img = K @ ext[:3, :]  # [3, 4]
            fov_mask = _get_fov_mask(points[:, :3], lidar_to_img, img_h, img_w)
            points = points[fov_mask]

        inputs: FusionInputs = {
            "points": PointCloud3D(points),
            "images": CameraImages(image),
            "extrinsics": CameraExtrinsics(calib["extrinsics"]),
            "intrinsics": CameraIntrinsics(
                calib["intrinsics"], image_size=(img_h, img_w)
            ),
        }

        # Labels are only loaded for the training split.
        targets = None
        if self.train:
            targets = self._load_targets(base, frame_id, calib)

        if self.transforms is not None:
            inputs, targets = self.transforms(inputs, targets)

        return inputs, targets

    @property
    def _raw_folder(self) -> Path:
        """Directory holding the extracted archives: ``<root>/<ClassName>/raw``."""
        return self.root / self.__class__.__name__ / "raw"

    def _check_exists(self) -> bool:
        """Return True if the folders required for the current split exist.

        Only ``velodyne/`` and ``calib/`` (plus ``label_2/`` for training) are
        required; ``image_2/`` is not checked here.
        """
        folders = [self.velodyne_dir_name, self.calib_dir_name]
        if self.train:
            folders.append(self.labels_dir_name)
        return all((self._raw_folder / self._location / d).is_dir() for d in folders)

    def download(self) -> None:
        """Download the KITTI dataset if it doesn't exist already."""
        if self._check_exists():
            return

        self._raw_folder.mkdir(parents=True, exist_ok=True)
        for fname in self.resources:
            download_and_extract_archive(
                url=f"{self.data_url}{fname}",
                download_root=str(self._raw_folder),
                filename=fname,
            )

    def _download_mini(self, frames: tuple[int, ...]) -> None:
        """Extract the named frames via HTTP range requests.

        Args:
            frames: Frame indices to extract from the requested split. Each
                ``i`` resolves to member ``{i:06d}.<ext>``.

        Raises:
            ValueError: If ``frames`` is empty.
        """
        if not frames:
            msg = "frames must be non-empty"
            raise ValueError(msg)
        if self._check_exists():
            return

        location_dir = self._raw_folder / self._location
        location_dir.mkdir(parents=True, exist_ok=True)

        frame_ids = [f"{i:06d}" for i in frames]

        # (zip filename, member subdirectory, file extension). The label_2
        # archive only contains the training split.
        archives = [
            ("data_object_velodyne.zip", self.velodyne_dir_name, "bin"),
            ("data_object_image_2.zip", self.image_dir_name, "png"),
            ("data_object_calib.zip", self.calib_dir_name, "txt"),
        ]
        if self.train:
            archives.append(("data_object_label_2.zip", self.labels_dir_name, "txt"))

        for fname, subdir, ext in archives:
            members = [f"{self._location}/{subdir}/{fid}.{ext}" for fid in frame_ids]
            _extract_remote_zip_members(
                url=f"{self.data_url}{fname}",
                members=members,
                dest=self._raw_folder,
            )

    def _load_velodyne(self, base: Path, frame_id: str) -> Tensor:
        """Read a velodyne ``.bin`` scan as a float32 ``[N, 4]`` tensor."""
        path = base / self.velodyne_dir_name / f"{frame_id}.bin"
        points = np.fromfile(path, dtype=np.float32).reshape(-1, 4)
        return torch.from_numpy(points)

    def _image_path(self, base: Path, frame_id: str) -> Path:
        """Return the path of the left color image for ``frame_id``."""
        return base / self.image_dir_name / f"{frame_id}.png"

    def _load_image(self, base: Path, frame_id: str) -> Tensor:
        """Load the left color image as float ``[1, 3, H, W]`` in ``[0, 1]``.

        Returns a ``[1, 3, 1, 1]`` zero tensor when the image file is missing.
        """
        path = self._image_path(base, frame_id)
        if path.exists():
            img = decode_image(str(path), mode=ImageReadMode.RGB)  # [3, H, W] uint8
            return img.unsqueeze(0).float() / 255.0
        return torch.zeros(1, 3, 1, 1)

    def _load_calib(self, base: Path, frame_id: str) -> dict[str, Tensor]:
        """Parse KITTI calibration file.

        Returns:
            Dict with ``"extrinsics"`` (lidar-to-camera, ``[1, 4, 4]``) and
            ``"intrinsics"`` (camera P2 projection, ``[1, 3, 3]``).
        """
        path = base / self.calib_dir_name / f"{frame_id}.txt"
        calib_data: dict[str, np.ndarray] = {}
        with path.open() as f:
            for line in f:
                # Each entry is "<key>: v0 v1 ..."; skip blank/other lines.
                if ":" not in line:
                    continue
                key, value = line.split(":", 1)
                calib_data[key.strip()] = np.array(
                    [float(x) for x in value.split()], dtype=np.float32
                )

        # P2: 3x4 projection matrix for left color camera
        p2 = calib_data["P2"].reshape(3, 4)
        K = p2[:, :3]
        intrinsics = torch.from_numpy(K).unsqueeze(0)  # [1, 3, 3]

        # R0_rect: 3x3 rectification rotation
        r0 = np.eye(4, dtype=np.float32)
        r0[:3, :3] = calib_data["R0_rect"].reshape(3, 3)

        # Tr_velo_to_cam: 3x4 velodyne-to-camera
        velo_to_cam = np.eye(4, dtype=np.float32)
        velo_to_cam[:3, :] = calib_data["Tr_velo_to_cam"].reshape(3, 4)

        # P2's 4th column encodes a stereo baseline offset in camera frame.
        # Fold it into the extrinsic so that K @ extrinsics[:3, :] gives
        # exact pixel-accurate projection (matching P2 @ R0 @ Tr_velo_to_cam).
        baseline = np.eye(4, dtype=np.float32)
        baseline[:3, 3] = np.linalg.solve(K, p2[:, 3])

        extrinsics = torch.from_numpy(baseline @ r0 @ velo_to_cam).unsqueeze(
            0
        )  # [1, 4, 4]

        return {"extrinsics": extrinsics, "intrinsics": intrinsics}

    def _load_targets(
        self,
        base: Path,
        frame_id: str,
        calib: dict[str, Tensor],
    ) -> SampleTargets:
        """Parse KITTI label file and convert to lidar frame.

        ``DontCare`` rows are dropped. A class name that is not in
        :attr:`classes` is kept with label ``-1``.

        Returns:
            Dict with ``"boxes"`` (:class:`BoundingBoxes3D`, XYZLWHY format),
            ``"labels"`` (int tensor).
        """
        path = base / self.labels_dir_name / f"{frame_id}.txt"
        label_ids: list[int] = []
        boxes_cam: list[list[float]] = []

        with path.open() as f:
            for line in csv.reader(f, delimiter=" "):
                if not line or line[0] == "DontCare":
                    continue
                label_ids.append(self.class_to_idx.get(line[0], -1))
                # KITTI label: h, w, l, x, y, z, rotation_y (camera frame)
                h, w, l = float(line[8]), float(line[9]), float(line[10])
                x, y, z = float(line[11]), float(line[12]), float(line[13])
                ry = float(line[14])
                boxes_cam.append([x, y, z, h, w, l, ry])

        if not boxes_cam:
            return {
                "boxes": BoundingBoxes3D(
                    torch.zeros(0, 7), format=BoundingBox3DFormat.XYZLWHY
                ),
                "labels": torch.zeros(0, dtype=torch.int64),
            }

        boxes_cam_t = torch.tensor(boxes_cam, dtype=torch.float32)
        boxes_lidar = _cam_to_lidar_boxes(boxes_cam_t, calib["extrinsics"][0])

        return {
            "boxes": BoundingBoxes3D(boxes_lidar, format=BoundingBox3DFormat.XYZLWHY),
            "labels": torch.tensor(label_ids, dtype=torch.int64),
        }
def _cam_to_lidar_boxes(boxes_cam: Tensor, extrinsics: Tensor) -> Tensor: """Convert KITTI camera-frame boxes to lidar-frame XYZLWHY format. Args: boxes_cam: ``[N, 7]`` boxes as ``(x, y, z, h, w, l, ry)`` in camera frame. extrinsics: ``[4, 4]`` lidar-to-camera matrix. Returns: ``[N, 7]`` boxes as ``(cx, cy, cz, l, w, h, yaw)`` in lidar frame. """ x_cam, y_cam, z_cam = boxes_cam[:, 0], boxes_cam[:, 1], boxes_cam[:, 2] h, w, l = boxes_cam[:, 3], boxes_cam[:, 4], boxes_cam[:, 5] ry = boxes_cam[:, 6] # KITTI location is at the bottom center of the box in camera frame. # Camera Y points down, so shift up by half height to get the geometric center. y_cam = y_cam - h / 2 # Transform center from camera to lidar using inverse extrinsics ones = torch.ones_like(x_cam) centers_cam = torch.stack([x_cam, y_cam, z_cam, ones], dim=-1) # [N, 4] cam_to_lidar = torch.linalg.inv(extrinsics) centers_lidar = (cam_to_lidar @ centers_cam.T).T[:, :3] # [N, 3] # Dimensions: KITTI h,w,l -> our l,w,h (dx,dy,dz in lidar frame) l_lidar = l # camera Z -> lidar X w_lidar = w # camera X -> lidar Y h_lidar = h # camera Y -> lidar Z # Rotation: KITTI rotation_y is around camera Y (pointing down) # In lidar frame, yaw is around Z (pointing up) yaw = -ry - np.pi / 2 return torch.stack( [ centers_lidar[:, 0], centers_lidar[:, 1], centers_lidar[:, 2], l_lidar, w_lidar, h_lidar, yaw, ], dim=-1, ) def _get_fov_mask( points_3d: Tensor, proj_matrix: Tensor, img_h: int, img_w: int, ) -> Tensor: """Get boolean mask for points that project into the camera image. Args: points_3d: ``[N, 3]`` 3D points. proj_matrix: ``[3, 4]`` projection matrix that maps ``points_3d`` to image coordinates (e.g. ``P2`` for camera-frame points, or ``P2 @ R0 @ Tr`` for lidar-frame points). img_h: Image height in pixels. img_w: Image width in pixels. Returns: Boolean mask ``[N]``. True for points with positive depth that project within image bounds. 
""" n = points_3d.shape[0] ones = torch.ones(n, 1, dtype=points_3d.dtype) pts_hom = torch.cat([points_3d, ones], dim=1) # [N, 4] # Project: [3, 4] @ [4, N] -> [3, N] pts_img = (proj_matrix @ pts_hom.T).T # [N, 3] depth = pts_img[:, 2] u = pts_img[:, 0] / depth.clamp(min=1e-6) v = pts_img[:, 1] / depth.clamp(min=1e-6) valid = (depth > 0) & (u >= 0) & (u < img_w) & (v >= 0) & (v < img_h) return valid class _HttpRangeFile(io.RawIOBase): """Read-only seekable file-like backed by HTTP ``Range`` requests. Lets :class:`zipfile.ZipFile` open a remote archive without downloading it in full. This is required for the KITTI mini split, where the upstream zips are tens of gigabytes but only a handful of stored (uncompressed) members are needed. """ def __init__(self, url: str) -> None: super().__init__() self._url = url self._pos = 0 with urllib.request.urlopen( urllib.request.Request(url, method="HEAD") ) as response: length = response.headers.get("Content-Length") if length is None: msg = f"server did not return Content-Length for {url!r}" raise OSError(msg) self._size = int(length) @override def readable(self) -> bool: return True @override def seekable(self) -> bool: return True @override def seek(self, offset: int, whence: int = io.SEEK_SET) -> int: if whence == io.SEEK_SET: self._pos = offset elif whence == io.SEEK_CUR: self._pos += offset elif whence == io.SEEK_END: self._pos = self._size + offset else: msg = f"invalid whence: {whence}" raise ValueError(msg) return self._pos @override def read(self, size: int = -1) -> bytes: if size is None or size < 0: size = self._size - self._pos if size <= 0 or self._pos >= self._size: return b"" end = min(self._pos + size, self._size) - 1 req = urllib.request.Request( self._url, headers={"Range": f"bytes={self._pos}-{end}"} ) with urllib.request.urlopen(req) as response: data = response.read() self._pos += len(data) return data def _extract_remote_zip_members( url: str, members: list[str], dest: Path, ) -> None: """Extract specific 
members from a remote zip via HTTP range requests. Args: url: HTTP(S) URL of the zip archive. The server must respond to ``Range`` requests. members: Member names to extract, as they appear in the zip's central directory. Each is written to ``dest / member``. A :class:`KeyError` propagates from :mod:`zipfile` if any member is missing. dest: Destination root directory. Member paths are joined relative to it. """ remote = _HttpRangeFile(url) with zipfile.ZipFile(remote) as zf: for member in members: dst = dest / member dst.parent.mkdir(parents=True, exist_ok=True) with zf.open(member) as src, dst.open("wb") as out: out.write(src.read())