美文网首页
读取文件内容并写到kafka

读取文件内容并写到kafka

作者: davidic | 来源:发表于2019-02-13 17:01 被阅读0次
    #!/usr/bin/env python2
    # -*- coding: utf-8 -*-
    from kafka.producer import KafkaProducer  
    import sys
    import logging
    import os
    import datetime
    import random
    import threading  
    import time
    
    
    # Root directory of the event-forwarding job; finished_log.dat is the
    # ledger listing incoming files that have already been sent to Kafka.
    parentPath = "/opt/git/xx/app/event"
    finishedLogPath = parentPath+"/finished_log.dat"
    
    logger = logging.getLogger("eventToKafka")
    logger.setLevel(logging.DEBUG)
    # File handler: records DEBUG and above to the job's own log file
    fh = logging.FileHandler(parentPath+"/event_to_kafka.log")
    fh.setLevel(logging.DEBUG)
    # Stream handler: echoes ERROR and above to the console
    ch = logging.StreamHandler()
    ch.setLevel(logging.ERROR)
    # One shared timestamped format for both handlers
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    fh.setFormatter(formatter)
    # Attach both handlers to the module logger
    logger.addHandler(ch)
    logger.addHandler(fh)
    
    
    class Kafka_producer():
        """Thin wrapper around kafka-python's KafkaProducer.

        Publishes messages to a fixed topic; send errors are logged via the
        module logger instead of being raised to the caller.
        """
    
        def __init__(self, kafkatopic, kafkapartition):
            """kafkatopic: topic to publish to.
            kafkapartition: number of slices used by sendBatchJsonData().
            """
            self.kafkaTopic = kafkatopic
            self.kafkaPartition = kafkapartition
            self.producer = KafkaProducer(bootstrap_servers=['111.111.111.111:9092'])
    
        def sendjsondata(self, jsonData):
            """Queue one message on the topic; errors are logged, not raised."""
            try:
                # NOTE: send() only queues; callers invoke flush() to deliver.
                self.producer.send(self.kafkaTopic, jsonData)
            except Exception as e:  # py2.6+/py3-compatible except syntax
                logger.error(e)
    
        def flush(self):
            """Block until all queued messages have been delivered."""
            self.producer.flush()
    
        def sendBatchJsonData(self, jsonData):
            """Split jsonData into kafkaPartition slices, sending and flushing
            each slice; the last slice absorbs any remainder. Errors are
            logged, not raised.
            """
            try:
                # // is explicit floor division — identical on py2 and py3
                sliceSize = len(jsonData) // self.kafkaPartition
                for i in range(0, self.kafkaPartition):
                    start = i * sliceSize
                    if i != self.kafkaPartition - 1:
                        curdata = jsonData[start:(i + 1) * sliceSize]
                    else:
                        # last slice takes everything that is left
                        curdata = jsonData[start:]
                    self.producer.send(self.kafkaTopic, curdata)
                    self.producer.flush()
            except Exception as e:
                logger.error(e)
    
    def searchFile(path, keyword):
        """Return the full paths of the regular files directly under *path*
        whose filename contains *keyword* (subdirectories are skipped).
        Order follows os.listdir()."""
        return [os.path.join(path, name)
                for name in os.listdir(path)
                if keyword in name and os.path.isfile(os.path.join(path, name))]
    
    def insertIntoSet(filePath):
        """Read *filePath* and return the set of its lines with all newline
        characters removed.

        Failure to open propagates to the caller (as in the original, open()
        is outside the try); read errors are logged and whatever was
        collected so far is returned.
        """
        lineSet = set()
        fh = open(filePath)  # renamed from `file` — avoid shadowing the builtin
        try:
            for line in fh:
                lineSet.add(line.replace('\n', ''))
        except Exception as e:  # py2.6+/py3-compatible except syntax
            logger.error(e)
        finally:
            fh.close()
        return lineSet
        
    
    class calthread(threading.Thread):
        """Worker thread that streams the tab-separated event files in
        files[startN..endN] (inclusive) to Kafka, one line at a time."""
    
        def __init__(self, threadname, cond, startN, endN, files):
            """threadname: thread name passed to threading.Thread.
            cond: shared Condition (kept for interface compatibility; run() does not use it).
            startN/endN: inclusive index range into *files* handled by this thread.
            files: list of file paths shared by all workers.
            """
            threading.Thread.__init__(self, name=threadname)
            self.cond = cond
            self.startN = startN
            self.endN = endN
            self.files = files
    
        def run(self):
            """Send the third tab-separated field of every line to Kafka,
            logging (not raising) per-line and per-file errors."""
            for i in range(self.startN, self.endN + 1):
                filePath = self.files[i]
                logger.info("current file is " + filePath)
                producer = Kafka_producer("event", 1)
                fh = open(filePath)  # renamed from `file` — avoid shadowing the builtin
                try:
                    fileLines = 0
                    for line in fh:
                        arr = line.strip().split('\t')
                        # Guard on > 2, not > 0: arr[2] is read below, and the
                        # original error handler itself indexed arr[2], so a
                        # short line would re-raise inside `except` and abort
                        # the whole file via the outer handler.
                        if len(arr) > 2:
                            try:
                                producer.sendjsondata(arr[2])
                                producer.flush()
                                # sample roughly 1 in 100k events into the log
                                if random.random() < 0.00001:
                                    logger.info(arr[2])
                                fileLines += 1
                            except Exception as e:  # py2.6+/py3-compatible syntax
                                logger.error("current wrong file is %s" % (filePath))
                                logger.error("The wrong event log is %s" % (arr[2]))
                                logger.error(e)
                                continue
                    logger.info("insert into kafka %s lines" % (str(fileLines)))
                except Exception as e:
                    logger.error(e)
                finally:
                    fh.close()
    
    def main(argv=None):
        """Entry point: diff the incoming log directory against the
        finished-log ledger, record the new files, then start one worker
        thread per new file to stream its events to Kafka.

        BUG FIXED: the original `return` under `if len(todoLog) == 0:` and
        `print(i)` under `for i in todoLog:` were not indented — a hard
        IndentationError that prevented the script from running at all.
        """
        if argv is None:
            argv = sys.argv
    
        # Condition shared by the workers (held for interface compatibility)
        cond = threading.Condition()
    
        # Files already forwarded: one path per line in the ledger
        finishedLog = set()
        finishedFile = open(finishedLogPath)
        try:
            for line in finishedFile:
                finishedLog.add(line.strip('\n'))
        finally:
            finishedFile.close()
    
        # Files currently present in the incoming directory
        incomingLog = set(searchFile("/xx/xx/staging/tracking/incoming/", "xxx"))
    
        # New files; record them in the ledger up front so a re-run skips them
        todoLog = incomingLog - finishedLog
        if len(todoLog) == 0:
            return
        for i in todoLog:
            print(i)
        outfile = open(finishedLogPath, 'a')
        try:
            for i in todoLog:
                outfile.write(i + "\n")
        finally:
            outfile.close()
    
        todoList = list(todoLog)
    
        alen = len(todoList)
        threadN = alen  # one worker thread per file
    
        threadL = []
        t = alen // threadN  # files per thread (always 1 here); // == floor on py2 and py3
    
        logger.info("初始化线程")
        for x in range(0, threadN):
            startN = x * t
            # Last thread (or an overshooting slice) takes everything up to the end
            if (x + 1) * t >= alen or x == threadN - 1:
                endN = alen - 1
            else:
                endN = (x + 1) * t - 1
            threadTT = calthread("Thread--" + str(x), cond, startN, endN, todoList)
            threadL.append(threadTT)
    
        logger.info("Start time of threadCal--" + str(time.time()))
        for a in threadL:
            a.start()
    
        logger.info("done")
    
    
    if __name__ == "__main__":
        main()
    
    

    相关文章

      网友评论

          本文标题:读取文件内容并写到kafka

          本文链接:https://www.haomeiwen.com/subject/rzvdeqtx.html