MASH analysis pipeline with posterior computation#

This notebook applies a fitted MASH model (from mash_fit) to compute posterior quantities (posterior mean, sd, lfsr, etc.) for a list of gene-SNP effect-size chunks, and derives downstream contrast and feature-score summaries. It is designed to run posterior computation in parallel over many input chunks.

Prerequisites#

Requires the MASH model from mash_fit (protocol_example.mash_model.rds).

Description#

For each input chunk (a list of matrices bhat/sbhat/Z), the posterior workflow loads the MASH model and calls mash_compute_posterior_matrices. Additional workflows compute posterior contrasts between conditions and feature-level scores (meta, fine-mapped, n-significant, and p-value pairs) from the contrast results.

Input#

  • --mash-model: a fitted MASH model RDS (the bare mash object), e.g. produced by mash_fit.

  • --analysis-units: a text file whose first column lists paths to posterior-input RDS chunks (one region per line).

  • --posterior-input / --posterior-vhat-files: posterior input chunk(s) (each a list with bhat/sbhat/Z matrices and snp) and matching residual-variance (vhat) matrices.

  • Downstream workflows additionally take --posterior-file and --sum-file produced by the posterior step.

Output#

  • Per-chunk posterior RDS files (*.posterior.rds) containing PosteriorMean, PosteriorSD, lfsr, lfdr, NegativeProb, PosteriorCov (effects x conditions).

  • A manifest list (mash_output_list_all) of the per-chunk posterior files.

  • Contrast and feature-score summary tables from the downstream workflows.

Steps#

The posterior workflow is the entry point; the remaining workflows derive contrasts and feature scores from its outputs. Toy inputs use the protocol_example.* prefix.

Step 1. Compute MASH posteriors for each input chunk listed in the analysis-units file:

Timing (toy chr22 dataset):

  • mash_posterior: ~30 sec

sos run pipeline/mash_posterior.ipynb posterior \
    --cwd output/mash_posterior \
    --analysis-units input/finemapping/protocol_example.analysis_units.txt \
    --mash-model input/mash/protocol_example.mash_model.rds \
    --posterior-vhat-files input/twas/protocol_example.posterior_vhat.rds \
    --data-table-name strong \
    --exclude-condition 1 3

Step 2. Compute posterior contrasts between conditions for the sliced data:

sos run pipeline/mash_posterior.ipynb mash_posterior_contrast \
    --cwd output/mash_posterior \
    --analysis-units input/finemapping/protocol_example.analysis_units.txt \
    --mash-model input/mash/protocol_example.mash_model.rds \
    --posterior-input input/twas/protocol_example.posterior_input.rds

Step 3. Plot the posterior contrast results:

sos run pipeline/mash_posterior.ipynb posterior_cntrast_plot \
    --cwd output/mash_posterior \
    --analysis-units input/finemapping/protocol_example.analysis_units.txt

Step 4. Compute meta feature scores from the contrast results:

sos run pipeline/mash_posterior.ipynb feature_score_meta \
    --cwd output/mash_posterior \
    --analysis-units input/finemapping/protocol_example.analysis_units.txt \
    --posterior-file input/protocol_example.posterior.rds \
    --sum-file input/protocol_example.sumstats.rds

Step 5. Compute feature scores from contrast results using fine-mapped eQTL/pQTL:

sos run pipeline/mash_posterior.ipynb feature_score_finemap \
    --cwd output/mash_posterior \
    --analysis-units input/finemapping/protocol_example.analysis_units.txt \
    --posterior-file input/protocol_example.posterior.rds \
    --sum-file input/protocol_example.sumstats.rds

Step 6. Compute n-significant feature scores from contrast results:

sos run pipeline/mash_posterior.ipynb feature_score_nsig \
    --cwd output/mash_posterior \
    --analysis-units input/finemapping/protocol_example.analysis_units.txt \
    --posterior-file input/protocol_example.posterior.rds \
    --sum-file input/protocol_example.sumstats.rds

Step 7. Compute p-value-pair feature scores from contrast results:

sos run pipeline/mash_posterior.ipynb feature_pval_pair \
    --cwd output/mash_posterior \
    --analysis-units input/finemapping/protocol_example.analysis_units.txt \
    --posterior-file input/protocol_example.posterior.rds \
    --sum-file input/protocol_example.sumstats.rds

Command interface#

sos run pipeline/mash_posterior.ipynb -h

Workflow implementation#

[global]
import os
# Work directory & output directory
parameter: cwd = path('./')
# The filename prefix for output data
parameter: name="test"
parameter: cells = ["Ast","Exc","Inh","Mic","OPC","Oli","DLPFC_pQTL"]#order is important
parameter: group1 = []
parameter: group2 = []
parameter: group3 = []
parameter: job_size = 1
parameter: container = ''
# handle N = per_chunk data-set in one job
parameter: per_chunk = 1
###add for test
parameter: output_prefix = ''
parameter: output_suffix = 'all'
# Exchangable effect (EE) or exchangable z-scores (EZ)
parameter: effect_model = 'EE'
# Identifier of $\hat{V}$ estimate file
# Options are "identity", "simple", "mle", "vhat_corshrink_xcondition", "vhat_simple_specific"
parameter: vhat = 'simple'
parameter: data = path("fastqtl_to_mash_output/FastQTLSumStats.mash.rds")
parameter: p_cut=0.00001

import pandas as pd
data = data.absolute()
cwd = cwd.absolute()

if len(output_prefix) == 0:
    output_prefix = f"{data:bn}"
vhat_data = file_target(f"{cwd:a}/{output_prefix}.{effect_model}.V_{vhat}.rds")
mash_model = file_target(f"{cwd:a}/{output_prefix}.{effect_model}.V_{vhat}.mash_model.rds")
posterior_list = file_target(f"{cwd:a}/{output_prefix}.{effect_model}.{output_suffix}.posterior_list")

def sort_uniq(seq):
    seen = set()
    return [x for x in seq if not (x in seen or seen.add(x))]

Posterior results#

Posterior#

take all the 13K genes, if slice_method = True for those with missing conditions we just drop those corresponding rows and cols in the prior model

