TWAS, cTWAS and MR#

Introduction#

This module provides software implementations for transcriptome-wide association analysis (TWAS) and quantile TWAS, and performs variant selection to provide sparse signals for causal TWAS (cTWAS) analysis as described in the multi-group cTWAS method of Qian et al (2024+). It additionally performs Mendelian Randomization (MR) using fine-mapped instrumental variables (IVs), as described in Zhang et al 2020, for “causal” effect estimation and model validation, with the unit of analysis being a single gene-trait pair.

This procedure is a continuation of the SuSiE-TWAS workflow: it assumes that xQTL fine-mapping has been performed and that molecular trait prediction weights have been pre-computed (to be used for TWAS). Cross-validation for TWAS weights is optional but highly recommended.

Quantile TWAS extends traditional TWAS by testing genetic effects at different quantiles of the trait distribution, which provides insights into genetic associations that vary across the distribution rather than just at the mean.

The required GWAS data are GWAS summary statistics and an LD matrix for the region of interest.

Step 1: TWAS#

  1. Extract GWAS z-scores for the region of interest and the corresponding LD matrix.

  2. (Optional) Perform allele-matching QC between the LD matrix and the summary statistics.

  3. Process weights: for methods such as LASSO, Elastic Net and mr.ash, we must take the weights as-is for the QTL variants that overlap with GWAS variants. SuSiE weights, in contrast, can be adjusted to exactly match the GWAS variants.

  4. Perform the TWAS test for multiple sets of weights (see the sketch after this list).

  5. For each gene, filter TWAS results by keeping the best model selected by cross-validation (CV). Drop genes that do not show good evidence of TWAS prediction weights.
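
The core test in step 4 combines prediction weights with GWAS z-scores and the LD matrix. Below is a minimal R sketch of that statistic, for illustration only — the pipeline itself calls twas_pipeline() from pecotmr, and the variable names here are assumptions, not pipeline identifiers.

# Minimal sketch of the TWAS test statistic for one set of weights
# (illustration only; not the pecotmr implementation).
# `weights`: prediction weights for the variants shared with the GWAS;
# `z`: matching GWAS z-scores; `R`: LD (correlation) matrix for those variants.
twas_z <- function(weights, z, R) {
  stopifnot(length(weights) == length(z), nrow(R) == length(weights))
  zscore <- sum(weights * z) / sqrt(drop(t(weights) %*% R %*% weights))
  c(twas_z = zscore, twas_pval = 2 * pnorm(-abs(zscore)))
}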

Step 2: Variant Selection for Imputable Genes via the Best Prediction Methods#

  1. Determine whether the gene is imputable in each context, based on the twas_cv performance: adjusted \(r^2\) (>=0.01) and p-value (<0.05).

  2. Each imputable gene-context pair goes through a variant selection step: at most 10 variants with the highest PIPs are selected, from either the top_loci table or the SuSiE credible sets (CS); see the sketch after this list.

  3. Harmonize weights against the LD reference and update the SuSiE weights.

  4. Extract weights for the best model in each context, subset to the variant names selected in the previous step.
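
A minimal sketch of the selection rule in step 2, assuming a top_loci data frame with variant_id and pip columns — these column names are hypothetical, for illustration only:

# Minimal sketch of variant selection: keep at most 10 variants ranked
# by PIP. Column names `variant_id` and `pip` are assumptions and are
# not guaranteed to match the pipeline's tables.
select_top_variants <- function(top_loci, max_n = 10) {
  top_loci <- top_loci[order(top_loci$pip, decreasing = TRUE), ]
  head(top_loci$variant_id, max_n)
}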

Step 3: cTWAS analysis#

FIXME: add more documentation here

Step 4: MR for candidate genes#

  1. Limit MR to genes showing some evidence of cTWAS significance AND having strong instrumental variables (fine-mapping PIP or CS).

  2. Use fine-mapped xQTL with GWAS data to perform MR.

  3. For multiple IVs, aggregate the individual IV estimates using a fixed-effect meta-analysis procedure (see the sketch after this list).

  4. Identify and exclude results with severe violations of the exclusion restriction (ER) assumption.
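
For step 3, here is a minimal sketch of the fixed-effect aggregation, assuming per-IV Wald ratio estimates with delta-method standard errors — illustrative only, not the exact pecotmr implementation:

# Minimal sketch: combine per-IV Wald ratio estimates with a
# fixed-effect (inverse-variance weighted) meta-analysis.
mr_fixed_effect <- function(beta_qtl, se_qtl, beta_gwas, se_gwas) {
  ratio <- beta_gwas / beta_qtl                 # Wald ratio per IV
  # first-order delta-method standard error of each ratio
  ratio_se <- sqrt(se_gwas^2 / beta_qtl^2 + beta_gwas^2 * se_qtl^2 / beta_qtl^4)
  w <- 1 / ratio_se^2                           # inverse-variance weights
  est <- sum(w * ratio) / sum(w)
  se <- sqrt(1 / sum(w))
  c(estimate = est, se = se, pval = 2 * pnorm(-abs(est / se)))
}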

Step 5: Quantile TWAS analysis#

  • Use pre-computed TWAS weights (currently beta coefficients) for quantile-specific testing.

  • For each quantile level, cluster and integrate by fixed and dynamic region groups, and extract relevant GWAS z-scores and LD matrix for the region of interest.

  • Perform quantile region-specific association tests, identifying genetic variants with effects that vary across different quantile regions of the phenotype distribution.

Input#

GWAS Data Input Interface (Similar to susie_rss)#

I. GWAS Summary Statistics Files

  • Input: Vector of files for one or more GWAS studies.

  • Format:

    • Tab-delimited files, tabix-indexed on the first (chrom or #chrom) and second (pos) columns.

    • First 4 columns: chrom or #chrom, pos, A1, A2

    • Additional columns can be loaded using a column mapping file; see below.

  • Column Mapping files (optional):

    • Optional YAML file for custom column mapping (an example is given after this list).

    • Required columns: chrom, pos, A1, A2, z or (betahat and sebetahat).

    • Optional columns: n, var_y (relevant to fine-mapping).
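
For example, a column mapping file might look like the following, mapping each required or optional column name (left) to the corresponding header in your summary statistics file (right). This is a hypothetical sketch; consult the pipeline documentation for the exact key format.

eg: column_mapping.yml

chrom: CHR
pos: BP
A1: ALT
A2: REF
betahat: BETA
sebetahat: SE
n: N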

II. GWAS Summary Statistics Meta-File: this is optional and helpful when there are many GWAS datasets to process with the same command.

  • Columns: study_id, chromosome number, path to summary statistics file, optional path to column mapping file.

  • Note: Chromosome number 0 indicates a genome-wide file.

eg: gwas_meta.tsv

study_id    chrom    file_path       column_mapping_file
study1      1        gwas1.tsv.gz    column_mapping.yml
study1      2        gwas2.tsv.gz    column_mapping.yml
study2      0        gwas3.tsv.gz    column_mapping.yml

If both summary statistics files (I) and a meta-data file (II) are specified, we take the union of the two.

III. LD Reference Metadata File

  • Format: Single TSV file.

  • Contents:

    • Columns: #chrom, start, end, path to the LD matrix, genomic build.

    • LD matrix path format: comma-separated; the first entry is the LD matrix, the second is the bim file (see the hypothetical example after this list).

  • Documentation: Refer to our LD reference preparation document for detailed information.
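
For illustration, a hypothetical LD reference metadata file (paths and build are placeholders; see the LD reference preparation document for the authoritative format):

eg: ld_meta_file.tsv

#chrom    start      end         path                                                             build
chr1      0          6480000     chr1_0_6480000.cor.xz,chr1_0_6480000.cor.xz.bim                  hg38
chr1      6480000    12720000    chr1_6480000_12720000.cor.xz,chr1_6480000_12720000.cor.xz.bim    hg38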

Output of Fine-Mapping & TWAS Pipeline#

xQTL Weight Database Metadata File:

  • Essential columns: #chr, start, end, TSS, region_id, original_data

  • Structure of the weight database:

    • RDS format.

    • Organized hierarchically: region → context → weight matrix.

    • Each column represents a different method.

eg: xqtl_meta.tsv

#chr start end region_id TSS original_data combined_data combined_data_sumstats contexts contexts_top_loci
chr1 0 6480000 ENSG00000008128 1724356 "KNIGHT_pQTL.ENSG00000008128.univariate_susie_twas_weights.rds, MiGA_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds, MSBB_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds, ROSMAP_Bennett_Klein_pQTL.ENSG00000008128.univariate_susie_twas_weights.rds, ROSMAP_DeJager_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds, ROSMAP_Kellis_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds, ROSMAP_mega_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds, STARNET_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds" Fungen_xQTL.ENSG00000008128.cis_results_db.export.rds Fungen_xQTL.ENSG00000008128.cis_results_db.export_sumstats.rds Knight_eQTL_brain,MiGA_GFM_eQTL,MiGA_GTS_eQTL,MiGA_SVZ_eQTL,MiGA_THA_eQTL,BM_10_MSBB_eQTL,BM_22_MSBB_eQTL,BM_36_MSBB_eQTL,BM_44_MSBB_eQTL,monocyte_ROSMAP_eQTL,Mic_DeJager_eQTL,Ast_DeJager_eQTL,Oli_DeJager_eQTL,Exc_DeJager_eQTL,Inh_DeJager_eQTL,DLPFC_DeJager_eQTL,PCC_DeJager_eQTL,AC_DeJager_eQTL,Mic_Kellis_eQTL,Ast_Kellis_eQTL,Oli_Kellis_eQTL,OPC_Kellis_eQTL,Exc_Kellis_eQTL,Inh_Kellis_eQTL,Ast_mega_eQTL,Exc_mega_eQTL,Inh_mega_eQTL,Oli_mega_eQTL,STARNET_eQTL_Mac Knight_eQTL_brain,MiGA_GFM_eQTL,MiGA_GTS_eQTL,MiGA_SVZ_eQTL,MiGA_THA_eQTL,BM_10_MSBB_eQTL,BM_22_MSBB_eQTL,BM_36_MSBB_eQTL,BM_44_MSBB_eQTL,monocyte_ROSMAP_eQTL,Mic_DeJager_eQTL,Ast_DeJager_eQTL,Oli_DeJager_eQTL,Exc_DeJager_eQTL,Inh_DeJager_eQTL,DLPFC_DeJager_eQTL,PCC_DeJager_eQTL,AC_DeJager_eQTL,Mic_Kellis_eQTL,Ast_Kellis_eQTL,Oli_Kellis_eQTL,OPC_Kellis_eQTL,Exc_Kellis_eQTL,Inh_Kellis_eQTL,Ast_mega_eQTL,Exc_mega_eQTL,Inh_mega_eQTL,Oli_mega_eQTL,STARNET_eQTL_Mac

This file is automatically generated as part of the FunGen-xQTL protocol, although only the essential columns are relevant to our application here.

TWAS region information#

This is required for cTWAS analysis, where TWAS and SNP data within each region are combined for joint inference to select the variables (genes or SNPs) that are likely to be directly associated with the phenotype of interest, rather than associated through correlation with the true causal variables.

chrom    start    end    block_id  
1        1000     5000   block1    
2        2000     6000   block2
3        3000     7000   block3

Output#

I. A table with the following contents

gwas_study, chrom, start, end, block, gene, TSS, context, is_imputable, method, is_selected_method, rsq_adj_cv, pval_cv, twas_z, twas_pval

where

  • TSS: Transcription start site

  • start and end: start and end positions of the extended TADB window around the gene used for cis fine-mapping

  • is_imputable: whether this gene-context pair has cross-validated prediction performance with \(r^2\) > 0.01 and p-value < 0.05 in at least one method for expression level prediction.

  • rsq_adj_cv: adjusted \(r^2\) from the cross-validation test for a method.

  • pval_cv: p-value from the cross-validation test of predictive performance for a method.

  • is_selected_method: whether this is the best-performing method for the gene-context pair; we pick the model with the highest cross-validation \(r^2\) among those with cross-validation p-value < 0.05.

  • block: The LD region where the gene’s transcription start site (TSS) is located, based on predefined LD blocks.

If twas_z is NA, the context is not imputable for the method of choice.
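
For example, to keep only the best imputable model per gene-context pair from this table (a minimal sketch; the file name follows the output pattern of the twas step):

# Minimal sketch: load a per-block TWAS result table and keep, for each
# gene-context pair, the row for the best (selected) imputable model.
library(data.table)
twas <- fread("test.chr10_80126158_82231647.twas.tsv.gz")
best <- twas[is_imputable == TRUE & is_selected_method == TRUE]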

II. A list of refined_twas_weights_data per block, in RDS format, with this structure:

$ region_id
   $ gene 
      $ context
        $ selected_model
        $ is_imputable 
        $ selected_top_variants
        $ selected_model_weights

The TWAS result table will only contain imputable genes. It should come with a meta-data file like this:

chrom    start    end    block_id  refined_twas_db
1        1000     5000   block1    block1.rds
2        2000     6000   block2    block2.rds
3        3000     7000   block3    block3.rds
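
A minimal sketch for inspecting one refined_twas_weights_data block; field names follow the structure listed above, and block1.rds is the hypothetical file from the meta-data table:

# Minimal sketch: traverse region -> gene -> context and print the
# selected model for every imputable gene-context pair.
db <- readRDS("block1.rds")
for (region_id in names(db)) {
  for (gene in names(db[[region_id]])) {
    for (context in names(db[[region_id]][[gene]])) {
      entry <- db[[region_id]][[gene]][[context]]
      if (isTRUE(entry$is_imputable)) {
        cat(region_id, gene, context, entry$selected_model, "\n")
        # entry$selected_top_variants and entry$selected_model_weights
        # hold the sparse signals passed on to cTWAS
      }
    }
  }
}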

Example#

sos run xqtl-protocol/code/pecotmr_integration/twas.ipynb twas \
   --cwd /output/ --name test \
   --gwas_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv \
   --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \
   --regions /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/picalm_EUR_LD_blocks.bed \
   --xqtl_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/test_data/twas_test_xqtl_meta.tsv \
   --xqtl_type_table /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/data_type_table.txt \
   --rsq_pval_cutoff 0.05 --rsq_cutoff 0.01
sos run xqtl-protocol/code/pecotmr_integration/twas.ipynb ctwas \
   --cwd /output/ --name test \
   --gwas_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv \
   --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \
   --regions /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/picalm_EUR_LD_blocks.bed \
   --xqtl_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/test_data/twas_test_xqtl_meta.tsv \
   --twas_weight_cutoff 0 \
   --region_name chr10_80126158_82231647 chr11_84267999_86714492 

Here, using --region_name, we focus the analysis on 2 blocks; the expected format is chr_start_stop.

sos run xqtl-protocol/code/pecotmr_integration/twas.ipynb quantile_twas \
   --cwd /output/ --name test \
   --gwas_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv \
   --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \
   --region_name chr11_84267999_86714492 chr7_54681006_57314931 \
   --xqtl_meta_data /home/al4225/project/quantile_twas/quantile_twas_analysis/test_data/small_region_gene_meta_data_test.tsv

It is also possible to analyze a selected list of regions using the option --regions. The first 3 columns (chr, start, stop) of this file define the list of blocks to analyze. Here, for example, we use the same list of regions as the LD blocks:

sos run xqtl-protocol/code/pecotmr_integration/twas.ipynb quantile_twas \
   --cwd /output/ --name test \
   --gwas_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv \
   --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \
   --regions /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/EUR_LD_blocks.bed \
   --xqtl_meta_data /home/al4225/project/quantile_twas/quantile_twas_analysis/test_data/small_region_gene_meta_data_test.tsv
[global]
parameter: cwd = path("output/")
parameter: gwas_meta_data = path()
parameter: xqtl_meta_data = path()
parameter: ld_meta_data = path()
parameter: xqtl_type_table = ''
parameter: gwas_name = []
parameter: gwas_data = []
parameter: column_mapping = []
parameter: regions = path()
# Optional: if a region name is provided
# the analysis will be focused on the union of the provided region list and region names
parameter: region_name = []
parameter: name = f"{xqtl_meta_data:bn}.{gwas_meta_data:bn}"
parameter: container = ''
import re
parameter: entrypoint= ('micromamba run -a "" -n' + ' ' + re.sub(r'(_apptainer:latest|_docker:latest|\.sif)$', '', container.split('/')[-1])) if container else ""
parameter: job_size = 100
parameter: walltime = "5m"
parameter: mem = "8G"
parameter: numThreads = 1
# name suffix added to the end of the name variable for the output files
parameter: name_suffix = ""
# optional parameter in ctwas to only perform gwas specific analysis 
parameter: gwas_study=[]

import os
import pandas as pd

def adapt_file_path(file_path, reference_file):
    """
    Adapt a single file path based on its existence and a reference file's path.

    Args:
    - file_path (str): The file path to adapt.
    - reference_file (str): File path to use as a reference for adaptation.

    Returns:
    - str: Adapted file path.

    Raises:
    - FileNotFoundError: If no valid file path is found.
    """
    reference_path = os.path.dirname(reference_file)

    # Check if the file exists
    if os.path.isfile(file_path):
        return file_path

    # Check file name without path
    file_name = os.path.basename(file_path)
    if os.path.isfile(file_name):
        return file_name

    # Check file name in reference file's directory
    file_in_ref_dir = os.path.join(reference_path, file_name)
    if os.path.isfile(file_in_ref_dir):
        return file_in_ref_dir

    # Check original file path prefixed with reference file's directory
    file_prefixed = os.path.join(reference_path, file_path)
    if os.path.isfile(file_prefixed):
        return file_prefixed

    # If all checks fail, raise an error
    raise FileNotFoundError(f"No valid path found for file: {file_path}")

def group_by_region(lst, partition):
    # from itertools import accumulate
    # partition = [len(x) for x in partition]
    # Compute the cumulative sums once
    # cumsum_vector = list(accumulate(partition))
    # Use slicing based on the cumulative sums
    # return [lst[(cumsum_vector[i-1] if i > 0 else 0):cumsum_vector[i]] for i in range(len(partition))]
    return partition
[get_analysis_regions: shared = ["filtered_region_info", "filtered_regional_xqtl_files", "regional_data"]]
from collections import OrderedDict

def check_required_columns(df, required_columns):
    """Check if the required columns are present in the dataframe."""
    missing_columns = [col for col in required_columns if col not in list(df.columns)]
    if missing_columns:
        raise ValueError(f"Missing required columns: {', '.join(missing_columns)}")

def extract_regional_data(gwas_meta_data, xqtl_meta_data, regions, region_name, gwas_name, gwas_data, column_mapping):
    """
    Extracts data from GWAS and xQTL metadata files and additional GWAS data provided. 

    Args:
    - gwas_meta_data (str): File path to the GWAS metadata file.
    - xqtl_meta_data (str): File path to the xQTL weight metadata file.
    - regions (str): File path to a TSV/BED file of LD blocks (chr, start, stop).
    - region_name (list): vector of region names, e.g. "chr1_16103_2888443".
    - gwas_name (list): vector of GWAS study names.
    - gwas_data (list): vector of GWAS data.
    - column_mapping (list, optional): vector of column mapping files.

    Returns:
    - Tuple of two dictionaries:
        - GWAS Dictionary: Maps study IDs to a list containing chromosome number, 
          GWAS file path, and optional column mapping file path.
        - xQTL Dictionary: Nested dictionary with region IDs as keys.

    Raises:
    - FileNotFoundError: If any specified file path does not exist.
    - ValueError: If required columns are missing in the input files or vector lengths mismatch.
    """
    # Check vector lengths
    if len(gwas_name) != len(gwas_data):
        raise ValueError("gwas_name and gwas_data must be of equal length")
    
    if len(column_mapping)>0 and len(column_mapping) != len(gwas_name):
        raise ValueError("If column_mapping is provided, it must be of the same length as gwas_name and gwas_data")

    # Required columns for each file type
    required_gwas_columns = ['study_id', 'chrom', 'file_path']
    required_xqtl_columns = ['region_id', '#chr', 'start', 'end', "TSS", 'original_data'] #region_id here is gene name
    required_ld_columns = ['chr', 'start', 'stop']
    
    # Reading the GWAS metadata file
    gwas_df = pd.read_csv(gwas_meta_data, sep="\t")
    check_required_columns(gwas_df, required_gwas_columns)
    gwas_dict = OrderedDict()
    
    # Reading LD regions info
    # Initialize empty DataFrame for regions
    regions_df = pd.DataFrame(columns=['chr', 'start', 'stop'])

    # Check if regions file exists and read it
    if os.path.isfile(regions):
        file_regions_df = pd.read_csv(regions, sep="\t", skipinitialspace=True)
        file_regions_df.columns = [col.strip() for col in file_regions_df.columns]  # Strip spaces from column names
        file_regions_df['chr'] = file_regions_df['chr'].str.strip()
        check_required_columns(file_regions_df, required_ld_columns)
        regions_df = pd.concat([regions_df, file_regions_df])
    # Process region_name if provided:
    # format: region_name = ["chr1_16103_2888443", "chr1_4320284_5853833"]
    if len(region_name) > 0:
        # Split region_name entries into chr, start, and stop columns
        extra_regions = [name.split("_") for name in region_name]
        extra_regions_df = pd.DataFrame(extra_regions, columns=['chr', 'start', 'stop'])
        extra_regions_df['start'] = extra_regions_df['start'].astype(int)
        extra_regions_df['stop'] = extra_regions_df['stop'].astype(int)
        # Add extra regions to regions_df
        regions_df = pd.concat([regions_df, extra_regions_df])
    # Remove duplicates and reset index
    regions_df = regions_df.drop_duplicates().reset_index(drop=True)
    regions_dict = OrderedDict()

    # Reading the xQTL weight metadata file
    xqtl_df = pd.read_csv(xqtl_meta_data, sep=" ")
    check_required_columns(xqtl_df, required_xqtl_columns)
    xqtl_dict = OrderedDict()

    # Process additional GWAS data from R vectors
    for name, data, mapping in zip(gwas_name, gwas_data, column_mapping or [None]*len(gwas_name)):
        gwas_dict[name] = {0: [data, mapping]}

    for _, row in gwas_df.iterrows():
        file_path = row['file_path']
        mapping_file = row.get('column_mapping_file')
        
        # Adjust paths if necessary
        file_path = adapt_file_path(file_path, gwas_meta_data)
        if mapping_file:
            mapping_file = adapt_file_path(mapping_file,  gwas_meta_data)

        # Create or update the entry for the study_id
        if row['study_id'] not in gwas_dict:
            gwas_dict[row['study_id']] = {}

        # Expand chrom 0 to chrom 1-22 or use the specified chrom
        chrom_range = range(1, 23) if row['chrom'] == 0 else [row['chrom']]
        for chrom in chrom_range:
            # keys are stored as 'chrN', so check the prefixed form
            if f'chr{chrom}' in gwas_dict[row['study_id']]:
                existing_entry = gwas_dict[row['study_id']][f'chr{chrom}']
                raise ValueError(f"Duplicate chromosome specification for study_id {row['study_id']}, chrom {chrom}. "
                                 f"Conflicting entries: {existing_entry} and {[file_path, mapping_file]}")
            gwas_dict[row['study_id']][f'chr{chrom}'] = [file_path, mapping_file]
            
    for _, row in regions_df.iterrows():
        LD_region_id = f"{row['chr']}_{row['start']}_{row['stop']}"
        overlapping_xqtls = xqtl_df[(xqtl_df['#chr'] == row['chr']) & 
                                     (xqtl_df['TSS'] <= row['stop']) & 
                                     (xqtl_df['TSS'] >= (row['start']))]
        file_paths = []
        mapped_genes = []
        # Collect file paths for xQTLs overlapping this region
        for _, xqtl_row in overlapping_xqtls.iterrows():
            original_data = xqtl_row['original_data']
            file_list = original_data.split(',') if ',' in original_data else [original_data]
            file_paths.extend([adapt_file_path(fp.strip(), xqtl_meta_data) for fp in file_list])
            mapped_genes.extend([xqtl_row['region_id']] * len(file_list))

        # Store metadata and files in the dictionary
        regions_dict[LD_region_id] = {
            "meta_info": [row['chr'], row['start'], row['stop'], LD_region_id, mapped_genes],
            "files": file_paths
        }
        
    for _, row in xqtl_df.iterrows():
        file_paths = [adapt_file_path(fp.strip(), xqtl_meta_data) for fp in row['original_data'].split(',')]  # Splitting and stripping file paths
        xqtl_dict[row['region_id']] = {"meta_info": [row['#chr'], row['start'], row['end'], row['region_id'], row['contexts']],
                                       "files": file_paths}
    return gwas_dict, xqtl_dict, regions_dict


gwas_dict, xqtl_dict, regions_dict = extract_regional_data(gwas_meta_data, xqtl_meta_data,regions,region_name,gwas_name, gwas_data, column_mapping)
regional_data = dict([("GWAS", gwas_dict), ("xQTL", xqtl_dict), ("Regions", regions_dict)])


# get regions data 
region_info = [x["meta_info"] for x in regional_data['Regions'].values()]
regional_xqtl_files = [x["files"] for x in regional_data['Regions'].values()]

# Filter out empty xQTL file paths
filtered_region_info = []
filtered_regional_xqtl_files = []
skipped_regions =[]

for region, files in zip(region_info, regional_xqtl_files):
    if files:
        filtered_region_info.append(region)
        filtered_regional_xqtl_files.append(files)
    else:
        skipped_regions.append(region)
print(f"Skipping {len(skipped_regions)} out of {len(regional_xqtl_files)} regions, no overlapping xQTL weights found. ")
[twas]
depends: sos_variable("filtered_regional_xqtl_files")
parameter: coverage = "cs_coverage_0.95"
# Threshold for rsq and p-value used to determine imputability of a model
parameter: rsq_cutoff = 0.01
parameter: rsq_pval_cutoff = 0.05
parameter: mr_pval_cutoff = 0.05
parameter: save_ctwas_data = True
parameter: save_mr_result = True
parameter: rsq_option="rsq"
parameter: rsq_pval_option=["adj_rsq_pval", "pval"]
# load by batches if memory resource is limited, default to load all at once 
parameter: batch_load_memory = 'inf'
parameter: event_filter_rules = path()

input: filtered_regional_xqtl_files, group_by = lambda x: group_by_region(x, filtered_regional_xqtl_files), group_with = "filtered_region_info"
output_files = [f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.twas.tsv.gz']
if save_ctwas_data:
    output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.twas_data.rds')
if save_mr_result:
    output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.mr_result.tsv.gz')
output: output_files
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output[0]:n}.stdout", stderr = f"{_output[0]:n}.stderr", container = container, entrypoint = entrypoint

    library(dplyr)
    library(data.table)
    library(pecotmr)
    library(readr)

    # get xQTL weight information 
    xqtl_meta_df <- fread("${xqtl_meta_data}", data.table=FALSE) # Get related gene information from the xqtl_meta data table
    xqtl_type_table <- if (isTRUE(file.exists("${xqtl_type_table}"))) fread("${xqtl_type_table}") else NULL
    gene_list <- c(${', '.join([f"'{gene}'" for gene in _filtered_region_info[4]])})
    event_filter_rules = ${"NULL" if not event_filter_rules.is_file() else "'%s'" % event_filter_rules}
    if (!is.null(event_filter_rules)) {
        event_filters <- read.table("${event_filter_rules}")
        event_filters <- lapply(1:nrow(event_filters), function(ii) as.list(event_filters[ii,] %>% unlist))
    } else { event_filters <- NULL }

    # Process weights 
    twas_weights_results <- list()
    weight_db_list <- c(${_input:r,})
    names(weight_db_list) <- gene_list
    weight_db_list <- split(weight_db_list, names(weight_db_list), c)
    # remove weight db files that only contain messages: "No SNPs found in the specified region ..."
    weight_db_list_update <- Filter(Negate(is.null), lapply(weight_db_list, function(file_list){do.call(c,lapply(file_list, function(file) if (file.size(file) > 200) file))}))
    if(length(weight_db_list_update) <= 0) stop(paste0("No weight information available from ", paste0(weight_db_list, collapse=","), ". "))

    # Step 1: Load TWAS weights and generate twas weight db, output export_twas_weights_db 
    for (gene_db in names(weight_db_list_update)) {
        weight_dbs <- weight_db_list_update[[gene_db]]
        twas_weights_results[[gene_db]] = load_twas_weights(weight_dbs, variable_name_obj="variant_names", 
                                                           susie_obj="susie_weights_intermediate",
                                                           twas_weights_table = "twas_weights")
        if (length(twas_weights_results[[gene_db]]) > 1) {
          twas_weights_results[[gene_db]]$data_type <- setNames(lapply(names(twas_weights_results[[gene_db]]$weights), function(context) xqtl_type_table$type[sapply(xqtl_type_table$context,
                                                                function(x) grepl(x, context))]), names(twas_weights_results[[gene_db]]$weights))      
        } else {
          twas_weights_results[[gene_db]] <- NULL
        }                   
    }
    rm(weight_db_list, weight_db_list_update, xqtl_type_table, gene_list)
    gc()
    # batch load twas weights to reduce memory consumption 
    twas_weights_results <- batch_load_twas_weights(
      twas_weights_results = twas_weights_results,
      meta_data_df = xqtl_meta_df,
      max_memory_per_batch = ${"Inf" if batch_load_memory == "inf" else batch_load_memory}
    )
    # Step 2: twas analysis for imputable genes across contexts
    twas_results_db <- list()
    for (batch in 1:length(twas_weights_results)){
        twas_results_db[[batch]] <- twas_pipeline(twas_weights_results[[batch]], 
                                     "${ld_meta_data}", 
                                     "${gwas_meta_data}",  
                                     region_block="${_filtered_region_info[3]}",
                                     rsq_cutoff = ${rsq_cutoff}, 
                                     rsq_option = "${rsq_option}", 
                                     rsq_pval_cutoff = ${rsq_pval_cutoff}, 
                                     rsq_pval_option=c(${", ".join([f'"{x}"' for x in rsq_pval_option])}), 
                                     mr_pval_cutoff = ${mr_pval_cutoff},
                                     mr_coverage_column = "${coverage}", 
                                     output_twas_data = ${"TRUE" if save_ctwas_data else "FALSE"},
                                     event_filters = event_filters)
    }
    rm(twas_weights_results)
    gc()
    twas_results_db <- Filter(Negate(is.null), twas_results_db)

    if(length(twas_results_db)!=0){
        for (batch in 1:length(twas_results_db)){
            twas_results_db[[batch]]$twas_result <- merge(twas_results_db[[batch]]$twas_result, xqtl_meta_df[, c("region_id","TSS","start","end")], 
                                                          by.x="molecular_id", by.y="region_id")
        }
        fwrite(do.call(rbind, lapply(twas_results_db, function(x) x$twas_result[, c(2,1,14:16,3:13)])), ${_output[0]:r}, sep = "\t", compress = "gzip")
    } else {
        fwrite(data.frame(), file = ${_output[0]:r}, sep = "\t", compress = "gzip")
    }
    # Step 3: reformat for follow up cTWAS analysis
    if (${"TRUE" if save_ctwas_data else "FALSE"} & length(twas_results_db)!=0) {
        if (length(twas_results_db) > 1){
            twas_results_db[[1]]$twas_data$weights <- do.call(c, lapply(twas_results_db, function(x) x$twas_data$weights))
            twas_results_db[[1]]$twas_data$susie_weights_intermediate_qced <- do.call(c, lapply(twas_results_db, function(x) x$twas_data$susie_weights_intermediate_qced))
            twas_results_db[[1]]$twas_data$snp_info <- do.call(c, lapply(twas_results_db, function(x) x$twas_data$snp_info))
            studies <- unique(names(find_data(twas_results_db, c(3, "z_gene"))))
            for (study in studies){
                twas_results_db[[1]][["twas_data"]][["z_gene"]][[study]] <- do.call(rbind, lapply(twas_results_db, function(x) x$twas_data$z_gene[[study]]))
                twas_results_db[[1]][["twas_data"]][["z_snp"]][[study]] <- do.call(rbind, lapply(twas_results_db, function(x) x$twas_data$z_snp[[study]]))
            }
        }
        saveRDS(twas_results_db[[1]]$twas_data, "${_output[0]:nnn}.twas_data.rds", compress='xz')
    }
    if (${"TRUE" if save_mr_result else "FALSE"}) {
        if(length(twas_results_db)!=0) {
           fwrite(do.call(rbind, lapply(twas_results_db, function(x) x$mr_result)), file = "${_output[0]:nnn}.mr_result.tsv.gz", sep = "\t", compress = "gzip")
        } else {
            fwrite(data.frame(), file = "${_output[0]:nnn}.mr_result.tsv.gz", sep = "\t", compress = "gzip")
        }
    }
[ctwas_1]
# ctwas_1: merge regions by each chromosome 
depends: sos_variable("filtered_region_info")
# chromosome number to merge for region data, can be an integer or a string, i.e. 'chr1'
parameter: chrom=""
# twas weight cutoff for ctwas variant selection
parameter: twas_weight_cutoff = 0.0
parameter: max_num_variants = Inf
# cs_min_cor cutoff in susie finemapping result for variant selection
parameter: cs_min_cor = 0
# minimum pip cutoff in susie finemapping result for variant selection
parameter: min_pip_cutoff = 0
# A scalar in (0,1]: the proportion of SNPs to keep; reduces runtime at the cost of accuracy
parameter: thin = 1.0
# Maximum number of SNPs in a region.
parameter: maxSNP = 20000
# skip this step if True
parameter: skip_assembly = False
skip_if(skip_assembly == True, " Skip [ctwas_1] assemble regions. " )
parameter: multi_group = True
parameter: numThreads = 4

twas_output_files = {}
region_blocks_per_chrom = []
chromosome_list = []
chrom = f"chr{int(chrom)}" if str(chrom).isdigit() else chrom

for region_info in filtered_region_info:
    chrom_info = region_info[0]
    if chrom_info != chrom:
        continue  # Skip chromosomes other than chrom input 
    file_path = f"{cwd}/twas/{name}.{region_info[3]}.twas_data.rds"
    if chrom_info not in twas_output_files:
        twas_output_files[chrom_info] = [[], []]  # Structure: {"chrX": [[files], [region_names]]}
        chromosome_list.append(chrom_info)
    twas_output_files[chrom_info][0].append(file_path)  # File paths
    twas_output_files[chrom_info][1].append(region_info[3])  # Region names
twas_files=[twas_output_files[chr][0] for chr in chromosome_list]
region_blocks_per_chrom = [twas_output_files[chr][1] for chr in chromosome_list]
region_blocks_per_chrom = [f"""c("{'", "'.join(regions)}")""" for regions in region_blocks_per_chrom]
outdir = f"{cwd}/{step_name.split('_')[0]}"
if not os.path.exists(outdir):
    os.makedirs(outdir)
gwas_study = 'c(' + ', '.join(f'"{x}"' for x in gwas_study) + ')'
name_suffix = f'"{name_suffix}"' if name_suffix else 'NULL'

input: twas_files, group_by = lambda x: group_by_region(x, twas_files),  group_with=dict(region_names=region_blocks_per_chrom)
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads
R: expand = '${ }', stdout = f"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_region_data.{chrom}.thin{thin}.stdout", 
stderr = f"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_region_data.{chrom}.thin{thin}.stderr", container = container, entrypoint = entrypoint

    library(data.table)
    library(ctwas) # multigroup_ctwas
    library(pecotmr)
    outputdir = "${cwd:a}/${step_name.split('_')[0]}"
    
    # load genome-wide data across all regions
    gwas_meta_data <- fread("${gwas_meta_data}",data.table=FALSE)
    gwas_studies <- if (length(${gwas_study})!=0) ${gwas_study} else unique(gwas_meta_data$study_id)
    gwas_files <- gwas_meta_data[gwas_meta_data$chrom == readr::parse_number("${chrom}") & gwas_meta_data$study_id %in% gwas_studies,, drop=FALSE]
    gwas_files <- setNames(gwas_files$file_path, gwas_files$study_id)

    merge_context_names <- function(ctwas_dat_file){
        ctwas_dat<- readRDS(ctwas_dat_file)
        idxs <- which(grepl("_chr[0-9XY]+_[^_]+", names(ctwas_dat$weights)))
        if (length(idxs)>0) {
            names(ctwas_dat$weights) <- gsub("_chr[0-9XY]+_[A-Za-z0-9]+", "", names(ctwas_dat$weights))
            for (idx in idxs) {
                for (study in names(ctwas_dat$weights[[idx]])){
                    ctwas_dat$weights[[idx]][[study]]$weight_name <- gsub("_chr[0-9XY]+_[A-Za-z0-9]+", "", ctwas_dat$weights[[idx]][[study]]$weight_name)
                    ctwas_dat$weights[[idx]][[study]]$context <- gsub("_chr[0-9XY]+_[A-Za-z0-9]+", "", ctwas_dat$weights[[idx]][[study]]$context)
                }
            }
            for (study in names(ctwas_dat$z_gene)){
                ctwas_dat$z_gene[[study]]$id <- gsub("_chr[0-9XY]+_[A-Za-z0-9]+", "", ctwas_dat$z_gene[[study]]$id)
                ctwas_dat$z_gene[[study]]$context <- gsub("_chr[0-9XY]+_[A-Za-z0-9]+", "", ctwas_dat$z_gene[[study]]$context)
                ctwas_dat$z_gene[[study]]$group <- gsub("_chr[0-9XY]+_[A-Za-z0-9]+", "", ctwas_dat$z_gene[[study]]$group)
            }
            for (gene in names(ctwas_dat$susie_weights_intermediate_qced)){
                names(ctwas_dat$susie_weights_intermediate_qced[[gene]]) <-  gsub("_chr[0-9XY]+_[^_]+", "", names(ctwas_dat$susie_weights_intermediate_qced[[gene]]))
            }
        }
        idxs <- which(grepl("monocyte", names(ctwas_dat$weights)))
        if (length(idxs)>0) {
            names(ctwas_dat$weights) <- gsub("monocyte_eQTL", "eQTL", names(ctwas_dat$weights))
            for (idx in idxs) {
                for (study in names(ctwas_dat$weights[[idx]])){
                    ctwas_dat$weights[[idx]][[study]]$weight_name <- gsub("monocyte_eQTL", "eQTL", ctwas_dat$weights[[idx]][[study]]$weight_name)
                    ctwas_dat$weights[[idx]][[study]]$type <- "eQTL"
                }
            }
            for (study in names(ctwas_dat$z_gene)){
                ctwas_dat$z_gene[[study]]$id <- gsub("monocyte_eQTL", "eQTL", ctwas_dat$z_gene[[study]]$id)
                ctwas_dat$z_gene[[study]]$type <- gsub("monocyte_", "", ctwas_dat$z_gene[[study]]$type)
                ctwas_dat$z_gene[[study]]$group <- gsub("monocyte_eQTL", "eQTL", ctwas_dat$z_gene[[study]]$group)
            }
        }
        return(ctwas_dat)
    }
    weight_list = vector("list", length(gwas_studies))
    names(weight_list) <- gwas_studies
    z_gene <- weight_list  # empty list  
    z_snp <- weight_list
 
    # merge chromosome-wide data (weights, z_gene, z_snp) from the twas pipeline output twas_data.rds files
    for (region_dat in c(${_input:r,})){
        ctwas_dat <- merge_context_names(region_dat)
        if (is.null(ctwas_dat)) next 
        weight_tmp <- trim_ctwas_variants(ctwas_dat, twas_weight_cutoff=${twas_weight_cutoff}, cs_min_cor=${cs_min_cor},
                            min_pip_cutoff=${min_pip_cutoff}, max_num_variants=${max_num_variants})
        for (study in names(weight_tmp)) {
            if (!study %in% gwas_studies) next
            weight_list[[study]] <- c(weight_list[[study]], weight_tmp[[study]])
            z_gene[[study]] <- rbind(z_gene[[study]], ctwas_dat$z_gene[[study]])
            z_gene[[study]] <- z_gene[[study]][!is.na(z_gene[[study]]$z) & !is.infinite(z_gene[[study]]$z) & z_gene[[study]]$id %in% names(weight_list[[study]]),]        
        }
        rm(ctwas_dat, weight_tmp)
        gc()
    }
    z_gene <- Filter(Negate(is.null), z_gene)
    weight_list <- Filter(Negate(is.null), weight_list)
    if (length(z_gene)==0 | length(weight_list)==0) stop("Please check input data. No twas z-score value available. ")
    
    # get LD snp info table (snp_map) and ld variants 
    max_pos <- fread("${ld_meta_data}", data.table=FALSE)
    max_pos <- max(max_pos$end[max_pos[,1]=="${chrom}"])
    region_of_interest <- data.frame(chrom = "${chrom}", start = 0, end = max_pos+1)
    snp_map <- load_ld_snp_info("${ld_meta_data}", region_of_interest)
    ld_variants <- paste("chr", unlist(lapply(snp_map, function(x) x$id)))
    ld_variants <- ld_variants[!duplicated(ld_variants)]

    regions_data <- get_ctwas_meta_data("${ld_meta_data}", names(snp_map))
    region_info <- regions_data$region_info
    LD_map <- regions_data$LD_info
    if (!is.null(${name_suffix})) name <- paste0("${name}_", ${name_suffix})  else name <- "${name}"
    saveRDS(LD_map, file.path(outputdir, paste0(name, ".LD_map.${chrom}.rds")), compress='xz')
    saveRDS(snp_map, file.path(outputdir, paste0(name, ".snp_map.${chrom}.rds")), compress='xz')
    rm(regions_data, LD_map)
    gc()
    
    # for each gwas study - merge ctwas input data        
    for (study in names(weight_list)){
      if (is.null(z_gene[[study]])) next
      if (${"TRUE" if multi_group else "FALSE"}) {
          outname <- paste0(name, ".ctwas_region_data.", study, ".${chrom}")
      } else {
         contexts <- unique(do.call(c,lapply(z_gene, function(x) unique(x$context))))
         outname <- paste0(name, ".ctwas_region_data.", study, ".", contexts, ".${chrom}")
         names(outname) <- contexts
      }
      z_snp <- harmonize_gwas(gwas_files[study], query_region=paste0(readr::parse_number("${chrom}"), ":0-", max_pos+1), ld_variants, c("beta", "z"), match_min_prop = 0)
      z_snp <- z_snp[,c("variant_id", "A1", "A2", "z")]
      colnames(z_snp)[1] <-"id" 

      # iterate by study-context pair (single group) or by study itself (multigroup) 
      for (group_name in outname){ 
          # assemble regions         
          if (${"TRUE" if multi_group else "FALSE"}) {
              subset_inx <- names(weight_list[[study]])
          } else {
              context <- names(outname)[which(outname==group_name)]
              if (length(weight_list[[study]][grepl(context, names(weight_list[[study]]))])==0) next
              subset_inx <- which(grepl(context, names(weight_list[[study]])))
          }
          res <- assemble_region_data(region_info,
                                    z_snp,
                                    z_gene[[study]],
                                    weight_list[[study]][subset_inx],
                                    snp_map,
                                    thin = ${thin},
                                    maxSNP = ${maxSNP},
                                    min_group_size = 1,
                                    ncore = ${numThreads})
          region_data <- res$region_data
          boundary_genes <- res$boundary_genes
          region_data_file <- file.path(outputdir, paste0(group_name, ".thin", ${thin}, ".rds"))
          boundary_gene_file <- gsub("region_data","boundary_genes", region_data_file)
          saveRDS(region_data, region_data_file, compress='xz')
          saveRDS(boundary_genes, boundary_gene_file, compress='xz')   
      }
      saveRDS(weight_list[[study]], file.path(outputdir, paste0(name,".ctwas_weights.", study, ".${chrom}.rds")), compress='xz')
      saveRDS(list(z_snp=z_snp, z_gene=z_gene[[study]]), file.path(outputdir,  paste0(name, ".z_gene_snp.", study, ".${chrom}.rds")), compress='xz')
      weight_list[[study]] <- NULL
    }
[ctwas_2]
## ctwas_2: estimate global group priors using data from all regions
parameter: run_param_est = False
parameter: skip_assembly = False
parameter: thin=1.0
parameter: prior_var_structure = "shared_all"
parameter: multi_group = True
parameter: numThreads = 8

thin=int(thin) if thin == 1.0 else thin
skip_if(run_param_est == False, " Skip [ctwas_2] parameter estimation. " )
name = f"{name}_{name_suffix}" if name_suffix else name

gwas_study = 'c(' + ', '.join(f'"{x}"' for x in gwas_study) + ')'
if multi_group:
    input_pattern = f"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_region_data.*.chr*.thin{thin}.rds" # by study
else:
    input_pattern = f"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_region_data.*.*.chr*.thin{thin}.rds" # by study-context pair

input: input_pattern, group_by="all"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads
R: expand = '${ }', stdout = f"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_param_est.{prior_var_structure}.thin{thin}.stdout", 
stderr = f"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_param_est.{prior_var_structure}.thin{thin}.stderr", container = container, entrypoint = entrypoint
    
    library(ctwas)
    library(data.table)
    library(pecotmr)
    outputdir = "${cwd:a}/${step_name.split('_')[0]}"
    region_data_files <- c(${_input:r,})
    names(region_data_files) <- gsub('^.*${name}.ctwas_region_data.\\s*|\\s*.chr*.*$', '', region_data_files)   
    gwas_studies <- if (length(${gwas_study})!=0) ${gwas_study} else names(region_data_files)

    # assess global parameters
    for (study in gwas_studies){
        region_files <- region_data_files[grepl(study, region_data_files)]
        region_data <- setNames(lapply(region_files, readRDS), names(region_files))
        contexts <- unique(find_data(region_data, c(4, "context")))
        # subset either study specific or study-context specific region data based on multigroup/single group
        if (${"TRUE" if multi_group else "FALSE"}) {
            ## subset region data files that are not context-specific but study-specific 
            region_data <- region_data[!sapply(names(region_data), function(x) any(sapply(contexts, function(i) grepl(i, x))))]
            group_name <- 'multi'
        } else {
            region_data <- region_data[sapply(names(region_data), function(x) any(sapply(contexts, function(i) grepl(i, x))))]
            group_name <- 'single'      
        }
        region_data <- lapply(split(region_data, names(region_data)), function(x) do.call(c, unname(x)))
        saveRDS(region_data, file.path(outputdir, paste0("${name}.region_data_merged.",  group_name, ".thin${thin}.rds")), compress='xz') #merge and save into one file to be used later
        if (length(region_data[[study]])==0) stop("Study ", study, " not present in region_data ", file.path(outputdir, paste0("${name}.region_data_merged.",  group_name, ".thin${thin}.rds")))
        group_prior_file <- file.path(outputdir, paste0("${name}.ctwas_param_${prior_var_structure}.", study, ".thin${thin}.rds"))
        message("Start global parameter estimation for ", study, ". ")
        param <- est_param(region_data[[study]],
                  group_prior_var_structure = "${prior_var_structure}", 
                  niter_prefit = 3,
                  niter = 50,
                  min_group_size = 10,
                  ncore = ${numThreads},
                  verbose = TRUE)
        saveRDS(param, group_prior_file, compress='xz')
        region_data[[study]] <- NULL
    }
[ctwas_3]
## ctwas_3: perform finemapping for each region 
parameter: thin=1.0
parameter: maxSNP = 20000
parameter: max_iter = 0
parameter: run_finemapping = False
parameter: prior_var_structure = "shared_all"
# A list of regions to subset for screening and fine-mapping, for example: "chr10_80126158_82231647"
parameter: region_name =[]
parameter: numThreads = 4
parameter: multi_group = True
parameter: merge_regions=False
parameter: L=5
import glob

skip_if(run_finemapping == False, " Skip [ctwas_3] fine-mapping. " )
thin=int(thin) if thin == 1.0 else thin
name = f"{name}_{name_suffix}" if name_suffix else name
region_data_file = (
    f"{cwd:a}/{step_name.split('_')[0]}/{name}.region_data_merged.multi.thin{thin}.rds"
    if multi_group
    else f"{cwd:a}/{step_name.split('_')[0]}/{name}.region_data_merged.single.thin{thin}.rds"
)

region_info_list = []
for region in region_name:
    chrom = region.split("_")[0]
    base = f"{cwd}/{step_name.split('_')[0]}/{name}"
    if gwas_study:
        weights = [f"{base}.ctwas_weights.{study}.{chrom}.rds" for study in gwas_study]
        zfiles = [f"{base}.z_gene_snp.{study}.{chrom}.rds" for study in gwas_study]
        params = [x for study in gwas_study for x in glob.glob(f"{base}.ctwas_param_{prior_var_structure}.{study}*.thin{thin}.rds")]
    else:
        weights = [f"{base}.ctwas_weights.*.{chrom}.rds"]
        zfiles = [f"{base}.z_gene_snp.*.{chrom}.rds"]
        params = [f"{base}.ctwas_param_{prior_var_structure}.*.thin{thin}.rds"]
    region_info_list.append({
        "region_name": region,
        "weights": weights,
        "zfiles": zfiles,
        "params": params
    })
gwas_study = 'c(' + ', '.join(f'"{x}"' for x in gwas_study) + ')'

input: region_info_list[_index]["params"], for_each = "region_info_list"
region_name = region_info_list[_index]['region_name']
weight_files = region_info_list[_index]['weights']
z_snp_file  = region_info_list[_index]['zfiles']
params = region_info_list[_index]['params']
chrom = region_name.split("_")[0]
depends: region_data_file, f"{cwd}/{step_name.split('_')[0]}/{name}.snp_map.{chrom}.rds", f"{cwd}/{step_name.split('_')[0]}/{name}.LD_map.{chrom}.rds"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads
R: expand = '${ }', stdout = f"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_finemap_res.{prior_var_structure}.{region_name}.thin{thin}.stdout", 
stderr = f"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_finemap_res.{prior_var_structure}.{region_name}.thin{thin}.stderr", container = container, entrypoint = entrypoint

    library(ctwas)
    library(pecotmr)
    library(data.table)
    outputdir = "${cwd:a}/${step_name.split('_')[0]}"

    gwas_meta_data <- fread("${gwas_meta_data}",data.table=FALSE)
    gwas_studies <- if (length(${gwas_study})!=0) ${gwas_study} else unique(gwas_meta_data$study_id)

    weight_files <- ${'c(' + ', '.join(f'"{x}"' for x in weight_files) + ')'}
    z_snp_files <- ${'c(' + ', '.join(f'"{x}"' for x in z_snp_file) + ')'}
    param_study_files <- ${'c(' + ', '.join(f'"{x}"' for x in params) + ')'}
    names(param_study_files) <- gsub('^.*.ctwas_param_${prior_var_structure}.\\s*|\\s*.thin${thin}.*$', '', param_study_files)
    param <- setNames(lapply(unname(param_study_files), readRDS), names(param_study_files))
    if (${"TRUE" if multi_group else "FALSE"}){
        param <- param[!sapply(names(param), function(x) any(sapply(paste0(gwas_studies, "."), function(c) grepl(c, x))))] # gwas_study per each prior 
    } else {
        param <- param[sapply(names(param), function(x) any(sapply(paste0(gwas_studies, "."), function(c) grepl(c, x))))] # gwas_study x context pair per each prior 
    }

    region_data <- readRDS("${region_data_file}")
    names(weight_files) <- gsub('^.*.ctwas_weights.\\s*|\\s*.${chrom}.*$', '', weight_files) # gwas study names regardless of single/multigroup
    LD_map <- readRDS("${cwd}/${step_name.split('_')[0]}/${name}.LD_map.${chrom}.rds")
    snp_map <- readRDS("${cwd}/${step_name.split('_')[0]}/${name}.snp_map.${chrom}.rds")
    names(z_snp_files) <- gsub('^.*.z_gene_snp.\\s*|\\s*.${chrom}.*$', '', z_snp_files)

    ## loop through gwas studies (multigroup) / gwas_study_context groups (single_group)
    for (study in names(param)){
        finemap_res_file <- file.path(outputdir, paste0("${name}.ctwas_finemap_res.${prior_var_structure}.${region_name}.", study, ".thin${thin}.tsv"))
        susie_alpha_file <- gsub("ctwas_finemap_res", "ctwas_susie_alpha_res", finemap_res_file)
        if (nrow(region_data[[study]][[gsub("chr", "", "${region_name}")]]$z_gene)==0) {
            message("No z_gene data available for ", study, " in ${region_name}. ")
            fwrite(data.frame(), finemap_res_file, sep = "\t", compress = "gzip")
            next
        }
        
        gwas_study <- study 
        if (${"TRUE" if multi_group else "FALSE"}){
            weights <- readRDS(weight_files[gwas_study])
        } else { # single group 
            context <- gsub( "\\|.*$", "", names(param[[study]][['group_prior']])[1])
            gwas_study <- gsub(paste0(".", context),"", study)
            weights <- readRDS(weight_files[gwas_study])
            if (length(weights[grepl(context, names(weights))])==0) next # no context specific weight data available 
            weights <- weights[which(grepl(context, names(weights)))] # subset
        }
        group_prior <- param[[study]]$group_prior
        group_prior_var <- param[[study]]$group_prior_var 
        z_snp_list <- readRDS(z_snp_files[gwas_study])
        z_snp <- z_snp_list$z_snp
        z_gene <- z_snp_list$z_gene

        if (${thin} < 1){
            region_data[[study]][gsub("chr", "", "${region_name}")] <- expand_region_data(region_data[[study]][[gsub("chr", "", "${region_name}")]],
                                              snp_map,
                                              z_snp,
                                              maxSNP = ${maxSNP},
                                              ncore = ${numThreads})
        }
        screen_res <- screen_regions(region_data[[study]][gsub("chr", "", "${region_name}")],
                                       group_prior = group_prior, group_prior_var = group_prior_var, min_nonSNP_PIP = 0.5, min_pval = 5e-8,
                                       ncore = ${numThreads}, verbose = FALSE, logfile = file.path(outputdir, 
                                       paste0("${name}.screen_regions.${prior_var_structure}.${region_name}.thin${thin}.log")))
        screened_region_data <- screen_res$screened_region_data
        # screen_summary <- screen_res$screen_summary
        saveRDS(screen_res, file.path(outputdir, paste0("${name}.screen_regions.${prior_var_structure}.${region_name}.", study, ".thin${thin}.rds")))
        if (length(screened_region_data)==0) {
            message("No region selected for ", study, " in ${region_name}. ")
            fwrite(data.frame(), finemap_res_file, sep = "\t", compress = "gzip")
            next
        }
        message("Screening region completed for ", study, ". ")

        # finemap_regions() argument input 
        args <- list(screened_region_data, LD_map = LD_map, weights = weights, group_prior = group_prior, 
                     group_prior_var = group_prior_var, L = ${L}, ncore = ${numThreads}, save_cor = FALSE, LD_format = "custom", 
                     LD_loader_fun = ctwas_ld_loader, snpinfo_loader_fun = ctwas_bimfile_loader, verbose = TRUE, logfile = file.path(outputdir, 
                     paste0("${name}.finemap_res.${prior_var_structure}.${region_name}.", study, ".thin${thin}.log")))
        if (as.logical(${max_iter})){
            finemap_res_file <- gsub(study, paste0(study,"_max_iter${max_iter}"),finemap_res_file)
            susie_alpha_file <- gsub(study, paste0(study,"_max_iter${max_iter}"),susie_alpha_file)
            args$max_iter <- ${max_iter}
        }
        # ctwas finemapping 
        res <- do.call(finemap_regions, args)
        finemap_res <- res$finemap_res
        finemap_res$pval <- z2p(finemap_res$z)
        susie_alpha_res <- res$susie_alpha_res
        fwrite(finemap_res, finemap_res_file, sep = "\t", compress = "gzip")
        saveRDS(susie_alpha_res, susie_alpha_file, compress='xz')

        # Get LD diagnosis 
        ld_diag <- diagnose_LD_mismatch_susie(region_ids = gsub("chr", "", "${region_name}"), 
                                  z_snp = z_snp,
                                  LD_map = LD_map, 
                                  gwas_n = nrow(z_snp),
                                  p_diff_thresh = 5e-8,
                                  ncore = ${numThreads},
                                  LD_format = "custom",
                                  LD_loader_fun = ctwas_ld_loader,
                                  snpinfo_loader_fun = ctwas_bimfile_loader)
        problematic_genes <- get_problematic_genes(ld_diag$problematic_snps, 
                                           weights, 
                                           finemap_res, 
                                           pip_thresh = 0.5)
        # finemapping with no LD for problematic genes 
        problematic_region_ids <- unlist(unique(finemap_res[finemap_res$id %in% problematic_genes, "region_id"]))
        if (length(problematic_region_ids) > 0) {
          message(problematic_region_ids)
          rerun_region_data <- screened_region_data[problematic_region_ids] # using already expanded screened_region_data
          res <- finemap_regions_noLD(rerun_region_data, 
                                      group_prior = group_prior,
                                      group_prior_var = group_prior_var)
          finemap_res <- res$finemap_res
          susie_alpha_res <- res$susie_alpha_res
          message("Fine-mapping without LD for region ${region_name} with ", study, ". ")
          finemap_res_file <- gsub(study, paste0(study, "_update"), finemap_res_file)
          susie_alpha_file <- gsub("ctwas_finemap_res", "ctwas_susie_alpha_res", finemap_res_file)
          fwrite(finemap_res, finemap_res_file, sep = "\t", compress = "gzip")
          saveRDS(susie_alpha_res, susie_alpha_file, compress='xz')
        }
        saveRDS(ld_diag, file.path(outputdir, paste0("${name}.ctwas_ld_diag.${prior_var_structure}.${region_name}.", study,".thin${thin}.rds")))
        message("Fine-mapping completed for region ${region_name} with ", study, ". ")
        rm(screened_region_data, ld_diag, susie_alpha_res, res, screen_res)
        gc()

        ## region merging for genes in multiple regions  
        boundary_genes <- file.path(outputdir, paste0("${name}.ctwas_boundary_genes.", study, ".${chrom}.thin${thin}.rds"))
        if (file.exists(boundary_genes) & ${"TRUE" if merge_regions else "FALSE"}){
            boundary_genes <- readRDS(boundary_genes)
            high_PIP_finemap_gene_res <- subset(finemap_res, group != "SNP" & susie_pip > 0.5 & !is.na(cs))
            high_PIP_genes <- unique(high_PIP_finemap_gene_res$id)
            selected_boundary_genes <- boundary_genes[boundary_genes$id %in% high_PIP_genes, , drop=FALSE]
            if (nrow(selected_boundary_genes)!=0){
                message("Fine-mapping for merged region for ${region_name} with ", study, ". ")
                region_info <- get_ctwas_meta_data("${ld_meta_data}", names(snp_map))$region_info
                merge_region_res <- merge_region_data(selected_boundary_genes,
                                               region_data[[study]],
                                               region_info = region_info,
                                               LD_map = LD_map,
                                               snp_map = snp_map,
                                               weights = weights,
                                               z_snp = z_snp,
                                               z_gene = z_gene,
                                               estimate_L = TRUE,
                                               maxSNP = ${maxSNP},
                                               LD_format = "custom", 
                                               LD_loader_fun = ctwas_ld_loader, 
                                               snpinfo_loader_fun = ctwas_bimfile_loader)
                finemap_merged_regions_res <- finemap_regions(merge_region_res$merged_region_data,
                                              LD_map = merge_region_res$merged_LD_map,
                                              weights = weights,
                                              group_prior = group_prior,
                                              group_prior_var = group_prior_var,
                                              L = merge_region_res$merged_region_L,
                                              save_cor = FALSE,
                                              LD_format = "custom", 
                                              LD_loader_fun = ctwas_ld_loader, 
                                              snpinfo_loader_fun = ctwas_bimfile_loader)
                finemap_res_file <- gsub(study, paste0(study,"_merged"),finemap_res_file)
                susie_alpha_file <- gsub("ctwas_finemap_res", "ctwas_susie_alpha_res", finemap_res_file)
                finemap_res <- finemap_merged_regions_res$finemap_res
                finemap_res$pval <- z2p(finemap_res$z)
                fwrite(finemap_res, finemap_res_file, sep = "\t", compress = "gzip")
                saveRDS(finemap_merged_regions_res$susie_alpha_res, susie_alpha_file, compress='xz')
            } else {
                message("No high pip genes found for merged region for fine-mapping. ")
            }
        }
        rm(list = intersect(c("merge_region_res", "boundary_genes", "finemap_res", "weights"), ls()))
        gc()
    }
[quantile_twas]
depends: sos_variable("filtered_regional_xqtl_files")
parameter: save_ctwas_data = True
parameter: save_mr_result = False
input: filtered_regional_xqtl_files, group_by = lambda x: group_by_region(x, filtered_regional_xqtl_files), group_with = "filtered_region_info"
output_files = [f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.quantile_twas.tsv.gz']
if save_ctwas_data:
    output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.quantile_twas_data.rds')
if save_mr_result:
    output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.quantile_mr_result.tsv.gz')
output: output_files
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output[0]:n}.stdout", stderr = f"{_output[0]:n}.stderr", container = container, entrypoint = entrypoint

    library(dplyr)
    library(data.table)
    library(pecotmr)
    library(readr)
    
    # get xQTL weight information
    xqtl_meta_df <- fread("${xqtl_meta_data}") # Get related gene information from the xqtl_meta data table
    xqtl_type_table <- if (isTRUE(file.exists("${xqtl_type_table}"))) fread("${xqtl_type_table}") else NULL
    gwas_studies = c(${paths(regional_data["GWAS"].keys()):r,})
    gwas_files = c(${paths([v[_filtered_region_info[0]] for k, v in regional_data["GWAS"].items()]):r,})
    gene_list <- c(${', '.join([f"'{gene}'" for gene in _filtered_region_info[4]])})

    # Initialize export_twas_weights_db
    export_twas_weights_db <- list()
    export_twas_weights_db[["${_filtered_region_info[3]}"]] <- list()

    # Initialize twas_weights_results
    twas_weights_results <- list()

    # Create initial weight_db_list
    weight_db_list <- c(${_input:r,})
    names(weight_db_list) <- gene_list

    # Split weight_db_list
    weight_db_list <- split(weight_db_list, names(weight_db_list))

    # Update weight_db_list
    weight_db_list_update <- lapply(weight_db_list, function(file_list) {
        valid_files <- Filter(function(file) {
            if (file.size(file) > 200) {
                content <- tryCatch({
                    data <- readRDS(file)
                }, error = function(e) {
                    warning(paste("Failed to read RDS file:", file))
                    NULL
                })

                return(!is.null(content) && any(sapply(content, function(gene_data) {
                    is.list(gene_data) && any(sapply(gene_data, function(context_data) {
                        is.list(context_data) && "twas_variant_names" %in% names(context_data)
                    }))
                })))
            }
            return(FALSE)
        }, file_list)

        if (length(valid_files) == 0) return(NULL)

        return(valid_files)
    })

    weight_db_list_update <- Filter(Negate(is.null), weight_db_list_update)

    if (length(weight_db_list_update) == 0 || all(sapply(weight_db_list_update, length) == 0)) {
        message("No valid twas weight files found after filtering. Exiting the script.")
        quit(save = "no", status = 0) 
    }

    # Define tau values (quantile levels) for quantile-specific testing
    tau_values <- seq(0.01, 0.99, 0.01)
    # Main processing loop
    for (gene_db in names(weight_db_list_update)) {
        weight_dbs <- weight_db_list_update[[gene_db]]
        
        twas_weights_results[[gene_db]] <- load_quantile_twas_weights(
            weight_db_files = weight_dbs,
            tau_values = tau_values,
            between_cluster = 0.8,
            num_intervals = 3
        )
        
        if (!is.null(twas_weights_results[[gene_db]]) && !is.null(twas_weights_results[[gene_db]]$weights)) {
            twas_weights_results[[gene_db]]$data_type <- setNames(
                lapply(names(twas_weights_results[[gene_db]]$weights), function(context) {
                    xqtl_type_table$type[sapply(xqtl_type_table$context, function(x) grepl(x, context))]
                }),
                names(twas_weights_results[[gene_db]]$weights)
            )
        } else {
            print(paste("Warning: No valid weights found for gene:", gene_db))
        }
    }

    if (length(twas_weights_results) == 0 || all(sapply(twas_weights_results, is.null))) {
        stop("twas_weights_results is empty or invalid. Exiting.")
    }

    saveRDS(twas_weights_results, "${_output[0]:nnn}.grouped_quantile_twas_weight.rds", compress='xz')
    # Step 2: twas analysis for imputable genes across contexts
    twas_results_db <- twas_pipeline(twas_weights_data = twas_weights_results, 
                                    ld_meta_file_path = "${ld_meta_data}", 
                                    gwas_meta_file = "${gwas_meta_data}", 
                                    region_block = "${_filtered_region_info[3]}", 
                                    quantile_twas = TRUE,  
                                    output_twas_data = ${"TRUE" if save_ctwas_data else "FALSE"} )

    # Merging with xQTL meta-data      
    if (is.null(twas_results_db$twas_result) || nrow(twas_results_db$twas_result) == 0) {
        message("twas_results_db$twas_result is NULL. Exiting script normally.")
        quit(save = "no", status = 0)  
    }

    message("Merging twas_result with xqtl_meta_df...")    

    # Check twas_results_db before merging
    common_ids <- intersect(twas_results_db$twas_result$molecular_id, xqtl_meta_df$region_id)       
    if (length(common_ids) > 0) {
        twas_results_db$twas_result <- merge(
            twas_results_db$twas_result,
            xqtl_meta_df[, c("region_id", "TSS", "start", "end")],
            by.x = "molecular_id",
            by.y = "region_id"
        )
        twas_results_db$twas_result <- unique(twas_results_db$twas_result)
    } else {
        warning("No common molecular_id and region_id. Skipping merge.")
    }
    
    
    fwrite(twas_results_db$twas_result[, c(2, 1, (ncol(twas_results_db$twas_result)-2):ncol(twas_results_db$twas_result), 3:(ncol(twas_results_db$twas_result)-3))], file = ${_output[0]:r}, sep = "\t", compress = "gzip")

    # Step 3: reformat for follow up cTWAS analysis
    if (${"TRUE" if save_ctwas_data else "FALSE"}) {
        saveRDS(twas_results_db$twas_data, "${_output[0]:nnn}.quantile_twas_data.rds", compress='xz')
    }
    if (${"TRUE" if save_mr_result else "FALSE"}) {
        fwrite(twas_results_db$mr_result, file = "${_output[0]:nnn}.quantile_mr_result.tsv.gz", sep = "\t", compress = "gzip")
    }
    message("quantile twas analysis is completed in this block.")