{ "cells": [ { "cell_type": "markdown", "metadata": { "kernel": "SoS" }, "source": [ "# A multivariate EBNM approach for mixture multivariate distribution estimate\n", "\n", "In this notebook, we compute a prior independent of the specific analysis method chosen for the data. This foundational step enables the application of various techniques, such as UDR, ED, TED, and initialization with FLASHier, among others. Essentially, our goal is to establish a mixture model to extract meaningful signals from the data.\n", "\n", "An earlier version of the approach is outlined in Urbut et al 2019. This workflow implements a few improvements including using additional EBMF methods as well as the new `udr` package (latest update: May 2023) to fit the mixture model." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "kernel": "SoS" }, "outputs": [ { "data": { "text/html": [ "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "
RevisionAuthorDateMessage
75d03afGao Wang2023-10-11Improve mash analysis codes
c35109bGao Wang2023-10-11Update mash pipeline
7203e2bGao Wang2023-10-10Update container entrypoint to allow for different conventions to specify containers
80ec945Gao Wang2023-10-10clean up mashr codes
3b8db47rfeng20232023-10-09update mash
3fb13c4rfeng20232023-05-30update mash
309e04cGao Wang2022-02-07Rename pipeline folder
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "%revisions -s" ] }, { "cell_type": "markdown", "metadata": { "kernel": "SoS" }, "source": [ "## Overview of approach\n", "\n", "1. Factor analysis;\n", "2. Estimate residual variance;\n", "3. Compute MASH prior;\n", "4. Prior plots\n", "\n", "## Input\n", "formatted association summary statistics \n", "\n", "## Output\n", "1. residual variance\n", "2. MASH prior\n", "3. Prior plots" ] }, { "cell_type": "markdown", "metadata": { "kernel": "SoS" }, "source": [ "## Methods\n", "\n", "Below is a \"blackbox\" implementation to compute data-driven prior mixture for MASH model -- blackbox in the sense that you can run this pipeline as an executable, without thinking too much about it, if you see your problem fits the context of Urbut 2019. However when reading it as a notebook it is a good source of information to help developing your own `mashr` analysis procedures.\n", "\n", "### [UD](https://stephenslab.github.io/udr/) \n", "\n", "Implement fast statistical algorithms for solving the multivariate normal means problem via empirical Bayes, building on the Extreme Deconvolution method.\n" ] }, { "cell_type": "markdown", "metadata": { "kernel": "SoS" }, "source": [ "## Minimal working example\n", "\n", "Data-set available [on synapse.org](https://www.synapse.org/#!Synapse:syn52623812).\n", "\n", "**FIXME: this part will be later based on fine-mapping results to pick the top signals. For now, it is fine to run this pipeline. You can also find the output of this step in the `output` folder of the minimal working example on synapse.org (link above) so you can take that and skip this step.**" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "kernel": "Bash", "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "INFO: Running \u001b[32mextract_effects_1\u001b[0m: extract data for MASH from summary stats\n", "INFO: \u001b[32mextract_effects_1\u001b[0m is \u001b[32mcompleted\u001b[0m.\n", "INFO: \u001b[32mextract_effects_1\u001b[0m output: \u001b[32moutput/protocol_example.mashr_input/cache/protocol_example.mashr_input_batch1.rds\u001b[0m\n", "INFO: Running \u001b[32mextract_effects_2\u001b[0m: \n", "INFO: \u001b[32mextract_effects_2\u001b[0m is \u001b[32mcompleted\u001b[0m.\n", "INFO: \u001b[32mextract_effects_2\u001b[0m output: \u001b[32moutput/protocol_example.mashr_input.rds\u001b[0m\n", "INFO: Workflow extract_effects (ID=w3de7d69402b55380) is executed successfully with 2 completed steps.\n" ] } ], "source": [ "sos run pipeline/extract_effects.ipynb extract_effects \\\n", " --name protocol_example.mashr_input \\\n", " --analysis-units <(cat protocol_example.mashr.list | cut -f 2 ) \\\n", " --need-genename TRUE \\\n", " --sum-stat protocol_example.mashr.list" ] }, { "cell_type": "markdown", "metadata": { "kernel": "Bash" }, "source": [ "Mixture with UD method," ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "kernel": "Bash" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "INFO: Running \u001b[32mud\u001b[0m: Latest update: May/2023\n", "INFO: Running \u001b[32mvhat_simple\u001b[0m: V estimate: \"simple\" method (using null z-scores)\n", "INFO: \u001b[32mvhat_simple\u001b[0m is \u001b[32mcompleted\u001b[0m.\n", "INFO: \u001b[32mvhat_simple\u001b[0m output: \u001b[32m/home/gw/Documents/xQTL/output/mashr/protocol_example.ud_mixture.EZ.V_simple.rds\u001b[0m\n", "INFO: \u001b[32mud\u001b[0m is \u001b[32mcompleted\u001b[0m.\n", "INFO: \u001b[32mud\u001b[0m output: \u001b[32m/home/gw/Documents/xQTL/output/mashr/protocol_example.ud_mixture.EZ.prior.rds\u001b[0m\n", "INFO: Workflow ud (ID=w9f192fe3274fcffe) is executed successfully with 2 completed steps.\n" ] } ], "source": [ "sos run pipeline/mixture_prior.ipynb ud \\\n", " --output-prefix protocol_example.ud_mixture \\\n", " --data output/protocol_example.mashr_input.rds \\\n", " --cwd output/mashr \\\n", " --container oras://ghcr.io/cumc/stephenslab_apptainer:latest -s force" ] }, { "cell_type": "markdown", "metadata": { "kernel": "Bash" }, "source": [ "Mixture with ED method," ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "kernel": "Bash" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "INFO: Running \u001b[32med_bovy\u001b[0m: \n", "INFO: Running \u001b[32mvhat_simple\u001b[0m: V estimate: \"simple\" method (using null z-scores)\n", "INFO: \u001b[32mvhat_simple\u001b[0m is \u001b[32mcompleted\u001b[0m.\n", "INFO: \u001b[32mvhat_simple\u001b[0m output: \u001b[32m/home/gw/Documents/xQTL/output/mashr/protocol_example.ed_mixture.EZ.V_simple.rds\u001b[0m\n", "INFO: Running \u001b[32mflash\u001b[0m: Perform FLASH analysis with non-negative factor constraint (time estimate: 20min)\n", "INFO: \u001b[32mflash\u001b[0m is \u001b[32mcompleted\u001b[0m.\n", "INFO: \u001b[32mflash\u001b[0m output: \u001b[32m/home/gw/Documents/xQTL/output/mashr/protocol_example.ed_mixture.flash.rds\u001b[0m\n", "INFO: Running \u001b[32mflash_nonneg\u001b[0m: Perform FLASH analysis with non-negative factor constraint (time estimate: 20min)\n", "INFO: \u001b[32mflash_nonneg\u001b[0m is \u001b[32mcompleted\u001b[0m.\n", "INFO: \u001b[32mflash_nonneg\u001b[0m output: \u001b[32m/home/gw/Documents/xQTL/output/mashr/protocol_example.ed_mixture.flash_nonneg.rds\u001b[0m\n", "INFO: Running \u001b[32mpca\u001b[0m: \n", "INFO: \u001b[32mpca\u001b[0m is \u001b[32mcompleted\u001b[0m.\n", "INFO: \u001b[32mpca\u001b[0m output: \u001b[32m/home/gw/Documents/xQTL/output/mashr/protocol_example.ed_mixture.pca.rds\u001b[0m\n", "INFO: Running \u001b[32mcanonical\u001b[0m: \n", "INFO: \u001b[32mcanonical\u001b[0m is \u001b[32mcompleted\u001b[0m.\n", "INFO: \u001b[32mcanonical\u001b[0m output: \u001b[32m/home/gw/Documents/xQTL/output/mashr/protocol_example.ed_mixture.canonical.rds\u001b[0m\n", "INFO: \u001b[32med_bovy\u001b[0m is \u001b[32mcompleted\u001b[0m.\n", "INFO: \u001b[32med_bovy\u001b[0m output: \u001b[32m/home/gw/Documents/xQTL/output/mashr/protocol_example.ed_mixture.EZ.prior.rds\u001b[0m\n", "INFO: Workflow ed_bovy (ID=w744b5ce9803a7efe) is executed successfully with 6 completed steps.\n" ] } ], "source": [ "sos run pipeline/mixture_prior.ipynb ed_bovy \\\n", " --output-prefix protocol_example.ed_mixture \\\n", " --data output/protocol_example.mashr_input.rds \\\n", " --cwd output/mashr \\\n", " --container oras://ghcr.io/cumc/stephenslab_apptainer:latest" ] }, { "cell_type": "markdown", "metadata": { "kernel": "Bash" }, "source": [ "Make plots," ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "kernel": "Bash" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "INFO: Running \u001b[32mplot_U\u001b[0m: \n", "\u001b[91mERROR\u001b[0m: \u001b[91mplot_U (id=57d9dbce2d7c753d) returns an error.\u001b[0m\n", "\u001b[91mERROR\u001b[0m: \u001b[91m[plot_U]: [0]: Executing script in Singularity returns an error (exitcode=1, stderr=/home/gw/Documents/xQTL/output/mashr/protocol_example.ud_mixture.EZ.prior.stderr, stdout=/home/gw/Documents/xQTL/output/mashr/protocol_example.ud_mixture.EZ.prior.stdout).\n", "The script has been saved to /home/gw/.sos/c42794ff2d041d88//home/gw/.sos/c42794ff2d041d88.To reproduce the error please run:\n", "\u001b[0m\u001b[32msingularity exec /home/gw/.sos/singularity/library/ghcr.io-cumc-stephenslab_apptainer-latest.sif micromamba run -a \"\" -n stephenslab Rscript /home/gw/.sos/c42794ff2d041d88/singularity_run_2327875.R\u001b[0m\u001b[91m\u001b[0m\n" ] }, { "ename": "", "evalue": "1", "output_type": "error", "traceback": [] } ], "source": [ "sos run pipeline/mixture_prior.ipynb plot_U \\\n", " --output-prefix protocol_example.mixture_plots \\\n", " --model-data output/mashr/protocol_example.ud_mixture.EZ.prior.rds \\\n", " --cwd output/mashr \\\n", " --container oras://ghcr.io/cumc/stephenslab_apptainer:latest" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "kernel": "Bash" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "INFO: Running \u001b[32mplot_U\u001b[0m: \n", "INFO: \u001b[32mplot_U\u001b[0m is \u001b[32mcompleted\u001b[0m.\n", "INFO: \u001b[32mplot_U\u001b[0m output: \u001b[32m/home/gw/Documents/xQTL/output/mashr/protocol_example.ed_mixture.EZ.prior.pdf\u001b[0m\n", "INFO: Workflow plot_U (ID=wf81e5b819c0398c1) is executed successfully with 1 completed step.\n" ] } ], "source": [ "sos run pipeline/mixture_prior.ipynb plot_U \\\n", " --output-prefix protocol_example.mixture_plots \\\n", " --model-data output/mashr/protocol_example.ed_mixture.EZ.prior.rds \\\n", " --cwd output/mashr \\\n", " --container oras://ghcr.io/cumc/stephenslab_apptainer:latest" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "kernel": "Bash" }, "outputs": [ { "data": { "text/html": [ "
> output/mashr/protocol_example.ed_mixture.EZ.prior.pdf (24.6 KiB):
" ], "text/plain": [ "\n", "> output/mashr/protocol_example.ed_mixture.EZ.prior.pdf (24.6 KiB):" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "image/png": "" }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "
> output/mashr/protocol_example.ed_mixture.EZ.prior.pdf:
" ], "text/plain": [ ">>> output/mashr/protocol_example.ed_mixture.EZ.prior.pdf:\n" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "bash: output/mashr/protocol_example.ed_mixture.EZ.prior.pdf: Permission denied\n" ] } ], "source": [ "%preview -n -s png output/mashr/protocol_example.ed_mixture.EZ.prior.pdf" ] }, { "cell_type": "markdown", "metadata": { "kernel": "SoS", "tags": [] }, "source": [ "## Global parameters" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "kernel": "SoS" }, "outputs": [], "source": [ "[global]\n", "parameter: cwd = path('./mashr_workflow_output')\n", "# Input summary statistics data\n", "parameter: data = path\n", "# Prefix of output files. If not specified, it will derive it from data.\n", "# If it is specified, for example, `--output-prefix AnalysisResults`\n", "# It will save output files as `{cwd}/AnalysisResults*`.\n", "parameter: output_prefix = ''\n", "parameter: output_suffix = 'all'\n", "# Exchangable effect (EE) or exchangable z-scores (EZ)\n", "parameter: effect_model = 'EE'\n", "# Identifier of $\\hat{V}$ estimate file\n", "# Options are \"identity\", \"simple\", \"mle\", \"vhat_corshrink_xcondition\", \"vhat_simple_specific\"\n", "parameter: vhat = 'simple'\n", "parameter: mixture_components = ['flash', 'flash_nonneg', 'pca',\"canonical\"]\n", "parameter: container = \"\"\n", "# For cluster jobs, number commands to run per job\n", "parameter: job_size = 1\n", "# Wall clock time expected\n", "parameter: walltime = \"5h\"\n", "# Memory expected\n", "parameter: mem = \"16G\"\n", "# Number of threads\n", "parameter: numThreads = 1\n", "import re\n", "parameter: entrypoint= ('micromamba run -a \"\" -n' + ' ' + re.sub(r'(_apptainer:latest|_docker:latest|\\.sif)$', '', container.split('/')[-1])) if container else \"\"\n", "data = data.absolute()\n", "cwd = cwd.absolute()\n", "if len(output_prefix) == 0:\n", " output_prefix = f\"{data:bn}\"\n", "prior_data = file_target(f\"{cwd:a}/{output_prefix}.{effect_model}.prior.rds\")\n", "vhat_data = file_target(f\"{cwd:a}/{output_prefix}.{effect_model}.V_{vhat}.rds\")\n", "\n", "def sort_uniq(seq):\n", " seen = set()\n", " return [x for x in seq if not (x in seen or seen.add(x))]" ] }, { "cell_type": "markdown", "metadata": { "kernel": "Bash" }, "source": [ "## Factor analyses" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "kernel": "SoS" }, "outputs": [], "source": [ "# Perform FLASH analysis with non-negative factor constraint\n", "[flash]\n", "input: data\n", "output: f\"{cwd}/{output_prefix}.flash.rds\"\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, tags = f'{step_name}_{_output:bn}' \n", "R: expand = \"${ }\", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container, entrypoint=entrypoint\n", " dat = readRDS(${_input:r})\n", " dat = mashr::mash_set_data(dat$strong.b, Shat=dat$strong.s, alpha=${1 if effect_model == 'EZ' else 0}, zero_Bhat_Shat_reset = 1E3)\n", " res = mashr::cov_flash(dat, factors=\"default\", remove_singleton=${\"TRUE\" if \"canonical\" in mixture_components else \"FALSE\"}, output_model=\"${_output:n}.model.rds\")\n", " saveRDS(res, ${_output:r})" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "kernel": "SoS" }, "outputs": [], "source": [ "# Perform FLASH analysis with non-negative factor constraint\n", "[flash_nonneg]\n", "input: data\n", "output: f\"{cwd}/{output_prefix}.flash_nonneg.rds\"\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, tags = f'{step_name}_{_output:bn}' \n", "R: expand = \"${ }\", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container, entrypoint=entrypoint\n", " dat = readRDS(${_input:r})\n", " dat = mashr::mash_set_data(dat$strong.b, Shat=dat$strong.s, alpha=${1 if effect_model == 'EZ' else 0}, zero_Bhat_Shat_reset = 1E3)\n", " res = mashr::cov_flash(dat, factors=\"nonneg\", remove_singleton=${\"TRUE\" if \"canonical\" in mixture_components else \"FALSE\"}, output_model=\"${_output:n}.model.rds\")\n", " saveRDS(res, ${_output:r})" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "kernel": "SoS" }, "outputs": [], "source": [ "[pca]\n", "# Number of components in PCA analysis for prior\n", "# set to 3 as in mash paper\n", "parameter: npc = 2\n", "input: data\n", "output: f\"{cwd}/{output_prefix}.pca.rds\"\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, tags = f'{step_name}_{_output:bn}' \n", "R: expand = \"${ }\", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container, entrypoint=entrypoint\n", " dat = readRDS(${_input:r})\n", " dat = mashr::mash_set_data(dat$strong.b, Shat=dat$strong.s, alpha=${1 if effect_model == 'EZ' else 0}, zero_Bhat_Shat_reset = 1E3)\n", " res = mashr::cov_pca(dat, ${npc})\n", " saveRDS(res, ${_output:r})" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "kernel": "SoS" }, "outputs": [], "source": [ "[canonical]\n", "input: data\n", "output: f\"{cwd}/{output_prefix}.canonical.rds\"\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, tags = f'{step_name}_{_output:bn}' \n", "R: expand = \"${ }\", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container, entrypoint=entrypoint\n", " library(\"mashr\")\n", " dat = readRDS(${_input:r})\n", " dat = mashr::mash_set_data(dat$strong.b, Shat=dat$strong.s, alpha=${1 if effect_model == 'EZ' else 0}, zero_Bhat_Shat_reset = 1E3)\n", " res = mashr::cov_canonical(dat)\n", " saveRDS(res, ${_output:r})" ] }, { "cell_type": "markdown", "metadata": { "kernel": "SoS" }, "source": [ "## Estimate residual variance\n" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "kernel": "SoS" }, "outputs": [], "source": [ "# V estimate: \"identity\" method\n", "[vhat_identity]\n", "input: data\n", "output: f'{vhat_data:nn}.V_identity.rds'\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, tags = f'{step_name}_{_output:bn}' \n", "R: expand = \"${ }\", workdir = cwd, stderr = f\"{_output:n}.stderr\", stdout = f\"{_output:n}.stdout\", container = container, entrypoint=entrypoint\n", " dat = readRDS(${_input:r})\n", " saveRDS(diag(ncol(dat$random.b)), ${_output:r})" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "kernel": "SoS" }, "outputs": [], "source": [ "# V estimate: \"simple\" method (using null z-scores)\n", "[vhat_simple]\n", "input: data\n", "output: f'{vhat_data:nn}.V_simple.rds'\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, tags = f'{step_name}_{_output:bn}' \n", "R: expand = \"${ }\", workdir = cwd, stderr = f\"{_output:n}.stderr\", stdout = f\"{_output:n}.stdout\", container = container, entrypoint=entrypoint\n", " library(mashr)\n", " dat = readRDS(${_input:r})\n", " vhat = estimate_null_correlation_simple(mash_set_data(dat$null.b, Shat=dat$null.s, alpha=${1 if effect_model == 'EZ' else 0}, zero_Bhat_Shat_reset = 1E3))\n", " saveRDS(vhat, ${_output:r})" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "kernel": "SoS" }, "outputs": [], "source": [ "# V estimate: \"mle\" method\n", "[vhat_mle]\n", "# number of samples to use\n", "parameter: n_subset = 6000\n", "# maximum number of iterations\n", "parameter: max_iter = 6\n", "\n", "input: data, prior_data\n", "output: f'{vhat_data:nn}.V_mle.rds'\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, tags = f'{step_name}_{_output:bn}' \n", "R: expand = \"${ }\", workdir = cwd, stderr = f\"{_output:n}.stderr\", stdout = f\"{_output:n}.stdout\", container = container, entrypoint=entrypoint\n", " library(mashr)\n", " dat = readRDS(${_input[0]:r})\n", " # choose random subset\n", " set.seed(1)\n", " random.subset = sample(1:nrow(dat$random.b), min(${n_subset}, nrow(dat$random.b)))\n", " random.subset = mash_set_data(dat$random.b[random.subset,], dat$random.s[random.subset,], alpha=${1 if effect_model == 'EZ' else 0}, zero_Bhat_Shat_reset = 1E3)\n", " # estimate V mle\n", " vhatprior = mash_estimate_corr_em(random.subset, readRDS(${_input[1]:r})$U, max_iter = ${max_iter})\n", " vhat = vhatprior$V\n", " saveRDS(vhat, ${_output:r})" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "kernel": "SoS" }, "outputs": [], "source": [ "# Estimate each V separately via corshrink\n", "[vhat_corshrink_xcondition_1]\n", "# Utility script\n", "parameter: util_script = path('/project/mstephens/gtex/scripts/SumstatQuery.R')\n", "# List of genes to analyze\n", "parameter: gene_list = path()\n", "\n", "fail_if(not gene_list.is_file(), msg = 'Please specify valid path for --gene-list')\n", "fail_if(not util_script.is_file() and len(str(util_script)), msg = 'Please specify valid path for --util-script')\n", "genes = sort_uniq([x.strip().strip('\"') for x in open(f'{gene_list:a}').readlines() if not x.strip().startswith('#')])\n", "\n", "\n", "depends: R_library(\"CorShrink\")\n", "input: data, for_each = 'genes'\n", "output: f'{vhat_data:nn}/{vhat_data:bnn}_V_corshrink_{_genes}.rds'\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, tags = f'{step_name}_{_output:bn}' \n", "R: expand = \"${ }\", workdir = cwd, stderr = f\"{_output:n}.stderr\", stdout = f\"{_output:n}.stdout\", container = container, entrypoint=entrypoint\n", " source(${util_script:r})\n", " CorShrink_sum = function(gene, database, z_thresh = 2){\n", " print(gene)\n", " dat <- GetSS(gene, database)\n", " z = dat$\"z-score\"\n", " max_absz = apply(abs(z), 1, max)\n", " nullish = which(max_absz < z_thresh)\n", " # if (length(nullish) < ncol(z)) {\n", " # stop(\"not enough null data to estimate null correlation\")\n", " # }\n", " if (length(nullish) <= 1){\n", " mat = diag(ncol(z))\n", " } else {\n", " nullish_z = z[nullish, ] \n", " mat = as.matrix(CorShrink::CorShrinkData(nullish_z, ash.control = list(mixcompdist = \"halfuniform\"))$cor)\n", " }\n", " return(mat)\n", " }\n", " V = Corshrink_sum(\"${_genes}\", ${data:r})\n", " saveRDS(V, ${_output:r})" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "kernel": "SoS" }, "outputs": [], "source": [ "# Estimate each V separately via \"simple\" method\n", "[vhat_simple_specific_1]\n", "# Utility script\n", "parameter: util_script = path('/project/mstephens/gtex/scripts/SumstatQuery.R')\n", "# List of genes to analyze\n", "parameter: gene_list = path()\n", "\n", "fail_if(not gene_list.is_file(), msg = 'Please specify valid path for --gene-list')\n", "fail_if(not util_script.is_file() and len(str(util_script)), msg = 'Please specify valid path for --util-script')\n", "genes = sort_uniq([x.strip().strip('\"') for x in open(f'{gene_list:a}').readlines() if not x.strip().startswith('#')])\n", "\n", "depends: R_library(\"Matrix\")\n", "input: data, for_each = 'genes'\n", "output: f'{vhat_data:nn}/{vhat_data:bnn}_V_simple_{_genes}.rds'\n", "\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, tags = f'{step_name}_{_output:bn}' \n", "R: expand = \"${ }\", workdir = cwd, stderr = f\"{_output:n}.stderr\", stdout = f\"{_output:n}.stdout\", container = container, entrypoint=entrypoint\n", " source(${util_script:r})\n", " simple_V = function(gene, database, z_thresh = 2){\n", " print(gene)\n", " dat <- GetSS(gene, database)\n", " z = dat$\"z-score\"\n", " max_absz = apply(abs(z), 1, max)\n", " nullish = which(max_absz < z_thresh)\n", " # if (length(nullish) < ncol(z)) {\n", " # stop(\"not enough null data to estimate null correlation\")\n", " # }\n", " if (length(nullish) <= 1){\n", " mat = diag(ncol(z))\n", " } else {\n", " nullish_z = z[nullish, ]\n", " mat = as.matrix(Matrix::nearPD(as.matrix(cov(nullish_z)), conv.tol=1e-06, doSym = TRUE, corr=TRUE)$mat)\n", " }\n", " return(mat)\n", " }\n", " V = simple_V(\"${_genes}\", ${data:r})\n", " saveRDS(V, ${_output:r})" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "kernel": "SoS" }, "outputs": [], "source": [ "# Consolidate Vhat into one file\n", "[vhat_corshrink_xcondition_2, vhat_simple_specific_2]\n", "depends: R_library(\"parallel\")\n", "# List of genes to analyze\n", "parameter: gene_list = path()\n", "\n", "fail_if(not gene_list.is_file(), msg = 'Please specify valid path for --gene-list')\n", "genes = paths([x.strip().strip('\"') for x in open(f'{gene_list:a}').readlines() if not x.strip().startswith('#')])\n", "\n", "\n", "input: group_by = 'all'\n", "output: f\"{vhat_data:nn}.V_{step_name.rsplit('_',1)[0]}.rds\"\n", "\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, tags = f'{step_name}_{_output:bn}' \n", "R: expand = \"${ }\", workdir = cwd, stderr = f\"{_output:n}.stderr\", stdout = f\"{_output:n}.stdout\", container = container, entrypoint=entrypoint\n", " library(parallel)\n", " files = sapply(c(${genes:r,}), function(g) paste0(c(${_input[0]:adr}), '/', g, '.rds'), USE.NAMES=FALSE)\n", " V = mclapply(files, function(i){ readRDS(i) }, mc.cores = 1)\n", " R = dim(V[[1]])[1]\n", " L = length(V)\n", " V.array = array(as.numeric(unlist(V)), dim=c(R, R, L))\n", " saveRDS(V.array, ${_output:ar})" ] }, { "cell_type": "markdown", "metadata": { "kernel": "SoS", "tags": [] }, "source": [ "## Compute multivariate mixture" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "kernel": "SoS" }, "outputs": [], "source": [ "# Latest update: May/2023\n", "[ud]\n", "# Options are \"ted\", \"ed\", \n", "parameter: unconstrained_prior = \"ted\"\n", "input:[data, vhat_data if vhat != \"mle\" else f'{vhat_data:nn}.V_simple.rds'] \n", "output: prior_data\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, tags = f'{step_name}_{_output:bn}' \n", "R: expand = \"${ }\", stderr = f\"{_output:n}.stderr\", stdout = f\"{_output:n}.stdout\", container = container, entrypoint=entrypoint\n", " library(stringr)\n", " library(udr)\n", " library(mashr)\n", "\n", " rds_files = c(${_input:r,})\n", " # mash data \n", " dat = readRDS(rds_files[1])\n", " vhat = readRDS(rds_files[2])\n", "\n", " # Fit mixture model using udr package\n", " mash_data = mash_set_data(dat$strong.b, Shat=dat$strong.s, V=vhat, alpha=${1 if effect_model == 'EZ' else 0}, zero_Bhat_Shat_reset = 1E3)\n", " # Canonical matrices\n", " U.can = cov_canonical(mash_data) \n", "\n", " set.seed(999)\n", " # Penalty strength\n", " lambda = ncol(dat$strong.z)\n", " # Initialize udr\n", " fit0 <- ud_init(mash_data, n_unconstrained = 50, U_scaled = U.can)\n", " # Fit udr and use penalty as default as suggested by Yunqi\n", " # penalty is necessary in small sample size case, and there won't be a difference in large sample size \n", " fit2 = ud_fit(fit0, control = list(unconstrained.update = \"${unconstrained_prior}\", scaled.update = \"fa\", resid.update = 'none', \n", " lambda =lambda, penalty.type = \"iw\", maxiter=1e3, tol = 1e-2, tol.lik = 1e-2))\n", "\n", " # extract data-driven covariance from udr model. (A list of covariance matrices)\n", " U.ud <- lapply(fit2$U,function (e) \"[[\"(e,\"mat\")) \n", "\n", " saveRDS(list(U=U.ud, w=fit2$w, loglik=fit2$loglik), ${_output:r})" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "kernel": "SoS" }, "outputs": [], "source": [ "[ud_unconstrained]\n", "# Method is `ed` or `ted`\n", "parameter: ud_method = \"ed\"\n", "# A typical choice is to estimate scales only for canonical components\n", "parameter: scale_only = []\n", "# Tolerance for change in likelihood\n", "parameter: ud_tol_lik = 1e-3\n", "input: [data, vhat_data if vhat != \"mle\" else f'{vhat_data:nn}.V_simple.rds'] + [f\"{cwd}/{output_prefix}.{m}.rds\" for m in mixture_components]\n", "output: prior_data\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, tags = f'{step_name}_{_output:bn}' \n", "R: expand = \"${ }\", stderr = f\"{_output:n}.stderr\", stdout = f\"{_output:n}.stdout\", container = container, entrypoint=entrypoint\n", " library(stringr)\n", " rds_files = c(${_input:r,})\n", " dat = readRDS(rds_files[1])\n", " vhat = readRDS(rds_files[2])\n", " mash_data = mash_set_data(dat$strong.b, Shat=dat$strong.s, V=vhat, alpha=${1 if effect_model == 'EZ' else 0}, zero_Bhat_Shat_reset = 1E3)\n", " # U is different from mashr pipeline, here I kept the later one\n", " # U = list(XtX = dat$XtX)\n", " U = list(XtX = t(mash_data$Bhat) %*% mash_data$Bhat / nrow(mash_data$Bhat))\n", "\n", " U_scaled = list()\n", " mixture_components = c(${paths(mixture_components):r,})\n", " scale_only = c(${paths(scale_only):r,})\n", " scale_idx = which(mixture_components %in% scale_only )\n", " for (f in 3:length(rds_files) ) {\n", " if ((f - 1) %in% scale_idx ) {\n", " U_scaled = c(U_scaled, readRDS(rds_files[f]))\n", " } else {\n", " U = c(U, readRDS(rds_files[f]))\n", " }\n", " }\n", " \n", " # Fit mixture model using udr package\n", " library(udr)\n", " message(paste(\"Running ${ud_method.upper()} via udr package for\", length(U), \"mixture components\"))\n", " f0 = ud_init(X = as.matrix(dat$strong.z), V = V, U_scaled = U_scaled, U_unconstrained = U, n_rank1=0)\n", " res = ud_fit(f0, X = na.omit(f0$X), control = list(unconstrained.update = \"ed\", resid.update = 'none', scaled.update = \"fa\", maxiter=5000, tol.lik = ${ud_tol_lik}), verbose=TRUE)\n", " res_ted = ud_fit(f0, X = na.omit(f0$X), control = list(unconstrained.update = \"ted \", resid.update = 'none', scaled.update = \"fa\", maxiter=5000, tol.lik = ${ud_tol_lik}), verbose=TRUE)\n", "\n", " saveRDS(list(U=res$U, w=res$w, loglik=res$loglik), ${_output:r})" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "kernel": "SoS" }, "outputs": [], "source": [ "[ed_bovy]\n", "parameter: ed_tol = 1e-6\n", "input: [data, vhat_data if vhat != \"mle\" else f'{vhat_data:nn}.V_simple.rds'] + [f\"{cwd}/{output_prefix}.{m}.rds\" for m in mixture_components]\n", "output: prior_data\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, tags = f'{step_name}_{_output:bn}' \n", "R: expand = \"${ }\", stderr = f\"{_output:n}.stderr\", stdout = f\"{_output:n}.stdout\", container = container, entrypoint=entrypoint\n", " library(mashr)\n", " rds_files = c(${_input:r,})\n", " dat = readRDS(rds_files[1])\n", " vhat = readRDS(rds_files[2])\n", " mash_data = mash_set_data(dat$strong.b, Shat=dat$strong.s, V=vhat, alpha=${1 if effect_model == 'EZ' else 0}, zero_Bhat_Shat_reset = 1E3)\n", " # U is different from mashr pipeline, here I kept the later one\n", " # U = list(XtX = dat$XtX)\n", " U = list(XtX = t(mash_data$Bhat) %*% mash_data$Bhat / nrow(mash_data$Bhat))\n", " \n", " for (f in rds_files[3:length(rds_files)]) U = c(U, readRDS(f))\n", " # Fit mixture model using ED code by J. Bovy\n", " message(paste(\"Running ED via J. Bovy's code for\", length(U), \"mixture components\"))\n", " res = mashr:::bovy_wrapper(mash_data, U, logfile=${_output:nr}, tol = ${ed_tol})\n", " saveRDS(list(U=res$Ulist, w=res$pi, loglik=scan(\"${_output:n}_loglike.log\")), ${_output:r})" ] }, { "cell_type": "markdown", "metadata": { "kernel": "SoS", "tags": [] }, "source": [ "## Plot patterns of sharing\n", "\n", "This is a simple utility function that takes the output from the pipeline above and make some heatmap to show major patterns of multivariate effects. The plots will be ordered by their mixture weights." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "kernel": "SoS" }, "outputs": [], "source": [ "[plot_U]\n", "parameter: data = path\n", "# number of components to show\n", "parameter: max_comp = -1\n", "# whether or not to convert to correlation\n", "parameter: to_cor = False\n", "parameter: tol = \"1E-6\"\n", "parameter: remove_label = False\n", "parameter: name = \"\"\n", "input: data\n", "output: f'{cwd:a}/{_input:bn}{(\"_\" + name.replace(\"$\", \"_\")) if name != \"\" else \"\"}.pdf'\n", "R: expand = \"${ }\", stderr = f'{_output:n}.stderr', stdout = f'{_output:n}.stdout', container = container, entrypoint=entrypoint\n", " library(reshape2)\n", " library(ggplot2)\n", " plot_sharing = function(X, col = 'black', to_cor=FALSE, title=\"\", remove_names=F) {\n", " clrs <- colorRampPalette(rev(c(\"#D73027\",\"#FC8D59\",\"#FEE090\",\"#FFFFBF\",\n", " \"#E0F3F8\",\"#91BFDB\",\"#4575B4\")))(128)\n", " if (to_cor) lat <- cov2cor(X)\n", " else lat = X/max(diag(X))\n", " lat[lower.tri(lat)] <- NA\n", " n <- nrow(lat)\n", " if (remove_names) {\n", " colnames(lat) = paste('t',1:n, sep = '')\n", " rownames(lat) = paste('t',1:n, sep = '')\n", " }\n", " melted_cormat <- melt(lat[n:1,], na.rm = TRUE)\n", " p = ggplot(data = melted_cormat, aes(Var2, Var1, fill = value))+\n", " geom_tile(color = \"white\")+ggtitle(title) + \n", " scale_fill_gradientn(colors = clrs, limit = c(-1,1), space = \"Lab\") +\n", " theme_minimal()+ \n", " coord_fixed() +\n", " theme(axis.title.x = element_blank(),\n", " axis.title.y = element_blank(),\n", " axis.text.x = element_text(color=col, size=8,angle=45,hjust=1),\n", " axis.text.y = element_text(color=rev(col), size=8),\n", " title =element_text(size=10),\n", " # panel.grid.major = element_blank(),\n", " panel.border = element_blank(),\n", " panel.background = element_blank(),\n", " axis.ticks = element_blank(),\n", " legend.justification = c(1, 0),\n", " legend.position = c(0.6, 0),\n", " legend.direction = \"horizontal\")+\n", " guides(fill = guide_colorbar(title=\"\", barwidth = 7, barheight = 1,\n", " title.position = \"top\", title.hjust = 0.5))\n", " if(remove_names){\n", " p = p + scale_x_discrete(labels= 1:n) + scale_y_discrete(labels= n:1)\n", " }\n", " return(p)\n", " }\n", " \n", " dat = readRDS(${_input:r})\n", " name = \"${name}\"\n", " if (name != \"\") {\n", " if (is.null(dat[[name]])) stop(\"Cannot find data ${name} in ${_input}\")\n", " dat = dat[[name]]\n", " }\n", " if (is.null(names(dat$U))) names(dat$U) = paste0(\"Comp_\", 1:length(dat$U))\n", " meta = data.frame(names(dat$U), dat$w, stringsAsFactors=F)\n", " colnames(meta) = c(\"U\", \"w\")\n", " tol = ${tol}\n", " n_comp = length(meta$U[which(meta$w>tol)])\n", " meta = head(meta[order(meta[,2], decreasing = T),], ${max_comp if max_comp > 1 else \"nrow(meta)\"})\n", " message(paste(n_comp, \"components out of\", length(dat$w), \"total components have weight greater than\", tol))\n", " res = list()\n", " for (i in 1:n_comp) {\n", " title = paste(meta$U[i], \"w =\", round(meta$w[i], 6))\n", " ##Handle updated udr data structure\n", " if(is.list(dat$U[[meta$U[i]]])){\n", " res[[i]] = plot_sharing(dat$U[[meta$U[i]]]$mat, to_cor = ${\"T\" if to_cor else \"F\"}, title=title, remove_names = ${\"TRUE\" if remove_label else \"FALSE\"})\n", " } else if(is.matrix(dat$U[[meta$U[i]]])){\n", " res[[i]] = plot_sharing(dat$U[[meta$U[i]]], to_cor = ${\"T\" if to_cor else \"F\"}, title=title, remove_names = ${\"TRUE\" if remove_label else \"FALSE\"})\n", " }\n", " }\n", " unit = 4\n", " n_col = 5\n", " n_row = ceiling(n_comp / n_col)\n", " pdf(${_output:r}, width = unit * n_col, height = unit * n_row)\n", " do.call(gridExtra::grid.arrange, c(res, list(ncol = n_col, nrow = n_row, bottom = \"Data source: readRDS(${_input:br})${('$'+name) if name else ''}\")))\n", " dev.off()" ] } ], "metadata": { "kernelspec": { "display_name": "SoS", "language": "sos", "name": "sos" }, "language_info": { "codemirror_mode": "sos", "file_extension": ".sos", "mimetype": "text/x-sos", "name": "sos", "nbconvert_exporter": "sos_notebook.converter.SoS_Exporter", "pygments_lexer": "sos" }, "sos": { "kernels": [ [ "R" ], [ "SoS" ] ], "version": "0.24.3" } }, "nbformat": 4, "nbformat_minor": 4 }