GTEx V8 Multivariate Analysis

Profile and improve data conversion pipeline

The data conversion pipeline was initially written in 2015 to deal with GTEx V6 summary statistics data processing for cis-eQTL mapping. But when working with sQTL data of 10 times large the line counts, it is no longer suitable.

There are a few things one might be able to do but I think the easiest is to profile the code and see if there are some cheap improvement to be made; because I do suspect a couple of places the code is slow.

The test data

I get a small test file:

[MW] zcat allpair-at-intron-level.Testis.txt.gz | head -100000 | gzip > test.txt.gz
[MW] ll test.txt.gz 
-rw-rw-r-- 1 gaow pi-mstephens 2.4M Sep 28 12:27 test.txt.gz

then the gene list, gtex_v8_sqtl_eur_only.genes_list for 19K gene names.

The code to profile

I copy codes from the pipeline and put it here.

In [1]:
%%writefile test.py
cols = [7,6,8]
import numpy as np
import copy

class SSData:
    def __init__(self, header = False):
        self.data = {'buffer':{'data':[], 'rownames':[]}, 'output':{}}
        self.header = header
        self.previous_name = self.current_name = None
        self.count = -1

    @profile
    def parse(self, line, ensg_version = 0, include = None):
        # input line is snp, gene, beta, t, pval and optionally se(bhat)
        if not line:
            self.__reset()
            self.current_name = None
            return 1
        if isinstance(line, bytes):
            line = line.decode()
        line = line.strip().split()
        self.count += 1
        if self.header and self.count == 0:
            return 0
        # the line is not long enough ... there is format issues
        if len(line) < max([cols[0], cols[1], cols[2]]):
            return -1
        #
        line[0] = line[0].strip('"')
        if ensg_version == 0:
            line[0] = line[0].split('.')[0]
        #
        if include is not None and line[0] not in include:
            return -1
        if self.previous_name is None:
            self.previous_name = line[0]
        self.current_name = line[0]
        if self.current_name != self.previous_name:
            self.__reset()
        if cols[2] == 0:
            # the two numbers are bhat and p-value, we need to compute se(bhat)
            self.data['buffer']['data'].append([line[cols[0]-1], str(get_se(float(line[cols[0]-1]), float(line[cols[1]-1]))), line[cols[1]-1]])
        else:
            self.data['buffer']['data'].append([line[cols[0]-1], line[cols[1]-1], line[cols[2]-1]])
        self.data['buffer']['rownames'].append(self.__format_variant_id(line[1]))
        return 0

    @staticmethod
    def __format_variant_id(value):
        value = value.strip('"').split('_')
        if len(value) > 4:
            # keep it chr, pos, ref, alt
            value = value[:4]
        if value[0].startswith('chr'):
            value[0] = value[0][3:]
        return '_'.join(value)

    def __reset(self):
        self.data['buffer']['data'] = np.array(self.data['buffer']['data'], dtype = float)
        self.data['buffer']['rownames'] = np.array(self.data['buffer']['rownames'])
        self.data['buffer']['colnames'] = np.array(['beta','se','pval'])
        self.data['output'] = copy.deepcopy(self.data['buffer'])
        self.data['buffer'] = {'data':[], 'rownames':[]}

    def dump(self):
        return self.data['output']

@profile
def test(sumstat_file, gene_list, keep_ensg_version = 0):
    import warnings
    import gzip, os
    gene_names = [x.strip() for x in open(gene_list).readlines() if x.strip()]
    if keep_ensg_version == 0 and gene_names is not None:
        gene_names = [os.path.splitext(x)[0] for x in gene_names]

    ssp = SSData(header = True)
    group_counts = 0
    with gzip.open(sumstat_file) as f:
        while True:
            line = f.readline()
            quit = ssp.parse(line, keep_ensg_version, include = gene_names)
            if quit == -1:
                continue
            if ssp.current_name != ssp.previous_name:
                group_counts += 1
                ssp.previous_name = ssp.current_name
            if quit:
                break
