From 9c45e5ec1fe6ac12d6f479f0ceb10c760bbcad78 Mon Sep 17 00:00:00 2001 From: Conrad Date: Tue, 23 Jun 2026 21:48:27 -0400 Subject: [PATCH 1/4] fix: Reject non-gzip/non-text sources before caching tabix artifact ENCODE starch files (BEDOPS archives) are mapped to the BED format upstream, so the tabix processor ran zcat -f, sort and bgzip on them. starch is neither gzip nor text, so the pipeline produced a valid BGZF wrapping scrambled bytes and committed it to the content-addressed cache before tabix failed. That poisoned the cache: later reads served corrupt bytes and every retry was a stage-1 cache hit that re-failed the index step forever. Sniff the downloaded source's leading bytes and fail before any cache write when they are neither gzip nor plain text, so an unsupported serialization fails cleanly instead of committing a corrupt artifact. bigBed is exempt because its binary source is handled by bigBedToBed. --- src/cfdb/workflows/processors/tabix.py | 84 ++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/src/cfdb/workflows/processors/tabix.py b/src/cfdb/workflows/processors/tabix.py index 16666ee..d9a6134 100644 --- a/src/cfdb/workflows/processors/tabix.py +++ b/src/cfdb/workflows/processors/tabix.py @@ -55,6 +55,56 @@ __all__ = ["TabixIntervalProcessor", "download_source"] +#: gzip / bgzip stream magic (RFC 1952). bgzip output is gzip-compatible +#: and shares this prefix, so it is accepted too. +_GZIP_MAGIC = b"\x1f\x8b" + +#: How many leading source bytes to inspect when sniffing the encoding. +_SOURCE_SNIFF_BYTES = 512 + + +def _source_looks_processable(prefix: bytes) -> bool: + """Return True if ``prefix`` is gzip-compressed or decodes as text. + + The tabix text-interval pipeline assumes ``zcat -f`` yields plain + text, so a source must be either a gzip/bgzip member (magic + ``1f 8b``) or already plaintext. Anything else is a binary + serialization the pipeline would silently mangle — most notably a + BEDOPS ``starch`` archive (magic ``ca5cade5``), which the upstream + ontology mapper labels ``BED`` even though ``zcat -f`` cannot + decompress it (see issue #69). + + An empty prefix returns True: the genuinely-empty-source case is + caught later by the zero-data-line guard in ``_stage_prepare``. + + The text test is "decodes as UTF-8 with no NUL byte". A multi-byte + UTF-8 character may be split at the sniff boundary, so up to three + trailing bytes are trimmed before concluding the prefix is not text. + """ + if not prefix: + return True + if prefix.startswith(_GZIP_MAGIC): + return True + if b"\x00" in prefix: + return False + for trim in range(4): + candidate = prefix[: len(prefix) - trim] if trim else prefix + if not candidate: + break + try: + candidate.decode("utf-8") + except UnicodeDecodeError: + continue + return True + return False + + +def _read_prefix(path: Path, n: int = _SOURCE_SNIFF_BYTES) -> bytes: + """Read up to ``n`` leading bytes of ``path``.""" + with path.open("rb") as fh: + return fh.read(n) + + # Format → tabix preset. Both GFF and GFF3 map onto the same preset; the # upstream ontology mapper emits ``GFF`` for ``.gff`` and ``GFF3`` for # ``.gff3`` so both must be accepted here to avoid silent bypasses. @@ -132,6 +182,15 @@ async def _stage_prepare( source = workdir / "source.raw" await download_source(file_meta, source) + # Reject a non-gzip/non-text source before any processing or + # cache commit. bigBed is binary by design and handled by + # ``bigBedToBed`` (which validates its own input), so it is + # exempt; every other format flows through ``zcat -f`` and must + # be gzip or plain text (see issue #69 — BEDOPS starch archives + # are mislabeled ``BED`` upstream). + if fmt != "bigBed": + await self._assert_source_processable(source, fmt, file_meta) + tmp_dir = workdir / "sort_tmp" tmp_dir.mkdir(exist_ok=True) bgz_path = workdir / "out.bgz" @@ -224,6 +283,31 @@ async def _stage_prepare( ) return bgz_path + async def _assert_source_processable( + self, source: Path, fmt: str, file_meta: dict[str, Any] + ) -> None: + """Reject a non-gzip/non-text source before committing anything. + + Reads the leading bytes of the freshly-downloaded ``source`` and + raises ``RuntimeError`` when they are neither gzip nor text, so a + binary serialization the ``zcat -f`` pipeline cannot decompress + (e.g. a BEDOPS ``starch`` archive mislabeled ``BED`` upstream) + fails the job cleanly *before* any ``cache.put`` rather than + committing a corrupt artifact. Callers skip this for ``bigBed``, + whose binary source is handled by ``bigBedToBed``. + """ + prefix = await asyncio.to_thread(_read_prefix, source) + if _source_looks_processable(prefix): + return + local_id = file_meta.get("local_id") + raise RuntimeError( + f"Refusing to process {fmt} source for {local_id!r}: leading " + f"bytes {prefix[:8].hex()!r} are neither gzip nor text. An " + "unsupported serialization (e.g. BEDOPS starch) was likely " + "mapped onto a text-interval format upstream; failing before " + "commit so no corrupt artifact is cached." + ) + async def _count_data_lines(self, bgz_path: Path, fmt: str) -> int: """Return the count of non-header data lines in ``bgz_path``. From 12df4aba038e88b2a363e1c9c9c233ab2db40538 Mon Sep 17 00:00:00 2001 From: Conrad Date: Tue, 23 Jun 2026 21:50:18 -0400 Subject: [PATCH 2/4] test: Cover the tabix source-encoding guard Add predicate tests for the gzip, text, starch, NUL-binary and empty cases plus boundary-split and starch-prefixed property checks, and run-level tests asserting a starch source raises without committing a cache artifact while bigBed binary sources stay exempt from the guard. --- tests/test_workflows/test_processors_tabix.py | 238 ++++++++++++++++++ 1 file changed, 238 insertions(+) diff --git a/tests/test_workflows/test_processors_tabix.py b/tests/test_workflows/test_processors_tabix.py index 5e1c0a1..c527490 100644 --- a/tests/test_workflows/test_processors_tabix.py +++ b/tests/test_workflows/test_processors_tabix.py @@ -7,6 +7,8 @@ from typing import Any import pytest +from hypothesis import given, settings +from hypothesis import strategies as st from cfdb.workflows import SORT_MEMORY_CAP, keys as key_utils from cfdb.workflows.cache import LocalFsCache @@ -94,6 +96,149 @@ async def fake_count_data_lines(self, _bgz_path, _fmt): ) +def test__source_looks_processable_should_accept_gzip_magic(): + """Test that a gzip/bgzip prefix is accepted. + + Given: + A prefix beginning with the gzip magic bytes. + When: + _source_looks_processable is called. + Then: + It should return True so gzip/bgzip sources flow into the pipeline. + """ + # Act & assert + assert tabix_module._source_looks_processable(b"\x1f\x8b\x08\x00rest") is True + + +def test__source_looks_processable_should_accept_plain_text(): + """Test that a plaintext BED prefix is accepted. + + Given: + A prefix of plain BED text. + When: + _source_looks_processable is called. + Then: + It should return True. + """ + # Act & assert + assert tabix_module._source_looks_processable(b"chr1\t0\t100\tpeak\n") is True + + +def test__source_looks_processable_should_reject_starch_archive(): + """Test that a BEDOPS starch archive prefix is rejected. + + Given: + A prefix beginning with the starch magic (0xca5cade5) followed by + bzip2 bytes. + When: + _source_looks_processable is called. + Then: + It should return False so starch never reaches the zcat pipeline. + """ + # Act & assert + assert ( + tabix_module._source_looks_processable(b"\xca\x5c\xad\xe5BZh91AY&SY") is False + ) + + +def test__source_looks_processable_should_reject_binary_with_nul(): + """Test that NUL-containing binary is rejected. + + Given: + A prefix containing a NUL byte. + When: + _source_looks_processable is called. + Then: + It should return False. + """ + # Act & assert + assert tabix_module._source_looks_processable(b"BB\x00\x01\x02bigbed") is False + + +def test__source_looks_processable_should_reject_short_invalid_binary(): + """Test that a short non-text binary prefix is rejected. + + Given: + A two-byte, NUL-free prefix that is not valid UTF-8 and is shorter + than the boundary-trim window (b"\\xff\\xfe"). + When: + _source_looks_processable is called. + Then: + It should return False once trimming exhausts the prefix without a + successful decode. + """ + # Act & assert + assert tabix_module._source_looks_processable(b"\xff\xfe") is False + + +def test__source_looks_processable_should_accept_text_split_at_boundary(): + """Test that text with a multi-byte char cut at the boundary is accepted. + + Given: + Valid UTF-8 text whose final multi-byte character is truncated by + one byte, as a real read of N bytes can split a character. + When: + _source_looks_processable is called. + Then: + It should return True by trimming the dangling bytes before + deciding, rather than false-rejecting valid text. + """ + # Arrange + prefix = ("α" * 250).encode("utf-8")[:-1] + + # Act & assert + assert tabix_module._source_looks_processable(prefix) is True + + +def test__source_looks_processable_should_accept_empty_prefix(): + """Test that an empty prefix defers to the downstream empty guard. + + Given: + An empty byte prefix (zero-length source read). + When: + _source_looks_processable is called. + Then: + It should return True so the empty-source case is handled by the + post-pipeline zero-data-line guard rather than here. + """ + # Act & assert + assert tabix_module._source_looks_processable(b"") is True + + +@settings(max_examples=50) +@given(payload=st.binary()) +def test__source_looks_processable_should_accept_any_gzip_prefixed_payload(payload): + """Test that any gzip-prefixed payload is accepted. + + Given: + Arbitrary bytes appended to the gzip magic prefix. + When: + _source_looks_processable is called. + Then: + It should always return True, since the gzip magic alone is + sufficient to route the source through zcat. + """ + # Act & assert + assert tabix_module._source_looks_processable(b"\x1f\x8b" + payload) is True + + +@settings(max_examples=50) +@given(payload=st.binary()) +def test__source_looks_processable_should_reject_any_starch_prefixed_payload(payload): + """Test that any starch-prefixed payload is rejected. + + Given: + Arbitrary bytes appended to the BEDOPS starch magic (0xca5cade5). + When: + _source_looks_processable is called. + Then: + It should always return False, generalizing the starch rejection + beyond a single example so no starch archive reaches the pipeline. + """ + # Act & assert + assert tabix_module._source_looks_processable(b"\xca\x5c\xad\xe5" + payload) is False + + class TestTabixIntervalProcessor: @pytest.mark.parametrize( "fmt", @@ -472,6 +617,99 @@ async def zero_count(self, _bgz, _fmt): ) assert await cache.head(data_key) is None + @pytest.mark.asyncio + async def test_run_should_reject_starch_source_before_commit( + self, tmp_path, mocker + ): + """Test that a starch source fails before any cache commit. + + Given: + A BED file_meta whose downloaded source is a BEDOPS starch + archive (magic 0xca5cade5), with the pipeline tools mocked to + fail if reached. + When: + run is awaited. + Then: + It should raise RuntimeError matching "neither gzip nor text" + without running the pipeline and leave the data artifact + uncommitted in the cache. + """ + # Arrange + workdir = tmp_path / "work" + cache_root = tmp_path / "cache" + cache_root.mkdir() + + async def fake_download(_meta, dest): + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_bytes(b"\xca\x5c\xad\xe5BZh91AY&SYstarch-body") + return dest + + async def fail_shell(cmd): + raise AssertionError(f"pipeline must not run for starch: {cmd}") + + async def fail_run(argv): + raise AssertionError(f"tabix must not run for starch: {argv}") + + mocker.patch.object(tabix_module, "download_source", fake_download) + mocker.patch.object(tabix_module, "run_shell", fail_shell) + mocker.patch.object(tabix_module, "run_argv", fail_run) + + # Act & assert + with pytest.raises(RuntimeError, match="neither gzip nor text"): + async for _event in TabixIntervalProcessor().run( + _file_meta("BED"), workdir, LocalFsCache(cache_root) + ): + pass + cache = LocalFsCache(cache_root) + data_key = key_utils.cache_key( + dcc="encode", + local_id="ENCFF-BED", + artifact_kind=ArtifactKind.DATA, + md5=FIXTURE_MD5, + processor_version=TabixIntervalProcessor().processor_version, + ) + assert await cache.head(data_key) is None + + @pytest.mark.asyncio + async def test_run_should_not_sniff_guard_bigbed_binary_source( + self, tmp_path, mocker + ): + """Test that the byte-sniff guard exempts bigBed binary sources. + + Given: + A bigBed file_meta whose downloaded source is binary + (NUL-containing), with the pipeline tools mocked. + When: + run is awaited. + Then: + It should complete and commit the data artifact rather than + raising, because bigBed is handled by bigBedToBed and is + exempt from the gzip/text guard. + """ + # Arrange + harness = _PipelineHarness() + harness.install(mocker) + + async def binary_download(_meta, dest): + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_bytes(b"\x00\x01\x02\x87\x89bigbed-binary") + return dest + + mocker.patch.object(tabix_module, "download_source", binary_download) + workdir = tmp_path / "work" + cache_root = tmp_path / "cache" + cache_root.mkdir() + + # Act + events = await _drain_run( + TabixIntervalProcessor(), _file_meta("bigBed"), workdir, cache_root + ) + artifacts = _final_artifacts(events) + + # Assert + cache = LocalFsCache(cache_root) + assert await cache.head(artifacts["data"]) is not None + @pytest.mark.asyncio async def test_run_should_use_bed_sort_keys_for_bed(self, tmp_path, mocker): """Test (TX-003) that BED inputs sort by ``-k1,1 -k2,2n``. From be711afa262375df58fb6e9159bd71d54e9edd00 Mon Sep 17 00:00:00 2001 From: Conrad Date: Wed, 24 Jun 2026 13:09:42 -0400 Subject: [PATCH 3/4] fix: Harden the starch source guard per review Address the round-1 principal-engineer review of the source-encoding guard: - Bump processor_version so an artifact poisoned by the pre-guard code (committed under the old version) is re-keyed to a miss, re-enters _stage_prepare, and is rejected instead of served. The guard was otherwise unreachable for already-cached entries. - Reject known non-gzip compression magics (BEDOPS starch, bzip2, xz, zstd, zip) explicitly before the UTF-8 text test, so a mislabeled compressed source is caught by its magic rather than incidentally. - Name the bigBed exemption as a set and document the safety chain (the tool validates its own input and fails under pipefail before any cache commit), so the carve-out is greppable and self-explaining. - Scope the docstring honestly (a gzip member is trusted on its magic alone, so a doubly-compressed source is out of scope; latin-1/CP1252 text is deliberately rejected) and add the dcc to the error message. --- src/cfdb/workflows/processors/tabix.py | 83 +++++++++++++++++++------- 1 file changed, 62 insertions(+), 21 deletions(-) diff --git a/src/cfdb/workflows/processors/tabix.py b/src/cfdb/workflows/processors/tabix.py index d9a6134..1c5af12 100644 --- a/src/cfdb/workflows/processors/tabix.py +++ b/src/cfdb/workflows/processors/tabix.py @@ -59,7 +59,22 @@ #: and shares this prefix, so it is accepted too. _GZIP_MAGIC = b"\x1f\x8b" +#: Leading magic bytes of non-gzip binary serializations that ``zcat -f`` +#: cannot decompress. Matched explicitly so a mislabeled compressed file +#: (e.g. a BEDOPS ``starch`` archive mapped to ``BED`` upstream — issue +#: #69) is rejected by its magic rather than incidentally by the UTF-8 +#: text test below. +_BINARY_MAGICS: tuple[bytes, ...] = ( + b"\xca\x5c\xad\xe5", # BEDOPS starch + b"BZh", # bzip2 + b"\xfd7zXZ\x00", # xz + b"\x28\xb5\x2f\xfd", # zstd + b"PK\x03\x04", # zip +) + #: How many leading source bytes to inspect when sniffing the encoding. +#: A handful would suffice for the magic checks; 512 is generous headroom +#: that also lets the UTF-8 text test see a representative plaintext span. _SOURCE_SNIFF_BYTES = 512 @@ -68,27 +83,38 @@ def _source_looks_processable(prefix: bytes) -> bool: The tabix text-interval pipeline assumes ``zcat -f`` yields plain text, so a source must be either a gzip/bgzip member (magic - ``1f 8b``) or already plaintext. Anything else is a binary - serialization the pipeline would silently mangle — most notably a - BEDOPS ``starch`` archive (magic ``ca5cade5``), which the upstream - ontology mapper labels ``BED`` even though ``zcat -f`` cannot - decompress it (see issue #69). + ``1f 8b``) or already plaintext. A gzip member is trusted on its + magic alone — ``zcat -f`` is single-pass, so a *doubly*-compressed + source (e.g. ``gzip(starch)``) is out of scope and would still slip + through. Everything else is treated as a binary serialization the + pipeline would silently mangle — most notably a BEDOPS ``starch`` + archive (magic ``ca5cade5``), which the upstream ontology mapper + labels ``BED`` even though ``zcat -f`` cannot decompress it (see + issue #69). Known non-gzip compression magics are rejected + explicitly via ``_BINARY_MAGICS``; any remaining binary is caught by + the NUL / non-UTF-8 text test. An empty prefix returns True: the genuinely-empty-source case is caught later by the zero-data-line guard in ``_stage_prepare``. - The text test is "decodes as UTF-8 with no NUL byte". A multi-byte - UTF-8 character may be split at the sniff boundary, so up to three - trailing bytes are trimmed before concluding the prefix is not text. + The text test is "decodes as UTF-8 with no NUL byte". It is + deliberately conservative: legitimately-text latin-1/CP1252 BED would + be rejected (failing the job rather than risking a mangled artifact), + which is acceptable because the upstream interval corpus is ASCII. A + multi-byte UTF-8 character can be split at the sniff boundary, so up + to three trailing bytes (a UTF-8 character is at most 4 bytes) are + trimmed before concluding the prefix is not text. """ if not prefix: return True if prefix.startswith(_GZIP_MAGIC): return True + if any(prefix.startswith(magic) for magic in _BINARY_MAGICS): + return False if b"\x00" in prefix: return False for trim in range(4): - candidate = prefix[: len(prefix) - trim] if trim else prefix + candidate = prefix if trim == 0 else prefix[:-trim] if not candidate: break try: @@ -126,11 +152,22 @@ def _read_prefix(path: Path, n: int = _SOURCE_SNIFF_BYTES) -> bytes: "bed": ["-k1,1", "-k2,2n"], } +#: Formats whose source is binary by design and is validated by the tool +#: that consumes it (``bigBedToBed`` verifies the bigBed magic and exits +#: non-zero on bad input, which ``pipefail`` surfaces before any +#: ``cache.put``). These bypass the gzip/text source sniff — applying it +#: would false-reject every legitimate (binary) bigBed. +_SELF_VALIDATING_FORMATS = frozenset({"bigBed"}) + class TabixIntervalProcessor(Processor): """Handle plain-text genomic interval formats and produce a tabix index.""" - processor_version = 1 + # v2: the source-encoding guard (issue #69) changed which sources will + # ever be committed, so re-key all tabix artifacts — a poisoned v1 + # ``data`` entry (committed before the guard existed) becomes a cache + # miss, re-enters _stage_prepare, and is rejected instead of served. + processor_version = 2 supported_formats = frozenset(_TABIX_PRESET.keys()) artifact_kinds = (ArtifactKind.DATA, ArtifactKind.INDEX) @@ -183,12 +220,14 @@ async def _stage_prepare( await download_source(file_meta, source) # Reject a non-gzip/non-text source before any processing or - # cache commit. bigBed is binary by design and handled by - # ``bigBedToBed`` (which validates its own input), so it is - # exempt; every other format flows through ``zcat -f`` and must - # be gzip or plain text (see issue #69 — BEDOPS starch archives - # are mislabeled ``BED`` upstream). - if fmt != "bigBed": + # cache commit. Every format here flows through ``zcat -f`` and + # must be gzip or plain text (see issue #69 — BEDOPS starch + # archives are mislabeled ``BED`` upstream). Self-validating + # binary formats (bigBed) are exempt: their tool validates the + # input and fails the pipeline under ``pipefail`` before any + # ``cache.put``, so they need no sniff and would be false-rejected + # by it. + if fmt not in _SELF_VALIDATING_FORMATS: await self._assert_source_processable(source, fmt, file_meta) tmp_dir = workdir / "sort_tmp" @@ -300,12 +339,14 @@ async def _assert_source_processable( if _source_looks_processable(prefix): return local_id = file_meta.get("local_id") + dcc = file_meta.get("dcc") raise RuntimeError( - f"Refusing to process {fmt} source for {local_id!r}: leading " - f"bytes {prefix[:8].hex()!r} are neither gzip nor text. An " - "unsupported serialization (e.g. BEDOPS starch) was likely " - "mapped onto a text-interval format upstream; failing before " - "commit so no corrupt artifact is cached." + f"Refusing to process {fmt} source for dcc={dcc!r} " + f"local_id={local_id!r}: leading bytes {prefix[:8].hex()} are " + "neither gzip nor text. An unsupported serialization (e.g. " + "BEDOPS starch) was likely mapped onto a text-interval format " + "upstream; failing before commit so no corrupt artifact is " + "cached." ) async def _count_data_lines(self, bgz_path: Path, fmt: str) -> int: From aca3df3bc6b6cb786460d4dd06c24abc59a68b91 Mon Sep 17 00:00:00 2001 From: Conrad Date: Wed, 24 Jun 2026 13:09:51 -0400 Subject: [PATCH 4/4] test: Cover the guard's denylist, bigBed rejection, and cache re-key Extend the source-encoding guard coverage: - Parametrize the starch-rejection run test over BED, VCF and GTF so the guard is pinned ahead of the intermediate-decompress branches, not just the single-pipeline BED path. - Add predicate cases for the bzip2, xz, zstd and zip magic denylist. - Add a bigBed test where the tool raises, asserting the exemption commits nothing when bigBedToBed rejects a bad source. - Add a regression test that a poisoned artifact under the old processor version is re-keyed and never served. --- tests/test_workflows/test_processors_tabix.py | 151 +++++++++++++++++- 1 file changed, 145 insertions(+), 6 deletions(-) diff --git a/tests/test_workflows/test_processors_tabix.py b/tests/test_workflows/test_processors_tabix.py index c527490..0ea59db 100644 --- a/tests/test_workflows/test_processors_tabix.py +++ b/tests/test_workflows/test_processors_tabix.py @@ -141,6 +141,30 @@ def test__source_looks_processable_should_reject_starch_archive(): ) +@pytest.mark.parametrize( + "magic", + [ + b"BZh91AY&SY", # bzip2 + b"\xfd7zXZ\x00\x00", # xz + b"\x28\xb5\x2f\xfd\x00", # zstd + b"PK\x03\x04\x14\x00", # zip + ], +) +def test__source_looks_processable_should_reject_known_binary_magics(magic): + """Test that known non-gzip compression magics are rejected. + + Given: + A prefix beginning with a bzip2, xz, zstd, or zip magic number. + When: + _source_looks_processable is called. + Then: + It should return False so a mislabeled compressed source — which + zcat -f cannot decompress — never reaches the pipeline. + """ + # Act & assert + assert tabix_module._source_looks_processable(magic) is False + + def test__source_looks_processable_should_reject_binary_with_nul(): """Test that NUL-containing binary is rejected. @@ -617,16 +641,19 @@ async def zero_count(self, _bgz, _fmt): ) assert await cache.head(data_key) is None + @pytest.mark.parametrize("fmt", ["BED", "VCF", "GTF"]) @pytest.mark.asyncio async def test_run_should_reject_starch_source_before_commit( - self, tmp_path, mocker + self, tmp_path, mocker, fmt ): """Test that a starch source fails before any cache commit. Given: - A BED file_meta whose downloaded source is a BEDOPS starch - archive (magic 0xca5cade5), with the pipeline tools mocked to - fail if reached. + A BED/VCF/GTF file_meta whose downloaded source is a BEDOPS + starch archive (magic 0xca5cade5), with the pipeline tools + mocked to fail if reached. VCF and GTF take the + intermediate-decompress branches, so this pins that the guard + fires ahead of every non-bigBed path, not just the BED one. When: run is awaited. Then: @@ -657,13 +684,13 @@ async def fail_run(argv): # Act & assert with pytest.raises(RuntimeError, match="neither gzip nor text"): async for _event in TabixIntervalProcessor().run( - _file_meta("BED"), workdir, LocalFsCache(cache_root) + _file_meta(fmt), workdir, LocalFsCache(cache_root) ): pass cache = LocalFsCache(cache_root) data_key = key_utils.cache_key( dcc="encode", - local_id="ENCFF-BED", + local_id=f"ENCFF-{fmt}", artifact_kind=ArtifactKind.DATA, md5=FIXTURE_MD5, processor_version=TabixIntervalProcessor().processor_version, @@ -710,6 +737,118 @@ async def binary_download(_meta, dest): cache = LocalFsCache(cache_root) assert await cache.head(artifacts["data"]) is not None + @pytest.mark.asyncio + async def test_run_should_not_commit_when_bigbedtobed_rejects_source( + self, tmp_path, mocker + ): + """Test that a bad bigBed source commits nothing when the tool fails. + + Given: + A bigBed file_meta whose pipeline raises, modeling bigBedToBed + rejecting a non-bigBed source with a non-zero exit (the bigBed + exemption skips the sniff guard, so the tool is the backstop). + When: + run is awaited. + Then: + It should raise RuntimeError and leave the data artifact + uncommitted, so the exemption cannot poison the cache. + """ + # Arrange + workdir = tmp_path / "work" + cache_root = tmp_path / "cache" + cache_root.mkdir() + + async def fake_download(_meta, dest): + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_bytes(b"\xca\x5c\xad\xe5not-a-real-bigbed") + return dest + + async def rejecting_shell(cmd): + raise RuntimeError(f"bigBedToBed exited 255: {cmd}") + + async def fail_run(argv): + raise AssertionError(f"tabix must not run: {argv}") + + mocker.patch.object(tabix_module, "download_source", fake_download) + mocker.patch.object(tabix_module, "run_shell", rejecting_shell) + mocker.patch.object(tabix_module, "run_argv", fail_run) + + # Act & assert + with pytest.raises(RuntimeError, match="bigBedToBed exited"): + async for _event in TabixIntervalProcessor().run( + _file_meta("bigBed"), workdir, LocalFsCache(cache_root) + ): + pass + cache = LocalFsCache(cache_root) + data_key = key_utils.cache_key( + dcc="encode", + local_id="ENCFF-bigBed", + artifact_kind=ArtifactKind.DATA, + md5=FIXTURE_MD5, + processor_version=TabixIntervalProcessor().processor_version, + ) + assert await cache.head(data_key) is None + + @pytest.mark.asyncio + async def test_run_should_not_serve_a_poisoned_v1_artifact( + self, tmp_path, mocker + ): + """Test that a v1-cached poisoned artifact is not served after the bump. + + Given: + A garbage data artifact seeded under the OLD processor_version=1 + cache key, with the current source a starch (BED-mapped) archive. + When: + run is awaited with the current processor. + Then: + It should re-key to the current version (a miss on the poisoned + v1 entry), re-enter the guard, raise, and never serve the v1 + bytes — the current data key stays absent. + """ + # Arrange + workdir = tmp_path / "work" + cache_root = tmp_path / "cache" + cache_root.mkdir() + cache = LocalFsCache(cache_root) + + seed = tmp_path / "seed.bgz" + seed.write_bytes(b"corrupt-v1-artifact") + v1_key = key_utils.cache_key( + dcc="encode", + local_id="ENCFF-BED", + artifact_kind=ArtifactKind.DATA, + md5=FIXTURE_MD5, + processor_version=1, + ) + await cache.put(v1_key, seed) + + async def fake_download(_meta, dest): + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_bytes(b"\xca\x5c\xad\xe5BZh91AY&SYstarch-body") + return dest + + async def fail_shell(cmd): + raise AssertionError(f"pipeline must not run: {cmd}") + + mocker.patch.object(tabix_module, "download_source", fake_download) + mocker.patch.object(tabix_module, "run_shell", fail_shell) + + # Act & assert + with pytest.raises(RuntimeError, match="neither gzip nor text"): + async for _event in TabixIntervalProcessor().run( + _file_meta("BED"), workdir, cache + ): + pass + current_key = key_utils.cache_key( + dcc="encode", + local_id="ENCFF-BED", + artifact_kind=ArtifactKind.DATA, + md5=FIXTURE_MD5, + processor_version=TabixIntervalProcessor().processor_version, + ) + assert current_key != v1_key + assert await cache.head(current_key) is None + @pytest.mark.asyncio async def test_run_should_use_bed_sort_keys_for_bed(self, tmp_path, mocker): """Test (TX-003) that BED inputs sort by ``-k1,1 -k2,2n``.