# Apply posterior calculations with slice NA and set NaN/Inf 0/1E3, output_posterior_cov = T 
[posterior_1]
parameter: analysis_units = path
regions = [x.replace("\"","").strip().split() for x in open(analysis_units).readlines() if x.strip() and not x.strip().startswith('#')]
parameter: mash_model = path()
mash_model = mash_model.absolute()
parameter: posterior_input = [path(x[0]) for x in regions]
parameter: posterior_vhat_files = paths()
# eg, if data is saved in R list as data$strong, then
# when you specify `--data-table-name strong` it will read the data as
# readRDS('{_input:r}')$strong
parameter: data_table_name = ''
parameter: bhat_table_name = 'bhat'
parameter: shat_table_name = 'sbhat'
parameter: per_chunk = '100'
##  conditions can be excluded if needs arise. If nothing to exclude keep the default 0
parameter: exclude_condition = ["1","3"]

parameter: slice_method = False 
skip_if(len(posterior_input) == 0, msg = "No posterior input data to compute on. Please specify it using --posterior-input.")
fail_if(len(posterior_vhat_files) > 1 and len(posterior_vhat_files) != len(posterior_input), msg = "length of --posterior-input and --posterior-vhat-files do not agree.")
for p in posterior_input:
    fail_if(not p.is_file(), msg = f'Cannot find posterior input file ``{p}``')

input: posterior_input, group_by = per_chunk
output: f"{cwd}/cache/mash_output_list_{_index+1}"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime,  mem = mem, tags = f'{step_name}_{_output:bn}'  
R: expand = "${ }", workdir = cwd, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout", container = container
    library(mashr)
    library(dplyr)
    library(stringr)
    #library(ttt)
    handle_nan_etc = function(x) {
      x$bhat[which(is.nan(x$bhat))] = 0
      x$sbhat[which(is.nan(x$sbhat) | is.infinite(x$sbhat))] = 1E3
      return(x)
    }
    # Slice matrices
    slice_and_update_data <- function(data, vhat, snps, samples) {
        data$bhat <- data$bhat[snps, samples] %>% as.matrix
        data$sbhat <- data$sbhat[snps, samples] %>% as.matrix
        data$Z <- data$Z[snps, samples] %>% as.matrix
        vhat <- vhat[samples, samples] %>% as.matrix

        # Filter SNPs and update column names
        data$snp <- data$snp[data$snp %in% snps]
        colnames(data$bhat) <- colnames(data$sbhat) <- colnames(data$Z) <- colnames(vhat) <- samples

        return(list(data = data, vhat = vhat))
    }
  
    # Remove covariance matrices that are not needed
    remove_unnecessary_cov_matrices <- function(cov, all_samples, samples) {
      unwanted_samples <- setdiff(all.samples, samples)
      for (d in names(cov)) {
        if (d %in% unwanted_samples || d %in% paste0("ED_", unwanted_samples)) {
          cov[[d]] <- NULL
        }
      }
      return(cov)
    }

    # Update or adjust the covariance matrices
    adjust_cov_matrices <- function(cov, samples) {
      for (d in names(cov)) {
        if (d %in% samples) {
          cov[[d]] <- matrix(0, length(samples), length(samples))
          cov[[d]][which(samples == d), which(samples == d)] <- 1
        } else if (d == "identity") {
          cov[[d]] <- matrix(0, length(samples), length(samples))
          cov[[d]][1, 1] <- 1  
        } else if (is.null(colnames(cov[[d]]))) {
          cov[[d]] <- cov[[d]][1:length(samples), 1:length(samples)]
        } else {
          cov[[d]] <- cov[[d]][samples, samples]
        }
        cov[[d]] <- as.matrix(cov[[d]])
      }
      return(cov)
    }

    # Main function to update the covariance in the MASH model
    update_mash_model_cov <- function(mash_model, all_samples, samples) {
      cov <- mash_model$fitted_g$Ulist
      # Remove matrices that are not required
      cov <- remove_unnecessary_cov_matrices(cov, all_samples, samples)
  
      # Update or reshape the covariance matrices
      cov <- adjust_cov_matrices(cov, samples)
  
      # Update the covariance matrices in the model
      mash_model$fitted_g$Ulist <- cov
  
      # Update the 'pi' attribute of the model
      unwanted_samples <- setdiff(all.samples, samples)
      for (s in unwanted_samples) {
        mash_model$fitted_g$pi <- mash_model$fitted_g$pi[-grep(s, names(mash_model$fitted_g$pi))]
      }

      return(mash_model)
    }
  
    outlist = data.frame()
    for (f in c(${_input:r,})) try({

      data = readRDS(f)${('$' + data_table_name) if data_table_name else ''}
      data <- handle_nan_etc(data)

      if(c(${",".join(exclude_condition)})[1] > 0 ){
        message(paste("Excluding condition ${exclude_condition} from the analysis"))
        data$bhat = data$bhat[,-c(${",".join(exclude_condition)})]
        data$sbhat = data$sbhat[,-c(${",".join(exclude_condition)})]
        data$Z = data$Z[,-c(${",".join(exclude_condition)})]
      }

      vhat = readRDS("${vhat_data if len(posterior_vhat_files) == 0 else posterior_vhat_files[_index]}")
      mash_model <- readRDS("${mash_model}")
  
      slice_method <- ${'TRUE' if slice_method else 'FALSE'}
      if(slice_method){
        # All additional operations from the second script go here

        all.samples <- colnames(data$bhat)
        all.snps <- rownames(data$bhat)    

        #remove the rows and cols containing NA
        na.test <- data$bhat %>% as.data.frame %>% select_if(~any(!is.na(.))) %>% na.omit %>% as.matrix

        #recording meaningful rows and cols
        samples <- colnames(na.test)
        snps <- rownames(na.test)

        if(length(all.snps)!=length(snps) | length(all.samples)!=length(samples)){
            # slice data matrix
            data <- slice_and_update_data(data, vhat, snps, samples)

            if(length(all.samples)!=length(samples)){
                ##slice the prior
                mash_model <- update_mash_model_cov(mash_model, all_samples, samples)
            }
        }
      }

      mash_data = mash_set_data(data$${bhat_table_name}, Shat=data$${shat_table_name}, alpha=${1 if effect_model == 'EZ' else 0}, V=vhat, zero_Bhat_Shat_reset = 1E3)
      mash_output = mash_compute_posterior_matrices(mash_model, mash_data, output_posterior_cov=TRUE)
      mash_output$snps = data$snps
      samplename <- str_split(f, "/", simplify = T) %>% .[length(.)] %>% gsub('.rds', '', .)
      saveRDS(mash_output, paste0("${_output:d}", "/", samplename, ".posterior.rds"))
      outlist <- rbind(outlist, paste0("${_output:d}", "/", samplename, ".posterior.rds"))

    })
    write.table(outlist, ${_output:r}, col.names=F, row.names=F, quote=F)
