From 7a73f5ff125058724723fa113e4735203113b25f Mon Sep 17 00:00:00 2001 From: Jenny Empawi <80464805+jaempawi@users.noreply.github.com> Date: Fri, 27 Mar 2026 13:55:22 -0400 Subject: [PATCH 1/2] Delete code/reference_data/rss_ld_sketch.ipynb --- code/reference_data/rss_ld_sketch.ipynb | 831 ------------------------ 1 file changed, 831 deletions(-) delete mode 100644 code/reference_data/rss_ld_sketch.ipynb diff --git a/code/reference_data/rss_ld_sketch.ipynb b/code/reference_data/rss_ld_sketch.ipynb deleted file mode 100644 index 53b626c50..000000000 --- a/code/reference_data/rss_ld_sketch.ipynb +++ /dev/null @@ -1,831 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "id": "a1b2c3d4-0001-4001-a001-000000000001", - "metadata": { - "kernel": "SoS" - }, - "source": [ - "# RSS LD Sketch Pipeline\n", - "\n", - "## Overview\n", - "\n", - "This pipeline generates a stochastic genotype sample **U = WᵀG** from whole-genome sequencing VCF files and stores it as a PLINK2 pgen file for use as an LD reference panel with SuSiE-RSS fine-mapping.\n", - "\n", - "**Key idea:** Rather than storing the full genotype matrix G (n × p, ~530 MB/block), we compute U = WᵀG (B × p, ~320 MB/block) using a random projection matrix W ~ N(0, 1/√n). The approximate LD matrix R̂ = UᵀU / B ≈ R = GᵀG / n by the Johnson-Lindenstrauss lemma. G is never stored.\n", - "\n", - "**Matrix dimensions:**\n", - "- G : (n × p) — n individuals × p variants (~8,000 per block)\n", - "- W : (n × B) — projection matrix, W ~ N(0, 1/√n), generated once per cohort\n", - "- U : (B × p) — stochastic genotype sample = WᵀG, stored in pgen\n", - "- R̂ : (p × p) — approximate LD matrix, computed on-the-fly by SuSiE-RSS from U\n", - "\n", - "**Pipeline steps:**\n", - "1. `generate_W` — generate W once from cohort sample size n (saved as `.npy`, shared across all chromosomes)\n", - "2. `process_block_1` — read LD block BED file, emit one coordinate file per block\n", - "3. 
`process_block_2` — for each block in parallel: load VCF, compute U = WᵀG, scale to [0,2], write compressed dosage + allele frequencies\n", - "4. `merge_chrom` — for each chromosome: convert per-block dosage files to sorted pgen, merge into one per-chromosome pgen, clean up all intermediates\n", - "\n", - "## Input Files\n", - "\n", - "| File | Description |\n", - "|------|-------------|\n", - "| VCF (.bgz) | Per-chromosome VCF files with tabix index |\n", - "| `sample_list.txt` | Optional. One sample ID per line. Leave empty to use all samples. |\n", - "| `EUR_LD_blocks.bed` | LD block BED file (0-based half-open, e.g. Berisa & Pickrell EUR blocks) |\n", - "\n", - "## Parameters\n", - "\n", - "**Global parameters:**\n", - "\n", - "| Parameter | Default | Description |\n", - "|-----------|---------|-------------|\n", - "| `plink2_bin` | `plink2` | Path to plink2 binary |\n", - "| `walltime` | `24:00:00` | Per-task walltime |\n", - "| `mem` | `32G` | Per-task memory |\n", - "| `numThreads` | `8` | Per-task CPU cores |\n", - "\n", - "**`generate_W` parameters:**\n", - "\n", - "| Parameter | Default | Description |\n", - "|-----------|---------|-------------|\n", - "| `n_samples` | required | Total number of individuals in the cohort |\n", - "| `output_dir` | required | Output directory for `W_B{B}.npy` |\n", - "| `B` | `10000` | Sketch dimension (number of pseudo-individuals) |\n", - "| `seed` | `123` | Random seed |\n", - "\n", - "**`process_block` parameters:**\n", - "\n", - "| Parameter | Default | Description |\n", - "|-----------|---------|-------------|\n", - "| `vcf_base` | required | Directory containing VCF (.bgz) files |\n", - "| `vcf_prefix` | required | Shared filename prefix of all VCF files |\n", - "| `ld_block_file` | required | Path to LD block BED file |\n", - "| `chrom` | `0` | Chromosome 1–22, or 0 for all chromosomes |\n", - "| `cohort_id` | `ADSP.R5.EUR` | Cohort prefix for output filenames |\n", - "| `output_dir` | required | Base output 
directory |\n", - "| `W_matrix` | required | Path to W `.npy` output from `generate_W` |\n", - "| `B` | `10000` | Must match W |\n", - "| `sample_list` | `\"\"` | Optional path to sample ID file |\n", - "| `maf_min` | `0.0005` | Minimum minor allele frequency |\n", - "| `mac_min` | `5` | Minimum minor allele count |\n", - "| `msng_min` | `0.05` | Maximum missingness rate per variant |\n", - "\n", - "**`merge_chrom` parameters:**\n", - "\n", - "| Parameter | Default | Description |\n", - "|-----------|---------|-------------|\n", - "| `chrom` | `0` | Chromosome 1–22, or 0 for all chromosomes |\n", - "| `output_dir` | required | Base output directory (same as `process_block`) |\n", - "| `cohort_id` | `ADSP.R5.EUR` | Must match `process_block` cohort_id |\n", - "\n", - "## Output Files\n", - "\n", - "| File | Description |\n", - "|------|-------------|\n", - "| `W_B{B}.npy` | Projection matrix W, shape (n × B). Output of `generate_W`. |\n", - "| `{cohort_id}.chr{N}.pgen` | Final output — per-chromosome stochastic genotype sample U as PLINK2 binary |\n", - "| `{cohort_id}.chr{N}.pvar` | Per-chromosome variant information |\n", - "| `{cohort_id}.chr{N}.psam` | Pseudo-individual IDs S1…S{B} |\n", - "| `{cohort_id}.chr{N}.afreq` | Per-chromosome allele frequencies (OBS_CT = 2 × n) |\n", - "| *(cleaned up after merge)* | Per-block `.dosage.gz`, `.map`, `.afreq`, `.meta` intermediate files |\n" - ] - }, - { - "cell_type": "markdown", - "id": "3547538e-ef0f-49d3-aa45-208a278f5d3c", - "metadata": {}, - "source": [ - "### Step 0 — Generate W (run once per cohort)\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a1b2c3d4-0002-4002-a002-000000000002", - "metadata": { - "kernel": "SoS" - }, - "outputs": [], - "source": [ - "sos run pipeline/rss_ld_sketch.ipynb generate_W \\\n", - " --n-samples 16571 \\\n", - " --output-dir output/rss_ld_sketch \\\n", - " --B 10000 \\\n", - " --seed 123\n" - ] - }, - { - "cell_type": "markdown", - "id": 
"e2559a2e-8bdd-443b-980f-26fe7f728674", - "metadata": {}, - "source": [ - "### Step 1+2 — Process all blocks → compressed dosage files\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a1b2c3d4-0003-4003-a003-000000000003", - "metadata": { - "kernel": "SoS" - }, - "outputs": [], - "source": [ - "# Process one chromosome\n", - "sos run pipeline/rss_ld_sketch.ipynb process_block \\\n", - " --vcf-base /path/to/vcfs/ \\\n", - " --vcf-prefix your.prefix. \\\n", - " --ld-block-file /path/to/EUR_LD_blocks.bed \\\n", - " --chrom 22 \\\n", - " --output-dir output/rss_ld_sketch \\\n", - " --W-matrix output/rss_ld_sketch/W_B10000.npy " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "7620754c-5060-460f-a7a3-576dcf686270", - "metadata": {}, - "outputs": [], - "source": [ - "# Process all 22 chromosomes\n", - "sos run pipeline/rss_ld_sketch.ipynb process_block \\\n", - " --vcf-base /restricted/projectnb/xqtl/R4_QC_NHWonly_rm_monomorphic \\\n", - " --vcf-prefix gcad.qc.r4.wgs.36361.GATK.2022.08.15.biallelic.genotypes. 
\\\n", - " --ld-block-file /restricted/projectnb/casa/oaolayin/ROSMAP_NIA_geno/EUR_LD_blocks.bed \\\n", - " --chrom 0 \\\n", - " --output-dir output/rss_ld_sketch \\\n", - " --W-matrix output/rss_ld_sketch/W_B10000.npy \n", - "\n" - ] - }, - { - "cell_type": "markdown", - "id": "19fdaf69-ae0d-45fb-b7a1-dc5a03ff9a51", - "metadata": {}, - "source": [ - "### Step 3 — Merge per-block dosage files into per-chromosome pgen\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "f06fcbea-891d-4b3c-937d-817678c8858d", - "metadata": {}, - "outputs": [], - "source": [ - "sos run pipeline/rss_ld_sketch.ipynb merge_chrom \\\n", - " --output-dir output/rss_ld_sketch \\\n", - " --chrom 0\n", - "\n" - ] - }, - { - "cell_type": "markdown", - "id": "a1b2c3d4-0004-4004-a004-000000000004", - "metadata": { - "kernel": "SoS" - }, - "source": [ - "## Command Interface" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a1b2c3d4-0005-4005-a005-000000000005", - "metadata": { - "kernel": "SoS" - }, - "outputs": [], - "source": [ - "sos run pipeline/rss_ld_sketch.ipynb -h" - ] - }, - { - "cell_type": "markdown", - "id": "a1b2c3d4-0006-4006-a006-000000000006", - "metadata": { - "kernel": "SoS" - }, - "source": [ - "```\n", - "usage: sos run pipeline/rss_ld_sketch.ipynb\n", - " [workflow_name | -t targets] [options] [workflow_options]\n", - " workflow_name: Single or combined workflows defined in this script\n", - " options: Single-hyphen sos parameters (see \"sos run -h\" for details)\n", - " workflow_options: Double-hyphen workflow-specific parameters\n", - "\n", - "Workflows:\n", - " generate_W\n", - " process_block\n", - " merge_chrom\n", - "\n", - "Global Workflow Options:\n", - " --walltime 24:00:00\n", - " --mem 32G\n", - " --numThreads 8 (as int)\n", - " --plink2-bin plink2\n", - "\n", - "Sections\n", - " generate_W:\n", - " Workflow Options:\n", - " --n-samples VAL (as int, required)\n", - " Generate projection matrix W ~ N(0, 1/sqrt(n)), shape 
(n x B).\n", - " Run ONCE per cohort before any chromosome processing.\n", - " W depends only on n (sample size) and B — not on variant data.\n", - " All 22 chromosomes reuse the same W.\n", - " --output-dir VAL (as str, required)\n", - " --B 10000 (as int)\n", - " --seed 123 (as int)\n", - "\n", - " process_block:\n", - " Workflow Options:\n", - " --vcf-base VAL (as str, required)\n", - " Directory containing VCF (.bgz) files, one or more per chromosome.\n", - " --vcf-prefix VAL (as str, required)\n", - " Shared filename prefix of all VCF files (everything before \"chr{N}:\" or \"chr{N}.\").\n", - " --ld-block-file VAL (as str, required)\n", - " Path to LD block BED file (e.g. EUR_LD_blocks.bed).\n", - " --chrom 0 (as int)\n", - " Chromosome 1–22, or 0 for all chromosomes.\n", - " --cohort-id ADSP.R5.EUR (as str)\n", - " Cohort prefix for output filenames.\n", - " --output-dir VAL (as str, required)\n", - " --W-matrix VAL (as str, required)\n", - " --B 10000 (as int)\n", - " --maf-min 0.0005 (as float)\n", - " --mac-min 5 (as int)\n", - " --msng-min 0.05 (as float)\n", - " --sample-list \"\" (optional path to sample ID file)\n", - " Output per block (intermediate, cleaned up by merge_chrom):\n", - " {cohort_id}.{chr}_{start}_{end}.dosage.gz\n", - " {cohort_id}.{chr}_{start}_{end}.map\n", - " {cohort_id}.{chr}_{start}_{end}.afreq\n", - " {cohort_id}.{chr}_{start}_{end}.meta\n", - "\n", - " merge_chrom:\n", - " Workflow Options:\n", - " --chrom 0 (as int)\n", - " Chromosome 1–22, or 0 for all chromosomes.\n", - " --cohort-id ADSP.R5.EUR (as str)\n", - " Must match cohort-id used in process_block.\n", - " --output-dir VAL (as str, required)\n", - " Output per chromosome (final, block intermediates cleaned up):\n", - " {cohort_id}.chr{N}.pgen\n", - " {cohort_id}.chr{N}.pvar\n", - " {cohort_id}.chr{N}.psam\n", - " {cohort_id}.chr{N}.afreq\n", - "```\n" - ] - }, - { - "cell_type": "markdown", - "id": "a1b2c3d4-0007-4007-a007-000000000007", - "metadata": { - "kernel": 
"SoS" - }, - "source": [ - "# Pipeline Implementation\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a1b2c3d4-0008-4008-a008-000000000008", - "metadata": { - "kernel": "SoS" - }, - "outputs": [], - "source": [ - "[global]\n", - "parameter: cwd = path(\"output\")\n", - "parameter: job_size = 1\n", - "parameter: walltime = \"24:00:00\"\n", - "parameter: mem = \"16G\"\n", - "parameter: numThreads = 16\n", - "parameter: container = \"\"\n", - "parameter: plink2_bin = \"plink2\"\n", - "\n", - "import re\n", - "from sos.utils import expand_size\n", - "\n", - "entrypoint = (\n", - " 'micromamba run -a \"\" -n' + ' ' +\n", - " re.sub(r'(_apptainer:latest|_docker:latest|\\.sif)$', '', container.split('/')[-1])\n", - ") if container else \"\"\n", - "\n", - "cwd = path(f'{cwd:a}')\n" - ] - }, - { - "cell_type": "markdown", - "id": "a1b2c3d4-0009-4009-a009-000000000009", - "metadata": { - "kernel": "SoS" - }, - "source": [ - "## `generate_W`\n", - "\n", - "Generates the random projection matrix W ~ N(0, 1/√n), shape (n × B). Run once per cohort. 
W is shared across all chromosomes.\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a1b2c3d4-0010-4010-a010-000000000010", - "metadata": { - "kernel": "SoS" - }, - "outputs": [], - "source": [ - "[generate_W]\n", - "# Generate projection matrix W ~ N(0, 1/sqrt(n)), shape (n x B).\n", - "# Run ONCE before processing any chromosome.\n", - "#\n", - "# W depends only on n (total sample size) and B — not on any variant data.\n", - "# n_samples is passed directly as a parameter; no VCF reading is needed.\n", - "# All 22 chromosomes reuse the same W so that per-chromosome stochastic\n", - "# genotype samples can be arithmetically merged for meta-analysis.\n", - "parameter: n_samples = int\n", - "parameter: output_dir = str\n", - "parameter: B = 10000\n", - "parameter: seed = 123\n", - "\n", - "import os\n", - "input: []\n", - "output: f'{output_dir}/W_B{B}.npy'\n", - "task: trunk_workers = 1, trunk_size = 1, walltime = '00:05:00', mem = '4G', cores = 1\n", - "python: expand = \"${ }\", stdout = f'{_output:n}.stdout', stderr = f'{_output:n}.stderr'\n", - "\n", - " import numpy as np\n", - " import os\n", - "\n", - " n = ${n_samples}\n", - " B = ${B}\n", - " seed = ${seed}\n", - " W_out = \"${_output}\"\n", - "\n", - " # ── Generate W ~ N(0, 1/sqrt(n)) ─────────────────────────────\n", - " # Convention: W = np.random.normal(0, 1/np.sqrt(n), size=(n, B))\n", - " # W is shared across all chromosomes — do not regenerate per chromosome.\n", - " print(f\"Generating W ~ N(0, 1/sqrt({n})), shape ({n}, {B}), seed={seed}\")\n", - " np.random.seed(seed)\n", - " W = np.random.normal(0, 1.0 / np.sqrt(n), size=(n, B)).astype(np.float32)\n", - "\n", - " os.makedirs(os.path.dirname(os.path.abspath(W_out)), exist_ok=True)\n", - " np.save(W_out, W)\n", - " print(f\"Saved: {W_out}\")\n", - " print(f\"Shape: {W.shape}, size: {os.path.getsize(W_out)/1e9:.2f} GB\")" - ] - }, - { - "cell_type": "markdown", - "id": "a1b2c3d4-0011-4011-a011-000000000011", - "metadata": { 
- "kernel": "SoS" - }, - "source": [ - "## `process_block`\n", - "\n", - "Two-step workflow. `process_block_1` reads the BED file and emits one coordinate file per LD block. `process_block_2` reads the VCF for each block, computes U = WᵀG, scales to [0,2], and writes compressed dosage + allele frequency files. G is never stored.\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a1b2c3d4-0012-4012-a012-000000000012", - "metadata": { - "kernel": "SoS" - }, - "outputs": [], - "source": [ - "[process_block_1]\n", - "# Read BED file and write one .block coord file per LD block.\n", - "parameter: ld_block_file = str\n", - "parameter: chrom = 0\n", - "\n", - "import os\n", - "\n", - "def _read_blocks(bed, chrom_filter):\n", - " blocks = []\n", - " with open(bed) as fh:\n", - " for line in fh:\n", - " if line.startswith(\"#\") or not line.strip():\n", - " continue\n", - " parts = line.split()\n", - " c = parts[0]\n", - " if not (c.startswith(\"chr\") and c[3:].isdigit()):\n", - " continue\n", - " cnum = int(c[3:])\n", - " if not (1 <= cnum <= 22):\n", - " continue\n", - " if chrom_filter != 0 and cnum != chrom_filter:\n", - " continue\n", - " blocks.append((c, int(parts[1]), int(parts[2])))\n", - " if not blocks:\n", - " raise ValueError(f\"No blocks found for chrom={chrom_filter} in {bed}\")\n", - " return blocks\n", - "\n", - "_blocks = _read_blocks(ld_block_file, chrom)\n", - "print(f\" {len(_blocks)} LD blocks queued\")\n", - "\n", - "import tempfile\n", - "block_dir = tempfile.mkdtemp(prefix=\"sos_blocks_\")\n", - "\n", - "input: []\n", - "output: [f\"{block_dir}/{c}_{s}_{e}.block\" for c, s, e in _blocks]\n", - "python: expand = \"${ }\"\n", - " import os, json\n", - " blocks = ${_blocks!r}\n", - " block_dir = \"${block_dir}\"\n", - " for c, s, e in blocks:\n", - " path = os.path.join(block_dir, f\"{c}_{s}_{e}.block\")\n", - " with open(path, \"w\") as fh:\n", - " fh.write(f\"{c}\\n{s}\\n{e}\\n\")\n" - ] - }, - { - "cell_type": "markdown", - 
"id": "a1b2c3d4-0031-4031-a031-000000000031", - "metadata": {}, - "source": [ - "## `process_block_2`\n", - "\n", - "Per-block parallel task. Reads VCF, computes U = WᵀG in chunks, scales to [0,2], writes `.dosage.gz`, `.map`, `.afreq`, `.meta`. Output is intermediate — cleaned up by `merge_chrom`.\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a1b2c3d4-0030-4030-a030-000000000030", - "metadata": { - "kernel": "SoS" - }, - "outputs": [], - "source": [ - "[process_block_2]\n", - "parameter: vcf_base = str\n", - "parameter: vcf_prefix = str\n", - "parameter: cohort_id = \"ADSP.R5.EUR\"\n", - "parameter: output_dir = str\n", - "parameter: W_matrix = str\n", - "parameter: B = 10000\n", - "parameter: maf_min = 0.0005\n", - "parameter: mac_min = 5\n", - "parameter: msng_min = 0.05\n", - "parameter: sample_list = \"\"\n", - "input: output_from(\"process_block_1\"), group_by = 1\n", - "task: trunk_workers = 1, trunk_size = 1, walltime = walltime, mem = mem, cores = numThreads\n", - "python: expand = \"${ }\"\n", - "\n", - " import numpy as np\n", - " import os\n", - " import gzip\n", - " import sys\n", - " from math import nan\n", - " from cyvcf2 import VCF\n", - " from os import listdir\n", - "\n", - " # Read block coordinates from input file\n", - " with open(\"${_input}\") as fh:\n", - " lines = fh.read().splitlines()\n", - " chrm_str = lines[0]\n", - " block_start = int(lines[1])\n", - " block_end = int(lines[2])\n", - "\n", - " vcf_base = \"${vcf_base}\"\n", - " vcf_prefix = \"${vcf_prefix}\"\n", - " W_path = \"${W_matrix}\"\n", - " B = ${B}\n", - " maf_min = ${maf_min}\n", - " mac_min = ${mac_min}\n", - " msng_min = ${msng_min}\n", - " sample_list = \"${sample_list}\"\n", - " cohort_id = \"${cohort_id}\"\n", - " base_dir = \"${output_dir}\"\n", - "\n", - " block_tag = f\"{chrm_str}_{block_start}_{block_end}\"\n", - " output_dir = os.path.join(base_dir, chrm_str, block_tag)\n", - " os.makedirs(output_dir, exist_ok=True)\n", - "\n", - " 
log_path = os.path.join(output_dir, f\"{block_tag}.log\")\n", - " log_fh = open(log_path, \"w\")\n", - " sys.stdout = log_fh\n", - " sys.stderr = log_fh\n", - "\n", - " # ── Load sample subset (optional) ─────────────────────────────\n", - " sample_subset = None\n", - " if sample_list:\n", - " if not os.path.exists(sample_list):\n", - " raise FileNotFoundError(f\"sample_list not found: {sample_list}\")\n", - " with open(sample_list) as fh:\n", - " sample_subset = set(line.strip() for line in fh if line.strip())\n", - " print(f\" Sample subset: {len(sample_subset):,} samples\")\n", - "\n", - " # ── Helpers ───────────────────────────────────────────────────\n", - " def get_vcf_files(chrm_str):\n", - " files = sorted([\n", - " os.path.join(vcf_base, x)\n", - " for x in listdir(vcf_base)\n", - " if x.endswith(\".bgz\") and (\n", - " x.startswith(vcf_prefix + chrm_str + \":\") or\n", - " x.startswith(vcf_prefix + chrm_str + \".\")\n", - " )\n", - " ])\n", - " if not files:\n", - " raise FileNotFoundError(f\"No VCF files for {chrm_str} in {vcf_base}\")\n", - " return files\n", - "\n", - " def fill_missing_col_means(G):\n", - " col_means = np.nanmean(G, axis=0)\n", - " return np.where(np.isnan(G), col_means, G)\n", - "\n", - " # ── Step 1: Scan variants in block ────────────────────────────\n", - " print(f\"[1/3] Scanning {chrm_str} [{block_start:,}, {block_end:,}) ...\")\n", - " vcf_files = get_vcf_files(chrm_str)\n", - " region = f\"{chrm_str}:{block_start+1}-{block_end}\"\n", - " var_info = []\n", - " n_samples = None\n", - "\n", - " for vf in vcf_files:\n", - " vcf = VCF(vf)\n", - " if sample_subset is not None:\n", - " vcf_samples = vcf.samples\n", - " keep_idx = [i for i, s in enumerate(vcf_samples) if s in sample_subset]\n", - " if not keep_idx:\n", - " raise ValueError(f\"No sample_list samples in {os.path.basename(vf)}\")\n", - " vcf.set_samples([vcf_samples[i] for i in keep_idx])\n", - " if n_samples is None:\n", - " n_samples = len(vcf.samples)\n", - " for var 
in vcf(region):\n", - " if not (block_start <= var.POS - 1 < block_end):\n", - " continue\n", - " if len(var.ALT) != 1:\n", - " continue\n", - " dosage = [\n", - " sum(x[0:2])\n", - " for x in [[nan if v == -1 else v for v in gt]\n", - " for gt in var.genotypes]\n", - " ]\n", - " if np.nanvar(dosage) == 0:\n", - " continue\n", - " counts = [np.nansum([2 - x for x in dosage]), np.nansum(dosage)]\n", - " nan_count = int(np.sum(np.isnan(dosage)))\n", - " n_non_na = len(dosage) - nan_count\n", - " mac = min(counts)\n", - " maf = mac / n_non_na\n", - " af = float(np.nansum(dosage)) / (2 * n_non_na)\n", - " msng_rate = nan_count / (n_non_na + nan_count)\n", - " if maf < maf_min or mac < mac_min or msng_rate > msng_min:\n", - " continue\n", - " var_info.append({\n", - " \"chr\": var.CHROM, \"pos\": var.POS,\n", - " \"ref\": var.REF, \"alt\": var.ALT[0],\n", - " \"af\": round(float(af), 6),\n", - " \"id\": f\"{var.CHROM}:{var.POS}:{var.REF}:{var.ALT[0]}\",\n", - " })\n", - " vcf.close()\n", - " print(f\" {len(var_info):,} variants passing filters (n={n_samples:,})\")\n", - "\n", - " if not var_info:\n", - " raise ValueError(f\"No passing variants in {chrm_str} [{block_start:,}, {block_end:,})\")\n", - "\n", - " # ── Step 2: Load W ────────────────────────────────────────────\n", - " print(f\"[2/3] Loading W ...\")\n", - " W = np.load(W_path)\n", - " if W.shape != (n_samples, B):\n", - " raise ValueError(f\"W shape mismatch: {W.shape} vs ({n_samples},{B})\")\n", - " W = W.astype(np.float32)\n", - " print(f\" W: {W.shape}\")\n", - "\n", - " # ── Step 3: Compute U, write compressed dosage + map + afreq ──\n", - " print(f\"[3/3] Computing U and writing output files ...\")\n", - " var_lookup = {v[\"id\"]: v for v in var_info}\n", - " var_id_set = set(var_lookup)\n", - "\n", - " dosage_path = os.path.join(output_dir, f\"{cohort_id}.{block_tag}.dosage.gz\")\n", - " map_path = os.path.join(output_dir, f\"{cohort_id}.{block_tag}.map\")\n", - " afreq_path = os.path.join(output_dir, 
f\"{cohort_id}.{block_tag}.afreq\")\n", - " meta_path = os.path.join(output_dir, f\"{cohort_id}.{block_tag}.meta\")\n", - "\n", - " # Write .map\n", - " with open(map_path, \"w\") as fh:\n", - " for v in var_info:\n", - " fh.write(f\"{v['chr'].replace('chr','')}\\t{v['id']}\\t0\\t{v['pos']}\\n\")\n", - "\n", - " # Write .meta\n", - " with open(meta_path, \"w\") as fh:\n", - " fh.write(f\"source_n_samples={n_samples}\\nB={B}\\n\")\n", - " fh.write(f\"chrom={chrm_str}\\nblock_start={block_start}\\nblock_end={block_end}\\n\")\n", - "\n", - " # Write .afreq\n", - " obs_ct = 2 * n_samples\n", - " with open(afreq_path, \"w\") as fh:\n", - " fh.write(\"#CHROM\\tID\\tREF\\tALT\\tALT_FREQS\\tOBS_CT\\n\")\n", - " for v in var_info:\n", - " fh.write(f\"{v['chr'].replace('chr','')}\\t{v['id']}\\t{v['ref']}\\t{v['alt']}\\t\"\n", - " f\"{v['af']:.6f}\\t{obs_ct}\\n\")\n", - "\n", - " # Compute U and write compressed dosage (format=1: ID ALT REF val_S1 ... val_SB)\n", - " total = 0\n", - " with gzip.open(dosage_path, \"wt\", compresslevel=4) as gz:\n", - " for vf in vcf_files:\n", - " arr, var_keys = [], []\n", - " vcf_obj = VCF(vf)\n", - " for var in vcf_obj(region):\n", - " if not (block_start <= var.POS - 1 < block_end):\n", - " continue\n", - " if len(var.ALT) != 1:\n", - " continue\n", - " vid = f\"{var.CHROM}:{var.POS}:{var.REF}:{var.ALT[0]}\"\n", - " if vid not in var_id_set:\n", - " continue\n", - " dosage = [\n", - " sum(x[0:2])\n", - " for x in [[nan if v == -1 else v for v in gt]\n", - " for gt in var.genotypes]\n", - " ]\n", - " arr.append(dosage)\n", - " var_keys.append(vid)\n", - " vcf_obj.close()\n", - " if not arr:\n", - " continue\n", - " G_chunk = np.array(arr, dtype=np.float32).T\n", - " G_chunk = fill_missing_col_means(G_chunk)\n", - " U_chunk = W.T @ G_chunk\n", - " del G_chunk\n", - " col_min = U_chunk.min(axis=0)\n", - " col_max = U_chunk.max(axis=0)\n", - " denom = col_max - col_min\n", - " denom[denom == 0] = 1.0\n", - " U_chunk = 2.0 * (U_chunk - col_min) 
/ denom\n", - " U_chunk = np.round(U_chunk, 4)\n", - " for j, vid in enumerate(var_keys):\n", - " v = var_lookup[vid]\n", - " vals = \" \".join(f\"{x:.4f}\" for x in U_chunk[:, j])\n", - " gz.write(f\"{vid} {v['alt']} {v['ref']} {vals}\\n\")\n", - " total += len(var_keys)\n", - " del U_chunk\n", - " print(f\" Written: {total:,} variants → {os.path.basename(dosage_path)}\")\n", - " print(f\" Written: {os.path.basename(map_path)}, {os.path.basename(afreq_path)}\")\n", - " print(f\"\\nDone: {chrm_str} [{block_start:,}, {block_end:,})\")\n" - ] - }, - { - "cell_type": "markdown", - "id": "a1b2c3d4-0040-4040-a040-000000000040", - "metadata": {}, - "source": [ - "## `merge_chrom`\n", - "\n", - "For each chromosome: converts per-block `.dosage.gz` files to sorted pgen (two-pass: import then `--sort-vars`), merges all blocks into one per-chromosome pgen using `plink2 --pmerge-list`, concatenates allele frequencies, and removes all per-block intermediate directories. Output: `{cohort_id}.chr{N}.{pgen,pvar,psam,afreq}`.\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a1b2c3d4-0041-4041-a041-000000000041", - "metadata": { - "kernel": "SoS" - }, - "outputs": [], - "source": [ - "[merge_chrom]\n", - "# Merge all per-block dosage files for a chromosome into one per-chrom pgen.\n", - "# Steps per chromosome:\n", - "# 1. For each block: plink2 --import-dosage → unsorted pgen, then --sort-vars → sorted pgen\n", - "# 2. plink2 --pmerge-list → one per-chrom pgen\n", - "# 3. Clean up all intermediates including -merge.* files\n", - "#\n", - "# Run after process_block completes (or run together: sos run ... 
process_block merge_chrom)\n", - "parameter: chrom = 0\n", - "parameter: output_dir = str\n", - "parameter: cohort_id = \"ADSP.R5.EUR\"\n", - "\n", - "import os, glob\n", - "\n", - "def _chroms_to_process(output_dir, chrom_filter):\n", - " if chrom_filter != 0:\n", - " return [f\"chr{chrom_filter}\"]\n", - " return sorted(set(\n", - " os.path.basename(d)\n", - " for d in glob.glob(os.path.join(output_dir, \"chr*\"))\n", - " if os.path.isdir(d)\n", - " ))\n", - "\n", - "_chroms = _chroms_to_process(output_dir, chrom)\n", - "\n", - "input: []\n", - "output: [f\"{output_dir}/{c}/{cohort_id}.{c}.pgen\" for c in _chroms]\n", - "\n", - "for _chrom in _chroms:\n", - " bash(f\"\"\"\n", - " set -euo pipefail\n", - " shopt -s nullglob\n", - "\n", - " chrom_dir=\"{output_dir}/{_chrom}\"\n", - " final_prefix=\"${{chrom_dir}}/{cohort_id}.{_chrom}\"\n", - " merge_list=\"${{chrom_dir}}/{cohort_id}.{_chrom}_pmerge_list.txt\"\n", - "\n", - " # ── Step 1: Convert each block dosage.gz → sorted per-block pgen (two-pass) ──\n", - " > \"${{merge_list}}\"\n", - " files=(\"${{chrom_dir}}\"/*/*.dosage.gz)\n", - " if [ ${{#files[@]}} -eq 0 ]; then\n", - " exit 1\n", - " fi\n", - " for dosage_gz in \"${{files[@]}}\"; do\n", - " block_dir=$(dirname \"${{dosage_gz}}\")\n", - " block_tag=$(basename \"${{block_dir}}\")\n", - " prefix=\"${{block_dir}}/{cohort_id}.${{block_tag}}_tmp\"\n", - " map_file=\"${{block_dir}}/{cohort_id}.${{block_tag}}.map\"\n", - " psam_file=\"${{block_dir}}/{cohort_id}.${{block_tag}}.psam\"\n", - " meta_file=\"${{block_dir}}/{cohort_id}.${{block_tag}}.meta\"\n", - " B=$(grep \"^B=\" \"${{meta_file}}\" | cut -d= -f2)\n", - " printf '#FID\\tIID\\n' > \"${{psam_file}}\"\n", - " for i in $(seq 1 ${{B}}); do\n", - " printf 'S%d\\tS%d\\n' ${{i}} ${{i}} >> \"${{psam_file}}\"\n", - " done\n", - " # Pass 1: import dosage → unsorted pgen\n", - " \"{plink2_bin}\" \\\n", - " --import-dosage \"${{dosage_gz}}\" format=1 noheader \\\n", - " --psam \"${{psam_file}}\" \\\n", - " --map 
\"${{map_file}}\" \\\n", - " --make-pgen \\\n", - " --out \"${{prefix}}_unsorted\" \\\n", - " --silent\n", - " # Pass 2: sort variants\n", - " \"{plink2_bin}\" \\\n", - " --pfile \"${{prefix}}_unsorted\" \\\n", - " --make-pgen \\\n", - " --sort-vars \\\n", - " --out \"${{prefix}}\" \\\n", - " --silent\n", - " rm -f \"${{prefix}}_unsorted.pgen\" \"${{prefix}}_unsorted.pvar\" \"${{prefix}}_unsorted.psam\"\n", - " echo \"${{prefix}}\" >> \"${{merge_list}}\"\n", - " done\n", - "\n", - " # ── Step 2: Merge all per-block pgens → one per-chrom pgen ──\n", - " \"{plink2_bin}\" \\\n", - " --pmerge-list \"${{merge_list}}\" pfile \\\n", - " --make-pgen \\\n", - " --sort-vars \\\n", - " --out \"${{final_prefix}}\"\n", - "\n", - " # ── Step 3: Concatenate .afreq ──\n", - " first=1\n", - " for f in \"${{chrom_dir}}\"/*/*.afreq; do\n", - " if [ \"${{first}}\" -eq 1 ]; then\n", - " cat \"${{f}}\" > \"${{final_prefix}}.afreq\"\n", - " first=0\n", - " else\n", - " tail -n +2 \"${{f}}\" >> \"${{final_prefix}}.afreq\"\n", - " fi\n", - " done\n", - "\n", - " # ── Step 4: Cleanup — remove merge intermediates and per-block dirs ──\n", - " rm -f \"${{merge_list}}\"\n", - " rm -f \"${{final_prefix}}-merge.pgen\" \"${{final_prefix}}-merge.pvar\" \"${{final_prefix}}-merge.psam\"\n", - " for block_dir in \"${{chrom_dir}}\"/*/; do\n", - " rm -rf \"${{block_dir}}\"\n", - " done\n", - " \"\"\")" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.12.13" - }, - "sos": { - "kernels": [ - [ - "SoS", - "sos", - "sos", - "", - "" - ] - ], - "version": "" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} From 70e282a51d0da2f00fe932066533fc92833557b2 Mon Sep 17 00:00:00 2001 
From: Jenny Empawi <80464805+jaempawi@users.noreply.github.com> Date: Fri, 27 Mar 2026 13:56:11 -0400 Subject: [PATCH 2/2] added compress pvar, psam and pgen outputs --- code/reference_data/rss_ld_sketch.ipynb | 840 ++++++++++++++++++++++++ 1 file changed, 840 insertions(+) create mode 100644 code/reference_data/rss_ld_sketch.ipynb diff --git a/code/reference_data/rss_ld_sketch.ipynb b/code/reference_data/rss_ld_sketch.ipynb new file mode 100644 index 000000000..c35d080f4 --- /dev/null +++ b/code/reference_data/rss_ld_sketch.ipynb @@ -0,0 +1,840 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "a1b2c3d4-0001-4001-a001-000000000001", + "metadata": { + "kernel": "SoS" + }, + "source": [ + "# RSS LD Sketch Pipeline\n", + "\n", + "## Overview\n", + "\n", + "This pipeline generates a stochastic genotype sample **U = WᵀG** from whole-genome sequencing VCF files and stores it as a PLINK2 pgen file for use as an LD reference panel with SuSiE-RSS fine-mapping.\n", + "\n", + "**Key idea:** Rather than storing the full genotype matrix G (n × p, ~530 MB/block), we compute U = WᵀG (B × p, ~320 MB/block) using a random projection matrix W ~ N(0, 1/√n). The approximate LD matrix R̂ = UᵀU / B ≈ R = GᵀG / n by the Johnson-Lindenstrauss lemma. G is never stored.\n", + "\n", + "**Matrix dimensions:**\n", + "- G : (n × p) — n individuals × p variants (~8,000 per block)\n", + "- W : (n × B) — projection matrix, W ~ N(0, 1/√n), generated once per cohort\n", + "- U : (B × p) — stochastic genotype sample = WᵀG, stored in pgen\n", + "- R̂ : (p × p) — approximate LD matrix, computed on-the-fly by SuSiE-RSS from U\n", + "\n", + "**Pipeline steps:**\n", + "1. `generate_W` — generate W once from cohort sample size n (saved as `.npy`, shared across all chromosomes)\n", + "2. `process_block_1` — read LD block BED file, emit one coordinate file per block\n", + "3. 
`process_block_2` — for each block in parallel: load VCF, compute U = WᵀG, scale to [0,2], write compressed dosage + allele frequencies\n", + "4. `merge_chrom` — for each chromosome: convert per-block dosage files to sorted pgen, merge into one per-chromosome pgen, clean up all intermediates\n", + "\n", + "## Input Files\n", + "\n", + "| File | Description |\n", + "|------|-------------|\n", + "| VCF (.bgz) | Per-chromosome VCF files with tabix index |\n", + "| `sample_list.txt` | Optional. One sample ID per line. Leave empty to use all samples. |\n", + "| `EUR_LD_blocks.bed` | LD block BED file (0-based half-open, e.g. Berisa & Pickrell EUR blocks) |\n", + "\n", + "## Parameters\n", + "\n", + "**Global parameters:**\n", + "\n", + "| Parameter | Default | Description |\n", + "|-----------|---------|-------------|\n", + "| `plink2_bin` | `plink2` | Path to plink2 binary |\n", + "| `walltime` | `24:00:00` | Per-task walltime |\n", + "| `mem` | `32G` | Per-task memory |\n", + "| `numThreads` | `8` | Per-task CPU cores |\n", + "\n", + "**`generate_W` parameters:**\n", + "\n", + "| Parameter | Default | Description |\n", + "|-----------|---------|-------------|\n", + "| `n_samples` | required | Total number of individuals in the cohort |\n", + "| `output_dir` | required | Output directory for `W_B{B}.npy` |\n", + "| `B` | `10000` | Sketch dimension (number of pseudo-individuals) |\n", + "| `seed` | `123` | Random seed |\n", + "\n", + "**`process_block` parameters:**\n", + "\n", + "| Parameter | Default | Description |\n", + "|-----------|---------|-------------|\n", + "| `vcf_base` | required | Directory containing VCF (.bgz) files |\n", + "| `vcf_prefix` | required | Shared filename prefix of all VCF files |\n", + "| `ld_block_file` | required | Path to LD block BED file |\n", + "| `chrom` | `0` | Chromosome 1–22, or 0 for all chromosomes |\n", + "| `cohort_id` | `ADSP.R5.EUR` | Cohort prefix for output filenames |\n", + "| `output_dir` | required | Base output 
directory |\n", + "| `W_matrix` | required | Path to W `.npy` output from `generate_W` |\n", + "| `B` | `10000` | Must match W |\n", + "| `sample_list` | `\"\"` | Optional path to sample ID file |\n", + "| `maf_min` | `0.0005` | Minimum minor allele frequency |\n", + "| `mac_min` | `5` | Minimum minor allele count |\n", + "| `msng_min` | `0.05` | Maximum missingness rate per variant |\n", + "\n", + "**`merge_chrom` parameters:**\n", + "\n", + "| Parameter | Default | Description |\n", + "|-----------|---------|-------------|\n", + "| `chrom` | `0` | Chromosome 1–22, or 0 for all chromosomes |\n", + "| `output_dir` | required | Base output directory (same as `process_block`) |\n", + "| `cohort_id` | `ADSP.R5.EUR` | Must match `process_block` cohort_id |\n", + "\n", + "## Output Files\n", + "\n", + "| File | Description |\n", + "|------|-------------|\n", + "| `W_B{B}.npy` | Projection matrix W, shape (n × B). Output of `generate_W`. |\n", + "| `{cohort_id}.chr{N}.pgen` | Final output — per-chromosome stochastic genotype sample U as PLINK2 binary |\n", + "| `{cohort_id}.chr{N}.pvar` | Per-chromosome variant information |\n", + "| `{cohort_id}.chr{N}.psam` | Pseudo-individual IDs S1…S{B} |\n", + "| `{cohort_id}.chr{N}.afreq` | Per-chromosome allele frequencies (OBS_CT = 2 × n) |\n", + "| *(cleaned up after merge)* | Per-block `.dosage.gz`, `.map`, `.afreq`, `.meta` intermediate files |\n" + ] + }, + { + "cell_type": "markdown", + "id": "3547538e-ef0f-49d3-aa45-208a278f5d3c", + "metadata": {}, + "source": [ + "### Step 0 — Generate W (run once per cohort)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1b2c3d4-0002-4002-a002-000000000002", + "metadata": { + "kernel": "SoS" + }, + "outputs": [], + "source": [ + "sos run pipeline/rss_ld_sketch.ipynb generate_W \\\n", + " --n-samples 16571 \\\n", + " --output-dir output/rss_ld_sketch \\\n", + " --B 10000 \\\n", + " --seed 123\n" + ] + }, + { + "cell_type": "markdown", + "id": 
"e2559a2e-8bdd-443b-980f-26fe7f728674", + "metadata": {}, + "source": [ + "### Step 1+2 — Process all blocks → compressed dosage files\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1b2c3d4-0003-4003-a003-000000000003", + "metadata": { + "kernel": "SoS" + }, + "outputs": [], + "source": [ + "# Process one chromosome\n", + "sos run pipeline/rss_ld_sketch.ipynb process_block \\\n", + " --vcf-base /path/to/vcfs/ \\\n", + " --vcf-prefix your.prefix. \\\n", + " --ld-block-file /path/to/EUR_LD_blocks.bed \\\n", + " --chrom 22 \\\n", + " --output-dir output/rss_ld_sketch \\\n", + " --W-matrix output/rss_ld_sketch/W_B10000.npy " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7620754c-5060-460f-a7a3-576dcf686270", + "metadata": {}, + "outputs": [], + "source": [ + "# Process all 22 chromosomes\n", + "sos run pipeline/rss_ld_sketch.ipynb process_block \\\n", + " --vcf-base /restricted/projectnb/xqtl/R4_QC_NHWonly_rm_monomorphic \\\n", + " --vcf-prefix gcad.qc.r4.wgs.36361.GATK.2022.08.15.biallelic.genotypes. 
\\\n", + " --ld-block-file /restricted/projectnb/casa/oaolayin/ROSMAP_NIA_geno/EUR_LD_blocks.bed \\\n", + " --chrom 0 \\\n", + " --output-dir output/rss_ld_sketch \\\n", + " --W-matrix output/rss_ld_sketch/W_B10000.npy \n", + "\n" + ] + }, + { + "cell_type": "markdown", + "id": "19fdaf69-ae0d-45fb-b7a1-dc5a03ff9a51", + "metadata": {}, + "source": [ + "### Step 3 — Merge per-block dosage files into per-chromosome pgen\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f06fcbea-891d-4b3c-937d-817678c8858d", + "metadata": {}, + "outputs": [], + "source": [ + "sos run pipeline/rss_ld_sketch.ipynb merge_chrom \\\n", + " --output-dir output/rss_ld_sketch \\\n", + " --chrom 0\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "id": "a1b2c3d4-0004-4004-a004-000000000004", + "metadata": { + "kernel": "SoS" + }, + "source": [ + "## Command Interface" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1b2c3d4-0005-4005-a005-000000000005", + "metadata": { + "kernel": "SoS" + }, + "outputs": [], + "source": [ + "sos run pipeline/rss_ld_sketch.ipynb -h" + ] + }, + { + "cell_type": "markdown", + "id": "a1b2c3d4-0006-4006-a006-000000000006", + "metadata": { + "kernel": "SoS" + }, + "source": [ + "```\n", + "usage: sos run pipeline/rss_ld_sketch.ipynb\n", + " [workflow_name | -t targets] [options] [workflow_options]\n", + " workflow_name: Single or combined workflows defined in this script\n", + " options: Single-hyphen sos parameters (see \"sos run -h\" for details)\n", + " workflow_options: Double-hyphen workflow-specific parameters\n", + "\n", + "Workflows:\n", + " generate_W\n", + " process_block\n", + " merge_chrom\n", + "\n", + "Global Workflow Options:\n", + " --walltime 24:00:00\n", + " --mem 16G\n", + " --numThreads 16 (as int)\n", + " --plink2-bin plink2\n", + "\n", + "Sections\n", + " generate_W:\n", + " Workflow Options:\n", + " --n-samples VAL (as int, required)\n", + " Generate projection matrix W ~ N(0, 1/sqrt(n)), shape 
(n x B).\n", + " Run ONCE per cohort before any chromosome processing.\n", + " W depends only on n (sample size) and B — not on variant data.\n", + " All 22 chromosomes reuse the same W.\n", + " --output-dir VAL (as str, required)\n", + " --B 10000 (as int)\n", + " --seed 123 (as int)\n", + "\n", + " process_block:\n", + " Workflow Options:\n", + " --vcf-base VAL (as str, required)\n", + " Directory containing VCF (.bgz) files, one or more per chromosome.\n", + " --vcf-prefix VAL (as str, required)\n", + " Shared filename prefix of all VCF files (everything before \"chr{N}:\" or \"chr{N}.\").\n", + " --ld-block-file VAL (as str, required)\n", + " Path to LD block BED file (e.g. EUR_LD_blocks.bed).\n", + " --chrom 0 (as int)\n", + " Chromosome 1–22, or 0 for all chromosomes.\n", + " --cohort-id ADSP.R5.EUR (as str)\n", + " Cohort prefix for output filenames.\n", + " --output-dir VAL (as str, required)\n", + " --W-matrix VAL (as str, required)\n", + " --B 10000 (as int)\n", + " --maf-min 0.0005 (as float)\n", + " --mac-min 5 (as int)\n", + " --msng-min 0.05 (as float)\n", + " --sample-list \"\" (optional path to sample ID file)\n", + " Output per block (intermediate, cleaned up by merge_chrom):\n", + " {cohort_id}.{chr}_{start}_{end}.dosage.gz\n", + " {cohort_id}.{chr}_{start}_{end}.map\n", + " {cohort_id}.{chr}_{start}_{end}.afreq\n", + " {cohort_id}.{chr}_{start}_{end}.meta\n", + "\n", + " merge_chrom:\n", + " Workflow Options:\n", + " --chrom 0 (as int)\n", + " Chromosome 1–22, or 0 for all chromosomes.\n", + " --cohort-id ADSP.R5.EUR (as str)\n", + " Must match cohort-id used in process_block.\n", + " --output-dir VAL (as str, required)\n", + " Output per chromosome (final, block intermediates cleaned up):\n", + " {cohort_id}.chr{N}.pgen\n", + " {cohort_id}.chr{N}.pvar\n", + " {cohort_id}.chr{N}.psam\n", + " {cohort_id}.chr{N}.afreq\n", + "```\n" + ] + }, + { + "cell_type": "markdown", + "id": "a1b2c3d4-0007-4007-a007-000000000007", + "metadata": { + "kernel": 
"SoS" + }, + "source": [ + "# Pipeline Implementation\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1b2c3d4-0008-4008-a008-000000000008", + "metadata": { + "kernel": "SoS" + }, + "outputs": [], + "source": [ + "[global]\n", + "parameter: cwd = path(\"output\")\n", + "parameter: job_size = 1\n", + "parameter: walltime = \"24:00:00\"\n", + "parameter: mem = \"16G\"\n", + "parameter: numThreads = 16\n", + "parameter: container = \"\"\n", + "parameter: plink2_bin = \"plink2\"\n", + "\n", + "import re\n", + "from sos.utils import expand_size\n", + "\n", + "entrypoint = (\n", + " 'micromamba run -a \"\" -n' + ' ' +\n", + " re.sub(r'(_apptainer:latest|_docker:latest|\\.sif)$', '', container.split('/')[-1])\n", + ") if container else \"\"\n", + "\n", + "cwd = path(f'{cwd:a}')\n" + ] + }, + { + "cell_type": "markdown", + "id": "a1b2c3d4-0009-4009-a009-000000000009", + "metadata": { + "kernel": "SoS" + }, + "source": [ + "## `generate_W`\n", + "\n", + "Generates the random projection matrix W ~ N(0, 1/√n), shape (n × B). Run once per cohort. 
W is shared across all chromosomes.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1b2c3d4-0010-4010-a010-000000000010", + "metadata": { + "kernel": "SoS" + }, + "outputs": [], + "source": [ + "[generate_W]\n", + "# Generate projection matrix W ~ N(0, 1/sqrt(n)), shape (n x B).\n", + "# Run ONCE before processing any chromosome.\n", + "#\n", + "# W depends only on n (total sample size) and B — not on any variant data.\n", + "# n_samples is passed directly as a parameter; no VCF reading is needed.\n", + "# All 22 chromosomes reuse the same W so that per-chromosome stochastic\n", + "# genotype samples can be arithmetically merged for meta-analysis.\n", + "parameter: n_samples = int\n", + "parameter: output_dir = str\n", + "parameter: B = 10000\n", + "parameter: seed = 123\n", + "\n", + "import os\n", + "input: []\n", + "output: f'{output_dir}/W_B{B}.npy'\n", + "task: trunk_workers = 1, trunk_size = 1, walltime = '00:05:00', mem = '4G', cores = 1\n", + "python: expand = \"${ }\", stdout = f'{_output:n}.stdout', stderr = f'{_output:n}.stderr'\n", + "\n", + " import numpy as np\n", + " import os\n", + "\n", + " n = ${n_samples}\n", + " B = ${B}\n", + " seed = ${seed}\n", + " W_out = \"${_output}\"\n", + "\n", + " # ── Generate W ~ N(0, 1/sqrt(n)) ─────────────────────────────\n", + " # Convention: W = np.random.normal(0, 1/np.sqrt(n), size=(n, B))\n", + " # W is shared across all chromosomes — do not regenerate per chromosome.\n", + " print(f\"Generating W ~ N(0, 1/sqrt({n})), shape ({n}, {B}), seed={seed}\")\n", + " np.random.seed(seed)\n", + " W = np.random.normal(0, 1.0 / np.sqrt(n), size=(n, B)).astype(np.float32)\n", + "\n", + " os.makedirs(os.path.dirname(os.path.abspath(W_out)), exist_ok=True)\n", + " np.save(W_out, W)\n", + " print(f\"Saved: {W_out}\")\n", + " print(f\"Shape: {W.shape}, size: {os.path.getsize(W_out)/1e9:.2f} GB\")" + ] + }, + { + "cell_type": "markdown", + "id": "a1b2c3d4-0011-4011-a011-000000000011", + "metadata": { 
+ "kernel": "SoS" + }, + "source": [ + "## `process_block`\n", + "\n", + "Two-step workflow. `process_block_1` reads the BED file and emits one coordinate file per LD block. `process_block_2` reads the VCF for each block, computes U = WᵀG, scales to [0,2], and writes compressed dosage + allele frequency files. G is never stored.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1b2c3d4-0012-4012-a012-000000000012", + "metadata": { + "kernel": "SoS" + }, + "outputs": [], + "source": [ + "[process_block_1]\n", + "# Read BED file and write one .block coord file per LD block.\n", + "parameter: ld_block_file = str\n", + "parameter: chrom = 0\n", + "\n", + "import os\n", + "\n", + "def _read_blocks(bed, chrom_filter):\n", + " blocks = []\n", + " with open(bed) as fh:\n", + " for line in fh:\n", + " if line.startswith(\"#\") or not line.strip():\n", + " continue\n", + " parts = line.split()\n", + " c = parts[0]\n", + " if not (c.startswith(\"chr\") and c[3:].isdigit()):\n", + " continue\n", + " cnum = int(c[3:])\n", + " if not (1 <= cnum <= 22):\n", + " continue\n", + " if chrom_filter != 0 and cnum != chrom_filter:\n", + " continue\n", + " blocks.append((c, int(parts[1]), int(parts[2])))\n", + " if not blocks:\n", + " raise ValueError(f\"No blocks found for chrom={chrom_filter} in {bed}\")\n", + " return blocks\n", + "\n", + "_blocks = _read_blocks(ld_block_file, chrom)\n", + "print(f\" {len(_blocks)} LD blocks queued\")\n", + "\n", + "import tempfile\n", + "block_dir = tempfile.mkdtemp(prefix=\"sos_blocks_\")\n", + "\n", + "input: []\n", + "output: [f\"{block_dir}/{c}_{s}_{e}.block\" for c, s, e in _blocks]\n", + "python: expand = \"${ }\"\n", + " import os, json\n", + " blocks = ${_blocks!r}\n", + " block_dir = \"${block_dir}\"\n", + " for c, s, e in blocks:\n", + " path = os.path.join(block_dir, f\"{c}_{s}_{e}.block\")\n", + " with open(path, \"w\") as fh:\n", + " fh.write(f\"{c}\\n{s}\\n{e}\\n\")\n" + ] + }, + { + "cell_type": "markdown", + 
"id": "a1b2c3d4-0031-4031-a031-000000000031", + "metadata": {}, + "source": [ + "## `process_block_2`\n", + "\n", + "Per-block parallel task. Reads VCF, computes U = WᵀG in chunks, scales to [0,2], writes `.dosage.gz`, `.map`, `.afreq`, `.meta`. Output is intermediate — cleaned up by `merge_chrom`.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1b2c3d4-0030-4030-a030-000000000030", + "metadata": { + "kernel": "SoS" + }, + "outputs": [], + "source": [ + "[process_block_2]\n", + "parameter: vcf_base = str\n", + "parameter: vcf_prefix = str\n", + "parameter: cohort_id = \"ADSP.R5.EUR\"\n", + "parameter: output_dir = str\n", + "parameter: W_matrix = str\n", + "parameter: B = 10000\n", + "parameter: maf_min = 0.0005\n", + "parameter: mac_min = 5\n", + "parameter: msng_min = 0.05\n", + "parameter: sample_list = \"\"\n", + "input: output_from(\"process_block_1\"), group_by = 1\n", + "task: trunk_workers = 1, trunk_size = 1, walltime = walltime, mem = mem, cores = numThreads\n", + "python: expand = \"${ }\"\n", + "\n", + " import numpy as np\n", + " import os\n", + " import gzip\n", + " import sys\n", + " from math import nan\n", + " from cyvcf2 import VCF\n", + " from os import listdir\n", + "\n", + " # Read block coordinates from input file\n", + " with open(\"${_input}\") as fh:\n", + " lines = fh.read().splitlines()\n", + " chrm_str = lines[0]\n", + " block_start = int(lines[1])\n", + " block_end = int(lines[2])\n", + "\n", + " vcf_base = \"${vcf_base}\"\n", + " vcf_prefix = \"${vcf_prefix}\"\n", + " W_path = \"${W_matrix}\"\n", + " B = ${B}\n", + " maf_min = ${maf_min}\n", + " mac_min = ${mac_min}\n", + " msng_min = ${msng_min}\n", + " sample_list = \"${sample_list}\"\n", + " cohort_id = \"${cohort_id}\"\n", + " base_dir = \"${output_dir}\"\n", + "\n", + " block_tag = f\"{chrm_str}_{block_start}_{block_end}\"\n", + " output_dir = os.path.join(base_dir, chrm_str, block_tag)\n", + " os.makedirs(output_dir, exist_ok=True)\n", + "\n", + " 
log_path = os.path.join(output_dir, f\"{block_tag}.log\")\n", + " log_fh = open(log_path, \"w\")\n", + " sys.stdout = log_fh\n", + " sys.stderr = log_fh\n", + "\n", + " # ── Load sample subset (optional) ─────────────────────────────\n", + " sample_subset = None\n", + " if sample_list:\n", + " if not os.path.exists(sample_list):\n", + " raise FileNotFoundError(f\"sample_list not found: {sample_list}\")\n", + " with open(sample_list) as fh:\n", + " sample_subset = set(line.strip() for line in fh if line.strip())\n", + " print(f\" Sample subset: {len(sample_subset):,} samples\")\n", + "\n", + " # ── Helpers ───────────────────────────────────────────────────\n", + " def get_vcf_files(chrm_str):\n", + " files = sorted([\n", + " os.path.join(vcf_base, x)\n", + " for x in listdir(vcf_base)\n", + " if x.endswith(\".bgz\") and (\n", + " x.startswith(vcf_prefix + chrm_str + \":\") or\n", + " x.startswith(vcf_prefix + chrm_str + \".\")\n", + " )\n", + " ])\n", + " if not files:\n", + " raise FileNotFoundError(f\"No VCF files for {chrm_str} in {vcf_base}\")\n", + " return files\n", + "\n", + " def fill_missing_col_means(G):\n", + " col_means = np.nanmean(G, axis=0)\n", + " return np.where(np.isnan(G), col_means, G)\n", + "\n", + " # ── Step 1: Scan variants in block ────────────────────────────\n", + " print(f\"[1/3] Scanning {chrm_str} [{block_start:,}, {block_end:,}) ...\")\n", + " vcf_files = get_vcf_files(chrm_str)\n", + " region = f\"{chrm_str}:{block_start+1}-{block_end}\"\n", + " var_info = []\n", + " n_samples = None\n", + "\n", + " for vf in vcf_files:\n", + " vcf = VCF(vf)\n", + " if sample_subset is not None:\n", + " vcf_samples = vcf.samples\n", + " keep_idx = [i for i, s in enumerate(vcf_samples) if s in sample_subset]\n", + " if not keep_idx:\n", + " raise ValueError(f\"No sample_list samples in {os.path.basename(vf)}\")\n", + " vcf.set_samples([vcf_samples[i] for i in keep_idx])\n", + " if n_samples is None:\n", + " n_samples = len(vcf.samples)\n", + " for var 
in vcf(region):\n", + " if not (block_start <= var.POS - 1 < block_end):\n", + " continue\n", + " if len(var.ALT) != 1:\n", + " continue\n", + " dosage = [\n", + " sum(x[0:2])\n", + " for x in [[nan if v == -1 else v for v in gt]\n", + " for gt in var.genotypes]\n", + " ]\n", + " if np.nanvar(dosage) == 0:\n", + " continue\n", + " counts = [np.nansum([2 - x for x in dosage]), np.nansum(dosage)]\n", + " nan_count = int(np.sum(np.isnan(dosage)))\n", + " n_non_na = len(dosage) - nan_count\n", + " mac = min(counts)\n", + " maf = mac / n_non_na\n", + " af = float(np.nansum(dosage)) / (2 * n_non_na)\n", + " msng_rate = nan_count / (n_non_na + nan_count)\n", + " if maf < maf_min or mac < mac_min or msng_rate > msng_min:\n", + " continue\n", + " var_info.append({\n", + " \"chr\": var.CHROM, \"pos\": var.POS,\n", + " \"ref\": var.REF, \"alt\": var.ALT[0],\n", + " \"af\": round(float(af), 6),\n", + " \"id\": f\"{var.CHROM}:{var.POS}:{var.REF}:{var.ALT[0]}\",\n", + " })\n", + " vcf.close()\n", + " print(f\" {len(var_info):,} variants passing filters (n={n_samples:,})\")\n", + "\n", + " if not var_info:\n", + " raise ValueError(f\"No passing variants in {chrm_str} [{block_start:,}, {block_end:,})\")\n", + "\n", + " # ── Step 2: Load W ────────────────────────────────────────────\n", + " print(f\"[2/3] Loading W ...\")\n", + " W = np.load(W_path)\n", + " if W.shape != (n_samples, B):\n", + " raise ValueError(f\"W shape mismatch: {W.shape} vs ({n_samples},{B})\")\n", + " W = W.astype(np.float32)\n", + " print(f\" W: {W.shape}\")\n", + "\n", + " # ── Step 3: Compute U, write compressed dosage + map + afreq ──\n", + " print(f\"[3/3] Computing U and writing output files ...\")\n", + " var_lookup = {v[\"id\"]: v for v in var_info}\n", + " var_id_set = set(var_lookup)\n", + "\n", + " dosage_path = os.path.join(output_dir, f\"{cohort_id}.{block_tag}.dosage.gz\")\n", + " map_path = os.path.join(output_dir, f\"{cohort_id}.{block_tag}.map\")\n", + " afreq_path = os.path.join(output_dir, 
f\"{cohort_id}.{block_tag}.afreq\")\n", + " meta_path = os.path.join(output_dir, f\"{cohort_id}.{block_tag}.meta\")\n", + "\n", + " # Write .map\n", + " with open(map_path, \"w\") as fh:\n", + " for v in var_info:\n", + " fh.write(f\"{v['chr'].replace('chr','')}\\t{v['id']}\\t0\\t{v['pos']}\\n\")\n", + "\n", + " # Write .meta\n", + " with open(meta_path, \"w\") as fh:\n", + " fh.write(f\"source_n_samples={n_samples}\\nB={B}\\n\")\n", + " fh.write(f\"chrom={chrm_str}\\nblock_start={block_start}\\nblock_end={block_end}\\n\")\n", + "\n", + " # Write .afreq\n", + " obs_ct = 2 * n_samples\n", + " with open(afreq_path, \"w\") as fh:\n", + " fh.write(\"#CHROM\\tID\\tREF\\tALT\\tALT_FREQS\\tOBS_CT\\n\")\n", + " for v in var_info:\n", + " fh.write(f\"{v['chr'].replace('chr','')}\\t{v['id']}\\t{v['ref']}\\t{v['alt']}\\t\"\n", + " f\"{v['af']:.6f}\\t{obs_ct}\\n\")\n", + "\n", + " # Compute U and write compressed dosage (format=1: ID ALT REF val_S1 ... val_SB)\n", + " total = 0\n", + " with gzip.open(dosage_path, \"wt\", compresslevel=4) as gz:\n", + " for vf in vcf_files:\n", + " arr, var_keys = [], []\n", + " vcf_obj = VCF(vf)\n", + " for var in vcf_obj(region):\n", + " if not (block_start <= var.POS - 1 < block_end):\n", + " continue\n", + " if len(var.ALT) != 1:\n", + " continue\n", + " vid = f\"{var.CHROM}:{var.POS}:{var.REF}:{var.ALT[0]}\"\n", + " if vid not in var_id_set:\n", + " continue\n", + " dosage = [\n", + " sum(x[0:2])\n", + " for x in [[nan if v == -1 else v for v in gt]\n", + " for gt in var.genotypes]\n", + " ]\n", + " arr.append(dosage)\n", + " var_keys.append(vid)\n", + " vcf_obj.close()\n", + " if not arr:\n", + " continue\n", + " G_chunk = np.array(arr, dtype=np.float32).T\n", + " G_chunk = fill_missing_col_means(G_chunk)\n", + " U_chunk = W.T @ G_chunk\n", + " del G_chunk\n", + " col_min = U_chunk.min(axis=0)\n", + " col_max = U_chunk.max(axis=0)\n", + " denom = col_max - col_min\n", + " denom[denom == 0] = 1.0\n", + " U_chunk = 2.0 * (U_chunk - col_min) 
/ denom\n", + " U_chunk = np.round(U_chunk, 4)\n", + " for j, vid in enumerate(var_keys):\n", + " v = var_lookup[vid]\n", + " vals = \" \".join(f\"{x:.4f}\" for x in U_chunk[:, j])\n", + " gz.write(f\"{vid} {v['alt']} {v['ref']} {vals}\\n\")\n", + " total += len(var_keys)\n", + " del U_chunk\n", + " print(f\" Written: {total:,} variants → {os.path.basename(dosage_path)}\")\n", + " print(f\" Written: {os.path.basename(map_path)}, {os.path.basename(afreq_path)}\")\n", + " print(f\"\\nDone: {chrm_str} [{block_start:,}, {block_end:,})\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "a1b2c3d4-0040-4040-a040-000000000040", + "metadata": {}, + "source": [ + "## `merge_chrom`\n", + "\n", + "For each chromosome: converts per-block `.dosage.gz` files to sorted pgen (two-pass: import then `--sort-vars`), merges all blocks into one per-chromosome pgen using `plink2 --pmerge-list`, concatenates allele frequencies, and removes all per-block intermediate directories. Output: `{cohort_id}.chr{N}.{pgen,pvar,psam,afreq}`.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a1b2c3d4-0041-4041-a041-000000000041", + "metadata": { + "kernel": "SoS" + }, + "outputs": [], + "source": [ + "[merge_chrom]\n", + "# Merge all per-block dosage files for a chromosome into one per-chrom pgen.\n", + "# Steps per chromosome:\n", + "# 1. For each block: plink2 --import-dosage → unsorted pgen, then --sort-vars → sorted pgen\n", + "# 2. plink2 --pmerge-list → one per-chrom pgen\n", + "# 3. Concatenate .afreq\n", + "# 4. Clean up intermediates including -merge.* files and per-block dirs\n", + "# 5. zstd compress final pgen/pvar/psam/afreq\n", + "#\n", + "# Run after process_block completes (or run together: sos run ... 
process_block merge_chrom)\n", + "parameter: chrom = 0\n", + "parameter: output_dir = str\n", + "parameter: cohort_id = \"ADSP.R5.EUR\"\n", + "\n", + "import os, glob\n", + "\n", + "def _chroms_to_process(output_dir, chrom_filter):\n", + " if chrom_filter != 0:\n", + " return [f\"chr{chrom_filter}\"]\n", + " return sorted(set(\n", + " os.path.basename(d)\n", + " for d in glob.glob(os.path.join(output_dir, \"chr*\"))\n", + " if os.path.isdir(d)\n", + " ))\n", + "\n", + "_chroms = _chroms_to_process(output_dir, chrom)\n", + "\n", + "input: []\n", + "output: [f\"{output_dir}/{c}/{cohort_id}.{c}.pgen.zst\" for c in _chroms]\n", + "\n", + "for _chrom in _chroms:\n", + " bash(f\"\"\"\n", + " set -euo pipefail\n", + " shopt -s nullglob\n", + "\n", + " chrom_dir=\"{output_dir}/{_chrom}\"\n", + " final_prefix=\"${{chrom_dir}}/{cohort_id}.{_chrom}\"\n", + " merge_list=\"${{chrom_dir}}/{cohort_id}.{_chrom}_pmerge_list.txt\"\n", + "\n", + " # ── Step 1: Convert each block dosage.gz → sorted per-block pgen (two-pass) ──\n", + " > \"${{merge_list}}\"\n", + " files=(\"${{chrom_dir}}\"/*/*.dosage.gz)\n", + " if [ ${{#files[@]}} -eq 0 ]; then\n", + " exit 1\n", + " fi\n", + " for dosage_gz in \"${{files[@]}}\"; do\n", + " block_dir=$(dirname \"${{dosage_gz}}\")\n", + " block_tag=$(basename \"${{block_dir}}\")\n", + " prefix=\"${{block_dir}}/{cohort_id}.${{block_tag}}_tmp\"\n", + " map_file=\"${{block_dir}}/{cohort_id}.${{block_tag}}.map\"\n", + " psam_file=\"${{block_dir}}/{cohort_id}.${{block_tag}}.psam\"\n", + " meta_file=\"${{block_dir}}/{cohort_id}.${{block_tag}}.meta\"\n", + " B=$(grep \"^B=\" \"${{meta_file}}\" | cut -d= -f2)\n", + " printf '#FID\\tIID\\n' > \"${{psam_file}}\"\n", + " for i in $(seq 1 ${{B}}); do\n", + " printf 'S%d\\tS%d\\n' ${{i}} ${{i}} >> \"${{psam_file}}\"\n", + " done\n", + " # Pass 1: import dosage → unsorted pgen\n", + " \"{plink2_bin}\" \\\n", + " --import-dosage \"${{dosage_gz}}\" format=1 noheader \\\n", + " --psam \"${{psam_file}}\" \\\n", + " 
--map \"${{map_file}}\" \\\n", + " --make-pgen \\\n", + " --out \"${{prefix}}_unsorted\" \\\n", + " --silent\n", + " # Pass 2: sort variants\n", + " \"{plink2_bin}\" \\\n", + " --pfile \"${{prefix}}_unsorted\" \\\n", + " --make-pgen \\\n", + " --sort-vars \\\n", + " --out \"${{prefix}}\" \\\n", + " --silent\n", + " rm -f \"${{prefix}}_unsorted.pgen\" \"${{prefix}}_unsorted.pvar\" \"${{prefix}}_unsorted.psam\"\n", + " echo \"${{prefix}}\" >> \"${{merge_list}}\"\n", + " done\n", + "\n", + " # ── Step 2: Merge all per-block pgens → one per-chrom pgen ──\n", + " \"{plink2_bin}\" \\\n", + " --pmerge-list \"${{merge_list}}\" pfile \\\n", + " --make-pgen \\\n", + " --sort-vars \\\n", + " --out \"${{final_prefix}}\"\n", + "\n", + " # ── Step 3: Concatenate .afreq ──\n", + " first=1\n", + " for f in \"${{chrom_dir}}\"/*/*.afreq; do\n", + " if [ \"${{first}}\" -eq 1 ]; then\n", + " cat \"${{f}}\" > \"${{final_prefix}}.afreq\"\n", + " first=0\n", + " else\n", + " tail -n +2 \"${{f}}\" >> \"${{final_prefix}}.afreq\"\n", + " fi\n", + " done\n", + "\n", + " # ── Step 4: Cleanup — remove merge intermediates and per-block dirs ──\n", + " rm -f \"${{merge_list}}\"\n", + " rm -f \"${{final_prefix}}-merge.pgen\" \"${{final_prefix}}-merge.pvar\" \"${{final_prefix}}-merge.psam\"\n", + " for block_dir in \"${{chrom_dir}}\"/*/; do\n", + " rm -rf \"${{block_dir}}\"\n", + " done\n", + "\n", + " # ── Step 5: zstd compress final outputs ──\n", + " for ext in pgen pvar psam afreq; do\n", + " zstd --ultra -22 -T{numThreads} \"${{final_prefix}}.${{ext}}\" \\\n", + " -o \"${{final_prefix}}.${{ext}}.zst\" \\\n", + " && rm -f \"${{final_prefix}}.${{ext}}\"\n", + " done\n", + " \"\"\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + 
"nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.13" + }, + "sos": { + "kernels": [ + [ + "SoS", + "sos", + "sos", + "", + "" + ] + ], + "version": "" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}