Integrative Analysis Output Processing#

This notebook is to post-process the susie results into different text file

Extracting susie results#

sos run pipeline/SuSiE_post_processing.ipynb susie_to_tsv \
    --cwd output/test --rds_path `ls output/test/cache/*rds | head ` --region-list <(head -50  ./dlpfc_region_list) --container containers/stephenslab.sif 

Extracting susie_rss results for ADGWAS#

sos run pipeline/SuSiE_post_processing.ipynb susie_to_tsv \
    --cwd output/ADGWAS_finemapping_extracted/Bellenguez/ --rds_path `ls GWAS_Finemapping_Results/Bellenguez/ADGWAS2022*rds ` \
    --region-list ~/1300_hg38_EUR_LD_blocks_orig.tsv \
    --container containers/stephenslab.sif 

sos run pipeline/SuSiE_post_processing.ipynb susie_tsv_collapse \
    --cwd output/ADGWAS_finemapping_extracted --tsv_path `ls output/ADGWAS_finemapping_extracted/*lbf.tsv` \
    --container containers/stephenslab.sif 

Extracting fsusie results#

sos run pipeline/SuSiE_post_processing.ipynb fsusie_to_tsv \
    --cwd output/f_susie_tad_haQTL_pos --rds_path `ls output/f_susie_tad_haQTL_pos/cache/*rds ` \
    --region-list ../eqtl/dlpfc_tad_list \
    --container containers/stephenslab.sif -s build
sos run pipeline/SuSiE_post_processing.ipynb fsusie_to_tsv \
    --cwd output/f_susie_tad_meQTL_pos_selected/ --rds_path `ls output/f_susie_tad_meQTL_pos_selected//cache/*1204*rds ` \
    --region-list ../eqtl/dlpfc_tad_list \
    --container containers/stephenslab.sif -s build
sos run pipeline/SuSiE_post_processing.ipynb fsusie_to_tsv \
    --cwd output/f_susie_tad_meQTL_pos_2/ --rds_path `ls output/f_susie_tad_meQTL_pos_2//cache/*rds ` \
    --region-list ../eqtl/dlpfc_tad_list \
    --container containers/stephenslab.sif -s build
sos run pipeline/SuSiE_post_processing.ipynb fsusie_to_tsv \
    --cwd output/f_susie_tad_meQTL_pos/ --rds_path `ls output/f_susie_tad_meQTL_pos//cache/*rds ` \
    --region-list ../eqtl/dlpfc_tad_list \
    --container containers/stephenslab.sif -s build
sos run pipeline/SuSiE_post_processing.ipynb fsusie_to_tsv \
    --cwd output/f_susie_tad_haQTL_pos_check_pure_2 --rds_path `ls output/f_susie_tad_haQTL_pos_check_pure_2/cache/*rds ` \
    --region-list ../eqtl/dlpfc_tad_list \
    --container containers/stephenslab.sif -s build
sos run pipeline/SuSiE_post_processing.ipynb fsusie_to_tsv \
    --cwd output/f_susie_tad_meQTL_pos_2/ --rds_path `ls output/f_susie_tad_meQTL_pos_2//cache/*rds ` \
    --region-list ../eqtl/dlpfc_tad_list \
    --container containers/stephenslab.sif -s build

Plotting the pip plot#

sos run pipeline/SuSiE_post_processing.ipynb susie_pip_landscape_plot \
    --cwd output/test/ --plot_list plot_recipe --annot_tibble ~/Annotatr_builtin_annotation_tibble.tsv -s force &
sos run pipeline/SuSiE_post_processing.ipynb susie_upsetR_plot \
    --cwd output/test/ --plot_list plot_recipe_1  -s force &

The required input for this step is a tab-delimited plot_recipe file that specifies the path to each of the variant.tsv files generated from this module. Each column represents a molecular phenotype, and each row indicates the files that share common variants. Since one TAD may correspond to multiple genes, additional eQTL are permitted. If there are additional molecular phenotypes or ADGWAS datasets, additional columns can be appended.

The built-in Annotatr_builtin_annotation_tibble.tsv can be downloaded from synapse, please download it and specify the path.

cat /mnt/vast/hpc/csg/molecular_phenotype_calling/QTL_fine_mapping/plot_recipe
haQTL	mQTL	eQTL	eQTL	ADGWAS
/mnt/vast/hpc/csg/molecular_phenotype_calling/QTL_fine_mapping/output/f_susie_tad_meQTL_pos//meQTL.yuqi_mqtl.tad100.uni_Fsusie.mixture_normal_per_scale.variant.tsv	/mnt/vast/hpc/csg/molecular_phenotype_calling/QTL_fine_mapping/output/f_susie_tad_haQTL_pos//haQTL.rosmap_haqtl.tad100.uni_Fsusie.mixture_normal_per_scale.variant.tsv	/mnt/vast/hpc/csg/molecular_phenotype_calling/eqtl//output/susie_per_gene_tad//demo.ENSG00000117322.unisusie.fit.variant.tsv	/mnt/vast/hpc/csg/molecular_phenotype_calling/eqtl//output/susie_per_gene_tad//demo.ENSG00000203710.unisusie.fit.variant.tsv	/mnt/vast/hpc/csg/xqtl_workflow_testing/susie_rss/output/ADGWAS_finemapping_extracted/Bellenguez/ADGWAS2022.chr1.sumstat.chr1_205972031_208461272.unisusie_rss.fit.variant.tsv

Exporting cis_analysis susie_twas results#

meta file is produced by:

get_condition <- function(conditions, Author, qtl_type){
    strings <- c()
    for(condition in conditions){
        string =  paste(condition, Author, qtl_type, sep = "_")  
        string = paste(unique(unlist(strsplit(string, "_"))), collapse = "_")
        strings <- c(strings, string)
    }
    return(strings)
}

raw_name<- c("Mic","Ast","Oli","OPC","Exc","Inh","DLPFC","PCC","AC")
raw_name_kellis<- c("Mic_Kellis","Ast_Kellis","Oli_Kellis","OPC_Kellis","Exc_Kellis","Inh_Kellis","Ast.10","Mic.12","Mic.13")

dejager_name <- get_condition(raw_name, "De_Jager","eQTL")
kellis_name <- get_condition(raw_name_kellis, "Kellis","eQTL")
eQTL_meta <- data.frame(raw_name = c(raw_name, raw_name_kellis), new_name = c(dejager_name, kellis_name))
write_delim(eQTL_meta, "/mnt/vast/hpc/csg/rf2872/Work/Multivariate/susie_2024_new/eQTL_meta.tsv")
head /mnt/vast/hpc/csg/rf2872/Work/Multivariate/susie_2024_new/eQTL_meta.tsv
raw_name new_name
Mic Mic_De_Jager_eQTL
Ast Ast_De_Jager_eQTL
Oli Oli_De_Jager_eQTL
OPC OPC_De_Jager_eQTL
Exc Exc_De_Jager_eQTL
Inh Inh_De_Jager_eQTL
DLPFC DLPFC_De_Jager_eQTL
PCC PCC_De_Jager_eQTL
AC AC_De_Jager_eQTL
#susie
sos run /home/rf2872/codes/xqtl-pipeline/pipeline/fine_mapping_post_processing.ipynb  cis_results_export    \
    --region_file /mnt/vast/hpc/homes/rf2872/codes/xqtl-analysis/resource/TADB_enhanced_cis.protein_coding.bed   \
    --file_path  /mnt/vast/hpc/homes/rf2872/aws/rds_files   \
    --name demo_susie   \
    --suffix univariate_susie_twas_weights.rds   \
    --prefix  MiGA_eQTL KNIGHT_pQTL  \
    --min_corr 0.8 \
    --geno_ref /mnt/vast/hpc/csg/rf2872/data/Fungen_xqtl/geno_align/Fungen_xQTL.ROSMAP_NIA_WGS.ROSMAP_NIA_WGS.MSBB_WGS_ADSP_hg38.MiGA.MAP_Brain-xQTL_Gwas_geno_0.STARNET.aligned.bim.gz  \
    --context-meta /mnt/vast/hpc/homes/rf2872/codes/xqtl-analysis/resource/context_meta.tsv  \
    --cwd demo_susie \
    --step1_only #optional to keep cache file
#fsusie
sos run /home/rf2872/codes/xqtl-pipeline/pipeline/fine_mapping_post_processing.ipynb  cis_results_export    \
    --region_file /mnt/vast/hpc/homes/rf2872/codes/xqtl-analysis/resource/extended_TADB.bed  \
    --file_path  /mnt/vast/hpc/homes/rf2872/aws/rds_files     \
    --name demo_fsusie   \
    --suffix fsusie_mixture_normal_top_pc_weights.rds    \
    --prefix  ROSMAP_mQTL ROSMAP_haQTL  \
    --min_corr 0.8  \
    --geno_ref /mnt/vast/hpc/csg/rf2872/data/Fungen_xqtl/geno_align/Fungen_xQTL.ROSMAP_NIA_WGS.ROSMAP_NIA_WGS.MSBB_WGS_ADSP_hg38.MiGA.MAP_Brain-xQTL_Gwas_geno_0.STARNET.aligned.bim.gz  \
    --context-meta /mnt/vast/hpc/homes/rf2872/codes/xqtl-analysis/resource/context_meta.tsv   \
    --cwd demo_fsusie \
    --fsusie \
    --step1_only #optional to keep cache file
#meta QTL
sos run /home/rf2872/codes/xqtl-pipeline/pipeline/fine_mapping_post_processing.ipynb  cis_results_export      \
    --region_file /mnt/vast/hpc/homes/rf2872/codes/xqtl-analysis/resource/hg38_1362_blocks.bed     \
    --file_path  /mnt/vast/hpc/homes/rf2872/aws/rds_files       \
    --name demo_metaQTL     \
    --suffix univariate_susie_twas_weights.rds        \
    --prefix  ROSMAP_metaQTL  \
    --min_corr 0.8   \
    --geno_ref /mnt/vast/hpc/csg/rf2872/data/Fungen_xqtl/geno_align/Fungen_xQTL.ROSMAP_NIA_WGS.ROSMAP_NIA_WGS.MSBB_WGS_ADSP_hg38.MiGA.MAP_Brain-xQTL_Gwas_geno_0.STARNET.aligned.bim.gz    \
    --context-meta /mnt/vast/hpc/homes/rf2872/codes/xqtl-analysis/resource/context_meta.tsv    \
    --cwd demo_metaQTL \
    --metaQTL \
    --step1_only #optional to keep cache file
# multi gene mvsusie
# multi gene mvsusie
    sos run /data/interactive_analysis/rf2872/codes/Jan/xqtl-pipeline/pipeline/fine_mapping_post_processing.ipynb  cis_results_export  \
    --region_file  /data/xqtl-analysis/resource/extended_TADB.bed  \
    --file_path /data/analysis_result/mnm/ROSMAP/mnm_genes/   \
    --name ROSMAP_AC_DeJager_eQTL   \
    --suffix multigene_bvsr.rds   \
    --prefix  ROSMAP_AC_DeJager_eQTL \
    --min_corr 0.8 \
    --geno_ref  /data/resource/geno_align/Fungen_xQTL.ROSMAP_NIA_WGS.ROSMAP_NIA_WGS.MSBB_WGS_ADSP_hg38.MiGA.MAP_Brain-xQTL_Gwas_geno_0.STARNET.aligned.bim.gz \
    --cwd demo_multigene \
    --mnm \
    --region-name  chr10_100845599_104855543 \#optional for testing with one region
    --step1_only #optional to keep cache file

Export gwas data#

 sos run /home/rf2872/codes/xqtl-pipeline/pipeline/fine_mapping_post_processing.ipynb  gwas_results_export      \
    --region_file /mnt/vast/hpc/homes/rf2872/codes/xqtl-analysis/resource/hg38_1362_blocks.bed     \
    --file_path  /home/hs3393/RSS_QC/GWAS_finemapping_Apr9/univariate_rss      \
    --name demo_gwas   \
    --suffix  univariate_susie_rss.rds       \
    --prefix  RSS_QC_RAISS_imputed   \
    --min_corr 0.8    \
    --geno_ref /mnt/vast/hpc/csg/rf2872/data/Fungen_xqtl/geno_align/Fungen_xQTL.ROSMAP_NIA_WGS.ROSMAP_NIA_WGS.MSBB_WGS_ADSP_hg38.MiGA.MAP_Brain-xQTL_Gwas_geno_0.STARNET.aligned.bim.gz    \
    --context-meta /mnt/vast/hpc/homes/rf2872/codes/xqtl-analysis/resource/context_meta.tsv    \
    --cwd demo_gwas \
    --gwas \
    --region-name chr9_19882538_22992379  \#optional 
    --step1_only #optional 

combine seperate meta file#

sos run /home/rf2872/codes/xqtl-pipeline/pipeline/fine_mapping_post_processing.ipynb   combine_export_meta \
   --cache_path demo_susie_back/demo_susie_cache \
   --output_file test_combine

Overlapped gwas data and eQTL data#

sos run /home/rf2872/codes/xqtl-pipeline/pipeline/fine_mapping_post_processing.ipynb   overlap_qtl_gwas \
    --name demo_overlap \
    --qtl_meta_path /mnt/vast/hpc/csg/rf2872/Work/Multivariate/susie_2024_new/demo_susie/demo_susie.cis_results_db.tsv \
    --gwas_meta_path /mnt/vast/hpc/csg/rf2872/Work/Multivariate/susie_2024_new/demo_gwas/demo_gwas.block_results_db.tsv \
    --cwd demo_overlap

Export all top loci#

sos run /home/rf2872/codes/xqtl-pipeline/pipeline/fine_mapping_post_processing.ipynb   export_top_loci \
--export_path /home/ubuntu/demo_singlecontext/ \
--region ENSG00000197106 \
--prefix ROSMAP_DeJager \
--suffix  cis_results_db.export.rds
[global]
import glob
import pandas as pd
# A region list file documenting the chr_pos_ref_alt of a susie_object
parameter: cwd = path("output")
parameter: name = "demo"

