DataX incremental updates: modifying the Python launch script

Author: yichen_china | Published 2021-10-19 10:37

The complete JSON file is shown below. Note two parameters:

job.difMinutes=10 (minutes)
job.minDifMinutes=5 (minutes)

difMinutes is the query window for each run of the script. The (?) placeholder is replaced with the range from the last execution time to that time plus job.difMinutes:

job.content.reader.where": "update_time between (?)"

minDifMinutes is the minimum interval: if less than 5 minutes have elapsed since the last run, the job is not executed.
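The substitution itself is a plain string replace on the where clause; a minimal sketch, using hypothetical window boundaries:

```python
# Hypothetical window boundaries for illustration; in the script they are
# the cached last run time and that time plus difMinutes.
lastTime = "2021-10-18 19:07:35"
endTime = "2021-10-18 19:17:35"

where = "update_time between (?)"
rendered = where.replace("(?)", " '" + lastTime + "' and '" + endTime + "' ")
print(rendered)
```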

The complete configuration JSON (datax/bin/beiyao/stream.json):

{
  "job": {
"minDifMinutes":5,
    "difMinutes":10,
    "content": [
      {
        "writer": {
          "parameter": {
            "username": "beiyao",
            "writeMode": "update",
            "column": [
              "id",
              "goods_no",
              "store_no",
              "stock",
              "sku_id",
              "price",
              "cost_price",
              "source_create_time",
              "source_type",
              "source_update_time"
            ],
            "splitPk": "id",
            "connection": [
              {
                "table": [
                  "zt_store_stock"
                ],
                "jdbcUrl": "jdbc:mysql://192.168.3.183:3306/beiyao"
              }
            ],
            "password": "wHm6HRNfBDYAdcjd"
          },
          "name": "mysqlwriter"
        },
        "reader": {
          "parameter": {
            "username": "mg",
            "encoding": "UTF-8",
            "column": [
              "Id",
              "goods_no",
              "store_no",
              "stock",
              "Code",
              "price",
              "cost_price",
              "update_time as create_time",
              "'tc' as source_type",
              "update_time"
            ],
            "splitPk": "Id",
            "connection": [
              {
                "table": [
                  "MG_DRUGSTORE"
                ],
                "jdbcUrl": [
                  "jdbc:sqlserver://192.168.2.222:1433;DatabaseName=bylsn"
                ]
              }
            ],
            "password": "mg",
            "where": "update_time between (?)"
          },
          "name": "sqlserverreader"
        }
      }
    ],
    "setting": {
      "speed": {
        "byte": 1048576,
        "channel": 3
      },
      "errorLimit": {
        "record": 100,
        "percentage": 0.5
      }
    }
  }
}

The cache-time JSON lives in the DataX root directory at datax/cache.json. Its key is the job config's directory name plus the full file name, with the slashes removed:

{"beiyaostream.json": "2021-10-18 19:07:35"}

The modified datax.py script in the datax/bin directory:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import os
import signal
import subprocess
import time
import re
import socket
import json
from optparse import OptionParser
from optparse import OptionGroup
from string import Template
import codecs
import platform
import datetime
import io

NOWTIME = datetime.datetime.now()

def isWindows():
    return platform.system() == 'Windows'

DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
if isWindows():
    codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
    CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
else:
    CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)
LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
    DATAX_HOME, LOGBACK_FILE)
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
    DEFAULT_PROPERTY_CONF, CLASS_PATH)
REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"

RET_STATE = {
    "KILL": 143,
    "FAIL": -1,
    "OK": 0,
    "RUN": 1,
    "RETRY": 2
}
CACHEFile = os.path.join(DATAX_HOME, 'cache.json')

def getLocalIp():
    try:
        return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
    except:
        return "Unknown"


def suicide(signum, e):
    global child_process
    sys.stderr.write("[Error] DataX receive unexpected signal %d, starts to suicide.\n" % (signum))

    if child_process:
        child_process.send_signal(signal.SIGQUIT)
        time.sleep(1)
        child_process.kill()
    sys.stderr.write("DataX Process was killed ! you did ?\n")
    sys.exit(RET_STATE["KILL"])


def register_signal():
    if not isWindows():
        global child_process
        signal.signal(2, suicide)
        signal.signal(3, suicide)
        signal.signal(15, suicide)


def getOptionParser():
    usage = "usage: %prog [options] job-url-or-path"
    parser = OptionParser(usage=usage)

    prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                     "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                     "Make sure these options can be used in Product Env.")
    prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
                                  default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
    prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
                                  help="Set job unique id when running by Distribute/Local Mode.")
    prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
                                  action="store", default="standalone",
                                  help="Set job runtime mode such as: standalone, local, distribute. "
                                       "Default mode is standalone.")
    prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
                                  action="store", dest="params",
                                  help='Set job parameter, eg: the source tableName you want to set it by command, '
                                       'then you can use like this: -p"-DtableName=your-table-name", '
                                       'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
                                       'Note: you should config in you job tableName with ${tableName}.')
    prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
                                  action="store", dest="reader",type="string",
                                  help='View job config[reader] template, eg: mysqlreader,streamreader')
    prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
                                  action="store", dest="writer",type="string",
                                  help='View job config[writer] template, eg: mysqlwriter,streamwriter')
    parser.add_option_group(prodEnvOptionGroup)

    devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
                                    "Developer use these options to trace more details of DataX.")
    devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
                                 help="Set to remote debug mode.")
    devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
                                 default="info", help="Set log level such as: debug, info, all etc.")
    parser.add_option_group(devEnvOptionGroup)
    return parser


def print_json(data):
    print(json.dumps(data, sort_keys=True, indent=4, separators=(', ', ': '), ensure_ascii=False))

def setCache(data, file_name):
    # write data out as a JSON file
    with open(file_name, 'w+') as r:
        json.dump(data, r)
    return data

def getCache(file_name):
    # read a JSON file back into a dict (or list)
    with open(file_name, 'r') as r:
        return json.load(r)
# file: job config path; filenames: the path with all slashes stripped, used as the
# cache key for the last run time; difminutes: query window in minutes (runs spaced
# closer than the minimum interval are skipped)
def generateJobConfigTemplate(file,filenames,difminutes):
    nowtime =NOWTIME.strftime("%Y-%m-%d %H:%M:%S")
    readerTemplatePath =  DATAX_HOME+'/bin/'+file
    readerPar = getCache(readerTemplatePath)
    if("difMinutes" in readerPar["job"]):
        difminutes= readerPar["job"]["difMinutes"]
    minDifMinutes=difminutes/2
    if("minDifMinutes" in readerPar["job"]):
        minDifMinutes= readerPar["job"]["minDifMinutes"]
    # print(NOWTIME)
    lastTime=(NOWTIME-datetime.timedelta(minutes=difminutes)).strftime("%Y-%m-%d %H:%M:%S")
    try:
        cache= getCache(CACHEFile)
    except Exception as e:
        # no cache file yet: default the last run time to difminutes ago
        cache={filenames:lastTime}
        setCache(cache,CACHEFile)
    if(filenames in cache):
        lastTime=cache[filenames]
    else:
        cache[filenames] =lastTime
        setCache(cache, CACHEFile)

    startTime = datetime.datetime.strptime(lastTime, "%Y-%m-%d %H:%M:%S")
    endTime = (startTime + datetime.timedelta(minutes=difminutes)).strftime("%Y-%m-%d %H:%M:%S")

    difs=(datetime.datetime.strptime(nowtime, "%Y-%m-%d %H:%M:%S") - datetime.datetime.strptime(
        endTime, "%Y-%m-%d %H:%M:%S")).total_seconds()/60
    print(difs,minDifMinutes)

    # if the computed window end is later than the current time, clamp it to now
    if (difs <0):
        endTime=nowtime
    print(endTime,startTime.strftime("%Y-%m-%d %H:%M:%S"))
    difs=(datetime.datetime.strptime(endTime, "%Y-%m-%d %H:%M:%S") - datetime.datetime.strptime(lastTime, "%Y-%m-%d %H:%M:%S")).total_seconds()
    # skip execution if the window is shorter than the minimum interval
    if(difs<(minDifMinutes*60)):
        return None
    print(difs)
    print("上次时间:" + str(lastTime) + "____本次时间:" + str(endTime) + "_____当前时间:" + str(
            nowtime) + "执行时间差{" + str(difs) + "秒," + str(difs / 60) + "分," + str(difs / 3600) + "小时," + str(
            difs / 86400)+"天")


    if ("where" in readerPar["job"]['content'][0]["reader"]["parameter"]):
        where = readerPar["job"]['content'][0]["reader"]["parameter"]["where"]
        readerPar["job"]['content'][0]["reader"]["parameter"]["where"] = where.replace("(?)", " '" + lastTime + "' and '" + endTime + "' ")
        print(readerPar["job"]['content'][0]["reader"]["parameter"]["where"])
    if ("querySql" in readerPar["job"]['content'][0]["reader"]["parameter"]):
        # a querySql reader gets the same window substitution on its own SQL
        querySql = readerPar["job"]['content'][0]["reader"]["parameter"]["querySql"]
        readerPar["job"]['content'][0]["reader"]["parameter"]["querySql"] = querySql.replace("(?)", " '" + lastTime + "' and '" + endTime + "' ")
        print(readerPar["job"]['content'][0]["reader"]["parameter"]["querySql"])
    file = readerTemplatePath.split('.json', 1)[0] + 'Start.json'
    setCache(readerPar,file)
    cache[filenames] =endTime
    return {"file":file,"cache":cache}


def readPluginTemplate(plugin):
    with open(plugin, 'r') as f:
            return json.load(f)

def isUrl(path):
    if not path:
        return False

    assert (isinstance(path, str))
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False


def buildStartCommand(options, jobResource):
    commandMap = {}
    tempJVMCommand = DEFAULT_JVM
    if options.jvmParameters:
        tempJVMCommand = tempJVMCommand + " " + options.jvmParameters

    if options.remoteDebug:
        tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
        print('local ip: %s' % getLocalIp())

    if options.loglevel:
        tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))

    if options.mode:
        commandMap["mode"] = options.mode

    # jobResource may be a URL or a local file path (relative or absolute)
    # jobResource = args[0]
    if not isUrl(jobResource):
        jobResource = os.path.abspath(jobResource)
        if jobResource.lower().startswith("file://"):
            jobResource = jobResource[len("file://"):]

    jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
    if options.params:
        jobParams = jobParams + " " + options.params

    if options.jobid:
        commandMap["jobid"] = options.jobid

    commandMap["jvm"] = tempJVMCommand
    commandMap["params"] = jobParams
    commandMap["job"] = jobResource

    return Template(ENGINE_COMMAND).substitute(**commandMap)


def printCopyright():
    print ('''
DataX (%s), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

''' % DATAX_VERSION)
    sys.stdout.flush()

def testab(e):
    print (e)
if __name__ == "__main__":
    printCopyright()
    # testab(DATAX_VERSION)
    parser = getOptionParser()
    # args=sys.argv[1:]
    options, args = parser.parse_args(sys.argv[1:])
    print(options)
    # #
    if options.reader is not None and options.writer is not None:
        # generateJobConfigTemplate(options.reader,options.writer)
        print("RET_STATE-----------------------------啊啊啊啊啊啊啊啊啊啊啊-------------------------------")
        print(RET_STATE)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])
    file = args[0]
    filenames = args[0].replace("/", "").replace("\\", "").replace(".", "")
    # query window between two runs, in minutes
    difminutes = 5
    rt = generateJobConfigTemplate(file, filenames, difminutes)

    if rt is None or 'cache' not in rt:
        print("----------------------------- interval too short, skipping run -------------------------------")
        sys.exit(0)

    startCommand = buildStartCommand(options, rt["file"])
    print(startCommand)
    child_process = subprocess.Popen(startCommand, shell=True)
    register_signal()
    (stdout, stderr) = child_process.communicate()
    print(child_process.returncode)
    if child_process.returncode == 0:
        # on success, cache this run's window end time as the new last run time
        print(rt)
        setCache(rt["cache"], CACHEFile)
    sys.exit(child_process.returncode)
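To make the window arithmetic in generateJobConfigTemplate concrete, here is a worked example with the values from the configuration above (difMinutes=10, minDifMinutes=5) and an assumed cached last run time:

```python
import datetime

# Assumed inputs for illustration: the cached last run time and the two
# job parameters from stream.json above.
difMinutes = 10
minDifMinutes = 5
lastTime = "2021-10-18 19:07:35"

startTime = datetime.datetime.strptime(lastTime, "%Y-%m-%d %H:%M:%S")
# window end = last run time + difMinutes
endTime = startTime + datetime.timedelta(minutes=difMinutes)

# the job only runs when the window spans at least minDifMinutes
windowSeconds = (endTime - startTime).total_seconds()
runnable = windowSeconds >= minDifMinutes * 60

print(endTime.strftime("%Y-%m-%d %H:%M:%S"))  # 2021-10-18 19:17:35
print(runnable)  # True
```

On a successful run the window end (19:17:35) is written back to cache.json and becomes the next run's lastTime, so consecutive runs cover contiguous, non-overlapping time ranges.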

Original article: https://www.haomeiwen.com/subject/ypzooltx.html