RSS LD Sketch Pipeline#
Overview#
This pipeline generates a stochastic genotype sample \(U = W^T G\) from whole-genome sequencing VCF files and stores it as a PLINK2 pgen file for use as an LD reference panel with SuSiE-RSS fine-mapping.
Key idea: Rather than storing the full genotype matrix G (\(n \times p\), ~530 MB/block), we compute \(U = W^T G\) (\(B \times p\), ~320 MB/block) using a random projection matrix \(W\) with i.i.d. \(N(0, 1/\sqrt{n})\) entries (standard deviation \(1/\sqrt{n}\)). By the Johnson-Lindenstrauss lemma, the approximate LD matrix \(\hat{R} = U^T U / B \approx G^T G / n = R\). G is never stored.
Matrix dimensions:
G : \((n \times p)\) – \(n\) individuals \(\times\) \(p\) variants (~8,000 per block)
W : \((n \times B)\) – projection matrix, \(W \sim N(0, 1/\sqrt{n})\), generated once per cohort
U : \((B \times p)\) – stochastic genotype sample = \(W^T G\), stored in pgen
\(\hat{R}\) : \((p \times p)\) – approximate LD matrix, computed on-the-fly by SuSiE-RSS from U
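These relationships can be sanity-checked numerically. The sketch below uses small illustrative dimensions (not the pipeline defaults) and simulated column-standardized genotypes:

```python
import numpy as np

rng = np.random.default_rng(0)
n, B, p = 2000, 4000, 50  # illustrative sizes, far smaller than the pipeline defaults

# Simulated, column-standardized genotype matrix G (n x p)
G = rng.normal(size=(n, p))
G = (G - G.mean(axis=0)) / G.std(axis=0)

# Exact LD matrix R = G^T G / n
R = G.T @ G / n

# Random projection W with N(0, 1/sqrt(n)) entries; sketch U = W^T G is (B x p)
W = rng.normal(0.0, 1.0 / np.sqrt(n), size=(n, B))
U = W.T @ G

# Johnson-Lindenstrauss: R_hat = U^T U / B approximates R elementwise,
# with error on the order of 1/sqrt(B)
R_hat = U.T @ U / B
max_err = np.abs(R_hat - R).max()
print(f"max |R_hat - R| = {max_err:.3f}")
```

The elementwise error shrinks as \(1/\sqrt{B}\), which is why B = 10,000 pseudo-individuals suffices for fine-mapping-quality LD.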
Pipeline steps:
- `generate_W` – generate W once from cohort sample size n (saved as `.npy`, shared across all chromosomes)
- `process_block` – for each LD block in parallel: load VCF, filter variants, compute \(U = W^T G\), scale to [0,2], write compressed dosage + allele frequencies
- `merge_chrom` – for each chromosome in parallel: convert per-block dosage files to sorted pgen, merge into one per-chromosome pgen, clean up all intermediates
Input Files#
| File | Description |
|---|---|
| VCF (`.bgz`) | Per-chromosome VCF files with tabix index |
| Sample list file | Optional. One sample ID per line. Leave empty to use all samples. |
| LD block BED file | 0-based half-open intervals, e.g. Berisa & Pickrell EUR blocks |
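The LD block file is plain three-column BED (chrom, start, end; 0-based half-open). Illustrative rows (coordinates are placeholders, not the actual Berisa & Pickrell boundaries):

```
chr22	16050000	17300000
chr22	17300000	18500000
```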
Parameters#
Global parameters:
| Parameter | Default | Description |
|---|---|---|
| `--walltime` | `24:00:00` | Per-task walltime |
| `--mem` | `32G` | Per-task memory |
| `--numThreads` | `8` | Per-task CPU cores |
generate_W parameters:
| Parameter | Default | Description |
|---|---|---|
| `--n-samples` | required | Total number of individuals in the cohort |
| `--output-dir` | required | Output directory for `W_B{B}.npy` |
| `--B` | `10000` | Sketch dimension (number of pseudo-individuals) |
| `--seed` | `123` | Random seed |
process_block parameters:
| Parameter | Default | Description |
|---|---|---|
| `--vcf-base` | required | Directory containing VCF (.bgz) files |
| `--vcf-prefix` | required | Shared filename prefix of all VCF files |
| `--ld-block-file` | required | Path to LD block BED file |
| `--chrom` | `0` | Chromosome 1-22, or 0 for all chromosomes |
| `--cohort-id` | `ADSP.R5.EUR` | Cohort prefix for output filenames |
| `--output-dir` | required | Base output directory |
| `--W-matrix` | required | Path to W (`.npy` file) |
| `--B` | `10000` | Must match W (sketch dimension used in `generate_W`) |
| `--sample-list` | `""` | Optional path to sample ID file |
| `--maf-min` | `0.0005` | Minimum minor allele frequency |
| `--mac-min` | `5` | Minimum minor allele count |
| `--msng-min` | `0.05` | Maximum missingness rate per variant |
merge_chrom parameters:
| Parameter | Default | Description |
|---|---|---|
| `--chrom` | `0` | Chromosome 1-22, or 0 for all chromosomes |
| `--output-dir` | required | Base output directory (same as `process_block`) |
| `--cohort-id` | `ADSP.R5.EUR` | Must match `--cohort-id` used in `process_block` |
| `--plink2-bin` | `plink2` | Path to plink2 binary |
Output Files#
| File | Description |
|---|---|
| `W_B{B}.npy` | Projection matrix W, shape \((n \times B)\). Output of `generate_W` |
| `{cohort_id}.chr{N}.pgen` | Final output – per-chromosome stochastic genotype sample U (plink2 native) |
| `{cohort_id}.chr{N}.pvar` | Per-chromosome variant information with U_MIN/U_MAX columns (plink2 native) |
| `{cohort_id}.chr{N}.psam` | Pseudo-individual IDs S1…S{B} |
| `{cohort_id}.chr{N}.afreq` | Per-chromosome allele frequencies (OBS_CT = 2 x non-missing samples per variant) |
| `{cohort_id}.{chr}_{start}_{end}.*` (cleaned up after merge) | Per-block `.dosage.gz`, `.map`, `.afreq`, `.meta` intermediates |
Step 0 – Generate W (run once per cohort)#
sos run pipeline/rss_ld_sketch.ipynb generate_W \
--n-samples 16571 \
--output-dir output/rss_ld_sketch \
--B 10000 \
--seed 123
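W is a dense float32 array, so the `.npy` file size is essentially \(n \times B \times 4\) bytes. A quick back-of-envelope check for the example above (n = 16571, B = 10000):

```python
n, B = 16571, 10000
size_gb = n * B * 4 / 1e9  # float32 = 4 bytes per entry
print(f"W_B{B}.npy ~= {size_gb:.2f} GB")  # → W_B10000.npy ~= 0.66 GB
```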
Step 1 – Process all blocks (parallelized)#
# Process one chromosome
sos run pipeline/rss_ld_sketch.ipynb process_block \
--vcf-base /path/to/vcfs/ \
--vcf-prefix your.prefix. \
--ld-block-file /path/to/EUR_LD_blocks.bed \
--cohort-id ADSP.R4.EUR \
--chrom 22 \
--output-dir output/rss_ld_sketch \
--W-matrix output/rss_ld_sketch/W_B10000.npy \
--mem 4G \
-J 16
# Process all 22 chromosomes
sos run pipeline/rss_ld_sketch.ipynb process_block \
--vcf-base /restricted/projectnb/xqtl/R4_QC_NHWonly_rm_monomorphic \
--vcf-prefix gcad.qc.r4.wgs.36361.GATK.2022.08.15.biallelic.genotypes. \
--ld-block-file /restricted/projectnb/casa/oaolayin/ROSMAP_NIA_geno/EUR_LD_blocks.bed \
--cohort-id ADSP.R4.EUR \
--chrom 0 \
--output-dir output/rss_ld_sketch \
--W-matrix output/rss_ld_sketch/W_B10000.npy \
--mem 4G \
-J 126
Step 2 – Merge per-block dosage into per-chromosome pgen (parallelized)#
sos run pipeline/rss_ld_sketch.ipynb merge_chrom \
--output-dir output/rss_ld_sketch \
--cohort-id ADSP.R4.EUR \
--chrom 0 \
-s force
Command Interface#
sos run pipeline/rss_ld_sketch.ipynb -h
usage: sos run pipeline/rss_ld_sketch.ipynb
[workflow_name | -t targets] [options] [workflow_options]
workflow_name: Single or combined workflows defined in this script
options: Single-hyphen sos parameters (see "sos run -h" for details)
workflow_options: Double-hyphen workflow-specific parameters
Workflows:
generate_W
process_block
merge_chrom
Global Workflow Options:
--walltime 24:00:00
--mem 32G
--numThreads 8 (as int)
--plink2-bin plink2
Sections
generate_W:
Workflow Options:
--n-samples VAL (as int, required)
Generate projection matrix $W \sim N(0, 1/\sqrt{n})$, shape $(n \times B)$.
Run ONCE per cohort before any chromosome processing.
W depends only on n (sample size) and B -- not on variant data.
All 22 chromosomes reuse the same W.
--output-dir VAL (as str, required)
--B 10000 (as int)
--seed 123 (as int)
process_block:
Workflow Options:
--vcf-base VAL (as str, required)
Directory containing VCF (.bgz) files, one or more per chromosome.
--vcf-prefix VAL (as str, required)
Shared filename prefix of all VCF files (everything before "chr{N}:" or "chr{N}.").
--ld-block-file VAL (as str, required)
Path to LD block BED file (e.g. EUR_LD_blocks.bed).
--chrom 0 (as int)
Chromosome 1-22, or 0 for all chromosomes.
--cohort-id ADSP.R5.EUR (as str)
Cohort prefix for output filenames.
--output-dir VAL (as str, required)
--W-matrix VAL (as str, required)
--B 10000 (as int)
--maf-min 0.0005 (as float)
--mac-min 5 (as int)
--msng-min 0.05 (as float)
--sample-list "" (optional path to sample ID file)
Output per block (intermediate, cleaned up by merge_chrom):
{cohort_id}.{chr}_{start}_{end}.dosage.gz
{cohort_id}.{chr}_{start}_{end}.map
{cohort_id}.{chr}_{start}_{end}.afreq
{cohort_id}.{chr}_{start}_{end}.meta
merge_chrom:
Workflow Options:
--chrom 0 (as int)
Chromosome 1-22, or 0 for all chromosomes.
--cohort-id ADSP.R5.EUR (as str)
Must match cohort-id used in process_block.
--output-dir VAL (as str, required)
Output per chromosome (final, block intermediates cleaned up):
{cohort_id}.chr{N}.pgen
{cohort_id}.chr{N}.pvar
{cohort_id}.chr{N}.psam
{cohort_id}.chr{N}.afreq
Pipeline Implementation#
[global]
parameter: cwd = path("output")
parameter: job_size = 1
parameter: walltime = "24:00:00"
parameter: mem = "32G"
parameter: numThreads = 8
cwd = path(f'{cwd:a}')
generate_W#
Generates the random projection matrix \(W \sim N(0, 1/\sqrt{n})\), shape \((n \times B)\). Run once per cohort. W is shared across all chromosomes.
[generate_W]
# Generate projection matrix $W \sim N(0, 1/\sqrt{n})$, shape (n x B).
# Run ONCE before processing any chromosome.
#
# W depends only on n (total sample size) and B -- not on any variant data.
# n_samples is passed directly as a parameter; no VCF reading is needed.
# All 22 chromosomes reuse the same W so that per-chromosome stochastic
# genotype samples can be arithmetically merged for meta-analysis.
parameter: n_samples = int
parameter: output_dir = str
parameter: B = 10000
parameter: seed = 123
import os
input: []
output: f'{output_dir}/W_B{B}.npy'
task: trunk_workers = 1, trunk_size = 1, walltime = '00:05:00', mem = '4G', cores = 1
python: expand = "${ }", stdout = f'{_output:n}.stdout', stderr = f'{_output:n}.stderr'
import numpy as np
import os
n = ${n_samples}
B = ${B}
seed = ${seed}
W_out = "${_output}"
# -- Generate $W \sim N(0, 1/\sqrt{n})$ -----------------------------
# Convention: W = np.random.normal(0, 1/np.sqrt(n), size=(n, B))
# W is shared across all chromosomes -- do not regenerate per chromosome.
print(f"Generating W ~ N(0, 1/sqrt({n})), shape ({n}, {B}), seed={seed}")
np.random.seed(seed)
W = np.random.normal(0, 1.0 / np.sqrt(n), size=(n, B)).astype(np.float32)
os.makedirs(os.path.dirname(os.path.abspath(W_out)), exist_ok=True)
np.save(W_out, W)
print(f"Saved: {W_out}")
print(f"Shape: {W.shape}, size: {os.path.getsize(W_out)/1e9:.2f} GB")
process_block#
Reads the LD block BED file and processes each block in parallel (via for_each).
For each block: reads VCF in a single pass, filters variants by MAF/MAC/missingness,
computes U = \(W^T G\), scales to [0,2], and writes .dosage.gz, .map, .afreq, .meta.
G is never stored. Output is intermediate – cleaned up by merge_chrom.
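The [0,2] min-max rescaling is harmless for LD because Pearson correlation is invariant to per-column affine maps with positive slope, so the rescaled U yields the same \(\hat{R}\). A minimal sketch of this property on synthetic data:

```python
import numpy as np

rng = np.random.default_rng(1)
U = rng.normal(size=(500, 8))  # stand-in for one (B x p) sketch block

# Per-column min-max scaling to [0, 2], mirroring process_block
col_min, col_max = U.min(axis=0), U.max(axis=0)
U_scaled = 2.0 * (U - col_min) / (col_max - col_min)

# Column correlation matrices agree before and after scaling
R_before = np.corrcoef(U, rowvar=False)
R_after = np.corrcoef(U_scaled, rowvar=False)
print(np.allclose(R_before, R_after))  # → True
```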
[process_block]
parameter: ld_block_file = str
parameter: chrom = 0
parameter: vcf_base = str
parameter: vcf_prefix = str
parameter: cohort_id = "ADSP.R5.EUR"
parameter: output_dir = str
parameter: W_matrix = str
parameter: B = 10000
parameter: maf_min = 0.0005
parameter: mac_min = 5
parameter: msng_min = 0.05
parameter: sample_list = ""
import os
def _read_blocks(bed, chrom_filter):
blocks = []
with open(bed) as fh:
for line in fh:
if line.startswith("#") or not line.strip():
continue
parts = line.split()
c = parts[0]
if not (c.startswith("chr") and c[3:].isdigit()):
continue
cnum = int(c[3:])
if not (1 <= cnum <= 22):
continue
if chrom_filter != 0 and cnum != chrom_filter:
continue
blocks.append({"chr": c, "start": int(parts[1]), "end": int(parts[2])})
if not blocks:
raise ValueError(f"No blocks found for chrom={chrom_filter} in {bed}")
return blocks
blocks = _read_blocks(ld_block_file, chrom)
print(f" {len(blocks)} LD blocks queued")
input: for_each = "blocks"
output: f'{output_dir}/{_blocks["chr"]}/{_blocks["chr"]}_{_blocks["start"]}_{_blocks["end"]}/{cohort_id}.{_blocks["chr"]}_{_blocks["start"]}_{_blocks["end"]}.dosage.gz'
task: trunk_workers = 1, trunk_size = 1, walltime = walltime, mem = mem, cores = numThreads
python: expand = "${ }"
import numpy as np
import os
import gzip
import sys
import atexit
from math import nan
from cyvcf2 import VCF
from os import listdir
# Block coordinates from for_each loop
chrm_str = "${_blocks['chr']}"
block_start = ${_blocks["start"]}
block_end = ${_blocks["end"]}
vcf_base = "${vcf_base}"
vcf_prefix = "${vcf_prefix}"
W_path = "${W_matrix}"
B = ${B}
maf_min = ${maf_min}
mac_min = ${mac_min}
msng_min = ${msng_min}
sample_list = "${sample_list}"
cohort_id = "${cohort_id}"
base_dir = "${output_dir}"
block_tag = f"{chrm_str}_{block_start}_{block_end}"
output_dir = os.path.join(base_dir, chrm_str, block_tag)
os.makedirs(output_dir, exist_ok=True)
log_path = os.path.join(output_dir, f"{block_tag}.log")
log_fh = open(log_path, "w")
sys.stdout = log_fh
sys.stderr = log_fh
atexit.register(log_fh.close)
# -- Load sample subset (optional) -----------------------------
sample_subset = None
if sample_list:
if not os.path.exists(sample_list):
raise FileNotFoundError(f"sample_list not found: {sample_list}")
with open(sample_list) as fh:
sample_subset = set(line.strip() for line in fh if line.strip())
print(f" Sample subset: {len(sample_subset):,} samples")
# -- Helpers ---------------------------------------------------
def get_vcf_files(chrm_str):
files = sorted([
os.path.join(vcf_base, x)
for x in listdir(vcf_base)
if x.endswith(".bgz") and (
x.startswith(vcf_prefix + chrm_str + ":") or
x.startswith(vcf_prefix + chrm_str + ".")
)
])
if not files:
raise FileNotFoundError(f"No VCF files for {chrm_str} in {vcf_base}")
return files
def open_vcf(vf, sample_subset):
"""Open a VCF file, applying sample subset if provided."""
vcf = VCF(vf)
if sample_subset is not None:
vcf_samples = vcf.samples
keep = [s for s in vcf_samples if s in sample_subset]
if not keep:
raise ValueError(f"No sample_list samples in {os.path.basename(vf)}")
vcf.set_samples(keep)
return vcf
def extract_dosage(var):
    """Extract diploid dosage from a cyvcf2 variant. Returns list of floats (nan for missing)."""
    # gt is [allele1, allele2, phased]; -1 marks a missing allele call
    return [sum(nan if a == -1 else a for a in gt[:2]) for gt in var.genotypes]
def fill_missing_col_means(G):
col_means = np.nanmean(G, axis=0)
return np.where(np.isnan(G), col_means, G)
# -- Single-pass: scan variants, filter, and collect dosages ---
# BED is 0-based half-open [start, end); VCF is 1-based.
print(f"[1/3] Scanning {chrm_str} [{block_start:,}, {block_end:,}) ...")
vcf_files = get_vcf_files(chrm_str)
region = f"{chrm_str}:{block_start+1}-{block_end}"
var_info = []
dosage_matrix = []
n_samples = None
# Filter counters
n_total = 0
n_multiallelic = 0
n_monomorphic = 0
n_all_na = 0
n_low_maf = 0
n_low_mac = 0
n_high_msng = 0
for vf in vcf_files:
vcf = open_vcf(vf, sample_subset)
if n_samples is None:
n_samples = len(vcf.samples)
for var in vcf(region):
if not (block_start <= var.POS - 1 < block_end):
continue
n_total += 1
if len(var.ALT) != 1:
n_multiallelic += 1
continue
dosage = extract_dosage(var)
if np.nanvar(dosage) == 0:
n_monomorphic += 1
continue
nan_count = int(np.sum(np.isnan(dosage)))
n_non_na = len(dosage) - nan_count
if n_non_na == 0:
n_all_na += 1
continue
alt_sum = float(np.nansum(dosage))
mac = min(2 * n_non_na - alt_sum, alt_sum)
maf = mac / (2 * n_non_na)
af = alt_sum / (2 * n_non_na)
msng_rate = nan_count / len(dosage)
if msng_rate > msng_min:
n_high_msng += 1
continue
if maf < maf_min:
n_low_maf += 1
continue
if mac < mac_min:
n_low_mac += 1
continue
var_info.append({
"chr": var.CHROM, "pos": var.POS,
"ref": var.REF, "alt": var.ALT[0],
"af": round(float(af), 6),
"id": f"{var.CHROM}:{var.POS}:{var.REF}:{var.ALT[0]}",
"obs_ct": 2 * n_non_na,
})
dosage_matrix.append(dosage)
vcf.close()
n_passed = len(var_info)
print(f" {n_total:,} total variants in region")
print(f" {n_passed:,} passed filters (n={n_samples:,})")
print(f" Filtered: {n_multiallelic:,} multiallelic, "
f"{n_monomorphic:,} monomorphic, {n_all_na:,} all-NA, "
f"{n_high_msng:,} high-missingness, "
f"{n_low_maf:,} low-MAF, {n_low_mac:,} low-MAC")
if not var_info:
raise ValueError(f"No passing variants in {chrm_str} [{block_start:,}, {block_end:,})")
# -- Load W ----------------------------------------------------
print(f"[2/3] Loading W ...")
W = np.load(W_path)
if W.shape != (n_samples, B):
raise ValueError(f"W shape mismatch: {W.shape} vs ({n_samples},{B})")
W = W.astype(np.float32)
print(f" W: {W.shape}")
# -- Compute U = $W^T G$ and write output files --------------------
print(f"[3/3] Computing U and writing output files ...")
dosage_path = os.path.join(output_dir, f"{cohort_id}.{block_tag}.dosage.gz")
map_path = os.path.join(output_dir, f"{cohort_id}.{block_tag}.map")
afreq_path = os.path.join(output_dir, f"{cohort_id}.{block_tag}.afreq")
meta_path = os.path.join(output_dir, f"{cohort_id}.{block_tag}.meta")
# Write .map
with open(map_path, "w") as fh:
for v in var_info:
fh.write(f"{v['chr']}\t{v['id']}\t0\t{v['pos']}\n")
# Write .meta
with open(meta_path, "w") as fh:
fh.write(f"source_n_samples={n_samples}\nB={B}\n")
fh.write(f"chrom={chrm_str}\nblock_start={block_start}\nblock_end={block_end}\n")
fh.write(f"n_total={n_total}\nn_passed={n_passed}\n")
fh.write(f"n_multiallelic={n_multiallelic}\nn_monomorphic={n_monomorphic}\n")
fh.write(f"n_all_na={n_all_na}\nn_high_msng={n_high_msng}\n")
fh.write(f"n_low_maf={n_low_maf}\nn_low_mac={n_low_mac}\n")
# Build G from collected dosages, compute U = $W^T G$, write dosage.gz
# Dosage format=1: ID ALT REF val_S1 ... val_SB
# Min-max scaling to [0, 2] makes the output plink2-compatible as dosage.
# This preserves correlation structure (cor is scale-invariant) which is
# what matters for LD computation downstream.
G = np.array(dosage_matrix, dtype=np.float32).T # (n_samples, n_variants)
del dosage_matrix
G = fill_missing_col_means(G)
# variant-wise scaling
col_mean = G.mean(axis=0, keepdims=True)
col_std = G.std(axis=0, keepdims=True)
# avoid division by zero
col_std[col_std == 0] = 1.0
G = (G - col_mean) / col_std
U = W.T @ G # (B, n_variants)
del G
col_min = U.min(axis=0)
col_max = U.max(axis=0)
denom = col_max - col_min
denom[denom == 0] = 1.0
U = 2.0 * (U - col_min) / denom
U = np.round(U, 4)
# Record the col min and max for U
with open(afreq_path, "w") as fh:
# Add column headers
fh.write("#CHROM\tID\tREF\tALT\tALT_FREQS\tOBS_CT\tU_MIN\tU_MAX\n")
for j, v in enumerate(var_info):
fh.write(f"{v['chr']}\t{v['id']}\t{v['ref']}\t{v['alt']}\t"
f"{v['af']:.6f}\t{v['obs_ct']}\t"
f"{col_min[j]:.6f}\t{col_max[j]:.6f}\n")
with gzip.open(dosage_path, "wt", compresslevel=4) as gz:
for j, v in enumerate(var_info):
vals = " ".join(f"{x:.4f}" for x in U[:, j])
gz.write(f"{v['id']} {v['alt']} {v['ref']} {vals}\n")
del U
print(f" Written: {len(var_info):,} variants -> {os.path.basename(dosage_path)}")
print(f" Written: {os.path.basename(map_path)}, {os.path.basename(afreq_path)}")
print(f"\nDone: {chrm_str} [{block_start:,}, {block_end:,})")
merge_chrom#
For each chromosome (parallelized via for_each): converts per-block .dosage.gz files
to sorted pgen (two-pass: import then --sort-vars), merges all blocks into one
per-chromosome pgen using plink2 --pmerge-list, concatenates allele frequencies,
and removes all per-block intermediate directories.
Output per chromosome:
- `{cohort_id}.chr{N}.pgen` – binary genotype data (uncompressed – plink2 does not support .pgen compression)
- `{cohort_id}.chr{N}.pvar` – variant info, written zstd-compressed via `--make-pgen vzs`, then decompressed and annotated with U_MIN/U_MAX columns
- `{cohort_id}.chr{N}.psam` – sample info
- `{cohort_id}.chr{N}.afreq` – allele frequencies
[merge_chrom]
parameter: chrom = 0
parameter: output_dir = str
parameter: cohort_id = str
parameter: plink2_bin = "plink2"
import os, glob
def _chroms_to_process(output_dir, chrom_filter):
if chrom_filter != 0:
return [f"chr{chrom_filter}"]
return sorted(set(
os.path.basename(d)
for d in glob.glob(os.path.join(output_dir, "chr*"))
if os.path.isdir(d)
))
chroms = _chroms_to_process(output_dir, chrom)
input: for_each = "chroms"
output: f"{output_dir}/{_chroms}/{cohort_id}.{_chroms}.pgen"
task: trunk_workers = 1, trunk_size = 1, walltime = walltime, mem = mem, cores = numThreads
bash: expand = "$[ ]"
set -euo pipefail
shopt -s nullglob
chrom_dir="$[output_dir]/$[_chroms]"
final_prefix="${chrom_dir}/$[cohort_id].$[_chroms]"
merge_list="${chrom_dir}/$[cohort_id].$[_chroms]_pmerge_list.txt"
# Step 1: Convert each block dosage.gz -> sorted per-block pgen
> "${merge_list}"
files=("${chrom_dir}"/*/*.dosage.gz)
if [ ${#files[@]} -eq 0 ]; then
echo "No dosage files found in ${chrom_dir}" >&2
exit 1
fi
for dosage_gz in "${files[@]}"; do
block_dir=$(dirname "${dosage_gz}")
block_tag=$(basename "${block_dir}")
prefix="${block_dir}/$[cohort_id].${block_tag}_tmp"
map_file="${block_dir}/$[cohort_id].${block_tag}.map"
psam_file="${block_dir}/$[cohort_id].${block_tag}.psam"
meta_file="${block_dir}/$[cohort_id].${block_tag}.meta"
B=$(grep "^B=" "${meta_file}" | cut -d= -f2)
printf '#FID\tIID\n' > "${psam_file}"
for i in $(seq 1 ${B}); do
printf 'S%d\tS%d\n' ${i} ${i} >> "${psam_file}"
done
$[plink2_bin] \
--import-dosage "${dosage_gz}" format=1 noheader \
--psam "${psam_file}" \
--map "${map_file}" \
--make-pgen \
--out "${prefix}_unsorted" \
--silent
$[plink2_bin] \
--pfile "${prefix}_unsorted" \
--make-pgen \
--sort-vars \
--out "${prefix}" \
--silent
rm -f "${prefix}_unsorted.pgen" "${prefix}_unsorted.pvar" "${prefix}_unsorted.psam"
echo "${prefix}" >> "${merge_list}"
done
# Step 2: Merge all per-block pgens -> one per-chrom pgen
$[plink2_bin] \
--pmerge-list "${merge_list}" pfile \
--make-pgen vzs \
--sort-vars \
--out "${final_prefix}"
# Step 3: Concatenate .afreq
first=1
for f in "${chrom_dir}"/*/*.afreq; do
if [ "${first}" -eq 1 ]; then
cat "${f}" > "${final_prefix}.afreq"
first=0
else
tail -n +2 "${f}" >> "${final_prefix}.afreq"
fi
done
# Step 4: Add U_MIN and U_MAX to the final .pvar file
# Extract ID, MIN, and MAX from the concatenated .afreq (columns 2, 7, 8)
awk 'NR > 1 {print $2, $7, $8}' "${final_prefix}.afreq" > "${final_prefix}.recovery_params"
# Decompress the .pvar.zst
$[plink2_bin] --zst-decompress "${final_prefix}.pvar.zst" > "${final_prefix}.pvar.tmp"
awk 'NR==FNR {min[$1]=$2; max[$1]=$3; next}
/^#/ {print $0 "\tU_MIN\tU_MAX"; next}
{if ($3 in min) print $0 "\t" min[$3] "\t" max[$3]; else print $0 "\tNA\tNA"}' \
"${final_prefix}.recovery_params" "${final_prefix}.pvar.tmp" > "${final_prefix}.pvar"
# Replace the compressed file with the updated text file
rm "${final_prefix}.pvar.tmp" "${final_prefix}.pvar.zst" "${final_prefix}.recovery_params"
R: expand = "$[ ]"
library(data.table)
meta_files <- list.files("$[output_dir]/$[_chroms]",
pattern = "[.]meta$", recursive = TRUE,
full.names = TRUE)
if (length(meta_files) > 0) {
fields <- c("n_total", "n_passed", "n_multiallelic", "n_monomorphic",
"n_all_na", "n_high_msng", "n_low_maf", "n_low_mac")
stats <- rbindlist(lapply(meta_files, function(f) {
lines <- grep("^n_", readLines(f), value = TRUE)
kv <- strsplit(lines, "=")
vals <- setNames(as.integer(sapply(kv, `[`, 2)), sapply(kv, `[`, 1))
as.data.table(as.list(vals[fields]))
}))
totals <- colSums(stats, na.rm = TRUE)
summary <- data.frame(t(totals))
summary$pct_dropped <- round(100 * (1 - summary$n_passed / summary$n_total), 1)
cat("\n=== Filter Summary for $[_chroms] ===\n")
print(data.frame(value = unlist(summary), row.names = names(summary)))
}
bash: expand = "$[ ]"
# Step 5: Cleanup block intermediates
chrom_dir="$[output_dir]/$[_chroms]"
final_prefix="${chrom_dir}/$[cohort_id].$[_chroms]"
rm -f "${final_prefix}_pmerge_list.txt"
rm -f "${final_prefix}-merge.pgen" "${final_prefix}-merge.pvar" "${final_prefix}-merge.psam"
for block_dir in "${chrom_dir}"/*/; do
rm -rf "${block_dir}"
done
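The U_MIN/U_MAX columns written into the final .pvar make the [0,2] scaling invertible per variant (approximately, since dosages were also rounded to 4 decimals). A hypothetical round-trip sketch with illustrative variable names, not pipeline outputs:

```python
import numpy as np

rng = np.random.default_rng(2)
U = rng.normal(size=(100, 5))  # stand-in for a (B x p) sketch block

# Forward transform, as written by process_block
col_min, col_max = U.min(axis=0), U.max(axis=0)
U_scaled = 2.0 * (U - col_min) / (col_max - col_min)

# Inverse transform using the stored U_MIN / U_MAX recovery parameters
U_recovered = U_scaled * (col_max - col_min) / 2.0 + col_min
print(np.allclose(U, U_recovered))  # → True
```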
Step 3 – Load LD sketch data into R#
Load the final pgen output into R using pecotmr::load_LD_matrix() via a metadata TSV.
Metadata TSV format for PLINK-based LD: one row per chromosome with start=0, end=0, and path as the PLINK file prefix (resolved relative to the TSV directory).
#chrom start end path
22 0 0 ADSP.R5.EUR.chr22
library(pecotmr)
meta <- "/path/to/ld_meta_chr22.tsv"
region <- "chr22:30000000-32000000" # 2Mb region
# Load genotype matrix X (for susie_rss z+X interface)
system.time({
res_x <- load_LD_matrix(meta, region, return_genotype = TRUE)
})
cat("X dimensions (samples x variants):", dim(res_x$LD_matrix), "\n")
head(res_x$ref_panel, 3)
res_x$LD_matrix[1:3, 1:3]
# Load LD correlation matrix R (for susie_rss z+R interface)
system.time({
res_r <- load_LD_matrix(meta, region, return_genotype = FALSE)
})
cat("R dimensions:", dim(res_r$LD_matrix), "\n")
res_r$LD_matrix[1:3, 1:3]
Tested output (ADSP R5 EUR chr22, 10000 pseudo-samples, 2Mb region, 9188 variants):
# Genotype matrix X
user system elapsed
0.680 0.170 0.900
X dimensions (samples x variants): 10000 9188
chrom pos A2 A1 variant_id allele_freq
1 22 30000353 G T chr22:30000353:G:T 0.734539
2 22 30000443 C T chr22:30000443:C:T 0.001058
3 22 30000447 C T chr22:30000447:C:T 0.005081
chr22:30000353:G:T chr22:30000443:C:T chr22:30000447:C:T
S1 0.6563110 0.7753906 1.065613
S2 0.7034302 0.3729858 0.805481
S3 0.8706055 1.1093750 1.046509
# LD correlation matrix R
user system elapsed
6.390 0.410 6.880
R dimensions: 9188 9188
chr22:30000353:G:T chr22:30000443:C:T chr22:30000447:C:T
chr22:30000353:G:T 1.00000000 0.052049420 0.096841034
chr22:30000443:C:T 0.05204942 1.000000000 -0.007827702
chr22:30000447:C:T 0.09684103 -0.007827702 1.000000000
Use return_genotype = TRUE for the susie_rss(z, X=X) interface, or FALSE for the susie_rss(z, R=R) interface.
For repeated analyses on the same region, compute R once and save with saveRDS() for reuse.