From a5b2aa51d101d5fd4fd90e29ffbc8e5eda91e38c Mon Sep 17 00:00:00 2001 From: Teque5 Date: Mon, 17 Nov 2025 15:05:46 -0800 Subject: [PATCH 01/13] Platinum BLUE converter * Initial code by KelseyCreekSoftware * Simplified install: drop scipy optional dependency and [apps] entirely * add documentation for converters * update converter entry points * homologate API for wav & blue converter * add support for metadata-only (0 sample) BLUE files * simplify endian detection * handle duplicate keys in extended header * allow converters to --archive (.sigmf) or create SigMF pairs (.sigmf-data & .sigmf-meta) * add tests for blue (.cdif) and wav (.wav) converters --- .github/workflows/main.yml | 2 +- .gitignore | 1 + .readthedocs.yaml | 1 - docs/source/api.rst | 3 +- docs/source/converters.rst | 93 ++++ docs/source/index.rst | 1 + pyproject.toml | 8 +- sigmf/apps/convert_wav.py | 103 ---- sigmf/{apps => convert}/__init__.py | 0 sigmf/convert/blue.py | 719 ++++++++++++++++++++++++++++ sigmf/convert/wav.py | 139 ++++++ sigmf/error.py | 4 + tests/test_convert.py | 124 +++++ tests/testdata.py | 10 + 14 files changed, 1097 insertions(+), 111 deletions(-) create mode 100644 docs/source/converters.rst delete mode 100755 sigmf/apps/convert_wav.py rename sigmf/{apps => convert}/__init__.py (100%) create mode 100644 sigmf/convert/blue.py create mode 100755 sigmf/convert/wav.py create mode 100644 tests/test_convert.py diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index e688093..8aaf0e5 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -22,7 +22,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install .[test,apps] + pip install .[test] - name: Test with pytest run: | coverage run diff --git a/.gitignore b/.gitignore index 4cb6761..d496d21 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ __pycache__/ *.swp *.py[cod] .cache +.vscode # packaging related dist/ diff --git a/.readthedocs.yaml 
b/.readthedocs.yaml index 9ee2227..e0fcec7 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -18,7 +18,6 @@ python: path: . extra_requirements: - test - - apps - requirements: docs/requirements.txt # Build documentation in the "docs/" directory with Sphinx diff --git a/docs/source/api.rst b/docs/source/api.rst index 2c3bddb..62d166a 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -7,9 +7,10 @@ SigMF API :template: custom-module-template.rst :recursive: - sigmf.apps.convert_wav sigmf.archive sigmf.archivereader + sigmf.convert.blue + sigmf.convert.wav sigmf.error sigmf.schema sigmf.sigmf_hash diff --git a/docs/source/converters.rst b/docs/source/converters.rst new file mode 100644 index 0000000..76e09d9 --- /dev/null +++ b/docs/source/converters.rst @@ -0,0 +1,93 @@ +================= +Format Converters +================= + +The SigMF Python library includes converters to import data from various file formats into SigMF format. +These converters make it easy to migrate existing RF recordings to the standardized SigMF format while preserving metadata when possible. + +Overview +-------- + +Converters are available for: + +* **BLUE files** - MIDAS Blue and Platinum BLUE RF recordings (``.cdif``) +* **WAV files** - Audio recordings (``.wav``) + +All converters return a :class:`~sigmf.SigMFFile` object that can be used immediately or saved to disk. +Converters preserve datatypes and metadata where possible. + + +Command Line Usage +~~~~~~~~~~~~~~~~~~ + +Converters can be used from the command line after ``pip install sigmf``: + +.. code-block:: bash + + sigmf_convert_blue recording.cdif + sigmf_convert_wav recording.wav + +or by using module syntax: + +.. code-block:: bash + + python3 -m sigmf.convert.blue recording.cdif + python3 -m sigmf.convert.wav recording.wav + + +Output Naming +~~~~~~~~~~~~~ + +All converters treat the value passed with ``-o/--output`` as a base name and ignore any existing suffix. 
The tools +emit ``.sigmf-data`` and ``.sigmf-meta`` files (retaining any original extensions such as ``.cdif`` or +``.tmp`` in the base). Supplying ``--archive`` packages the result as ``.sigmf`` instead of producing separate +meta/data files. + + +BLUE Converter +-------------- + +The BLUE converter handles CDIF (.cdif) recordings while placing BLUE header information into the following global fields: + +* ``blue:fixed`` - fixed header information (at start of file) +* ``blue:adjunct`` - adjunct header information (after fixed header) +* ``blue:extended`` - extended header information (at end of file) +* ``blue:keywords`` - user-defined key-value pairs + +.. autofunction:: sigmf.convert.blue.blue_to_sigmf + + +.. code-block:: python + + from sigmf.convert.blue import blue_to_sigmf + + # read BLUE, write SigMF, and return SigMFFile object + meta = blue_to_sigmf(blue_path="recording.cdif", out_path="recording.sigmf") + + # access converted data + samples = meta.read_samples() + sample_rate_hz = meta.sample_rate + + # access BLUE-specific metadata + blue_type = meta.get_global_field("blue:fixed")["type"] # e.g., 1000 + blue_version = meta.get_global_field("blue:keywords")["IO"] # e.g., "X-Midas" + + +WAV Converter +------------- + +This is useful when working with audio datasets. + +.. autofunction:: sigmf.convert.wav.wav_to_sigmf + + +.. code-block:: python + + from sigmf.convert.wav import wav_to_sigmf + + # read WAV, write SigMF, and return SigMFFile object + meta = wav_to_sigmf(wav_path="recording.wav", out_path="recording.sigmf") + + # access converted data + samples = meta.read_samples() + sample_rate_hz = meta.sample_rate \ No newline at end of file diff --git a/docs/source/index.rst b/docs/source/index.rst index f845252..9d4a6ab 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -23,6 +23,7 @@ To get started, see the :doc:`quickstart` section or learn how to :ref:`install` quickstart advanced + converters developers .. 
toctree:: diff --git a/pyproject.toml b/pyproject.toml index cac61a3..dc950af 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,8 @@ dependencies = [ [project.scripts] sigmf_validate = "sigmf.validate:main" - sigmf_convert_wav = "sigmf.apps.convert_wav:main [apps]" + sigmf_convert_wav = "sigmf.convert.wav:main" + sigmf_convert_blue = "sigmf.convert.blue:main" [project.optional-dependencies] test = [ "pylint", @@ -41,9 +42,6 @@ dependencies = [ "pytest-cov", "hypothesis", # next-gen testing framework ] - apps = [ - "scipy", # for wav i/o - ] [tool.setuptools] packages = ["sigmf"] @@ -106,6 +104,6 @@ legacy_tox_ini = ''' [testenv] usedevelop = True - deps = .[test,apps] + deps = .[test] commands = coverage run ''' diff --git a/sigmf/apps/convert_wav.py b/sigmf/apps/convert_wav.py deleted file mode 100755 index c2f1f2e..0000000 --- a/sigmf/apps/convert_wav.py +++ /dev/null @@ -1,103 +0,0 @@ -# Copyright: Multiple Authors -# -# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python -# -# SPDX-License-Identifier: LGPL-3.0-or-later - -"""converter for wav containers""" - -import argparse -import getpass -import logging -import tempfile -from datetime import datetime, timezone -from os import PathLike -from pathlib import Path -from typing import Optional - -from scipy.io import wavfile - -from .. import SigMFFile -from .. import __version__ as toolversion -from ..sigmffile import get_sigmf_filenames -from ..utils import SIGMF_DATETIME_ISO8601_FMT, get_data_type_str - -log = logging.getLogger() - - -def convert_wav( - wav_path: str, - out_path: Optional[str] = None, - author: Optional[str] = None, -) -> PathLike: - """ - Read a wav and write a sigmf archive. 
- """ - wav_path = Path(wav_path) - wav_stem = wav_path.stem - samp_rate, wav_data = wavfile.read(wav_path) - - global_info = { - SigMFFile.AUTHOR_KEY: getpass.getuser() if author is None else author, - SigMFFile.DATATYPE_KEY: get_data_type_str(wav_data), - SigMFFile.DESCRIPTION_KEY: f"converted from {wav_path.name}", - SigMFFile.NUM_CHANNELS_KEY: 1 if len(wav_data.shape) < 2 else wav_data.shape[1], - SigMFFile.RECORDER_KEY: "Official SigMF wav converter", - SigMFFile.SAMPLE_RATE_KEY: samp_rate, - } - - modify_time = wav_path.lstat().st_mtime - wav_datetime = datetime.fromtimestamp(modify_time, tz=timezone.utc) - - capture_info = { - SigMFFile.START_INDEX_KEY: 0, - SigMFFile.DATETIME_KEY: wav_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT), - } - - temp_dir = Path(tempfile.mkdtemp()) - if out_path is None: - # extension will be changed - out_path = Path(wav_stem) - else: - out_path = Path(out_path) - filenames = get_sigmf_filenames(out_path) - - data_path = temp_dir / filenames["data_fn"] - wav_data.tofile(data_path) - - meta = SigMFFile(data_file=data_path, global_info=global_info) - meta.add_capture(0, metadata=capture_info) - log.debug("created %r", meta) - - arc_path = filenames["archive_fn"] - meta.tofile(arc_path, toarchive=True) - log.info("wrote %s", arc_path) - return arc_path - - -def main() -> None: - """ - entry-point for sigmf_convert_wav - """ - parser = argparse.ArgumentParser(description="Convert wav to sigmf archive.") - parser.add_argument("input", type=str, help="wav path") - parser.add_argument("--author", type=str, default=None, help=f"set {SigMFFile.AUTHOR_KEY} metadata") - parser.add_argument("-v", "--verbose", action="count", default=0) - parser.add_argument("--version", action="version", version=f"%(prog)s v{toolversion}") - args = parser.parse_args() - - level_lut = { - 0: logging.WARNING, - 1: logging.INFO, - 2: logging.DEBUG, - } - logging.basicConfig(level=level_lut[min(args.verbose, 2)]) - - _ = convert_wav( - wav_path=args.input, - 
author=args.author, - ) - - -if __name__ == "__main__": - main() diff --git a/sigmf/apps/__init__.py b/sigmf/convert/__init__.py similarity index 100% rename from sigmf/apps/__init__.py rename to sigmf/convert/__init__.py diff --git a/sigmf/convert/blue.py b/sigmf/convert/blue.py new file mode 100644 index 0000000..72b0534 --- /dev/null +++ b/sigmf/convert/blue.py @@ -0,0 +1,719 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +""" +X-Midas BLUE File converter. +This script reads and parses the HCB (Header Control Block) and Extended Headers. +It supports different file types and extracts metadata accordingly. +Converts the extracted metadata into SigMF format. +""" + +import argparse +import base64 +import getpass +import io +import logging +import struct +import tempfile +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +import numpy as np + +from .. import SigMFFile +from .. 
import __version__ as toolversion  # continuation of the package-relative import split above
from ..error import SigMFConversionError
from ..sigmffile import get_sigmf_filenames
from ..utils import SIGMF_DATETIME_ISO8601_FMT

log = logging.getLogger()

# fmt: off
FIXED_LAYOUT = [
    # Fixed Header definitions: (key, offset, size, fmt, description) up to adjunct
    ("version",    0,  4, "4s",  "Header version"),
    ("head_rep",   4,  4, "4s",  "Header representation"),
    ("data_rep",   8,  4, "4s",  "Data representation"),
    ("detached",  12,  4, "i",   "Detached header"),
    ("protected", 16,  4, "i",   "Protected from overwrite"),
    ("pipe",      20,  4, "i",   "Pipe mode (N/A)"),
    ("ext_start", 24,  4, "i",   "Extended header start (512-byte blocks)"),
    ("ext_size",  28,  4, "i",   "Extended header size in bytes"),
    ("data_start", 32, 8, "d",   "Data start in bytes"),
    ("data_size", 40,  8, "d",   "Data size in bytes"),
    ("type",      48,  4, "i",   "File type code"),
    ("format",    52,  2, "2s",  "2 Letter data format code"),
    ("flagmask",  54,  2, "h",   "16-bit flagmask"),
    ("timecode",  56,  8, "d",   "Time code field"),
    ("inlet",     64,  2, "h",   "Inlet owner"),
    ("outlets",   66,  2, "h",   "Number of outlets"),
    ("outmask",   68,  4, "i",   "Outlet async mask"),
    ("pipeloc",   72,  4, "i",   "Pipe location"),
    ("pipesize",  76,  4, "i",   "Pipe size in bytes"),
    ("in_byte",   80,  8, "d",   "Next input byte"),
    ("out_byte",  88,  8, "d",   "Next out byte (cumulative)"),
    ("outbytes",  96, 64, "8d",  "Next out byte (each outlet)"),
    ("keylength", 160, 4, "i",   "Length of keyword string"),
    ("keywords", 164, 92, "92s", "User defined keyword string"),
    # Adjunct starts at byte 256 after this
]
# fmt: on

HEADER_SIZE_BYTES = 512
BLOCK_SIZE_BYTES = 512

TYPE_MAP = {
    # BLUE code to numpy dtype
    "A": np.dtype("S1"),  # ASCII character
    "B": np.int8,
    "I": np.int16,
    "L": np.int32,
    "X": np.int64,
    "F": np.float32,
    "D": np.float64,
    # unsupported codes
    # "P" : packed bits
    # "N" : 4-bit integer
}


def blue_to_sigmf_type_str(h_fixed):
    """
    Translate a BLUE format code into a SigMF datatype string.

    Parameters
    ----------
    h_fixed : dict
        Fixed Header dictionary containing 'format' and 'data_rep' fields.

    Returns
    -------
    str
        SigMF datatype string (e.g., 'ci16_le', 'rf32_be').
    """
    fmt_code = h_fixed.get("format")
    data_rep = h_fixed.get("data_rep")

    # second format letter selects the element dtype via TYPE_MAP;
    # first letter 'C' marks complex, anything else is treated as real
    dtype_obj = np.dtype(TYPE_MAP[fmt_code[1]])
    n_bits = dtype_obj.itemsize * 8

    kind_char = "i" if dtype_obj.kind in ("i", "u") else "f"
    mode_char = "c" if fmt_code[0] == "C" else "r"
    datatype = f"{mode_char}{kind_char}{n_bits}"

    # single-byte types carry no endianness suffix
    if n_bits > 8:
        datatype += "_le" if data_rep == "EEEI" else "_be"
    return datatype


def detect_endian(data):
    """
    Detect endianness of a Bluefile header.

    NOTE(review): this reads bytes 8:12, which is the 'data_rep' field of the
    fixed header — assumes header and data share a representation; confirm.

    Parameters
    ----------
    data : bytes
        Raw header data.

    Returns
    -------
    str
        "<" for little-endian or ">" for big-endian.

    Raises
    ------
    SigMFConversionError
        If the endianness is unexpected.
    """
    rep = data[8:12].decode("ascii")
    marker = {"EEEI": "<", "IEEE": ">"}.get(rep)
    if marker is None:
        raise SigMFConversionError(f"Unsupported endianness: {rep}")
    return marker
+ """ + with open(file_path, "rb") as handle: + header_bytes = handle.read(256) + + endian = detect_endian(header_bytes) + + # fixed header fields + h_fixed = {} + for key, offset, size, fmt, _ in FIXED_LAYOUT: + raw = header_bytes[offset : offset + size] + try: + val = struct.unpack(endian + fmt, raw)[0] + except struct.error: + raise SigMFConversionError(f"Failed to unpack field {key} with endian {endian}") + if isinstance(val, bytes): + val = val.decode("ascii", errors="replace") + h_fixed[key] = val + + # parse user keywords & decode standard keywords + h_keywords = {} + + for field in h_fixed["keywords"].split("\x00"): + if "=" in field: + key, value = field.split("=", 1) + h_keywords[key] = value + + # variable (adjunct) header parsing + if h_fixed["type"] in (1000, 1001): + h_adjunct = { + "xstart": struct.unpack(f"{endian}d", handle.read(8))[0], + "xdelta": struct.unpack(f"{endian}d", handle.read(8))[0], + "xunits": struct.unpack(f"{endian}i", handle.read(4))[0], + } + elif h_fixed["type"] == 2000: + h_adjunct = { + "xstart": struct.unpack(f"{endian}d", handle.read(8))[0], + "xdelta": struct.unpack(f"{endian}d", handle.read(8))[0], + "xunits": struct.unpack(f"{endian}i", handle.read(4))[0], + "subsize": struct.unpack(f"{endian}i", handle.read(4))[0], + "ystart": struct.unpack(f"{endian}d", handle.read(8))[0], + "ydelta": struct.unpack(f"{endian}d", handle.read(8))[0], + "yunits": struct.unpack(f"{endian}i", handle.read(4))[0], + } + else: + # read raw adjunct header as bytes and convert to base64 for JSON serialization + log.warning(f"Unknown BLUE file type {h_fixed['type']}, encoding adjunct header in metadata as base64.") + raw_adjunct = handle.read(256) + h_adjunct = {"raw_base64": base64.b64encode(raw_adjunct).decode("ascii")} + + # FIXME: I've seen VER=2.0.14 + ver_lut = {"1.0": "BLUE 1.0", "1.1": "BLUE 1.1", "2.0": "Platinum"} + spec_str = ver_lut.get(h_keywords.get("VER", "1.0")) + log.info(f"Read {h_fixed['version']} type {h_fixed['type']} using 
{spec_str} specification.") + + validate_fixed(h_fixed) + validate_adjunct(h_adjunct) + + return h_fixed, h_keywords, h_adjunct + + +def read_extended_header(file_path, h_fixed): + """ + Read Extended Header from a BLUE file. + + Parameters + ---------- + file_path : str + Path to the BLUE file. + h_fixed : dict + Fixed Header containing 'ext_size' and 'ext_start'. + + Returns + ------- + list of dict + List of dictionaries containing parsed records. + + Raises + ------ + SigMFConversionError + If the extended header cannot be parsed. + """ + entries = [] + if h_fixed["ext_size"] <= 0: + return entries + endian = "<" if h_fixed.get("head_rep") == "EEEI" else ">" + with open(file_path, "rb") as handle: + handle.seek(int(h_fixed["ext_start"]) * BLOCK_SIZE_BYTES) + bytes_remaining = int(h_fixed["ext_size"]) + while bytes_remaining > 0: + lkey = struct.unpack(f"{endian}i", handle.read(4))[0] + lext = struct.unpack(f"{endian}h", handle.read(2))[0] + ltag = struct.unpack(f"{endian}b", handle.read(1))[0] + type_char = handle.read(1).decode("ascii", errors="replace") + + # get dtype and compute bytes per element + if type_char in TYPE_MAP: + dtype = TYPE_MAP[type_char] + bytes_per_element = np.dtype(dtype).itemsize + else: + # fallback for unknown types + dtype = np.dtype("S1") + bytes_per_element = 1 + + val_len = lkey - lext + val_count = val_len // bytes_per_element if bytes_per_element else 0 + + if type_char == "A": + raw = handle.read(val_len) + if len(raw) < val_len: + raise SigMFConversionError("Unexpected end of extended header") + value = raw.rstrip(b"\x00").decode("ascii", errors="replace") + else: + value = np.frombuffer(handle.read(val_len), dtype=dtype, count=val_count) + if value.size == 1: + val_item = value[0] + # handle bytes first (numpy.bytes_ is also np.generic) + if isinstance(val_item, bytes): + # handle bytes from S1 dtype - convert to base64 for JSON + value = base64.b64encode(val_item).decode("ascii") + elif isinstance(val_item, np.generic): + # 
def read_extended_header(file_path, h_fixed):
    """
    Read Extended Header from a BLUE file.

    Parameters
    ----------
    file_path : str
        Path to the BLUE file.
    h_fixed : dict
        Fixed Header containing 'ext_size' and 'ext_start'.

    Returns
    -------
    list of dict
        List of dictionaries containing parsed records.

    Raises
    ------
    SigMFConversionError
        If the extended header cannot be parsed.
    """
    entries = []
    if h_fixed["ext_size"] <= 0:
        return entries
    endian = "<" if h_fixed.get("head_rep") == "EEEI" else ">"
    with open(file_path, "rb") as handle:
        handle.seek(int(h_fixed["ext_start"]) * BLOCK_SIZE_BYTES)
        bytes_remaining = int(h_fixed["ext_size"])
        while bytes_remaining > 0:
            # FIX: guard against truncated record heads instead of raising a bare struct.error
            record_head = handle.read(8)
            if len(record_head) < 8:
                raise SigMFConversionError("Unexpected end of extended header")
            lkey = struct.unpack(f"{endian}i", record_head[0:4])[0]
            lext = struct.unpack(f"{endian}h", record_head[4:6])[0]
            ltag = struct.unpack(f"{endian}b", record_head[6:7])[0]
            type_char = record_head[7:8].decode("ascii", errors="replace")

            # get dtype and compute bytes per element
            if type_char in TYPE_MAP:
                dtype = TYPE_MAP[type_char]
                bytes_per_element = np.dtype(dtype).itemsize
            else:
                # fallback for unknown types
                dtype = np.dtype("S1")
                bytes_per_element = 1

            val_len = lkey - lext
            val_count = val_len // bytes_per_element if bytes_per_element else 0

            if type_char == "A":
                raw = handle.read(val_len)
                if len(raw) < val_len:
                    raise SigMFConversionError("Unexpected end of extended header")
                value = raw.rstrip(b"\x00").decode("ascii", errors="replace")
            else:
                value = np.frombuffer(handle.read(val_len), dtype=dtype, count=val_count)
                if value.size == 1:
                    val_item = value[0]
                    # handle bytes first (numpy.bytes_ is also np.generic)
                    if isinstance(val_item, bytes):
                        # handle bytes from S1 dtype - convert to base64 for JSON
                        value = base64.b64encode(val_item).decode("ascii")
                    elif isinstance(val_item, np.generic):
                        # convert numpy scalar to native python type
                        value = val_item.item()
                    else:
                        value = val_item
                else:
                    value = value.tolist()

            tag = handle.read(ltag).decode("ascii", errors="replace") if ltag > 0 else ""

            # records are 8-byte aligned; skip any pad bytes after the tag
            total = 4 + 2 + 1 + 1 + val_len + ltag
            pad = (8 - (total % 8)) % 8
            if pad:
                handle.read(pad)

            entries.append({"tag": tag, "type": type_char, "value": value, "lkey": lkey, "lext": lext, "ltag": ltag})
            bytes_remaining -= lkey

    validate_extended_header(entries)

    return entries


def data_loopback(blue_path: Path, data_path: Path, h_fixed: dict) -> None:
    """
    Write SigMF data file from BLUE file samples.

    Parameters
    ----------
    blue_path : Path
        Path to the BLUE file.
    data_path : Path
        Destination path for the SigMF dataset (.sigmf-data).
    h_fixed : dict
        Header Control Block dictionary.

    Returns
    -------
    numpy.ndarray or None
        Empty array for zero-sample (metadata-only) files; otherwise ``None``
        after the dataset has been written to ``data_path``.
    """
    log.debug("parsing BLUE file data values")

    # use header data_size field instead of file size calculation
    data_size_bytes = int(h_fixed.get("data_size", 0))
    fmt = h_fixed.get("format")

    log.debug(f"format: {fmt}, data_size from header: {data_size_bytes} bytes")

    # parse format code components
    is_complex = fmt[0] == "C"
    np_dtype = TYPE_MAP[fmt[1]]

    # calculate element size and count
    elem_size = np.dtype(np_dtype).itemsize
    elem_count = data_size_bytes // elem_size

    log.debug(f"elem_size: {elem_size}, elem_count: {elem_count}, is_complex: {is_complex}")

    # check for zero-sample file (metadata-only)
    if elem_count == 0:
        log.info("detected zero-sample BLUE file, creating metadata-only SigMF")
        return np.array([], dtype=np_dtype)

    # FIX: honor the fixed-header 'data_start' byte offset when present; the
    # previous hard-coded HEADER_SIZE_BYTES only worked for data at byte 512
    data_offset = int(h_fixed.get("data_start", 0)) or HEADER_SIZE_BYTES
    raw_samples = np.fromfile(blue_path, dtype=np_dtype, offset=data_offset, count=elem_count)

    if is_complex:
        # check if data is already complex or needs deinterleaving
        if np.iscomplexobj(raw_samples):
            # already complex, no reassembly needed
            samples = raw_samples
        else:
            # reassemble interleaved IQ samples
            samples = raw_samples[::2] + 1j * raw_samples[1::2]
    else:
        # scalar data
        samples = raw_samples

    # save out as SigMF IQ data file
    samples.tofile(data_path)
    log.info("wrote %s", data_path)
+ """ + # helper to look up extended header values by tag + def get_tag(tag): + for entry in h_extended: + if entry["tag"] == tag: + return entry["value"] + return None + + # get sigmf datatype from blue format and endianness + datatype = blue_to_sigmf_type_str(h_fixed) + + log.info(f"Using SigMF datatype: {datatype} for BLUE format {h_fixed['format']}") + + # sample rate: prefer adjunct.xdelta, else extended header SAMPLE_RATE + if "xdelta" in h_adjunct: + sample_rate_hz = 1 / h_adjunct["xdelta"] + else: + sample_rate_hz = float(get_tag("SAMPLE_RATE")) + + if "outlets" in h_fixed and h_fixed["outlets"] > 0: + num_channels = int(h_fixed["outlets"]) + else: + num_channels = 1 + + # base global metadata + global_info = { + "core:author": getpass.getuser(), + SigMFFile.DATATYPE_KEY: datatype, + # SigMFFile.DESCRIPTION_KEY: ???, + SigMFFile.RECORDER_KEY: "Official SigMF BLUE converter", + SigMFFile.NUM_CHANNELS_KEY: num_channels, + SigMFFile.SAMPLE_RATE_KEY: sample_rate_hz, + SigMFFile.EXTENSIONS_KEY: [{"name": "blue", "version": "0.0.1", "optional": True}], + } + + # set metadata-only flag for zero-sample files + if is_metadata_only: + global_info[SigMFFile.METADATA_ONLY_KEY] = True + + # merge HCB values into metadata + global_info["blue:fixed"] = h_fixed + global_info["blue:keywords"] = h_keywords + global_info["blue:adjunct"] = h_adjunct + + # merge extended header fields, handling duplicate keys + if h_extended: + extended = {} + tag_counts = {} + for entry in h_extended: + tag = entry.get("tag") + value = entry.get("value") + if hasattr(value, "item"): + value = value.item() + + # handle duplicate tags by numbering them + if tag in extended: + tag_counts[tag] = tag_counts.get(tag, 0) + 1 + numbered_tag = f"{tag}_{tag_counts[tag]}" + extended[numbered_tag] = value + else: + extended[tag] = value + global_info["blue:extended"] = extended + + blue_start_time = float(h_fixed.get("timecode", 0)) + blue_start_time += h_adjunct.get("xstart", 0) + blue_start_time += 
float(h_keywords.get("TC_PREC", 0)) + + if blue_start_time == 0: + log.warning("BLUE timecode is zero or missing; datetime metadata will be absent.") + capture_info = {} + else: + # timecode uses 1950-01-01 as epoch, datetime uses 1970-01-01 + blue_epoch = blue_start_time - 631152000 # seconds between 1950 and 1970 + blue_datetime = datetime.fromtimestamp(blue_epoch, tz=timezone.utc) + + capture_info = { + SigMFFile.DATETIME_KEY: blue_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT), + } + + if get_tag("RF_FREQ") is not None: + # There may be other keys related to tune frequency + capture_info[SigMFFile.FREQUENCY_KEY] = float(get_tag("RF_FREQ")) + + # TODO: if no output path is specified, construct non-conforming metadata only SigMF + + # for metadata-only files, don't specify data_file and skip checksum + if is_metadata_only: + meta = SigMFFile( + data_file=None, + global_info=global_info, + skip_checksum=True, + ) + meta.data_buffer = io.BytesIO() + else: + meta = SigMFFile( + data_file=filenames["data_fn"], + global_info=global_info, + ) + meta.add_capture(0, metadata=capture_info) + log.debug("created %r", meta) + + if create_archive: + meta.tofile(filenames["archive_fn"], toarchive=True) + log.info("wrote %s", filenames["archive_fn"]) + else: + meta.tofile(filenames["meta_fn"], toarchive=False) + log.info("wrote %s", filenames["meta_fn"]) + + return meta + + +def validate_file(blue_path: Path) -> None: + """ + Basic validation of the BLUE file. + + Parameters + ---------- + blue_path : Path + Path to the BLUE file. + + Raises + ------ + SigMFConversionError + If the file is abnormal. + """ + if blue_path.stat().st_size < 512: + raise SigMFConversionError("BLUE file is too small to contain required headers.") + + +def validate_fixed(h_fixed: dict) -> None: + """ + Check that Fixed Header contains minimum required fields. + + Parameters + ---------- + h_fixed : dict + Fixed Header dictionary. 
+ + Raises + ------ + SigMFConversionError + If required fields are missing or invalid. + """ + required = ["version", "data_start", "data_size", "data_rep", "head_rep", "detached", "format", "type"] + for field in required: + if field not in h_fixed: + raise SigMFConversionError(f"Missing required Fixed Header field: {field}") + for rep_field in ["data_rep", "head_rep"]: + if h_fixed[rep_field] not in ("EEEI", "IEEE"): + raise SigMFConversionError(f"Invalid value for {rep_field}: {h_fixed[rep_field]}") + if h_fixed["data_size"] < 0: + raise SigMFConversionError(f"Invalid data_size: {h_fixed['data_size']} (must be >= 0)") + if len(h_fixed["format"]) != 2 or h_fixed["format"][0] not in "SC" or h_fixed["format"][1] not in TYPE_MAP: + raise SigMFConversionError(f"Unsupported data format: {h_fixed['format']}") + + +def validate_adjunct(adjunct: dict) -> None: + """ + Check that the Adjunct header contains minimum required fields. + """ + # validate xdelta (1 / samp_rate) if present + if "xdelta" in adjunct: + xdelta = adjunct["xdelta"] + if xdelta <= 0: + raise SigMFConversionError(f"Invalid adjunct xdelta time interval: {xdelta}") + + +def validate_extended_header(entries: list) -> None: + """ + Check that BLUE Extended Header contains minimum required fields. + + Parameters + ---------- + entries : list of dict + List of extended header entries. + + Raises + ------ + SigMFConversionError + If required fields are missing or invalid. + """ + # check for SAMPLE_RATE if present + for entry in entries: + if entry["tag"] == "SAMPLE_RATE": + sample_rate = float(entry["value"]) + if sample_rate <= 0: + raise SigMFConversionError(f"Invalid SAMPLE_RATE in extended header: {sample_rate}") + + +def blue_to_sigmf( + blue_path: str, + out_path: Optional[str] = None, + create_archive: bool = False, +) -> SigMFFile: + """ + Read a MIDAS Bluefile, write to SigMF, return SigMFFile object. + + Parameters + ---------- + blue_path : str + Path to the Blue file. 
+ out_path : str + Path to the output SigMF metadata file. + create_archive : bool, optional + When True, package output as a .sigmf archive. + + Returns + ------- + numpy.ndarray + IQ Data. + + Notes + ----- + This function currently reads BLUE then writes a SigMF pair. We could also + implement a function that instead writes metadata only for a non-conforming + dataset using the HEADER_BYTES_KEY and TRAILING_BYTES_KEY in most cases. + """ + log.debug(f"read {blue_path}") + + blue_path = Path(blue_path) + if out_path is None: + base_path = blue_path + else: + base_path = Path(out_path) + + filenames = get_sigmf_filenames(base_path) + + # ensure output directory exists + filenames["base_fn"].parent.mkdir(parents=True, exist_ok=True) + + validate_file(blue_path) + + # read Header control block (HCB) to determine how to process the rest of the file + h_fixed, h_keywords, h_adjunct = read_hcb(blue_path) + + # read extended header + h_extended = read_extended_header(blue_path, h_fixed) + + # check if this is a zero-sample (metadata-only) file + data_size_bytes = int(h_fixed.get("data_size", 0)) + metadata_only = data_size_bytes == 0 + + with tempfile.TemporaryDirectory() as temp_dir: + if not metadata_only: + if create_archive: + # for archives, write data to a temporary file that will be cleaned up + data_path = Path(temp_dir) / filenames["data_fn"].name + filenames["data_fn"] = data_path # update path for construct_sigmf + else: + # for file pairs, write to the final destination + data_path = filenames["data_fn"] + data_loopback(blue_path, data_path, h_fixed) + else: + log.info("skipping data file creation for zero-sample BLUE file") + + # call the SigMF conversion for metadata generation + meta = construct_sigmf( + filenames=filenames, + h_fixed=h_fixed, + h_keywords=h_keywords, + h_adjunct=h_adjunct, + h_extended=h_extended, + is_metadata_only=metadata_only, + create_archive=create_archive, + ) + + log.debug(">>>>>>>>> Fixed Header") + for key, _, _, _, desc in 
FIXED_LAYOUT: + log.debug(f"{key:10s}: {h_fixed[key]!r} # {desc}") + + log.debug(">>>>>>>>> User Keywords") + log.debug(h_keywords) + + log.debug(">>>>>>>>> Adjunct Header") + log.debug(h_adjunct) + + log.debug(">>>>>>>>> Extended Header") + for entry in h_extended: + log.debug(f"{entry['tag']:20s}:{entry['value']}") + + return meta + + +def main() -> None: + """ + Entry-point for sigmf_convert_blue + """ + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("-i", "--input", type=str, required=True, help="BLUE file path") + parser.add_argument("-o", "--output", type=str, default=None, help="SigMF path") + parser.add_argument("-v", "--verbose", action="count", default=0) + parser.add_argument("--archive", action="store_true", help="Write a .sigmf archive instead of meta/data pair") + parser.add_argument("--version", action="version", version=f"%(prog)s v{toolversion}") + args = parser.parse_args() + + level_lut = { + 0: logging.WARNING, + 1: logging.INFO, + 2: logging.DEBUG, + } + logging.basicConfig(level=level_lut[min(args.verbose, 2)]) + + _ = blue_to_sigmf(blue_path=args.input, out_path=args.output, create_archive=args.archive) + + +if __name__ == "__main__": + main() diff --git a/sigmf/convert/wav.py b/sigmf/convert/wav.py new file mode 100755 index 0000000..49217ca --- /dev/null +++ b/sigmf/convert/wav.py @@ -0,0 +1,139 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""converter for wav containers""" + +import argparse +import logging +import tempfile +import wave +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +import numpy as np + +from .. import SigMFFile +from .. 
import __version__ as toolversion +from ..sigmffile import get_sigmf_filenames +from ..utils import SIGMF_DATETIME_ISO8601_FMT, get_data_type_str + +log = logging.getLogger() + +try: + from scipy.io import wavfile +except ImportError: + SCIPY_INSTALLED = False +else: + SCIPY_INSTALLED = True + + +def wav_to_sigmf( + wav_path: str, + out_path: Optional[str] = None, + create_archive: bool = False, +) -> SigMFFile: + """ + Read a wav, optionally write a sigmf, return SigMFFile object. + + Raises + ------ + wave.Error + If the wav file is not PCM and Scipy is not installed. + """ + wav_path = Path(wav_path) + if SCIPY_INSTALLED: + samp_rate, wav_data = wavfile.read(wav_path) + else: + with wave.open(str(wav_path), "rb") as wav_reader: + n_channels = wav_reader.getnchannels() + samp_width = wav_reader.getsampwidth() + samp_rate = wav_reader.getframerate() + n_frames = wav_reader.getnframes() + raw_data = wav_reader.readframes(n_frames) + np_dtype = f"int{samp_width * 8}" + wav_data = np.frombuffer(raw_data, dtype=np_dtype).reshape(-1, n_channels) + global_info = { + SigMFFile.DATATYPE_KEY: get_data_type_str(wav_data), + SigMFFile.DESCRIPTION_KEY: f"converted from {wav_path.name}", + SigMFFile.NUM_CHANNELS_KEY: 1 if len(wav_data.shape) < 2 else wav_data.shape[1], + SigMFFile.RECORDER_KEY: "Official SigMF WAV converter", + SigMFFile.SAMPLE_RATE_KEY: samp_rate, + } + + modify_time = wav_path.lstat().st_mtime + wav_datetime = datetime.fromtimestamp(modify_time, tz=timezone.utc) + + capture_info = { + SigMFFile.DATETIME_KEY: wav_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT), + } + + if out_path is None: + base_path = wav_path.with_suffix(".sigmf") + else: + base_path = Path(out_path) + + filenames = get_sigmf_filenames(base_path) + + output_dir = filenames["meta_fn"].parent + output_dir.mkdir(parents=True, exist_ok=True) + + if create_archive: + # use temporary directory for data file when creating archive + with tempfile.TemporaryDirectory() as temp_dir: + data_path = 
Path(temp_dir) / filenames["data_fn"].name + wav_data.tofile(data_path) + + meta = SigMFFile(data_file=data_path, global_info=global_info) + meta.add_capture(0, metadata=capture_info) + log.debug("created %r", meta) + + meta.tofile(filenames["archive_fn"], toarchive=True) + log.info("wrote %s", filenames["archive_fn"]) + else: + data_path = filenames["data_fn"] + wav_data.tofile(data_path) + + meta = SigMFFile(data_file=data_path, global_info=global_info) + meta.add_capture(0, metadata=capture_info) + log.debug("created %r", meta) + + meta.tofile(filenames["meta_fn"], toarchive=False) + log.info("wrote %s and %s", filenames["meta_fn"], filenames["data_fn"]) + + return meta + + +def main() -> None: + """ + Entry-point for sigmf_convert_wav + """ + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("-i", "--input", type=str, required=True, help="WAV path") + parser.add_argument("-o", "--output", type=str, default=None, help="SigMF path") + parser.add_argument("-v", "--verbose", action="count", default=0) + parser.add_argument( + "-a", "--archive", action="store_true", help="Save as SigMF archive instead of separate meta/data files." 
+ ) + parser.add_argument("--version", action="version", version=f"%(prog)s v{toolversion}") + args = parser.parse_args() + + level_lut = { + 0: logging.WARNING, + 1: logging.INFO, + 2: logging.DEBUG, + } + logging.basicConfig(level=level_lut[min(args.verbose, 2)]) + + wav_path = Path(args.input) + if args.output is None: + args.output = wav_path.with_suffix(".sigmf") + + _ = wav_to_sigmf(wav_path=wav_path, out_path=args.output, create_archive=args.archive) + + +if __name__ == "__main__": + main() diff --git a/sigmf/error.py b/sigmf/error.py index 9f2564c..f4364bc 100644 --- a/sigmf/error.py +++ b/sigmf/error.py @@ -22,3 +22,7 @@ class SigMFAccessError(SigMFError): class SigMFFileError(SigMFError): """Exceptions related to reading or writing SigMF files or archives.""" + + +class SigMFConversionError(SigMFError): + """Exceptions related to converting to SigMF format.""" \ No newline at end of file diff --git a/tests/test_convert.py b/tests/test_convert.py new file mode 100644 index 0000000..5bd3051 --- /dev/null +++ b/tests/test_convert.py @@ -0,0 +1,124 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. 
https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""Tests for Converters""" + +import os +import tempfile +import unittest +import wave +from pathlib import Path + +import numpy as np + +import sigmf +from sigmf.convert.blue import blue_to_sigmf +from sigmf.convert.wav import wav_to_sigmf + +from .testdata import NONSIGMF_REPO, NONSIGMF_ENV + + +class TestWAVConverter(unittest.TestCase): + """wav loopback test""" + + def setUp(self) -> None: + """temp wav file for testing""" + self.tmp_dir = tempfile.TemporaryDirectory() + self.tmp_path = Path(self.tmp_dir.name) + self.wav_path = self.tmp_path / "foo.wav" + samp_rate = 48000 + duration_s = 0.1 + ttt = np.linspace(0, duration_s, int(samp_rate * duration_s), endpoint=False) + freq = 440 # A4 note + self.audio_data = 0.5 * np.sin(2 * np.pi * freq * ttt) + # note scipy could write float wav files directly, + # but to avoid adding scipy as a dependency for sigmf-python, + # convert float audio to 16-bit PCM integer format + audio_int16 = (self.audio_data * 32767).astype(np.int16) + + # write wav file using built-in wave module + with wave.open(str(self.wav_path), "wb") as wav_file: + wav_file.setnchannels(1) # mono + wav_file.setsampwidth(2) # 16-bit = 2 bytes + wav_file.setframerate(samp_rate) + wav_file.writeframes(audio_int16.tobytes()) + + def tearDown(self) -> None: + """clean up temporary directory""" + self.tmp_dir.cleanup() + + def test_wav_to_sigmf_pair(self): + sigmf_path = self.tmp_path / "bar.tmp" + meta = wav_to_sigmf(wav_path=self.wav_path, out_path=sigmf_path) + data = meta.read_samples() + # allow numerical differences due to PCM quantization + self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) + filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) + self.assertTrue(filenames["data_fn"].exists(), "dataset path missing") + self.assertTrue(filenames["meta_fn"].exists(), "metadata path missing") + + def test_wav_to_sigmf_archive(self): + sigmf_path 
= self.tmp_path / "baz.ext" + wav_to_sigmf(wav_path=self.wav_path, out_path=sigmf_path, create_archive=True) + filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) + self.assertTrue(filenames["archive_fn"].exists(), "archive path missing") + + +class TestBlueConverter(unittest.TestCase): + """As we have no blue files in the repository, test only when env path specified.""" + + def setUp(self) -> None: + """temp paths & blue files""" + if not NONSIGMF_REPO: + # skip test if environment variable not set + self.skipTest(f"Set {NONSIGMF_ENV} environment variable to path with .cdif files to run test. ") + self.bluefiles = list(NONSIGMF_REPO.glob("**/*.cdif")) + print("bluefiles", self.bluefiles) + if not self.bluefiles: + self.fail(f"No .cdif files found in {NONSIGMF_ENV}.") + self.tmp_dir = tempfile.TemporaryDirectory() + self.tmp_path = Path(self.tmp_dir.name) + + def tearDown(self) -> None: + """clean up temporary directory""" + self.tmp_dir.cleanup() + + def test_blue_to_sigmf_pair(self): + for bdx, bluefile in enumerate(self.bluefiles): + sigmf_path = self.tmp_path / bluefile.stem + meta = blue_to_sigmf(blue_path=bluefile, out_path=sigmf_path) + print(f"Converted {bluefile} to SigMF at {sigmf_path}") + if not meta.get_global_field("core:metadata_only"): + print(meta.read_samples(count=10)) + + # ### EVERYTHING BELOW HERE IS FOR DEBUGGING ONLY _ REMOVE LATER ### + # # plot stft of RF data for visual inspection + # import matplotlib.pyplot as plt + # from scipy.signal import spectrogram + # from swiftfox import summary, smartspec + + # if meta.get_global_field("core:metadata_only"): + # print("Metadata only file, skipping plot.") + # continue + # samples = meta.read_samples() + # # plt.figure(figsize=(10, 10)) + # summary(samples, detail=0.1, samp_rate=meta.get_global_field("core:sample_rate"), title=sigmf_path.name) + # plt.figure() + # # plt.plot(samples.real) + # # plt.plot(samples.imag) + # # plt.figure() + # spec = smartspec(samples, detail=0.5, 
samp_rate=meta.get_global_field("core:sample_rate")) + # # use imshow to plot spectrogram + + # plt.show() + self.assertIsInstance(meta, sigmf.SigMFFile) + + def test_blue_to_sigmf_archive(self): + for bdx, bluefile in enumerate(self.bluefiles): + sigmf_path = self.tmp_path / f"{bluefile.stem}_archive" + meta = blue_to_sigmf(blue_path=bluefile, out_path=sigmf_path, create_archive=True) + print(f"Converted {bluefile} to SigMF archive at {sigmf_path}") + self.assertIsInstance(meta, sigmf.SigMFFile) diff --git a/tests/testdata.py b/tests/testdata.py index d773d69..5d4a8f9 100644 --- a/tests/testdata.py +++ b/tests/testdata.py @@ -6,10 +6,20 @@ """Shared test data for tests.""" +import os +from pathlib import Path + import numpy as np from sigmf import SigMFFile, __specification__, __version__ +# detection for https://github.com/sigmf/example_nonsigmf_recordings +NONSIGMF_ENV = "EXAMPLE_NONSIGMF_RECORDINGS_PATH" +NONSIGMF_REPO = None +_recordings_path = Path(os.getenv(NONSIGMF_ENV, "nopath")) +if _recordings_path.is_dir(): + NONSIGMF_REPO = Path(_recordings_path) + TEST_FLOAT32_DATA = np.arange(16, dtype=np.float32) TEST_METADATA = { From 810ded8c1cc45d31e60492590ff372f3027f3155 Mon Sep 17 00:00:00 2001 From: Teque5 Date: Wed, 31 Dec 2025 15:11:53 -0800 Subject: [PATCH 02/13] fromfile improvements, BLUE & WAV NCD conversion * fromfile() now autodetects SigMF, BLUE, & WAV formats automatically * Converters now support conversion to non-conforming dataset without writing datafiles back to disk * Add utils.get_magic_bytes() for autodetection purposes * split tests for converters into separate files * Validated implementation against lots of files beyond nonsigmf-examples repo * Updated converter documentation * Added slightly more to README * Drop support for float WAV files; tricky to support NCD * Fix bug in sigmffile._count_samples for NCD files * Fig bug in read_samples when using some NCD files with header & trailing bytes --- README.md | 20 ++- 
docs/source/converters.rst | 73 ++++++---- sigmf/convert/blue.py | 273 ++++++++++++++++++++++++++++--------- sigmf/convert/wav.py | 159 ++++++++++++++++++--- sigmf/sigmffile.py | 70 +++++++--- sigmf/utils.py | 37 ++++- tests/test_convert.py | 124 ----------------- tests/test_convert_blue.py | 120 ++++++++++++++++ tests/test_convert_wav.py | 230 ++++++++++++++++++++++++++----- 9 files changed, 816 insertions(+), 290 deletions(-) mode change 100755 => 100644 sigmf/convert/wav.py delete mode 100644 tests/test_convert.py create mode 100644 tests/test_convert_blue.py diff --git a/README.md b/README.md index 2dca188..af79c02 100644 --- a/README.md +++ b/README.md @@ -12,10 +12,26 @@ freely under the terms GNU Lesser GPL v3 License. This module follows the SigMF specification [html](https://sigmf.org/)/[pdf](https://sigmf.github.io/SigMF/sigmf-spec.pdf) from the [spec repository](https://github.com/sigmf/SigMF). -To install the latest PyPI release, install from pip: +### Install Latest ```bash pip install sigmf ``` -**[Please visit the documentation for examples & more info.](https://sigmf.readthedocs.io/en/latest/)** +### Read SigMF + +```python +import sigmf + +# read SigMF recording +meta = sigmf.fromfile("recording.sigmf-meta") +samples = meta[0:1024] # get first 1024 samples + +# fromfile() also supports BLUE and WAV files via auto-detection +meta = sigmf.fromfile("recording.cdif") # BLUE file +meta = sigmf.fromfile("recording.wav") # WAV file +``` + +### Full API & Docs + +**[Please visit our documentation for more info.](https://sigmf.readthedocs.io/en/latest/)** diff --git a/docs/source/converters.rst b/docs/source/converters.rst index 76e09d9..5edfcd7 100644 --- a/docs/source/converters.rst +++ b/docs/source/converters.rst @@ -3,7 +3,7 @@ Format Converters ================= The SigMF Python library includes converters to import data from various file formats into SigMF format. 
-These converters make it easy to migrate existing RF recordings to the standardized SigMF format while preserving metadata when possible. +Converters can create standard SigMF file pairs or Non-Conforming Datasets (NCDs) that reference the original files. Overview -------- @@ -13,35 +13,52 @@ Converters are available for: * **BLUE files** - MIDAS Blue and Platinum BLUE RF recordings (``.cdif``) * **WAV files** - Audio recordings (``.wav``) -All converters return a :class:`~sigmf.SigMFFile` object that can be used immediately or saved to disk. -Converters preserve datatypes and metadata where possible. +All converters return a :class:`~sigmf.SigMFFile` object. Auto-detection is available through :func:`~sigmf.sigmffile.fromfile`. + + +Auto-Detection +~~~~~~~~~~~~~~ + +The :func:`~sigmf.sigmffile.fromfile` function automatically detects file formats and creates Non-Conforming Datasets: + +.. code-block:: python + + import sigmf + + # auto-detect and create NCD for any supported format + meta = sigmf.fromfile("recording.cdif") # BLUE file + meta = sigmf.fromfile("recording.wav") # WAV file + meta = sigmf.fromfile("recording.sigmf") # SigMF archive + + samples = meta.read_samples() Command Line Usage ~~~~~~~~~~~~~~~~~~ -Converters can be used from the command line after ``pip install sigmf``: +Converters can be used from the command line: .. code-block:: bash sigmf_convert_blue recording.cdif sigmf_convert_wav recording.wav -or by using module syntax: +or by using module execution: .. code-block:: bash - python3 -m sigmf.convert.blue recording.cdif - python3 -m sigmf.convert.wav recording.wav + python -m sigmf.convert.blue recording.cdif + python -m sigmf.convert.wav recording.wav + +Output Options +~~~~~~~~~~~~~~ -Output Naming -~~~~~~~~~~~~~ +Converters support multiple output modes: -All converters treat the value passed with ``-o/--output`` as a base name and ignore any existing suffix. 
The tools -emit ``.sigmf-data`` and ``.sigmf-meta`` files (retaining any original extensions such as ``.cdif`` or -``.tmp`` in the base). Supplying ``--archive`` packages the result as ``.sigmf`` instead of producing separate -meta/data files. +* **Standard conversion**: Creates ``.sigmf-data`` and ``.sigmf-meta`` files +* **Archive mode**: Creates single ``.sigmf`` archive with ``--archive`` +* **Non-Conforming Dataset**: Creates metadata-only file referencing original data with ``--ncd`` BLUE Converter @@ -56,38 +73,42 @@ The BLUE converter handles CDIF (.cdif) recordings while placing BLUE header inf .. autofunction:: sigmf.convert.blue.blue_to_sigmf - .. code-block:: python from sigmf.convert.blue import blue_to_sigmf - # read BLUE, write SigMF, and return SigMFFile object - meta = blue_to_sigmf(blue_path="recording.cdif", out_path="recording.sigmf") + # standard conversion + meta = blue_to_sigmf(blue_path="recording.cdif", out_path="recording") - # access converted data - samples = meta.read_samples() + # create NCD automatically (metadata-only, references original file) + meta = blue_to_sigmf(blue_path="recording.cdif") + + # access standard SigMF data & metadata + all_samples = meta.read_samples() sample_rate_hz = meta.sample_rate # access BLUE-specific metadata - blue_type = meta.get_global_field("blue:fixed")["type"] # e.g., 1000 - blue_version = meta.get_global_field("blue:keywords")["IO"] # e.g., "X-Midas" + blue_type = meta.get_global_field("blue:fixed")["type"] # e.g., 1000 + blue_version = meta.get_global_field("blue:keywords")["IO"] # e.g., "X-Midas" WAV Converter ------------- -This is useful when working with audio datasets. +Converts WAV audio recordings to SigMF format. .. autofunction:: sigmf.convert.wav.wav_to_sigmf - .. 
code-block:: python from sigmf.convert.wav import wav_to_sigmf - # read WAV, write SigMF, and return SigMFFile object - meta = wav_to_sigmf(wav_path="recording.wav", out_path="recording.sigmf") + # standard conversion + meta = wav_to_sigmf(wav_path="recording.wav", out_path="recording") - # access converted data - samples = meta.read_samples() + # create NCD automatically (metadata-only, references original file) + meta = wav_to_sigmf(wav_path="recording.wav") + + # access standard SigMF data & metadata + all_samples = meta.read_samples() sample_rate_hz = meta.sample_rate \ No newline at end of file diff --git a/sigmf/convert/blue.py b/sigmf/convert/blue.py index 72b0534..e9a0633 100644 --- a/sigmf/convert/blue.py +++ b/sigmf/convert/blue.py @@ -20,7 +20,7 @@ import tempfile from datetime import datetime, timezone from pathlib import Path -from typing import Optional +from typing import Optional, Tuple import numpy as np @@ -225,7 +225,7 @@ def read_hcb(file_path): # FIXME: I've seen VER=2.0.14 ver_lut = {"1.0": "BLUE 1.0", "1.1": "BLUE 1.1", "2.0": "Platinum"} - spec_str = ver_lut.get(h_keywords.get("VER", "1.0")) + spec_str = ver_lut.get(h_keywords.get("VER", "1.0"), "Unknown") log.info(f"Read {h_fixed['version']} type {h_fixed['type']} using {spec_str} specification.") validate_fixed(h_fixed) @@ -355,7 +355,7 @@ def data_loopback(blue_path: Path, data_path: Path, h_fixed: dict) -> None: # check for zero-sample file (metadata-only) if elem_count == 0: log.info("detected zero-sample BLUE file, creating metadata-only SigMF") - return np.array([], dtype=np_dtype) + return # read raw samples raw_samples = np.fromfile(blue_path, dtype=np_dtype, offset=HEADER_SIZE_BYTES, count=elem_count) @@ -377,22 +377,20 @@ def data_loopback(blue_path: Path, data_path: Path, h_fixed: dict) -> None: log.info("wrote %s", data_path) -def construct_sigmf( - filenames: dict, +def _build_common_metadata( h_fixed: dict, h_keywords: dict, h_adjunct: dict, h_extended: list, - 
is_metadata_only: bool = False, - create_archive: bool = False, -) -> SigMFFile: + is_ncd: bool = False, + blue_file_name: str = None, + trailing_bytes: int = 0, +) -> Tuple[dict, dict]: """ - Built & write a SigMF object from BLUE metadata. + Build common global_info and capture_info metadata for both standard and NCD SigMF files. Parameters ---------- - filenames : dict - Mapping returned by get_sigmf_filenames containing destination paths. h_fixed : dict Fixed Header h_keywords : dict @@ -400,16 +398,23 @@ def construct_sigmf( h_adjunct : dict Adjunct Header h_extended : list of dict - Parsed extended header entries from read_extended_header(). - is_metadata_only : bool, optional - If True, creates a metadata-only SigMF file. - create_archive : bool, optional - When True, package output as SigMF archive instead of a meta/data pair. + Parsed extended header entries. + is_ncd : bool, optional + If True, adds NCD-specific fields. + blue_file_name : str, optional + Original BLUE file name (required for NCD). + trailing_bytes : int, optional + Number of trailing bytes (for NCD). Returns ------- - SigMFFile - SigMF object. + tuple[dict, dict] + (global_info, capture_info) dictionaries. + + Raises + ------ + SigMFConversionError + If SigMF spec compliance is violated. 
""" # helper to look up extended header values by tag def get_tag(tag): @@ -420,7 +425,6 @@ def get_tag(tag): # get sigmf datatype from blue format and endianness datatype = blue_to_sigmf_type_str(h_fixed) - log.info(f"Using SigMF datatype: {datatype} for BLUE format {h_fixed['format']}") # sample rate: prefer adjunct.xdelta, else extended header SAMPLE_RATE @@ -438,16 +442,16 @@ def get_tag(tag): global_info = { "core:author": getpass.getuser(), SigMFFile.DATATYPE_KEY: datatype, - # SigMFFile.DESCRIPTION_KEY: ???, - SigMFFile.RECORDER_KEY: "Official SigMF BLUE converter", + SigMFFile.RECORDER_KEY: f"Official SigMF BLUE converter", SigMFFile.NUM_CHANNELS_KEY: num_channels, SigMFFile.SAMPLE_RATE_KEY: sample_rate_hz, SigMFFile.EXTENSIONS_KEY: [{"name": "blue", "version": "0.0.1", "optional": True}], } - # set metadata-only flag for zero-sample files - if is_metadata_only: - global_info[SigMFFile.METADATA_ONLY_KEY] = True + # add NCD-specific fields + if is_ncd: + global_info[SigMFFile.TRAILING_BYTES_KEY] = trailing_bytes + global_info[SigMFFile.DATASET_KEY] = blue_file_name # merge HCB values into metadata global_info["blue:fixed"] = h_fixed @@ -473,52 +477,32 @@ def get_tag(tag): extended[tag] = value global_info["blue:extended"] = extended + # calculate blue start time blue_start_time = float(h_fixed.get("timecode", 0)) blue_start_time += h_adjunct.get("xstart", 0) blue_start_time += float(h_keywords.get("TC_PREC", 0)) + capture_info = {} if blue_start_time == 0: log.warning("BLUE timecode is zero or missing; datetime metadata will be absent.") - capture_info = {} else: # timecode uses 1950-01-01 as epoch, datetime uses 1970-01-01 blue_epoch = blue_start_time - 631152000 # seconds between 1950 and 1970 blue_datetime = datetime.fromtimestamp(blue_epoch, tz=timezone.utc) - - capture_info = { - SigMFFile.DATETIME_KEY: blue_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT), - } + capture_info[SigMFFile.DATETIME_KEY] = blue_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT) if 
get_tag("RF_FREQ") is not None: - # There may be other keys related to tune frequency + # it's possible other keys indicate tune frequency, but RF_FREQ is standard capture_info[SigMFFile.FREQUENCY_KEY] = float(get_tag("RF_FREQ")) - # TODO: if no output path is specified, construct non-conforming metadata only SigMF - - # for metadata-only files, don't specify data_file and skip checksum - if is_metadata_only: - meta = SigMFFile( - data_file=None, - global_info=global_info, - skip_checksum=True, + # validate SigMF spec compliance: metadata_only and dataset fields are mutually exclusive + if SigMFFile.METADATA_ONLY_KEY in global_info and SigMFFile.DATASET_KEY in global_info: + raise SigMFConversionError( + "SigMF spec violation: core:metadata_only MAY NOT be used in conjunction with " + "Non-Conforming Datasets or the core:dataset field" ) - meta.data_buffer = io.BytesIO() - else: - meta = SigMFFile( - data_file=filenames["data_fn"], - global_info=global_info, - ) - meta.add_capture(0, metadata=capture_info) - log.debug("created %r", meta) - if create_archive: - meta.tofile(filenames["archive_fn"], toarchive=True) - log.info("wrote %s", filenames["archive_fn"]) - else: - meta.tofile(filenames["meta_fn"], toarchive=False) - log.info("wrote %s", filenames["meta_fn"]) - - return meta + return global_info, capture_info def validate_file(blue_path: Path) -> None: @@ -599,10 +583,142 @@ def validate_extended_header(entries: list) -> None: raise SigMFConversionError(f"Invalid SAMPLE_RATE in extended header: {sample_rate}") +def construct_sigmf( + filenames: dict, + h_fixed: dict, + h_keywords: dict, + h_adjunct: dict, + h_extended: list, + is_metadata_only: bool = False, + create_archive: bool = False, +) -> SigMFFile: + """ + Built & write a SigMF object from BLUE metadata. + + Parameters + ---------- + filenames : dict + Mapping returned by get_sigmf_filenames containing destination paths. 
+ h_fixed : dict + Fixed Header + h_keywords : dict + Custom User Keywords + h_adjunct : dict + Adjunct Header + h_extended : list of dict + Parsed extended header entries from read_extended_header(). + is_metadata_only : bool, optional + If True, creates a metadata-only SigMF file. + create_archive : bool, optional + When True, package output as SigMF archive instead of a meta/data pair. + + Returns + ------- + SigMFFile + SigMF object. + """ + # use shared helper to build common metadata + global_info, capture_info = _build_common_metadata(h_fixed, h_keywords, h_adjunct, h_extended) + + # set metadata-only flag for zero-sample files (only for non-NCD files) + if is_metadata_only: + # ensure we're not accidentally setting metadata_only for an NCD + if SigMFFile.DATASET_KEY in global_info: + raise ValueError( + "Cannot set metadata_only=True for Non-Conforming Dataset files. " + "Per SigMF spec, metadata_only MAY NOT be used with core:dataset field." + ) + global_info[SigMFFile.METADATA_ONLY_KEY] = True + + # for metadata-only files, don't specify data_file and skip checksum + if is_metadata_only: + meta = SigMFFile( + data_file=None, + global_info=global_info, + skip_checksum=True, + ) + meta.data_buffer = io.BytesIO() + else: + meta = SigMFFile( + data_file=filenames["data_fn"], + global_info=global_info, + ) + meta.add_capture(0, metadata=capture_info) + log.debug("created %r", meta) + + if create_archive: + meta.tofile(filenames["archive_fn"], toarchive=True) + log.info("wrote %s", filenames["archive_fn"]) + else: + meta.tofile(filenames["meta_fn"], toarchive=False) + log.info("wrote %s", filenames["meta_fn"]) + + return meta + + +def construct_sigmf_ncd( + blue_path: Path, + h_fixed: dict, + h_keywords: dict, + h_adjunct: dict, + h_extended: list, + header_bytes: int, + trailing_bytes: int, +) -> SigMFFile: + """ + Construct Non-Conforming Dataset SigMF metadata for BLUE file. + + Parameters + ---------- + blue_path : Path + Path to the original BLUE file. 
+ h_fixed : dict + Fixed Header + h_keywords : dict + Custom User Keywords + h_adjunct : dict + Adjunct Header + h_extended : list of dict + Parsed extended header entries from read_extended_header(). + header_bytes : int + Number of header bytes to skip. + trailing_bytes : int + Number of trailing bytes to ignore. + + Returns + ------- + SigMFFile + NCD SigMF object pointing to original BLUE file. + """ + # use shared helper to build common metadata, with NCD-specific additions + global_info, capture_info = _build_common_metadata( + h_fixed, + h_keywords, + h_adjunct, + h_extended, + is_ncd=True, + blue_file_name=blue_path.name, + trailing_bytes=trailing_bytes, + ) + + # add NCD-specific capture info + capture_info[SigMFFile.HEADER_BYTES_KEY] = header_bytes + + # create NCD metadata-only SigMF pointing to original file + meta = SigMFFile(global_info=global_info, skip_checksum=True) + meta.set_data_file(data_file=blue_path, offset=header_bytes, skip_checksum=True) + meta.data_buffer = io.BytesIO() + meta.add_capture(0, metadata=capture_info) + log.debug("created NCD SigMF: %r", meta) + + return meta + + def blue_to_sigmf( blue_path: str, out_path: Optional[str] = None, create_archive: bool = False, + create_ncd: bool = False, ) -> SigMFFile: """ Read a MIDAS Bluefile, write to SigMF, return SigMFFile object. @@ -611,24 +727,24 @@ def blue_to_sigmf( ---------- blue_path : str Path to the Blue file. - out_path : str + out_path : str, optional Path to the output SigMF metadata file. create_archive : bool, optional When True, package output as a .sigmf archive. + create_ncd : bool, optional + When True, create Non-Conforming Dataset with header_bytes and trailing_bytes. Returns ------- - numpy.ndarray - IQ Data. - - Notes - ----- - This function currently reads BLUE then writes a SigMF pair. We could also - implement a function that instead writes metadata only for a non-conforming - dataset using the HEADER_BYTES_KEY and TRAILING_BYTES_KEY in most cases. 
+ SigMFFile + SigMF object, potentially as Non-Conforming Dataset. """ log.debug(f"read {blue_path}") + # auto-enable NCD when no output path is specified + if out_path is None: + create_ncd = True + blue_path = Path(blue_path) if out_path is None: base_path = blue_path @@ -648,10 +764,36 @@ def blue_to_sigmf( # read extended header h_extended = read_extended_header(blue_path, h_fixed) + # calculate NCD byte boundaries if requested + if create_ncd: + header_bytes = HEADER_SIZE_BYTES + int(h_fixed.get("ext_size", 0)) + + # for NCD, trailing_bytes = file_size - header_bytes - actual_data_size + file_size = blue_path.stat().st_size + actual_data_size = file_size - header_bytes + trailing_bytes = 0 # assume no trailing bytes for NCD unless file is smaller than expected + + log.debug( + f"BLUE NCD: file_size={file_size}, header_bytes={header_bytes}, actual_data_size={actual_data_size}, trailing_bytes={trailing_bytes}" + ) + # check if this is a zero-sample (metadata-only) file data_size_bytes = int(h_fixed.get("data_size", 0)) metadata_only = data_size_bytes == 0 + # handle NCD case where no output files are created + if create_ncd and out_path is None: + # create metadata-only SigMF for NCD pointing to original file + return construct_sigmf_ncd( + blue_path=blue_path, + h_fixed=h_fixed, + h_keywords=h_keywords, + h_adjunct=h_adjunct, + h_extended=h_extended, + header_bytes=header_bytes, + trailing_bytes=trailing_bytes, + ) + with tempfile.TemporaryDirectory() as temp_dir: if not metadata_only: if create_archive: @@ -702,6 +844,9 @@ def main() -> None: parser.add_argument("-o", "--output", type=str, default=None, help="SigMF path") parser.add_argument("-v", "--verbose", action="count", default=0) parser.add_argument("--archive", action="store_true", help="Write a .sigmf archive instead of meta/data pair") + parser.add_argument( + "--ncd", action="store_true", help="Process as Non-Conforming Dataset and write .sigmf-meta only." 
+ ) parser.add_argument("--version", action="version", version=f"%(prog)s v{toolversion}") args = parser.parse_args() @@ -712,7 +857,7 @@ def main() -> None: } logging.basicConfig(level=level_lut[min(args.verbose, 2)]) - _ = blue_to_sigmf(blue_path=args.input, out_path=args.output, create_archive=args.archive) + _ = blue_to_sigmf(blue_path=args.input, out_path=args.output, create_archive=args.archive, create_ncd=args.ncd) if __name__ == "__main__": diff --git a/sigmf/convert/wav.py b/sigmf/convert/wav.py old mode 100755 new mode 100644 index 49217ca..e877d16 --- a/sigmf/convert/wav.py +++ b/sigmf/convert/wav.py @@ -7,6 +7,7 @@ """converter for wav containers""" import argparse +import io import logging import tempfile import wave @@ -23,43 +24,131 @@ log = logging.getLogger() -try: - from scipy.io import wavfile -except ImportError: - SCIPY_INSTALLED = False -else: - SCIPY_INSTALLED = True + +def _calculate_wav_ncd_bytes(wav_path: Path) -> tuple: + """ + Calculate header_bytes and trailing_bytes for WAV NCD. 
+ + Returns + ------- + tuple + (header_bytes, trailing_bytes) + """ + # use wave module to get basic info + with wave.open(str(wav_path), "rb") as wav_reader: + n_channels = wav_reader.getnchannels() + samp_width = wav_reader.getsampwidth() + n_frames = wav_reader.getnframes() + + # calculate sample data size in bytes + sample_bytes = n_frames * n_channels * samp_width + file_size = wav_path.stat().st_size + + # parse WAV file structure to find data chunk + with open(wav_path, "rb") as handle: + # skip RIFF header (12 bytes: 'RIFF' + size + 'WAVE') + handle.seek(12) + header_bytes = 12 + + # search for 'data' chunk + while header_bytes < file_size: + chunk_id = handle.read(4) + if len(chunk_id) != 4: + break + chunk_size = int.from_bytes(handle.read(4), "little") + + if chunk_id == b"data": + # found data chunk, header ends here + header_bytes += 8 # include chunk_id and chunk_size + break + + # skip this chunk + header_bytes += 8 + chunk_size + # ensure even byte boundary (WAV chunks are word-aligned) + if chunk_size % 2: + header_bytes += 1 + handle.seek(header_bytes) + + trailing_bytes = max(0, file_size - header_bytes - sample_bytes) + return header_bytes, trailing_bytes def wav_to_sigmf( wav_path: str, out_path: Optional[str] = None, create_archive: bool = False, + create_ncd: bool = False, ) -> SigMFFile: """ Read a wav, optionally write a sigmf, return SigMFFile object. + Parameters + ---------- + wav_path : str + Path to the WAV file. + out_path : str, optional + Path to the output SigMF metadata file. + create_archive : bool, optional + When True, package output as a .sigmf archive. + create_ncd : bool, optional + When True, create Non-Conforming Dataset with header_bytes and trailing_bytes. + + Returns + ------- + SigMFFile + SigMF object, potentially as Non-Conforming Dataset. + Raises ------ wave.Error - If the wav file is not PCM and Scipy is not installed. + If the wav file cannot be read. 
""" wav_path = Path(wav_path) - if SCIPY_INSTALLED: - samp_rate, wav_data = wavfile.read(wav_path) - else: - with wave.open(str(wav_path), "rb") as wav_reader: - n_channels = wav_reader.getnchannels() - samp_width = wav_reader.getsampwidth() - samp_rate = wav_reader.getframerate() - n_frames = wav_reader.getnframes() + + # auto-enable NCD when no output path is specified + if out_path is None: + create_ncd = True + + # use built-in wave module exclusively for precise sample boundary detection + with wave.open(str(wav_path), "rb") as wav_reader: + n_channels = wav_reader.getnchannels() + samp_width = wav_reader.getsampwidth() + samp_rate = wav_reader.getframerate() + n_frames = wav_reader.getnframes() + + # for NCD support, calculate precise byte boundaries + if create_ncd: + header_bytes, trailing_bytes = _calculate_wav_ncd_bytes(wav_path) + log.debug(f"WAV NCD: header_bytes={header_bytes}, trailing_bytes={trailing_bytes}") + + # only read audio data if we're not creating NCD metadata-only + wav_data = None # initialize variable + if create_ncd and out_path is None: + # metadata-only NCD: don't read audio data + pass + else: + # normal conversion: read the audio data raw_data = wav_reader.readframes(n_frames) - np_dtype = f"int{samp_width * 8}" - wav_data = np.frombuffer(raw_data, dtype=np_dtype).reshape(-1, n_channels) + + np_dtype = f"int{samp_width * 8}" + + if wav_data is None: + # for NCD metadata-only, create dummy sample to get datatype + dummy_sample = np.array([0], dtype=np_dtype) + datatype_str = get_data_type_str(dummy_sample) + else: + # normal case: process actual audio data + wav_data = ( + np.frombuffer(raw_data, dtype=np_dtype).reshape(-1, n_channels) + if n_channels > 1 + else np.frombuffer(raw_data, dtype=np_dtype) + ) + datatype_str = get_data_type_str(wav_data) + global_info = { - SigMFFile.DATATYPE_KEY: get_data_type_str(wav_data), + SigMFFile.DATATYPE_KEY: datatype_str, SigMFFile.DESCRIPTION_KEY: f"converted from {wav_path.name}", - 
SigMFFile.NUM_CHANNELS_KEY: 1 if len(wav_data.shape) < 2 else wav_data.shape[1], + SigMFFile.NUM_CHANNELS_KEY: n_channels, SigMFFile.RECORDER_KEY: "Official SigMF WAV converter", SigMFFile.SAMPLE_RATE_KEY: samp_rate, } @@ -71,6 +160,33 @@ def wav_to_sigmf( SigMFFile.DATETIME_KEY: wav_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT), } + if create_ncd: + # NCD requires extra fields + global_info[SigMFFile.TRAILING_BYTES_KEY] = trailing_bytes + global_info[SigMFFile.DATASET_KEY] = wav_path.name + capture_info[SigMFFile.HEADER_BYTES_KEY] = header_bytes + + # handle NCD case where no output files are created + if create_ncd and out_path is None: + # create metadata-only SigMF for NCD pointing to original file + meta = SigMFFile(global_info=global_info, skip_checksum=True) + meta.set_data_file(data_file=wav_path, offset=header_bytes, skip_checksum=True) + meta.data_buffer = io.BytesIO() + meta.add_capture(0, metadata=capture_info) + log.debug("created NCD SigMF: %r", meta) + return meta + + # if we get here, we need the actual audio data to create a new data file + if wav_data is None: + # need to read the audio data now for normal file creation + with wave.open(str(wav_path), "rb") as wav_reader: + raw_data = wav_reader.readframes(n_frames) + wav_data = ( + np.frombuffer(raw_data, dtype=np_dtype).reshape(-1, n_channels) + if n_channels > 1 + else np.frombuffer(raw_data, dtype=np_dtype) + ) + if out_path is None: base_path = wav_path.with_suffix(".sigmf") else: @@ -118,6 +234,9 @@ def main() -> None: parser.add_argument( "-a", "--archive", action="store_true", help="Save as SigMF archive instead of separate meta/data files." ) + parser.add_argument( + "--ncd", action="store_true", help="Process as Non-Conforming Dataset and write .sigmf-meta only." 
+ ) parser.add_argument("--version", action="version", version=f"%(prog)s v{toolversion}") args = parser.parse_args() @@ -132,7 +251,7 @@ def main() -> None: if args.output is None: args.output = wav_path.with_suffix(".sigmf") - _ = wav_to_sigmf(wav_path=wav_path, out_path=args.output, create_archive=args.archive) + _ = wav_to_sigmf(wav_path=wav_path, out_path=args.output, create_archive=args.archive, create_ncd=args.ncd) if __name__ == "__main__": diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index c40e7e8..93ceaea 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -23,8 +23,8 @@ SIGMF_METADATA_EXT, SigMFArchive, ) -from .error import SigMFAccessError, SigMFError, SigMFFileError -from .utils import dict_merge +from .error import SigMFAccessError, SigMFConversionError, SigMFError, SigMFFileError +from .utils import dict_merge, get_magic_bytes class SigMFMetafile: @@ -834,6 +834,8 @@ def read_samples(self, start_index=0, count=-1): """ if count == 0: raise IOError("Number of samples must be greater than zero, or -1 for all samples.") + elif count == -1: + count = self.sample_count - start_index elif start_index + count > self.sample_count: raise IOError("Cannot read beyond EOF.") if self.data_file is None and not isinstance(self.data_buffer, io.BytesIO): @@ -862,7 +864,10 @@ def _read_datafile(self, first_byte, nitems): if self.data_file is not None: fp = open(self.data_file, "rb") - fp.seek(first_byte, 0) + # account for data_offset when seeking (important for NCDs) + seek_position = first_byte + getattr(self, "data_offset", 0) + fp.seek(seek_position, 0) + data = np.fromfile(fp, dtype=data_type_in, count=nitems) elif self.data_buffer is not None: # handle offset for data_buffer like we do for data_file @@ -1213,13 +1218,14 @@ def fromarchive(archive_path, dir=None, skip_checksum=False, autoscale=True): def fromfile(filename, skip_checksum=False, autoscale=True): """ - Creates and returns a SigMFFile or SigMFCollection instance with metadata loaded 
from the specified file. + Read a file as a SigMFFile or SigMFCollection. The file can be one of: - * A SigMF Metadata file (.sigmf-meta) - * A SigMF Dataset file (.sigmf-data) - * A SigMF Collection file (.sigmf-collection) - * A SigMF Archive file (.sigmf-archive) + * a SigMF Archive (.sigmf) + * a SigMF Metadata file (.sigmf-meta) + * a SigMF Dataset file (.sigmf-data) + * a SigMF Collection file (.sigmf-collection) + * a non-SigMF RF recording that can be converted (.wav, .cdif) Parameters ---------- @@ -1232,22 +1238,34 @@ def fromfile(filename, skip_checksum=False, autoscale=True): Returns ------- - object - SigMFFile with dataset & metadata or a SigMFCollection depending on file type. + SigMFFile | SigMFCollection + A SigMFFile or a SigMFCollection depending on file type. + + Raises + ------ + SigMFFileError + If the file cannot be read as any supported format. + SigMFConversionError + If auto-detection conversion fails. """ + file_path = Path(filename) fns = get_sigmf_filenames(filename) meta_fn = fns["meta_fn"] archive_fn = fns["archive_fn"] collection_fn = fns["collection_fn"] - # extract the extension to check whether we are dealing with an archive, collection, etc. 
- file_path = Path(filename) - ext = file_path.suffix + # extract the extension to check file type + ext = file_path.suffix.lower() - if (ext.lower().endswith(SIGMF_ARCHIVE_EXT) or not Path.is_file(meta_fn)) and Path.is_file(archive_fn): + # group SigMF extensions for cleaner checking + sigmf_extensions = (SIGMF_METADATA_EXT, SIGMF_DATASET_EXT, SIGMF_COLLECTION_EXT, SIGMF_ARCHIVE_EXT) + + # try SigMF archive + if (ext.endswith(SIGMF_ARCHIVE_EXT) or not Path.is_file(meta_fn)) and Path.is_file(archive_fn): return fromarchive(archive_fn, skip_checksum=skip_checksum, autoscale=autoscale) - if (ext.lower().endswith(SIGMF_COLLECTION_EXT) or not Path.is_file(meta_fn)) and Path.is_file(collection_fn): + # try SigMF collection + if (ext.endswith(SIGMF_COLLECTION_EXT) or not Path.is_file(meta_fn)) and Path.is_file(collection_fn): collection_fp = open(collection_fn, "rb") bytestream_reader = codecs.getreader("utf-8") mdfile_reader = bytestream_reader(collection_fp) @@ -1257,7 +1275,8 @@ def fromfile(filename, skip_checksum=False, autoscale=True): dir_path = meta_fn.parent return SigMFCollection(metadata=metadata, base_path=dir_path, skip_checksums=skip_checksum) - else: + # try standard SigMF metadata file + if Path.is_file(meta_fn): meta_fp = open(meta_fn, "rb") bytestream_reader = codecs.getreader("utf-8") mdfile_reader = bytestream_reader(meta_fp) @@ -1267,6 +1286,23 @@ def fromfile(filename, skip_checksum=False, autoscale=True): data_fn = get_dataset_filename_from_metadata(meta_fn, metadata) return SigMFFile(metadata=metadata, data_file=data_fn, skip_checksum=skip_checksum, autoscale=autoscale) + # try auto-detection for non-SigMF files only + if Path.is_file(file_path) and not ext.endswith(sigmf_extensions): + magic_bytes = get_magic_bytes(file_path, count=4, offset=0) + + if magic_bytes == b"RIFF": + from .convert.wav import wav_to_sigmf + + return wav_to_sigmf(file_path, create_ncd=True) + + elif magic_bytes == b"BLUE": + from .convert.blue import blue_to_sigmf + + 
return blue_to_sigmf(file_path, create_ncd=True) + + # if file doesn't exist at all or no valid files found, raise original error + raise SigMFFileError(f"Cannot read {filename} as SigMF or supported non-SigMF format.") + + def get_sigmf_filenames(filename): """ @@ -1279,7 +1315,7 @@ def get_sigmf_filenames(filename): Returns ------- - dict with 'data_fn', 'meta_fn', and 'archive_fn' as keys. + dict with filename keys. """ stem_path = Path(filename) # If the path has a sigmf suffix, remove it. Otherwise do not remove the diff --git a/sigmf/utils.py b/sigmf/utils.py index 571a5e4..3c325c3 100644 --- a/sigmf/utils.py +++ b/sigmf/utils.py @@ -10,10 +10,11 @@ import sys from copy import deepcopy from datetime import datetime, timezone +from pathlib import Path import numpy as np -from .error import SigMFError +from .error import SigMFConversionError, SigMFError SIGMF_DATETIME_ISO8601_FMT = "%Y-%m-%dT%H:%M:%S.%fZ" @@ -112,3 +113,37 @@ def get_data_type_str(ray: np.ndarray) -> str: # only append endianness for types over 8 bits data_type_str += get_endian_str(ray) return data_type_str + + +def get_magic_bytes(file_path: Path, count: int = 4, offset: int = 0) -> bytes: + """ + Get magic bytes from a file to help identify file type. + + Parameters + ---------- + file_path : Path + Path to the file to read magic bytes from. + count : int, optional + Number of bytes to read. Default is 4. + offset : int, optional + Byte offset to start reading from. Default is 0. + + Returns + ------- + bytes + Magic bytes from the file. + + Raises + ------ + SigMFConversionError + If file cannot be read or is too small. 
+ """ + try: + with open(file_path, "rb") as handle: + handle.seek(offset) + magic_bytes = handle.read(count) + if len(magic_bytes) < count: + raise SigMFConversionError(f"File {file_path} too small to read {count} magic bytes at offset {offset}") + return magic_bytes + except (IOError, OSError) as err: + raise SigMFConversionError(f"Cannot read magic bytes from {file_path}: {err}") diff --git a/tests/test_convert.py b/tests/test_convert.py deleted file mode 100644 index 5bd3051..0000000 --- a/tests/test_convert.py +++ /dev/null @@ -1,124 +0,0 @@ -# Copyright: Multiple Authors -# -# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python -# -# SPDX-License-Identifier: LGPL-3.0-or-later - -"""Tests for Converters""" - -import os -import tempfile -import unittest -import wave -from pathlib import Path - -import numpy as np - -import sigmf -from sigmf.convert.blue import blue_to_sigmf -from sigmf.convert.wav import wav_to_sigmf - -from .testdata import NONSIGMF_REPO, NONSIGMF_ENV - - -class TestWAVConverter(unittest.TestCase): - """wav loopback test""" - - def setUp(self) -> None: - """temp wav file for testing""" - self.tmp_dir = tempfile.TemporaryDirectory() - self.tmp_path = Path(self.tmp_dir.name) - self.wav_path = self.tmp_path / "foo.wav" - samp_rate = 48000 - duration_s = 0.1 - ttt = np.linspace(0, duration_s, int(samp_rate * duration_s), endpoint=False) - freq = 440 # A4 note - self.audio_data = 0.5 * np.sin(2 * np.pi * freq * ttt) - # note scipy could write float wav files directly, - # but to avoid adding scipy as a dependency for sigmf-python, - # convert float audio to 16-bit PCM integer format - audio_int16 = (self.audio_data * 32767).astype(np.int16) - - # write wav file using built-in wave module - with wave.open(str(self.wav_path), "wb") as wav_file: - wav_file.setnchannels(1) # mono - wav_file.setsampwidth(2) # 16-bit = 2 bytes - wav_file.setframerate(samp_rate) - wav_file.writeframes(audio_int16.tobytes()) - - def tearDown(self) -> 
None: - """clean up temporary directory""" - self.tmp_dir.cleanup() - - def test_wav_to_sigmf_pair(self): - sigmf_path = self.tmp_path / "bar.tmp" - meta = wav_to_sigmf(wav_path=self.wav_path, out_path=sigmf_path) - data = meta.read_samples() - # allow numerical differences due to PCM quantization - self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) - filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) - self.assertTrue(filenames["data_fn"].exists(), "dataset path missing") - self.assertTrue(filenames["meta_fn"].exists(), "metadata path missing") - - def test_wav_to_sigmf_archive(self): - sigmf_path = self.tmp_path / "baz.ext" - wav_to_sigmf(wav_path=self.wav_path, out_path=sigmf_path, create_archive=True) - filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) - self.assertTrue(filenames["archive_fn"].exists(), "archive path missing") - - -class TestBlueConverter(unittest.TestCase): - """As we have no blue files in the repository, test only when env path specified.""" - - def setUp(self) -> None: - """temp paths & blue files""" - if not NONSIGMF_REPO: - # skip test if environment variable not set - self.skipTest(f"Set {NONSIGMF_ENV} environment variable to path with .cdif files to run test. 
") - self.bluefiles = list(NONSIGMF_REPO.glob("**/*.cdif")) - print("bluefiles", self.bluefiles) - if not self.bluefiles: - self.fail(f"No .cdif files found in {NONSIGMF_ENV}.") - self.tmp_dir = tempfile.TemporaryDirectory() - self.tmp_path = Path(self.tmp_dir.name) - - def tearDown(self) -> None: - """clean up temporary directory""" - self.tmp_dir.cleanup() - - def test_blue_to_sigmf_pair(self): - for bdx, bluefile in enumerate(self.bluefiles): - sigmf_path = self.tmp_path / bluefile.stem - meta = blue_to_sigmf(blue_path=bluefile, out_path=sigmf_path) - print(f"Converted {bluefile} to SigMF at {sigmf_path}") - if not meta.get_global_field("core:metadata_only"): - print(meta.read_samples(count=10)) - - # ### EVERYTHING BELOW HERE IS FOR DEBUGGING ONLY _ REMOVE LATER ### - # # plot stft of RF data for visual inspection - # import matplotlib.pyplot as plt - # from scipy.signal import spectrogram - # from swiftfox import summary, smartspec - - # if meta.get_global_field("core:metadata_only"): - # print("Metadata only file, skipping plot.") - # continue - # samples = meta.read_samples() - # # plt.figure(figsize=(10, 10)) - # summary(samples, detail=0.1, samp_rate=meta.get_global_field("core:sample_rate"), title=sigmf_path.name) - # plt.figure() - # # plt.plot(samples.real) - # # plt.plot(samples.imag) - # # plt.figure() - # spec = smartspec(samples, detail=0.5, samp_rate=meta.get_global_field("core:sample_rate")) - # # use imshow to plot spectrogram - - # plt.show() - self.assertIsInstance(meta, sigmf.SigMFFile) - - def test_blue_to_sigmf_archive(self): - for bdx, bluefile in enumerate(self.bluefiles): - sigmf_path = self.tmp_path / f"{bluefile.stem}_archive" - meta = blue_to_sigmf(blue_path=bluefile, out_path=sigmf_path, create_archive=True) - print(f"Converted {bluefile} to SigMF archive at {sigmf_path}") - self.assertIsInstance(meta, sigmf.SigMFFile) diff --git a/tests/test_convert_blue.py b/tests/test_convert_blue.py new file mode 100644 index 0000000..0f1ea58 --- 
/dev/null +++ b/tests/test_convert_blue.py @@ -0,0 +1,120 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""Tests for BLUE Converter""" + +import tempfile +import unittest +from pathlib import Path +from typing import cast + +import sigmf +from sigmf.convert.blue import blue_to_sigmf + +from .testdata import NONSIGMF_ENV, NONSIGMF_REPO + + +class TestBlueConverter(unittest.TestCase): + """BLUE converter tests using external files""" + + def setUp(self) -> None: + """setup paths to blue files""" + self.tmp_dir = tempfile.TemporaryDirectory() + self.tmp_path = Path(self.tmp_dir.name) + if not NONSIGMF_REPO: + # skip test if environment variable not set + self.skipTest(f"Set {NONSIGMF_ENV} environment variable to path with BLUE files to run test.") + + # look for blue files in blue/ directory + blue_dir = NONSIGMF_REPO / "blue" + self.bluefiles = [] + if blue_dir.exists(): + for ext in ["*.cdif", "*.tmp"]: + self.bluefiles.extend(blue_dir.glob(f"**/{ext}")) + + if not self.bluefiles: + self.fail(f"No BLUE files (*.cdif, *.tmp) found in {blue_dir}.") + + def tearDown(self) -> None: + """clean up temporary directory""" + self.tmp_dir.cleanup() + + def _validate_ncd_structure(self, meta, expected_file): + """validate basic NCD structure""" + self.assertEqual(meta.data_file, expected_file, "NCD should point to original file") + self.assertIn("core:trailing_bytes", meta._metadata["global"]) + captures = meta.get_captures() + self.assertGreater(len(captures), 0, "Should have at least one capture") + self.assertIn("core:header_bytes", captures[0]) + + # validate SigMF spec compliance: NCDs must not have metadata_only field + global_meta = meta._metadata["global"] + has_dataset = "core:dataset" in global_meta + has_metadata_only = "core:metadata_only" in global_meta + + self.assertTrue(has_dataset, "NCD should have core:dataset field") + 
self.assertFalse(has_metadata_only, "NCD should NOT have core:metadata_only field (spec violation)") + + return captures + + def _validate_auto_detection(self, file_path): + """validate auto-detection works and returns valid NCD""" + meta_auto_raw = sigmf.fromfile(file_path) + # auto-detection should return SigMFFile, not SigMFCollection + self.assertIsInstance(meta_auto_raw, sigmf.SigMFFile) + meta_auto = cast(sigmf.SigMFFile, meta_auto_raw) + # data_file might be Path or str, so convert both for comparison + self.assertEqual(str(meta_auto.data_file), str(file_path)) + self.assertIn("core:trailing_bytes", meta_auto._metadata["global"]) + return meta_auto + + def test_blue_to_sigmf_pair(self): + """test standard blue to sigmf conversion with file pairs""" + for bluefile in self.bluefiles: + sigmf_path = self.tmp_path / bluefile.stem + meta = blue_to_sigmf(blue_path=bluefile, out_path=sigmf_path) + if not meta.get_global_field("core:metadata_only"): + meta.read_samples(count=10) + self.assertIsInstance(meta, sigmf.SigMFFile) + + def test_blue_to_sigmf_archive(self): + """test blue to sigmf conversion with archive output""" + for bluefile in self.bluefiles: + sigmf_path = self.tmp_path / f"{bluefile.stem}_archive" + meta = blue_to_sigmf(blue_path=bluefile, out_path=str(sigmf_path), create_archive=True) + self.assertIsInstance(meta, sigmf.SigMFFile) + + def test_blue_to_sigmf_ncd(self): + """test blue to sigmf conversion as Non-Conforming Dataset""" + for bluefile in self.bluefiles: + meta = blue_to_sigmf(blue_path=str(bluefile), create_ncd=True) + + # validate basic NCD structure + self._validate_ncd_structure(meta, bluefile) + + # verify this is metadata-only (no separate data file created) + self.assertIsInstance(meta.data_buffer, type(meta.data_buffer)) + + # test that data can be read if not metadata-only + if not meta.get_global_field("core:metadata_only"): + _ = meta.read_samples(count=10) + + def test_blue_auto_detection(self): + """test automatic BLUE 
detection through fromfile()""" + for bluefile in self.bluefiles: + # validate auto-detection works + self._validate_auto_detection(bluefile) + + def test_blue_directory_files_ncd(self): + """test NCD conversion""" + for blue_file in self.bluefiles: + meta = blue_to_sigmf(blue_path=str(blue_file), create_ncd=True) + + # validate basic NCD structure + self._validate_ncd_structure(meta, blue_file) + + # validate auto-detection also works + self._validate_auto_detection(blue_file) diff --git a/tests/test_convert_wav.py b/tests/test_convert_wav.py index 11ef52a..c4e5670 100644 --- a/tests/test_convert_wav.py +++ b/tests/test_convert_wav.py @@ -4,43 +4,201 @@ # # SPDX-License-Identifier: LGPL-3.0-or-later -"""Tests wav formatted audio conversion""" +"""Tests for WAV Converter""" -import os import tempfile +import unittest +import wave +from pathlib import Path +from typing import cast import numpy as np -import pytest -from scipy.io import wavfile - -from sigmf.apps.convert_wav import convert_wav - - -def test_wav_to_sigmf_basic(): - """Basic smoke-test: convert a tiny WAV → SIGMF, assert file created.""" - fs = 48_000 - t = np.linspace(0, 0.1, int(fs * 0.1)) # 0.1 s - sine = np.sin(2 * np.pi * 1000 * t) - sine_int = (sine * 32767).astype(np.int16) - - # Create temp file and close it before use - with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_wav: - tmp_wav_path = tmp_wav.name - - # Write to the closed file - wavfile.write(tmp_wav_path, fs, sine_int) - tmp_sigmf = tmp_wav_path.replace(".wav", ".sigmf") - - try: - # Run converter - convert_wav(tmp_wav_path, tmp_sigmf) - - # Assert SIGMF file exists and non-zero - assert os.path.exists(tmp_sigmf), "SIGMF file not created" - assert os.path.getsize(tmp_sigmf) > 0, "SIGMF file is empty" - finally: - # Clean up both files - if os.path.exists(tmp_wav_path): - os.remove(tmp_wav_path) - if os.path.exists(tmp_sigmf): - os.remove(tmp_sigmf) + +import sigmf +from sigmf.convert.wav import wav_to_sigmf + +from 
.testdata import NONSIGMF_ENV, NONSIGMF_REPO + + +class TestWAVConverter(unittest.TestCase): + """wav converter tests""" + + def setUp(self) -> None: + """temp wav file for testing""" + self.tmp_dir = tempfile.TemporaryDirectory() + self.tmp_path = Path(self.tmp_dir.name) + self.wav_path = self.tmp_path / "foo.wav" + samp_rate = 48000 + duration_s = 0.1 + ttt = np.linspace(0, duration_s, int(samp_rate * duration_s), endpoint=False) + freq = 440 # A4 note + self.audio_data = 0.5 * np.sin(2 * np.pi * freq * ttt) + # convert float audio to 16-bit PCM integer format + audio_int16 = (self.audio_data * 32767).astype(np.int16) + + # write wav file using built-in wave module + with wave.open(str(self.wav_path), "wb") as wav_file: + wav_file.setnchannels(1) # mono + wav_file.setsampwidth(2) # 16-bit = 2 bytes + wav_file.setframerate(samp_rate) + wav_file.writeframes(audio_int16.tobytes()) + + def tearDown(self) -> None: + """clean up temporary directory""" + self.tmp_dir.cleanup() + + def _validate_ncd_structure(self, meta, expected_file): + """validate basic NCD structure""" + self.assertEqual(meta.data_file, expected_file, "NCD should point to original file") + self.assertIn("core:trailing_bytes", meta._metadata["global"]) + captures = meta.get_captures() + self.assertGreater(len(captures), 0, "Should have at least one capture") + self.assertIn("core:header_bytes", captures[0]) + + # validate SigMF spec compliance: NCDs must not have metadata_only field + global_meta = meta._metadata["global"] + has_dataset = "core:dataset" in global_meta + has_metadata_only = "core:metadata_only" in global_meta + + self.assertTrue(has_dataset, "NCD should have core:dataset field") + self.assertFalse(has_metadata_only, "NCD should NOT have core:metadata_only field (spec violation)") + + return captures + + def _validate_dataset_key(self, meta, expected_filename): + """validate DATASET_KEY is correctly set""" + dataset_filename = meta.get_global_field("core:dataset") + 
self.assertEqual(dataset_filename, expected_filename, "DATASET_KEY should contain filename") + self.assertIsInstance(dataset_filename, str, "DATASET_KEY should be a string") + + def _validate_auto_detection(self, file_path): + """validate auto-detection works and returns valid NCD""" + meta_auto_raw = sigmf.fromfile(file_path) + # auto-detection should return SigMFFile, not SigMFCollection + self.assertIsInstance(meta_auto_raw, sigmf.SigMFFile) + meta_auto = cast(sigmf.SigMFFile, meta_auto_raw) + # data_file might be Path or str, so convert both for comparison + self.assertEqual(str(meta_auto.data_file), str(file_path)) + self.assertIn("core:trailing_bytes", meta_auto._metadata["global"]) + return meta_auto + + def test_wav_to_sigmf_pair(self): + """test standard wav to sigmf conversion with file pairs""" + sigmf_path = self.tmp_path / "bar.tmp" + meta = wav_to_sigmf(wav_path=str(self.wav_path), out_path=str(sigmf_path)) + data = meta.read_samples() + # allow numerical differences due to PCM quantization + self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) + self.assertGreater(len(data), 0, "Should read some samples") + filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) + self.assertTrue(filenames["data_fn"].exists(), "dataset path missing") + self.assertTrue(filenames["meta_fn"].exists(), "metadata path missing") + + def test_wav_to_sigmf_archive(self): + """test wav to sigmf conversion with archive output""" + sigmf_path = self.tmp_path / "baz.ext" + wav_to_sigmf(wav_path=str(self.wav_path), out_path=str(sigmf_path), create_archive=True) + filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) + self.assertTrue(filenames["archive_fn"].exists(), "archive path missing") + + def test_wav_to_sigmf_ncd(self): + """test wav to sigmf conversion as Non-Conforming Dataset""" + meta = wav_to_sigmf(wav_path=str(self.wav_path), create_ncd=True) + + # validate basic NCD structure + captures = self._validate_ncd_structure(meta, self.wav_path) + 
self.assertEqual(len(captures), 1, "Should have exactly one capture") + + # validate DATASET_KEY is set for NCD + self._validate_dataset_key(meta, self.wav_path.name) + + # header_bytes should be non-zero for WAV files + header_bytes = captures[0]["core:header_bytes"] + self.assertGreater(header_bytes, 0, "WAV files should have non-zero header bytes") + + # verify data can still be read correctly from NCD + data = meta.read_samples() + self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) + self.assertGreater(len(data), 0, "Should read some samples") + + # verify this is metadata-only (no separate data file created) + self.assertIsInstance(meta.data_buffer, type(meta.data_buffer)) + + def test_wav_auto_detection(self): + """test automatic WAV detection through fromfile()""" + # validate auto-detection works + meta_raw = self._validate_auto_detection(self.wav_path) + meta = cast(sigmf.SigMFFile, meta_raw) + + # validate DATASET_KEY is set for auto-detected NCD + self._validate_dataset_key(meta, self.wav_path.name) + + # verify data can be read correctly + data = meta.read_samples() + self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) + self.assertGreater(len(data), 0, "Should read some samples") + + +class TestWAVConverterWithRealFiles(unittest.TestCase): + """Test WAV converter with real example files if available""" + + def setUp(self) -> None: + """setup paths to example wav files""" + self.wav_dir = None + if NONSIGMF_REPO: + wav_path = NONSIGMF_REPO / "wav" + if wav_path.exists(): + self.wav_dir = wav_path + self.wav_files = list(wav_path.glob("*.wav")) + + def _validate_ncd_structure(self, meta, expected_file): + """validate basic NCD structure""" + self.assertEqual(meta.data_file, expected_file, "NCD should point to original file") + self.assertIn("core:trailing_bytes", meta._metadata["global"]) + captures = meta.get_captures() + self.assertGreater(len(captures), 0, "Should have at least one capture") + self.assertIn("core:header_bytes", 
captures[0]) + + # validate SigMF spec compliance: NCDs must not have metadata_only field + global_meta = meta._metadata["global"] + has_dataset = "core:dataset" in global_meta + has_metadata_only = "core:metadata_only" in global_meta + + self.assertTrue(has_dataset, "NCD should have core:dataset field") + self.assertFalse(has_metadata_only, "NCD should NOT have core:metadata_only field (spec violation)") + + return captures + + def _validate_dataset_key(self, meta, expected_filename): + """validate DATASET_KEY is correctly set""" + dataset_filename = meta.get_global_field("core:dataset") + self.assertEqual(dataset_filename, expected_filename, "DATASET_KEY should contain filename") + + def _validate_auto_detection(self, file_path): + """validate auto-detection works and returns valid NCD""" + meta_auto_raw = sigmf.fromfile(file_path) + # auto-detection should return SigMFFile, not SigMFCollection + self.assertIsInstance(meta_auto_raw, sigmf.SigMFFile) + meta_auto = cast(sigmf.SigMFFile, meta_auto_raw) + # data_file might be Path or str, so convert both for comparison + self.assertEqual(str(meta_auto.data_file), str(file_path)) + self.assertIn("core:trailing_bytes", meta_auto._metadata["global"]) + return meta_auto + + def test_real_wav_files_ncd(self): + """test NCD conversion with real example wav files""" + if not self.wav_dir or not hasattr(self, "wav_files"): + self.skipTest(f"Set {NONSIGMF_ENV} environment variable to path with wav/ directory to run test.") + + if not self.wav_files: + self.skipTest(f"No .wav files found in {self.wav_dir}") + + for wav_file in self.wav_files: + meta = wav_to_sigmf(wav_path=str(wav_file), create_ncd=True) + + # validate basic NCD structure + self._validate_ncd_structure(meta, wav_file) + + # validate auto-detection also works + meta_auto = self._validate_auto_detection(wav_file) + self._validate_dataset_key(meta_auto, wav_file.name) From 30769edc06d01ac51c0ecc7dd8a6237337c98b31 Mon Sep 17 00:00:00 2001 From: Teque5 Date: Fri, 2 
Jan 2026 10:53:48 -0800 Subject: [PATCH 03/13] refactor tests; failing test should be fixed after feature/unify-sample-access merged --- sigmf/convert/blue.py | 4 +- tests/test_convert_blue.py | 114 +++++++------------- tests/test_convert_wav.py | 211 +++++++++++++++---------------------- 3 files changed, 127 insertions(+), 202 deletions(-) diff --git a/sigmf/convert/blue.py b/sigmf/convert/blue.py index e9a0633..9e5943c 100644 --- a/sigmf/convert/blue.py +++ b/sigmf/convert/blue.py @@ -402,7 +402,7 @@ def _build_common_metadata( is_ncd : bool, optional If True, adds NCD-specific fields. blue_file_name : str, optional - Original BLUE file name (required for NCD). + Original BLUE file name (for NCD). trailing_bytes : int, optional Number of trailing bytes (for NCD). @@ -450,8 +450,8 @@ def get_tag(tag): # add NCD-specific fields if is_ncd: - global_info[SigMFFile.TRAILING_BYTES_KEY] = trailing_bytes global_info[SigMFFile.DATASET_KEY] = blue_file_name + global_info[SigMFFile.TRAILING_BYTES_KEY] = trailing_bytes # merge HCB values into metadata global_info["blue:fixed"] = h_fixed diff --git a/tests/test_convert_blue.py b/tests/test_convert_blue.py index 0f1ea58..b89f87d 100644 --- a/tests/test_convert_blue.py +++ b/tests/test_convert_blue.py @@ -9,15 +9,17 @@ import tempfile import unittest from pathlib import Path -from typing import cast + +import numpy as np import sigmf from sigmf.convert.blue import blue_to_sigmf +from .test_convert_wav import _validate_ncd from .testdata import NONSIGMF_ENV, NONSIGMF_REPO -class TestBlueConverter(unittest.TestCase): +class TestBlueWithNonSigMFRepo(unittest.TestCase): """BLUE converter tests using external files""" def setUp(self) -> None: @@ -28,93 +30,57 @@ def setUp(self) -> None: # skip test if environment variable not set self.skipTest(f"Set {NONSIGMF_ENV} environment variable to path with BLUE files to run test.") - # look for blue files in blue/ directory + # glob all files in blue/ directory blue_dir = NONSIGMF_REPO / 
"blue" - self.bluefiles = [] + self.blue_paths = [] if blue_dir.exists(): for ext in ["*.cdif", "*.tmp"]: - self.bluefiles.extend(blue_dir.glob(f"**/{ext}")) - - if not self.bluefiles: + self.blue_paths.extend(blue_dir.glob(f"**/{ext}")) + if not self.blue_paths: self.fail(f"No BLUE files (*.cdif, *.tmp) found in {blue_dir}.") def tearDown(self) -> None: """clean up temporary directory""" self.tmp_dir.cleanup() - def _validate_ncd_structure(self, meta, expected_file): - """validate basic NCD structure""" - self.assertEqual(meta.data_file, expected_file, "NCD should point to original file") - self.assertIn("core:trailing_bytes", meta._metadata["global"]) - captures = meta.get_captures() - self.assertGreater(len(captures), 0, "Should have at least one capture") - self.assertIn("core:header_bytes", captures[0]) - - # validate SigMF spec compliance: NCDs must not have metadata_only field - global_meta = meta._metadata["global"] - has_dataset = "core:dataset" in global_meta - has_metadata_only = "core:metadata_only" in global_meta - - self.assertTrue(has_dataset, "NCD should have core:dataset field") - self.assertFalse(has_metadata_only, "NCD should NOT have core:metadata_only field (spec violation)") - - return captures - - def _validate_auto_detection(self, file_path): - """validate auto-detection works and returns valid NCD""" - meta_auto_raw = sigmf.fromfile(file_path) - # auto-detection should return SigMFFile, not SigMFCollection - self.assertIsInstance(meta_auto_raw, sigmf.SigMFFile) - meta_auto = cast(sigmf.SigMFFile, meta_auto_raw) - # data_file might be Path or str, so convert both for comparison - self.assertEqual(str(meta_auto.data_file), str(file_path)) - self.assertIn("core:trailing_bytes", meta_auto._metadata["global"]) - return meta_auto - - def test_blue_to_sigmf_pair(self): + def test_sigmf_pair(self): """test standard blue to sigmf conversion with file pairs""" - for bluefile in self.bluefiles: - sigmf_path = self.tmp_path / bluefile.stem - meta = 
blue_to_sigmf(blue_path=bluefile, out_path=sigmf_path) - if not meta.get_global_field("core:metadata_only"): - meta.read_samples(count=10) + for blue_path in self.blue_paths: + sigmf_path = self.tmp_path / blue_path.stem + meta = blue_to_sigmf(blue_path=blue_path, out_path=sigmf_path) self.assertIsInstance(meta, sigmf.SigMFFile) + # FIXME: REPLACE BELOW WITH BELOW COMMENTED AFTER PR #121 MERGED + if not meta.get_global_field("core:metadata_only"): + _ = meta.read_samples(count=10) + # check sample read consistency + # np.testing.assert_array_equal(meta.read_samples(count=10), meta[0:10]) - def test_blue_to_sigmf_archive(self): + def test_sigmf_archive(self): """test blue to sigmf conversion with archive output""" - for bluefile in self.bluefiles: - sigmf_path = self.tmp_path / f"{bluefile.stem}_archive" - meta = blue_to_sigmf(blue_path=bluefile, out_path=str(sigmf_path), create_archive=True) - self.assertIsInstance(meta, sigmf.SigMFFile) - - def test_blue_to_sigmf_ncd(self): - """test blue to sigmf conversion as Non-Conforming Dataset""" - for bluefile in self.bluefiles: - meta = blue_to_sigmf(blue_path=str(bluefile), create_ncd=True) - - # validate basic NCD structure - self._validate_ncd_structure(meta, bluefile) - - # verify this is metadata-only (no separate data file created) - self.assertIsInstance(meta.data_buffer, type(meta.data_buffer)) + for blue_path in self.blue_paths: + sigmf_path = self.tmp_path / f"{blue_path.stem}_archive" + meta = blue_to_sigmf(blue_path=blue_path, out_path=sigmf_path, create_archive=True) + # now read newly created archive + arc_meta = sigmf.fromfile(sigmf_path) + self.assertIsInstance(arc_meta, sigmf.SigMFFile) + # FIXME: REPLACE BELOW WITH BELOW COMMENTED AFTER PR #121 MERGED + if not arc_meta.get_global_field("core:metadata_only"): + _ = arc_meta.read_samples(count=10) + # check sample read consistency + # np.testing.assert_array_equal(meta.read_samples(count=10), meta[0:10]) + + def test_create_ncd(self): + """test direct NCD 
conversion""" + for blue_path in self.blue_paths: + meta = blue_to_sigmf(blue_path=blue_path) + _validate_ncd(self, meta, blue_path) # test that data can be read if not metadata-only if not meta.get_global_field("core:metadata_only"): _ = meta.read_samples(count=10) - def test_blue_auto_detection(self): - """test automatic BLUE detection through fromfile()""" - for bluefile in self.bluefiles: - # validate auto-detection works - self._validate_auto_detection(bluefile) - - def test_blue_directory_files_ncd(self): - """test NCD conversion""" - for blue_file in self.bluefiles: - meta = blue_to_sigmf(blue_path=str(blue_file), create_ncd=True) - - # validate basic NCD structure - self._validate_ncd_structure(meta, blue_file) - - # validate auto-detection also works - self._validate_auto_detection(blue_file) + def test_autodetect_ncd(self): + """test automatic NCD conversion""" + for blue_path in self.blue_paths: + meta = sigmf.fromfile(blue_path) + _validate_ncd(self, meta, blue_path) diff --git a/tests/test_convert_wav.py b/tests/test_convert_wav.py index c4e5670..12e3aba 100644 --- a/tests/test_convert_wav.py +++ b/tests/test_convert_wav.py @@ -10,7 +10,6 @@ import unittest import wave from pathlib import Path -from typing import cast import numpy as np @@ -20,11 +19,28 @@ from .testdata import NONSIGMF_ENV, NONSIGMF_REPO +def _validate_ncd(test, meta, target_path): + """non-conforming dataset has a specific structure""" + test.assertEqual(str(meta.data_file), str(target_path), "Auto-detected NCD should point to original file") + test.assertIsInstance(meta, sigmf.SigMFFile) + + global_info = meta.get_global_info() + capture_info = meta.get_captures() + + # validate NCD SigMF spec compliance + test.assertGreater(len(capture_info), 0, "Should have at least one capture") + test.assertIn("core:header_bytes", capture_info[0]) + test.assertGreater(capture_info[0]["core:header_bytes"], 0, "Should have non-zero core:header_bytes field") + test.assertIn("core:trailing_bytes", 
global_info, "Should have core:trailing_bytes field.") + test.assertIn("core:dataset", global_info, "Should have core:dataset field.") + test.assertNotIn("core:metadata_only", global_info, "Should NOT have core:metadata_only field.") + + class TestWAVConverter(unittest.TestCase): - """wav converter tests""" + """Create a realistic WAV file and test conversion methods.""" def setUp(self) -> None: - """temp wav file for testing""" + """temp WAV file with tone for testing""" self.tmp_dir = tempfile.TemporaryDirectory() self.tmp_path = Path(self.tmp_dir.name) self.wav_path = self.tmp_path / "foo.wav" @@ -47,49 +63,14 @@ def tearDown(self) -> None: """clean up temporary directory""" self.tmp_dir.cleanup() - def _validate_ncd_structure(self, meta, expected_file): - """validate basic NCD structure""" - self.assertEqual(meta.data_file, expected_file, "NCD should point to original file") - self.assertIn("core:trailing_bytes", meta._metadata["global"]) - captures = meta.get_captures() - self.assertGreater(len(captures), 0, "Should have at least one capture") - self.assertIn("core:header_bytes", captures[0]) - - # validate SigMF spec compliance: NCDs must not have metadata_only field - global_meta = meta._metadata["global"] - has_dataset = "core:dataset" in global_meta - has_metadata_only = "core:metadata_only" in global_meta - - self.assertTrue(has_dataset, "NCD should have core:dataset field") - self.assertFalse(has_metadata_only, "NCD should NOT have core:metadata_only field (spec violation)") - - return captures - - def _validate_dataset_key(self, meta, expected_filename): - """validate DATASET_KEY is correctly set""" - dataset_filename = meta.get_global_field("core:dataset") - self.assertEqual(dataset_filename, expected_filename, "DATASET_KEY should contain filename") - self.assertIsInstance(dataset_filename, str, "DATASET_KEY should be a string") - - def _validate_auto_detection(self, file_path): - """validate auto-detection works and returns valid NCD""" - 
meta_auto_raw = sigmf.fromfile(file_path) - # auto-detection should return SigMFFile, not SigMFCollection - self.assertIsInstance(meta_auto_raw, sigmf.SigMFFile) - meta_auto = cast(sigmf.SigMFFile, meta_auto_raw) - # data_file might be Path or str, so convert both for comparison - self.assertEqual(str(meta_auto.data_file), str(file_path)) - self.assertIn("core:trailing_bytes", meta_auto._metadata["global"]) - return meta_auto - def test_wav_to_sigmf_pair(self): """test standard wav to sigmf conversion with file pairs""" sigmf_path = self.tmp_path / "bar.tmp" meta = wav_to_sigmf(wav_path=str(self.wav_path), out_path=str(sigmf_path)) data = meta.read_samples() + self.assertGreater(len(data), 0, "Should read some samples") # allow numerical differences due to PCM quantization self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) - self.assertGreater(len(data), 0, "Should read some samples") filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) self.assertTrue(filenames["data_fn"].exists(), "dataset path missing") self.assertTrue(filenames["meta_fn"].exists(), "metadata path missing") @@ -104,101 +85,79 @@ def test_wav_to_sigmf_archive(self): def test_wav_to_sigmf_ncd(self): """test wav to sigmf conversion as Non-Conforming Dataset""" meta = wav_to_sigmf(wav_path=str(self.wav_path), create_ncd=True) - - # validate basic NCD structure - captures = self._validate_ncd_structure(meta, self.wav_path) - self.assertEqual(len(captures), 1, "Should have exactly one capture") - - # validate DATASET_KEY is set for NCD - self._validate_dataset_key(meta, self.wav_path.name) - - # header_bytes should be non-zero for WAV files - header_bytes = captures[0]["core:header_bytes"] - self.assertGreater(header_bytes, 0, "WAV files should have non-zero header bytes") + _validate_ncd(self, meta, self.wav_path) # verify data can still be read correctly from NCD data = meta.read_samples() - self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) 
self.assertGreater(len(data), 0, "Should read some samples") - - # verify this is metadata-only (no separate data file created) - self.assertIsInstance(meta.data_buffer, type(meta.data_buffer)) - - def test_wav_auto_detection(self): - """test automatic WAV detection through fromfile()""" - # validate auto-detection works - meta_raw = self._validate_auto_detection(self.wav_path) - meta = cast(sigmf.SigMFFile, meta_raw) - - # validate DATASET_KEY is set for auto-detected NCD - self._validate_dataset_key(meta, self.wav_path.name) - - # verify data can be read correctly - data = meta.read_samples() + # allow numerical differences due to PCM quantization self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) - self.assertGreater(len(data), 0, "Should read some samples") -class TestWAVConverterWithRealFiles(unittest.TestCase): +class TestWAVWithNonSigMFRepo(unittest.TestCase): """Test WAV converter with real example files if available""" def setUp(self) -> None: """setup paths to example wav files""" - self.wav_dir = None - if NONSIGMF_REPO: - wav_path = NONSIGMF_REPO / "wav" - if wav_path.exists(): - self.wav_dir = wav_path - self.wav_files = list(wav_path.glob("*.wav")) - - def _validate_ncd_structure(self, meta, expected_file): - """validate basic NCD structure""" - self.assertEqual(meta.data_file, expected_file, "NCD should point to original file") - self.assertIn("core:trailing_bytes", meta._metadata["global"]) - captures = meta.get_captures() - self.assertGreater(len(captures), 0, "Should have at least one capture") - self.assertIn("core:header_bytes", captures[0]) - - # validate SigMF spec compliance: NCDs must not have metadata_only field - global_meta = meta._metadata["global"] - has_dataset = "core:dataset" in global_meta - has_metadata_only = "core:metadata_only" in global_meta - - self.assertTrue(has_dataset, "NCD should have core:dataset field") - self.assertFalse(has_metadata_only, "NCD should NOT have core:metadata_only field (spec violation)") - - 
return captures - - def _validate_dataset_key(self, meta, expected_filename): - """validate DATASET_KEY is correctly set""" - dataset_filename = meta.get_global_field("core:dataset") - self.assertEqual(dataset_filename, expected_filename, "DATASET_KEY should contain filename") - - def _validate_auto_detection(self, file_path): - """validate auto-detection works and returns valid NCD""" - meta_auto_raw = sigmf.fromfile(file_path) - # auto-detection should return SigMFFile, not SigMFCollection - self.assertIsInstance(meta_auto_raw, sigmf.SigMFFile) - meta_auto = cast(sigmf.SigMFFile, meta_auto_raw) - # data_file might be Path or str, so convert both for comparison - self.assertEqual(str(meta_auto.data_file), str(file_path)) - self.assertIn("core:trailing_bytes", meta_auto._metadata["global"]) - return meta_auto - - def test_real_wav_files_ncd(self): - """test NCD conversion with real example wav files""" - if not self.wav_dir or not hasattr(self, "wav_files"): - self.skipTest(f"Set {NONSIGMF_ENV} environment variable to path with wav/ directory to run test.") - - if not self.wav_files: - self.skipTest(f"No .wav files found in {self.wav_dir}") - - for wav_file in self.wav_files: - meta = wav_to_sigmf(wav_path=str(wav_file), create_ncd=True) - - # validate basic NCD structure - self._validate_ncd_structure(meta, wav_file) - - # validate auto-detection also works - meta_auto = self._validate_auto_detection(wav_file) - self._validate_dataset_key(meta_auto, wav_file.name) + self.tmp_dir = tempfile.TemporaryDirectory() + self.tmp_path = Path(self.tmp_dir.name) + if not NONSIGMF_REPO: + # skip test if environment variable not set + self.skipTest(f"Set {NONSIGMF_ENV} environment variable to path with WAV files to run test.") + + # glob all files in wav/ directory + wav_dir = NONSIGMF_REPO / "wav" + self.wav_paths = [] + if wav_dir.exists(): + self.wav_paths = list(wav_dir.glob("*.wav")) + if not self.wav_paths: + self.fail(f"No WAV files (*.wav) found in {wav_dir}.") + + def 
tearDown(self) -> None: + """clean up temporary directory""" + self.tmp_dir.cleanup() + + def test_sigmf_pair(self): + """test standard wav to sigmf conversion with file pairs""" + for wav_path in self.wav_paths: + sigmf_path = self.tmp_path / wav_path.stem + meta = wav_to_sigmf(wav_path=wav_path, out_path=sigmf_path) + self.assertIsInstance(meta, sigmf.SigMFFile) + # FIXME: REPLACE BELOW WITH BELOW COMMENTED AFTER PR #121 MERGED + if not meta.get_global_field("core:metadata_only"): + _ = meta.read_samples(count=10) + # check sample read consistency + # np.testing.assert_array_equal(meta.read_samples(count=10), meta[0:10]) + + def test_sigmf_archive(self): + """test wav to sigmf conversion with archive output""" + for wav_path in self.wav_paths: + sigmf_path = self.tmp_path / f"{wav_path.stem}_archive" + meta = wav_to_sigmf(wav_path=wav_path, out_path=sigmf_path, create_archive=True) + # now read newly created archive + arc_meta = sigmf.fromfile(sigmf_path) + # FIXME: I believe this error related to sample_count being 0 is fixed by PR 121 + print("dbug", arc_meta) + print("dbug len", len(arc_meta)) + print("dbug sample_count", arc_meta.sample_count) + self.assertIsInstance(arc_meta, sigmf.SigMFFile) + # FIXME: REPLACE BELOW WITH BELOW COMMENTED AFTER PR #121 MERGED + if not arc_meta.get_global_field("core:metadata_only"): + _ = arc_meta.read_samples(count=10) + # check sample read consistency + # np.testing.assert_array_equal(meta.read_samples(count=10), meta[0:10]) + + def test_create_ncd(self): + """test direct NCD conversion""" + for wav_path in self.wav_paths: + meta = wav_to_sigmf(wav_path=wav_path) + _validate_ncd(self, meta, wav_path) + + # test file read + _ = meta.read_samples(count=10) + + def test_autodetect_ncd(self): + """test automatic NCD conversion""" + for wav_path in self.wav_paths: + meta = sigmf.fromfile(wav_path) + _validate_ncd(self, meta, wav_path) From 7db46e7f5961397c178ec85130409d9454480393 Mon Sep 17 00:00:00 2001 From: Teque5 Date: Fri, 
2 Jan 2026 11:12:06 -0800 Subject: [PATCH 04/13] better version parsing --- README.md | 14 +++++++------- sigmf/convert/blue.py | 15 ++++++++++++--- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index af79c02..2cccdd9 100644 --- a/README.md +++ b/README.md @@ -12,13 +12,13 @@ freely under the terms GNU Lesser GPL v3 License. This module follows the SigMF specification [html](https://sigmf.org/)/[pdf](https://sigmf.github.io/SigMF/sigmf-spec.pdf) from the [spec repository](https://github.com/sigmf/SigMF). -### Install Latest +### Install ```bash pip install sigmf ``` -### Read SigMF +### Reading SigMF ```python import sigmf @@ -27,11 +27,11 @@ import sigmf meta = sigmf.fromfile("recording.sigmf-meta") samples = meta[0:1024] # get first 1024 samples -# fromfile() also supports BLUE and WAV files via auto-detection -meta = sigmf.fromfile("recording.cdif") # BLUE file -meta = sigmf.fromfile("recording.wav") # WAV file +# read other formats containing RF time series as SigMF +meta = sigmf.fromfile("recording.wav") # WAV +meta = sigmf.fromfile("recording.cdif") # BLUE / Platinum ``` -### Full API & Docs +### Docs -**[Please visit our documentation for more info.](https://sigmf.readthedocs.io/en/latest/)** +**[Please visit our documentation for full API reference and more info.](https://sigmf.readthedocs.io/en/latest/)** diff --git a/sigmf/convert/blue.py b/sigmf/convert/blue.py index 9e5943c..18e0788 100644 --- a/sigmf/convert/blue.py +++ b/sigmf/convert/blue.py @@ -21,6 +21,7 @@ from datetime import datetime, timezone from pathlib import Path from typing import Optional, Tuple +from packaging.version import Version, InvalidVersion import numpy as np @@ -223,9 +224,17 @@ def read_hcb(file_path): raw_adjunct = handle.read(256) h_adjunct = {"raw_base64": base64.b64encode(raw_adjunct).decode("ascii")} - # FIXME: I've seen VER=2.0.14 - ver_lut = {"1.0": "BLUE 1.0", "1.1": "BLUE 1.1", "2.0": "Platinum"} - spec_str = 
ver_lut.get(h_keywords.get("VER", "1.0"), "Unknown") + try: + spec_str = "Unknown" + version = Version(h_keywords.get("VER", "1.0")) + if version.major == 1: + spec_str = f"BLUE {version}" + elif version.major == 2: + spec_str = f"Platinum {version}" + except InvalidVersion: + log.debug("Could not parse BLUE specification from VER keyword.") + pass + # h_fixed will contain number e.g. 1000, 1001, 2000, 2001 log.info(f"Read {h_fixed['version']} type {h_fixed['type']} using {spec_str} specification.") validate_fixed(h_fixed) From 63d4ceef281b6bf3a1d08147c4e9866c94d4c3e3 Mon Sep 17 00:00:00 2001 From: Teque5 Date: Fri, 2 Jan 2026 11:42:49 -0800 Subject: [PATCH 05/13] conversion to archive should return metadata for archive --- README.md | 2 +- sigmf/convert/blue.py | 7 ++++-- sigmf/convert/wav.py | 49 ++++++++++++++------------------ tests/test_convert_blue.py | 8 +++---- tests/test_convert_wav.py | 14 +++++------ 5 files changed, 33 insertions(+), 47 deletions(-) diff --git a/README.md b/README.md index 2cccdd9..dea36bb 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ pip install sigmf ``` -### Reading SigMF +### Read SigMF ```python import sigmf diff --git a/sigmf/convert/blue.py b/sigmf/convert/blue.py index 18e0788..5fc2f17 100644 --- a/sigmf/convert/blue.py +++ b/sigmf/convert/blue.py @@ -21,12 +21,13 @@ from datetime import datetime, timezone from pathlib import Path from typing import Optional, Tuple -from packaging.version import Version, InvalidVersion import numpy as np +from packaging.version import InvalidVersion, Version from .. import SigMFFile from .. import __version__ as toolversion +from .. 
import fromfile from ..error import SigMFConversionError from ..sigmffile import get_sigmf_filenames from ..utils import SIGMF_DATETIME_ISO8601_FMT @@ -658,6 +659,8 @@ def construct_sigmf( if create_archive: meta.tofile(filenames["archive_fn"], toarchive=True) log.info("wrote %s", filenames["archive_fn"]) + # metadata returned should be for this archive + meta = fromfile(filenames["archive_fn"]) else: meta.tofile(filenames["meta_fn"], toarchive=False) log.info("wrote %s", filenames["meta_fn"]) @@ -730,7 +733,7 @@ def blue_to_sigmf( create_ncd: bool = False, ) -> SigMFFile: """ - Read a MIDAS Bluefile, write to SigMF, return SigMFFile object. + Read a MIDAS Bluefile, optionally write SigMF, return associated SigMF object. Parameters ---------- diff --git a/sigmf/convert/wav.py b/sigmf/convert/wav.py index e877d16..b599693 100644 --- a/sigmf/convert/wav.py +++ b/sigmf/convert/wav.py @@ -19,6 +19,7 @@ from .. import SigMFFile from .. import __version__ as toolversion +from .. import fromfile from ..sigmffile import get_sigmf_filenames from ..utils import SIGMF_DATETIME_ISO8601_FMT, get_data_type_str @@ -80,7 +81,7 @@ def wav_to_sigmf( create_ncd: bool = False, ) -> SigMFFile: """ - Read a wav, optionally write a sigmf, return SigMFFile object. + Read a wav, optionally write sigmf, return associated SigMF object. 
Parameters ---------- @@ -115,6 +116,7 @@ def wav_to_sigmf( samp_width = wav_reader.getsampwidth() samp_rate = wav_reader.getframerate() n_frames = wav_reader.getnframes() + np_dtype = f"int{samp_width * 8}" # for NCD support, calculate precise byte boundaries if create_ncd: @@ -122,28 +124,21 @@ def wav_to_sigmf( log.debug(f"WAV NCD: header_bytes={header_bytes}, trailing_bytes={trailing_bytes}") # only read audio data if we're not creating NCD metadata-only - wav_data = None # initialize variable - if create_ncd and out_path is None: - # metadata-only NCD: don't read audio data - pass + wav_data = None + if create_ncd: + # for NCD metadata-only, create dummy sample to get datatype + dummy_sample = np.array([0], dtype=np_dtype) + datatype_str = get_data_type_str(dummy_sample) + # don't read any wav_data else: # normal conversion: read the audio data raw_data = wav_reader.readframes(n_frames) - - np_dtype = f"int{samp_width * 8}" - - if wav_data is None: - # for NCD metadata-only, create dummy sample to get datatype - dummy_sample = np.array([0], dtype=np_dtype) - datatype_str = get_data_type_str(dummy_sample) - else: - # normal case: process actual audio data - wav_data = ( - np.frombuffer(raw_data, dtype=np_dtype).reshape(-1, n_channels) - if n_channels > 1 - else np.frombuffer(raw_data, dtype=np_dtype) - ) - datatype_str = get_data_type_str(wav_data) + wav_data = ( + np.frombuffer(raw_data, dtype=np_dtype).reshape(-1, n_channels) + if n_channels > 1 + else np.frombuffer(raw_data, dtype=np_dtype) + ) + datatype_str = get_data_type_str(wav_data) global_info = { SigMFFile.DATATYPE_KEY: datatype_str, @@ -176,17 +171,6 @@ def wav_to_sigmf( log.debug("created NCD SigMF: %r", meta) return meta - # if we get here, we need the actual audio data to create a new data file - if wav_data is None: - # need to read the audio data now for normal file creation - with wave.open(str(wav_path), "rb") as wav_reader: - raw_data = wav_reader.readframes(n_frames) - wav_data = ( - 
np.frombuffer(raw_data, dtype=np_dtype).reshape(-1, n_channels) - if n_channels > 1 - else np.frombuffer(raw_data, dtype=np_dtype) - ) - if out_path is None: base_path = wav_path.with_suffix(".sigmf") else: @@ -209,6 +193,9 @@ def wav_to_sigmf( meta.tofile(filenames["archive_fn"], toarchive=True) log.info("wrote %s", filenames["archive_fn"]) + + # metadata returned should be for this archive + meta = fromfile(filenames["archive_fn"]) else: data_path = filenames["data_fn"] wav_data.tofile(data_path) diff --git a/tests/test_convert_blue.py b/tests/test_convert_blue.py index b89f87d..2cb691a 100644 --- a/tests/test_convert_blue.py +++ b/tests/test_convert_blue.py @@ -60,12 +60,10 @@ def test_sigmf_archive(self): for blue_path in self.blue_paths: sigmf_path = self.tmp_path / f"{blue_path.stem}_archive" meta = blue_to_sigmf(blue_path=blue_path, out_path=sigmf_path, create_archive=True) - # now read newly created archive - arc_meta = sigmf.fromfile(sigmf_path) - self.assertIsInstance(arc_meta, sigmf.SigMFFile) + self.assertIsInstance(meta, sigmf.SigMFFile) # FIXME: REPLACE BELOW WITH BELOW COMMENTED AFTER PR #121 MERGED - if not arc_meta.get_global_field("core:metadata_only"): - _ = arc_meta.read_samples(count=10) + if not meta.get_global_field("core:metadata_only"): + _ = meta.read_samples(count=10) # check sample read consistency # np.testing.assert_array_equal(meta.read_samples(count=10), meta[0:10]) diff --git a/tests/test_convert_wav.py b/tests/test_convert_wav.py index 12e3aba..83a5d0a 100644 --- a/tests/test_convert_wav.py +++ b/tests/test_convert_wav.py @@ -134,16 +134,14 @@ def test_sigmf_archive(self): for wav_path in self.wav_paths: sigmf_path = self.tmp_path / f"{wav_path.stem}_archive" meta = wav_to_sigmf(wav_path=wav_path, out_path=sigmf_path, create_archive=True) - # now read newly created archive - arc_meta = sigmf.fromfile(sigmf_path) # FIXME: I believe this error related to sample_count being 0 is fixed by PR 121 - print("dbug", arc_meta) - print("dbug 
len", len(arc_meta)) - print("dbug sample_count", arc_meta.sample_count) - self.assertIsInstance(arc_meta, sigmf.SigMFFile) + print("dbug", meta) + print("dbug len", len(meta)) + print("dbug sample_count", meta.sample_count) + self.assertIsInstance(meta, sigmf.SigMFFile) # FIXME: REPLACE BELOW WITH BELOW COMMENTED AFTER PR #121 MERGED - if not arc_meta.get_global_field("core:metadata_only"): - _ = arc_meta.read_samples(count=10) + if not meta.get_global_field("core:metadata_only"): + _ = meta.read_samples(count=10) # check sample read consistency # np.testing.assert_array_equal(meta.read_samples(count=10), meta[0:10]) From d9d3af1a509bac4a00d97e7d184b60e23cb491ac Mon Sep 17 00:00:00 2001 From: Teque5 Date: Wed, 7 Jan 2026 11:45:49 -0800 Subject: [PATCH 06/13] increment minor version --- sigmf/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sigmf/__init__.py b/sigmf/__init__.py index 4bbd62b..9050252 100644 --- a/sigmf/__init__.py +++ b/sigmf/__init__.py @@ -5,7 +5,7 @@ # SPDX-License-Identifier: LGPL-3.0-or-later # version of this python module -__version__ = "1.5.0" +__version__ = "1.6.0" # matching version of the SigMF specification __specification__ = "1.2.6" From 3263c2ec72a826e62b2cf5a3f4407e7aae089e74 Mon Sep 17 00:00:00 2001 From: Teque5 Date: Wed, 7 Jan 2026 16:37:14 -0800 Subject: [PATCH 07/13] simplify hashing implementation & add related tests --- docs/source/api.rst | 2 +- docs/source/index.rst | 3 +- sigmf/hashing.py | 62 +++++++++++++++++++++++++++++++++++ sigmf/sigmf_hash.py | 36 --------------------- sigmf/sigmffile.py | 19 ++++------- tests/test_hashing.py | 75 +++++++++++++++++++++++++++++++++++++++++++ tests/testdata.py | 1 - 7 files changed, 146 insertions(+), 52 deletions(-) create mode 100644 sigmf/hashing.py delete mode 100644 sigmf/sigmf_hash.py create mode 100644 tests/test_hashing.py diff --git a/docs/source/api.rst b/docs/source/api.rst index 62d166a..eac37d5 100644 --- a/docs/source/api.rst +++ 
b/docs/source/api.rst @@ -13,7 +13,7 @@ SigMF API sigmf.convert.wav sigmf.error sigmf.schema - sigmf.sigmf_hash + sigmf.hash sigmf.sigmffile sigmf.utils sigmf.validate diff --git a/docs/source/index.rst b/docs/source/index.rst index 9d4a6ab..d541a70 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -8,11 +8,12 @@ It offers a *simple* and *intuitive* API for Python developers. .. Note: The toolversion & specversion below are replaced dynamically during build. + The root __init__.py file is used as the sole source of truth for these values. This documentation is for version |toolversion| of the library, which is compatible with version |specversion| of the SigMF specification. -To get started, see the :doc:`quickstart` section or learn how to :ref:`install` the library. +To get started, see `quickstart`. ----- diff --git a/sigmf/hashing.py b/sigmf/hashing.py new file mode 100644 index 0000000..3874729 --- /dev/null +++ b/sigmf/hashing.py @@ -0,0 +1,62 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""Hashing Functions""" + +import hashlib +from pathlib import Path + + +def calculate_sha512(filename=None, fileobj=None): + """ + Calculate SHA512 hash of a dataset for integrity verification. + + The entire recording file should be hashed according to the SigMF specification. + + Parameters + ---------- + filename : str or Path, optional + Path to the file to hash. If provided, the file will be opened and hashed. + Cannot be used together with fileobj. + fileobj : file-like object, optional + An open file-like object (e.g., BytesIO) to hash. Must have read() and + seek() methods. Cannot be used together with filename. + + Returns + ------- + str + 128 character hex digest (512 bits). + + Raises + ------ + ValueError + If neither filename nor fileobj is provided. 
+ """ + the_hash = hashlib.sha512() + bytes_read = 0 + + if filename is not None: + fileobj = open(filename, "rb") + bytes_to_hash = Path(filename).stat().st_size + elif fileobj is not None: + current_pos = fileobj.tell() + # seek to end + fileobj.seek(0, 2) + bytes_to_hash = fileobj.tell() + # reset to original position + fileobj.seek(current_pos) + else: + raise ValueError("Either filename or fileobj must be provided") + + while bytes_read < bytes_to_hash: + buff = fileobj.read(min(4096, (bytes_to_hash - bytes_read))) + the_hash.update(buff) + bytes_read += len(buff) + + if filename is not None: + fileobj.close() + + return the_hash.hexdigest() diff --git a/sigmf/sigmf_hash.py b/sigmf/sigmf_hash.py deleted file mode 100644 index 9482c35..0000000 --- a/sigmf/sigmf_hash.py +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright: Multiple Authors -# -# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python -# -# SPDX-License-Identifier: LGPL-3.0-or-later - -"""Hashing Functions""" - -import hashlib -from pathlib import Path - - -def calculate_sha512(filename=None, fileobj=None, offset=None, size=None): - """ - Return sha512 of file or fileobj. - """ - the_hash = hashlib.sha512() - bytes_to_hash = size - bytes_read = 0 - - if filename is not None: - fileobj = open(filename, "rb") - if size is None: - bytes_to_hash = Path(filename).stat().st_size - else: - fileobj.seek(offset) - - while bytes_read < bytes_to_hash: - buff = fileobj.read(min(4096, (bytes_to_hash - bytes_read))) - the_hash.update(buff) - bytes_read += len(buff) - - if filename is not None: - fileobj.close() - - return the_hash.hexdigest() diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index 93ceaea..861faed 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -15,7 +15,7 @@ import numpy as np -from . import __specification__, __version__, schema, sigmf_hash, validate +from . 
import __specification__, __version__, hash, schema, validate from .archive import ( SIGMF_ARCHIVE_EXT, SIGMF_COLLECTION_EXT, @@ -624,6 +624,7 @@ def _count_samples(self): If there is no data file but there are annotations, use the sample_count from the annotation with the highest end index. If there are no annotations, use 0. + For complex data, a 'sample' includes both the real and imaginary part. """ if self.data_file is None and self.data_buffer is None: @@ -675,17 +676,9 @@ def calculate_hash(self): """ old_hash = self.get_global_field(self.HASH_KEY) if self.data_file is not None: - new_hash = sigmf_hash.calculate_sha512( - filename=self.data_file, - offset=self.data_offset, - size=self.data_size_bytes, - ) + new_hash = hash.calculate_sha512(filename=self.data_file) else: - new_hash = sigmf_hash.calculate_sha512( - fileobj=self.data_buffer, - offset=self.data_offset, - size=self.data_size_bytes, - ) + new_hash = hash.calculate_sha512(fileobj=self.data_buffer) if old_hash is not None: if old_hash != new_hash: raise SigMFFileError("Calculated file hash does not match associated metadata.") @@ -986,7 +979,7 @@ def verify_stream_hashes(self) -> None: metafile_name = get_sigmf_filenames(stream.get("name"))["meta_fn"] metafile_path = self.base_path / metafile_name if Path.is_file(metafile_path): - new_hash = sigmf_hash.calculate_sha512(filename=metafile_path) + new_hash = hash.calculate_sha512(filename=metafile_path) if old_hash != new_hash: raise SigMFFileError( f"Calculated file hash for {metafile_path} does not match collection metadata." 
@@ -1004,7 +997,7 @@ def set_streams(self, metafiles) -> None: stream = { # name must be string here to be serializable later "name": str(get_sigmf_filenames(metafile)["base_fn"]), - "hash": sigmf_hash.calculate_sha512(filename=metafile_path), + "hash": hash.calculate_sha512(filename=metafile_path), } streams.append(stream) else: diff --git a/tests/test_hashing.py b/tests/test_hashing.py new file mode 100644 index 0000000..c0c225b --- /dev/null +++ b/tests/test_hashing.py @@ -0,0 +1,75 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""Hashing Tests""" + +import io +import shutil +import tempfile +import unittest +from copy import deepcopy +from hashlib import sha512 +from pathlib import Path + +import numpy as np + +from sigmf import SigMFFile, hashing + +from .testdata import TEST_FLOAT32_DATA, TEST_METADATA + + +class TestHashCalculation(unittest.TestCase): + """Test hash calculation consistency across different SigMF formats.""" + + def setUp(self): + """Set up temporary directory for tests.""" + self.temp_dir = Path(tempfile.mkdtemp()) + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + def test_ncd_hash_covers_entire_file(self): + """Test that non-conforming datasets hash the entire file including headers.""" + data_path = self.temp_dir / "ncd.bin" + with open(data_path, "wb") as handle: + # Create NCD file with header, data, and trailer + handle.write(b"\x00" * 64) # header + handle.write(TEST_FLOAT32_DATA.tobytes()) # sample data + handle.write(b"\xFF" * 32) # trailer + + # Create SigMF metadata for NCD + ncd_metadata = deepcopy(TEST_METADATA) + del ncd_metadata["global"][SigMFFile.HASH_KEY] + ncd_metadata["global"][SigMFFile.TRAILING_BYTES_KEY] = 32 + meta = SigMFFile(metadata=ncd_metadata) + meta.set_data_file(data_path, offset=64) + + file_hash = hashing.calculate_sha512(filename=data_path) + 
sigmf_hash = meta.get_global_field(SigMFFile.HASH_KEY) + self.assertEqual(file_hash, sigmf_hash) + + def test_edge_cases(self): + """Test edge cases in hash calculation function.""" + # empty file + empty_file = self.temp_dir / "empty.dat" + empty_file.touch() + empty_hash = hashing.calculate_sha512(filename=empty_file) + empty_hash_expected = sha512(b"").hexdigest() + self.assertEqual(empty_hash, empty_hash_expected) + + # small file (less than 4096 bytes) + small_data = np.random.bytes(128) + small_hash_expected = sha512(small_data).hexdigest() + small_file = self.temp_dir / "small.dat" + small_file.write_bytes(small_data) + small_hash = hashing.calculate_sha512(filename=small_file) + self.assertEqual(small_hash, small_hash_expected) + + # BytesIO + buffer = io.BytesIO(small_data) + buffer_hash = hashing.calculate_sha512(fileobj=buffer) + self.assertEqual(buffer_hash, small_hash_expected) diff --git a/tests/testdata.py b/tests/testdata.py index 5d4a8f9..fceb2ba 100644 --- a/tests/testdata.py +++ b/tests/testdata.py @@ -21,7 +21,6 @@ NONSIGMF_REPO = Path(_recordings_path) TEST_FLOAT32_DATA = np.arange(16, dtype=np.float32) - TEST_METADATA = { SigMFFile.ANNOTATION_KEY: [{SigMFFile.LENGTH_INDEX_KEY: 16, SigMFFile.START_INDEX_KEY: 0}], SigMFFile.CAPTURE_KEY: [{SigMFFile.START_INDEX_KEY: 0}], From 538fa9fe8f4adc7e89edf154f9afd5bec567d720 Mon Sep 17 00:00:00 2001 From: Teque5 Date: Wed, 7 Jan 2026 16:37:53 -0800 Subject: [PATCH 08/13] improve documentation related to building docs --- docs/Makefile | 6 +++++- docs/source/api.rst | 2 +- docs/source/developers.rst | 4 ++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/Makefile b/docs/Makefile index d0c3cbf..9071ea0 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -12,7 +12,11 @@ BUILDDIR = build help: @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) -.PHONY: help Makefile +clean: + rm -rf "$(BUILDDIR)" + rm -rf "$(SOURCEDIR)/_autosummary" + +.PHONY: help clean Makefile # 
Catch-all target: route all unknown targets to Sphinx using the new # "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). diff --git a/docs/source/api.rst b/docs/source/api.rst index eac37d5..ae52a37 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -13,7 +13,7 @@ SigMF API sigmf.convert.wav sigmf.error sigmf.schema - sigmf.hash + sigmf.hashing sigmf.sigmffile sigmf.utils sigmf.validate diff --git a/docs/source/developers.rst b/docs/source/developers.rst index fd323ed..268c713 100644 --- a/docs/source/developers.rst +++ b/docs/source/developers.rst @@ -60,9 +60,9 @@ To build the docs and host locally: .. code-block:: console $ cd docs + $ make clean $ make html - $ cd build/html/ - $ python3 -m http.server + $ python3 -m http.server --directory build/html/ -------------- Find an Issue? From e02b751b6058669c316082214c6d1883766be777 Mon Sep 17 00:00:00 2001 From: Teque5 Date: Wed, 7 Jan 2026 16:40:14 -0800 Subject: [PATCH 09/13] f --- sigmf/sigmffile.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index 861faed..f9c3be0 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -15,7 +15,7 @@ import numpy as np -from . import __specification__, __version__, hash, schema, validate +from . 
import __specification__, __version__, hashing, schema, validate from .archive import ( SIGMF_ARCHIVE_EXT, SIGMF_COLLECTION_EXT, @@ -676,9 +676,9 @@ def calculate_hash(self): """ old_hash = self.get_global_field(self.HASH_KEY) if self.data_file is not None: - new_hash = hash.calculate_sha512(filename=self.data_file) + new_hash = hashing.calculate_sha512(filename=self.data_file) else: - new_hash = hash.calculate_sha512(fileobj=self.data_buffer) + new_hash = hashing.calculate_sha512(fileobj=self.data_buffer) if old_hash is not None: if old_hash != new_hash: raise SigMFFileError("Calculated file hash does not match associated metadata.") @@ -979,7 +979,7 @@ def verify_stream_hashes(self) -> None: metafile_name = get_sigmf_filenames(stream.get("name"))["meta_fn"] metafile_path = self.base_path / metafile_name if Path.is_file(metafile_path): - new_hash = hash.calculate_sha512(filename=metafile_path) + new_hash = hashing.calculate_sha512(filename=metafile_path) if old_hash != new_hash: raise SigMFFileError( f"Calculated file hash for {metafile_path} does not match collection metadata." 
@@ -997,7 +997,7 @@ def set_streams(self, metafiles) -> None: stream = { # name must be string here to be serializable later "name": str(get_sigmf_filenames(metafile)["base_fn"]), - "hash": hash.calculate_sha512(filename=metafile_path), + "hash": hashing.calculate_sha512(filename=metafile_path), } streams.append(stream) else: From cacce3d90a06b3504fa92a68c967f938486c30b2 Mon Sep 17 00:00:00 2001 From: Teque5 Date: Wed, 7 Jan 2026 16:40:41 -0800 Subject: [PATCH 10/13] enforce checksum in normal mode --- sigmf/convert/wav.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sigmf/convert/wav.py b/sigmf/convert/wav.py index b599693..79a919d 100644 --- a/sigmf/convert/wav.py +++ b/sigmf/convert/wav.py @@ -164,8 +164,8 @@ def wav_to_sigmf( # handle NCD case where no output files are created if create_ncd and out_path is None: # create metadata-only SigMF for NCD pointing to original file - meta = SigMFFile(global_info=global_info, skip_checksum=True) - meta.set_data_file(data_file=wav_path, offset=header_bytes, skip_checksum=True) + meta = SigMFFile(global_info=global_info) + meta.set_data_file(data_file=wav_path, offset=header_bytes) meta.data_buffer = io.BytesIO() meta.add_capture(0, metadata=capture_info) log.debug("created NCD SigMF: %r", meta) @@ -197,6 +197,7 @@ def wav_to_sigmf( # metadata returned should be for this archive meta = fromfile(filenames["archive_fn"]) else: + # write separate meta and data files data_path = filenames["data_fn"] wav_data.tofile(data_path) From 83a80e309300eabeacf4616c146eb3ad26a08371 Mon Sep 17 00:00:00 2001 From: Teque5 Date: Wed, 7 Jan 2026 16:56:31 -0800 Subject: [PATCH 11/13] simplify wav tests --- tests/test_convert_wav.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/tests/test_convert_wav.py b/tests/test_convert_wav.py index 83a5d0a..ea295d0 100644 --- a/tests/test_convert_wav.py +++ b/tests/test_convert_wav.py @@ -67,30 +67,36 @@ def test_wav_to_sigmf_pair(self): 
"""test standard wav to sigmf conversion with file pairs""" sigmf_path = self.tmp_path / "bar.tmp" meta = wav_to_sigmf(wav_path=str(self.wav_path), out_path=str(sigmf_path)) + filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) + self.assertTrue(filenames["data_fn"].exists(), "dataset path missing") + self.assertTrue(filenames["meta_fn"].exists(), "metadata path missing") + # verify data data = meta.read_samples() self.assertGreater(len(data), 0, "Should read some samples") # allow numerical differences due to PCM quantization self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) - filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) - self.assertTrue(filenames["data_fn"].exists(), "dataset path missing") - self.assertTrue(filenames["meta_fn"].exists(), "metadata path missing") def test_wav_to_sigmf_archive(self): """test wav to sigmf conversion with archive output""" sigmf_path = self.tmp_path / "baz.ext" - wav_to_sigmf(wav_path=str(self.wav_path), out_path=str(sigmf_path), create_archive=True) + meta = wav_to_sigmf(wav_path=str(self.wav_path), out_path=str(sigmf_path), create_archive=True) filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) self.assertTrue(filenames["archive_fn"].exists(), "archive path missing") + # verify data + data = meta.read_samples() + self.assertGreater(len(data), 0, "Should read some samples") + # allow numerical differences due to PCM quantization + self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) def test_wav_to_sigmf_ncd(self): """test wav to sigmf conversion as Non-Conforming Dataset""" meta = wav_to_sigmf(wav_path=str(self.wav_path), create_ncd=True) _validate_ncd(self, meta, self.wav_path) - # verify data can still be read correctly from NCD + # verify data data = meta.read_samples() - self.assertGreater(len(data), 0, "Should read some samples") # allow numerical differences due to PCM quantization + self.assertGreater(len(data), 0, "Should read some samples") 
self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) From 3a48d9b3e93ed37198741afc71fd4c64ae37958f Mon Sep 17 00:00:00 2001 From: Teque5 Date: Wed, 7 Jan 2026 17:34:28 -0800 Subject: [PATCH 12/13] fixed some usage edge cases with NCD files --- sigmf/convert/wav.py | 13 +++++++++++-- sigmf/sigmffile.py | 27 ++++++++++++++++++++++++++- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/sigmf/convert/wav.py b/sigmf/convert/wav.py index 79a919d..a093316 100644 --- a/sigmf/convert/wav.py +++ b/sigmf/convert/wav.py @@ -105,6 +105,7 @@ def wav_to_sigmf( If the wav file cannot be read. """ wav_path = Path(wav_path) + out_path = None if out_path is None else Path(out_path) # auto-enable NCD when no output path is specified if out_path is None: @@ -161,13 +162,21 @@ def wav_to_sigmf( global_info[SigMFFile.DATASET_KEY] = wav_path.name capture_info[SigMFFile.HEADER_BYTES_KEY] = header_bytes - # handle NCD case where no output files are created - if create_ncd and out_path is None: + if create_ncd: # create metadata-only SigMF for NCD pointing to original file meta = SigMFFile(global_info=global_info) meta.set_data_file(data_file=wav_path, offset=header_bytes) meta.data_buffer = io.BytesIO() meta.add_capture(0, metadata=capture_info) + + # write metadata file if output path specified + if out_path is not None: + filenames = get_sigmf_filenames(out_path) + output_dir = filenames["meta_fn"].parent + output_dir.mkdir(parents=True, exist_ok=True) + meta.tofile(filenames["meta_fn"], toarchive=False) + log.info("wrote %s", filenames["meta_fn"]) + log.debug("created NCD SigMF: %r", meta) return meta diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index f9c3be0..f248e47 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -216,7 +216,8 @@ def __init__( if global_info is not None: self.set_global_info(global_info) if data_file is not None: - self.set_data_file(data_file, skip_checksum=skip_checksum, map_readonly=map_readonly) + offset = 
self._get_ncd_offset() + self.set_data_file(data_file, skip_checksum=skip_checksum, map_readonly=map_readonly, offset=offset) def __len__(self): return self._memmap.shape[0] @@ -392,6 +393,30 @@ def _is_conforming_dataset(self): # if we get here, the file exists and is conforming return True + def _get_ncd_offset(self): + """ + Detect Non-Conforming Dataset files and return the appropriate header offset. + + For NCD files that reference external non-SigMF files (e.g., WAV), the + core:header_bytes field indicates how many bytes to skip to reach the + actual sample data. + + Returns + ------- + int + Byte offset to apply when reading the dataset file. 0 for conforming datasets. + """ + if self._is_conforming_dataset(): + return 0 + + # check if this is an NCD with core:dataset and header_bytes + captures = self.get_captures() + dataset_field = self.get_global_field(self.DATASET_KEY) + if dataset_field and captures and self.HEADER_BYTES_KEY in captures[0]: + return captures[0][self.HEADER_BYTES_KEY] + + return 0 + def get_schema(self): """ Return a schema object valid for the current metadata From 93d0eefe78c161a1b2fd6057f5349d9dfd7b4c50 Mon Sep 17 00:00:00 2001 From: Teque5 Date: Wed, 7 Jan 2026 18:11:02 -0800 Subject: [PATCH 13/13] unify converters so we only have sigmf_convert --- pyproject.toml | 3 +- sigmf/convert/__init__.py | 67 +++++++++++++++++++++++++++++++++++++++ sigmf/convert/blue.py | 30 ------------------ sigmf/convert/wav.py | 36 --------------------- 4 files changed, 68 insertions(+), 68 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index dc950af..c0adbfb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,8 +33,7 @@ dependencies = [ [project.scripts] sigmf_validate = "sigmf.validate:main" - sigmf_convert_wav = "sigmf.convert.wav:main" - sigmf_convert_blue = "sigmf.convert.blue:main" + sigmf_convert = "sigmf.convert:main" [project.optional-dependencies] test = [ "pylint", diff --git a/sigmf/convert/__init__.py 
b/sigmf/convert/__init__.py index e69de29..70eccfb 100644 --- a/sigmf/convert/__init__.py +++ b/sigmf/convert/__init__.py @@ -0,0 +1,67 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""Unified converter for non-SigMF file formats""" + +import argparse +import logging +from pathlib import Path + +from .. import __version__ as toolversion +from ..error import SigMFConversionError +from ..utils import get_magic_bytes + + +def main() -> None: + """ + Unified entry-point for SigMF conversion of non-SigMF files. + """ + parser = argparse.ArgumentParser(description="Convert non-SigMF files to SigMF format") + parser.add_argument("-i", "--input", type=str, required=True, help="Input file path") + parser.add_argument("-o", "--output", type=str, default=None, help="Output SigMF path") + parser.add_argument("-v", "--verbose", action="count", default=0) + parser.add_argument( + "-a", "--archive", action="store_true", help="Save as SigMF archive instead of separate meta/data files." + ) + parser.add_argument( + "--ncd", action="store_true", help="Process as Non-Conforming Dataset and write .sigmf-meta only." 
+ ) + parser.add_argument("--version", action="version", version=f"%(prog)s v{toolversion}") + args = parser.parse_args() + + level_lut = { + 0: logging.WARNING, + 1: logging.INFO, + 2: logging.DEBUG, + } + logging.basicConfig(level=level_lut[min(args.verbose, 2)]) + + input_path = Path(args.input) + + # detect file type using magic bytes (same logic as fromfile()) + magic_bytes = get_magic_bytes(input_path, count=4, offset=0) + + if magic_bytes == b"RIFF": + # WAV file + from .wav import wav_to_sigmf + + _ = wav_to_sigmf(wav_path=input_path, out_path=args.output, create_archive=args.archive, create_ncd=args.ncd) + + elif magic_bytes == b"BLUE": + # BLUE file + from .blue import blue_to_sigmf + + _ = blue_to_sigmf(blue_path=input_path, out_path=args.output, create_archive=args.archive, create_ncd=args.ncd) + + else: + raise SigMFConversionError( + f"Unsupported file format. Magic bytes: {magic_bytes}. " + f"Supported formats for conversion are WAV and BLUE/Platinum." + ) + + +if __name__ == "__main__": + main() diff --git a/sigmf/convert/blue.py b/sigmf/convert/blue.py index 5fc2f17..2bfaa58 100644 --- a/sigmf/convert/blue.py +++ b/sigmf/convert/blue.py @@ -11,7 +11,6 @@ Converts the extracted metadata into SigMF format. 
""" -import argparse import base64 import getpass import io @@ -845,32 +844,3 @@ def blue_to_sigmf( log.debug(f"{entry['tag']:20s}:{entry['value']}") return meta - - -def main() -> None: - """ - Entry-point for sigmf_convert_blue - """ - parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument("-i", "--input", type=str, required=True, help="BLUE file path") - parser.add_argument("-o", "--output", type=str, default=None, help="SigMF path") - parser.add_argument("-v", "--verbose", action="count", default=0) - parser.add_argument("--archive", action="store_true", help="Write a .sigmf archive instead of meta/data pair") - parser.add_argument( - "--ncd", action="store_true", help="Process as Non-Conforming Dataset and write .sigmf-meta only." - ) - parser.add_argument("--version", action="version", version=f"%(prog)s v{toolversion}") - args = parser.parse_args() - - level_lut = { - 0: logging.WARNING, - 1: logging.INFO, - 2: logging.DEBUG, - } - logging.basicConfig(level=level_lut[min(args.verbose, 2)]) - - _ = blue_to_sigmf(blue_path=args.input, out_path=args.output, create_archive=args.archive, create_ncd=args.ncd) - - -if __name__ == "__main__": - main() diff --git a/sigmf/convert/wav.py b/sigmf/convert/wav.py index a093316..2d123b9 100644 --- a/sigmf/convert/wav.py +++ b/sigmf/convert/wav.py @@ -6,7 +6,6 @@ """converter for wav containers""" -import argparse import io import logging import tempfile @@ -218,38 +217,3 @@ def wav_to_sigmf( log.info("wrote %s and %s", filenames["meta_fn"], filenames["data_fn"]) return meta - - -def main() -> None: - """ - Entry-point for sigmf_convert_wav - """ - parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument("-i", "--input", type=str, required=True, help="WAV path") - parser.add_argument("-o", "--output", type=str, default=None, help="SigMF path") - parser.add_argument("-v", "--verbose", action="count", default=0) - parser.add_argument( - "-a", "--archive", action="store_true", help="Save 
as SigMF archive instead of separate meta/data files." - ) - parser.add_argument( - "--ncd", action="store_true", help="Process as Non-Conforming Dataset and write .sigmf-meta only." - ) - parser.add_argument("--version", action="version", version=f"%(prog)s v{toolversion}") - args = parser.parse_args() - - level_lut = { - 0: logging.WARNING, - 1: logging.INFO, - 2: logging.DEBUG, - } - logging.basicConfig(level=level_lut[min(args.verbose, 2)]) - - wav_path = Path(args.input) - if args.output is None: - args.output = wav_path.with_suffix(".sigmf") - - _ = wav_to_sigmf(wav_path=wav_path, out_path=args.output, create_archive=args.archive, create_ncd=args.ncd) - - -if __name__ == "__main__": - main()