The data conversion pipeline was initially written in 2015 to process GTEx V6 summary statistics for cis-eQTL mapping. But when working with sQTL data that has roughly 10 times as many lines, it is no longer suitable.
There are a few things one might be able to do, but I think the easiest is to profile the code and see if there are some cheap improvements to be made, because I do suspect a couple of places where the code is slow.
I get a small test file:
[MW] zcat allpair-at-intron-level.Testis.txt.gz | head -100000 | gzip > test.txt.gz
[MW] ll test.txt.gz
-rw-rw-r-- 1 gaow pi-mstephens 2.4M Sep 28 12:27 test.txt.gz
then the gene list, gtex_v8_sqtl_eur_only.genes_list
for 19K gene names.
I copy code from the pipeline and put it here.
%%writefile test.py
cols = [7,6,8]
import numpy as np
import copy
class SSData:
def __init__(self, header = False):
self.data = {'buffer':{'data':[], 'rownames':[]}, 'output':{}}
self.header = header
self.previous_name = self.current_name = None
self.count = -1
@profile
def parse(self, line, ensg_version = 0, include = None):
# input line is snp, gene, beta, t, pval and optionally se(bhat)
if not line:
self.__reset()
self.current_name = None
return 1
if isinstance(line, bytes):
line = line.decode()
line = line.strip().split()
self.count += 1
if self.header and self.count == 0:
return 0
# the line is not long enough ... there is format issues
if len(line) < max([cols[0], cols[1], cols[2]]):
return -1
#
line[0] = line[0].strip('"')
if ensg_version == 0:
line[0] = line[0].split('.')[0]
#
if include is not None and line[0] not in include:
return -1
if self.previous_name is None:
self.previous_name = line[0]
self.current_name = line[0]
if self.current_name != self.previous_name:
self.__reset()
if cols[2] == 0:
# the two numbers are bhat and p-value, we need to compute se(bhat)
self.data['buffer']['data'].append([line[cols[0]-1], str(get_se(float(line[cols[0]-1]), float(line[cols[1]-1]))), line[cols[1]-1]])
else:
self.data['buffer']['data'].append([line[cols[0]-1], line[cols[1]-1], line[cols[2]-1]])
self.data['buffer']['rownames'].append(self.__format_variant_id(line[1]))
return 0
@staticmethod
def __format_variant_id(value):
value = value.strip('"').split('_')
if len(value) > 4:
# keep it chr, pos, ref, alt
value = value[:4]
if value[0].startswith('chr'):
value[0] = value[0][3:]
return '_'.join(value)
def __reset(self):
self.data['buffer']['data'] = np.array(self.data['buffer']['data'], dtype = float)
self.data['buffer']['rownames'] = np.array(self.data['buffer']['rownames'])
self.data['buffer']['colnames'] = np.array(['beta','se','pval'])
self.data['output'] = copy.deepcopy(self.data['buffer'])
self.data['buffer'] = {'data':[], 'rownames':[]}
def dump(self):
return self.data['output']
@profile
def test(sumstat_file, gene_list, keep_ensg_version = 0):
import warnings
import gzip, os
gene_names = [x.strip() for x in open(gene_list).readlines() if x.strip()]
if keep_ensg_version == 0 and gene_names is not None:
gene_names = [os.path.splitext(x)[0] for x in gene_names]
ssp = SSData(header = True)
group_counts = 0
with gzip.open(sumstat_file) as f:
while True:
line = f.readline()
quit = ssp.parse(line, keep_ensg_version, include = gene_names)
if quit == -1:
continue
if ssp.current_name != ssp.previous_name:
group_counts += 1
ssp.previous_name = ssp.current_name
if quit:
break
test('test.txt.gz', 'gtex_v8_sqtl_eur_only.genes_list')
%%bash
kernprof -l test.py
%%bash
python -m line_profiler test.py.lprof
From the results above we see that the `ssp.parse` line accounts for 97% of the computation, and, as suspected, 96.5% of the time inside `parse` is spent on the membership check `line[0] not in include` — that single line alone is about 94% of total computational time.
Now according to this post it seems I should use set
or bisect
. Here is the new code using set
:
%%writefile test2.py
cols = [7,6,8]
import numpy as np
import copy, os
class SSData:
    """Streams summary-statistics lines and groups rows by gene.

    Rows are buffered for the gene currently being read; when the gene
    name changes (or the input ends) the buffer is frozen into numpy
    arrays under 'output' and a fresh buffer is started.
    """
    def __init__(self, header = False):
        # 'buffer' collects rows of the gene being read; 'output' holds
        # the finished previous gene (see __reset()).
        self.data = {'buffer':{'data':[], 'rownames':[]}, 'output':{}}
        self.header = header
        self.previous_name = self.current_name = None
        # minimum field count a line must have, hoisted out of parse()
        self.ml = max(cols)
    @profile
    def parse(self, line, ensg_version = 0, include = frozenset()):
        """Consume one raw input line.

        Returns 1 at end of input (buffer flushed), 0 when the line was
        consumed (or was the header), -1 when it was skipped.
        """
        # input line is snp, gene, beta, t, pval and optionally se(bhat)
        if not line:
            # EOF: flush whatever is buffered and signal the caller to stop.
            self.__reset()
            self.current_name = None
            return 1
        if isinstance(line, bytes):
            line = line.decode()
        line = line.split()
        if self.header:
            self.header = False
            return 0
        if not line:
            # a blank (whitespace-only) line would otherwise raise IndexError
            return -1
        line[0] = line[0].strip('"')
        # O(1) membership test against a set/frozenset -- the fix for the
        # hot spot found by the profile of test.py.
        # NOTE(review): unlike test.py, membership is tested BEFORE the
        # version suffix is stripped, so `include` must contain names
        # exactly as they appear in the file -- confirm this is intended.
        if line[0] not in include:
            return -1
        if ensg_version == 0:
            # drop the version suffix, e.g. ENSG0001.5 -> ENSG0001
            line[0] = line[0].rsplit('.',1)[0]
        # the line is not long enough ... there are format issues
        if len(line) < self.ml:
            return -1
        if self.previous_name is None:
            self.previous_name = line[0]
        self.current_name = line[0]
        if self.current_name != self.previous_name:
            # gene changed: freeze the previous gene's buffer into 'output'
            self.__reset()
        if cols[2] == 0:
            # the two numbers are bhat and p-value, we need to compute se(bhat)
            self.data['buffer']['data'].append([line[cols[0]-1], str(get_se(float(line[cols[0]-1]), float(line[cols[1]-1]))), line[cols[1]-1]])
        else:
            self.data['buffer']['data'].append([line[cols[0]-1], line[cols[1]-1], line[cols[2]-1]])
        self.data['buffer']['rownames'].append(self.__format_variant_id(line[1]))
        return 0
    @staticmethod
    def __format_variant_id(value):
        # Normalize a variant id like '"chr1_123_A_G_b38"' to '1_123_A_G'.
        value = value.strip('"')
        # BUG FIX: str.lstrip('chr') strips any run of the CHARACTERS
        # {c, h, r}, not the prefix 'chr' -- e.g. 'rs123_...' would lose
        # its leading 'r'. Remove the exact prefix instead (this matches
        # the behavior of the test.py version).
        if value.startswith('chr'):
            value = value[3:]
        value = value.split('_')
        if len(value) > 4:
            # keep it chr, pos, ref, alt
            value = value[:4]
        return '_'.join(value)
    def __reset(self):
        # Freeze the buffered gene into numpy arrays and publish it as
        # 'output', then start an empty buffer for the next gene.
        self.data['buffer']['data'] = np.array(self.data['buffer']['data'], dtype = float)
        self.data['buffer']['rownames'] = np.array(self.data['buffer']['rownames'])
        self.data['buffer']['colnames'] = np.array(['beta','se','pval'])
        self.data['output'] = copy.deepcopy(self.data['buffer'])
        self.data['buffer'] = {'data':[], 'rownames':[]}
    def dump(self):
        # Return the most recently completed gene's arrays.
        return self.data['output']
