RSS LD Sketch Pipeline#

Overview#

This pipeline generates a stochastic genotype sample U = \(W^T G\) from whole-genome sequencing VCF files and stores it as a PLINK2 pgen file for use as an LD reference panel with SuSiE-RSS fine-mapping.

Key idea: Rather than storing the full genotype matrix G (n x p, ~530 MB/block), we compute U = \(W^T G\) (B x p, ~320 MB/block) using a random projection matrix \(W \sim N(0, 1/\sqrt{n})\). The approximate LD matrix \(\hat{R} = U^T U / B \approx G^T G / n = R\) by the Johnson-Lindenstrauss lemma. G is never stored.
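The sketch identity can be sanity-checked numerically. Below is a toy example using the same conventions (W with i.i.d. \(N(0, 1/\sqrt{n})\) entries, G standardized per variant); sizes are scaled down from the pipeline defaults for speed:

```python
import numpy as np

# Toy check of U^T U / B ~= G^T G / n. Sizes are illustrative, not the
# pipeline defaults (n ~ 16k, p ~ 8k, B = 10000).
rng = np.random.default_rng(0)
n, p, B = 1000, 40, 5000

G = rng.binomial(2, 0.3, size=(n, p)).astype(np.float64)
G = (G - G.mean(axis=0)) / G.std(axis=0)   # standardize per variant

W = rng.normal(0.0, 1.0 / np.sqrt(n), size=(n, B))
U = W.T @ G                                # (B, p) stochastic genotype sample

R_true = G.T @ G / n                       # exact LD matrix
R_hat  = U.T @ U / B                       # sketched LD matrix
err = np.abs(R_hat - R_true).max()
print(f"max entrywise error: {err:.4f}")   # shrinks as B grows, ~1/sqrt(B)
```

The entrywise error decays as \(1/\sqrt{B}\), which is why B = 10,000 pseudo-individuals suffices regardless of the cohort size n.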

Matrix dimensions:

  • G : \((n \times p)\) – \(n\) individuals \(\times\) \(p\) variants (~8,000 per block)

  • W : \((n \times B)\) – projection matrix, \(W \sim N(0, 1/\sqrt{n})\), generated once per cohort

  • U : \((B \times p)\) – stochastic genotype sample = \(W^T G\), stored in pgen

  • \(\hat{R}\) : \((p \times p)\) – approximate LD matrix, computed on-the-fly by SuSiE-RSS from U
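The per-block storage figures quoted above follow from simple arithmetic, assuming float32 storage (4 bytes/value) and the example cohort size n = 16,571 used in Step 0:

```python
# Back-of-envelope check of the ~530 MB (G) and ~320 MB (U) figures.
n, p, B = 16571, 8000, 10000   # individuals, variants/block, sketch dimension
bytes_per = 4                  # float32

print(f"G: {n * p * bytes_per / 1e6:.0f} MB/block")  # ~530 MB (never stored)
print(f"U: {B * p * bytes_per / 1e6:.0f} MB/block")  # ~320 MB
```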

Pipeline steps:

  1. generate_W – generate W once from cohort sample size n (saved as .npy, shared across all chromosomes)

  2. process_block – for each LD block in parallel: load VCF, filter variants, compute U = \(W^T G\), scale to [0,2], write compressed dosage + allele frequencies

  3. merge_chrom – for each chromosome in parallel: convert per-block dosage files to sorted pgen, merge into one per-chromosome pgen, clean up all intermediates

Input Files#

| File | Description |
|------|-------------|
| VCF (.bgz) | Per-chromosome VCF files with tabix index |
| sample_list.txt | Optional. One sample ID per line. Leave empty to use all samples. |
| EUR_LD_blocks.bed | LD block BED file (0-based half-open, e.g. Berisa & Pickrell EUR blocks) |
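The 0-based half-open BED convention matters when querying VCFs, which are 1-based inclusive: block [start, end) maps to the query string "chr:start+1-end". A hypothetical helper (not part of the pipeline) showing the conversion the pipeline applies internally:

```python
def bed_block_to_region(chrom: str, start: int, end: int) -> str:
    """Convert a 0-based half-open BED interval to a 1-based tabix region."""
    return f"{chrom}:{start + 1}-{end}"

print(bed_block_to_region("chr22", 30000000, 32000000))
# chr22:30000001-32000000
```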

Parameters#

Global parameters:

| Parameter | Default | Description |
|-----------|---------|-------------|
| walltime | 24:00:00 | Per-task walltime |
| mem | 32G | Per-task memory |
| numThreads | 8 | Per-task CPU cores |

generate_W parameters:

| Parameter | Default | Description |
|-----------|---------|-------------|
| n_samples | required | Total number of individuals in the cohort |
| output_dir | required | Output directory for W_B{B}.npy |
| B | 10000 | Sketch dimension (number of pseudo-individuals) |
| seed | 123 | Random seed |

process_block parameters:

| Parameter | Default | Description |
|-----------|---------|-------------|
| vcf_base | required | Directory containing VCF (.bgz) files |
| vcf_prefix | required | Shared filename prefix of all VCF files |
| ld_block_file | required | Path to LD block BED file |
| chrom | 0 | Chromosome 1-22, or 0 for all chromosomes |
| cohort_id | ADSP.R5.EUR | Cohort prefix for output filenames |
| output_dir | required | Base output directory |
| W_matrix | required | Path to W .npy output from generate_W |
| B | 10000 | Must match W |
| sample_list | "" | Optional path to sample ID file |
| maf_min | 0.0005 | Minimum minor allele frequency |
| mac_min | 5 | Minimum minor allele count |
| msng_min | 0.05 | Maximum missingness rate per variant |

merge_chrom parameters:

| Parameter | Default | Description |
|-----------|---------|-------------|
| chrom | 0 | Chromosome 1-22, or 0 for all chromosomes |
| output_dir | required | Base output directory (same as process_block) |
| cohort_id | ADSP.R5.EUR | Must match process_block cohort_id |
| plink2_bin | plink2 | Path to plink2 binary |

Output Files#

| File | Description |
|------|-------------|
| W_B{B}.npy | Projection matrix W, shape \((n \times B)\). Output of generate_W. |
| {cohort_id}.chr{N}.pgen | Final output – per-chromosome stochastic genotype sample U (plink2 native) |
| {cohort_id}.chr{N}.pvar | Per-chromosome variant information, augmented with U_MIN/U_MAX columns (the intermediate .pvar.zst is removed after merge) |
| {cohort_id}.chr{N}.psam | Pseudo-individual IDs S1…S{B} |
| {cohort_id}.chr{N}.afreq | Per-chromosome allele frequencies (OBS_CT = 2 x non-missing samples per variant) |
| (cleaned up after merge) | Per-block .dosage.gz, .map, .afreq, .meta intermediate files |

Step 0 – Generate W (run once per cohort)#

sos run pipeline/rss_ld_sketch.ipynb generate_W \
    --n-samples 16571 \
    --output-dir output/rss_ld_sketch \
    --B 10000 \
    --seed 123

Step 1 – Process all blocks (parallelized)#

# Process one chromosome
sos run pipeline/rss_ld_sketch.ipynb process_block \
    --vcf-base /path/to/vcfs/ \
    --vcf-prefix your.prefix. \
    --ld-block-file /path/to/EUR_LD_blocks.bed \
    --cohort-id ADSP.R4.EUR \
    --chrom 22 \
    --output-dir output/rss_ld_sketch \
    --W-matrix output/rss_ld_sketch/W_B10000.npy \
    --mem 4G \
    -J 16
# Process all 22 chromosomes
sos run pipeline/rss_ld_sketch.ipynb process_block \
    --vcf-base /restricted/projectnb/xqtl/R4_QC_NHWonly_rm_monomorphic \
    --vcf-prefix gcad.qc.r4.wgs.36361.GATK.2022.08.15.biallelic.genotypes. \
    --ld-block-file /restricted/projectnb/casa/oaolayin/ROSMAP_NIA_geno/EUR_LD_blocks.bed \
    --cohort-id ADSP.R4.EUR \
    --chrom 0 \
    --output-dir output/rss_ld_sketch \
    --W-matrix output/rss_ld_sketch/W_B10000.npy \
    --mem 4G \
    -J 126

Step 2 – Merge per-block dosage into per-chromosome pgen (parallelized)#

sos run pipeline/rss_ld_sketch.ipynb merge_chrom \
    --output-dir output/rss_ld_sketch \
    --cohort-id ADSP.R4.EUR \
    --chrom 0 \
    -s force

Command Interface#

sos run pipeline/rss_ld_sketch.ipynb -h
usage: sos run pipeline/rss_ld_sketch.ipynb
               [workflow_name | -t targets] [options] [workflow_options]
  workflow_name:        Single or combined workflows defined in this script
  options:              Single-hyphen sos parameters (see "sos run -h" for details)
  workflow_options:     Double-hyphen workflow-specific parameters

Workflows:
  generate_W
  process_block
  merge_chrom

Global Workflow Options:
  --walltime 24:00:00
  --mem 32G
  --numThreads 8 (as int)
  --plink2-bin plink2

Sections
  generate_W:
    Workflow Options:
      --n-samples VAL (as int, required)
                        Generate projection matrix $W \sim N(0, 1/\sqrt{n})$, shape $(n \times B)$.
                        Run ONCE per cohort before any chromosome processing.
                        W depends only on n (sample size) and B -- not on variant data.
                        All 22 chromosomes reuse the same W.
      --output-dir VAL (as str, required)
      --B 10000 (as int)
      --seed 123 (as int)

  process_block:
    Workflow Options:
      --vcf-base VAL (as str, required)
                        Directory containing VCF (.bgz) files, one or more per chromosome.
      --vcf-prefix VAL (as str, required)
                        Shared filename prefix of all VCF files (everything before "chr{N}:" or "chr{N}.").
      --ld-block-file VAL (as str, required)
                        Path to LD block BED file (e.g. EUR_LD_blocks.bed).
      --chrom 0 (as int)
                        Chromosome 1-22, or 0 for all chromosomes.
      --cohort-id ADSP.R5.EUR (as str)
                        Cohort prefix for output filenames.
      --output-dir VAL (as str, required)
      --W-matrix VAL (as str, required)
      --B 10000 (as int)
      --maf-min 0.0005 (as float)
      --mac-min 5 (as int)
      --msng-min 0.05 (as float)
      --sample-list "" (optional path to sample ID file)
    Output per block (intermediate, cleaned up by merge_chrom):
      {cohort_id}.{chr}_{start}_{end}.dosage.gz
      {cohort_id}.{chr}_{start}_{end}.map
      {cohort_id}.{chr}_{start}_{end}.afreq
      {cohort_id}.{chr}_{start}_{end}.meta

  merge_chrom:
    Workflow Options:
      --chrom 0 (as int)
                        Chromosome 1-22, or 0 for all chromosomes.
      --cohort-id ADSP.R5.EUR (as str)
                        Must match cohort-id used in process_block.
      --output-dir VAL (as str, required)
    Output per chromosome (final, block intermediates cleaned up):
      {cohort_id}.chr{N}.pgen
      {cohort_id}.chr{N}.pvar
      {cohort_id}.chr{N}.psam
      {cohort_id}.chr{N}.afreq

Pipeline Implementation#

[global]
parameter: cwd        = path("output")
parameter: job_size   = 1
parameter: walltime   = "24:00:00"
parameter: mem        = "32G"
parameter: numThreads = 8

cwd = path(f'{cwd:a}')

generate_W#

Generates the random projection matrix \(W \sim N(0, 1/\sqrt{n})\), shape \((n \times B)\). Run once per cohort. W is shared across all chromosomes.

[generate_W]
# Generate projection matrix $W \sim N(0, 1/\sqrt{n})$, shape (n x B).
# Run ONCE before processing any chromosome.
#
# W depends only on n (total sample size) and B -- not on any variant data.
# n_samples is passed directly as a parameter; no VCF reading is needed.
# All 22 chromosomes reuse the same W so that per-chromosome stochastic
# genotype samples can be arithmetically merged for meta-analysis.
parameter: n_samples = int
parameter: output_dir    = str
parameter: B         = 10000
parameter: seed      = 123

import os
input:  []
output: f'{output_dir}/W_B{B}.npy'
task: trunk_workers = 1, trunk_size = 1, walltime = '00:05:00', mem = '4G', cores = 1
python: expand = "${ }", stdout = f'{_output:n}.stdout', stderr = f'{_output:n}.stderr'

    import numpy as np
    import os

    n      = ${n_samples}
    B      = ${B}
    seed   = ${seed}
    W_out  = "${_output}"

    # -- Generate $W \sim N(0, 1/\sqrt{n})$ -----------------------------
    # Convention: W = np.random.normal(0, 1/np.sqrt(n), size=(n, B))
    # W is shared across all chromosomes -- do not regenerate per chromosome.
    print(f"Generating W ~ N(0, 1/sqrt({n})),  shape ({n}, {B}),  seed={seed}")
    np.random.seed(seed)
    W = np.random.normal(0, 1.0 / np.sqrt(n), size=(n, B)).astype(np.float32)

    os.makedirs(os.path.dirname(os.path.abspath(W_out)), exist_ok=True)
    np.save(W_out, W)
    print(f"Saved: {W_out}")
    print(f"Shape: {W.shape},  size: {os.path.getsize(W_out)/1e9:.2f} GB")

process_block#

Reads the LD block BED file and processes each block in parallel (via for_each). For each block: reads VCF in a single pass, filters variants by MAF/MAC/missingness, computes U = \(W^T G\), scales to [0,2], and writes .dosage.gz, .map, .afreq, .meta. G is never stored. Output is intermediate – cleaned up by merge_chrom.
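The per-variant QC logic described above can be sketched as a standalone function. This is a hypothetical simplification of the implementation below, using the pipeline's default thresholds (the real code also tracks per-filter drop counts):

```python
import numpy as np

def passes_filters(dosage, maf_min=0.0005, mac_min=5, msng_min=0.05):
    """True if a biallelic ALT-dosage vector (nan = missing) passes QC."""
    d = np.asarray(dosage, dtype=float)
    n_non_na = int(np.sum(~np.isnan(d)))
    if n_non_na == 0 or np.nanvar(d) == 0:
        return False                              # all-missing or monomorphic
    alt_sum = float(np.nansum(d))
    mac = min(2 * n_non_na - alt_sum, alt_sum)    # minor allele count
    maf = mac / (2 * n_non_na)                    # minor allele frequency
    msng_rate = 1.0 - n_non_na / d.size
    return msng_rate <= msng_min and maf >= maf_min and mac >= mac_min

print(passes_filters([0, 1, 2, 1, 0] * 20))   # common variant -> True
print(passes_filters([0.0] * 99 + [1.0]))     # singleton, MAC = 1 < 5 -> False
```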

[process_block]
parameter: ld_block_file = str
parameter: chrom         = 0
parameter: vcf_base      = str
parameter: vcf_prefix    = str
parameter: cohort_id     = "ADSP.R5.EUR"
parameter: output_dir    = str
parameter: W_matrix      = str
parameter: B             = 10000
parameter: maf_min       = 0.0005
parameter: mac_min       = 5
parameter: msng_min      = 0.05
parameter: sample_list   = ""

import os

def _read_blocks(bed, chrom_filter):
    blocks = []
    with open(bed) as fh:
        for line in fh:
            if line.startswith("#") or not line.strip():
                continue
            parts = line.split()
            c = parts[0]
            if not (c.startswith("chr") and c[3:].isdigit()):
                continue
            cnum = int(c[3:])
            if not (1 <= cnum <= 22):
                continue
            if chrom_filter != 0 and cnum != chrom_filter:
                continue
            blocks.append({"chr": c, "start": int(parts[1]), "end": int(parts[2])})
    if not blocks:
        raise ValueError(f"No blocks found for chrom={chrom_filter} in {bed}")
    return blocks

blocks = _read_blocks(ld_block_file, chrom)
print(f"  {len(blocks)} LD blocks queued")

input: for_each = "blocks"
output: f'{output_dir}/{_blocks["chr"]}/{_blocks["chr"]}_{_blocks["start"]}_{_blocks["end"]}/{cohort_id}.{_blocks["chr"]}_{_blocks["start"]}_{_blocks["end"]}.dosage.gz'
task: trunk_workers = 1, trunk_size = 1, walltime = walltime, mem = mem, cores = numThreads
python: expand = "${ }"

    import numpy as np
    import os
    import gzip
    import sys
    import atexit
    from math import nan
    from cyvcf2 import VCF
    from os import listdir

    # Block coordinates from for_each loop
    chrm_str    = "${_blocks['chr']}"
    block_start = ${_blocks["start"]}
    block_end   = ${_blocks["end"]}

    vcf_base    = "${vcf_base}"
    vcf_prefix  = "${vcf_prefix}"
    W_path      = "${W_matrix}"
    B           = ${B}
    maf_min     = ${maf_min}
    mac_min     = ${mac_min}
    msng_min    = ${msng_min}
    sample_list = "${sample_list}"
    cohort_id   = "${cohort_id}"
    base_dir    = "${output_dir}"

    block_tag   = f"{chrm_str}_{block_start}_{block_end}"
    output_dir  = os.path.join(base_dir, chrm_str, block_tag)
    os.makedirs(output_dir, exist_ok=True)

    log_path = os.path.join(output_dir, f"{block_tag}.log")
    log_fh   = open(log_path, "w")
    sys.stdout = log_fh
    sys.stderr = log_fh
    atexit.register(log_fh.close)

    # -- Load sample subset (optional) -----------------------------
    sample_subset = None
    if sample_list:
        if not os.path.exists(sample_list):
            raise FileNotFoundError(f"sample_list not found: {sample_list}")
        with open(sample_list) as fh:
            sample_subset = set(line.strip() for line in fh if line.strip())
        print(f"  Sample subset: {len(sample_subset):,} samples")

    # -- Helpers ---------------------------------------------------
    def get_vcf_files(chrm_str):
        files = sorted([
            os.path.join(vcf_base, x)
            for x in listdir(vcf_base)
            if x.endswith(".bgz") and (
                x.startswith(vcf_prefix + chrm_str + ":") or
                x.startswith(vcf_prefix + chrm_str + ".")
            )
        ])
        if not files:
            raise FileNotFoundError(f"No VCF files for {chrm_str} in {vcf_base}")
        return files

    def open_vcf(vf, sample_subset):
        """Open a VCF file, applying sample subset if provided."""
        vcf = VCF(vf)
        if sample_subset is not None:
            vcf_samples = vcf.samples
            keep = [s for s in vcf_samples if s in sample_subset]
            if not keep:
                raise ValueError(f"No sample_list samples in {os.path.basename(vf)}")
            vcf.set_samples(keep)
        return vcf

    def extract_dosage(var):
        """Extract diploid dosage from a cyvcf2 variant. Returns list of floats (nan if any allele missing)."""
        # gt is [allele1, allele2, phased]; drop the phase flag, map -1 -> nan
        return [sum(nan if a == -1 else a for a in gt[:2]) for gt in var.genotypes]

    def fill_missing_col_means(G):
        col_means = np.nanmean(G, axis=0)
        return np.where(np.isnan(G), col_means, G)

    # -- Single-pass: scan variants, filter, and collect dosages ---
    # BED is 0-based half-open [start, end); VCF is 1-based.
    print(f"[1/3] Scanning {chrm_str} [{block_start:,}, {block_end:,}) ...")
    vcf_files = get_vcf_files(chrm_str)
    region    = f"{chrm_str}:{block_start+1}-{block_end}"
    var_info  = []
    dosage_matrix = []
    n_samples = None
    # Filter counters
    n_total = 0
    n_multiallelic = 0
    n_monomorphic = 0
    n_all_na = 0
    n_low_maf = 0
    n_low_mac = 0
    n_high_msng = 0

    for vf in vcf_files:
        vcf = open_vcf(vf, sample_subset)
        if n_samples is None:
            n_samples = len(vcf.samples)
        for var in vcf(region):
            if not (block_start <= var.POS - 1 < block_end):
                continue
            n_total += 1
            if len(var.ALT) != 1:
                n_multiallelic += 1
                continue
            dosage = extract_dosage(var)
            if np.nanvar(dosage) == 0:
                n_monomorphic += 1
                continue
            nan_count = int(np.sum(np.isnan(dosage)))
            n_non_na  = len(dosage) - nan_count
            if n_non_na == 0:
                n_all_na += 1
                continue
            alt_sum   = float(np.nansum(dosage))
            mac       = min(2 * n_non_na - alt_sum, alt_sum)
            maf       = mac / (2 * n_non_na)
            af        = alt_sum / (2 * n_non_na)
            msng_rate = nan_count / len(dosage)
            if msng_rate > msng_min:
                n_high_msng += 1
                continue
            if maf < maf_min:
                n_low_maf += 1
                continue
            if mac < mac_min:
                n_low_mac += 1
                continue
            var_info.append({
                "chr": var.CHROM, "pos": var.POS,
                "ref": var.REF,   "alt": var.ALT[0],
                "af":  round(float(af), 6),
                "id":  f"{var.CHROM}:{var.POS}:{var.REF}:{var.ALT[0]}",
                "obs_ct": 2 * n_non_na,
            })
            dosage_matrix.append(dosage)
        vcf.close()

    n_passed = len(var_info)
    print(f"  {n_total:,} total variants in region")
    print(f"  {n_passed:,} passed filters (n={n_samples:,})")
    print(f"  Filtered: {n_multiallelic:,} multiallelic, "
          f"{n_monomorphic:,} monomorphic, {n_all_na:,} all-NA, "
          f"{n_high_msng:,} high-missingness, "
          f"{n_low_maf:,} low-MAF, {n_low_mac:,} low-MAC")

    if not var_info:
        raise ValueError(f"No passing variants in {chrm_str} [{block_start:,}, {block_end:,})")

    # -- Load W ----------------------------------------------------
    print(f"[2/3] Loading W ...")
    W = np.load(W_path)
    if W.shape != (n_samples, B):
        raise ValueError(f"W shape mismatch: {W.shape} vs ({n_samples},{B})")
    W = W.astype(np.float32)
    print(f"  W: {W.shape}")

    # -- Compute U = $W^T G$ and write output files --------------------
    print(f"[3/3] Computing U and writing output files ...")

    dosage_path = os.path.join(output_dir, f"{cohort_id}.{block_tag}.dosage.gz")
    map_path    = os.path.join(output_dir, f"{cohort_id}.{block_tag}.map")
    afreq_path  = os.path.join(output_dir, f"{cohort_id}.{block_tag}.afreq")
    meta_path   = os.path.join(output_dir, f"{cohort_id}.{block_tag}.meta")

    # Write .map
    with open(map_path, "w") as fh:
        for v in var_info:
            fh.write(f"{v['chr']}\t{v['id']}\t0\t{v['pos']}\n")

    # Write .meta
    with open(meta_path, "w") as fh:
        fh.write(f"source_n_samples={n_samples}\nB={B}\n")
        fh.write(f"chrom={chrm_str}\nblock_start={block_start}\nblock_end={block_end}\n")
        fh.write(f"n_total={n_total}\nn_passed={n_passed}\n")
        fh.write(f"n_multiallelic={n_multiallelic}\nn_monomorphic={n_monomorphic}\n")
        fh.write(f"n_all_na={n_all_na}\nn_high_msng={n_high_msng}\n")
        fh.write(f"n_low_maf={n_low_maf}\nn_low_mac={n_low_mac}\n")


    # Build G from collected dosages, compute U = $W^T G$, write dosage.gz
    # Dosage format=1: ID ALT REF val_S1 ... val_SB
    # Min-max scaling to [0, 2] makes the output plink2-compatible as dosage.
    # This preserves correlation structure (cor is scale-invariant) which is
    # what matters for LD computation downstream.
    G = np.array(dosage_matrix, dtype=np.float32).T  # (n_samples, n_variants)
    del dosage_matrix
    G = fill_missing_col_means(G)

    # variant-wise scaling
    col_mean = G.mean(axis=0, keepdims=True)
    col_std  = G.std(axis=0, keepdims=True)
    # avoid division by zero
    col_std[col_std == 0] = 1.0
    G = (G - col_mean) / col_std

    U = W.T @ G  # (B, n_variants)
    del G

    col_min = U.min(axis=0)
    col_max = U.max(axis=0)
    denom   = col_max - col_min
    denom[denom == 0] = 1.0
    U = 2.0 * (U - col_min) / denom
    U = np.round(U, 4)

    # Record per-variant U min/max alongside allele frequencies so the
    # unscaled U can be recovered downstream.
    with open(afreq_path, "w") as fh:
        fh.write("#CHROM\tID\tREF\tALT\tALT_FREQS\tOBS_CT\tU_MIN\tU_MAX\n")
        for j, v in enumerate(var_info):
            fh.write(f"{v['chr']}\t{v['id']}\t{v['ref']}\t{v['alt']}\t"
                     f"{v['af']:.6f}\t{v['obs_ct']}\t"
                     f"{col_min[j]:.6f}\t{col_max[j]:.6f}\n")

    with gzip.open(dosage_path, "wt", compresslevel=4) as gz:
        for j, v in enumerate(var_info):
            vals = " ".join(f"{x:.4f}" for x in U[:, j])
            gz.write(f"{v['id']} {v['alt']} {v['ref']} {vals}\n")

    del U
    print(f"  Written: {len(var_info):,} variants -> {os.path.basename(dosage_path)}")
    print(f"  Written: {os.path.basename(map_path)}, {os.path.basename(afreq_path)}")
    print(f"\nDone: {chrm_str} [{block_start:,}, {block_end:,})")

merge_chrom#

For each chromosome (parallelized via for_each): converts per-block .dosage.gz files to sorted pgen (two-pass: import then --sort-vars), merges all blocks into one per-chromosome pgen using plink2 --pmerge-list, concatenates allele frequencies, and removes all per-block intermediate directories.

Output per chromosome:

  • {cohort_id}.chr{N}.pgen – binary genotype data (uncompressed – plink2 does not support .pgen compression)

  • {cohort_id}.chr{N}.pvar – variant info with U_MIN/U_MAX columns (initially written as .pvar.zst via --make-pgen vzs, then decompressed and augmented in Step 4)

  • {cohort_id}.chr{N}.psam – sample info

  • {cohort_id}.chr{N}.afreq – allele frequencies
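Because the min-max scaling is affine per variant, correlations are unchanged and the scaled dosages can be used for LD directly. If the unscaled sketch values are ever needed, they can be recovered from the stored U_MIN/U_MAX columns; a sketch with a hypothetical helper (not part of the pipeline) that inverts U_scaled = 2(U - U_min)/(U_max - U_min) from process_block:

```python
import numpy as np

def unscale_sketch(U_scaled, u_min, u_max):
    """Invert the per-variant min-max scaling (columns = variants)."""
    denom = np.where(u_max - u_min == 0, 1.0, u_max - u_min)
    return U_scaled * denom / 2.0 + u_min

# Round-trip check on random stand-in data
rng = np.random.default_rng(1)
U = rng.normal(size=(100, 5))
u_min, u_max = U.min(axis=0), U.max(axis=0)
U_scaled = 2.0 * (U - u_min) / (u_max - u_min)
print(np.allclose(unscale_sketch(U_scaled, u_min, u_max), U))  # True
```

In practice recovery is exact only up to the 4-decimal rounding that process_block applies before writing the dosage file.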

[merge_chrom]
parameter: chrom      = 0
parameter: output_dir = str
parameter: cohort_id  = str
parameter: plink2_bin = "plink2"

import os, glob

def _chroms_to_process(output_dir, chrom_filter):
    if chrom_filter != 0:
        return [f"chr{chrom_filter}"]
    return sorted(set(
        os.path.basename(d)
        for d in glob.glob(os.path.join(output_dir, "chr*"))
        if os.path.isdir(d)
    ))

chroms = _chroms_to_process(output_dir, chrom)

input: for_each = "chroms"
output: f"{output_dir}/{_chroms}/{cohort_id}.{_chroms}.pgen"
task: trunk_workers = 1, trunk_size = 1, walltime = walltime, mem = mem, cores = numThreads
bash: expand = "$[ ]"
    set -euo pipefail
    shopt -s nullglob

    chrom_dir="$[output_dir]/$[_chroms]"
    final_prefix="${chrom_dir}/$[cohort_id].$[_chroms]"
    merge_list="${chrom_dir}/$[cohort_id].$[_chroms]_pmerge_list.txt"

    # Step 1: Convert each block dosage.gz -> sorted per-block pgen
    > "${merge_list}"
    files=("${chrom_dir}"/*/*.dosage.gz)
    if [ ${#files[@]} -eq 0 ]; then
        echo "No dosage files found in ${chrom_dir}" >&2
        exit 1
    fi
    for dosage_gz in "${files[@]}"; do
        block_dir=$(dirname "${dosage_gz}")
        block_tag=$(basename "${block_dir}")
        prefix="${block_dir}/$[cohort_id].${block_tag}_tmp"
        map_file="${block_dir}/$[cohort_id].${block_tag}.map"
        psam_file="${block_dir}/$[cohort_id].${block_tag}.psam"
        meta_file="${block_dir}/$[cohort_id].${block_tag}.meta"
        B=$(grep "^B=" "${meta_file}" | cut -d= -f2)
        printf '#FID\tIID\n' > "${psam_file}"
        for i in $(seq 1 ${B}); do
            printf 'S%d\tS%d\n' ${i} ${i} >> "${psam_file}"
        done
        $[plink2_bin] \
            --import-dosage "${dosage_gz}" format=1 noheader \
            --psam "${psam_file}" \
            --map  "${map_file}" \
            --make-pgen \
            --out  "${prefix}_unsorted" \
            --silent
        $[plink2_bin] \
            --pfile "${prefix}_unsorted" \
            --make-pgen \
            --sort-vars \
            --out  "${prefix}" \
            --silent
        rm -f "${prefix}_unsorted.pgen" "${prefix}_unsorted.pvar" "${prefix}_unsorted.psam"
        echo "${prefix}" >> "${merge_list}"
    done

    # Step 2: Merge all per-block pgens -> one per-chrom pgen
    $[plink2_bin] \
        --pmerge-list "${merge_list}" pfile \
        --make-pgen vzs \
        --sort-vars \
        --out  "${final_prefix}"

    # Step 3: Concatenate .afreq
    first=1
    for f in "${chrom_dir}"/*/*.afreq; do
        if [ "${first}" -eq 1 ]; then
            cat "${f}" > "${final_prefix}.afreq"
            first=0
        else
            tail -n +2 "${f}" >> "${final_prefix}.afreq"
        fi
    done

    # Step 4: Add U_MIN and U_MAX to the final .pvar file
    # Extract ID, MIN, and MAX from the concatenated .afreq (columns 2, 7, 8)
    awk 'NR > 1 {print $2, $7, $8}' "${final_prefix}.afreq" > "${final_prefix}.recovery_params"

    # Decompress the .pvar.zst 
    $[plink2_bin] --zst-decompress "${final_prefix}.pvar.zst" > "${final_prefix}.pvar.tmp"

    awk 'NR==FNR {min[$1]=$2; max[$1]=$3; next} 
         /^#/ {print $0 "\tU_MIN\tU_MAX"; next} 
         {if ($3 in min) print $0 "\t" min[$3] "\t" max[$3]; else print $0 "\tNA\tNA"}' \
         "${final_prefix}.recovery_params" "${final_prefix}.pvar.tmp" > "${final_prefix}.pvar"

    # Replace the compressed file with the updated text file
    rm "${final_prefix}.pvar.tmp" "${final_prefix}.pvar.zst" "${final_prefix}.recovery_params"


R: expand = "$[ ]"
    library(data.table)
    meta_files <- list.files("$[output_dir]/$[_chroms]",
                             pattern = "[.]meta$", recursive = TRUE,
                             full.names = TRUE)
    if (length(meta_files) > 0) {
      fields <- c("n_total", "n_passed", "n_multiallelic", "n_monomorphic",
                  "n_all_na", "n_high_msng", "n_low_maf", "n_low_mac")
      stats <- rbindlist(lapply(meta_files, function(f) {
        lines <- grep("^n_", readLines(f), value = TRUE)
        kv <- strsplit(lines, "=")
        vals <- setNames(as.integer(sapply(kv, `[`, 2)), sapply(kv, `[`, 1))
        as.data.table(as.list(vals[fields]))
      }))
      totals <- colSums(stats, na.rm = TRUE)
      summary <- data.frame(t(totals))
      summary$pct_dropped <- round(100 * (1 - summary$n_passed / summary$n_total), 1)
      cat("\n=== Filter Summary for $[_chroms] ===\n")
      print(data.frame(value = unlist(summary), row.names = names(summary)))
    }
bash: expand = "$[ ]"
    # Step 5: Cleanup block intermediates
    set -euo pipefail
    chrom_dir="$[output_dir]/$[_chroms]"
    final_prefix="${chrom_dir}/$[cohort_id].$[_chroms]"
    rm -f "${final_prefix}_pmerge_list.txt"
    rm -f "${final_prefix}-merge.pgen" "${final_prefix}-merge.pvar" "${final_prefix}-merge.psam"
    for block_dir in "${chrom_dir}"/*/; do
        rm -rf "${block_dir}"
    done

Step 4 – Load LD sketch data into R#

Load the final pgen output into R using pecotmr::load_LD_matrix() via a metadata TSV.

Metadata TSV format for PLINK-based LD: one row per chromosome with start=0, end=0, and path as the PLINK file prefix (resolved relative to the TSV directory).

#chrom	start	end	path
22	0	0	ADSP.R5.EUR.chr22
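A minimal way to generate such a TSV for all autosomes (the filename and cohort ID below are examples only; paths are resolved relative to the TSV's directory):

```python
# Write a pecotmr-style metadata TSV: one row per chromosome,
# start = end = 0, path = PLINK file prefix relative to the TSV.
cohort_id = "ADSP.R5.EUR"          # example cohort ID
with open("ld_meta.tsv", "w") as fh:
    fh.write("#chrom\tstart\tend\tpath\n")
    for c in range(1, 23):
        fh.write(f"{c}\t0\t0\t{cohort_id}.chr{c}\n")
```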

library(pecotmr)

meta <- "/path/to/ld_meta_chr22.tsv"
region <- "chr22:30000000-32000000"  # 2Mb region

# Load genotype matrix X (for susie_rss z+X interface)
system.time({
  res_x <- load_LD_matrix(meta, region, return_genotype = TRUE)
})
cat("X dimensions (samples x variants):", dim(res_x$LD_matrix), "\n")
head(res_x$ref_panel, 3)
res_x$LD_matrix[1:3, 1:3]

# Load LD correlation matrix R (for susie_rss z+R interface)
system.time({
  res_r <- load_LD_matrix(meta, region, return_genotype = FALSE)
})
cat("R dimensions:", dim(res_r$LD_matrix), "\n")
res_r$LD_matrix[1:3, 1:3]

Tested output (ADSP R5 EUR chr22, 10000 pseudo-samples, 2Mb region, 9188 variants):

# Genotype matrix X
   user  system elapsed
  0.680   0.170   0.900

X dimensions (samples x variants): 10000 9188

  chrom      pos A2 A1         variant_id allele_freq
1    22 30000353  G  T chr22:30000353:G:T    0.734539
2    22 30000443  C  T chr22:30000443:C:T    0.001058
3    22 30000447  C  T chr22:30000447:C:T    0.005081

   chr22:30000353:G:T chr22:30000443:C:T chr22:30000447:C:T
S1          0.6563110          0.7753906           1.065613
S2          0.7034302          0.3729858           0.805481
S3          0.8706055          1.1093750           1.046509

# LD correlation matrix R
   user  system elapsed
  6.390   0.410   6.880

R dimensions: 9188 9188

                   chr22:30000353:G:T chr22:30000443:C:T chr22:30000447:C:T
chr22:30000353:G:T         1.00000000        0.052049420        0.096841034
chr22:30000443:C:T         0.05204942        1.000000000       -0.007827702
chr22:30000447:C:T         0.09684103       -0.007827702        1.000000000

Use return_genotype = TRUE for the susie_rss(z, X=X) interface, or FALSE for the susie_rss(z, R=R) interface. For repeated analyses on the same region, compute R once and save with saveRDS() for reuse.
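For intuition, the LD matrix returned with return_genotype = FALSE is simply the column-wise Pearson correlation of X, which the per-variant [0, 2] scaling leaves unchanged. A minimal numpy illustration on random stand-in data with the same shape convention as res_x$LD_matrix (B pseudo-samples x p variants):

```python
import numpy as np

rng = np.random.default_rng(2)
X = rng.normal(size=(10000, 4))            # stand-in for the loaded matrix

Xc = (X - X.mean(axis=0)) / X.std(axis=0)  # standardize each variant column
R = Xc.T @ Xc / X.shape[0]                 # (p, p) correlation matrix

print(np.allclose(R, np.corrcoef(X, rowvar=False)))  # True
```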