#!/usr/bin/env python2
# -*- coding: utf-8 -*-
from kafka.producer import KafkaProducer
import sys
import logging
import os
import datetime
import random
import threading
import time
parentPath = "/opt/git/xx/app/event"
finishedLogPath = parentPath + "/finished_log.dat"

# Module-level logger: DEBUG and above goes to a file, ERROR and above
# is echoed to the console.
logger = logging.getLogger("eventToKafka")
logger.setLevel(logging.DEBUG)

_log_format = logging.Formatter(
    "%(asctime)s - %(name)s - %(levelname)s - %(message)s")

# File handler: records everything from DEBUG up.
_file_handler = logging.FileHandler(parentPath + "/event_to_kafka.log")
_file_handler.setLevel(logging.DEBUG)

# Console handler: only ERROR and above reaches the terminal.
_console_handler = logging.StreamHandler()
_console_handler.setLevel(logging.ERROR)

for _handler in (_console_handler, _file_handler):
    _handler.setFormatter(_log_format)
    logger.addHandler(_handler)
class Kafka_producer():
    """Thin wrapper around kafka-python's KafkaProducer.

    Publishes already-serialized JSON payloads to a fixed topic.
    ``kafkapartition`` is the number of chunks used by
    ``sendBatchJsonData`` — it is NOT a Kafka partition id.
    """

    def __init__(self, kafkatopic, kafkapartition):
        self.kafkaTopic = kafkatopic
        self.kafkaPartition = kafkapartition
        # NOTE(review): broker address is hard-coded; consider parameterizing.
        self.producer = KafkaProducer(bootstrap_servers=['111.111.111.111:9092'])

    def sendjsondata(self, jsonData):
        """Queue one payload for async send; errors are logged, not raised."""
        try:
            self.producer.send(self.kafkaTopic, jsonData)
        except Exception as e:  # `as` form is valid on Python 2.6+ and 3
            logger.error(e)

    def flush(self):
        """Block until all buffered messages have been transmitted."""
        self.producer.flush()

    def sendBatchJsonData(self, jsonData):
        """Send ``jsonData`` split into ``kafkaPartition`` roughly equal slices.

        Each slice is sent as a single message and flushed immediately.
        The last slice absorbs the remainder. Errors are logged and swallowed.
        """
        try:
            # Floor division: identical on Python 2, and avoids the float
            # that plain `/` would produce on Python 3 (breaking slicing).
            curcount = len(jsonData) // self.kafkaPartition
            for i in range(0, self.kafkaPartition):
                start = i * curcount
                if i != (self.kafkaPartition - 1):
                    curdata = jsonData[start:(i + 1) * curcount]
                else:
                    # Final chunk takes everything left over.
                    curdata = jsonData[start:]
                self.producer.send(self.kafkaTopic, curdata)
                self.producer.flush()
        except Exception as e:
            logger.error(e)
def searchFile(path, keyword):
    """Return full paths of the regular files directly under *path*
    whose file names contain *keyword* (non-recursive)."""
    return [os.path.join(path, name)
            for name in os.listdir(path)
            if keyword in name and os.path.isfile(os.path.join(path, name))]
def insertIntoSet(filePath):
    """Read *filePath* and return the set of its lines, newlines removed.

    A failure to open the file still propagates (as in the original);
    a failure while reading is logged and whatever was collected so far
    is returned (best-effort).
    """
    tempSet = set()
    # `with` guarantees the handle is closed on every path
    # (replaces the manual try/finally + close()).
    with open(filePath) as f:
        try:
            for line in f:
                # replace() also drops embedded '\n', matching the original.
                tempSet.add(line.replace('\n', ''))
        except Exception as e:  # `as` form is valid on Python 2.6+ and 3
            logger.error(e)
    return tempSet
class calthread(threading.Thread):
    """Worker thread that streams a contiguous slice of log files to Kafka.

    Processes files[startN..endN] (inclusive). Each line is split on tabs
    and the third field (the event JSON) is sent to the "event" topic.
    """

    def __init__(self, threadname, cond, startN, endN, files):
        threading.Thread.__init__(self, name=threadname)
        self.cond = cond      # shared Condition (not currently used by run())
        self.startN = startN  # first index into `files`, inclusive
        self.endN = endN      # last index into `files`, inclusive
        self.files = files

    def run(self):
        for i in range(self.startN, self.endN + 1):
            filePath = self.files[i]
            logger.info("current file is " + filePath)
            producer = Kafka_producer("event", 1)
            try:
                fileLines = 0
                # `with` closes the file even if iteration raises.
                with open(filePath) as f:
                    for line in f:
                        arr = line.strip().split('\t')
                        # BUGFIX: was `len(arr) > 0`, which let short lines
                        # raise IndexError at arr[2]; the except handler then
                        # re-raised IndexError while logging arr[2], aborting
                        # the whole file. Require the field to exist instead.
                        if len(arr) > 2:
                            try:
                                producer.sendjsondata(arr[2])
                                # Flush per message: slow but loss-minimizing.
                                producer.flush()
                                # Sample ~0.001% of events into the log.
                                if random.random() < 0.00001:
                                    logger.info(arr[2])
                                fileLines += 1
                            except Exception as e:
                                logger.error("current wrong file is %s" % (filePath))
                                logger.error("The wrong event log is %s" % (arr[2]))
                                logger.error(e)
                                continue
                logger.info("insert into kafka %s lines" % (str(fileLines)))
            except Exception as e:
                logger.error(e)
def main(argv=None):
    """Diff incoming tracking logs against finished_log.dat, record the new
    ones, and launch one worker thread per file to stream them into Kafka.
    """
    if argv is None:  # identity check, not equality, for None
        argv = sys.argv
    # Shared condition object handed to workers (not currently used by them).
    cond = threading.Condition()

    # Already-processed file paths.
    finishedLog = set()
    with open(finishedLogPath) as finishedFile:
        for line in finishedFile:
            finishedLog.add(line.strip('\n'))

    # Files currently sitting in the incoming directory.
    incomingLog = set(searchFile("/xx/xx/staging/tracking/incoming/", "xxx"))

    # Only handle files we have not seen before.
    todoLog = incomingLog - finishedLog
    if len(todoLog) == 0:
        return
    for i in todoLog:
        print(i)

    # Record them as handled *before* processing. NOTE(review): a crash
    # mid-stream means these files will never be retried.
    with open(finishedLogPath, 'a') as outfile:
        for i in todoLog:
            outfile.write(i + "\n")

    todoList = list(todoLog)
    alen = len(todoList)
    # One thread per file: threadN == alen, so every slice is one file.
    threadN = alen
    threadL = []
    # Floor division keeps `t` an int on Python 3 as well (plain `/`
    # would make it a float and break range()/slicing there).
    t = alen // threadN
    logger.info("初始化线程")
    for x in range(0, threadN):
        startN = x * t
        # Last thread (or an overshooting slice) takes everything remaining.
        if (x + 1) * t >= alen or x == threadN - 1:
            endN = alen - 1
        else:
            endN = (x + 1) * t - 1
        threadL.append(calthread("Thread--" + str(x), cond, startN, endN, todoList))
    logger.info("Start time of threadCal--" + str(time.time()))
    for a in threadL:
        a.start()
    logger.info("done")
# Script entry point: run main() only when executed directly, not on import.
if __name__ == "__main__":
    main()
# NOTE: stray scraped text removed here ("网友评论" = "user comments") — it was
# webpage residue, not Python code, and made the file unparseable.