Single-nuclei Pseudobulk Preprocessing (RNA-seq and ATAC-seq)#

Description#

Single-nuclei pseudobulk preprocessing for RNA-seq and ATAC-seq. Aggregates per-nucleus counts into per-sample pseudobulk matrices, harmonizes sample IDs, and regresses out technical covariates to produce QTL-ready phenotype values.

The pipeline has three stages:

  • pseudobulk_counts – aggregate a Seurat object into a raw pseudobulk count matrix for one cell type.

  • sampleid_mapping – remap individualID headers to standardized sampleid across metadata and count matrices.

  • pseudobulk_qc – filter, TMM-normalize, fit a technical-covariate model, and write residuals.

(phenotype_formatting reformats residuals into a BED for snATAC-seq caQTL mapping.)

Timing: ~5-10 min on typical compute infrastructure.

Input Files#

The toy example uses single-nuclei RNA-seq data for the MIC (microglia) cell type under input/snrnaseq/.

File

Description

protocol_example.snrnaseq.seurat_MIC.rds

Seurat object (MIC cells) with per-nucleus counts (input to pseudobulk_counts)

protocol_example.snrnaseq.pseudobulk_counts_MIC.csv.gz

Pre-aggregated pseudobulk count matrix (genes x samples)

protocol_example.snrnaseq.metadata_MIC.csv

Sample-level metadata (sampleid, batch, nuclei counts)

protocol_example.snrnaseq.tech_vars_MIC.csv

Technical covariates keyed by sampleid

protocol_example.snrnaseq.id_map.csv

individualID -> sampleid map (input to sampleid_mapping)

atac_residuals/MIC/protocol_example.snrnaseq.MIC_residuals.txt

Peak residuals with chr-start-end IDs (input to phenotype_formatting)

Step 1: Pseudobulk Count Matrix Generation#

Aggregates single-nuclei counts into pseudobulk count matrices per cell type from Seurat objects.

This step is upstream of sampleid_mapping and pseudobulk_qc. Output feeds directly into the existing preprocessing pipeline.

Input#

File

Description

celltyped_seuratobj{i}.rds

Seurat objects with celltype and sample annotations in meta.data

Process#

  1. Load each Seurat object and subset to target cell type — skips objects where cell type is not present

  2. Merge all subsets across objects and join layers

  3. Aggregate raw counts by sample (AggregateExpression)

  4. Filter out samples with fewer than min_cells cells (default: 10)

  5. Strip Ensembl version suffixes from gene IDs (ENSG00000000010.1ENSG00000000010)

  6. Save as pseudobulk_counts_{celltype}.csv.gz — raw counts only, normalization handled downstream in pseudobulk_qc

GLU cell type: Due to its large size, process in two batches (files 1–6 and 7–11) and pass separately.

Parameters#

Parameter

Default

Description

seurat_files

required

One or more Seurat .rds files

output_dir

required

Output directory for count matrix

celltype

MIC

Cell type to extract (must match celltype column in meta.data)

min_cells

10

Minimum number of cells per sample to retain

Output#

File

Description

pseudobulk_counts_{celltype}.csv.gz

Raw pseudobulk count matrix (genes × samples)

Timing: 10–30 min per cell type depending on object size

sos run pipeline/pseudobulk_preprocessing.ipynb pseudobulk_counts \
    --seurat-files input/snrnaseq/protocol_example.snrnaseq.seurat_MIC.rds \
    --celltype MIC \
    --output-dir output/snrna_seq

Step 2: Sample ID Mapping#

Maps original sample identifiers (individualID) to standardized sample IDs (sampleid) across metadata and count matrix files.

Input#

File

Description

rosmap_sample_mapping_data.csv

Mapping reference: individualID sampleid

metadata_{celltype}.csv

Per-cell-type sample metadata

pseudobulk_peaks_counts_{celltype}.csv.gz (snATAC-seq)

Per-cell-type peak count matrices

pseudobulk_counts_{celltype}.csv.gz (snRNA-seq)

Per-cell-type gene count matrices

Process#

Part 1 — Metadata files

For each metadata file:

  1. Look up each individualID in the mapping reference

  2. Assign sampleid — falls back to individualID if no mapping found

  3. Reorder columns: sampleid first, then individualID, then the rest

  4. Save updated file

Part 2 — Count matrix files

For each count file:

  1. Extract the header row (column names only)

  2. Keep the first column (peak or gene IDs) unchanged

  3. Map remaining column names (individualIDsampleid) where mapping exists, otherwise keep original

  4. Write new header and stream data rows unchanged

  5. Recompress with gzip

Parameters#

Parameter

Default

Description

map_file

required

