Single-nuclei Pseudobulk Preprocessing (RNA-seq and ATAC-seq) Pipeline#

Overview#

This pipeline preprocesses single-nuclei pseudobulk count data (snATAC-seq or snRNA-seq) for downstream QTL analysis and region-specific studies.

Goals:

  • Aggregate single-cell counts into pseudobulk count matrices

  • Transform raw pseudobulk counts into analysis-ready formats

  • Remove technical confounders

  • Generate QTL-ready phenotype files or region-specific datasets

Pipeline Structure#

Step 1: Pseudobulk count matrix generation                [pseudobulk_count]
                    ↓
Step 2: Sample ID Mapping                                 [sampleid_mapping]
                    ↓
Step 3: Pseudobulk QC                                     [pseudobulk_qc]
        (optional) Region Peak/Gene Filtering          
        (optional) Batch Correction (ComBat or limma)
        (optional) Quantile Normalization
                    ↓
Step 4: Phenotype Reformatting  → BED                     [phenotype_formatting]
        (genome-wide QTL mapping, snATAC-seq only)  

Modality Support#

Feature

snATAC-seq

snRNA-seq

Pseudobulk count generation

TBD

Sample ID mapping

Region/gene filtering

✓ (--regions)

✓ (--gene-list)

Blacklist filtering

pseudobulk_qc step

phenotype_formatting step

— (refer to this pipeline)

Input Files#

All toy input files required to run this pipeline can be downloaded here.

File

Used in

celltyped_seuratobj{i}.rds

Step 1

pseudobulk_peaks_counts_{celltype}.csv.gz (snATAC-seq)

Step 2, Step 3

pseudobulk_counts_{celltype}.csv.gz (snRNA-seq)

Step 2, Step 3

metadata_{celltype}.csv

Step 2, Step 3

rosmap_sample_mapping_data.csv

Step 2

tech_vars_{celltype}.csv

Step 2

hg38-blacklist.v2.bed.gz

Step 2 (snATAC-seq only)

Minimal Working Example#

Step 1: Pseudobulk Count Matrix Generation#

Aggregates single-nuclei counts into pseudobulk count matrices per cell type from Seurat objects.

This step is upstream of sampleid_mapping and pseudobulk_qc. Output feeds directly into the existing preprocessing pipeline.

Input#

File

Description

celltyped_seuratobj{i}.rds

Seurat objects with celltype and sample annotations in meta.data

Process#

  1. Load each Seurat object and subset to target cell type — skips objects where cell type is not present

  2. Merge all subsets across objects and join layers

  3. Aggregate raw counts by sample (AggregateExpression)

  4. Filter out samples with fewer than min_cells cells (default: 10)

  5. Strip Ensembl version suffixes from gene IDs (ENSG00000000010.1ENSG00000000010)

  6. Save as pseudobulk_counts_{celltype}.csv.gz — raw counts only, normalization handled downstream in pseudobulk_qc

GLU cell type: Due to its large size, process in two batches (files 1–6 and 7–11) and pass separately.

Parameters#

Parameter

Default

Description

seurat_files

required

One or more Seurat .rds files

output_dir

required

Output directory for count matrix

celltype

MIC

Cell type to extract (must match celltype column in meta.data)

min_cells

10

Minimum number of cells per sample to retain

Output#

File

Description

pseudobulk_counts_{celltype}.csv.gz

Raw pseudobulk count matrix (genes × samples)

Timing: 10–30 min per cell type depending on object size

sos run pipeline/pseudobulk_preprocessing.ipynb pseudobulk_counts \
    --seurat-files /restricted/projectnb/xqtl/ROSMAP_snRNAseq_newreference/singleR_results/celltyped_seuratobj11.rds \
    --output-dir output/snrna_seq \
    --celltype MIC 

Step 1: Sample ID Mapping#

Maps original sample identifiers (individualID) to standardized sample IDs (sampleid) across metadata and count matrix files.

Input#

File

Description

rosmap_sample_mapping_data.csv

Mapping reference: individualID sampleid

metadata_{celltype}.csv

Per-cell-type sample metadata

pseudobulk_peaks_counts_{celltype}.csv.gz (snATAC-seq)

Per-cell-type peak count matrices

pseudobulk_counts_{celltype}.csv.gz (snRNA-seq)

Per-cell-type gene count matrices

Process#

Part 1 — Metadata files

For each metadata file:

  1. Look up each individualID in the mapping reference

  2. Assign sampleid — falls back to individualID if no mapping found

  3. Reorder columns: sampleid first, then individualID, then the rest

  4. Save updated file

Part 2 — Count matrix files

