#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import pandas as pd
import datetime
import time
import kudu
from kudu.client import Partitioning
import logging
from logging import handlers
def Logger(filename, level='info', when='D', backCount=3):
    """Return a logger that writes to the console and to a time-rotated file.

    The logger is registered under the name ``filename``. Calling this
    function again with the same ``filename`` returns the same logger and
    does not attach a second pair of handlers, so records are never emitted
    twice.

    Args:
        filename: Path of the log file; also used as the logger name.
        level: One of 'debug', 'info', 'warning', 'error', 'crit'. Any other
            value falls back to INFO.
        when: Rotation interval unit handed to TimedRotatingFileHandler.
        backCount: Number of rotated files to keep (``backupCount``).

    Returns:
        The configured ``logging.Logger`` instance.
    """
    # Mapping from level names to logging constants.
    level_relations = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'crit': logging.CRITICAL,
    }
    fmt = '%(asctime)s %(levelname)s [%(pathname)s:%(funcName)s:%(lineno)d] %(message)s'

    logger = logging.getLogger(filename)
    # An unknown name would yield None, and setLevel(None) raises TypeError.
    logger.setLevel(level_relations.get(level, logging.INFO))

    # getLogger() hands back the same object for the same name, so handlers
    # must only be attached the first time; otherwise every call would add
    # another console/file pair and duplicate each log line.
    if not logger.handlers:
        format_str = logging.Formatter(fmt)

        sh = logging.StreamHandler()
        sh.setFormatter(format_str)

        th = handlers.TimedRotatingFileHandler(
            filename=filename, when=when, backupCount=backCount, encoding='utf-8')
        th.setFormatter(format_str)

        logger.addHandler(sh)
        logger.addHandler(th)
    return logger
# Module-wide logger shared by the functions below; records go to the
# console and to the file 'all.log'.
log = Logger(filename='all.log')
def read_data(path='/hadoop/disk1/data_result/frontlog-2019-08-01.txt'):
    """Load the tab-separated front-log dump into a DataFrame.

    Malformed lines are skipped with a warning rather than aborting the
    load. The file has no header row; the fixed column names below are
    assigned after reading.

    Args:
        path: Location of the dump file. Defaults to the 2019-08-01 dump.

    Returns:
        pandas.DataFrame with one row per parsed line and 43 named columns.

    Raises:
        ValueError: If the file does not contain exactly 43 columns
            (raised by the column assignment).
    """
    columns = [
        'dt', 'vl', 'nt', 'tm', 'amc', 're', 'nw', 'de', 'id', 'dd', 'fm', 'dip', '_id', 'mb', 'bm', 've',
        'createdate', 'offsetx', 'sy', 'fn', 'createdatestr', 'dc', 'sid', 'ev', 'dpl', 'dmb', 'imei', 'sd',
        'asx', 'did', 'loc', 'pid', 'cu', 'ip', 'wbn', 'rs', 'rt', 'pr', 'sp', 'spid', 'se', 'seid',
        'part_dt',
    ]
    try:
        # pandas >= 1.3 spelling; 'warn' matches the old
        # error_bad_lines=False behaviour (skip the line, emit a warning).
        df = pd.read_csv(path, sep='\t', header=None, on_bad_lines='warn')
    except TypeError:
        # Older pandas rejects the unknown keyword with TypeError before
        # reading anything, so retry with the legacy option (which was
        # removed again in pandas 2.0).
        df = pd.read_csv(path, sep='\t', header=None, error_bad_lines=False)
    # Rename the positional columns.
    df.columns = columns
    log.info('读取所有数据成功')
    return df
