美文网首页
datax BetweenTime.py 启动文件

datax BetweenTime.py 启动文件

作者: yichen_china | 来源:发表于2022-02-11 10:30 被阅读0次

    启动文件

    python27 betweenTime.py  --filename="prod/liansuo/drug_sale.json" --forwardTime=600 --difminutes=5
    最低间隔时间 difminutes 两次间隔时间低于这个时间不会重复执行
    文件名 filename  执行的json配置文件名
    查询前移时间秒 forwardTime 上次查询时间为6:30 设置每小时执行一次,并且设置前移时间600秒 下次执行时间是6:20-7:30;再下次是7:20-8:30,以此类推
    
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import sys
    import os
    import signal
    import argparse
    import subprocess
    import time
    import re
    import socket
    import json
    from optparse import OptionParser
    from optparse import OptionGroup
    from string import Template
    import codecs
    import platform
    import datetime
    reload(sys)
    sys.setdefaultencoding('utf8')
    NOWTIME=datetime.datetime.now()
    
    def isWindows():
        """Return True when running on a Windows host."""
        return 'Windows' == platform.system()
    
    # DataX installation root: parent directory of the directory holding this script.
    DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    
    DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
    if isWindows():
        # Map the Windows UTF-8 code page (cp65001) onto Python's utf-8 codec.
        codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
        CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
    else:
        # On POSIX the classpath separator is ':'; '.' keeps cwd on the classpath.
        CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)
    LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
    # Default JVM options: 1 GB heap plus a heap dump into <home>/log on OOM.
    DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
    DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
        DATAX_HOME, LOGBACK_FILE)
    # Engine launch template; ${jvm}/${params}/${mode}/${jobid}/${job} are
    # filled in later via string.Template.substitute.
    ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
        DEFAULT_PROPERTY_CONF, CLASS_PATH)
    REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"
    
    # Process exit codes used by this launcher.
    RET_STATE = {
        "KILL": 143,
        "FAIL": -1,
        "OK": 0,
        "RUN": 1,
        "RETRY": 2
    }
    # Default cache file path (per-job caches come from getCacheFileName below).
    CACHEFile=DATAX_HOME+'/cache.json'
    def getCacheFileName(filename=""):
        """Build the per-job cache file path under DATAX_HOME."""
        return '%s/%s_cache.json' % (DATAX_HOME, filename)
    
    def getLocalIp():
        """Best-effort lookup of this host's IP address.

        Returns the resolved address string, or "Unknown" when resolution
        fails for any reason.
        """
        try:
            return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
        except Exception:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
            # are no longer swallowed; resolution errors still fall through.
            return "Unknown"
    
    
    def suicide(signum, e):
        """Signal handler: terminate the spawned DataX child, then exit.

        Sends SIGQUIT to the child (if any), waits one second, kills it
        outright, and exits with the KILL return code (143).
        """
        global child_process
        # ``print >> sys.stderr`` is Python-2-only syntax and a SyntaxError on
        # Python 3; sys.stderr.write works on both.
        sys.stderr.write("[Error] DataX receive unexpected signal %d, starts to suicide.\n" % (signum))

        if child_process:
            child_process.send_signal(signal.SIGQUIT)
            time.sleep(1)
            child_process.kill()
        sys.stderr.write("DataX Process was killed ! you did ?\n")
        sys.exit(RET_STATE["KILL"])
    
    
    def register_signal():
        """Install suicide() as the handler for SIGINT/SIGQUIT/SIGTERM (POSIX only)."""
        if isWindows():
            return
        global child_process
        # 2 = SIGINT, 3 = SIGQUIT, 15 = SIGTERM
        for signum in (2, 3, 15):
            signal.signal(signum, suicide)
    
    
    def getOptionParser():
        """Build the optparse parser for the standard DataX command-line flags.

        Two option groups: product-environment options (jvm, jobid, mode,
        params, reader/writer template viewing) and develop/debug options
        (remote debug, log level). Returns the configured OptionParser.
        """
        usage = "usage: %prog [options] job-url-or-path"
        parser = OptionParser(usage=usage)
    
        prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                         "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                         "Make sure these options can be used in Product Env.")
        prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
                                      default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
        prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
                                      help="Set job unique id when running by Distribute/Local Mode.")
        prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
                                      action="store", default="standalone",
                                      help="Set job runtime mode such as: standalone, local, distribute. "
                                           "Default mode is standalone.")
        prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
                                      action="store", dest="params",
                                      help='Set job parameter, eg: the source tableName you want to set it by command, '
                                           'then you can use like this: -p"-DtableName=your-table-name", '
                                           'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
                                           'Note: you should config in you job tableName with ${tableName}.')
        prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
                                      action="store", dest="reader",type="string",
                                      help='View job config[reader] template, eg: mysqlreader,streamreader')
        prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
                                      action="store", dest="writer",type="string",
                                      help='View job config[writer] template, eg: mysqlwriter,streamwriter')
        parser.add_option_group(prodEnvOptionGroup)
    
        devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
                                        "Developer use these options to trace more details of DataX.")
        devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
                                     help="Set to remote debug mode.")
        devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
                                     default="info", help="Set log level such as: debug, info, all etc.")
        parser.add_option_group(devEnvOptionGroup)
        return parser
    
    
    def print_json(data):
        """Pretty-print *data* as sorted, indented, non-ASCII-escaped JSON."""
        text = json.dumps(data, sort_keys=True, indent=4, separators=(', ', ': '), ensure_ascii=False)
        print(text)
    
    # nums = {"name": "Mike", "age": 12}
    def setCache(data,file_name):
        with open(file_name, 'w+') as r:
            # '''写入json文件'''
            json.dump(data, r)
        return data
    def getCache(file_name):
        """Load and return the JSON document stored in *file_name* (list or dict)."""
        with open(file_name, 'r') as handle:
            return json.load(handle)
    # file 文件名 filenames文件名去所有斜杠用于缓存上次请求时间作为jsonkey,difminutes差值分,每次请求低于*分不会重复执行
    #forwardTime 前移时间秒  每次查询时间 向前增加一定时间,防止系统时差丢失数据 比如查询10:00:00-11:00:00  前移时间是10秒  ,实际执行sql是 09:50:00-11:00:00
    # file: json config file name. filenames: the name with slashes/dots replaced,
    # used as the cache-json key storing the last run time. difminutes: window
    # length in minutes; runs closer together than the minimum interval are
    # skipped. forwardTime: seconds to shift the window start back, guarding
    # against clock skew losing rows (e.g. 10:00-11:00 with 600s => 09:50-11:00).
    def generateJobConfigTemplate(file, filenames, difminutes, forwardTime=0):
        """Build the concrete job config for the next time window.

        Reads the template json, substitutes the "(?)" placeholder in the
        reader's where/querySql with the computed [start, end] interval,
        writes the result next to the template as ``*Start.json`` and returns
        ``{"file": <path>, "cache": <cache dict>}`` — or ``None`` when the
        time elapsed since the last run is below the minimum interval.
        """
        cacheFile = getCacheFileName(filenames)
        nowtime = NOWTIME.strftime("%Y-%m-%d %H:%M:%S")
        readerTemplatePath = DATAX_HOME + '/bin/' + file
        readerPar = getCache(readerTemplatePath)
        # The json config may override the window length and minimum interval.
        if ("difMinutes" in readerPar["job"]):
            difminutes = readerPar["job"]["difMinutes"]
        # Bug fix: the CLI hands difminutes over as a string (argparse type=str)
        # but datetime.timedelta needs a number; coerce while keeping integer
        # semantics for the default halving below.
        difminutes = int(float(difminutes))
        minDifMinutes = difminutes / 2
        if ("minDifMinutes" in readerPar["job"]):
            minDifMinutes = readerPar["job"]["minDifMinutes"]
        lastTime = (NOWTIME - datetime.timedelta(minutes=difminutes)).strftime("%Y-%m-%d %H:%M:%S")
        try:
            cache = getCache(cacheFile)
        except Exception:
            # No cache file yet: default the last run time to one window ago.
            cache = {filenames: lastTime}
            setCache(cache, cacheFile)
        if (filenames in cache):
            lastTime = cache[filenames]
        else:
            cache[filenames] = lastTime
            setCache(cache, cacheFile)
    
        startTime = datetime.datetime.strptime(lastTime, "%Y-%m-%d %H:%M:%S")
        endTime = (startTime + datetime.timedelta(minutes=difminutes)).strftime("%Y-%m-%d %H:%M:%S")
    
        difs = (datetime.datetime.strptime(nowtime, "%Y-%m-%d %H:%M:%S") - datetime.datetime.strptime(
            endTime, "%Y-%m-%d %H:%M:%S")).total_seconds() / 60
    
        # The window end must never lie in the future; clamp it to now.
        if (difs < 0):
            endTime = nowtime
        difs = (datetime.datetime.strptime(endTime, "%Y-%m-%d %H:%M:%S") - datetime.datetime.strptime(lastTime, "%Y-%m-%d %H:%M:%S")).total_seconds()
        # Window shorter than the minimum interval: skip this run entirely.
        if (difs < (minDifMinutes * 60)):
            return None
        print("上次时间:" + str(lastTime) + "____本次时间:" + str(endTime) + "_____当前时间:" + str(
                nowtime) + "执行时间差{" + str(difs) + "秒," + str(difs / 60) + "分," + str(difs / 3600) + "小时," + str(
                difs / 86400) + "天")
        lastTime2 = lastTime
        if (forwardTime is not None and forwardTime > 0):
            # Shift the query start back by forwardTime seconds (clock-skew guard).
            lastTime2 = (startTime - datetime.timedelta(seconds=forwardTime)).strftime("%Y-%m-%d %H:%M:%S")
        parameter = readerPar["job"]['content'][0]["reader"]["parameter"]
        if ("where" in parameter):
            where = parameter["where"]
            parameter["where"] = where.replace("(?)", " '" + lastTime2 + "' and '" + endTime + "' ")
            print(parameter["where"])
        if ("querySql" in parameter):
            # Bug fix: the original substituted into ``where`` here, which is
            # undefined when only querySql is configured (NameError) and left
            # the actual querySql untouched.
            querySql = parameter["querySql"]
            parameter["querySql"] = querySql.replace("(?)", " '" + lastTime2 + "' and '" + endTime + "' ")
            print(parameter["querySql"])
        file = readerTemplatePath.split('.json', 1)[0] + 'Start.json'
        setCache(readerPar, file)
        # The caller persists this cache only after the job succeeds.
        cache[filenames] = endTime
        return {"file": file, "cache": cache}
    
    
    def readPluginTemplate(plugin):
        """Parse and return the JSON template at path *plugin*."""
        with open(plugin, 'r') as handle:
            return json.load(handle)
    
    def isUrl(path):
        """Return True when *path* looks like an http(s) URL, False otherwise."""
        if not path:
            return False
    
        assert (isinstance(path, str))
        # Anchored match: http/https scheme followed by non-space characters.
        return re.match(r"^http[s]?://\S+\w*", path.lower()) is not None
    
    
    def buildStartCommand(options, jobResource):
        """Assemble the ``java ... com.alibaba.datax.core.Engine`` command line.

        options: parsed optparse values (jvmParameters, remoteDebug, loglevel,
        mode, params, jobid). jobResource: job json path or URL. Returns the
        full shell command string built from ENGINE_COMMAND.
        """
        commandMap = {}
        tempJVMCommand = DEFAULT_JVM
        if options.jvmParameters:
            tempJVMCommand = tempJVMCommand + " " + options.jvmParameters
    
        if options.remoteDebug:
            tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
            # Bug fix: ``print 'local ip: ', x`` is Python-2-only syntax
            # (SyntaxError on Python 3); use a portable call form.
            print("local ip: " + str(getLocalIp()))
    
        if options.loglevel:
            tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))
    
        if options.mode:
            commandMap["mode"] = options.mode
    
        # jobResource may be a URL or a local file path (relative or absolute).
        if not isUrl(jobResource):
            jobResource = os.path.abspath(jobResource)
            if jobResource.lower().startswith("file://"):
                jobResource = jobResource[len("file://"):]
    
        # Derive the log file name from the last 20 chars of the job path.
        jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
        if options.params:
            jobParams = jobParams + " " + options.params
    
        if options.jobid:
            commandMap["jobid"] = options.jobid
    
        commandMap["jvm"] = tempJVMCommand
        commandMap["params"] = jobParams
        commandMap["job"] = jobResource
    
        return Template(ENGINE_COMMAND).substitute(**commandMap)
    
    
    def printCopyright():
        """Print the DataX version banner and flush stdout immediately."""
        banner = '''
    DataX (%s), From Alibaba !
    Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
    
    ''' % DATAX_VERSION
        print(banner)
        sys.stdout.flush()
    
    def testab(e):
        print (e)
    if __name__ == "__main__":
        printCopyright()
        # Parse the standard DataX optparse options against an empty argv so
        # they keep their defaults and do not clash with the argparse flags.
        parserb = getOptionParser()
        options, b = parserb.parse_args([])
        parser = argparse.ArgumentParser(description='manual to this script')
        # Bug fix: difminutes was declared type=str but is used in arithmetic
        # and datetime.timedelta downstream; parse it as int like forwardTime.
        parser.add_argument("--difminutes", type=int, default=5)
        parser.add_argument("--forwardTime", type=int, default=0)
        parser.add_argument("--filename", type=str, default=None)
        args = parser.parse_args()
    
        print("最低间隔时间" + str(args.difminutes))
        # Bug fix: str() avoids a TypeError when --filename is omitted (the
        # None check only happens further down).
        print("文件名:--filename" + str(args.filename))
        print("查询前移时间秒" + str(args.forwardTime))
        print("RET_STATE-----------------------------请检查配置文件json参数-------------------------------")
        # -r/-w template viewing mode: nothing to run, exit OK.
        if options.reader is not None and options.writer is not None:
            print("RET_STATE-----------------------------请检查配置文件json参数-------------------------------")
            print(RET_STATE)
            sys.exit(RET_STATE['OK'])
        if args.filename is None:
            print("RET_STATE-----------------------------请检查启动参数-----(--filename='prod/liansuo/drug_sale.json' --forwardTime=600 --difminutes=5)--------------------------")
            parser.print_help()
            sys.exit(RET_STATE['FAIL'])
        file = args.filename
        # Flatten the path into a cache key: slashes and dots become underscores.
        filenames = file.replace("/", "_").replace("\\", "_").replace(".", "_")
        # Window length between two runs, in minutes.
        difminutes = 5
        if args.difminutes is not None:
            difminutes = args.difminutes
        forwardTime = 0
        if args.forwardTime is not None:
            forwardTime = args.forwardTime
        rt = generateJobConfigTemplate(file, filenames, difminutes, forwardTime)
    
        # None (or a result without a cache) means the minimum interval has not
        # elapsed yet; nothing to do.
        if (rt is None or ('cache' in rt) != True):
            print("-----------------------------间隔时间太少不需要执行-------------------------------")
            sys.exit(0)
    
        startCommand = buildStartCommand(options, rt["file"])
        print("RET_STATE-----------------------------开始执行请等待-------------------------------")
        child_process = subprocess.Popen(startCommand, shell=True)
        register_signal()
        (stdout, stderr) = child_process.communicate()
        if (child_process.returncode == 0):
            # Persist this run's end time only on success. Partially dirty data
            # still counts as success (e.g. some rows failed to write).
            print("END-----------------------------执行成功------------------------------")
            CACHEFile = getCacheFileName(filenames)
            setCache(rt["cache"], CACHEFile)
        else:
            print("END-----------------------------执行失败-------------------------------")
        sys.exit(child_process.returncode)
    
    

    执行同步配置 参考

    /bin/prod/sm_customer_company.json
    配置文件参数

    最低间隔时间minDifMinutes 两次间隔时间低于这个时间不会重复执行
    文件名 filename 执行的json配置文件名
    查询最长前移时间分钟difMinutes 没有缓存上次时间的情况执行当前时间和前移时间区间的数据,执行完缓存当前时间,下次执行就这个缓存时间到下次查询的时间了,不会前移了

    {
      "job": {
        "minDifMinutes": 1,
        "difMinutes": 1440, 
        "setting": {
          "speed": {
            "channel": 3,
            "byte": 1048576
          },
          "errorLimit": {
            "record": 1000,
            "percentage": 0.5
          }
        },
        "content": [
          {
            "reader": {
              "name": "mysqlreader",
              "parameter": {
                "encoding": "UTF-8",
                "username": "beiyzgtai1",
                "password": "ekcxbyj",
                "column": [
                    "customer_code",
                    "ban_classification",
                    "is_cold_medicine",
                    "is_prescription",
                    "is_insulin",
                    "is_del"
                ],
                "where": "",
                "splitPk": "customer_code",
                "connection": [
                  {
                    "table": [
                      "sm_customer_company"
                    ],
                    "jdbcUrl": [
                      "jdbc:mysql://rm-8vbif77yv67jo.mysql.zhangbei.rds.aliyuncs.com:3306/beiyaozhongtai"
                    ]
                  }
                ]
              }
            },
            "writer": {
              "name": "mysqlwriter",
              "parameter": {
                "writeMode": "update",
                "column": [
                    "customer_code",
                    "ban_classification",
                    "is_cold_medicine",
                    "is_prescription",
                    "is_insulin",
                    "is_del"
                ],
                "splitPk": "customer_code",
                "connection": [
                  {
                    "table": [
                      "company_control_pin"
                    ],
                    "jdbcUrl": "jdbc:mysql://rm-8vbif77yv6gb1p4g7jo.mysql.zhangbei.rds.aliyuncs.com:3306/b2b"
                  }
                ],
                "username": "bb",
                "password": "ekcbyj"
              }
            }
          }
        ]
      }
    }
    
    

    相关文章

      网友评论

          本文标题:datax BetweenTime.py 启动文件

          本文链接:https://www.haomeiwen.com/subject/eheakrtx.html