For each count file:

  1. Extract the header row (column names only)

  2. Keep the first column (peak or gene IDs) unchanged

  3. Map remaining column names (individualIDsampleid) where mapping exists, otherwise keep original

  4. Write new header and stream data rows unchanged

  5. Recompress with gzip

Parameters#

Parameter

Default

Description

map_file

required

CSV with individualIDsampleid mapping

meta_files

required

Metadata CSV files to remap

count_files

required

Count CSV.gz files to remap

output_dir

required

Parent output directory; writes to {output_dir}/1_files_with_sampleid/

Output#

Output directory: {output_dir}/1_files_with_sampleid/

File

Description

metadata_{celltype}.csv

Metadata with sampleid column prepended

pseudobulk_peaks_counts_{celltype}.csv.gz (snATAC-seq)

Count matrices with mapped column headers

pseudobulk_counts_{celltype}.csv.gz (snRNA-seq)

Count matrices with mapped column headers

Timing: < 1 min

sos run pipeline/pseudobulk_preprocessing.ipynb sampleid_mapping \
    --output-dir output/snatac_seq \
    --map-file data/rosmap_sample_mapping_data.csv \
    --meta-files data/snatac_seq/metadata_Mic_50nuc.csv \
    --count-files data/snatac_seq/0_pseudobulk_count/pseudobulk_peaks_counts_Mic_50nuc.csv.gz

Step 2: Pseudobulk QC#

Regresses out technical covariates for downstream QTL analysis. Works for both snATAC-seq and snRNA-seq.

Input#

File

Description

metadata_{celltype}.csv

Sample-level metadata (nuclei counts, batch info)

pseudobulk_*counts_{celltype}.csv.gz

Pseudobulk count matrix

tech_vars.csv

Technical covariates (sampleid + tech var columns, pre-processed)

hg38-blacklist.v2.bed.gz (snATAC-seq, optional)

Blacklisted genomic regions

Process#

  1. Load count matrix and auto-detect modality (snATAC-seq vs snRNA-seq)

  2. (Optional) Filter to specific genomic regions (snATAC-seq) or gene list (snRNA-seq)

  3. Load metadata; filter samples with fewer than min_nuclei nuclei (default: 20)

  4. Align samples between metadata and count matrix

  5. (Optional) Filter blacklisted genomic regions (snATAC-seq only)

  6. Merge tech vars from tech_vars_file by sampleid

  7. Drop samples with NA in any tech var

  8. Apply expression filtering (filterByExpr):

    • min_count = 5: minimum reads in at least one sample

    • min_total_count = 15: minimum total reads across all samples

    • min_prop = 0.1: feature expressed in ≥10% of samples

  9. TMM normalization

  10. (Optional) Batch correction on sequencingBatch:

    • limma::removeBatchEffect (default)

    • ComBat (on log-CPM)

  11. Add sequencingBatch and Library to model if present and multi-level

  12. Fit linear model (voom + lmFit + eBayes) with tech vars + batch vars only

  13. Compute offset + residuals as final adjusted values:

    • offset: intercept + batch effects at reference level

    • residuals: variation after removing technical effects; biological signal retained

  14. (Optional) Quantile normalization of final values

Model formula:

~ {tech_vars} + [sequencingBatch] + [Library]

sequencingBatch and Library included only if present and have more than one level. Biological variables (pmi, study, msex, age_death etc.) are not included — they should not be regressed out as they may be associated with genotype.

Parameters#

Parameter

Default

Description

meta_files

required

Metadata CSV files (one per cell type)

count_files

required

Count CSV.gz files (one per cell type, same order as meta_files)

output_dir

required

Parent output directory; writes to {output_dir}/2_residuals/{ct}/

tech_vars_file

required

CSV with sampleid + tech var columns

blacklist_file

''

Genomic blacklist BED file (snATAC-seq only)

regions

''

Comma-separated genomic regions e.g. chr7:28000000-28300000 (snATAC-seq)

gene_list

''

Comma-separated gene IDs e.g. ENSG00000000010 (snRNA-seq)

batch_correction

FALSE

Apply batch correction (TRUE/FALSE)

batch_method

limma

Batch correction method (limma or combat)

quant_norm

FALSE

Apply quantile normalization after residuals

min_count

5

Min reads in at least one sample

min_total_count

15

Min total reads across all samples

min_prop

0.1

Min proportion of samples with expression

min_nuclei

20

Min nuclei per sample

Output#

Output directory: {output_dir}/2_residuals/{celltype}/

File

Description

{celltype}_residuals.txt

Tech-covariate-adjusted values (log2-CPM)

{celltype}_residuals_qn.txt

Quantile-normalized adjusted values (if quant_norm=TRUE)

{celltype}_results.rds

