diff --git a/src/cfdb/workflows/processors/tabix.py b/src/cfdb/workflows/processors/tabix.py index 16666ee..1c5af12 100644 --- a/src/cfdb/workflows/processors/tabix.py +++ b/src/cfdb/workflows/processors/tabix.py @@ -55,6 +55,82 @@ __all__ = ["TabixIntervalProcessor", "download_source"] +#: gzip / bgzip stream magic (RFC 1952). bgzip output is gzip-compatible +#: and shares this prefix, so it is accepted too. +_GZIP_MAGIC = b"\x1f\x8b" + +#: Leading magic bytes of non-gzip binary serializations that ``zcat -f`` +#: cannot decompress. Matched explicitly so a mislabeled compressed file +#: (e.g. a BEDOPS ``starch`` archive mapped to ``BED`` upstream — issue +#: #69) is rejected by its magic rather than incidentally by the UTF-8 +#: text test below. +_BINARY_MAGICS: tuple[bytes, ...] = ( + b"\xca\x5c\xad\xe5", # BEDOPS starch + b"BZh", # bzip2 + b"\xfd7zXZ\x00", # xz + b"\x28\xb5\x2f\xfd", # zstd + b"PK\x03\x04", # zip +) + +#: How many leading source bytes to inspect when sniffing the encoding. +#: A handful would suffice for the magic checks; 512 is generous headroom +#: that also lets the UTF-8 text test see a representative plaintext span. +_SOURCE_SNIFF_BYTES = 512 + + +def _source_looks_processable(prefix: bytes) -> bool: + """Return True if ``prefix`` is gzip-compressed or decodes as text. + + The tabix text-interval pipeline assumes ``zcat -f`` yields plain + text, so a source must be either a gzip/bgzip member (magic + ``1f 8b``) or already plaintext. A gzip member is trusted on its + magic alone — ``zcat -f`` is single-pass, so a *doubly*-compressed + source (e.g. ``gzip(starch)``) is out of scope and would still slip + through. Everything else is treated as a binary serialization the + pipeline would silently mangle — most notably a BEDOPS ``starch`` + archive (magic ``ca5cade5``), which the upstream ontology mapper + labels ``BED`` even though ``zcat -f`` cannot decompress it (see + issue #69). 
Known non-gzip compression magics are rejected + explicitly via ``_BINARY_MAGICS``; any remaining binary is caught by + the NUL / non-UTF-8 text test. + + An empty prefix returns True: the genuinely-empty-source case is + caught later by the zero-data-line guard in ``_stage_prepare``. + + The text test is "decodes as UTF-8 with no NUL byte". It is + deliberately conservative: legitimately-text latin-1/CP1252 BED would + be rejected (failing the job rather than risking a mangled artifact), + which is acceptable because the upstream interval corpus is ASCII. A + multi-byte UTF-8 character can be split at the sniff boundary, so up + to three trailing bytes (a UTF-8 character is at most 4 bytes) are + trimmed before concluding the prefix is not text. + """ + if not prefix: + return True + if prefix.startswith(_GZIP_MAGIC): + return True + if any(prefix.startswith(magic) for magic in _BINARY_MAGICS): + return False + if b"\x00" in prefix: + return False + for trim in range(4): + candidate = prefix if trim == 0 else prefix[:-trim] + if not candidate: + break + try: + candidate.decode("utf-8") + except UnicodeDecodeError: + continue + return True + return False + + +def _read_prefix(path: Path, n: int = _SOURCE_SNIFF_BYTES) -> bytes: + """Read up to ``n`` leading bytes of ``path``.""" + with path.open("rb") as fh: + return fh.read(n) + + # Format → tabix preset. Both GFF and GFF3 map onto the same preset; the # upstream ontology mapper emits ``GFF`` for ``.gff`` and ``GFF3`` for # ``.gff3`` so both must be accepted here to avoid silent bypasses. @@ -76,11 +152,22 @@ "bed": ["-k1,1", "-k2,2n"], } +#: Formats whose source is binary by design and is validated by the tool +#: that consumes it (``bigBedToBed`` verifies the bigBed magic and exits +#: non-zero on bad input, which ``pipefail`` surfaces before any +#: ``cache.put``). These bypass the gzip/text source sniff — applying it +#: would false-reject every legitimate (binary) bigBed. 
+_SELF_VALIDATING_FORMATS = frozenset({"bigBed"}) + class TabixIntervalProcessor(Processor): """Handle plain-text genomic interval formats and produce a tabix index.""" - processor_version = 1 + # v2: the source-encoding guard (issue #69) changed which sources will + # ever be committed, so re-key all tabix artifacts — a poisoned v1 + # ``data`` entry (committed before the guard existed) becomes a cache + # miss, re-enters _stage_prepare, and is rejected instead of served. + processor_version = 2 supported_formats = frozenset(_TABIX_PRESET.keys()) artifact_kinds = (ArtifactKind.DATA, ArtifactKind.INDEX) @@ -132,6 +219,17 @@ async def _stage_prepare( source = workdir / "source.raw" await download_source(file_meta, source) + # Reject a non-gzip/non-text source before any processing or + # cache commit. Every format here flows through ``zcat -f`` and + # must be gzip or plain text (see issue #69 — BEDOPS starch + # archives are mislabeled ``BED`` upstream). Self-validating + # binary formats (bigBed) are exempt: their tool validates the + # input and fails the pipeline under ``pipefail`` before any + # ``cache.put``, so they need no sniff and would be false-rejected + # by it. + if fmt not in _SELF_VALIDATING_FORMATS: + await self._assert_source_processable(source, fmt, file_meta) + tmp_dir = workdir / "sort_tmp" tmp_dir.mkdir(exist_ok=True) bgz_path = workdir / "out.bgz" @@ -224,6 +322,33 @@ async def _stage_prepare( ) return bgz_path + async def _assert_source_processable( + self, source: Path, fmt: str, file_meta: dict[str, Any] + ) -> None: + """Reject a non-gzip/non-text source before committing anything. + + Reads the leading bytes of the freshly-downloaded ``source`` and + raises ``RuntimeError`` when they are neither gzip nor text, so a + binary serialization the ``zcat -f`` pipeline cannot decompress + (e.g. a BEDOPS ``starch`` archive mislabeled ``BED`` upstream) + fails the job cleanly *before* any ``cache.put`` rather than + committing a corrupt artifact. 
Callers skip this for ``bigBed``, + whose binary source is handled by ``bigBedToBed``. + """ + prefix = await asyncio.to_thread(_read_prefix, source) + if _source_looks_processable(prefix): + return + local_id = file_meta.get("local_id") + dcc = file_meta.get("dcc") + raise RuntimeError( + f"Refusing to process {fmt} source for dcc={dcc!r} " + f"local_id={local_id!r}: leading bytes {prefix[:8].hex()} are " + "neither gzip nor text. An unsupported serialization (e.g. " + "BEDOPS starch) was likely mapped onto a text-interval format " + "upstream; failing before commit so no corrupt artifact is " + "cached." + ) + async def _count_data_lines(self, bgz_path: Path, fmt: str) -> int: """Return the count of non-header data lines in ``bgz_path``. diff --git a/tests/test_workflows/test_processors_tabix.py b/tests/test_workflows/test_processors_tabix.py index 5e1c0a1..0ea59db 100644 --- a/tests/test_workflows/test_processors_tabix.py +++ b/tests/test_workflows/test_processors_tabix.py @@ -7,6 +7,8 @@ from typing import Any import pytest +from hypothesis import given, settings +from hypothesis import strategies as st from cfdb.workflows import SORT_MEMORY_CAP, keys as key_utils from cfdb.workflows.cache import LocalFsCache @@ -94,6 +96,173 @@ async def fake_count_data_lines(self, _bgz_path, _fmt): ) +def test__source_looks_processable_should_accept_gzip_magic(): + """Test that a gzip/bgzip prefix is accepted. + + Given: + A prefix beginning with the gzip magic bytes. + When: + _source_looks_processable is called. + Then: + It should return True so gzip/bgzip sources flow into the pipeline. + """ + # Act & assert + assert tabix_module._source_looks_processable(b"\x1f\x8b\x08\x00rest") is True + + +def test__source_looks_processable_should_accept_plain_text(): + """Test that a plaintext BED prefix is accepted. + + Given: + A prefix of plain BED text. + When: + _source_looks_processable is called. + Then: + It should return True. 
def test__source_looks_processable_should_reject_starch_archive():
    """Test that a BEDOPS starch header is refused.

    Given:
        Bytes that open with the starch magic (0xca5cade5) and continue
        with a bzip2 stream header.
    When:
        _source_looks_processable is called.
    Then:
        It should return False, keeping starch out of the zcat pipeline.
    """
    # Arrange
    starch_header = b"\xca\x5c\xad\xe5BZh91AY&SY"

    # Act
    verdict = tabix_module._source_looks_processable(starch_header)

    # Assert
    assert verdict is False


