美文网首页程序员
【原创】用xapian打造一个文件“行”索引系统

【原创】用xapian打造一个文件“行”索引系统

作者: 嗒嗒噜拉 | 来源:发表于2019-02-21 20:32 被阅读0次
  • 对于大量的非格式化文本数据(如txt、xls、xlsx、csv、sql、html等),某些情况下我们需要针对文件的某一行进行检索。这些文件往往难以格式化后存入结构化数据库中,特别是单个文件体量较大的情况,难以使用常规方式进行检索,即使使用一些特殊的工具,在没有进行索引的情况下,检索速速往往难以忍受。
  • 笔者利用开源的全文检索引擎xapian编写了一个完整的文件行索引程序,能自动对txt、csv、sql、html、xls,xlsx格式的文本文件按行建立索引,程序采用中文分词工具“结巴分词”工具对文本行内容进行分词。
  • 笔者对50余个总量约200G的文件按行进行索引,完成索引后,检索响应时间在300ms以内。
#coding:utf8
#######################################################################################################
#                             自动索引系统 by DaDaLuLa                                                #
#######################################################################################################
#   程序功能:自动对txt,xls,xlsx,csv,sql,html格式的文本文件按行建立索引                                   #
#   程序要求:1.需索引的文件要放到一个固定的文件夹内,注意文件扩展名要符合要求                              #                        
#            2.文件编码方式须为utf8                                                                    #
#            3.文件名不能有中文                                                                        #
#   执行流程:实例化filesIndex对象-->从上次断点处继续索引-->遍历文件目录检查未索引的文件-->逐个建立索引      #
#   其他:    1.默认的日志文件为/var/log/auto_index.log(自动建立)                                       #
#            2.每个待索引文件的名字,md5值,断点位置等信息保存在mongodb数据库中                             #
#            3.索引文件所在目录默认为/var/lib/xapian/                                                  #
######################################################################################################

#!/bin/python
import os
import sys
import time
import datetime
import hashlib
import string
import re
import io
import logging
import pymongo
import xapian
import jieba
import chardet
#from multiprocessing import Pool
import types
from pyexcel_xls import XLBook
reload(sys)
sys.setdefaultencoding("utf-8")

SG_FILES    = "/var/lib/files/"                                 #存放待索引文件的目录
INDEX_DIR   = "/var/lib/xapian/"                                #存放索引文件的目录
LOG_FILE    = "/var/log/auto_index.log"                         #日志文件
POOL_SIZE   = 4                                                 #进程池大小