CSV with individualIDsampleid mapping

meta_files

required

Metadata CSV files to remap

count_files

required

Count CSV.gz files to remap

output_dir

required

Parent output directory; writes to {output_dir}/1_files_with_sampleid/

Output#

Output directory: {output_dir}/1_files_with_sampleid/

File

Description

metadata_{celltype}.csv

Metadata with sampleid column prepended

pseudobulk_peaks_counts_{celltype}.csv.gz (snATAC-seq)

Count matrices with mapped column headers

pseudobulk_counts_{celltype}.csv.gz (snRNA-seq)

Count matrices with mapped column headers

Timing: < 1 min

sos run pipeline/pseudobulk_preprocessing.ipynb sampleid_mapping \
    --map-file input/snrnaseq/protocol_example.snrnaseq.id_map.csv \
    --meta-files input/snrnaseq/protocol_example.snrnaseq.metadata_MIC.csv \
    --output-dir output/snrna_seq

Step 2: Pseudobulk QC#

Regresses out technical covariates for downstream QTL analysis. Works for both snATAC-seq and snRNA-seq.

Input#

File

Description

metadata_{celltype}.csv

Sample-level metadata (nuclei counts, batch info)

pseudobulk_*counts_{celltype}.csv.gz

Pseudobulk count matrix

tech_vars.csv

Technical covariates (sampleid + tech var columns, pre-processed)

hg38-blacklist.v2.bed.gz (snATAC-seq, optional)

Blacklisted genomic regions

Process#

  1. Load count matrix and auto-detect modality (snATAC-seq vs snRNA-seq)

  2. (Optional) Filter to specific genomic regions (snATAC-seq) or gene list (snRNA-seq)

  3. Load metadata; filter samples with fewer than min_nuclei nuclei (default: 20)

  4. Align samples between metadata and count matrix

  5. (Optional) Filter blacklisted genomic regions (snATAC-seq only)

  6. Merge tech vars from tech_vars_file by sampleid

  7. Drop samples with NA in any tech var

  8. Apply expression filtering (filterByExpr):

    • min_count = 5: minimum reads in at least one sample

    • min_total_count = 15: minimum total reads across all samples

    • min_prop = 0.1: feature expressed in ≥10% of samples

  9. TMM normalization

  10. (Optional) Batch correction on sequencingBatch:

    • limma::removeBatchEffect (default)

    • ComBat (on log-CPM)

  11. Add sequencingBatch and Library to model if present and multi-level

  12. Fit linear model (voom + lmFit + eBayes) with tech vars + batch vars only

  13. Compute offset + residuals as final adjusted values:

    • offset: intercept + batch effects at reference level

    • residuals: variation after removing technical effects; biological signal retained

  14. (Optional) Quantile normalization of final values

Model formula:

~ {tech_vars} + [sequencingBatch] + [Library]

sequencingBatch and Library included only if present and have more than one level. Biological variables (pmi, study, msex, age_death etc.) are not included — they should not be regressed out as they may be associated with genotype.

Parameters#

Parameter

Default

Description

meta_files

required

Metadata CSV files (one per cell type)

count_files

required

Count CSV.gz files (one per cell type, same order as meta_files)

output_dir

required

Parent output directory; writes to {output_dir}/2_residuals/{ct}/

tech_vars_file

required

CSV with sampleid + tech var columns

blacklist_file

''

Genomic blacklist BED file (snATAC-seq only)

regions

''

Comma-separated genomic regions e.g. chr7:28000000-28300000 (snATAC-seq)

gene_list

''

Comma-separated gene IDs e.g. ENSG00000000010 (snRNA-seq)

batch_correction

FALSE

Apply batch correction (TRUE/FALSE)

batch_method

limma

Batch correction method (limma or combat)

quant_norm

FALSE

Apply quantile normalization after residuals

min_count

5

Min reads in at least one sample

min_total_count

15

Min total reads across all samples

min_prop

0.1

Min proportion of samples with expression

min_nuclei

20

Min nuclei per sample

Output#

Output directory: {output_dir}/2_residuals/{celltype}/

File

Description

{celltype}_residuals.txt

Tech-covariate-adjusted values (log2-CPM)

{celltype}_residuals_qn.txt

Quantile-normalized adjusted values (if quant_norm=TRUE)

{celltype}_results.rds

Full results: DGEList, fit, offset, residuals, design, parameters

{celltype}_filtered_raw_counts.txt

Filtered raw counts before normalization

Timing: < 5 min per cell type