[posterior_2]
input: group_by = "all"
output:f"{cwd}/mash_output_list_{output_suffix}"
bash: expand ='${ }', workdir = cwd, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout"
     cd ${_input[0]:d}
     cat mash_output_list_*[0-9] >> posterior_file_list
     awk -F 'cis_long_table.' '{print $2}' posterior_file_list| awk -F '.posterior.rds' '{print $1}'|paste - posterior_file_list > ${_output:r}
     rm posterior_file_list

Posterior contrast#

  • Add group in this module

    e.g. add MiGA-Microglia-scRNA data to this analysis to supplement the Mic cell count shortage, but the MiGA data comes from four sites with four datasets, and here I handled it in such a way that the different Mic’s were always in the same state during the deviation contrast analysis. If Mic is the one be compared, each Mic sample would be set as (n_populations - 1)/n_Mic instead of n_populations

# perform mash posterior contrast for sliced data
[mash_posterior_contrast_1]
parameter: grouping_recipe = ""
parameter: posterior_file = path
parameter: sum_file = path

# Extract data from posterior_file
paths_posterior = [x.replace("\"","").strip().split()[1] for x in open(posterior_file).readlines() if x.strip() and not x.strip().startswith('#')]
# Create a dictionary from sum_file for quick lookup
dict_sum = dict([(x.replace("\"","").strip().split()[0], x.replace("\"","").strip().split()[1]) for x in open(sum_file).readlines() if x.strip() and not x.strip().startswith('#')])
# Use genes from posterior_file to fetch corresponding paths from sum_file
paths_sum = [dict_sum[x.replace("\"","").strip().split()[0]] for x in open(posterior_file).readlines() if x.strip() and not x.strip().startswith('#')]

