TWAS, cTWAS and MR#
Description#
This module provides software implementations for transcriptome-wide association analysis (TWAS), Quantile TWAS, and variant selection that yields sparse signals for cTWAS (causal TWAS) analysis following the multi-group cTWAS method of Qian et al. (2024+). It additionally performs Mendelian Randomization using fine-mapping instrumental variables (IV) as described in Zhang et al. (2020) for “causal” effect estimation and model validation. The unit of analysis is a single gene-trait pair.
This notebook runs TWAS followed by cTWAS fine-mapping on a toy chr22 dataset: starting from pre-computed SuSiE-TWAS prediction weights for one gene, it tests each molecular context for association with a GWAS trait, keeps the imputable models selected by cross-validation, and then jointly fine-maps genes and SNPs within an LD block to identify which signals are likely to be directly causal rather than driven by correlation. Quantile TWAS extends traditional TWAS by testing genetic effects at different quantiles of the trait distribution.
This procedure is a continuation of the SuSiE-TWAS workflow: it assumes that xQTL fine-mapping has been performed and molecular-trait prediction weights pre-computed (to be used for TWAS). Cross-validation of TWAS weights is optional but highly recommended.
Prerequisites: TWAS weights (protocol_example.twas_weights.rds) from mnm_regression, plus GWAS summary statistics and an LD matrix for the region of interest.
Timing (toy chr22 dataset): twas ~50 sec; ctwas ~42 sec.
Step 1: TWAS#
Extract the GWAS z-scores and the corresponding LD matrix for the region of interest, (optionally) run allele-matching QC, process the SuSiE-TWAS weights, run the association test for each molecular context across multiple sets of weights, and keep the best cross-validated model per gene. A gene-context pair is considered imputable when its cross-validated performance reaches adjusted \(r^2\) >=0.01 and p-value <0.05 in at least one method; non-imputable genes are dropped.
Notes: specifying a column_file_path (YAML) enables load_rss_data(), which standardizes column names and can generate missing z/beta columns via col_to_flip; without it, the simpler tabix_region() is used. Use --comment_string "#" to handle comment lines in the summary statistics file (by default no comment symbol is assumed). For LASSO, Elastic Net, and mr.ash the weights are taken as-is for QTL variants overlapping GWAS variants, while SuSiE weights can be adjusted to exactly match GWAS variants.
Input — GWAS data interface (similar to susie_rss):
GWAS summary statistics files: tab-delimited, tabix-indexed by
chrom/pos; first 4 columnschrom,pos,A1,A2. For MR,effect_allele_frequencyand sample-size columns are required.Optional column-mapping YAML: required
chrom,pos,A1,A2,z(orbetahat/sebetahat); optionaln,var_y.Optional GWAS meta-file (
study_id, chrom, file path, optional mapping file; chrom0= genome-wide):
study_id chrom file_path column_mapping_file
study1 1 gwas1.tsv.gz column_mapping.yml
study1 2 gwas2.tsv.gz column_mapping.yml
LD reference metadata: single TSV with
#chrom,start,end, LD-matrix path (comma-separated matrix,bim), genome build. See the LD reference preparation doc.xQTL weight database (RDS, region -> context -> weight matrix), metadata columns
#chr,start,end,TSS,region_id,original_data:
#chr start end region_id TSS original_data
chr1 0 6480000 ENSG00000008128 1724356 KNIGHT_pQTL.ENSG00000008128.univariate_susie_twas_weights.rds, ... (truncated)
Output — TWAS result table (imputable genes only): gwas_study, chrom, start, end, block, gene, TSS, context, is_imputable, method, is_selected_method, rsq_adj_cv, pval_cv, twas_z, twas_pval. Key columns: TSS = transcription start site; start/end = gene window from the extended TADB window; is_imputable = CV r-square >0.01 and pvalue <0.05 in >=1 method; is_selected_method = best model (highest CV r-square, CV pvalue <0.05); block = LD region of the gene’s TSS based on predefined LD blocks.
chr molecular_id TSS start end context gwas_study method is_imputable is_selected_method rsq_cv pval_cv twas_z twas_pval
1 ENSG00000060718 103108871 101000000 104000000 AC_DeJager_eQTL Bellenguez_EADB_2022 bayes_r TRUE TRUE 0.25 3.39e-39 0.39 0.69
10 ENSG00000048740 10798396 9320000 12336675 Ast_DeJager_eQTL Bellenguez_EADI_2022 enet TRUE FALSE 0.16 2.72e-17 -0.21 0.83
sos run pipeline/twas_ctwas.ipynb twas \
--cwd output --name protocol_example \
--gwas_meta_data input/twas/protocol_example.twas.gwas_meta.tsv \
--xqtl_meta_data input/twas/protocol_example.twas.xqtl_meta.tsv \
--ld_meta_data input/ld_reference/protocol_example.ld_meta_file.tsv \
--ld_reference_sample_size 17000 \
--regions input/twas/protocol_example.twas.LD_blocks.chr22.bed \
--xqtl_type_table input/twas/protocol_example.twas.data_type_table.txt \
--rsq_pval_cutoff 0.05 --rsq_cutoff 0.01 \
--region-name chr22_10000000_19000000
Step 2: cTWAS — region assembly, global parameters, and fine-mapping#
Combine the selected best-performing TWAS prediction models, TWAS gene z-scores, GWAS SNP z-scores, and LD reference region information into the per-region cTWAS input; estimate the global group prior across all regions; then screen and fine-map regions to obtain posterior inclusion probabilities (PIP) for genes and SNPs. The section is split into three runs: region-data assembly, global-parameter estimation (--run_param_est), and causal fine-mapping (--run_finemapping).
The cTWAS R package is required: remotes::install_github("xinhe-lab/ctwas", ref = "multigroup"). Variant selection is performed based on the top_loci table, SuSiE CS set, or twas-weights effect size when any of twas_weight_cutoff (default=0), cs_min_cor (default=0), min_pip_cutoff (default=0), or max_num_variants (default=Inf) is set away from its default. The prior can use a single shared group via --prior_var_structure shared_all or the multi-group model via --multi_group.
Input — TWAS region information (multiple TWAS and SNP data within each region are combined for joint inference to select variables — genes or SNPs — likely to be directly associated with the phenotype rather than via correlation):
chrom start end block_id
1 1000 5000 block1
2 2000 6000 block2
3 3000 7000 block3
Output — cTWAS fine-mapping (per variable: id, molecular_id, type, context, susie_pip, group, cs, region_id, z):
id molecular_id type context susie_pip cs region_id z
ENSG00000148429|eQTL_Inh_DeJager_eQTL ENSG00000148429 eQTL Inh_DeJager_eQTL 0.07121779052573 L1 10_10500888_12817813 -6.20856901762341
10:10504379:T:C 10:10504379:T:C SNP SNP 0.987261359280169 10_10500888_12817813 1.03614457831325
For candidate genes, MR can subsequently be run: limit MR to genes with cTWAS significance AND a strong instrumental variable (fine-mapping PIP or CS); use fine-mapped xQTL with GWAS data; for multiple IVs aggregate via fixed-effect meta-analysis; and exclude results with severe exclusion-restriction (ER) violations.
cTWAS Step 1: Assemble region data#
sos run pipeline/twas_ctwas.ipynb ctwas \
--cwd output --name protocol_example \
--thin 1 --prior_var_structure shared_all \
--gwas_meta_data input/twas/protocol_example.twas.gwas_meta.tsv \
--xqtl_meta_data input/twas/protocol_example.twas.xqtl_meta.tsv \
--ld_meta_data input/ld_reference/protocol_example.ld_meta_file.tsv \
--regions input/twas/protocol_example.twas.LD_blocks.chr22.bed \
--twas_weight_cutoff 0 \
--region-name chr22_10000000_19000000
cTWAS Step 2: Estimate global parameters#
sos run pipeline/twas_ctwas.ipynb ctwas \
--run_param_est --skip_assembly --thin 1 \
--prior_var_structure shared_all \
--cwd output --name protocol_example \
--gwas_meta_data input/twas/protocol_example.twas.gwas_meta.tsv \
--xqtl_meta_data input/twas/protocol_example.twas.xqtl_meta.tsv \
--ld_meta_data input/ld_reference/protocol_example.ld_meta_file.tsv \
--regions input/twas/protocol_example.twas.LD_blocks.chr22.bed
cTWAS Step 3: Fine-map causal genes and SNPs#
sos run pipeline/twas_ctwas.ipynb ctwas \
--run_finemapping --skip_assembly \
--prior_var_structure shared_all \
--cwd output --name protocol_example \
--gwas_meta_data input/twas/protocol_example.twas.gwas_meta.tsv \
--xqtl_meta_data input/twas/protocol_example.twas.xqtl_meta.tsv \
--ld_meta_data input/ld_reference/protocol_example.ld_meta_file.tsv \
--regions input/twas/protocol_example.twas.LD_blocks.chr22.bed \
--region-name chr22_10000000_19000000
Step 3: Quantile TWAS#
Use pre-computed TWAS weights for quantile-specific testing: for each quantile level, cluster and integrate by fixed and dynamic region groups, extract the relevant GWAS z-scores and LD matrix for the region, and perform quantile region-specific association tests, identifying variants whose effects vary across different quantile regions of the phenotype distribution. Use --region-name (formatted as chr_start_stop) to focus on specific blocks, or --region to analyze a selected list of regions.
sos run pipeline/twas_ctwas.ipynb quantile_twas \
--cwd output --name protocol_example \
--gwas_meta_data input/twas/protocol_example.twas.gwas_meta.tsv \
--xqtl_meta_data input/twas/protocol_example.twas.xqtl_meta.tsv \
--ld_meta_data input/ld_reference/protocol_example.ld_meta_file.tsv \
--ld_reference_sample_size 17000 \
--regions input/twas/protocol_example.twas.LD_blocks.chr22.bed \
--xqtl_type_table input/twas/protocol_example.twas.data_type_table.txt \
--region-name chr22_10000000_19000000
Workflow implementation#
The following cells define the SoS workflow steps ([global], [get_analysis_regions], [twas], [ctwas_1], [ctwas_2], [ctwas_3], [quantile_twas]) invoked by the commands in the steps above.
[global]
parameter: cwd = path("output/")
parameter: gwas_meta_data = path()
parameter: xqtl_meta_data = path()
parameter: ld_meta_data = path()
parameter: ld_reference_sample_size = 17000
parameter: xqtl_type_table = ''
parameter: gwas_name = []
parameter: gwas_data = []
parameter: column_mapping = []
parameter: regions = path()
# Optional: if a region name is provided
# the analysis would be focused on the union of provides region list and region names
parameter: region_name = []
parameter: name = f"{xqtl_meta_data:bn}.{gwas_meta_data:bn}"
parameter: container = ''
parameter: job_size = 100
parameter: walltime = "5m"
parameter: mem = "8G"
parameter: numThreads = 1
# name suffix add to end of the name variable for the output files
parameter: name_suffix = ""
# optional parameter in ctwas to only perform gwas specific analysis
parameter: gwas_study=[]
import os
import pandas as pd
def adapt_file_path(file_path, reference_file):
"""
Adapt a single file path based on its existence and a reference file's path.
Args:
- file_path (str): The file path to adapt.
- reference_file (str): File path to use as a reference for adaptation.
Returns:
- str: Adapted file path.
Raises:
- FileNotFoundError: If no valid file path is found.
"""
reference_path = os.path.dirname(reference_file)
# Check if the file exists
if os.path.isfile(file_path):
return file_path
# Check file name without path
file_name = os.path.basename(file_path)
if os.path.isfile(file_name):
return file_name
# Check file name in reference file's directory
file_in_ref_dir = os.path.join(reference_path, file_name)
if os.path.isfile(file_in_ref_dir):
return file_in_ref_dir
# Check original file path prefixed with reference file's directory
file_prefixed = os.path.join(reference_path, file_path)
if os.path.isfile(file_prefixed):
return file_prefixed
# If all checks fail, raise an error
raise FileNotFoundError(f"No valid path found for file: {file_path}")
def group_by_region(lst, partition):
# from itertools import accumulate
# partition = [len(x) for x in partition]
# Compute the cumulative sums once
# cumsum_vector = list(accumulate(partition))
# Use slicing based on the cumulative sums
# return [lst[(cumsum_vector[i-1] if i > 0 else 0):cumsum_vector[i]] for i in range(len(partition))]
return partition
[get_analysis_regions: shared = ["filtered_region_info", "filtered_regional_xqtl_files", "regional_data"]]
from collections import OrderedDict
def check_required_columns(df, required_columns):
"""Check if the required columns are present in the dataframe."""
missing_columns = [col for col in required_columns if col not in list(df.columns)]
if missing_columns:
raise ValueError(f"Missing required columns: {', '.join(missing_columns)}")
def extract_regional_data(gwas_meta_data, xqtl_meta_data, regions, region_name, gwas_name, gwas_data, column_mapping):
"""
Extracts data from GWAS and xQTL metadata files and additional GWAS data provided.
Args:
- gwas_meta_data (str): File path to the GWAS metadata file.
- xqtl_meta_data (str): File path to the xQTL weight metadata file.
- gwas_name (list): vector of GWAS study names.
- gwas_data (list): vector of GWAS data.
- column_mapping (list, optional): vector of column mapping files.
Returns:
- Tuple of two dictionaries:
- GWAS Dictionary: Maps study IDs to a list containing chromosome number,
GWAS file path, and optional column mapping file path.
- xQTL Dictionary: Nested dictionary with region IDs as keys.
Raises:
- FileNotFoundError: If any specified file path does not exist.
- ValueError: If required columns are missing in the input files or vector lengths mismatch.
"""
# Check vector lengths
if len(gwas_name) != len(gwas_data):
raise ValueError("gwas_name and gwas_data must be of equal length")
if len(column_mapping)>0 and len(column_mapping) != len(gwas_name):
raise ValueError("If column_mapping is provided, it must be of the same length as gwas_name and gwas_data")
# Required columns for each file type
required_gwas_columns = ['study_id', 'chrom', 'file_path']
required_xqtl_columns = ['region_id', '#chr', 'start', 'end', "TSS", 'original_data'] #region_id here is gene name
required_ld_columns = ['chr', 'start', 'stop']
# Reading the GWAS metadata file
gwas_df = pd.read_csv(gwas_meta_data, sep="\t")
check_required_columns(gwas_df, required_gwas_columns)
gwas_dict = OrderedDict()
# Reading LD regions info
# Initialize empty DataFrame for regions
regions_df = pd.DataFrame(columns=['chr', 'start', 'stop'])
# Check if regions file exists and read it
if os.path.isfile(regions):
file_regions_df = pd.read_csv(regions, sep="\t", skipinitialspace=True)
file_regions_df.columns = [col.strip() for col in file_regions_df.columns] # Strip spaces from column names
file_regions_df['chr'] = file_regions_df['chr'].str.strip()
check_required_columns(file_regions_df, required_ld_columns)
regions_df = pd.concat([regions_df, file_regions_df])
# Process region_name if provided:
# fomat: region_name = ["chr1_16103_2888443", "chr1_4320284_5853833"]
if len(region_name) > 0:
# Split region_name entries into chr, start, and stop columns
extra_regions = [name.split("_") for name in region_name]
extra_regions_df = pd.DataFrame(extra_regions, columns=['chr', 'start', 'stop'])
extra_regions_df['start'] = extra_regions_df['start'].astype(int)
extra_regions_df['stop'] = extra_regions_df['stop'].astype(int)
# Add extra regions to regions_df
regions_df = extra_regions_df
# Remove duplicates and reset index
regions_df = regions_df.drop_duplicates().reset_index(drop=True)
regions_dict = OrderedDict()
# Reading the xQTL weight metadata file
xqtl_df = pd.read_csv(xqtl_meta_data, sep="\t")
check_required_columns(xqtl_df, required_xqtl_columns)
xqtl_dict = OrderedDict()
# Process additional GWAS data from R vectors
for name, data, mapping in zip(gwas_name, gwas_data, column_mapping or [None]*len(gwas_name)):
gwas_dict[name] = {0: [data, mapping]}
for _, row in gwas_df.iterrows():
file_path = row['file_path']
mapping_file = row.get('column_mapping_file')
# Adjust paths if necessary
file_path = adapt_file_path(file_path, gwas_meta_data)
if mapping_file:
mapping_file = adapt_file_path(mapping_file, gwas_meta_data)
# Create or update the entry for the study_id
if row['study_id'] not in gwas_dict:
gwas_dict[row['study_id']] = {}
# Expand chrom 0 to chrom 1-22 or use the specified chrom
chrom_range = range(1, 23) if row['chrom'] == 0 else [row['chrom']]
for chrom in chrom_range:
if chrom in gwas_dict[row['study_id']]:
existing_entry = gwas_dict[row['study_id']][f'chr{chrom}']
raise ValueError(f"Duplicate chromosome specification for study_id {row['study_id']}, chrom {chrom}. "
f"Conflicting entries: {existing_entry} and {[file_path, mapping_file]}")
gwas_dict[row['study_id']][f'chr{chrom}'] = [file_path, mapping_file]
for _, row in regions_df.iterrows():
LD_region_id = f"{row['chr']}_{row['start']}_{row['stop']}"
overlapping_xqtls = xqtl_df[(xqtl_df['#chr'] == row['chr']) &
(xqtl_df['TSS'] <= row['stop']) &
(xqtl_df['TSS'] >= (row['start']))]
file_paths = []
mapped_genes = []
# Collect file paths for xQTLs overlapping this region
for _, xqtl_row in overlapping_xqtls.iterrows():
original_data = xqtl_row['original_data']
file_list = original_data.split(',') if ',' in original_data else [original_data]
file_paths.extend([adapt_file_path(fp.strip(), xqtl_meta_data) for fp in file_list])
mapped_genes.extend([xqtl_row['region_id']] * len(file_list))
# Store metadata and files in the dictionary
regions_dict[LD_region_id] = {
"meta_info": [row['chr'], row['start'], row['stop'], LD_region_id, mapped_genes],
"files": file_paths
}
for _, row in xqtl_df.iterrows():
file_paths = [adapt_file_path(fp.strip(), xqtl_meta_data) for fp in row['original_data'].split(',')] # Splitting and stripping file paths
xqtl_dict[row['region_id']] = {"meta_info": [row['#chr'], row['start'], row['end'], row['region_id'], row['contexts']],
"files": file_paths}
return gwas_dict, xqtl_dict, regions_dict
gwas_dict, xqtl_dict, regions_dict = extract_regional_data(gwas_meta_data, xqtl_meta_data,regions,region_name,gwas_name, gwas_data, column_mapping)
regional_data = dict([("GWAS", gwas_dict), ("xQTL", xqtl_dict), ("Regions", regions_dict)])
# get regions data
region_info = [x["meta_info"] for x in regional_data['Regions'].values()]
regional_xqtl_files = [x["files"] for x in regional_data['Regions'].values()]
# Filter out empty xQTL file paths
filtered_region_info = []
filtered_regional_xqtl_files = []
skipped_regions =[]
for region, files in zip(region_info, regional_xqtl_files):
if files:
filtered_region_info.append(region)
filtered_regional_xqtl_files.append(files)
else:
skipped_regions.append(region)
print(f"Skipping {len(skipped_regions)} out of {len(regional_xqtl_files)} regions, no overlapping xQTL weights found. ")
[twas]
# Per-region TWAS-Z + Mendelian Randomization, bridged to the pecotmr S4
# wrappers (no inline analysis R; no Python helpers). Restores the legacy
# per-region fan-out: one task per analysis region, looping the region's genes
# inside. Per region it (1) builds one multi-study GwasSumStats over the region
# via gwas_sumstats_construct.R (its LD sketch spans whatever LD-reference
# blocks the region overlaps), then per gene (2) converts the legacy
# univariate_twas_weights.rds to a pecotmr TwasWeights via
# legacy_twas_weights_convert.R and (3) runs twas.R (causalInferencePipeline)
# to emit a per-gene .twas.rds. MR is deferred (no fine-mapping result is
# passed), so only TWAS Z + p-value are produced.
depends: sos_variable("filtered_regional_xqtl_files")
# Legacy CLI retained. Wired to the pecotmr selection knobs: rsq_cutoff,
# rsq_pval_cutoff, rsq_option, rsq_pval_option (CV weight selection). MR is
# deferred, so mr_pval_cutoff and the remaining legacy params are kept declared
# for CLI stability but are not consumed by the S4 path.
parameter: coverage = "cs_coverage_0.95"
# Thresholds for rsq and CV p-value for imputability/best-model selection
parameter: rsq_cutoff = 0.01
parameter: rsq_pval_cutoff = 0.05
parameter: mr_pval_cutoff = 0.05
parameter: save_ctwas_data = True
parameter: save_mr_result = True
parameter: rsq_option = "rsq"
parameter: rsq_pval_option = ["adj_rsq_pval", "pval"]
# load by batches if memory resource is limited, default to load all at once
parameter: batch_load_memory = 500
parameter: event_filter_rules = path()
parameter: comment_string = "NULL"
parameter: rename_column = False
stop_if(len(filtered_regional_xqtl_files) == 0,
"No regions with overlapping xQTL weights found; skipping TWAS step.")
input: filtered_regional_xqtl_files, group_by = lambda x: group_by_region(x, filtered_regional_xqtl_files), group_with = "filtered_region_info"
output: [f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.{gene}.twas.rds' for gene in sorted(set(_filtered_region_info[4]))]
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
bash: expand = '${ }', stdout = f"{_output[0]:n}.stdout", stderr = f"{_output[0]:n}.stderr", container = container
set -e
# SoS expands ${...}; bare $shellvars (no braces) are left for bash.
outdir=${cwd:a}/${step_name}
mkdir -p "$outdir"
region_id=${_filtered_region_info[3]}
ld_block=${_filtered_region_info[0]}:${_filtered_region_info[1]}-${_filtered_region_info[2]}
# GWAS studies covering this region's chromosome (paths already resolved by
# get_analysis_regions). studies and gwas_tsvs iterate the same dict order.
studies="${",".join([s for s in regional_data['GWAS'] if _filtered_region_info[0] in regional_data['GWAS'][s]])}"
gwas_tsvs="${",".join([regional_data['GWAS'][s][_filtered_region_info[0]][0] for s in regional_data['GWAS'] if _filtered_region_info[0] in regional_data['GWAS'][s]])}"
# (1) One multi-study GwasSumStats over the whole region.
gss="$outdir/${name}.$region_id.gwas_sumstats.rds"
Rscript code/script/pecotmr_integration/gwas_sumstats_construct.R \
--study "$studies" \
--gwas-tsv "$gwas_tsvs" \
--ld-block "$ld_block" \
--ld-meta "${ld_meta_data}" \
--output "$gss"
# (2)+(3) Per unique gene in this region (first weight file per gene):
# convert legacy weights (+FMR for MR), then run TWAS-Z + MR. The expansion
# below emits a flat "gene file gene file ..." token list (first file per
# gene); the loop consumes it two tokens at a time.
n_files=${len(_input)}
n_genes=${len(set([str(g) for g in _filtered_region_info[4]]))}
if [ "$n_files" -ne "$n_genes" ]; then
echo "NOTE: $n_files weight file(s) across $n_genes gene(s) in region $region_id; using the first file per gene." >&2
fi
set -- ${" ".join([tok for i, (g, f) in enumerate(zip(_filtered_region_info[4], _input)) if str(g) not in [str(h) for h in _filtered_region_info[4][:i]] for tok in (str(g), str(f))])}
while [ "$#" -ge 2 ]; do
gene="$1"
weight="$2"
shift 2
tw="$outdir/${name}.$region_id.$gene.twas_weights.rds"
Rscript code/script/pecotmr_integration/legacy_twas_weights_convert.R \
--legacy "$weight" \
--study "${name}" \
--output "$tw"
Rscript code/script/pecotmr_integration/twas.R \
--twas-weights "$tw" \
--gwas-sumstats "$gss" \
--rsq-cutoff ${rsq_cutoff} \
--rsq-pval-cutoff ${rsq_pval_cutoff} \
--rsq-option ${rsq_option} \
--rsq-pval-option "${",".join(rsq_pval_option)}" \
--output "$outdir/${name}.$region_id.$gene.twas.rds"
done
[ctwas_1]
# cTWAS step 1 (assemble): build the per-LD-block inputs for one gene-bearing
# chromosome and assemble them via the pecotmr S4 wrappers (no inline analysis
# R; no Python helpers). Per chromosome: (1) ctwas_manifest.R enumerates the
# chromosome's LD blocks from ld_meta_data, maps each gene (by TSS in
# xqtl_meta_data) to its home block, and pairs each block with a per-block
# GwasSumStats path + the gene's TwasWeights; (2) gwas_sumstats_construct.R
# builds one GwasSumStats per block (SNP background); (3) ctwas_assemble.R runs
# assembleCtwasInputs -> {name}.ctwas_inputs.rds. cTWAS runs over the whole
# chromosome's LD blocks, NOT the coarse twas analysis region.
depends: sos_variable("filtered_region_info")
# chromosome to assemble (integer or 'chrN'); default: the lone gene-bearing chrom
parameter: chrom = ""
# weight pre-filters passed to assembleCtwasInputs
parameter: twas_weight_cutoff = 0.0
parameter: max_num_variants = "Inf"
parameter: cs_min_cor = 0.0
parameter: min_pip_cutoff = 0.0
# declared for CLI stability; consumed downstream / not by assembleCtwasInputs
parameter: thin = 1.0
parameter: maxSNP = 20000
parameter: multi_group = True
parameter: numThreads = 4
parameter: skip_assembly = False
# TwasWeights source (option c): empty -> the upstream twas step's per-gene
# weights ({cwd}/twas/{name}.{region}.{gene}.twas_weights.rds); override with a
# list of prebuilt TwasWeights RDS (e.g. the blessed ctwas weights extracted by
# legacy_ctwas_weights_to_s4.R) when the toy twas weights carry no signal.
parameter: twas_weights = []
skip_if(skip_assembly == True, "Skip [ctwas_1] assemble.")
gene_chroms = sorted(set(str(ri[0]) for ri in filtered_region_info if ri[4]))
ctwas_chrom = (f"chr{int(chrom)}" if str(chrom).isdigit() else str(chrom)) if str(chrom) else (gene_chroms[0] if len(gene_chroms) == 1 else "")
stop_if(not ctwas_chrom, f"Specify --chrom: expected one gene-bearing chromosome, found {gene_chroms}.")
ctwas_weights = list(twas_weights) if twas_weights else [f"{cwd:a}/twas/{name}.{ri[3]}.{g}.twas_weights.rds" for ri in filtered_region_info if str(ri[0]) == ctwas_chrom for g in sorted(set(ri[4]))]
ctwas_studies = [s for s in regional_data['GWAS'] if ctwas_chrom in regional_data['GWAS'][s]]
stop_if(len(ctwas_studies) == 0, f"No GWAS study covers {ctwas_chrom}.")
output: f"{cwd:a}/ctwas/{name}.ctwas_inputs.rds"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f"{step_name}_{_output:bn}"
bash: expand = '${ }', stdout = f"{_output:n}.stdout", stderr = f"{_output:n}.stderr", container = container
set -e
outdir=${cwd:a}/ctwas
mkdir -p "$outdir"
manifest="$outdir/${name}.ctwas_manifest.${ctwas_chrom}.tsv"
# (1) per-LD-block manifest (block enumeration + gene->home-block mapping).
Rscript code/script/pecotmr_integration/ctwas_manifest.R \
--ld-meta "${ld_meta_data}" \
--chrom "${ctwas_chrom}" \
--xqtl-meta "${xqtl_meta_data}" \
--twas-weights "${",".join([str(w) for w in ctwas_weights])}" \
--gwas-sumstats-dir "$outdir" \
--output "$manifest"
# (2) one GwasSumStats per LD block (all studies on this chromosome).
studies="${",".join(ctwas_studies)}"
gwas_tsvs="${",".join([regional_data['GWAS'][s][ctwas_chrom][0] for s in ctwas_studies])}"
tail -n +2 "$manifest" | while IFS=$'\t' read -r region_id region gwas_rds twas_w; do
Rscript code/script/pecotmr_integration/gwas_sumstats_construct.R \
--study "$studies" \
--gwas-tsv "$gwas_tsvs" \
--ld-block "$region" \
--ld-meta "${ld_meta_data}" \
--output "$gwas_rds"
done
# (3) assemble cTWAS inputs.
Rscript code/script/pecotmr_integration/ctwas_assemble.R \
--manifest "$manifest" \
--twas-weight-cutoff ${twas_weight_cutoff} \
--cs-min-cor ${cs_min_cor} \
--min-pip-cutoff ${min_pip_cutoff} \
--max-num-variants ${max_num_variants} \
--output ${_output}
[ctwas_2]
# cTWAS step 2 (estimate priors): estCtwasParam via ctwas_est.R. Reads the
# assembled inputs from [ctwas_1] on disk so it can be run independently.
# --fallback-to-prefit recovers the prefit estimate when the accurate EM
# cannot be estimated (e.g. the single-gene MWE), mirroring the legacy ctwas_2
# workaround.
parameter: run_param_est = False
parameter: skip_assembly = False
parameter: thin = 1.0
parameter: prior_var_structure = "shared_all"
parameter: niter = 50
# declared for CLI stability; not consumed by the S4 path
parameter: multi_group = True
parameter: numThreads = 8
skip_if(run_param_est == False, "Skip [ctwas_2] parameter estimation.")
input: f"{cwd:a}/ctwas/{name}.ctwas_inputs.rds"
output: f"{cwd:a}/ctwas/{name}.ctwas_est.rds"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f"{step_name}_{_output:bn}"
bash: expand = '${ }', stdout = f"{_output:n}.stdout", stderr = f"{_output:n}.stderr", container = container
set -e
Rscript code/script/pecotmr_integration/ctwas_est.R \
--inputs ${_input} \
--thin ${thin} \
--niter ${niter} \
--group-prior-var-structure ${prior_var_structure} \
--min-group-size 1 \
--fallback-to-prefit \
--ncore ${numThreads} \
--output ${_output}
[ctwas_3]
# cTWAS step 3 (screen + finemap): screenCtwasRegions + finemapCtwasRegions via
# ctwas_finemap.R. Reads the estimated params from [ctwas_2] on disk so it can
# be run independently. Emits the per-gene cTWAS fine-mapping result.
parameter: run_finemapping = False
parameter: L = 5
parameter: min_nonSNP_PIP = 0.5
# Boundary-gene region merging (legacy ctwas_3 merge_regions, default-off): when
# True, after fine-mapping, each high-PIP boundary gene's adjacent LD blocks are
# merged and re-fine-mapped via mergeCtwasBoundaryRegions (require-in-CS, like
# the legacy susie_pip>0.5 & !is.na(cs) selection); maxSNP caps SNPs per merged
# region.
parameter: merge_regions = False
parameter: maxSNP = 20000
# declared for CLI stability; not consumed by the S4 path
parameter: thin = 1.0
parameter: max_iter = 0
parameter: prior_var_structure = "shared_all"
parameter: region_name = []
parameter: subset_context = []
parameter: multi_group = True
parameter: numThreads = 4
parameter: p_diff_thresh = 5e-8
parameter: alias = "NULL"
skip_if(run_finemapping == False, "Skip [ctwas_3] fine-mapping.")
input: f"{cwd:a}/ctwas/{name}.ctwas_est.rds"
output: f"{cwd:a}/ctwas/{name}.ctwas_finemap.rds"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f"{step_name}_{_output:bn}"
bash: expand = '${ }', stdout = f"{_output:n}.stdout", stderr = f"{_output:n}.stderr", container = container
set -e
Rscript code/script/pecotmr_integration/ctwas_finemap.R \
--est ${_input} \
--L ${L} \
--min-nonsnp-pip ${min_nonSNP_PIP} \
${('--merge-regions --merge-filter-cs --max-snp ' + str(maxSNP)) if merge_regions else ''} \
--ncore ${numThreads} \
--output ${_output}
[quantile_twas]
depends: sos_variable("filtered_regional_xqtl_files")
parameter: save_ctwas_data = True
parameter: save_mr_result = False
input: filtered_regional_xqtl_files, group_by = lambda x: group_by_region(x, filtered_regional_xqtl_files), group_with = "filtered_region_info"
output_files = [f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.quantile_twas.tsv.gz']
if save_ctwas_data:
output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.quantile_twas_data.rds')
if save_mr_result:
output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.quantile_mr_result.tsv.gz')
output: output_files
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output[0]:n}.stdout", stderr = f"{_output[0]:n}.stderr", container = container
library(dplyr)
library(data.table)
library(pecotmr)
library(readr)
# get xQTL weight information
xqtl_meta_df <- fread("${xqtl_meta_data}") # Get related gene information from the xqtl_meta data table
xqtl_type_table <- if (isTRUE(file.exists("${xqtl_type_table}"))) fread("${xqtl_type_table}") else NULL
gwas_studies = c(${paths(regional_data["GWAS"].keys()):r,})
gwas_files = c(${paths([v[_filtered_region_info[0]] for k, v in regional_data["GWAS"].items()]):r,})
gene_list <- c(${', '.join([f"'{gene}'" for gene in _filtered_region_info[4]])})
# Initialize export_twas_weights_db
export_twas_weights_db <- list()
export_twas_weights_db[["${_filtered_region_info[3]}"]] <- list()
# Initialize twas_weights_results
twas_weights_results <- list()
# Create initial weight_db_list
weight_db_list <- c(${_input:r,})
names(weight_db_list) <- gene_list
# Split weight_db_list
weight_db_list <- split(weight_db_list, names(weight_db_list))
# Update weight_db_list
weight_db_list_update <- lapply(weight_db_list, function(file_list) {
valid_files <- Filter(function(file) {
if (file.size(file) > 200) {
content <- tryCatch({
data <- readRDS(file)
}, error = function(e) {
warning(paste("Failed to read RDS file:", file))
NULL
})
return(!is.null(content) && any(sapply(content, function(gene_data) {
is.list(gene_data) && any(sapply(gene_data, function(context_data) {
is.list(context_data) && "twas_variant_names" %in% names(context_data)
}))
})))
}
return(FALSE)
}, file_list)
if (length(valid_files) == 0) return(NULL)
return(valid_files)
})
weight_db_list_update <- Filter(Negate(is.null), weight_db_list_update)
if (length(weight_db_list_update) == 0 || all(sapply(weight_db_list_update, length) == 0)) {
message("No valid twas weight files found after filtering. Exiting the script.")
quit(save = "no", status = 0)
}
# Check if weight_db_list_update is empty
# Define tau_values
tau_values <- seq(0.01, 0.99, 0.01)
# Main processing loop
for (gene_db in names(weight_db_list_update)) {
weight_dbs <- weight_db_list_update[[gene_db]]
twas_weights_results[[gene_db]] <- load_quantile_twas_weights(
weight_db_files = weight_dbs,
tau_values = tau_values,
between_cluster = 0.8,
num_intervals = 3
)
if (!is.null(twas_weights_results[[gene_db]]) && !is.null(twas_weights_results[[gene_db]]$weights)) {
twas_weights_results[[gene_db]]$data_type <- setNames(
lapply(names(twas_weights_results[[gene_db]]$weights), function(context) {
xqtl_type_table$type[sapply(xqtl_type_table$context, function(x) grepl(x, context))]
}),
names(twas_weights_results[[gene_db]]$weights)
)
} else {
print(paste("Warning: No valid weights found for gene:", gene_db))
}
}
if (length(twas_weights_results) == 0 || all(sapply(twas_weights_results, is.null))) {
stop("twas_weights_results is empty or invalid. Exiting.")
}
saveRDS(twas_weights_results, "${_output[0]:nnn}.grouped_quantile_twas_weight.rds", compress='xz')
#twas_weights_results
#Step 2: twas analysis for imputable genes across contexts
twas_results_db <- twas_pipeline(twas_weights_data = twas_weights_results,
ld_meta_file_path = "${ld_meta_data}",
gwas_meta_file = "${gwas_meta_data}",
region_block = "${_filtered_region_info[3]}",
ld_reference_sample_size = ${ld_reference_sample_size},
quantile_twas = TRUE,
output_twas_data = ${"TRUE" if save_ctwas_data else "FALSE"} )
# Merging with xQTL meta-data
if (is.null(twas_results_db$twas_result) || nrow(twas_results_db$twas_result) == 0) {
message("twas_results_db$twas_result is NULL. Exiting script normally.")
quit(save = "no", status = 0)
}
message("Merging twas_result with xqtl_meta_df...")
# Check twas_results_db before merging
common_ids <- intersect(twas_results_db$twas_result$molecular_id, xqtl_meta_df$region_id)
if (length(common_ids) > 0) {
twas_results_db$twas_result <- merge(
twas_results_db$twas_result,
xqtl_meta_df[, c("region_id", "TSS", "start", "end")],
by.x = "molecular_id",
by.y = "region_id"
)
twas_results_db$twas_result <- unique(twas_results_db$twas_result)
} else {
warning("No common molecular_id and region_id. Skipping merge.")
}
fwrite(twas_results_db$twas_result[, c(2, 1, (ncol(twas_results_db$twas_result)-2):ncol(twas_results_db$twas_result), 3:(ncol(twas_results_db$twas_result)-3))], file = ${_output[0]:r}, sep = "\t", compress = "gzip")
# Step 3: reformat for follow up cTWAS analysis
if (${"TRUE" if save_ctwas_data else "FALSE"}) {
saveRDS(twas_results_db$twas_data, "${_output[0]:nnn}.quantile_twas_data.rds", compress='xz')
}
if (${"TRUE" if save_mr_result else "FALSE"}) {
fwrite(twas_results_db$mr_result, file = "${_output[0]:nnn}.quantile_mr_result.tsv.gz", sep = "\t", compress = "gzip")
}
message("quantile twas analysis is completed in this block.")