美文网首页程序员
【原创】用xapian打造一个文件“行”索引系统

【原创】用xapian打造一个文件“行”索引系统

作者: 嗒嗒噜拉 | 来源:发表于2019-02-21 20:32 被阅读0次
    • 对于大量的非格式化文本数据(如txt、xls、xlsx、csv、sql、html等),某些情况下我们需要针对文件的某一行进行检索。这些文件往往难以格式化后存入结构化数据库中,特别是单个文件体量较大的情况,难以使用常规方式进行检索,即使使用一些特殊的工具,在没有进行索引的情况下,检索速速往往难以忍受。
    • 笔者利用开源的全文检索引擎xapian编写了一个完整的文件行索引程序,能自动对txt、csv、sql、html、xls,xlsx格式的文本文件按行建立索引,程序采用中文分词工具“结巴分词”工具对文本行内容进行分词。
    • 笔者对50余个总量约200G的文件按行进行索引,完成索引后,检索响应时间在300ms以内。
    #coding:utf8
    #######################################################################################################
    #                             自动索引系统 by DaDaLuLa                                                #
    #######################################################################################################
    #   程序功能:自动对txt,xls,xlsx,csv,sql,html格式的文本文件按行建立索引                                   #
    #   程序要求:1.需索引的文件要放到一个固定的文件夹内,注意文件扩展名要符合要求                              #                        
    #            2.文件编码方式须为utf8                                                                    #
    #            3.文件名不能有中文                                                                        #
    #   执行流程:实例化filesIndex对象-->从上次断点处继续索引-->遍历文件目录检查未索引的文件-->逐个建立索引      #
    #   其他:    1.默认的日志文件为/var/log/auto_index.log(自动建立)                                       #
    #            2.每个待索引文件的名字,md5值,断点位置等信息保存在mongodb数据库中                             #
    #            3.索引文件所在目录默认为/var/lib/xapian/                                                  #
    ######################################################################################################
    
    #!/bin/python
    import os
    import sys
    import time
    import datetime
    import hashlib
    import string
    import re
    import io
    import logging
    import pymongo
    import xapian
    import jieba
    import chardet
    #from multiprocessing import Pool
    import types
    from pyexcel_xls import XLBook
    reload(sys)
    sys.setdefaultencoding("utf-8")
    
    SG_FILES    = "/var/lib/files/"                                 #存放待索引文件的目录
    INDEX_DIR   = "/var/lib/xapian/"                                #存放索引文件的目录
    LOG_FILE    = "/var/log/auto_index.log"                         #日志文件
    POOL_SIZE   = 4                                                 #进程池大小
    
    class filesIndex(object):
        def __init__(self):
            self.all_files = set()                             #存放遍历出来的所有文件
            self.need_indexed_files = set()
            self.bad_chars = [' ',',',u',','"',"'","(",")",u"(",u")","/","~","^","-",u".",u"\r",u"\n",u"\t","NULL","null",u'[',u']',u'{',u'}']   #结巴分词需要过滤的坏字符
        
            logging.basicConfig(
                        level = logging.INFO,
                                        format ='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                                        datefmt = '%a, %d %b %Y %H:%M:%S',
                                        filename = LOG_FILE,
                                        filemode = 'a'
                                       )
            self.log = logging                                 #日志模块
        
            #self.pool = Pool(processes = POOL_SIZE)           #多进程
        
            self.conn = pymongo.MongoClient("localhost",27017) #连接mongodb
            self.col = self.conn.sg_db.files                   #打开sg_db中的files集合
    
            self.ix_dir = INDEX_DIR                            #索引文件存放位置
            if not os.path.exists(self.ix_dir):
                os.mkdir(self.ix_dir)
    
            self.walkDir()                                 #遍历社工库文件夹中所有文件
            self.check_new_file()                          #检查所有待索引新文件
            self.init_index()
                
        #遍历待索引文件夹中所有文件
        def walkDir(self):
            for root,dirs,files in os.walk(SG_FILES,topdown = True):
                for name in files:
                    #print name
                    fname = os.path.join(root,name)
                    ext = os.path.splitext(fname)[-1].strip('.').lower()
                    if ext in ("txt","log","csv","xls","xlsx"):
                        self.all_files.add(fname)
                    elif ext == "sql":
                        self.all_files.add(self.__sql2txt(fname))
                    elif ext == "old":
                        pass
                    else:
                        self.log.info(u"文件 "+fname+u" 格式无法处理")
        
        #sql格式转为txt
        def __sql2txt(self,fname):
            f = open("sql.sed","w")
            f.write("s/),(/)-\\n(/g\ns/);/)-\\n/g")
            f.close()
            self.log.info(u"文件 "+fname+u" 开始转化格式")
            fname_new = os.path.splitext(fname)[0]+"_sql.txt"
            fname_modify = os.path.splitext(fname)[0]+"_sql.old"
            os.system("cat "+fname+"| sed -f 'sql.sed' | grep ')-' | sed 's/)-/)/' > "+fname_new)
            os.rename(fname,fname_modify)   
            self.log.info(u"文件 "+fname+u" 转化格式完成")
            return fname_new
    
        #检查没有被索引的文件
        def check_new_file(self):
            old_files = set()
            for file_item in self.col.find():
                old_files.add(file_item["file_name"])
            new_files = self.all_files - old_files
            for f in new_files:
                print "[!]"+f 
                md5value = self.__md5_file(f)
                dup_file = self.col.find({"md5value":md5value})
                if dup_file.count():
                    self.log.info(u"文件 "+f+u" 与文件 "+dup_file[0]["file_name"]+u" 系同一个文件")
                    os.remove(f)
                else:
                    charset = chardet.detect(open(f,"r").read(10000))["encoding"]
                    self.col.insert_one({"file_name":f,"md5value":md5value,"charset":charset,"is_indexed":0,"line_pointer":0})
                
        #计算文件的md5值
        def __md5_file(self,file_name):
            m = hashlib.md5()   
            f = io.FileIO(file_name,'r')
            bytes = f.read(1024)
            while(bytes != b''):
                m.update(bytes)
                bytes = f.read(1024)
            f.close()
            md5value = m.hexdigest()
            return md5value
        
        #按文件类别找到索引入口
        def __indexFiles(self,fname,start = 0):
            ext = os.path.splitext(fname)[-1].strip('.')
            ext = ext.lower()
            if ext in ("txt","log","html","csv"):
                self.__index_txt(fname,start)
            elif ext in("xls","xlsx"):
                self.__index_excel(fname,start)
            else:
                self.log.info(u"文件"+fname+u"格式无法识别")    
        
        #针对txt,log,csv格式文件索引函数  
        def __index_txt(self,fname,start):
            f = open(fname,"r")
            index_dir = self.ix_dir+os.path.split(fname)[1].replace(".","_")
            index_db = xapian.WritableDatabase(index_dir,xapian.DB_CREATE_OR_OPEN)
            print "开始"  
            if start > 0:                                           #断点索引
                line_num = start
                self.log.info(u"文件"+fname+ u"从第"+str(start)+u"行处开始索引")
                while start > 0:                                #移动文件指针到断点位置
                    f.readline()
                    #print start
                    start = start -1
            else:
                self.log.info(u"文件"+fname+u"开始索引")          #正常索引
                line_num = 0
                
            datas = f.readlines(20000000)                           #一次读取20M加快速度
            try:
                while datas:
                    lines = 0
                    for data in datas:
                        doc = xapian.Document()
                        doc.set_data(data)
                        seg_list = jieba.cut_for_search(data)
                        seg_list_clean = list(set(seg_list).difference(set(self.bad_chars)))
                        for seg in seg_list_clean:
                            #print seg
                            doc.add_term(seg)
                        index_db.add_document(doc)
                        lines = lines + 1
                        #print lines
                        #index_db.flush()
                    
                    line_num = line_num + lines
                    print line_num
                    datas= f.readlines(20000000)
            
            except:
                    line_num = line_num + lines
                    f.close()
                    print "[!]############" 
                    info = sys.exc_info()
                    print info[0],":",info[1]                           #把错误打印出来,便于调试
                    print "[!]***********"
                    index_db.flush()
                    self.__index_exit(fname,line_num)                   #调用索引中断处理程序
            index_db.flush()
            self.col.update_one({"file_name":fname},{"$set":{"is_indexed":1,"line_pointer":line_num}})
            self.log.info(u"文件"+fname+u"索引成功完成,共"+str(line_num)+u"条记录")                     #执行到这一步说明索引成功完成
            f.close()
        
        #针对excel文件的处理函数     
        def __index_excel(self,fname,start):
            book = XLBook(fname)
            xls_data = dict(book.sheets())
            index_dir = self.ix_dir+os.path.split(fname)[1].replace(".","_")
            index_db = xapian.WritableDatabase(index_dir,xapian.DB_CREATE_OR_OPEN)
            line_num = 0
            if start > 0: 
                self.log.info(u"文件"+fname+ u"从第"+str(start)+u"行处开始索引")
            else:
                self.log.info(u"文件"+fname+u"开始索引")          #正常索引
            try:
                for sheet in xls_data:
                    for row in xls_data[sheet]:
                        line_num = line_num + 1
                        if line_num < start:
                            continue
                        data = ""
                        for col in row:
                            if type(col) == float:
                                col = str(int(col))
                            elif type(col) == int or type(col) == datetime.datetime or type(col) == datetime.date:
                                col = str(col)
                            else:
                                col = col.encode("utf-8")
                            data = data+col+ ","
                        data = data.strip(",")
                        doc = xapian.Document()
                        doc.set_data(data)
                        seg_list = jieba.cut_for_search(data)
                        seg_list_clean = list(set(seg_list).difference(set(self.bad_chars)))
                        for seg in seg_list_clean:
                            doc.add_term(seg)
                        index_db.add_document(doc)
            except:
                    info = sys.exc_info()
                    print info[0],":",info[1]                           #把错误打印出来,便于调试
                    index_db.flush()
                    self.__index_exit(fname,line_num)                   #调用索引中断处理程序
            index_db.flush()
            self.col.update_one({"file_name":fname},{"$set":{"is_indexed":1,"line_pointer":line_num}})
            self.log.info(u"文件"+fname+u"索引成功完成")                    #执行到这一步说明索引成功完成
            self.log.info(u"文件"+fname+u"索引成功完成,共"+str(line_num)+u"条记录")                     #执行到这一步说明索引成功完成
            
        #索引断点处理程序 (保存断点位置,写入日志)
        def __index_exit(self,fname,line_num):
            #记录索引断点的位置
            self.col.update_one({"file_name":fname},{"$set":{"line_pointer":line_num}})
    
            self.log.info(u"文件"+fname+u"索引中断发生在第"+str(line_num)+u"行")
            sys.exit()
            
        #索引入口,判断索引模式(断点索引还是正常索引)
        def init_index(self):
            #断点索引
            for doc in self.col.find({"is_indexed":0,"line_pointer":{"$gt":0}}):
                #self.pool.apply_async(self.__indexFiles,args=(doc["file_name"],doc["line_pointer"]))
                self.data_tag = os.path.split(doc["file_name"])[1].split(".")[0]    #数据标签
                self.__indexFiles(doc["file_name"],doc["line_pointer"])             
            #新文件索引      
            for doc in self.col.find({"is_indexed":0,"line_pointer":0}):
                #self.pool.apply_async(self.__indexFiles,args=(doc["file_name"],))
                self.data_tag = os.path.split(doc["file_name"])[1].split(".")[0]
                self.__indexFiles(doc["file_name"])
            #self.pool.close()
            #self.pool.join()       
    
        #数据检索接口     
        def search(self,keyword):
            db = xapian.Database()
            for d in os.listdir(self.ix_dir):
                print self.ix_dir+d
                db_d = xapian.Database(self.ix_dir+d)
                db.add_database(db_d)
            enquire =xapian.Enquire(db)
            query = xapian.Query(keyword)
            print keyword
            enquire.set_query(query)
            matches = enquire.get_mset(0,50)
            print matches.size()
            for m in matches:
                print "%i %i%% docid=%i [%s]" % (m.rank+1,m.percent,m.docid,m.document.get_data())  
        #i析构函数
        def __del__(self):
            self.conn.close()      #关闭mongodb连接
    
    if __name__ == "__main__":
        reload(sys)
        sys.setdefaultencoding("utf8")
        INDEX = filesIndex()
        INDEX.walkDir()                                 #遍历待索引文件夹中所有文件
        INDEX.check_new_file()                          #检查所有待索引新文件
        INDEX.init_index()
        #start_time = time.time()
        #INDEX.search("123456")                          #数据检索测试
        #finsh_time = time.time()
        #print "[+] 检索耗时 "+str(finsh_time-start_time)
    

    【原创文章,转载文章和文中代码请注明出处】

    相关文章

      网友评论

        本文标题:【原创】用xapian打造一个文件“行”索引系统

        本文链接:https://www.haomeiwen.com/subject/hwwdyqtx.html