写在前面
首先,此工具关于比对算法源码部分来自https://zhuanlan.zhihu.com/p/78027062,感谢知乎作者 我不是大熊 的开源分享。
本人对算法的理解比较浅薄(上班也一直用不上......学校学的点啥都不剩了),此工具只是对其源码的简单修改与封装,想要详细了解算法原理,可移步该作者的知乎专栏。
处理情景
工作中,有时会出现这种尴尬的情况,已有Fastq文件,即使已经知道了扩展区域,却能确定是所用的是啥引物(同一区域引物种类很多),有时甚至不知道是什么区域。客户只有一句话:反正是在你们公司测的序.....
公司测序所有的引物加起来上百种,肉眼看?去除引物慢慢试?整合所有引物信息后直接拿来与想知道引物的Fastq文件比对显然直接有效的多。
工具源码
import sys
import gzip
degenerate = {
'A': ["A"],
'T': ["T"],
'C': ["C"],
'G': ["G"],
'R': ["A", "G"],
'Y': ["C", "T"],
'M': ["A", "C"],
'K': ["G", "T"],
'S': ["G", "C"],
'W': ["A", "T"],
'H': ["A", "T", "C"],
'B': ["G", "T", "C"],
'V': ["G", "A", "C"],
'D': ["G", "A", "T"],
'N': ["A", "T", "C", "G"]
}
def theta(a, b): # degenerate bases in b
if a == '-' or b == '-' : # gap or mismatch
return -1
elif not degenerate.get(b): # invalid character
return -2
elif a not in degenerate.get(b): # not match
return -2
elif a in degenerate.get(b): # match
return 2
def make_score_matrix(seq1, seq2):
"""
return score matrix and map(each score from which direction)
0: diagnosis
1: up
2: left
"""
seq1 = '-' + seq1
seq2 = '-' + seq2
score_mat = {}
trace_mat = {}
max_score=0
for i,p in enumerate(seq1):
score_mat[i] = {}
trace_mat[i] = {}
for j,q in enumerate(seq2):
if i == 0: # first row, gap in seq1
score_mat[i][j] = -j
trace_mat[i][j] = 1
continue
if j == 0: # first column, gap in seq2
score_mat[i][j] = -i
trace_mat[i][j] = 2
continue
ul = score_mat[i-1][j-1] + theta(p, q) # from up-left, mark 0
l = score_mat[i][j-1] + theta('-', q) # from left, mark 1, gap in seq1
u = score_mat[i-1][j] + theta(p, '-') # from up, mark 2, gap in seq2
picked = max([ul,l,u])
score_mat[i][j] = picked
trace_mat[i][j] = [ul, l, u].index(picked) # record which direction
if (picked > max_score):
max_score=picked
return score_mat, trace_mat,max_score
def traceback(seq1, seq2, trace_mat):
'''
find one optimal traceback path from trace matrix, return path code
-!- CAUTIOUS: if multiple equally possible path exits, only return one of them -!-
'''
seq1, seq2 = '-' + seq1, '-' + seq2
i, j = len(seq1) - 1, len(seq2) - 1
path_code = ''
while i > 0 or j > 0:
direction = trace_mat[i][j]
if direction == 0: # from up-left direction
i = i-1
j = j-1
path_code = '0' + path_code
elif direction == 1: # from left
j = j-1
path_code = '1' + path_code
elif direction == 2: # from up
i = i-1
path_code = '2' + path_code
return path_code
def print_m(seq1, seq2, m):
"""print score matrix or trace matrix"""
seq1 = '-' + seq1; seq2 = '-' + seq2
print()
print(' '.join(['%3s' % i for i in ' '+seq2]))
for i, p in enumerate(seq1):
line = [p] + [m[i][j] for j in range(len(seq2))]
print(' '.join(['%3s' % i for i in line]))
print()
return
def pretty_print_align(seq1, seq2, path_code):
'''
return pair alignment result string from
path code: 0 for match, 1 for gap in seq1, 2 for gap in seq2
'''
align1 = ''
middle = ''
align2 = ''
for p in path_code:
if p == '0':
align1 = align1 + seq1[0]
align2 = align2 + seq2[0]
if not degenerate.get(seq2[0]):
middle = middle + ' '
elif seq1[0] in degenerate.get(seq2[0]):
middle = middle + '|'
else:
middle = middle + ' '
seq1 = seq1[1:]
seq2 = seq2[1:]
elif p == '1':
align1 = align1 + '-'
align2 = align2 + seq2[0]
middle = middle + ' '
seq2 = seq2[1:]
elif p == '2':
align1 = align1 + seq1[0]
align2 = align2 + '-'
middle = middle + ' '
seq1 = seq1[1:]
print('Alignment:\n\n ' + align1 + '\n ' + middle + '\n ' + align2 + '\n')
return
def usage():
print('Usage:\n\tpython nwAligner.py xx.fq.gz xx.fasta\n')
return
def main():
try:
fq_gz,fasta = map(str, sys.argv[1:3])
with gzip.open(fq_gz,'rt') as fz:
i = 0
for line in fz :
i += 1
if i == 2 :
seq1 = line[0:28]
break
seq2s={}
with open (fasta,'rt') as fa :
for line in fa:
if line.startswith(">"):
name = line
seq2s[name] = ''
else:
seq2s[name]+=line
except:
usage()
return
dir_sq_score_path={} #字典
for name,seq2 in seq2s.items() :
score_mat , trace_mat , max_score = make_score_matrix(seq1, seq2)
path_code = traceback(seq1, seq2, trace_mat)
dir_sq_score_path[name]=[path_code,seq2,max_score]
sortdssp=sorted(dir_sq_score_path.items(),reverse=True,key = lambda kv:(kv[1][2]))[0:4]#字典按最大得分排序,取前五
#sortdssp成了list
for x in sortdssp:
print (x[0]+" max score "+str(x[1][2]))
pretty_print_align(seq1,x[1][1], x[1][0])
print("source code of blast from https://zhuanlan.zhihu.com/p/78027062")
if __name__ == '__main__':
main()
使用方法:
第0步:准备引物序列的fasta文件

使用与结果:

基本原理
其实原理很简单,就是将Fastq文件中的第一条序列的前28个bp截取,与fasta文件中所有的引物序列比对打分,最终输出得分前五的引物序列(注意:比对时未考虑反向互补序列情况)。
值得注意的是由于引物中有简并碱基存在,故打分时不能简单地判断相等,这点尤其要感谢B哥的改良。
网友评论