From 1fa0e2cdcb0c8b54c88ae978835c2e1c3fbd317e Mon Sep 17 00:00:00 2001
From: Jenny Empawi <80464805+jaempawi@users.noreply.github.com>
Date: Fri, 27 Mar 2026 12:01:24 -0400
Subject: [PATCH 1/2] Delete code/reference_data/rss_ld_sketch.ipynb
---
code/reference_data/rss_ld_sketch.ipynb | 829 ------------------------
1 file changed, 829 deletions(-)
delete mode 100644 code/reference_data/rss_ld_sketch.ipynb
diff --git a/code/reference_data/rss_ld_sketch.ipynb b/code/reference_data/rss_ld_sketch.ipynb
deleted file mode 100644
index 5c2660238..000000000
--- a/code/reference_data/rss_ld_sketch.ipynb
+++ /dev/null
@@ -1,829 +0,0 @@
-{
- "cells": [
- {
- "cell_type": "markdown",
- "id": "a1b2c3d4-0001-4001-a001-000000000001",
- "metadata": {
- "kernel": "SoS"
- },
- "source": [
- "# RSS LD Sketch Pipeline\n",
- "\n",
- "## Overview\n",
- "This pipeline generates a **stochastic genotype sample** U from whole-genome sequencing VCF files and stores it as a PLINK2 pgen file for use as an LD reference panel.\n",
- "\n",
- "**Terminology:**\n",
- "- **Stochastic genotype sample**: U = WT G, shape (B × p). B pseudo-individuals × p variants. This is what we store in the pgen file (after scaling to $[0,2]$).\n",
- "- **LD sketch** (computed downstream, never stored here): R̂ = UTU / B, computed by PLINK2 or SuSiE-RSS when LD is requested from the pgen.\n",
- "\n",
- "**Steps:**\n",
- "1. Generate projection matrix W once from total cohort sample size n (saved as `.npy`, shared across all chromosomes)\n",
- "2. Read LD block BED file and emit one coordinate file per block (`process_block_1`)\n",
- "3. For each block in parallel: load VCF, compute U = WTG, scale to [0,2], write compressed dosage file + allele frequencies — no pgen yet (`process_block_2`)\n",
- "4. For each chromosome: convert per-block dosage files to pgen, merge into one per-chromosome pgen, clean up all intermediate block files (`merge_chrom`)\n",
- "\n",
- "**Matrix convention:**\n",
- "- G : (n × p) — individuals × variants\n",
- "- W : (n × B) — individuals × pseudo-individuals, W ~ N(0, 1/√n)\n",
- "- U : (B × p) — stochastic genotype sample = WT G\n",
- "\n",
- "## Input Files\n",
- "\n",
- "| File | Description |\n",
- "|------|-------------|\n",
- "| VCF (.bgz) | Per-chromosome VCF files with tabix index. If the cohort contains mixed ancestries, provide `--sample-list` to restrict to one source population. |\n",
- "| `sample_list.txt` | Optional. One sample ID per line. If provided, only these samples are loaded from the VCF. |\n",
- "| `EUR_LD_blocks.bed` | LD block BED file (0-based half-open, e.g. Berisa & Pickrell EUR blocks). |\n",
- "\n",
- "## Parameters\n",
- "\n",
- "**Global parameters:**\n",
- "\n",
- "| Parameter | Default | Description |\n",
- "|-----------|---------|-------------|\n",
- "| `plink2_bin` | `plink2` | Path to plink2 binary, or just `plink2` if on `$PATH` |\n",
- "\n",
- "**`generate_W` workflow parameters:**\n",
- "\n",
- "| Parameter | Default | Description |\n",
- "|-----------|---------|-------------|\n",
- "| `n_samples` | required | Total number of individuals in the cohort (determines W shape) |\n",
- "| `output_dir` | required | Output directory for `W_B{B}.npy` |\n",
- "| `B` | `10000` | Number of pseudo-individuals (sketch dimension) |\n",
- "| `seed` | `123` | Random seed for W generation |\n",
- "\n",
- "**`process_block` workflow parameters:**\n",
- "\n",
- "| Parameter | Default | Description |\n",
- "|-----------|---------|-------------|\n",
- "| `vcf_base` | required | Directory containing VCF (.bgz) files, one or more per chromosome |\n",
- "| `vcf_prefix` | required | Shared filename prefix of all VCF files (everything before `chr{N}:` or `chr{N}.`) |\n",
- "| `ld_block_file` | required | Path to LD block BED file (e.g. `EUR_LD_blocks.bed`) |\n",
- "| `chrom` | `0` | Chromosome number (1–22), or 0 for all chromosomes |\n",
- "| `cohort_id` | `ADSP.R5.EUR` | Cohort prefix for output filenames |\n",
- "| `output_dir` | required | Base output directory |\n",
- "| `W_matrix` | required | Path to W `.npy` output from `generate_W` |\n",
- "| `B` | `10000` | Number of pseudo-individuals — must match W |\n",
- "| `sample_list` | `\"\"` | Optional path to a file of sample IDs (one per line) to subset from the VCF. Leave empty to use all samples. |\n",
- "| `maf_min` | `0.0005` | Minimum minor allele frequency |\n",
- "| `mac_min` | `5` | Minimum minor allele count |\n",
- "| `msng_min` | `0.05` | Maximum missingness rate per variant |\n",
- "\n",
- "**`merge_chrom` workflow parameters:**\n",
- "\n",
- "| Parameter | Default | Description |\n",
- "|-----------|---------|-------------|\n",
- "| `chrom` | `0` | Chromosome to merge (1–22), or 0 for all |\n",
- "| `output_dir` | required | Base output directory (same as `process_block`) |\n",
- "| `cohort_id` | `ADSP.R5.EUR` | Must match the `process_block` cohort_id used |\n",
- "\n",
- "**Global parameters (shared across all workflows):**\n",
- "\n",
- "| Parameter | Default | Description |\n",
- "|-----------|---------|-------------|\n",
- "| `walltime` | `24:00:00` | Per-task walltime |\n",
- "| `mem` | `32G` | Per-task memory |\n",
- "| `numThreads` | `8` | Per-task CPU cores |\n",
- "\n",
- "## Output Files\n",
- "\n",
- "| File | Description |\n",
- "|------|-------------|\n",
- "| `W_B{B}.npy` | Projection matrix W, shape (n × B). Output of `generate_W`, input to `process_block`. |\n",
- "| `{cohort_id}.chr{N}_stocld_panel.pgen` | **Final output.** Per-chromosome stochastic genotype sample U as PLINK2 binary |\n",
- "| `{cohort_id}.chr{N}_stocld_panel.pvar` | Per-chromosome variant information |\n",
- "| `{cohort_id}.chr{N}_stocld_panel.psam` | Pseudo-individual IDs S1…S{B} |\n",
- "| `{cohort_id}.chr{N}_stocld_panel.afreq` | Per-chromosome allele frequencies (OBS_CT = 2 × n) |\n",
- "| *(cleaned up after merge)* | Per-block `.dosage.gz`, `.map`, `.afreq`, `.meta` intermediate files |\n",
- "\n",
- "**Timing:** ~7 hours for all 22 chromosomes with -J 32 (blocks parallelized), plus ~1 hour for merge_chrom\n",
- "\n",
- "## Minimal Working Example\n"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "3547538e-ef0f-49d3-aa45-208a278f5d3c",
- "metadata": {},
- "source": [
- "### Step 0: Generate W once — provide your VCF directory, prefix, and LD block file"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "a1b2c3d4-0002-4002-a002-000000000002",
- "metadata": {
- "kernel": "SoS"
- },
- "outputs": [],
- "source": [
- "sos run pipeline/rss_ld_sketch.ipynb generate_W \\\n",
- " --n-samples 2000 \\\n",
- " --output-dir output/rss_ld_sketch \\\n",
- " --B 10000 \\\n",
- " --seed 123"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "e2559a2e-8bdd-443b-980f-26fe7f728674",
- "metadata": {},
- "source": [
- "### Step 1: Process all blocks → compressed dosage files\n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "a1b2c3d4-0003-4003-a003-000000000003",
- "metadata": {
- "kernel": "SoS"
- },
- "outputs": [],
- "source": [
- "# Process all blocks for one chromosome\n",
- "sos run pipeline/rss_ld_sketch.ipynb process_block \\\n",
- " --vcf-base /path/to/vcfs/ \\\n",
- " --vcf-prefix your.prefix. \\\n",
- " --ld-block-file /path/to/EUR_LD_blocks.bed \\\n",
- " --chrom 22 \\\n",
- " --output-dir output/rss_ld_sketch \\\n",
- " --W-matrix output/rss_ld_sketch/W_B10000.npy\n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "7620754c-5060-460f-a7a3-576dcf686270",
- "metadata": {},
- "outputs": [],
- "source": [
- "# Process all 22 chromosomes\n",
- "sos run pipeline/rss_ld_sketch.ipynb process_block \\\n",
- " --vcf-base /restricted/projectnb/xqtl/R4_QC_NHWonly_rm_monomorphic \\\n",
- " --vcf-prefix gcad.qc.r4.wgs.36361.GATK.2022.08.15.biallelic.genotypes. \\\n",
- " --ld-block-file /restricted/projectnb/casa/oaolayin/ROSMAP_NIA_geno/EUR_LD_blocks.bed \\\n",
- " --chrom 0 \\\n",
- " --output-dir output/rss_ld_sketch \\\n",
- " --W-matrix output/rss_ld_sketch/W_B10000.npy"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "19fdaf69-ae0d-45fb-b7a1-dc5a03ff9a51",
- "metadata": {},
- "source": [
- "#### Step 2: Merge per-block dosage files into per-chromosome pgen\n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "f06fcbea-891d-4b3c-937d-817678c8858d",
- "metadata": {},
- "outputs": [],
- "source": [
- "sos run pipeline/rss_ld_sketch.ipynb merge_chrom \\\n",
- " --output-dir output/rss_ld_sketch \\\n",
- " --chrom 0"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "a1b2c3d4-0004-4004-a004-000000000004",
- "metadata": {
- "kernel": "SoS"
- },
- "source": [
- "## Command Interface"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "a1b2c3d4-0005-4005-a005-000000000005",
- "metadata": {
- "kernel": "SoS"
- },
- "outputs": [],
- "source": [
- "sos run pipeline/rss_ld_sketch.ipynb -h"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "a1b2c3d4-0006-4006-a006-000000000006",
- "metadata": {
- "kernel": "SoS"
- },
- "source": [
- "```\n",
- "usage: sos run pipeline/rss_ld_sketch.ipynb\n",
- " [workflow_name | -t targets] [options] [workflow_options]\n",
- " workflow_name: Single or combined workflows defined in this script\n",
- " options: Single-hyphen sos parameters (see \"sos run -h\" for details)\n",
- " workflow_options: Double-hyphen workflow-specific parameters\n",
- "\n",
- "Workflows:\n",
- " generate_W\n",
- " process_block\n",
- " merge_chrom\n",
- "\n",
- "Global Workflow Options:\n",
- " --walltime 24:00:00\n",
- " --mem 32G\n",
- " --numThreads 8 (as int)\n",
- " --plink2-bin plink2\n",
- "\n",
- "Sections\n",
- " generate_W:\n",
- " Workflow Options:\n",
- " --n-samples VAL (as int, required)\n",
- " Generate projection matrix W ~ N(0, 1/sqrt(n)), shape (n x B).\n",
- " Run ONCE per cohort before any chromosome processing.\n",
- " W depends only on n (sample size) and B — not on variant data.\n",
- " All 22 chromosomes reuse the same W.\n",
- " --output-dir VAL (as str, required)\n",
- " --B 10000 (as int)\n",
- " --seed 123 (as int)\n",
- "\n",
- " process_block:\n",
- " Workflow Options:\n",
- " --vcf-base VAL (as str, required)\n",
- " Directory containing VCF (.bgz) files, one or more per chromosome.\n",
- " --vcf-prefix VAL (as str, required)\n",
- " Shared filename prefix of all VCF files (everything before \"chr{N}:\" or \"chr{N}.\").\n",
- " --ld-block-file VAL (as str, required)\n",
- " Path to LD block BED file (e.g. EUR_LD_blocks.bed).\n",
- " --chrom 0 (as int)\n",
- " Chromosome 1–22, or 0 for all chromosomes.\n",
- " --cohort-id ADSP.R5.EUR (as str)\n",
- " Cohort prefix for output filenames.\n",
- " --output-dir VAL (as str, required)\n",
- " --W-matrix VAL (as str, required)\n",
- " --B 10000 (as int)\n",
- " --maf-min 0.0005 (as float)\n",
- " --mac-min 5 (as int)\n",
- " --msng-min 0.05 (as float)\n",
- " --sample-list \"\" (optional path to sample ID file)\n",
- " Output per block (intermediate, cleaned up by merge_chrom):\n",
- " {cohort_id}.{chr}_{start}_{end}.dosage.gz\n",
- " {cohort_id}.{chr}_{start}_{end}.map\n",
- " {cohort_id}.{chr}_{start}_{end}.afreq\n",
- " {cohort_id}.{chr}_{start}_{end}.meta\n",
- "\n",
- " merge_chrom:\n",
- " Workflow Options:\n",
- " --chrom 0 (as int)\n",
- " Chromosome 1–22, or 0 for all chromosomes.\n",
- " --cohort-id ADSP.R5.EUR (as str)\n",
- " Must match cohort-id used in process_block.\n",
- " --output-dir VAL (as str, required)\n",
- " Output per chromosome (final, block intermediates cleaned up):\n",
- " {cohort_id}.chr{N}_stocld_panel.pgen\n",
- " {cohort_id}.chr{N}_stocld_panel.pvar\n",
- " {cohort_id}.chr{N}_stocld_panel.psam\n",
- " {cohort_id}.chr{N}_stocld_panel.afreq\n",
- "```\n"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "a1b2c3d4-0007-4007-a007-000000000007",
- "metadata": {
- "kernel": "SoS"
- },
- "source": [
- "# Setup and Global Parameters"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "a1b2c3d4-0008-4008-a008-000000000008",
- "metadata": {
- "kernel": "SoS"
- },
- "outputs": [],
- "source": [
- "[global]\n",
- "parameter: cwd = path(\"output\")\n",
- "parameter: job_size = 1\n",
- "parameter: walltime = \"4h\"\n",
- "parameter: mem = \"32G\"\n",
- "parameter: numThreads = 8\n",
- "parameter: container = \"\"\n",
- "parameter: plink2_bin = \"plink2\"\n",
- "parameter: walltime = \"24:00:00\"\n",
- "parameter: mem = \"32G\"\n",
- "parameter: numThreads = 8\n",
- "\n",
- "import re\n",
- "from sos.utils import expand_size\n",
- "\n",
- "entrypoint = (\n",
- " 'micromamba run -a \"\" -n' + ' ' +\n",
- " re.sub(r'(_apptainer:latest|_docker:latest|\\.sif)$', '', container.split('/')[-1])\n",
- ") if container else \"\"\n",
- "\n",
- "cwd = path(f'{cwd:a}')\n"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "a1b2c3d4-0009-4009-a009-000000000009",
- "metadata": {
- "kernel": "SoS"
- },
- "source": [
- "## `generate_W`"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "a1b2c3d4-0010-4010-a010-000000000010",
- "metadata": {
- "kernel": "SoS"
- },
- "outputs": [],
- "source": [
- "[generate_W]\n",
- "# Generate projection matrix W ~ N(0, 1/sqrt(n)), shape (n x B).\n",
- "# Run ONCE before processing any chromosome.\n",
- "#\n",
- "# W depends only on n (total sample size) and B — not on any variant data.\n",
- "# n_samples is passed directly as a parameter; no VCF reading is needed.\n",
- "# All 22 chromosomes reuse the same W so that per-chromosome stochastic\n",
- "# genotype samples can be arithmetically merged for meta-analysis.\n",
- "parameter: n_samples = int\n",
- "parameter: output_dir = str\n",
- "parameter: B = 10000\n",
- "parameter: seed = 123\n",
- "\n",
- "import os\n",
- "input: []\n",
- "output: f'{output_dir}/W_B{B}.npy'\n",
- "task: trunk_workers = 1, trunk_size = 1, walltime = '00:05:00', mem = '4G', cores = 1\n",
- "python: expand = \"${ }\", stdout = f'{_output:n}.stdout', stderr = f'{_output:n}.stderr'\n",
- "\n",
- " import numpy as np\n",
- " import os\n",
- "\n",
- " n = ${n_samples}\n",
- " B = ${B}\n",
- " seed = ${seed}\n",
- " W_out = \"${_output}\"\n",
- "\n",
- " # ── Generate W ~ N(0, 1/sqrt(n)) ─────────────────────────────\n",
- " # Convention: W = np.random.normal(0, 1/np.sqrt(n), size=(n, B))\n",
- " # W is shared across all chromosomes — do not regenerate per chromosome.\n",
- " print(f\"Generating W ~ N(0, 1/sqrt({n})), shape ({n}, {B}), seed={seed}\")\n",
- " np.random.seed(seed)\n",
- " W = np.random.normal(0, 1.0 / np.sqrt(n), size=(n, B)).astype(np.float32)\n",
- "\n",
- " os.makedirs(os.path.dirname(os.path.abspath(W_out)), exist_ok=True)\n",
- " np.save(W_out, W)\n",
- " print(f\"Saved: {W_out}\")\n",
- " print(f\"Shape: {W.shape}, size: {os.path.getsize(W_out)/1e9:.2f} GB\")"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "a1b2c3d4-0011-4011-a011-000000000011",
- "metadata": {
- "kernel": "SoS"
- },
- "source": [
- "## `process_block`\n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "a1b2c3d4-0012-4012-a012-000000000012",
- "metadata": {
- "kernel": "SoS"
- },
- "outputs": [],
- "source": [
- "[process_block_1]\n",
- "# Read BED file and write one .block coord file per LD block.\n",
- "parameter: ld_block_file = str\n",
- "parameter: chrom = 0\n",
- "\n",
- "import os\n",
- "\n",
- "def _read_blocks(bed, chrom_filter):\n",
- " blocks = []\n",
- " with open(bed) as fh:\n",
- " for line in fh:\n",
- " if line.startswith(\"#\") or not line.strip():\n",
- " continue\n",
- " parts = line.split()\n",
- " c = parts[0]\n",
- " if not (c.startswith(\"chr\") and c[3:].isdigit()):\n",
- " continue\n",
- " cnum = int(c[3:])\n",
- " if not (1 <= cnum <= 22):\n",
- " continue\n",
- " if chrom_filter != 0 and cnum != chrom_filter:\n",
- " continue\n",
- " blocks.append((c, int(parts[1]), int(parts[2])))\n",
- " if not blocks:\n",
- " raise ValueError(f\"No blocks found for chrom={chrom_filter} in {bed}\")\n",
- " return blocks\n",
- "\n",
- "_blocks = _read_blocks(ld_block_file, chrom)\n",
- "print(f\" {len(_blocks)} LD blocks queued\")\n",
- "\n",
- "import tempfile\n",
- "block_dir = tempfile.mkdtemp(prefix=\"sos_blocks_\")\n",
- "\n",
- "input: []\n",
- "output: [f\"{block_dir}/{c}_{s}_{e}.block\" for c, s, e in _blocks]\n",
- "python: expand = \"${ }\"\n",
- " import os, json\n",
- " blocks = ${_blocks!r}\n",
- " block_dir = \"${block_dir}\"\n",
- " for c, s, e in blocks:\n",
- " path = os.path.join(block_dir, f\"{c}_{s}_{e}.block\")\n",
- " with open(path, \"w\") as fh:\n",
- " fh.write(f\"{c}\\n{s}\\n{e}\\n\")\n"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "a1b2c3d4-0031-4031-a031-000000000031",
- "metadata": {},
- "source": [
- "## `process_block_2`\n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "a1b2c3d4-0030-4030-a030-000000000030",
- "metadata": {
- "kernel": "SoS"
- },
- "outputs": [],
- "source": [
- "[process_block_2]\n",
- "parameter: vcf_base = str\n",
- "parameter: vcf_prefix = str\n",
- "parameter: cohort_id = \"ADSP.R5.EUR\"\n",
- "parameter: output_dir = str\n",
- "parameter: W_matrix = str\n",
- "parameter: B = 10000\n",
- "parameter: maf_min = 0.0005\n",
- "parameter: mac_min = 5\n",
- "parameter: msng_min = 0.05\n",
- "parameter: sample_list = \"\"\n",
- "input: output_from(\"process_block_1\"), group_by = 1\n",
- "task: trunk_workers = 1, trunk_size = 1, walltime = walltime, mem = mem, cores = numThreads\n",
- "python: expand = \"${ }\"\n",
- "\n",
- " import numpy as np\n",
- " import os\n",
- " import gzip\n",
- " import sys\n",
- " from math import nan\n",
- " from cyvcf2 import VCF\n",
- " from os import listdir\n",
- "\n",
- " # Read block coordinates from input file\n",
- " with open(\"${_input}\") as fh:\n",
- " lines = fh.read().splitlines()\n",
- " chrm_str = lines[0]\n",
- " block_start = int(lines[1])\n",
- " block_end = int(lines[2])\n",
- "\n",
- " vcf_base = \"${vcf_base}\"\n",
- " vcf_prefix = \"${vcf_prefix}\"\n",
- " W_path = \"${W_matrix}\"\n",
- " B = ${B}\n",
- " maf_min = ${maf_min}\n",
- " mac_min = ${mac_min}\n",
- " msng_min = ${msng_min}\n",
- " sample_list = \"${sample_list}\"\n",
- " cohort_id = \"${cohort_id}\"\n",
- " base_dir = \"${output_dir}\"\n",
- "\n",
- " block_tag = f\"{chrm_str}_{block_start}_{block_end}\"\n",
- " output_dir = os.path.join(base_dir, chrm_str, block_tag)\n",
- " os.makedirs(output_dir, exist_ok=True)\n",
- "\n",
- " log_path = os.path.join(output_dir, f\"{block_tag}.log\")\n",
- " log_fh = open(log_path, \"w\")\n",
- " sys.stdout = log_fh\n",
- " sys.stderr = log_fh\n",
- "\n",
- " # ── Load sample subset (optional) ─────────────────────────────\n",
- " sample_subset = None\n",
- " if sample_list:\n",
- " if not os.path.exists(sample_list):\n",
- " raise FileNotFoundError(f\"sample_list not found: {sample_list}\")\n",
- " with open(sample_list) as fh:\n",
- " sample_subset = set(line.strip() for line in fh if line.strip())\n",
- " print(f\" Sample subset: {len(sample_subset):,} samples\")\n",
- "\n",
- " # ── Helpers ───────────────────────────────────────────────────\n",
- " def get_vcf_files(chrm_str):\n",
- " files = sorted([\n",
- " os.path.join(vcf_base, x)\n",
- " for x in listdir(vcf_base)\n",
- " if x.endswith(\".bgz\") and (\n",
- " x.startswith(vcf_prefix + chrm_str + \":\") or\n",
- " x.startswith(vcf_prefix + chrm_str + \".\")\n",
- " )\n",
- " ])\n",
- " if not files:\n",
- " raise FileNotFoundError(f\"No VCF files for {chrm_str} in {vcf_base}\")\n",
- " return files\n",
- "\n",
- " def fill_missing_col_means(G):\n",
- " col_means = np.nanmean(G, axis=0)\n",
- " return np.where(np.isnan(G), col_means, G)\n",
- "\n",
- " # ── Step 1: Scan variants in block ────────────────────────────\n",
- " print(f\"[1/3] Scanning {chrm_str} [{block_start:,}, {block_end:,}) ...\")\n",
- " vcf_files = get_vcf_files(chrm_str)\n",
- " region = f\"{chrm_str}:{block_start+1}-{block_end}\"\n",
- " var_info = []\n",
- " n_samples = None\n",
- "\n",
- " for vf in vcf_files:\n",
- " vcf = VCF(vf)\n",
- " if sample_subset is not None:\n",
- " vcf_samples = vcf.samples\n",
- " keep_idx = [i for i, s in enumerate(vcf_samples) if s in sample_subset]\n",
- " if not keep_idx:\n",
- " raise ValueError(f\"No sample_list samples in {os.path.basename(vf)}\")\n",
- " vcf.set_samples([vcf_samples[i] for i in keep_idx])\n",
- " if n_samples is None:\n",
- " n_samples = len(vcf.samples)\n",
- " for var in vcf(region):\n",
- " if not (block_start <= var.POS - 1 < block_end):\n",
- " continue\n",
- " if len(var.ALT) != 1:\n",
- " continue\n",
- " dosage = [\n",
- " sum(x[0:2])\n",
- " for x in [[nan if v == -1 else v for v in gt]\n",
- " for gt in var.genotypes]\n",
- " ]\n",
- " if np.nanvar(dosage) == 0:\n",
- " continue\n",
- " counts = [np.nansum([2 - x for x in dosage]), np.nansum(dosage)]\n",
- " nan_count = int(np.sum(np.isnan(dosage)))\n",
- " n_non_na = len(dosage) - nan_count\n",
- " mac = min(counts)\n",
- " maf = mac / n_non_na\n",
- " af = float(np.nansum(dosage)) / (2 * n_non_na)\n",
- " msng_rate = nan_count / (n_non_na + nan_count)\n",
- " if maf < maf_min or mac < mac_min or msng_rate > msng_min:\n",
- " continue\n",
- " var_info.append({\n",
- " \"chr\": var.CHROM, \"pos\": var.POS,\n",
- " \"ref\": var.REF, \"alt\": var.ALT[0],\n",
- " \"af\": round(float(af), 6),\n",
- " \"id\": f\"{var.CHROM}:{var.POS}:{var.REF}:{var.ALT[0]}\",\n",
- " })\n",
- " vcf.close()\n",
- " print(f\" {len(var_info):,} variants passing filters (n={n_samples:,})\")\n",
- "\n",
- " if not var_info:\n",
- " raise ValueError(f\"No passing variants in {chrm_str} [{block_start:,}, {block_end:,})\")\n",
- "\n",
- " # ── Step 2: Load W ────────────────────────────────────────────\n",
- " print(f\"[2/3] Loading W ...\")\n",
- " W = np.load(W_path)\n",
- " if W.shape != (n_samples, B):\n",
- " raise ValueError(f\"W shape mismatch: {W.shape} vs ({n_samples},{B})\")\n",
- " W = W.astype(np.float32)\n",
- " print(f\" W: {W.shape}\")\n",
- "\n",
- " # ── Step 3: Compute U, write compressed dosage + map + afreq ──\n",
- " print(f\"[3/3] Computing U and writing output files ...\")\n",
- " var_lookup = {v[\"id\"]: v for v in var_info}\n",
- " var_id_set = set(var_lookup)\n",
- "\n",
- " dosage_path = os.path.join(output_dir, f\"{cohort_id}.{block_tag}.dosage.gz\")\n",
- " map_path = os.path.join(output_dir, f\"{cohort_id}.{block_tag}.map\")\n",
- " afreq_path = os.path.join(output_dir, f\"{cohort_id}.{block_tag}.afreq\")\n",
- " meta_path = os.path.join(output_dir, f\"{cohort_id}.{block_tag}.meta\")\n",
- "\n",
- " # Write .map\n",
- " with open(map_path, \"w\") as fh:\n",
- " for v in var_info:\n",
- " fh.write(f\"{v['chr'].replace('chr','')}\\t{v['id']}\\t0\\t{v['pos']}\\n\")\n",
- "\n",
- " # Write .meta\n",
- " with open(meta_path, \"w\") as fh:\n",
- " fh.write(f\"source_n_samples={n_samples}\\nB={B}\\n\")\n",
- " fh.write(f\"chrom={chrm_str}\\nblock_start={block_start}\\nblock_end={block_end}\\n\")\n",
- "\n",
- " # Write .afreq\n",
- " obs_ct = 2 * n_samples\n",
- " with open(afreq_path, \"w\") as fh:\n",
- " fh.write(\"#CHROM\\tID\\tREF\\tALT\\tALT_FREQS\\tOBS_CT\\n\")\n",
- " for v in var_info:\n",
- " fh.write(f\"{v['chr'].replace('chr','')}\\t{v['id']}\\t{v['ref']}\\t{v['alt']}\\t\"\n",
- " f\"{v['af']:.6f}\\t{obs_ct}\\n\")\n",
- "\n",
- " # Compute U and write compressed dosage (format=1: ID ALT REF val_S1 ... val_SB)\n",
- " total = 0\n",
- " with gzip.open(dosage_path, \"wt\", compresslevel=4) as gz:\n",
- " for vf in vcf_files:\n",
- " arr, var_keys = [], []\n",
- " vcf_obj = VCF(vf)\n",
- " for var in vcf_obj(region):\n",
- " if not (block_start <= var.POS - 1 < block_end):\n",
- " continue\n",
- " if len(var.ALT) != 1:\n",
- " continue\n",
- " vid = f\"{var.CHROM}:{var.POS}:{var.REF}:{var.ALT[0]}\"\n",
- " if vid not in var_id_set:\n",
- " continue\n",
- " dosage = [\n",
- " sum(x[0:2])\n",
- " for x in [[nan if v == -1 else v for v in gt]\n",
- " for gt in var.genotypes]\n",
- " ]\n",
- " arr.append(dosage)\n",
- " var_keys.append(vid)\n",
- " vcf_obj.close()\n",
- " if not arr:\n",
- " continue\n",
- " G_chunk = np.array(arr, dtype=np.float32).T\n",
- " G_chunk = fill_missing_col_means(G_chunk)\n",
- " U_chunk = W.T @ G_chunk\n",
- " del G_chunk\n",
- " col_min = U_chunk.min(axis=0)\n",
- " col_max = U_chunk.max(axis=0)\n",
- " denom = col_max - col_min\n",
- " denom[denom == 0] = 1.0\n",
- " U_chunk = 2.0 * (U_chunk - col_min) / denom\n",
- " U_chunk = np.round(U_chunk, 4)\n",
- " for j, vid in enumerate(var_keys):\n",
- " v = var_lookup[vid]\n",
- " vals = \" \".join(f\"{x:.4f}\" for x in U_chunk[:, j])\n",
- " gz.write(f\"{vid} {v['alt']} {v['ref']} {vals}\\n\")\n",
- " total += len(var_keys)\n",
- " del U_chunk\n",
- " print(f\" Written: {total:,} variants → {os.path.basename(dosage_path)}\")\n",
- " print(f\" Written: {os.path.basename(map_path)}, {os.path.basename(afreq_path)}\")\n",
- " print(f\"\\nDone: {chrm_str} [{block_start:,}, {block_end:,})\")\n"
- ]
- },
- {
- "cell_type": "markdown",
- "id": "a1b2c3d4-0040-4040-a040-000000000040",
- "metadata": {},
- "source": [
- "## `merge_chrom`\n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "a1b2c3d4-0041-4041-a041-000000000041",
- "metadata": {
- "kernel": "SoS"
- },
- "outputs": [],
- "source": [
- "[merge_chrom]\n",
- "# Merge all per-block dosage files for a chromosome into one per-chrom pgen.\n",
- "# Steps per chromosome:\n",
- "# 1. For each block: plink2 --import-dosage → per-block pgen (temp)\n",
- "# 2. plink2 --pmerge-list → one per-chrom pgen\n",
- "# 3. Clean up all per-block dirs\n",
- "#\n",
- "# Run after process_block completes (or run together: sos run ... process_block merge_chrom)\n",
- "parameter: chrom = 0\n",
- "parameter: output_dir = str\n",
- "parameter: cohort_id = \"ADSP.R5.EUR\"\n",
- "\n",
- "import os, glob\n",
- "\n",
- "def _chroms_to_process(output_dir, chrom_filter):\n",
- " if chrom_filter != 0:\n",
- " return [f\"chr{chrom_filter}\"]\n",
- " found = sorted(set(\n",
- " os.path.basename(d)\n",
- " for d in glob.glob(os.path.join(output_dir, \"chr*\"))\n",
- " if os.path.isdir(d)\n",
- " ))\n",
- " return found\n",
- "\n",
- "_chroms = _chroms_to_process(output_dir, chrom)\n",
- "print(f\" Merging {len(_chroms)} chromosome(s): {', '.join(_chroms)}\")\n",
- "\n",
- "input: []\n",
- "for_each: {\"_chrom\": _chroms}\n",
- "task: trunk_workers = 1, trunk_size = 1, walltime = walltime, mem = mem, cores = numThreads\n",
- "bash: expand = \"${ }\"\n",
- "\n",
- " set -euo pipefail\n",
- " chrom=\"${_chrom}\"\n",
- " cohort_id=\"${cohort_id}\"\n",
- " output_dir=\"${output_dir}\"\n",
- " plink2_bin=\"${plink2_bin}\"\n",
- " chrom_dir=\"${output_dir}/${chrom}\"\n",
- " final_prefix=\"${chrom_dir}/${cohort_id}.${chrom}_stocld_panel\"\n",
- " merge_list=\"${chrom_dir}/${cohort_id}.${chrom}_pmerge_list.txt\"\n",
- " psam_path=\"${chrom_dir}/${cohort_id}.${chrom}_stocld_panel.psam\"\n",
- "\n",
- " echo \"[merge_chrom] ${chrom}: starting\"\n",
- "\n",
- " # ── Step 1: Convert each block dosage.gz → temp per-block pgen ──\n",
- " > \"${merge_list}\"\n",
- " for dosage_gz in \"${chrom_dir}\"/*/*.dosage.gz; do\n",
- " block_dir=$(dirname \"${dosage_gz}\")\n",
- " block_tag=$(basename \"${block_dir}\")\n",
- " prefix=\"${block_dir}/${cohort_id}.${block_tag}_tmp\"\n",
- " map_file=\"${block_dir}/${cohort_id}.${block_tag}.map\"\n",
- " psam_file=\"${block_dir}/${cohort_id}.${block_tag}.psam\"\n",
- "\n",
- " # Write .psam if not present (B pseudo-individuals)\n",
- " B=$(grep \"^B=\" \"${block_dir}/${cohort_id}.${block_tag}.meta\" | cut -d= -f2)\n",
- " if [ ! -f \"${psam_file}\" ]; then\n",
- " echo \"#FID IID\" > \"${psam_file}\"\n",
- " for i in $(seq 1 ${B}); do echo \"S${i} S${i}\"; done >> \"${psam_file}\"\n",
- " fi\n",
- "\n",
- " echo \" importing ${block_tag} ($(zcat ${dosage_gz} | wc -l) variants)\"\n",
- " \"${plink2_bin}\" --import-dosage \"${dosage_gz}\" format=1 noheader --psam \"${psam_file}\" --map \"${map_file}\" --make-pgen --out \"${prefix}\" --silent\n",
- "\n",
- " echo \"${prefix}\" >> \"${merge_list}\"\n",
- " done\n",
- "\n",
- " # ── Step 2: Write shared .psam for the merged file ──\n",
- " B=$(grep \"^B=\" $(ls \"${chrom_dir}\"/*/*.meta | head -1) | cut -d= -f2)\n",
- " echo \"#FID IID\" > \"${psam_path}\"\n",
- " for i in $(seq 1 ${B}); do echo \"S${i} S${i}\"; done >> \"${psam_path}\"\n",
- "\n",
- " # ── Step 3: Merge all per-block pgens → one per-chrom pgen ──\n",
- " n_blocks=$(wc -l < \"${merge_list}\")\n",
- " echo \" merging ${n_blocks} blocks → ${final_prefix}\"\n",
- " \"${plink2_bin}\" --pmerge-list \"${merge_list}\" pfile --psam \"${psam_path}\" --make-pgen --out \"${final_prefix}\"\n",
- "\n",
- " # ── Step 4: Concatenate .afreq files ──\n",
- " afreq_out=\"${final_prefix}.afreq\"\n",
- " first=1\n",
- " for f in \"${chrom_dir}\"/*/*.afreq; do\n",
- " if [ \"${first}\" -eq 1 ]; then\n",
- " cat \"${f}\" > \"${afreq_out}\"\n",
- " first=0\n",
- " else\n",
- " tail -n +2 \"${f}\" >> \"${afreq_out}\"\n",
- " fi\n",
- " done\n",
- " echo \" afreq written: ${afreq_out}\"\n",
- "\n",
- " # ── Step 5: Clean up per-block intermediate files ──\n",
- " rm -f \"${merge_list}\" \"${psam_path}\"\n",
- " for block_dir in \"${chrom_dir}\"/*/; do\n",
- " rm -rf \"${block_dir}\"\n",
- " done\n",
- " echo \"[merge_chrom] ${chrom}: done → ${final_prefix}.{pgen,pvar,psam,afreq}\"\n"
- ]
- }
- ],
- "metadata": {
- "kernelspec": {
- "display_name": "Python 3 (ipykernel)",
- "language": "python",
- "name": "python3"
- },
- "language_info": {
- "codemirror_mode": {
- "name": "ipython",
- "version": 3
- },
- "file_extension": ".py",
- "mimetype": "text/x-python",
- "name": "python",
- "nbconvert_exporter": "python",
- "pygments_lexer": "ipython3",
- "version": "3.12.13"
- },
- "sos": {
- "kernels": [
- [
- "SoS",
- "sos",
- "sos",
- "",
- ""
- ]
- ],
- "version": ""
- }
- },
- "nbformat": 4,
- "nbformat_minor": 5
-}
From 7a4e42c9dce772f1da2ea5b53d0033b9b7517e69 Mon Sep 17 00:00:00 2001
From: Jenny Empawi <80464805+jaempawi@users.noreply.github.com>
Date: Fri, 27 Mar 2026 12:02:00 -0400
Subject: [PATCH 2/2] Fix merge_chrom bug
---
code/reference_data/rss_ld_sketch.ipynb | 831 ++++++++++++++++++++++++
1 file changed, 831 insertions(+)
create mode 100644 code/reference_data/rss_ld_sketch.ipynb
diff --git a/code/reference_data/rss_ld_sketch.ipynb b/code/reference_data/rss_ld_sketch.ipynb
new file mode 100644
index 000000000..53b626c50
--- /dev/null
+++ b/code/reference_data/rss_ld_sketch.ipynb
@@ -0,0 +1,831 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "id": "a1b2c3d4-0001-4001-a001-000000000001",
+ "metadata": {
+ "kernel": "SoS"
+ },
+ "source": [
+ "# RSS LD Sketch Pipeline\n",
+ "\n",
+ "## Overview\n",
+ "\n",
+ "This pipeline generates a stochastic genotype sample **U = WᵀG** from whole-genome sequencing VCF files and stores it as a PLINK2 pgen file for use as an LD reference panel with SuSiE-RSS fine-mapping.\n",
+ "\n",
+ "**Key idea:** Rather than storing the full genotype matrix G (n × p, ~530 MB/block), we compute U = WᵀG (B × p, ~320 MB/block) using a random projection matrix W ~ N(0, 1/√n). The approximate LD matrix R̂ = UᵀU / B ≈ R = GᵀG / n by the Johnson-Lindenstrauss lemma. G is never stored.\n",
+ "\n",
+ "**Matrix dimensions:**\n",
+ "- G : (n × p) — n individuals × p variants (~8,000 per block)\n",
+ "- W : (n × B) — projection matrix, W ~ N(0, 1/√n), generated once per cohort\n",
+ "- U : (B × p) — stochastic genotype sample = WᵀG, stored in pgen\n",
+ "- R̂ : (p × p) — approximate LD matrix, computed on-the-fly by SuSiE-RSS from U\n",
+ "\n",
+ "**Pipeline steps:**\n",
+ "1. `generate_W` — generate W once from cohort sample size n (saved as `.npy`, shared across all chromosomes)\n",
+ "2. `process_block_1` — read LD block BED file, emit one coordinate file per block\n",
+ "3. `process_block_2` — for each block in parallel: load VCF, compute U = WᵀG, scale to [0,2], write compressed dosage + allele frequencies\n",
+ "4. `merge_chrom` — for each chromosome: convert per-block dosage files to sorted pgen, merge into one per-chromosome pgen, clean up all intermediates\n",
+ "\n",
+ "## Input Files\n",
+ "\n",
+ "| File | Description |\n",
+ "|------|-------------|\n",
+ "| VCF (.bgz) | Per-chromosome VCF files with tabix index |\n",
+ "| `sample_list.txt` | Optional. One sample ID per line. Leave empty to use all samples. |\n",
+ "| `EUR_LD_blocks.bed` | LD block BED file (0-based half-open, e.g. Berisa & Pickrell EUR blocks) |\n",
+ "\n",
+ "## Parameters\n",
+ "\n",
+ "**Global parameters:**\n",
+ "\n",
+ "| Parameter | Default | Description |\n",
+ "|-----------|---------|-------------|\n",
+ "| `plink2_bin` | `plink2` | Path to plink2 binary |\n",
+ "| `walltime` | `24:00:00` | Per-task walltime |\n",
+ "| `mem` | `16G` | Per-task memory |\n",
+ "| `numThreads` | `16` | Per-task CPU cores |\n",
+ "\n",
+ "**`generate_W` parameters:**\n",
+ "\n",
+ "| Parameter | Default | Description |\n",
+ "|-----------|---------|-------------|\n",
+ "| `n_samples` | required | Total number of individuals in the cohort |\n",
+ "| `output_dir` | required | Output directory for `W_B{B}.npy` |\n",
+ "| `B` | `10000` | Sketch dimension (number of pseudo-individuals) |\n",
+ "| `seed` | `123` | Random seed |\n",
+ "\n",
+ "**`process_block` parameters:**\n",
+ "\n",
+ "| Parameter | Default | Description |\n",
+ "|-----------|---------|-------------|\n",
+ "| `vcf_base` | required | Directory containing VCF (.bgz) files |\n",
+ "| `vcf_prefix` | required | Shared filename prefix of all VCF files |\n",
+ "| `ld_block_file` | required | Path to LD block BED file |\n",
+ "| `chrom` | `0` | Chromosome 1–22, or 0 for all chromosomes |\n",
+ "| `cohort_id` | `ADSP.R5.EUR` | Cohort prefix for output filenames |\n",
+ "| `output_dir` | required | Base output directory |\n",
+ "| `W_matrix` | required | Path to W `.npy` output from `generate_W` |\n",
+ "| `B` | `10000` | Must match W |\n",
+ "| `sample_list` | `\"\"` | Optional path to sample ID file |\n",
+ "| `maf_min` | `0.0005` | Minimum minor allele frequency |\n",
+ "| `mac_min` | `5` | Minimum minor allele count |\n",
+ "| `msng_min` | `0.05` | Maximum missingness rate per variant |\n",
+ "\n",
+ "**`merge_chrom` parameters:**\n",
+ "\n",
+ "| Parameter | Default | Description |\n",
+ "|-----------|---------|-------------|\n",
+ "| `chrom` | `0` | Chromosome 1–22, or 0 for all chromosomes |\n",
+ "| `output_dir` | required | Base output directory (same as `process_block`) |\n",
+ "| `cohort_id` | `ADSP.R5.EUR` | Must match `process_block` cohort_id |\n",
+ "\n",
+ "## Output Files\n",
+ "\n",
+ "| File | Description |\n",
+ "|------|-------------|\n",
+ "| `W_B{B}.npy` | Projection matrix W, shape (n × B). Output of `generate_W`. |\n",
+ "| `{cohort_id}.chr{N}.pgen` | Final output — per-chromosome stochastic genotype sample U as PLINK2 binary |\n",
+ "| `{cohort_id}.chr{N}.pvar` | Per-chromosome variant information |\n",
+ "| `{cohort_id}.chr{N}.psam` | Pseudo-individual IDs S1…S{B} |\n",
+ "| `{cohort_id}.chr{N}.afreq` | Per-chromosome allele frequencies (OBS_CT = 2 × n) |\n",
+ "| *(cleaned up after merge)* | Per-block `.dosage.gz`, `.map`, `.afreq`, `.meta` intermediate files |\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "3547538e-ef0f-49d3-aa45-208a278f5d3c",
+ "metadata": {},
+ "source": [
+ "### Step 0 — Generate W (run once per cohort)\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "a1b2c3d4-0002-4002-a002-000000000002",
+ "metadata": {
+ "kernel": "SoS"
+ },
+ "outputs": [],
+ "source": [
+ "sos run pipeline/rss_ld_sketch.ipynb generate_W \\\n",
+ " --n-samples 16571 \\\n",
+ " --output-dir output/rss_ld_sketch \\\n",
+ " --B 10000 \\\n",
+ " --seed 123\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "e2559a2e-8bdd-443b-980f-26fe7f728674",
+ "metadata": {},
+ "source": [
+ "### Step 1+2 — Process all blocks → compressed dosage files\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "a1b2c3d4-0003-4003-a003-000000000003",
+ "metadata": {
+ "kernel": "SoS"
+ },
+ "outputs": [],
+ "source": [
+ "# Process one chromosome\n",
+ "sos run pipeline/rss_ld_sketch.ipynb process_block \\\n",
+ " --vcf-base /path/to/vcfs/ \\\n",
+ " --vcf-prefix your.prefix. \\\n",
+ " --ld-block-file /path/to/EUR_LD_blocks.bed \\\n",
+ " --chrom 22 \\\n",
+ " --output-dir output/rss_ld_sketch \\\n",
+ " --W-matrix output/rss_ld_sketch/W_B10000.npy "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "7620754c-5060-460f-a7a3-576dcf686270",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Process all 22 chromosomes\n",
+ "sos run pipeline/rss_ld_sketch.ipynb process_block \\\n",
+ " --vcf-base /restricted/projectnb/xqtl/R4_QC_NHWonly_rm_monomorphic \\\n",
+ " --vcf-prefix gcad.qc.r4.wgs.36361.GATK.2022.08.15.biallelic.genotypes. \\\n",
+ " --ld-block-file /restricted/projectnb/casa/oaolayin/ROSMAP_NIA_geno/EUR_LD_blocks.bed \\\n",
+ " --chrom 0 \\\n",
+ " --output-dir output/rss_ld_sketch \\\n",
+ " --W-matrix output/rss_ld_sketch/W_B10000.npy \n",
+ "\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "19fdaf69-ae0d-45fb-b7a1-dc5a03ff9a51",
+ "metadata": {},
+ "source": [
+ "### Step 3 — Merge per-block dosage files into per-chromosome pgen\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "f06fcbea-891d-4b3c-937d-817678c8858d",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "sos run pipeline/rss_ld_sketch.ipynb merge_chrom \\\n",
+ " --output-dir output/rss_ld_sketch \\\n",
+ " --chrom 0\n",
+ "\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "a1b2c3d4-0004-4004-a004-000000000004",
+ "metadata": {
+ "kernel": "SoS"
+ },
+ "source": [
+ "## Command Interface"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "a1b2c3d4-0005-4005-a005-000000000005",
+ "metadata": {
+ "kernel": "SoS"
+ },
+ "outputs": [],
+ "source": [
+ "sos run pipeline/rss_ld_sketch.ipynb -h"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "a1b2c3d4-0006-4006-a006-000000000006",
+ "metadata": {
+ "kernel": "SoS"
+ },
+ "source": [
+ "```\n",
+ "usage: sos run pipeline/rss_ld_sketch.ipynb\n",
+ " [workflow_name | -t targets] [options] [workflow_options]\n",
+ " workflow_name: Single or combined workflows defined in this script\n",
+ " options: Single-hyphen sos parameters (see \"sos run -h\" for details)\n",
+ " workflow_options: Double-hyphen workflow-specific parameters\n",
+ "\n",
+ "Workflows:\n",
+ " generate_W\n",
+ " process_block\n",
+ " merge_chrom\n",
+ "\n",
+ "Global Workflow Options:\n",
+ " --walltime 24:00:00\n",
+ " --mem 16G\n",
+ " --numThreads 16 (as int)\n",
+ " --plink2-bin plink2\n",
+ "\n",
+ "Sections\n",
+ " generate_W:\n",
+ " Workflow Options:\n",
+ " --n-samples VAL (as int, required)\n",
+ " Generate projection matrix W ~ N(0, 1/sqrt(n)), shape (n x B).\n",
+ " Run ONCE per cohort before any chromosome processing.\n",
+ " W depends only on n (sample size) and B — not on variant data.\n",
+ " All 22 chromosomes reuse the same W.\n",
+ " --output-dir VAL (as str, required)\n",
+ " --B 10000 (as int)\n",
+ " --seed 123 (as int)\n",
+ "\n",
+ " process_block:\n",
+ " Workflow Options:\n",
+ " --vcf-base VAL (as str, required)\n",
+ " Directory containing VCF (.bgz) files, one or more per chromosome.\n",
+ " --vcf-prefix VAL (as str, required)\n",
+ " Shared filename prefix of all VCF files (everything before \"chr{N}:\" or \"chr{N}.\").\n",
+ " --ld-block-file VAL (as str, required)\n",
+ " Path to LD block BED file (e.g. EUR_LD_blocks.bed).\n",
+ " --chrom 0 (as int)\n",
+ " Chromosome 1–22, or 0 for all chromosomes.\n",
+ " --cohort-id ADSP.R5.EUR (as str)\n",
+ " Cohort prefix for output filenames.\n",
+ " --output-dir VAL (as str, required)\n",
+ " --W-matrix VAL (as str, required)\n",
+ " --B 10000 (as int)\n",
+ " --maf-min 0.0005 (as float)\n",
+ " --mac-min 5 (as int)\n",
+ " --msng-min 0.05 (as float)\n",
+ " --sample-list \"\" (optional path to sample ID file)\n",
+ " Output per block (intermediate, cleaned up by merge_chrom):\n",
+ " {cohort_id}.{chr}_{start}_{end}.dosage.gz\n",
+ " {cohort_id}.{chr}_{start}_{end}.map\n",
+ " {cohort_id}.{chr}_{start}_{end}.afreq\n",
+ " {cohort_id}.{chr}_{start}_{end}.meta\n",
+ "\n",
+ " merge_chrom:\n",
+ " Workflow Options:\n",
+ " --chrom 0 (as int)\n",
+ " Chromosome 1–22, or 0 for all chromosomes.\n",
+ " --cohort-id ADSP.R5.EUR (as str)\n",
+ " Must match cohort-id used in process_block.\n",
+ " --output-dir VAL (as str, required)\n",
+ " Output per chromosome (final, block intermediates cleaned up):\n",
+ " {cohort_id}.chr{N}.pgen\n",
+ " {cohort_id}.chr{N}.pvar\n",
+ " {cohort_id}.chr{N}.psam\n",
+ " {cohort_id}.chr{N}.afreq\n",
+ "```\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "a1b2c3d4-0007-4007-a007-000000000007",
+ "metadata": {
+ "kernel": "SoS"
+ },
+ "source": [
+ "# Pipeline Implementation\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "a1b2c3d4-0008-4008-a008-000000000008",
+ "metadata": {
+ "kernel": "SoS"
+ },
+ "outputs": [],
+ "source": [
+ "[global]\n",
+ "parameter: cwd = path(\"output\")\n",
+ "parameter: job_size = 1\n",
+ "parameter: walltime = \"24:00:00\"\n",
+ "parameter: mem = \"16G\"\n",
+ "parameter: numThreads = 16\n",
+ "parameter: container = \"\"\n",
+ "parameter: plink2_bin = \"plink2\"\n",
+ "\n",
+ "import re\n",
+ "from sos.utils import expand_size\n",
+ "\n",
+ "entrypoint = (\n",
+ " 'micromamba run -a \"\" -n' + ' ' +\n",
+ " re.sub(r'(_apptainer:latest|_docker:latest|\\.sif)$', '', container.split('/')[-1])\n",
+ ") if container else \"\"\n",
+ "\n",
+ "cwd = path(f'{cwd:a}')\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "a1b2c3d4-0009-4009-a009-000000000009",
+ "metadata": {
+ "kernel": "SoS"
+ },
+ "source": [
+ "## `generate_W`\n",
+ "\n",
+ "Generates the random projection matrix W ~ N(0, 1/√n), shape (n × B). Run once per cohort. W is shared across all chromosomes.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "a1b2c3d4-0010-4010-a010-000000000010",
+ "metadata": {
+ "kernel": "SoS"
+ },
+ "outputs": [],
+ "source": [
+ "[generate_W]\n",
+ "# Generate projection matrix W ~ N(0, 1/sqrt(n)), shape (n x B).\n",
+ "# Run ONCE before processing any chromosome.\n",
+ "#\n",
+ "# W depends only on n (total sample size) and B — not on any variant data.\n",
+ "# n_samples is passed directly as a parameter; no VCF reading is needed.\n",
+ "# All 22 chromosomes reuse the same W so that per-chromosome stochastic\n",
+ "# genotype samples can be arithmetically merged for meta-analysis.\n",
+ "parameter: n_samples = int\n",
+ "parameter: output_dir = str\n",
+ "parameter: B = 10000\n",
+ "parameter: seed = 123\n",
+ "\n",
+ "import os\n",
+ "input: []\n",
+ "output: f'{output_dir}/W_B{B}.npy'\n",
+ "task: trunk_workers = 1, trunk_size = 1, walltime = '00:05:00', mem = '4G', cores = 1\n",
+ "python: expand = \"${ }\", stdout = f'{_output:n}.stdout', stderr = f'{_output:n}.stderr'\n",
+ "\n",
+ " import numpy as np\n",
+ " import os\n",
+ "\n",
+ " n = ${n_samples}\n",
+ " B = ${B}\n",
+ " seed = ${seed}\n",
+ " W_out = \"${_output}\"\n",
+ "\n",
+ " # ── Generate W ~ N(0, 1/sqrt(n)) ─────────────────────────────\n",
+ " # Convention: W = np.random.normal(0, 1/np.sqrt(n), size=(n, B))\n",
+ " # W is shared across all chromosomes — do not regenerate per chromosome.\n",
+ " print(f\"Generating W ~ N(0, 1/sqrt({n})), shape ({n}, {B}), seed={seed}\")\n",
+ " np.random.seed(seed)\n",
+ " W = np.random.normal(0, 1.0 / np.sqrt(n), size=(n, B)).astype(np.float32)\n",
+ "\n",
+ " os.makedirs(os.path.dirname(os.path.abspath(W_out)), exist_ok=True)\n",
+ " np.save(W_out, W)\n",
+ " print(f\"Saved: {W_out}\")\n",
+ " print(f\"Shape: {W.shape}, size: {os.path.getsize(W_out)/1e9:.2f} GB\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "a1b2c3d4-0011-4011-a011-000000000011",
+ "metadata": {
+ "kernel": "SoS"
+ },
+ "source": [
+ "## `process_block`\n",
+ "\n",
+ "Two-step workflow. `process_block_1` reads the BED file and emits one coordinate file per LD block. `process_block_2` reads the VCF for each block, computes U = WᵀG, scales to [0,2], and writes compressed dosage + allele frequency files. G is never stored.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "a1b2c3d4-0012-4012-a012-000000000012",
+ "metadata": {
+ "kernel": "SoS"
+ },
+ "outputs": [],
+ "source": [
+ "[process_block_1]\n",
+ "# Read BED file and write one .block coord file per LD block.\n",
+ "parameter: ld_block_file = str\n",
+ "parameter: chrom = 0\n",
+ "\n",
+ "import os\n",
+ "\n",
+ "def _read_blocks(bed, chrom_filter):\n",
+ " blocks = []\n",
+ " with open(bed) as fh:\n",
+ " for line in fh:\n",
+ " if line.startswith(\"#\") or not line.strip():\n",
+ " continue\n",
+ " parts = line.split()\n",
+ " c = parts[0]\n",
+ " if not (c.startswith(\"chr\") and c[3:].isdigit()):\n",
+ " continue\n",
+ " cnum = int(c[3:])\n",
+ " if not (1 <= cnum <= 22):\n",
+ " continue\n",
+ " if chrom_filter != 0 and cnum != chrom_filter:\n",
+ " continue\n",
+ " blocks.append((c, int(parts[1]), int(parts[2])))\n",
+ " if not blocks:\n",
+ " raise ValueError(f\"No blocks found for chrom={chrom_filter} in {bed}\")\n",
+ " return blocks\n",
+ "\n",
+ "_blocks = _read_blocks(ld_block_file, chrom)\n",
+ "print(f\" {len(_blocks)} LD blocks queued\")\n",
+ "\n",
+ "import tempfile\n",
+ "block_dir = tempfile.mkdtemp(prefix=\"sos_blocks_\")\n",
+ "\n",
+ "input: []\n",
+ "output: [f\"{block_dir}/{c}_{s}_{e}.block\" for c, s, e in _blocks]\n",
+ "python: expand = \"${ }\"\n",
+ " import os, json\n",
+ " blocks = ${_blocks!r}\n",
+ " block_dir = \"${block_dir}\"\n",
+ " for c, s, e in blocks:\n",
+ " path = os.path.join(block_dir, f\"{c}_{s}_{e}.block\")\n",
+ " with open(path, \"w\") as fh:\n",
+ " fh.write(f\"{c}\\n{s}\\n{e}\\n\")\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "a1b2c3d4-0031-4031-a031-000000000031",
+ "metadata": {},
+ "source": [
+ "## `process_block_2`\n",
+ "\n",
+ "Per-block parallel task. Reads VCF, computes U = WᵀG in chunks, scales to [0,2], writes `.dosage.gz`, `.map`, `.afreq`, `.meta`. Output is intermediate — cleaned up by `merge_chrom`.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "a1b2c3d4-0030-4030-a030-000000000030",
+ "metadata": {
+ "kernel": "SoS"
+ },
+ "outputs": [],
+ "source": [
+ "[process_block_2]\n",
+ "parameter: vcf_base = str\n",
+ "parameter: vcf_prefix = str\n",
+ "parameter: cohort_id = \"ADSP.R5.EUR\"\n",
+ "parameter: output_dir = str\n",
+ "parameter: W_matrix = str\n",
+ "parameter: B = 10000\n",
+ "parameter: maf_min = 0.0005\n",
+ "parameter: mac_min = 5\n",
+ "parameter: msng_min = 0.05\n",
+ "parameter: sample_list = \"\"\n",
+ "input: output_from(\"process_block_1\"), group_by = 1\n",
+ "task: trunk_workers = 1, trunk_size = 1, walltime = walltime, mem = mem, cores = numThreads\n",
+ "python: expand = \"${ }\"\n",
+ "\n",
+ " import numpy as np\n",
+ " import os\n",
+ " import gzip\n",
+ " import sys\n",
+ " from math import nan\n",
+ " from cyvcf2 import VCF\n",
+ " from os import listdir\n",
+ "\n",
+ " # Read block coordinates from input file\n",
+ " with open(\"${_input}\") as fh:\n",
+ " lines = fh.read().splitlines()\n",
+ " chrm_str = lines[0]\n",
+ " block_start = int(lines[1])\n",
+ " block_end = int(lines[2])\n",
+ "\n",
+ " vcf_base = \"${vcf_base}\"\n",
+ " vcf_prefix = \"${vcf_prefix}\"\n",
+ " W_path = \"${W_matrix}\"\n",
+ " B = ${B}\n",
+ " maf_min = ${maf_min}\n",
+ " mac_min = ${mac_min}\n",
+ " msng_min = ${msng_min}\n",
+ " sample_list = \"${sample_list}\"\n",
+ " cohort_id = \"${cohort_id}\"\n",
+ " base_dir = \"${output_dir}\"\n",
+ "\n",
+ " block_tag = f\"{chrm_str}_{block_start}_{block_end}\"\n",
+ " output_dir = os.path.join(base_dir, chrm_str, block_tag)\n",
+ " os.makedirs(output_dir, exist_ok=True)\n",
+ "\n",
+ " log_path = os.path.join(output_dir, f\"{block_tag}.log\")\n",
+ " log_fh = open(log_path, \"w\")\n",
+ " sys.stdout = log_fh\n",
+ " sys.stderr = log_fh\n",
+ "\n",
+ " # ── Load sample subset (optional) ─────────────────────────────\n",
+ " sample_subset = None\n",
+ " if sample_list:\n",
+ " if not os.path.exists(sample_list):\n",
+ " raise FileNotFoundError(f\"sample_list not found: {sample_list}\")\n",
+ " with open(sample_list) as fh:\n",
+ " sample_subset = set(line.strip() for line in fh if line.strip())\n",
+ " print(f\" Sample subset: {len(sample_subset):,} samples\")\n",
+ "\n",
+ " # ── Helpers ───────────────────────────────────────────────────\n",
+ " def get_vcf_files(chrm_str):\n",
+ " files = sorted([\n",
+ " os.path.join(vcf_base, x)\n",
+ " for x in listdir(vcf_base)\n",
+ " if x.endswith(\".bgz\") and (\n",
+ " x.startswith(vcf_prefix + chrm_str + \":\") or\n",
+ " x.startswith(vcf_prefix + chrm_str + \".\")\n",
+ " )\n",
+ " ])\n",
+ " if not files:\n",
+ " raise FileNotFoundError(f\"No VCF files for {chrm_str} in {vcf_base}\")\n",
+ " return files\n",
+ "\n",
+ " def fill_missing_col_means(G):\n",
+ " col_means = np.nanmean(G, axis=0)\n",
+ " return np.where(np.isnan(G), col_means, G)\n",
+ "\n",
+ " # ── Step 1: Scan variants in block ────────────────────────────\n",
+ " print(f\"[1/3] Scanning {chrm_str} [{block_start:,}, {block_end:,}) ...\")\n",
+ " vcf_files = get_vcf_files(chrm_str)\n",
+ " region = f\"{chrm_str}:{block_start+1}-{block_end}\"\n",
+ " var_info = []\n",
+ " n_samples = None\n",
+ "\n",
+ " for vf in vcf_files:\n",
+ " vcf = VCF(vf)\n",
+ " if sample_subset is not None:\n",
+ " vcf_samples = vcf.samples\n",
+ " keep_idx = [i for i, s in enumerate(vcf_samples) if s in sample_subset]\n",
+ " if not keep_idx:\n",
+ " raise ValueError(f\"No sample_list samples in {os.path.basename(vf)}\")\n",
+ " vcf.set_samples([vcf_samples[i] for i in keep_idx])\n",
+ " if n_samples is None:\n",
+ " n_samples = len(vcf.samples)\n",
+ " for var in vcf(region):\n",
+ " if not (block_start <= var.POS - 1 < block_end):\n",
+ " continue\n",
+ " if len(var.ALT) != 1:\n",
+ " continue\n",
+ " dosage = [\n",
+ " sum(x[0:2])\n",
+ " for x in [[nan if v == -1 else v for v in gt]\n",
+ " for gt in var.genotypes]\n",
+ " ]\n",
+ " if np.nanvar(dosage) == 0:\n",
+ " continue\n",
+ "            counts = [np.nansum([2 - x for x in dosage]), np.nansum(dosage)]\n",
+ "            nan_count = int(np.sum(np.isnan(dosage)))\n",
+ "            n_non_na = len(dosage) - nan_count\n",
+ "            mac = min(counts)  # minor allele count over non-missing samples\n",
+ "            maf = mac / (2 * n_non_na)  # MAF = minor alleles / total alleles (2N), matching af below\n",
+ "            af = float(np.nansum(dosage)) / (2 * n_non_na)\n",
+ "            msng_rate = nan_count / (n_non_na + nan_count)\n",
+ "            if maf < maf_min or mac < mac_min or msng_rate > msng_min:\n",
+ " continue\n",
+ " var_info.append({\n",
+ " \"chr\": var.CHROM, \"pos\": var.POS,\n",
+ " \"ref\": var.REF, \"alt\": var.ALT[0],\n",
+ " \"af\": round(float(af), 6),\n",
+ " \"id\": f\"{var.CHROM}:{var.POS}:{var.REF}:{var.ALT[0]}\",\n",
+ " })\n",
+ " vcf.close()\n",
+ " print(f\" {len(var_info):,} variants passing filters (n={n_samples:,})\")\n",
+ "\n",
+ " if not var_info:\n",
+ " raise ValueError(f\"No passing variants in {chrm_str} [{block_start:,}, {block_end:,})\")\n",
+ "\n",
+ " # ── Step 2: Load W ────────────────────────────────────────────\n",
+ " print(f\"[2/3] Loading W ...\")\n",
+ " W = np.load(W_path)\n",
+ " if W.shape != (n_samples, B):\n",
+ " raise ValueError(f\"W shape mismatch: {W.shape} vs ({n_samples},{B})\")\n",
+ " W = W.astype(np.float32)\n",
+ " print(f\" W: {W.shape}\")\n",
+ "\n",
+ " # ── Step 3: Compute U, write compressed dosage + map + afreq ──\n",
+ " print(f\"[3/3] Computing U and writing output files ...\")\n",
+ " var_lookup = {v[\"id\"]: v for v in var_info}\n",
+ " var_id_set = set(var_lookup)\n",
+ "\n",
+ " dosage_path = os.path.join(output_dir, f\"{cohort_id}.{block_tag}.dosage.gz\")\n",
+ " map_path = os.path.join(output_dir, f\"{cohort_id}.{block_tag}.map\")\n",
+ " afreq_path = os.path.join(output_dir, f\"{cohort_id}.{block_tag}.afreq\")\n",
+ " meta_path = os.path.join(output_dir, f\"{cohort_id}.{block_tag}.meta\")\n",
+ "\n",
+ " # Write .map\n",
+ " with open(map_path, \"w\") as fh:\n",
+ " for v in var_info:\n",
+ " fh.write(f\"{v['chr'].replace('chr','')}\\t{v['id']}\\t0\\t{v['pos']}\\n\")\n",
+ "\n",
+ " # Write .meta\n",
+ " with open(meta_path, \"w\") as fh:\n",
+ " fh.write(f\"source_n_samples={n_samples}\\nB={B}\\n\")\n",
+ " fh.write(f\"chrom={chrm_str}\\nblock_start={block_start}\\nblock_end={block_end}\\n\")\n",
+ "\n",
+ " # Write .afreq\n",
+ " obs_ct = 2 * n_samples\n",
+ " with open(afreq_path, \"w\") as fh:\n",
+ " fh.write(\"#CHROM\\tID\\tREF\\tALT\\tALT_FREQS\\tOBS_CT\\n\")\n",
+ " for v in var_info:\n",
+ " fh.write(f\"{v['chr'].replace('chr','')}\\t{v['id']}\\t{v['ref']}\\t{v['alt']}\\t\"\n",
+ " f\"{v['af']:.6f}\\t{obs_ct}\\n\")\n",
+ "\n",
+ " # Compute U and write compressed dosage (format=1: ID ALT REF val_S1 ... val_SB)\n",
+ " total = 0\n",
+ " with gzip.open(dosage_path, \"wt\", compresslevel=4) as gz:\n",
+ " for vf in vcf_files:\n",
+ " arr, var_keys = [], []\n",
+ " vcf_obj = VCF(vf)\n",
+ " for var in vcf_obj(region):\n",
+ " if not (block_start <= var.POS - 1 < block_end):\n",
+ " continue\n",
+ " if len(var.ALT) != 1:\n",
+ " continue\n",
+ " vid = f\"{var.CHROM}:{var.POS}:{var.REF}:{var.ALT[0]}\"\n",
+ " if vid not in var_id_set:\n",
+ " continue\n",
+ " dosage = [\n",
+ " sum(x[0:2])\n",
+ " for x in [[nan if v == -1 else v for v in gt]\n",
+ " for gt in var.genotypes]\n",
+ " ]\n",
+ " arr.append(dosage)\n",
+ " var_keys.append(vid)\n",
+ " vcf_obj.close()\n",
+ " if not arr:\n",
+ " continue\n",
+ " G_chunk = np.array(arr, dtype=np.float32).T\n",
+ " G_chunk = fill_missing_col_means(G_chunk)\n",
+ " U_chunk = W.T @ G_chunk\n",
+ " del G_chunk\n",
+ " col_min = U_chunk.min(axis=0)\n",
+ " col_max = U_chunk.max(axis=0)\n",
+ " denom = col_max - col_min\n",
+ " denom[denom == 0] = 1.0\n",
+ " U_chunk = 2.0 * (U_chunk - col_min) / denom\n",
+ " U_chunk = np.round(U_chunk, 4)\n",
+ " for j, vid in enumerate(var_keys):\n",
+ " v = var_lookup[vid]\n",
+ " vals = \" \".join(f\"{x:.4f}\" for x in U_chunk[:, j])\n",
+ " gz.write(f\"{vid} {v['alt']} {v['ref']} {vals}\\n\")\n",
+ " total += len(var_keys)\n",
+ " del U_chunk\n",
+ " print(f\" Written: {total:,} variants → {os.path.basename(dosage_path)}\")\n",
+ " print(f\" Written: {os.path.basename(map_path)}, {os.path.basename(afreq_path)}\")\n",
+ " print(f\"\\nDone: {chrm_str} [{block_start:,}, {block_end:,})\")\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "a1b2c3d4-0040-4040-a040-000000000040",
+ "metadata": {},
+ "source": [
+ "## `merge_chrom`\n",
+ "\n",
+ "For each chromosome: converts per-block `.dosage.gz` files to sorted pgen (two-pass: import then `--sort-vars`), merges all blocks into one per-chromosome pgen using `plink2 --pmerge-list`, concatenates allele frequencies, and removes all per-block intermediate directories. Output: `{cohort_id}.chr{N}.{pgen,pvar,psam,afreq}`.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "a1b2c3d4-0041-4041-a041-000000000041",
+ "metadata": {
+ "kernel": "SoS"
+ },
+ "outputs": [],
+ "source": [
+ "[merge_chrom]\n",
+ "# Merge all per-block dosage files for a chromosome into one per-chrom pgen.\n",
+ "# Steps per chromosome:\n",
+ "# 1. For each block: plink2 --import-dosage → unsorted pgen, then --sort-vars → sorted pgen\n",
+ "# 2. plink2 --pmerge-list → one per-chrom pgen\n",
+ "# 3. Clean up all intermediates including -merge.* files\n",
+ "#\n",
+ "# Run after process_block completes (or run together: sos run ... process_block merge_chrom)\n",
+ "parameter: chrom = 0\n",
+ "parameter: output_dir = str\n",
+ "parameter: cohort_id = \"ADSP.R5.EUR\"\n",
+ "\n",
+ "import os, glob\n",
+ "\n",
+ "def _chroms_to_process(output_dir, chrom_filter):\n",
+ " if chrom_filter != 0:\n",
+ " return [f\"chr{chrom_filter}\"]\n",
+ " return sorted(set(\n",
+ " os.path.basename(d)\n",
+ " for d in glob.glob(os.path.join(output_dir, \"chr*\"))\n",
+ " if os.path.isdir(d)\n",
+ " ))\n",
+ "\n",
+ "_chroms = _chroms_to_process(output_dir, chrom)\n",
+ "\n",
+ "input: []\n",
+ "output: [f\"{output_dir}/{c}/{cohort_id}.{c}.pgen\" for c in _chroms]\n",
+ "\n",
+ "for _chrom in _chroms:\n",
+ " bash(f\"\"\"\n",
+ " set -euo pipefail\n",
+ " shopt -s nullglob\n",
+ "\n",
+ " chrom_dir=\"{output_dir}/{_chrom}\"\n",
+ " final_prefix=\"${{chrom_dir}}/{cohort_id}.{_chrom}\"\n",
+ " merge_list=\"${{chrom_dir}}/{cohort_id}.{_chrom}_pmerge_list.txt\"\n",
+ "\n",
+ " # ── Step 1: Convert each block dosage.gz → sorted per-block pgen (two-pass) ──\n",
+ " > \"${{merge_list}}\"\n",
+ " files=(\"${{chrom_dir}}\"/*/*.dosage.gz)\n",
+ " if [ ${{#files[@]}} -eq 0 ]; then\n",
+ " exit 1\n",
+ " fi\n",
+ " for dosage_gz in \"${{files[@]}}\"; do\n",
+ " block_dir=$(dirname \"${{dosage_gz}}\")\n",
+ " block_tag=$(basename \"${{block_dir}}\")\n",
+ " prefix=\"${{block_dir}}/{cohort_id}.${{block_tag}}_tmp\"\n",
+ " map_file=\"${{block_dir}}/{cohort_id}.${{block_tag}}.map\"\n",
+ " psam_file=\"${{block_dir}}/{cohort_id}.${{block_tag}}.psam\"\n",
+ " meta_file=\"${{block_dir}}/{cohort_id}.${{block_tag}}.meta\"\n",
+ " B=$(grep \"^B=\" \"${{meta_file}}\" | cut -d= -f2)\n",
+ " printf '#FID\\tIID\\n' > \"${{psam_file}}\"\n",
+ " for i in $(seq 1 ${{B}}); do\n",
+ " printf 'S%d\\tS%d\\n' ${{i}} ${{i}} >> \"${{psam_file}}\"\n",
+ " done\n",
+ " # Pass 1: import dosage → unsorted pgen\n",
+ " \"{plink2_bin}\" \\\n",
+ " --import-dosage \"${{dosage_gz}}\" format=1 noheader \\\n",
+ " --psam \"${{psam_file}}\" \\\n",
+ " --map \"${{map_file}}\" \\\n",
+ " --make-pgen \\\n",
+ " --out \"${{prefix}}_unsorted\" \\\n",
+ " --silent\n",
+ " # Pass 2: sort variants\n",
+ " \"{plink2_bin}\" \\\n",
+ " --pfile \"${{prefix}}_unsorted\" \\\n",
+ " --make-pgen \\\n",
+ " --sort-vars \\\n",
+ " --out \"${{prefix}}\" \\\n",
+ " --silent\n",
+ " rm -f \"${{prefix}}_unsorted.pgen\" \"${{prefix}}_unsorted.pvar\" \"${{prefix}}_unsorted.psam\"\n",
+ " echo \"${{prefix}}\" >> \"${{merge_list}}\"\n",
+ " done\n",
+ "\n",
+ " # ── Step 2: Merge all per-block pgens → one per-chrom pgen ──\n",
+ " \"{plink2_bin}\" \\\n",
+ " --pmerge-list \"${{merge_list}}\" pfile \\\n",
+ " --make-pgen \\\n",
+ " --sort-vars \\\n",
+ " --out \"${{final_prefix}}\"\n",
+ "\n",
+ " # ── Step 3: Concatenate .afreq ──\n",
+ " first=1\n",
+ " for f in \"${{chrom_dir}}\"/*/*.afreq; do\n",
+ " if [ \"${{first}}\" -eq 1 ]; then\n",
+ " cat \"${{f}}\" > \"${{final_prefix}}.afreq\"\n",
+ " first=0\n",
+ " else\n",
+ " tail -n +2 \"${{f}}\" >> \"${{final_prefix}}.afreq\"\n",
+ " fi\n",
+ " done\n",
+ "\n",
+ " # ── Step 4: Cleanup — remove merge intermediates and per-block dirs ──\n",
+ " rm -f \"${{merge_list}}\"\n",
+ " rm -f \"${{final_prefix}}-merge.pgen\" \"${{final_prefix}}-merge.pvar\" \"${{final_prefix}}-merge.psam\"\n",
+ " for block_dir in \"${{chrom_dir}}\"/*/; do\n",
+ " rm -rf \"${{block_dir}}\"\n",
+ " done\n",
+ " \"\"\")"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.12.13"
+ },
+ "sos": {
+ "kernels": [
+ [
+ "SoS",
+ "sos",
+ "sos",
+ "",
+ ""
+ ]
+ ],
+ "version": ""
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}