Full results: DGEList, fit, offset, residuals, design, parameters

{celltype}_filtered_raw_counts.txt

Filtered raw counts before normalization

Timing: < 5 min per cell type

Pseudobulk QC#

#snATAC-seq
sos run pipeline/pseudobulk_preprocessing.ipynb pseudobulk_qc \
    --meta-files output/snatac_seq/1_files_with_sampleid/metadata_Mic_50nuc.csv \
    --count-files output/snatac_seq/1_files_with_sampleid/pseudobulk_peaks_counts_Mic_50nuc.csv.gz  \
    --output-dir output/snatac_seq \
    --tech-vars-file data/snatac_seq/tech_vars_MIC.csv \
    --blacklist-file data/hg38-blacklist.v2.bed.gz #only for snATAC-seq

#snRNA-seq
sos run pipeline/pseudobulk_preprocessing.ipynb pseudobulk_qc \
    --meta-files output/snrna_seq/1_files_with_sampleid/metadata_MIC.csv \
    --count-files output/snrna_seq/1_files_with_sampleid/pseudobulk_counts_MIC.csv.gz  \
    --output-dir output/snrna_seq \
    --tech-vars-file data/snrna_seq/tech_vars_MIC.csv \
    --gene-list ENSG00000000010,ENSG00000000020 

Additional parameters#

--min-count 5
--min-total-count 15
--min-prop 0.1
--min-nuclei 20
--quant-norm TRUE
--batch-correction TRUE 
--batch-method combat # or limma
--gene-list ENSG00000000010,ENSG00000000020  # for snRNA-seq
--regions chr7:28000000-28300000  # for snATAC-seq

Step 3: Phenotype Reformatting (snATAC-seq only)#

Converts residuals into a QTL-ready BED format for genome-wide caQTL mapping.

For snRNA-seq, please follow this pipeline.

Input#

File

Description

{celltype}_residuals.txt

Residuals from pseudobulk_qc

Process#

  1. Read residuals file with proper handling of feature IDs and sample columns

  2. Parse peak coordinates from peak IDs (chr-start-end format)

  3. Convert to midpoint coordinates (standard for QTLtools):

start = floor((peak_start + peak_end) / 2)
end   = start + 1
  1. Build BED format: #chr, start, end, ID followed by per-sample values

  2. Sort by chromosome and position

  3. Compress with bgzip and index with tabix

Parameters#

Parameter

Default

Description

residual_files

required

Residual txt files from pseudobulk_qc

output_dir

required

Parent output directory; writes to {output_dir}/3_pheno_reformat/

Output#

Output directory: {output_dir}/3_pheno_reformat/

File

Description

{celltype}_phenotype.bed.gz

bgzip-compressed BED with midpoint coordinates

{celltype}_phenotype.bed.gz.tbi

tabix index for random-access queries

Compatible with FastQTL, TensorQTL, and QTLtools.

Timing: < 1 min per cell type

sos run pipeline/pseudobulk_preprocessing.ipynb phenotype_formatting \
    --residual-files output/snatac_seq/2_residuals/Mic_50nuc/Mic_50nuc_residuals.txt \
    --output-dir output/snatac_seq

Command interface#

sos run pipeline/pseudobulk_preprocessing.ipynb -h

Setup and global parameters#

[global]
parameter: cwd = path("output")
parameter: job_size = 1
parameter: walltime = "5h"
parameter: mem = "16G"
parameter: numThreads = 8
parameter: container = ""

import re
from sos.utils import expand_size

) if container else ""

cwd = path(f'{cwd:a}')
usage: sos run pipeline/pseudobulk_preprocessing.ipynb
               [workflow_name | -t targets] [options] [workflow_options]
  workflow_name:        Single or combined workflows defined in this script
  targets:              One or more targets to generate
  options:              Single-hyphen sos parameters (see "sos run -h" for details)
  workflow_options:     Double-hyphen workflow-specific parameters
Workflows:
  sampleid_mapping
  pseudobulk_qc
  phenotype_formatting
Global Workflow Options:
  --cwd output (as path)
  --job-size 1 (as int)
  --walltime 5h
  --mem 16G
  --numThreads 8 (as int)
  --container ''
Sections
  sampleid_mapping:
    Workflow Options:
      --map-file VAL (as str, required)
      --output-dir VAL (as str, required)
      --meta-files  (as list)
      --count-files  (as list)
  pseudobulk_qc:
    Workflow Options:
      --meta-files  (as list)
      --count-files  (as list)
      --output-dir VAL (as str, required)
      --tech-vars-file VAL (as str, required)
      --blacklist-file ''
      --batch-correction FALSE
      --batch-method limma
      --quant-norm FALSE
      --min-count 5 (as int)
      --min-total-count 15 (as int)
      --min-prop 0.1 (as float)
      --min-nuclei 20 (as int)
      --regions ''
      --gene-list ''
  phenotype_formatting:
    Workflow Options:
      --residual-files  (as list)
      --output-dir VAL (as str, required)