class filesIndex(object):
    def __init__(self):
        self.all_files = set()                             #存放遍历出来的所有文件
        self.need_indexed_files = set()
        self.bad_chars = [' ',',',u',','"',"'","(",")",u"(",u")","/","~","^","-",u".",u"\r",u"\n",u"\t","NULL","null",u'[',u']',u'{',u'}']   #结巴分词需要过滤的坏字符
    
        logging.basicConfig(
                    level = logging.INFO,
                                    format ='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                                    datefmt = '%a, %d %b %Y %H:%M:%S',
                                    filename = LOG_FILE,
                                    filemode = 'a'
                                   )
        self.log = logging                                 #日志模块
    
        #self.pool = Pool(processes = POOL_SIZE)           #多进程
    
        self.conn = pymongo.MongoClient("localhost",27017) #连接mongodb
        self.col = self.conn.sg_db.files                   #打开sg_db中的files集合

        self.ix_dir = INDEX_DIR                            #索引文件存放位置
        if not os.path.exists(self.ix_dir):
            os.mkdir(self.ix_dir)

        self.walkDir()                                 #遍历社工库文件夹中所有文件
        self.check_new_file()                          #检查所有待索引新文件
        self.init_index()
            
    #遍历待索引文件夹中所有文件
    def walkDir(self):
        for root,dirs,files in os.walk(SG_FILES,topdown = True):
            for name in files:
                #print name
                fname = os.path.join(root,name)
                ext = os.path.splitext(fname)[-1].strip('.').lower()
                if ext in ("txt","log","csv","xls","xlsx"):
                    self.all_files.add(fname)
                elif ext == "sql":
                    self.all_files.add(self.__sql2txt(fname))
                elif ext == "old":
                    pass
                else:
                    self.log.info(u"文件 "+fname+u" 格式无法处理")
    
    #sql格式转为txt
    def __sql2txt(self,fname):
        f = open("sql.sed","w")
        f.write("s/),(/)-\\n(/g\ns/);/)-\\n/g")
        f.close()
        self.log.info(u"文件 "+fname+u" 开始转化格式")
        fname_new = os.path.splitext(fname)[0]+"_sql.txt"
        fname_modify = os.path.splitext(fname)[0]+"_sql.old"
        os.system("cat "+fname+"| sed -f 'sql.sed' | grep ')-' | sed 's/)-/)/' > "+fname_new)
        os.rename(fname,fname_modify)   
        self.log.info(u"文件 "+fname+u" 转化格式完成")
        return fname_new

    #检查没有被索引的文件
    def check_new_file(self):
        old_files = set()
        for file_item in self.col.find():
            old_files.add(file_item["file_name"])
        new_files = self.all_files - old_files
        for f in new_files:
            print "[!]"+f 
            md5value = self.__md5_file(f)
            dup_file = self.col.find({"md5value":md5value})
            if dup_file.count():
                self.log.info(u"文件 "+f+u" 与文件 "+dup_file[0]["file_name"]+u" 系同一个文件")
                os.remove(f)
            else:
                charset = chardet.detect(open(f,"r").read(10000))["encoding"]
                self.col.insert_one({"file_name":f,"md5value":md5value,"charset":charset,"is_indexed":0,"line_pointer":0})
            
    #计算文件的md5值
    def __md5_file(self,file_name):
        m = hashlib.md5()   
        f = io.FileIO(file_name,'r')
        bytes = f.read(1024)
        while(bytes != b''):
            m.update(bytes)
            bytes = f.read(1024)
        f.close()
        md5value = m.hexdigest()
        return md5value
    
    #按文件类别找到索引入口
    def __indexFiles(self,fname,start = 0):
        ext = os.path.splitext(fname)[-1].strip('.')
        ext = ext.lower()
        if ext in ("txt","log","html","csv"):
            self.__index_txt(fname,start)
        elif ext in("xls","xlsx"):
            self.__index_excel(fname,start)
        else:
            self.log.info(u"文件"+fname+u"格式无法识别")    
    
    #针对txt,log,csv格式文件索引函数  
    def __index_txt(self,fname,start):
        f = open(fname,"r")
        index_dir = self.ix_dir+os.path.split(fname)[1].replace(".","_")
        index_db = xapian.WritableDatabase(index_dir,xapian.DB_CREATE_OR_OPEN)
        print "开始"  
        if start > 0:                                           #断点索引
            line_num = start
            self.log.info(u"文件"+fname+ u"从第"+str(start)+u"行处开始索引")
            while start > 0:                                #移动文件指针到断点位置
                f.readline()
                #print start
                start = start -1
        else:
            self.log.info(u"文件"+fname+u"开始索引")          #正常索引
            line_num = 0
            
        datas = f.readlines(20000000)                           #一次读取20M加快速度
        try:
            while datas:
                lines = 0
                for data in datas:
                    doc = xapian.Document()
                    doc.set_data(data)
                    seg_list = jieba.cut_for_search(data)
                    seg_list_clean = list(set(seg_list).difference(set(self.bad_chars)))
                    for seg in seg_list_clean:
                        #print seg
                        doc.add_term(seg)
                    index_db.add_document(doc)
                    lines = lines + 1
                    #print lines
                    #index_db.flush()
                
                line_num = line_num + lines
                print line_num
                datas= f.readlines(20000000)
        
        except:
                line_num = line_num + lines
                f.close()
                print "[!]############" 
                info = sys.exc_info()
                print info[0],":",info[1]                           #把错误打印出来,便于调试
                print "[!]***********"
                index_db.flush()
                self.__index_exit(fname,line_num)                   #调用索引中断处理程序
        index_db.flush()
        self.col.update_one({"file_name":fname},{"$set":{"is_indexed":1,"line_pointer":line_num}})
        self.log.info(u"文件"+fname+u"索引成功完成,共"+str(line_num)+u"条记录")                     #执行到这一步说明索引成功完成
        f.close()
    
    #针对excel文件的处理函数     
    def __index_excel(self,fname,start):
        book = XLBook(fname)
        xls_data = dict(book.sheets())
        index_dir = self.ix_dir+os.path.split(fname)[1].replace(".","_")
        index_db = xapian.WritableDatabase(index_dir,xapian.DB_CREATE_OR_OPEN)
        line_num = 0
        if start > 0: 
            self.log.info(u"文件"+fname+ u"从第"+str(start)+u"行处开始索引")
        else:
            self.log.info(u"文件"+fname+u"开始索引")          #正常索引
        try:
            for sheet in xls_data:
                for row in xls_data[sheet]:
                    line_num = line_num + 1
                    if line_num < start:
                        continue
                    data = ""
                    for col in row:
                        if type(col) == float:
                            col = str(int(col))
                        elif type(col) == int or type(col) == datetime.datetime or type(col) == datetime.date:
                            col = str(col)
                        else:
                            col = col.encode("utf-8")
                        data = data+col+ ","
                    data = data.strip(",")
                    doc = xapian.Document()
                    doc.set_data(data)
                    seg_list = jieba.cut_for_search(data)
                    seg_list_clean = list(set(seg_list).difference(set(self.bad_chars)))
                    for seg in seg_list_clean:
                        doc.add_term(seg)
                    index_db.add_document(doc)
        except:
                info = sys.exc_info()
                print info[0],":",info[1]                           #把错误打印出来,便于调试
                index_db.flush()
                self.__index_exit(fname,line_num)                   #调用索引中断处理程序
        index_db.flush()
        self.col.update_one({"file_name":fname},{"$set":{"is_indexed":1,"line_pointer":line_num}})
        self.log.info(u"文件"+fname+u"索引成功完成")                    #执行到这一步说明索引成功完成
        self.log.info(u"文件"+fname+u"索引成功完成,共"+str(line_num)+u"条记录")                     #执行到这一步说明索引成功完成
        
    #索引断点处理程序 (保存断点位置,写入日志)
    def __index_exit(self,fname,line_num):
        #记录索引断点的位置
        self.col.update_one({"file_name":fname},{"$set":{"line_pointer":line_num}})

        self.log.info(u"文件"+fname+u"索引中断发生在第"+str(line_num)+u"行")
        sys.exit()
        
    #索引入口,判断索引模式(断点索引还是正常索引)
    def init_index(self):
        #断点索引
        for doc in self.col.find({"is_indexed":0,"line_pointer":{"$gt":0}}):
            #self.pool.apply_async(self.__indexFiles,args=(doc["file_name"],doc["line_pointer"]))
            self.data_tag = os.path.split(doc["file_name"])[1].split(".")[0]    #数据标签
            self.__indexFiles(doc["file_name"],doc["line_pointer"])             
        #新文件索引      
        for doc in self.col.find({"is_indexed":0,"line_pointer":0}):
            #self.pool.apply_async(self.__indexFiles,args=(doc["file_name"],))
            self.data_tag = os.path.split(doc["file_name"])[1].split(".")[0]
            self.__indexFiles(doc["file_name"])
        #self.pool.close()
        #self.pool.join()       

    #数据检索接口     
    def search(self,keyword):
        db = xapian.Database()
        for d in os.listdir(self.ix_dir):
            print self.ix_dir+d
            db_d = xapian.Database(self.ix_dir+d)
            db.add_database(db_d)
        enquire =xapian.Enquire(db)
        query = xapian.Query(keyword)
        print keyword
        enquire.set_query(query)
        matches = enquire.get_mset(0,50)
        print matches.size()
        for m in matches:
            print "%i %i%% docid=%i [%s]" % (m.rank+1,m.percent,m.docid,m.document.get_data())  
    #i析构函数
    def __del__(self):
        self.conn.close()      #关闭mongodb连接