## Path to work directory where output locates
## Containers that contains the necessary packages
parameter: container = ""
import re
parameter: entrypoint= ('micromamba run -a "" -n' + ' ' + re.sub(r'(_apptainer:latest|_docker:latest|\.sif)$', '', container.split('/')[-1])) if container else ""
# For cluster jobs, number commands to run per job
parameter: job_size = 50
# Wall clock time expected
parameter: walltime = "96h"
# Memory expected
parameter: mem = "6G"
# Number of threads
parameter: numThreads = 2
parameter: windows = 1000000
# use this function to edit memory string for PLINK input
from sos.utils import expand_size
[susie_to_tsv_1]
# Input
# For complete susie, region_list or tad_list, for susie_rss , LD region list 
parameter: region_list = path
region_tbl = pd.read_csv(region_list,sep = "\t")
parameter: rds_path = paths
input: rds_path, group_by = 1
output: f"{cwd}/{_input:bn}.variant.tsv",f"{cwd}/{_input:bn}.lbf.tsv",f"{cwd}/{_input:bn}.effect.tsv"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output[0]:nn}.stdout", stderr = f"{_output[0]:nn}.stderr", container = container, entrypoint = entrypoint
    library("dplyr")
    library("tibble")
    library("purrr")
    library("tidyr")
    library("readr")
    library("stringr")
    library("susieR")
    extract_lbf = function(susie_obj){
    
    if("variants" %in% names(susie_obj) ){
    ss_bf = tibble(snps = susie_obj$variants, cs_index = ifelse(is.null(susie_obj$sets$cs_index), 0, paste0(susie_obj$sets$cs_index,collapse =",")),names = "${f'{_input:br}'.split('.')[-4]}")
    }
      else 
      {
    ss_bf = tibble(snps = susie_obj$variable_name, cs_index = ifelse(is.null(susie_obj$sets$cs_index), 0, paste0(susie_obj$sets$cs_index,collapse =",")),names = "${_input:bnnn}")
     }
    
    ss_bf = ss_bf%>%cbind(susie_obj$lbf_variable%>%t)%>%as_tibble()
    
    return(ss_bf)
    }
    
    extract_variants_pip = function(susie_obj,region_list){
    susie_tb = tibble( variants =  names(susie_obj$pip)[which( susie_obj$pip >= 0)],
                           snps_index = which(( susie_obj$pip >= 0))) %>%
        mutate(chromosome = map_chr(variants, ~read.table(text = .x, sep = ":")$V1%>%str_replace("chr","") ),
                position  = map_chr(variants, ~read.table(text = .x, sep = ":")$V2  ),
                ref = map_chr(position , ~read.table(text = .x, sep = "_",colClasses = "character")$V2  ),
                alt = map_chr(position , ~read.table(text = .x, sep = "_",colClasses = "character")$V3  ),
                position  = map_dbl(position , ~read.table(text = .x, sep = "_",as.is = T)$V1  )
                             )
      susie_tb = susie_tb%>%mutate(cs_order =(map(susie_tb$snps_index , ~tryCatch(which(pmap(list( a= susie_obj$sets$cs) , function(a) .x %in% a )%>%unlist()), error = function(e) return(0) )  ))%>%as.character%>%str_replace("integer\\(0\\)","0"),
                         cs_id = map_chr(cs_order,~ifelse(.x =="0", "None" ,names(susie_obj$sets$cs)[.x%>%str_split(":")%>%unlist%>%as.numeric] ) ),
                         log10_base_factor = map_chr(snps_index,~paste0( susie_obj$lbf_variable[,.x],  collapse = ";")),
                         pip = susie_obj$pip,
                         posterior_mean = coef.susie(susie_obj)[-1],
                         posterior_sd = susie_get_posterior_sd(susie_obj),
                         z = posterior_mean/posterior_sd)
    
          susie_tb =  susie_tb%>%mutate(  molecular_trait_id = region_list$molecular_trait_id,
                             finemapped_region_start = region_list$finemapped_region_start,
                             finemapped_region_end = region_list$finemapped_region_end)
          return(susie_tb)    }
          
        

     extract_effect_pip = function(susie_obj,region_list,susie_tb){
      result_tb =  tibble(phenotype = susie_obj$name,
        V = susie_obj$V,effect_id = paste0("L",1:length(V) ) ,
        cs_log10bf = susie_obj$lbf)
        if(is.null(susie_obj$sets$cs)){
            cs_min_r2 = cs_avg_r2 =  coverage =  0 
            cs = "None"} else {         cs = map_chr(susie_obj$sets$cs[result_tb$effect_id],~susie_tb$variants[.x]%>%paste0(collapse = ";"))
        coverage = map(result_tb$effect_id, ~susie_obj$sets$coverage[which(names(susie_obj$sets$cs) == .x )])%>%as.numeric%>%replace_na(0)
        cs_min_r2  = (susie_obj$sets$purity[result_tb$effect_id,1])%>%as.numeric%>%replace_na(0)  
        cs_avg_r2  = (susie_obj$sets$purity[result_tb$effect_id,2])%>%as.numeric%>%replace_na(0) }
        result_tb = result_tb%>%mutate(cs_min_r2 = cs_min_r2,cs_avg_r2 = cs_avg_r2 ,coverage = coverage%>%unlist,cs = cs )            
      return(result_tb)
      }
       
  
    susie_obj = readRDS("${_input:a}")
    if("variants" %in% names(susie_obj) ){susie_obj$variants = susie_obj$variants%>%str_replace("_",":")}
    if(is.null(names(susie_obj$pip ))){names(susie_obj$pip) = susie_obj$variants}
    lbf = extract_lbf(susie_obj)
    region_list = read_delim("${region_list}","\t")
    if(ncol(region_list) == 3 ){   region_list =  region_list%>%mutate(`#chr` = `#chr`%>%str_remove_all(" ") , ID = paste0(`#chr`,"_",start,"_",end) ) } # LD_list 
    if(region_list$start[1] - region_list$end[1]  == -1 ){ 
        region_list = region_list%>%mutate( start = start - ${windows} ,end = start +${windows}) # region_list for fix cis windows  
          } 
      if("gene_id" %in% colnames(region_list)){region_list = region_list%>%mutate(ID = gene_id)  } # region_list for gene
    region_list = region_list%>%select(molecular_trait_id = ID, chromosome  = `#chr`,finemapped_region_start = start ,finemapped_region_end = end)  # Formatting
    region_list = region_list%>%filter(molecular_trait_id == "${f'{_input:br}'.split('.')[-4]}")
    variants_pip = extract_variants_pip( susie_obj , region_list)
    effect_pip = extract_effect_pip( susie_obj , region_list,variants_pip)
    lbf%>%write_delim("${_output[1]}","\t")
    variants_pip%>%write_delim("${_output[0]}","\t")
    effect_pip%>%write_delim("${_output[2]}","\t")
[sQTL_susie_to_tsv_1]
# Input
# For complete susie, region_list or tad_list, for susie_rss , LD region list 
parameter: region_list = path
region_tbl = pd.read_csv(region_list,sep = "\t")
parameter: rds_path = paths
input: rds_path, group_by = 1
input_name=f"{_input:bn}"
input_name=input_name.replace('*', 'N') # "*" in leafcutter2 would be ignored in shell and cause error 
output: f"{cwd}/{input_name}.variant.tsv",f"{cwd}/{input_name}.lbf.tsv",f"{cwd}/{input_name}.effect.tsv"
tags = f'{step_name}_{_output[0]:bn}'
tags = tags.replace(':', '_').replace('+', 'ps') # also for other symbols in tag id 
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = tags
R: expand = '${ }', stdout = f"{_output[0]:nn}.stdout", stderr = f"{_output[0]:nn}.stderr", container = container
    library("dplyr")
    library("tibble")
    library("purrr")
    library("tidyr")
    library("readr")
    library("stringr")
    library("susieR")
    extract_lbf = function(susie_obj){
    
    if("variants" %in% names(susie_obj) ){
    ss_bf = tibble(snps = susie_obj$variants, cs_index = ifelse(is.null(susie_obj$sets$cs_index), 0, paste0(susie_obj$sets$cs_index,collapse =",")),names = "${f'{_input:br}'.split('.')[-4]}")
    }
      else 
      {
    ss_bf = tibble(snps = susie_obj$variable_name, cs_index = ifelse(is.null(susie_obj$sets$cs_index), 0, paste0(susie_obj$sets$cs_index,collapse =",")),names = "${_input:bnnn}")
     }
    
    ss_bf = ss_bf%>%cbind(susie_obj$lbf_variable%>%t)%>%as_tibble()
    
    return(ss_bf)
    }
    
    extract_variants_pip = function(susie_obj,region_list){
    susie_tb = tibble( variants =  names(susie_obj$pip)[which( susie_obj$pip >= 0)],
                           snps_index = which(( susie_obj$pip >= 0))) %>%
        mutate(chromosome = map_chr(variants, ~read.table(text = .x, sep = ":")$V1%>%str_replace("chr","") ),
                position  = map_chr(variants, ~read.table(text = .x, sep = ":")$V2  ),
                ref = map_chr(position , ~read.table(text = .x, sep = "_",colClasses = "character")$V2  ),
                alt = map_chr(position , ~read.table(text = .x, sep = "_",colClasses = "character")$V3  ),
                position  = map_dbl(position , ~read.table(text = .x, sep = "_",as.is = T)$V1  )
                             )
      susie_tb = susie_tb%>%mutate(cs_order =(map(susie_tb$snps_index , ~tryCatch(which(pmap(list( a= susie_obj$sets$cs) , function(a) .x %in% a )%>%unlist()), error = function(e) return(0) )  ))%>%as.character%>%str_replace("integer\\(0\\)","0"),
                         cs_id = map_chr(cs_order,~ifelse(.x =="0", "None" ,names(susie_obj$sets$cs)[.x%>%str_split(":")%>%unlist%>%as.numeric] ) ),
                         log10_base_factor = map_chr(snps_index,~paste0( susie_obj$lbf_variable[,.x],  collapse = ";")),
                         pip = susie_obj$pip,
                         posterior_mean = coef.susie(susie_obj)[-1],
                         posterior_sd = susie_get_posterior_sd(susie_obj),
                         z = posterior_mean/posterior_sd)
    
          susie_tb =  susie_tb%>%mutate(  molecular_trait_id = region_list$molecular_trait_id,
                             finemapped_region_start = region_list$finemapped_region_start,
                             finemapped_region_end = region_list$finemapped_region_end)
          return(susie_tb)    }
          
        

     extract_effect_pip = function(susie_obj,region_list,susie_tb){
      result_tb =  tibble(phenotype = susie_obj$name,
        V = susie_obj$V,effect_id = paste0("L",1:length(V) ) ,
        cs_log10bf = susie_obj$lbf)
        if(is.null(susie_obj$sets$cs)){
            cs_min_r2 = cs_avg_r2 =  coverage =  0 
            cs = "None"} else {         cs = map_chr(susie_obj$sets$cs[result_tb$effect_id],~susie_tb$variants[.x]%>%paste0(collapse = ";"))
        coverage = map(result_tb$effect_id, ~susie_obj$sets$coverage[which(names(susie_obj$sets$cs) == .x )])%>%as.numeric%>%replace_na(0)
        cs_min_r2  = (susie_obj$sets$purity[result_tb$effect_id,1])%>%as.numeric%>%replace_na(0)  
        cs_avg_r2  = (susie_obj$sets$purity[result_tb$effect_id,2])%>%as.numeric%>%replace_na(0) }
        result_tb = result_tb%>%mutate(cs_min_r2 = cs_min_r2,cs_avg_r2 = cs_avg_r2 ,coverage = coverage%>%unlist,cs = cs )            
      return(result_tb)
      }
       
  
    susie_obj = readRDS("${_input:a}")
    if("variants" %in% names(susie_obj) ){susie_obj$variants = susie_obj$variants%>%str_replace("_",":")}
    if(is.null(names(susie_obj$pip ))){names(susie_obj$pip) = susie_obj$variants}
    lbf = extract_lbf(susie_obj)
    region_list = read_delim("${region_list}","\t")
    if(ncol(region_list) == 3 ){   region_list =  region_list%>%mutate(`#chr` = `#chr`%>%str_remove_all(" ") , ID = paste0(`#chr`,"_",start,"_",end) ) } # LD_list 
    if(region_list$start[1] - region_list$end[1]  == -1 ){ 
        region_list = region_list%>%mutate( start = start - ${windows} ,end = start +${windows}) # region_list for fix cis windows  
          } 
      if("gene_id" %in% colnames(region_list)){region_list = region_list%>%mutate(ID = gene_id)  } # region_list for gene
    region_list = region_list%>%select(molecular_trait_id = ID, chromosome  = `#chr`,finemapped_region_start = start ,finemapped_region_end = end)  # Formatting
    mole_id = "${f'{_input:br}'.split('.')[-4]}"%>%gsub("_N:","_*:",.)#for sQTL
    region_list = region_list%>%filter(molecular_trait_id == mole_id)
    variants_pip = extract_variants_pip( susie_obj , region_list)
    effect_pip = extract_effect_pip( susie_obj , region_list,variants_pip)
    lbf%>%write_delim("${_output[1]}","\t")
    variants_pip%>%write_delim("${_output[0]}","\t")
    effect_pip%>%write_delim("${_output[2]}","\t")
[fsusie_to_tsv_1]
# Input
# For complete susie, region_list or tad_list, for susie_rss , LD region list 
parameter: region_list = path
region_tbl = pd.read_csv(region_list,sep = "\t")
parameter: rds_path = paths
input: rds_path, group_by = 1
output: f"{cwd}/{_input:bn}.variant.tsv"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output[0]:nn}.stdout", stderr = f"{_output[0]:nn}.stderr", container = container, entrypoint = entrypoint
    library("dplyr")
    library("tibble")
    library("purrr")
    library("tidyr")
    library("readr")
    library("stringr")
    library("susieR")

    extract_variants_pip = function(susie_obj,region_list){
        susie_tb = tibble( variants =  names(susie_obj$csd_X),
                               snps_index = which(( susie_obj$pip >= 0))) %>%
            mutate(chromosome = map_chr(variants, ~read.table(text = .x, sep = ":")$V1%>%str_replace("chr","") ),
                    position  = map_chr(variants, ~read.table(text = .x, sep = ":")$V2  ),
                    ref = map_chr(position , ~read.table(text = .x, sep = "_",colClasses = "character")$V2  ),
                    alt = map_chr(position , ~read.table(text = .x, sep = "_",colClasses = "character")$V3  ),
                    position  = map_dbl(position , ~read.table(text = .x, sep = "_",as.is = T)$V1  )
                                 )
          susie_tb = susie_tb%>%mutate(cs_order =(map(susie_tb$snps_index , ~tryCatch(which(pmap(list( a= susie_obj$cs) , function(a) .x %in% a )%>%unlist()), error = function(e) return(0) )  ))%>%as.character%>%str_replace("integer\\(0\\)","0"),
                             pip = susie_obj$pip)
          susie_tb =  susie_tb%>%mutate(  molecular_trait_id = region_list$tad_index,
                                 finemapped_region_start = region_list$start,
                                 finemapped_region_end = region_list$end)
          if("purity" %in% names(susie_obj)){
              susie_tb = susie_tb%>%mutate(purity = map_dbl(susie_tb$cs_order, ~ifelse(.x%>%as.numeric > 0, susie_obj$purity[[as.numeric(.x)]], NA ) ), is_dummy = as.numeric(purity < 0.5)  )
              }
    susie_tb = susie_tb%>%mutate(effect_peak_pos = map_dbl(cs_order, ~ifelse(.x%>%as.numeric > 0, susie_obj$outing_grid[which(abs(susie_obj$fitted_func[[as.numeric(.x)]]) == max(abs(susie_obj$fitted_func[[as.numeric(.x)]])))] , NA ) )) 
    susie_tb_lbf = cbind(susie_tb%>%select(molecular_trait_id,variants,cs_order),Reduce(cbind, susie_obj$lBF)%>%as.tibble%>%`colnames<-`(1:length(susie_obj$lBF)))
          return(list(susie_tb, susie_tb_lbf))    }
    susie_obj = readRDS("${_input:a}")
    region_list = read_delim("${region_list}","\t")
    region_list = region_list%>%filter(tad_index == "${f'{_input:br}'.split('.')[-4]}")
    variants_pip = extract_variants_pip( susie_obj , region_list)[[1]]
    variants_lbf = extract_variants_pip( susie_obj , region_list)[[2]]
    print(paste0("fsusie run time is ", round(susie_obj$runtime[[3]]/60),"min"))
    variants_pip%>%write_delim("${_output}","\t")
    variants_pip%>%write_delim("${_output:nn}.lbf.tsv","\t")