pseudobulk_counts#

[pseudobulk_counts]
parameter: seurat_files = []
parameter: output_dir   = str
parameter: celltype     = 'MIC'
parameter: min_cells    = 10

import os

input:  seurat_files
output: f'{output_dir}/0_pseudobulk_counts/pseudobulk_counts_{celltype}.csv.gz'

task: trunk_workers = 1, trunk_size = 1, walltime = '4:00:00', mem = '64G', cores = 4

R: expand = "${ }", stdout = f'{_output:n}.stdout', stderr = f'{_output:n}.stderr'

    library(Seurat)
    library(data.table)

    seurat_files <- c(${','.join([f'"{f}"' for f in seurat_files])})
    celltype     <- "${celltype}"
    min_cells    <- ${min_cells}
    output_dir   <- "${output_dir}"
    out_file     <- "${_output}"

    message("Loading and subsetting Seurat objects for: ", celltype)

    # ── 1. Load and subset each Seurat object ─────────────────────
    subsets <- list()
    for (f in seurat_files) {
        message("Loading: ", basename(f))
        obj <- readRDS(f)
        if (celltype %in% obj$celltype) {
            subsets[[length(subsets) + 1]] <- subset(obj, subset = celltype == celltype)
        } else {
            message("  Skipping - celltype not found in: ", basename(f))
        }
        rm(obj)
        gc()
    }

    if (length(subsets) == 0) stop("No Seurat objects contain celltype: ", celltype)
    message("Found ", celltype, " in ", length(subsets), " objects")

    # ── 2. Merge and aggregate ────────────────────────────────────
    merged <- Reduce(merge, subsets)
    rm(subsets)
    gc()

    merged <- JoinLayers(merged)
    merged <- SetIdent(merged, value = "sample")

    cell_counts <- table(merged@meta.data$sample)
    expr        <- AggregateExpression(merged, group.by = "sample", slot = "counts")$RNA
    rm(merged)
    gc()

    # ── 3. Filter samples with < min_cells ────────────────────────
    valid_samples <- names(cell_counts[cell_counts >= min_cells])
    expr          <- expr[, valid_samples]
    message("Samples after min_cells (>= ", min_cells, ") filter: ", ncol(expr))

    # ── 4. Strip Ensembl version suffixes ─────────────────────────
    rownames(expr) <- gsub("\\..*$", "", rownames(expr))

    # ── 5. Save as csv.gz ─────────────────────────────────────────
    message("Genes: ", nrow(expr), " | Samples: ", ncol(expr))
    dt <- data.table(gene_id = rownames(expr), as.data.frame(expr))
    fwrite(dt, out_file, compress = "gzip")
    message("Saved: ", out_file)

sampleid_mapping#

[sampleid_mapping]
parameter: map_file    = str
parameter: output_dir  = str
parameter: meta_files  = []
parameter: count_files = []

import os

input:  meta_files + count_files
output: [f'{output_dir}/1_files_with_sampleid/{os.path.basename(f)}' for f in meta_files + count_files]
         
python: expand = "${ }"
import pandas as pd
import gzip
import os
import subprocess
import csv
import numpy as np
import tempfile

map_df = pd.read_csv("${map_file}")
id_map = dict(zip(map_df["individualID"], map_df["sampleid"]))
output_dir  = "${output_dir}/1_files_with_sampleid"
meta_files  = ${meta_files}
count_files = ${count_files}

os.makedirs(output_dir, exist_ok=True)

def map_id(ind_id):
    return id_map.get(ind_id, ind_id)

def format_value(val):
    if pd.isna(val):
        return ''
    if isinstance(val, (int, np.integer)):
        return str(val)
    if isinstance(val, (float, np.floating)):
        if val == int(val):
            return str(int(val))
        else:
            return str(val)
    return str(val)

# ── Process metadata ───────────────────────────────────────────────────────
for in_path in meta_files:
    fname    = os.path.basename(in_path)
    out_path = os.path.join(output_dir, fname)
    meta = pd.read_csv(in_path)
    if "individualID" not in meta.columns:
        print(f"Warning: individualID column not found in {fname}, skipping.")
        continue
    meta["sampleid"] = meta["individualID"].map(map_id)
    cols = meta.columns.tolist()
    cols.remove("sampleid")
    cols.remove("individualID")
    meta = meta[["sampleid", "individualID"] + cols]
    with open(out_path, 'w', newline='') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        writer.writerow(meta.columns)
        for _, row in meta.iterrows():
            writer.writerow([format_value(val) for val in row])

