Spark 1.6 data-extraction code that inserts data through DataFrames; below is the Python version.
The main logic lives in the `__main__` block, and the insert itself goes through a DataFrame.
Brief description: take the maximum value a of the updatets (update timestamp) column in ets (the target, already-extracted table), then query the source table (slave) for rows whose update time is greater than a; if any exist, insert them into the target database.
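In short, the pattern is: read the high-water mark from the target, then pull only the newer source rows. A minimal sketch of those two steps (table and column names are taken from the listing below):

# 1. Watermark: the latest updatets already loaded into the target table
max_ts = sqlContext.sql("select max(updatets) as max from ets_learn").collect()[0].max
# 2. Delta: only source rows updated after the watermark (full pull when max_ts is None)
ds_slave = sqlContext.sql("select * from learn_slave where updated_at > '%s'" % max_ts
                          if max_ts is not None else "select * from learn_slave")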
#!/usr/bin/env python
# coding=utf-8
"""
author:zb
create_at:2017-9-8 09:37:45
"""
import hashlib
import os
import sys
import datetime
import json
import logging
from os import path
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.types import StructField, StringType, StructType

# Python 2: force the default encoding to UTF-8 so non-ASCII text round-trips
reload(sys)
sys.setdefaultencoding('utf-8')
def setLog():
    """Logging setup: write log records to both a file and the console."""
    logger = logging.getLogger()
    # DEBUG level does not surface under Spark, so INFO is the master switch
    logger.setLevel(logging.INFO)
    # Handler that appends to a log file next to this script
    logfile = os.path.join(path.dirname(__file__), 'logger.txt')
    fh = logging.FileHandler(logfile, mode='a')
    fh.setLevel(logging.INFO)  # level for the file handler
    # Handler that writes to the console
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)  # level for the console handler
    # Shared output format for both handlers
    formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)
    # Attach both handlers to the root logger
    logger.addHandler(fh)
    logger.addHandler(ch)
def md5(row):
    """Row-level MD5 over the source fields, stored for change detection."""
    # Force UTF-8 again: this function runs inside executor processes
    reload(sys)
    sys.setdefaultencoding('utf-8')
    temstr = "Learn.%s|Learn.%s|Learn.%s|Learn.%s|Learn.%s|Learn.%s" \
             % (row.id, row.name, row.status, row.type, row.scoresheetcode, str(row.updated_at))
    m = hashlib.md5()
    m.update(temstr)
    return m.hexdigest()
if __name__ == '__main__':
    setLog()
    # Customer identifier and validity flag written into every target row
    cust_no = '1'
    isvalid = '1'
    slaveTempTable = 'learn_slave'
    etsTempTable = 'ets_learn'
    appname = etsTempTable + '_insert'
    sc = SparkContext(appName=appname)
    sqlContext = HiveContext(sc)
    # Source table: Learn, read over JDBC from the slave MySQL instance
    dff = sqlContext.read.format("jdbc").options(url="jdbc:mysql://192.168.1.200:3306/xxx?user=root"
                                                     "&password=xxx&useUnicode=true&characterEncoding=UTF-8"
                                                     "&zeroDateTimeBehavior=convertToNull", dbtable="Learn",
                                                 driver="com.mysql.jdbc.Driver").load()
    dff.registerTempTable(slaveTempTable)
    # Target table: ets_learn, read over JDBC from the ETS MySQL instance
    dft = sqlContext.read.format("jdbc").options(url="jdbc:mysql://192.168.1.200:3307/bd_ets?user=root"
                                                     "&password=xxxx&useUnicode=true&characterEncoding=UTF-8"
                                                     "&zeroDateTimeBehavior=convertToNull", dbtable="ets_learn",
                                                 driver="com.mysql.jdbc.Driver").load()
    dft.registerTempTable(etsTempTable)
    # Watermark: the latest updatets already present in the target table
    ds_ets = sqlContext.sql(" select max(updatets) as max from ets_learn ")
    pp = ds_ets.collect()[0]
    max_updates = pp.max
    slave_sql = ''
    try:
        if max_updates is not None:
            logging.info(u"Latest updatets in the ets table: " + str(max_updates))
            slave_sql = " select id, name, status, type, scoresheetcode, updated_at " \
                        " from learn_slave where `updated_at` > '%s' " % (max_updates)
        else:
            logging.info(u"First extraction: pulling the full source table")
            slave_sql = " select id, name, status, type, scoresheetcode, updated_at" \
                        " from learn_slave "
        ds_slave = sqlContext.sql(slave_sql)
        logging.info(u'Matching rows in slave: %s' % (ds_slave.count()))
        now_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        logging.info(u'Assembling rows...')
        src_fields = json.dumps({'Learn': ['id', 'name', 'status', 'type', 'scoresheetcode', 'updated_at']})
        # Map each source row to the target column values
        filedvlue = ds_slave.map(lambda row: (row.id, row.name, row.status, row.type, row.scoresheetcode, cust_no,
                                              isvalid, src_fields,
                                              md5(row), now_time, str(row.updated_at)))
        # Build the target schema: every column a nullable string
        schemaString = "id,name,status,type,scoresheetcode,cust_no,isvalid,src_fields,src_fields_md5,createts,updatets"
        fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(",")]
        schema = StructType(fields)
        # Create a DataFrame from the row values and the schema
        schemaObj = sqlContext.createDataFrame(filedvlue, schema)
        logging.info(u'Row assembly complete')
        logging.info(u'Writing to the target table...')
        # Append into the registered target table
        schemaObj.write.insertInto(etsTempTable, overwrite=False)
        logging.info(u'Write complete')
    except Exception as e:
        # e.message is unreliable on Python 2.6, so log str(e)
        logging.error(str(e))
        raise
    finally:
        sc.stop()
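To run the script, submit it with spark-submit and put the MySQL JDBC driver on both the driver and executor classpaths; the jar path and script name below are placeholders, not from the original post:

spark-submit \
  --jars /path/to/mysql-connector-java-5.1.38.jar \
  --driver-class-path /path/to/mysql-connector-java-5.1.38.jar \
  ets_learn_insert.py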