@profile
def test(sumstat_file, gene_list, keep_ensg_version = 0):
    """Stream a gzipped summary-stats file, keeping only genes listed in
    gene_list, and return the number of gene groups encountered.

    NOTE(review): unlike test.py, gene names are NOT stripped of a
    .version suffix here, and SSData.parse tests membership before
    stripping -- the gene_list file must therefore contain names exactly
    as they appear in column 1 of the data. Confirm this is intended.
    """
    import gzip
    # a set gives O(1) membership tests in SSData.parse (the old hot spot);
    # iterate the file handle directly instead of readlines() + set([...])
    with open(gene_list) as gl:
        gene_names = {x.strip() for x in gl if x.strip()}
    ssp = SSData(header = True)
    group_counts = 0
    with gzip.open(sumstat_file) as f:
        while True:
            line = f.readline()
            # renamed from `quit`, which shadows the builtin
            status = ssp.parse(line, keep_ensg_version, include = gene_names)
            if status == -1:
                # line was skipped (filtered out or malformed)
                continue
            if ssp.current_name != ssp.previous_name:
                # a new gene started; the previous one is now complete
                group_counts += 1
                ssp.previous_name = ssp.current_name
            if status:
                # EOF reached
                break
    return group_counts
test('test.txt.gz', 'gtex_v8_sqtl_eur_only.genes_list')
%%bash
kernprof -l test2.py
python -m line_profiler test2.py.lprof
Now that line is 73.9% * 10.2% = 7% of the computation. Total time to process the test file drops from 30 sec to 2 sec.
Update: based on the profile I tried to make some small changes in a few other places above, but there is nothing obvious left to improve. Below shows the difference before and after:
%%bash
diff test.py test2.py
Exported from analysis/profile_data_conversion.ipynb
committed by Gao Wang on Tue Feb 2 19:25:48 2021 revision 2, 4e7b5da