sos run pipeline/pseudobulk_preprocessing.ipynb pseudobulk_qc \
    --meta-files input/snrnaseq/protocol_example.snrnaseq.metadata_MIC.csv \
    --count-files input/snrnaseq/protocol_example.snrnaseq.pseudobulk_counts_MIC.csv.gz \
    --tech-vars-file input/snrnaseq/protocol_example.snrnaseq.tech_vars_MIC.csv \
    --output-dir output/snrna_seq

Step 3: Phenotype Reformatting (snATAC-seq only)#

Converts residuals into a QTL-ready BED format for genome-wide caQTL mapping.

For snRNA-seq, please follow this pipeline.

Input#

File

Description

{celltype}_residuals.txt

Residuals from pseudobulk_qc

Process#

  1. Read residuals file with proper handling of feature IDs and sample columns

  2. Parse peak coordinates from peak IDs (chr-start-end format)

  3. Convert to midpoint coordinates (standard for QTLtools):

start = floor((peak_start + peak_end) / 2)
end   = start + 1
  1. Build BED format: #chr, start, end, ID followed by per-sample values

  2. Sort by chromosome and position

  3. Compress with bgzip and index with tabix

Parameters#

Parameter

Default

Description

residual_files

required

Residual txt files from pseudobulk_qc

output_dir

required

Parent output directory; writes to {output_dir}/3_pheno_reformat/

Output#

Output directory: {output_dir}/3_pheno_reformat/

File

Description

{celltype}_phenotype.bed.gz

bgzip-compressed BED with midpoint coordinates

{celltype}_phenotype.bed.gz.tbi

tabix index for random-access queries

Compatible with FastQTL, TensorQTL, and QTLtools.

Timing: < 1 min per cell type

sos run pipeline/pseudobulk_preprocessing.ipynb phenotype_formatting \
    --residual-files input/snrnaseq/atac_residuals/MIC/protocol_example.snrnaseq.MIC_residuals.txt \
    --output-dir output/snrna_seq

Output Files#

Written to {output_dir}/2_residuals/{celltype}/ (e.g. output/snrna_seq/2_residuals/MIC/).

File

Description

{celltype}_residuals.txt

Tech-covariate-adjusted values (log2-CPM)

{celltype}_results.rds

Full results: DGEList, fit, offset, residuals, design

{celltype}_filtered_raw_counts.txt

Filtered raw counts before normalization

Stage 1 (pseudobulk_counts) writes 0_pseudobulk_counts/pseudobulk_counts_{celltype}.csv.gz; sampleid_mapping writes 1_files_with_sampleid/.

Anticipated Results#

The pipeline produces output files in the output/ subdirectory named after the workflow step. Verify success by checking that output files exist and are non-empty. See the Output section above for the expected file names and formats.

Command interface#

sos run pipeline/pseudobulk_preprocessing.ipynb -h

Workflow implementation#

The cells below define the SoS workflow steps invoked by the commands above. They are not meant to be edited for routine analysis.

Setup and global parameters#

[global]
parameter: cwd = path("output")
parameter: job_size = 1
parameter: walltime = "5h"
parameter: mem = "16G"
parameter: numThreads = 8
parameter: container = ""


cwd = path(f'{cwd:a}')
usage: sos run pipeline/pseudobulk_preprocessing.ipynb
               [workflow_name | -t targets] [options] [workflow_options]
  workflow_name:        Single or combined workflows defined in this script
  targets:              One or more targets to generate
  options:              Single-hyphen sos parameters (see "sos run -h" for details)
  workflow_options:     Double-hyphen workflow-specific parameters
Workflows:
  sampleid_mapping
  pseudobulk_qc
  phenotype_formatting
Global Workflow Options:
  --cwd output (as path)
  --job-size 1 (as int)
  --walltime 5h
  --mem 16G
  --numThreads 8 (as int)
  --container ''
Sections
  sampleid_mapping:
    Workflow Options:
      --map-file VAL (as str, required)
      --output-dir VAL (as str, required)
      --meta-files  (as list)
      --count-files  (as list)
  pseudobulk_qc:
    Workflow Options:
      --meta-files  (as list)
      --count-files  (as list)
      --output-dir VAL (as str, required)
      --tech-vars-file VAL (as str, required)
      --blacklist-file ''
      --batch-correction FALSE
      --batch-method limma
      --quant-norm FALSE
      --min-count 5 (as int)
      --min-total-count 15 (as int)
      --min-prop 0.1 (as float)
      --min-nuclei 20 (as int)
      --regions ''
      --gene-list ''
  phenotype_formatting:
    Workflow Options:
      --residual-files  (as list)
      --output-dir VAL (as str, required)

pseudobulk_counts#

[pseudobulk_counts]
parameter: seurat_files = []
parameter: output_dir   = str
parameter: celltype     = 'MIC'
parameter: min_cells    = 10

