Quantile regression for QTL association testing

Quantile regression for QTL association testing#

Description#

This module performs quantile QTL (QR) association testing and computes quantile TWAS weights. For each genomic region it fits quantile regression across quantile levels (0.05, 0.1, …, 0.95) and integrates the quantile-level p-values into a single QR p-value using the Cauchy combination method, then derives TWAS weights for use in downstream gene-trait association.

The analysis is region-based: regions can be selected by name (--region-name), by a region list (--region-list), or, if neither is given, the union of all regions present in the phenotype region lists is analyzed.

Timing: ~2-5 min on typical compute infrastructure.

Step 1: Quantile QTL association and TWAS weight#

For each region the workflow fits quantile regression of the molecular phenotype on genotype across the quantile grid, combines the per-quantile p-values into a single QR p-value via the Cauchy combination method, and computes quantile TWAS weights. Covariates (genotype PCs, hidden factors, fixed covariates) are regressed out, and cis/trans windows are taken from a customized association-window file when provided, otherwise a fixed cis-window around each region is used.

Inputs

Option

Description

--genoFile

Either a list of per-chromosome genotype files, or one PLINK .bed file for the whole genome.

--phenoFile

One or more lists of phenotype files per region (bed.gz with .bed.gz.tbi index), output of the phenotype_per_region/annotate_coord preprocessing steps.

--covFile

One or more covariate files matched to the phenotype lists (genotype PCs, hidden factors, fixed covariates).

--customized-association-windows

Optional 4-column file (chr, start, end, region ID) defining cis/trans windows. If omitted, a fixed cis-window around each region is used.

--region-name / --region-list

Optional ways to restrict the analysis to selected regions. Region IDs must match the 4th column of the phenotype list.

--phenotype-names

Names for the phenotypic conditions (cond1 cond2 ...).

Example genotype list (--genoFile):

#chr	path
chr21	protocol_example.genotype.chr21.bed
chr22	protocol_example.genotype.chr22.bed

Alternatively use protocol_example.genotype.chr21_22.bed if all chromosomes are in one file.

Example association-window file (strictly 4 columns, header commented out):

#chr	start	end	gene_id
chr10	0	6480000	ENSG00000008128
chr1	0	6480000	ENSG00000008130
chr1	0	6480000	ENSG00000067606

Output

The pipeline writes results to the --cwd directory, named after the workflow step. Each region yields a quantile-QTL association result table (per-quantile and integrated Cauchy QR p-values) and the corresponding quantile TWAS weight object used downstream.

sos run pipeline/qr_and_twas.ipynb quantile_qtl_twas_weight \
    --name protocol_example_protein \
    --genoFile output/plink/protocol_example.genotype.chr22.bed \
    --phenoFile output/phenotype_protein/protocol_example_protein.phenotype_by_chrom_files.region_list.txt \
    --covFile output/covariate_protein/protocol_example_protein.chr22.protocol_example.covariates.protocol_example.genotype.merged.plink_qc.plink_qc.prune.pca.Marchenko_PC.gz \
    --customized-association-windows input/reference_data/TAD/protocol_example_protein.enhanced_cis_chr22.bed \
    --region-name ENSG00000241973_P42356 \
    --cwd output/quantile_twas \
    --phenotype-names protein 
    
sos run pipeline/qr_and_twas.ipynb quantile_qtl_twas_weight \
    --name protocol_example_protein \
    --genoFile output/plink/protocol_example.genotype.chr22.bed \
    --phenoFile output/phenotype_protein/protocol_example_protein.phenotype_by_chrom_files.region_list.txt \
    --covFile output/covariate_protein/protocol_example_protein.chr22.protocol_example.covariates.protocol_example.genotype.merged.plink_qc.plink_qc.prune.pca.Marchenko_PC.gz \
    --customized-association-windows input/reference_data/TAD/protocol_example_protein.enhanced_cis_chr22.bed \
    --region-list input/reference_data/TAD/protocol_example_protein.enhanced_cis_chr22.bed \
    --cwd output/quantile_twas \
    --phenotype-names protein 
