Fine-mapping result post-processing#

This is a new-user-friendly minimal working example (MWE) of the mnm_postprocessing pipeline (the original SuSiE_post_processing notebook). It post-processes SuSiE fine-mapping results into flat, human-readable TSV tables. It runs end-to-end on the bundled chr22 toy dataset.

Description#

The full pipeline consolidates fine-mapping output into text tables, generates PIP/UpSet plots, exports VCFs, and overlaps QTL with GWAS results. Most of those steps need GWAS summary statistics, region/plot-recipe files, and a singularity container that the toy dataset does not ship.

This MWE focuses on the core, genuinely runnable step, susie_to_tsv, which reads each per-region SuSiE object and writes three tables: a per-variant table (PIP, credible-set membership, posterior mean/sd), a log-Bayes-factor table, and an effect table. The remaining workflows are kept as documented command templates.

Input Files#

All paths are relative to the working directory data/example_10peaks_susie_twas/ (run commands from there).

  • SuSiE objects (--rds_path): per-region fine-mapping RDS files. The toy inputs in input_postproc/cs.<gene>.uni.bvsr.rds are produced by the prep step below, which reshapes the genuine output_uni/fine_mapping/*.univariate_bvsr.rds results (from the mnm_regression MWE) into the flat susie object structure susie_to_tsv expects. This only reshapes real fine-mapping output; it does not fabricate any values.

  • Region list (--region-list): a TSV with columns #chr, start, end, gene_id describing each region. input_postproc/region_list.tsv is written by the same prep step.

Steps#

Prep: build the toy inputs (one-off)#

Reshape the univariate SuSiE results into flat per-region susie RDS files and a matching region list. Run from the toy_example root.

Timing: Runtime varies by dataset size and compute resources. For the toy chr22 MWE dataset, most steps complete in under 10 minutes on a standard HPC node.

Required CLI args (substitute placeholders <…> with your real values):#

–file_path dir containing fine-mapping RDS files#

–prefix <fm_rds_prefix> filename part BEFORE the region id#

–suffix <fm_rds_suffix> filename part AFTER the region id#

expected per-region path:#

{file_path}/{prefix}.{region}.{suffix}#

example: ROSMAP_AC_eQTL.ENSG00000012779.univariate_bvsr.rds#

__ prefix / _ region _/ _ suffix ___/#

–name <study_name> tag used in output filenames and intermediate RDS names#

(final output: {cwd}/summary/{name}.{qtl_type}.top_loci.bed.gz)#

–qtl_type cis | trans | trans_ appears in the combined filename#

–region_file bed of regions (chr, start, end, region_id) to enumerate#

–geno_ref tabix-indexed bim.gz for allele-flipping checks#

#

Outputs produced:#

{cwd}/{name}.{region}.cis_results_db.rds per-region intermediate RDS#

{cwd}/summary/{name}.{region}.top_loci.bed.gz per-region top_loci#

{cwd}/summary/{name}.{qtl_type}.top_loci.bed.gz combined public-facing file#

Full run: export all regions via SLURM and combine into one study-level file.#

sos run ./mnm_postprocessing.ipynb export_top_loci
–region_file ./TADB_enhanced_cis.coding.bed
–file_path /path/to/fine_mapping
–prefix <fm_rds_prefix>
–suffix <fm_rds_suffix>
–name <study_name>
–min_corr 0.8
–geno_ref ./Fungen_xQTL.ROSMAP_NIA_WGS.ROSMAP_NIA_WGS.MSBB_WGS_ADSP_hg38.MiGA.MAP_Brain-xQTL_Gwas_geno_0.STARNET.aligned.bim.gz
–qtl_type cis
–cwd ~/tmp/export_run
–mem 14G –walltime 72h
-q slurm -j 100 -s build

Step 1: susie_to_tsv (VERIFIED end-to-end on the toy data)#

Convert the SuSiE objects to per-variant / lbf / effect TSV tables. The --container argument is left empty so the step runs natively in the project environment.

Note: the preserved susie_to_tsv code (last cell) was written for full, untrimmed SuSiE objects. Four small compatibility fixes were applied in this userfriendly copy so it runs on the trimmed output_uni objects: keep variant IDs as character when parsing chr/pos; derive ref/alt from the :-separated variant ID; align coef.susie() length to the variant count; and use a per-effect summary of lbf_variable when a true per-effect $lbf is absent. These only reshape/realign genuine values - no results are fabricated. The original notebook is unchanged.

sos run pipeline/mnm_postprocessing.ipynb susie_to_tsv \
    --cwd output_postproc --name toy \
    --rds_path input_postproc/cs.*.uni.bvsr.rds \
    --region-list input_postproc/region_list.tsv \
    --container "" -j1

Output Files#

Under --cwd (e.g. output_postproc/), per input SuSiE object:

  • <gene>.variant.tsv - one row per variant: PIP, credible-set order/id, posterior mean and sd.

  • <gene>.lbf.tsv - per-variant log Bayes factors for each single-effect.

  • <gene>.effect.tsv - per-effect summary.

Anticipated Results#

You should get three TSV files per region (16 regions on the toy gene-expression set). On this small toy dataset most regions contain no strong credible set, so the credible-set columns are mostly empty - that is the correct, honest result for these independent single-gene toy regions.

Other workflows (command templates)#

The original notebook defines many more post-processing workflows that need inputs the toy dataset does not ship (GWAS summary statistics, plot recipe files, singularity container, sQTL/fSuSiE-specific objects). They are kept here as documented templates for completeness and are not verified on the toy data:

  • fsusie_to_tsv, sQTL_susie_to_tsv - same idea for fSuSiE / sQTL objects.

  • susie_tsv_collapse - collapse per-region TSVs into one table.

  • susie_pip_landscape_plot, susie_upsetR_plot, susie_upsetR_cs_plot - plotting (need a plot-recipe file).

  • tmp_annotatation_of_snps_1/2 - SNP annotation.

  • fsusie_extract_effect, fsusie_affected_region - fSuSiE effect extraction.

  • mv_susie_2 - export results to VCF.

  • cis_results_export_*, gwas_results_export_*, combine_export_meta, export_top_loci - cis/GWAS consolidation (need GWAS meta files).

  • overlap_qtl_gwas_1/2 - overlap QTL and GWAS results (need GWAS data).

Pipeline configuration and the susie_to_tsv workflow#

The cells below are the global parameters and the susie_to_tsv workflow definition, preserved from the original notebook.

[global]
import glob
import pandas as pd
# A region list file documenting the chr_pos_ref_alt of a susie_object
parameter: cwd = path("output")
parameter: name = "demo"

## Path to work directory where output locates
## Containers that contains the necessary packages
parameter: container = ""
# For cluster jobs, number commands to run per job
parameter: job_size = 1
# Wall clock time expected
parameter: walltime = "96h"
# Memory expected
parameter: mem = "6G"
# Number of threads
parameter: numThreads = 2
parameter: windows = 1000000
# use this function to edit memory string for PLINK input
from sos.utils import expand_size
[susie_to_tsv_1]
# Input
# For complete susie, region_list or tad_list, for susie_rss , LD region list 
parameter: region_list = path
region_tbl = pd.read_csv(region_list,sep = "\t")
parameter: rds_path = paths
input: rds_path, group_by = 1
output: f"{cwd}/{_input:bn}.variant.tsv",f"{cwd}/{_input:bn}.lbf.tsv",f"{cwd}/{_input:bn}.effect.tsv"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output[0]:nn}.stdout", stderr = f"{_output[0]:nn}.stderr", container = container
    library("dplyr")
    library("tibble")
    library("purrr")
    library("tidyr")
    library("readr")
    library("stringr")
    library("susieR")
    extract_lbf = function(susie_obj){
    
    if("variants" %in% names(susie_obj) ){
    ss_bf = tibble(snps = susie_obj$variants, cs_index = ifelse(is.null(susie_obj$sets$cs_index), 0, paste0(susie_obj$sets$cs_index,collapse =",")),names = "${f'{_input:br}'.split('.')[-4]}")
    }
      else 
      {
    ss_bf = tibble(snps = susie_obj$variable_name, cs_index = ifelse(is.null(susie_obj$sets$cs_index), 0, paste0(susie_obj$sets$cs_index,collapse =",")),names = "${_input:bnnn}")
     }
    
    ss_bf = ss_bf%>%cbind(susie_obj$lbf_variable%>%t)%>%as_tibble()
    
    return(ss_bf)
    }
    
    extract_variants_pip = function(susie_obj,region_list){
    susie_tb = tibble( variants =  names(susie_obj$pip)[which( susie_obj$pip >= 0)],
                           snps_index = which(( susie_obj$pip >= 0))) %>%
        mutate(chromosome = map_chr(variants, ~read.table(text = .x, sep = ":")$V1%>%str_replace("chr","") ),
                position  = map_chr(variants, ~read.table(text = .x, sep = ":")$V2  ),
                ref = map_chr(position , ~read.table(text = .x, sep = "_",colClasses = "character")$V2  ),
                alt = map_chr(position , ~read.table(text = .x, sep = "_",colClasses = "character")$V3  ),
                position  = map_dbl(position , ~read.table(text = .x, sep = "_",as.is = T)$V1  )
                             )
      susie_tb = susie_tb%>%mutate(cs_order =(map(susie_tb$snps_index , ~tryCatch(which(pmap(list( a= susie_obj$sets$cs) , function(a) .x %in% a )%>%unlist()), error = function(e) return(0) )  ))%>%as.character%>%str_replace("integer\\(0\\)","0"),
                         cs_id = map_chr(cs_order,~ifelse(.x =="0", "None" ,names(susie_obj$sets$cs)[.x%>%str_split(":")%>%unlist%>%as.numeric] ) ),
                         log10_base_factor = map_chr(snps_index,~paste0( susie_obj$lbf_variable[,.x],  collapse = ";")),
                         pip = susie_obj$pip,
                         posterior_mean = coef.susie(susie_obj)[-1],
                         posterior_sd = susie_get_posterior_sd(susie_obj),
                         z = posterior_mean/posterior_sd)
    
          susie_tb =  susie_tb%>%mutate(  molecular_trait_id = region_list$molecular_trait_id,
                             finemapped_region_start = region_list$finemapped_region_start,
                             finemapped_region_end = region_list$finemapped_region_end)
          return(susie_tb)    }
          
        

     extract_effect_pip = function(susie_obj,region_list,susie_tb){
      result_tb =  tibble(phenotype = susie_obj$name,
        V = susie_obj$V,effect_id = paste0("L",1:length(V) ) ,
        cs_log10bf = susie_obj$lbf)
        if(is.null(susie_obj$sets$cs)){
            cs_min_r2 = cs_avg_r2 =  coverage =  0 
            cs = "None"} else {         cs = map_chr(susie_obj$sets$cs[result_tb$effect_id],~susie_tb$variants[.x]%>%paste0(collapse = ";"))
        coverage = map(result_tb$effect_id, ~susie_obj$sets$coverage[which(names(susie_obj$sets$cs) == .x )])%>%as.numeric%>%replace_na(0)
        cs_min_r2  = (susie_obj$sets$purity[result_tb$effect_id,1])%>%as.numeric%>%replace_na(0)  
        cs_avg_r2  = (susie_obj$sets$purity[result_tb$effect_id,2])%>%as.numeric%>%replace_na(0) }
        result_tb = result_tb%>%mutate(cs_min_r2 = cs_min_r2,cs_avg_r2 = cs_avg_r2 ,coverage = coverage%>%unlist,cs = cs )            
      return(result_tb)
      }
       
  
    susie_obj = readRDS("${_input:a}")
    if("variants" %in% names(susie_obj) ){susie_obj$variants = susie_obj$variants%>%str_replace("_",":")}
    if(is.null(names(susie_obj$pip ))){names(susie_obj$pip) = susie_obj$variants}
    lbf = extract_lbf(susie_obj)
    region_list = read_delim("${region_list}","\t")
    if(ncol(region_list) == 3 ){   region_list =  region_list%>%mutate(`#chr` = `#chr`%>%str_remove_all(" ") , ID = paste0(`#chr`,"_",start,"_",end) ) } # LD_list 
    if(region_list$start[1] - region_list$end[1]  == -1 ){ 
        region_list = region_list%>%mutate( start = start - ${windows} ,end = start +${windows}) # region_list for fix cis windows  
          } 
      if("gene_id" %in% colnames(region_list)){region_list = region_list%>%mutate(ID = gene_id)  } # region_list for gene
    region_list = region_list%>%select(molecular_trait_id = ID, chromosome  = `#chr`,finemapped_region_start = start ,finemapped_region_end = end)  # Formatting
    region_list = region_list%>%filter(molecular_trait_id == "${f'{_input:br}'.split('.')[-4]}")
    variants_pip = extract_variants_pip( susie_obj , region_list)
    effect_pip = extract_effect_pip( susie_obj , region_list,variants_pip)
    lbf%>%write_delim("${_output[1]}","\t")
    variants_pip%>%write_delim("${_output[0]}","\t")
    effect_pip%>%write_delim("${_output[2]}","\t")
[sQTL_susie_to_tsv_1]
# Input
# For complete susie, region_list or tad_list, for susie_rss , LD region list 
parameter: region_list = path
region_tbl = pd.read_csv(region_list,sep = "\t")
parameter: rds_path = paths
input: rds_path, group_by = 1
input_name=f"{_input:bn}"
input_name=input_name.replace('*', 'N') # "*" in leafcutter2 would be ignored in shell and cause error 
output: f"{cwd}/{input_name}.variant.tsv",f"{cwd}/{input_name}.lbf.tsv",f"{cwd}/{input_name}.effect.tsv"
tags = f'{step_name}_{_output[0]:bn}'
tags = tags.replace(':', '_').replace('+', 'ps') # also for other symbols in tag id 
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = tags
R: expand = '${ }', stdout = f"{_output[0]:nn}.stdout", stderr = f"{_output[0]:nn}.stderr", container = container
    library("dplyr")
    library("tibble")
    library("purrr")
    library("tidyr")
    library("readr")
    library("stringr")
    library("susieR")
    extract_lbf = function(susie_obj){
    
    if("variants" %in% names(susie_obj) ){
    ss_bf = tibble(snps = susie_obj$variants, cs_index = ifelse(is.null(susie_obj$sets$cs_index), 0, paste0(susie_obj$sets$cs_index,collapse =",")),names = "${f'{_input:br}'.split('.')[-4]}")
    }
      else 
      {
    ss_bf = tibble(snps = susie_obj$variable_name, cs_index = ifelse(is.null(susie_obj$sets$cs_index), 0, paste0(susie_obj$sets$cs_index,collapse =",")),names = "${_input:bnnn}")
     }
    
    ss_bf = ss_bf%>%cbind(susie_obj$lbf_variable%>%t)%>%as_tibble()
    
    return(ss_bf)
    }
    
    extract_variants_pip = function(susie_obj,region_list){
    susie_tb = tibble( variants =  names(susie_obj$pip)[which( susie_obj$pip >= 0)],
                           snps_index = which(( susie_obj$pip >= 0))) %>%
        mutate(chromosome = map_chr(variants, ~read.table(text = .x, sep = ":")$V1%>%str_replace("chr","") ),
                position  = map_chr(variants, ~read.table(text = .x, sep = ":")$V2  ),
                ref = map_chr(position , ~read.table(text = .x, sep = "_",colClasses = "character")$V2  ),
                alt = map_chr(position , ~read.table(text = .x, sep = "_",colClasses = "character")$V3  ),
                position  = map_dbl(position , ~read.table(text = .x, sep = "_",as.is = T)$V1  )
                             )
      susie_tb = susie_tb%>%mutate(cs_order =(map(susie_tb$snps_index , ~tryCatch(which(pmap(list( a= susie_obj$sets$cs) , function(a) .x %in% a )%>%unlist()), error = function(e) return(0) )  ))%>%as.character%>%str_replace("integer\\(0\\)","0"),
                         cs_id = map_chr(cs_order,~ifelse(.x =="0", "None" ,names(susie_obj$sets$cs)[.x%>%str_split(":")%>%unlist%>%as.numeric] ) ),
                         log10_base_factor = map_chr(snps_index,~paste0( susie_obj$lbf_variable[,.x],  collapse = ";")),
                         pip = susie_obj$pip,
                         posterior_mean = coef.susie(susie_obj)[-1],
                         posterior_sd = susie_get_posterior_sd(susie_obj),
                         z = posterior_mean/posterior_sd)
    
          susie_tb =  susie_tb%>%mutate(  molecular_trait_id = region_list$molecular_trait_id,
                             finemapped_region_start = region_list$finemapped_region_start,
                             finemapped_region_end = region_list$finemapped_region_end)
          return(susie_tb)    }
          
        

     extract_effect_pip = function(susie_obj,region_list,susie_tb){
      result_tb =  tibble(phenotype = susie_obj$name,
        V = susie_obj$V,effect_id = paste0("L",1:length(V) ) ,
        cs_log10bf = susie_obj$lbf)
        if(is.null(susie_obj$sets$cs)){
            cs_min_r2 = cs_avg_r2 =  coverage =  0 
            cs = "None"} else {         cs = map_chr(susie_obj$sets$cs[result_tb$effect_id],~susie_tb$variants[.x]%>%paste0(collapse = ";"))
        coverage = map(result_tb$effect_id, ~susie_obj$sets$coverage[which(names(susie_obj$sets$cs) == .x )])%>%as.numeric%>%replace_na(0)
        cs_min_r2  = (susie_obj$sets$purity[result_tb$effect_id,1])%>%as.numeric%>%replace_na(0)  
        cs_avg_r2  = (susie_obj$sets$purity[result_tb$effect_id,2])%>%as.numeric%>%replace_na(0) }
        result_tb = result_tb%>%mutate(cs_min_r2 = cs_min_r2,cs_avg_r2 = cs_avg_r2 ,coverage = coverage%>%unlist,cs = cs )            
      return(result_tb)
      }
       
  
    susie_obj = readRDS("${_input:a}")
    if("variants" %in% names(susie_obj) ){susie_obj$variants = susie_obj$variants%>%str_replace("_",":")}
    if(is.null(names(susie_obj$pip ))){names(susie_obj$pip) = susie_obj$variants}
    lbf = extract_lbf(susie_obj)
    region_list = read_delim("${region_list}","\t")
    if(ncol(region_list) == 3 ){   region_list =  region_list%>%mutate(`#chr` = `#chr`%>%str_remove_all(" ") , ID = paste0(`#chr`,"_",start,"_",end) ) } # LD_list 
    if(region_list$start[1] - region_list$end[1]  == -1 ){ 
        region_list = region_list%>%mutate( start = start - ${windows} ,end = start +${windows}) # region_list for fix cis windows  
          } 
      if("gene_id" %in% colnames(region_list)){region_list = region_list%>%mutate(ID = gene_id)  } # region_list for gene
    region_list = region_list%>%select(molecular_trait_id = ID, chromosome  = `#chr`,finemapped_region_start = start ,finemapped_region_end = end)  # Formatting
    mole_id = "${f'{_input:br}'.split('.')[-4]}"%>%gsub("_N:","_*:",.)#for sQTL
    region_list = region_list%>%filter(molecular_trait_id == mole_id)
    variants_pip = extract_variants_pip( susie_obj , region_list)
    effect_pip = extract_effect_pip( susie_obj , region_list,variants_pip)
    lbf%>%write_delim("${_output[1]}","\t")
    variants_pip%>%write_delim("${_output[0]}","\t")
    effect_pip%>%write_delim("${_output[2]}","\t")
[fsusie_to_tsv_1]
# Input
# For complete susie, region_list or tad_list, for susie_rss , LD region list 
parameter: region_list = path
region_tbl = pd.read_csv(region_list,sep = "\t")
parameter: rds_path = paths
input: rds_path, group_by = 1
output: f"{cwd}/{_input:bn}.variant.tsv"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output[0]:nn}.stdout", stderr = f"{_output[0]:nn}.stderr", container = container
    library("dplyr")
    library("tibble")
    library("purrr")
    library("tidyr")
    library("readr")
    library("stringr")
    library("susieR")

    extract_variants_pip = function(susie_obj,region_list){
        susie_tb = tibble( variants =  names(susie_obj$csd_X),
                               snps_index = which(( susie_obj$pip >= 0))) %>%
            mutate(chromosome = map_chr(variants, ~read.table(text = .x, sep = ":")$V1%>%str_replace("chr","") ),
                    position  = map_chr(variants, ~read.table(text = .x, sep = ":")$V2  ),
                    ref = map_chr(position , ~read.table(text = .x, sep = "_",colClasses = "character")$V2  ),
                    alt = map_chr(position , ~read.table(text = .x, sep = "_",colClasses = "character")$V3  ),
                    position  = map_dbl(position , ~read.table(text = .x, sep = "_",as.is = T)$V1  )
                                 )
          susie_tb = susie_tb%>%mutate(cs_order =(map(susie_tb$snps_index , ~tryCatch(which(pmap(list( a= susie_obj$cs) , function(a) .x %in% a )%>%unlist()), error = function(e) return(0) )  ))%>%as.character%>%str_replace("integer\\(0\\)","0"),
                             pip = susie_obj$pip)
          susie_tb =  susie_tb%>%mutate(  molecular_trait_id = region_list$tad_index,
                                 finemapped_region_start = region_list$start,
                                 finemapped_region_end = region_list$end)
          if("purity" %in% names(susie_obj)){
              susie_tb = susie_tb%>%mutate(purity = map_dbl(susie_tb$cs_order, ~ifelse(.x%>%as.numeric > 0, susie_obj$purity[[as.numeric(.x)]], NA ) ), is_dummy = as.numeric(purity < 0.5)  )
              }
    susie_tb = susie_tb%>%mutate(effect_peak_pos = map_dbl(cs_order, ~ifelse(.x%>%as.numeric > 0, susie_obj$outing_grid[which(abs(susie_obj$fitted_func[[as.numeric(.x)]]) == max(abs(susie_obj$fitted_func[[as.numeric(.x)]])))] , NA ) )) 
    susie_tb_lbf = cbind(susie_tb%>%select(molecular_trait_id,variants,cs_order),Reduce(cbind, susie_obj$lBF)%>%as.tibble%>%`colnames<-`(1:length(susie_obj$lBF)))
          return(list(susie_tb, susie_tb_lbf))    }
    susie_obj = readRDS("${_input:a}")
    region_list = read_delim("${region_list}","\t")
    region_list = region_list%>%filter(tad_index == "${f'{_input:br}'.split('.')[-4]}")
    variants_pip = extract_variants_pip( susie_obj , region_list)[[1]]
    variants_lbf = extract_variants_pip( susie_obj , region_list)[[2]]
    print(paste0("fsusie run time is ", round(susie_obj$runtime[[3]]/60),"min"))
    variants_pip%>%write_delim("${_output}","\t")
    variants_pip%>%write_delim("${_output:nn}.lbf.tsv","\t")
[*_to_tsv_2]
parameter: name = f'{_input[0]:b}'.split(".")[0]
input: group_by = "all"
output: f"{cwd}/{name}.all_variants.tsv"
bash: expand = '${ }', stdout = f"{_output:n}.stdout", stderr = f"{_output:n}.stderr", container = container
    head -1 ${_input[0]} > ${_output}
    cat ${_input[0]:d}/*variant.tsv | grep -v cs_order >> ${_output}
[susie_tsv_collapse]
parameter: tsv_path = paths # TSV needs have the name ends with  *.chr1_2_3.unisusie(_rss).lbf.tsv
tsv_list  = pd.DataFrame({"lbf_path" : [str(x) for x in tsv_path]})
chromosome = list(set([f'{x.split(".")[-5].split("_")[0].replace("chr","")}'  for x in tsv_list.lbf_path ])) ## Add chr if there is no chr prefix. This is to accomodata chr XY and M
input: tsv_path, for_each = "chromosome"
output: f'{cwd}/{_input[0]:bnnnnnnn}.chr{_chromosome}.unisusie_rss.lbf.tsv'
bash: expand = '${ }', stdout = f"{_output}.stdout", stderr = f"{_output}.stderr", container = container
        head -1 ${_input[0]} > ${_output}
        cat ${_input[0]:d}/*.chr${_chromosome}_*lbf.tsv | grep -v cs_index >> ${_output}
        
[susie_pip_landscape_plot]
parameter: plot_list = path
parameter: annot_tibble = path("~/Annotatr_builtin_annotation_tibble.tsv")
import pandas as pd
plot_list  = pd.read_csv(plot_list,sep = "\t")
file_type = plot_list.columns.values.tolist()
file_type = [x.split(".")[0] for x in file_type ]
plot_list = plot_list.to_dict("records")
input: plot_list, group_by = len(file_type)
output: f'{cwd}/{"_".join(file_type)}.{str(_input[0]).split(".")[-5]}.pip_landscape_plot.rds',f'{cwd}/{"_".join(file_type)}.{str(_input[0]).split(".")[-5]}.pip_landscape_plot.pdf'
R: expand = '${ }', stdout = f"{_output[0]}.stdout", stderr = f"{_output[0]}.stderr", container = container
    library("dplyr")
    library("readr") 
    library("ggplot2")
    library("purrr")
    color = c("black", "dodgerblue2", "green4", "#6A3D9A", 
          "#FF7F00", "gold1", "skyblue2", "#FB9A99", "palegreen2",
          "#CAB2D6", "#FDBF6F", "gray70", "khaki2", "maroon", "orchid1",
          "deeppink1", "blue1", "steelblue4", "darkturquoise", "green1", 
          "yellow4", "yellow3","darkorange4","brown","navyblue","#FF0000",
          "darkgreen","#FFFF00","purple","#00FF00","pink","#0000FF",
          "orange","#FF00FF","cyan","#00FFFF","#FFFFFF")
    extract_table = function(variant_df,type){ 
    if("purity" %in% colnames(variant_df) ){
      variant_df$purity[is.na(variant_df$purity)] = 0
      variant_df[abs(variant_df$purity) < 0.5,7] = 0
      }
     variant_df = variant_df%>%mutate(CS = (cs_order%>%as.factor%>%as.numeric-1)%>%as.factor)%>%
          select( y = pip ,snp = variants,pos = position , CS, molecular_trait_id)%>%mutate(molecular_trait_id = paste0(type,"_",molecular_trait_id ) )
    return(variant_df)
    }
    plot_recipe = tibble( type =  c('${"','".join(file_type) }'), path = c(${_input:r,}))
    plot_list = map2(plot_recipe$type,plot_recipe$path, ~read_delim(.y, guess_max = 10000000)%>%extract_table(.x) )
    plot_df = Reduce(rbind,plot_list)
    plot_range = (plot_df%>%group_by(molecular_trait_id)%>%summarize(start = (min(pos)), end = (max(pos)))%>%mutate(start = median(start),end = median(end)))[1,c(2,3)]%>%as.matrix
    plot_chr = (plot_df$snp[1]%>%stringr::str_split(":"))[[1]][1]
    plot_df = plot_df%>%mutate(Shared = as.logical(map(snp, ~(plot_df%>%filter( snp ==.x ,   CS%>%as.numeric !=  1 )%>%nrow()) > 1  )))
    pip_plot <- plot_df%>%ggplot2::ggplot(aes(y = y, x = pos,
                                  col =  CS, shape = Shared )) + facet_grid(molecular_trait_id ~.)+
      geom_point(size = 7) +
      scale_color_manual("CS",values = color) +
      theme(axis.ticks.x = element_blank()) +
     ylab("Posterior Inclusion Probability (PIP)")+xlim(plot_range)+
      theme(axis.ticks.x = element_blank()) +
            theme(strip.text.y.right = element_text(angle = 0))+
            xlab("") + 
            theme(text = element_text(size = 30))+ggtitle("Overview of fine-mapping")
  
    annot = read_delim("${annot_tibble}")
    annot = annot%>%filter(seqnames == plot_chr, start > plot_range[1], end < plot_range[2])
    annot_plot   = annot%>%filter(!type%in%c("hg38_genes_introns","hg38_genes_1to5kb"))%>%
                        ggplot(aes())+
                        geom_segment( aes(x = start,xend = end, y = "Regulartory Element", yend = "Regulartory Element", color = type ), linewidth =10)+
                        ylab("")+xlab("")+xlim(plot_range)+theme(axis.text.x=element_blank(),text = element_text(size = 20))+scale_color_brewer(palette="Dark2")
    gene_plot = annot%>%filter(type%in%c("hg38_genes_1to5kb"))%>%group_by(symbol)%>%
                            summarise(start = min(start), end = max(end))%>%na.omit%>%
                            ggplot(aes())+geom_segment( aes(x = start,xend = end, y = "Gene", yend = "Gene", color = symbol ), linewidth =10)+
                            geom_label(aes(x = (start+end)/2,y = "Gene", label = symbol ),size = 5)+ylab("")+xlab("POS")+
                            theme(legend.position="none")+theme(text = element_text(size = 20))+xlim(plot_range)
      
    list(pip_plot,plot_df,annot_plot,gene_plot)%>%saveRDS("${_output[0]}")
    cowplot::plot_grid(plotlist = list(pip_plot,annot_plot,gene_plot),ncol = 1, align = "v",axis = "tlbr",rel_heights = c(8,1,1))%>%ggsave(filename = "${_output[1]}",device = "pdf",dpi = "retina",width = 30, height = 30)
[susie_upsetR_plot]
parameter: plot_list = path
import pandas as pd
plot_list  = pd.read_csv(plot_list, sep = "\t")
file_type = plot_list.columns.values.tolist()
file_type = [x.split(".")[0] for x in file_type ]
plot_list = plot_list.to_dict("records")
input: plot_list
output: f'{cwd}/{"_".join(file_type)}.UpSetR.rds',f'{cwd}/{"_".join(file_type)}.UpSetR.pdf'
R: expand = '${ }', stdout = f"{_output[0]}.stdout", stderr = f"{_output[0]}.stderr", container = container
    library("dplyr")
    library("readr") 
    library("ggplot2")
    library("purrr")
    library("UpSetR")
    library("ComplexUpset")
    plot_recipe = tibble( type =  c('${"','".join(file_type) }'), path = c(${_input:r,}))
    plot_list = map2(plot_recipe$type,plot_recipe$path, ~read_delim(.y, guess_max = 10000000)%>%mutate(cs = cs_order != 0 )%>%filter(cs > 0)%>%select(variants,cs)%>%`colnames<-`(c("variants",.x))%>%distinct() )
    cs_sharing = Reduce(full_join,plot_list)
    cs_upsetR_sharing = cs_sharing
    cs_upsetR_sharing[,2:ncol(cs_upsetR_sharing)]%>%mutate_all(as.numeric)-> cs_upsetR_sharing[,2:ncol(cs_upsetR_sharing)]
    a = upset(cs_upsetR_sharing%>%as.data.frame,intersect = colnames(cs_upsetR_sharing[2:ncol(cs_upsetR_sharing)]),
      keep_empty_groups = F,
          base_annotations=list(`Intersection size` = intersection_size( bar_number_threshold = 1, position = position_dodge(0.5), width = 0.3 ,text = list(size = 5)   )  ) ,
              themes=upset_default_themes(axis.text=element_text(size=30))     ,
              min_degree = 1)
    
    a%>%ggsave(filename = "${_output[1]}",device = "pdf",dpi = "retina",width=18.5, height=10.5)
    list(cs_upsetR_sharing)%>%saveRDS("${_output[0]}")
[susie_upsetR_cs_plot]
parameter: plot_list = path
import pandas as pd
plot_list  = pd.read_csv(plot_list, sep = "\t")
file_type = plot_list.columns.values.tolist()
file_type = [x.split(".")[0] for x in file_type ]
plot_list = plot_list.to_dict("records")
parameter: trait_to_select =  1 
input: plot_list
output: f'{cwd}/{"_".join(file_type)}.UpSetR_{file_type[trait_to_select-1]}_cs.rds',f'{cwd}/{"_".join(file_type)}.UpSetR_{file_type[trait_to_select-1]}_cs.pdf'
R: expand = '${ }', stdout = f"{_output[0]}.stdout", stderr = f"{_output[0]}.stderr", container = container

    library("dplyr")
    library("readr") 
    library("ggplot2")
    library("purrr")
    library("UpSetR")
    library("ComplexUpset")

    cs_sharing_identifer = function(upsetR_input,df){
    inner_join(upsetR_input, df%>%select(variants,molecular_trait_id, cs_order)%>%filter(cs_order != 0))%>%select(-variants)-> dfL_CS_sharing
    dfL_CS_sharing[is.na(dfL_CS_sharing)] = FALSE
    dfL_CS_sharing = dfL_CS_sharing%>%group_by(molecular_trait_id,cs_order)%>%summarize(across(everything(), list(mean))   )
    dfL_CS_sharing = dfL_CS_sharing%>%mutate(across(colnames(dfL_CS_sharing)[3:ncol(dfL_CS_sharing)], ~.x != 0  ))%>%`colnames<-`(c("molecular_trait_id","cs_order",colnames(cs_sharing)[2:ncol(cs_sharing)]))
    }
  
  
    plot_recipe = tibble( type =  c('${"','".join(file_type) }'), path = c(${_input:r,}))
    plot_list = map2(plot_recipe$type,plot_recipe$path, ~read_delim(.y, guess_max = 10000000)%>%mutate(cs = cs_order != 0 )%>%filter(cs > 0)%>%select(variants,cs)%>%`colnames<-`(c("variants",.x))%>%distinct() )
    cs_sharing = Reduce(full_join,plot_list)
    cs_upsetR_sharing = cs_sharing
    cs_upsetR_sharing[,2:ncol(cs_upsetR_sharing)]%>%mutate_all(as.numeric)-> cs_upsetR_sharing[,2:ncol(cs_upsetR_sharing)]

    df = read_delim(plot_recipe$path[[${trait_to_select}]]) 
    
    cs_sharing_df = cs_sharing_identifer(cs_upsetR_sharing,df)

    a = upset(cs_sharing_df%>%as.data.frame,intersect = colnames(cs_sharing_df[3:ncol(cs_sharing_df)]),
          keep_empty_groups = F,
          base_annotations=list(`Intersection size` = intersection_size( bar_number_threshold = 1, position = position_dodge(0.5), width = 0.3 ,text = list(size = 8)   )  ),
          themes=upset_default_themes(axis.text=element_text(size=30)),
          set_size = F  ,  min_degree = 1,wrap = T) + ggtitle( paste0(plot_recipe$type[[${trait_to_select}]],'CS shared with other phenotypes') )   + theme(plot.title = element_text(size = 40, face = "bold"))
  
    a%>%ggsave(filename = "${_output[1]}",device = "pdf",dpi = "retina",width=18.5, height=10.5)
    list(cs_sharing_df)%>%saveRDS("${_output[0]}")
[tmp_annotatation_of_snps_1]
parameter: SNP_list = path
parameter: annotation_rds = path
input: SNP_list
output: f'{cwd}/{_input:b}.annotated.rds'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
R: expand = '${ }', stdout = f"{_output}.stdout", stderr = f"{_output}.stderr", container = container
    library("dplyr")
    library("readr") 
    library("purrr")
    library("stringr")
    sharing_snp = readRDS("${_input}")
    sharing_snp_fsusie = sharing_snp[[1]]%>%filter(haQTL == 1 | mQTL == 1)
    sharing_snp_fsusie = sharing_snp_fsusie%>%mutate(X1 =  read.table(text = sharing_snp_fsusie$variants, sep = ":")$V1, X2 = read.table(text = read.table(text = sharing_snp_fsusie$variants, sep = ":")$V2 , sep = "_")$V1  )
    sharing_snp_fsusie = sharing_snp_fsusie%>%select(variants,chr = X1, pos = X2)
    annotation = readRDS("${annotation_rds}")
    print("data loaded")
    result = sharing_snp_fsusie%>%mutate(annot = map2( chr,pos , ~ annotation%>%filter(X1 == .x, X2 <= .y, X3 >= .y)%>%pull(X5)))%>%mutate(annot = map_chr(annot, ~paste0(.x ,collapse = ",")) )
    print("snp annotated")
    result%>%saveRDS("${_output}")
[tmp_annotatation_of_snps_2]
parameter: SNP_list = path
parameter: annotation_rds = path
output: f'{cwd}/{_input:b}.annotated_rev.rds'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
R: expand = '${ }', stdout = f"{_output}.stdout", stderr = f"{_output}.stderr", container = container
    library("dplyr")
    library("readr") 
    library("purrr")
    library("stringr")
    result = readRDS(${_input:r})
    result_rev = tibble(annot = unique(annotation$X5))%>%mutate(variants = map(annot, ~  result%>%filter( str_detect(annot,.x))%>%pull(variants)) )%>%mutate( variants = map_chr(variants,~paste0(.x ,collapse = ","))  )
    result_rev%>%saveRDS("${_output}")
[fsusie_extract_effect]
parameter: rds_path = paths
parameter: annot_tibble = path("~/Annotatr_builtin_annotation_tibble.tsv")
input: rds_path, group_by = 1
output: f'{cwd}/{_input:bn}.estimated_effect.tsv',f'{cwd}/{_input:bn}.estimated_effect.pdf'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'
R: expand = '${ }', stdout = f"{_output[0]}.stdout", stderr = f"{_output[0]}.stderr", container = container
    library("stringr")
    library("dplyr")
    library("readr") 
    library("ggplot2")
    library("purrr")
    library("tidyr")
    color = c("black", "dodgerblue2", "green4", "#6A3D9A", 
          "#FF7F00", "gold1", "skyblue2", "#FB9A99", "palegreen2",
          "#CAB2D6", "#FDBF6F", "gray70", "khaki2", "maroon", "orchid1",
          "deeppink1", "blue1", "steelblue4", "darkturquoise", "green1", 
          "yellow4", "yellow3","darkorange4","brown","navyblue","#FF0000",
          "darkgreen","#FFFF00","purple","#00FF00","pink","#0000FF",
          "orange","#FF00FF","cyan","#00FFFF","#FFFFFF")
    
    effect_extract = function(fsusie){
        plot_df = fsusie$fitted_func%>%as_tibble(.name_repair = "universal")%>%mutate(pos =  fsusie$outing_grid)%>%`colnames<-`(c(paste0("Effect_",1:length(fsusie$cs)),"pos"))%>%mutate(`#chr` = str_split(names(fsusie$csd_X)[[1]],":")[[1]][[1]] )%>%select(`#chr`, pos, everything())
        plot=  plot_df%>%pivot_longer(cols = 3:ncol(plot_df),names_to = "effect", values_to = "values"   ) %>%ggplot(aes(x = pos, y = values,color = effect),linewidth = 7)+
        geom_line()+ylab("Estimated Effect") + xlab("POS")+facet_grid(effect~. )+
            scale_color_manual("Credible set",values = color[2:length(color)])+geom_line(aes(y = 0), color = "black")
            theme(strip.text.y.right = element_text(angle = 0))+
            xlab("") + 
            ylab("Estimated Effect")+ 
            theme(text = element_text(size = 50))+
            ggtitle(paste0( "Estimated effect for ${f'{_input:br}'.split('.')[-4]}"))
        return(list(plot_df,plot))
        }
    susie_obj =  readRDS("${_input}")
    output = effect_extract(susie_obj)
    effect_tbl = output[[1]]
    annot = read_delim("${annot_tibble}")
    annot = annot%>%filter(seqnames == (effect_tbl$`#chr`)[[1]], start > min(effect_tbl$pos), end < max(effect_tbl$pos))
    plot_range = c(min(effect_tbl$pos),  max(effect_tbl$pos))
    annot_plot   = annot%>%filter(!type%in%c("hg38_genes_introns","hg38_genes_1to5kb"))%>%
                        ggplot(aes())+
                        geom_segment( aes(x = start,xend = end, y = "Regulartory Element", yend = "Regulartory Element", color = type ), linewidth =10)+
                        ylab("")+xlab("")+xlim(plot_range)+theme(axis.text.x=element_blank(),text = element_text(size = 20))+scale_color_brewer(palette="Dark2")
    gene_plot = annot%>%filter(type%in%c("hg38_genes_1to5kb"))%>%group_by(symbol)%>%
                            summarise(start = min(start), end = max(end))%>%na.omit%>%
                            ggplot(aes())+geom_segment( aes(x = start,xend = end, y = "Gene", yend = "Gene", color = symbol ), linewidth =10)+
                            geom_label(aes(x = (start+end)/2,y = "Gene", label = symbol ),size = 5)+ylab("")+xlab("POS")+
                            theme(legend.position="none")+theme(text = element_text(size = 20))+xlim(plot_range)
    cowplot::plot_grid(plotlist = list(output[[2]]),ncol = 1, align = "v",axis = "tlbr",rel_heights = c(8))%>%ggsave(filename = "${_output[1]}",device = "pdf",dpi = "retina",width = 30, height = 30)
    effect_tbl%>%write_delim("${_output[0]}","\t")
[fsusie_affected_region]
parameter: rds_path = paths
input: rds_path, group_by = 1
output: f'{cwd}/{_input:bn}.affected_region.tsv',f'{cwd}/{_input:bn}.affected_region.pdf'
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output[0]}.stdout", stderr = f"{_output[0]}.stderr", container = container
    library("stringr")
    library("dplyr")
    library("readr") 
    library("ggplot2")
    library("purrr")
    library("tidyr")
    library(susiF.alpha)
    library(ashr)
    library(wavethresh)
    color = c("black", "dodgerblue2", "green4", "#6A3D9A", 
          "#FF7F00", "gold1", "skyblue2", "#FB9A99", "palegreen2",
          "#CAB2D6", "#FDBF6F", "gray70", "khaki2", "maroon", "orchid1",
          "deeppink1", "blue1", "steelblue4", "darkturquoise", "green1", 
          "yellow4", "yellow3","darkorange4","brown","navyblue","#FF0000",
          "darkgreen","#FFFF00","purple","#00FF00","pink","#0000FF",
          "orange","#FF00FF","cyan","#00FFFF","#FFFFFF")
    ## Define Function
    update_cal_credible_band2 <- function(susiF.obj )
    {
    
    
    
      if(sum( is.na(unlist(susiF.obj$alpha))))
      {
        stop("Error: some alpha value not updated, please update alpha value first")
      }
      temp <- wavethresh::wd(rep(0, susiF.obj$n_wac))
    
    
      for ( l in 1:susiF.obj$L)
      {
        Smat <-  susiF.obj$fitted_wc2[[l]]
        W1   <- ((wavethresh::GenW(n=  ncol(Smat )  , filter.number = 10, family = "DaubLeAsymm")))
        tt   <- diag( W1%*%diag(c(susiF.obj$alpha[[l]]%*%Smat ))%*% t(W1 ))
    
        up                       <-  susiF.obj$fitted_func[[l]]+ 3*sqrt(tt)
        low                      <-  susiF.obj$fitted_func[[l]]- 3*sqrt(tt)
        susiF.obj$cred_band[[l]] <- rbind(up, low)
      }
    
    
    
      return(susiF.obj)
    }
    
    affected_reg <- function( susiF.obj){
      outing_grid <- susiF.obj$outing_grid
    
      reg <-  list()
      h <- 1
      for (   l in 1:length(susiF.obj$cs)){
    
        pos_up <-  which(susiF.obj$cred_band[[l]][1,]<0)
        pos_low <- which(susiF.obj$cred_band[[l]][2,]>0)
    
    
        reg_up <- split( pos_up,cumsum(c(1,diff( pos_up)!=1)))
    
        reg_low <- split( pos_low,cumsum(c(1,diff( pos_low)!=1)))
        for( k in 1:length(reg_up)){
          reg[[h]] <- c(l, outing_grid[reg_up[[k]][1]], outing_grid[reg_up[[k]][length(reg_up[[k]])]])
    
          h <- h+1
        }
        for( k in 1:length(reg_low )){
          reg[[h]] <- c(l, outing_grid[reg_low [[k]][1]], outing_grid[reg_low [[k]][length(reg_low [[k]])]])
    
          h <- h+1
        }
    
    
      }
      reg <-  do.call(rbind, reg)
      colnames(reg) <- c("CS", "Start","End")
      return(reg)
    }
    
    
    susiF_obj =  readRDS("${_input}")
    susiF_obj = update_cal_credible_band2(susiF_obj)
    affected_tbl = affected_reg(susiF_obj)
    affected_tbl = affected_tbl%>%as_tibble%>%mutate(analysis = susiF_obj$name,
                          chr = (names(susiF_obj$csd_X)[[1]]%>%stringr::str_split(":"))[[1]][[1]],
                          molecular_trait_id =  "${f'{_input:br}'.split('.')[-4]}", 
                          purity  = purrr::map_dbl(CS,~susiF_obj$purity[[.x]] )  )
    
    affected_tbl%>%as_tibble%>%write_delim("${_output[0]}","\t")
    plt = plot_susiF(susiF_obj, cred.band = T)
    plt%>%ggsave(filename = "${_output[1]}",device = "pdf",dpi = "retina",width = 30, height = 30)
#[uni_susie_2]
input: group_with = "genoFile"
output: f"{_input:n}.vcf.bgz"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output:nn}.stdout", stderr = f"{_output:nn}.stderr", container = container
   ## Define create_vcf function
           create_vcf = function (chrom, pos, nea, ea, snp = NULL, ea_af = NULL, effect = NULL, 
        se = NULL, pval = NULL, name = NULL,cs = NULL, pip = NULL) 
    {
        stopifnot(length(chrom) == length(pos))
        if (is.null(snp)) {
            snp <- paste0(chrom, ":", pos)
        }
        snp <- paste0(chrom, ":", pos)
        nsnp <- length(chrom)
        gen <- list()
        ## Setupt data content for each sample column
        if (!is.null(ea_af)) 
            gen[["AF"]] <- matrix(ea_af, nsnp)
        if (!is.null(effect)) 
            gen[["ES"]] <- matrix(effect, nsnp)
        if (!is.null(se)) 
            gen[["SE"]] <- matrix(se, nsnp)
        if (!is.null(pval)) 
            gen[["LP"]] <- matrix(-log10(pval), nsnp)
        if (!is.null(cs)) 
            gen[["CS"]] <- matrix(cs, nsnp)
        if (!is.null(pip)) 
            gen[["PIP"]] <- matrix(pip, nsnp)
        gen <- S4Vectors::SimpleList(gen)
        
      ## Setup snps info for the fix columns
        gr <- GenomicRanges::GRanges(chrom, IRanges::IRanges(start = pos, 
            end = pos + pmax(nchar(nea), nchar(ea)) - 1, names = snp))
         coldata <- S4Vectors::DataFrame(Studies = name, row.names = name)
    ## Setup header informations
        hdr <- VariantAnnotation::VCFHeader(header = IRanges::DataFrameList(fileformat = S4Vectors::DataFrame(Value = "VCFv4.2", 
            row.names = "fileformat")), sample = name)
        VariantAnnotation::geno(hdr) <- S4Vectors::DataFrame(Number = c("A", 
            "A", "A", "A", "A", "A"), Type = c("Float", "Float", 
            "Float", "Float", "Float", "Float"), Description = c("Effect size estimate relative to the alternative allele", 
            "Standard error of effect size estimate", "-log10 p-value for effect estimate",  
            "Alternate allele frequency in the association study",
            "The CS this variate are captured, 0 indicates not in any cs", "The posterior inclusion probability to a CS"), 
            row.names = c("ES", "SE", "LP", "AF", "CS", "PIP"))
      ## Save only the meta information in the sample columns 
        VariantAnnotation::geno(hdr) <- subset(VariantAnnotation::geno(hdr), 
            rownames(VariantAnnotation::geno(hdr)) %in% names(gen))
      ## Save VCF 
        vcf <- VariantAnnotation::VCF(rowRanges = gr, colData = coldata, 
            exptData = list(header = hdr), geno = gen)
        VariantAnnotation::alt(vcf) <- Biostrings::DNAStringSetList(as.list(ea))
        VariantAnnotation::ref(vcf) <- Biostrings::DNAStringSet(nea)
      ## Add fixed values
        VariantAnnotation::fixed(vcf)$FILTER <- "PASS"
          return(sort(vcf))
        }
    library("susieR")
    library("dplyr")
    library("tibble")
    library("purrr")
    library("readr")
    library("tidyr")
    library("stringr")
    
    # Get list of cs snps
    susie_list = readRDS(${_input:r})
    susie_tb_ls = list()
    for (i in 1:length(susie_list)){
        susie_tb = tibble( snps =  names(susie_list[[1]]$pip)[which( susie_list[[i]]$pip >= 0)], snps_index = which(( susie_list[[i]]$pip >= 0))  )
        susie_tb_ls[[i]]= susie_tb%>%mutate( cs = map(snps_index,~which( susie_list[[i]]$sets$cs %in% .x))%>%as.numeric%>%replace_na(0),
                                 pip = map_dbl(snps_index,~( susie_list[[i]]$pip[.x])),
                                 coef = map_dbl(snps_index,~(coef.susie( susie_list[[i]])[.x+1])))
        }
    if(length(susie_tb_ls) >= 2){ 
      for(i in 2:length(susie_tb_ls)){
          susie_tb_ls[[i]] = full_join(susie_tb_ls[[i-1]],susie_tb_ls[[i]], by = "snps") 
        }
    }
    m = c("cs","pip","coef")    
    output = list()
    for(i in m){
    output[[i]] = susie_tb_ls[[length(susie_tb_ls)]]%>%select(contains(i))%>%as.matrix
    }
    snps_tb = susie_tb_ls[[length(susie_tb_ls)]]%>%mutate(
                         chr = map_chr(snps,~read.table(text = .x,sep = ":",as.is = T)$V1),
                         pos_alt_ref = map_chr(snps,~read.table(text = .x,sep = ":",as.is = TRUE)$V2),
                         pos = map_dbl(pos_alt_ref,~read.table(text = .x,sep = "_",as.is = TRUE)$V1),
                         alt = map_chr(pos_alt_ref,~read.table(text = .x,sep = "_",as.is = TRUE, colClass = "character")$V2),
                         ref = map_chr(pos_alt_ref,~read.table(text = .x,sep = "_",as.is = TRUE, colClass = "character")$V3))
    
    snps_tb = snps_tb%>%filter(str_detect(ref, "[ACTG]") & str_detect(alt, "[ACTG]"))
    output_vcf = create_vcf(
            chrom = snps_tb$chr,
             pos = snps_tb$pos,
             ea = snps_tb$alt,
             nea = snps_tb$ref,
             effect = snps_tb%>%select(contains("coef"))%>%as.matrix ,
             pip = snps_tb%>%select(contains("pip"))%>%as.matrix,
             cs = snps_tb%>%select(contains("cs"))%>%as.matrix,
             name = names(susie_list))
    VariantAnnotation::writeVcf(output_vcf,${_output:nr},index = TRUE)
[mv_susie_2]
input: group_with = "genoFile"
output: f"{_input:n}.vcf.bgz"
task: trunk_workers = 1, trunk_size = 1, walltime = '2h', mem = '55G', cores = 1, tags = f'{step_name}_{_output[0]:bn}'
R: expand = '${ }', stdout = f"{_output:nn}.stdout", stderr = f"{_output:nn}.stderr", container = container
   ## Define create_vcf function
           create_vcf = function (chrom, pos, nea, ea, snp = NULL, ea_af = NULL, effect = NULL, 
        se = NULL, pval = NULL, name = NULL,cs = NULL, pip = NULL) 
    {
        stopifnot(length(chrom) == length(pos))
        if (is.null(snp)) {
            snp <- paste0(chrom, ":", pos)
        }
        snp <- paste0(chrom, ":", pos)
        nsnp <- length(chrom)
        gen <- list()
        ## Setupt data content for each sample column
        if (!is.null(ea_af)) 
            gen[["AF"]] <- matrix(ea_af, nsnp)
        if (!is.null(effect)) 
            gen[["ES"]] <- matrix(effect, nsnp)
        if (!is.null(se)) 
            gen[["SE"]] <- matrix(se, nsnp)
        if (!is.null(pval)) 
            gen[["LP"]] <- matrix(-log10(pval), nsnp)
        if (!is.null(cs)) 
            gen[["CS"]] <- matrix(cs, nsnp)
        if (!is.null(pip)) 
            gen[["PIP"]] <- matrix(pip, nsnp)
        gen <- S4Vectors::SimpleList(gen)
        
      ## Setup snps info for the fix columns
        gr <- GenomicRanges::GRanges(chrom, IRanges::IRanges(start = pos, 
            end = pos + pmax(nchar(nea), nchar(ea)) - 1, names = snp))
         coldata <- S4Vectors::DataFrame(Studies = name, row.names = name)
    ## Setup header informations
        hdr <- VariantAnnotation::VCFHeader(header = IRanges::DataFrameList(fileformat = S4Vectors::DataFrame(Value = "VCFv4.2", 
            row.names = "fileformat")), sample = name)
        VariantAnnotation::geno(hdr) <- S4Vectors::DataFrame(Number = c("A", 
            "A", "A", "A", "A", "A"), Type = c("Float", "Float", 
            "Float", "Float", "Float", "Float"), Description = c("Effect size estimate relative to the alternative allele", 
            "Standard error of effect size estimate", "-log10 p-value for effect estimate",  
            "Alternate allele frequency in the association study",
            "The CS this variate are captured, 0 indicates not in any cs", "The posterior inclusion probability to a CS"), 
            row.names = c("ES", "SE", "LP", "AF", "CS", "PIP"))
      ## Save only the meta information in the sample columns 
        VariantAnnotation::geno(hdr) <- subset(VariantAnnotation::geno(hdr), 
            rownames(VariantAnnotation::geno(hdr)) %in% names(gen))
      ## Save VCF 
        vcf <- VariantAnnotation::VCF(rowRanges = gr, colData = coldata, 
            exptData = list(header = hdr), geno = gen)
        VariantAnnotation::alt(vcf) <- Biostrings::DNAStringSetList(as.list(ea))
        VariantAnnotation::ref(vcf) <- Biostrings::DNAStringSet(nea)
      ## Add fixed values
        VariantAnnotation::fixed(vcf)$FILTER <- "PASS"
          return(sort(vcf))
        }
    library("susieR")
    library("dplyr")
    library("tibble")
    library("purrr")
    library("readr")
    library("tidyr")
    
    # Get list of cs snps
    res = readRDS(${_input:r})
    output_snps = tibble( snps = res$variable_name[which(res$pip >= 0)], snps_index = which((res$pip >= 0))  )
    output_snps = output_snps%>%mutate( cs = map(snps_index,~which(res$sets$cs %in% .x))%>%as.numeric%>%replace_na(0),
                             pip = map_dbl(snps_index,~(res$pip[.x])),
                     chr = map_chr(snps,~read.table(text = .x,sep = ":",as.is = T)$V1),
                     pos_alt_ref = map_chr(snps,~read.table(text = .x,sep = ":",as.is = TRUE)$V2),
                     pos = map_dbl(pos_alt_ref,~read.table(text = .x,sep = "_",as.is = TRUE)$V1),
                     alt = map_chr(pos_alt_ref,~read.table(text = .x,sep = "_",as.is = TRUE, colClass = "character")$V2),
                     ref = map_chr(pos_alt_ref,~read.table(text = .x,sep = "_",as.is = TRUE, colClass = "character")$V3))
    
    effect_mtr = res$coef[output_snps$snps_index+1]%>%as.matrix
    colnames(effect_mtr) = "${name}"
    rownames(effect_mtr) = output_snps$snps
    cs_mtr = effect_mtr
    for(i in 1:nrow(cs_mtr)) cs_mtr[i,] =  output_snps$cs[[i]]  
    pip_mtr = effect_mtr
    for(i in 1:nrow(pip_mtr)) pip_mtr[i,] =  output_snps$pip[[i]]  
    
    output_vcf = create_vcf(
           chrom = output_snps$chr,
            pos = output_snps$pos,
            ea = output_snps$alt,
            nea = output_snps$ref,
            effect = effect_mtr ,
            pip = pip_mtr,
            cs = cs_mtr,
            name = colnames(effect_mtr)
              )
    VariantAnnotation::writeVcf(output_vcf,${_output:nr},index = TRUE)
# Exporting cis susie_twas results
[cis_results_export_1, gwas_results_export_1, export_top_loci_1]
# per chunk we process at most 200 datasets
parameter: per_chunk = 200
# Region list should have last column being region name. 
parameter:region_file=path()
# the path stored original output files
parameter:file_path=''
# assuming orinal files are named as prefix.id.suffix
# prefix of the original files (before id) to identify file.
parameter:prefix=[]
# suffix of the original files (after id) to identify file.
parameter:suffix=str
# if need to rename context, please load condition meta file
parameter: condition_meta = path()
# if the pip all variants in a cs < this threshold, then remove this cs
parameter: pip_thres = 0.05
# only keep top cs_size variants in one cs 
parameter: cs_size = 3
# provide exported meta to filter the exported genes 
parameter: exported_file = path()
# Optional: if a region list is provide the analysis will be focused on provided region. 
# The LAST column of this list will contain the ID of regions to focus on
# Otherwise, all regions with both genotype and phenotype files will be analyzed
parameter: region_list = path()
# Optional: if a region name is provided 
# the analysis would be focused on the union of provides region list and region names
parameter: region_name = []
# the (aligned) geno bim file to check allele flipping
parameter: geno_ref = path()
# context meta file to map renamed context backs
parameter: context_meta = path()
# default cretria in susie purity filtering, recommended to use 0.8 here 
parameter: min_corr = []
# set this parameter as `True` when exporting gwas data.
parameter: gwas = False
# set this parameter as `True` when exporting fsusie data.
parameter: fsusie = False
# set this parameter as `True` when exporting metaQTL data.
parameter: metaQTL = False
# set this parameter as `True` when exporting mnm data.
parameter: mnm = False

import pandas as pd
import os

region = pd.read_csv(region_file, sep='\t', names=['chr', 'start', 'end', 'id'])

region_ids = []
# If region_list is provided, read the file and extract IDs
if not region_list.is_dir():
    if region_list.is_file():
        region_list_df = pd.read_csv(region_list, sep='\t', header=None, comment = "#")
        region_ids = region_list_df.iloc[:, -1].unique()  # Extracting the last column for IDs
    else:
        raise ValueError("The region_list path provided is not a file.")
        
if len(region_name) > 0:
    region_ids = list(set(region_ids).union(set(region_name)))
    
if len(region_ids) > 0:
    region = region[region['id'].isin(region_ids)]

def filter_existing_paths(row):
    existing_paths = [path for path in row if os.path.exists(path)]
    return existing_paths
    
# Function to create list of formatted strings for each row
def create_formatted_list(row):
    rid_plain = str(row['id'])                  # name with chr
    rid_chr   = f"{row['chr']}_{row['id']}"     # name without chr
    
    if len(prefix) > 0:
        candidates_plain = [f"{file_path}/{p}.{rid_plain}.{suffix}" for p in prefix]
        candidates_chr   = [f"{file_path}/{p}.{rid_chr}.{suffix}"   for p in prefix]
    else:
        candidates_plain = [f"{file_path}/{rid_plain}.{suffix}"] # GWAS data do not have prefix
        candidates_chr   = [f"{file_path}/{rid_chr}.{suffix}"]
    
    # prioritize plain
    existing_plain = filter_existing_paths(candidates_plain)
    if existing_plain:
        return existing_plain
    
    existing_chr = filter_existing_paths(candidates_chr)
    return existing_chr

# Apply the function to each row
region['original_data'] = region.apply(create_formatted_list, axis=1)

def group_by_region(lst, partition):
    # from itertools import accumulate
    # partition = [len(x) for x in partition]
    # Compute the cumulative sums once
    # cumsum_vector = list(accumulate(partition))
    # Use slicing based on the cumulative sums
    # return [lst[(cumsum_vector[i-1] if i > 0 else 0):cumsum_vector[i]] for i in range(len(partition))]
    return partition


def is_file_exported(paths, results_set):
    for path in paths:
        basename = os.path.basename(path)
        if basename not in results_set:
            return False
    return True

region['original_data'] = region['original_data'].apply(filter_existing_paths)
region = region[region['original_data'].map(bool)]

# if provided exported meta, check if the original data are all exported already, isfo skip them 
if not exported_file.is_dir():
    if exported_file.is_file():
        results = pd.read_csv(exported_file, sep='\t')
        results_set = set(results['original_data'])
        results_set = {item.strip() for sub in results_set for item in sub.split(',')}
        mask = region['original_data'].apply(lambda paths: not is_file_exported(paths, results_set))
        region = region[mask]
    else:
        raise ValueError("The exported_file path provided is not a file.")

# added because there is frequently changed ID from upstream analysis!!!        
# If after filtering no region remains, update 'id' by concatenating 'chr' and original 'id' with '_'
if region.empty:
    print("No regions remain after filtering. Reattempting using concatenated id (chr_id).")
    # Update the 'id' column using the concatenation of the first column (chr) and the original 'id'
    region = pd.read_csv(region_file, sep='\t', names=['chr', 'start', 'end', 'id'])
    region['id_chr'] = region['chr'].astype(str) + "_" + region['id'].astype(str)
    
    if len(region_ids) > 0:
        region = region[region['id'].isin(region_ids) | region['id_chr'].isin(region_ids)]
    
    if region.empty:
        print("Still no regions matched after using id_chr.")
    else:
        region['original_data'] = region.apply(create_formatted_list, axis=1)
        region = region[region['original_data'].map(bool)]
    
    # Reapply exported_file filtering if applicable
    if not exported_file.is_dir():
        if exported_file.is_file():
            results = pd.read_csv(exported_file, sep='\t')
            results_set = set(results['original_data'])
            results_set = {item.strip() for sub in results_set for item in sub.split(',')}
            mask = region['original_data'].apply(lambda paths: not is_file_exported(paths, results_set))
            region = region[mask]
        else:
            raise ValueError("The exported_file path provided is not a file.")
regional_data = {
    'meta': [(row['chr'], row['start'], row['end'], row['id']) for _, row in region.iterrows()],
    'data': [(row['original_data']) for _, row in region.iterrows()]
}

meta_info = regional_data['meta']
stop_if(len(regional_data['data']) == 0, f' All files have been exported already')

input: regional_data["data"], group_by = lambda x: group_by_region(x, regional_data["data"]), group_with = "meta_info"
output: f"{cwd}/{name}_cache/{name}_{_meta_info[3]}.tsv"
task: trunk_workers = job_size, walltime = walltime, trunk_size = job_size, mem = mem, cores = numThreads, tags = f'{_output:bn}'
R: expand = "${ }",stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container
    library(tidyverse)
    library(data.table)
    library(susieR)
    library(pecotmr)

    # check top_loci existing or not
    has_rows <- function(df) {
      !is.null(df) && nrow(df) > 0
    }

    # function to allele flipping checking by mapping them to aligned genotype bim file
    align_to_genoref <- function(var_list, geno_ref, region ){
        geno_ref <- pecotmr:::tabix_region(file= geno_ref,
                    region = region)
        colnames(geno_ref) <- c('chr', 'pos', 'alt', 'ref')
        geno_ref <- geno_ref %>% mutate(chr = gsub('chr','',chr))
        var_list_df <- data.frame(chr = str_split(var_list,":|_",simplify = T)[,1] %>% gsub('chr','',.),
            pos = str_split(var_list,":|_",simplify = T)[,2],
            ref = str_split(var_list,":|_",simplify = T)[,3],
            alt = str_split(var_list,":|_",simplify = T)[,4])
        # merge_genotype_data from below cell
        aligned_var_df <- merge_genotype_data(geno_ref, var_list_df, all=FALSE)
        aligned_var <- aligned_var_df %>%
          mutate(id = {
            if (grepl(":", var_list[1])) {
              if (grepl("_", var_list[1])) {
                paste(chr, paste(pos, ref, alt, sep = "_"),sep = ':')
              } else {
                paste(chr, pos, ref, alt, sep = ":")
              }
            } else {
              paste(chr, pos, ref, alt, sep = "_")
            }
          }) %>%
          pull(id)
        if (grepl("chr", var_list[1]))  aligned_var <- paste0("chr",aligned_var)
        return(aligned_var)
    }
    
    # function to map variant list to geno ref
    merge_genotype_data <- function(df1, df2, all = TRUE) {
      setDT(df1)
      setDT(df2)
      df1[, key := paste(chr, pos, pmin(alt, ref), pmax(alt, ref))]
      df2[, key := paste(chr, pos, pmin(alt, ref), pmax(alt, ref))]
      df2[df1, on = "key", flip := i.alt == ref & i.ref == alt, by = .EACHI]
      df2[flip == TRUE, c("alt", "ref") := .(ref, alt)]
      if (all) {
        df_combined <- unique(rbindlist(list(df1[, .(chr, pos, alt, ref)], df2[, .(chr, pos, alt, ref)])), by = c("chr", "pos", "alt", "ref"))
      } else {
        df_combined <- df2[, .(chr, pos, alt, ref)]
      }
      return(df_combined)
    }

    # Function to filter credible sets with all variants PIP < 0.05 and update rows based on _min_corr suffix condition
    update_and_filter_cs_ids <- function(dat_con, dat_susie, df, purity_thresholds) {
      # Identify cs_coverage columns
      cs_columns <- grep("^cs_coverage", names(df), value = TRUE)
    
      # Flag rows with any cs_coverage > 0
      df$cs_all_non_zero_orig <- rowSums(df[cs_columns] == 0) != length(cs_columns)
    
      # Iterate over cs_coverage columns and their unique CS IDs
      for (cs_column in cs_columns) {
        unique_cs_ids <- unique(df[[cs_column]])
        # Update top loci based on minimum correlation for each coverage column
          for(purity_threshold in purity_thresholds){
              df <- update_top_loci_cs_annotation(dat_con, dat_susie, top_loci_df = df, coverage_value = cs_column, threshold = purity_threshold)
          }
    
        for (cs_id in unique_cs_ids[unique_cs_ids > 0]) {
          # Flag CS IDs where all PIPs < pip_thres (default 0.05)
          pip_check <- df[df[[cs_column]] == cs_id, "pip"] < 0.05
          # label the whole cluster if all pip < pip_thres (default 0.05)
          if (all(pip_check, na.rm = TRUE)) {
            df[[cs_column]] <- ifelse(df[[cs_column]] == cs_id, 0, df[[cs_column]])
          }
        }
      }
      # Identify min corr cs_coverage columns
      mincor_columns <- grep("_min_corr", names(df), value = TRUE) # Identify _mincor columns
    
      # Filter out rows where all cs_coverage columns are 0, all _mincor columns are 0,
      # and cs_all_non_zero_orig is TRUE
      df <- df %>%
        filter(!(cs_all_non_zero_orig &
                   rowSums(df[cs_columns] == 0) == length(cs_columns) &
                   rowSums(df[mincor_columns] == 0) == length(mincor_columns))) %>%
        select(-cs_all_non_zero_orig) # Remove the temporary flag column
      df <- df %>%
        select(-starts_with("cs_coverage"), all_of(cs_columns), all_of(mincor_columns))
      return(df)
     }
    
    
     # update top loci table
     update_top_loci_cs_annotation <- function(dat_con, dat_susie, top_loci_df, coverage_value, threshold ) {
       # update susie obj based CS
       if(coverage_value == 'cs_coverage_0.95') dat_susie_tmp <- dat_susie
        else dat_susie_tmp <- dat_susie$sets_secondary[[gsub('cs_','',coverage_value)]]
       #get purity res  
       purity_res <- dat_susie_tmp$sets$purity 
    
       not_pass_min_cs <- rownames(purity_res)[purity_res$min.abs.corr < threshold] %>%
         gsub('L', '', .)
       top_loci_df[[paste0( coverage_value, "_min_corr_", threshold)]] <- top_loci_df[[coverage_value]]
       if (length(not_pass_min_cs) > 0) {
         top_loci_df[[paste0( coverage_value, "_min_corr_", threshold)]][top_loci_df[[coverage_value]] %in% not_pass_min_cs] <- 0
       }
       return(top_loci_df)
     }
    
    # function to decide run update_and_filter_cs_ids or not
    process_top_loci <- function(dat_con, dat_susie,purity_thresholds) {
      data_frame <- dat_con$top_loci
      if (has_rows(data_frame)) {
        return(update_and_filter_cs_ids(dat_con, dat_susie, data_frame, purity_thresholds))
      } else {
        return(data_frame)
      }
    }

    # function to get pip with their variant name
    get_pip_withname <- function(susie_obj){
      pip = susie_obj$susie_result_trimmed$pip
      if(!(is.null(pip)))  names(pip) = susie_obj$variant_names
      return(pip)
    }
  
    # add column in top loci to indicate if the variant is identified in susie_on_top_pc top loci table or not
    annotate_susie <- function(obj, top_loci){
        tryCatch({
            df <- data.frame()
    
            results <- lapply(obj[['susie_on_top_pc']], function(x) {
              x[['top_loci']]
            })
            results <- results[!sapply(results, is.null)]
                              
            df <- bind_rows(results)
    
            top_loci_others <-  df %>%
              filter(rowSums(select(., starts_with('cs_coverage_'))) == 0)
            top_loci_cs <- df %>%
              filter(rowSums(select(., starts_with('cs_coverage_'))) > 0)
    
            susie_vars <- rbind(top_loci_others %>% filter(pip >= 0.05), top_loci_cs) %>% pull(variant_id)
            if(length(susie_vars) > 0) susie_vars <- align_to_genoref(var_list = susie_vars, geno_ref = geno_ref, region = paste0(sub("^(chr)?","chr","${_meta_info[0]}"), ":", "${_meta_info[1]}", "-", "${_meta_info[2]}"))
    
            susie_vars_cs <- top_loci_cs %>% pull(variant_id)
            if(length(susie_vars_cs) > 0)  susie_vars_cs <- align_to_genoref(var_list = susie_vars_cs, geno_ref = geno_ref, region = paste0(sub("^(chr)?","chr","${_meta_info[0]}"), ":", "${_meta_info[1]}", "-", "${_meta_info[2]}"))
    
            top_loci <- top_loci %>% mutate(annotated_susie = ifelse(variant_id %in% susie_vars, 1, 0),
                               annotated_susie_cs = ifelse(variant_id %in% susie_vars_cs, 1, 0))
    
            return(top_loci)
        }, error = function(e) {
            warning("Error in annotate_susie: ", e$message)
        })
    }
    
    # function to match context values and add analysis_name
    match_contexts <- function(df, context_meta) {
      # Split context_meta's context column into separate rows, one for each comma-separated value
      context_meta_expanded <- context_meta %>% 
        separate_rows(context, sep = ",") %>%
        mutate(context = trimws(context)) # Remove potential leading and trailing whitespace

      # Loop through each context in df to find the most precise match in context_meta_expanded
      df$super_context <- sapply(df$context, function(df_context) {
        # Find all potential matches
        potential_matches <- context_meta_expanded %>%
          filter(str_detect(df_context, context)) %>%
          arrange(desc(nchar(context))) # Sort potential matches by descending length of context

        # Select the longest match as the most precise one
        if(nrow(potential_matches) > 0) {
          return(potential_matches$context[1])
        } else {
          return(NA) # Return NA if no match is found
        }
      })

      return(df)
    }
    # Define a function to process context data and match contexts
    process_and_match_contexts <- function(context_meta_path, cons_top_loci, res_minp) {
      # Read context metadata
      context_meta <- fread(context_meta_path)

      # Extract super contexts
      super_contexts <- cons_top_loci[[1]] %>% unlist %>% names %>% as.character
      # Create a data frame of context and minimum p-values
      context_df <- data.frame(context = super_contexts,
                               min_p = res_minp[[1]][super_contexts] %>% unlist %>% as.numeric)
      # Match contexts using the previously defined match_contexts function
      context_df <- match_contexts(context_df, context_meta)
      # Filter context_df by super_context
      context_df_no_order <- context_df %>% filter(context == super_context)
      context_df_with_order <- context_df %>% filter(context != super_context) 

      return(list(context_df_no_order= context_df_no_order,context_df_with_order = context_df_with_order ))
    }

    # Function to filter introns based on leafcutter2 cluster
    process_lf2_cluster <- function(df){
        df <- df%>%
        mutate(cluster_id = str_extract(context, "clu_\\d+"), 
               category = str_extract(context, "([^:]+)(?=:EN)")) %>%
        group_by(cluster_id) %>%
        # filter the intron clusters with NE and IN events
        mutate(cluster_status = ifelse(any(category %in% c("NE", "IN")), "no", "yes")) %>%
        filter(cluster_status == "yes") %>%
        ungroup %>%
        filter(!str_detect(context, ":PR:")) %>%  #FIXME: only keep unproductive events in leafcutter2 results. 
        select(-cluster_status, -cluster_id, -category)
        return(df)
    }

    # Function to combine the contexted with and without order 
    combine_order <- function(context_df_no_order, context_df_with_order){
     context_df_with_order <- context_df_with_order %>%
        group_by(super_context) %>%
        summarise(min_p = min(min_p), .groups = 'keep') %>%
        left_join(context_df_with_order, by = c("super_context", "min_p" = "min_p"))

      # Combine context_df_no_order and context_df_with_order contexts
      cons_top_loci_minp <- c(context_df_no_order$context, context_df_with_order$context)
        return(cons_top_loci_minp)
    }
    
    # Revised summarise_lfsr_by_marker function with distinct error cases
    summarise_lfsr_by_marker <- function(multicontext_res,
                                           pos = NULL,
                                           markers = NULL,
                                           conditions = NULL,
                                           poslim = NULL,
                                           lfsr_cutoff = 0.01,
                                           sentinel_only = FALSE,
                                           cs_plot = NULL,
                                           conditional_effect = TRUE) {
      tryCatch({
        # Extract fitted object from multicontext_res
        fit <- multicontext_res[[1]]$mvsusie_fitted
    
        # Set default parameters if not provided
        if (is.null(markers)) markers <- fit$variable_names
        if (is.null(conditions)) conditions <- fit$condition_names
        if (is.null(pos)) pos <- seq(1, length(markers))
        if (is.null(poslim)) poslim <- range(pos)
        if (is.null(cs_plot)) cs_plot <- names(fit$sets$cs)
    
        # Create initial data frame for plotting
        pdat <- data.frame(pip = fit$pip, pos = pos, cs = as.character(NA),
                           stringsAsFactors = FALSE)
        css <- names(fit$sets$cs)
        for (i in css) {
          j <- fit$sets$cs[[i]]
          pdat[j, "cs"] <- i
        }
        rows <- which(!is.na(pdat$cs))
        pdat_cs <- pdat[rows, ]
    
        # Filter by position limits
        rows1 <- which(pdat$pos >= poslim[1] & pdat$pos <= poslim[2])
        rows2 <- which(pdat_cs$pos >= poslim[1] & pdat_cs$pos <= poslim[2])
        pdat <- pdat[rows1, ]
        pdat_cs <- pdat_cs[rows2, ]
    
        # Order and label credible sets
        pdat_cs$cs <- factor(pdat_cs$cs)
        css <- levels(pdat_cs$cs)
        L <- length(css)
        cs_pos <- sapply(fit$sets$cs[css], function(x) median(pos[x]))
        css <- css[order(cs_pos)]
        pdat_cs$cs <- factor(pdat_cs$cs, levels = css)
        cs_size <- sapply(fit$sets$cs[css], length)
        for (i in 1:L) {
          j <- css[i]
          if (cs_size[i] == 1) {
            levels(pdat_cs$cs)[i] <- sprintf("%s (1 SNP)", j)
          } else {
            levels(pdat_cs$cs)[i] <- sprintf("%s (%d SNPs, %0.3f purity)",
                                             j, cs_size[j], fit$sets$purity[j, "min.abs.corr"])
          }
        }
    
        # Set up traits and effects matrix
        traits <- conditions
        r <- length(traits)
        lmax <- nrow(fit$alpha)
        fit$b1_rescaled <- fit$b1_rescaled[, -1, ]
        rownames(fit$b1_rescaled) <- paste0("L", 1:lmax)
        rownames(fit$single_effect_lfsr) <- paste0("L", 1:lmax)
        colnames(fit$single_effect_lfsr) <- traits
        rownames(fit$alpha) <- paste0("L", 1:lmax)
        effects <- matrix(0, r, L)
        rownames(effects) <- traits
        colnames(effects) <- css
    
        # Initialize effect data frame
        effect_dat <- data.frame(matrix(as.numeric(NA),
                                          prod(length(conditions) * length(markers)), 8))
        names(effect_dat) <- c("trait", "marker", "pos", "effect", "z", "lfsr", "cs", "sentinel")
        effect_dat$trait <- rep(conditions, length(markers))
        effect_dat$marker <- rep(markers, each = length(conditions))
        effect_dat$pos <- rep(pos, each = length(conditions))
        effect_dat$sentinel <- 0
    
        # Process each credible set
        for (i in 1:L) {
          l <- css[i]
          j <- fit$sets$cs[[l]]
          b <- fit$b1_rescaled[l, j, ]
          if (conditional_effect) {
            b <- b / fit$alpha[l, j]
          }
          marker_names <- markers[j]
          marker_idx <- which(effect_dat$marker %in% marker_names)
          effect_dat[marker_idx, "cs"] <- l
          effect_dat[marker_idx, "lfsr"] <- rep(fit$single_effect_lfsr[l, ], length(marker_names))
          effect_dat[marker_idx, "effect"] <- as.vector(t(b))
          if (!is.null(fit$z)) {
            effect_dat[marker_idx, "z"] <- as.vector(t(fit$z[j, ]))
          }
          max_idx <- which.max(fit$alpha[l, j])
          effect_dat[which(effect_dat$marker == marker_names[max_idx]), "sentinel"] <- 1
          effects[, i] <- ifelse(is.null(nrow(b)), b, b[max_idx, ])
        }
    
        # Filter and process effect data
        effect_dat <- effect_dat[which(!is.na(effect_dat$cs)), ]
        rows1 <- which(effect_dat$pos >= poslim[1] & effect_dat$pos <= poslim[2])
        effect_dat <- effect_dat[rows1, ]
        effect_dat$marker_cs <- paste0(effect_dat$marker, "(", effect_dat$cs, ")")
        pdat_sentinel <- effect_dat[which(effect_dat$sentinel == 1), ]
        pdat_sentinel <- unique(pdat_sentinel[, c("marker", "marker_cs", "pos")])
        pdat_sentinel$pip <- fit$pip[match(pdat_sentinel$marker, fit$variable_names)]
    
        # Optionally keep only sentinel variants
        if (sentinel_only) {
          effect_dat <- effect_dat[which(effect_dat$sentinel == 1), ]
        }
        # Optionally filter by specified credible sets to plot
        if (!missing(cs_plot)) {
          effect_dat <- effect_dat[which(effect_dat$cs %in% cs_plot), ]
        }
        effect_dat$cs <- factor(effect_dat$cs, levels = css)
        effect_dat$trait <- factor(effect_dat$trait, traits)
        rows <- which(effect_dat$lfsr < lfsr_cutoff)
        effect_dat <- effect_dat[rows, ]
    
        # Error Case 1: No variants left after LFSR cutoff filtering
        if (nrow(effect_dat) == 0) {
          message("Warning Case 1: No variants passed the LFSR cutoff threshold; returning NA values.")
          top_loci <- multicontext_res[[1]][['top_loci']]
          top_loci$trait <- NA
          top_loci$lfsr <- NA
          return(top_loci)
        }
    
        effect_dat <- effect_dat %>% 
          mutate(pip = fit$pip[match(marker, names(fit$pip))],
                 variant_id = marker)
    
        # Summarise results and merge with top loci
        result <- effect_dat %>%
          group_by(variant_id, cs) %>%
          arrange(trait) %>%
          summarise(trait = paste(trait, collapse = ";"), 
                    lfsr = paste(lfsr, collapse = ";"),
                    .groups = "drop") %>% 
          mutate(cs_coverage_0.95 = gsub('L', '', cs)) %>%
          select(-cs)
    
        top_loci <- multicontext_res[[1]][['top_loci']]
        merge(top_loci, result, by = c('variant_id', 'cs_coverage_0.95'), all.x = TRUE)
      }, error = function(e) {
        # Error Case 2: Any unexpected error in processing
        message("Warning Case 2: An unexpected error occurred: ", e$message)
        top_loci <- tryCatch(multicontext_res[[1]][['top_loci']],
                             error = function(x) data.frame(variant_id = NA, cs_coverage_0.95 = NA))
        top_loci$trait <- NA
        top_loci$lfsr <- NA
        return(top_loci)
      })
    }
    
    # function to add effect size
    add_coef_column <- function(dat_study) {
      # Ensure the required components exist
      # Check if dat_study contains both required components
    if (!("susie_result_trimmed" %in% names(dat_study)) || !("top_loci" %in% names(dat_study))) {
      # Return a null list instead of stopping
      return(data.frame())
    }
          
      # Extract the coefficient matrix and ensure top_loci is a data.frame
      coef_matrix <- dat_study[["susie_result_trimmed"]][["coef"]][-1,] / dat_study[["susie_result_trimmed"]][['pip']]
      dt <- dat_study[["top_loci"]]
      if (!is.data.frame(dt)) {
        dt <- as.data.frame(dt)
      }
      
      # Check that dt has the required columns
      if (!all(c("variant_id", "trait") %in% names(dt))) {
        stop("The 'top_loci' data frame must contain 'variant_id' and 'trait' columns.")
      }
      
      # Add the 'coef' column by matching each variant with its corresponding traits
      dt[["conditional_effect"]] <- mapply(function(variant, trait_str) {
        # Return NA if trait_str is missing or variant not found in coef_matrix
        if (is.na(trait_str) || !(variant %in% rownames(coef_matrix))) {
          return(NA_character_)
        }
        # Split the trait string by ';' and trim extra whitespace
        traits <- trimws(unlist(strsplit(trait_str, ";")))
        # Subset coefficients for the variant and specified traits
        coefs <- coef_matrix[variant, traits, drop = TRUE]
        paste(coefs, collapse = ";")
      }, dt[["variant_id"]], dt[["trait"]], USE.NAMES = FALSE)
      
      dat_study[["top_loci"]] <- dt
      return(dt)
    }

    ###### MAIN ######
    # Process each path and collect results
    orig_files = c(${",".join(['"%s"' % x.absolute() for x in _input])})
     # Extract info from each RDS file
    results <- list()
    gene = "${_meta_info[3]}"
    geno_ref <- "${geno_ref}"

    # Post Processing: Extracting info
    # use for loop instead of apply to save memory
    res <- res_sum <- res_minp <- cons_top_loci <- list()
    pip_sum <- data.frame()
    for(i in seq_along(orig_files)) {
      rds_path <- orig_files[i]
       dat <- tryCatch({
        readRDS(rds_path)
      }, error = function(e) {
        writeLines(rds_path %>% basename, gsub(".tsv","_error","${_output}"))
        return(NULL) # 
      })

      if(is.null(dat)) next
  
      #extract qtl type from susie rds file name, if we have set decent condtiton name, this could be removed 
      temp_list <- list() # Temporary list to store inner results
      genes <- names(dat)
        for(id in genes){
          dat_study <- dat[[id]]
          temp_list <- list()
          if(${"TRUE" if mnm else "FALSE"}) {
              if (length(dat[[id]]) <= 1) {
                message("No multi-contexts results due to no twas results; skipping multi-context export.")
                next
              }
              conditions <- paste(orig_files[i] %>% basename %>% str_split(., '[.]', simplify = T) %>% .[,1],dat_study[['context_names']] %>% paste(  ., collapse = ';'), sep = ':') 
              dat_study[['top_loci']] <- summarise_lfsr_by_marker(dat)
              dat_study[['top_loci']] <- add_coef_column(dat_study)
            } else conditions <- names(dat_study)[sapply(dat_study, function(x) length(x) > 0)]
  
          if(length(conditions) > 0) {
  
            for(condition in conditions) {
                if (${"FALSE" if mnm else "TRUE"}) dat_con <- dat_study[[condition]]${[['fsusie_summary']] if fsusie else ''} else dat_con <- dat_study${[['fsusie_summary']] if fsusie else ''}
                if (${"TRUE" if gwas else "FALSE"}){
                    method <- names(dat_study[[condition]])[names(dat_study[[condition]]) != 'rss_data_analyzed'] ## FIXME: this method is not showing in meta (not need if only one method). or we can use `context@method` in meta
                    if(length(method) == 1) dat_con <- dat_con[[method]] else stop('more than 1 method, please check.')
                }
                dat_susie <- dat_con$susie_result_trimmed
                
                # calculate the sum of pip for the vairnats with pip > 0
                pip_sum = rbind(pip_sum, 
                                data.frame(pip_sum = dat_susie[['pip']] [dat_susie[['pip']] > 0] %>% sum, 
                                           condition = condition)
                               )
                if(${"FALSE" if mnm else "TRUE"} & has_rows(dat_con[['top_loci']])) dat_con[['top_loci']] <- dat_con[['top_loci']] %>% mutate(conditional_effect = (coef(dat_susie)/ (dat_susie[['pip']]))[variant_id])
                
                # rename context names if needed
                if(${"TRUE" if condition_meta.is_file() else "FALSE"}){
                      meta <- suppressMessages(read_delim("${condition_meta}", col_names = F))
                      context <- meta %>% filter(X1 == condition) %>% pull(X2)
                      if (length(context) == 0) {
                        context <- condition
                        message("No matching entries found. context has been set to the condition value.")
                      }
                    } else {context <- condition}
                  
                # align variants to aligned geno
                if(has_rows(dat_con$top_loci) || has_rows(dat_con$preset_top_loci)) cons_top_loci[[id]][[context]] <- context else  cons_top_loci[[id]][[context]] <- NULL    
                variant_ids <- c(dat_con$top_loci$variant_id, dat_con$variant_names, dat_con$preset_variants_result$top_loci$variant_id, dat_con$preset_variants_result$variant_names)
                unique_variant_ids <- unique(variant_ids)
                aligned_variant_ids <- align_to_genoref(unique_variant_ids, geno_ref, paste0(sub("^(chr)?","chr","${_meta_info[0]}"), ":", "${_meta_info[1]}", "-", "${_meta_info[2]}"))
                names(aligned_variant_ids) <- unique_variant_ids

                # change beta or z in top loci and sumstats
                top_loci_changed_indexes <- which(dat_con$top_loci$variant_id !=  aligned_variant_ids[dat_con$top_loci$variant_id] %>% as.character )
                if(has_rows(dat_con$top_loci) & length(top_loci_changed_indexes) > 0) {
                      dat_con$top_loci$conditional_effect[top_loci_changed_indexes] <- (-1)* dat_con$top_loci$conditional_effect[top_loci_changed_indexes] # FIXME if coef does not need to check flip
                      if (${"FALSE" if gwas else "TRUE"}) {
                          dat_con$top_loci$betahat[top_loci_changed_indexes] <- (-1)* dat_con$top_loci$betahat[top_loci_changed_indexes]
                      } else {
                          dat_con$top_loci$z[top_loci_changed_indexes] <- (-1)* dat_con$top_loci$z[top_loci_changed_indexes]
                      }
                }
                all_changed_indexes <- which(dat_con$variant_names !=  aligned_variant_ids[dat_con$variant_names] %>% as.character )
                if(length(all_changed_indexes) > 0) {
                      if (${"FALSE" if gwas else "TRUE"}) {
                          dat_con$sumstats$betahat[all_changed_indexes] <- (-1)* dat_con$sumstats$betahat[all_changed_indexes]
                      } else {
                           dat_con$sumstats$z[all_changed_indexes] <- (-1)* dat_con$sumstats$z[all_changed_indexes]
                      }
                }

                # change variant names 
                if(has_rows(dat_con$top_loci)) dat_con$top_loci$variant_id <- aligned_variant_ids[dat_con$top_loci$variant_id] %>% as.character ###
                dat_con$variant_names <- aligned_variant_ids[dat_con$variant_names] %>% as.character ###

                
                if (${"FALSE" if gwas else "TRUE"}) { 
                    res[[id]][[context]] <- list(
                      top_loci = process_top_loci(dat_con, dat_susie, purity_thresholds = c(${",".join(['"%s"' % x for x in min_corr])})),
                      pip = get_pip_withname(dat_con)
                    )
                    # the preset data in fsusie is actually from first PC and analyzed by susie, we'd like to remove them to avoid misleading
                    # although no preset res in mnm, we can put it here with the same layer structure. It would not report preset results
                    if (${"FALSE" if fsusie else "TRUE"}) {
                          if(has_rows(dat_con$preset_variants_result$top_loci)) {
                              dat_con$preset_variants_result$top_loci$variant_id <- aligned_variant_ids[dat_con$preset_variants_result$top_loci$variant_id] %>% as.character
                              # Calculate and add the conditional effect in one step
                              if(has_rows(dat_con$preset_variants_result[['top_loci']])){
                                  dat_con$preset_variants_result[['top_loci']] <- dat_con$preset_variants_result[['top_loci']] %>%
                                      mutate(conditional_effect = (coef(dat_con$preset_variants_result$susie_result_trimmed) /
                                                                     dat_con$preset_variants_result$susie_result_trimmed[['pip']])[variant_id])
                                  dat_con$top_loci$conditional_effect[top_loci_changed_indexes] <- (-1)* dat_con$top_loci$conditional_effect[top_loci_changed_indexes] # FIXME if coef does not need to check flip
                                }
                              }
                          dat_con$preset_variants_result$variant_names <- aligned_variant_ids[dat_con$preset_variants_result$variant_names] %>% as.character
                          # change beta in preset top loci
                          preset_top_loci_changed_indexes <- which(dat_con$preset_variants_result$top_loci$variant_id !=  aligned_variant_ids[dat_con$preset_variants_result$top_loci$variant_id] %>% as.character )
                          if(has_rows(dat_con$top_loci) & length(preset_top_loci_changed_indexes) > 0) dat_con$preset_variants_result$top_loci$betahat[preset_top_loci_changed_indexes] <- (-1)* dat_con$preset_variants_result$top_loci$betahat[preset_top_loci_changed_indexes]   

                          res[[id]][[context]][['region_info']] = dat_con$region_info
                          res[[id]][[context]][['CV_table']] = dat_con$twas_cv_result$performance
                          res[[id]][[context]][['preset_top_loci']] = process_top_loci(dat_con$preset_variants_result, dat_con$preset_variants_result$susie_result_trimmed, purity_thresholds = c(${",".join(['"%s"' % x for x in min_corr])}))
                          res[[id]][[context]][['preset_pip']] = get_pip_withname(dat_con$preset_variants_result)
                    } else {
                          res[[id]][[context]][['region_info']] = dat_study[[condition]]$region_info
                          res[[id]][[context]][['CV_table']] = dat_study[[condition]]$twas_cv_result$performance
                          res[[id]][[context]][['top_loci']] = annotate_susie(dat_study[[condition]], res[[id]][[context]][['top_loci']])
                          
                    } 
                # fsusie do not have sumstats or p value
                if (${"FALSE" if fsusie or mnm else "TRUE"}) {
                    res_sum[[id]][[context]] <- list(
                      variant_names = dat_con$variant_names,
                      sumstats = dat_con$sumstats
                    )

                    if (${"FALSE" if fsusie or mnm else "TRUE"}) res_minp[[id]][[context]] <- min(pecotmr:::wald_test_pval(dat_con$sumstats$betahat, dat_con$sumstats$sebetahat,  n = 1000)) ##assuming sample size is 1000
                }
  
                } else {
                    res[[id]][[context]][[method]] <- list(
                      top_loci = process_top_loci(dat_con, dat_susie, purity_thresholds = c(${",".join(['"%s"' % x for x in min_corr])})),
                      pip = get_pip_withname(dat_con)
                    )
                    res_sum[[id]][[context]][[method]] <- list(
                      variant_names = dat_con$variant_names,
                      sumstats = dat_con$sumstats
                    )

                }
                

                if(has_rows(dat_con$top_loci) || has_rows(dat_con$preset_top_loci)) cons_top_loci[[id]][[context]] <- context else  cons_top_loci[[id]][[context]] <- NULL
              }
            }
        }
    }

    cons_top_loci <- cons_top_loci %>% compact()  # Use 'compact' to remove NULLs
    cons_top_loci <- if(length(cons_top_loci) > 0) cons_top_loci else NA

    combine_data = combine_data_sumstats = cons_top_loci_minp = ''
    combine_data = paste0("${_output:add}","/","${name}", ".", ${'"epigenomics_"' if fsusie else '""'}, ${ '"metabolomics_"' if metaQTL else '""'}, gene, ".cis_results_db.rds")
    if (${"FALSE" if fsusie else "TRUE"}) combine_data_sumstats = gsub("\\.rds$", ".sumstats.rds", combine_data)
  
    if (${"TRUE" if exported_file.is_file() else "FALSE"}){
        if (file.exists(combine_data)) {
          res_exp <- readRDS(combine_data)
          res[[gene]] <- c(res[[gene]], res_exp[[gene]]) # this may need to change if tehre are multiple genes in one rds file. 
        }
        if (file.exists(combine_data_sumstats)) {
          res_sum_exp <- readRDS(combine_data_sumstats)
          res_sum[[gene]] <- c(res_sum[[gene]], res_sum_exp[[gene]])
        }
    }
    saveRDS(res, combine_data)
    # only save sumstats results when NOT fsusie or multi gene mvsusie
    if (${"FALSE" if fsusie or mnm else "TRUE"}) saveRDS(res_sum, combine_data_sumstats)
  
    # generate md5 for data transferring
    system(paste("md5sum ",combine_data, " > ", paste0(combine_data, ".md5")))
    if (${"FALSE" if fsusie or mnm else "TRUE"}) system(paste("md5sum ",combine_data_sumstats, " > ", paste0(combine_data_sumstats, ".md5")))
  
      
    TSS <- tryCatch({dat_con$region_info$region_coord$start},  error = function(e) {return(NA)})
    if (length(res) > 0) conditions = paste(names(res[[1]]), collapse = ",") else conditions = ''
    
    # fsusie does not have sumstats or pvalue, do not need to run this

    # if (${"FALSE" if fsusie or gwas or mnm else "TRUE"})  {
    #     context_map <- process_and_match_contexts('${context_meta}', cons_top_loci, res_minp)
    #     context_map$context_df_with_order <- process_lf2_cluster(context_map$context_df_with_order)
    #     cons_top_loci_minp <- combine_order(context_map$context_df_no_order, context_map$context_df_with_order)
    # }

    # save pip sum file 
     write_delim(pip_sum, 
                 paste0("${_output:add}","/","${name}", ".", ${'"epigenomics_"' if fsusie else '""'}, ${ '"metabolomics_"' if metaQTL else '""'}, gene, ".pip_sum"), 
                 delim = '\t')

    meta = data.frame(chr="${_meta_info[0]}", start="${_meta_info[1]}", end="${_meta_info[2]}", region_id="${_meta_info[3]}", TSS =  if(is.null(TSS)) NA else TSS, 
                      original_data = paste(basename(orig_files), collapse = ","), combined_data = basename(combine_data), combined_data_sumstats = basename(combine_data_sumstats), 
                      conditions = conditions, 
                      conditions_top_loci = if(length(cons_top_loci) > 0) cons_top_loci[[1]] %>% unlist %>% names %>% as.character %>% paste(., collapse = ',') else ''
                      # , conditions_top_loci_minp = if(length(cons_top_loci_minp) > 0) cons_top_loci_minp %>% paste(., collapse = ',') else ''
                      )

    write_delim(meta, "${_output}", delim = '\t')
  
[cis_results_export_2, gwas_results_export_2]
# set it as True if you only want run first step (running parallel) and combine them manually after all finished 
parameter: step1_only = False
skip_if(step1_only)
# provide exported meta to filter the exported genes 
parameter: exported_file = path()
# optional: qtl or gwas, there is slightly different in qtl and gwas rds file
parameter: gwas = False
input: group_by = 'all'
output: f"{cwd}/{name}.{'block_results_db' if gwas else 'cis_results_db'}.tsv" 
# stop_if(_input[0] not in locals().keys(), 'All files have been exported already') #FIXME should we remove to a separate file. sothat we can stop globally as above
task: trunk_workers = 1, walltime = '1h', trunk_size = 1, mem = '16G', cores = 1, tags = f'{_output:bn}'
bash: expand = "${ }", container = container, stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout'
    if [ -e "${_output:ad}/${name}_cache/" ]; then
        sed 's/^chr/#chr/' `ls ${_output:ad}/${name}_cache/*tsv |head -n1` | head -n 1 > ${_output:an}.temp
        tail -n +2 -q ${_output:ad}/${name}_cache/*.tsv >> ${_output:an}.temp
        error_files=$(find "${_output:ad}/${name}_cache/" -type f -name "*_error")

        if [[ -n $error_files ]]; then
            cat $error_files >> ${_output:an}.error_genes
        else
            echo "No truncated files detected"
        fi
    else
        echo "All files have been exported already"
    fi
    
R: expand = "${ }", container = container, stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout'
    if (file.exists(paste0(${_output:anr},".temp"))) {
        library(tidyverse)
        meta <- read_delim(paste0(${_output:anr},".temp"), delim = '\t')

        if (${"TRUE" if exported_file.is_file() else "FALSE"}){
          exp_meta <- read_delim(${exported_file:r}, delim = '\t')
          meta <- bind_rows(meta, exp_meta) %>%
              group_by(`#chr`, start, end, region_id, TSS) %>%
              summarise(across(c(original_data, combined_data, combined_data_sumstats, conditions, conditions_top_loci), 
                               ~paste(unique(.), collapse = ",")),
                        .groups = 'drop')
              }

        write_delim(meta, ${_output:r}, delim = '\t')
    }
[cis_results_export_3]
# set it as True if you only want run first step (running parallel) and combine them manually after all finished 
parameter: step1_only = False

skip_if(step1_only)
bash: expand = "${ }", container = container, stderr = f'{_input:n}.stderr', stdout = f'{_input:n}.stdout'   
    rm -rf "${_input:ad}/${name}_cache/"
    rm -rf ${_input:an}.temp
#get union of step1
#1200 blocks costed ~5mins with in one for loop
[gwas_results_export_3]
parameter: step1_only = False

skip_if(step1_only)
output: f"{cwd}/{name}.union_export.tsv.gz" 
task: trunk_workers = 1, walltime = '1h', trunk_size = 1, mem = '16G', cores = 1, tags = f'{_output:bn}'
bash: expand = "${ }", container = container, stderr = f'{_input:n}.stderr', stdout = f'{_input:n}.stdout'   
    rm -rf "${_input:ad}/${name}_cache/"
    rm -rf ${_input:an}.temp

R: expand = "${ }", container = container, stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout'
    library(tidyverse)
    library(data.table)

    mtx <- read_delim(${_input:r})
    files <- mtx %>% filter(!is.na(conditions_top_loci)) %>% pull(combined_data) %>% paste0(${_input[0]:dr},'/',.)
    all_top_loci <- data.frame()

    for (i in seq_along(files)) {
      file <- files[i]
      res <- readRDS(file)

      file_top_loci <- lapply(names(res), function(block) {
        lapply(names(res[[block]]), function(study) {
          lapply(names(res[[block]][[study]]), function(method) {
            if (!is.null(res[[block]][[study]][[method]]$top_loci)) {
              temp_df <- res[[block]][[study]][[method]]$top_loci
              mutate(temp_df, study = study, method = method, block = block)
            } else {
              NULL  
            }
          })
        }) %>% bind_rows()  
      }) %>% bind_rows() 

      all_top_loci <- bind_rows(all_top_loci, file_top_loci)
      if (i %% 100 == 0) {
        message(sprintf("Have processed %d files.", i))
      }
    }
    fwrite(all_top_loci, ${_output:r})
# simply combine seperate meta files, not works for having an exsiting exported file for now. 
[combine_export_meta]
parameter: cache_path=path
parameter: output_file = str
parameter: remove_cache = False
output: f"{cache_path:d}/{output_file}" 
bash: expand = "${ }", container = container, stderr = f'{_input:n}.stderr', stdout = f'{_input:n}.stdout'   
    head -n 1 -q ${cache_path}/*.tsv | sed 's/^chr/#chr/' | head -n 1 > ${_output}
    tail -n +2 -q ${cache_path}/*.tsv >> ${_output}
    if ${"true" if remove_cache else "false"}; then
        rm -rf ${cache_path}
    fi
[export_top_loci_2]
# `export_suffix` deliberately differs from step 1's `--suffix` (input rds) to avoid CLI collision.
parameter: export_path = cwd
parameter: export_prefix = ''  # falls back to step-1 `name` if empty
parameter: export_suffix = 'cis_results_db.rds'
parameter: fsusie_prefix = ''
parameter: preset_top_loci = False
parameter: lfsr_thres = 0.01

_pfx = export_prefix if export_prefix else name
import glob as _glob
_rds_files = sorted(_glob.glob(f"{export_path}/{_pfx}.{fsusie_prefix}*.{export_suffix}"))
stop_if(len(_rds_files) == 0, f"No RDS files matched: {export_path}/{_pfx}.{fsusie_prefix}*.{export_suffix}")

input: _rds_files, group_by = 1
output: f"{cwd}/summary/{_input:bnn}.top_loci.bed.gz"
task: trunk_workers = 1, walltime = '1h', trunk_size = job_size, mem = '16G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", container = container, stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout'
    library(tidyverse)
    library(data.table)
    lfsr_thres <- as.numeric(${lfsr_thres})
    #function to filter lfsr
    filter_lfsr <- function(df, lfsr_thres){
        df_na <- df %>% filter(is.na(lfsr)|lfsr == '') %>% mutate(event_ID = NA)
        df %>%
            mutate(
              lfsr_list = str_split(lfsr, ";") %>% map(as.numeric),
              effect_list = str_split(conditional_effect, ";"),
              event_list = str_split(event_ID, ";"),
              valid_indices = map(lfsr_list, ~ which(.x < lfsr_thres))
            ) %>%
            rowwise() %>%
            mutate(
              event_ID = paste(event_list[valid_indices], collapse = ";"),
              conditional_effect = paste(effect_list[valid_indices], collapse = ";"),
              lfsr = paste(lfsr_list[valid_indices], collapse = ";")
            ) %>%
            filter(event_ID != "") %>%
            select(any_of(colnames(file_top_loci_rename))) %>%
            ungroup() %>% rbind(df, df_na)
        }
    
    ## main
    res <- readRDS("${_input}")
    file_top_loci <- lapply(names(res), function(gene) {
        lapply(names(res[[gene]]), function(context) {
          lapply(c("top_loci"${',"preset_top_loci"' if preset_top_loci else ''}), function(method) {
            if ("RSS_QC_RAISS_imputed" %in% names(res[[gene]][[context]])) { # that is for rss output
              temp_df <- res[[gene]][[context]][['RSS_QC_RAISS_imputed']][[method]]
            } else {
              temp_df <- res[[gene]][[context]][[method]]
            }
            if (!is.null(temp_df)) {
              mutate(temp_df, study = context, method = method, region = gene)
            } else {
              NULL  
            }
          })
        }) %>% bind_rows()  
    }) %>% bind_rows() 

    # Check if file_top_loci is not empty before processing
    if (!is.null(file_top_loci) && nrow(file_top_loci) > 0) {
        file_top_loci_rename <- file_top_loci %>% 
          # Select necessary columns from the original table
          select(any_of(c(
            "variant_id", "pip", 
            "cs_coverage_0.95_min_corr","cs_coverage_0.7_min_corr", "cs_coverage_0.5_min_corr",
            "cs_coverage_0.95_min_corr_0.8","cs_coverage_0.7_min_corr_0.8", "cs_coverage_0.5_min_corr_0.8",
            "cs_coverage_0.95_min_corr_0.5","cs_coverage_0.7_min_corr_0.5", "cs_coverage_0.5_min_corr_0.5",
            "study", "method", "region", "conditional_effect", "lfsr", "trait", "maf", "betahat", "sebetahat", "z"
            
          ))) %>% 
          mutate(
            # Split variant_id into separate components
            chr = str_split(variant_id, ":", simplify = TRUE)[, 1],  # Chromosome
            pos = str_split(variant_id, ":", simplify = TRUE)[, 2],  # Position
            a1  = str_split(variant_id, ":", simplify = TRUE)[, 4],  # Allele 1
            a2  = str_split(variant_id, ":", simplify = TRUE)[, 3],  # Allele 2
            # Rename and duplicate columns as required
            variant_ID = variant_id,
            gene_ID    = region,
            event_ID   = study,
            PIP = pip,
            MAF = maf,
            betahat = betahat,
            sebetahat = sebetahat,
            z = if ("z" %in% colnames(.)) z else NA,
            
          )  %>%
          {
            if (any(c("cs_coverage_0.95_min_corr_0.8", "cs_coverage_0.7_min_corr_0.8", "cs_coverage_0.5_min_corr_0.8", 
                      "cs_coverage_0.95_min_corr_0.5", "cs_coverage_0.7_min_corr_0.5", "cs_coverage_0.5_min_corr_0.5", 
                      "cs_coverage_0.95_min_corr", "cs_coverage_0.7_min_corr", "cs_coverage_0.5_min_corr") %in% colnames(.))) {
              mutate(.,
                cs_coverage_0.95 = if ("cs_coverage_0.95_min_corr_0.8" %in% colnames(.)) get("cs_coverage_0.95_min_corr_0.8") else 
                                   if ("cs_coverage_0.95_min_corr" %in% colnames(.)) get("cs_coverage_0.95_min_corr") else NA,

                cs_coverage_0.7 = if ("cs_coverage_0.7_min_corr_0.8" %in% colnames(.)) get("cs_coverage_0.7_min_corr_0.8") else 
                                  if ("cs_coverage_0.7_min_corr" %in% colnames(.)) get("cs_coverage_0.7_min_corr") else NA,

                cs_coverage_0.5 = if ("cs_coverage_0.5_min_corr_0.8" %in% colnames(.)) get("cs_coverage_0.5_min_corr_0.8") else 
                                  if ("cs_coverage_0.5_min_corr" %in% colnames(.)) get("cs_coverage_0.5_min_corr") else NA,

                cs_coverage_0.95_purity0.5 = if ("cs_coverage_0.95_min_corr_0.5" %in% colnames(.)) get("cs_coverage_0.95_min_corr_0.5") else NA,
                cs_coverage_0.7_purity0.5 = if ("cs_coverage_0.7_min_corr_0.5" %in% colnames(.)) get("cs_coverage_0.7_min_corr_0.5") else NA,
                cs_coverage_0.5_purity0.5 = if ("cs_coverage_0.5_min_corr_0.5" %in% colnames(.)) get("cs_coverage_0.5_min_corr_0.5") else NA
              )
            } else .
          } %>%
            # Conditionally add lfsr if the column exists
          mutate(region_id = basename("${_input}") %>% str_split(., '[.]', simplify = T) %>% .[,2])  %>% 
          # Conditionally add lfsr if the column exists
          { if ("lfsr" %in% colnames(.)) mutate(., lfsr = lfsr, context = str_split(event_ID, ':', simplify = T) %>% .[,1]) else . } %>% 
          { if ("lfsr" %in% colnames(.) & lfsr_thres != 0) mutate(., event_ID = trait) else . } %>% 
          # Select and order the final set of columns
          select(any_of(c("chr", "pos", "a1", "a2", "variant_ID", "MAF", "betahat", "sebetahat", "z", "gene_ID", "event_ID",
                          "cs_coverage_0.95", "cs_coverage_0.7", "cs_coverage_0.5",
                          "cs_coverage_0.95_purity0.5", "cs_coverage_0.7_purity0.5", "cs_coverage_0.5_purity0.5", "PIP", "conditional_effect", 
                          "lfsr", "region_id", "context"${'",method"' if preset_top_loci else ''}))) %>% 
          # Sort by chromosome and position (convert pos to numeric if necessary)
          arrange(chr, as.numeric(pos))
        # Check if "lfsr" is in column names
        if ("lfsr" %in% colnames(file_top_loci_rename) & lfsr_thres != 0) file_top_loci_rename <- filter_lfsr(file_top_loci_rename, lfsr_thres = lfsr_thres)
        fwrite(file_top_loci_rename,"${_output}")
    } else {
      # Create empty output if input is empty or NULL
      cat("Input data is empty. Creating empty output file.\n")

      # Create an empty data frame with the expected column structure
      empty_df <- data.frame(
        chr = character(0),
        pos = character(0)
      )

      # Write empty file
        fwrite(empty_df,"${_output}")
    }

[export_top_loci_3]
# Combine per-region .top_loci.bed.gz files into one study-level file.
# Public-facing name: {name}.{qtl_type}[.{variant_tag}].top_loci.bed.gz
parameter: qtl_type = str
parameter: variant_tag = ''
# 'auto' (default): skip step 3 if region_name or region_list is set; 'yes': force run; 'no': force skip.
parameter: combine = 'auto'
# Re-declared so this step can detect step-1 subset filters.
parameter: region_list = path()
parameter: region_name = []
stop_if(combine not in ('auto', 'yes', 'no'), f"--combine must be auto|yes|no, got {combine}")
_region_filter_set = (region_list.is_file() or len(region_name) > 0)
_do_combine = (combine == 'yes') or (combine == 'auto' and not _region_filter_set)
skip_if(not _do_combine, f"Skipping combine: combine={combine}, region_filter_set={_region_filter_set}")
_tag = f".{variant_tag}" if variant_tag else ""
input: group_by = 'all'
output: f"{cwd}/summary/{name}.{qtl_type}{_tag}.top_loci.bed.gz"
task: trunk_workers = 1, walltime = '2h', trunk_size = 1, mem = '8G', cores = 1, tags = f'{_output:bn}'
bash: expand = "${ }", container = container, stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', entrypoint=entrypoint
    set -euo pipefail
    mkdir -p "$(dirname "${_output}")"
    GZ=$(command -v bgzip || echo gzip)
    {
        # header from first non-empty file
        for f in ${_input}; do
            if [ "$(zcat -f "$f" 2>/dev/null | wc -l)" -ge 2 ]; then
                zcat "$f" | head -1
                break
            fi
        done
        # data rows from every non-empty file
        for f in ${_input}; do
            if [ "$(zcat -f "$f" 2>/dev/null | wc -l)" -ge 2 ]; then
                zcat "$f" | tail -n +2
            fi
        done
    } | $GZ > ${_output}
[overlap_qtl_gwas_1]
parameter: per_chunk = 100
parameter: gwas_meta_path = path()
parameter: qtl_meta_path = path()
parameter: gwas_file_path = ''
parameter: qtl_file_path = ''
# Optional: if a region list is provide the analysis will be focused on provided region. 
# The LAST column of this list will contain the ID of regions to focus on
# Otherwise, all regions with both genotype and phenotype files will be analyzed
parameter: region_list = path()
# Optional: if a region name is provided 
# the analysis would be focused on the union of provides region list and region names
parameter: region_name = []
import pandas as pd
import numpy as np
from pathlib import Path

if qtl_file_path == '':
    qtl_file_path = qtl_meta_path.parent
if gwas_file_path == '':
    gwas_file_path = gwas_meta_path.parent
    
# Load the data, suppressing messages is not typically done in pandas as it does not inherently output messages when loading files
gwas_meta = pd.read_csv(gwas_meta_path, sep='\t', low_memory=False)
gwas_meta = gwas_meta[gwas_meta['conditions_top_loci'].notna()]

qtl_meta = pd.read_csv(qtl_meta_path, sep='\t', low_memory=False)
qtl_meta = qtl_meta[qtl_meta['conditions_top_loci'].notna()]

# Filter and mutate operations, translated to pandas
gwas_meta['combined_data_toploci'] = gwas_meta.apply(lambda row: row['combined_data'] if pd.notnull(row['conditions_top_loci']) else pd.NA, axis=1)

region_ids=[]
# If region_list is provided, read the file and extract IDs
if not region_list.is_dir():
    if region_list.is_file():
        region_list_df = pd.read_csv(region_list, sep='\t', header=None, comment = "#")
        region_ids = region_list_df.iloc[:, -1].unique()  # Extracting the last column for IDs
    else:
        raise ValueError("The region_list path provided is not a file.")
        
if len(region_name) > 0:
    region_ids = list(set(region_ids).union(set(region_name)))
    
if len(region_ids) > 0:
    qtl_meta = qtl_meta[qtl_meta['region_id'].isin(region_ids)]
    
def group_by_region(lst, partition):
    # from itertools import accumulate
    # partition = [len(x) for x in partition]
    # Compute the cumulative sums once
    # cumsum_vector = list(accumulate(partition))
    # Use slicing based on the cumulative sums
    # return [lst[(cumsum_vector[i-1] if i > 0 else 0):cumsum_vector[i]] for i in range(len(partition))]
    return partition

grouped_gwas_meta = {k: v for k, v in gwas_meta.groupby('#chr')}
def check_overlap(gene_row, grouped_gwas_meta):
    chr_group = gene_row['#chr']
    if chr_group in grouped_gwas_meta:
        block_region = grouped_gwas_meta[chr_group]
        overlaps = block_region[
            (block_region['start'] <= gene_row['end']) &
            (block_region['end'] >= gene_row['start'])
        ]
        if not overlaps.empty:
            return ','.join(overlaps['combined_data_toploci'].astype(str))
    return pd.NA

stop_if(len(qtl_meta) == 0, f'No file left for analysis ')

qtl_meta_cand = qtl_meta.apply(lambda row: pd.Series({
    'gwas_file': check_overlap(row, grouped_gwas_meta)
}), axis=1)

# Concatenate the new columns to the original qtl_meta DataFrame
qtl_meta_cand = pd.concat([qtl_meta, qtl_meta_cand], axis=1)
qtl_meta_filtered = qtl_meta_cand[qtl_meta_cand['gwas_file'].notna()]
qtl_meta_filtered = qtl_meta_filtered.dropna(subset=['gwas_file'])


regional_data = {
    'meta': [(row['#chr'], row['start'], row['end'], row['region_id'], row['gwas_file'].split(',')) for _, row in qtl_meta_filtered.iterrows()],
    'qtl_data': [f"{qtl_file_path}/{row['combined_data']}" for _, row in qtl_meta_filtered.iterrows()]
}

meta_info = regional_data['meta']
stop_if(len(regional_data['qtl_data']) == 0, f'No file left for analysis ')

input: regional_data["qtl_data"], group_by = lambda x: group_by_region(x, regional_data["qtl_data"]), group_with = "meta_info"
output: f"{cwd}/gwas_qtl/cache/{name}_gwas_batch_meta_{_meta_info[3]}.tsv"
task: trunk_workers = 1, walltime = '1h', trunk_size = job_size, mem = '16G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", container = container, stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout'
    library(tidyverse)
    library(plyr)
    library(data.table)

    # Function to add 'chr' in variants
    add_chr_prefix <- function(var) {
      if (any(grepl("chr", var))) {
        var <- var
      } else {
        var <- paste0("chr", var)
      }
      return(var)
    }

    # Function to check if a dataframe has rows
    has_rows <- function(df) {
      !is.null(df) && nrow(df) > 0
    }


    extract_top_loci <- function(res, include_method = FALSE) {
      all_top_loci <- lapply(names(res), function(region) {
        lapply(names(res[[region]]), function(study) {
          if (include_method) {
            method_results <- lapply(names(res[[region]][[study]]), function(method) {
              top_loci <- NULL
              if (!is.null(res[[region]][[study]][[method]]$top_loci) && nrow(res[[region]][[study]][[method]]$top_loci) > 0) {
                top_loci <- mutate(res[[region]][[study]][[method]]$top_loci, study = study, method = method, region = region)
              }
              return(top_loci)
            })
            return(bind_rows(method_results))
          } else {
            top_loci <- list()
            if (!is.null(res[[region]][[study]]$top_loci) && nrow(res[[region]][[study]]$top_loci) > 0) {
              top_loci[[length(top_loci) + 1]] <- mutate(res[[region]][[study]]$top_loci, study = study, region = region, method = 'top_loci')
            }
            if (!is.null(res[[region]][[study]]$preset_top_loci) && nrow(res[[region]][[study]]$preset_top_loci) > 0) {
              top_loci[[length(top_loci) + 1]] <- mutate(res[[region]][[study]]$preset_top_loci, study = study, region = region, method = 'preset_top_loci')
            }
            return(bind_rows(top_loci))
          }
        })
      }) %>% bind_rows() %>% na.omit()

      return(all_top_loci)
    }

  
    # load data 
    qtl_file = c(${",".join(['"%s"' % x.absolute() for x in _input])})
     # Extract info from each RDS file
    gwas_files = c(${",".join('"%s"' % x for x in _meta_info[4])}) %>% paste0('${gwas_file_path}','/',.)
    # Process GWAS files
    gwas_all_top_loci <- do.call(rbind.fill, lapply(gwas_files, function(file) {
      res <- readRDS(file)
      gwas_all_top_loci <- extract_top_loci(res, include_method = TRUE) 
    }))
    if(!is.null(gwas_all_top_loci) && nrow(gwas_all_top_loci) > 0){# fixme: could remove this judge if we get a solid enough meta
      gwas_all_top_loci <- gwas_all_top_loci%>% select(-c('z'))

      # Process QTL file
      qtl <- readRDS(qtl_file)
      qtl_all_top_loci <- extract_top_loci(qtl) 
      if(!is.null(qtl_all_top_loci) && nrow(qtl_all_top_loci) > 0){# fixme: could remove this judge if we get a solid enough meta
        # qtl_all_top_loci <- qtl_all_top_loci%>% select(-c('betahat','sebetahat','maf'))

        cs_cal <- c('cs_coverage_0.95','cs_coverage_0.7','cs_coverage_0.5')

        qtl_all_var <- qtl_all_top_loci %>%
            #filter(rowSums(.[,cs_cal]) > 0 ) %>% #fixme
            pull(variant_id)

        gwas_all_var <- gwas_all_top_loci %>%
            #filter(rowSums(.[,cs_cal]) > 0 ) %>% #fixme
            pull(variant_id)

        gwas_all_var <- if(any(grepl("chr", qtl_all_var))) add_chr_prefix(gwas_all_var) else gsub("chr", "", gwas_all_var)
        gwas_all_top_loci$variant_id <- gwas_all_var
        # since both qtl and gwas haven mapped to geno ref in exporting, here we intersect them directly
        int_var <- intersect(qtl_all_var, gwas_all_var)
        if(length(int_var) > 0){
            gwas_all_top_loci <- gwas_all_top_loci %>% filter(variant_id %in% int_var) # fixme: keep all gwas variants or intersected ones

            all_top_loci <- rbind.fill(gwas_all_top_loci, qtl_all_top_loci)
            fwrite(all_top_loci,  gsub('_gwas_batch_meta','_gwas_batch_export',${_output:r}))

            new_gwas <- split(gwas_all_top_loci, gwas_all_top_loci$study)
            new_gwas <- lapply(new_gwas, function(df) {
              split(df, df$method)
            })

            qtl[[1]] <- c(qtl[[1]], new_gwas)
            new_qtl_path <-  paste0(${_output:ddr},"/",gsub(".rds",".overlapped.gwas.rds",basename(qtl_file)))
            saveRDS(qtl, new_qtl_path)
  
            block_top_loci = gwas_all_top_loci$region %>% unique %>% paste(., collapse = ',')
            final_combined_data = new_qtl_path %>% basename
        } else {block_top_loci = final_combined_data = NA} # fixme: could remove this judge if we get a solid enough meta
      } else {block_top_loci = final_combined_data = NA} # fixme: could remove this judge if we get a solid enough meta
    } else {
        block_top_loci = final_combined_data = NA
    }
 
    qtl_meta <- suppressMessages(read_delim('${qtl_meta_path}'))
    qtl_meta <- qtl_meta %>% filter(region_id == '${_meta_info[3]}')  %>% mutate(block_top_loci = block_top_loci,
                                                     final_combined_data = final_combined_data)
    fwrite(qtl_meta, ${_output:r})
[overlap_qtl_gwas_2]
# set it as True if you only want run first step (running parallel) and combine them manually after all finished 
parameter: step1_only = False
skip_if(step1_only)
input: group_by = 'all'
output: f"{cwd}/{name}.overlapped.gwas.tsv"
task: trunk_workers = 1, walltime = '1h', trunk_size = job_size, mem = '16G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", container = container, stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout'
    library(data.table)
    exp_path <- ${_input[0]:adr}
    meta_files <- c(${",".join(['"%s"' % x.absolute() for x in _input])})
    exp_files <- list.files(exp_path, "_gwas_batch_export", full.names = T)
    meta_list <- exp_list <- list()
    meta_combined <- rbindlist(lapply(meta_files, fread), fill = TRUE)
    exp_combined <- rbindlist(lapply(exp_files, fread), fill = TRUE)
    fwrite(exp_combined, gsub("tsv","export.csv.gz","${_output}"))
    fwrite(meta_combined, "${_output}", sep = '\t')
  
#bash: expand = "${ }", container = container, stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout'
    # rm -rf ${_input[0]:adr}