Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
numpy==2.1.3
opencv-python==4.10.0.84
opencv-python-headless==4.10.0.84
pillow==11.0.0
58 changes: 32 additions & 26 deletions serPy/serPy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,31 @@
import numpy as np
from datetime import datetime, timedelta
from PIL import Image
import cv2
import logging

logger = logging.getLogger(__name__)

# Mapping of color_id to OpenCV Bayer conversion codes
BAYER_CONVERSION_CODES = {
8: cv2.COLOR_BAYER_RG2RGB,
9: cv2.COLOR_BAYER_GR2RGB,
10: cv2.COLOR_BAYER_GB2RGB,
11: cv2.COLOR_BAYER_BG2RGB,
# Other Bayer patterns (e.g., CYMG) are less commonly supported in OpenCV
# These require custom processing if needed
}

def save_frame_as_png(frame, output_path, color_id, align_rgb=True):
"""
Saves a debayered and optionally RGB-aligned frame as a PNG file.

Parameters:
frame (np.ndarray): Frame array (2D Bayer-encoded or grayscale).
output_path (str): File path for the output PNG.
color_id (int): Color format ID, determines the Bayer pattern.
align_rgb (bool): Whether to perform RGB alignment.
"""
if color_id in BAYER_CONVERSION_CODES:
"""Saves a debayered and optionally RGB-aligned frame as a PNG file."""

# Import OpenCV lazily so the module can be imported without the library
# being available. This avoids requiring system packages like libGL when
# the functionality is not used (e.g. during tests).
import cv2

# Mapping of ``color_id`` to OpenCV Bayer conversion codes. Defined here so
# it is only created when ``cv2`` is available.
bayer_conversion_codes = {
8: cv2.COLOR_BAYER_RG2RGB,
9: cv2.COLOR_BAYER_GR2RGB,
10: cv2.COLOR_BAYER_GB2RGB,
11: cv2.COLOR_BAYER_BG2RGB,
}

if color_id in bayer_conversion_codes:
# Perform debayering using OpenCV
debayered_frame = cv2.cvtColor(frame, BAYER_CONVERSION_CODES[color_id])
debayered_frame = cv2.cvtColor(frame, bayer_conversion_codes[color_id])
elif color_id == 0: # MONO
# No debayering needed, use the frame directly
debayered_frame = frame
Expand All @@ -47,7 +44,9 @@ def save_frame_as_png(frame, output_path, color_id, align_rgb=True):
# Align Blue to Green
warp_matrix = np.eye(2, 3, dtype=np.float32)
try:
_, warp_matrix = cv2.findTransformECC(g, b, warp_matrix, warp_mode, criteria)
_, warp_matrix = cv2.findTransformECC(
g, b, warp_matrix, warp_mode, criteria
)
b_aligned = cv2.warpAffine(
b,
warp_matrix,
Expand All @@ -61,7 +60,9 @@ def save_frame_as_png(frame, output_path, color_id, align_rgb=True):
# Align Red to Green
warp_matrix = np.eye(2, 3, dtype=np.float32)
try:
_, warp_matrix = cv2.findTransformECC(g, r, warp_matrix, warp_mode, criteria)
_, warp_matrix = cv2.findTransformECC(
g, r, warp_matrix, warp_mode, criteria
)
r_aligned = cv2.warpAffine(
r,
warp_matrix,
Expand All @@ -86,7 +87,8 @@ def save_frame_as_png(frame, output_path, color_id, align_rgb=True):
# Save the image as PNG
image.save(output_path)
print(f"Frame saved as {output_path}")



def ser_timestamp_to_datetime(timestamp):
"""
Converts an SER timestamp (8-byte integer) to a readable date-time string.
Expand All @@ -109,6 +111,7 @@ def ser_timestamp_to_datetime(timestamp):
# Return as a readable string
return readable_datetime.isoformat()


def write_ser(output_path, metadata, frames, timestamps=None):
"""
Writes an SER file with the given metadata, frames, and optional timestamps.
Expand Down Expand Up @@ -168,7 +171,9 @@ def write_ser(output_path, metadata, frames, timestamps=None):
# Write the frames
for frame in frames:
if frame.shape != (frame_height, frame_width):
raise ValueError(f"Frame dimensions {frame.shape} do not match metadata ({frame_height}, {frame_width}).")
raise ValueError(
f"Frame dimensions {frame.shape} do not match metadata ({frame_height}, {frame_width})."
)
ser_file.write(frame.astype(dtype).tobytes())

# Write the timestamps (if provided)
Expand All @@ -180,6 +185,7 @@ def write_ser(output_path, metadata, frames, timestamps=None):

print(f"SER file written successfully to {output_path}")


def read_ser(input_path):
"""
Reads an SER file with the updated header structure.
Expand Down Expand Up @@ -214,7 +220,7 @@ def read_ser(input_path):
instrument,
telescope,
date_time,
date_time_utc
date_time_utc,
) = struct.unpack(header_format, header)

# Validate the file
Expand Down