input: paths_posterior, paired_with='paths_sum', group_by=1
output: f"{cwd}/{_input:bnn}_posterior_contrast.rds"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime,  mem = mem, tags = f'{step_name}_{_output:bn}'  
R: expand = "${ }", workdir = cwd, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout", container = container
    # Load necessary libraries
    library(mashr)
    library(RhpcBLASctl)
    library(magrittr)
    library(tidyverse)
    #library(ttt)

    # Set number of threads for BLAS operations
    blas_set_num_threads(1)

    # Create a function for pairwise contrast columns
    MakePairwiseContrastCols <- function(contrast_left, orig_vector) {
        orig_vector[contrast_left[1]] <- 1
        orig_vector[contrast_left[2]] <- -1
        orig_vector
    }

    # Function to fit contrast data
    FitContrast <- function(index, orig_mean, posterior_mean, posterior_vcov) {
        population_names <- colnames(posterior_mean) %>% str_remove_all("BETA_")

        orig_mean_vector <- orig_mean[index,]
        names(orig_mean_vector) <- population_names
        orig_mean_nonzero <- as.vector(orig_mean_vector != 0)
        orig_mean_tested <- names(orig_mean_vector[orig_mean_nonzero])
        
        if(length(orig_mean_tested)>0){
            n_populations <- length(orig_mean_tested)

            pairwise_vector <- rep(0, n_populations)
            names(pairwise_vector) <- orig_mean_tested

            grouping <- grouping_all[orig_mean_tested]
            if (n_populations > 1) {
                if (n_populations > 2) {
                    #####1. deviation contrast
                    deviation_contrasts <- rep(-1, n_populations^2) %>% matrix(nrow = n_populations, ncol = n_populations)
                    diag(deviation_contrasts) <- n_populations - 1
                    rownames(deviation_contrasts) <- orig_mean_tested
                    colnames(deviation_contrasts) <- orig_mean_tested
                    deviation_contrasts_tested <- deviation_contrasts[, orig_mean_tested]

                    unique_groups <- unique(grouping)
                    for (grp in unique_groups[unique_groups > 0]) {
                       #same celltype (e.g. MIC) with different populations would get 1/n for their weight,
                        diag(deviation_contrasts_tested)[grouping == grp] <- (n_populations - 1) / length(grouping[grouping == grp])
                        deviation_contrasts_tested[grouping == grp, grouping == grp] <- (n_populations - 1) / length(grouping[grouping == grp])
                    }

                    colnames(deviation_contrasts_tested) %<>% str_c("_deviation")

                    ####2. pairwise contrast
                    two_combn <- combn(orig_mean_tested, m = 2)
                    pairwise_names <- apply(two_combn, 2, str_c, collapse = "_vs_")
                    pairwise_contrast <- apply(two_combn, 2, MakePairwiseContrastCols, pairwise_vector)

                    colnames(pairwise_contrast) <- pairwise_names

                    # Create a new matrix to store the adjusted values
                    pairwise_contrast_new <- pairwise_contrast

                    # Loop through each column to archieve such goal: e.g.
                    # microglia populations would get 1/n_Mic for their weight,
                    # and Mic vs Mic would still be 1 vs -1 to estimate the internal difference among microglia datasets
                    for (col in colnames(pairwise_contrast)) {
                      # Split column names to get group names
                      groups <- strsplit(col, "_vs_")[[1]]

                      # Get the grouping values for the two groups
                      group_values <- grouping[names(grouping) %in% groups]

                      # Identify groups with non-zero grouping values
                      relevant_groups <- names(group_values[group_values > 0])

                      # Check if there are multiple distinct groups
                      if (length(unique(group_values)) > 1 && length(relevant_groups) > 0) {
                        distinct_groups <- unique(group_values[group_values > 0])

                        for (distinct_grp in distinct_groups) {
                          # Identify rows belonging to the current group
                          rows_in_group <- names(grouping[grouping == distinct_grp])

                          # Adjust the pairwise_contrast values for each row in the group
                          pairwise_contrast_new[rows_in_group, col] <- pairwise_contrast[rows_in_group[rows_in_group %in% groups], col] / length(rows_in_group)
                        }
                      }
                    }

                    # Replace the original matrix with the new one
                    pairwise_contrast <- pairwise_contrast_new

                    #### 3. combine them
                    contrast_design <- cbind(deviation_contrasts_tested / (n_populations - 1), pairwise_contrast)

                } else {
                    pairwise_vector[orig_mean_tested[1]] <- 1
                    pairwise_vector[orig_mean_tested[2]] <- -1
                    contrast_design <- as.matrix(pairwise_vector)
                    colnames(contrast_design) <- str_c(orig_mean_tested[1], "_vs_", orig_mean_tested[2])
                }

                posterior_mean_subset <- posterior_mean[index,]
                posterior_mean_subset2 <- posterior_mean_subset[orig_mean_tested]
                posterior_vcov_subset <- posterior_vcov[,,index]
                posterior_vcov_subset2 <- posterior_vcov_subset[orig_mean_tested,orig_mean_tested]

                contrast_diff <- t(contrast_design) %*% posterior_mean_subset2
                contrast_vcov <- t(contrast_design) %*% posterior_vcov_subset2 %*% contrast_design
                contrast_se <- diag(contrast_vcov) %>% sqrt

                contrast_p <- 2 * (1 - pnorm(abs(contrast_diff) / contrast_se))

                contrast_diff_df <- t(contrast_diff) %>% as_tibble
                colnames(contrast_diff_df) %<>% str_c("mean_contrast_", .)
                contrast_se_df <- t(contrast_se) %>% as_tibble
                colnames(contrast_se_df) %<>% str_c("se_contrast_", .)
                contrast_p_df <- t(contrast_p) %>% as_tibble
                colnames(contrast_p_df) %<>% str_c("p_contrast_", .)

                contrast_df <- bind_cols(contrast_diff_df, contrast_se_df, contrast_p_df)
            } else if(grouping[orig_mean_tested][1]!=grouping[orig_mean_tested][2]){
                contrast_vector <- rep(NA, length(population_names))
                names(contrast_vector) <- str_c("mean_contrast_", population_names, "_deviation")
                contrast_df <- t(contrast_vector) %>% as_tibble
            }
         
        contrast_df <- contrast_df %>% as.data.frame
        rownames(contrast_df) <- rownames(posterior_mean)[index]
        return(contrast_df)
        }
    
    }

    if(length("${cells}") > 0){
        # All the cells
        cells <- c("${", ".join(cells)}") %>% str_split(., ",", simplify = TRUE) %>% as.character 

        # Automatically set grouping categories based on the recipe, set0 for the celltypes without multiple populations
        grouping_all <- rep(0, length(cells))
        names(grouping_all) <- cells
        
  
        # Read groupings from the recipe
        if(length("${group1}") > 0){
            cell_groups <- list(
              ${"group1 = c(" + ", ".join(["'" + item + "'" for item in group1]) + ")" if len(group1) > 0 else ""} 
              ${", group2 = c(" + ", ".join(["'" + item + "'" for item in group2]) + ")" if len(group2) > 0 else ""} 
              ${", group3 = c(" + ", ".join(["'" + item + "'" for item in group3]) + ")" if len(group3) > 0 else ""}
            )
            if(!is.null(cell_groups)) {
              cell_groups <- map(cell_groups, ~str_split(.x, ",", simplify = TRUE) %>% as.character())
            }
        }
  
        if("${grouping_recipe}" != ""){
            cell_groups <- readLines("${grouping_recipe}")
            cell_groups <- lapply(cell_groups, function(g) strsplit(g, ",")[[1]])
        }

        if(!is.null(cell_groups)){
            for(i in seq_along(cell_groups)) {
              grouping_all[cell_groups[[i]]] <- i
            }
        }
    }

    
    # Read the data files
    orig_data <- read_rds("${_paths_sum[0]}")$bhat
    posterior_data <- read_rds("${_input}")
    posterior_mean <- posterior_data$PosteriorMean
    posterior_cov <- posterior_data$PosteriorCov

    # Align data and clean-up NaN values
    orig_data <- orig_data[, colnames(posterior_mean), drop = FALSE]
    orig_data[which(is.nan(orig_data))] <- 0 # Placeholder for NaNs

    # Apply the FitContrast function and consolidate results
    contrast_result <- map(1:nrow(posterior_mean), FitContrast, orig_data, posterior_mean, posterior_cov) %>% bind_rows %>%
        select(matches("mean_contrast.*deviation"), matches("mean_contrast.*_vs_"), 
               matches("se_contrast.*deviation"), matches("se_contrast.*_vs_"), 
               matches("p_contrast.*deviation"), matches("p_contrast.*_vs_"))
    #rownames(contrast_result) <- rownames(posterior_mean)

    write_rds(contrast_result,  ${_output:r})
# merge the contrast data with slice data
[mash_posterior_contrast_2]
input: group_by = "all"
output: f"{cwd}/posterior_sum.csv"
task: trunk_workers = 1, trunk_size = job_size, walltime = '24h',  mem = '10G', tags = f'{_output:bn}'  