def get_data(df, initial_time=0):
    """Return the rows whose time of day falls in the current polling window.

    Each row's 'tm' string is re-dated to today (characters 11-18, i.e. the
    HH:MM:SS part, are kept) and rows with ``initial_time <= tm < now`` are
    selected. In the selected rows 'tm' holds the re-dated timestamp and
    'part_dt' holds the current 'YYYY-MM'; every value is converted to str.
    The input frame is not modified.

    Args:
        df: DataFrame with at least a string column 'tm'.
        initial_time: Start of the window as a datetime. Any falsy value
            (0, None) means "3 seconds before now".

    Returns:
        A list of dicts, one per selected row (possibly empty).
    """
    # Take one clock reading so the date prefix, the partition value and the
    # window end can never disagree (e.g. when called right at midnight).
    now_time = datetime.datetime.now()
    if not initial_time:
        initial_time = now_time - datetime.timedelta(seconds=3)
    # Nothing to do when the window is empty (or the start lies in the future).
    if initial_time >= now_time:
        log.info('当前时间与上次更新时间相同, 略过')
        return []

    # Re-date every row's time of day to today.
    replayed_tm = pd.to_datetime(now_time.strftime('%Y-%m-%d ') + df['tm'].str[11:19],
                                 format='%Y-%m-%d %H:%M:%S')
    in_window = (replayed_tm >= initial_time) & (replayed_tm < now_time)

    # Copy only the selected rows instead of deep-copying the whole frame on
    # every poll.
    data_timedelta = df.loc[in_window].copy()
    data_timedelta['part_dt'] = now_time.strftime('%Y-%m')
    # .values sidesteps index alignment, which fails on duplicate labels.
    data_timedelta['tm'] = replayed_tm[in_window].values
    log.info('获取当前时间数据 [%s]-[%s]' % (str(initial_time)[:19], str(now_time)[:19]))

    # Convert every field to str. DataFrame.applymap is deprecated since
    # pandas 2.1 in favour of DataFrame.map; use whichever exists.
    if hasattr(pd.DataFrame, 'map'):
        data_timedelta = data_timedelta.map(str)
    else:
        data_timedelta = data_timedelta.applymap(str)
    return data_timedelta.to_dict('records')
def kudu_operation(data):
    """Insert a batch of rows into the Kudu table.

    Args:
        data: Sequence of dicts, one per row to insert. An empty batch is a
            no-op.

    Returns:
        datetime.datetime: The current time taken after the batch was
        written (or immediately, for an empty batch). The caller uses it as
        the start of the next polling window.
        None: If the operation failed; the error is logged, not raised.
    """
    if not data:
        return datetime.datetime.now()

    client = None
    try:
        # Kerberos authentication. A non-zero status is only reported: an
        # existing ticket may still be valid, so the write is attempted anyway.
        status = os.system('kinit -kt /root/etluser.keytab etluser')
        if status != 0:
            log.warning('kinit认证失败, 返回码: %s' % status)

        # Connect to Kudu and open the target table.
        client = kudu.connect(host='172.16.163.216', port=7051)
        table = client.table('impala::data_market_tuomin.frontlog_test_demo')
        # A session batches the insert operations.
        session = client.new_session()
        for item in data:
            session.apply(table.new_insert(item))

        # Flush is best-effort: a bad status does not abort the batch, but it
        # is recorded instead of being dropped silently.
        try:
            session.flush()
        except kudu.KuduBadStatus as e:
            log.warning('刷新写入操作异常, 已忽略: %s' % e)

        now_time = datetime.datetime.now()
        log.info('写入数据成功, 数据量: %s' % len(data))
        # Previously this value was computed but never returned, so the caller
        # always received None after a successful write.
        return now_time
    except Exception as e:
        log.error("kudu操作失败[ERROR: %s]" % e, exc_info=True)
        return None
    finally:
        # Release the connection on the failure path as well.
        if client is not None:
            try:
                client.close()
            except Exception as e:
                log.error("kudu操作失败[ERROR: %s]" % e, exc_info=True)
if __name__ == '__main__':
    # Load the dump once; every polling round works from this frame.
    df = read_data()
    # 0 means "no previous write yet".
    initial_time = 0
    # Endless replay loop: select the rows for the current window, write
    # them, and carry the returned value into the next round as its start.
    while True:
        initial_time = kudu_operation(get_data(df, initial_time))
# [Page residue] "Reader comments" heading copied from the source web page; not part of the script.