Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]
python-version: ["3.10", "3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v6
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ authors = [
]
license = {text = "MIT"}
readme = "README.md"
requires-python = ">=3.8"
requires-python = ">=3.10"
dependencies = [
"av",
"av>=14.1", # VideoFrame.rotation was added in 14.1
"numpy",
]

Expand Down
84 changes: 60 additions & 24 deletions simple_video_utils/frames.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,35 @@
from typing import BinaryIO, Generator, Optional, Tuple
from collections.abc import Generator
from typing import BinaryIO

import av
import numpy as np

from simple_video_utils.metadata import VideoMetadata, _open_container, video_metadata_from_container


def _frame_to_rgb(frame: av.VideoFrame) -> np.ndarray:
    """
    Return the frame's pixels as an RGB array, rotated into display orientation.

    The frame is first converted with ``to_ndarray(format='rgb24')``; its
    ``rotation`` attribute (degrees, normalised modulo 360) then decides how
    many quarter turns to apply. PyAV hands back frames as stored and does not
    honour the container's display-matrix rotation the way the ffmpeg CLI's
    autorotate does, so phone footage tagged with a 90° rotation would
    otherwise come out sideways.

    Args:
        frame: Decoded video frame exposing ``to_ndarray`` and ``rotation``.

    Returns:
        The RGB array. It is returned untouched when the rotation is 0 or is
        not a multiple of 90°; otherwise a rotated, C-contiguous copy.
    """
    pixels = frame.to_ndarray(format='rgb24')
    quarter_turns, leftover = divmod(frame.rotation % 360, 90)

    # Only exact, non-zero quarter turns are applied; anything else is passed through.
    if leftover != 0 or quarter_turns == 0:
        return pixels

    # One counterclockwise quarter turn (k=1) for rotation=90 reproduces ffmpeg's
    # autorotate output pixel-for-pixel. np.rot90 only yields a strided view, and
    # consumers such as MediaPipe and OpenCV refuse non-contiguous input, so the
    # result is materialised as a contiguous array.
    return np.ascontiguousarray(np.rot90(pixels, k=quarter_turns))


def _generate_frames(
container: av.container.InputContainer,
skip_frames: int = 0,
max_frames: Optional[int] = None,
max_frames: int | None = None,
) -> Generator[np.ndarray, None, None]:
"""
Generate RGB frames from a container's current position.
Expand All @@ -23,7 +43,7 @@ def _generate_frames(
max_frames: Maximum number of frames to yield, or None for all remaining.

Yields:
RGB numpy arrays (H, W, 3) for frames after skipping.
RGB numpy arrays (H, W, 3) in display orientation for frames after skipping.
"""
frames_decoded = 0
frames_yielded = 0
Expand All @@ -33,19 +53,19 @@ def _generate_frames(
frames_decoded += 1
continue

yield frame.to_ndarray(format='rgb24')
yield _frame_to_rgb(frame)
frames_yielded += 1

if max_frames is not None and frames_yielded >= max_frames:
break
frames_decoded += 1

def _validate_parameters(
start_frame: Optional[int],
end_frame: Optional[int],
start_time: Optional[float],
end_time: Optional[float],
) -> Tuple[bool, bool]:
start_frame: int | None,
end_frame: int | None,
start_time: float | None,
end_time: float | None,
) -> tuple[bool, bool]:
"""Validate that time and frame parameters aren't mixed."""
has_frame_params = start_frame is not None or end_frame is not None
has_time_params = start_time is not None or end_time is not None
Expand All @@ -58,10 +78,10 @@ def _validate_parameters(


def _convert_time_to_frames(
start_time: Optional[float],
end_time: Optional[float],
start_time: float | None,
end_time: float | None,
fps: float,
) -> Tuple[int, Optional[int]]:
) -> tuple[int, int | None]:
"""Convert time-based parameters to frame indices."""
start = int((start_time or 0.0) * fps)
end = int(end_time * fps) if end_time is not None else None
Expand All @@ -74,9 +94,9 @@ def _convert_time_to_frames(


def _normalize_frame_range(
start_frame: Optional[int],
end_frame: Optional[int],
) -> Tuple[int, Optional[int]]:
start_frame: int | None,
end_frame: int | None,
) -> tuple[int, int | None]:
"""Normalize frame parameters with defaults and validation."""
start = start_frame if start_frame is not None else 0

Expand Down Expand Up @@ -122,10 +142,10 @@ def _calculate_seek_position(

def read_frames_exact(
src: str,
start_frame: Optional[int] = None,
end_frame: Optional[int] = None,
start_time: Optional[float] = None,
end_time: Optional[float] = None,
start_frame: int | None = None,
end_frame: int | None = None,
start_time: float | None = None,
end_time: float | None = None,
thread_type: str = "AUTO",
) -> Generator[np.ndarray, None, None]:
"""
Expand Down Expand Up @@ -195,7 +215,7 @@ def read_frames_from_stream(
skip_frames: int = 0,
thread_type: str = "AUTO",
buffer_size: int = 32768, # PyAV default buffer size, can be reduced for lower latency when realtime streaming
) -> Tuple[VideoMetadata, Generator[np.ndarray, None, None]]:
) -> tuple[VideoMetadata, Generator[np.ndarray, None, None]]:
"""
Read frames from a video stream (file-like object).

Expand All @@ -217,13 +237,29 @@ def read_frames_from_stream(
seeking (MP4 with moov at end), the stream must be fully available.
"""
container = av.open(stream, mode='r', buffer_size=buffer_size)
for s in container.streams.video:
s.thread_type = thread_type
meta = video_metadata_from_container(container)
try:
for s in container.streams.video:
s.thread_type = thread_type

# The display-matrix rotation is only exposed per-frame, and the stream may
# not be seekable (e.g. a pipe) — so decode the first frame eagerly for the
# metadata and hand it back through the generator.
first_frame = next(container.decode(video=0), None)
rotation = first_frame.rotation if first_frame is not None else 0
meta = video_metadata_from_container(container, rotation=rotation)
except Exception:
container.close()
raise

Comment thread
coderabbitai[bot] marked this conversation as resolved.
def frame_generator() -> Generator[np.ndarray, None, None]:
try:
yield from _generate_frames(container, skip_frames=skip_frames, max_frames=None)
remaining_skip = skip_frames
if first_frame is not None:
if remaining_skip == 0:
yield _frame_to_rgb(first_frame)
else:
remaining_skip -= 1
yield from _generate_frames(container, skip_frames=remaining_skip, max_frames=None)
finally:
container.close()

Expand Down
59 changes: 50 additions & 9 deletions simple_video_utils/metadata.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import io
from contextlib import contextmanager
from functools import lru_cache
from typing import NamedTuple, Optional, Union
from typing import NamedTuple

import av

Expand All @@ -10,13 +10,14 @@ class VideoMetadata(NamedTuple):
width: int
height: int
fps: float
nb_frames: Optional[int]
time_base: Optional[str]
duration: Optional[float] # seconds; None if the container header doesn't carry one
nb_frames: int | None
time_base: str | None
duration: float | None # seconds; None if the container header doesn't carry one
rotation: int = 0 # display-matrix rotation in degrees; width/height already account for it


@contextmanager
def _open_container(source: Union[str, io.BytesIO]):
def _open_container(source: str | io.BytesIO):
"""Context manager for safely opening and closing PyAV containers."""
container = None
try:
Expand All @@ -30,8 +31,39 @@ def _open_container(source: Union[str, io.BytesIO]):
container.close()


def video_metadata_from_container(container: av.container.InputContainer) -> VideoMetadata:
"""Extract metadata from an open PyAV container."""
def _probe_rotation(container: av.container.InputContainer) -> int:
    """
    Decode the first video frame to learn the display rotation, then seek back to 0.

    PyAV surfaces the display-matrix rotation on each decoded frame
    (``VideoFrame.rotation``) rather than on the stream, so one frame has to be
    decoded to read it. The container is rewound with ``seek(0)`` afterwards,
    which means the input must be seekable.

    Args:
        container: Open PyAV input container with a video stream at index 0.

    Returns:
        The first frame's ``rotation`` value, or 0 when no frame is produced
        or decoding raises ``av.FFmpegError`` / ``OSError``.
    """
    degrees = 0
    try:
        first = next(container.decode(video=0), None)
        if first is not None:
            degrees = first.rotation
    except (av.FFmpegError, OSError):
        # Undecodable video: fall back to "no rotation" instead of failing the probe.
        degrees = 0

    # Rewind unconditionally so later reads start from the beginning again.
    container.seek(0)
    return degrees


def video_metadata_from_container(
container: av.container.InputContainer,
rotation: int | None = None,
) -> VideoMetadata:
"""
Extract metadata from an open PyAV container.

Width/height are reported in display orientation (rotation applied),
matching the frames yielded by the frames module.

Args:
container: Open PyAV container.
rotation: Display rotation in degrees if already known (e.g. from a
decoded frame). When None, it is probed by decoding the first
frame and rewinding — pass it explicitly for non-seekable input.
"""
stream = container.streams.video[0]
fps = float(stream.average_rate) if stream.average_rate else 0.0
nb_frames = stream.frames if stream.frames > 0 else None
Expand All @@ -50,13 +82,22 @@ def video_metadata_from_container(container: av.container.InputContainer) -> Vid
else:
duration = None

if rotation is None:
rotation = _probe_rotation(container)
rotation %= 360

width, height = stream.width, stream.height
if rotation % 180 == 90:
width, height = height, width

return VideoMetadata(
width=stream.width,
height=stream.height,
width=width,
height=height,
fps=fps,
nb_frames=nb_frames,
time_base=time_base,
duration=duration,
rotation=rotation,
)


Expand Down
Binary file added tests/assets/rotated90.mp4
Binary file not shown.
14 changes: 7 additions & 7 deletions tests/test_frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_sequential_vs_range_reading(self, video_path):

assert len(range_frames) == len(individual_frames) == 3

for range_frame, individual_frame in zip(range_frames, individual_frames):
for range_frame, individual_frame in zip(range_frames, individual_frames, strict=False):
np.testing.assert_array_equal(range_frame, individual_frame)

def test_frames_are_different(self, video_path):
Expand Down Expand Up @@ -165,7 +165,7 @@ def test_end_frame_none_consistency(self, video_path):
assert len(frames1) == len(frames2)

# Frames should be identical
for f1, f2 in zip(frames1, frames2):
for f1, f2 in zip(frames1, frames2, strict=False):
np.testing.assert_array_equal(f1, f2)

def test_end_frame_none_vs_explicit_end(self, video_path):
Expand Down Expand Up @@ -253,7 +253,7 @@ def test_time_vs_frame_equivalence(self, video_path):
assert len(frames_by_index) == len(frames_by_time)

# Frames should be identical
for i, (frame_idx, frame_time) in enumerate(zip(frames_by_index, frames_by_time)):
for i, (frame_idx, frame_time) in enumerate(zip(frames_by_index, frames_by_time, strict=False)):
np.testing.assert_array_equal(
frame_idx,
frame_time,
Expand Down Expand Up @@ -297,7 +297,7 @@ def test_no_parameters_reads_all(self, video_path):

# Should produce same result
assert len(frames_no_params) == len(frames_explicit)
for f1, f2 in zip(frames_no_params, frames_explicit):
for f1, f2 in zip(frames_no_params, frames_explicit, strict=False):
np.testing.assert_array_equal(f1, f2)

def test_time_vs_frame_seeking_precision_remote(self):
Expand Down Expand Up @@ -336,7 +336,7 @@ def test_time_vs_frame_seeking_precision_remote(self):
)

# Every frame should be identical
for i, (frame_time, frame_idx) in enumerate(zip(frames_by_time, frames_by_frame)):
for i, (frame_time, frame_idx) in enumerate(zip(frames_by_time, frames_by_frame, strict=False)):
actual_frame_num = start_frame_idx + i
np.testing.assert_array_equal(
frame_time,
Expand Down Expand Up @@ -410,7 +410,7 @@ def test_read_frames_from_stream_all_frames(self, video_bytes, video_path):
assert len(stream_frames) == len(file_frames)

# Frames should be identical
for i, (stream_frame, file_frame) in enumerate(zip(stream_frames, file_frames)):
for i, (stream_frame, file_frame) in enumerate(zip(stream_frames, file_frames, strict=False)):
np.testing.assert_array_equal(
stream_frame,
file_frame,
Expand All @@ -430,7 +430,7 @@ def test_read_frames_from_stream_skip_frames(self, video_bytes, video_path):

assert len(stream_frames) == len(file_frames)

for i, (stream_frame, file_frame) in enumerate(zip(stream_frames, file_frames)):
for i, (stream_frame, file_frame) in enumerate(zip(stream_frames, file_frames, strict=False)):
np.testing.assert_array_equal(
stream_frame,
file_frame,
Expand Down
13 changes: 7 additions & 6 deletions tests/test_regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import json
import subprocess
from collections.abc import Generator
from functools import lru_cache
from pathlib import Path
from typing import Generator, NamedTuple, Optional
from typing import NamedTuple

import numpy as np
import pytest
Expand All @@ -17,8 +18,8 @@ class VideoMetadata(NamedTuple):
width: int
height: int
fps: float
nb_frames: Optional[int]
time_base: Optional[str]
nb_frames: int | None
time_base: str | None


@lru_cache(maxsize=8)
Expand Down Expand Up @@ -53,7 +54,7 @@ def ffprobe(url_or_path: str) -> VideoMetadata:
def ffmpeg_read_frames_exact( # noqa: C901
src: str,
start_frame: int,
end_frame: Optional[int] = None,
end_frame: int | None = None,
) -> Generator[np.ndarray, None, None]:
"""
Return frames [start_frame, end_frame] inclusive as RGB np.ndarrays using ffmpeg.
Expand Down Expand Up @@ -194,7 +195,7 @@ def test_frames_match_ffmpeg_from_start(self, video_path):
)

# Every frame should be identical (pixel-perfect)
for i, (pyav_frame, ffmpeg_frame) in enumerate(zip(pyav_frames, ffmpeg_frames)):
for i, (pyav_frame, ffmpeg_frame) in enumerate(zip(pyav_frames, ffmpeg_frames, strict=False)):
np.testing.assert_array_equal(
pyav_frame,
ffmpeg_frame,
Expand Down Expand Up @@ -265,7 +266,7 @@ def test_frames_match_ffmpeg_time_based(self, video_path):
)

# Every frame should be identical
for i, (pyav_frame, ffmpeg_frame) in enumerate(zip(pyav_frames, ffmpeg_frames)):
for i, (pyav_frame, ffmpeg_frame) in enumerate(zip(pyav_frames, ffmpeg_frames, strict=False)):
actual_frame_num = start_frame + i
np.testing.assert_array_equal(
pyav_frame,
Expand Down
Loading