test('test.txt.gz', 'gtex_v8_sqtl_eur_only.genes_list')
Writing test.py
In [2]:
%%bash
kernprof -l test.py
Wrote profile results to test.py.lprof
In [3]:
%%bash
python -m line_profiler test.py.lprof
Timer unit: 1e-06 s

Total time: 31.5782 s
File: test.py
Function: parse at line 12

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    12                                               @profile
    13                                               def parse(self, line, ensg_version = 0, include = None):
    14                                                   # input line is snp, gene, beta, t, pval and optionally se(bhat)
    15    100001      64018.0      0.6      0.2          if not line:
    16         1       3179.0   3179.0      0.0              self.__reset()
    17         1          2.0      2.0      0.0              self.current_name = None
    18         1          0.0      0.0      0.0              return 1
    19    100000      82349.0      0.8      0.3          if isinstance(line, bytes):
    20    100000      95067.0      1.0      0.3              line = line.decode()
    21    100000     150807.0      1.5      0.5          line = line.strip().split()
    22    100000      92677.0      0.9      0.3          self.count += 1
    23    100000      73105.0      0.7      0.2          if self.header and self.count == 0:
    24         1          1.0      1.0      0.0              return 0
    25                                                   # the line is not long enough ... there is format issues
    26     99999     151929.0      1.5      0.5          if len(line) < max([cols[0], cols[1], cols[2]]):
    27                                                       return -1
    28                                                   #
    29     99999      97352.0      1.0      0.3          line[0] = line[0].strip('"')
    30     99999      62109.0      0.6      0.2          if ensg_version == 0:
    31     99999     100999.0      1.0      0.3              line[0] = line[0].split('.')[0]
    32                                                   #
    33     99999   30469844.0    304.7     96.5          if include is not None and line[0] not in include:
    34     94391      68591.0      0.7      0.2              return -1
    35      5608       4894.0      0.9      0.0          if self.previous_name is None:
    36         1          0.0      0.0      0.0              self.previous_name = line[0]
    37      5608       4865.0      0.9      0.0          self.current_name = line[0]
    38      5608       4160.0      0.7      0.0          if self.current_name != self.previous_name:
    39         3       9229.0   3076.3      0.0              self.__reset()
    40      5608       4320.0      0.8      0.0          if cols[2] == 0:
    41                                                       # the two numbers are bhat and p-value, we need to compute se(bhat)
    42                                                       self.data['buffer']['data'].append([line[cols[0]-1], str(get_se(float(line[cols[0]-1]), float(line[cols[1]-1]))), line[cols[1]-1]])
    43                                                   else:
    44      5608       8565.0      1.5      0.0              self.data['buffer']['data'].append([line[cols[0]-1], line[cols[1]-1], line[cols[2]-1]])
    45      5608      26557.0      4.7      0.1          self.data['buffer']['rownames'].append(self.__format_variant_id(line[1]))
    46      5608       3621.0      0.6      0.0          return 0

Total time: 33.5308 s
File: test.py
Function: test at line 68

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    68                                           @profile
    69                                           def test(sumstat_file, gene_list, keep_ensg_version = 0):
    70         1          5.0      5.0      0.0      import warnings
    71         1       1835.0   1835.0      0.0      import gzip, os
    72         1      16813.0  16813.0      0.1      gene_names = [x.strip() for x in open(gene_list).readlines() if x.strip()]
    73         1          2.0      2.0      0.0      if keep_ensg_version == 0 and gene_names is not None:
    74         1      90417.0  90417.0      0.3          gene_names = [os.path.splitext(x)[0] for x in gene_names]
    75                                           
    76         1         10.0     10.0      0.0      ssp = SSData(header = True)
    77         1          2.0      2.0      0.0      group_counts = 0
    78         1         99.0     99.0      0.0      with gzip.open(sumstat_file) as f:
    79         1          2.0      2.0      0.0          while True:
    80    100001     499124.0      5.0      1.5              line = f.readline()
    81    100001   32750353.0    327.5     97.7              quit = ssp.parse(line, keep_ensg_version, include = gene_names)
    82    100001      89054.0      0.9      0.3              if quit == -1:
    83     94391      73004.0      0.8      0.2                  continue
    84      5610       5480.0      1.0      0.0              if ssp.current_name != ssp.previous_name:
    85         4          4.0      1.0      0.0                  group_counts += 1
    86         4          6.0      1.5      0.0                  ssp.previous_name = ssp.current_name
    87      5610       4553.0      0.8      0.0              if quit:
    88         1         30.0     30.0      0.0                  break