R: expand = "${ }",stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout' 
        library(dplyr)
        library(tidyverse)
        library(ggnewscale)
        
        all.list <- stringr::str_split("${_input}", " ", simplify = T)
       
        p_cut <- ${p_cut} %>% as.numeric

        cells<-c("${",".join(cells)}")%>%str_split(.,",",simplify = T)%>%as.character #ggnewscale cannot use a specified order, I can not find a good way to order them by category for now
        conditions <-combn(cells, m = 2) %>%apply(., 2, str_c, collapse = "_vs_") 
        
        df <- matrix(ncol = length(conditions), nrow = 4) %>% as.data.frame()
        colnames(df) <- conditions
        rownames(df) <- c( "n_sig_snp", "n_snp","n_sig_feature","n_all_feature")
        for (con in conditions) {
            n.all.sig.snp <- n.all.snp <- n.all.sig.feature <- n.all.feature <- 0

            for (i in 1:length(all.list)) {
                tmp <- readRDS(all.list[i])
                p.mtx <- tmp %>% select(matches("p_contrast.*_vs_"))
                p.mtx.con <- p.mtx %>% select(matches(con))
                # print(n.sig.snp)
                if(ncol(p.mtx.con)>0){
                    p.mtx.con<-na.omit(p.mtx.con)
                    n.sig.snp <- sum(p.mtx.con < p_cut)
                    n.snp <- nrow(p.mtx.con)
                } else {
                    n.sig.snp <- n.snp <-0
                }
                # print(n.snp)
                n.sig.feature <- ifelse(n.sig.snp > 0, 1, 0)
                n.feature<- ifelse (n.snp > 0 , 1, 0)

                n.all.sig.snp <- n.sig.snp + n.all.sig.snp
                n.all.snp <- n.all.snp + n.snp
                n.all.sig.feature <- n.all.sig.feature + n.sig.feature
                n.all.feature <- n.all.feature + n.feature
            }
            df[, con] <- c( n.all.sig.snp,n.all.snp, n.all.sig.feature,n.all.feature)
        }
        write.csv(df, "${_output}")
# plot contrast result
[mash_posterior_contrast_3, posterior_cntrast_plot]
input: group_by = "all"
output: f"{cwd}/posterior_sum.png"
task: trunk_workers = 1, trunk_size = job_size, walltime = '24h',  mem = '10G', tags = f'{_output:bn}'  

R: expand = "${ }",stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout' 
        library(dplyr)
        library(tidyverse)
        library(ggnewscale)
        df <- read.csv("${_input:a}",row.names=1)
        colnames(df)<-gsub("DLPFC_","",colnames(df))
        for (i in 1:ncol(df)) {
            con1 <- stringr::str_split(colnames(df)[i], "_vs_", simplify = T)[, 1]
            con2 <- stringr::str_split(colnames(df)[i], "_vs_", simplify = T)[, 2]

            if (con1 > con2) {
                new.name <- paste0(con2, "_vs_", con1)
                colnames(df)[i] <- new.name
            }
        }

        ## summarizxse with approach 1: snp-feature pair
        snp.ratio <- df["n_sig_snp", ] / df["n_snp", ]
        snp.ratio <- snp.ratio %>%
            t() %>%
            as.data.frame()
        colnames(snp.ratio) <- "ratio"
        snp.ratio$group <- "snp"

        ## summarize with approach 2: feature
        fet.ratio <- df["n_sig_feature", ] / df["n_all_feature", ]
        fet.ratio <- fet.ratio %>%
            t() %>%
            as.data.frame()
        rownames(fet.ratio) <- paste0(stringr::str_split(rownames(fet.ratio), "_vs_", simplify = T)[, 2], "_vs_", stringr::str_split(rownames(fet.ratio), "_vs_", simplify = T)[, 1])
        colnames(fet.ratio) <- "ratio"
        fet.ratio$group <- "feature"
        ratio <- rbind(snp.ratio, fet.ratio)

        ## I need to add the below to make it Simmetrie

        cons <- rownames(ratio) %>%
            str_split(., "_vs_", simplify = T) %>%
            .[, 1] %>%
            unique()
        for (i in 1:length(cons)) {
            new.name <- paste0(cons[i], "_vs_", cons[i])
            ratio[new.name, ] <- 0
        }

        ratio$con1 <- stringr::str_split(rownames(ratio), "_vs_", simplify = T)[, 1]
        ratio$con2 <- stringr::str_split(rownames(ratio), "_vs_", simplify = T)[, 2]

        ## prepare for the plot, score1 is for snp-feature pair, score2 is for feature only
        ratio$score1 <- ratio$score2 <- 0
        ratio$score1[ratio$group == "snp"] <- ratio$ratio[ratio$group == "snp"]
        ratio$score2[ratio$group == "feature"] <- ratio$ratio[ratio$group == "feature"]
        ratio$label <- paste0(round(ratio$ratio, 4) * 100, "%")
        ratio$label[ratio$group == 0] <- NA

        # plot
        num_cols <- length(cons)
        height <- width <- 4 + num_cols * 0.5 
        ggplot(ratio[ratio$group == "snp", ], aes(x = con1, y = con2)) +
            geom_tile(aes(fill = score1)) +
            scale_fill_gradient2("SNP_Feature pair",
                low = "#762A83", mid = "white", high = "#1B7837"
            ) +
            new_scale("fill") +
            geom_tile(aes(fill = score2), data = subset(ratio, group != "snp")) +
            scale_fill_gradient2("Feature",
                low = "#1B7837", mid = "white", high = "#762A83"
            ) +
            geom_text(data = ratio, aes(label = label)) +
            theme_bw()
        #geom_text(data=ratio, aes(label = label, color = factor(group))) +theme_bw()
        #ggsave(gsub(".csv",".png",filename))

        ggsave("${_output}",width = width, height = height)

feature_score_meta#

Meta approach: Meta analysis with each cell deviation contrast result of each feature (a loop for each column in deviation results). And then perform meta analysis with pairwise contrasts to get a pvalue to find to understand what the specific differences are. But there is a problem that meta analysis with so many snps would cost too much time to compute, I am trying to downsample for input, Dan suggest to LD prune with a more permissive threshold.

