TWAS, cTWAS and MR#

Introduction#

This module provides software implementations for transcriptome-wide association analysis (TWAS), Quantile TWAS and performs variant selection for providing sparse signals for cTWAS (causal TWAS) analysis as described in Qian et al (2024+) the multi-group cTWAS method. It will additionally perform Mendelian Randomization using fine-mapping instrumental variables (IV) as described in Zhang et al 2020 for “causal” effects estimation and model validation, with the unit of analysis being a single gene-trait pair.

This procedure is a continuation of the SuSiE-TWAS workflow — it assumes that xQTL fine-mapping has been performed and moleuclar traits prediction weights pre-computed (to be used for TWAS). Cross validation for TWAS weights is optional but highly recommended.

Quantile TWAS extends traditional TWAS by testing genetic effects at different quantiles of the trait distribution, which provides insights into genetic associations that vary across the distribution rather than just at the mean.

GWAS data required are GWAS summary statistics and LD matrix for the region of interest.

Step 1: TWAS#

  1. Extract GWAS z-score for region of interest and corresponding LD matrix.

  2. (Optional) perform allele matching QC for the LD matrix with summary stats.

  3. Process weights: for a number of methods such as LASSO, Elastic Net and mr.ash we have to take the weights as is for QTL variants overlapping with GWAS variants. For SuSiE weights it can be adjusted to exactly match GWAS variants.

  4. Perofrm TWAS test for multiple sets of weights.

  5. For each gene, filter TWAS results by keeping the best model selected by CV. Drop the genes that don’t show good evidence of TWAS prediction weights.

Step 2: Variant Selection for Imputable Genes via the Best Prediction Methods#

  1. Determine if the gene is imputable at each context based on the twas_cv performance by adjusted \(r^2\) (>=0.01) and p-values (<0.05).

  2. The imputable gene-context pair will go through variant selection step. Maximum 10 variants with top pip selected from either top_loci table or SuSiE CS set.

  3. Harmonize weights against LD reference and udpate SuSiE weight.

  4. Extract weights by best model for the context then by the variant names were selected from the previous step

Step 3: cTWAS analysis#

FIXME: add more documentation here

Step 4: MR for candidate genes#

  1. Limit MR only to those showing some evidence of cTWAS significance AND have strong instrumental variable (fine-mapping PIP or CS).

  2. Use fine-mapped xQTL with GWAS data to perform MR.

  3. For multiple IV, aggregate individual IV estimates using a fixed-effect meta-analysis procedure.

  4. Identify and exclude results with severe violations of the exclusion restriction (ER) assumption.

Step 5: Quantile TWAS analysis#

  • Use pre-computed TWAS weights (beta for now) for quantile-specific testing.

  • For each quantile level, cluster and integrate by fixed and dynamic region groups, and extract relevant GWAS z-scores and LD matrix for the region of interest.

  • Perform quantile region-specific association tests, identifying genetic variants with effects that vary across different quantile regions of the phenotype distribution.

Input#

GWAS Data Input Interface (Similar to susie_rss)#