# ── Process count files ────────────────────────────────────────────────────
for in_path in count_files:
    fname    = os.path.basename(in_path)
    out_path = os.path.join(output_dir, fname)
    with gzip.open(in_path, "rt") as fh:
        header_line = fh.readline().rstrip("\n")
    col_names       = header_line.split(",")
    peak_id_col     = col_names[0]
    new_sample_cols = [map_id(s) for s in col_names[1:]]
    new_header      = ",".join([peak_id_col] + new_sample_cols)
    tmp = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt')
    tmp.write(new_header + "\n")
    tmp.close()
    cmd = f"zcat {in_path} | tail -n +2 | cat {tmp.name} - | gzip -6 > {out_path}"
    subprocess.run(cmd, shell=True, check=True)
    os.unlink(tmp.name)

pseudobulk_qc#

[pseudobulk_qc]
parameter: meta_files       = []
parameter: count_files      = []
parameter: output_dir       = str
parameter: tech_vars_file   = str
parameter: blacklist_file   = ''
parameter: batch_correction = "FALSE"
parameter: batch_method     = "limma"
parameter: quant_norm       = "FALSE"
parameter: min_count        = 5
parameter: min_total_count  = 15
parameter: min_prop         = 0.1
parameter: min_nuclei       = 20
parameter: regions          = ''
parameter: gene_list        = ''

import os

_cts = [os.path.basename(f).replace('metadata_','').replace('.csv','') for f in meta_files]

input:  meta_files + count_files
output: [f'{output_dir}/2_residuals/{ct}/{ct}_residuals.txt' for ct in _cts]

task: trunk_workers = 1, trunk_size = 1, walltime = '6:00:00', mem = '64G', cores = 4

