Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 126 additions & 1 deletion src/cfdb/workflows/processors/tabix.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,82 @@
# Names exported by ``from <module> import *``; everything else in this
# module (underscore-prefixed helpers and constants) is non-public.
__all__ = ["TabixIntervalProcessor", "download_source"]


#: gzip / bgzip stream magic (RFC 1952). bgzip output is gzip-compatible
#: and shares this prefix, so it is accepted too.
_GZIP_MAGIC = b"\x1f\x8b"

#: Leading magic bytes of non-gzip binary serializations that ``zcat -f``
#: cannot decompress. Matched explicitly so a mislabeled compressed file
#: (e.g. a BEDOPS ``starch`` archive mapped to ``BED`` upstream — issue
#: #69) is rejected by its magic rather than incidentally by the UTF-8
#: text test below.
_BINARY_MAGICS: tuple[bytes, ...] = (
b"\xca\x5c\xad\xe5", # BEDOPS starch
b"BZh", # bzip2
b"\xfd7zXZ\x00", # xz
b"\x28\xb5\x2f\xfd", # zstd
b"PK\x03\x04", # zip
)

#: How many leading source bytes to inspect when sniffing the encoding.
#: A handful would suffice for the magic checks; 512 is generous headroom
#: that also lets the UTF-8 text test see a representative plaintext span.
_SOURCE_SNIFF_BYTES = 512


def _source_looks_processable(prefix: bytes) -> bool:
"""Return True if ``prefix`` is gzip-compressed or decodes as text.

The tabix text-interval pipeline assumes ``zcat -f`` yields plain
text, so a source must be either a gzip/bgzip member (magic
``1f 8b``) or already plaintext. A gzip member is trusted on its
magic alone — ``zcat -f`` is single-pass, so a *doubly*-compressed
source (e.g. ``gzip(starch)``) is out of scope and would still slip
through. Everything else is treated as a binary serialization the
pipeline would silently mangle — most notably a BEDOPS ``starch``
archive (magic ``ca5cade5``), which the upstream ontology mapper
labels ``BED`` even though ``zcat -f`` cannot decompress it (see
issue #69). Known non-gzip compression magics are rejected
explicitly via ``_BINARY_MAGICS``; any remaining binary is caught by
the NUL / non-UTF-8 text test.

An empty prefix returns True: the genuinely-empty-source case is
caught later by the zero-data-line guard in ``_stage_prepare``.

The text test is "decodes as UTF-8 with no NUL byte". It is
deliberately conservative: legitimately-text latin-1/CP1252 BED would
be rejected (failing the job rather than risking a mangled artifact),
which is acceptable because the upstream interval corpus is ASCII. A
multi-byte UTF-8 character can be split at the sniff boundary, so up
to three trailing bytes (a UTF-8 character is at most 4 bytes) are
trimmed before concluding the prefix is not text.
"""
if not prefix:
return True
if prefix.startswith(_GZIP_MAGIC):
return True
if any(prefix.startswith(magic) for magic in _BINARY_MAGICS):
return False
if b"\x00" in prefix:
return False
for trim in range(4):
candidate = prefix if trim == 0 else prefix[:-trim]
if not candidate:
break
try:
candidate.decode("utf-8")
except UnicodeDecodeError:
continue
return True
return False


def _read_prefix(path: Path, n: int = _SOURCE_SNIFF_BYTES) -> bytes:
    """Return at most ``n`` bytes taken from the start of ``path``.

    The file is opened in binary mode, so the bytes are returned exactly
    as stored, with no decoding.
    """
    with path.open(mode="rb") as stream:
        head = stream.read(n)
    return head


# Format → tabix preset. Both GFF and GFF3 map onto the same preset; the
# upstream ontology mapper emits ``GFF`` for ``.gff`` and ``GFF3`` for
# ``.gff3`` so both must be accepted here to avoid silent bypasses.
Expand All @@ -76,11 +152,22 @@
"bed": ["-k1,1", "-k2,2n"],
}