import os

input:  seurat_files
output: f'{output_dir}/0_pseudobulk_counts/pseudobulk_counts_{celltype}.csv.gz'

task: trunk_workers = 1, trunk_size = 1, walltime = '4:00:00', mem = '64G', cores = 4

R: expand = "${ }", stdout = f'{_output:n}.stdout', stderr = f'{_output:n}.stderr'

    library(Seurat)
    library(data.table)

    seurat_files <- c(${','.join([f'"{f}"' for f in seurat_files])})
    celltype     <- "${celltype}"
    min_cells    <- ${min_cells}
    output_dir   <- "${output_dir}"
    out_file     <- "${_output}"

    message("Loading and subsetting Seurat objects for: ", celltype)

    # ── 1. Load and subset each Seurat object ─────────────────────
    subsets <- list()
    for (f in seurat_files) {
        message("Loading: ", basename(f))
        obj <- readRDS(f)
        if (celltype %in% obj$celltype) {
            subsets[[length(subsets) + 1]] <- subset(obj, subset = celltype == celltype)
        } else {
            message("  Skipping - celltype not found in: ", basename(f))
        }
        rm(obj)
        gc()
    }

    if (length(subsets) == 0) stop("No Seurat objects contain celltype: ", celltype)
    message("Found ", celltype, " in ", length(subsets), " objects")

    # ── 2. Merge and aggregate ────────────────────────────────────
    merged <- Reduce(merge, subsets)
    rm(subsets)
    gc()

    merged <- JoinLayers(merged)
    merged <- SetIdent(merged, value = "sample")

    cell_counts <- table(merged@meta.data$sample)
    expr        <- AggregateExpression(merged, group.by = "sample", slot = "counts")$RNA
    rm(merged)
    gc()

    # ── 3. Filter samples with < min_cells ────────────────────────
    valid_samples <- names(cell_counts[cell_counts >= min_cells])
    # Seurat AggregateExpression replaces underscores in identities with dashes;
    # map valid_samples to the dash-form used in colnames(expr) for subsetting.
    valid_cols <- gsub("_", "-", valid_samples)
    valid_cols <- valid_cols[valid_cols %in% colnames(expr)]
    expr          <- expr[, valid_cols, drop = FALSE]
    colnames(expr) <- gsub("-", "_", colnames(expr))  # restore underscore sample names
    message("Samples after min_cells (>= ", min_cells, ") filter: ", ncol(expr))

    # ── 4. Strip Ensembl version suffixes ─────────────────────────
    rownames(expr) <- gsub("\\..*$", "", rownames(expr))

    # ── 5. Save as csv.gz ─────────────────────────────────────────
    message("Genes: ", nrow(expr), " | Samples: ", ncol(expr))
    dt <- data.table(gene_id = rownames(expr), as.data.frame(expr))
    fwrite(dt, out_file, compress = "gzip")
    message("Saved: ", out_file)

sampleid_mapping#

[sampleid_mapping]
parameter: map_file    = str
parameter: output_dir  = str
parameter: meta_files  = []
parameter: count_files = []

import os

input:  meta_files + count_files
output: [f'{output_dir}/1_files_with_sampleid/{os.path.basename(f)}' for f in meta_files + count_files]
         
python: expand = "${ }"
import pandas as pd
import gzip
import os
import subprocess
import csv
import numpy as np
import tempfile

map_df = pd.read_csv("${map_file}")
id_map = dict(zip(map_df["individualID"], map_df["sampleid"]))
output_dir  = "${output_dir}/1_files_with_sampleid"
meta_files  = ${meta_files}
count_files = ${count_files}

os.makedirs(output_dir, exist_ok=True)

def map_id(ind_id):
    return id_map.get(ind_id, ind_id)

def format_value(val):
    if pd.isna(val):
        return ''
    if isinstance(val, (int, np.integer)):
        return str(val)
    if isinstance(val, (float, np.floating)):
        if val == int(val):
            return str(int(val))
        else:
            return str(val)
    return str(val)

# ── Process metadata ───────────────────────────────────────────────────────
for in_path in meta_files:
    fname    = os.path.basename(in_path)
    out_path = os.path.join(output_dir, fname)
    meta = pd.read_csv(in_path)
    if "individualID" not in meta.columns:
        print(f"Warning: individualID column not found in {fname}, skipping.")
        continue
    meta["sampleid"] = meta["individualID"].map(map_id)
    cols = meta.columns.tolist()
    cols.remove("sampleid")
    cols.remove("individualID")
    meta = meta[["sampleid", "individualID"] + cols]
    with open(out_path, 'w', newline='') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        writer.writerow(meta.columns)
        for _, row in meta.iterrows():
            writer.writerow([format_value(val) for val in row])