# compute feature score from contrast results
[feature_score_meta_1]
#regions = [x.replace("\"","").strip().split() for x in open(analysis_unit).readlines() if x.strip() and not x.strip().startswith('#')]
parameter: plink_path = ''
parameter: bfile_path = ''
parameter: extract = ''
parameter: window_size = '100'
parameter: step_size = '10'
parameter: r2_threshold = '0.2'
parameter: per_chunk = '100'
parameter: LD_prune = "TRUE"
parameter: downsample_ratio = '1'
parameter: meta_method = 'REML'
parameter: LDcache = ''
parameter: contrast_input = [path(x[0]) for x in regions]
input: contrast_input, group_by = per_chunk
output: f"{cwd}/feature_score_metaLD/cache/mash_posterior_contrast_featurescore{_index+1}.rds"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime,  mem = mem, tags = f'{step_name}_{_output:bn}'  
R: expand = "${ }", workdir = cwd, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout", container = container
    # Library Loading
    suppressMessages({
      library(data.table)
      library(tidyverse)
      library(metafor)
    })

    # Environment Configuration
    plink_path <- "${plink_path}"
    bfile_path <- "${bfile_path}"
    indep_pairwise <- paste("${window_size}", "${step_size}", "${r2_threshold}")
    out <- NULL
    set.seed(999)

    # Define helper functions

    plink_ld_pruning <- function(res, chr, bfile_path, LDcache, gene, plink_path, indep_pairwise) {
      extract <- str_c("${LDcache}", gene, "_snp.list")
      output <- str_c("${LDcache}", gene, "_output")
      tmp_contrast_results <- readRDS(res) %>% as.matrix()
      write.table(rownames(tmp_contrast_results), extract, quote = F)
      # system(paste0("rm ", output, "*"), intern = TRUE)
      bfile <- str_c(bfile_path, chr)
      command <- paste(plink_path, "--bfile", bfile, "--extract", extract, "--indep-pairwise", indep_pairwise, "--out", output)
      system(command, intern = TRUE)
      ld_output <- read.table(str_c("${LDcache}", gene, "_output.prune.in"))
      return(tmp_contrast_results[ld_output$V1, ] %>% as.data.table())
    }

    calculate_feature_scores <- function(tmp_contrast_results, meta_method) {
      effect_sizes <- tmp_contrast_results %>% select(matches("mean_contrast.*deviation")) %>% as.matrix()
      se_values <- tmp_contrast_results %>% select(matches("se_contrast.*deviation")) %>% as.matrix()
      feature_scores <- data.table()
      for (i in 1:ncol(effect_sizes)) {
        effect_sizes_condition <- effect_sizes[, i]
        se_values_condition <- se_values[, i]
        absolute_effect_sizes <- abs(as.numeric(effect_sizes_condition))
        pairwise_standard_errors <- as.numeric(se_values_condition)
        meta_result <- rma(yi = absolute_effect_sizes, sei = pairwise_standard_errors, method = meta_method)
        z_scores <- meta_result$b / meta_result$se
        feature_scores_condition <- data.table(ZScore = z_scores)
        feature_scores <- rbindlist(list(feature_scores, feature_scores_condition))
      }
      return(feature_scores)
    }

    # Main Loop
    for (res in c(${_input:r,})) try({
      chr <- res %>% basename() %>% str_extract(., paste0("\\.(.*?)\\.")) %>% str_replace_all("\\.", "") %>% str_replace_all("chr", "")
      gene <- basename(res) %>% stringr::str_split(., "norminal.cis_long_table.", simplify = TRUE) %>% .[, 2] %>% gsub("_posterior_contrast.rds", "", .)

      tmp_contrast_results <- readRDS(res)

      LD_prune <- ${'TRUE' if LD_prune else 'FALSE'}
      if (LD_prune) {
        tmp_contrast_results <- plink_ld_pruning(res, chr, bfile_path, LDcache, gene, plink_path, indep_pairwise)
      }

      if (ncol(tmp_contrast_results %>% select(matches("mean_contrast.*deviation"))) > 0) {
        feature_scores <- calculate_feature_scores(tmp_contrast_results, "${meta_method}")
        colnames(feature_scores) <- gene
        condition_name <- colnames(effect_sizes) %>% gsub("mean_contrast_", "", .) %>% gsub("_deviation", "", .)
        feature_scores <- feature_scores[, condition := condition_name]
        saveRDS(feature_scores, str_c("${_output:d}", "/", gsub(".rds", "_featurescore.rds", basename(res))))
      }

      if (is.null(out)) {
        out <- feature_scores
      } else {
        out <- merge(out, feature_scores, by = "condition", all = TRUE)
      }
    })
    saveRDS(out, "${_output}")

feature_score_finemap#

feature score with fine mapped QTL (the top 1 in each CS)

Fine-mapped approach(more recommend by Dan). pick the top SNP in each CS from each cell type fine mapped results. Then pick the most significant one from contrast result. Get the z-score from that snp, which should be the score of that cell type.