#: Formats whose source is binary by design and is validated by the tool
#: that consumes it (``bigBedToBed`` verifies the bigBed magic and exits
#: non-zero on bad input, which ``pipefail`` surfaces before any
#: ``cache.put``). These bypass the gzip/text source sniff — applying it
#: would false-reject every legitimate (binary) bigBed.
_SELF_VALIDATING_FORMATS = frozenset({"bigBed"})


class TabixIntervalProcessor(Processor):
"""Handle plain-text genomic interval formats and produce a tabix index."""

processor_version = 1
# v2: the source-encoding guard (issue #69) changed which sources will
# ever be committed, so re-key all tabix artifacts — a poisoned v1
# ``data`` entry (committed before the guard existed) becomes a cache
# miss, re-enters _stage_prepare, and is rejected instead of served.
processor_version = 2
supported_formats = frozenset(_TABIX_PRESET.keys())
artifact_kinds = (ArtifactKind.DATA, ArtifactKind.INDEX)

Expand Down Expand Up @@ -132,6 +219,17 @@ async def _stage_prepare(
source = workdir / "source.raw"
await download_source(file_meta, source)

# Reject a non-gzip/non-text source before any processing or
# cache commit. Every format here flows through ``zcat -f`` and
# must be gzip or plain text (see issue #69 — BEDOPS starch
# archives are mislabeled ``BED`` upstream). Self-validating
# binary formats (bigBed) are exempt: their tool validates the
# input and fails the pipeline under ``pipefail`` before any
# ``cache.put``, so they need no sniff and would be false-rejected
# by it.
if fmt not in _SELF_VALIDATING_FORMATS:
await self._assert_source_processable(source, fmt, file_meta)

tmp_dir = workdir / "sort_tmp"
tmp_dir.mkdir(exist_ok=True)
bgz_path = workdir / "out.bgz"
Expand Down Expand Up @@ -224,6 +322,33 @@ async def _stage_prepare(
)
return bgz_path

async def _assert_source_processable(
    self, source: Path, fmt: str, file_meta: dict[str, Any]
) -> None:
    """Fail the job if ``source`` is neither gzip nor text.

    The leading bytes of the just-downloaded ``source`` are read in a
    worker thread and handed to ``_source_looks_processable``. When that
    check fails, a ``RuntimeError`` is raised so that a binary
    serialization the ``zcat -f`` pipeline cannot decompress (e.g. a
    BEDOPS ``starch`` archive mislabeled ``BED`` upstream) stops the job
    *before* any ``cache.put`` instead of committing a corrupt artifact.
    Callers skip this check for ``bigBed``, whose binary source is
    handled by ``bigBedToBed``.

    Args:
        source: Path of the downloaded source file to sniff.
        fmt: Format label, used only in the error message.
        file_meta: File metadata; ``dcc`` and ``local_id`` are looked up
            (missing keys yield ``None``) for the error message.

    Raises:
        RuntimeError: If the leading bytes are neither gzip nor text.
    """
    head = await asyncio.to_thread(_read_prefix, source)
    if not _source_looks_processable(head):
        dcc = file_meta.get("dcc")
        local_id = file_meta.get("local_id")
        # Only the first eight bytes are echoed, enough to identify a magic.
        raise RuntimeError(
            f"Refusing to process {fmt} source for dcc={dcc!r} "
            f"local_id={local_id!r}: leading bytes {head[:8].hex()} are "
            "neither gzip nor text. An unsupported serialization (e.g. "
            "BEDOPS starch) was likely mapped onto a text-interval format "
            "upstream; failing before commit so no corrupt artifact is "
            "cached."
        )

async def _count_data_lines(self, bgz_path: Path, fmt: str) -> int:
"""Return the count of non-header data lines in ``bgz_path``.

Expand Down
Loading
Loading