if __name__ == "__main__":
    reload(sys)
    sys.setdefaultencoding("utf8")
    INDEX = filesIndex()
    INDEX.walkDir()                                 #遍历待索引文件夹中所有文件
    INDEX.check_new_file()                          #检查所有待索引新文件
    INDEX.init_index()
    #start_time = time.time()
    #INDEX.search("123456")                          #数据检索测试
    #finsh_time = time.time()
    #print "[+] 检索耗时 "+str(finsh_time-start_time)

【原创文章,转载文章和文中代码请注明出处】

相关文章

  • 【原创】用xapian打造一个文件“行”索引系统

    对于大量的非格式化文本数据(如txt、xls、xlsx、csv、sql、html等),某些情况下我们需要针对文件的...

  • xapian note 01

    [1]. Index xapian建立索引,参考https://github.com/xapian/xapian-...

  • Docker方法运用(十一)-搜索引擎elasticsearch

    各种搜索引擎:whoosh,Solr, Sphinx, Xapian。。 引擎的比较:https://blog.c...

  • Linux 文件属性及权限

    Linux一切皆文件: Llinux系统的文件或目录的属性主要包括:索引节点、文件类型、文件权限、链接数、所属的用...

  • 操作系统-文件结构

    文件结构 计算机系统中采用的索引文件结构如下图所示: 系统中有13个索引节点,0-9为直接索引,即每个索引节点存放...

  • linux学习笔记三

    文件组成 linux文件系统的运行和操作系统的文件组成有关,文件系统会将文件权限,属性放在inode(索引节点)中...

  • 文件系统-索引

    如果一个索引式文件的索引节点有10个直接块,1个一级间接块,1个二级间接块,1个三级间接块。假设每个数据块的大小是...

  • 文件系统和磁盘工作原理

    文件系统 磁盘为系统提供了最基本的持久化存储 文件系统则在磁盘的基础上,提供了一个用于管理文件的树状结构。 索引节...

  • 文件系统和磁盘工作原理

    文件系统 磁盘为系统提供了最基本的持久化存储 文件系统则在磁盘的基础上,提供了一个用于管理文件的树状结构。 索引节...

  • 知识点小结

    一、文件名通配符 二、inode索引节点号 inode (index node )表中包含文件系统所有文件列表一个...

网友评论

    本文标题:【原创】用xapian打造一个文件“行”索引系统

    本文链接:https://www.haomeiwen.com/subject/hwwdyqtx.html