一。 gobblin部署【mapreduce模式】
1.源码编译:下载源码链接
a。解压文件 tar -zvxf incubator-gobblin-release-0.14.0.tar.gz
b。切换到解压文件夹根目录【gradle安装自行参考网上】需要保持指定的hadoop版本和线上集群一致(最起码是兼容的)
./gradlew build -PhadoopVersion=2.6.0-cdh5.14.2 -Pversion=0.14.0 -PuseHadoop2 -x test
注:若是编译过程下载jar过慢,修改下build.gradle指定从本地maven仓库或阿里云gradle库
2.安装部署
a。将编译后的gz包上传到对应的服务器
tar -zvxf incubator-gobblin-release-0.14.0.tar.gz
b。配置【此处部署采用mapreduce模式】
-
conf/gobblin-mapreduce.properties 设定自己hdfs namenode地址:如hdfs://localhost:9000
-
bin/gobblin-env.sh 设置
export GOBBLIN_JOB_CONFIG_DIR=/usr/local/gobblin/configdir #用于指定自身调度job配置文件地址 可以不设定,通过命令行参数:--conf来指定
export GOBBLIN_WORK_DIR=/user/adanalysis/test/gobblin_workdir #用于指定gobblin执行过程涉及到state/stage/output等设置 在非standalone模式下 该地址都是hdfs上存在的地址;通过--workdir来设置
export HADOOP_BIN_DIR=/opt/cloudera/parcels/CDH/lib/hadoop/bin #用于指定该机器上hadoop bin目录
-
bin/gobblin-mapreduce.sh设置【通常设置的不多】
一般来需要调整的是libjars 参数的内容,指定第三方依赖的jar包
3.测试:模拟hadoop自身的Distcp的任务
配置文件如下:
#job.template = "resource:///hdfs2hdfs.template"
# setting path
gobblin.flow.edge.input.dataset.descriptor.path=/user/dalan/tests/test_cal_logs
gobblin.flow.edge.output.dataset.descriptor.path=/user/dalan/tests/test_cal_logs_tmp
# settings hdfs
source.data.node.fs.uri=hdfs://localhost:9000 #使用时改成自己的hdfs集群namenode地址
destination.data.node.fs.uri=hdfs://localhost:9000
user.to.proxy=adanalysis #涉及到用户授权的时候 来设定指定job的用户
# gobblin settings
specExecInstance.job.type=hadoopJava
specExecInstance.job.launcher.type=MAPREDUCE
specExecInstance.job.launcher.class=org.apache.gobblin.runtime.mapreduce.MRJobLauncher
# start
# ====================================================================
# Job configurations
# ====================================================================
job.name=Distcp
# Source and destination paths to be obtained from flow config.
from=${gobblin.flow.edge.input.dataset.descriptor.path}
to=${gobblin.flow.edge.output.dataset.descriptor.path}
#Will delete files in target if not exist in source
#gobblin.copy.recursive.update=true
#gobblin.copy.recursive.delete=true
#gobblin.copy.recursive.deleteEmptyDirectories=true
#gobblin.trash.skip.trash=true
#Will make the job fail if there's any failure
#gobblin.copy.abortOnSingleDatasetFailure=true
#gobblin.copy.preserved.attributes=p
#Job properties to be resolved from source and dest data node config.
fs.uri=${source.data.node.fs.uri}
source.filebased.fs.uri=${fs.uri}
state.store.fs.uri=${fs.uri}/user/dalan/tests
target.filebased.fs.uri=${destination.data.node.fs.uri}
writer.fs.uri=${target.filebased.fs.uri}
work.dir=/user/dalan/tests/gobblin_workdir #保持是在hdfs上的地址 通过参数--workdir来指定也可以
# ====================================================================
# Distcp configurations
# ====================================================================
extract.namespace=gobblin.copy
gobblin.dataset.profile.class=org.apache.gobblin.data.management.copy.CopyableGlobDatasetFinder
# target location for copy
data.publisher.final.dir=${to}
gobblin.dataset.pattern=${from}
data.publisher.type=org.apache.gobblin.data.management.copy.publisher.CopyDataPublisher
source.class=org.apache.gobblin.data.management.copy.CopySource
writer.builder.class=org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder
converter.classes=org.apache.gobblin.converter.IdentityConverter
# =======================================
# Job Parameters to be resolved using SpecExecutor properties
# =======================================
type=${specExecInstance.job.type}
job.jars=/usr/local/gobblin/lib/*
job.lock.enabled=false # 需要zookeeper的支持 最好设置成false;若是配置gobblin-yarn时需要指定为true 指定zookeeper集群地址
job.class=${specExecInstance.job.launcher.class}
# Gobblin Hadoop Parameters
#launcher.type=${specExecInstance.job.launcher.type} #想让mapreduce在hadoop集群的yarn执行 最好不要设置该参数
4.测试
bin/gobblin-mapreduce.sh --conf configdir/gobblin-distcp.conf --workdir /user/dalan/tests/gobblin_workdir
输出结果:logs/gobblin-current.log
二。gobblin与Azkaban整合
1.azkaban配置内容调整:目录/data/azkaban/exec-server/plugins/jobtypes
a。common.properties 添加:
gobblin.home=/usr/local/gobblin
b。commonprivate.properties 添加:
gobblin.home=/usr/local/gobblin
c. gobblin/private.properties 添加:
jobtype.classpath=${gobblin.home}/lib/*
job.hdfs.jars=/resources/gobblin #指定job上传jar到hdfs指定路径
jobtype.class=azkaban.jobtype.connectors.gobblin.GobblinHadoopJob
d.其他设置项:【在job文件中设置】
classpath=/usr/local/gobblin/lib/* #gobblin的jar路径 用于上传到hdfs上
gobblin.work_dir=/user/adanalysis/test/gobblin_workdir #指定gobblin的workdir 在hdfs上地址
job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher #Azkaban调度类
source.timezone=Asia/Shanghai #时区
2.测试【采用在指定机器上执行:useExecutor=3,用户需要管理员】
# jar library
# 设置gobblin jar library
framework.jars=/usr/local/gobblin/lib/*
#job.jars=/usr/local/gobblin/lib/*
#classpath=/usr/local/gobblin/lib/*
# jam args
Xmx=3G
Xms=3G
# jvm设置
jvm.args=-Xmn1g -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+UnlockExperimentalVMOptions -XX:+G1SummarizeConcMark -XX:MaxGCPauseMillis=100 -XX:-ResizePLAB -XX:+ParallelRefProcEnabled -XX:+AlwaysPreTouch -XX:ParallelGCThreads=24 -XX:ConcGCThreads=16 -XX:G1HeapWastePercent=3 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1MixedGCLiveThresholdPercent=85 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps
[user.to](http://user.to).proxy=adanalysis
# gobblin framework
#设置gobblin work dir【由于采用的mr,该地址必须是hdfs上可读写地址】
gobblin.work_dir=/user/adanalysis/test/gobblin_workdir
# job args
type=gobblin
[job.name](http://job.name)=mysqlToHive_mysqlToHiveExtract
job.group=demos
job.description="a simple mysql2hive demo"
# 指定mr地址
mr.job.root.dir=${gobblin.work_dir}
# 指定launcher方式
job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
# 取消lock 需要zookeeper支持
job.lock.enabled=false
#job.commit.parallelize=true
#job.commit.parallelCommits=20
# 指定hdfs namenode地址
fs.uri=[hdfs://](hdfs://nswx)127.0.0.1:9000
#local.work_dir=/usr/local/gobblin
# goblin path args
writer.fs.uri=[hdfs://](hdfs://nswx)127.0.0.1:9000
writer.staging.dir=${gobblin.work_dir}/staging
writer.output.dir=${gobblin.work_dir}/output
state.store.fs.uri=[hdfs://](hdfs://nswx)127.0.0.1:9000
state.store.dir=${gobblin.work_dir}/state
metrics.log.dir=${gobblin.work_dir}/metrics
# source args
source.class=org.apache.gobblin.source.extractor.extract.jdbc.MysqlSource
source.timezone=Asia/Shanghai
# Source properties
source.entity=wk_ad_income_daily_report
source.querybased.schema=adxbe
source.max.number.of.partitions=20
# Source connection properties
source.conn.driver=com.mysql.jdbc.Driver
source.conn.host=127.0.0.1
source.conn.username=user
source.conn.password=password
source.conn.port=3306
source.conn.timeout=500000
#指定源数据开始位置:watermark
source.querybased.watermark.type=timestamp
source.querybased.start.value=1541411045000
# Extract properties
extract.namespace=${source.querybased.schema}
extract.table.type=snapshot_only
# Property to consider the extract as full dump
# 全量抽取
extract.is.full=true
# Converter properties - Record from mysql source will be processed by the below series of converters
converter.classes=org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter
# date columns format
converter.avro.timestamp.format=yyyy-MM-dd HH:mm:ss'.0'
converter.avro.date.format=yyyy-MM-dd
converter.avro.time.format=HH:mm:ss
# Qualitychecker properties
qualitychecker.task.policies=org.apache.gobblin.policies.count.RowCountPolicy,org.apache.gobblin.policies.schema.SchemaCompatibilityPolicy
qualitychecker.task.policy.types=OPTIONAL,OPTIONAL
qualitychecker.row.policies=org.apache.gobblin.policies.schema.SchemaRowCheckPolicy
qualitychecker.row.policy.types=OPTIONAL
qualitychecker.row.err.file=${gobblin.work_dir}/rowerr
# writer
writer.builder.class=org.apache.gobblin.writer.AvroDataWriterBuilder
writer.destination.type=HDFS
#writer.file.path=output
writer.output.format=AVRO
# Publisher properties
data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
data.publisher.final.dir=${gobblin.work_dir}/final
data.publisher.replace.final.dir=true
网友评论