From results above we see ssp.parse line is 97% the computation, but as suspected, 96.5% computation were spent on checking line[0] not in include; that is the line alone is 94% computational time.

Now according to this post it seems I should use set or bisect. Here is the new code using set:

In [4]:
%%writefile test2.py
cols = [7,6,8]
import numpy as np
import copy, os

class SSData:
    def __init__(self, header = False):
        self.data = {'buffer':{'data':[], 'rownames':[]}, 'output':{}}
        self.header = header
        self.previous_name = self.current_name = None
        self.ml = max([cols[0], cols[1], cols[2]])

    @profile
    def parse(self, line, ensg_version = 0, include = set()):
        # input line is snp, gene, beta, t, pval and optionally se(bhat)
        if not line:
            self.__reset()
            self.current_name = None
            return 1
        if isinstance(line, bytes):
            line = line.decode()
        line = line.split()
        if self.header:
            self.header = False
            return 0
        #
        line[0] = line[0].strip('"')
        if not line[0] in include:
            return -1
        if ensg_version == 0:
            line[0] = line[0].rsplit('.',1)[0]
        #

        # the line is not long enough ... there is format issues
        if len(line) < self.ml:
            return -1
        #
        if self.previous_name is None:
            self.previous_name = line[0]
        self.current_name = line[0]
        if self.current_name != self.previous_name:
            self.__reset()
        if cols[2] == 0:
            # the two numbers are bhat and p-value, we need to compute se(bhat)
            self.data['buffer']['data'].append([line[cols[0]-1], str(get_se(float(line[cols[0]-1]), float(line[cols[1]-1]))), line[cols[1]-1]])
        else:
            self.data['buffer']['data'].append([line[cols[0]-1], line[cols[1]-1], line[cols[2]-1]])
        self.data['buffer']['rownames'].append(self.__format_variant_id(line[1]))
        return 0

    @staticmethod
    def __format_variant_id(value):
        value = value.strip('"').lstrip('chr').split('_')
        if len(value) > 4:
            # keep it chr, pos, ref, alt
            value = value[:4]
        return '_'.join(value)

    def __reset(self):
        self.data['buffer']['data'] = np.array(self.data['buffer']['data'], dtype = float)
        self.data['buffer']['rownames'] = np.array(self.data['buffer']['rownames'])
        self.data['buffer']['colnames'] = np.array(['beta','se','pval'])
        self.data['output'] = copy.deepcopy(self.data['buffer'])
        self.data['buffer'] = {'data':[], 'rownames':[]}

    def dump(self):
        return self.data['output']

@profile
def test(sumstat_file, gene_list, keep_ensg_version = 0):
    import warnings
    import gzip
    gene_names = set([x.strip() for x in open(gene_list).readlines() if x.strip()])
    ssp = SSData(header = True)
    group_counts = 0
    with gzip.open(sumstat_file) as f:
        while True:
            line = f.readline()
            quit = ssp.parse(line, keep_ensg_version, include = gene_names)
            if quit == -1:
                continue
            if ssp.current_name != ssp.previous_name:
                group_counts += 1
                ssp.previous_name = ssp.current_name
            if quit:
                break
test('test.txt.gz', 'gtex_v8_sqtl_eur_only.genes_list')
Overwriting test2.py
In [5]:
%%bash
kernprof -l test2.py
python -m line_profiler test2.py.lprof
Wrote profile results to test2.py.lprof
Timer unit: 1e-06 s

