pyspark 1.6 data-extraction code: inserting data with a DataFrame

Author: 堤岸小跑 | Published 2017-09-15 10:13
Below is the Python version of the Spark 1.6 data-extraction code; the main logic lives in the __main__ block, and inserts are done through a DataFrame.
Brief description: read the maximum value of the updatets (update timestamp) column from ets_learn (the extraction target table), filter the source table learn_slave for rows whose updated_at is greater than that value, and insert any matching rows into the target database.
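In outline, the job boils down to two queries: look up the current watermark in the target, then pull only the newer rows from the source. A condensed sketch of that step (same table and column names as the full script, and assuming the temp tables have already been registered as shown further down):

# condensed sketch of the incremental pull; the full script follows
max_row = sqlContext.sql("select max(updatets) as max from ets_learn").collect()[0]
if max_row.max is not None:
    # only rows changed since the last extraction
    new_rows = sqlContext.sql("select * from learn_slave where updated_at > '%s'" % max_row.max)
else:
    # first run: take everything from the source
    new_rows = sqlContext.sql("select * from learn_slave")

The full script below adds the logging setup, an MD5 fingerprint per row, and the DataFrame-based insert.
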
#!/usr/bin/env python
# coding=utf-8

"""
   author:zb
   create_at:2017-9-8 09:37:45
"""
import hashlib
import os
import sys
import datetime
import json
import logging

from os import path

from pyspark.sql.types import StructField, StringType, StructType

reload(sys)
sys.setdefaultencoding('utf-8')


from pyspark import SparkContext
from pyspark.sql import HiveContext


'''
    Logging setup
'''


def setLog():
    logger = logging.getLogger()
    # the DEBUG level is not usable under Spark
    logger.setLevel(logging.INFO)  # master log-level switch

    # step 2: create a handler that writes to a log file
    logfile = os.path.join(path.dirname(__file__), 'logger.txt')
    fh = logging.FileHandler(logfile, mode='a')
    fh.setLevel(logging.INFO)  # log level for the file handler

    # step 3: create another handler that writes to the console
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)  # log level for the console handler

    # step 4: define the output format for both handlers
    formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)

    # step 5: attach the handlers to the logger
    logger.addHandler(fh)
    logger.addHandler(ch)


def md5(row):
    # force the UTF-8 default encoding on the executor side as well
    reload(sys)
    sys.setdefaultencoding('utf-8')
    temstr = "Learn.%s|Learn.%s|Learn.%s|Learn.%s|Learn.%s|Learn.%s" \
             % (row.id, row.name, row.status, row.type, row.scoresheetcode, str(row.updated_at))
    m = hashlib.md5()
    m.update(temstr)
    return m.hexdigest()


if __name__ == '__main__':
    setLog()
    # customer identifier
    cust_no = '1'
    isvalid = '1'
    slaveTempTable = 'learn_slave'
    etsTempTable = 'ets_learn'
    appname = etsTempTable + '_insert'
    sc = SparkContext(appName=appname)
    sqlContext = HiveContext(sc)
    # driver = "com.mysql.jdbc.Driver"
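    # Note: the JDBC reads below assume the MySQL connector jar (mysql-connector-java)
    # is available on the Spark classpath, e.g. passed to spark-submit via --jars.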
    dff = sqlContext.read.format("jdbc").options(url="jdbc:mysql://192.168.1.200:3306/xxx?user=root"
                                                     "&password=xxx&useUnicode=true&characterEncoding=UTF-8"
                                                     "&zeroDateTimeBehavior=convertToNull", dbtable="Learn",
                                                     driver="com.mysql.jdbc.Driver").load()
    dff.registerTempTable('learn_slave')

    dft = sqlContext.read.format("jdbc").options(url="jdbc:mysql://192.168.1.200:3307/bd_ets?user=root"
                                                     "&password=xxxx&useUnicode=true&characterEncoding=UTF-8"
                                                     "&zeroDateTimeBehavior=convertToNull", dbtable="ets_learn",
                                                       driver="com.mysql.jdbc.Driver").load()
    dft.registerTempTable('ets_learn')
    ds_ets = sqlContext.sql(" select max(updatets) as max from ets_learn ")
    pp = ds_ets.collect()[0]
    max_updates = pp.max
    slave_sql = ''
    try:
        if max_updates is not None:
            logging.info(u"Max updatets in the ets table: " + str(max_updates))
            slave_sql = " select id, name, status, type, scoresheetcode, updated_at " \
                        "  from learn_slave where `updated_at` > '%s' " % (max_updates)
        else:
            logging.info(u"本次为初次抽取")
            slave_sql = " select id, name, status, type, scoresheetcode, updated_at" \
                        " from learn_slave  "
        ds_slave = sqlContext.sql(slave_sql)
        logging.info(u'Number of matching records in slave: %s' % (ds_slave.count()))
        now_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        logging.info(u'Assembling data...')
        src_fields = json.dumps({'Learn': ['id', 'name', 'status', 'type', 'scoresheetcode', 'updated_at']})
        # field values
        field_values = ds_slave.map(lambda row: (row.id, row.name, row.status, row.type, row.scoresheetcode, cust_no,
                                                 isvalid, src_fields,
                                                 md5(row), now_time, str(row.updated_at)))
        # build the schema: all columns declared as strings
        schemaString = "id,name,status,type,scoresheetcode,cust_no,isvalid,src_fields,src_fields_md5,createts,updatets"
        fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(",")]
        schema = StructType(fields)
        # create a DataFrame from the column names and field values
        schemaObj = sqlContext.createDataFrame(field_values, schema)
        logging.info(u'Data assembly complete...')
        logging.info(u'Starting to write data...')
        # write into the target table
        schemaObj.write.insertInto(etsTempTable, overwrite=False)
        logging.info(u'Write complete')
    except Exception, e:
        # e.message is not supported on Python 2.6
        logging.error(str(e))
        raise Exception(str(e))
    finally:
        sc.stop()
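
The insertInto call above goes through the ets_learn temp table registered from the JDBC source. The same append can also be expressed directly with DataFrameWriter.jdbc (available since Spark 1.4); a minimal sketch, with the URL, credentials and table name treated as placeholders to adapt to your environment:

# sketch only: append the assembled DataFrame straight to the MySQL target table
target_url = ("jdbc:mysql://192.168.1.200:3307/bd_ets?useUnicode=true"
              "&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull")
props = {"user": "root", "password": "xxxx", "driver": "com.mysql.jdbc.Driver"}
schemaObj.write.jdbc(url=target_url, table="ets_learn", mode="append", properties=props)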