# compute feature score from contrast results with eQTL and pQTL finemapped signals
[feature_score_finemap_1]
#regions = [x.replace("\"","").strip().split() for x in open(analysis_unit).readlines() if x.strip() and not x.strip().startswith('#')]
parameter: per_chunk = '100'
parameter: pfine_path = ''
parameter: efine_path = ''
parameter: efine_suffix = ".unisusie.fit.variant.tsv"
parameter: gene_ref_path = ''
parameter: contrast_input = [path(x[0]) for x in regions]
input: contrast_input, group_by = per_chunk
output: f"{cwd}/feature_score_finemap/cache/mash_posterior_contrast_featurescore{_index+1}.rds"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime,  mem = mem, tags = f'{step_name}_{_output:bn}'  
R: expand = "${ }", workdir = cwd, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout", container = container
   # Load necessary libraries
    suppressMessages({
        library(data.table)
        library(tidyverse)
    })

    # Function to calculate scores from fine-mapping results
    score_from_cs <- function(fine.file, contrast_results, con) {
        CSs <- unique(fine.file$cs_order) %>% .[. != 0]
        max.cs.df <- NULL
        for (cs in CSs) {
            tmp <- fine.file[fine.file$cs_order == cs, ]
            max.cs.df <- rbind(max.cs.df, tmp[tmp$pip == max(tmp$pip), ])
        }
        snps <- intersect(rownames(contrast_results), max.cs.df$variants)
        if (length(snps) == 0) return(NA)

        # Extract contrast p-values
        contrast_p <- NULL
        if (ncol(select(contrast_results %>% as.data.frame(), matches(str_c("p_contrast_", con, "_deviation")))) > 0) {
            contrast_p <- contrast_results[snps, str_c("p_contrast_", con, "_deviation"), drop = F]
        } else if (ncol(select(contrast_results %>% as.data.frame(), matches("p_contrast.*_vs_*"))) == 1) {
            contrast_p <- contrast_results %>% as.data.frame() %>% select(matches("p_contrast.*_vs_*")) %>% .[snps, , drop = F]
        }

        if (is.null(contrast_p)) return(NA)

        max.snp <- contrast_p[contrast_p[, 1] == max(contrast_p[, 1]), , drop = F] %>% rownames()
        score <- max(abs(contrast_results %>% as.data.frame() %>% select(matches("mean_contrast.*_vs_*")) %>% .[max.snp, ]) /
                     (contrast_results %>% as.data.frame() %>% select(matches("se_contrast.*_vs_*")) %>% .[max.snp, ]))
        return(score)
    }

    # Read pQTL file
    pfine.mapped.result <- fread("${pfine_path}")
    pfine.mapped.result$Gene <- str_split(pfine.mapped.result$molecular_trait_id, "_", simplify = TRUE)[, 2]
    gene.ref <- read.table("${gene_ref_path}")
    out <- data.table()

    # Loop through each input and compute scores
    for (res in c(${_input:r,})) {
        tmp_contrast_results <- readRDS(res) %>% as.matrix()

        gene <- basename(res) %>%
            str_split(., "norminal.cis_long_table.", simplify = TRUE) %>%
            .[, 2] %>%
            gsub("_posterior_contrast.rds", "", .)
        ensemble <- gene.ref[gene.ref$V5 == gene, ]$V4
        efine.mapped.result <- list.files(path = "${efine_path}", pattern = paste0(ensemble, "${efine_suffix}"), full.names = T)
        pfine.file <- pfine.mapped.result[pfine.mapped.result$Gene == gene & pfine.mapped.result$cs_order > 0, ]

        df <- data.frame()
        if (nrow(pfine.file) > 0) {
            message("Extracting signal from pQTL in ", gene)
            df[gene, "DLPFC_pQTL"] <- score_from_cs(pfine.file, tmp_contrast_results, "DLPFC_pQTL")
        }
        if (length(efine.mapped.result) > 0) {
            fine.conditions <- basename(efine.mapped.result) %>%
                sub("demo.", "", .) %>%
                sub(paste0(".", ensemble, "${efine_suffix}"), "", .) %>%
                .[. != "ALL"] %>%
                .[. != "End"]
            for (con in fine.conditions) {
                con.file <- efine.mapped.result[grep(con, efine.mapped.result)] %>% fread()
                con.fine.file <- con.file[con.file$cs_order > 0, ]
                if (nrow(con.fine.file) > 0) {
                    df[gene, con] <- score_from_cs(con.fine.file, tmp_contrast_results, con)
                }
            }
        }
        saveRDS(df, str_c("${cwd}","/feature_score_finemap/cache/",gsub(".rds","_featurescore.rds",basename(res))))
        out <- rbindlist(list(out, as.data.table(df, keep.rownames = TRUE)), use.names = TRUE, fill = TRUE)
    }
    saveRDS(out, "${_output}")
# merge the feature score from contrast results with eQTL and pQTL finemapped signals
[feature_score_finemap_2]
input: group_by = "all"
output: f"{cwd}/feature_score_finemap/posterior_feature_score_sum.csv"
task: trunk_workers = 1, trunk_size = job_size, walltime = '24h',  mem = '10G', tags = f'{_output:bn}'  

R: expand = "${ }",stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout' 
    suppressMessages(library(data.table))
    suppressMessages(library(tidyverse))
    
    out <- data.table()

    all.list <- stringr::str_split("${_input}", " ", simplify = T)
    for (i in all.list) {
        feature_scores <- readRDS(i)
        # Print the feature scores for each condition
        if (!is.null(feature_scores)) {
           if (is.null(out)) {
                out <- as.data.table(feature_scores, keep.rownames = TRUE)
              } else {
                out <-  rbindlist(list(out, as.data.table(feature_scores, keep.rownames = TRUE)), use.names = TRUE, fill = TRUE)
              }
        }
    }
    write.csv(out, "${_output}")

feature_score_nsig#

Calculate the number of meaningful SNPs in each feature of each cell type (actually, ratio)

# compute feature score from contrast results
[feature_score_nsig_1]
#regions = [x.replace("\"","").strip().split() for x in open(analysis_unit).readlines() if x.strip() and not x.strip().startswith('#')]
parameter: per_chunk = '100'
parameter: contrast_input = [path(x[0]) for x in regions]
input: contrast_input, group_by = per_chunk
output: f"{cwd}/feature_score_nsig/cache/mash_posterior_contrast_featurescore_nsig{_index+1}.rds"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime,  mem = mem, tags = f'{step_name}_{_output:bn}'  
R: expand = "${ }", workdir = cwd, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout", container = container
   # Load required libraries
    suppressMessages({
        library(data.table)
        library(tidyverse)
        library(metafor)
    })

    # Convert p_cut to numeric
    p_cut <- ${p_cut} %>% as.numeric

    # Initialize the output data table
    out <- NULL
    set.seed(999)

    # Function to extract the gene name from the file path
    get_gene_name <- function(file_path) {
        basename(file_path) %>%
            stringr::str_split(., "norminal.cis_long_table.", simplify = TRUE) %>%
            .[, 2] %>%
            gsub("_posterior_contrast.rds", "", .)
    }

    # Loop through each input RDS file
    for (res in c(${_input:r,})) {
        try({
            # Extract gene name
            gene <- get_gene_name(res)
            print(gene)

            # Load the contrast results and filter columns of interest
            tmp_contrast_results <- readRDS(res) %>%
                select(matches("p_contrast_.*deviation")) %>%
                as.matrix()

            # Initialize the feature output data table
            feature.out <- data.table()

            # Calculate the ratio for each condition
            for (i in 1:ncol(tmp_contrast_results)) {
                condition_name <- colnames(tmp_contrast_results)[i] %>%
                    gsub("p_contrast_", "", .) %>%
                    gsub("_deviation", "", .)

                n_sig_snp <- sum(tmp_contrast_results[, i] < p_cut, na.rm = TRUE)
                n_snp <- sum(!is.na(tmp_contrast_results[, i]))

                ratio <- n_sig_snp / n_snp
                feature.out <- rbind(feature.out, data.table(gene = gene, condition = condition_name, ratio = ratio))
            }

            # Save individual gene results
            saveRDS(feature.out, paste0("${_output:d}", "/", gsub(".rds", "_n_sig_ratio.rds", basename(res))))

            # Merge with the overall output
            if (is.null(out)) {
                out <- feature.out
            } else {
                out <- merge(out, feature.out, by = "condition", all = TRUE)
            }
        }, silent = TRUE) # Error handling to proceed even if one iteration fails
    }

    # Save the combined output
    saveRDS(out, "${_output}")

