
pyspark 1.6 data extraction code: inserting data with a DataFrame

Author: 堤岸小跑 | Published 2017-09-15 10:13 | Source: https://www.haomeiwen.com/subject/tbhssxtx.html
Below is the Python version of the Spark 1.6 data extraction code; rows are inserted through a DataFrame.
The main logic sits in the `__main__` block, and the insert is done via a DataFrame.
Brief description: read the maximum value of the updatets (update timestamp) column from ets (the table holding already-extracted data), then query the source table (slave) for rows whose updated_at is greater than that value; if any exist, insert them into the target database.
    #!/usr/bin/env python
    # coding=utf-8
    
    """
       author:zb
       create_at:2017-9-8 09:37:45
    """
    import hashlib
    import os
    import sys
    import datetime
    import json
    import logging
    
    from os import path
    
    from pyspark.sql.types import StructField, StringType, StructType
    
    # Python 2 workaround: force the default encoding to utf-8
    reload(sys)
    sys.setdefaultencoding('utf-8')
    
    
    from pyspark import SparkContext
    from pyspark.sql import HiveContext
    
    
    '''
        Logging setup
    '''
    
    
    def setLog():
        logger = logging.getLogger()
        # the DEBUG level does not work under Spark, so use INFO
        logger.setLevel(logging.INFO)  # master log level switch
    
        # step 2: create a handler that writes to a log file
        logfile = os.path.join(path.dirname(__file__), 'logger.txt')
        fh = logging.FileHandler(logfile, mode='a')
        fh.setLevel(logging.INFO)  # log level for the file handler
    
        # step 3: create another handler that writes to the console
        ch = logging.StreamHandler()
        ch.setLevel(logging.INFO)  # log level for the console handler
    
        # step 4: define the output format of both handlers
        formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
        fh.setFormatter(formatter)
        ch.setFormatter(formatter)
    
        # step 5: attach the handlers to the logger
        logger.addHandler(fh)
        logger.addHandler(ch)
    
    
    def md5(row):
        # force the default encoding again inside the executor (Python 2 workaround)
        reload(sys)
        sys.setdefaultencoding('utf-8')
        temstr = "Learn.%s|Learn.%s|Learn.%s|Learn.%s|Learn.%s|Learn.%s" \
                 % (row.id, row.name, row.status, row.type, row.scoresheetcode, str(row.updated_at))
        m = hashlib.md5()
        m.update(temstr)
        return m.hexdigest()
    
    
    if __name__ == '__main__':
        setLog()
        # customer identifier
        cust_no = '1'
        isvalid = '1'
        slaveTempTable = 'learn_slave'
        etsTempTable = 'ets_learn'
        appname = etsTempTable + '_insert'
        sc = SparkContext(appName=appname)
        sqlContext = HiveContext(sc)
        # driver = "com.mysql.jdbc.Driver"
        dff = sqlContext.read.format("jdbc").options(url="jdbc:mysql://192.168.1.200:3306/xxx?user=root"
                                                         "&password=xxx&useUnicode=true&characterEncoding=UTF-8"
                                                         "&zeroDateTimeBehavior=convertToNull", dbtable="Learn",
                                                         driver="com.mysql.jdbc.Driver").load()
        dff.registerTempTable('learn_slave')
    
        dft = sqlContext.read.format("jdbc").options(url="jdbc:mysql://192.168.1.200:3307/bd_ets?user=root"
                                                         "&password=xxxx&useUnicode=true&characterEncoding=UTF-8"
                                                         "&zeroDateTimeBehavior=convertToNull", dbtable="ets_learn",
                                                           driver="com.mysql.jdbc.Driver").load()
        dft.registerTempTable('ets_learn')
        ds_ets = sqlContext.sql(" select max(updatets) as max from ets_learn ")
        pp = ds_ets.collect()[0]
        max_updates = pp.max
        slave_sql = ''
        try:
            if max_updates is not None:
                logging.info(u"Maximum updatets in the ets table: " + str(max_updates))
                slave_sql = " select id, name, status, type, scoresheetcode, updated_at " \
                            "  from learn_slave where `updated_at` > '%s' " % (max_updates)
            else:
                logging.info(u"This is the initial (full) extraction")
                slave_sql = " select id, name, status, type, scoresheetcode, updated_at" \
                            " from learn_slave  "
            ds_slave = sqlContext.sql(slave_sql)
            logging.info(u'Number of matching rows in slave: %s' % (ds_slave.count()))
            now_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            logging.info(u'Assembling data...')
            src_fields = json.dumps({'Learn': ['id', 'name', 'status', 'type', 'scoresheetcode', 'updated_at']})
            # row values for the target table
            field_values = ds_slave.map(lambda row: (row.id, row.name, row.status, row.type, row.scoresheetcode,
                                                     cust_no, isvalid, src_fields,
                                                     md5(row), now_time, str(row.updated_at)))
            # build the target schema (every column typed as string)
            schemaString = "id,name,status,type,scoresheetcode,cust_no,isvalid,src_fields,src_fields_md5,createts,updatets"
            fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(",")]
            schema = StructType(fields)
            # create a DataFrame from the column names and row values
            schemaObj = sqlContext.createDataFrame(field_values, schema)
            logging.info(u'Data assembly finished')
            logging.info(u'Starting to write data...')
            # write into the target table
            schemaObj.write.insertInto(etsTempTable, overwrite=False)
            logging.info(u'Write finished')
        except Exception, e:
            # e.message is not supported on Python 2.6, so use str(e)
            logging.error(str(e))
            raise Exception(str(e))
        finally:
            sc.stop()
    
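To run the script on a Spark 1.6 cluster, the MySQL JDBC driver has to be visible to both the driver and the executors. A minimal submit command is sketched below; the connector jar path, the script file name, and any master settings are placeholders for your own environment:

    # assumed file names and paths -- adjust to your deployment
    spark-submit \
        --jars /path/to/mysql-connector-java-5.1.39.jar \
        --driver-class-path /path/to/mysql-connector-java-5.1.39.jar \
        ets_learn_insert.py

Alternatively, the connector jar can be put on the classpath permanently through spark.executor.extraClassPath in spark-defaults.conf instead of passing --jars on every submit.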
