GTEx V8 Multivariate Analysis

eQTL summary statistics formatting

This workflow converts text-format summary statistics from fastqtl eQTL analysis into formats more friendly to analysis in R. In particular:

  1. It converts single-study results to HDF5 format, grouped by gene.
  2. It combines multiple studies into a single HDF5 file. In the context of GTEx, each study is the result from one tissue.
  3. For MASH analysis in particular, it extracts from the complete data a subset of results used to compute data-driven MASH prior covariances and to fit the MASH mixture model.

Input data

A list of summary statistics

Summary statistics from fastqtl-like output are in text format, one row per gene-SNP pair. Columns in our example data-set are:

gene_id 
variant_id      
tss_distance    
ma_samples      
ma_count        
maf     
pval_nominal    
slope   
slope_se

This is the format of the GTEx version 8 summary statistics data. Each analysis ("tissue" for GTEx) has a separate text file. Additionally, there are support files for gene transcription start site coordinates and SNP coordinates.

The workflow takes a list of summary statistics file names, e.g. data/fastqtl/FastQTLSumStats.list (configurable), with contents:

Tissue_1.fastqtl.gz
Tissue_2.fastqtl.gz
...

The first two columns of these files have to be gene_id and variant_id (the column names do not matter). Of the remaining contents we only need the columns for $\hat{\beta}$, $\text{SE}(\hat{\beta})$ and the p-value. In the fastqtl output format above these are columns 8, 9 and 7. If your summary statistics file has a different format you can use the --cols parameter to pass the proper column numbers. There are two ways to do it:

  1. Specify 3 numbers for $\hat{\beta}$, $\text{SE}(\hat{\beta})$ and p-value.
  2. Specify 2 numbers for $\hat{\beta}$ and p-value. $\text{SE}(\hat{\beta})$ will then be computed on the fly, assuming a two-sided test and that $\hat{\beta}/\text{SE}(\hat{\beta})$ follows a standard normal distribution.

The current default is --cols 8 9 7. Currently this pipeline only supports, and requires, the summary statistics $\hat{\beta}$, $\text{SE}(\hat{\beta})$ and p-value, not other quantities (e.g. the t statistic).
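
For reference, a minimal sketch of how that on-the-fly computation works (it mirrors the get_se helper defined later in this notebook; the input numbers are hypothetical):

import scipy.stats as st

def se_from_pval(bhat, pval):
    # two-sided test: |z| = qnorm(1 - p/2), with the sign taken from betahat
    z = st.norm.ppf(pval / 2) if bhat < 0 else st.norm.ppf(1 - pval / 2)
    # se(betahat) = betahat / z; undefined when z is 0
    return bhat / z if z != 0 else float('nan')

se_from_pval(0.5, 0.01)  # ~0.194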

A list of gene names (optional)

To speed up merging multiple HDF5 files it helps to provide a list of gene names. Otherwise the workflow spends a long time collecting gene names from the individual HDF5 files before the merge can happen. The gene list has contents like:

ENSG00000186092.4
ENSG00000227232.5
ENSG00000228463.9
ENSG00000241860.6
ENSG00000268903.1
ENSG00000269981.1
ENSG00000279457.4
ENSG00000279928.2
...
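
If you do not already have such a list, one way to build it is to collect the unique gene IDs from one of the summary statistics files. Below is a minimal sketch, assuming the file has a header line and gene_id in the first column; the file names are those of the example data-set:

import gzip

genes = set()
with gzip.open('data/fastqtl/Tissue_1.fastqtl.gz', 'rt') as f:
    next(f)  # skip the header line
    for line in f:
        genes.add(line.split()[0])
with open('data/fastqtl/GTEx_genes.txt', 'w') as f:
    f.write('\n'.join(sorted(genes)) + '\n')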

Run analysis

Keep all the listed data files in the same folder as the list file. Then run:

JOB_OPTION="-j 8"
#JOB_OPTION="-q midway2 -c midway2.yml"
sos run workflows/fastqtl_to_mash.ipynb convert \
    --data-list data/fastqtl/FastQTLSumStats.list \
    --gene-list data/fastqtl/GTEx_genes.txt \
    $JOB_OPTION

to convert to HDF5 only, and

sos run workflows/fastqtl_to_mash.ipynb \
    --data-list data/fastqtl/FastQTLSumStats.list \
    --gene-list data/fastqtl/GTEx_genes.txt \
    $JOB_OPTION

to convert to HDF5 AND extract MASH input.

Note on JOB_OPTION

With the bash variable JOB_OPTION set to -q midway2 -c midway2.yml, each workflow step is configured, via the task option, to run on UChicago RCC's midway2 cluster. When executed as is, it takes about 33 hours to convert the summary statistics for the GTEx data (Release V8). If you are familiar with SoS you may need to configure task differently for your computing environment. Otherwise, you can set JOB_OPTION to -j 8 to run locally with 8 CPU threads.

More options

In [2]:
sos run fastqtl_to_mash.ipynb -h
usage: sos run fastqtl_to_mash.ipynb
               [workflow_name | -t targets] [options] [workflow_options]
  workflow_name:        Single or combined workflows defined in this script
  targets:              One or more targets to generate
  options:              Single-hyphen sos parameters (see "sos run -h" for details)
  workflow_options:     Double-hyphen workflow-specific parameters

Workflows:
  convert
  default
  document_it

Global Workflow Options:
  --cwd fastqtl_to_mash_output (as path)
                        Work directory / output directory
  --data-list data/eQTL_summary_files.txt (as path)
                        A text file containing data-set names
  --gene-list . (as path)
                        Optionally, a list of gene names.
  --msg 'eQTL mapping summary statistics'
                        Meta-info tag for HDF5 database
  --maxsize 1000 (as int)
                        maximum number of groups per HDF5 file
  --cols 8 9 7 (as list)
                        1-based indexing of 3 numbers indicating columns for
                        betahat, se(betahat) and p-value, or 2 numbers for
                        betahat and p-value
  --keep-ensg-version 0 (as int)
  --common-suffix ''
                        for GTEx data, parameter: common_suffix =
                        ".allpairs.txt"

Sections
  convert_0, default_0: Generate utility functions
  convert_1, default_1: Convert summary stats gzip format to HDF5
  convert_2, default_2: Merge single study data to multivariate data
  default_3:            Extract data to fit MASH model
    Workflow Options:
      --random-per-gene 9 (as int)
                        Number of random SNPs to draw per gene for fitting MASH
                        mixture. Use -1 to export all
      --best-per-gene 1 (as int)
                        Use 1 to extract the strongest gene-snp pair; 0 to avoid
                        extracting the info
  default_4:            Subset and split data, generate Z-score and save to RDS
    Workflow Options:
      --random-snp-size 0.5 (as float)
                        Size of the mash `random` SNP set; the rest of the
                        random SNPs go into the `random_test` set. A value
                        below 1 is interpreted as a fraction of the random
                        SNPs; an integer specifies the exact number.
      --effects-list NULL (as path)
                        A list of effect names (SNP names) to include in mash
                        analysis.
      --conditions-list NULL (as path)
                        A list of condition names (tissue names) to include in
                        mash analysis
  document_it:          Export workflow to HTML document

Output data

If you run the entire workflow, you should find under ./fastqtl_to_mash_output (can be configured):

  • Study (tissue) specific HDF5 files of summary statistics
  • Merged HDF5 from multiple studies
  • "*.portable.h5" data extracted for MASH computations
  • "*.mash.rds" data in RDS format, split into strong/random and optionally random_test sets, with Z-scores computed from p-values.
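
The HDF5 outputs can be inspected directly with pytables. In the merged file, for example, each gene is a group holding beta, se and pval matrices (SNPs by tissues) along with rownames and colnames arrays. A minimal sketch, with hypothetical file and gene names:

import pandas as pd, tables as tb

with tb.open_file('fastqtl_to_mash_output/FastQTLSumStats.h5') as f:
    gene = f.get_node('/ENSG00000227232')              # one group per gene
    snps = [x.decode() for x in gene.rownames[:]]      # effects (SNPs)
    tissues = [x.decode() for x in gene.colnames[:]]   # conditions (tissues)
    beta = pd.DataFrame(gene.beta[:], index=snps, columns=tissues)
    se = pd.DataFrame(gene.se[:], index=snps, columns=tissues)
    pval = pd.DataFrame(gene.pval[:], index=snps, columns=tissues)
print(beta.head())
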
In [ ]:
[global]
# Work directory / output directory
parameter: cwd = path('./fastqtl_to_mash_output')
# A text file containing data-set names
parameter: data_list = path("data/eQTL_summary_files.txt")
# Optionally, a list of gene names.
parameter: gene_list = path()
# Meta-info tag for HDF5 database
parameter: msg = "eQTL mapping summary statistics"
# maximum number of groups per HDF5 file
parameter: maxsize = 1000
# 1-based indexing of 3 numbers indicating columns for betahat, se(betahat) and p-value, 
# or 2 numbers for betahat and p-value
parameter: cols = [8, 9, 7]
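# Set to 1 to keep the version suffix of ENSG gene IDs (0, the default, strips it)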
parameter: keep_ensg_version = 0
# for GTEx data, parameter: common_suffix = ".allpairs.txt"
parameter: common_suffix = ""

if len(cols) == 2:
    cols.append(0)

Pipeline details

HDF5 utilities

The HDF5 Python interface, pytables, is used to implement the workhorse of the data format conversion.

In [2]:
# Generate utility functions
[convert_0, default_0]
depends: Py_Module('tables'), Py_Module('pandas>0.23.0')
report: expand = "${ }", output = '.sos/utils.py'
    import sys, os, re, copy
    import numpy as np, pandas as pd, tables as tb
    tb.parameters.MAX_GROUP_WIDTH = 51200
    # tb.parameters.NODE_CACHE_SLOTS = -51200
    # tb.parameters.METADATA_CACHE_SIZE = 1048576 * 100000
    # tb.parameters.CHUNK_CACHE_SIZE = 2097152 * 100000
    # tb.parameters.CHUNK_CACHE_NELMTS = 521
    import scipy.stats as st

    class Environment:
        def __init__(self):
            self.float = np.float32
            self.duplicate_tag = '_duplicated_'
            self.common_suffix = '${common_suffix}.h5'

    env = Environment()

    class TBData(dict):
        def __init__(self, data, name, msg = None, root = '/', complib = 'bzip2'):
            '''bzip2 may not be compatible with other hdf5 applications; but zlib is fine'''
            self.__root = root.strip('/')
            self.__group = name
            self.__msg = msg
            try:
                if type(data) is dict:
                    self.update(data)
                elif type(data) is str:
                    # is file name
                    self.__load(tb.open_file(data))
                else:
                    # is file stream
                    self.__load(data)
            except tb.exceptions.NoSuchNodeError:
                raise ValueError('Cannot find dataset {}!'.format(name))
            self.tb_filters = tb.Filters(complevel = 9, complib=complib)

        def sink(self, filename):
            with tb.open_file(filename, 'a') as f:
                if self.__root:
                    try:
                        f.create_group("/", self.__root)
                    except:
                        pass
                try:
                    # there is existing data -- have to merge with current data
                    # have to do this because the input file lines are not grouped by gene names!!
                    # use try ... except to hopefully faster than if ... else
                    # e.g., if not f.__contains__('/{}'.format(self.__group)) ... else ...
                    for element in f.list_nodes('/{}/{}'.format(self.__root, self.__group)):
                        if element.name != 'colnames':
                            self[element.name] = np.concatenate((element[:], self[element.name]))
                except tb.exceptions.NoSuchNodeError:
                    f.create_group("/" + self.__root, self.__group,
                                   self.__msg if self.__msg else self.__group)
                for key in self:
                    self.__store_array(key, f)
                f.flush()

        def dump(self, table, output = False):
            if output:
                pd.DataFrame({self['colnames'][i] : self[table][:,i] for i in range(len(self['colnames']))}, index = self['rownames']).to_csv(sys.stdout, na_rep = 'NA')
                return None
            else:
                return pd.DataFrame({self['colnames'][i] : self[table][:,i] for i in range(len(self['colnames']))}, index = self['rownames'])

        def __load(self, fstream):
            try:
                for element in fstream.list_nodes('/{}/{}'.format(self.__root, self.__group)):
                    self[element.name] = element[:]
                fstream.close()
            except:
                fstream.close()
                raise

        def __roll_back(self, group, name):
            try:
                n = getattr(group, name)
                n._f_remove()
            except AttributeError:
                pass

        def __store_array(self, name, fstream):
            if self.__root:
                element = getattr(getattr(fstream.root, self.__root), self.__group)
            else:
                element = getattr(fstream.root, self.__group)
            arr = self[name]
            if type(arr) is list:
                arr = np.array(arr)
            self.__roll_back(element, name)
            #
            if arr.shape != (0,):
                ds = fstream.create_carray(element, name, tb.Atom.from_dtype(arr.dtype), arr.shape,
                                           filters = self.tb_filters)
                ds[:] = arr

    def get_tb_grps(filenames, group_name = None):
        if len(filenames) == 0:
            return []
        if isinstance(filenames, str):
            filenames = [filenames]
        names = set()
        for filename in filenames:
            with tb.open_file(filename) as f:
                names.update([node._v_name for node in (f.root if group_name is None else getattr(f.root, '{}'.format(group_name)))])
        return sorted(names)

    def get_se(bhat, pval):
        if bhat < 0:
            z = st.norm.ppf(pval / 2)
        else:
            z = st.norm.ppf(1 - pval / 2)
        if z != 0:
            se = bhat/z
        else:
            se = np.nan
        return se

    class SSData:
        def __init__(self, header = False):
            self.data = {'buffer':{'data':[], 'rownames':[]}, 'output':{}}
            self.header = header
            self.previous_name = self.current_name = None
            self.min_line_len = max([${cols[0]}, ${cols[1]}, ${cols[2]}])

        def parse(self, line, ensg_version = 0, include = set()):
            # each input line starts with gene and snp IDs, followed by other columns including betahat, se(betahat) and/or p-value (see --cols)
            if not line:
                self.__reset()
                self.current_name = None
                return 1
            if isinstance(line, bytes):
                line = line.decode()
            line = line.strip().split()
            if self.header:
                # header line has to be accounted for
                self.header = False
                return 0
            #
            line[0] = line[0].strip('"')
            if not line[0] in include:
                return -1
            # the line is not long enough ... there is format issues
            if len(line) < self.min_line_len:
                return -1
            #
            if ensg_version == 0:
                line[0] = line[0].rsplit('.',1)[0]
            if self.previous_name is None:
                self.previous_name = line[0]
            self.current_name = line[0]
            if self.current_name != self.previous_name:
                self.__reset()
            if ${cols[2]} == 0:
                # the two numbers are bhat and p-value, we need to compute se(bhat)
                self.data['buffer']['data'].append([line[${cols[0]-1}], str(get_se(float(line[${cols[0]-1}]), float(line[${cols[1]-1}]))), line[${cols[1]-1}]])
            else:
                self.data['buffer']['data'].append([line[${cols[0]-1}], line[${cols[1]-1}], line[${cols[2]-1}]])
            self.data['buffer']['rownames'].append(self.__format_variant_id(line[1]))
            return 0
        
        @staticmethod
        def __format_variant_id(value):
            value = value.strip('"').lstrip('chr').split('_')
            if len(value) > 4:
                # keep it chr, pos, ref, alt
                value = value[:4]
            return '_'.join(value)

        def __reset(self):
            self.data['buffer']['data'] = np.array(self.data['buffer']['data'], dtype = env.float)
            self.data['buffer']['rownames'] = np.array(self.data['buffer']['rownames'])
            self.data['buffer']['colnames'] = np.array(['beta','se','pval'])
            self.data['output'] = copy.deepcopy(self.data['buffer'])
            self.data['buffer'] = {'data':[], 'rownames':[]}

        def dump(self):
            return self.data['output']

    class DataMerger(TBData):
        def __init__(self, files, name, msg = None):
            TBData.__init__(self, {}, name, msg, complib = "zlib")
            self.files = sorted(files)
            self.__group = name

        def merge(self):
            data = {}
            one_snp = None
            failure_ct = 0
            # Collect data
            for item in self.files:
                tissue = re.sub(r'{}$'.format(env.common_suffix), '', os.path.basename(item))
                try:
                    data[tissue] = TBData(item, self.__group)
                    if one_snp is None: one_snp = data[tissue]['rownames'][0]
                except ValueError:
                    data[tissue] = {'data' : np.array([[np.nan, np.nan, np.nan]]), 'rownames': None}
                    failure_ct += 1
                # Fix row name
                # Because in GTEx data file there are duplicated gene-snp pairs having different sumstats!!
                if data[tissue]['rownames'] is not None:
                    data[tissue]['rownames'] = self.__dedup(data[tissue]['rownames'], item)
            if failure_ct == len(self.files):
                return 1
            # Merge data
            for idx, item in enumerate(['beta','se','pval']):
                self[item] = pd.concat([pd.DataFrame(
                    {tissue : data[tissue]['data'][:,idx]},
                    index = data[tissue]['rownames'] if data[tissue]['rownames'] is not None else [one_snp]
                    ) for tissue in sorted(data.keys())], sort=True, axis = 1)
                if 'rownames' not in self:
                    self['rownames'] = np.array(self[item].index, dtype = str)
                if 'colnames' not in self:
                    self['colnames'] = np.array(self[item].columns.values.tolist(), dtype = str)
                self[item] = np.array(self[item].values, dtype = env.float)
            # np.savetxt(sys.stdout, self['pval'], fmt='%10.5f')
            # print(self['rownames'])
            # print(self['colnames'])
            return 0

        def __dedup(self, seq, filename):
            seen = {}
            dups = set()
            def __is_seen(x, seen):
                if x not in seen:
                    seen[x] = 0
                    return 0
                else:
                    seen[x] += 1
                    dups.add(x)
                    return 1
            # Tag them
            obs = [x if not __is_seen(x, seen) else '%s%s%s' % (x, env.duplicate_tag, seen[x]) for x in seq]
            # Log them
            if len(dups):
                filename = os.path.splitext(filename)[0]
                with open(filename + '.error', 'a') as f:
                    for item in dups:
                        f.write('{}:{} appeared {} times in {}\n'.\
                                format(self.__group, item, seen[item] + 1, filename))
            return obs

    def get_gs_pairs(data, name, num = (1, 9), method = 'equal_space'):
        '''choose gene-snp pairs from data, controlled by num = (a, b)
        for the best gene-snp pair (a = 0 or 1), and b other random 
        gene-snp pairs'''

        def random_sample(x, k):
            if k < 0:
                return sorted(x)
            return sorted(set(np.random.choice(x, min(len(x), k))))

        def equal_sample(x, k):
            if len(x) < k or k < 0:
                return sorted(x)
            f = lambda m, n: [i*n//m + n//(2*m) for i in range(m)]
            return sorted(set([x[i] for i in f(k, len(x))]))

        output = {'colnames' : data['colnames']}
        lp = data.dump('pval')
        shat = data.dump('se')
        # pval cannot be nan or zero (in fastqtl output, Shat nan has pval zero)
        lp = lp[np.all(np.isfinite(lp), axis=1) & np.all(np.isfinite(shat), axis=1)]
        #
        if lp.empty and num[0] > 0:
            return None
        # Find strong SNP-gene pair
        lp = -np.log10(lp)
        rowidx = np.where(data['rownames'] == lp.max(axis=1).idxmax())[0][0]
        if num[0] != 0:
            output['strong_rownames'] = ['%s_%s' % (name, data['rownames'][rowidx].decode())]
            output['strong'] = {}
            for k in ['beta', 'pval', 'se']:
                output['strong'][k] = data[k][rowidx, :]
        if num[1] != 0:
            all_randomidxes = [y for y, x in enumerate(data['rownames']) if x in lp.index and y != rowidx]
            sample_randomidxes = random_sample(all_randomidxes, num[1]) if method == 'random' else equal_sample(all_randomidxes, num[1])
            output['random_rownames'] = ['%s_%s' % (name, data['rownames'][x].decode()) for x in sample_randomidxes]
            output['random'] = {}
            for k in ['beta', 'pval', 'se']:
                output['random'][k] = data[k][sample_randomidxes, :]
        if not 'strong' in output and not 'random' in output:
            output = None
        return output

    def merge_tmp_h5(output, verbose = 0):
        from glob import glob
        tmpfiles = list(glob(output + "_*.tmp"))
        if os.path.isfile(output+'.h5'):
            os.remove(output+'.h5')
        for item in sorted(tmpfiles):
            for name in get_tb_grps(item):
                cmd = 'h5copy -i {0} -o {2} -s "/{1}" -d "/{1}"'.format(item, name, output+'.h5')
                if verbose:
                    print(cmd)
                os.system(cmd)

Convert to HDF5 format

Per study (tissue) conversion

For per-study conversion I use the bzip2 compression method and float32 storage to achieve a higher compression rate. This workflow step generates one HDF5 file per summary statistics file. The h5copy program can be installed via conda install -c conda-forge hdf5.

In [3]:
# Convert summary stats gzip format to HDF5
[convert_1, default_1]
depends: executable("h5copy")
fail_if(not data_list.is_file(), msg = 'Need data list file!')
data_files = set(get_output(f"awk '{{print $1}}' {data_list:e}").strip().split('\n'))
fail_if(len(data_files) == 0, msg = 'Need input data files!')
import os
input: [f'{data_list:d}/{x}' if os.path.isfile(f'{data_list:d}/{x}') else x for x in data_files], group_by = 1
output: f'{cwd:a}/{_input:bn}.h5'
task: trunk_workers = 1, walltime = '36h', trunk_size = 1, mem = '4G', cores = 1, tags = f'{_output:bn}'

bash: expand = True, workdir = cwd
    rm -f {_output:n}_*.tmp
    
python: expand = "${ }", input = '.sos/utils.py'
    import warnings
    import gzip
    gene_names = set([x.strip() for x in open(${gene_list:r}).readlines() if x.strip()] if ${gene_list.is_file()} else [])

    ssp = SSData(header = True)
    group_counts = 0
    with gzip.open(${_input:r}) as f:
        while True:
            line = f.readline()
            quit = ssp.parse(line, ${keep_ensg_version}, include = gene_names)
            if quit == -1:
                continue
            if ssp.current_name != ssp.previous_name:
                group_counts += 1
                with warnings.catch_warnings():
                    warnings.filterwarnings("ignore", category = tb.FlavorWarning)
                    data = TBData(ssp.dump(), ssp.previous_name, "${msg}")
                    data.sink("${_output:n}_%i.tmp" % (np.ceil(group_counts / ${maxsize})) 
                                if ${maxsize} > 0 else ${_output:r})
                ssp.previous_name = ssp.current_name
            if quit:
                break
    if ${maxsize} > 0:
        merge_tmp_h5(${_output:nr})

bash: expand = True, workdir = cwd
    rm -f {_output:n}_*.tmp

Merge per study (tissue) HDF5 to one HDF5

This step creates, for each gene, one table per summary statistic combining the multiple studies. Rows are effects (SNPs); columns are conditions (tissue names). This time I use zlib compression for better compatibility with other HDF5 routines (e.g. rhdf5).

In [4]:
# Merge single study data to multivariate data
[convert_2, default_2]
depends: executable("h5copy")
input: group_by = 'all'
output: f'{cwd:a}/{data_list:bn}.h5'
task: trunk_workers = 1, walltime = '36h', trunk_size = 1, mem = '4G', cores = 1, tags = f'{_output:bn}'

bash: expand = True, workdir = cwd
    rm -f {_output:n}_*.tmp

python: expand = '${ }', input = '.sos/utils.py'
    import warnings
    if ${gene_list.is_file()}:
        gene_names = [x.strip() for x in open(${gene_list:r}).readlines() if x.strip()]
    else:
        gene_names = get_tb_grps([${_input:r,}])
    if ${keep_ensg_version} == 0:
        gene_names = [os.path.splitext(x)[0] for x in gene_names]
    failure_ct = 0
    for idx, item in enumerate(gene_names):
        ssm = DataMerger([${_input:r,}], item, "${msg}")
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category = tb.FlavorWarning)
            if ssm.merge() == 0:
                ssm.sink("${_output:n}_%i.tmp" % (np.ceil((idx + 1.0) / ${maxsize})) 
                        if ${maxsize} > 0 else ${_output:r})
            else:
                failure_ct += 1
    with open("${_output:n}.log", 'w') as f:
        f.write("%s out of %s groups merged!\n" % (len(gene_names) - failure_ct, len(gene_names)))
    if ${maxsize} > 0:
        merge_tmp_h5(${_output:nr})
            
bash: expand = True, workdir = cwd
    rm -f {_output:n}_*.tmp

Extract data for MASH model

We need to extract the "top" signals based on single-tissue analysis to compute data-driven MASH priors, as well as some random SNP sets to fit the MASH mixture model.

In [5]:
# Extract data to fit MASH model
[default_3]
# Number of random SNPs to draw per gene for fitting MASH mixture
# Use -1 to export all
parameter: random_per_gene = 9
# Use 1 to extract strongest gene-snp pair
# Use 0 to avoid extracting the info
parameter: best_per_gene = 1
output: f"{_input:n}.portable.h5"
task: trunk_workers = 1, walltime = '36h', trunk_size = 1, mem = '4G', cores = 1, tags = f'{_output:bn}'

python: expand = "${ }", input = '.sos/utils.py'
    import warnings
    if ${gene_list.is_file()}:
        gene_names = [x.strip() for x in open(${gene_list:r}).readlines() if x.strip()]
    else:
        gene_names = get_tb_grps([${_input:r,}])
    if ${keep_ensg_version} == 0:
        gene_names = [os.path.splitext(x)[0] for x in gene_names]
    output = dict()
    output['random'] = {'colnames': None, 'rownames': [], 'beta': None, 'se': None, 'pval': None}
    output['strong'] = {'colnames': None, 'rownames': [], 'beta': None, 'se': None, 'pval': None}
    failure_ct = 0
    for idx, name in enumerate(gene_names):
        # extract the best gene-snp pair or some random gene-snp pairs

        try:
            data = TBData(${_input:r}, name)
            res = get_gs_pairs(data, name, (${best_per_gene}, ${random_per_gene}))
        except ValueError:
            res = None
        #
        if res is None:
            failure_ct += 1
            continue
        for k in output:
            if not k in res:
                continue
            for kk in output[k]:
                if kk == 'rownames':
                    if output[k]['rownames'] is None:
                        output[k]['rownames'] = res["{}_rownames".format(k)]
                    else:
                        output[k]['rownames'].extend(res["{}_rownames".format(k)])
                elif kk == 'colnames':
                    if output[k]['colnames'] is None:
                        output[k]['colnames'] = res['colnames']
                else:
                    output[k][kk] = np.vstack((output[k][kk], res[k][kk])) if output[k][kk] is not None else res[k][kk]
    #
    if failure_ct < len(gene_names):
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category = tb.FlavorWarning)
            for k in output:
                TBData(dict(output[k]), k, msg = "%s, %s gene-snp pair" % ("${msg}", k), complib = 'zlib').sink(${_output:r})
    with open("${_output:n}.log", 'w') as f:
        f.write("%s out of %s groups extracted!\n" % (len(gene_names) - failure_ct, len(gene_names)))

Compute Z score and save to RDS

Here I also implemented two command options to subset data:

  • --effects-list: a list of effect names (row names) to include in the output MASH data-set. This option was used in the MASH paper to remove SNPs in LD with each other.
  • --conditions-list: a list of condition names (column names) to include in the output MASH data-set. Conditions not in this list are excluded from the output. This option was used in the MASH paper to focus the analysis on brain / non-brain tissues. Both files are plain text, as illustrated below.
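
Each file contains one name per line. Effect names follow the {gene}_{chr}_{pos}_{ref}_{alt} convention used for the row names of the extracted data, and condition names must match the column (tissue) names of the merged data. A hypothetical illustration (the gene IDs and variant positions below are made up); an effects list looks like:

ENSG00000227232_1_64764_C_T
ENSG00000228463_1_257800_G_A

and, for the example data-set above, a conditions list looks like:

Tissue_1.fastqtl
Tissue_2.fastqtl
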
In [6]:
# Subset and split data, generate Z-score and save to RDS
[default_4]
# Size of the mash `random` SNP set;
# the rest of the random SNPs go into the `random_test` set.
# A value below 1 is interpreted as a fraction of the random SNPs;
# an integer specifies the exact number.
parameter: random_snp_size = 0.5
# A list of effect names (SNP names) to include in mash analysis.
parameter: effects_list = path('NULL')
# A list of condition names (tissue names) to include in mash analysis
parameter: conditions_list = path('NULL')

depends: R_library('rhdf5')
output: f"{_input:n}.mash.rds" if not str(_input).endswith('.portable.h5') else f"{_input:nn}.mash.rds"

task: trunk_workers = 1, walltime = '36h', trunk_size = 1, mem = '4G', cores = 1, tags = f'{_output:bn}'

R: expand = "${ }"
    ConvertP2Z <- function(pval, beta) {
      z <- abs(qnorm(pval / 2))
      z[which(beta < 0)] <- -1 * z[which(beta < 0)]
      return(z)
    }

    GetSS <- function(table, db) {
      dat <- rhdf5::h5read(db, table)
      dat$"z" <- ConvertP2Z(dat$"pval", dat$"beta")
      for (name in c("beta", "se", "pval", "z")) {
        dat[[name]] <- as.matrix(t(dat[[name]]))
        colnames(dat[[name]]) <- dat$colnames
        rownames(dat[[name]]) <- dat$rownames
      }
      dat$colnames <- dat$rownames <- NULL
      ## fastqtl tends to produce NaN se when betahat is zero;
      ## here I set such se values to zero as well
      dat[['se']][which(is.nan(dat[['se']]))] = 0
      return(dat)
    }
  
    SplitTrainTest <- function(dat, table) {
        # load data
        strong = dat$strong[[table]]
        random = dat$random[[table]]
        # select rows to keep
        num_train = ${random_snp_size}
        if (num_train < 1)
          num_train = nrow(random) * num_train
        num_train = as.integer(num_train)
        if (num_train > nrow(random)) {
            num_train = nrow(random)
        }
        train = random[1:num_train,]
        if (num_train == nrow(random)) {
            test = NULL
        } else {
            test = random[(num_train+1):nrow(random),]
        }
        if (${effects_list:r} != 'NULL') {
            pout = scan(${effects_list:ar}, what="character", sep=NULL)
            strong = strong[(rownames(strong) %in% pout),]
            train = train[(rownames(train) %in% pout),]
            if (!is.null(test))
                test = test[(rownames(test) %in% pout),]                                         
        }
        if (${conditions_list:r} != 'NULL') {
            rout = scan(${conditions_list:ar}, what="character", sep=NULL)
            strong = strong[,(colnames(strong) %in% rout),drop=F]
            train = train[,(colnames(train) %in% rout),drop=F]
            if (!is.null(test))
                test = test[,(colnames(test) %in% rout),drop=F]
        } 

        return(list(random = train,
               random.test = test,
               strong = strong))
    }
      
    SS_data = list(strong = GetSS('strong', ${_input:r}), random = GetSS('random', ${_input:r}))
    ztable = SplitTrainTest(SS_data, "z")
    btable = SplitTrainTest(SS_data, "beta")
    stable = SplitTrainTest(SS_data, "se")
    # save output
    saveRDS(list(random.z = ztable$random,
                 random.test.z = ztable$random.test,
                 strong.z = ztable$strong,
                 random.b = btable$random,
                 random.test.b = btable$random.test,
                 strong.b = btable$strong,
                 random.s = stable$random,
                 random.test.s = stable$random.test,
                 strong.s = stable$strong), ${_output:r})

Export workflow documentation to HTML format

sos run workflows/fastqtl_to_mash.ipynb document_it

will export the narrative in this workflow notebook to HTML format as a documentation file.

In [ ]:
# Export workflow to HTML document
[document_it]
input: [item for item in paths(sys.argv) if item.suffix == '.ipynb'], group_by = 1
output: [f'{cwd:a}/{item:bn}.html' for item in paths(sys.argv) if item.suffix == '.ipynb'], group_by = 2
bash: expand = True, stderr = False
  sos convert {_input} {_output} --template sos-report-toc

© 2018 Gao Wang, University of Chicago

Exported from analysis/fastqtl_to_mash.ipynb committed by Gao Wang on Tue Feb 2 19:11:23 2021 revision 1, c5fe213