sos run pipeline/qr_and_twas.ipynb -h

Workflow implementation#

The cells below define the SoS workflow steps used by the commands above. They are not meant to be edited for routine analysis.

[global]
# It is required to input the name of the analysis
parameter: name = str
parameter: cwd = path("output")
# A list of file paths for genotype data, or the genotype data itself. 
parameter: genoFile = path
# One or multiple lists of file paths for phenotype data.
parameter: phenoFile = paths
# One or multiple lists of file paths for phenotype ID mapping file. The first column should be the original ID, the 2nd column should be the ID to be mapped to.
parameter: phenoIDFile = paths()
# Covariate file path
parameter: covFile = paths
# Optional: if a region list is provide the analysis will be focused on provided region. 
# The LAST column of this list will contain the ID of regions to focus on
# Otherwise, all regions with both genotype and phenotype files will be analyzed
parameter: region_list = path()
# Optional: if a region name is provided 
# the analysis would be focused on the union of provides region list and region names
parameter: region_name = []
# Only focus on a subset of samples
parameter: keep_samples = path()
# An optional list documenting the custom association window for each region to analyze, with four column, chr, start, end, region ID (eg gene ID).
# If this list is not provided, the default `window` parameter (see below) will be used.
parameter: customized_association_windows = path()
# Specify the cis window for the up and downstream radius to analyze around the region of interest in units of bp
# When this is set to negative, we will rely on using customized_association_windows
parameter: cis_window = -1
# save data object or not
parameter: save_data = False
# Name of phenotypes
parameter: phenotype_names = [f'{x:bn}' for x in phenoFile]
parameter: seed = 999
# remove a variant if it has more than imiss missing individual level data
parameter: imiss = 1.0
# MAF cutoff
parameter: maf = 0.0025
# MAC cutoff, on top of MAF cutoff
parameter: mac = 5
# Remove indels if indel = False
parameter: indel = True
parameter: min_twas_maf = 0.01
parameter: screen_threshold = 0.01
parameter: screen_method = "qvalue"
parameter: screen_significant = True
parameter: pre_filter_by_pqr = False
parameter: initial_corr_filter_cutoff = 0.8
parameter: full_rank_corr_filter_cutoff = "seq(0.75, 0.5, by = -0.05)"
parameter: ld_reference_meta_file = path()
# Only focus on a subset of variants
parameter: keep_variants = path()
parameter: marginal_beta_calculate = True
parameter: twas_weight_calculate = True
parameter: qrank_screen_calculate = True
parameter: vqtl_calculate = True
# Analysis environment settings
parameter: container = ""
# For cluster jobs, number commands to run per job
parameter: job_size = 200
# Wall clock time expected
parameter: walltime = "1h"
# Memory expected
parameter: mem = "20G"
# Number of threads
parameter: numThreads = 1

if len(phenoFile) != len(covFile):
    raise ValueError("Number of input phenotypes files must match that of covariates files")
if len(phenoFile) != len(phenotype_names):
    raise ValueError("Number of input phenotypes files must match the number of phenotype names")
if len(phenoIDFile) > 0 and len(phenoFile) != len(phenoIDFile):
    raise ValueError("Number of input phenotypes files must match the number of phenotype ID mapping files")

def group_by_region(lst, partition):
    # from itertools import accumulate
    # partition = [len(x) for x in partition]
    # Compute the cumulative sums once
    # cumsum_vector = list(accumulate(partition))
    # Use slicing based on the cumulative sums
    # return [lst[(cumsum_vector[i-1] if i > 0 else 0):cumsum_vector[i]] for i in range(len(partition))]
    return partition

import os
import pandas as pd