[*_to_tsv_2]
parameter: name = f'{_input[0]:b}'.split(".")[0]
input: group_by = "all"
output: f"{cwd}/{name}.all_variants.tsv"
bash: expand = '${ }', stdout = f"{_output:n}.stdout", stderr = f"{_output:n}.stderr", container = container, entrypoint = entrypoint
    head -1 ${_input[0]} > ${_output}
    cat ${_input[0]:d}/*variant.tsv | grep -v cs_order >> ${_output}
[susie_tsv_collapse]
parameter: tsv_path = paths # TSV needs have the name ends with  *.chr1_2_3.unisusie(_rss).lbf.tsv
tsv_list  = pd.DataFrame({"lbf_path" : [str(x) for x in tsv_path]})
chromosome = list(set([f'{x.split(".")[-5].split("_")[0].replace("chr","")}'  for x in tsv_list.lbf_path ])) ## Add chr if there is no chr prefix. This is to accomodata chr XY and M
input: tsv_path, for_each = "chromosome"
output: f'{cwd}/{_input[0]:bnnnnnnn}.chr{_chromosome}.unisusie_rss.lbf.tsv'
bash: expand = '${ }', stdout = f"{_output}.stdout", stderr = f"{_output}.stderr", container = container, entrypoint = entrypoint
        head -1 ${_input[0]} > ${_output}
        cat ${_input[0]:d}/*.chr${_chromosome}_*lbf.tsv | grep -v cs_index >> ${_output}
        
[susie_pip_landscape_plot]
parameter: plot_list = path
parameter: annot_tibble = path("~/Annotatr_builtin_annotation_tibble.tsv")
import pandas as pd
plot_list  = pd.read_csv(plot_list,sep = "\t")
file_type = plot_list.columns.values.tolist()
file_type = [x.split(".")[0] for x in file_type ]
plot_list = plot_list.to_dict("records")
input: plot_list, group_by = len(file_type)
output: f'{cwd}/{"_".join(file_type)}.{str(_input[0]).split(".")[-5]}.pip_landscape_plot.rds',f'{cwd}/{"_".join(file_type)}.{str(_input[0]).split(".")[-5]}.pip_landscape_plot.pdf'
R: expand = '${ }', stdout = f"{_output[0]}.stdout", stderr = f"{_output[0]}.stderr", container = container, entrypoint = entrypoint
    library("dplyr")
    library("readr") 
    library("ggplot2")
    library("purrr")
    color = c("black", "dodgerblue2", "green4", "#6A3D9A", 
          "#FF7F00", "gold1", "skyblue2", "#FB9A99", "palegreen2",
          "#CAB2D6", "#FDBF6F", "gray70", "khaki2", "maroon", "orchid1",
          "deeppink1", "blue1", "steelblue4", "darkturquoise", "green1", 
          "yellow4", "yellow3","darkorange4","brown","navyblue","#FF0000",
          "darkgreen","#FFFF00","purple","#00FF00","pink","#0000FF",
          "orange","#FF00FF","cyan","#00FFFF","#FFFFFF")
    extract_table = function(variant_df,type){ 
    if("purity" %in% colnames(variant_df) ){
      variant_df$purity[is.na(variant_df$purity)] = 0
      variant_df[abs(variant_df$purity) < 0.5,7] = 0
      }
     variant_df = variant_df%>%mutate(CS = (cs_order%>%as.factor%>%as.numeric-1)%>%as.factor)%>%
          select( y = pip ,snp = variants,pos = position , CS, molecular_trait_id)%>%mutate(molecular_trait_id = paste0(type,"_",molecular_trait_id ) )
    return(variant_df)
    }
    plot_recipe = tibble( type =  c('${"','".join(file_type) }'), path = c(${_input:r,}))
    plot_list = map2(plot_recipe$type,plot_recipe$path, ~read_delim(.y, guess_max = 10000000)%>%extract_table(.x) )
    plot_df = Reduce(rbind,plot_list)
    plot_range = (plot_df%>%group_by(molecular_trait_id)%>%summarize(start = (min(pos)), end = (max(pos)))%>%mutate(start = median(start),end = median(end)))[1,c(2,3)]%>%as.matrix
    plot_chr = (plot_df$snp[1]%>%stringr::str_split(":"))[[1]][1]
    plot_df = plot_df%>%mutate(Shared = as.logical(map(snp, ~(plot_df%>%filter( snp ==.x ,   CS%>%as.numeric !=  1 )%>%nrow()) > 1  )))
    pip_plot <- plot_df%>%ggplot2::ggplot(aes(y = y, x = pos,
                                  col =  CS, shape = Shared )) + facet_grid(molecular_trait_id ~.)+
      geom_point(size = 7) +
      scale_color_manual("CS",values = color) +
      theme(axis.ticks.x = element_blank()) +
     ylab("Posterior Inclusion Probability (PIP)")+xlim(plot_range)+
      theme(axis.ticks.x = element_blank()) +
            theme(strip.text.y.right = element_text(angle = 0))+
            xlab("") + 
            theme(text = element_text(size = 30))+ggtitle("Overview of fine-mapping")
  
    annot = read_delim("${annot_tibble}")
    annot = annot%>%filter(seqnames == plot_chr, start > plot_range[1], end < plot_range[2])
    annot_plot   = annot%>%filter(!type%in%c("hg38_genes_introns","hg38_genes_1to5kb"))%>%
                        ggplot(aes())+
                        geom_segment( aes(x = start,xend = end, y = "Regulartory Element", yend = "Regulartory Element", color = type ), linewidth =10)+
                        ylab("")+xlab("")+xlim(plot_range)+theme(axis.text.x=element_blank(),text = element_text(size = 20))+scale_color_brewer(palette="Dark2")
    gene_plot = annot%>%filter(type%in%c("hg38_genes_1to5kb"))%>%group_by(symbol)%>%
                            summarise(start = min(start), end = max(end))%>%na.omit%>%
                            ggplot(aes())+geom_segment( aes(x = start,xend = end, y = "Gene", yend = "Gene", color = symbol ), linewidth =10)+
                            geom_label(aes(x = (start+end)/2,y = "Gene", label = symbol ),size = 5)+ylab("")+xlab("POS")+
                            theme(legend.position="none")+theme(text = element_text(size = 20))+xlim(plot_range)
      
    list(pip_plot,plot_df,annot_plot,gene_plot)%>%saveRDS("${_output[0]}")
    cowplot::plot_grid(plotlist = list(pip_plot,annot_plot,gene_plot),ncol = 1, align = "v",axis = "tlbr",rel_heights = c(8,1,1))%>%ggsave(filename = "${_output[1]}",device = "pdf",dpi = "retina",width = 30, height = 30)
[susie_upsetR_plot]
parameter: plot_list = path
import pandas as pd
plot_list  = pd.read_csv(plot_list, sep = "\t")
file_type = plot_list.columns.values.tolist()
file_type = [x.split(".")[0] for x in file_type ]
plot_list = plot_list.to_dict("records")
input: plot_list
output: f'{cwd}/{"_".join(file_type)}.UpSetR.rds',f'{cwd}/{"_".join(file_type)}.UpSetR.pdf'
R: expand = '${ }', stdout = f"{_output[0]}.stdout", stderr = f"{_output[0]}.stderr", container = container, entrypoint = entrypoint
    library("dplyr")
    library("readr") 
    library("ggplot2")
    library("purrr")
    library("UpSetR")
    library("ComplexUpset")
    plot_recipe = tibble( type =  c('${"','".join(file_type) }'), path = c(${_input:r,}))
    plot_list = map2(plot_recipe$type,plot_recipe$path, ~read_delim(.y, guess_max = 10000000)%>%mutate(cs = cs_order != 0 )%>%filter(cs > 0)%>%select(variants,cs)%>%`colnames<-`(c("variants",.x))%>%distinct() )
    cs_sharing = Reduce(full_join,plot_list)
    cs_upsetR_sharing = cs_sharing
    cs_upsetR_sharing[,2:ncol(cs_upsetR_sharing)]%>%mutate_all(as.numeric)-> cs_upsetR_sharing[,2:ncol(cs_upsetR_sharing)]
    a = upset(cs_upsetR_sharing%>%as.data.frame,intersect = colnames(cs_upsetR_sharing[2:ncol(cs_upsetR_sharing)]),
      keep_empty_groups = F,
          base_annotations=list(`Intersection size` = intersection_size( bar_number_threshold = 1, position = position_dodge(0.5), width = 0.3 ,text = list(size = 5)   )  ) ,
              themes=upset_default_themes(axis.text=element_text(size=30))     ,
              min_degree = 1)
    
    a%>%ggsave(filename = "${_output[1]}",device = "pdf",dpi = "retina",width=18.5, height=10.5)
    list(cs_upsetR_sharing)%>%saveRDS("${_output[0]}")
[susie_upsetR_cs_plot]
parameter: plot_list = path
import pandas as pd
plot_list  = pd.read_csv(plot_list, sep = "\t")
file_type = plot_list.columns.values.tolist()
file_type = [x.split(".")[0] for x in file_type ]
plot_list = plot_list.to_dict("records")
parameter: trait_to_select =  1 
input: plot_list
output: f'{cwd}/{"_".join(file_type)}.UpSetR_{file_type[trait_to_select-1]}_cs.rds',f'{cwd}/{"_".join(file_type)}.UpSetR_{file_type[trait_to_select-1]}_cs.pdf'
R: expand = '${ }', stdout = f"{_output[0]}.stdout", stderr = f"{_output[0]}.stderr", container = container, entrypoint = entrypoint

    library("dplyr")
    library("readr") 
    library("ggplot2")
    library("purrr")
    library("UpSetR")
    library("ComplexUpset")

    cs_sharing_identifer = function(upsetR_input,df){
    inner_join(upsetR_input, df%>%select(variants,molecular_trait_id, cs_order)%>%filter(cs_order != 0))%>%select(-variants)-> dfL_CS_sharing
    dfL_CS_sharing[is.na(dfL_CS_sharing)] = FALSE
    dfL_CS_sharing = dfL_CS_sharing%>%group_by(molecular_trait_id,cs_order)%>%summarize(across(everything(), list(mean))   )
    dfL_CS_sharing = dfL_CS_sharing%>%mutate(across(colnames(dfL_CS_sharing)[3:ncol(dfL_CS_sharing)], ~.x != 0  ))%>%`colnames<-`(c("molecular_trait_id","cs_order",colnames(cs_sharing)[2:ncol(cs_sharing)]))
    }
  
  
    plot_recipe = tibble( type =  c('${"','".join(file_type) }'), path = c(${_input:r,}))
    plot_list = map2(plot_recipe$type,plot_recipe$path, ~read_delim(.y, guess_max = 10000000)%>%mutate(cs = cs_order != 0 )%>%filter(cs > 0)%>%select(variants,cs)%>%`colnames<-`(c("variants",.x))%>%distinct() )
    cs_sharing = Reduce(full_join,plot_list)
    cs_upsetR_sharing = cs_sharing
    cs_upsetR_sharing[,2:ncol(cs_upsetR_sharing)]%>%mutate_all(as.numeric)-> cs_upsetR_sharing[,2:ncol(cs_upsetR_sharing)]

    df = read_delim(plot_recipe$path[[${trait_to_select}]]) 
    
    cs_sharing_df = cs_sharing_identifer(cs_upsetR_sharing,df)

    a = upset(cs_sharing_df%>%as.data.frame,intersect = colnames(cs_sharing_df[3:ncol(cs_sharing_df)]),
          keep_empty_groups = F,
          base_annotations=list(`Intersection size` = intersection_size( bar_number_threshold = 1, position = position_dodge(0.5), width = 0.3 ,text = list(size = 8)   )  ),
          themes=upset_default_themes(axis.text=element_text(size=30)),
          set_size = F  ,  min_degree = 1,wrap = T) + ggtitle( paste0(plot_recipe$type[[${trait_to_select}]],'CS shared with other phenotypes') )   + theme(plot.title = element_text(size = 40, face = "bold"))
  
    a%>%ggsave(filename = "${_output[1]}",device = "pdf",dpi = "retina",width=18.5, height=10.5)
    list(cs_sharing_df)%>%saveRDS("${_output[0]}")
[tmp_annotatation_of_snps_1]
parameter: SNP_list = path
parameter: annotation_rds = path
input: SNP_list
output: f'{cwd}/{_input:b}.annotated.rds'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
R: expand = '${ }', stdout = f"{_output}.stdout", stderr = f"{_output}.stderr", container = container, entrypoint = entrypoint
    library("dplyr")
    library("readr") 
    library("purrr")
    library("stringr")
    sharing_snp = readRDS("${_input}")
    sharing_snp_fsusie = sharing_snp[[1]]%>%filter(haQTL == 1 | mQTL == 1)
    sharing_snp_fsusie = sharing_snp_fsusie%>%mutate(X1 =  read.table(text = sharing_snp_fsusie$variants, sep = ":")$V1, X2 = read.table(text = read.table(text = sharing_snp_fsusie$variants, sep = ":")$V2 , sep = "_")$V1  )
    sharing_snp_fsusie = sharing_snp_fsusie%>%select(variants,chr = X1, pos = X2)
    annotation = readRDS("${annotation_rds}")
    print("data loaded")
    result = sharing_snp_fsusie%>%mutate(annot = map2( chr,pos , ~ annotation%>%filter(X1 == .x, X2 <= .y, X3 >= .y)%>%pull(X5)))%>%mutate(annot = map_chr(annot, ~paste0(.x ,collapse = ",")) )
    print("snp annotated")
    result%>%saveRDS("${_output}")
[tmp_annotatation_of_snps_2]
parameter: SNP_list = path
parameter: annotation_rds = path
output: f'{cwd}/{_input:b}.annotated_rev.rds'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
R: expand = '${ }', stdout = f"{_output}.stdout", stderr = f"{_output}.stderr", container = container, entrypoint = entrypoint
    library("dplyr")
    library("readr") 
    library("purrr")
    library("stringr")
    result = readRDS(${_input:r})
    result_rev = tibble(annot = unique(annotation$X5))%>%mutate(variants = map(annot, ~  result%>%filter( str_detect(annot,.x))%>%pull(variants)) )%>%mutate( variants = map_chr(variants,~paste0(.x ,collapse = ","))  )
    result_rev%>%saveRDS("${_output}")
[fsusie_extract_effect]
parameter: rds_path = paths
parameter: annot_tibble = path("~/Annotatr_builtin_annotation_tibble.tsv")
input: rds_path, group_by = 1
output: f'{cwd}/{_input:bn}.estimated_effect.tsv',f'{cwd}/{_input:bn}.estimated_effect.pdf'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
R: expand = '${ }', stdout = f"{_output[0]}.stdout", stderr = f"{_output[0]}.stderr", container = container, entrypoint = entrypoint
    library("stringr")
    library("dplyr")
    library("readr") 
    library("ggplot2")
    library("purrr")
    library("tidyr")
    color = c("black", "dodgerblue2", "green4", "#6A3D9A", 
          "#FF7F00", "gold1", "skyblue2", "#FB9A99", "palegreen2",
          "#CAB2D6", "#FDBF6F", "gray70", "khaki2", "maroon", "orchid1",
          "deeppink1", "blue1", "steelblue4", "darkturquoise", "green1", 
          "yellow4", "yellow3","darkorange4","brown","navyblue","#FF0000",
          "darkgreen","#FFFF00","purple","#00FF00","pink","#0000FF",
          "orange","#FF00FF","cyan","#00FFFF","#FFFFFF")
    
    effect_extract = function(fsusie){
        plot_df = fsusie$fitted_func%>%as_tibble(.name_repair = "universal")%>%mutate(pos =  fsusie$outing_grid)%>%`colnames<-`(c(paste0("Effect_",1:length(fsusie$cs)),"pos"))%>%mutate(`#chr` = str_split(names(fsusie$csd_X)[[1]],":")[[1]][[1]] )%>%select(`#chr`, pos, everything())
        plot=  plot_df%>%pivot_longer(cols = 3:ncol(plot_df),names_to = "effect", values_to = "values"   ) %>%ggplot(aes(x = pos, y = values,color = effect),linewidth = 7)+
        geom_line()+ylab("Estimated Effect") + xlab("POS")+facet_grid(effect~. )+
            scale_color_manual("Credible set",values = color[2:length(color)])+geom_line(aes(y = 0), color = "black")
            theme(strip.text.y.right = element_text(angle = 0))+
            xlab("") + 
            ylab("Estimated Effect")+ 
            theme(text = element_text(size = 50))+
            ggtitle(paste0( "Estimated effect for ${f'{_input:br}'.split('.')[-4]}"))
        return(list(plot_df,plot))
        }
    susie_obj =  readRDS("${_input}")
    output = effect_extract(susie_obj)
    effect_tbl = output[[1]]
    annot = read_delim("${annot_tibble}")
    annot = annot%>%filter(seqnames == (effect_tbl$`#chr`)[[1]], start > min(effect_tbl$pos), end < max(effect_tbl$pos))
    plot_range = c(min(effect_tbl$pos),  max(effect_tbl$pos))
    annot_plot   = annot%>%filter(!type%in%c("hg38_genes_introns","hg38_genes_1to5kb"))%>%
                        ggplot(aes())+
                        geom_segment( aes(x = start,xend = end, y = "Regulartory Element", yend = "Regulartory Element", color = type ), linewidth =10)+
                        ylab("")+xlab("")+xlim(plot_range)+theme(axis.text.x=element_blank(),text = element_text(size = 20))+scale_color_brewer(palette="Dark2")
    gene_plot = annot%>%filter(type%in%c("hg38_genes_1to5kb"))%>%group_by(symbol)%>%
                            summarise(start = min(start), end = max(end))%>%na.omit%>%
                            ggplot(aes())+geom_segment( aes(x = start,xend = end, y = "Gene", yend = "Gene", color = symbol ), linewidth =10)+
                            geom_label(aes(x = (start+end)/2,y = "Gene", label = symbol ),size = 5)+ylab("")+xlab("POS")+
                            theme(legend.position="none")+theme(text = element_text(size = 20))+xlim(plot_range)
    cowplot::plot_grid(plotlist = list(output[[2]]),ncol = 1, align = "v",axis = "tlbr",rel_heights = c(8))%>%ggsave(filename = "${_output[1]}",device = "pdf",dpi = "retina",width = 30, height = 30)
    effect_tbl%>%write_delim("${_output[0]}","\t")
[fsusie_affected_region]
parameter: rds_path = paths
input: rds_path, group_by = 1
output: f'{cwd}/{_input:bn}.affected_region.tsv',f'{cwd}/{_input:bn}.affected_region.pdf'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output[0]}.stdout", stderr = f"{_output[0]}.stderr", container = container, entrypoint = entrypoint
    library("stringr")
    library("dplyr")
    library("readr") 
    library("ggplot2")
    library("purrr")
    library("tidyr")
    library(susiF.alpha)
    library(ashr)
    library(wavethresh)
    color = c("black", "dodgerblue2", "green4", "#6A3D9A", 
          "#FF7F00", "gold1", "skyblue2", "#FB9A99", "palegreen2",
          "#CAB2D6", "#FDBF6F", "gray70", "khaki2", "maroon", "orchid1",
          "deeppink1", "blue1", "steelblue4", "darkturquoise", "green1", 
          "yellow4", "yellow3","darkorange4","brown","navyblue","#FF0000",
          "darkgreen","#FFFF00","purple","#00FF00","pink","#0000FF",
          "orange","#FF00FF","cyan","#00FFFF","#FFFFFF")
    ## Define Function
    update_cal_credible_band2 <- function(susiF.obj )
    {
    
    
    
      if(sum( is.na(unlist(susiF.obj$alpha))))
      {
        stop("Error: some alpha value not updated, please update alpha value first")
      }
      temp <- wavethresh::wd(rep(0, susiF.obj$n_wac))
    
    
      for ( l in 1:susiF.obj$L)
      {
        Smat <-  susiF.obj$fitted_wc2[[l]]
        W1   <- ((wavethresh::GenW(n=  ncol(Smat )  , filter.number = 10, family = "DaubLeAsymm")))
        tt   <- diag( W1%*%diag(c(susiF.obj$alpha[[l]]%*%Smat ))%*% t(W1 ))
    
        up                       <-  susiF.obj$fitted_func[[l]]+ 3*sqrt(tt)
        low                      <-  susiF.obj$fitted_func[[l]]- 3*sqrt(tt)
        susiF.obj$cred_band[[l]] <- rbind(up, low)
      }
    
    
    
      return(susiF.obj)
    }
    
    affected_reg <- function( susiF.obj){
      outing_grid <- susiF.obj$outing_grid
    
      reg <-  list()
      h <- 1
      for (   l in 1:length(susiF.obj$cs)){
    
        pos_up <-  which(susiF.obj$cred_band[[l]][1,]<0)
        pos_low <- which(susiF.obj$cred_band[[l]][2,]>0)
    
    
        reg_up <- split( pos_up,cumsum(c(1,diff( pos_up)!=1)))
    
        reg_low <- split( pos_low,cumsum(c(1,diff( pos_low)!=1)))
        for( k in 1:length(reg_up)){
          reg[[h]] <- c(l, outing_grid[reg_up[[k]][1]], outing_grid[reg_up[[k]][length(reg_up[[k]])]])
    
          h <- h+1
        }
        for( k in 1:length(reg_low )){
          reg[[h]] <- c(l, outing_grid[reg_low [[k]][1]], outing_grid[reg_low [[k]][length(reg_low [[k]])]])
    
          h <- h+1
        }
    
    
      }
      reg <-  do.call(rbind, reg)
      colnames(reg) <- c("CS", "Start","End")
      return(reg)
    }
    
    
    susiF_obj =  readRDS("${_input}")
    susiF_obj = update_cal_credible_band2(susiF_obj)
    affected_tbl = affected_reg(susiF_obj)
    affected_tbl = affected_tbl%>%as_tibble%>%mutate(analysis = susiF_obj$name,
                          chr = (names(susiF_obj$csd_X)[[1]]%>%stringr::str_split(":"))[[1]][[1]],
                          molecular_trait_id =  "${f'{_input:br}'.split('.')[-4]}", 
                          purity  = purrr::map_dbl(CS,~susiF_obj$purity[[.x]] )  )
    
    affected_tbl%>%as_tibble%>%write_delim("${_output[0]}","\t")
    plt = plot_susiF(susiF_obj, cred.band = T)
    plt%>%ggsave(filename = "${_output[1]}",device = "pdf",dpi = "retina",width = 30, height = 30)

Into VCF format#

FIXME: These codes were moved from earlier workflows. To be cleaned up and tested.

#[uni_susie_2]
input: group_with = "genoFile"
output: f"{_input:n}.vcf.bgz"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output:nn}.stdout", stderr = f"{_output:nn}.stderr", container = container, entrypoint = entrypoint
   ## Define create_vcf function
           create_vcf = function (chrom, pos, nea, ea, snp = NULL, ea_af = NULL, effect = NULL, 
        se = NULL, pval = NULL, name = NULL,cs = NULL, pip = NULL) 
    {
        stopifnot(length(chrom) == length(pos))
        if (is.null(snp)) {
            snp <- paste0(chrom, ":", pos)
        }
        snp <- paste0(chrom, ":", pos)
        nsnp <- length(chrom)
        gen <- list()
        ## Setupt data content for each sample column
        if (!is.null(ea_af)) 
            gen[["AF"]] <- matrix(ea_af, nsnp)
        if (!is.null(effect)) 
            gen[["ES"]] <- matrix(effect, nsnp)
        if (!is.null(se)) 
            gen[["SE"]] <- matrix(se, nsnp)
        if (!is.null(pval)) 
            gen[["LP"]] <- matrix(-log10(pval), nsnp)
        if (!is.null(cs)) 
            gen[["CS"]] <- matrix(cs, nsnp)
        if (!is.null(pip)) 
            gen[["PIP"]] <- matrix(pip, nsnp)
        gen <- S4Vectors::SimpleList(gen)
        
      ## Setup snps info for the fix columns
        gr <- GenomicRanges::GRanges(chrom, IRanges::IRanges(start = pos, 
            end = pos + pmax(nchar(nea), nchar(ea)) - 1, names = snp))
         coldata <- S4Vectors::DataFrame(Studies = name, row.names = name)
    ## Setup header informations
        hdr <- VariantAnnotation::VCFHeader(header = IRanges::DataFrameList(fileformat = S4Vectors::DataFrame(Value = "VCFv4.2", 
            row.names = "fileformat")), sample = name)
        VariantAnnotation::geno(hdr) <- S4Vectors::DataFrame(Number = c("A", 
            "A", "A", "A", "A", "A"), Type = c("Float", "Float", 
            "Float", "Float", "Float", "Float"), Description = c("Effect size estimate relative to the alternative allele", 
            "Standard error of effect size estimate", "-log10 p-value for effect estimate",  
            "Alternate allele frequency in the association study",
            "The CS this variate are captured, 0 indicates not in any cs", "The posterior inclusion probability to a CS"), 
            row.names = c("ES", "SE", "LP", "AF", "CS", "PIP"))
      ## Save only the meta information in the sample columns 
        VariantAnnotation::geno(hdr) <- subset(VariantAnnotation::geno(hdr), 
            rownames(VariantAnnotation::geno(hdr)) %in% names(gen))
      ## Save VCF 
        vcf <- VariantAnnotation::VCF(rowRanges = gr, colData = coldata, 
            exptData = list(header = hdr), geno = gen)
        VariantAnnotation::alt(vcf) <- Biostrings::DNAStringSetList(as.list(ea))
        VariantAnnotation::ref(vcf) <- Biostrings::DNAStringSet(nea)
      ## Add fixed values
        VariantAnnotation::fixed(vcf)$FILTER <- "PASS"
          return(sort(vcf))
        }
    library("susieR")
    library("dplyr")
    library("tibble")
    library("purrr")
    library("readr")
    library("tidyr")
    library("stringr")
    
    # Get list of cs snps
    susie_list = readRDS(${_input:r})
    susie_tb_ls = list()
    for (i in 1:length(susie_list)){
        susie_tb = tibble( snps =  names(susie_list[[1]]$pip)[which( susie_list[[i]]$pip >= 0)], snps_index = which(( susie_list[[i]]$pip >= 0))  )
        susie_tb_ls[[i]]= susie_tb%>%mutate( cs = map(snps_index,~which( susie_list[[i]]$sets$cs %in% .x))%>%as.numeric%>%replace_na(0),
                                 pip = map_dbl(snps_index,~( susie_list[[i]]$pip[.x])),
                                 coef = map_dbl(snps_index,~(coef.susie( susie_list[[i]])[.x+1])))
        }
    if(length(susie_tb_ls) >= 2){ 
      for(i in 2:length(susie_tb_ls)){
          susie_tb_ls[[i]] = full_join(susie_tb_ls[[i-1]],susie_tb_ls[[i]], by = "snps") 
        }
    }
    m = c("cs","pip","coef")    
    output = list()
    for(i in m){
    output[[i]] = susie_tb_ls[[length(susie_tb_ls)]]%>%select(contains(i))%>%as.matrix
    }
    snps_tb = susie_tb_ls[[length(susie_tb_ls)]]%>%mutate(
                         chr = map_chr(snps,~read.table(text = .x,sep = ":",as.is = T)$V1),
                         pos_alt_ref = map_chr(snps,~read.table(text = .x,sep = ":",as.is = TRUE)$V2),
                         pos = map_dbl(pos_alt_ref,~read.table(text = .x,sep = "_",as.is = TRUE)$V1),
                         alt = map_chr(pos_alt_ref,~read.table(text = .x,sep = "_",as.is = TRUE, colClass = "character")$V2),
                         ref = map_chr(pos_alt_ref,~read.table(text = .x,sep = "_",as.is = TRUE, colClass = "character")$V3))
    
    snps_tb = snps_tb%>%filter(str_detect(ref, "[ACTG]") & str_detect(alt, "[ACTG]"))
    output_vcf = create_vcf(
            chrom = snps_tb$chr,
             pos = snps_tb$pos,
             ea = snps_tb$alt,
             nea = snps_tb$ref,
             effect = snps_tb%>%select(contains("coef"))%>%as.matrix ,
             pip = snps_tb%>%select(contains("pip"))%>%as.matrix,
             cs = snps_tb%>%select(contains("cs"))%>%as.matrix,
             name = names(susie_list))
    VariantAnnotation::writeVcf(output_vcf,${_output:nr},index = TRUE)
[mv_susie_2]
input: group_with = "genoFile"
output: f"{_input:n}.vcf.bgz"
task: trunk_workers = 1, trunk_size = 1, walltime = '2h', mem = '55G', cores = 1, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output:nn}.stdout", stderr = f"{_output:nn}.stderr", container = container, entrypoint = entrypoint
   ## Define create_vcf function
           create_vcf = function (chrom, pos, nea, ea, snp = NULL, ea_af = NULL, effect = NULL, 
        se = NULL, pval = NULL, name = NULL,cs = NULL, pip = NULL) 
    {
        stopifnot(length(chrom) == length(pos))
        if (is.null(snp)) {
            snp <- paste0(chrom, ":", pos)
        }
        snp <- paste0(chrom, ":", pos)
        nsnp <- length(chrom)
        gen <- list()
        ## Setupt data content for each sample column
        if (!is.null(ea_af)) 
            gen[["AF"]] <- matrix(ea_af, nsnp)
        if (!is.null(effect)) 
            gen[["ES"]] <- matrix(effect, nsnp)
        if (!is.null(se)) 
            gen[["SE"]] <- matrix(se, nsnp)
        if (!is.null(pval)) 
            gen[["LP"]] <- matrix(-log10(pval), nsnp)
        if (!is.null(cs)) 
            gen[["CS"]] <- matrix(cs, nsnp)
        if (!is.null(pip)) 
            gen[["PIP"]] <- matrix(pip, nsnp)
        gen <- S4Vectors::SimpleList(gen)
        
      ## Setup snps info for the fix columns
        gr <- GenomicRanges::GRanges(chrom, IRanges::IRanges(start = pos, 
            end = pos + pmax(nchar(nea), nchar(ea)) - 1, names = snp))
         coldata <- S4Vectors::DataFrame(Studies = name, row.names = name)
    ## Setup header informations
        hdr <- VariantAnnotation::VCFHeader(header = IRanges::DataFrameList(fileformat = S4Vectors::DataFrame(Value = "VCFv4.2", 
            row.names = "fileformat")), sample = name)
        VariantAnnotation::geno(hdr) <- S4Vectors::DataFrame(Number = c("A", 
            "A", "A", "A", "A", "A"), Type = c("Float", "Float", 
            "Float", "Float", "Float", "Float"), Description = c("Effect size estimate relative to the alternative allele", 
            "Standard error of effect size estimate", "-log10 p-value for effect estimate",  
            "Alternate allele frequency in the association study",
            "The CS this variate are captured, 0 indicates not in any cs", "The posterior inclusion probability to a CS"), 
            row.names = c("ES", "SE", "LP", "AF", "CS", "PIP"))
      ## Save only the meta information in the sample columns 
        VariantAnnotation::geno(hdr) <- subset(VariantAnnotation::geno(hdr), 
            rownames(VariantAnnotation::geno(hdr)) %in% names(gen))
      ## Save VCF 
        vcf <- VariantAnnotation::VCF(rowRanges = gr, colData = coldata, 
            exptData = list(header = hdr), geno = gen)
        VariantAnnotation::alt(vcf) <- Biostrings::DNAStringSetList(as.list(ea))
        VariantAnnotation::ref(vcf) <- Biostrings::DNAStringSet(nea)
      ## Add fixed values
        VariantAnnotation::fixed(vcf)$FILTER <- "PASS"
          return(sort(vcf))
        }
    library("susieR")
    library("dplyr")
    library("tibble")
    library("purrr")
    library("readr")
    library("tidyr")
    
    # Get list of cs snps
    res = readRDS(${_input:r})
    output_snps = tibble( snps = res$variable_name[which(res$pip >= 0)], snps_index = which((res$pip >= 0))  )
    output_snps = output_snps%>%mutate( cs = map(snps_index,~which(res$sets$cs %in% .x))%>%as.numeric%>%replace_na(0),
                             pip = map_dbl(snps_index,~(res$pip[.x])),
                     chr = map_chr(snps,~read.table(text = .x,sep = ":",as.is = T)$V1),
                     pos_alt_ref = map_chr(snps,~read.table(text = .x,sep = ":",as.is = TRUE)$V2),
                     pos = map_dbl(pos_alt_ref,~read.table(text = .x,sep = "_",as.is = TRUE)$V1),
                     alt = map_chr(pos_alt_ref,~read.table(text = .x,sep = "_",as.is = TRUE, colClass = "character")$V2),
                     ref = map_chr(pos_alt_ref,~read.table(text = .x,sep = "_",as.is = TRUE, colClass = "character")$V3))
    
    effect_mtr = res$coef[output_snps$snps_index+1]%>%as.matrix
    colnames(effect_mtr) = "${name}"
    rownames(effect_mtr) = output_snps$snps
    cs_mtr = effect_mtr
    for(i in 1:nrow(cs_mtr)) cs_mtr[i,] =  output_snps$cs[[i]]  
    pip_mtr = effect_mtr
    for(i in 1:nrow(pip_mtr)) pip_mtr[i,] =  output_snps$pip[[i]]  
    
    output_vcf = create_vcf(
           chrom = output_snps$chr,
            pos = output_snps$pos,
            ea = output_snps$alt,
            nea = output_snps$ref,
            effect = effect_mtr ,
            pip = pip_mtr,
            cs = cs_mtr,
            name = colnames(effect_mtr)
              )
    VariantAnnotation::writeVcf(output_vcf,${_output:nr},index = TRUE)

Cis-window analysis Result consolidation#

# Exporting cis susie_twas results
[cis_results_export_1, gwas_results_export_1]
# per chunk we process at most 200 datasets
parameter: per_chunk = 200
# Region list should have last column being region name. 
parameter:region_file=path()
# the path stored original output files
parameter:file_path=''
# assuming orinal files are named as prefix.id.suffix
# prefix of the original files (before id) to identify file.
parameter:prefix=[]
# suffix of the original files (after id) to identify file.
parameter:suffix=str
# if need to rename context, please load condition meta file
parameter: condition_meta = path()
# if the pip all variants in a cs < this threshold, then remove this cs
parameter: pip_thres = 0.05
# only keep top cs_size variants in one cs 
parameter: cs_size = 3
# provide exported meta to filter the exported genes 
parameter: exported_file = path()
# Optional: if a region list is provide the analysis will be focused on provided region. 
# The LAST column of this list will contain the ID of regions to focus on
# Otherwise, all regions with both genotype and phenotype files will be analyzed
parameter: region_list = path()
# Optional: if a region name is provided 
# the analysis would be focused on the union of provides region list and region names
parameter: region_name = []
# the (aligned) geno bim file to check allele flipping
parameter: geno_ref = path()
# context meta file to map renamed context backs
parameter: context_meta = path()
# default cretria in susie purity filtering, recommended to use 0.8 here 
parameter: min_corr = []
# set this parameter as `True` when exporting gwas data.
parameter: gwas = False
# set this parameter as `True` when exporting fsusie data.
parameter: fsusie = False
# set this parameter as `True` when exporting metaQTL data.
parameter: metaQTL = False
# set this parameter as `True` when exporting mnm data.
parameter: mnm = False

import pandas as pd
import os

region = pd.read_csv(region_file, sep='\t', names=['chr', 'start', 'end', 'id'])

region_ids = []
# If region_list is provided, read the file and extract IDs
if not region_list.is_dir():
    if region_list.is_file():
        region_list_df = pd.read_csv(region_list, sep='\t', header=None, comment = "#")
        region_ids = region_list_df.iloc[:, -1].unique()  # Extracting the last column for IDs
    else:
        raise ValueError("The region_list path provided is not a file.")
        
if len(region_name) > 0:
    region_ids = list(set(region_ids).union(set(region_name)))
    
if len(region_ids) > 0:
    region = region[region['id'].isin(region_ids)]

# Function to create list of formatted strings for each row
def create_formatted_list(row):
    if len(prefix) > 0:
        formatted_list = [f"{file_path}/{p}.{row['id']}.{suffix}" for p in prefix]
    else:
        formatted_list = [f"{file_path}/{row['id']}.{suffix}"]  # GWAS data do not have prefix     
    return formatted_list

# Apply the function to each row
region['original_data'] = region.apply(create_formatted_list, axis=1)

def group_by_region(lst, partition):
    # from itertools import accumulate
    # partition = [len(x) for x in partition]
    # Compute the cumulative sums once
    # cumsum_vector = list(accumulate(partition))
    # Use slicing based on the cumulative sums
    # return [lst[(cumsum_vector[i-1] if i > 0 else 0):cumsum_vector[i]] for i in range(len(partition))]
    return partition

def filter_existing_paths(row):
    existing_paths = [path for path in row if os.path.exists(path)]
    return existing_paths

def is_file_exported(paths, results_set):
    for path in paths:
        basename = os.path.basename(path)
        if basename not in results_set:
            return False
    return True

region['original_data'] = region['original_data'].apply(filter_existing_paths)
region = region[region['original_data'].map(bool)]

# if provided exported meta, check if the original data are all exported already, isfo skip them 
if not exported_file.is_dir():
    if exported_file.is_file():
        results = pd.read_csv(exported_file, sep='\t')
        results_set = set(results['original_data'])
        results_set = {item.strip() for sub in results_set for item in sub.split(',')}
        mask = region['original_data'].apply(lambda paths: not is_file_exported(paths, results_set))
        region = region[mask]
    else:
        raise ValueError("The exported_file path provided is not a file.")

# added because there is frequently changed ID from upstream analysis!!!        
# If after filtering no region remains, update 'id' by concatenating 'chr' and original 'id' with '_'
if region.empty:
    print("No regions remain after filtering. Reattempting using concatenated id (chr_id).")
    # Update the 'id' column using the concatenation of the first column (chr) and the original 'id'
    region = pd.read_csv(region_file, sep='\t', names=['chr', 'start', 'end', 'id'])
    region['id'] = region['chr'] + "_" + region['id']
    
    # Reapply region_ids filter if provided
    if len(region_ids) > 0:
        region = region[region['id'].isin(region_ids)]
    
    # Recreate original_data with the updated id
    region['original_data'] = region.apply(create_formatted_list, axis=1)
    region['original_data'] = region['original_data'].apply(filter_existing_paths)
    region = region[region['original_data'].map(bool)]
    
    # Reapply exported_file filtering if applicable
    if not exported_file.is_dir():
        if exported_file.is_file():
            results = pd.read_csv(exported_file, sep='\t')
            results_set = set(results['original_data'])
            results_set = {item.strip() for sub in results_set for item in sub.split(',')}
            mask = region['original_data'].apply(lambda paths: not is_file_exported(paths, results_set))
            region = region[mask]
        else:
            raise ValueError("The exported_file path provided is not a file.")
regional_data = {
    'meta': [(row['chr'], row['start'], row['end'], row['id']) for _, row in region.iterrows()],
    'data': [(row['original_data']) for _, row in region.iterrows()]
}

meta_info = regional_data['meta']
stop_if(len(regional_data['data']) == 0, f' All files have been exported already')

input: regional_data["data"], group_by = lambda x: group_by_region(x, regional_data["data"]), group_with = "meta_info"
output: f"{cwd}/{name}_cache/{name}_{_meta_info[3]}.tsv"
task: trunk_workers = job_size, walltime = walltime, trunk_size = job_size, mem = mem, cores = numThreads, tags = f'{_output:bn}'
R: expand = "${ }",stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container, entrypoint=entrypoint
    library(tidyverse)
    library(data.table)
    library(susieR)
    library(pecotmr)

    # check top_loci existing or not
    has_rows <- function(df) {
      !is.null(df) && nrow(df) > 0
    }

    # function to allele flipping checking by mapping them to aligned genotype bim file
    align_to_genoref <- function(var_list, geno_ref, region ){
        geno_ref <- pecotmr:::tabix_region(file= geno_ref,
                    region = region)
        colnames(geno_ref) <- c('chr', 'pos', 'alt', 'ref')
        geno_ref <- geno_ref %>% mutate(chr = gsub('chr','',chr))
        var_list_df <- data.frame(chr = str_split(var_list,":|_",simplify = T)[,1] %>% gsub('chr','',.),
            pos = str_split(var_list,":|_",simplify = T)[,2],
            ref = str_split(var_list,":|_",simplify = T)[,3],
            alt = str_split(var_list,":|_",simplify = T)[,4])
        # merge_genotype_data from below cell
        aligned_var_df <- merge_genotype_data(geno_ref, var_list_df, all=FALSE)
        aligned_var <- aligned_var_df %>%
          mutate(id = {
            if (grepl(":", var_list[1])) {
              if (grepl("_", var_list[1])) {
                paste(chr, paste(pos, ref, alt, sep = "_"),sep = ':')
              } else {
                paste(chr, pos, ref, alt, sep = ":")
              }
            } else {
              paste(chr, pos, ref, alt, sep = "_")
            }
          }) %>%
          pull(id)
        if (grepl("chr", var_list[1]))  aligned_var <- paste0("chr",aligned_var)
        return(aligned_var)
    }
    
    # function to map variant list to geno ref
    merge_genotype_data <- function(df1, df2, all = TRUE) {
      setDT(df1)
      setDT(df2)
      df1[, key := paste(chr, pos, pmin(alt, ref), pmax(alt, ref))]
      df2[, key := paste(chr, pos, pmin(alt, ref), pmax(alt, ref))]
      df2[df1, on = "key", flip := i.alt == ref & i.ref == alt, by = .EACHI]
      df2[flip == TRUE, c("alt", "ref") := .(ref, alt)]
      if (all) {
        df_combined <- unique(rbindlist(list(df1[, .(chr, pos, alt, ref)], df2[, .(chr, pos, alt, ref)])), by = c("chr", "pos", "alt", "ref"))
      } else {
        df_combined <- df2[, .(chr, pos, alt, ref)]
      }
      return(df_combined)
    }

    # Function to filter credible sets with all variants PIP < 0.05 and update rows based on _min_corr suffix condition
    update_and_filter_cs_ids <- function(dat_con, dat_susie, df, purity_thresholds) {
      # Identify cs_coverage columns
      cs_columns <- grep("^cs_coverage", names(df), value = TRUE)
    
      # Flag rows with any cs_coverage > 0
      df$cs_all_non_zero_orig <- rowSums(df[cs_columns] == 0) != length(cs_columns)
    
      # Iterate over cs_coverage columns and their unique CS IDs
      for (cs_column in cs_columns) {
        unique_cs_ids <- unique(df[[cs_column]])
        # Update top loci based on minimum correlation for each coverage column
          for(purity_threshold in purity_thresholds){
              df <- update_top_loci_cs_annotation(dat_con, dat_susie, top_loci_df = df, coverage_value = cs_column, threshold = purity_threshold)
          }
    
        for (cs_id in unique_cs_ids[unique_cs_ids > 0]) {
          # Flag CS IDs where all PIPs < pip_thres (default 0.05)
          pip_check <- df[df[[cs_column]] == cs_id, "pip"] < 0.05
          # label the whole cluster if all pip < pip_thres (default 0.05)
          if (all(pip_check, na.rm = TRUE)) {
            df[[cs_column]] <- ifelse(df[[cs_column]] == cs_id, 0, df[[cs_column]])
          }
        }
      }
      # Identify min corr cs_coverage columns
      mincor_columns <- grep("_min_corr", names(df), value = TRUE) # Identify _mincor columns
    
      # Filter out rows where all cs_coverage columns are 0, all _mincor columns are 0,
      # and cs_all_non_zero_orig is TRUE
      df <- df %>%
        filter(!(cs_all_non_zero_orig &
                   rowSums(df[cs_columns] == 0) == length(cs_columns) &
                   rowSums(df[mincor_columns] == 0) == length(mincor_columns))) %>%
        select(-cs_all_non_zero_orig) # Remove the temporary flag column
      df <- df %>%
        select(-starts_with("cs_coverage"), all_of(cs_columns), all_of(mincor_columns))
      return(df)
     }
    
    
     # update top loci table
     update_top_loci_cs_annotation <- function(dat_con, dat_susie, top_loci_df, coverage_value, threshold ) {
       # update susie obj based CS
       if(coverage_value == 'cs_coverage_0.95') dat_susie_tmp <- dat_susie
        else dat_susie_tmp <- dat_susie$sets_secondary[[gsub('cs_','',coverage_value)]]
       #get purity res  
       purity_res <- dat_susie_tmp$sets$purity 
    
       not_pass_min_cs <- rownames(purity_res)[purity_res$min.abs.corr < threshold] %>%
         gsub('L', '', .)
       top_loci_df[[paste0( coverage_value, "_min_corr_", threshold)]] <- top_loci_df[[coverage_value]]
       if (length(not_pass_min_cs) > 0) {
         top_loci_df[[paste0( coverage_value, "_min_corr_", threshold)]][top_loci_df[[coverage_value]] %in% not_pass_min_cs] <- 0
       }
       return(top_loci_df)
     }
    
    # function to decide run update_and_filter_cs_ids or not
    process_top_loci <- function(dat_con, dat_susie,purity_thresholds) {
      data_frame <- dat_con$top_loci
      if (has_rows(data_frame)) {
        return(update_and_filter_cs_ids(dat_con, dat_susie, data_frame, purity_thresholds))
      } else {
        return(data_frame)
      }
    }

    # function to get pip with their variant name
    get_pip_withname <- function(susie_obj){
      pip = susie_obj$susie_result_trimmed$pip
      if(!(is.null(pip)))  names(pip) = susie_obj$variant_names
      return(pip)
    }
  
    # add column in top loci to indicate if the variant is identified in susie_on_top_pc top loci table or not
    annotate_susie <- function(obj, top_loci){
        tryCatch({
            df <- data.frame()
    
            results <- lapply(obj[['susie_on_top_pc']], function(x) {
              x[['top_loci']]
            })
            results <- results[!sapply(results, is.null)]
                              
            df <- bind_rows(results)
    
            top_loci_others <-  df %>%
              filter(rowSums(select(., starts_with('cs_coverage_'))) == 0)
            top_loci_cs <- df %>%
              filter(rowSums(select(., starts_with('cs_coverage_'))) > 0)
    
            susie_vars <- rbind(top_loci_others %>% filter(pip >= 0.05), top_loci_cs) %>% pull(variant_id)
            if(length(susie_vars) > 0) susie_vars <- align_to_genoref(var_list = susie_vars, geno_ref = geno_ref, region = paste0(gsub("chr","","${_meta_info[0]}"), ":", "${_meta_info[1]}", "-", "${_meta_info[2]}"))
    
            susie_vars_cs <- top_loci_cs %>% pull(variant_id)
            if(length(susie_vars_cs) > 0)  susie_vars_cs <- align_to_genoref(var_list = susie_vars_cs, geno_ref = geno_ref, region = paste0(gsub("chr","","${_meta_info[0]}"), ":", "${_meta_info[1]}", "-", "${_meta_info[2]}"))
    
            top_loci <- top_loci %>% mutate(annotated_susie = ifelse(variant_id %in% susie_vars, 1, 0),
                               annotated_susie_cs = ifelse(variant_id %in% susie_vars_cs, 1, 0))
    
            return(top_loci)
        }, error = function(e) {
            warning("Error in annotate_susie: ", e$message)
        })
    }
    
    # function to match context values and add analysis_name
    match_contexts <- function(df, context_meta) {
      # Split context_meta's context column into separate rows, one for each comma-separated value
      context_meta_expanded <- context_meta %>% 
        separate_rows(context, sep = ",") %>%
        mutate(context = trimws(context)) # Remove potential leading and trailing whitespace

      # Loop through each context in df to find the most precise match in context_meta_expanded
      df$super_context <- sapply(df$context, function(df_context) {
        # Find all potential matches
        potential_matches <- context_meta_expanded %>%
          filter(str_detect(df_context, context)) %>%
          arrange(desc(nchar(context))) # Sort potential matches by descending length of context

        # Select the longest match as the most precise one
        if(nrow(potential_matches) > 0) {
          return(potential_matches$context[1])
        } else {
          return(NA) # Return NA if no match is found
        }
      })

      return(df)
    }
    # Define a function to process context data and match contexts
    process_and_match_contexts <- function(context_meta_path, cons_top_loci, res_minp) {
      # Read context metadata
      context_meta <- fread(context_meta_path)

      # Extract super contexts
      super_contexts <- cons_top_loci[[1]] %>% unlist %>% names %>% as.character
      # Create a data frame of context and minimum p-values
      context_df <- data.frame(context = super_contexts,
                               min_p = res_minp[[1]][super_contexts] %>% unlist %>% as.numeric)
      # Match contexts using the previously defined match_contexts function
      context_df <- match_contexts(context_df, context_meta)
      # Filter context_df by super_context
      context_df_no_order <- context_df %>% filter(context == super_context)
      context_df_with_order <- context_df %>% filter(context != super_context) 

      return(list(context_df_no_order= context_df_no_order,context_df_with_order = context_df_with_order ))
    }

    # Function to filter introns based on leafcutter2 cluster
    process_lf2_cluster <- function(df){
        df <- df%>%
        mutate(cluster_id = str_extract(context, "clu_\\d+"), 
               category = str_extract(context, "([^:]+)(?=:EN)")) %>%
        group_by(cluster_id) %>%
        # filter the intron clusters with NE and IN events
        mutate(cluster_status = ifelse(any(category %in% c("NE", "IN")), "no", "yes")) %>%
        filter(cluster_status == "yes") %>%
        ungroup %>%
        filter(!str_detect(context, ":PR:")) %>%  #FIXME: only keep unproductive events in leafcutter2 results. 
        select(-cluster_status, -cluster_id, -category)
        return(df)
    }

    # Function to combine the contexted with and without order 
    combine_order <- function(context_df_no_order, context_df_with_order){
     context_df_with_order <- context_df_with_order %>%
        group_by(super_context) %>%
        summarise(min_p = min(min_p), .groups = 'keep') %>%
        left_join(context_df_with_order, by = c("super_context", "min_p" = "min_p"))

      # Combine context_df_no_order and context_df_with_order contexts
      cons_top_loci_minp <- c(context_df_no_order$context, context_df_with_order$context)
        return(cons_top_loci_minp)
    }
    
    # Revised summarise_lfsr_by_marker function with distinct error cases
    summarise_lfsr_by_marker <- function(multicontext_res,
                                           pos = NULL,
                                           markers = NULL,
                                           conditions = NULL,
                                           poslim = NULL,
                                           lfsr_cutoff = 0.01,
                                           sentinel_only = FALSE,
                                           cs_plot = NULL,
                                           conditional_effect = TRUE) {
      tryCatch({
        # Extract fitted object from multicontext_res
        fit <- multicontext_res[[1]]$mvsusie_fitted
    
        # Set default parameters if not provided
        if (is.null(markers)) markers <- fit$variable_names
        if (is.null(conditions)) conditions <- fit$condition_names
        if (is.null(pos)) pos <- seq(1, length(markers))
        if (is.null(poslim)) poslim <- range(pos)
        if (is.null(cs_plot)) cs_plot <- names(fit$sets$cs)
    
        # Create initial data frame for plotting
        pdat <- data.frame(pip = fit$pip, pos = pos, cs = as.character(NA),
                           stringsAsFactors = FALSE)
        css <- names(fit$sets$cs)
        for (i in css) {
          j <- fit$sets$cs[[i]]
          pdat[j, "cs"] <- i
        }
        rows <- which(!is.na(pdat$cs))
        pdat_cs <- pdat[rows, ]
    
        # Filter by position limits
        rows1 <- which(pdat$pos >= poslim[1] & pdat$pos <= poslim[2])
        rows2 <- which(pdat_cs$pos >= poslim[1] & pdat_cs$pos <= poslim[2])
        pdat <- pdat[rows1, ]
        pdat_cs <- pdat_cs[rows2, ]
    
        # Order and label credible sets
        pdat_cs$cs <- factor(pdat_cs$cs)
        css <- levels(pdat_cs$cs)
        L <- length(css)
        cs_pos <- sapply(fit$sets$cs[css], function(x) median(pos[x]))
        css <- css[order(cs_pos)]
        pdat_cs$cs <- factor(pdat_cs$cs, levels = css)
        cs_size <- sapply(fit$sets$cs[css], length)
        for (i in 1:L) {
          j <- css[i]
          if (cs_size[i] == 1) {
            levels(pdat_cs$cs)[i] <- sprintf("%s (1 SNP)", j)
          } else {
            levels(pdat_cs$cs)[i] <- sprintf("%s (%d SNPs, %0.3f purity)",
                                             j, cs_size[j], fit$sets$purity[j, "min.abs.corr"])
          }
        }
    
        # Set up traits and effects matrix
        traits <- conditions
        r <- length(traits)
        lmax <- nrow(fit$alpha)
        fit$b1_rescaled <- fit$b1_rescaled[, -1, ]
        rownames(fit$b1_rescaled) <- paste0("L", 1:lmax)
        rownames(fit$single_effect_lfsr) <- paste0("L", 1:lmax)
        colnames(fit$single_effect_lfsr) <- traits
        rownames(fit$alpha) <- paste0("L", 1:lmax)
        effects <- matrix(0, r, L)
        rownames(effects) <- traits
        colnames(effects) <- css
    
        # Initialize effect data frame
        effect_dat <- data.frame(matrix(as.numeric(NA),
                                          prod(length(conditions) * length(markers)), 8))
        names(effect_dat) <- c("trait", "marker", "pos", "effect", "z", "lfsr", "cs", "sentinel")
        effect_dat$trait <- rep(conditions, length(markers))
        effect_dat$marker <- rep(markers, each = length(conditions))
        effect_dat$pos <- rep(pos, each = length(conditions))
        effect_dat$sentinel <- 0
    
        # Process each credible set
        for (i in 1:L) {
          l <- css[i]
          j <- fit$sets$cs[[l]]
          b <- fit$b1_rescaled[l, j, ]
          if (conditional_effect) {
            b <- b / fit$alpha[l, j]
          }
          marker_names <- markers[j]
          marker_idx <- which(effect_dat$marker %in% marker_names)
          effect_dat[marker_idx, "cs"] <- l
          effect_dat[marker_idx, "lfsr"] <- rep(fit$single_effect_lfsr[l, ], length(marker_names))
          effect_dat[marker_idx, "effect"] <- as.vector(t(b))
          if (!is.null(fit$z)) {
            effect_dat[marker_idx, "z"] <- as.vector(t(fit$z[j, ]))
          }
          max_idx <- which.max(fit$alpha[l, j])
          effect_dat[which(effect_dat$marker == marker_names[max_idx]), "sentinel"] <- 1
          effects[, i] <- ifelse(is.null(nrow(b)), b, b[max_idx, ])
        }
    
        # Filter and process effect data
        effect_dat <- effect_dat[which(!is.na(effect_dat$cs)), ]
        rows1 <- which(effect_dat$pos >= poslim[1] & effect_dat$pos <= poslim[2])
        effect_dat <- effect_dat[rows1, ]
        effect_dat$marker_cs <- paste0(effect_dat$marker, "(", effect_dat$cs, ")")
        pdat_sentinel <- effect_dat[which(effect_dat$sentinel == 1), ]
        pdat_sentinel <- unique(pdat_sentinel[, c("marker", "marker_cs", "pos")])
        pdat_sentinel$pip <- fit$pip[match(pdat_sentinel$marker, fit$variable_names)]
    
        # Optionally keep only sentinel variants
        if (sentinel_only) {
          effect_dat <- effect_dat[which(effect_dat$sentinel == 1), ]
        }
        # Optionally filter by specified credible sets to plot
        if (!missing(cs_plot)) {
          effect_dat <- effect_dat[which(effect_dat$cs %in% cs_plot), ]
        }
        effect_dat$cs <- factor(effect_dat$cs, levels = css)
        effect_dat$trait <- factor(effect_dat$trait, traits)
        rows <- which(effect_dat$lfsr < lfsr_cutoff)
        effect_dat <- effect_dat[rows, ]
    
        # Error Case 1: No variants left after LFSR cutoff filtering
        if (nrow(effect_dat) == 0) {
          message("Warning Case 1: No variants passed the LFSR cutoff threshold; returning NA values.")
          top_loci <- multicontext_res[[1]][['top_loci']]
          top_loci$trait <- NA
          top_loci$lfsr <- NA
          return(top_loci)
        }
    
        effect_dat <- effect_dat %>% 
          mutate(pip = fit$pip[match(marker, names(fit$pip))],
                 variant_id = marker)
    
        # Summarise results and merge with top loci
        result <- effect_dat %>%
          group_by(variant_id, cs) %>%
          arrange(trait) %>%
          summarise(trait = paste(trait, collapse = ";"), 
                    lfsr = paste(lfsr, collapse = ";"),
                    .groups = "drop") %>% 
          mutate(cs_coverage_0.95 = gsub('L', '', cs)) %>%
          select(-cs)
    
        top_loci <- multicontext_res[[1]][['top_loci']]
        merge(top_loci, result, by = c('variant_id', 'cs_coverage_0.95'), all.x = TRUE)
      }, error = function(e) {
        # Error Case 2: Any unexpected error in processing
        message("Warning Case 2: An unexpected error occurred: ", e$message)
        top_loci <- tryCatch(multicontext_res[[1]][['top_loci']],
                             error = function(x) data.frame(variant_id = NA, cs_coverage_0.95 = NA))
        top_loci$trait <- NA
        top_loci$lfsr <- NA
        return(top_loci)
      })
    }
    
    # function to add effect size
    add_coef_column <- function(dat_study) {
      # Ensure the required components exist
      # Check if dat_study contains both required components
    if (!("susie_result_trimmed" %in% names(dat_study)) || !("top_loci" %in% names(dat_study))) {
      # Return a null list instead of stopping
      return(data.frame())
    }
          
      # Extract the coefficient matrix and ensure top_loci is a data.frame
      coef_matrix <- dat_study[["susie_result_trimmed"]][["coef"]][-1,] / dat_study[["susie_result_trimmed"]][['pip']]
      dt <- dat_study[["top_loci"]]
      if (!is.data.frame(dt)) {
        dt <- as.data.frame(dt)
      }
      
      # Check that dt has the required columns
      if (!all(c("variant_id", "trait") %in% names(dt))) {
        stop("The 'top_loci' data frame must contain 'variant_id' and 'trait' columns.")
      }
      
      # Add the 'coef' column by matching each variant with its corresponding traits
      dt[["conditional_effect"]] <- mapply(function(variant, trait_str) {
        # Return NA if trait_str is missing or variant not found in coef_matrix
        if (is.na(trait_str) || !(variant %in% rownames(coef_matrix))) {
          return(NA_character_)
        }
        # Split the trait string by ';' and trim extra whitespace
        traits <- trimws(unlist(strsplit(trait_str, ";")))
        # Subset coefficients for the variant and specified traits
        coefs <- coef_matrix[variant, traits, drop = TRUE]
        paste(coefs, collapse = ";")
      }, dt[["variant_id"]], dt[["trait"]], USE.NAMES = FALSE)
      
      dat_study[["top_loci"]] <- dt
      return(dt)
    }

    ###### MAIN ######
    # Process each path and collect results
    orig_files = c(${",".join(['"%s"' % x.absolute() for x in _input])})
     # Extract info from each RDS file
    results <- list()
    gene = "${_meta_info[3]}"
    geno_ref <- "${geno_ref}"

    # Post Processing: Extracting info
    # use for loop instead of apply to save memory
    res <- res_sum <- res_minp <- cons_top_loci <- list()
    pip_sum <- data.frame()
    for(i in seq_along(orig_files)) {
      rds_path <- orig_files[i]
       dat <- tryCatch({
        readRDS(rds_path)
      }, error = function(e) {
        writeLines(rds_path %>% basename, gsub(".tsv","_error","${_output}"))
        return(NULL) # 
      })

      if(is.null(dat)) next
  
      #extract qtl type from susie rds file name, if we have set decent condtiton name, this could be removed 
      temp_list <- list() # Temporary list to store inner results
      genes <- names(dat)
        for(id in genes){
          dat_study <- dat[[id]]
          temp_list <- list()
          if(${"TRUE" if mnm else "FALSE"}) {
              if (length(dat[[id]]) <= 1) {
                message("No multi-contexts results due to no twas results; skipping multi-context export.")
                next
              }
              conditions <- paste(orig_files[i] %>% basename %>% str_split(., '[.]', simplify = T) %>% .[,1],dat_study[['context_names']] %>% paste(  ., collapse = ';'), sep = ':') 
              dat_study[['top_loci']] <- summarise_lfsr_by_marker(dat)
              dat_study[['top_loci']] <- add_coef_column(dat_study)
            } else conditions <- names(dat_study)[sapply(dat_study, function(x) length(x) > 0)]
  
          if(length(conditions) > 0) {
  
            for(condition in conditions) {
                if (${"FALSE" if mnm else "TRUE"}) dat_con <- dat_study[[condition]]${[['fsusie_summary']] if fsusie else ''} else dat_con <- dat_study${[['fsusie_summary']] if fsusie else ''}
                if (${"TRUE" if gwas else "FALSE"}){
                    method <- names(dat_study[[condition]])[names(dat_study[[condition]]) != 'rss_data_analyzed'] ## FIXME: this method is not showing in meta (not need if only one method). or we can use `context@method` in meta
                    if(length(method) == 1) dat_con <- dat_con[[method]] else stop('more than 1 method, please check.')
                }
                dat_susie <- dat_con$susie_result_trimmed
                
                # calculate the sum of pip for the vairnats with pip > 0
                pip_sum = rbind(pip_sum, 
                                data.frame(pip_sum = dat_susie[['pip']] [dat_susie[['pip']] > 0] %>% sum, 
                                           condition = condition)
                               )
                if(${"FALSE" if mnm else "TRUE"} & has_rows(dat_con[['top_loci']])) dat_con[['top_loci']] <- dat_con[['top_loci']] %>% mutate(conditional_effect = (coef(dat_susie)/ (dat_susie[['pip']]))[variant_id])
                
                # rename context names if needed
                if(${"TRUE" if condition_meta.is_file() else "FALSE"}){
                      meta <- suppressMessages(read_delim("${condition_meta}", col_names = F))
                      context <- meta %>% filter(X1 == condition) %>% pull(X2)
                      if (length(context) == 0) {
                        context <- condition
                        message("No matching entries found. context has been set to the condition value.")
                      }
                    } else {context <- condition}
                  
                # align variants to aligned geno
                if(has_rows(dat_con$top_loci) || has_rows(dat_con$preset_top_loci)) cons_top_loci[[id]][[context]] <- context else  cons_top_loci[[id]][[context]] <- NULL    
                variant_ids <- c(dat_con$top_loci$variant_id, dat_con$variant_names, dat_con$preset_variants_result$top_loci$variant_id, dat_con$preset_variants_result$variant_names)
                unique_variant_ids <- unique(variant_ids)
                aligned_variant_ids <- align_to_genoref(unique_variant_ids, geno_ref, paste0(gsub("chr","","${_meta_info[0]}"), ":", "${_meta_info[1]}", "-", "${_meta_info[2]}"))
                names(aligned_variant_ids) <- unique_variant_ids

                # change beta or z in top loci and sumstats
                top_loci_changed_indexes <- which(dat_con$top_loci$variant_id !=  aligned_variant_ids[dat_con$top_loci$variant_id] %>% as.character )
                if(has_rows(dat_con$top_loci) & length(top_loci_changed_indexes) > 0) {
                      dat_con$top_loci$conditional_effect[top_loci_changed_indexes] <- (-1)* dat_con$top_loci$conditional_effect[top_loci_changed_indexes] # FIXME if coef does not need to check flip
                      if (${"FALSE" if gwas else "TRUE"}) {
                          dat_con$top_loci$betahat[top_loci_changed_indexes] <- (-1)* dat_con$top_loci$betahat[top_loci_changed_indexes]
                      } else {
                          dat_con$top_loci$z[top_loci_changed_indexes] <- (-1)* dat_con$top_loci$z[top_loci_changed_indexes]
                      }
                }
                all_changed_indexes <- which(dat_con$variant_names !=  aligned_variant_ids[dat_con$variant_names] %>% as.character )
                if(length(all_changed_indexes) > 0) {
                      if (${"FALSE" if gwas else "TRUE"}) {
                          dat_con$sumstats$betahat[all_changed_indexes] <- (-1)* dat_con$sumstats$betahat[all_changed_indexes]
                      } else {
                           dat_con$sumstats$z[all_changed_indexes] <- (-1)* dat_con$sumstats$z[all_changed_indexes]
                      }
                }

                # change variant names 
                if(has_rows(dat_con$top_loci)) dat_con$top_loci$variant_id <- aligned_variant_ids[dat_con$top_loci$variant_id] %>% as.character ###
                dat_con$variant_names <- aligned_variant_ids[dat_con$variant_names] %>% as.character ###

                
                if (${"FALSE" if gwas else "TRUE"}) { 
                    res[[id]][[context]] <- list(
                      top_loci = process_top_loci(dat_con, dat_susie, purity_thresholds = c(${",".join(['"%s"' % x for x in min_corr])})),
                      pip = get_pip_withname(dat_con)
                    )
                    # the preset data in fsusie is actually from first PC and analyzed by susie, we'd like to remove them to avoid misleading
                    # although no preset res in mnm, we can put it here with the same layer structure. It would not report preset results
                    if (${"FALSE" if fsusie else "TRUE"}) {
                          if(has_rows(dat_con$preset_variants_result$top_loci)) {
                              dat_con$preset_variants_result$top_loci$variant_id <- aligned_variant_ids[dat_con$preset_variants_result$top_loci$variant_id] %>% as.character
                              # Calculate and add the conditional effect in one step
                              if(has_rows(dat_con$preset_variants_result[['top_loci']])){
                                  dat_con$preset_variants_result[['top_loci']] <- dat_con$preset_variants_result[['top_loci']] %>%
                                      mutate(conditional_effect = (coef(dat_con$preset_variants_result$susie_result_trimmed) /
                                                                     dat_con$preset_variants_result$susie_result_trimmed[['pip']])[variant_id])
                                  dat_con$top_loci$conditional_effect[top_loci_changed_indexes] <- (-1)* dat_con$top_loci$conditional_effect[top_loci_changed_indexes] # FIXME if coef does not need to check flip
                                }
                              }
                          dat_con$preset_variants_result$variant_names <- aligned_variant_ids[dat_con$preset_variants_result$variant_names] %>% as.character
                          # change beta in preset top loci
                          preset_top_loci_changed_indexes <- which(dat_con$preset_variants_result$top_loci$variant_id !=  aligned_variant_ids[dat_con$preset_variants_result$top_loci$variant_id] %>% as.character )
                          if(has_rows(dat_con$top_loci) & length(preset_top_loci_changed_indexes) > 0) dat_con$preset_variants_result$top_loci$betahat[preset_top_loci_changed_indexes] <- (-1)* dat_con$preset_variants_result$top_loci$betahat[preset_top_loci_changed_indexes]   

                          res[[id]][[context]][['region_info']] = dat_con$region_info
                          res[[id]][[context]][['CV_table']] = dat_con$twas_cv_result$performance
                          res[[id]][[context]][['preset_top_loci']] = process_top_loci(dat_con$preset_variants_result, dat_con$preset_variants_result$susie_result_trimmed, purity_thresholds = c(${",".join(['"%s"' % x for x in min_corr])}))
                          res[[id]][[context]][['preset_pip']] = get_pip_withname(dat_con$preset_variants_result)
                    } else {
                          res[[id]][[context]][['region_info']] = dat_study[[condition]]$region_info
                          res[[id]][[context]][['CV_table']] = dat_study[[condition]]$twas_cv_result$performance
                          res[[id]][[context]][['top_loci']] = annotate_susie(dat_study[[condition]], res[[id]][[context]][['top_loci']])
                          
                    } 
                # fsusie do not have sumstats or p value
                if (${"FALSE" if fsusie or mnm else "TRUE"}) {
                    res_sum[[id]][[context]] <- list(
                      variant_names = dat_con$variant_names,
                      sumstats = dat_con$sumstats
                    )

                    if (${"FALSE" if fsusie or mnm else "TRUE"}) res_minp[[id]][[context]] <- min(pecotmr:::wald_test_pval(dat_con$sumstats$betahat, dat_con$sumstats$sebetahat,  n = 1000)) ##assuming sample size is 1000
                }
  
                } else {
                    res[[id]][[context]][[method]] <- list(
                      top_loci = process_top_loci(dat_con, dat_susie, purity_thresholds = c(${",".join(['"%s"' % x for x in min_corr])})),
                      pip = get_pip_withname(dat_con)
                    )
                    res_sum[[id]][[context]][[method]] <- list(
                      variant_names = dat_con$variant_names,
                      sumstats = dat_con$sumstats
                    )

                }
                

                if(has_rows(dat_con$top_loci) || has_rows(dat_con$preset_top_loci)) cons_top_loci[[id]][[context]] <- context else  cons_top_loci[[id]][[context]] <- NULL
              }
            }
        }
    }

    cons_top_loci <- cons_top_loci %>% compact()  # Use 'compact' to remove NULLs
    cons_top_loci <- if(length(cons_top_loci) > 0) cons_top_loci else NA

    combine_data = combine_data_sumstats = cons_top_loci_minp = ''
    combine_data = paste0("${_output:add}","/","${name}", ".", ${'"epigenomics_"' if fsusie else '""'}, ${ '"metabolomics_"' if metaQTL else '""'}, gene, ".cis_results_db.export.rds")
    if (${"FALSE" if fsusie else "TRUE"}) combine_data_sumstats = gsub("export.rds$", "export_sumstats.rds", combine_data)
  
    if (${"TRUE" if exported_file.is_file() else "FALSE"}){
        if (file.exists(combine_data)) {
          res_exp <- readRDS(combine_data)
          res[[gene]] <- c(res[[gene]], res_exp[[gene]]) # this may need to change if tehre are multiple genes in one rds file. 
        }
        if (file.exists(combine_data_sumstats)) {
          res_sum_exp <- readRDS(combine_data_sumstats)
          res_sum[[gene]] <- c(res_sum[[gene]], res_sum_exp[[gene]])
        }
    }
    saveRDS(res, combine_data)
    # only save sumstats results when NOT fsusie or multi gene mvsusie
    if (${"FALSE" if fsusie or mnm else "TRUE"}) saveRDS(res_sum, combine_data_sumstats)
  
    # generate md5 for data transferring
    system(paste("md5sum ",combine_data, " > ", paste0(combine_data, ".md5")))
    if (${"FALSE" if fsusie or mnm else "TRUE"}) system(paste("md5sum ",combine_data_sumstats, " > ", paste0(combine_data_sumstats, ".md5")))
  
      
    TSS <- tryCatch({dat_con$region_info$region_coord$start},  error = function(e) {return(NA)})
    if (length(res) > 0) conditions = paste(names(res[[1]]), collapse = ",") else conditions = ''
    
    # fsusie does not have sumstats or pvalue, do not need to run this

    # if (${"FALSE" if fsusie or gwas or mnm else "TRUE"})  {
    #     context_map <- process_and_match_contexts('${context_meta}', cons_top_loci, res_minp)
    #     context_map$context_df_with_order <- process_lf2_cluster(context_map$context_df_with_order)
    #     cons_top_loci_minp <- combine_order(context_map$context_df_no_order, context_map$context_df_with_order)
    # }

    # save pip sum file 
     write_delim(pip_sum, 
                 paste0("${_output:add}","/","${name}", ".", ${'"epigenomics_"' if fsusie else '""'}, ${ '"metabolomics_"' if metaQTL else '""'}, gene, ".pip_sum"), 
                 delim = '\t')

    meta = data.frame(chr="${_meta_info[0]}", start="${_meta_info[1]}", end="${_meta_info[2]}", region_id="${_meta_info[3]}", TSS =  if(is.null(TSS)) NA else TSS, 
                      original_data = paste(basename(orig_files), collapse = ","), combined_data = basename(combine_data), combined_data_sumstats = basename(combine_data_sumstats), 
                      conditions = conditions, 
                      conditions_top_loci = if(length(cons_top_loci) > 0) cons_top_loci[[1]] %>% unlist %>% names %>% as.character %>% paste(., collapse = ',') else ''
                      # , conditions_top_loci_minp = if(length(cons_top_loci_minp) > 0) cons_top_loci_minp %>% paste(., collapse = ',') else ''
                      )

    write_delim(meta, "${_output}", delim = '\t')
  
[cis_results_export_2, gwas_results_export_2]
# set it as True if you only want run first step (running parallel) and combine them manually after all finished 
parameter: step1_only = False
skip_if(step1_only)
# provide exported meta to filter the exported genes 
parameter: exported_file = path()
# optional: qtl or gwas, there is slightly different in qtl and gwas rds file
parameter: gwas = False
input: group_by = 'all'
output: f"{cwd}/{name}.{'block_results_db' if gwas else 'cis_results_db'}.tsv" 
# stop_if(_input[0] not in locals().keys(), 'All files have been exported already') #FIXME should we remove to a separate file. sothat we can stop globally as above
task: trunk_workers = 1, walltime = '1h', trunk_size = 1, mem = '16G', cores = 1, tags = f'{_output:bn}'
bash: expand = "${ }", container = container, stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', entrypoint=entrypoint
    if [ -e "${_output:ad}/${name}_cache/" ]; then
        sed 's/^chr/#chr/' `ls ${_output:ad}/${name}_cache/*tsv |head -n1` | head -n 1 > ${_output:an}.temp
        tail -n +2 -q ${_output:ad}/${name}_cache/*.tsv >> ${_output:an}.temp
        error_files=$(find "${_output:ad}/${name}_cache/" -type f -name "*_error")

        if [[ -n $error_files ]]; then
            cat $error_files >> ${_output:an}.error_genes
        else
            echo "No truncated files detected"
        fi
    else
        echo "All files have been exported already"
    fi
    
R: expand = "${ }", container = container, stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', entrypoint=entrypoint
    if (file.exists(paste0(${_output:anr},".temp"))) {
        library(tidyverse)
        meta <- read_delim(paste0(${_output:anr},".temp"), delim = '\t')

        if (${"TRUE" if exported_file.is_file() else "FALSE"}){
          exp_meta <- read_delim(${exported_file:r}, delim = '\t')
          meta <- bind_rows(meta, exp_meta) %>%
              group_by(`#chr`, start, end, region_id, TSS) %>%
              summarise(across(c(original_data, combined_data, combined_data_sumstats, conditions, conditions_top_loci), 
                               ~paste(unique(.), collapse = ",")),
                        .groups = 'drop')
              }

        write_delim(meta, ${_output:r}, delim = '\t')
    }
[cis_results_export_3]
# set it as True if you only want run first step (running parallel) and combine them manually after all finished 
parameter: step1_only = False

skip_if(step1_only)
bash: expand = "${ }", container = container, stderr = f'{_input:n}.stderr', stdout = f'{_input:n}.stdout', entrypoint=entrypoint   
    rm -rf "${_input:ad}/${name}_cache/"
    rm -rf ${_input:an}.temp

GWAS results consolidation#

#get union of step1
#1200 blocks costed ~5mins with in one for loop
[gwas_results_export_3]
parameter: step1_only = False

skip_if(step1_only)
output: f"{cwd}/{name}.union_export.tsv.gz" 
task: trunk_workers = 1, walltime = '1h', trunk_size = 1, mem = '16G', cores = 1, tags = f'{_output:bn}'
bash: expand = "${ }", container = container, stderr = f'{_input:n}.stderr', stdout = f'{_input:n}.stdout', entrypoint=entrypoint   
    rm -rf "${_input:ad}/${name}_cache/"
    rm -rf ${_input:an}.temp

R: expand = "${ }", container = container, stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', entrypoint=entrypoint
    library(tidyverse)
    library(data.table)

    mtx <- read_delim(${_input:r})
    files <- mtx %>% filter(!is.na(conditions_top_loci)) %>% pull(combined_data) %>% paste0(${_input[0]:dr},'/',.)
    all_top_loci <- data.frame()

    for (i in seq_along(files)) {
      file <- files[i]
      res <- readRDS(file)

      file_top_loci <- lapply(names(res), function(block) {
        lapply(names(res[[block]]), function(study) {
          lapply(names(res[[block]][[study]]), function(method) {
            if (!is.null(res[[block]][[study]][[method]]$top_loci)) {
              temp_df <- res[[block]][[study]][[method]]$top_loci
              mutate(temp_df, study = study, method = method, block = block)
            } else {
              NULL  
            }
          })
        }) %>% bind_rows()  
      }) %>% bind_rows() 

      all_top_loci <- bind_rows(all_top_loci, file_top_loci)
      if (i %% 100 == 0) {
        message(sprintf("Have processed %d files.", i))
      }
    }
    fwrite(all_top_loci, ${_output:r})
# simply combine seperate meta files, not works for having an exsiting exported file for now. 
[combine_export_meta]
parameter: cache_path=path
parameter: output_file = str
parameter: remove_cache = False
output: f"{cache_path:d}/{output_file}" 
bash: expand = "${ }", container = container, stderr = f'{_input:n}.stderr', stdout = f'{_input:n}.stdout', entrypoint=entrypoint   
    head -n 1 -q ${cache_path}/*.tsv | sed 's/^chr/#chr/' | head -n 1 > ${_output}
    tail -n +2 -q ${cache_path}/*.tsv >> ${_output}
    if ${"true" if remove_cache else "false"}; then
        rm -rf ${cache_path}
    fi
[export_top_loci]
parameter: export_path=path
parameter: region = str
parameter: prefix = str
parameter: suffix = str
parameter: fsusie_prefix = ''
parameter: preset_top_loci = False
parameter: lfsr_thres = 0.01
input: f"{export_path}/{prefix}.{fsusie_prefix}{region}.{suffix}" 
output: f"{cwd:a}/{prefix}.{region}.toploci.bed.gz" 
task: trunk_workers = 1, walltime = '1h', trunk_size = job_size, mem = '16G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", container = container, stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', entrypoint=entrypoint
    library(tidyverse)
    library(data.table)
    lfsr_thres <- as.numeric(${lfsr_thres})
    #function to filter lfsr
    filter_lfsr <- function(df, lfsr_thres){
        df_na <- df %>% filter(is.na(lfsr)|lfsr == '') %>% mutate(event_ID = NA)
        df %>%
            mutate(
              lfsr_list = str_split(lfsr, ";") %>% map(as.numeric),
              effect_list = str_split(conditional_effect, ";"),
              event_list = str_split(event_ID, ";"),
              valid_indices = map(lfsr_list, ~ which(.x < lfsr_thres))
            ) %>%
            rowwise() %>%
            mutate(
              event_ID = paste(event_list[valid_indices], collapse = ";"),
              conditional_effect = paste(effect_list[valid_indices], collapse = ";"),
              lfsr = paste(lfsr_list[valid_indices], collapse = ";")
            ) %>%
            filter(event_ID != "") %>%
            select(any_of(colnames(file_top_loci_rename))) %>%
            ungroup() %>% rbind(df, df_na)
        }
    
    ## main
    res <- readRDS("${_input}")
    file_top_loci <- lapply(names(res), function(gene) {
        lapply(names(res[[gene]]), function(context) {
          lapply(c("top_loci"${',"preset_top_loci"' if preset_top_loci else ''}), function(method) {
            if ("RSS_QC_RAISS_imputed" %in% names(res[[gene]][[context]])) { # that is for rss output
              temp_df <- res[[gene]][[context]][['RSS_QC_RAISS_imputed']][[method]]
            } else {
              temp_df <- res[[gene]][[context]][[method]]
            }
            if (!is.null(temp_df)) {
              mutate(temp_df, study = context, method = method, region = gene)
            } else {
              NULL  
            }
          })
        }) %>% bind_rows()  
    }) %>% bind_rows() 

    file_top_loci_rename <- file_top_loci %>% 
      # Select necessary columns from the original table
      select(any_of(c(
        "variant_id", "pip", 
        "cs_coverage_0.95_min_corr","cs_coverage_0.7_min_corr", "cs_coverage_0.5_min_corr",
        "cs_coverage_0.95_min_corr_0.8","cs_coverage_0.7_min_corr_0.8", "cs_coverage_0.5_min_corr_0.8",
        "cs_coverage_0.95_min_corr_0.5","cs_coverage_0.7_min_corr_0.5", "cs_coverage_0.5_min_corr_0.5",
        "study", "method", "region", "conditional_effect", "lfsr", "trait"
      ))) %>% 
      mutate(
        # Split variant_id into separate components
        chr = str_split(variant_id, ":", simplify = TRUE)[, 1],  # Chromosome
        pos = str_split(variant_id, ":", simplify = TRUE)[, 2],  # Position
        a1  = str_split(variant_id, ":", simplify = TRUE)[, 4],  # Allele 1
        a2  = str_split(variant_id, ":", simplify = TRUE)[, 3],  # Allele 2
        # Rename and duplicate columns as required
        variant_ID = variant_id,
        gene_ID    = region,
        event_ID   = study,
        PIP = pip,
      )  %>%
      {
        if (any(c("cs_coverage_0.95_min_corr_0.8", "cs_coverage_0.7_min_corr_0.8", "cs_coverage_0.5_min_corr_0.8", 
                  "cs_coverage_0.95_min_corr_0.5", "cs_coverage_0.7_min_corr_0.5", "cs_coverage_0.5_min_corr_0.5", 
                  "cs_coverage_0.95_min_corr", "cs_coverage_0.7_min_corr", "cs_coverage_0.5_min_corr") %in% colnames(.))) {
          mutate(.,
            cs_coverage_0.95 = if ("cs_coverage_0.95_min_corr_0.8" %in% colnames(.)) get("cs_coverage_0.95_min_corr_0.8") else 
                               if ("cs_coverage_0.95_min_corr" %in% colnames(.)) get("cs_coverage_0.95_min_corr") else NA,
            
            cs_coverage_0.7 = if ("cs_coverage_0.7_min_corr_0.8" %in% colnames(.)) get("cs_coverage_0.7_min_corr_0.8") else 
                              if ("cs_coverage_0.7_min_corr" %in% colnames(.)) get("cs_coverage_0.7_min_corr") else NA,
            
            cs_coverage_0.5 = if ("cs_coverage_0.5_min_corr_0.8" %in% colnames(.)) get("cs_coverage_0.5_min_corr_0.8") else 
                              if ("cs_coverage_0.5_min_corr" %in% colnames(.)) get("cs_coverage_0.5_min_corr") else NA,
            
            cs_coverage_0.95_purity0.5 = if ("cs_coverage_0.95_min_corr_0.5" %in% colnames(.)) get("cs_coverage_0.95_min_corr_0.5") else NA,
            cs_coverage_0.7_purity0.5 = if ("cs_coverage_0.7_min_corr_0.5" %in% colnames(.)) get("cs_coverage_0.7_min_corr_0.5") else NA,
            cs_coverage_0.5_purity0.5 = if ("cs_coverage_0.5_min_corr_0.5" %in% colnames(.)) get("cs_coverage_0.5_min_corr_0.5") else NA
          )
        } else .
      } %>%
        # Conditionally add lfsr if the column exists
      mutate(region_id = basename("${_input}") %>% str_split(., '[.]', simplify = T) %>% .[,2])  %>% 
      # Conditionally add lfsr if the column exists
      { if ("lfsr" %in% colnames(.)) mutate(., lfsr = lfsr, context = str_split(event_ID, ':', simplify = T) %>% .[,1]) else . } %>% 
      { if ("lfsr" %in% colnames(.) & lfsr_thres != 0) mutate(., event_ID = trait) else . } %>% 
      # Select and order the final set of columns
      select(any_of(c("chr", "pos", "a1", "a2", "variant_ID", "gene_ID", "event_ID", "cs_coverage_0.95", "cs_coverage_0.7", "cs_coverage_0.5",
                      "cs_coverage_0.95_purity0.5", "cs_coverage_0.7_purity0.5", "cs_coverage_0.5_purity0.5", "PIP", "conditional_effect", 
                      "lfsr", "region_id", "context"${'",method"' if preset_top_loci else ''}))) %>% 
      # Sort by chromosome and position (convert pos to numeric if necessary)
      arrange(chr, as.numeric(pos))
    # Check if "lfsr" is in column names
    if ("lfsr" %in% colnames(file_top_loci_rename) & lfsr_thres != 0) file_top_loci_rename <- filter_lfsr(file_top_loci_rename, lfsr_thres = lfsr_thres)
    fwrite(file_top_loci_rename,"${_output}")

Overlap QTL and GWAS results#

[overlap_qtl_gwas_1]
parameter: per_chunk = 100
parameter: gwas_meta_path = path()
parameter: qtl_meta_path = path()
parameter: gwas_file_path = ''
parameter: qtl_file_path = ''
# Optional: if a region list is provide the analysis will be focused on provided region. 
# The LAST column of this list will contain the ID of regions to focus on
# Otherwise, all regions with both genotype and phenotype files will be analyzed
parameter: region_list = path()
# Optional: if a region name is provided 
# the analysis would be focused on the union of provides region list and region names
parameter: region_name = []
import pandas as pd
import numpy as np
from pathlib import Path

if qtl_file_path == '':
    qtl_file_path = qtl_meta_path.parent
if gwas_file_path == '':
    gwas_file_path = gwas_meta_path.parent
    
# Load the data, suppressing messages is not typically done in pandas as it does not inherently output messages when loading files
gwas_meta = pd.read_csv(gwas_meta_path, sep='\t', low_memory=False)
gwas_meta = gwas_meta[gwas_meta['conditions_top_loci'].notna()]

qtl_meta = pd.read_csv(qtl_meta_path, sep='\t', low_memory=False)
qtl_meta = qtl_meta[qtl_meta['conditions_top_loci'].notna()]

# Filter and mutate operations, translated to pandas
gwas_meta['combined_data_toploci'] = gwas_meta.apply(lambda row: row['combined_data'] if pd.notnull(row['conditions_top_loci']) else pd.NA, axis=1)

region_ids=[]
# If region_list is provided, read the file and extract IDs
if not region_list.is_dir():
    if region_list.is_file():
        region_list_df = pd.read_csv(region_list, sep='\t', header=None, comment = "#")
        region_ids = region_list_df.iloc[:, -1].unique()  # Extracting the last column for IDs
    else:
        raise ValueError("The region_list path provided is not a file.")
        
if len(region_name) > 0:
    region_ids = list(set(region_ids).union(set(region_name)))
    
if len(region_ids) > 0:
    qtl_meta = qtl_meta[qtl_meta['region_id'].isin(region_ids)]
    
def group_by_region(lst, partition):
    # from itertools import accumulate
    # partition = [len(x) for x in partition]
    # Compute the cumulative sums once
    # cumsum_vector = list(accumulate(partition))
    # Use slicing based on the cumulative sums
    # return [lst[(cumsum_vector[i-1] if i > 0 else 0):cumsum_vector[i]] for i in range(len(partition))]
    return partition

grouped_gwas_meta = {k: v for k, v in gwas_meta.groupby('#chr')}
def check_overlap(gene_row, grouped_gwas_meta):
    chr_group = gene_row['#chr']
    if chr_group in grouped_gwas_meta:
        block_region = grouped_gwas_meta[chr_group]
        overlaps = block_region[
            (block_region['start'] <= gene_row['end']) &
            (block_region['end'] >= gene_row['start'])
        ]
        if not overlaps.empty:
            return ','.join(overlaps['combined_data_toploci'].astype(str))
    return pd.NA

stop_if(len(qtl_meta) == 0, f'No file left for analysis ')

qtl_meta_cand = qtl_meta.apply(lambda row: pd.Series({
    'gwas_file': check_overlap(row, grouped_gwas_meta)
}), axis=1)

# Concatenate the new columns to the original qtl_meta DataFrame
qtl_meta_cand = pd.concat([qtl_meta, qtl_meta_cand], axis=1)
qtl_meta_filtered = qtl_meta_cand[qtl_meta_cand['gwas_file'].notna()]
qtl_meta_filtered = qtl_meta_filtered.dropna(subset=['gwas_file'])


regional_data = {
    'meta': [(row['#chr'], row['start'], row['end'], row['region_id'], row['gwas_file'].split(',')) for _, row in qtl_meta_filtered.iterrows()],
    'qtl_data': [f"{qtl_file_path}/{row['combined_data']}" for _, row in qtl_meta_filtered.iterrows()]
}

meta_info = regional_data['meta']
stop_if(len(regional_data['qtl_data']) == 0, f'No file left for analysis ')

input: regional_data["qtl_data"], group_by = lambda x: group_by_region(x, regional_data["qtl_data"]), group_with = "meta_info"
output: f"{cwd}/gwas_qtl/cache/{name}_gwas_batch_meta_{_meta_info[3]}.tsv"
task: trunk_workers = 1, walltime = '1h', trunk_size = job_size, mem = '16G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", container = container, stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', entrypoint=entrypoint
    library(tidyverse)
    library(plyr)
    library(data.table)

    # Function to add 'chr' in variants
    add_chr_prefix <- function(var) {
      if (any(grepl("chr", var))) {
        var <- var
      } else {
        var <- paste0("chr", var)
      }
      return(var)
    }

    # Function to check if a dataframe has rows
    has_rows <- function(df) {
      !is.null(df) && nrow(df) > 0
    }


    extract_top_loci <- function(res, include_method = FALSE) {
      all_top_loci <- lapply(names(res), function(region) {
        lapply(names(res[[region]]), function(study) {
          if (include_method) {
            method_results <- lapply(names(res[[region]][[study]]), function(method) {
              top_loci <- NULL
              if (!is.null(res[[region]][[study]][[method]]$top_loci) && nrow(res[[region]][[study]][[method]]$top_loci) > 0) {
                top_loci <- mutate(res[[region]][[study]][[method]]$top_loci, study = study, method = method, region = region)
              }
              return(top_loci)
            })
            return(bind_rows(method_results))
          } else {
            top_loci <- list()
            if (!is.null(res[[region]][[study]]$top_loci) && nrow(res[[region]][[study]]$top_loci) > 0) {
              top_loci[[length(top_loci) + 1]] <- mutate(res[[region]][[study]]$top_loci, study = study, region = region, method = 'top_loci')
            }
            if (!is.null(res[[region]][[study]]$preset_top_loci) && nrow(res[[region]][[study]]$preset_top_loci) > 0) {
              top_loci[[length(top_loci) + 1]] <- mutate(res[[region]][[study]]$preset_top_loci, study = study, region = region, method = 'preset_top_loci')
            }
            return(bind_rows(top_loci))
          }
        })
      }) %>% bind_rows() %>% na.omit()

      return(all_top_loci)
    }

  
    # load data 
    qtl_file = c(${",".join(['"%s"' % x.absolute() for x in _input])})
     # Extract info from each RDS file
    gwas_files = c(${",".join('"%s"' % x for x in _meta_info[4])}) %>% paste0('${gwas_file_path}','/',.)
    # Process GWAS files
    gwas_all_top_loci <- do.call(rbind.fill, lapply(gwas_files, function(file) {
      res <- readRDS(file)
      gwas_all_top_loci <- extract_top_loci(res, include_method = TRUE) 
    }))
    if(!is.null(gwas_all_top_loci) && nrow(gwas_all_top_loci) > 0){# fixme: could remove this judge if we get a solid enough meta
      gwas_all_top_loci <- gwas_all_top_loci%>% select(-c('z'))

      # Process QTL file
      qtl <- readRDS(qtl_file)
      qtl_all_top_loci <- extract_top_loci(qtl) 
      if(!is.null(qtl_all_top_loci) && nrow(qtl_all_top_loci) > 0){# fixme: could remove this judge if we get a solid enough meta
        # qtl_all_top_loci <- qtl_all_top_loci%>% select(-c('betahat','sebetahat','maf'))

        cs_cal <- c('cs_coverage_0.95','cs_coverage_0.7','cs_coverage_0.5')

        qtl_all_var <- qtl_all_top_loci %>%
            #filter(rowSums(.[,cs_cal]) > 0 ) %>% #fixme
            pull(variant_id)

        gwas_all_var <- gwas_all_top_loci %>%
            #filter(rowSums(.[,cs_cal]) > 0 ) %>% #fixme
            pull(variant_id)

        gwas_all_var <- if(any(grepl("chr", qtl_all_var))) add_chr_prefix(gwas_all_var) else gsub("chr", "", gwas_all_var)
        gwas_all_top_loci$variant_id <- gwas_all_var
        # since both qtl and gwas haven mapped to geno ref in exporting, here we intersect them directly
        int_var <- intersect(qtl_all_var, gwas_all_var)
        if(length(int_var) > 0){
            gwas_all_top_loci <- gwas_all_top_loci %>% filter(variant_id %in% int_var) # fixme: keep all gwas variants or intersected ones

            all_top_loci <- rbind.fill(gwas_all_top_loci, qtl_all_top_loci)
            fwrite(all_top_loci,  gsub('_gwas_batch_meta','_gwas_batch_export',${_output:r}))

            new_gwas <- split(gwas_all_top_loci, gwas_all_top_loci$study)
            new_gwas <- lapply(new_gwas, function(df) {
              split(df, df$method)
            })

            qtl[[1]] <- c(qtl[[1]], new_gwas)
            new_qtl_path <-  paste0(${_output:ddr},"/",gsub(".rds",".overlapped.gwas.rds",basename(qtl_file)))
            saveRDS(qtl, new_qtl_path)
  
            block_top_loci = gwas_all_top_loci$region %>% unique %>% paste(., collapse = ',')
            final_combined_data = new_qtl_path %>% basename
        } else {block_top_loci = final_combined_data = NA} # fixme: could remove this judge if we get a solid enough meta
      } else {block_top_loci = final_combined_data = NA} # fixme: could remove this judge if we get a solid enough meta
    } else {
        block_top_loci = final_combined_data = NA
    }
 
    qtl_meta <- suppressMessages(read_delim('${qtl_meta_path}'))
    qtl_meta <- qtl_meta %>% filter(region_id == '${_meta_info[3]}')  %>% mutate(block_top_loci = block_top_loci,
                                                     final_combined_data = final_combined_data)
    fwrite(qtl_meta, ${_output:r})
[overlap_qtl_gwas_2]
# set it as True if you only want run first step (running parallel) and combine them manually after all finished 
parameter: step1_only = False
skip_if(step1_only)
input: group_by = 'all'
output: f"{cwd}/{name}.overlapped.gwas.tsv"
task: trunk_workers = 1, walltime = '1h', trunk_size = job_size, mem = '16G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", container = container, stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', entrypoint=entrypoint
    library(data.table)
    exp_path <- ${_input[0]:adr}
    meta_files <- c(${",".join(['"%s"' % x.absolute() for x in _input])})
    exp_files <- list.files(exp_path, "_gwas_batch_export", full.names = T)
    meta_list <- exp_list <- list()
    meta_combined <- rbindlist(lapply(meta_files, fread), fill = TRUE)
    exp_combined <- rbindlist(lapply(exp_files, fread), fill = TRUE)
    fwrite(exp_combined, gsub("tsv","export.csv.gz","${_output}"))
    fwrite(meta_combined, "${_output}", sep = '\t')
  
#bash: expand = "${ }", container = container, stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', entrypoint=entrypoint
    # rm -rf ${_input[0]:adr}