R: expand = "${ }", stdout = f'{_output[0]:n}.stdout', stderr = f'{_output[0]:n}.stderr'

    library(edgeR)
    library(limma)
    library(data.table)
    library(GenomicRanges)
    if (as.logical("${batch_correction}") && "${batch_method}" == "combat") library(sva)

    # ── predictOffset ──────────────────────────────────────────────────────
    predictOffset <- function(fit, tech_vars) {
        D  <- fit$design
        Dm <- D
        for (col in colnames(D)) {
            if (col == "(Intercept)") next
            is_tech <- any(sapply(tech_vars, function(v) grepl(paste0("^", v), col)))
            if (is_tech) {
                if (is.numeric(D[, col]) && !all(D[, col] %in% c(0, 1)))
                    Dm[, col] <- median(D[, col], na.rm=TRUE)
                else
                    Dm[, col] <- 0
            } else {
                Dm[, col] <- 0
            }
        }
        B <- fit$coefficients
        B[is.na(B)] <- 0
        off <- B %*% t(Dm)
        colnames(off) <- rownames(fit$design)
        return(off)
    }

    filter_blacklist <- function(mat, bed, feat_label) {
        peaks <- data.table(id = rownames(mat))
        peaks[, c("chr","start","end") := tstrsplit(gsub("_","-",id), "-")]
        peaks[, `:=`(start = as.numeric(start), end = as.numeric(end))]
        bl <- fread(bed)[, 1:3]
        setnames(bl, c("chr","start","end"))
        bl[, `:=`(start = as.numeric(start), end = as.numeric(end))]
        gr1 <- GRanges(peaks$chr, IRanges(peaks$start, peaks$end))
        gr2 <- GRanges(bl$chr,    IRanges(bl$start,    bl$end))
        blacklisted <- unique(queryHits(findOverlaps(gr1, gr2)))
        if (length(blacklisted) > 0) {
            message("Blacklisted ", feat_label, " removed: ", length(blacklisted))
            return(mat[-blacklisted, , drop=FALSE])
        }
        return(mat)
    }

    parse_regions <- function(region_str) {
        if (is.null(region_str) || region_str == "") return(NULL)
        lapply(strsplit(region_str, ",")[[1]], function(r) {
            parts <- strsplit(trimws(r), ":|−|-")[[1]]
            list(chr=parts[1], start=as.integer(parts[2]), end=as.integer(parts[3]))
        })
    }

    filter_regions <- function(mat, regions) {
        peaks <- data.table(id = rownames(mat))
        peaks[, c("chr","start","end") := tstrsplit(gsub("_","-",id), "-")]
        peaks[, `:=`(start = as.integer(start), end = as.integer(end))]
        gr_peaks <- GRanges(peaks$chr, IRanges(peaks$start, peaks$end))
        gr_regions <- GRanges(
            sapply(regions, `[[`, "chr"),
            IRanges(sapply(regions, `[[`, "start"), sapply(regions, `[[`, "end"))
        )
        keep <- unique(queryHits(findOverlaps(gr_peaks, gr_regions)))
        if (length(keep) == 0) stop("No peaks overlap the specified regions.")
        message("Peaks after region filter: ", length(keep))
        mat[keep, , drop=FALSE]
    }

    meta_files  <- c(${','.join([f'"{f}"' for f in meta_files])})
    count_files <- c(${','.join([f'"{f}"' for f in count_files])})

    if (length(meta_files) != length(count_files))
        stop("meta_files and count_files must have the same length and order.")

    # ── Load tech vars from file ───────────────────────────────────────────
    tech_df   <- fread("${tech_vars_file}")
    tech_vars <- setdiff(colnames(tech_df), "sampleid")
    message("Tech vars: ", paste(tech_vars, collapse=", "))

    regions   <- parse_regions("${regions}")
    gene_list <- trimws(strsplit("${gene_list}", ",")[[1]])
    gene_list <- gene_list[gene_list != ""]

    for (i in seq_along(meta_files)) {
        meta_file   <- meta_files[i]
        counts_file <- count_files[i]
        ct          <- sub("\\.csv$", "", sub("^metadata_", "", basename(meta_file)))

        message("\n", paste(rep("=", 40), collapse=""))
        message("Processing: ", ct)
        message("Batch correction: ", ifelse(as.logical("${batch_correction}"), "${batch_method}", "none"))
        message("Quantile normalization: ", as.logical("${quant_norm}"))
        message(paste(rep("=", 40), collapse=""))

        outdir <- file.path("${output_dir}/2_residuals", ct)
        dir.create(outdir, recursive=TRUE, showWarnings=FALSE)

        # ── 1. Load counts ─────────────────────────────────────────────────
        counts_raw       <- fread(counts_file)
        counts           <- as.matrix(counts_raw[, -1, with=FALSE])
        rownames(counts) <- counts_raw[[1]]
        rm(counts_raw)

        # ── Auto-detect modality ───────────────────────────────────────────
        is_atac    <- grepl("^chr.*-[0-9]+-[0-9]+$", rownames(counts)[1])
        feat_label <- ifelse(is_atac, "peaks", "genes")
        message("Modality: ", ifelse(is_atac, "snATAC-seq", "snRNA-seq"))
        message("Loaded: ", nrow(counts), " ", feat_label, " x ", ncol(counts), " samples")

        # ── 1b. Region/gene filtering (optional) ──────────────────────────
        if (is_atac && !is.null(regions)) {
            message("Filtering peaks to specified regions...")
            counts <- filter_regions(counts, regions)
        } else if (!is_atac && length(gene_list) > 0) {
            genes_present <- intersect(rownames(counts), gene_list)
            if (length(genes_present) == 0) stop("No matching genes found in count matrix.")
            message("Genes after gene_list filter: ", length(genes_present))
            counts <- counts[genes_present, , drop=FALSE]
        }

        # ── 2. Load metadata ───────────────────────────────────────────────
        meta  <- fread(meta_file)
        idcol <- intersect(c("sampleid","sampleID","individualID","projid"), colnames(meta))[1]
        if (is.na(idcol)) stop("Cannot find sample ID column in metadata.")

        # ── 3. Nuclei filter ──────────────────────────────────────────────
        n_nuclei_col <- intersect(c("n_nuclei","n.nuclei","nNuclei","nuclei_count"), colnames(meta))[1]
        if (!is.na(n_nuclei_col)) {
            meta <- meta[meta[[n_nuclei_col]] > ${min_nuclei}]
            message("Samples after nuclei (>${min_nuclei}) filter: ", nrow(meta))
        }

        # ── 4. Align samples ──────────────────────────────────────────────
        common <- intersect(meta[[idcol]], colnames(counts))
        if (length(common) == 0) stop("Zero sample overlap between metadata and count matrix.")
        counts <- counts[, common, drop=FALSE]
        message("Samples after alignment: ", length(common))

        # ── 5. Blacklist filtering ─────────────────────────────────────────
        if ("${blacklist_file}" != "" && file.exists("${blacklist_file}")) {
            counts <- filter_blacklist(counts, "${blacklist_file}", feat_label)
            message(feat_label, " after blacklist filter: ", nrow(counts))
        } else {
            message("No blacklist file - skipping.")
        }

        # ── 6. Merge tech vars by sampleid ────────────────────────────────
        tech_sub <- tech_df[tech_df$sampleid %in% common]
        tech_sub <- tech_sub[match(common, tech_sub$sampleid)]

        # ── 7. Drop samples with NA in tech vars ──────────────────────────
        keep_rows <- complete.cases(tech_sub[, ..tech_vars])
        tech_sub  <- tech_sub[keep_rows]
        counts    <- counts[, tech_sub$sampleid, drop=FALSE]
        message("Valid samples for modelling: ", nrow(tech_sub))

        # ── 8. Expression filtering ────────────────────────────────────────
        dge <- DGEList(counts=counts, samples=tech_sub)
        dge$samples$group <- factor(rep("all", ncol(dge)))
        message(feat_label, " before filter: ", nrow(dge))

        keep <- filterByExpr(dge, group=dge$samples$group,
                             min.count=${min_count},
                             min.total.count=${min_total_count},
                             min.prop=${min_prop})
        dge <- dge[keep,, keep.lib.sizes=FALSE]
        message(feat_label, " after filter: ", nrow(dge))

        write.table(dge$counts,
                    file.path(outdir, paste0(ct, "_filtered_raw_counts.txt")),
                    sep="\t", quote=FALSE, col.names=NA)

        # ── 9. TMM normalization ───────────────────────────────────────────
        dge <- calcNormFactors(dge, method="TMM")

        # ── 10. Optional batch correction ──────────────────────────────────
        if (as.logical("${batch_correction}") && "sequencingBatch" %in% colnames(dge$samples)) {
            batches       <- dge$samples$sequencingBatch
            batch_counts  <- table(batches)
            valid_batches <- names(batch_counts[batch_counts > 1])
            keep_bc       <- batches %in% valid_batches
            dge           <- dge[, keep_bc, keep.lib.sizes=FALSE]
            batches       <- batches[keep_bc]
            message("Samples after singleton batch removal: ", ncol(dge))

            if ("${batch_method}" == "combat") {
                logCPM     <- cpm(dge, log=TRUE, prior.count=1)
                logCPM     <- ComBat(dat=logCPM, batch=factor(batches))
                dge$counts <- round(pmax(2^logCPM, 0))
                message("ComBat applied on log-CPM.")
            } else {
                logCPM     <- cpm(dge, log=TRUE, prior.count=1)
                logCPM     <- removeBatchEffect(logCPM, batch=factor(batches))
                dge$counts <- round(pmax(2^logCPM, 0))
                message("limma removeBatchEffect applied.")
            }
        }

        # ── 11. Add batch vars to model if multi-level ────────────────────
        batch_vars <- c()
        if ("sequencingBatch" %in% colnames(dge$samples) &&
            length(unique(dge$samples$sequencingBatch)) > 1) {
            dge$samples$sequencingBatch_factor <- factor(dge$samples$sequencingBatch)
            batch_vars <- c(batch_vars, "sequencingBatch_factor")
        }
        if ("Library" %in% colnames(dge$samples) &&
            length(unique(dge$samples$Library)) > 1) {
            dge$samples$Library_factor <- factor(dge$samples$Library)
            batch_vars <- c(batch_vars, "Library_factor")
        }

        # ── 12. Build design matrix ────────────────────────────────────────
        all_model_vars <- intersect(c(tech_vars, batch_vars), colnames(dge$samples))
        form           <- as.formula(paste("~", paste(all_model_vars, collapse=" + ")))
        design         <- model.matrix(form, data=dge$samples)
        message("Formula: ", deparse(form))

        if (!is.fullrank(design)) {
            message("Design not full rank - trimming.")
            qr_d   <- qr(design)
            design <- design[, qr_d$pivot[seq_len(qr_d$rank)], drop=FALSE]
        }
        message("Design matrix: ", nrow(design), " x ", ncol(design))

        # ── 13. Voom + lmFit + eBayes ─────────────────────────────────────
        v   <- voom(dge, design, plot=FALSE)
        fit <- lmFit(v, design)
        fit <- eBayes(fit)

        # ── 14. Offset + residuals ─────────────────────────────────────────
        off   <- predictOffset(fit, tech_vars=tech_vars)
        res   <- residuals(fit, v$E)
        final <- off + res

        # ── 15. Save residuals ─────────────────────────────────────────────
        out_file <- file.path(outdir, paste0(ct, "_residuals.txt"))
        write.table(final, out_file, sep="\t", quote=FALSE, col.names=NA)
        message("Saved: ", out_file)
        message("  ", ifelse(is_atac,"Peaks","Genes"), ": ", nrow(final), " | Samples: ", ncol(final))

        # ── 16. Optional quantile normalization ───────────────────────────
        if (as.logical("${quant_norm}")) {
            final_qn <- t(apply(final, 1, rank, ties.method="average"))
            final_qn <- stats::qnorm(final_qn / (ncol(final_qn) + 1))
            qn_file  <- file.path(outdir, paste0(ct, "_residuals_qn.txt"))
            write.table(final_qn, qn_file, sep="\t", quote=FALSE, col.names=NA)
            message("Saved QN: ", qn_file)

            saveRDS(list(
                dge=dge, offset=off, residuals=res,
                final_data=final, final_data_qn=final_qn,
                valid_samples=colnames(dge), design=design, fit=fit, model=form,
                tech_vars=tech_vars, batch_vars=batch_vars,
                batch_correction=as.logical("${batch_correction}"),
                batch_method=ifelse(as.logical("${batch_correction}"), "${batch_method}", "none"),
                quant_norm=TRUE,
                modality=ifelse(is_atac, "snATAC-seq", "snRNA-seq")
            ), file.path(outdir, paste0(ct, "_results_qn.rds")))
        } else {
            saveRDS(list(
                dge=dge, offset=off, residuals=res,
                final_data=final,
                valid_samples=colnames(dge), design=design, fit=fit, model=form,
                tech_vars=tech_vars, batch_vars=batch_vars,
                batch_correction=as.logical("${batch_correction}"),
                batch_method=ifelse(as.logical("${batch_correction}"), "${batch_method}", "none"),
                quant_norm=FALSE,
                modality=ifelse(is_atac, "snATAC-seq", "snRNA-seq")
            ), file.path(outdir, paste0(ct, "_results.rds")))
        }

        message("Completed: ", ct, " -> ", outdir)
    }