def adapt_file_path(file_path, reference_file):
    """
    Adapt a single file path based on its existence and a reference file's path.

    Args:
    - file_path (str): The file path to adapt.
    - reference_file (str): File path to use as a reference for adaptation.

    Returns:
    - str: Adapted file path.

    Raises:
    - FileNotFoundError: If no valid file path is found.
    """
    reference_path = os.path.dirname(reference_file)

    # Check if the file exists
    if os.path.isfile(file_path):
        return file_path

    # Check file name without path
    file_name = os.path.basename(file_path)
    if os.path.isfile(file_name):
        return file_name

    # Check file name in reference file's directory
    file_in_ref_dir = os.path.join(reference_path, file_name)
    if os.path.isfile(file_in_ref_dir):
        return file_in_ref_dir

    # Check original file path prefixed with reference file's directory
    file_prefixed = os.path.join(reference_path, file_path)
    if os.path.isfile(file_prefixed):
        return file_prefixed

    # If all checks fail, raise an error
    raise FileNotFoundError(f"No valid path found for file: {file_path}")

def adapt_file_path_all(df, column_name, reference_file):
    return df[column_name].apply(lambda x: adapt_file_path(x, reference_file))
[get_analysis_regions: shared = "regional_data"]
# input is genoFile, phenoFile, covFile and optionally region_list. If region_list presents then we only analyze what's contained in the list.
# regional_data should be a dictionary like:
#{'data': [("genotype_1.bed", "phenotype_1.bed.gz", "covariate_1.gz"), ("genotype_2.bed", "phenotype_1.bed.gz", "phenotype_2.bed.gz", "covariate_1.gz", "covariate_2.gz") ... ],
# 'meta_info': [("chr12:752578-752579","chr12:752577-752580", "gene_1", "trait_1"), ("chr13:852580-852581","chr13:852579-852580", "gene_2", "trait_1", "trait_2") ... ]}
import numpy as np

def preload_id_map(id_map_files):
    id_maps = {}
    for id_map_file in id_map_files:
        if id_map_file is not None and os.path.isfile(id_map_file):
            df = pd.read_csv(id_map_file, sep=r'\s+', header=None, comment='#', names=['old_ID', 'new_ID'])
            id_maps[id_map_file] = df.set_index('old_ID')['new_ID'].to_dict()
    return id_maps

def load_and_apply_id_map(pheno_path, id_map_path, preloaded_id_maps):
    pheno_df = pd.read_csv(pheno_path, sep=r"\s+", header=0)
    pheno_df['Original_ID'] = pheno_df['ID']
    if id_map_path in preloaded_id_maps:
        id_map = preloaded_id_maps[id_map_path]
        pheno_df['ID'] = pheno_df['ID'].map(id_map).fillna(pheno_df['ID'])
    return pheno_df

def filter_by_region_ids(data, region_ids):
    if region_ids is not None and len(region_ids) > 0:
        # Check both ID (mapped) and Original_ID (unmapped) to handle phenoIDFile cases
        # where region_name uses the original ID but ID column has been mapped
        if 'Original_ID' in data.columns:
            data = data[data['ID'].isin(region_ids) | data['Original_ID'].isin(region_ids)]
        else:
            data = data[data['ID'].isin(region_ids)]
    return data

def custom_join(series):
    # Initialize an empty list to hold the processed items
    result = []
    for item in series:
        if ',' in item:
            # If the item contains commas, split by comma and convert to tuple
            result.append(tuple(item.split(',')))
        else:
            # If the item does not contain commas, add it directly
            result.append(item)
    # Convert the list of items to a tuple and return
    return tuple(result)

def aggregate_phenotype_data(accumulated_pheno_df):
    if not accumulated_pheno_df.empty:
        accumulated_pheno_df = accumulated_pheno_df.groupby(['#chr','ID','cond','path','cov_path'], as_index=False).agg({
            '#chr': lambda x: np.unique(x).astype(str)[0],
            'ID': lambda x: np.unique(x).astype(str)[0],
            'Original_ID': ','.join,
            'start': 'min',
            'end': 'max'
        }).groupby(['#chr','ID'], as_index=False).agg({
            'cond': ','.join,
            'path': ','.join,
            'Original_ID': custom_join,
            'cov_path': ','.join,
            'start': 'min',
            'end': 'max'
        })
    return accumulated_pheno_df