# ── Process count files ────────────────────────────────────────────────────
for in_path in count_files:
    fname    = os.path.basename(in_path)
    out_path = os.path.join(output_dir, fname)
    with gzip.open(in_path, "rt") as fh:
        header_line = fh.readline().rstrip("\n")
    col_names       = header_line.split(",")
    peak_id_col     = col_names[0]
    new_sample_cols = [map_id(s) for s in col_names[1:]]
    new_header      = ",".join([peak_id_col] + new_sample_cols)
    tmp = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt')
    tmp.write(new_header + "\n")
    tmp.close()
    cmd = f"zcat {in_path} | tail -n +2 | cat {tmp.name} - | gzip -6 > {out_path}"
    subprocess.run(cmd, shell=True, check=True)
    os.unlink(tmp.name)

pseudobulk_qc#

[pseudobulk_qc]
parameter: meta_files       = []
parameter: count_files      = []
parameter: output_dir       = str
parameter: tech_vars_file   = str
parameter: blacklist_file   = ''
parameter: batch_correction = "FALSE"
parameter: batch_method     = "limma"
parameter: quant_norm       = "FALSE"
parameter: min_count        = 5
parameter: min_total_count  = 15
parameter: min_prop         = 0.1
parameter: min_nuclei       = 20
parameter: regions          = ''
parameter: gene_list        = ''

import os

_cts = [os.path.basename(f).split('metadata_')[-1].replace('.csv','') for f in meta_files]

input:  meta_files + count_files
output: [f'{output_dir}/2_residuals/{ct}/{ct}_residuals.txt' for ct in _cts]

task: trunk_workers = 1, trunk_size = 1, walltime = '6:00:00', mem = '64G', cores = 4

