diff --git a/README.md b/README.md index dbecdd8..304c155 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ The code for the software itself is also available and citeable in Zenodo: https * [Usage](#usage) * [Further documentation on the STRkit caller, including output format](#further-documentation-on-the-strkit-caller-including-output-format) * [`strkit visualize`: Call visualizer](#strkit-visualize-call-visualizer) + * [`strkit merge`: VCF merging tool](#strkit-merge-vcf-merging-tool) * [`strkit mi`: Mendelian inheritance analysis](#strkit-mi-mendelian-inheritance-analysis) * [Usage](#usage-1) * [Further documentation](#further-documentation) @@ -296,6 +297,12 @@ the start-up instructions. +### `strkit merge`: VCF merging tool + +TODO + + + ### `strkit mi`: Mendelian inheritance analysis Using trio data, candidate de novo STR mutations (or genotyping errors/dropout rates) can be discovered diff --git a/docs/output_formats.md b/docs/output_formats.md index c17230a..4f5a24b 100644 --- a/docs/output_formats.md +++ b/docs/output_formats.md @@ -168,6 +168,7 @@ VCF meta fields (non-exhaustive): * `phasing`: present and set to `partial` if using SNV/HP phasing * `reference`: absolute file URI to FASTA reference (`file://<...>`) * `strkitVersion`: STRkit version used to generate the file +* `strkitCommand`: STRkit subcommand used to generate the VCF (either `call` or `merge`) * `strkitCatalogNumLoci`: Number of total loci in the catalog used for genotyping * `strkitCatalogHash`: SHA256 hash of the loci loaded from the catalog diff --git a/strkit/call/call_sample.py b/strkit/call/call_sample.py index d157c12..392f78a 100644 --- a/strkit/call/call_sample.py +++ b/strkit/call/call_sample.py @@ -264,9 +264,11 @@ def call_sample( from heapq import merge as heapq_merge from numpy.random import default_rng as np_default_rng + from pathlib import Path from pysam import VariantFile from strkit.logger import get_main_logger + from strkit.vcf_utils.header import 
build_vcf_header from .loci import load_loci from .output import ( @@ -274,7 +276,6 @@ def call_sample( output_json_report_results, output_json_report_footer, output_tsv as output_tsv_fn, - build_vcf_header, output_contig_vcf_lines, ) from .utils import get_new_seed @@ -312,8 +313,9 @@ def call_sample( vf: VariantFile | None = None if vcf_path is not None: vh = build_vcf_header( - sample_id_str, - params.reference_file, + "call", + (sample_id_str,), + Path(params.reference_file), partial_phasing=params.snv_vcf is not None or params.use_hp, num_loci=num_loci, loci_hash=loci_hash, diff --git a/strkit/call/output/__init__.py b/strkit/call/output/__init__.py index 23a1322..5707b6a 100644 --- a/strkit/call/output/__init__.py +++ b/strkit/call/output/__init__.py @@ -1,12 +1,11 @@ from .json_report import output_json_report_header, output_json_report_results, output_json_report_footer from .tsv import output_tsv -from .vcf import build_vcf_header, output_contig_vcf_lines +from .vcf import output_contig_vcf_lines __all__ = [ "output_json_report_header", "output_json_report_results", "output_json_report_footer", "output_tsv", - "build_vcf_header", "output_contig_vcf_lines", ] diff --git a/strkit/call/output/vcf.py b/strkit/call/output/vcf.py index 152a41c..94731da 100644 --- a/strkit/call/output/vcf.py +++ b/strkit/call/output/vcf.py @@ -1,14 +1,10 @@ from __future__ import annotations from collections import Counter -from datetime import datetime -from functools import cache from os.path import commonprefix -from pathlib import Path -from pysam import FastaFile, VariantHeader from typing import Iterable, TYPE_CHECKING -from strkit import __version__ +from strkit import vcf_utils as vu from strkit.utils import is_none, idx_0_getter from ..utils import cn_getter @@ -19,131 +15,19 @@ from ..types import LocusResult __all__ = [ - "build_vcf_header", "output_contig_vcf_lines", ] -# VCF_ALLELE_CNV_TR = "" - -# VCF_TR_INFO_RECORDS: tuple[tuple[str, str, str, str], ...] 
= ( -# ("SVLEN", "A", "Integer", "Length of the structural variant"), -# ("CN", "A", "Float", "Copy number of allele"), -# ("RN", "A", "Integer", "Total number of repeat sequences in this allele"), -# ("RUS", ".", "String", "Repeat unit sequence of the corresponding repeat sequence"), -# ("RUL", ".", "Integer", "Repeat unit length of the corresponding repeat sequence"), -# ("RB", ".", "Integer", "Total number of bases in the corresponding repeat sequence"), -# ("CIRUC", ".", "Float", "Confidence interval around RUC"), -# ("CIRB", ".", "Integer", "Confidence interval around RB"), -# ) - -VCF_INFO_VT = "VT" -VCF_INFO_MOTIF = "MOTIF" -VCF_INFO_REFMC = "REFMC" -VCF_INFO_BED_START = "BED_START" -VCF_INFO_BED_END = "BED_END" -VCF_INFO_ANCH = "ANCH" - -VT_STR = "str" -VT_SNV = "snv" - - def iter_to_upper(x: Iterable[str]) -> Iterable[str]: # noinspection PyTypeChecker return map(str.upper, x) -def build_vcf_header( - sample_id: str, reference_file: str, partial_phasing: bool, num_loci: int, loci_hash: str -) -> VariantHeader: - vh = VariantHeader() # automatically sets VCF version to 4.2 - - # Add file date - now = datetime.now() - vh.add_meta("fileDate", f"{now.year}{now.month:02d}{now.day:02d}") - - # Add source - vh.add_meta("source", "strkit") - - # Mark that we have partial phasing if we're using HP/SNVs - if partial_phasing: - vh.add_meta("phasing", "partial") - - # Add an absolute path to the reference genome - vh.add_meta("reference", f"file://{str(Path(reference_file).resolve().absolute())}") - - # Add all contigs from the reference genome file + lengths - with FastaFile(reference_file) as rf: - for contig in rf.references: - vh.contigs.add(contig, length=rf.get_reference_length(contig)) - - # Add STRkit-specific fields: - # - marking version - vh.add_meta("strkitVersion", str(__version__)) - # - indicating number of loci provided (i.e., catalogue size) - vh.add_meta("strkitCatalogNumLoci", str(num_loci)) - # - indicating hash of STRkitLocus objects (for checking 
catalogue sameness) - vh.add_meta("strkitCatalogLociHash", loci_hash) - - # Add CNV:TR alt type (symbolic allele: tandem repeat) - # vh.add_meta("ALT", "") - - # Set up basic VCF formats - vh.formats.add("AD", ".", "Integer", "Read depth for each allele") - vh.formats.add("ANCL", ".", "Integer", "Anchor length for the ref and each alt, five-prime of TR sequence") - vh.formats.add("CONS", ".", "String", "Consensus methods used for each alt (single/poa/best_rep)") - vh.formats.add("DP", 1, "Integer", "Read depth") - vh.formats.add("DPS", 1, "Integer", "Read depth (supporting reads only)") - vh.formats.add("GT", 1, "String", "Genotype") - vh.formats.add("MC", ".", "Integer", "Motif copy number for each allele") - vh.formats.add("MCCI", ".", "String", "Motif copy number 95% confidence interval for each allele") - vh.formats.add("MCRL", ".", "String", "Read-level motif copy numbers for each allele") - vh.formats.add("MMAS", 1, "Float", "Mean model (candidate TR sequence) alignment score across reads.") - vh.formats.add("NSNV", 1, "Integer", "Number of supporting SNVs for the STR peak-call") - vh.formats.add("PS", 1, "Integer", "Phase set") - vh.formats.add("PM", 1, "String", "Peak-calling method (dist/snv+dist/snv/hp)") - - # Set up VCF info fields - vh.info.add(VCF_INFO_VT, 1, "String", "Variant record type (str/snv)") - vh.info.add(VCF_INFO_MOTIF, 1, "String", "Motif string") - vh.info.add(VCF_INFO_REFMC, 1, "Integer", "Motif copy number in the reference genome") - vh.info.add( - VCF_INFO_BED_START, - 1, - "Integer", - "Original start position of the locus as defined in the catalog (0-based inclusive)", - ) - vh.info.add( - VCF_INFO_BED_END, - 1, - "Integer", - "Original end position of the locus as defined in the catalog (0-based exclusive, i.e., 1-based)", - ) - vh.info.add(VCF_INFO_ANCH, 1, "Integer", "Five-prime anchor size") - - # Add INFO records for tandem repeat copies - these are new to VCF4.4! 
TODO - # for iv in VCF_TR_INFO_RECORDS: - # vh.info.add(*iv) - - # Add the sample - vh.add_sample(sample_id) - - return vh - - def _vr_pos_key(vr: VariantRecord) -> int: return vr.pos -@cache -def _blank_entry(n_alleles: int) -> tuple[None, ...]: - return tuple([None] * n_alleles) - - -class SkipWritingLocus(Exception): - pass - - def create_result_vcf_records( params: CallParams, variant_file: VariantFile, @@ -160,7 +44,7 @@ def create_result_vcf_records( if "ref_start_anchor" not in result: logger.debug("No ref anchor for %s:%d; skipping VCF output for locus", contig, start) - raise SkipWritingLocus() + raise vu.record.SkipWritingLocus() ref_start_anchor = result["ref_start_anchor"].upper() ref_seq = result["ref_seq"].upper() @@ -176,11 +60,11 @@ def create_result_vcf_records( if any(map(is_none, peak_seqs)): # Occurs when no consensus for one of the peaks logger.error("Encountered None in results[%d].peaks.seqs: %s", result_idx, peak_seqs) - raise SkipWritingLocus() + raise vu.record.SkipWritingLocus() if any(map(is_none, peak_start_anchor_seqs)): # Occurs when no consensus for one of the peaks logger.error("Encountered None in results[%d].peaks.start_anchor_seqs: %s", result_idx, peak_start_anchor_seqs) - raise SkipWritingLocus() + raise vu.record.SkipWritingLocus() peak_start_anchor_seqs_upper = tuple(iter_to_upper(peak_start_anchor_seqs)) common_anchor_prefix = commonprefix([ref_start_anchor, *peak_start_anchor_seqs_upper]) @@ -235,24 +119,24 @@ def create_result_vcf_records( alleles=seq_alleles, ) - vr.info[VCF_INFO_VT] = VT_STR - vr.info[VCF_INFO_MOTIF] = result["motif"] - vr.info[VCF_INFO_REFMC] = result["ref_cn"] - vr.info[VCF_INFO_BED_START] = result["start"] - vr.info[VCF_INFO_BED_END] = result["end"] - vr.info[VCF_INFO_ANCH] = params.vcf_anchor_size - anchor_offset + vr.info[vu.header.VCF_INFO_VT.key] = vu.header.VT_STR + vr.info[vu.header.VCF_INFO_MOTIF.key] = result["motif"] + vr.info[vu.header.VCF_INFO_REFMC.key] = result["ref_cn"] + 
vr.info[vu.header.VCF_INFO_BED_START.key] = result["start"] + vr.info[vu.header.VCF_INFO_BED_END.key] = result["end"] + vr.info[vu.header.VCF_INFO_ANCH.key] = params.vcf_anchor_size - anchor_offset try: - vr.samples[sample_id]["GT"] = ( - tuple(map(seq_alleles_raw.index, seqs_with_anchors)) - if call is not None and peak_seqs - else _blank_entry(n_alleles) + vr.samples[sample_id]["GT"] = vu.record.genotype_indices( + alleles=seq_alleles_raw, + call=None if call is None or not peak_seqs else seqs_with_anchors, + n_alleles=n_alleles, ) except ValueError: logger.error( "results[%d] (locus_id=%s): one of %s not in %s", - result_idx, locus_id, seqs_with_anchors, seq_alleles_raw) - raise SkipWritingLocus() + result_idx, result["locus_id"], seqs_with_anchors, seq_alleles_raw) + raise vu.record.SkipWritingLocus() if am := result.get("assign_method"): vr.samples[sample_id]["PM"] = am @@ -329,9 +213,9 @@ def create_result_vcf_records( alleles=snv_alleles, ) - snv_vr.info[VCF_INFO_VT] = VT_SNV + snv_vr.info[vu.header.VCF_INFO_VT.key] = vu.header.VT_SNV - snv_vr.samples[sample_id]["GT"] = tuple(map(snv_alleles.index, snv["call"])) + snv_vr.samples[sample_id]["GT"] = vu.record.genotype_indices(snv_alleles, snv["call"], n_alleles) snv_vr.samples[sample_id]["DP"] = sum(snv["rcs"]) snv_vr.samples[sample_id]["AD"] = snv["rcs"] @@ -369,7 +253,7 @@ def output_contig_vcf_lines( logger, ) ) - except SkipWritingLocus: + except vu.record.SkipWritingLocus: pass # just skipping the locus, nothing to do here as we've already logged except Exception as e: # fallback if we didn't handle a case properly in create_result_vcf_records logger.exception("Error while writing VCF: unhandled exception at results[%d]", result_idx, exc_info=e) diff --git a/strkit/entry.py b/strkit/entry.py index e018458..475211f 100644 --- a/strkit/entry.py +++ b/strkit/entry.py @@ -272,6 +272,10 @@ def add_call_parser_args(call_parser): call_parser.add_argument("--profile", action="store_true", help="Profile function 
calls (for development.)") +def add_merge_parser_args(merge_parser): + merge_parser.add_argument("vcf_files", type=str, nargs="+", help="STRkit-generated VCF files to merge together.") + + def add_mi_parser_args(mi_parser): mi_parser.add_argument( "--debug", @@ -479,6 +483,12 @@ def _exec_call(p_args) -> None: ) +def _exec_merge(p_args) -> None: + from strkit.merge.merge_vcfs import merge_vcfs + logger = _main_logger(p_args) + merge_vcfs(tuple(Path(p) for p in p_args.vcf_files), logger) + + def _exec_mi(p_args) -> None: from strkit.mi.base import BaseCalculator from strkit.mi.expansionhunter import ExpansionHunterCalculator @@ -688,6 +698,12 @@ def _make_subparser(*names: str, help_text: str, exec_func: Callable, arg_func: exec_func=_exec_call, arg_func=add_call_parser_args) + _make_subparser( + "merge", + help_text="A merging tool for STRkit VCF output.", + exec_func=_exec_merge, + arg_func=add_merge_parser_args) + _make_subparser( "mi", help_text="A Mendelian inheritance calculator for different TR genotyping callers.", diff --git a/strkit/merge/__init__.py b/strkit/merge/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/strkit/merge/merge_vcfs.py b/strkit/merge/merge_vcfs.py new file mode 100644 index 0000000..b1398ca --- /dev/null +++ b/strkit/merge/merge_vcfs.py @@ -0,0 +1,261 @@ +from __future__ import annotations + +from pysam import VariantFile +from typing import TYPE_CHECKING, Callable + +from .. 
import vcf_utils as vu +from ..call.output import vcf + +if TYPE_CHECKING: + from logging import Logger + from pathlib import Path + from pysam import VariantHeader, VariantRecord + +__all__ = ["merge_vcfs"] + + +COMMON_STR_INFO_ITEMS = ( + vu.header.VCF_INFO_VT, + vu.header.VCF_INFO_MOTIF, + vu.header.VCF_INFO_REFMC, + vu.header.VCF_INFO_BED_START, + vu.header.VCF_INFO_BED_END, +) +COMMON_SNV_INFO_ITEMS = ( + vu.header.VCF_INFO_VT, +) + + +class VariantFileForMerge: + def __init__(self, path: Path): + self.path = path + self.f = VariantFile(str(path)) + self.h = self.f.header + self.samples = tuple(s for s in self.h.samples) + + def close(self): + self.f.close() + + +def merge_headers( + files: tuple[VariantFileForMerge, ...], logger: Logger +) -> tuple[VariantHeader, dict[str, VariantFileForMerge]]: + partial_phasing: bool = False + header_contigs: set[tuple[str, ...]] = set() + strkit_versions: set[str] = set() + strkit_catalog_hashes: set[str] = set() + sample_id_file_map: dict[str, VariantFileForMerge] = {} + + # -- Loop through headers to collect information for header validation and merging --------------------------------- + + for idx, f in enumerate(files, 1): + hdr = f.h + header_contigs.add(tuple(sorted(hdr.contigs))) + strkit_version_found = False + strkit_catalog_hash_found = False + for line in hdr.records: + if line.key == "phasing" and line.value == "partial": + # make phasing partial for the whole file if at least one file has it + partial_phasing = True + if line.key == "strkitVersion": + strkit_versions.add(line.value) + strkit_version_found = True + elif line.key == "strkitCatalogLociHash": + strkit_catalog_hashes.add(line.value) + strkit_catalog_hash_found = True + if not strkit_version_found: + logger.critical("did not find strkitVersion in VCF #%d", idx) + exit(1) + if not strkit_catalog_hash_found: + logger.critical("did not find strkitCatalogLociHash in VCF #%d", idx) + exit(1) + if intersect := 
set(hdr.samples).intersection(set(sample_id_file_map.keys())):
+            logger.critical("found overlap between sample IDs: %s", intersect)
+            exit(1)
+        for s in hdr.samples:
+            sample_id_file_map[s] = f  # add samples from file to map
+
+    # -- Perform pre-merge validation on the header data ---------------------------------------------------------------
+
+    passed_validation = True
+
+    # check contigs
+    if len(header_contigs) > 1:
+        logger.warning("more than one header contig set found - are all VCFs using the same reference genome?")
+        passed_validation = False
+    # check STRkit versions
+    if len(strkit_versions) > 1:
+        logger.warning(
+            "more than one STRkit version found (%s) - output may be different across versions",
+            ", ".join(sorted(strkit_versions)),
+        )
+        passed_validation = False
+    # check catalog hashes
+    if len(strkit_catalog_hashes) > 1:
+        logger.warning(
+            "more than one catalog hash found (%s) - are all locus catalogs the same? merging may not work with "
+            "different catalogs", ", ".join(sorted(strkit_catalog_hashes))
+        )
+        passed_validation = False
+    # check references
+    # TODO
+    # check info field overlaps
+    # TODO
+
+    if passed_validation:
+        logger.info("VCF headers passed validation")
+    else:
+        logger.warning("VCF headers failed validation")
+
+    # -- Create header for merged VCF ----------------------------------------------------------------------------------
+    # TODO: maybe just create a header from scratch... 
+ merged_header = files[0].h.copy() + for f in files[1:]: + for s in f.samples: + merged_header.add_sample(s) + + print(merged_header, end="") + + return merged_header, sample_id_file_map + + +def merge_str_records(merged_header: VariantHeader, records: list[VariantRecord], logger: Logger) -> VariantRecord: + contig = records[0].contig + + refs = tuple(p.ref for p in records) + if len(set(refs)) > 1: + logger.debug("merge_str_records: encountered >1 STR reference sequence") + + infos = set(tuple(p.info[k.key] for k in COMMON_STR_INFO_ITEMS) for p in records) + + if len(infos) > 1: + logger.warning("merge_str_records: encountered >1 set of INFO field values for STR") + # TODO: more reporting + + if len(set(refs)) > 1: + print("TODO ref consolidation!") + record_max_anchor = max(records, key=lambda p: p.info["ANCH"]) + max_anchor = record_max_anchor.info["ANCH"] + ref_max_anchor = record_max_anchor.ref + start = record_max_anchor.start + added_prefix_lengths = tuple(max_anchor - p.info["ANCH"] for p in records) + + # TODO: also consider if there's extra on the end or something? + ref = ref_max_anchor + # TODO: this misses SNVs that are in the VCF anchor. Maybe STRkit should genotype these. 
+        # TODO: filter out ref allele
+        alts = sorted(set(ref_max_anchor[:added_prefix_lengths[i]] + a for i, p in enumerate(records) for a in (p.alts or ())))
+    else:
+        ref = refs[0]
+        alts = sorted(set(a for p in records for a in (p.alts or ()) if a != ref), key=lambda a: (len(a), a))
+        start = records[0].start
+
+    # TODO: merge info/format fields
+
+    # all same locus, can merge
+    alleles = (ref, *(alts or (".",)))
+    print("new str alleles", alleles)
+    rec = merged_header.new_record(contig, start=start, alleles=alleles, id=records[0].id)
+
+    # -- add INFO (common across samples) to merged STR record ---------------------------------------------------------
+    common_info = next(iter(infos))
+    for ki, k in enumerate(COMMON_STR_INFO_ITEMS):
+        rec.info[k.key] = common_info[ki]
+    # ------------------------------------------------------------------------------------------------------------------
+
+    # Done common stuff, now need to add/transform sample data
+    for sample in merged_header.samples:
+        pass
+
+    return rec
+
+
+def merge_snv_records(merged_header: VariantHeader, records: list[VariantRecord], logger: Logger) -> VariantRecord:
+    print("TODO: SNV MERGE")
+    print([str(r) for r in records])
+
+    # confirm we have a single base ref
+    refs = tuple(p.ref for p in records)
+    if len(ref_set := set(refs)) > 1:
+        logger.critical("merge_snv_records: encountered >1 SNP reference sequence (%s)", ref_set)
+        exit(1)
+    ref = refs[0]
+
+    infos = set(tuple(p.info[k.key] for k in COMMON_SNV_INFO_ITEMS) for p in records)
+
+    if len(infos) > 1:
+        logger.warning("merge_snv_records: encountered >1 set of INFO field values for SNV")
+
+    alts = sorted(set(a for p in records for a in (p.alts or ()) if a != ref))
+    alleles = (ref, *alts)
+    print(f"new snv alleles ({len(alleles)})", alleles)
+    rec = merged_header.new_record(contig=records[0].contig, start=records[0].start, alleles=alleles)  # TODO
+
+    common_info = next(iter(infos))
+    for ki, k in enumerate(COMMON_SNV_INFO_ITEMS):
+        rec.info[k.key] = 
common_info[ki]
+    # ------------------------------------------------------------------------------------------------------------------
+
+    # Done common stuff, now need to add/transform sample data
+
+    calls_by_sample_id = {}
+    for r in records:
+        for s_id in merged_header.samples:
+            calls_by_sample_id[s_id] = TODO
+
+    for sample_id in merged_header.samples:
+        # TODO: preserve phasing
+        rec.samples[sample_id]["GT"] = vu.record.genotype_indices(
+            alleles, calls_by_sample_id[sample_id], TODO_N_ALLELES
+        )
+        # TODO
+        # rec.samples[sample_id]["GT"] = (
+        #     tuple(map(alleles.index, seqs_with_anchors))
+        #     if rec[sample_id] is not None
+        #     else _blank_entry(n_alleles)
+        # )
+        pass
+
+    return rec
+
+
+MERGE_FUNCTION: dict[str, Callable[[VariantHeader, list[VariantRecord], Logger], VariantRecord]] = {
+    vu.header.VT_STR: merge_str_records,
+    vu.header.VT_SNV: merge_snv_records,
+}
+
+
+def merge_vcfs(paths: tuple[Path, ...], logger: Logger):
+    fhs: tuple[VariantFileForMerge, ...] = tuple(VariantFileForMerge(p) for p in paths)
+
+    try:
+        # Check that headers indicate these files can be merged (they are STRkit headers and are compatible),
+        # then merge them. Also computes a map from sample ID --> file. 
+ merged_header, sample_id_file_map = merge_headers(fhs, logger) # TODO + print(merged_header, end="") + + # -------------------------------------------------------------------------------------------------------------- + + iters = tuple(f.f.fetch() for f in fhs) + ptrs = [next(i, None) for i in iters] + + while any(p is not None for p in ptrs): + # find compatible records for merging based on ID (requires STRkit to output variant IDs for everything) + if all(p is not None for p in ptrs) and len(set(p.id for p in ptrs)) == 1: + print(MERGE_FUNCTION[ptrs[0].info["VT"]](merged_header, ptrs, logger), end="") + ptrs = [next(i, None) for i in iters] + else: + # we have a record which doesn't apply to all samples - either some VCFs are finished, or the pointers + # are at different points in the file. + print("TODO some vcfs!") + # TODO: selectively advance + + # TODO: for each STR, need to: + # - ensure they're compatible (same ID with same # of loci perhaps?) + # - resolve ref (which can vary between) - probably selecting the longest one and + # pasting any needed prefix (diff. between sample's ref and current ref) onto it. + # TODO: for SNVs, we can only deliver heterozygous calls. need to note this for the user. + + finally: + for fh in fhs: + fh.close() diff --git a/strkit/vcf_utils/__init__.py b/strkit/vcf_utils/__init__.py new file mode 100644 index 0000000..aaa6cc6 --- /dev/null +++ b/strkit/vcf_utils/__init__.py @@ -0,0 +1,3 @@ +__all__ = ["header", "record"] + +from . import header, record diff --git a/strkit/vcf_utils/header.py b/strkit/vcf_utils/header.py new file mode 100644 index 0000000..62ea574 --- /dev/null +++ b/strkit/vcf_utils/header.py @@ -0,0 +1,166 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime +from pysam import VariantHeader, VariantHeaderMetadata +from typing import TYPE_CHECKING +from .. 
import __version__ + +if TYPE_CHECKING: + from pathlib import Path + +__all__ = [ + "VCF_INFO_VT", + "VCF_INFO_MOTIF", + "VCF_INFO_REFMC", + "VCF_INFO_BED_START", + "VCF_INFO_BED_END", + "VCF_INFO_ANCH", + "VT_STR", + "VT_SNV", + "build_vcf_header", +] + + +@dataclass +class VcfInfo: + key: str + n: int | str # number of values in record row + type: str # VCF type + description: str + + def add_to_info(self, info: VariantHeaderMetadata): + """ + Add info field to variant file header metadata via mutation. + :param info: VariantHeaderMetadata instance to add the record to. This object is mutated. + """ + info.add(self.key, self.n, self.type, self.description) + + +VCF_INFO_VT = VcfInfo("VT", 1, "String", "Variant record type (str/snv)") +VCF_INFO_MOTIF = VcfInfo("MOTIF", 1, "String", "Motif string") +VCF_INFO_REFMC = VcfInfo("REFMC", 1, "Integer", "Motif copy number in the reference genome") +VCF_INFO_BED_START = VcfInfo( + "BED_START", + 1, + "Integer", + "Original start position of the locus as defined in the catalog (0-based inclusive)", +) +VCF_INFO_BED_END = VcfInfo( + "BED_END", + 1, + "Integer", + "Original end position of the locus as defined in the catalog (0-based exclusive, i.e., 1-based)", +) +VCF_INFO_ANCH = VcfInfo("ANCH", 1, "Integer", "Five-prime anchor size") + +VT_STR = "str" +VT_SNV = "snv" + +# VCF_ALLELE_CNV_TR = "" + +# VCF_TR_INFO_RECORDS: tuple[tuple[str, str, str, str], ...] 
= ( +# ("SVLEN", "A", "Integer", "Length of the structural variant"), +# ("CN", "A", "Float", "Copy number of allele"), +# ("RN", "A", "Integer", "Total number of repeat sequences in this allele"), +# ("RUS", ".", "String", "Repeat unit sequence of the corresponding repeat sequence"), +# ("RUL", ".", "Integer", "Repeat unit length of the corresponding repeat sequence"), +# ("RB", ".", "Integer", "Total number of bases in the corresponding repeat sequence"), +# ("CIRUC", ".", "Float", "Confidence interval around RUC"), +# ("CIRB", ".", "Integer", "Confidence interval around RB"), +# ) + + +def add_vcf_file_date(vh: VariantHeader): + """ + Adds a fileDate header metadata line to a variant header via mutation. + :param vh: The variant header to add to. + """ + now = datetime.now() + vh.add_meta("fileDate", f"{now.year}{now.month:02d}{now.day:02d}") + + +def build_vcf_header( + command: str, + sample_ids: tuple[str, ...], + reference_file: Path | str, # if Path, turn into file URI. Otherwise, treat "as-is". + partial_phasing: bool, + num_loci: int, + loci_hash: str, + contigs: tuple[tuple[str, int], ...] | None = None, +) -> VariantHeader: + vh = VariantHeader() # automatically sets VCF version to 4.2 + + # Add file date + add_vcf_file_date(vh) + + # Add source + vh.add_meta("source", "strkit") + + # Mark that we have partial phasing if we're using HP/SNVs + if partial_phasing: + vh.add_meta("phasing", "partial") + + # Add an absolute path to the reference genome. If we're passed a string (from merge), assume this is already a file + # URI. + vh.add_meta( + "reference", + f"file://{str(reference_file.resolve().absolute())}" if isinstance(reference_file, Path) else reference_file + ) + + if contigs is not None: + # If we're passed contig records directly (from merge), add them rather than pulling them from the reference. 
+ for contig, contig_length in contigs: + vh.contigs.add(contig, length=contig_length) + else: + # Add all contigs from the reference genome file + lengths + from pysam import FastaFile + with FastaFile(str(reference_file)) as rf: + for contig in rf.references: + vh.contigs.add(contig, length=rf.get_reference_length(contig)) + + # Add STRkit-specific fields: + # - marking version + vh.add_meta("strkitVersion", str(__version__)) + # - the subcommand being used to generate this VCF (call|merge) + vh.add_meta("strkitCommand", command) + # - indicating number of loci provided (i.e., catalogue size) + vh.add_meta("strkitCatalogNumLoci", str(num_loci)) + # - indicating hash of STRkitLocus objects (for checking catalogue sameness) + vh.add_meta("strkitCatalogLociHash", loci_hash) + + # Add CNV:TR alt type (symbolic allele: tandem repeat) + # vh.add_meta("ALT", "") + + # Set up basic VCF formats + vh.formats.add("AD", ".", "Integer", "Read depth for each allele") + vh.formats.add("ANCL", ".", "Integer", "Anchor length for the ref and each alt, five-prime of TR sequence") + vh.formats.add("CONS", ".", "String", "Consensus methods used for each alt (single/poa/best_rep)") + vh.formats.add("DP", 1, "Integer", "Read depth") + vh.formats.add("DPS", 1, "Integer", "Read depth (supporting reads only)") + vh.formats.add("GT", 1, "String", "Genotype") + vh.formats.add("MC", ".", "Integer", "Motif copy number for each allele") + vh.formats.add("MCCI", ".", "String", "Motif copy number 95% confidence interval for each allele") + vh.formats.add("MCRL", ".", "String", "Read-level motif copy numbers for each allele") + vh.formats.add("MMAS", 1, "Float", "Mean model (candidate TR sequence) alignment score across reads.") + vh.formats.add("NSNV", 1, "Integer", "Number of supporting SNVs for the STR peak-call") + vh.formats.add("PS", 1, "Integer", "Phase set") + vh.formats.add("PM", 1, "String", "Peak-calling method (dist/snv+dist/snv/hp)") + + # Set up VCF info fields (mutates vh.info) + 
VCF_INFO_VT.add_to_info(vh.info) + VCF_INFO_MOTIF.add_to_info(vh.info) + VCF_INFO_REFMC.add_to_info(vh.info) + VCF_INFO_BED_START.add_to_info(vh.info) + VCF_INFO_BED_END.add_to_info(vh.info) + VCF_INFO_ANCH.add_to_info(vh.info) + + # Add INFO records for tandem repeat copies - these are new to VCF4.4! TODO + # for iv in VCF_TR_INFO_RECORDS: + # vh.info.add(*iv) + + # Add the sample(s) + for sample_id in sample_ids: + vh.add_sample(sample_id) + + return vh diff --git a/strkit/vcf_utils/record.py b/strkit/vcf_utils/record.py new file mode 100644 index 0000000..39e3df5 --- /dev/null +++ b/strkit/vcf_utils/record.py @@ -0,0 +1,23 @@ +class SkipWritingLocus(Exception): + pass + + +def blank_vcf_genotype(n_alleles: int) -> tuple[None, ...]: + """ + Generates a blank VCF genotype for a sample locus. + :param n_alleles: Number of alleles to use for the blank genotype. + :return: Tuple of [None] * + """ + return tuple([None] * n_alleles) + + +def genotype_indices(alleles: tuple[str, ...], call: tuple[str, ...] | None, n_alleles: int) -> tuple[int | None, ...]: + """ + Given possible variant alleles for a particular locus and a sample's call (the sample's alleles), plus the number of + possible alleles for the sample at the locus, return a VCF-compatible genotype index tuple. + :param alleles: Possible variant alleles: (ref, alt1, alt2, alt3, ...) + :param call: Call for the sample: (alt3, alt1) or None if not called. + :param n_alleles: Number of alleles for the sample locus (used for computing blank genotypes if not called.) + :return: Genotype index tuple, e.g., (3, 1). + """ + return tuple(map(alleles.index, call)) if call is not None else blank_vcf_genotype(n_alleles)