@pytest.mark.parametrize(
    "magic",
    [
        b"BZh91AY&SY",  # bzip2
        b"\xfd7zXZ\x00\x00",  # xz
        b"\x28\xb5\x2f\xfd\x00",  # zstd
        b"PK\x03\x04\x14\x00",  # zip
    ],
)
def test__source_looks_processable_should_reject_known_binary_magics(magic):
    """Test that recognised non-gzip container magics are refused.

    Given:
        Bytes that open with the magic number of bzip2, xz, zstd, or zip.
    When:
        _source_looks_processable is called.
    Then:
        It should return False: zcat -f cannot decompress any of these,
        so a mislabeled compressed source must not enter the pipeline.
    """
    # Act
    verdict = tabix_module._source_looks_processable(magic)

    # Assert
    assert verdict is False


def test__source_looks_processable_should_reject_binary_with_nul():
    """Test that a NUL byte marks the prefix as binary.

    Given:
        Bytes that include a NUL.
    When:
        _source_looks_processable is called.
    Then:
        It should return False.
    """
    # Arrange
    nul_bearing = b"BB\x00\x01\x02bigbed"

    # Act
    verdict = tabix_module._source_looks_processable(nul_bearing)

    # Assert
    assert verdict is False


def test__source_looks_processable_should_reject_short_invalid_binary():
    """Test that a tiny non-text prefix is refused.

    Given:
        Two NUL-free bytes that are not valid UTF-8 (b"\\xff\\xfe") —
        fewer bytes than the boundary-tolerance window.
    When:
        _source_looks_processable is called.
    Then:
        It should return False: the bytes carry no accepted magic and are
        not UTF-8 text.
    """
    # Arrange
    not_text = b"\xff\xfe"

    # Act
    verdict = tabix_module._source_looks_processable(not_text)

    # Assert
    assert verdict is False


