Stratified LD Score Regression#

This notebook implements the S-LDSC pipeline for LD score computation and functional enrichment analysis.

Important: the S-LDSC implementation comes from the polyfun package, not the original LDSC from the bulik/ldsc GitHub repository.

The purpose of this pipeline is to use LD Score Regression (LDSC) to analyze the heritability and enrichment of genome annotations across GWAS traits. By integrating genome annotation files and GWAS summary statistics, this pipeline allows single tau analysis (individual annotation contributions) and joint tau analysis (independent contributions of multiple annotations after removing shared effects).

The pipeline integrates GWAS summary statistics, annotation data, and LD reference panel data to compute the functional enrichment of each user-provided epigenomic annotation under the S-LDSC model. We first give an introduction, setup instructions, and minimal working examples; the workflow code, which can be run with SoS on any data, follows at the end.

A brief review on Stratified LD score regression#

Here I briefly review LD Score Regression and what it is used for. For more in-depth information on LD Score Regression, please read the following three papers:

  1. “LD Score regression distinguishes confounding from polygenicity in genome-wide association studies” by Bulik-Sullivan et al (2015)

  2. “Partitioning heritability by functional annotation using genome-wide association summary statistics” by Finucane et al (2015)

  3. “Linkage disequilibrium–dependent architecture of human complex traits shows action of negative selection” by Gazal et al (2017)

As stated in Bulik-Sullivan et al (2015), both confounding factors and polygenic effects can inflate test statistics, and other methods cannot distinguish inflation due to confounding bias from true polygenic signal. LD Score Regression (LDSC) is a technique that aims to identify the impact of confounding factors and polygenic effects using information from GWAS summary statistics.

This approach uses regression to measure the relationship between Linkage Disequilibrium (LD) scores and the test statistics of SNPs from the GWAS summary statistics. Variants in LD with a “causal” variant show an elevation in association test statistics proportional to their LD (measured by \(r^2\)) with the causal variant within a certain window (e.g., 1 cM or 1 kb). In contrast, inflation from confounders such as population stratification, which arises purely from genetic drift, will not correlate with LD. For a polygenic trait, SNPs with a high LD score will have more significant \(\chi^2\) statistics on average than SNPs with a low LD score. Thus, if we regress the \(\chi^2\) statistics from GWAS against LD Score, the intercept minus one is an estimator of the mean contribution of confounding bias to the inflation in the test statistics. This regression model is known as LD Score regression.

LDSC model#

Under a polygenic assumption, in which effect sizes for variants are drawn independently from distributions with variance proportional to \(1/(p(1-p))\) where p is the minor allele frequency (MAF), the expected \(\chi^2\) statistic of variant j is:

\[E[\chi^2|l_j] = Nh^2l_j/M + Na + 1 \quad (1)\]

where \(N\) is the sample size; \(M\) is the number of SNPs, such that \(h^2/M\) is the average heritability explained per SNP; \(a\) measures the contribution of confounding biases, such as cryptic relatedness and population stratification; and \(l_j = \sum_k r^2_{jk}\) is the LD Score of variant \(j\), which measures the amount of genetic variation tagged by \(j\). A full derivation of this equation is provided in the Supplementary Note of Bulik-Sullivan et al (2015). An alternative derivation is provided in the Supplementary Note of Zhu and Stephens (2017), AoAS.

From this we can see that LD Score regression can be used to estimate the SNP-based heritability of a phenotype or trait from GWAS summary statistics alone; unlike methods such as REML, it does not require individual-level genotype data.
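To build intuition, here is a small self-contained R simulation, not part of the pipeline, with all parameter values made up: it draws \(\chi^2\) statistics from the model in equation (1), then recovers the heritability from the regression slope and the confounding term from the intercept.

    # Toy simulation of equation (1); every number below is illustrative only
    set.seed(1)
    M <- 5000                         # number of SNPs
    N <- 10000                        # GWAS sample size
    h2 <- 0.4                         # true SNP heritability
    a <- 1e-5                         # confounding contribution
    l <- rexp(M, rate = 1/50)         # toy LD scores
    # E[chi2 | l] = N*h2*l/M + N*a + 1: draw from a non-central chi-square
    chisq <- rchisq(M, df = 1, ncp = N * h2 * l / M + N * a)
    fit <- lm(chisq ~ l)
    h2_hat <- unname(coef(fit)[2]) * M / N    # slope * M / N estimates h2
    a_hat  <- (unname(coef(fit)[1]) - 1) / N  # (intercept - 1) / N estimates a

(The actual LDSC software uses a weighted regression with block jackknife standard errors rather than ordinary least squares; this sketch only illustrates the moment condition.)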

Stratified LDSC#

Heritability is the proportion of phenotypic variation (\(V_P\)) that is due to variation in genetic values (\(V_G\)); it tells us how much of the difference in observed phenotypes in a sample is due to genetic differences within that sample. The concept extends to partitioned heritability, where the heritability of a phenotype/trait is split over categories of SNPs.

Partitioned heritability, or Stratified LD Score Regression (S-LDSC), gains power by leveraging LD Score information and by using SNPs that have not reached genome-wide significance to partition the heritability of a trait over categories, something many other methods do not do.

S-LDSC relies on the fact that the \(\chi^2\) association statistic for a given SNP includes the effects of all SNPs it tags: in a region of high LD, a single GWAS SNP represents the effects of a whole group of SNPs in that region.

S-LDSC determines that a category of SNPs is enriched for heritability if SNPs with high LD to that category have more significant \(\chi^2\) statistics than SNPs with low LD to that category.

Here, enrichment of a category is defined as the proportion of SNP heritability in the category divided by the proportion of SNPs in that category.

More precisely, under a polygenic model, the expected \(\chi^2\) statistic of SNP \(j\) is

\[E[\chi^2_j] = N\sum_C \tau_C\, \ell(j,C) + Na + 1 \quad (2)\]

where \(N\) is sample size, \(C\) indexes categories, \(\ell(j,C) = \sum_{k \in C} r^2_{jk}\) is the LD score of SNP \(j\) with respect to category \(C\), \(a\) is a term that measures the contribution of confounding biases, and, if the categories are disjoint, \(\tau_C\) is the per-SNP heritability in category \(C\); if the categories overlap, the per-SNP heritability of SNP \(j\) is \(\sum_{C:j \in C} \tau_C\). Equation 2 allows us to estimate \(\tau_C\) via a (computationally simple) multiple regression of \(\chi^2_j\) against \(\ell(j,C)\), for either a quantitative or case-control study.
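The same toy approach illustrates the stratified model: with per-category LD scores as predictors, a multiple regression recovers each \(\tau_C\). Again this is a sketch with made-up values, not the pipeline's estimator.

    # Toy sketch of equation (2): regress chi2 on per-category LD scores
    set.seed(2)
    M <- 5000; N <- 10000
    l_C1 <- rexp(M, rate = 1/30)      # LD scores w.r.t. category 1
    l_C2 <- rexp(M, rate = 1/20)      # LD scores w.r.t. category 2
    tau1 <- 2e-5; tau2 <- 5e-6        # per-SNP heritability in each category
    chisq <- rchisq(M, df = 1, ncp = N * (tau1 * l_C1 + tau2 * l_C2))
    fit <- lm(chisq ~ l_C1 + l_C2)
    tau_hat <- coef(fit)[-1] / N      # slopes divided by N estimate tau_C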

To see how these methods have been applied to real world data as well as a further discussion on methods and comparisons to other methods please read the three papers listed at the top of the document.

Tau Estimation and Enrichment Analysis#

The goal of this stage is to quantify the contribution of functional annotations to trait heritability and to evaluate their statistical significance, while accounting for linkage disequilibrium (LD) structure and annotation-specific properties.

Tau (τ):#

The standardized per-SNP heritability contribution of an annotation.

In joint-annotation analysis: Represents the independent contribution of an annotation after controlling for overlapping effects of other annotations.

Statistical Workflow#

1. Inputs#

  • .annot.gz: Annotation files defining SNP membership in functional categories.

  • .part_delete: Jackknife delete-values for τ estimates (200 genomic blocks × annotations).

  • .results: Regression coefficients (τ), enrichment statistics, and proportions.

  • .log: Total heritability (\(h_g^2\)) and quality control metrics.

2. Tau Standardization#

For each annotation j:

\[\tau_j^{std} = \tau_j \times \frac{sd_j \cdot M_{ref}}{h_g^2}\]
  • \(sd_j\): Standard deviation of annotation j across SNPs.

  • \(M_{ref} = 5,961,159\): Reference SNP count for genome-wide scaling.

  • \(h_g^2\): Total heritability (normalizes τ to per-unit contribution).

  • \(\tau_j^{std}\) is defined as the additive change in per-SNP heritability associated with a 1 standard deviation increase in the value of the annotation, divided by the average per-SNP heritability over all SNPs for the trait. A sketch of this computation follows the list.
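As a compact reference, the standardization can be written as a one-line R helper; tau_j, sd_j, and h2g are placeholders for values read from the .results and .log files.

    # Hedged sketch of the tau standardization formula above
    standardize_tau <- function(tau_j, sd_j, h2g, Mref = 5961159) {
      tau_j * sd_j * Mref / h2g
    }
    # e.g. standardize_tau(tau_j = 1.2e-8, sd_j = 0.05, h2g = 0.3)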

3. Enrichment Calculation#

Enrichment is defined as the proportion of heritability explained by variants in the target annotation divided by the proportion of variants in the annotation.

a. Direct Enrichment Statistics#

For each annotation j:

\[E_j = \frac{\text{Prop.\_h2}_j}{\text{Prop.\_SNPs}_j}= \tau_j \times \frac{M_{ref}}{h_g^2}\]

Where:

  • \(\text{Prop.\_h2}_j\): Proportion of heritability explained by annotation j

  • \(\text{Prop.\_SNPs}_j\): Proportion of SNPs in annotation j
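For example, in R (with placeholder proportions):

    # Direct enrichment from the Prop._h2 and Prop._SNPs columns of a .results file
    enrichment <- function(prop_h2, prop_snps) prop_h2 / prop_snps
    enrichment(0.15, 0.05)  # = 3: the annotation holds 3x its SNP share of h2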

b. Standardized Enrichment Statistics#

The goal is to compute the p-value based on the assumption of a normal distribution.

\[\text{EnrichStat}_j = \frac{h_g^2}{M_{ref}} \times \left[\frac{\text{Prop.\_h2}_j}{\text{Prop.\_SNPs}_j} - \frac{1-\text{Prop.\_h2}_j}{1-\text{Prop.\_SNPs}_j}\right]\]

Standard error derivation:

\[Z_j = \Phi^{-1}(\text{Enrichment\_p}_j/2)\]
\[SE_{\text{EnrichStat}_j} = \frac{\text{EnrichStat}_j}{Z_j}\]

Where:

  • \(\Phi^{-1}\): Inverse normal cumulative distribution function

  • \(\text{Enrichment\_p}_j\): Enrichment p-value from .results file

\[ Z_j = \frac{\text{EnrichStat}_j}{SE_{\text{EnrichStat}_j}} \]
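A minimal R sketch of these formulas, mirroring the process_enrichment() helper defined later in this notebook; prop_h2, prop_snps, h2g, and enrichment_p stand in for the Prop._h2, Prop._SNPs, total heritability, and Enrichment_p values read from the .results and .log files.

    # Standardized enrichment statistic and its standard error
    enrich_stat <- function(prop_h2, prop_snps, h2g, Mref = 5961159) {
      h2g / Mref * (prop_h2 / prop_snps - (1 - prop_h2) / (1 - prop_snps))
    }
    enrich_se <- function(stat, enrichment_p) {
      # Z = qnorm(p/2), SE = stat / Z; the sign follows the pipeline's convention
      stat / qnorm(enrichment_p / 2)
    }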

Meta-Analysis of Partitioned Heritability#

Purpose#

To integrate τ estimates across multiple traits or annotation groups, improving power and generalizability.

Random-Effects Meta-Analysis#

Models heterogeneity across traits:

\[\tau_{meta} = \frac{\sum w_i\tau_i}{\sum w_i}, \quad w_i = \frac{1}{SE_i^2 + \sigma^2}\]
  • \(\sigma^2\): Between-trait variance component.

  • \(SE_i\): Standard error of τ for trait i.

For enrichment meta-analysis:

  • Effect sizes use direct enrichment statistics \((E_j, SE_{E_j})\)

  • P-values derived from standardized statistics meta-analysis \(({\text{EnrichStat}_j}, SE_{\text{EnrichStat}_j})\)

Z-score:

\[Z = \frac{\tau_{meta}}{SE_{meta}}\]

P-value:

\[p = 2 \times \Phi(-|Z|)\]
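The processed_stats_2 step below performs this with rmeta::meta.summaries(); here is a minimal sketch with made-up per-trait values:

    # Random-effects meta-analysis of standardized tau across traits
    library(rmeta)
    tau_i <- c(0.8, 1.1, 0.6)          # placeholder per-trait tau estimates
    se_i  <- c(0.30, 0.40, 0.25)       # placeholder per-trait standard errors
    meta <- meta.summaries(tau_i, se_i, method = "random")
    Z <- meta$summary / meta$se.summary
    p <- 2 * pnorm(-abs(Z))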

Input#

1. Annotation File#

  • Purpose: Specifies genome annotation files for single or joint tau analysis.

  • Formats:

    • Text file (.txt) containing paths to annotation files; the annotation files themselves can be in rds/tsv/txt format.

    • Alternatively, specific annotation files for a single chromosome can be provided directly.

    • Examples:

      • Single Annotation (joint_tau = False):

        #id   path
        1     /home/al4225/project/quantile_twas/analysis/output/157genes/step2_rq_lr_summary/enrich_info_by_context_class/AC_DeJager_eQTL.unique_qr.rds
        22    /home/al4225/project/quantile_twas/analysis/output/157genes/step2_rq_lr_summary/enrich_info_by_context_class/AC_DeJager_eQTL.unique_qr.rds
        
        
      • Joint Annotation (joint_tau = True):

        #id   path1                     path2
        1     /path/to/annotation1.rds   /path/to/annotation2.rds
        22    /path/to/annotation1.rds   /path/to/annotation2.rds
        
        
    • Format (the score column is optional; if this column doesn't exist, scores default to 1):

        1. is_range = False

          chr   pos   score
          1    10001   1
          1    10002   1
      
        2. is_range = True

          chr   start   end   score
          1    10001   20001  1
          1    30001   40001  1
      

2. Reference Annotation File#

  • Purpose: Provides reference annotation files in .annot.gz format for each chromosome.

  • Formats:

    • Text file listing annotation files for all chromosomes.

    • Alternatively, files for specific chromosomes can be provided directly.

    • Example: /home/al4225/project/quantile_twas/analysis/SLDSC/data/reference_annotation.txt

      #id   path
      1     /mnt/vast/hpc/csg/xc2270/colocboost/post/SLDSC/example_anno/ABC_Road_GI_BRN.1.annot.gz
      22    /mnt/vast/hpc/csg/xc2270/colocboost/post/SLDSC/example_anno/ABC_Road_GI_BRN.22.annot.gz
      
      

3. Genome Reference File#

  • Purpose: Specifies genome reference .bed files (PLINK format) for each chromosome.

  • Formats:

    • Text file listing reference files for all chromosomes.

    • Alternatively, files for specific chromosomes can be provided directly.

    • Example: /home/al4225/project/quantile_twas/analysis/SLDSC/data/genome_reference_bfile.txt

      #id   path
      1     /mnt/vast/hpc/csg/xc2270/colocboost/post/SLDSC/plink_files/1000G.EUR.hg38.1.bed
      22    /mnt/vast/hpc/csg/xc2270/colocboost/post/SLDSC/plink_files/1000G.EUR.hg38.22.bed
      
      

4. SNP List#

  • File Path: /mnt/vast/hpc/csg/xc2270/colocboost/post/SLDSC/1000G_EUR_Phase3_hg38/list.txt

  • Purpose: Specifies SNPs for LDSC analysis.

  • Format: A list of rsIDs, one per line (e.g., 1217311 rows).

    rs12345
    rs67890
    ...
    

5. ALL Group Files#

  • Purpose: A text file listing all GWAS summary statistics (sumstats) files.

  • Example Files:

    /home/al4225/project/quantile_twas/analysis/SLDSC/data/sumstats_test_all.txt

  • Format:

    CAD_META.filtered.sumstats.gz
    UKB.Lym.BOLT.sumstats.gz
    

Workflow Steps#

Step 1: Annotation File Processing#

  • Purpose: Generate LD score files and genome annotations for each chromosome.

  • Inputs:

    • annotation_file

    • reference_anno_file

    • genome_ref_file

  • Key Parameters:

    • joint_tau = False: Single annotation analysis (a single path column).

    • joint_tau = True: Joint annotation analysis (multiple columns whose names start with path, e.g. path1, path2).

    • chromosome: Optional, restricts processing to specific chromosomes.

  • Outputs:

    • Per chromosome:

      • .annot.gz: Genome annotation scores.

      • .l2.ldscore.parquet (or .l2.ldscore.gz when a SNP list is provided): LD score file.

      • .l2.M: Total SNP count.

      • .l2.M_5_50: SNP count restricted to common SNPs (MAF > 5%).

      • .log: Processing logs.

Step 2: Munge Summary Statistics#

  • Purpose: Preprocess GWAS summary statistics into LDSC-compatible format.

  • Inputs:

    • GWAS summary statistics.

    • HapMap3 SNP list.

  • Outputs:

    • Harmonized GWAS summary statistics.

Step 3: Heritability Analysis#

  • Purpose: Estimate heritability (\(h_g^2\)) of traits using LDSC regression.

  • Inputs:

    • sumstat_dir: GWAS summary statistics.

    • target_anno_dir: Annotation directory.

    • baseline_ld_dir: Baseline LD scores.

  • Outputs:

    • Per trait:

      • .results: Heritability estimates and enrichment (Enrichment, Prop_h2).

      • .log: Analysis logs.

      • .part_delete: Jackknife delete values for the per-annotation coefficients (target + baseline), one row per genomic block.

      • .delete: Jackknife delete values for the aggregate heritability estimate.

Step 4: Initial Processed Statistics#

  • Purpose: Calculate tau statistics and prepare intermediate files for meta-analysis.

  • Inputs:

    • All GWAS traits and their annotation scores.

  • Outputs:

    • RDS file containing:

      • single_tau: Contribution of each annotation to heritability.

      • enrichment: Enrichment ratio of heritability to SNP density.

      • joint_tau: Independent effect of each annotation after removing shared contributions.

Step 5: Meta Analysis#

  • Purpose: Perform meta-analysis across trait groups.

  • Inputs:

    • trait_group_paths: Paths to files listing traits in each group.

    • trait_group_names: Names of trait groups.

  • Outputs:

    • Results for each trait group:

      • mean, p-value, SE.

    • Includes:

      • single_tau: Single annotation effects.

      • enrichment: Heritability enrichment.

      • joint_tau: Independent contributions of annotations.

MWE (Minimal Working Examples)#

1. make_annotation_files_ldscore#

The annotation file can be a txt file with an #id column and path columns (path1, path2, …); alternatively, rds files can be passed directly, separated by ','.

1.1 single tau analysis, with one annotation as input#

 # case 1: txt file as input
 sos run pipeline/sldsc_enrichment.ipynb make_annotation_files_ldscore \
    --annotation_file data/polyfun/input/colocboost_test_annotation_path.txt \
    --reference_anno_file data/polyfun/input/reference_annotation0.txt \
    --genome_ref_file data/polyfun/input/genome_reference_bfile.txt \
    --annotation_name test_colocboost \
    --plink_name reference. \
    --baseline_name annotations. \
    --weight_name weights. \
    --python_exec python \
    --polyfun_path data/github/polyfun \
    --cwd output/polyfun/ -j 22

Alternatively, we can pass files for a specific chromosome directly, instead of a txt list.

# single file format
 sos run pipeline/sldsc_enrichment.ipynb make_annotation_files_ldscore \
    --annotation_file data/polyfun/input/colocboost_test.tsv \
    --reference_anno_file data/polyfun/example_annot0/annotations.1.annot.gz \
    --genome_ref_file data/polyfun/example_data/reference.1.bed \
    --annotation_name test_colocboost \
    --plink_name reference. \
    --baseline_name annotations. \
    --weight_name weights. \
    --python_exec python \
    --polyfun_path data/github/polyfun \
    --cwd output/polyfun/ --chromosome 1
1.2 joint tau#

with more than one annotation as input

# input case 1: txt file format
 sos run /home/al4225/project/quantile_twas/analysis/SLDSC/scripts/ldsc.ipynb make_annotation_files_ldscore \
    --annotation_file /home/al4225/project/quantile_twas/analysis/SLDSC/data/quantile_qtl_annotation/test/AC_DeJager_eQTL.joint_tau_example.txt \
    --reference_anno_file /home/al4225/project/quantile_twas/analysis/SLDSC/data/reference_annotation.txt \
    --genome_ref_file /home/al4225/project/quantile_twas/analysis/SLDSC/data/genome_reference_bfile.txt \
    --snp_list /mnt/vast/hpc/csg/xc2270/colocboost/post/SLDSC/1000G_EUR_Phase3_hg38/list.txt \
    --annotation_name joint_test \
    --cwd /home/al4225/project/quantile_twas/analysis/SLDSC/output --joint_tau --chromosome 1
 # input case 2: single file format, annotation files separated by ','.
 sos run /home/al4225/project/quantile_twas/analysis/SLDSC/scripts/ldsc.ipynb make_annotation_files_ldscore \
    --annotation_file /home/al4225/project/quantile_twas/analysis/output/157genes/step2_rq_lr_summary/enrich_info_by_context_class/AC_DeJager_eQTL.unique_qr.rds,/home/al4225/project/quantile_twas/analysis/output/157genes/step2_rq_lr_summary/enrich_info_by_context_class/AC_DeJager_eQTL.shared_homo.rds \
    --reference_anno_file /mnt/vast/hpc/csg/xc2270/colocboost/post/SLDSC/example_anno/ABC_Road_GI_BRN.1.annot.gz \
    --genome_ref_file /mnt/vast/hpc/csg/xc2270/colocboost/post/SLDSC/plink_files/1000G.EUR.hg38.1.bed \
    --snp_list /mnt/vast/hpc/csg/xc2270/colocboost/post/SLDSC/1000G_EUR_Phase3_hg38/list.txt \
    --annotation_name joint_test \
    --cwd /home/al4225/project/quantile_twas/analysis/SLDSC/output/mwe/joint_tau --joint_tau --chromosome 1

2. get_heritability#

sos run pipeline/sldsc_enrichment.ipynb get_heritability \
    --target_anno_dir output/polyfun/test_colocboost \
    --sumstat_dir data/polyfun/example_data \
    --baseline_ld_dir data/polyfun/example_data \
    --python_exec python \
    --polyfun_path data/github/polyfun \
    --weights_dir data/polyfun/example_data \
    --plink_name reference. \
    --baseline_name annotations. \
    --weight_name weights. \
    --annotation_name test_colocboost \
    --cwd output/polyfun/test_colocboost/sumstats/ \
    --all_traits_file data/polyfun/input/sumstats_test_all.txt \
    -s build -j 2

3. processed_stats#

3.1 single tau analysis, with one annotation as input#

# processed_stats cwd has to be the same as for get_heritability
sos run pipeline/sldsc_enrichment.ipynb processed_stats \
    --target_anno_dir output/polyfun/test_colocboost \
    --sumstat_dir data/polyfun/example_data \
    --baseline_ld_dir data/polyfun/example_data \
    --python_exec python \
    --polyfun_path data/github/polyfun \
    --weights_dir data/polyfun/example_data \
    --plink_name reference. \
    --baseline_name annotations. \
    --weight_name weights. \
    --annotation_name test_colocboost \
    --cwd output/polyfun/test_colocboost/sumstats/ \
    --trait_group_paths "data/polyfun/input/sumstats_test_all.txt data/polyfun/input/sumstats_test_category1.txt" \
    --trait_group_names "All category1" \
    --all_traits_file data/polyfun/input/sumstats_test_all.txt
3.2 joint tau#

with more than one annotation as input

# processed_stats cwd has to be the same as for get_heritability
sos run /home/al4225/project/quantile_twas/analysis/SLDSC/scripts/ldsc.ipynb processed_stats \
    --target_anno_dir /home/al4225/project/quantile_twas/analysis/SLDSC/output/joint_test/ \
    --baseline_ld_dir /mnt/vast/hpc/csg/xc2270/colocboost/post/SLDSC/1000G_EUR_Phase3_hg38/baselineLD_v2.2 \
    --annotation_name joint_test \
    --cwd /home/al4225/project/quantile_twas/analysis/SLDSC/output/joint_test/heritability \
    --trait_group_paths "/home/al4225/project/quantile_twas/analysis/SLDSC/data/sumstats_test_all.txt /home/al4225/project/quantile_twas/analysis/SLDSC/data/sumstats_test_brain_trait.txt /home/al4225/project/quantile_twas/analysis/SLDSC/data/sumstats_test_blood_trait.txt" \
    --trait_group_names "All Brain Blood" \
    --all_traits_file /home/al4225/project/quantile_twas/analysis/SLDSC/data/sumstats_test_all.txt -s build --joint_tau --joint_annotation_names "AC.eqtl.unique_qr AC.eqtl.shared_heter"
Global Parameters#

[global]
# Path to the work directory of the analysis.
parameter: cwd = path('output')
# Prefix for the analysis output
parameter: annotation_name = str
parameter: joint_annotation_names = [] #if joint analysis is used, pass annotation names
parameter: joint_tau = False
parameter: python_exec = "python" # e.g. "/home/you/.conda/envs/polyfun/bin/python"
parameter: polyfun_path   = path # e.g. "/home/you/tools/polyfun"

# for make_annotation_files_ldscore workflow:
parameter: annotation_file = path()
parameter: reference_anno_file = path()
parameter: genome_ref_file = path() # with .bed 
parameter: chromosome = []
parameter: snp_list = path()
parameter: ld_wind_kb = 0 # use kb if the value is provided
parameter: ld_wind_cm = 1.0 # default using ld_wind_cm

# for get_heritability workflow
parameter: all_traits_file = path() # txt file; each row is a GWAS summary statistics file name, e.g. CAD_META.filtered.sumstats.gz
parameter: sumstat_dir = path() # Directory containing GWAS summary statistics
parameter: target_anno_dir = path()  # Directory containing target annotation files: output of ldscore
parameter: baseline_ld_dir = path()  # Directory containing baseline LD score files
parameter: frqfile_dir = path()  # Directory containing allele frequency files
parameter: plink_name = "ADSP_chr"
parameter: weights_dir = path()  # Directory containing LD weights
parameter: baseline_name = "baseline_chr"  # Prefix of baseline annotation files
parameter: weight_name = "weights_chr"  # Prefix of LD weights files
parameter: Mref = -1 # Reference number of SNP value. If > 0, use provided value; if <= 0, auto-calculate from ldscore files

# Number of threads
parameter: numThreads = 16
# For cluster jobs, number commands to run per job
parameter: job_size = 1
parameter: walltime = '12h'
parameter: mem = '16G'

Make Annotation File#

[make_annotation_files_ldscore]
parameter: score_column = 3
parameter: is_range = False # parameter to handle range expansion

import pandas as pd
import os

def adapt_file_path(file_path, reference_file):
    """
    Adapt a single file path based on its existence and a reference file's path.

    Args:
    - file_path (str): The file path to adapt.
    - reference_file (str): File path to use as a reference for adaptation.

    Returns:
    - str: Adapted file path.

    Raises:
    - FileNotFoundError: If no valid file path is found.
    """
    reference_path = os.path.dirname(reference_file)

    # Check if the file exists
    if os.path.isfile(file_path):
        return file_path

    # Check file name without path
    file_name = os.path.basename(file_path)
    if os.path.isfile(file_name):
        return file_name

    # Check file name in reference file's directory
    file_in_ref_dir = os.path.join(reference_path, file_name)
    if os.path.isfile(file_in_ref_dir):
        return file_in_ref_dir

    # Check original file path prefixed with reference file's directory
    file_prefixed = os.path.join(reference_path, file_path)
    if os.path.isfile(file_prefixed):
        return file_prefixed

    # If all checks fail, raise an error
    raise FileNotFoundError(f"No valid path found for file: {file_path}")


def adapt_file_path_all(df, column_name, reference_file):
    return df[column_name].apply(lambda x: adapt_file_path(x, reference_file))

# Process input files based on file type
if (str(annotation_file).endswith(('rds', 'tsv', 'txt', 'tsv.gz', 'txt.gz')) and 
    str(reference_anno_file).endswith('annot.gz')):
    # Case 1: Direct file paths
    # joint tau: annotation_file lists rds/tsv/txt files separated by ','
    if joint_tau:
        target_files = str(annotation_file).split(',')
        input_files = [[*target_files, reference_anno_file, genome_ref_file]]
    else:
        # single annotation
        input_files = [[annotation_file, reference_anno_file, genome_ref_file]]
    
    if len(chromosome) > 0:
        input_chroms = [int(x) for x in chromosome]
    else:
        input_chroms = [0]
else:
    # Case 2: text files with an #id column plus path columns (path, path1, path2, ...)
    target_files = pd.read_csv(annotation_file, sep="\t")
    reference_files = pd.read_csv(reference_anno_file, sep="\t")
    genome_ref_files = pd.read_csv(genome_ref_file, sep="\t")
    
    # Standardize #id 
    target_files["#id"] = [x.replace("chr", "") for x in target_files["#id"].astype(str)]
    reference_files["#id"] = [x.replace("chr", "") for x in reference_files["#id"].astype(str)]
    genome_ref_files["#id"] = [x.replace("chr", "") for x in genome_ref_files["#id"].astype(str)]
    
    # process target annotation files
    path_columns = [col for col in target_files.columns if col.startswith('path')]
    for col in path_columns:
        target_files[col] = target_files[col].apply(lambda x: adapt_file_path(x, annotation_file))
    
    # process reference and genome files
    reference_files["path"] = reference_files["path"].apply(lambda x: adapt_file_path(x, reference_anno_file))
    genome_ref_files["path"] = genome_ref_files["path"].apply(lambda x: adapt_file_path(x, genome_ref_file))
    
    # Merge the files based on #id
    input_files = target_files.merge(reference_files, on="#id").merge(genome_ref_files, on="#id")
    
    # Filter by specified chromosomes, if any
    if len(chromosome) > 0:
        input_files = input_files[input_files['#id'].isin(chromosome)]
    
    # Extract relevant columns as a list of file paths
    input_files = input_files.values.tolist()
    input_chroms = [x[0] for x in input_files]  # Chromosome IDs
    
    if joint_tau:
        # joint tau, keep all path columns
        input_files = [[*x[1:len(path_columns)+1], x[-2], x[-1]] for x in input_files]
    else:
        # single annotation
        input_files = [[x[1], x[-2], x[-1]] for x in input_files]
        
input: input_files, group_by = len(input_files[0]), group_with = "input_chroms"
# Dynamically determine output format based on snp_list parameter
import os
use_print_snps = snp_list.is_file()

if use_print_snps:
    # ldsc.py outputs .gz format
    ldscore_ext = "l2.ldscore.gz"
else:
    # compute_ldscores.py outputs .parquet format  
    ldscore_ext = "l2.ldscore.parquet"

if ld_wind_kb > 0:
    use_kb_window = True
    ld_window_param = ld_wind_kb
    ld_window_flag = "--ld-wind-kb"
else:
    use_kb_window = False
    ld_window_param = ld_wind_cm 
    ld_window_flag = "--ld-wind-cm"

output: dict([
   ('annot', f'{cwd:a}/{annotation_name}/{annotation_name}.{input_chroms[_index]}.annot.gz'),
   ('ldscore', f'{cwd:a}/{annotation_name}/{annotation_name}.{input_chroms[_index]}.{ldscore_ext}'),
   ('Mfile',  f'{cwd:a}/{annotation_name}/{annotation_name}.{input_chroms[_index]}.l2.M') # output path for the generated .M file
])

task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bnn}'

R: expand= "${ }", stderr = f'{_output["annot"]}.stderr', stdout = f'{_output["annot"]}.stdout'
    library(data.table)

    # Function to clean chromosome notation
    clean_chr <- function(x) {
        as.numeric(gsub("^chr", "", x))
    }

    # Function to process range data - only expands ranges for the specified chromosome
    process_range_data <- function(data, chr_value) {
        # First filter for the specific chromosome
        data$chr <- clean_chr(data$chr)
        data <- data[data$chr == chr_value,]
        
        if(nrow(data) == 0) {
            return(NULL)
        }
        
        # Only expand ranges for the matched chromosome
        expanded_rows <- lapply(1:nrow(data), function(j) {
            row <- data[j,]
            pos_seq <- seq(row$start, row$end-1)
            
            # Create base data frame with chr and pos
            result <- data.frame(
                chr = rep(row$chr, length(pos_seq)),
                pos = pos_seq
            )
            
            # Add additional columns if they exist
            if(ncol(data) > 3) {
                for(col in 4:ncol(data)) {
                    result[[names(data)[col]]] <- rep(row[[col]], length(pos_seq))
                }
            }
            
            return(result)
        })
        result <- unique(rbindlist(expanded_rows))
        return(result)
    }

    # Main annotation processing function
    process_annotation <- function(target_anno, ref_anno, score_column_value) {
        target_anno <- as.data.frame(target_anno)
        ref_anno <- as.data.frame(ref_anno)
        
        # Clean chromosome notation
        target_anno$chr <- clean_chr(target_anno$chr)
        ref_anno$CHR <- clean_chr(ref_anno$CHR)
        
        # Restrict the target annotation to the reference chromosome before
        # matching, so positions on other chromosomes cannot match reference BP values
        chr_value <- unique(ref_anno$CHR)
        target_anno <- target_anno[target_anno$chr == chr_value, , drop = FALSE]
        anno_scores <- rep(0, nrow(ref_anno))
        
        match_pos <- match(target_anno$pos, ref_anno$BP)
        valid_pos <- as.numeric(na.omit(match_pos))
        
        if (score_column_value <= ncol(target_anno)) {
            anno_scores[valid_pos] <- target_anno[[score_column_value]][!is.na(match_pos)]
        } else {
            anno_scores[valid_pos] <- 1
            print("Warning: Score column does not exist. Setting scores to 1")
        }
        
        return(anno_scores)
    }

    # Read reference annotation file
    ref_anno <- fread(${_input[-2]:ar})
    ref_anno <- as.data.frame(ref_anno)
    if("ANNOT" %in% colnames(ref_anno)) {
        ref_anno <- ref_anno[,-which(colnames(ref_anno) == "ANNOT")]
    }
    result_anno <- ref_anno

    score_column_value <- ${score_column}  
    is_joint <- ${"TRUE" if joint_tau else "FALSE"}
    is_range <- ${"TRUE" if is_range else "FALSE"}

    if(is_joint) {
        target_files = c(${",".join(['"%s"' % x.absolute() for x in _input[:-2]])})
        for(i in seq_along(target_files)) {
            # Read file based on format
            if(endsWith(target_files[i], "rds")) {
                target_anno <- readRDS(target_files[i])
            } else {  # Handle TSV/TXT files
                target_anno <- fread(target_files[i])
                
                # Process range data if needed
                if(is_range) {
                    names(target_anno)[1:3] <- c("chr", "start", "end")
                    target_anno <- process_range_data(target_anno, unique(ref_anno$CHR))
                    if(is.null(target_anno)) {
                        anno_scores <- rep(0, nrow(ref_anno))
                    } else {
                        anno_scores <- process_annotation(target_anno, ref_anno, score_column_value)
                    }
                } else {
                    names(target_anno)[1:2] <- c("chr", "pos")
                    anno_scores <- process_annotation(target_anno, ref_anno, score_column_value)
                }
            }
            
            print(paste("Processing annotation file", i))
            result_anno[[paste0("ANNOT_", i)]] <- anno_scores
        }
    } else {
        # Read file based on format
        if(endsWith(${_input[0]:ar}, "rds")) {
            target_anno <- readRDS(${_input[0]:ar})
        } else {  # Handle TSV/TXT files
            target_anno <- fread(${_input[0]:ar})
            
            # Process range data if needed
            if(is_range) {
                names(target_anno)[1:3] <- c("chr", "start", "end")
                target_anno <- process_range_data(target_anno, unique(ref_anno$CHR))
                if(is.null(target_anno)) {
                    anno_scores <- rep(0, nrow(ref_anno))
                } else {
                    anno_scores <- process_annotation(target_anno, ref_anno, score_column_value)
                }
            } else {
                names(target_anno)[1:2] <- c("chr", "pos")
                anno_scores <- process_annotation(target_anno, ref_anno, score_column_value)
            }
        }
        
        result_anno$ANNOT <- anno_scores
    }

    # Write results
    fwrite(result_anno, ${_output["annot"]:nr}, 
        quote=FALSE, col.names=TRUE, row.names=FALSE, sep="\t")

bash: expand= "$[ ]", stderr = f'{_output["annot"]:nnn}.stderr', stdout = f'{_output["annot"]:nnn}.stdout'
   gzip -f $[_output["annot"]:n] 

bash: expand="${ }", stderr = f'{_output[1]}.stderr', stdout = f'{_output[1]}.stdout'
    echo "Running chromosome ${input_chroms[_index]}"
    echo "use_print_snps = ${use_print_snps}"
    echo "use_kb_window = ${use_kb_window}" 
    echo "ld_window_flag = ${ld_window_flag}"
    echo "ld_window_param = ${ld_window_param}"
    if [ "${use_kb_window}" = "True" ]; then
        echo "Using custom LD window: ${ld_window_param} kb"
    else
        echo "Using default LD window: ${ld_window_param} cM"
    fi
    
    # Use the dynamically determined mode
    if [ "${use_print_snps}" = "True" ]; then
        echo "SNP list found: ${snp_list}"
        echo "Using ldsc.py --print-snps mode"
        ${python_exec} ${polyfun_path}/ldsc.py \
          --print-snps ${snp_list} \
          ${ld_window_flag} ${ld_window_param} \
          --out ${_output["ldscore"]:nnn} \
          --bfile ${_input[-1]:nar} \
          --yes-really \
          --annot ${_output["annot"]:a} \
          --l2
    else
        echo "No SNP list provided, using compute_ldscores.py to save time"
        ${python_exec} ${polyfun_path}/compute_ldscores.py \
          --annot ${_output["annot"]:a} \
          --bfile ${_input[-1]:nar} \
          ${ld_window_flag} ${ld_window_param} \
          --out ${_output["ldscore"]} \
          --allow-missing
    fi

R: expand="${ }", stderr=f'{_output["Mfile"]}.stderr', stdout=f'{_output["Mfile"]}.stdout'
    # Check if --print-snps mode was used
    use_print_snps <- ${str(use_print_snps).upper()}
    out_file     <- ${_output["Mfile"]:ar}
    out_file_550 <- paste0(${_output["Mfile"]:ar}, "_5_50")
    
    if(use_print_snps) {
        cat("--print-snps mode was used, checking if M files already exist...\n")
        
        if(file.exists(out_file)) {
            cat("M file already exists:", out_file, "\n")
            if(file.exists(out_file_550)) {
                cat("M_5_50 file already exists:", out_file_550, "\n")
                cat("Both M files found, skipping generation\n")
                quit(save = "no", status = 0)
            }
        }
    }
    
    cat("Proceeding with M file generation...\n")

    # Generate .M and .M_5_50 files based on SNPs actually used in LD score computation
    suppressPackageStartupMessages({
        library(data.table)
        library(dplyr)
    })
    
    # File paths
    ldscore_file <- ${_output["ldscore"]:ar}   # polyfun generated .l2.ldscore.parquet
    annot_file   <- ${_output["annot"]:ar}     # annotation file
    bfile_prefix <- ${_input[-1]:nar}          # plink file prefix
    
    cat("Reading input files...\n")
    cat("LD score file path:", ldscore_file, "\n")
    
    # 1. Read LD scores file (polyfun output) - Fixed logic
    ldscore_dt <- NULL
    
    # Try reading the file based on its actual extension
    if(endsWith(ldscore_file, ".parquet")) {
        tryCatch({
            suppressPackageStartupMessages(library(arrow))
            ldscore_dt <- arrow::read_parquet(ldscore_file)
            cat("Successfully read LD scores from .parquet file\n")
        }, error = function(e) {
            cat("Error reading parquet file:", e$message, "\n")
            cat("Trying alternative methods...\n")
        })
    } else if(endsWith(ldscore_file, ".gz")) {
        tryCatch({
            ldscore_dt <- fread(ldscore_file)
            cat("Successfully read LD scores from .gz file\n")
        }, error = function(e) {
            cat("Error reading gz file:", e$message, "\n")
            cat("Trying alternative methods...\n")
        })
    } else {
        # Check if alternative file formats exist
        alt_files <- c(
            paste0(ldscore_file, ".parquet"),
            paste0(ldscore_file, ".gz"),
            paste0(gsub("\\.[^.]*$", "", ldscore_file), ".parquet"),
            paste0(gsub("\\.[^.]*$", "", ldscore_file), ".gz")
        )
        
        for(alt_file in alt_files) {
            if(file.exists(alt_file)) {
                cat("Found alternative file:", alt_file, "\n")
                tryCatch({
                    if(endsWith(alt_file, ".parquet")) {
                        suppressPackageStartupMessages(library(arrow))
                        ldscore_dt <- arrow::read_parquet(alt_file)
                        cat("Successfully read from alternative parquet file\n")
                        break
                    } else if(endsWith(alt_file, ".gz")) {
                        ldscore_dt <- fread(alt_file)
                        cat("Successfully read from alternative gz file\n")
                        break
                    }
                }, error = function(e) {
                    cat("Failed to read", alt_file, ":", e$message, "\n")
                })
            }
        }
        
        # Last resort: try direct read
        if(is.null(ldscore_dt) && file.exists(ldscore_file)) {
            tryCatch({
                ldscore_dt <- fread(ldscore_file)
                cat("Successfully read LD scores from direct file read\n")
            }, error = function(e) {
                cat("Error in direct file read:", e$message, "\n")
            })
        }
    }
    
    # Check if we successfully read the file
    if(is.null(ldscore_dt)) {
        stop("Failed to read LD scores file from any format. Please check the file: ", ldscore_file)
    }
    
    cat("LD scores data dimensions:", nrow(ldscore_dt), "x", ncol(ldscore_dt), "\n")
    cat("LD scores columns:", paste(names(ldscore_dt), collapse=", "), "\n")
    
    # 2. Read annotation file
    annot_dt <- fread(annot_file)
    cat(sprintf("Read annotation file with %d SNPs\n", nrow(annot_dt)))
    
    # 3. Check for .frq file and read MAF information if available
    frq_file <- paste0(bfile_prefix, ".frq")
    has_frq <- file.exists(frq_file)
    
    if(has_frq) {
        cat("Found .frq file, will calculate both M and M_5_50\n")
        frq_dt <- fread(frq_file)
        frq_dt <- frq_dt[, .(SNP, MAF)]
        cat(sprintf("Read MAF information for %d SNPs\n", nrow(frq_dt)))
    } else {
        cat("No .frq file found, will only calculate M file\n")
        frq_dt <- NULL
    }
    
    # 4. Filter based on SNPs actually used in LD scores computation
    # LD scores file contains the final set of SNPs used
    used_snps <- ldscore_dt$SNP
    cat(sprintf("LD scores file contains %d SNPs\n", length(used_snps)))
    
    # 5. Filter annotation file to keep only SNPs in LD scores
    annot_filtered <- annot_dt[annot_dt$SNP %in% used_snps, ]
    cat(sprintf("Annotation file after filtering: %d SNPs\n", nrow(annot_filtered)))
    
    # 6. Merge with MAF information if available
    if(has_frq) {
        annot_with_maf <- merge(annot_filtered, frq_dt, by = "SNP", all.x = TRUE)
        cat(sprintf("After merging with MAF: %d SNPs\n", nrow(annot_with_maf)))
    } else {
        annot_with_maf <- annot_filtered
    }
    
    # 7. Identify annotation columns (exclude standard columns)
    standard_cols <- c("CHR", "SNP", "BP", "CM", "A1", "A2")
    if(has_frq) {
        standard_cols <- c(standard_cols, "MAF")
    }
    
    annot_cols <- setdiff(names(annot_with_maf), standard_cols)
    
    # Add default annotation column if none exist
    if (length(annot_cols) == 0L) {
        annot_with_maf[, ANNOT := 1L]
        annot_cols <- "ANNOT"
        cat("No annotation columns found, added default ANNOT column\n")
    }
    
    cat(sprintf("Found %d annotation columns: %s\n", 
                length(annot_cols), paste(annot_cols, collapse=", ")))
    
    # 8. Calculate M values (all SNPs)
    M <- annot_with_maf[, lapply(.SD, sum, na.rm=TRUE), .SDcols = annot_cols]
    
    cat(sprintf("Total SNPs used for M calculation: %d\n", nrow(annot_with_maf)))
    
    # 9. Calculate M_5_50 values (MAF > 0.05 SNPs) if MAF data available
    if(has_frq) {
        annot_common <- annot_with_maf[!is.na(MAF) & MAF > 0.05, ]
        M_5_50 <- annot_common[, lapply(.SD, sum, na.rm=TRUE), .SDcols = annot_cols]
        cat(sprintf("Common SNPs (MAF>0.05) used for M_5_50: %d\n", nrow(annot_common)))
    }
    
    # 10. Write .l2.M file (ensure single row format)
    M_values <- as.numeric(M)
    writeLines(paste(M_values, collapse=" "), out_file)
    
    cat(sprintf("[SUCCESS] Wrote %s with M values: %s\n", 
                out_file, paste(M_values, collapse=" ")))
    
    # 11. Write .l2.M_5_50 file if MAF data available
    if(has_frq) {
        M_5_50_values <- as.numeric(M_5_50)
        writeLines(paste(M_5_50_values, collapse=" "), out_file_550)
        
        cat(sprintf("[SUCCESS] Wrote %s with M_5_50 values: %s\n", 
                    out_file_550, paste(M_5_50_values, collapse=" ")))
    } else {
        cat("[INFO] Skipped M_5_50 file generation (no MAF data available)\n")
    }
    
    cat("M file generation completed successfully!\n")

Munge Summary Statistics#

# The script munge_polyfun_sumstats.py automatically detects whether signed statistics 
# (Z, BETA/SE, etc.) are present and computes Z-scores if needed. 
[munge_sumstats_polyfun]
parameter: sumstats  = path
parameter: n       = 0
parameter: min_info = 0.6
parameter: min_maf  = 0.001
parameter: keep_hla = False
parameter: chi2_cut = 30
input: sumstats
output: f"{_input:n}.munged.parquet"
bash: expand=True, stderr=f'{_output:nn}.stderr', stdout=f'{_output:nn}.stdout'
    {python_exec} {polyfun_path}/munge_polyfun_sumstats.py \
        --sumstats {_input} \
        --out {_output} \
        {'--n {}'.format(n) if n>0 else ''} \
        {'--min-info {}'.format(min_info)} \
        {'--min-maf {}'.format(min_maf)} \
        {'--chi2-cutoff {}'.format(chi2_cut)} \
        {'--keep-hla' if keep_hla else ''} \
        --remove-strand-ambig

Calculate Functional Enrichment using Annotations#

[get_heritability]
# First read the traits file and create input paths
parameter: all_traits = [] 

import pandas as pd
import os

# Read traits and construct full paths
with open(all_traits_file, 'r') as f:
    all_traits = [os.path.join(sumstat_dir, line.strip()) for line in f if line.strip()]

input: all_traits, group_by = 1
output: f"{cwd}/{os.path.basename(_input[0])}.log", f"{cwd}/{os.path.basename(_input[0])}.results"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'

bash: expand = "${ }"
    trait="${os.path.basename(_input[0])}"
    output_dir="${cwd}"
    mkdir -p $output_dir

    # Check if frqfile_dir is provided and frq files exist (check a representative chromosome file)
    frq_file_check="${frqfile_dir}/${plink_name}22.frq"
    if [ -f "$frq_file_check" ]; then
        echo "Found frq files, using --frqfile-chr option"
        frq_option="--frqfile-chr ${frqfile_dir}/${plink_name}"
    else
        echo "No frq files found at ${frqfile_dir}/${plink_name}*.frq, using --not-M-5-50 option"
        frq_option="--not-M-5-50"
    fi

    # First attempt: original command
    ${python_exec} ${polyfun_path}/ldsc.py \
        --h2 ${sumstat_dir}/$trait \
        --ref-ld-chr ${target_anno_dir}/${annotation_name}.,${baseline_ld_dir}/${baseline_name} \
        --out ${cwd}/$trait \
        --overlap-annot \
        --w-ld-chr ${weights_dir}/${weight_name} \
        $frq_option \
        --print-coefficients \
        --print-delete-vals

    # Check if it failed with FloatingPointError
    if [ $? -ne 0 ] && grep -q "FloatingPointError\|invalid value encountered in sqrt" "${cwd}/$trait.log" 2>/dev/null; then
        echo "FloatingPointError detected, retrying with --chisq-max 30..."
        
        ${python_exec} ${polyfun_path}/ldsc.py \
            --h2 ${sumstat_dir}/$trait \
            --ref-ld-chr ${target_anno_dir}/${annotation_name}.,${baseline_ld_dir}/${baseline_name} \
            --out ${cwd}/$trait \
            --overlap-annot \
            --w-ld-chr ${weights_dir}/${weight_name} \
            $frq_option \
            --chisq-max 30 \
            --print-coefficients \
            --print-delete-vals
        
        # Check again if it still failed with FloatingPointError
        if [ $? -ne 0 ] && grep -q "FloatingPointError\|invalid value encountered in sqrt" "${cwd}/$trait.log" 2>/dev/null; then
            echo "Still FloatingPointError with --chisq-max 30, retrying with --chisq-max 20..."
            
            ${python_exec} ${polyfun_path}/ldsc.py \
                --h2 ${sumstat_dir}/$trait \
                --ref-ld-chr ${target_anno_dir}/${annotation_name}.,${baseline_ld_dir}/${baseline_name} \
                --out ${cwd}/$trait \
                --overlap-annot \
                --w-ld-chr ${weights_dir}/${weight_name} \
                $frq_option \
                --chisq-max 20 \
                --print-coefficients \
                --print-delete-vals
            
            # Check a third time if it still failed with FloatingPointError
            if [ $? -ne 0 ] && grep -q "FloatingPointError\|invalid value encountered in sqrt" "${cwd}/$trait.log" 2>/dev/null; then
                echo "Still FloatingPointError with --chisq-max 20, retrying with --chisq-max 10..."
                
                ${python_exec} ${polyfun_path}/ldsc.py \
                    --h2 ${sumstat_dir}/$trait \
                    --ref-ld-chr ${target_anno_dir}/${annotation_name}.,${baseline_ld_dir}/${baseline_name} \
                    --out ${cwd}/$trait \
                    --overlap-annot \
                    --w-ld-chr ${weights_dir}/${weight_name} \
                    $frq_option \
                    --chisq-max 10 \
                    --print-coefficients \
                    --print-delete-vals
                
                # Final check - if still failing, log the persistent error
                if [ $? -ne 0 ] && grep -q "FloatingPointError\|invalid value encountered in sqrt" "${cwd}/$trait.log" 2>/dev/null; then
                    echo "ERROR: FloatingPointError persists even with --chisq-max 10 for trait: $trait"
                    echo "This trait may have severe numerical instability issues in the summary statistics."
                fi
            fi
        fi
    fi
Initial Processed Statistics#

[processed_stats_1]
parameter: num_non_annotation_cols = -1 # number of non-annotation columns (CHR, BP, SNP, A1, A2, CM, MAF); auto-detected by the pipeline by default, or used directly if provided

output: f'{cwd:a}/{annotation_name}.joint_tau.initial_processed_stats.rds' if joint_tau else f'{cwd:a}/{annotation_name}.single_tau.initial_processed_stats.rds'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads

R: expand= "${ }", stderr = f'{_output[0]}.stderr', stdout = f'{_output[0]}.stdout'
    library(data.table)
    print(paste("Joint tau analysis:", ${str(joint_tau).upper()}))
    is_joint <- ${"TRUE" if joint_tau else "FALSE"}
    print(paste("Is joint analysis:", is_joint))
    provided_num_non_annotation_cols = ${num_non_annotation_cols}   
    standard_non_annot_cols <- c("CHR", "SNP", "BP", "CM", "A1", "A2", "MAF")

    # Function to auto-calculate Mref from ldscore files
    calculate_mref <- function(target_anno_dir) {
        message("Auto-calculating Mref from ldscore files...")
        
        # Find ldscore files (both .gz and .parquet formats)
        ldscore_files <- list.files(target_anno_dir, 
                                  pattern = "\\.l2\\.ldscore\\.(gz|parquet)$", 
                                  full.names = TRUE)
        
        if (length(ldscore_files) == 0) {
            stop("No ldscore files found in directory: ", target_anno_dir)
        }
        
        message("Found ", length(ldscore_files), " ldscore files:")
        
        total_snps <- 0
        file_details <- data.frame(
            file = character(),
            format = character(), 
            rows = integer(),
            stringsAsFactors = FALSE
        )
        
        for (file in ldscore_files) {
            tryCatch({
                if (endsWith(file, ".parquet")) {
                    suppressPackageStartupMessages(library(arrow))
                    data <- arrow::read_parquet(file)
                    file_format <- "parquet"
                } else if (endsWith(file, ".gz")) {
                    data <- fread(file)
                    file_format <- "gz"
                } else {
                    warning("Unsupported file format: ", file)
                    next
                }
                
                n_rows <- nrow(data)
                total_snps <- total_snps + n_rows
                
                file_details <- rbind(file_details, data.frame(
                    file = basename(file),
                    format = file_format,
                    rows = n_rows
                ))
                
                message("  ", basename(file), " (", file_format, "): ", n_rows, " SNPs")
                
            }, error = function(e) {
                warning("Error reading file ", file, ": ", e$message)
            })
        }
        
        message("Total SNPs across all files: ", total_snps)
        message("Using Mref = ", total_snps)
        
        return(total_snps)
    }
    
    # Determine Mref value
    mref_param <- ${Mref}
    if (mref_param > 0) {
        Mref <- mref_param
        message("Using provided Mref value: ", Mref)
    } else {
        Mref <- calculate_mref("${target_anno_dir}")
    }

    calculate_sd_annot <- function(cell_path, annot_index = 1) {
    ll <- list.files(cell_path, pattern = "\\.annot\\.gz$", full.names = TRUE)
    if(length(ll) == 0) {
        stop("No annotation files found in: ", cell_path)
    }

    # detect number of non-annotation cols 
    if(provided_num_non_annotation_cols > 0) {
        num_non_annotation_cols <- provided_num_non_annotation_cols
        message("Using provided num_non_annotation_cols: ", num_non_annotation_cols)
    } else {
        # calculate automatically
        first_file <- ll[1]
        sample_dat <- fread(first_file, nrows = 5)
        file_cols <- names(sample_dat)
        present_standard_cols <- intersect(file_cols, standard_non_annot_cols)
        num_non_annotation_cols <- length(present_standard_cols)
        
        message("Auto-detected annotation file structure:")
        message("  Total columns: ", length(file_cols))
        message("  Standard non-annotation columns found: ", paste(present_standard_cols, collapse=", "))
        message("  Number of non-annotation columns: ", num_non_annotation_cols)
    }
    
    num = numeric(length(annot_index))
    den = 0
    
    for(m in ll) {
        dat <- fread(m, fill = TRUE)
        if(length(annot_index) > 1) {
            cols <- paste0("ANNOT_", annot_index) 
            num <- num + ((nrow(dat)-1) * sapply(dat[, ..cols], var))
        } else {
            if((num_non_annotation_cols + annot_index) > ncol(dat)) stop(paste("Index out of range:", m))
            num <- num + ((nrow(dat)-1) * var(dat[, num_non_annotation_cols+annot_index, with=FALSE]))
        }
        den <- den + (nrow(dat)-1)
    }
    sqrt(num/den)
    }

    check_file_exists <- function(file_path) {
        if (!file.exists(file_path)) {
            warning(paste("File does not exist:", file_path))
            return(FALSE)
        }
        return(TRUE)
    }

    # Process single trait
    process_trait <- function(trait, cwd) {
    files <- list(
        result = paste0(cwd, "/", trait, ".results"),
        log = paste0(cwd, "/", trait, ".log"),
        delete = paste0(cwd, "/", trait, ".part_delete")
    )
    
    if(!all(sapply(files, file.exists))) {
        warning(paste("Missing files for trait:", trait))
        return(NULL)
    }

    tryCatch({
        results <- fread(files$result)
        h2g <- as.numeric(gsub(".*h2: (-?[0-9.]+).*", "\\1",
                            grep("Total Observed scale h2:", readLines(files$log), value=TRUE)))
        delete_values <- fread(files$delete)
        
        list(trait = trait, h2g = h2g, results = results, delete_values = delete_values)
    }, error = function(e) {
        warning(paste("Error processing trait:", trait, "-", e$message))
        NULL
    })
    }

    # Process tau analysis
    process_tau_analysis <- function(trait_data, sd_annots, base_index = NULL, is_joint = FALSE, Mref) {
    
    if(is_joint) {
        indices <- 1:(nrow(trait_data$results) - base_index)
        sc_matrices <- matrix(0, nrow=nrow(trait_data$delete_values), ncol=length(indices))
        
        for(i in seq_along(indices)) {
            coef <- sd_annots[i] * Mref / trait_data$h2g
            sc_matrices[,i] <- trait_data$delete_values[[indices[i]]] * coef
        }
        
        list(joint_stats = list(
            h2g = trait_data$h2g,
            sd_annots = sd_annots,
            sc_matrices = sc_matrices
        ))
    } else {
        coef <- sd_annots * Mref / trait_data$h2g
        sc_matrix <- trait_data$delete_values[[1]] * coef
        
        list(single_tau = list(
            h2g = trait_data$h2g,
            sd_annot = sd_annots,
            sc_matrix = matrix(as.vector(trait_data$delete_values[[1]] * coef), ncol=1)
        ))        
    }
    }

    # Process enrichment
    process_enrichment <- function(trait_data, Mref) {
    enrichment_p <- as.numeric(trait_data$results[1, "Enrichment_p"])
    enrich_ratio <- ((trait_data$results[1, "Prop._h2"] / trait_data$results[1, "Prop._SNPs"]) - 
                        (1 - trait_data$results[1, "Prop._h2"]) / (1 - trait_data$results[1, "Prop._SNPs"]))
    enrich_term <- trait_data$h2g / Mref * enrich_ratio
    
    list(
        enrichment_summary = data.table(
            Enrichment = trait_data$results[1, "Enrichment"],
            Enrichment_std_error = trait_data$results[1, "Enrichment_std_error"],
            Prop._h2 = trait_data$results[1, "Prop._h2"],
            Prop._SNPs = trait_data$results[1, "Prop._SNPs"],
            Enrichment_p = enrichment_p
        ),
        meta_enrstat = list(
            enrich_stat = as.numeric(enrich_term),
            enrich_z = as.numeric(qnorm(enrichment_p / 2)),
            enrich_sd = as.numeric(enrich_term / qnorm(enrichment_p / 2))
        ),
        meta_enr = list(
            Enrichment = as.numeric(trait_data$results[1, "Enrichment"]),
            Enrichment_std_error = as.numeric(trait_data$results[1, "Enrichment_std_error"]) 
        )
    )
    }

    # Main analysis
    traits <- scan("${all_traits_file}", what = "character")
    results <- list()
    problematic_traits <- c()

    if (is_joint) {
    # Get baseline info and indices
    base_path <- paste0("${baseline_ld_dir}", "/", "${baseline_name}", "22.annot")
    if (file.exists(paste0(base_path, ".parquet"))) {
        base <- arrow::read_parquet(paste0(base_path, ".parquet"))
    } else if (file.exists(paste0(base_path, ".gz"))) {
        base <- fread(paste0(base_path, ".gz"))
    } else {
        stop("Neither .parquet nor .gz file found for baseline annotation")
    }    

    if(provided_num_non_annotation_cols > 0) {
        base_num_non_annotation_cols <- provided_num_non_annotation_cols
        message("Using provided num_non_annotation_cols for baseline: ", base_num_non_annotation_cols)
    } else {
        base_cols <- names(base)
        base_standard_cols <- intersect(base_cols, standard_non_annot_cols)
        base_num_non_annotation_cols <- length(base_standard_cols)
        message("Auto-detected baseline structure:")
        message("  Total columns: ", ncol(base))
        message("  Non-annotation columns: ", base_num_non_annotation_cols)
    }

    base_index <- ncol(base) - base_num_non_annotation_cols
    
    tab2 <- fread(paste0("${cwd}", "/", traits[1], ".results"))
    indices <- 1:(nrow(tab2) - base_index)
    
    sd_annots <- calculate_sd_annot("${target_anno_dir}", indices)
    
    for (trait in traits) {
        if (!is.null(trait_data <- process_trait(trait, "${cwd}"))) {
            results[[trait]] <- process_tau_analysis(trait_data, sd_annots, base_index, TRUE, Mref)
        } else {
            problematic_traits <- c(problematic_traits, trait)
        }
    }
    } else {
    sd_annot <- calculate_sd_annot("${target_anno_dir}")
    
    for (trait in traits) {
        if (!is.null(trait_data <- process_trait(trait, "${cwd}"))) {
            results[[trait]] <- c(
                process_tau_analysis(trait_data, sd_annot, NULL, FALSE, Mref),
                list(enrichment = process_enrichment(trait_data, Mref))
            )
        } else {
            problematic_traits <- c(problematic_traits, trait)
        }
    }
    }

    if (length(problematic_traits) > 0) {
    warning("Problematic traits:", paste(problematic_traits, collapse=", "))
    }

    saveRDS(results, "${_output[0]}", compress = "xz")
[processed_stats_2]
parameter: trait_group_paths = []     
parameter: trait_group_names = []
input: group_by = "all" 
output: f'{cwd:a}/{step_name}/{"joint_tau" if joint_tau else "single_tau"}.{annotation_name}.meta_processed_stats.rds'

task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads

R: expand = '${ }', stderr = f'{_output[0]}.stderr', stdout = f'{_output[0]}.stdout'
  library(data.table)
  library(rmeta)

  processed_path = ${_input:ar}
  results_data <- readRDS(processed_path)
  is_joint <- ${str(joint_tau).upper()}
  
  process_paths <- function(input_str) {
      cleaned <- gsub("^\\[|\\]$", "", input_str)
      cleaned <- gsub("'", "", cleaned)
      paths <- strsplit(cleaned, ",\\s*|\\s+")[[1]]
      paths <- trimws(paths)
      return(paths)
  }
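  # SoS interpolates a Python list as its string literal, so e.g. (hypothetical
  # paths) process_paths("['groups/a.txt', 'groups/b.txt']") returns
  # c("groups/a.txt", "groups/b.txt")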

  group_paths <- process_paths("${trait_group_paths}")
  group_names <- process_paths("${trait_group_names}")
  problematic_traits <- list()
  joint_names <- process_paths("${joint_annotation_names}")
  if (is_joint) {
      joint_tau_results <- list()
      
      for (i in seq_along(group_paths)) {
          traits <- readLines(group_paths[i])
          group_stats <- list()
          
          for (trait in traits) {
              if(trait %in% names(results_data) && !is.null(results_data[[trait]]$joint_stats)) {
                  group_stats[[trait]] <- results_data[[trait]]$joint_stats
              } else {
                  warning(paste("Invalid data for trait:", trait))
                  problematic_traits[[trait]] <- "Invalid data"
              }
          }
          
          if(length(group_stats) > 0) {
              n_annotations <- ncol(group_stats[[1]]$sc_matrices)
              meta_result <- matrix(0, nrow = n_annotations, ncol = 3)
              colnames(meta_result) <- c("Mean", "SD", "P")
              if(length(joint_names) > 0) {
                  rownames(meta_result) <- joint_names  # Add row names
              }              
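              # For each annotation, the trait-level estimate is the mean of the
              # jackknife pseudovalues with SE from the delete-block jackknife:
              # (n-1)^2/n * var(values) with n = 200 blocks (the polyfun/LDSC
              # default, assumed here). Trait estimates are then combined with a
              # random-effects meta-analysis (rmeta::meta.summaries) and a
              # two-sided normal p-value on z = summary/se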
              for(j in 1:n_annotations) {
                  df <- matrix(0, nrow = length(group_stats), ncol = 2)
                  for(k in 1:length(group_stats)) {
                      sc_values <- group_stats[[k]]$sc_matrices[,j]
                      df[k,] <- c(mean(sc_values), sqrt(199^2/200 * var(sc_values)))
                  }
                  
                  meta <- meta.summaries(df[,1], df[,2], method = "random")
                  meta_result[j,] <- c(
                      meta$summary, 
                      meta$se.summary,
                      2 * pnorm(-abs(meta$summary/meta$se.summary))
                  )
              }
              joint_tau_results[[group_names[i]]] <- meta_result
          }
      }
      saveRDS(joint_tau_results, '${_output[0]}')
      
  } else {
      tau_results <- list()
      enrichment_results <- list()
      
      for (i in seq_along(group_paths)) {
          traits <- readLines(group_paths[i])
          
          group_stats <- Filter(function(x) {
              !is.null(x) && !is.null(x$single_tau$sc_matrix)
          }, results_data[traits])
          
          if (length(group_stats) > 0) {
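              # Same delete-block jackknife SE as in the joint branch
              # (n = 200 blocks assumed)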
              sc_data <- lapply(group_stats, function(x) {
                  mean_sc <- mean(x$single_tau$sc_matrix[, 1])
                  se_sc <- sqrt(199^2 / 200 * var(x$single_tau$sc_matrix[, 1]))
                  return(c(mean_sc, se_sc))
              })
              sc_matrix <- do.call(rbind, sc_data)
              
              meta_tau <- meta.summaries(sc_matrix[, 1], sc_matrix[, 2], method = "random")
              tau_results[[group_names[i]]] <- c(meta_tau$summary, 
                                              meta_tau$se.summary,
                                              2 * pnorm(-abs(meta_tau$summary / meta_tau$se.summary)))
              
              enr_data <- lapply(group_stats, function(x) {
                  return(c(x$enrichment$meta_enr$Enrichment,
                          x$enrichment$meta_enr$Enrichment_std_error))
              })
         
              enr_matrix <- do.call(rbind, enr_data)
              valid_rows <- !is.na(enr_matrix[,1]) & !is.na(enr_matrix[,2]) & 
                          !is.nan(enr_matrix[,1]) & !is.nan(enr_matrix[,2]) &
                          enr_matrix[,2] > 0               
              trait_names <- names(group_stats)
              invalid_traits <- trait_names[!valid_rows]

              message(sprintf("Group %s - Total: %d, Valid: %d, Invalid: %d",
                              group_names[i], nrow(enr_matrix), sum(valid_rows), sum(!valid_rows)))
              if (sum(!valid_rows) > 0) {
                  invalid_se_values <- enr_matrix[!valid_rows, 2]

                  message(sprintf("Invalid traits in group %s (missing or non-positive SE):", group_names[i]))
                  for (j in seq_along(invalid_traits)) {
                      se_val <- invalid_se_values[j]
                      if (is.na(se_val)) {
                          message(sprintf("  - %s: SE = NA/NaN", invalid_traits[j]))
                      } else {
                          message(sprintf("  - %s: SE = %g", invalid_traits[j], se_val))
                      }
                  }
              }
              enrstat_data <- lapply(group_stats, function(x) {
                  return(c(x$enrichment$meta_enrstat$enrich_stat, x$enrichment$meta_enrstat$enrich_sd))})
              enrstat_matrix <- do.call(rbind, enrstat_data)

              if(sum(valid_rows) >= 2) {
                  meta_enr <- meta.summaries(enr_matrix[valid_rows, 1], 
                                          enr_matrix[valid_rows, 2], 
                                          method = "random")
              } else {
                  warning(paste("Insufficient valid data for", group_names[i]))
                  meta_enr <- list(summary = NA, se.summary = NA)
              }              
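              # The reported p-value comes from meta-analyzing the h2-difference
              # statistic (enrich_stat) rather than the enrichment ratio itself,
              # presumably because the ratio's SE is unstable for small annotations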
              meta_enrstat <- meta.summaries(enrstat_matrix[, 1], enrstat_matrix[, 2], method = "random")
              enrichment_results[[group_names[i]]] <- c(meta_enr$summary,
                                                        meta_enr$se.summary,
                                                        2 * pnorm(-abs(meta_enrstat$summary / meta_enrstat$se.summary)))
          } else {
              warning(paste("No valid data for group:", group_names[i]))
          }
      }

      tau_df <- do.call(rbind, tau_results)
      enrichment_df <- do.call(rbind, enrichment_results)
      
      # "SD" holds the meta-analysis standard error (rmeta's se.summary)
      colnames(tau_df) <- c("Mean", "SD", "P")
      colnames(enrichment_df) <- c("Mean", "SD", "P")
      # Row names come from the retained list entries rather than from group_names,
      # so a group skipped for lack of valid data cannot misalign the labels
      rownames(tau_df) <- names(tau_results)
      rownames(enrichment_df) <- names(enrichment_results)
      
      results <- list(tau = tau_df, enrichment = enrichment_df)
      saveRDS(results, '${_output[0]}')
  }

  if (length(problematic_traits) > 0) {
      warning("The following traits had issues: ",
              paste(names(problematic_traits), collapse = ", "))
  }