Using Gobblin: MapReduce mode + Azkaban

Author: 神奇的考拉 | Published 2019-11-06 15:31

    I. Gobblin deployment (MapReduce mode)

    1. Build from source: download the source code

    a. Unpack the archive: tar -zvxf incubator-gobblin-release-0.14.0.tar.gz
    

    b. cd into the root of the unpacked directory (for installing Gradle, refer to guides online). The Hadoop version passed to the build must match your production cluster, or at least be compatible with it:

     ./gradlew build -PhadoopVersion=2.6.0-cdh5.14.2 -Pversion=0.14.0 -PuseHadoop2 -x test
    
    Note: if jar downloads are slow during the build, edit build.gradle to resolve from a local Maven repository or the Aliyun mirror, as sketched below.
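
    A minimal sketch of such a repositories block (assuming the standard build.gradle layout; the Aliyun URL is its public mirror endpoint):

    // resolve from the local Maven repo and the Aliyun mirror
    // before falling back to Maven Central
    repositories {
        mavenLocal()
        maven { url "https://maven.aliyun.com/repository/public" }
        mavenCentral()
    }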
    

    2. Installation and deployment

    a. Upload the built .tar.gz to the target server and unpack it:

       tar -zvxf incubator-gobblin-release-0.14.0.tar.gz
    

    b. Configuration (this deployment uses MapReduce mode):

       export GOBBLIN_JOB_CONFIG_DIR=/usr/local/gobblin/configdir  # directory holding your own job config files; optional, can instead be passed via --conf
       export GOBBLIN_WORK_DIR=/user/adanalysis/test/gobblin_workdir # working dir for state/staging/output produced during execution; in any non-standalone mode this must be an HDFS path; can instead be passed via --workdir
       export HADOOP_BIN_DIR=/opt/cloudera/parcels/CDH/lib/hadoop/bin # the hadoop bin directory on this machine
    
    • bin/gobblin-mapreduce.sh settings (usually only a few need changing)

      Typically the only thing to adjust is the libjars parameter, which lists the third-party jars the job depends on; a sketch follows.
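
      For reference, the jar list is comma-separated, and the script also accepts extra jars at run time; a hedged sketch (the connector jar below is illustrative, and flags can differ per build, so check bin/gobblin-mapreduce.sh --help):

      # extra third-party jars to ship with the MR job (comma-separated)
      LIBJARS=/usr/local/gobblin/lib/mysql-connector-java-5.1.47.jar
      bin/gobblin-mapreduce.sh --conf configdir/gobblin-distcp.conf \
          --workdir /user/dalan/tests/gobblin_workdir --jars ${LIBJARS}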

    3. Test: mimicking Hadoop's own DistCp job

    The config file is as follows:

    #job.template = "resource:///hdfs2hdfs.template"
    # setting path
    gobblin.flow.edge.input.dataset.descriptor.path=/user/dalan/tests/test_cal_logs
    gobblin.flow.edge.output.dataset.descriptor.path=/user/dalan/tests/test_cal_logs_tmp
    
    # settings hdfs
    # change these to your own HDFS cluster's namenode address
    source.data.node.fs.uri=hdfs://localhost:9000
    destination.data.node.fs.uri=hdfs://localhost:9000
    
    # when user authorization is involved, sets which user the job runs as
    user.to.proxy=adanalysis
    # gobblin settings
    specExecInstance.job.type=hadoopJava   
    specExecInstance.job.launcher.type=MAPREDUCE
    specExecInstance.job.launcher.class=org.apache.gobblin.runtime.mapreduce.MRJobLauncher
    
    # start
    # ====================================================================
    # Job configurations
    # ====================================================================
    job.name=Distcp
    
    # Source and destination paths to be obtained from flow config.
    from=${gobblin.flow.edge.input.dataset.descriptor.path}
    to=${gobblin.flow.edge.output.dataset.descriptor.path}
    
    #Will delete files in the target that do not exist in the source
    #gobblin.copy.recursive.update=true
    #gobblin.copy.recursive.delete=true
    #gobblin.copy.recursive.deleteEmptyDirectories=true
    #gobblin.trash.skip.trash=true
    
    #Will fail the whole job if any single dataset fails
    #gobblin.copy.abortOnSingleDatasetFailure=true
    
    #gobblin.copy.preserved.attributes=p
    
    #Job properties to be resolved from source and dest data node config.
    fs.uri=${source.data.node.fs.uri}
    source.filebased.fs.uri=${fs.uri}
    state.store.fs.uri=${fs.uri}/user/dalan/tests
    target.filebased.fs.uri=${destination.data.node.fs.uri}
    writer.fs.uri=${target.filebased.fs.uri}
    
    # keep this an HDFS path; it can also be set via --workdir
    work.dir=/user/dalan/tests/gobblin_workdir
    
    # ====================================================================
    # Distcp configurations
    # ====================================================================
    extract.namespace=gobblin.copy
    
    gobblin.dataset.profile.class=org.apache.gobblin.data.management.copy.CopyableGlobDatasetFinder
    
    # target location for copy
    data.publisher.final.dir=${to}
    gobblin.dataset.pattern=${from}
    
    data.publisher.type=org.apache.gobblin.data.management.copy.publisher.CopyDataPublisher
    source.class=org.apache.gobblin.data.management.copy.CopySource
    writer.builder.class=org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder
    converter.classes=org.apache.gobblin.converter.IdentityConverter
    
    # =======================================
    # Job Parameters to be resolved using SpecExecutor properties
    # =======================================
    type=${specExecInstance.job.type}
    
    job.jars=/usr/local/gobblin/lib/*
    # job locking needs ZooKeeper support, so leave it false here; for
    # gobblin-yarn set it to true and point it at your ZooKeeper quorum
    job.lock.enabled=false
    job.class=${specExecInstance.job.launcher.class} 
    # Gobblin Hadoop Parameters
    # to let the MR job run on the cluster's YARN, leave launcher.type unset
    #launcher.type=${specExecInstance.job.launcher.type}
    

    4. Run the test

    bin/gobblin-mapreduce.sh --conf configdir/gobblin-distcp.conf --workdir /user/dalan/tests/gobblin_workdir

    Output is written to logs/gobblin-current.log.
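
    To verify the copy, list the target path from the config above:

    hdfs dfs -ls /user/dalan/tests/test_cal_logs_tmp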

    II. Integrating Gobblin with Azkaban

    1. Adjust the Azkaban configuration under /data/azkaban/exec-server/plugins/jobtypes

    a. Add to common.properties:

    gobblin.home=/usr/local/gobblin
    

    b. Add to commonprivate.properties:

    gobblin.home=/usr/local/gobblin
    

    c. Add to gobblin/private.properties:

      jobtype.classpath=${gobblin.home}/lib/*
      # HDFS path that the job's jars are uploaded to
      job.hdfs.jars=/resources/gobblin
      jobtype.class=azkaban.jobtype.connectors.gobblin.GobblinHadoopJob
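
    Jobtype changes only take effect after the executor server reloads its plugins; a hedged restart sketch (script names vary across Azkaban versions):

      cd /data/azkaban/exec-server
      bin/shutdown-exec.sh   # older releases ship azkaban-executor-shutdown.sh instead
      bin/start-exec.sh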
    

    d. Other settings (set these in the .job file):

     # path to the Gobblin jars, to be uploaded to HDFS
     classpath=/usr/local/gobblin/lib/*
     # the Gobblin workdir, an HDFS path
     gobblin.work_dir=/user/adanalysis/test/gobblin_workdir
     # the Azkaban launcher class
     job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
     # time zone
     source.timezone=Asia/Shanghai
    

    2. Test (pinned to a designated executor with useExecutor=3, which requires an admin user)

    # jar library: set the Gobblin jar library
    framework.jars=/usr/local/gobblin/lib/*
    #job.jars=/usr/local/gobblin/lib/*
    #classpath=/usr/local/gobblin/lib/*
    # JVM args
    Xmx=3G
    Xms=3G
    # extra JVM settings
    jvm.args=-Xmn1g -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+UnlockExperimentalVMOptions -XX:+G1SummarizeConcMark -XX:MaxGCPauseMillis=100 -XX:-ResizePLAB -XX:+ParallelRefProcEnabled -XX:+AlwaysPreTouch -XX:ParallelGCThreads=24 -XX:ConcGCThreads=16 -XX:G1HeapWastePercent=3 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1MixedGCLiveThresholdPercent=85 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps
    user.to.proxy=adanalysis
    # gobblin framework
    
    # Gobblin work dir (since this runs on MR, it must be a readable/writable HDFS path)
    gobblin.work_dir=/user/adanalysis/test/gobblin_workdir
    
    # job args
    type=gobblin
    job.name=mysqlToHive_mysqlToHiveExtract
    job.group=demos
    job.description="a simple mysql2hive demo"
    
    # MR job root directory
    mr.job.root.dir=${gobblin.work_dir}
    
    # launcher class
    
    job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
    
    # disable the job lock (locking needs ZooKeeper support)
    job.lock.enabled=false
    
    #job.commit.parallelize=true
    #job.commit.parallelCommits=20
    
    # HDFS namenode address
    fs.uri=hdfs://127.0.0.1:9000
    
    #local.work_dir=/usr/local/gobblin
    # gobblin path args
    writer.fs.uri=hdfs://127.0.0.1:9000
    writer.staging.dir=${gobblin.work_dir}/staging
    writer.output.dir=${gobblin.work_dir}/output
    
    state.store.fs.uri=hdfs://127.0.0.1:9000
    state.store.dir=${gobblin.work_dir}/state
    
    metrics.log.dir=${gobblin.work_dir}/metrics
    
    # source args
    source.class=org.apache.gobblin.source.extractor.extract.jdbc.MysqlSource
    source.timezone=Asia/Shanghai
    # Source properties
    source.entity=wk_ad_income_daily_report
    source.querybased.schema=adxbe
    source.max.number.of.partitions=20
    
    # Source connection properties
    source.conn.driver=com.mysql.jdbc.Driver
    source.conn.host=127.0.0.1
    source.conn.username=user
    source.conn.password=password
    source.conn.port=3306
    source.conn.timeout=500000
    
    # watermark: where to start reading the source data
    
    source.querybased.watermark.type=timestamp
    source.querybased.start.value=1541411045000
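    # note: 1541411045000 reads as an epoch-millisecond timestamp
    # (about 2018-11-05 17:44 Asia/Shanghai); double-check the watermark
    # value format your Gobblin version expects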
    
    # Extract properties
    extract.namespace=${source.querybased.schema}
    extract.table.type=snapshot_only
    # Property to treat the extract as a full dump (full extraction)
    extract.is.full=true
    # Converter properties - Record from mysql source will be processed by the below series of converters
    converter.classes=org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter
    
    # date columns format
    converter.avro.timestamp.format=yyyy-MM-dd HH:mm:ss'.0'
    converter.avro.date.format=yyyy-MM-dd
    converter.avro.time.format=HH:mm:ss
    
    # Qualitychecker properties
    qualitychecker.task.policies=org.apache.gobblin.policies.count.RowCountPolicy,org.apache.gobblin.policies.schema.SchemaCompatibilityPolicy
    qualitychecker.task.policy.types=OPTIONAL,OPTIONAL
    qualitychecker.row.policies=org.apache.gobblin.policies.schema.SchemaRowCheckPolicy
    qualitychecker.row.policy.types=OPTIONAL
    qualitychecker.row.err.file=${gobblin.work_dir}/rowerr
    
    # writer
    writer.builder.class=org.apache.gobblin.writer.AvroDataWriterBuilder
    writer.destination.type=HDFS
    #writer.file.path=output
    writer.output.format=AVRO
    
    # Publisher properties
    data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
    data.publisher.final.dir=${gobblin.work_dir}/final
    data.publisher.replace.final.dir=true
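
    To run this on Azkaban, save the properties above as a .job file, zip it, and upload the zip to a project; a minimal sketch (file and flow names are illustrative):

    # mysqlToHive.job contains the properties above, starting with type=gobblin
    zip mysql2hive-flow.zip mysqlToHive.job
    # then upload the zip through the Azkaban web UI and execute or schedule it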
    

    III. Other topics

    1. Gobblin configuration parameters
    2. Gobblin on YARN
    3. Gobblin architecture and components
    4. Gobblin source code