I. GWAS Summary Statistics Files

  • Input: Vector of files for one or more GWAS studies.

  • Format:

    • Tab-delimited files that is tabix-indexed by the first chrom(or #chrom) and second pos column.

    • First 4 columns: chrom or #chrom, pos, A1, A2

    • Additional columns can be loaded using column mapping file see below

  • Column Mapping files (optional):

    • Optional YAML file for custom column mapping.

    • Required columns: chrom, pos, A1, A2, z or (betahat and sebetahat).

    • Optional columns: n, var_y (relevant to fine-mapping).

II. GWAS Summary Statistics Meta-File: this is optional and helpful when there are lots of GWAS data to process via the same command

  • Columns: study_id, chromosome number, path to summary statistics file, optional path to column mapping file.

  • Note: Chromosome number 0 indicates a genome-wide file.

eg: gwas_meta.tsv

study_id    chrom    file_path                 column_mapping_file
study1      1        gwas1.tsv.gz         column_mapping.yml
study1      2        gwas2.tsv.gz         column_mapping.yml
study2      0        gwas3.tsv.gz         column_mapping.yml

If both summary stats file (I) and meta data file (II) are specified we will take the union of the two.

III. LD Reference Metadata File

  • Format: Single TSV file.

  • Contents:

    • Columns: #chrom, start, end, path to the LD matrix, genomic build.

    • LD matrix path format: comma-separated, first entry is the LD matrix, second is the bim file.

  • Documentation: Refer to our LD reference preparation document for detailed information.

Output of Fine-Mapping & TWAS Pipeline#

xQTL Weight Database Metadata File:

  • Essential columns: #chr, start, end, TSS, region_id, original_data

  • Structure of the weight database:

    • RDS format.

    • Organized hierarchically: region → context → weight matrix.

    • Each column represents a different method.

eg: xqtl_meta.tsv

#chr start end region_id TSS original_data combined_data combined_data_sumstats contexts contexts_top_loci
chr1 0 6480000 ENSG00000008128 1724356 "KNIGHT_pQTL.ENSG00000008128.univariate_susie_twas_weights.rds, MiGA_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds, MSBB_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds, ROSMAP_Bennett_Klein_pQTL.ENSG00000008128.univariate_susie_twas_weights.rds, ROSMAP_DeJager_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds, ROSMAP_Kellis_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds, ROSMAP_mega_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds, STARNET_eQTL.ENSG00000008128.univariate_susie_twas_weights.rds" Fungen_xQTL.ENSG00000008128.cis_results_db.export.rds Fungen_xQTL.ENSG00000008128.cis_results_db.export_sumstats.rds Knight_eQTL_brain,MiGA_GFM_eQTL,MiGA_GTS_eQTL,MiGA_SVZ_eQTL,MiGA_THA_eQTL,BM_10_MSBB_eQTL,BM_22_MSBB_eQTL,BM_36_MSBB_eQTL,BM_44_MSBB_eQTL,monocyte_ROSMAP_eQTL,Mic_DeJager_eQTL,Ast_DeJager_eQTL,Oli_DeJager_eQTL,Exc_DeJager_eQTL,Inh_DeJager_eQTL,DLPFC_DeJager_eQTL,PCC_DeJager_eQTL,AC_DeJager_eQTL,Mic_Kellis_eQTL,Ast_Kellis_eQTL,Oli_Kellis_eQTL,OPC_Kellis_eQTL,Exc_Kellis_eQTL,Inh_Kellis_eQTL,Ast_mega_eQTL,Exc_mega_eQTL,Inh_mega_eQTL,Oli_mega_eQTL,STARNET_eQTL_Mac Knight_eQTL_brain,MiGA_GFM_eQTL,MiGA_GTS_eQTL,MiGA_SVZ_eQTL,MiGA_THA_eQTL,BM_10_MSBB_eQTL,BM_22_MSBB_eQTL,BM_36_MSBB_eQTL,BM_44_MSBB_eQTL,monocyte_ROSMAP_eQTL,Mic_DeJager_eQTL,Ast_DeJager_eQTL,Oli_DeJager_eQTL,Exc_DeJager_eQTL,Inh_DeJager_eQTL,DLPFC_DeJager_eQTL,PCC_DeJager_eQTL,AC_DeJager_eQTL,Mic_Kellis_eQTL,Ast_Kellis_eQTL,Oli_Kellis_eQTL,OPC_Kellis_eQTL,Exc_Kellis_eQTL,Inh_Kellis_eQTL,Ast_mega_eQTL,Exc_mega_eQTL,Inh_mega_eQTL,Oli_mega_eQTL,STARNET_eQTL_Mac

This file is automatically generated as part of the FunGen-xQTL protocol, although only the essential columns are relevant to our application here.

TWAS region information#

This is required for cTWAS analysis, where multiple TWAS and SNP data within each region are combined for joint inference to select the variables, either genes or SNPs, to figure out which variables are likely to be directly associated with the phenotype of interest, rather than being associated through correlations with true causal variables.

chrom    start    end    block_id  
1        1000     5000   block1    
2        2000     6000   block2
3        3000     7000   block3

Output#

I. A table with the following contents

gwas_study, chrom, start, end, block, gene, TSS, context, is_imputable, method, is_selected_method, rsq_adj_cv, pval_cv, twas_z, twas_pval

where

  • if twas_z is NA it means the context is not imputable for the method of choice

II. a list of refined_twas_weights_data per block, in RDS format, of this structure:

$ region_id
   $ gene 
      $ context
        $ selected_model
        $ is_imputable 
        $ selected_top_variants
        $ selected_model_weights

This will only contain imputatable genes. It should come with a meta-data file like this:

chrom    start    end    block_id  refined_twas_db
1        1000     5000   block1    block1.rds
2        2000     6000   block2    block2.rds
3        3000     7000   block3    block3.rds

Example#

sos run xqtl-protocol/code/pecotmr_integration/twas.ipynb twas \
   --cwd /output/ --name test \
   --gwas_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv \
   --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \
   --regions /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/picalm_EUR_LD_blocks.bed \
   --xqtl_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/test_data/twas_test_xqtl_meta.tsv \
   --xqtl_type_table /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/data_type_table.txt \
   --rsq_pval_cutoff 0.05 --rsq_cutoff 0.01
sos run xqtl-protocol/code/pecotmr_integration/twas.ipynb ctwas \
   --cwd /output/ --name test \
   --gwas_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv \
   --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \
   --regions /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/picalm_EUR_LD_blocks.bed \
   --xqtl_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/test_data/twas_test_xqtl_meta.tsv \
   --twas_weight_cutoff 0 \
   --region_name chr10_80126158_82231647 chr11_84267999_86714492 

Here using --region-name we focus the analysis on 2 blocks: format as chr_start_stop.

sos run xqtl-protocol/code/pecotmr_integration/twas.ipynb quantile_twas \
   --cwd /output/ --name test \
   --gwas_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv \
   --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \
   --region_name chr11_84267999_86714492 chr7_54681006_57314931 \
   --xqtl_meta_data /home/al4225/project/quantile_twas/quantile_twas_analysis/test_data/small_region_gene_meta_data_test.tsv

It is also possible to analyze a selected list of regions using option --regions. The 3 columns(chr, start, stop) of this file will be used for the block list to analyze. Here for example use the same list of regions as we used for LD block:

sos run xqtl-protocol/code/pecotmr_integration/twas.ipynb quantile_twas \
   --cwd /output/ --name test \
   --gwas_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv \
   --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \
   --regions /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/EUR_LD_blocks.bed \
   --xqtl_meta_data /home/al4225/project/quantile_twas/quantile_twas_analysis/test_data/small_region_gene_meta_data_test.tsv
[global]
parameter: cwd = path("output/")
parameter: gwas_meta_data = path()
parameter: xqtl_meta_data = path()
parameter: ld_meta_data = path()
parameter: xqtl_type_table = ''
parameter: gwas_name = []
parameter: gwas_data = []
parameter: column_mapping = []
parameter: regions = path()
# Optional: if a region name is provided 
# the analysis would be focused on the union of provides region list and region names
parameter: region_name = []
parameter: name = f"{xqtl_meta_data:bn}.{gwas_meta_data:bn}"
parameter: container = ''
import re
parameter: entrypoint= ('micromamba run -a "" -n' + ' ' + re.sub(r'(_apptainer:latest|_docker:latest|\.sif)$', '', container.split('/')[-1])) if container else ""
parameter: job_size = 100
parameter: walltime = "5m"
parameter: mem = "8G"
parameter: numThreads = 1

import os
import pandas as pd

def adapt_file_path(file_path, reference_file):
    """
    Adapt a single file path based on its existence and a reference file's path.

    Args:
    - file_path (str): The file path to adapt.
    - reference_file (str): File path to use as a reference for adaptation.

    Returns:
    - str: Adapted file path.

    Raises:
    - FileNotFoundError: If no valid file path is found.
    """
    reference_path = os.path.dirname(reference_file)

    # Check if the file exists
    if os.path.isfile(file_path):
        return file_path

    # Check file name without path
    file_name = os.path.basename(file_path)
    if os.path.isfile(file_name):
        return file_name

    # Check file name in reference file's directory
    file_in_ref_dir = os.path.join(reference_path, file_name)
    if os.path.isfile(file_in_ref_dir):
        return file_in_ref_dir

    # Check original file path prefixed with reference file's directory
    file_prefixed = os.path.join(reference_path, file_path)
    if os.path.isfile(file_prefixed):
        return file_prefixed

    # If all checks fail, raise an error
    raise FileNotFoundError(f"No valid path found for file: {file_path}")

def group_by_region(lst, partition):
    # from itertools import accumulate
    # partition = [len(x) for x in partition]
    # Compute the cumulative sums once
    # cumsum_vector = list(accumulate(partition))
    # Use slicing based on the cumulative sums
    # return [lst[(cumsum_vector[i-1] if i > 0 else 0):cumsum_vector[i]] for i in range(len(partition))]
    return partition
[get_analysis_regions: shared = ["filtered_region_info", "filtered_regional_xqtl_files", "regional_data"]]
from collections import OrderedDict

def check_required_columns(df, required_columns):
    """Check if the required columns are present in the dataframe."""
    missing_columns = [col for col in required_columns if col not in list(df.columns)]
    if missing_columns:
        raise ValueError(f"Missing required columns: {', '.join(missing_columns)}")

def extract_regional_data(gwas_meta_data, xqtl_meta_data, regions, region_name, gwas_name, gwas_data, column_mapping):
    """
    Extracts data from GWAS and xQTL metadata files and additional GWAS data provided. 

    Args:
    - gwas_meta_data (str): File path to the GWAS metadata file.
    - xqtl_meta_data (str): File path to the xQTL weight metadata file.
    - gwas_name (list): vector of GWAS study names.
    - gwas_data (list): vector of GWAS data.
    - column_mapping (list, optional): vector of column mapping files.

    Returns:
    - Tuple of two dictionaries:
        - GWAS Dictionary: Maps study IDs to a list containing chromosome number, 
          GWAS file path, and optional column mapping file path.
        - xQTL Dictionary: Nested dictionary with region IDs as keys.

    Raises:
    - FileNotFoundError: If any specified file path does not exist.
    - ValueError: If required columns are missing in the input files or vector lengths mismatch.
    """
    # Check vector lengths
    if len(gwas_name) != len(gwas_data):
        raise ValueError("gwas_name and gwas_data must be of equal length")
    
    if len(column_mapping)>0 and len(column_mapping) != len(gwas_name):
        raise ValueError("If column_mapping is provided, it must be of the same length as gwas_name and gwas_data")

    # Required columns for each file type
    required_gwas_columns = ['study_id', 'chrom', 'file_path']
    required_xqtl_columns = ['region_id', '#chr', 'start', 'end', "TSS", 'original_data'] #region_id here is gene name
    required_ld_columns = ['chr', 'start', 'stop']
    
    # Reading the GWAS metadata file
    gwas_df = pd.read_csv(gwas_meta_data, sep="\t")
    check_required_columns(gwas_df, required_gwas_columns)
    gwas_dict = OrderedDict()
    
    # Reading LD regions info
    # Initialize empty DataFrame for regions
    regions_df = pd.DataFrame(columns=['chr', 'start', 'stop'])

    # Check if regions file exists and read it
    if os.path.isfile(regions):
        file_regions_df = pd.read_csv(regions, sep="\t", skipinitialspace=True)
        file_regions_df.columns = [col.strip() for col in file_regions_df.columns]  # Strip spaces from column names
        file_regions_df['chr'] = file_regions_df['chr'].str.strip()
        check_required_columns(file_regions_df, required_ld_columns)
        regions_df = pd.concat([regions_df, file_regions_df])
    # Process region_name if provided: 
    # fomat: region_name = ["chr1_16103_2888443", "chr1_4320284_5853833"]
    if len(region_name) > 0:
        # Split region_name entries into chr, start, and stop columns
        extra_regions = [name.split("_") for name in region_name]
        extra_regions_df = pd.DataFrame(extra_regions, columns=['chr', 'start', 'stop'])
        extra_regions_df['start'] = extra_regions_df['start'].astype(int)
        extra_regions_df['stop'] = extra_regions_df['stop'].astype(int)
        # Add extra regions to regions_df
        regions_df = pd.concat([regions_df, extra_regions_df])
    # Remove duplicates and reset index
    regions_df = regions_df.drop_duplicates().reset_index(drop=True)
    regions_dict = OrderedDict()

    # Reading the xQTL weight metadata file
    xqtl_df = pd.read_csv(xqtl_meta_data, sep=" ")
    check_required_columns(xqtl_df, required_xqtl_columns)
    xqtl_dict = OrderedDict()

    # Process additional GWAS data from R vectors
    for name, data, mapping in zip(gwas_name, gwas_data, column_mapping or [None]*len(gwas_name)):
        gwas_dict[name] = {0: [data, mapping]}

    for _, row in gwas_df.iterrows():
        file_path = row['file_path']
        mapping_file = row.get('column_mapping_file')
        
        # Adjust paths if necessary
        file_path = adapt_file_path(file_path, gwas_meta_data)
        if mapping_file:
            mapping_file = adapt_file_path(mapping_file,  gwas_meta_data)

       # Create or update the entry for the study_id
        if row['study_id'] not in gwas_dict:
            gwas_dict[row['study_id']] = {}

        # Expand chrom 0 to chrom 1-22 or use the specified chrom
        chrom_range = range(1, 23) if row['chrom'] == 0 else [row['chrom']]
        for chrom in chrom_range:
            if chrom in gwas_dict[row['study_id']]:
                existing_entry = gwas_dict[row['study_id']][f'chr{chrom}']
                raise ValueError(f"Duplicate chromosome specification for study_id {row['study_id']}, chrom {chrom}. "
                                 f"Conflicting entries: {existing_entry} and {[file_path, mapping_file]}")
            gwas_dict[row['study_id']][f'chr{chrom}'] = [file_path, mapping_file]
            
    for _, row in regions_df.iterrows():
        LD_region_id = f"{row['chr']}_{row['start']}_{row['stop']}"
        overlapping_xqtls = xqtl_df[(xqtl_df['#chr'] == row['chr']) & 
                                     (xqtl_df['TSS'] <= row['stop']) & 
                                     (xqtl_df['TSS'] >= (row['start']))]
        file_paths = []
        mapped_genes = []
        # Collect file paths for xQTLs overlapping this region
        for _, xqtl_row in overlapping_xqtls.iterrows():
            original_data = xqtl_row['original_data']
            file_list = original_data.split(',') if ',' in original_data else [original_data]
            file_paths.extend([adapt_file_path(fp.strip(), xqtl_meta_data) for fp in file_list])
            mapped_genes.extend([xqtl_row['region_id']] * len(file_list))

        # Store metadata and files in the dictionary
        regions_dict[LD_region_id] = {
            "meta_info": [row['chr'], row['start'], row['stop'], LD_region_id, mapped_genes],
            "files": file_paths
        }
        
    for _, row in xqtl_df.iterrows():
        file_paths = [adapt_file_path(fp.strip(), xqtl_meta_data) for fp in row['original_data'].split(',')]  # Splitting and stripping file paths
        xqtl_dict[row['region_id']] = {"meta_info": [row['#chr'], row['start'], row['end'], row['region_id'], row['contexts']],
                                       "files": file_paths}
    return gwas_dict, xqtl_dict, regions_dict


gwas_dict, xqtl_dict, regions_dict = extract_regional_data(gwas_meta_data, xqtl_meta_data,regions,region_name,gwas_name, gwas_data, column_mapping)
regional_data = dict([("GWAS", gwas_dict), ("xQTL", xqtl_dict), ("Regions", regions_dict)])


# get regions data 
region_info = [x["meta_info"] for x in regional_data['Regions'].values()]
regional_xqtl_files = [x["files"] for x in regional_data['Regions'].values()]

# Filter out empty xQTL file paths
filtered_region_info = []
filtered_regional_xqtl_files = []
skipped_regions =[]

for region, files in zip(region_info, regional_xqtl_files):
    if files:
        filtered_region_info.append(region)
        filtered_regional_xqtl_files.append(files)
    else:
        skipped_regions.append(region)
print(f"Skipping {len(skipped_regions)} out of {len(regional_xqtl_files)} regions, no overlapping xQTL weights found. ")
[twas]
depends: sos_variable("filtered_regional_xqtl_files")
parameter: coverage = "cs_coverage_0.95"
# Threashold for rsq and pvalue for imputability determination for a model 
parameter: rsq_cutoff = 0.01
parameter: rsq_pval_cutoff = 0.05
parameter: mr_pval_cutoff = 0.05
parameter: save_ctwas_data = True
parameter: save_mr_result = True
parameter: rsq_option="rsq"
parameter: rsq_pval_option=["adj_rsq_pval", "pval"]
# load by batches if memory resource is limited, default to load all at once 
parameter: batch_load_memory = 'inf'

input: filtered_regional_xqtl_files, group_by = lambda x: group_by_region(x, filtered_regional_xqtl_files), group_with = "filtered_region_info"
output_files = [f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.twas.tsv.gz']
if save_ctwas_data:
    output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.twas_data.rds')
if save_mr_result:
    output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.mr_result.tsv.gz')
output: output_files
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output[0]:n}.stdout", stderr = f"{_output[0]:n}.stderr", container = container, entrypoint = entrypoint

    library(dplyr)
    library(data.table)
    library(pecotmr)
    library(readr)

    # get xQTL weight information 
    xqtl_meta_df <- fread("${xqtl_meta_data}", data.table=FALSE) # Get related gene information from the xqtl_meta data table
    xqtl_type_table <- if (isTRUE(file.exists("${xqtl_type_table}"))) fread("${xqtl_type_table}") else NULL
    gene_list <- c(${', '.join([f"'{gene}'" for gene in _filtered_region_info[4]])})

    # Process weights 
    twas_weights_results <- list()
    weight_db_list <- c(${_input:r,})
    names(weight_db_list) <- gene_list
    weight_db_list <- split(weight_db_list, names(weight_db_list), c)
    # remove weight db files that only contain messages: "No SNPs found in the specified region ..."
    weight_db_list_update <- Filter(Negate(is.null), lapply(weight_db_list, function(file_list){do.call(c,lapply(file_list, function(file) if (file.size(file) > 200) file))}))
    if(length(weight_db_list_update) <= 0) stop(paste0("No weight information available from ", paste0(weight_db_list, collapse=","), ". "))
    
    # Step 1: Load TWAS weights and generate twas weight db, output export_twas_weights_db 
    for (gene_db in names(weight_db_list_update)) {
        weight_dbs <- weight_db_list_update[[gene_db]]
        twas_weights_results[[gene_db]] = load_twas_weights(weight_dbs, variable_name_obj="variant_names", 
                                                           susie_obj="susie_weights_intermediate",
                                                           twas_weights_table = "twas_weights")
        if (length(twas_weights_results[[gene_db]]) > 1) {
          twas_weights_results[[gene_db]]$data_type <- setNames(lapply(names(twas_weights_results[[gene_db]]$weights), function(context) xqtl_type_table$type[sapply(xqtl_type_table$context,
                                                                function(x) grepl(x, context))]), names(twas_weights_results[[gene_db]]$weights))      
        } else {
          twas_weights_results[[gene_db]] <- NULL
        }                   
    }
    rm(weight_db_list, weight_db_list_update, xqtl_type_table, gene_list)
    gc()
    # batch load twas weights to reduce memory consumption 
    twas_weights_results <- batch_load_twas_weights(
      twas_weights_results = twas_weights_results,
      meta_data_df = xqtl_meta_df,
      max_memory_per_batch = ${"Inf" if batch_load_memory == "inf" else batch_load_memory}
    )
    # Step 2: twas analysis for imputable genes across contexts
    twas_results_db <- list()
    for (batch in 1:length(twas_weights_results)){
        twas_results_db[[batch]] <- twas_pipeline(twas_weights_results[[batch]], 
                                     "${ld_meta_data}", 
                                     "${gwas_meta_data}",  
                                     region_block="${_filtered_region_info[3]}",
                                     rsq_cutoff = ${rsq_cutoff}, 
                                     rsq_option = "${rsq_option}", 
                                     rsq_pval_cutoff = ${rsq_pval_cutoff}, 
                                     rsq_pval_option=c(${", ".join([f'"{x}"' for x in rsq_pval_option])}), 
                                     mr_pval_cutoff = ${mr_pval_cutoff},
                                     mr_coverage_column = "${coverage}", 
                                     output_twas_data = ${"TRUE" if save_ctwas_data else "FALSE"})     
    }
    rm(twas_weights_results)
    gc()
    if(!is.null(twas_results_db)){
        for (batch in 1:length(twas_results_db)){
            twas_results_db[[batch]]$twas_result <- merge(twas_results_db[[batch]]$twas_result, xqtl_meta_df[, c("region_id","TSS","start","end")], 
                                                          by.x="molecular_id", by.y="region_id")
        }
        fwrite(do.call(rbind, lapply(twas_results_db, function(x) x$twas_result[, c(2,1,14:16,3:13)])), ${_output[0]:r}, sep = "\t", compress = "gzip")
    } else {
        fwrite(data.frame(), file = ${_output[0]:r}, sep = "\t", compress = "gzip")
    }
    # Step 3: reformat for follow up cTWAS analysis
    if (${"TRUE" if save_ctwas_data else "FALSE"}) {
        if (length(twas_results_db) > 1){
            twas_results_db[[1]]$twas_data$weights <- do.call(c, lapply(twas_results_db, function(x) x$twas_data$weights))
            twas_results_db[[1]]$twas_data$susie_weights_intermediate_qced <- do.call(c, lapply(twas_results_db, function(x) x$twas_data$susie_weights_intermediate_qced))
            twas_results_db[[1]]$twas_data$snp_info <- do.call(c, lapply(twas_results_db, function(x) x$twas_data$snp_info))
            studies <- unique(names(find_data(twas_results_db, c(3, "z_gene"))))
            for (study in studies){
                twas_results_db[[1]][["twas_data"]][["z_gene"]][[study]] <- do.call(rbind, lapply(twas_results_db, function(x) x$twas_data$z_gene[[study]]))
                twas_results_db[[1]][["twas_data"]][["z_snp"]][[study]] <- do.call(rbind, lapply(twas_results_db, function(x) x$twas_data$z_snp[[study]]))
            }
        }
        saveRDS(twas_results_db[[1]]$twas_data, "${_output[0]:nnn}.twas_data.rds", compress='xz')
    }
    if (${"TRUE" if save_mr_result else "FALSE"}) {
        if(!is.null(twas_results_db)) {
           fwrite(do.call(rbind, lapply(twas_results_db, function(x) x$mr_result)), file = "${_output[0]:nnn}.mr_result.tsv.gz", sep = "\t", compress = "gzip")
        } else {
            fwrite(data.frame(), file = "${_output[0]:nnn}.mr_result.tsv.gz", sep = "\t", compress = "gzip")
        }
    }
[ctwas_1]
depends: sos_variable("filtered_region_info")
# twas weight cutoff for ctwas variant selection
parameter: twas_weight_cutoff = 0
# cs_min_cor cutoff in susie finemapping result for variant selection
parameter: cs_min_cor = 0
# minimum pip cutoff in susie finemapping result for variant selection
parameter: min_pip_cutoff =0 
# A scalar in (0,1]. The proportion of SNPs, reduce runtime at the cost of accuracy 
parameter: thin = 0.1
# Maximum number of SNPs in a region.
parameter: maxSNP = 20000
# A list of regions to be subset for screening and fine-mapping, for example: "10_80126158_82231647"
parameter: region_name = []
# skip this step if True
parameter: skip_assembly = False
skip_if(skip_assembly == True, " Skip [ctwas_1] assemble regions. " )

if region_name:
    filtered_region_info = [region_info for region_info in filtered_region_info if region_info[3] in region_name]

twas_output_files = {}
region_blocks_per_chrom = []
chromosome_list = []

for region_info in filtered_region_info:
    chrom = region_info[0]
    file_path = f"{cwd}/twas/{name}.{region_info[3]}.twas_data.rds"
    if chrom not in twas_output_files:
        twas_output_files[chrom] = [[], []]  # Structure: {"chrX": [[files], [region_names]]}
        chromosome_list.append(chrom)
    twas_output_files[chrom][0].append(file_path)  # File paths
    twas_output_files[chrom][1].append(region_info[3])  # Region names
twas_files=[twas_output_files[chr][0] for chr in chromosome_list]
region_blocks_per_chrom = [twas_output_files[chr][1] for chr in chromosome_list]
region_blocks_per_chrom = [f"""c("{'", "'.join(regions)}")""" for regions in region_blocks_per_chrom]
if not os.path.exists(step_name.split('_')[0]):
    os.makedirs(step_name.split('_')[0])
input: twas_files, group_by = lambda x: group_by_region(x, twas_files),  group_with=dict(region_names=region_blocks_per_chrom)
chrom =region_names.split('"')[1].split("_")[0]
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads
R: expand = '${ }', stdout = f"{cwd:a}/{step_name.split('_')[0]}/ctwas_assemb_{chrom}_{name}.stdout", 
stderr = f"{cwd:a}/{step_name.split('_')[0]}/ctwas_assemb_{chrom}_{name}.stderr", container = container, entrypoint = entrypoint
    
    library(data.table)
    library(ctwas) # multigroup_ctwas # documentation for ctwas multigroup version 
    library(pecotmr)

    outputdir = "${cwd:a}/ctwas/"
    
    # load genome-wide data across all regions
    gwas_studies <- unique(fread("${gwas_meta_data}",data.table=FALSE, select = "study_id"))[,1]

    merge_context_names <- function(ctwas_dat_file){
        ctwas_dat<- readRDS(ctwas_dat_file)
        idxs <- which(grepl("_chr[0-9XY]+_[^_]+", names(ctwas_dat$weights)))
        if (length(idxs)>0) {
            names(ctwas_dat$weights) <- gsub("_chr[0-9XY]+_[A-Za-z0-9]+", "", names(ctwas_dat$weights))
            for (idx in idxs) {
                for (study in names(ctwas_dat$weights[[idx]])){
                    ctwas_dat$weights[[idx]][[study]]$weight_name <- gsub("_chr[0-9XY]+_[A-Za-z0-9]+", "", ctwas_dat$weights[[idx]][[study]]$weight_name)
                    ctwas_dat$weights[[idx]][[study]]$context <- gsub("_chr[0-9XY]+_[A-Za-z0-9]+", "", ctwas_dat$weights[[idx]][[study]]$context)
                }
            }
            for (study in names(ctwas_dat$z_gene)){
                ctwas_dat$z_gene[[study]]$id <- gsub("_chr[0-9XY]+_[A-Za-z0-9]+", "", ctwas_dat$z_gene[[study]]$id)
                ctwas_dat$z_gene[[study]]$context <- gsub("_chr[0-9XY]+_[A-Za-z0-9]+", "", ctwas_dat$z_gene[[study]]$context)
                ctwas_dat$z_gene[[study]]$group <- gsub("_chr[0-9XY]+_[A-Za-z0-9]+", "", ctwas_dat$z_gene[[study]]$group)
            }
            for (gene in names(ctwas_dat$susie_weights_intermediate_qced)){
                names(ctwas_dat$susie_weights_intermediate_qced[[gene]]) <-  gsub("_chr[0-9XY]+_[^_]+", "", names(ctwas_dat$susie_weights_intermediate_qced[[gene]]))
            }
        }
        idxs <- which(grepl("monocyte", names(ctwas_dat$weights)))
        if (length(idxs)>0) {
            names(ctwas_dat$weights) <- gsub("monocyte_eQTL", "eQTL", names(ctwas_dat$weights))
            for (idx in idxs) {
                for (study in names(ctwas_dat$weights[[idx]])){
                    ctwas_dat$weights[[idx]][[study]]$weight_name <- gsub("monocyte_eQTL", "eQTL", ctwas_dat$weights[[idx]][[study]]$weight_name)
                    ctwas_dat$weights[[idx]][[study]]$type <- "eQTL"
                }
            }
            for (study in names(ctwas_dat$z_gene)){
                ctwas_dat$z_gene[[study]]$id <- gsub("monocyte_eQTL", "eQTL", ctwas_dat$z_gene[[study]]$id)
                ctwas_dat$z_gene[[study]]$type <- gsub("monocyte_", "", ctwas_dat$z_gene[[study]]$type)
                ctwas_dat$z_gene[[study]]$group <- gsub("monocyte_eQTL", "eQTL", ctwas_dat$z_gene[[study]]$group)
            }
        }
        return(ctwas_dat)
    }
    
    data <- lapply(c(${_input:r,}), merge_context_names)
    names(data) <- ${region_names}
    data <- Filter(Negate(is.null), data)
    weights <- do.call(c, lapply(names(data), function(region_block){
        trim_ctwas_variants(data[[region_block]], twas_weight_cutoff=${twas_weight_cutoff}, cs_min_cor=${cs_min_cor},
                            min_pip_cutoff=${min_pip_cutoff}, max_num_variants=Inf)
    }))
    weight_list <- setNames(lapply(unique(names(weights)), function(study) {
        do.call(c, unname(weights[names(weights) %in% study]))
    }),unique(names(weights)))
    rm(weights)
    
    # compile z_snp, z_gene, and snp_info across multiple regions
    snp_info <- do.call(c, lapply(unname(data), function(x) x$snp_info))
    snp_map <- snp_info[!duplicated(names(snp_info))]
    z_gene_dat <- do.call(c, lapply(unname(data), function(x)x$z_gene))
    z_snp_dat <-  do.call(c, lapply(unname(data), function(x)x$z_snp))
    z_gene <- setNames(lapply(gwas_studies, function(study)  do.call(rbind, z_gene_dat[names(z_gene_dat) == study])), gwas_studies)
    z_gene <- Filter(Negate(is.null), z_gene)
    z_gene <- lapply(z_gene, function(x) x[!is.na(x$z) & !is.infinite(x$z) & x$id %in% names(weight_list[[na.omit(unique(x$gwas_study))]]),])
    z_snp <- setNames(lapply(gwas_studies, function(study) {
        df <- do.call(rbind, z_snp_dat[names(z_snp_dat) == study])
        df[!duplicated(df$id) &  !is.infinite(df$z) & !is.na(df$z), ] 
    }), gwas_studies)
    rm(data, z_gene_dat, z_snp_dat) 
    
    regions_data <- get_ctwas_meta_data("${ld_meta_data}", names(snp_map))
    region_info <- regions_data$region_info
    LD_map <- regions_data$LD_info
    saveRDS(weight_list, file.path(outputdir, "${name}_${chrom}_ctwas_weights.rds"), compress='xz')
    saveRDS(LD_map, file.path(outputdir, "${name}_${chrom}_LD_map.rds"), compress='xz')
    saveRDS(list(z_snp=z_snp, snp_map=snp_map), file.path(outputdir, "${name}_${chrom}_z_snp_map.rds"), compress='xz')
    rm(regions_data, snp_info, LD_map)
                      
    for (study in names(weight_list)){
      outname = paste0("${name}", "_", "${chrom}","_", study)
      if ( is.null(z_snp[[study]]) || is.null(z_gene[[study]])) next
      # assemble regions
      region_data_file <- file.path(outputdir, paste0(outname, ".ctwas_region_data.thin", ${thin}, ".rds"))
      boundary_gene_file <- file.path(outputdir, paste0(outname, ".ctwas_boundary_genes.thin", ${thin}, ".rds"))

      res <- assemble_region_data(region_info,
                                z_snp[[study]],
                                z_gene[[study]],
                                weight_list[[study]],
                                snp_map,
                                thin = ${thin},
                                maxSNP = ${maxSNP},
                                min_group_size = 1,
                                ncore = ${numThreads})
      region_data <- res$region_data
      boundary_genes <- res$boundary_genes
      saveRDS(region_data, region_data_file, compress='xz')
      saveRDS(boundary_genes, boundary_gene_file, compress='xz')
    }
[ctwas_2]
parameter: run_param_est = False
parameter: skip_assembly = False
parameter: thin=0.1
parameter: prior_var_structure = "shared_all"
thin=int(thin) if thin == 1.0 else thin
skip_if(run_param_est == False, " Skip [ctwas_2] parameter estimation. " )

input: f"{cwd:a}/{step_name.split('_')[0]}/{name}*.ctwas_region_data.thin{thin}.rds", group_by="all"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads
R: expand = '${ }', stdout = f"{cwd:a}/{step_name.split('_')[0]}/{name}_{prior_var_structure}_param_est_thin{thin}.stdout", 
stderr = f"{cwd:a}/{step_name.split('_')[0]}/{name}_{prior_var_structure}_param_est_thin{thin}.stderr", container = container, entrypoint = entrypoint
    
    library(ctwas)
    outputdir = "${cwd:a}/ctwas/"

    region_data_files <- c(${_input:r,})
    names(region_data_files) <- gsub('^.*${name}_\\s*|\\s*.ctwas_region_data.thin.*$', '', region_data_files)
    names(region_data_files) <- gsub('^chr[0-9]+_', '', names(region_data_files))
    region_data <- setNames(lapply(region_data_files, readRDS), names(region_data_files))
    region_data <- lapply(split(region_data, names(region_data)), function(x) do.call(c, unname(x)))
    saveRDS(region_data, paste0(outputdir, "${name}_region_data_thin${thin}.rds"), compress='xz')

    # assess global parameters
    for (study in names(region_data)){
        group_prior_file <- file.path(outputdir, paste0("${name}_ctwas_param_${prior_var_structure}_", study, "_thin${thin}.rds"))

        message("Start global parameter estimation for ", study, ". ")
        param <- est_param(region_data[[study]],
                  init_group_prior = NULL,
                  init_group_prior_var = NULL,
                  group_prior_var_structure = "${prior_var_structure}", 
                  niter_prefit = 3,
                  niter = 30,
                  min_var = 2,
                  min_gene = 1,
                  min_group_size = 1,
                  min_p_single_effect = 0.8,
                  ncore = ${numThreads},
                  verbose = TRUE)
        saveRDS(param, group_prior_file, compress='xz')
        region_data[[study]] <- NULL
    }
[ctwas_3]
parameter: thin=0.1
parameter: maxSNP = 20000
parameter: max_iter = 0
parameter: run_finemapping = False
parameter: prior_var_structure = "shared_all"
parameter: region_name= []
skip_if(run_finemapping == False, " Skip [ctwas_3] fine-mapping. " )
thin=int(thin) if thin == 1.0 else thin
region_data_file = f"{cwd:a}/{step_name.split('_')[0]}/{name}_region_data_thin{thin}.rds"
region_name = [f"{region}" for region in region_name]

depends: region_data_file, f"{cwd:a}/{step_name.split('_')[0]}/{name}_ctwas_param_{prior_var_structure}_*_thin{thin}.rds" 

input: f"{cwd:a}/{step_name.split('_')[0]}/{name}_ctwas_param_{prior_var_structure}_*_thin{thin}.rds", for_each='region_name'
chrom=_region_name.split("_")[0]
weight_file=f"{cwd}/{step_name.split('_')[0]}/{name}_{chrom}_ctwas_weights.rds"
LD_map_file=f"{cwd}/{step_name.split('_')[0]}/{name}_{chrom}_LD_map.rds"
z_snp_file=f"{cwd}/{step_name.split('_')[0]}/{name}_{chrom}_z_snp_map.rds"
fail_if(not os.path.exists(weight_file) or not os.path.exists(LD_map_file) or not os.path.exists(z_snp_file), 
        f'{weight_file} or {LD_map_file} or {z_snp_file} is not present')
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads
R: expand = '${ }', stdout = f"{cwd:a}/{step_name.split('_')[0]}/{name}_{prior_var_structure}_{_region_name}.ctwas_finemap_res.thin{thin}.stdout", 
stderr = f"{cwd:a}/{step_name.split('_')[0]}/{name}_{prior_var_structure}_{_region_name}.ctwas_finemap_res.thin{thin}.stderr", container = container, entrypoint = entrypoint

    library(ctwas)
    library(pecotmr)
    outputdir = "${cwd:a}/ctwas/"

    param_study_files <- c(${_input:r,})
    names(param_study_files) <- gsub('^.*_ctwas_param_${prior_var_structure}_\\s*|\\s*_thin${thin}.*$', '', param_study_files)
    region_data <- readRDS("${region_data_file}")
    weights <- readRDS("${weight_file}")
    param <- setNames(lapply(unname(param_study_files), readRDS), names(param_study_files))
    LD_map <- readRDS("${LD_map_file}")
    z_snp_list <- readRDS("${z_snp_file}")
    z_snp <- z_snp_list[["z_snp"]]
    snp_map <- z_snp_list[["snp_map"]]
    rm(z_snp_list)

    for (study in names(param_study_files)){
        group_prior <- param[[study]]$group_prior
        group_prior_var <- param[[study]]$group_prior_var 
        finemap_res_file <- file.path(outputdir, paste0("${name}_ctwas_finemap_res_${prior_var_structure}_${_region_name}_", study, "_thin${thin}.rds"))
        susie_alpha_file <- file.path(outputdir, paste0("${name}_ctwas_susie_alpha_res${prior_var_structure}_${_region_name}_", study, "_thin${thin}.rds"))
        if (nrow(region_data[[study]][[gsub("chr", "", "${_region_name}")]]$z_gene)==0) {
            message("No z_gene data available for ", study, " in ${_region_name}. ")
            saveRDS(data.frame(), finemap_res_file, compress='xz')
            next
        }
        screen_res <- screen_regions_noLD(region_data[[study]][gsub("chr", "", "${_region_name}")],
                                       group_prior = group_prior,group_prior_var = group_prior_var, min_var = 2,min_gene = 1,min_nonSNP_PIP = 0.5,
                                       ncore = ${numThreads},verbose = FALSE, logfile = file.path(outputdir, 
                                       paste0("${name}_screen_regions_${prior_var_structure}_${_region_name}_thin${thin}.log")))
        screened_region_data <- screen_res$screened_region_data
        if (length(screened_region_data)==0) {
            message("No region selected for ", study, " in ${_region_name}. ")
            saveRDS(data.frame(), finemap_res_file, compress='xz')
            next
        }
        screen_summary <- screen_res$screen_summary
        screened_region_data <- expand_region_data(screened_region_data, snp_map, z_snp[[study]],maxSNP = ${maxSNP}, ncore = ${numThreads})
        message("Screening region completed for ", study, ". ")

        if (as.logical(${max_iter})){
            finemap_res_file <- gsub(study, paste0(study,"_max_iter${max_iter}"),finemap_res_file)
            susie_alpha_file <- gsub(study, paste0(study,"_max_iter${max_iter}"),susie_alpha_file)
            res <- finemap_regions(screened_region_data, LD_map = LD_map, weights = weights[[study]], group_prior = group_prior, max_iter=${max_iter},
                                  group_prior_var = group_prior_var, L = 5, ncore = ${numThreads}, save_cor = FALSE, LD_format = "custom", 
                                  LD_loader_fun = ctwas_ld_loader, snpinfo_loader_fun = ctwas_bimfile_loader, verbose = TRUE, logfile = file.path(outputdir, 
                                  paste0("${name}_finemap_regions_${prior_var_structure}_${_region_name}_", study, "_thin${thin}.log")))
        } else {
            res <- finemap_regions(screened_region_data, LD_map = LD_map, weights = weights[[study]], group_prior = group_prior, 
                                  group_prior_var = group_prior_var, L = 5, ncore = ${numThreads}, save_cor = FALSE, LD_format = "custom", 
                                  LD_loader_fun = ctwas_ld_loader, snpinfo_loader_fun = ctwas_bimfile_loader, verbose = TRUE, logfile = file.path(outputdir, 
                                  paste0("${name}_finemap_regions_${prior_var_structure}_${_region_name}_", study, "_thin${thin}.log")))
        }
        weights[[study]] <- NULL
        finemap_res <- res$finemap_res
        finemap_res$pval <- z2p(finemap_res$z)
        susie_alpha_res <- res$susie_alpha_res
        saveRDS(finemap_res, finemap_res_file, compress='xz')
        saveRDS(susie_alpha_res, susie_alpha_file, compress='xz')
        message("Fine-mapping completed for ", study, ". ")
    }
[quantile_twas]
depends: sos_variable("filtered_regional_xqtl_files")
parameter: save_ctwas_data = True
parameter: save_mr_result = False
input: filtered_regional_xqtl_files, group_by = lambda x: group_by_region(x, filtered_regional_xqtl_files), group_with = "filtered_region_info"
output_files = [f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.quantile_twas.tsv.gz']
if save_ctwas_data:
    output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.quantile_twas_data.rds')
if save_mr_result:
    output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.quantile_mr_result.tsv.gz')
output: output_files
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output[0]:n}.stdout", stderr = f"{_output[0]:n}.stderr", container = container, entrypoint = entrypoint

    library(dplyr)
    library(data.table)
    library(pecotmr)
    library(readr)

    # get xQTL weight information
    xqtl_meta_df <- fread("${xqtl_meta_data}") # Get related gene information from the xqtl_meta data table
    xqtl_type_table <- if (isTRUE(file.exists("${xqtl_type_table}"))) fread("${xqtl_type_table}") else NULL
    gwas_studies = c(${paths(regional_data["GWAS"].keys()):r,})
    gwas_files = c(${paths([v[_filtered_region_info[0]] for k, v in regional_data["GWAS"].items()]):r,})
    gene_list <- c(${', '.join([f"'{gene}'" for gene in _filtered_region_info[4]])})

    # Initialize export_twas_weights_db
    export_twas_weights_db <- list()
    export_twas_weights_db[["${_filtered_region_info[3]}"]] <- list()

    # Initialize twas_weights_results
    twas_weights_results <- list()

    # Create initial weight_db_list
    weight_db_list <- c(${_input:r,})
    names(weight_db_list) <- gene_list

    # Split weight_db_list
    weight_db_list <- split(weight_db_list, names(weight_db_list))

    # Update weight_db_list
    weight_db_list_update <- lapply(weight_db_list, function(file_list){
        do.call(c, lapply(file_list, function(file) if (file.size(file) > 200) file))
    })

    # Check if weight_db_list_update is empty
    if(length(weight_db_list_update) <= 0) {
        stop(paste0("No weight information available from ", paste0(weight_db_list, collapse=","), ". "))
    }

    # Define tau_values
    tau_values <- seq(0.01, 0.99, 0.01)

    # Main processing loop
    for (gene_db in names(weight_db_list_update)) {
        weight_dbs <- weight_db_list_update[[gene_db]]
        
        twas_weights_results[[gene_db]] <- load_quantile_twas_weights(
            weight_db_files = weight_dbs,
            tau_values = tau_values,
            between_cluster = 0.8,
            num_intervals = 3
        )
        
        if (!is.null(twas_weights_results[[gene_db]]) && !is.null(twas_weights_results[[gene_db]]$weights)) {
            twas_weights_results[[gene_db]]$data_type <- setNames(
                lapply(names(twas_weights_results[[gene_db]]$weights), function(context) {
                    xqtl_type_table$type[sapply(xqtl_type_table$context, function(x) grepl(x, context))]
                }),
                names(twas_weights_results[[gene_db]]$weights)
            )
        } else {
            print(paste("Warning: No valid weights found for gene:", gene_db))
        }
    }

    #Step 2: twas analysis for imputable genes across contexts
    twas_results_db <- twas_pipeline(twas_weights_data = twas_weights_results, 
                                    ld_meta_file_path = "${ld_meta_data}", 
                                    gwas_meta_file = "${gwas_meta_data}", 
                                    region_block = "${_filtered_region_info[3]}", 
                                    quantile_twas = TRUE,  
                                    output_twas_data = ${"TRUE" if save_ctwas_data else "FALSE"} )

    # Merging with xQTL meta-data      
    message("Merging twas_result with xqtl_meta_df...")    
    twas_results_db$twas_result <- merge(twas_results_db$twas_result, xqtl_meta_df[, c("region_id","TSS","start","end")], by.x="molecular_id", by.y="region_id")
    twas_results_db$twas_result <- unique(twas_results_db$twas_result)
    fwrite(twas_results_db$twas_result[, c(2, 1, (ncol(twas_results_db$twas_result)-2):ncol(twas_results_db$twas_result), 3:(ncol(twas_results_db$twas_result)-3))], file = ${_output[0]:r}, sep = "\t", compress = "gzip")

    # Step 3: reformat for follow up cTWAS analysis
    if (${"TRUE" if save_ctwas_data else "FALSE"}) {
        saveRDS(twas_results_db$twas_data, "${_output[0]:nnn}.quantile_twas_data.rds", compress='xz')
    }
    if (${"TRUE" if save_mr_result else "FALSE"}) {
        fwrite(twas_results_db$mr_result, file = "${_output[0]:nnn}.quantile_mr_result.tsv.gz", sep = "\t", compress = "gzip")
    }
    message("quantile twas analysis is completed in this block.")