This workflow converts FastQTL eQTL analysis summary statistics text output to formats more friendly to analysis in R. In particular:
Summary statistics from FastQTL-like output are in text format, one row per gene-SNP pair. Columns in our example data set are:
gene_id
variant_id
tss_distance
ma_samples
ma_count
maf
pval_nominal
slope
slope_se
This is the format of the GTEx release V8 summary statistics data. Each analysis ("tissue" for GTEx) has a separate text file. Additionally, there are support files with gene transcription start site coordinates and SNP coordinates.
The workflow takes a list of summary statistics file names, e.g. data/fastqtl/FastQTLSumStats.list (configurable), with contents:
Tissue_1.fastqtl.gz
Tissue_2.fastqtl.gz
...
The first two columns of these summary statistics files have to be gene_id and variant_id (the column names themselves do not matter). Of the remaining columns we only need $\hat{\beta}$, $\text{SE}(\hat{\beta})$ and the p-value. In the FastQTL output format above these are columns 8, 9 and 7, which is the current default, --cols 8 9 7. If your summary statistics file has a different format you can use the --cols parameter to pass the proper column numbers, in one of two ways: give three column numbers for $\hat{\beta}$, $\text{SE}(\hat{\beta})$ and p-value, or give only two numbers for $\hat{\beta}$ and p-value, in which case $\text{SE}(\hat{\beta})$ is derived from them. Currently this pipeline only supports and requires $\hat{\beta}$, $\text{SE}(\hat{\beta})$ and the p-value, not other quantities (e.g. the t statistic).
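When only $\hat{\beta}$ and the p-value are provided, the workflow back-computes $\text{SE}(\hat{\beta})$ from the two-sided p-value (this is what the get_se helper in the conversion code below does). A minimal standalone sketch of that calculation, using arbitrary illustrative numbers:

import numpy as np
from scipy import stats

def se_from_beta_pval(bhat, pval):
    # z-score implied by the two-sided p-value, carrying the sign of betahat
    z = stats.norm.ppf(pval / 2) if bhat < 0 else stats.norm.ppf(1 - pval / 2)
    # SE follows from z = betahat / SE; undefined (NaN) when z is 0
    return bhat / z if z != 0 else np.nan

print(se_from_beta_pval(0.3, 0.01))  # ~0.1165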
To speed up merging multiple HDF5 files, it helps to provide a list of gene names; otherwise the workflow spends a long time collecting them from the individual HDF5 files before the merge can happen. The gene list has contents like:
ENSG00000186092.4
ENSG00000227232.5
ENSG00000228463.9
ENSG00000241860.6
ENSG00000268903.1
ENSG00000269981.1
ENSG00000279457.4
ENSG00000279928.2
...
Keep all the listed data files in the same folder as this list file. Then run:
JOB_OPTION="-j 8"
#JOB_OPTION="-q midway2 -c midway2.yml"
sos run workflows/fastqtl_to_mash.ipynb convert \
--data-list data/fastqtl/FastQTLSumStats.list \
--gene-list data/fastqtl/GTEx_genes.txt \
$JOB_OPTION
to convert to HDF5 only, and
sos run workflows/fastqtl_to_mash.ipynb \
--data-list data/fastqtl/FastQTLSumStats.list \
--gene-list data/fastqtl/GTEx_genes.txt \
$JOB_OPTION
to convert to HDF5 AND extract MASH input.
JOB_OPTION
With the bash variable JOB_OPTION set to -q midway2 -c midway2.yml, each workflow is configured to run on UChicago RCC's midway2 cluster via the task option. Executed as is, it takes about 33 hours to convert the summary statistics for the GTEx data (Release V8). If you are familiar with SoS, you may want to configure task differently for your computing environment. Otherwise, you can set JOB_OPTION to -j 8 to use 8 CPU threads.
You can view all available options of this workflow with:
sos run fastqtl_to_mash.ipynb -h
If you run the entire workflow, you should find under ./fastqtl_to_mash_output (configurable) the strong and random data sets, and optionally random_test, with Z-scores computed from p-values.
[global]
# Work directory / output directory
parameter: cwd = path('./fastqtl_to_mash_output')
# A text file containing data-set names
parameter: data_list = path("data/eQTL_summary_files.txt")
# Optionally, a list of gene names.
parameter: gene_list = path()
# Meta-info tag for HDF5 database
parameter: msg = "eQTL mapping summary statistics"
# maximum number of groups per HDF5 file
parameter: maxsize = 1000
# 1-based indexing of 3 numbers indicating columns for betahat, se(betahat) and p-value,
# or 2 numbers for betahat and p-value
parameter: cols = [8, 9, 7]
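# Set to 1 to keep the ENSG version suffix (e.g. ENSG00000186092.4) in gene names; 0 strips it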
parameter: keep_ensg_version = 0
# Common file name suffix to strip when deriving per-study (tissue) names; for GTEx data use ".allpairs.txt"
parameter: common_suffix = ""
if len(cols) == 2:
cols.append(0)
# Generate utility functions
[convert_0, default_0]
depends: Py_Module('tables'), Py_Module('pandas>0.23.0')
report: expand = "${ }", output = '.sos/utils.py'
import sys, os, re, copy
import numpy as np, pandas as pd, tables as tb
tb.parameters.MAX_GROUP_WIDTH = 51200
# tb.parameters.NODE_CACHE_SLOTS = -51200
# tb.parameters.METADATA_CACHE_SIZE = 1048576 * 100000
# tb.parameters.CHUNK_CACHE_SIZE = 2097152 * 100000
# tb.parameters.CHUNK_CACHE_NELMTS = 521
import scipy.stats as st
class Environment:
def __init__(self):
self.float = np.float32
self.duplicate_tag = '_duplicated_'
self.common_suffix = '${common_suffix}.h5'
env = Environment()
class TBData(dict):
def __init__(self, data, name, msg = None, root = '/', complib = 'bzip2'):
'''bzip2 may not be compatible with other hdf5 applications; but zlib is fine'''
self.__root = root.strip('/')
self.__group = name
self.__msg = msg
try:
if type(data) is dict:
self.update(data)
elif type(data) is str:
# is file name
self.__load(tb.open_file(data))
else:
# is file stream
self.__load(data)
except tb.exceptions.NoSuchNodeError:
raise ValueError('Cannot find dataset {}!'.format(name))
self.tb_filters = tb.Filters(complevel = 9, complib=complib)
def sink(self, filename):
with tb.open_file(filename, 'a') as f:
if self.__root:
try:
f.create_group("/", self.__root)
except:
pass
try:
# there is existing data -- have to merge with current data
# have to do this because the input file lines are not grouped by gene names!!
# use try ... except to hopefully faster than if ... else
# e.g., if not f.__contains__('/{}'.format(self.__group)) ... else ...
for element in f.list_nodes('/{}/{}'.format(self.__root, self.__group)):
if element.name != 'colnames':
self[element.name] = np.concatenate((element[:], self[element.name]))
except tb.exceptions.NoSuchNodeError:
f.create_group("/" + self.__root, self.__group,
self.__msg if self.__msg else self.__group)
for key in self:
self.__store_array(key, f)
f.flush()
def dump(self, table, output = False):
if output:
pd.DataFrame({self['colnames'][i] : self[table][:,i] for i in range(len(self['colnames']))}, index = self['rownames']).to_csv(sys.stdout, na_rep = 'NA')
return None
else:
return pd.DataFrame({self['colnames'][i] : self[table][:,i] for i in range(len(self['colnames']))}, index = self['rownames'])
def __load(self, fstream):
try:
for element in fstream.list_nodes('/{}/{}'.format(self.__root, self.__group)):
self[element.name] = element[:]
fstream.close()
except:
fstream.close()
raise
def __roll_back(self, group, name):
try:
n = getattr(group, name)
n._f_remove()
except AttributeError:
pass
def __store_array(self, name, fstream):
if self.__root:
element = getattr(getattr(fstream.root, self.__root), self.__group)
else:
element = getattr(fstream.root, self.__group)
arr = self[name]
if type(arr) is list:
arr = np.array(arr)
self.__roll_back(element, name)
#
if arr.shape != (0,):
ds = fstream.create_carray(element, name, tb.Atom.from_dtype(arr.dtype), arr.shape,
filters = self.tb_filters)
ds[:] = arr
def get_tb_grps(filenames, group_name = None):
if len(filenames) == 0:
return []
if isinstance(filenames, str):
filenames = [filenames]
names = set()
for filename in filenames:
with tb.open_file(filename) as f:
names.update([node._v_name for node in (f.root if group_name is None else getattr(f.root, '{}'.format(group_name)))])
return sorted(names)
def get_se(bhat, pval):
if bhat < 0:
z = st.norm.ppf(pval / 2)
else:
z = st.norm.ppf(1 - pval / 2)
if z != 0:
se = bhat/z
else:
se = np.nan
return se
class SSData:
def __init__(self, header = False):
self.data = {'buffer':{'data':[], 'rownames':[]}, 'output':{}}
self.header = header
self.previous_name = self.current_name = None
self.min_line_len = max([${cols[0]}, ${cols[1]}, ${cols[2]}])
def parse(self, line, ensg_version = 0, include = set()):
# input line starts with gene id and variant id; the summary statistic columns are taken from --cols
if not line:
self.__reset()
self.current_name = None
return 1
if isinstance(line, bytes):
line = line.decode()
line = line.strip().split()
if self.header:
# header line has to be accounted for
self.header = False
return 0
#
line[0] = line[0].strip('"')
if not line[0] in include:
return -1
# the line is not long enough ... there is format issues
if len(line) < self.min_line_len:
return -1
#
if ensg_version == 0:
line[0] = line[0].rsplit('.',1)[0]
if self.previous_name is None:
self.previous_name = line[0]
self.current_name = line[0]
if self.current_name != self.previous_name:
self.__reset()
if ${cols[2]} == 0:
# the two numbers are bhat and p-value, we need to compute se(bhat)
self.data['buffer']['data'].append([line[${cols[0]-1}], str(get_se(float(line[${cols[0]-1}]), float(line[${cols[1]-1}]))), line[${cols[1]-1}]])
else:
self.data['buffer']['data'].append([line[${cols[0]-1}], line[${cols[1]-1}], line[${cols[2]-1}]])
self.data['buffer']['rownames'].append(self.__format_variant_id(line[1]))
return 0
@staticmethod
def __format_variant_id(value):
value = value.strip('"').lstrip('chr').split('_')
if len(value) > 4:
# keep it chr, pos, ref, alt
value = value[:4]
return '_'.join(value)
def __reset(self):
self.data['buffer']['data'] = np.array(self.data['buffer']['data'], dtype = env.float)
self.data['buffer']['rownames'] = np.array(self.data['buffer']['rownames'])
self.data['buffer']['colnames'] = np.array(['beta','se','pval'])
self.data['output'] = copy.deepcopy(self.data['buffer'])
self.data['buffer'] = {'data':[], 'rownames':[]}
def dump(self):
return self.data['output']
class DataMerger(TBData):
def __init__(self, files, name, msg = None):
TBData.__init__(self, {}, name, msg, complib = "zlib")
self.files = sorted(files)
self.__group = name
def merge(self):
data = {}
one_snp = None
failure_ct = 0
# Collect data
for item in self.files:
tissue = re.sub(r'{}$'.format(env.common_suffix), '', os.path.basename(item))
try:
data[tissue] = TBData(item, self.__group)
if one_snp is None: one_snp = data[tissue]['rownames'][0]
except ValueError:
data[tissue] = {'data' : np.array([[np.nan, np.nan, np.nan]]), 'rownames': None}
failure_ct += 1
# Fix row name
# Because in GTEx data file there are duplicated gene-snp pairs having different sumstats!!
if data[tissue]['rownames'] is not None:
data[tissue]['rownames'] = self.__dedup(data[tissue]['rownames'], item)
if failure_ct == len(self.files):
return 1
# Merge data
for idx, item in enumerate(['beta','se','pval']):
self[item] = pd.concat([pd.DataFrame(
{tissue : data[tissue]['data'][:,idx]},
index = data[tissue]['rownames'] if data[tissue]['rownames'] is not None else [one_snp]
) for tissue in sorted(data.keys())], sort=True, axis = 1)
if 'rownames' not in self:
self['rownames'] = np.array(self[item].index, dtype = str)
if 'colnames' not in self:
self['colnames'] = np.array(self[item].columns.values.tolist(), dtype = str)
self[item] = np.array(self[item].values, dtype = env.float)
# np.savetxt(sys.stdout, self['pval'], fmt='%10.5f')
# print(self['rownames'])
# print(self['colnames'])
return 0
def __dedup(self, seq, filename):
seen = {}
dups = set()
def __is_seen(x, seen):
if x not in seen:
seen[x] = 0
return 0
else:
seen[x] += 1
dups.add(x)
return 1
# Tag them
obs = [x if not __is_seen(x, seen) else '%s%s%s' % (x, env.duplicate_tag, seen[x]) for x in seq]
# Log them
if len(dups):
filename = os.path.splitext(filename)[0]
with open(filename + '.error', 'a') as f:
for item in dups:
f.write('{}:{} appeared {} times in {}\n'.\
format(self.__group, item, seen[item] + 1, filename))
return obs
def get_gs_pairs(data, name, num = (1, 9), method = 'equal_space'):
'''choose gene-snp pairs from data, controlled by num = (a, b)
for the best gene-snp pair (a = 0 or 1), and b other random
gene-snp pairs'''
def random_sample(x, k):
if k < 0:
return sorted(x)
return sorted(set(np.random.choice(x, min(len(x), k))))
def equal_sample(x, k):
if len(x) < k or k < 0:
return sorted(x)
f = lambda m, n: [i*n//m + n//(2*m) for i in range(m)]
return sorted(set([x[i] for i in f(k, len(x))]))
output = {'colnames' : data['colnames']}
lp = data.dump('pval')
shat = data.dump('se')
# pval cannot be nan or zero (in fastqtl output, Shat nan has pval zero)
lp = lp[np.all(np.isfinite(lp), axis=1) & np.all(np.isfinite(shat), axis=1)]
#
if lp.empty and num[0] > 0:
return None
# Find strong SNP-gene pair
lp = -np.log10(lp)
rowidx = np.where(data['rownames'] == lp.max(axis=1).idxmax())[0][0]
if num[0] != 0:
output['strong_rownames'] = ['%s_%s' % (name, data['rownames'][rowidx].decode())]
output['strong'] = {}
for k in ['beta', 'pval', 'se']:
output['strong'][k] = data[k][rowidx, :]
if num[1] != 0:
all_randomidxes = [y for y, x in enumerate(data['rownames']) if x in lp.index and y != rowidx]
sample_randomidxes = random_sample(all_randomidxes, num[1]) if method == 'random' else equal_sample(all_randomidxes, num[1])
output['random_rownames'] = ['%s_%s' % (name, data['rownames'][x].decode()) for x in sample_randomidxes]
output['random'] = {}
for k in ['beta', 'pval', 'se']:
output['random'][k] = data[k][sample_randomidxes, :]
if not 'strong' in output and not 'random' in output:
output = None
return output
def merge_tmp_h5(output, verbose = 0):
from glob import glob
tmpfiles = list(glob(output + "_*.tmp"))
if os.path.isfile(output+'.h5'):
os.remove(output+'.h5')
for item in sorted(tmpfiles):
for name in get_tb_grps(item):
cmd = 'h5copy -i {0} -o {2} -s "/{1}" -d "/{1}"'.format(item, name, output+'.h5')
if verbose:
print(cmd)
os.system(cmd)
For the per-study conversion I use the bzip2 compression method and float32 storage to achieve a higher compression rate. This workflow step generates one HDF5 file per summary statistics file. The h5copy program can be installed via conda install -c conda-forge hdf5.
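Each per-study HDF5 file produced by the step below contains one group per gene; a group holds a data matrix (one row per SNP, with columns beta, se, pval) plus rownames (variant IDs) and colnames arrays. A minimal sketch for inspecting such a file with pytables and pandas, assuming the placeholder file name Tissue_1.fastqtl.h5:

import tables as tb
import pandas as pd

h5file = "fastqtl_to_mash_output/Tissue_1.fastqtl.h5"  # placeholder; use one of your converted files

with tb.open_file(h5file) as f:
    genes = [g._v_name for g in f.list_nodes('/')]      # one group per gene
    print(len(genes), "genes; first:", genes[0])
    grp = f.get_node('/' + genes[0])
    # stored strings come back as bytes, hence the decode
    dat = pd.DataFrame(grp.data[:],
                       index=[v.decode() for v in grp.rownames[:]],
                       columns=[c.decode() for c in grp.colnames[:]])
    print(dat.head())                                    # beta / se / pval per SNP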
# Convert summary stats gzip format to HDF5
[convert_1, default_1]
depends: executable("h5copy")
fail_if(not data_list.is_file(), msg = 'Need data list file!')
data_files = set(get_output(f"awk '{{print $1}}' {data_list:e}").strip().split('\n'))
fail_if(len(data_files) == 0, msg = 'Need input data files!')
import os
input: [f'{data_list:d}/{x}' if os.path.isfile(f'{data_list:d}/{x}') else x for x in data_files], group_by = 1
output: f'{cwd:a}/{_input:bn}.h5'
task: trunk_workers = 1, walltime = '36h', trunk_size = 1, mem = '4G', cores = 1, tags = f'{_output:bn}'
bash: expand = True, workdir = cwd
rm -f {_output:n}_*.tmp
python: expand = "${ }", input = '.sos/utils.py'
import warnings
import gzip
gene_names = set([x.strip() for x in open(${gene_list:r}).readlines() if x.strip()] if ${gene_list.is_file()} else [])
ssp = SSData(header = True)
group_counts = 0
with gzip.open(${_input:r}) as f:
while True:
line = f.readline()
quit = ssp.parse(line, ${keep_ensg_version}, include = gene_names)
if quit == -1:
continue
if ssp.current_name != ssp.previous_name:
group_counts += 1
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category = tb.FlavorWarning)
data = TBData(ssp.dump(), ssp.previous_name, "${msg}")
data.sink("${_output:n}_%i.tmp" % (np.ceil(group_counts / ${maxsize}))
if ${maxsize} > 0 else ${_output:r})
ssp.previous_name = ssp.current_name
if quit:
break
if ${maxsize} > 0:
merge_tmp_h5(${_output:nr})
bash: expand = True, workdir = cwd
rm -f {_output:n}_*.tmp
This step creates, for each gene, one table per summary statistic across multiple studies. Rows are effects (SNPs) and columns are conditions (tissue names). This time I use zlib compression for better compatibility with other HDF5 routines (e.g. rhdf5).
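In the merged file each gene group instead holds beta, se and pval matrices that share rownames (SNPs) and colnames (tissues). A short sketch for pulling one gene's p-value matrix into a DataFrame, with placeholder file and gene names:

import tables as tb
import pandas as pd

h5file = "fastqtl_to_mash_output/FastQTLSumStats.h5"  # placeholder merged file
gene = "ENSG00000186092"                              # placeholder gene

with tb.open_file(h5file) as f:
    grp = f.get_node('/' + gene)
    pval = pd.DataFrame(grp.pval[:],
                        index=[v.decode() for v in grp.rownames[:]],
                        columns=[c.decode() for c in grp.colnames[:]])
print(pval.shape)  # (number of SNPs, number of tissues)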
# Merge single study data to multivariate data
[convert_2, default_2]
depends: executable("h5copy")
input: group_by = 'all'
output: f'{cwd:a}/{data_list:bn}.h5'
task: trunk_workers = 1, walltime = '36h', trunk_size = 1, mem = '4G', cores = 1, tags = f'{_output:bn}'
bash: expand = True, workdir = cwd
rm -f {_output:n}_*.tmp
python: expand = '${ }', input = '.sos/utils.py'
import warnings
if ${gene_list.is_file()}:
gene_names = [x.strip() for x in open(${gene_list:r}).readlines() if x.strip()]
else:
gene_names = get_tb_grps([${_input:r,}])
if ${keep_ensg_version} == 0:
gene_names = [os.path.splitext(x)[0] for x in gene_names]
failure_ct = 0
for idx, item in enumerate(gene_names):
ssm = DataMerger([${_input:r,}], item, "${msg}")
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category = tb.FlavorWarning)
if ssm.merge() == 0:
ssm.sink("${_output:n}_%i.tmp" % (np.ceil((idx + 1.0) / ${maxsize}))
if ${maxsize} > 0 else ${_output:r})
else:
failure_ct += 1
with open("${_output:n}.log", 'w') as f:
f.write("%s out of %s groups merged!\n" % (len(gene_names) - failure_ct, len(gene_names)))
if ${maxsize} > 0:
merge_tmp_h5(${_output:nr})
bash: expand = True, workdir = cwd
rm -f {_output:n}_*.tmp
We need to extract the "top" signals from the single-tissue analyses to compute MASH priors, as well as some random SNP sets to fit the MASH mixture model.
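By default the "strong" entry for a gene is the gene-SNP pair with the smallest p-value in any condition, and the "random" pairs are taken at roughly equally spaced row positions rather than by uniform sampling (see equal_sample inside get_gs_pairs above). A small sketch of that index-spacing rule with toy numbers:

# Pick k (approximately) equally spaced indices out of n rows,
# mirroring the equal_sample logic in get_gs_pairs above.
def equally_spaced(k, n):
    return [i * n // k + n // (2 * k) for i in range(k)]

print(equally_spaced(9, 100))
# [5, 16, 27, 38, 49, 60, 71, 82, 93] -- 9 of 100 SNPs, spread along the gene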
# Extract data to fit MASH model
[default_3]
# Number of random SNPs to draw per gene for fitting MASH mixture
# Use -1 to export all
parameter: random_per_gene = 9
# Use 1 to extract strongest gene-snp pair
# Use 0 to avoid extracting the info
parameter: best_per_gene = 1
output: f"{_input:n}.portable.h5"
task: trunk_workers = 1, walltime = '36h', trunk_size = 1, mem = '4G', cores = 1, tags = f'{_output:bn}'
python: expand = "${ }", input = '.sos/utils.py'
import warnings
if ${gene_list.is_file()}:
gene_names = [x.strip() for x in open(${gene_list:r}).readlines() if x.strip()]
else:
gene_names = get_tb_grps([${_input:r,}])
if ${keep_ensg_version} == 0:
gene_names = [os.path.splitext(x)[0] for x in gene_names]
output = dict()
output['random'] = {'colnames': None, 'rownames': [], 'beta': None, 'se': None, 'pval': None}
output['strong'] = {'colnames': None, 'rownames': [], 'beta': None, 'se': None, 'pval': None}
failure_ct = 0
for idx, name in enumerate(gene_names):
# extract the best gene-snp pair or some random gene-snp pairs
try:
data = TBData(${_input:r}, name)
res = get_gs_pairs(data, name, (${best_per_gene}, ${random_per_gene}))
except ValueError:
res = None
#
if res is None:
failure_ct += 1
continue
for k in output:
if not k in res:
continue
for kk in output[k]:
if kk == 'rownames':
if output[k]['rownames'] is None:
output[k]['rownames'] = res["{}_rownames".format(k)]
else:
output[k]['rownames'].extend(res["{}_rownames".format(k)])
elif kk == 'colnames':
if output[k]['colnames'] is None:
output[k]['colnames'] = res['colnames']
else:
output[k][kk] = np.vstack((output[k][kk], res[k][kk])) if output[k][kk] is not None else res[k][kk]
#
if failure_ct < len(gene_names):
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category = tb.FlavorWarning)
for k in output:
TBData(dict(output[k]), k, msg = "%s, %s gene-snp pair" % ("${msg}", k), complib = 'zlib').sink(${_output:r})
with open("${_output:n}.log", 'w') as f:
f.write("%s out of %s groups extracted!\n" % (len(gene_names) - failure_ct, len(gene_names)))
Here I also implemented two command options to subset the data:
--effects-list: a list of effect names (row names) to include in the output MASH data set. This option was used in the MASH paper to remove SNPs in LD with each other.
--conditions-list: a list of condition names (column names) to include in the output MASH data set; conditions not in this list are excluded from the output. This option was used in the MASH paper to focus the analysis on brain vs. non-brain tissues.
# Subset and split data, generate Z-score and save to RDS
[default_4]
# Size of mash random SNPs.
# This specifies the size of `random` SNP set
# and the rest of random SNPs go into `random_test` SNP set.
# A float (< 1) is interpreted as the fraction of random SNPs to use;
# an integer uses that exact number.
parameter: random_snp_size = 0.5
# A list of effect names (SNP names) to include in mash analysis.
parameter: effects_list = path('NULL')
# A list of condition names (tissue names) to include in mash analysis
parameter: conditions_list = path('NULL')
depends: R_library('rhdf5')
output: f"{_input:n}.mash.rds" if not str(_input).endswith('.portable.h5') else f"{_input:nn}.mash.rds"
task: trunk_workers = 1, walltime = '36h', trunk_size = 1, mem = '4G', cores = 1, tags = f'{_output:bn}'
R: expand = "${ }"
ConvertP2Z <- function(pval, beta) {
z <- abs(qnorm(pval / 2))
z[which(beta < 0)] <- -1 * z[which(beta < 0)]
return(z)
}
GetSS <- function(table, db) {
dat <- rhdf5::h5read(db, table)
dat$"z" <- ConvertP2Z(dat$"pval", dat$"beta")
for (name in c("beta", "se", "pval", "z")) {
dat[[name]] <- as.matrix(t(dat[[name]]))
colnames(dat[[name]]) <- dat$colnames
rownames(dat[[name]]) <- dat$rownames
}
dat$colnames <- dat$rownames <- NULL
## FastQTL tends to produce NaN for the SE when betahat is zero;
## here I set such SE values to zero as well
dat[['se']][which(is.nan(dat[['se']]))] = 0
return(dat)
}
SplitTrainTest <- function(dat, table) {
# load data
strong = dat$strong[[table]]
random = dat$random[[table]]
# select rows to keep
num_train = ${random_snp_size}
if (num_train < 1)
num_train = nrow(random) * num_train
num_train = as.integer(num_train)
if (num_train > nrow(random)) {
num_train = nrow(random)
}
train = random[1:num_train,]
if (num_train == nrow(random)) {
test = NULL
} else {
test = random[(num_train+1):nrow(random),]
}
if (${effects_list:r} != 'NULL') {
pout = scan(${effects_list:ar}, what="character", sep=NULL)
strong = strong[(rownames(strong) %in% pout),]
train = train[(rownames(train) %in% pout),]
if (!is.null(test))
test = test[(rownames(test) %in% pout),]
}
if (${conditions_list:r} != 'NULL') {
rout = scan(${conditions_list:ar}, what="character", sep=NULL)
strong = strong[,(colnames(strong) %in% rout),drop=F]
train = train[,(colnames(train) %in% rout),drop=F]
if (!is.null(test))
test = test[,(colnames(test) %in% rout),drop=F]
}
return(list(random = train,
random.test = test,
strong = strong))
}
SS_data = list(strong = GetSS('strong', ${_input:r}), random = GetSS('random', ${_input:r}))
ztable = SplitTrainTest(SS_data, "z")
btable = SplitTrainTest(SS_data, "beta")
stable = SplitTrainTest(SS_data, "se")
# save output
saveRDS(list(random.z = ztable$random,
random.test.z = ztable$random.test,
strong.z = ztable$strong,
random.b = btable$random,
random.test.b = btable$random.test,
strong.b = btable$strong,
random.s = stable$random,
random.test.s = stable$random.test,
strong.s = stable$strong), ${_output:r})
sos run workflows/fastqtl_to_mash.ipynb document_it
will export the narrative in this workflow notebook to HTML format as a documentation file.
# Export workflow to HTML document
[document_it]
input: [item for item in paths(sys.argv) if item.suffix == '.ipynb'], group_by = 1
output: [f'{cwd:a}/{item:bn}.html' for item in paths(sys.argv) if item.suffix == '.ipynb'], group_by = 2
bash: expand = True, stderr = False
sos convert {_input} {_output} --template sos-report-toc