R: expand = "${ }", stdout = f'{_output[0]:n}.stdout', stderr = f'{_output[0]:n}.stderr'

    library(edgeR)
    library(limma)
    library(data.table)
    library(GenomicRanges)
    if (as.logical("${batch_correction}") && "${batch_method}" == "combat") library(sva)

    # ── predictOffset ──────────────────────────────────────────────────────
    predictOffset <- function(fit, tech_vars) {
        D  <- fit$design
        Dm <- D
        for (col in colnames(D)) {
            if (col == "(Intercept)") next
            is_tech <- any(sapply(tech_vars, function(v) grepl(paste0("^", v), col)))
            if (is_tech) {
                if (is.numeric(D[, col]) && !all(D[, col] %in% c(0, 1)))
                    Dm[, col] <- median(D[, col], na.rm=TRUE)
                else
                    Dm[, col] <- 0
            } else {
                Dm[, col] <- 0
            }
        }
        B <- fit$coefficients
        B[is.na(B)] <- 0
        off <- B %*% t(Dm)
        colnames(off) <- rownames(fit$design)
        return(off)
    }

    filter_blacklist <- function(mat, bed, feat_label) {
        peaks <- data.table(id = rownames(mat))
        peaks[, c("chr","start","end") := tstrsplit(gsub("_","-",id), "-")]
        peaks[, `:=`(start = as.numeric(start), end = as.numeric(end))]
        bl <- fread(bed)[, 1:3]
        setnames(bl, c("chr","start","end"))
        bl[, `:=`(start = as.numeric(start), end = as.numeric(end))]
        gr1 <- GRanges(peaks$chr, IRanges(peaks$start, peaks$end))
        gr2 <- GRanges(bl$chr,    IRanges(bl$start,    bl$end))
        blacklisted <- unique(queryHits(findOverlaps(gr1, gr2)))
        if (length(blacklisted) > 0) {
            message("Blacklisted ", feat_label, " removed: ", length(blacklisted))
            return(mat[-blacklisted, , drop=FALSE])
        }
        return(mat)
    }

    parse_regions <- function(region_str) {
        if (is.null(region_str) || region_str == "") return(NULL)
        lapply(strsplit(region_str, ",")[[1]], function(r) {
            parts <- strsplit(trimws(r), ":|−|-")[[1]]
            list(chr=parts[1], start=as.integer(parts[2]), end=as.integer(parts[3]))
        })
    }

    filter_regions <- function(mat, regions) {
        peaks <- data.table(id = rownames(mat))
        peaks[, c("chr","start","end") := tstrsplit(gsub("_","-",id), "-")]
        peaks[, `:=`(start = as.integer(start), end = as.integer(end))]
        gr_peaks <- GRanges(peaks$chr, IRanges(peaks$start, peaks$end))
        gr_regions <- GRanges(
            sapply(regions, `[[`, "chr"),
            IRanges(sapply(regions, `[[`, "start"), sapply(regions, `[[`, "end"))
        )
        keep <- unique(queryHits(findOverlaps(gr_peaks, gr_regions)))
        if (length(keep) == 0) stop("No peaks overlap the specified regions.")
        message("Peaks after region filter: ", length(keep))
        mat[keep, , drop=FALSE]
    }

    meta_files  <- c(${','.join([f'"{f}"' for f in meta_files])})
    count_files <- c(${','.join([f'"{f}"' for f in count_files])})

    if (length(meta_files) != length(count_files))
        stop("meta_files and count_files must have the same length and order.")

    # ── Load tech vars from file ───────────────────────────────────────────
    tech_df   <- fread("${tech_vars_file}")
    tech_vars <- setdiff(colnames(tech_df), "sampleid")
    message("Tech vars: ", paste(tech_vars, collapse=", "))

    regions   <- parse_regions("${regions}")
    gene_list <- trimws(strsplit("${gene_list}", ",")[[1]])
    gene_list <- gene_list[gene_list != ""]

    for (i in seq_along(meta_files)) {
        meta_file   <- meta_files[i]
        counts_file <- count_files[i]
        ct          <- sub("\\.csv$", "", sub("^.*metadata_", "", basename(meta_file)))

        message("\n", paste(rep("=", 40), collapse=""))
        message("Processing: ", ct)
        message("Batch correction: ", ifelse(as.logical("${batch_correction}"), "${batch_method}", "none"))
        message("Quantile normalization: ", as.logical("${quant_norm}"))
        message(paste(rep("=", 40), collapse=""))

        outdir <- file.path("${output_dir}/2_residuals", ct)
        dir.create(outdir, recursive=TRUE, showWarnings=FALSE)

        # ── 1. Load counts ─────────────────────────────────────────────────
        counts_raw       <- fread(counts_file)
        counts           <- as.matrix(counts_raw[, -1, with=FALSE])
        rownames(counts) <- counts_raw[[1]]
        rm(counts_raw)

        # ── Auto-detect modality ───────────────────────────────────────────
        is_atac    <- grepl("^chr.*-[0-9]+-[0-9]+$", rownames(counts)[1])
        feat_label <- ifelse(is_atac, "peaks", "genes")
        message("Modality: ", ifelse(is_atac, "snATAC-seq", "snRNA-seq"))
        message("Loaded: ", nrow(counts), " ", feat_label, " x ", ncol(counts), " samples")

        # ── 1b. Region/gene filtering (optional) ──────────────────────────
        if (is_atac && !is.null(regions)) {
            message("Filtering peaks to specified regions...")
            counts <- filter_regions(counts, regions)
        } else if (!is_atac && length(gene_list) > 0) {
            genes_present <- intersect(rownames(counts), gene_list)
            if (length(genes_present) == 0) stop("No matching genes found in count matrix.")
            message("Genes after gene_list filter: ", length(genes_present))
            counts <- counts[genes_present, , drop=FALSE]
        }

        # ── 2. Load metadata ───────────────────────────────────────────────
        meta  <- fread(meta_file)
        idcol <- intersect(c("sampleid","sampleID","individualID","projid"), colnames(meta))[1]
        if (is.na(idcol)) stop("Cannot find sample ID column in metadata.")

        # ── 3. Nuclei filter ──────────────────────────────────────────────
        n_nuclei_col <- intersect(c("n_nuclei","n.nuclei","nNuclei","nuclei_count"), colnames(meta))[1]
        if (!is.na(n_nuclei_col)) {
            meta <- meta[meta[[n_nuclei_col]] > ${min_nuclei}]
            message("Samples after nuclei (>${min_nuclei}) filter: ", nrow(meta))
        }

        # ── 4. Align samples ──────────────────────────────────────────────
        common <- intersect(meta[[idcol]], colnames(counts))
        if (length(common) == 0) stop("Zero sample overlap between metadata and count matrix.")
        counts <- counts[, common, drop=FALSE]
        message("Samples after alignment: ", length(common))

        # ── 5. Blacklist filtering ─────────────────────────────────────────
        if ("${blacklist_file}" != "" && file.exists("${blacklist_file}")) {
            counts <- filter_blacklist(counts, "${blacklist_file}", feat_label)
            message(feat_label, " after blacklist filter: ", nrow(counts))
        } else {
            message("No blacklist file - skipping.")
        }

        # ── 6. Merge tech vars by sampleid ────────────────────────────────
        tech_sub <- tech_df[tech_df$sampleid %in% common]
        tech_sub <- tech_sub[match(common, tech_sub$sampleid)]

        # ── 7. Drop samples with NA in tech vars ──────────────────────────
        keep_rows <- complete.cases(tech_sub[, ..tech_vars])
        tech_sub  <- tech_sub[keep_rows]
        counts    <- counts[, tech_sub$sampleid, drop=FALSE]
        message("Valid samples for modelling: ", nrow(tech_sub))

        # ── 8. Expression filtering ────────────────────────────────────────
        dge <- DGEList(counts=counts, samples=tech_sub)
        dge$samples$group <- factor(rep("all", ncol(dge)))
        message(feat_label, " before filter: ", nrow(dge))

        keep <- filterByExpr(dge, group=dge$samples$group,
                             min.count=${min_count},
                             min.total.count=${min_total_count},
                             min.prop=${min_prop})
        dge <- dge[keep,, keep.lib.sizes=FALSE]
        message(feat_label, " after filter: ", nrow(dge))

        write.table(dge$counts,
                    file.path(outdir, paste0(ct, "_filtered_raw_counts.txt")),
                    sep="\t", quote=FALSE, col.names=NA)

        # ── 9. TMM normalization ───────────────────────────────────────────
        dge <- calcNormFactors(dge, method="TMM")

        # ── 10. Optional batch correction ──────────────────────────────────
        if (as.logical("${batch_correction}") && "sequencingBatch" %in% colnames(dge$samples)) {
            batches       <- dge$samples$sequencingBatch
            batch_counts  <- table(batches)
            valid_batches <- names(batch_counts[batch_counts > 1])
            keep_bc       <- batches %in% valid_batches
            dge           <- dge[, keep_bc, keep.lib.sizes=FALSE]
            batches       <- batches[keep_bc]
            message("Samples after singleton batch removal: ", ncol(dge))

            if ("${batch_method}" == "combat") {
                logCPM     <- cpm(dge, log=TRUE, prior.count=1)
                logCPM     <- ComBat(dat=logCPM, batch=factor(batches))
                dge$counts <- round(pmax(2^logCPM, 0))
                message("ComBat applied on log-CPM.")
            } else {
                logCPM     <- cpm(dge, log=TRUE, prior.count=1)
                logCPM     <- removeBatchEffect(logCPM, batch=factor(batches))
                dge$counts <- round(pmax(2^logCPM, 0))
                message("limma removeBatchEffect applied.")
            }
        }

        # ── 11. Add batch vars to model if multi-level ────────────────────
        batch_vars <- c()
        if ("sequencingBatch" %in% colnames(dge$samples) &&
            length(unique(dge$samples$sequencingBatch)) > 1) {
            dge$samples$sequencingBatch_factor <- factor(dge$samples$sequencingBatch)
            batch_vars <- c(batch_vars, "sequencingBatch_factor")
        }
        if ("Library" %in% colnames(dge$samples) &&
            length(unique(dge$samples$Library)) > 1) {
            dge$samples$Library_factor <- factor(dge$samples$Library)
            batch_vars <- c(batch_vars, "Library_factor")
        }

        # ── 12. Build design matrix ────────────────────────────────────────
        all_model_vars <- intersect(c(tech_vars, batch_vars), colnames(dge$samples))
        form           <- as.formula(paste("~", paste(all_model_vars, collapse=" + ")))
        design         <- model.matrix(form, data=dge$samples)
        message("Formula: ", deparse(form))

        if (!is.fullrank(design)) {
            message("Design not full rank - trimming.")
            qr_d   <- qr(design)
            design <- design[, qr_d$pivot[seq_len(qr_d$rank)], drop=FALSE]
        }
        message("Design matrix: ", nrow(design), " x ", ncol(design))

        # ── 13. Voom + lmFit + eBayes ─────────────────────────────────────
        v   <- voom(dge, design, plot=FALSE)
        fit <- lmFit(v, design)
        fit <- eBayes(fit)

        # ── 14. Offset + residuals ─────────────────────────────────────────
        off   <- predictOffset(fit, tech_vars=tech_vars)
        res   <- residuals(fit, v$E)
        final <- off + res

        # ── 15. Save residuals ─────────────────────────────────────────────
        out_file <- file.path(outdir, paste0(ct, "_residuals.txt"))
        write.table(final, out_file, sep="\t", quote=FALSE, col.names=NA)
        message("Saved: ", out_file)
        message("  ", ifelse(is_atac,"Peaks","Genes"), ": ", nrow(final), " | Samples: ", ncol(final))

        # ── 16. Optional quantile normalization ───────────────────────────
        if (as.logical("${quant_norm}")) {
            final_qn <- t(apply(final, 1, rank, ties.method="average"))
            final_qn <- stats::qnorm(final_qn / (ncol(final_qn) + 1))
            qn_file  <- file.path(outdir, paste0(ct, "_residuals_qn.txt"))
            write.table(final_qn, qn_file, sep="\t", quote=FALSE, col.names=NA)
            message("Saved QN: ", qn_file)

            saveRDS(list(
                dge=dge, offset=off, residuals=res,
                final_data=final, final_data_qn=final_qn,
                valid_samples=colnames(dge), design=design, fit=fit, model=form,
                tech_vars=tech_vars, batch_vars=batch_vars,
                batch_correction=as.logical("${batch_correction}"),
                batch_method=ifelse(as.logical("${batch_correction}"), "${batch_method}", "none"),
                quant_norm=TRUE,
                modality=ifelse(is_atac, "snATAC-seq", "snRNA-seq")
            ), file.path(outdir, paste0(ct, "_results_qn.rds")))
        } else {
            saveRDS(list(
                dge=dge, offset=off, residuals=res,
                final_data=final,
                valid_samples=colnames(dge), design=design, fit=fit, model=form,
                tech_vars=tech_vars, batch_vars=batch_vars,
                batch_correction=as.logical("${batch_correction}"),
                batch_method=ifelse(as.logical("${batch_correction}"), "${batch_method}", "none"),
                quant_norm=FALSE,
                modality=ifelse(is_atac, "snATAC-seq", "snRNA-seq")
            ), file.path(outdir, paste0(ct, "_results.rds")))
        }

        message("Completed: ", ct, " -> ", outdir)
    }