def process_cis_files(pheno_files, cov_files, phenotype_names, pheno_id_files, region_ids, preloaded_id_maps):
    '''
    Example output:
    #chr    start      end    ID  Original_ID   path     cov_path             cond
    chr12   752578   752579  ENSG00000060237  Q9H4A3,P62873  protocol_example.protein_1.bed.gz,protocol_example.protein_2.bed.gz  covar_1.gz,covar_2.gz  trait_A,trait_B
    '''
    accumulated_pheno_df = pd.DataFrame()
    pheno_id_files = [None] * len(pheno_files) if len(pheno_id_files) == 0 else pheno_id_files
    for pheno_path, cov_path, phenotype_name, id_map_path in zip(pheno_files, cov_files, phenotype_names, pheno_id_files):
        if not os.path.isfile(cov_path):
            raise FileNotFoundError(f"No valid path found for file: {cov_path}")
        pheno_df = load_and_apply_id_map(pheno_path, id_map_path, preloaded_id_maps)
        pheno_df = filter_by_region_ids(pheno_df, region_ids)
        if not pheno_df.empty:
            # Use 'path' column by name to handle both 5-col (#chr,start,end,ID,path)
            # and 6-col (#chr,start,end,ID,strand,path) region_list formats
            pheno_df['path'] = adapt_file_path_all(pheno_df, 'path', f"{pheno_path:a}")
            pheno_df = pheno_df.assign(cov_path=str(cov_path), cond=phenotype_name)           
            accumulated_pheno_df = pd.concat([accumulated_pheno_df, pheno_df], ignore_index=True)

    accumulated_pheno_df = aggregate_phenotype_data(accumulated_pheno_df)
    return accumulated_pheno_df

def process_trans_files(pheno_files, cov_files, phenotype_names, pheno_id_files, region_ids, customized_association_windows):
    '''
    Example output:
    #chr    start      end    ID  Original_ID   path     cov_path             cond
    chr21   0   0  chr21_18133254_19330300  carnitine,benzoate,hippurate  metabolon_1.bed.gz,metabolon_2.bed.gz  covar_1.gz,covar_2.gz  trait_A,trait_B
    '''
    
    if not os.path.isfile(customized_association_windows):
        raise ValueError("Customized association analysis window must be specified for trans analysis.")
    accumulated_pheno_df = pd.DataFrame()
    pheno_id_files = [None] * len(pheno_files) if len(pheno_id_files) == 0 else pheno_id_files
    genotype_windows = pd.read_csv(customized_association_windows, comment="#", header=None, names=["#chr","start","end","ID"], sep="\t")
    genotype_windows = filter_by_region_ids(genotype_windows, region_ids)
    if genotype_windows.empty:
        return accumulated_pheno_df
    
    for pheno_path, cov_path, phenotype_name, id_map_path in zip(pheno_files, cov_files, phenotype_names, pheno_id_files):
        if not os.path.isfile(cov_path):
            raise FileNotFoundError(f"No valid path found for file: {cov_path}")
        pheno_df = pd.read_csv(pheno_path, sep=r"\s+", header=0, names=['Original_ID', 'path'])
        if not pheno_df.empty:
            pheno_df.iloc[:, -1] = adapt_file_path_all(pheno_df, pheno_df.columns[-1], f"{pheno_path:a}")
            pheno_df = pheno_df.assign(cov_path=str(cov_path), cond=phenotype_name)
            # Here we combine genotype_windows which contains "#chr" and "ID" to pheno_df by creating a cartesian product
            pheno_df = pd.merge(genotype_windows.assign(key=1), pheno_df.assign(key=1), on='key').drop('key', axis=1)
            # then set start and end columns to zero
            pheno_df['start'] = 0
            pheno_df['end'] = 0
            if id_map_path is not None:
                # Filter pheno_df by specific association-window and phenotype pairs
                association_analysis_pair = pd.read_csv(id_map_path, sep=r'\s+', header=None, comment='#', names=['ID', 'Original_ID'])
                pheno_df = pd.merge(pheno_df, association_analysis_pair, on=['ID', 'Original_ID'])
            accumulated_pheno_df = pd.concat([accumulated_pheno_df, pheno_df], ignore_index=True)

    accumulated_pheno_df = aggregate_phenotype_data(accumulated_pheno_df)
    return accumulated_pheno_df