Total time: 0.721887 s
File: test2.py
Function: parse at line 12

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    12                                               @profile
    13                                               def parse(self, line, ensg_version = 0, include = set()):
    14                                                   # input line is snp, gene, beta, t, pval and optionally se(bhat)
    15    100001      63919.0      0.6      8.9          if not line:
    16         1       2636.0   2636.0      0.4              self.__reset()
    17         1          2.0      2.0      0.0              self.current_name = None
    18         1          1.0      1.0      0.0              return 1
    19    100000      80667.0      0.8     11.2          if isinstance(line, bytes):
    20    100000      87354.0      0.9     12.1              line = line.decode()
    21    100000     122029.0      1.2     16.9          line = line.split()
    22    100000      67378.0      0.7      9.3          if self.header:
    23         1          1.0      1.0      0.0              self.header = False
    24         1          1.0      1.0      0.0              return 0
    25                                                   #
    26     99999      98355.0      1.0     13.6          line[0] = line[0].strip('"')
    27     99999      73386.0      0.7     10.2          if not line[0] in include:
    28     94391      55688.0      0.6      7.7              return -1
    29      5608       3706.0      0.7      0.5          if ensg_version == 0:
    30      5608       5900.0      1.1      0.8              line[0] = line[0].rsplit('.',1)[0]
    31                                                   #
    32                                           
    33                                                   # the line is not long enough ... there is format issues
    34      5608       4607.0      0.8      0.6          if len(line) < self.ml:
    35                                                       return -1
    36                                                   #
    37      5608       3825.0      0.7      0.5          if self.previous_name is None:
    38         1          1.0      1.0      0.0              self.previous_name = line[0]
    39      5608       4254.0      0.8      0.6          self.current_name = line[0]
    40      5608       4406.0      0.8      0.6          if self.current_name != self.previous_name:
    41         3       8749.0   2916.3      1.2              self.__reset()
    42      5608       4105.0      0.7      0.6          if cols[2] == 0:
    43                                                       # the two numbers are bhat and p-value, we need to compute se(bhat)
    44                                                       self.data['buffer']['data'].append([line[cols[0]-1], str(get_se(float(line[cols[0]-1]), float(line[cols[1]-1]))), line[cols[1]-1]])
    45                                                   else:
    46      5608       7613.0      1.4      1.1              self.data['buffer']['data'].append([line[cols[0]-1], line[cols[1]-1], line[cols[2]-1]])
    47      5608      19907.0      3.5      2.8          self.data['buffer']['rownames'].append(self.__format_variant_id(line[1]))
    48      5608       3397.0      0.6      0.5          return 0

Total time: 2.19414 s
File: test2.py
Function: test at line 68

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    68                                           @profile
    69                                           def test(sumstat_file, gene_list, keep_ensg_version = 0):
    70         1          4.0      4.0      0.0      import warnings
    71         1       1387.0   1387.0      0.1      import gzip
    72         1      18989.0  18989.0      0.9      gene_names = set([x.strip() for x in open(gene_list).readlines() if x.strip()])
    73         1         15.0     15.0      0.0      ssp = SSData(header = True)
    74         1          1.0      1.0      0.0      group_counts = 0
    75         1         93.0     93.0      0.0      with gzip.open(sumstat_file) as f:
    76         1          2.0      2.0      0.0          while True:
    77    100001     403364.0      4.0     18.4              line = f.readline()
    78    100001    1622132.0     16.2     73.9              quit = ssp.parse(line, keep_ensg_version, include = gene_names)
    79    100001      77187.0      0.8      3.5              if quit == -1:
    80     94391      62053.0      0.7      2.8                  continue
    81      5610       4992.0      0.9      0.2              if ssp.current_name != ssp.previous_name:
    82         4          2.0      0.5      0.0                  group_counts += 1
    83         4          4.0      1.0      0.0                  ssp.previous_name = ssp.current_name
    84      5610       3894.0      0.7      0.2              if quit:
    85         1         22.0     22.0      0.0                  break

Now that line is 73.9% * 10.2% = 7% computation. Total time to process the test file reduces from 30 sec to 2 sec.

Update based on the profile i tried to make some small changes in some other places above but there is nothing obvious left to improve. Below shows the difference before and after:

In [ ]:
%%bash
diff test.py test2.py

© 2018 Gao Wang, University of Chicago

Exported from analysis/profile_data_conversion.ipynb committed by Gao Wang on Tue Feb 2 19:25:48 2021 revision 2, 4e7b5da