phenotype_reformatting#

[phenotype_formatting]
parameter: residual_files = []
parameter: output_dir     = str

import os

_cts = [os.path.basename(os.path.dirname(f)) for f in residual_files]

input:  residual_files
output: [f'{output_dir}/3_pheno_reformat/{ct}_phenotype.bed.gz' for ct in _cts]

task: trunk_workers = 1, trunk_size = 1, walltime = '2:00:00', mem = '16G', cores = 2

python: expand = "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout'
    import os
    import subprocess
    import pandas as pd

    residual_files = ${residual_files}
    output_dir     = "${output_dir}"

    def read_residuals(path):
        first_line = open(path).readline().rstrip("\n")
        col_names  = first_line.split("\t")
        df = pd.read_csv(path, sep="\t", header=None, skiprows=1)
        if df.shape[1] > len(col_names):
            peak_ids   = df.iloc[:, 0].values
            df         = df.iloc[:, 1:]
            df.columns = col_names
        else:
            peak_ids   = df.iloc[:, 0].values
            df         = df.iloc[:, 1:]
            df.columns = col_names[1:]
        return peak_ids, df

    def to_midpoint_bed(peak_ids, residuals):
        parts  = pd.Series(peak_ids).str.split("-", expand=True)
        chrs   = parts[0].values
        starts = parts[1].astype(int).values
        ends   = parts[2].astype(int).values
        mids   = ((starts + ends) // 2).astype(int)
        bed = pd.DataFrame({
            "#chr":  chrs,
            "start": mids,
            "end":   mids + 1,
            "ID":    peak_ids
        })
        bed = pd.concat([bed, residuals.reset_index(drop=True)], axis=1)
        return bed.sort_values(["#chr", "start"]).reset_index(drop=True)

    def run_cmd(cmd, label):
        r = subprocess.run(cmd, capture_output=True)
        if r.returncode != 0:
            print(f"WARNING: {label} failed: {r.stderr.decode()}")
        else:
            print(f"{label}: OK")

    out_dir = os.path.join(output_dir, "3_pheno_reformat")
    os.makedirs(out_dir, exist_ok=True)

    for res_path in residual_files:
        ct = os.path.basename(os.path.dirname(res_path))

        print(f"\n{'='*40}\nPhenotype Formatting: {ct}\n{'='*40}")

        if not os.path.exists(res_path):
            print(f"WARNING: {res_path} not found, skipping.")
            continue

        peak_ids, residuals = read_residuals(res_path)
        print(f"Loaded {len(peak_ids)} peaks x {residuals.shape[1]} samples")

        bed     = to_midpoint_bed(peak_ids, residuals)
        out_bed = os.path.join(out_dir, f"{ct}_phenotype.bed")
        bed.to_csv(out_bed, sep="\t", index=False, float_format="%.15f")
        print(f"Written: {out_bed}")

        run_cmd(["bgzip", "-f", out_bed],                "bgzip")
        run_cmd(["tabix", "-p", "bed", f"{out_bed}.gz"], "tabix")
        print(f"Completed: {ct} -> {out_dir}")