# Load genotype meta data
if f"{genoFile:x}" == ".bed":
    geno_meta_data = pd.DataFrame([("chr"+str(x), f"{genoFile:a}") for x in range(1,23)] + [("chrX", f"{genoFile:a}")], columns=['#chr', 'geno_path'])
else:
    geno_meta_data = pd.read_csv(f"{genoFile:a}", sep = r"\s+", header=0)
    geno_meta_data.iloc[:, 1] = adapt_file_path_all(geno_meta_data, geno_meta_data.columns[1], f"{genoFile:a}")
    geno_meta_data.columns = ['#chr', 'geno_path']
    geno_meta_data['#chr'] = geno_meta_data['#chr'].apply(lambda x: str(x) if str(x).startswith('chr') else f'chr{x}')

# Checking the DataFrame
valid_chr_values = [f'chr{x}' for x in range(1, 23)] + ['chrX']
if not all(value in valid_chr_values for value in geno_meta_data['#chr']):
    raise ValueError("Invalid chromosome values found. Allowed values are chr1 to chr22 and chrX.")

region_ids = []
# If region_list is provided, read the file and extract IDs
if region_list.is_file():
    region_list_df = pd.read_csv(region_list, sep=r'\s+', header=None, comment = "#")
    region_ids = region_list_df.iloc[:, -1].unique()  # Extracting the last column for IDs
# If region_name is provided, include those IDs as well
# --region-name A B C will result in a list of ["A", "B", "C"] here
if len(region_name) > 0:
    region_ids = list(set(region_ids).union(set(region_name)))

trans_analysis = False
if trans_analysis:
    meta_data = process_trans_files(phenoFile, covFile, phenotype_names, phenoIDFile, region_ids, customized_association_windows)
else:
    meta_data = process_cis_files(phenoFile, covFile, phenotype_names, phenoIDFile, region_ids, preload_id_map(phenoIDFile))

if not meta_data.empty:
    meta_data = meta_data.merge(geno_meta_data, on='#chr', how='inner')
    # Adjust association-window
    if os.path.isfile(customized_association_windows):
        print(f"Loading customized association analysis window from {customized_association_windows}")
        association_windows_list = pd.read_csv(customized_association_windows, comment="#", header=None, names=["#chr","start","end","ID"], sep="\t")
        meta_data = pd.merge(meta_data, association_windows_list, on=['#chr', 'ID'], how='left', suffixes=('', '_association'))
        mismatches = meta_data[meta_data['start_association'].isna()]
        if not mismatches.empty:
            raise ValueError(f"{len(mismatches)} regions to analyze cannot be found in ``{customized_association_windows}``. Please check your ``{customized_association_windows}`` database to make sure it contains all association-window definitions. ")
    else:
        if cis_window < 0 :
            raise ValueError("Please either input valid path to association-window file via ``--customized-association-windows``, or set ``--cis-window`` to a non-negative integer.")
        if cis_window == 0:
            print("Warning: only variants within the range of start and end of molecular phenotype will be considered since cis_window is set to zero and no customized association window file was found. Please make sure this is by design.")
        meta_data['start_association'] = meta_data['start'].apply(lambda x: max(x - cis_window, 0))
        meta_data['end_association'] = meta_data['end'] + cis_window

    # Example meta_data:
    # #chr    start      end    start_association       end_association           ID  Original_ID   path     cov_path             cond             coordinate     geno_path
    # 0  chr12   752578   752579  652578   852579  ENSG00000060237  Q9H4A3,P62873  protocol_example.protein_1.bed.gz,protocol_example.protein_2.bed.gz  covar_1.gz,covar_2.gz  trait_A,trait_B    chr12:752578-752579  protocol_example.genotype.chr21_22.bed       
    # Create the final dictionary
    regional_data = {
        'data': [(row['geno_path'], *row['path'].split(','), *row['cov_path'].split(',')) for _, row in meta_data.iterrows()],
        'meta_info': [(f"{row['#chr']}:{row['start']}-{row['end']}", # this is the phenotypic region to extract data from
                       f"{row['#chr']}:{row['start_association']}-{row['end_association']}", # this is the association window region
                       row['ID'], row['Original_ID'], *row['cond'].split(',')) for _, row in meta_data.iterrows()]
    }