feature_pval_pair#

find the specific different cell type pair

perform meta analysis with pairwise contrasts to get a pvalue to find to understand what the specific differences are.

metaanalysis return a warning as “Warning message: “Ratio of largest to smallest sampling variance extremely large. May not be able to obtain stable results.”” Which is due to the big difference between max(pairwise_standard_errors^2) and min(pairwise_standard_errors^2) So I have delete the snps with small pairwise_standard_errors as Xuewei suggested

Anticipated Results#

Produces protocol_example.mash_posterior.rds with posterior effect size estimates across conditions. Expect condition-specific enrichment patterns in the Mic/Ast De_Jager cell types for the toy region.

# compute feature score from contrast results
[feature_pval_pair_1]
#regions = [x.replace("\"","").strip().split() for x in open(analysis_unit).readlines() if x.strip() and not x.strip().startswith('#')]
parameter: plink_path = ''
parameter: bfile_path = ''
parameter: extract = ''
parameter: window_size = '100'
parameter: step_size = '10'
parameter: r2_threshold = '0.2'
parameter: per_chunk = '100'
parameter: LD_prune = True
parameter: downsample_ratio = '1'
parameter: meta_method = 'REML'
parameter: se_cutoff = '1E-03'
parameter: LDcache = ''
parameter: contrast_input = [path(x[0]) for x in regions]
input: contrast_input, group_by = per_chunk
output: f"{cwd}/feature_pval_pair/cache/mash_posterior_contrast_featurescore{_index+1}.rds"
task: trunk_workers = 1, trunk_size = job_size, walltime = walltime,  mem = mem, tags = f'{step_name}_{_output:bn}'  
R: expand = "${ }", workdir = cwd, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout", container = container
    # Library Loading
    suppressMessages({
      library(data.table)
      library(tidyverse)
      library(metafor)
    })

    # Initializations
    out <- NULL
    set.seed(999)

    # Helper function: extract gene name from result
    extract_gene <- function(res) {
      basename(res) %>%
        stringr::str_split(., "norminal.cis_long_table.", simplify = TRUE) %>%
        .[, 2] %>%
        gsub("_posterior_contrast.rds", "", .)
    }

    # Helper function: perform meta analysis per cell
    meta_analysis_per_cell <- function(effect_sizes, se_values, gene, se_cutoff) {
      conditions <- colnames(effect_sizes) %>%
        sub("mean_contrast_", "", .) %>%
        unique()
      cells <- c(sub("_vs_.*", "", conditions), sub(".*_vs_", "", conditions)) %>% unique()

      df <- data.table()
      for (cell in cells) {
        cell.b <- effect_sizes[, grep(cell, colnames(effect_sizes)), drop = F]
        cell.se <- se_values[, grep(cell, colnames(se_values)), drop = F]
        feature_pvals <- data.table()
        for (i in 1:ncol(cell.b)) {
          effect_sizes_condition <- cell.b[, i]
          se_values_condition <- cell.b[, i]
          # Filter out based on se cutoff
          effect_sizes_condition <- effect_sizes_condition[which(se_values_condition > as.numeric(se_cutoff))]
          se_values_condition <- se_values_condition[which(se_values_condition > as.numeric(se_cutoff))]
          # Meta-analysis
          absolute_effect_sizes <- abs(as.numeric(effect_sizes_condition))
          pairwise_standard_errors <- as.numeric(se_values_condition)
          meta_result <- rma(yi = absolute_effect_sizes, sei = pairwise_standard_errors, method = "REML")
          feature_pval_condition <- data.table(pavlue = meta_result$pval)
          feature_pvals <- rbindlist(list(feature_pvals, feature_pval_condition))
        }
        colnames(feature_pvals) <- gene
        condition_name <- colnames(cell.b) %>%
          gsub("mean_contrast_", "", .)
        feature_pvals <- feature_pvals[, condition := condition_name]
        df <- rbindlist(list(df, feature_pvals))
      }
      return(df)
    }

    # Main Loop
    for (res in c(${_input:r,})) {
      gene <- extract_gene(res)
      print(gene)
      tmp_contrast_results <- readRDS(res)
      ld_output <- read.table(str_c("${LDcache}", gene, "_output.prune.in"))
      tmp_contrast_results <- tmp_contrast_results[ld_output$V1, ] %>% as.data.table(keep.rownames = T)

      effect_sizes <- tmp_contrast_results %>% select(matches("mean_contrast.*_vs_*")) %>% as.matrix()
      se_values <- tmp_contrast_results %>% select(matches("se_contrast.*_vs_*")) %>% as.matrix()

      df <- meta_analysis_per_cell(effect_sizes, se_values, gene, "${se_cutoff}")
      saveRDS(df, str_c("${_output:d}", "/", gsub(".rds", "_featurescore_pw.rds", basename(res))))

      if (is.null(out)) {
        out <- df
      } else {
        out <- merge(out, df, by = "condition", all = TRUE, allow.cartesian = TRUE)
      }
    }
    saveRDS(out, "${_output}")
# merge the feature score from contrast results
[feature_pval_pair_2, feature_score_meta_2, feature_score_nsig_2]
input: group_by = "all"
output: f"{cwd}/feature_pval_pair/posterior_feature_score_sum.csv"
task: trunk_workers = 1, trunk_size = job_size, walltime = '24h',  mem = '10G', tags = f'{_output:bn}'  

R: expand = "${ }",stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout' 
    library(dplyr)
    library(tidyverse)
    out <- NULL

    all.list <- stringr::str_split("${_input}", " ", simplify = T)
    for (i in all.list) {
        feature_scores <- readRDS(i)
        # Print the feature scores for each condition
        if (!is.null(feature_scores)) {
            if (is.null(out)) {
                out <- feature_scores
            } else {
                out <- merge(out, feature_scores, by = "condition", all = TRUE)
            }
        }
    }
      out<-t(out)
      #out<-out[-1,]
    write.csv(out, "${_output}",col.names=F)