Here I run the latest flashr + mashr
pipeline on the latest GTEx release.
Version: 2019.01.24 by Gao Wang and Yuxin Zou
The fastqtl summary statistics data were obtained from dbGaP (data on CRI at UChicago Genetic Medicine); there are 49 tissues. [more description to come]
Using an established workflow (which takes 33 hours to run on a cluster system as configured by midway2.yml; see fastqtl_to_mash.ipynb for a note on the computing environment), I ran:
INPUT_DIR=/project/compbio/GTEx_dbGaP/GTEx_Analysis_2017-06-05_v8/eqtl/GTEx_Analysis_v8_eQTL_all_associations
JOB_OPT="-c midway2.yml -q midway2"
sos run workflows/fastqtl_to_mash.ipynb --data-list $INPUT_DIR/FastQTLSumStats.list --common-suffix ".allpairs.txt" $JOB_OPT
As a result of the command above I obtained the "mashable" data-set, in the same format as described here.
$ zcat Whole_Blood.allpairs.txt.gz | cut -f1 | sort -u | wc -l
20316
$ h5ls Whole_Blood.allpairs.txt.h5 | wc -l
20315
The results agree for the Whole Blood sample (the original data has a header, hence one more line than the H5 version). We should be good (since the pipeline reported success for all other files).
The command above took 33 hours on UChicago RCC midway2.
[MW] cat FastQTLSumStats.log
39832 out of 39832 groups merged!
So we have a total of 39832 genes (union of 49 tissues).
[MW] cat FastQTLSumStats.portable.log
15636 out of 39832 groups extracted!
We have 15636 groups without missing data in any tissue. This will be used to train the MASH model.
The "mashable" data file is FastQTLSumStats.mash.rds
, 124Mb serialized R file.
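For a quick look at what this file contains, here is a minimal R sketch (the element names are the ones consumed by the workflow below; the path assumes the default fastqtl_to_mash output location):
dat = readRDS("fastqtl_to_mash_output/FastQTLSumStats.mash.rds")
names(dat)          # expect elements such as strong.b, strong.s, random.b, random.s
dim(dat$strong.b)   # "strong" gene-SNP pairs: effect estimates, one row per gene, one column per tissue
dim(dat$random.b)   # random (mostly null) gene-SNP pairs, used below for model fitting and V estimation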
Below is a "blackbox" implementation of the mashr eQTL workflow -- blackbox in the sense that you can run this pipeline as an executable, without thinking too much about it, if your problem fits our GTEx analysis scheme. However, reading it as a notebook makes it a good source of information to help you develop your own mashr analysis procedures.
Since the submission of Urbut 2017 to bioRxiv we have improved the implementation of the MASH algorithm and made a new R package, mashr. Major improvements compared to Urbut 2017 are:
Replacing SFA with FLASH, a new sparse factor analysis method, to generate prior covariance candidates.
At this point, the input data have already been converted from the original eQTL summary statistics to a format convenient for analysis in MASH, as a result of running the data conversion pipeline in fastqtl_to_mash.ipynb.
Example command:
JOB_OPT="-j 8"
#JOB_OPT="-c midway2.yml -q midway2"
sos run workflows/mashr_flashr_workflow.ipynb mash $JOB_OPT # --data ... --cwd ... --vhat ...
FIXME: add comments on submitting jobs to HPC. Here we use the UChicago RCC cluster, but other users can similarly configure their computing system to run the pipeline on HPC.
[global]
parameter: cwd = path('./mashr_flashr_workflow_output')
# Input summary statistics data
parameter: data = path("fastqtl_to_mash_output/FastQTLSumStats.mash.rds")
# Prefix of output files. If not specified, it will derive it from data.
# If it is specified, for example, `--output-prefix AnalysisResults`
# It will save output files as `{cwd}/AnalysisResults*`.
parameter: output_prefix = ''
# Exchangeable effects (EE) or exchangeable z-scores (EZ)
parameter: effect_model = 'EZ'
# Identifier of $\hat{V}$ estimate file
# It should be available as {cwd}/{data:bn}.V_{vhat}.rds
parameter: vhat = 'simple'
# Additional label for vhat file
parameter: vhat_file_label = ''
# default method for convex optimization
parameter: optmethod = "mixSQP"
# Path to mosek license file, if `--optmethod mixIP` is used
parameter: mosek_license = file_target("~/.mosek.lic")
# Most established heavy-lifting computations are in C++ via Rcpp
# but some experimental features are in R. This argument allows one
# to switch between implementations
parameter: implementation = 'Rcpp'
# Number of components in PCA analysis for prior
# default to 3 as in mash paper
# set it to 0 to not use PCA priors
parameter: npc = 3
data = data.absolute()
if len(output_prefix) == 0:
    output_prefix = f"{data:bn}"
if vhat_file_label and not vhat_file_label.startswith('_'):
    vhat_file_label = '_' + vhat_file_label
flash_data = file_target(f"{cwd:a}/{output_prefix}.{effect_model}.flash.rds")
prior_data = file_target(f"{cwd:a}/{output_prefix}.{effect_model}.FL_PC{npc}.rds")
vhat_data = file_target(f"{cwd:a}/{output_prefix}.{effect_model}.FL_PC{npc}.V_{vhat}{vhat_file_label}.rds")
fail_if(optmethod == "mixIP" and not mosek_license.is_file(), msg = f'To use mixIP optimization, please put a valid copy (NOT a symbolic link!) of MOSEK license to: \n``{mosek_license}``')
def sort_uniq(seq):
    seen = set()
    return [x for x in seq if not (x in seen or seen.add(x))]
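As a worked example of the derived targets above: with the default settings (--data fastqtl_to_mash_output/FastQTLSumStats.mash.rds, EZ model, 3 PCs, vhat simple, no extra label), they should resolve to mashr_flashr_workflow_output/FastQTLSumStats.mash.EZ.flash.rds, FastQTLSumStats.mash.EZ.FL_PC3.rds and FastQTLSumStats.mash.EZ.FL_PC3.V_simple.rds, respectively.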
sos run mashr_flashr_workflow.ipynb -h
The main references are our mashr vignettes: this one for the mashr eQTL outline and this one for using the FLASH prior. The latter was written recently, specifically for this effort, and will likely be subject to change in future versions.
If you use --optmethod mixIP you will have to provide a valid copy of the MOSEK license file (not a symbolic link), by default expected at ~/.mosek.lic (configurable via --mosek-license). The current default method is mixSQP.
The outcome of this workflow can be found under the ./mashr_flashr_workflow_output folder (configurable). File names have the pattern *.mash_model_*.rds. They can be used to compute posteriors for an input list of gene-SNP pairs (see the next section).
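As a quick sanity check after the fit completes, one can load the model in R and look at the estimated weights of the prior covariance components; a minimal sketch (the file name assumes default settings and is otherwise hypothetical):
library(mashr)
m = readRDS("mashr_flashr_workflow_output/FastQTLSumStats.mash.EZ.FL_PC3.V_simple.mash_model.rds")
head(sort(get_estimated_pi(m), decreasing = TRUE))  # weights of the null, canonical and data-driven components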
flashr prior covariances
# Perform FLASH analysis (time estimate: 20min)
[flash]
depends: R_library("flashr")
input: data
output: flash_data
task: trunk_workers = 1, walltime = '2h', trunk_size = 1, mem = '4G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", workdir = cwd, env = {'MOSEKLM_LICENSE_FILE': str(mosek_license)}, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout"
library(flashr)
library(mixsqp)
library(mashr)
# Custom initialization for flash: flip the signs of the rank-1 fit
# so that the factor (v) is predominantly positive.
my_init_fn <- function(Y, K = 1) {
  ret = flashr:::udv_si(Y, K)
  pos_sum = sum(ret$v[ret$v > 0])
  neg_sum = -sum(ret$v[ret$v < 0])
  if (neg_sum > pos_sum) {
    return(list(u = -ret$u, d = ret$d, v = -ret$v))
  } else
    return(ret)
}
flash_pipeline = function(data, ...) {
  ## current state-of-the art
  ## suggested by Jason Willwerscheid
  ## cf: discussion section of
  ## https://willwerscheid.github.io/MASHvFLASH/MASHvFLASHnn2.html
  ebnm_fn = "ebnm_ash"
  ebnm_param = list(l = list(mixcompdist = "normal",
                             optmethod = "${optmethod}"),
                    f = list(mixcompdist = "+uniform",
                             optmethod = "${optmethod}"))
  ##
  fl_g <- flashr:::flash_greedy_workhorse(data,
                                          var_type = "constant",
                                          ebnm_fn = ebnm_fn,
                                          ebnm_param = ebnm_param,
                                          init_fn = "my_init_fn",
                                          stopping_rule = "factors",
                                          tol = 1e-3,
                                          verbose_output = "odF")
  fl_b <- flashr:::flash_backfit_workhorse(data,
                                           f_init = fl_g,
                                           var_type = "constant",
                                           ebnm_fn = ebnm_fn,
                                           ebnm_param = ebnm_param,
                                           stopping_rule = "factors",
                                           tol = 1e-3,
                                           verbose_output = "odF")
  return(fl_b)
}
cov_flash = function(data, subset = NULL, non_canonical = FALSE, save_model = NULL) {
  if(is.null(subset)) subset = 1:mashr:::n_effects(data)
  b.center = apply(data$Bhat, 2, function(x) x - mean(x))
  ## Only keep factors with at least two values greater than 1 / sqrt(n)
  find_nonunique_effects <- function(fl) {
    thresh <- 1/sqrt(ncol(fl$fitted_values))
    vals_above_avg <- colSums(fl$ldf$f > thresh)
    nonuniq_effects <- which(vals_above_avg > 1)
    return(fl$ldf$f[, nonuniq_effects, drop = FALSE])
  }
  fmodel = flash_pipeline(b.center)
  if (non_canonical)
    flash_f = find_nonunique_effects(fmodel)
  else
    flash_f = fmodel$ldf$f
  ## row.names(flash_f) = colnames(b)
  if (!is.null(save_model)) saveRDS(list(model=fmodel, factors=flash_f), save_model)
  if(ncol(flash_f) == 0){
    U.flash = list("tFLASH" = t(fmodel$fitted_values) %*% fmodel$fitted_values / nrow(fmodel$fitted_values))
  } else{
    U.flash = c(cov_from_factors(t(as.matrix(flash_f)), "FLASH"),
                list("tFLASH" = t(fmodel$fitted_values) %*% fmodel$fitted_values / nrow(fmodel$fitted_values)))
  }
  return(U.flash)
}
##
dat = readRDS(${_input:r})
dat = mash_set_data(dat$strong.b, dat$strong.s, alpha=${1 if effect_model == 'EZ' else 0})
res = cov_flash(dat, non_canonical = TRUE, save_model = "${_output:n}.model.rds")
saveRDS(res, ${_output:r})
# Compute data-driven / canonical prior matrices (time estimate: 2h ~ 12h for a mixture of ~30 49-by-49 matrices)
[prior]
depends: R_library("mashr")
input: data, output_from('flash')
output: prior_data
task: trunk_workers = 1, walltime = '36h', trunk_size = 1, mem = '4G', cores = 4, tags = f'{_output:bn}'
R: expand = "${ }", workdir = cwd, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout"
library(mashr)
dat = readRDS(${_input[0]:r})
mash_data = mash_set_data(dat$strong.b, Shat=dat$strong.s, alpha=${1 if effect_model == 'EZ' else 0})
# FLASH matrices
U.flash = readRDS(${_input[1]:r})
# SVD matrices
U.pca = ${"cov_pca(mash_data, %s)" % npc if npc > 0 else "list()"}
# Empirical cov matrix
X.center = apply(mash_data$Bhat, 2, function(x) x - mean(x))
# Denoised data-driven matrices
U.ed = cov_ed(mash_data, c(U.flash, U.pca, list("XX" = t(X.center) %*% X.center / nrow(X.center))), logfile=${_output:nr})
# Canonical matrices
U.can = cov_canonical(mash_data)
saveRDS(c(U.ed, U.can), ${_output:r})
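To see which covariance components ended up in the prior list, a minimal sketch (assuming the default prior output path from the [global] section):
U = readRDS("mashr_flashr_workflow_output/FastQTLSumStats.mash.EZ.FL_PC3.rds")
names(U)        # ED-refined FLASH / PCA / empirical matrices followed by the canonical ones
sapply(U, dim)  # each entry should be a 49 x 49 matrix for the GTEx v8 data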
FIXME: add some narratives here explaining what we do in each method.
# V estimate: "identity" method
[vhat_identity]
input: data
output: vhat_data
R: expand = "${ }", workdir = cwd, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout"
dat = readRDS(${_input:r})
saveRDS(diag(ncol(dat$random.b)), ${_output:r})
# V estimate: "simple" method (using null z-scores)
[vhat_simple]
depends: R_library("mashr")
input: data
output: vhat_data
R: expand = "${ }", workdir = cwd, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout"
library(mashr)
dat = readRDS(${_input:r})
vhat = estimate_null_correlation_simple(mash_set_data(dat$random.b, Shat=dat$random.s))
saveRDS(vhat, ${_output:r})
# V estimate: "mle" method
[vhat_mle]
# number of samples to use
parameter: n_subset = 6000
# maximum number of iterations
parameter: max_iter = 6
depends: R_library("mashr")
input: data, output_from('prior')
output: vhat_data
task: trunk_workers = 1, walltime = '36h', trunk_size = 1, mem = '4G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", workdir = cwd, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout"
library(mashr)
dat = readRDS(${_input[0]:r})
# choose random subset
set.seed(1)
random.subset = sample(1:nrow(dat$random.b), min(${n_subset}, nrow(dat$random.b)))
random.subset = mash_set_data(dat$random.b[random.subset,], dat$random.s[random.subset,], alpha=${1 if effect_model == 'EZ' else 0})
# estimate V mle
vhat = estimate_null_correlation(random.subset, readRDS(${_input[1]:r}), max_iter = ${max_iter})
saveRDS(vhat, ${_output:r})
# Estimate each V separately via CorShrink
[vhat_corshrink_xcondition_1]
# Utility script
parameter: util_script = path('/project/mstephens/gtex/scripts/SumstatQuery.R')
# List of genes to analyze
parameter: gene_list = path()
fail_if(not gene_list.is_file(), msg = 'Please specify valid path for --gene-list')
fail_if(not util_script.is_file() and len(str(util_script)), msg = 'Please specify valid path for --util-script')
genes = sort_uniq([x.strip().strip('"') for x in open(f'{gene_list:a}').readlines() if not x.strip().startswith('#')])
depends: R_library("CorShrink")
input: data, for_each = 'genes'
output: f'{vhat_data:n}/{_genes}.rds'
task: trunk_workers = 1, walltime = '3m', trunk_size = 500, mem = '3G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", workdir = cwd, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout"
source(${util_script:r})
CorShrink_sum = function(gene, database, z_thresh = 2){
  print(gene)
  dat <- GetSS(gene, database)
  z = dat$"z-score"
  max_absz = apply(abs(z), 1, max)
  nullish = which(max_absz < z_thresh)
  # if (length(nullish) < ncol(z)) {
  #   stop("not enough null data to estimate null correlation")
  # }
  if (length(nullish) <= 1){
    mat = diag(ncol(z))
  } else {
    nullish_z = z[nullish, ]
    mat = as.matrix(CorShrink::CorShrinkData(nullish_z, ash.control = list(mixcompdist = "halfuniform"))$cor)
  }
  return(mat)
}
V = CorShrink_sum("${_genes}", ${data:r})
saveRDS(V, ${_output:r})
# Consolidate Vhat into one file
[vhat_corshrink_xcondition_2, vhat_simple_specific_2]
depends: R_library("parallel")
# List of genes to analyze
parameter: gene_list = path()
fail_if(not gene_list.is_file(), msg = 'Please specify valid path for --gene-list')
genes = paths([x.strip().strip('"') for x in open(f'{gene_list:a}').readlines() if not x.strip().startswith('#')])
input: group_by = 'all'
output: vhat_data
task: trunk_workers = 1, walltime = '1h', trunk_size = 1, mem = '4G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", workdir = cwd, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout"
library(parallel)
files = sapply(c(${genes:r,}), function(g) paste0(c(${_input[0]:adr}), '/', g, '.rds'), USE.NAMES=FALSE)
V = mclapply(files, function(i){ readRDS(i) }, mc.cores = 1)
R = dim(V[[1]])[1]
L = length(V)
V.array = array(as.numeric(unlist(V)), dim=c(R, R, L))
saveRDS(V.array, ${_output:ar})
# Estimate each V separately via "simple" method
[vhat_simple_specific_1]
# Utility script
parameter: util_script = path('/project/mstephens/gtex/scripts/SumstatQuery.R')
# List of genes to analyze
parameter: gene_list = path()
fail_if(not gene_list.is_file(), msg = 'Please specify valid path for --gene-list')
fail_if(not util_script.is_file() and len(str(util_script)), msg = 'Please specify valid path for --util-script')
genes = sort_uniq([x.strip().strip('"') for x in open(f'{gene_list:a}').readlines() if not x.strip().startswith('#')])
depends: R_library("Matrix")
input: data, for_each = 'genes'
output: f'{vhat_data:n}/{_genes}.rds'
task: trunk_workers = 1, walltime = '1m', trunk_size = 500, mem = '3G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", workdir = cwd, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout"
source(${util_script:r})
simple_V = function(gene, database, z_thresh = 2){
  print(gene)
  dat <- GetSS(gene, database)
  z = dat$"z-score"
  max_absz = apply(abs(z), 1, max)
  nullish = which(max_absz < z_thresh)
  # if (length(nullish) < ncol(z)) {
  #   stop("not enough null data to estimate null correlation")
  # }
  if (length(nullish) <= 1){
    mat = diag(ncol(z))
  } else {
    nullish_z = z[nullish, ]
    mat = as.matrix(Matrix::nearPD(as.matrix(cov(nullish_z)), conv.tol=1e-06, doSym = TRUE, corr=TRUE)$mat)
  }
  return(mat)
}
V = simple_V("${_genes}", ${data:r})
saveRDS(V, ${_output:r})
mashr mixture model fitting
# Fit MASH mixture model (time estimate: <15min for 70K by 49 matrix)
[mash_1]
depends: R_library("mashr")
input: data, output_from("prior"), output_from(f"vhat_{vhat}")
output: f"{_input[-1]:n}.mash_model.rds"
task: trunk_workers = 1, walltime = '36h', trunk_size = 1, mem = '4G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", workdir = cwd, env = {'MOSEKLM_LICENSE_FILE': str(mosek_license)}, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout"
library(mashr)
dat = readRDS(${_input[0]:r})
vhat = readRDS(${_input[2]:r})
mash_data = mash_set_data(dat$random.b, Shat=dat$random.s, alpha=${1 if effect_model == 'EZ' else 0}, V=vhat)
saveRDS(mash(mash_data, Ulist = readRDS(${_input[1]:r}), optmethod = "${optmethod}", outputlevel = 1, algorithm.version = "${implementation}"), ${_output:r})
Additionally, compute posteriors for the "strong" set in the MASH input data.
# Compute posterior for the "strong" set of data as in Urbut et al 2017.
# This is optional because most of the time we want to apply the
# MASH model learned on much larger data-set.
[mash_2]
# default to True; use --no-compute-posterior to disable this
parameter: compute_posterior = True
# input Vhat file for the batch of posterior data
parameter: posterior_vhat_file = path()
skip_if(not compute_posterior)
fail_if(not posterior_vhat_file.is_file() and len(str(posterior_vhat_file)) > 1, msg = 'Cannot find specified --posterior-vhat-file. Please provide the correct path for it!')
depends: R_library("mashr")
input: data, posterior_vhat_file if posterior_vhat_file.is_file() else output_from(f"vhat_{vhat}"), output_from(-1)
output: f'{posterior_vhat_file:n}.posterior.rds' if posterior_vhat_file.is_file() else f"{_input[-1]:nn}.posterior.rds"
task: trunk_workers = 1, walltime = '36h', trunk_size = 1, mem = '4G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", workdir = cwd, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout"
library(mashr)
dat = readRDS(${_input[0]:r})
vhat = readRDS(${_input[1]:r})
mash_data = mash_set_data(dat$strong.b, Shat=dat$strong.s, alpha=${1 if effect_model == 'EZ' else 0}, V=vhat)
mash_model = readRDS(${_input[2]:ar})
saveRDS(mash_compute_posterior_matrices(mash_model, mash_data, algorithm.version = "${implementation}"), ${_output:r})
In the GTEx V6 paper we assumed one eQTL per gene and applied the model learned above to those SNPs. Under that assumption, the input data for the posterior calculation are the dat$strong.* matrices. It is a fairly straightforward procedure, as shown in this vignette. But it is often more interesting to apply MASH to a given list of eQTLs, e.g. those from fine-mapping results. In the GTEx V8 analysis we obtain such gene-SNP pairs from the DAP-G fine-mapping analysis. See this notebook for how the input data are prepared. The workflow below takes a number of input chunks (each chunk is a list of matrices dat$Bhat and dat$Shat) and computes posteriors for each chunk. It is therefore suited to computing posteriors in parallel for all gene-SNP pairs, when input data chunks are provided.
JOB_OPT="-c midway2.yml -q midway2"
DATA_DIR=/project/compbio/GTEx_eQTL/independent_eQTL
sos run workflows/mashr_flashr_workflow.ipynb posterior \
$JOB_OPT \
--posterior-input $DATA_DIR/DAPG_pip_gt_0.01-AllTissues/DAPG_pip_gt_0.01-AllTissues.*.rds \
$DATA_DIR/ConditionalAnalysis_AllTissues/ConditionalAnalysis_AllTissues.*.rds
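Each output file (one per input chunk) stores the posterior matrices for that chunk. A minimal sketch for inspecting one of them (the file name is hypothetical):
post = readRDS("DAPG_pip_gt_0.01-AllTissues.batch_1.posterior.rds")
names(post)                           # typically PosteriorMean, PosteriorSD, lfsr, NegativeProb
dim(post$lfsr)                        # gene-SNP pairs by tissues
sum(apply(post$lfsr, 1, min) < 0.05)  # pairs significant in at least one tissue at lfsr < 0.05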
# Apply posterior calculations
[posterior]
parameter: mash_model = path(f"{vhat_data:n}.mash_model.rds")
parameter: posterior_input = paths()
parameter: posterior_vhat_files = paths()
# eg, if data is saved in R list as data$strong, then
# when you specify `--data-table-name strong` it will read the data as
# readRDS('{_input:r}')$strong
parameter: data_table_name = ''
parameter: bhat_table_name = 'Bhat'
parameter: shat_table_name = 'Shat'
skip_if(len(posterior_input) == 0, msg = "No posterior input data to compute on. Please specify it using --posterior-input.")
fail_if(len(posterior_vhat_files) > 1 and len(posterior_vhat_files) != len(posterior_input), msg = "length of --posterior-input and --posterior-vhat-files do not agree.")
for p in posterior_input:
fail_if(not p.is_file(), msg = f'Cannot find posterior input file ``{p}``')
depends: R_library("mashr")
input: posterior_input, group_by = 1
output: f"{_input:n}.posterior.rds"
task: trunk_workers = 1, walltime = '20h', trunk_size = 1, mem = '20G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }", workdir = cwd, stderr = f"{_output:n}.stderr", stdout = f"{_output:n}.stdout"
library(mashr)
data = readRDS(${_input:r})${('$' + data_table_name) if data_table_name else ''}
vhat = readRDS("${vhat_data if len(posterior_vhat_files) == 0 else posterior_vhat_files[_index]}")
mash_data = mash_set_data(data$${bhat_table_name}, Shat=data$${shat_table_name}, alpha=${1 if effect_model == 'EZ' else 0}, V=vhat)
saveRDS(mash_compute_posterior_matrices(readRDS(${mash_model:r}), mash_data, algorithm = "${implementation}"), ${_output:r})
The [posterior] step should produce a number of serialized R objects *.batch_*.posterior.rds (which can be loaded into R via readRDS()) -- I chopped the data into batches to take advantage of computing on multiple cluster nodes. It should be self-explanatory, but please let me know otherwise.
*.batch_*.yaml: gene-SNP pairs of interest, identified elsewhere (e.g. fine-mapping analysis). *.batch_*.yaml are extracted and saved to *.batch_*.rds, creating input to the [posterior] step.
*.batch_*.stdout: documents SNPs found in the fine-mapping results but not found in the original fastqtl output.
Exported from analysis/mashr_flashr_workflow.ipynb, committed by Gao Wang on Tue Feb 2 19:25:48 2021, revision 2, 4e7b5da