else:
    regional_data = {'data':[], 'meta_info':[]}
[quantile_qtl_twas_weight]
depends: sos_variable("regional_data")
# Check if both 'data' and 'meta_info' are empty lists
stop_if(len(regional_data['data']) == 0, f'Either genotype or phenotype data are not available for region {", ".join(region_name)}.')

meta_info = regional_data['meta_info']
input: regional_data["data"], group_by = lambda x: group_by_region(x, regional_data["data"]), group_with = "meta_info"
output: f'{cwd:a}/{step_name}/{name}.{_meta_info[0].split(":")[0]}_{_meta_info[2]}.univariate_qr_twas_weights.rds'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
R: expand = '${ }', stdout = f"{_output:n}.stdout", stderr = f"{_output:n}.stderr", container = container
    options(warn=1)
    library(pecotmr)
    library(qQTLR)
    library(readr)
    start_time_total <- proc.time()
    
    phenotype_files = c(${",".join(['"%s"' % x.absolute() for x in _input[1:len(_input)//2+1]])})
    covariate_files = c(${",".join(['"%s"' % x.absolute() for x in _input[len(_input)//2+1:]])})
    conditions = c(${",".join(['"%s"' % x for x in _meta_info[4:]])})
    region = ${("'%s'" % _meta_info[0]) if int(_meta_info[0].split('-')[-1])>0 else 'NULL'} # if the end position is zero return NULL
    association_window = "${_meta_info[1]}"
    extract_region_name = list(${",".join([("c('"+x+"')") if isinstance(x, str) else ("c"+ str(x)) for x in _meta_info[3]])})
    phenotype_header = ${"4" if int(_meta_info[0].split('-')[-1])>0 else "1"}
    region_name_col = ${"4" if int(_meta_info[0].split('-')[-1])>0 else "1"} 


    # extract subset of samples
    keep_samples = NULL
    if (${"TRUE" if keep_samples.is_file() else "FALSE"}) {
      keep_samples = unlist(strsplit(readLines(${keep_samples:ar}), "\\s+"))
      message(paste(length(keep_samples), "samples are selected to be loaded for analysis"))
    }

    # Load variant filter list if provided
    keep_variants_list = NULL
    keep_variants_per_context = FALSE
    keep_variants_region_data = NULL
    if (${"TRUE" if keep_variants.is_file() else "FALSE"}) {
      keep_variants_path = ${keep_variants:ar}

      # Read input: RDS or text/tsv
      if (grepl("\\.rds$", keep_variants_path, ignore.case = TRUE)) {
        keep_variants_data = readRDS(keep_variants_path)
        # Ensure it's a data.frame/data.table
        if (!is.data.frame(keep_variants_data)) {
          # If RDS contains a vector, treat as simple variant list
          keep_variants_list = as.character(keep_variants_data)
          keep_variants_list = keep_variants_list[keep_variants_list != ""]
          message(paste(length(keep_variants_list), "variants are specified to be kept for analysis (from RDS vector)"))
          keep_variants_data = NULL
        }
      } else {
        keep_variants_data = data.table::fread(keep_variants_path, header = TRUE)
      }

      if (!is.null(keep_variants_data)) {
        # Fix column name if first column is "#chr"
        if ("#chr" %in% names(keep_variants_data)) {
          names(keep_variants_data)[names(keep_variants_data) == "#chr"] <- "chr"
        }

        if (ncol(keep_variants_data) == 1) {
          # Single column: treat as simple variant_id list (old behavior)
          keep_variants_list = trimws(as.character(keep_variants_data[[1]]))
          keep_variants_list = keep_variants_list[keep_variants_list != ""]
          message(paste(length(keep_variants_list), "variants are specified to be kept for analysis"))
        } else if (all(c("context", "molecular_trait_object_id", "variant_id") %in% names(keep_variants_data))) {
          # Multi-column format: filter by context and molecular_trait_object_id
          # For genotype loading: use union of all matching variants across all contexts
          current_region = "${_meta_info[2]}"
          original_region_ids = c(${",".join([("'%s'" % x) if isinstance(x, str) else ",".join(["'%s'" % i for i in x]) for x in _meta_info[3]])})
          all_region_ids = unique(c(current_region, original_region_ids))
          current_contexts = conditions
          keep_variants_region_data = keep_variants_data[
            keep_variants_data$molecular_trait_object_id %in% all_region_ids &
            keep_variants_data$context %in% current_contexts, ]
          keep_variants_list = unique(as.character(keep_variants_region_data$variant_id))
          keep_variants_list = keep_variants_list[keep_variants_list != ""]
          # Flag for per-context filtering in the loop
          keep_variants_per_context = TRUE
          message(paste(length(keep_variants_list), "variants matched for region", current_region,
                        "across", length(current_contexts), "contexts (per-context filtering enabled)"))
          if (length(keep_variants_list) == 0) {
            message("No matching variants found for this region, marginal coef will be skipped for contexts without variants")
          }
        } else {
          # Fallback: treat first column as variant_id list
          keep_variants_list = trimws(as.character(keep_variants_data[[1]]))
          keep_variants_list = keep_variants_list[keep_variants_list != ""]
          message(paste(length(keep_variants_list), "variants are specified to be kept for analysis (from first column)"))
        }
      }
    }

    # Load regional association data
    tryCatch({
    fdat = load_regional_association_data(genotype = ${_input[0]:anr},
                                          phenotype = phenotype_files,
                                          covariate = covariate_files,
                                          region = ${("'%s'" % _meta_info[0]) if int(_meta_info[0].split('-')[-1])>0 else 'NULL'}, # if the end position is zero return NULL
                                          association_window = "${_meta_info[1]}",
                                          conditions = conditions,
                                          maf_cutoff = ${maf},
                                          mac_cutoff = ${mac},
                                          imiss_cutoff = ${imiss},
                                          keep_indel = ${"TRUE" if indel else "FALSE"},
                                          keep_samples = keep_samples,
                                          extract_region_name = list(${",".join([("c('"+x+"')") if isinstance(x, str) else ("c"+ str(x)) for x in _meta_info[3]])}),
                                          phenotype_header = ${"4" if int(_meta_info[0].split('-')[-1])>0 else "1"},
                                          region_name_col = ${"4" if int(_meta_info[0].split('-')[-1])>0 else "1"},
                                          scale_residuals = FALSE)
    }, NoSNPsError = function(e) {
        message("Error: ", paste(e$message, "${_meta_info[2] + '@' + _meta_info[1]}"))
        saveRDS(list(${_meta_info[2]} = e$message), ${_output:ar}, compress='xz')
        quit(save="no")
    })
  
    if (${"TRUE" if save_data else "FALSE"}) {
        # save data object for debug purpose
        saveRDS(list(${_meta_info[2]} = fdat), "${_output:ann}.univariate.rds", compress='xz')
    }
    
    # setup univariate analysis pipeline options
    if ("${_meta_info[2]}" != "${_meta_info[3]}") {
        region_name = c("${_meta_info[2]}", c(${",".join([("c('"+x+"')") if isinstance(x, str) else ("c"+ str(x)) for x in _meta_info[3]])}))
    } else {
        region_name = "${_meta_info[2]}"
    }
  
    region_info = list(region_coord=parse_region("${_meta_info[0]}"), grange=parse_region("${_meta_info[1]}"), region_name=region_name)
    
    fitted = list()
    condition_names = vector()
    r = 1
    while (r<=length(fdat$Y)) {
        X <- fdat$X_data[[r]]
        Y <- fdat$Y[[r]]
        if (is.null(dim(Y))) {
            Y <- matrix(Y, nrow = length(Y), ncol = 1)
        }
        colnames(Y) <- extract_region_name[[r]]
        Z <- fdat$covar[[r]]
        # Update condition names first
        new_names = names(fdat$residual_Y)[r]
        new_col_names = list(${",".join([("c('"+x+"')") if isinstance(x, str) else ("c"+ str(x)) for x in _meta_info[3]])})[[r]]
        if (is.null(new_col_names)) {
            new_col_names = 1:ncol(fdat$residual_Y[[r]])
        }
        if(!(identical(new_names, new_col_names))) {
            new_names = paste(new_names, new_col_names, sep="_")
        }

        column_results <- lapply(1:ncol(Y), function(i) {
            Y_col <- matrix(Y[,i], ncol=1)
            colnames(Y_col) <- colnames(Y)[i]
            # Determine per-context variant list
            current_keep_variants = keep_variants_list
            if (keep_variants_per_context && !is.null(keep_variants_region_data)) {
              ctx_name = names(fdat$residual_Y)[r]
              ctx_filtered = keep_variants_region_data[keep_variants_region_data$context == ctx_name, ]
              if (nrow(ctx_filtered) > 0) {
                current_keep_variants = unique(as.character(ctx_filtered$variant_id))
                current_keep_variants = current_keep_variants[current_keep_variants != ""]
                message(paste(length(current_keep_variants), "variants for context", ctx_name))
              } else {
                current_keep_variants = character(0)
                message(paste("No context-specific variants for", ctx_name, "- skipping marginal coef fitting"))
              }
            }

            qr_results = quantile_twas_weight_pipeline(
                X = X,
                Y = Y_col,
                Z = Z,
                ld_reference_meta_file=${('"%s"' % ld_reference_meta_file) if not ld_reference_meta_file.is_dir() else "NULL"},
                maf = fdat$maf[[r]],
                twas_maf_cutoff = ${min_twas_maf},
                region_id = paste0(colnames(Y_col), "_", names(fdat$residual_Y)[r]),
                quantile_qtl_tau_list = seq(0.05, 0.95, by = 0.05),
                quantile_twas_tau_list = seq(0.01, 0.99, by = 0.01),
                screen_method = "${screen_method}",
                screen_threshold = ${screen_threshold},
                screen_significant = ${"TRUE" if screen_significant else "FALSE"},
                pre_filter_by_pqr = ${"TRUE" if pre_filter_by_pqr else "FALSE"},
                initial_corr_filter_cutoff = ${initial_corr_filter_cutoff},
                full_rank_corr_filter_cutoff = ${full_rank_corr_filter_cutoff},
                keep_variants = current_keep_variants,
                marginal_beta_calculate = ${"TRUE" if marginal_beta_calculate else "FALSE"},
                twas_weight_calculate = ${"TRUE" if twas_weight_calculate else "FALSE"},
                qrank_screen_calculate = ${"TRUE" if qrank_screen_calculate else "FALSE"},
                vqtl_calculate = ${"TRUE" if vqtl_calculate else "FALSE"}
            )

            if (!is.null(qr_results$message)) {
                message(qr_results$message)
            }

            qr_results$region_info = region_info
            qr_results$maf = fdat$maf[[r]]

            return(qr_results)
        })

        fitted <- c(fitted, column_results)
        condition_names <- c(condition_names, new_names)

        if (length(new_names) > 0) {
            message("Analysis completed for: ", paste(new_names, collapse=","))
        }

        # Release memory
        fdat$residual_X[[r]] <- NA
        fdat$residual_Y[[r]] <- NA
        r = r + 1
    }

    # Set names for the final results
    if (length(fitted) > 0) {
        names(fitted) <- condition_names
    }

    saveRDS(list("${_meta_info[2]}" = fitted), ${_output:ar}, compress='xz')
    end_time_total <- proc.time()
    total_time <- end_time_total - start_time_total
    print(total_time)