Single-nuclei Pseudobulk Preprocessing (RNA-seq and ATAC-seq) Pipeline#
Overview#
This pipeline preprocesses single-nuclei pseudobulk count data (snATAC-seq or snRNA-seq) for downstream QTL analysis and region-specific studies.
Goals:
Aggregate single-cell counts into pseudobulk count matrices
Transform raw pseudobulk counts into analysis-ready formats
Remove technical confounders
Generate QTL-ready phenotype files or region-specific datasets
Pipeline Structure#
Step 1: Pseudobulk count matrix generation [pseudobulk_count]
↓
Step 2: Sample ID Mapping [sampleid_mapping]
↓
Step 3: Pseudobulk QC [pseudobulk_qc]
(optional) Region Peak/Gene Filtering
(optional) Batch Correction (ComBat or limma)
(optional) Quantile Normalization
↓
Step 4: Phenotype Reformatting → BED [phenotype_formatting]
(genome-wide QTL mapping, snATAC-seq only)
Modality Support#
Feature |
snATAC-seq |
snRNA-seq |
|---|---|---|
Pseudobulk count generation |
TBD |
✓ |
Sample ID mapping |
✓ |
✓ |
Region/gene filtering |
✓ ( |
✓ ( |
Blacklist filtering |
✓ |
— |
|
✓ |
✓ |
|
✓ |
— (refer to this pipeline) |
Input Files#
All toy input files required to run this pipeline can be downloaded here.
File |
Used in |
|---|---|
|
Step 1 |
|
Step 2, Step 3 |
|
Step 2, Step 3 |
|
Step 2, Step 3 |
|
Step 2 |
|
Step 2 |
|
Step 2 (snATAC-seq only) |
Minimal Working Example#
Step 1: Pseudobulk Count Matrix Generation#
Aggregates single-nuclei counts into pseudobulk count matrices per cell type from Seurat objects.
This step is upstream of
sampleid_mappingandpseudobulk_qc. Output feeds directly into the existing preprocessing pipeline.
Input#
File |
Description |
|---|---|
|
Seurat objects with |
Process#
Load each Seurat object and subset to target cell type — skips objects where cell type is not present
Merge all subsets across objects and join layers
Aggregate raw counts by sample (
AggregateExpression)Filter out samples with fewer than
min_cellscells (default: 10)Strip Ensembl version suffixes from gene IDs (
ENSG00000000010.1→ENSG00000000010)Save as
pseudobulk_counts_{celltype}.csv.gz— raw counts only, normalization handled downstream inpseudobulk_qc
GLU cell type: Due to its large size, process in two batches (files 1–6 and 7–11) and pass separately.
Parameters#
Parameter |
Default |
Description |
|---|---|---|
|
required |
One or more Seurat |
|
required |
Output directory for count matrix |
|
|
Cell type to extract (must match |
|
|
Minimum number of cells per sample to retain |
Output#
File |
Description |
|---|---|
|
Raw pseudobulk count matrix (genes × samples) |
Timing: 10–30 min per cell type depending on object size
sos run pipeline/pseudobulk_preprocessing.ipynb pseudobulk_counts \
--seurat-files /restricted/projectnb/xqtl/ROSMAP_snRNAseq_newreference/singleR_results/celltyped_seuratobj11.rds \
--output-dir output/snrna_seq \
--celltype MIC
Step 1: Sample ID Mapping#
Maps original sample identifiers (individualID) to standardized sample IDs (sampleid)
across metadata and count matrix files.
Input#
File |
Description |
|---|---|
|
Mapping reference: |
|
Per-cell-type sample metadata |
|
Per-cell-type peak count matrices |
|
Per-cell-type gene count matrices |
Process#
Part 1 — Metadata files
For each metadata file:
Look up each
individualIDin the mapping referenceAssign
sampleid— falls back toindividualIDif no mapping foundReorder columns:
sampleidfirst, thenindividualID, then the restSave updated file
Part 2 — Count matrix files
For each count file:
Extract the header row (column names only)
Keep the first column (peak or gene IDs) unchanged
Map remaining column names (
individualID→sampleid) where mapping exists, otherwise keep originalWrite new header and stream data rows unchanged
Recompress with gzip
Parameters#
Parameter |
Default |
Description |
|---|---|---|
|
required |
CSV with |
|
required |
Metadata CSV files to remap |
|
required |
Count CSV.gz files to remap |
|
required |
Parent output directory; writes to |
Output#
Output directory: {output_dir}/1_files_with_sampleid/
File |
Description |
|---|---|
|
Metadata with |
|
Count matrices with mapped column headers |
|
Count matrices with mapped column headers |
Timing: < 1 min
sos run pipeline/pseudobulk_preprocessing.ipynb sampleid_mapping \
--output-dir output/snatac_seq \
--map-file data/rosmap_sample_mapping_data.csv \
--meta-files data/snatac_seq/metadata_Mic_50nuc.csv \
--count-files data/snatac_seq/0_pseudobulk_count/pseudobulk_peaks_counts_Mic_50nuc.csv.gz
Step 2: Pseudobulk QC#
Regresses out technical covariates for downstream QTL analysis. Works for both snATAC-seq and snRNA-seq.
Input#
File |
Description |
|---|---|
|
Sample-level metadata (nuclei counts, batch info) |
|
Pseudobulk count matrix |
|
Technical covariates (sampleid + tech var columns, pre-processed) |
|
Blacklisted genomic regions |
Process#
Load count matrix and auto-detect modality (snATAC-seq vs snRNA-seq)
(Optional) Filter to specific genomic regions (snATAC-seq) or gene list (snRNA-seq)
Load metadata; filter samples with fewer than
min_nucleinuclei (default: 20)Align samples between metadata and count matrix
(Optional) Filter blacklisted genomic regions (snATAC-seq only)
Merge tech vars from
tech_vars_filebysampleidDrop samples with NA in any tech var
Apply expression filtering (
filterByExpr):min_count = 5: minimum reads in at least one samplemin_total_count = 15: minimum total reads across all samplesmin_prop = 0.1: feature expressed in ≥10% of samples
TMM normalization
(Optional) Batch correction on
sequencingBatch:limma::removeBatchEffect(default)ComBat(on log-CPM)
Add
sequencingBatchandLibraryto model if present and multi-levelFit linear model (
voom+lmFit+eBayes) with tech vars + batch vars onlyCompute
offset + residualsas final adjusted values:offset: intercept + batch effects at reference levelresiduals: variation after removing technical effects; biological signal retained
(Optional) Quantile normalization of final values
Model formula:
~ {tech_vars} + [sequencingBatch] + [Library]
sequencingBatchandLibraryincluded only if present and have more than one level. Biological variables (pmi,study,msex,age_deathetc.) are not included — they should not be regressed out as they may be associated with genotype.
Parameters#
Parameter |
Default |
Description |
|---|---|---|
|
required |
Metadata CSV files (one per cell type) |
|
required |
Count CSV.gz files (one per cell type, same order as |
|
required |
Parent output directory; writes to |
|
required |
CSV with |
|
|
Genomic blacklist BED file (snATAC-seq only) |
|
|
Comma-separated genomic regions e.g. |
|
|
Comma-separated gene IDs e.g. |
|
|
Apply batch correction ( |
|
|
Batch correction method ( |
|
|
Apply quantile normalization after residuals |
|
|
Min reads in at least one sample |
|
|
Min total reads across all samples |
|
|
Min proportion of samples with expression |
|
|
Min nuclei per sample |
Output#
Output directory: {output_dir}/2_residuals/{celltype}/
File |
Description |
|---|---|
|
Tech-covariate-adjusted values (log2-CPM) |
|
Quantile-normalized adjusted values (if |
|
Full results: DGEList, fit, offset, residuals, design, parameters |
|
Filtered raw counts before normalization |
Timing: < 5 min per cell type
Pseudobulk QC#
#snATAC-seq
sos run pipeline/pseudobulk_preprocessing.ipynb pseudobulk_qc \
--meta-files output/snatac_seq/1_files_with_sampleid/metadata_Mic_50nuc.csv \
--count-files output/snatac_seq/1_files_with_sampleid/pseudobulk_peaks_counts_Mic_50nuc.csv.gz \
--output-dir output/snatac_seq \
--tech-vars-file data/snatac_seq/tech_vars_MIC.csv \
--blacklist-file data/hg38-blacklist.v2.bed.gz #only for snATAC-seq
#snRNA-seq
sos run pipeline/pseudobulk_preprocessing.ipynb pseudobulk_qc \
--meta-files output/snrna_seq/1_files_with_sampleid/metadata_MIC.csv \
--count-files output/snrna_seq/1_files_with_sampleid/pseudobulk_counts_MIC.csv.gz \
--output-dir output/snrna_seq \
--tech-vars-file data/snrna_seq/tech_vars_MIC.csv \
--gene-list ENSG00000000010,ENSG00000000020
Additional parameters#
--min-count 5
--min-total-count 15
--min-prop 0.1
--min-nuclei 20
--quant-norm TRUE
--batch-correction TRUE
--batch-method combat # or limma
--gene-list ENSG00000000010,ENSG00000000020 # for snRNA-seq
--regions chr7:28000000-28300000 # for snATAC-seq
Step 3: Phenotype Reformatting (snATAC-seq only)#
Converts residuals into a QTL-ready BED format for genome-wide caQTL mapping.
For snRNA-seq, please follow this pipeline.
Input#
File |
Description |
|---|---|
|
Residuals from |
Process#
Read residuals file with proper handling of feature IDs and sample columns
Parse peak coordinates from peak IDs (
chr-start-endformat)Convert to midpoint coordinates (standard for QTLtools):
start = floor((peak_start + peak_end) / 2)
end = start + 1
Build BED format:
#chr,start,end,IDfollowed by per-sample valuesSort by chromosome and position
Compress with
bgzipand index withtabix
Parameters#
Parameter |
Default |
Description |
|---|---|---|
|
required |
Residual txt files from |
|
required |
Parent output directory; writes to |
Output#
Output directory: {output_dir}/3_pheno_reformat/
File |
Description |
|---|---|
|
bgzip-compressed BED with midpoint coordinates |
|
tabix index for random-access queries |
Compatible with FastQTL, TensorQTL, and QTLtools.
Timing: < 1 min per cell type
sos run pipeline/pseudobulk_preprocessing.ipynb phenotype_formatting \
--residual-files output/snatac_seq/2_residuals/Mic_50nuc/Mic_50nuc_residuals.txt \
--output-dir output/snatac_seq
Command interface#
sos run pipeline/pseudobulk_preprocessing.ipynb -h
Setup and global parameters#
[global]
parameter: cwd = path("output")
parameter: job_size = 1
parameter: walltime = "5h"
parameter: mem = "16G"
parameter: numThreads = 8
parameter: container = ""
import re
from sos.utils import expand_size
) if container else ""
cwd = path(f'{cwd:a}')
usage: sos run pipeline/pseudobulk_preprocessing.ipynb
[workflow_name | -t targets] [options] [workflow_options]
workflow_name: Single or combined workflows defined in this script
targets: One or more targets to generate
options: Single-hyphen sos parameters (see "sos run -h" for details)
workflow_options: Double-hyphen workflow-specific parameters
Workflows:
sampleid_mapping
pseudobulk_qc
phenotype_formatting
Global Workflow Options:
--cwd output (as path)
--job-size 1 (as int)
--walltime 5h
--mem 16G
--numThreads 8 (as int)
--container ''
Sections
sampleid_mapping:
Workflow Options:
--map-file VAL (as str, required)
--output-dir VAL (as str, required)
--meta-files (as list)
--count-files (as list)
pseudobulk_qc:
Workflow Options:
--meta-files (as list)
--count-files (as list)
--output-dir VAL (as str, required)
--tech-vars-file VAL (as str, required)
--blacklist-file ''
--batch-correction FALSE
--batch-method limma
--quant-norm FALSE
--min-count 5 (as int)
--min-total-count 15 (as int)
--min-prop 0.1 (as float)
--min-nuclei 20 (as int)
--regions ''
--gene-list ''
phenotype_formatting:
Workflow Options:
--residual-files (as list)
--output-dir VAL (as str, required)
pseudobulk_counts#
[pseudobulk_counts]
parameter: seurat_files = []
parameter: output_dir = str
parameter: celltype = 'MIC'
parameter: min_cells = 10
import os
input: seurat_files
output: f'{output_dir}/0_pseudobulk_counts/pseudobulk_counts_{celltype}.csv.gz'
task: trunk_workers = 1, trunk_size = 1, walltime = '4:00:00', mem = '64G', cores = 4
R: expand = "${ }", stdout = f'{_output:n}.stdout', stderr = f'{_output:n}.stderr'
library(Seurat)
library(data.table)
seurat_files <- c(${','.join([f'"{f}"' for f in seurat_files])})
celltype <- "${celltype}"
min_cells <- ${min_cells}
output_dir <- "${output_dir}"
out_file <- "${_output}"
message("Loading and subsetting Seurat objects for: ", celltype)
# ── 1. Load and subset each Seurat object ─────────────────────
subsets <- list()
for (f in seurat_files) {
message("Loading: ", basename(f))
obj <- readRDS(f)
if (celltype %in% obj$celltype) {
subsets[[length(subsets) + 1]] <- subset(obj, subset = celltype == celltype)
} else {
message(" Skipping - celltype not found in: ", basename(f))
}
rm(obj)
gc()
}
if (length(subsets) == 0) stop("No Seurat objects contain celltype: ", celltype)
message("Found ", celltype, " in ", length(subsets), " objects")
# ── 2. Merge and aggregate ────────────────────────────────────
merged <- Reduce(merge, subsets)
rm(subsets)
gc()
merged <- JoinLayers(merged)
merged <- SetIdent(merged, value = "sample")
cell_counts <- table(merged@meta.data$sample)
expr <- AggregateExpression(merged, group.by = "sample", slot = "counts")$RNA
rm(merged)
gc()
# ── 3. Filter samples with < min_cells ────────────────────────
valid_samples <- names(cell_counts[cell_counts >= min_cells])
expr <- expr[, valid_samples]
message("Samples after min_cells (>= ", min_cells, ") filter: ", ncol(expr))
# ── 4. Strip Ensembl version suffixes ─────────────────────────
rownames(expr) <- gsub("\\..*$", "", rownames(expr))
# ── 5. Save as csv.gz ─────────────────────────────────────────
message("Genes: ", nrow(expr), " | Samples: ", ncol(expr))
dt <- data.table(gene_id = rownames(expr), as.data.frame(expr))
fwrite(dt, out_file, compress = "gzip")
message("Saved: ", out_file)
sampleid_mapping#
[sampleid_mapping]
parameter: map_file = str
parameter: output_dir = str
parameter: meta_files = []
parameter: count_files = []
import os
input: meta_files + count_files
output: [f'{output_dir}/1_files_with_sampleid/{os.path.basename(f)}' for f in meta_files + count_files]
python: expand = "${ }"
import pandas as pd
import gzip
import os
import subprocess
import csv
import numpy as np
import tempfile
map_df = pd.read_csv("${map_file}")
id_map = dict(zip(map_df["individualID"], map_df["sampleid"]))
output_dir = "${output_dir}/1_files_with_sampleid"
meta_files = ${meta_files}
count_files = ${count_files}
os.makedirs(output_dir, exist_ok=True)
def map_id(ind_id):
return id_map.get(ind_id, ind_id)
def format_value(val):
if pd.isna(val):
return ''
if isinstance(val, (int, np.integer)):
return str(val)
if isinstance(val, (float, np.floating)):
if val == int(val):
return str(int(val))
else:
return str(val)
return str(val)
# ── Process metadata ───────────────────────────────────────────────────────
for in_path in meta_files:
fname = os.path.basename(in_path)
out_path = os.path.join(output_dir, fname)
meta = pd.read_csv(in_path)
if "individualID" not in meta.columns:
print(f"Warning: individualID column not found in {fname}, skipping.")
continue
meta["sampleid"] = meta["individualID"].map(map_id)
cols = meta.columns.tolist()
cols.remove("sampleid")
cols.remove("individualID")
meta = meta[["sampleid", "individualID"] + cols]
with open(out_path, 'w', newline='') as f:
writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
writer.writerow(meta.columns)
for _, row in meta.iterrows():
writer.writerow([format_value(val) for val in row])
# ── Process count files ────────────────────────────────────────────────────
for in_path in count_files:
fname = os.path.basename(in_path)
out_path = os.path.join(output_dir, fname)
with gzip.open(in_path, "rt") as fh:
header_line = fh.readline().rstrip("\n")
col_names = header_line.split(",")
peak_id_col = col_names[0]
new_sample_cols = [map_id(s) for s in col_names[1:]]
new_header = ",".join([peak_id_col] + new_sample_cols)
tmp = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt')
tmp.write(new_header + "\n")
tmp.close()
cmd = f"zcat {in_path} | tail -n +2 | cat {tmp.name} - | gzip -6 > {out_path}"
subprocess.run(cmd, shell=True, check=True)
os.unlink(tmp.name)
pseudobulk_qc#
[pseudobulk_qc]
parameter: meta_files = []
parameter: count_files = []
parameter: output_dir = str
parameter: tech_vars_file = str
parameter: blacklist_file = ''
parameter: batch_correction = "FALSE"
parameter: batch_method = "limma"
parameter: quant_norm = "FALSE"
parameter: min_count = 5
parameter: min_total_count = 15
parameter: min_prop = 0.1
parameter: min_nuclei = 20
parameter: regions = ''
parameter: gene_list = ''
import os
_cts = [os.path.basename(f).replace('metadata_','').replace('.csv','') for f in meta_files]
input: meta_files + count_files
output: [f'{output_dir}/2_residuals/{ct}/{ct}_residuals.txt' for ct in _cts]
task: trunk_workers = 1, trunk_size = 1, walltime = '6:00:00', mem = '64G', cores = 4
R: expand = "${ }", stdout = f'{_output[0]:n}.stdout', stderr = f'{_output[0]:n}.stderr'
library(edgeR)
library(limma)
library(data.table)
library(GenomicRanges)
if (as.logical("${batch_correction}") && "${batch_method}" == "combat") library(sva)
# ── predictOffset ──────────────────────────────────────────────────────
predictOffset <- function(fit, tech_vars) {
D <- fit$design
Dm <- D
for (col in colnames(D)) {
if (col == "(Intercept)") next
is_tech <- any(sapply(tech_vars, function(v) grepl(paste0("^", v), col)))
if (is_tech) {
if (is.numeric(D[, col]) && !all(D[, col] %in% c(0, 1)))
Dm[, col] <- median(D[, col], na.rm=TRUE)
else
Dm[, col] <- 0
} else {
Dm[, col] <- 0
}
}
B <- fit$coefficients
B[is.na(B)] <- 0
off <- B %*% t(Dm)
colnames(off) <- rownames(fit$design)
return(off)
}
filter_blacklist <- function(mat, bed, feat_label) {
peaks <- data.table(id = rownames(mat))
peaks[, c("chr","start","end") := tstrsplit(gsub("_","-",id), "-")]
peaks[, `:=`(start = as.numeric(start), end = as.numeric(end))]
bl <- fread(bed)[, 1:3]
setnames(bl, c("chr","start","end"))
bl[, `:=`(start = as.numeric(start), end = as.numeric(end))]
gr1 <- GRanges(peaks$chr, IRanges(peaks$start, peaks$end))
gr2 <- GRanges(bl$chr, IRanges(bl$start, bl$end))
blacklisted <- unique(queryHits(findOverlaps(gr1, gr2)))
if (length(blacklisted) > 0) {
message("Blacklisted ", feat_label, " removed: ", length(blacklisted))
return(mat[-blacklisted, , drop=FALSE])
}
return(mat)
}
parse_regions <- function(region_str) {
if (is.null(region_str) || region_str == "") return(NULL)
lapply(strsplit(region_str, ",")[[1]], function(r) {
parts <- strsplit(trimws(r), ":|−|-")[[1]]
list(chr=parts[1], start=as.integer(parts[2]), end=as.integer(parts[3]))
})
}
filter_regions <- function(mat, regions) {
peaks <- data.table(id = rownames(mat))
peaks[, c("chr","start","end") := tstrsplit(gsub("_","-",id), "-")]
peaks[, `:=`(start = as.integer(start), end = as.integer(end))]
gr_peaks <- GRanges(peaks$chr, IRanges(peaks$start, peaks$end))
gr_regions <- GRanges(
sapply(regions, `[[`, "chr"),
IRanges(sapply(regions, `[[`, "start"), sapply(regions, `[[`, "end"))
)
keep <- unique(queryHits(findOverlaps(gr_peaks, gr_regions)))
if (length(keep) == 0) stop("No peaks overlap the specified regions.")
message("Peaks after region filter: ", length(keep))
mat[keep, , drop=FALSE]
}
meta_files <- c(${','.join([f'"{f}"' for f in meta_files])})
count_files <- c(${','.join([f'"{f}"' for f in count_files])})
if (length(meta_files) != length(count_files))
stop("meta_files and count_files must have the same length and order.")
# ── Load tech vars from file ───────────────────────────────────────────
tech_df <- fread("${tech_vars_file}")
tech_vars <- setdiff(colnames(tech_df), "sampleid")
message("Tech vars: ", paste(tech_vars, collapse=", "))
regions <- parse_regions("${regions}")
gene_list <- trimws(strsplit("${gene_list}", ",")[[1]])
gene_list <- gene_list[gene_list != ""]
for (i in seq_along(meta_files)) {
meta_file <- meta_files[i]
counts_file <- count_files[i]
ct <- sub("\\.csv$", "", sub("^metadata_", "", basename(meta_file)))
message("\n", paste(rep("=", 40), collapse=""))
message("Processing: ", ct)
message("Batch correction: ", ifelse(as.logical("${batch_correction}"), "${batch_method}", "none"))
message("Quantile normalization: ", as.logical("${quant_norm}"))
message(paste(rep("=", 40), collapse=""))
outdir <- file.path("${output_dir}/2_residuals", ct)
dir.create(outdir, recursive=TRUE, showWarnings=FALSE)
# ── 1. Load counts ─────────────────────────────────────────────────
counts_raw <- fread(counts_file)
counts <- as.matrix(counts_raw[, -1, with=FALSE])
rownames(counts) <- counts_raw[[1]]
rm(counts_raw)
# ── Auto-detect modality ───────────────────────────────────────────
is_atac <- grepl("^chr.*-[0-9]+-[0-9]+$", rownames(counts)[1])
feat_label <- ifelse(is_atac, "peaks", "genes")
message("Modality: ", ifelse(is_atac, "snATAC-seq", "snRNA-seq"))
message("Loaded: ", nrow(counts), " ", feat_label, " x ", ncol(counts), " samples")
# ── 1b. Region/gene filtering (optional) ──────────────────────────
if (is_atac && !is.null(regions)) {
message("Filtering peaks to specified regions...")
counts <- filter_regions(counts, regions)
} else if (!is_atac && length(gene_list) > 0) {
genes_present <- intersect(rownames(counts), gene_list)
if (length(genes_present) == 0) stop("No matching genes found in count matrix.")
message("Genes after gene_list filter: ", length(genes_present))
counts <- counts[genes_present, , drop=FALSE]
}
# ── 2. Load metadata ───────────────────────────────────────────────
meta <- fread(meta_file)
idcol <- intersect(c("sampleid","sampleID","individualID","projid"), colnames(meta))[1]
if (is.na(idcol)) stop("Cannot find sample ID column in metadata.")
# ── 3. Nuclei filter ──────────────────────────────────────────────
n_nuclei_col <- intersect(c("n_nuclei","n.nuclei","nNuclei","nuclei_count"), colnames(meta))[1]
if (!is.na(n_nuclei_col)) {
meta <- meta[meta[[n_nuclei_col]] > ${min_nuclei}]
message("Samples after nuclei (>${min_nuclei}) filter: ", nrow(meta))
}
# ── 4. Align samples ──────────────────────────────────────────────
common <- intersect(meta[[idcol]], colnames(counts))
if (length(common) == 0) stop("Zero sample overlap between metadata and count matrix.")
counts <- counts[, common, drop=FALSE]
message("Samples after alignment: ", length(common))
# ── 5. Blacklist filtering ─────────────────────────────────────────
if ("${blacklist_file}" != "" && file.exists("${blacklist_file}")) {
counts <- filter_blacklist(counts, "${blacklist_file}", feat_label)
message(feat_label, " after blacklist filter: ", nrow(counts))
} else {
message("No blacklist file - skipping.")
}
# ── 6. Merge tech vars by sampleid ────────────────────────────────
tech_sub <- tech_df[tech_df$sampleid %in% common]
tech_sub <- tech_sub[match(common, tech_sub$sampleid)]
# ── 7. Drop samples with NA in tech vars ──────────────────────────
keep_rows <- complete.cases(tech_sub[, ..tech_vars])
tech_sub <- tech_sub[keep_rows]
counts <- counts[, tech_sub$sampleid, drop=FALSE]
message("Valid samples for modelling: ", nrow(tech_sub))
# ── 8. Expression filtering ────────────────────────────────────────
dge <- DGEList(counts=counts, samples=tech_sub)
dge$samples$group <- factor(rep("all", ncol(dge)))
message(feat_label, " before filter: ", nrow(dge))
keep <- filterByExpr(dge, group=dge$samples$group,
min.count=${min_count},
min.total.count=${min_total_count},
min.prop=${min_prop})
dge <- dge[keep,, keep.lib.sizes=FALSE]
message(feat_label, " after filter: ", nrow(dge))
write.table(dge$counts,
file.path(outdir, paste0(ct, "_filtered_raw_counts.txt")),
sep="\t", quote=FALSE, col.names=NA)
# ── 9. TMM normalization ───────────────────────────────────────────
dge <- calcNormFactors(dge, method="TMM")
# ── 10. Optional batch correction ──────────────────────────────────
if (as.logical("${batch_correction}") && "sequencingBatch" %in% colnames(dge$samples)) {
batches <- dge$samples$sequencingBatch
batch_counts <- table(batches)
valid_batches <- names(batch_counts[batch_counts > 1])
keep_bc <- batches %in% valid_batches
dge <- dge[, keep_bc, keep.lib.sizes=FALSE]
batches <- batches[keep_bc]
message("Samples after singleton batch removal: ", ncol(dge))
if ("${batch_method}" == "combat") {
logCPM <- cpm(dge, log=TRUE, prior.count=1)
logCPM <- ComBat(dat=logCPM, batch=factor(batches))
dge$counts <- round(pmax(2^logCPM, 0))
message("ComBat applied on log-CPM.")
} else {
logCPM <- cpm(dge, log=TRUE, prior.count=1)
logCPM <- removeBatchEffect(logCPM, batch=factor(batches))
dge$counts <- round(pmax(2^logCPM, 0))
message("limma removeBatchEffect applied.")
}
}
# ── 11. Add batch vars to model if multi-level ────────────────────
batch_vars <- c()
if ("sequencingBatch" %in% colnames(dge$samples) &&
length(unique(dge$samples$sequencingBatch)) > 1) {
dge$samples$sequencingBatch_factor <- factor(dge$samples$sequencingBatch)
batch_vars <- c(batch_vars, "sequencingBatch_factor")
}
if ("Library" %in% colnames(dge$samples) &&
length(unique(dge$samples$Library)) > 1) {
dge$samples$Library_factor <- factor(dge$samples$Library)
batch_vars <- c(batch_vars, "Library_factor")
}
# ── 12. Build design matrix ────────────────────────────────────────
all_model_vars <- intersect(c(tech_vars, batch_vars), colnames(dge$samples))
form <- as.formula(paste("~", paste(all_model_vars, collapse=" + ")))
design <- model.matrix(form, data=dge$samples)
message("Formula: ", deparse(form))
if (!is.fullrank(design)) {
message("Design not full rank - trimming.")
qr_d <- qr(design)
design <- design[, qr_d$pivot[seq_len(qr_d$rank)], drop=FALSE]
}
message("Design matrix: ", nrow(design), " x ", ncol(design))
# ── 13. Voom + lmFit + eBayes ─────────────────────────────────────
v <- voom(dge, design, plot=FALSE)
fit <- lmFit(v, design)
fit <- eBayes(fit)
# ── 14. Offset + residuals ─────────────────────────────────────────
off <- predictOffset(fit, tech_vars=tech_vars)
res <- residuals(fit, v$E)
final <- off + res
# ── 15. Save residuals ─────────────────────────────────────────────
out_file <- file.path(outdir, paste0(ct, "_residuals.txt"))
write.table(final, out_file, sep="\t", quote=FALSE, col.names=NA)
message("Saved: ", out_file)
message(" ", ifelse(is_atac,"Peaks","Genes"), ": ", nrow(final), " | Samples: ", ncol(final))
# ── 16. Optional quantile normalization ───────────────────────────
if (as.logical("${quant_norm}")) {
final_qn <- t(apply(final, 1, rank, ties.method="average"))
final_qn <- stats::qnorm(final_qn / (ncol(final_qn) + 1))
qn_file <- file.path(outdir, paste0(ct, "_residuals_qn.txt"))
write.table(final_qn, qn_file, sep="\t", quote=FALSE, col.names=NA)
message("Saved QN: ", qn_file)
saveRDS(list(
dge=dge, offset=off, residuals=res,
final_data=final, final_data_qn=final_qn,
valid_samples=colnames(dge), design=design, fit=fit, model=form,
tech_vars=tech_vars, batch_vars=batch_vars,
batch_correction=as.logical("${batch_correction}"),
batch_method=ifelse(as.logical("${batch_correction}"), "${batch_method}", "none"),
quant_norm=TRUE,
modality=ifelse(is_atac, "snATAC-seq", "snRNA-seq")
), file.path(outdir, paste0(ct, "_results_qn.rds")))
} else {
saveRDS(list(
dge=dge, offset=off, residuals=res,
final_data=final,
valid_samples=colnames(dge), design=design, fit=fit, model=form,
tech_vars=tech_vars, batch_vars=batch_vars,
batch_correction=as.logical("${batch_correction}"),
batch_method=ifelse(as.logical("${batch_correction}"), "${batch_method}", "none"),
quant_norm=FALSE,
modality=ifelse(is_atac, "snATAC-seq", "snRNA-seq")
), file.path(outdir, paste0(ct, "_results.rds")))
}
message("Completed: ", ct, " -> ", outdir)
}
phenotype_reformatting#
[phenotype_formatting]
parameter: residual_files = []
parameter: output_dir = str
import os
_cts = [os.path.basename(os.path.dirname(f)) for f in residual_files]
input: residual_files
output: [f'{output_dir}/3_pheno_reformat/{ct}_phenotype.bed.gz' for ct in _cts]
task: trunk_workers = 1, trunk_size = 1, walltime = '2:00:00', mem = '16G', cores = 2
python: expand = "${ }", stderr = f'{_output[0]:n}.stderr', stdout = f'{_output[0]:n}.stdout'
import os
import subprocess
import pandas as pd
residual_files = ${residual_files}
output_dir = "${output_dir}"
def read_residuals(path):
first_line = open(path).readline().rstrip("\n")
col_names = first_line.split("\t")
df = pd.read_csv(path, sep="\t", header=None, skiprows=1)
if df.shape[1] > len(col_names):
peak_ids = df.iloc[:, 0].values
df = df.iloc[:, 1:]
df.columns = col_names
else:
peak_ids = df.iloc[:, 0].values
df = df.iloc[:, 1:]
df.columns = col_names[1:]
return peak_ids, df
def to_midpoint_bed(peak_ids, residuals):
parts = pd.Series(peak_ids).str.split("-", expand=True)
chrs = parts[0].values
starts = parts[1].astype(int).values
ends = parts[2].astype(int).values
mids = ((starts + ends) // 2).astype(int)
bed = pd.DataFrame({
"#chr": chrs,
"start": mids,
"end": mids + 1,
"ID": peak_ids
})
bed = pd.concat([bed, residuals.reset_index(drop=True)], axis=1)
return bed.sort_values(["#chr", "start"]).reset_index(drop=True)
def run_cmd(cmd, label):
r = subprocess.run(cmd, capture_output=True)
if r.returncode != 0:
print(f"WARNING: {label} failed: {r.stderr.decode()}")
else:
print(f"{label}: OK")
out_dir = os.path.join(output_dir, "3_pheno_reformat")
os.makedirs(out_dir, exist_ok=True)
for res_path in residual_files:
ct = os.path.basename(os.path.dirname(res_path))
print(f"\n{'='*40}\nPhenotype Formatting: {ct}\n{'='*40}")
if not os.path.exists(res_path):
print(f"WARNING: {res_path} not found, skipping.")
continue
peak_ids, residuals = read_residuals(res_path)
print(f"Loaded {len(peak_ids)} peaks x {residuals.shape[1]} samples")
bed = to_midpoint_bed(peak_ids, residuals)
out_bed = os.path.join(out_dir, f"{ct}_phenotype.bed")
bed.to_csv(out_bed, sep="\t", index=False, float_format="%.15f")
print(f"Written: {out_bed}")
run_cmd(["bgzip", "-f", out_bed], "bgzip")
run_cmd(["tabix", "-p", "bed", f"{out_bed}.gz"], "tabix")
print(f"Completed: {ct} -> {out_dir}")