phenotype_reformatting#

[phenotype_formatting]
parameter: residual_files = []
parameter: output_dir     = str

import os

_cts = [os.path.basename(os.path.dirname(f)) for f in residual_files]

input:  residual_files
output: [f'{output_dir}/3_pheno_reformat/{ct}_phenotype.bed.gz' for ct in _cts]

task: trunk_workers = 1, trunk_size = 1, walltime = '2:00:00', mem = '16G', cores = 2

python: expand = "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout'
    import os
    import subprocess
    import pandas as pd

    residual_files = ${residual_files}
    output_dir     = "${output_dir}"

    def read_residuals(path):
        first_line = open(path).readline().rstrip("\n")
        col_names  = first_line.split("\t")
        df = pd.read_csv(path, sep="\t", header=None, skiprows=1)
        if df.shape[1] > len(col_names):
            peak_ids   = df.iloc[:, 0].values
            df         = df.iloc[:, 1:]
            df.columns = col_names
        else:
            peak_ids   = df.iloc[:, 0].values
            df         = df.iloc[:, 1:]
            df.columns = col_names[1:]
        return peak_ids, df

    def to_midpoint_bed(peak_ids, residuals):
        parts  = pd.Series(peak_ids).str.split("-", expand=True)
        chrs   = parts[0].values
        starts = parts[1].astype(int).values
        ends   = parts[2].astype(int).values
        mids   = ((starts + ends) // 2).astype(int)
        bed = pd.DataFrame({
            "#chr":  chrs,
            "start": mids,
            "end":   mids + 1,
            "ID":    peak_ids
        })
        bed = pd.concat([bed, residuals.reset_index(drop=True)], axis=1)
        return bed.sort_values(["#chr", "start"]).reset_index(drop=True)

    def run_cmd(cmd, label):
        r = subprocess.run(cmd, capture_output=True)
        if r.returncode != 0:
            print(f"WARNING: {label} failed: {r.stderr.decode()}")
        else:
            print(f"{label}: OK")

    out_dir = os.path.join(output_dir, "3_pheno_reformat")
    os.makedirs(out_dir, exist_ok=True)

    for res_path in residual_files:
        ct = os.path.basename(os.path.dirname(res_path))

        print(f"\n{'='*40}\nPhenotype Formatting: {ct}\n{'='*40}")

        if not os.path.exists(res_path):
            print(f"WARNING: {res_path} not found, skipping.")
            continue

        peak_ids, residuals = read_residuals(res_path)
        print(f"Loaded {len(peak_ids)} peaks x {residuals.shape[1]} samples")

        bed     = to_midpoint_bed(peak_ids, residuals)
        out_bed = os.path.join(out_dir, f"{ct}_phenotype.bed")
        bed.to_csv(out_bed, sep="\t", index=False, float_format="%.15f")
        print(f"Written: {out_bed}")

        run_cmd(["bgzip", "-f", out_bed],                "bgzip")
        run_cmd(["tabix", "-p", "bed", f"{out_bed}.gz"], "tabix")
        print(f"Completed: {ct} -> {out_dir}")