def test__source_looks_processable_should_accept_text_split_at_boundary():
    """Test that a character cut by the read boundary is tolerated.

    Given:
        Valid UTF-8 text whose last multi-byte character is missing its
        final byte — exactly what a fixed-size read can produce.
    When:
        _source_looks_processable is called.
    Then:
        It should return True instead of false-rejecting valid text over
        the dangling bytes.
    """
    # Arrange
    encoded = ("α" * 250).encode("utf-8")
    prefix = encoded[:-1]

    # Act
    verdict = tabix_module._source_looks_processable(prefix)

    # Assert
    assert verdict is True


def test__source_looks_processable_should_accept_empty_prefix():
    """Test that a zero-length prefix is passed through.

    Given:
        No bytes at all (the source read returned nothing).
    When:
        _source_looks_processable is called.
    Then:
        It should return True, leaving the empty-source case to the
        post-pipeline zero-data-line guard.
    """
    # Act
    verdict = tabix_module._source_looks_processable(b"")

    # Assert
    assert verdict is True


@settings(max_examples=50)
@given(payload=st.binary())
def test__source_looks_processable_should_accept_any_gzip_prefixed_payload(payload):
    """Test that the gzip magic alone is sufficient for acceptance.

    Given:
        The gzip magic followed by arbitrary generated bytes.
    When:
        _source_looks_processable is called.
    Then:
        It should return True for every payload, because the magic by
        itself routes the source through zcat.
    """
    # Arrange
    gzip_like = b"\x1f\x8b" + payload

    # Act
    verdict = tabix_module._source_looks_processable(gzip_like)

    # Assert
    assert verdict is True
+ When: + _source_looks_processable is called. + Then: + It should always return False, generalizing the starch rejection + beyond a single example so no starch archive reaches the pipeline. + """ + # Act & assert + assert tabix_module._source_looks_processable(b"\xca\x5c\xad\xe5" + payload) is False + + class TestTabixIntervalProcessor: @pytest.mark.parametrize( "fmt", @@ -472,6 +641,214 @@ async def zero_count(self, _bgz, _fmt): ) assert await cache.head(data_key) is None + @pytest.mark.parametrize("fmt", ["BED", "VCF", "GTF"]) + @pytest.mark.asyncio + async def test_run_should_reject_starch_source_before_commit( + self, tmp_path, mocker, fmt + ): + """Test that a starch source fails before any cache commit. + + Given: + A BED/VCF/GTF file_meta whose downloaded source is a BEDOPS + starch archive (magic 0xca5cade5), with the pipeline tools + mocked to fail if reached. VCF and GTF take the + intermediate-decompress branches, so this pins that the guard + fires ahead of every non-bigBed path, not just the BED one. + When: + run is awaited. + Then: + It should raise RuntimeError matching "neither gzip nor text" + without running the pipeline and leave the data artifact + uncommitted in the cache. 
+ """ + # Arrange + workdir = tmp_path / "work" + cache_root = tmp_path / "cache" + cache_root.mkdir() + + async def fake_download(_meta, dest): + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_bytes(b"\xca\x5c\xad\xe5BZh91AY&SYstarch-body") + return dest + + async def fail_shell(cmd): + raise AssertionError(f"pipeline must not run for starch: {cmd}") + + async def fail_run(argv): + raise AssertionError(f"tabix must not run for starch: {argv}") + + mocker.patch.object(tabix_module, "download_source", fake_download) + mocker.patch.object(tabix_module, "run_shell", fail_shell) + mocker.patch.object(tabix_module, "run_argv", fail_run) + + # Act & assert + with pytest.raises(RuntimeError, match="neither gzip nor text"): + async for _event in TabixIntervalProcessor().run( + _file_meta(fmt), workdir, LocalFsCache(cache_root) + ): + pass + cache = LocalFsCache(cache_root) + data_key = key_utils.cache_key( + dcc="encode", + local_id=f"ENCFF-{fmt}", + artifact_kind=ArtifactKind.DATA, + md5=FIXTURE_MD5, + processor_version=TabixIntervalProcessor().processor_version, + ) + assert await cache.head(data_key) is None + + @pytest.mark.asyncio + async def test_run_should_not_sniff_guard_bigbed_binary_source( + self, tmp_path, mocker + ): + """Test that the byte-sniff guard exempts bigBed binary sources. + + Given: + A bigBed file_meta whose downloaded source is binary + (NUL-containing), with the pipeline tools mocked. + When: + run is awaited. + Then: + It should complete and commit the data artifact rather than + raising, because bigBed is handled by bigBedToBed and is + exempt from the gzip/text guard. 
+ """ + # Arrange + harness = _PipelineHarness() + harness.install(mocker) + + async def binary_download(_meta, dest): + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_bytes(b"\x00\x01\x02\x87\x89bigbed-binary") + return dest + + mocker.patch.object(tabix_module, "download_source", binary_download) + workdir = tmp_path / "work" + cache_root = tmp_path / "cache" + cache_root.mkdir() + + # Act + events = await _drain_run( + TabixIntervalProcessor(), _file_meta("bigBed"), workdir, cache_root + ) + artifacts = _final_artifacts(events) + + # Assert + cache = LocalFsCache(cache_root) + assert await cache.head(artifacts["data"]) is not None + + @pytest.mark.asyncio + async def test_run_should_not_commit_when_bigbedtobed_rejects_source( + self, tmp_path, mocker + ): + """Test that a bad bigBed source commits nothing when the tool fails. + + Given: + A bigBed file_meta whose pipeline raises, modeling bigBedToBed + rejecting a non-bigBed source with a non-zero exit (the bigBed + exemption skips the sniff guard, so the tool is the backstop). + When: + run is awaited. + Then: + It should raise RuntimeError and leave the data artifact + uncommitted, so the exemption cannot poison the cache. 
+ """ + # Arrange + workdir = tmp_path / "work" + cache_root = tmp_path / "cache" + cache_root.mkdir() + + async def fake_download(_meta, dest): + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_bytes(b"\xca\x5c\xad\xe5not-a-real-bigbed") + return dest + + async def rejecting_shell(cmd): + raise RuntimeError(f"bigBedToBed exited 255: {cmd}") + + async def fail_run(argv): + raise AssertionError(f"tabix must not run: {argv}") + + mocker.patch.object(tabix_module, "download_source", fake_download) + mocker.patch.object(tabix_module, "run_shell", rejecting_shell) + mocker.patch.object(tabix_module, "run_argv", fail_run) + + # Act & assert + with pytest.raises(RuntimeError, match="bigBedToBed exited"): + async for _event in TabixIntervalProcessor().run( + _file_meta("bigBed"), workdir, LocalFsCache(cache_root) + ): + pass + cache = LocalFsCache(cache_root) + data_key = key_utils.cache_key( + dcc="encode", + local_id="ENCFF-bigBed", + artifact_kind=ArtifactKind.DATA, + md5=FIXTURE_MD5, + processor_version=TabixIntervalProcessor().processor_version, + ) + assert await cache.head(data_key) is None + + @pytest.mark.asyncio + async def test_run_should_not_serve_a_poisoned_v1_artifact( + self, tmp_path, mocker + ): + """Test that a v1-cached poisoned artifact is not served after the bump. + + Given: + A garbage data artifact seeded under the OLD processor_version=1 + cache key, with the current source a starch (BED-mapped) archive. + When: + run is awaited with the current processor. + Then: + It should re-key to the current version (a miss on the poisoned + v1 entry), re-enter the guard, raise, and never serve the v1 + bytes — the current data key stays absent. 
+ """ + # Arrange + workdir = tmp_path / "work" + cache_root = tmp_path / "cache" + cache_root.mkdir() + cache = LocalFsCache(cache_root) + + seed = tmp_path / "seed.bgz" + seed.write_bytes(b"corrupt-v1-artifact") + v1_key = key_utils.cache_key( + dcc="encode", + local_id="ENCFF-BED", + artifact_kind=ArtifactKind.DATA, + md5=FIXTURE_MD5, + processor_version=1, + ) + await cache.put(v1_key, seed) + + async def fake_download(_meta, dest): + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_bytes(b"\xca\x5c\xad\xe5BZh91AY&SYstarch-body") + return dest + + async def fail_shell(cmd): + raise AssertionError(f"pipeline must not run: {cmd}") + + mocker.patch.object(tabix_module, "download_source", fake_download) + mocker.patch.object(tabix_module, "run_shell", fail_shell) + + # Act & assert + with pytest.raises(RuntimeError, match="neither gzip nor text"): + async for _event in TabixIntervalProcessor().run( + _file_meta("BED"), workdir, cache + ): + pass + current_key = key_utils.cache_key( + dcc="encode", + local_id="ENCFF-BED", + artifact_kind=ArtifactKind.DATA, + md5=FIXTURE_MD5, + processor_version=TabixIntervalProcessor().processor_version, + ) + assert current_key != v1_key + assert await cache.head(current_key) is None + @pytest.mark.asyncio async def test_run_should_use_bed_sort_keys_for_bed(self, tmp_path, mocker): """Test (TX-003) that BED inputs sort by ``-k1,1 -k2,2n``.