Waterdrop can be configured with multiple data sources and multiple output sinks; each one is just another plugin block written into the same fixed configuration template. Installation:
mkdir -p /data/software
cd /data/software
wget https://github.com/InterestingLab/waterdrop/releases/download/v1.1.2/waterdrop-1.1.2.zip -O waterdrop-1.1.2.zip
unzip waterdrop-1.1.2.zip
ln -s waterdrop-1.1.2 waterdrop
# Point SPARK_HOME at the local Spark installation
cd waterdrop
vim config/waterdrop-env.sh
# SPARK_HOME=${SPARK_HOME:-/opt/spark}
SPARK_HOME=/opt/cloudera/parcels/SPARK2/lib/spark2
# Test: Hive to ClickHouse
cp config/batch.conf.template config/batch.conf
vim config/batch.conf
# Contents of the configuration file:
spark {
  spark.app.name = "Waterdrop"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

input {
  hive {
    pre_sql = "select * from access.nginx_msg_detail"
    table_name = "access_log"
  }
}

filter {
  remove {
    source_field = ["minute", "hour"]
  }
}

output {
  clickhouse {
    host = "your.clickhouse.host:8123"
    database = "waterdrop"
    table = "access_log"
    fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]
    username = "username"
    password = "password"
  }
}
(The configuration above is quoted from another article.)
Waterdrop execution flow:
# Step 1:
start-waterdrop.sh -> exec ${SPARK_HOME}/bin/spark-submit --class io.github.interestinglab.waterdrop.Waterdrop
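A minimal sketch of what that entry class does (the object name comes from the spark-submit command above; the argument parsing shown is a hypothetical placeholder, not the verbatim source):

object Waterdrop {
  // spark-submit invokes main(), which extracts the config file path from
  // the CLI arguments and hands it to entrypoint() (see Step 2).
  def main(args: Array[String]): Unit = {
    val configFilePath = args(args.indexOf("--config") + 1) // hypothetical parsing
    entrypoint(configFilePath)
  }
}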
# Step 2:
Inside Waterdrop, entrypoint(configFilePath) is called, which does the following (a simplified sketch follows this list):
1. Load the configuration file into a config object
2. Create the corresponding input, output, and filter objects, depending on whether the engine is Batch or Streaming
3. Check the relevant configuration
4. For Batch processing, call batchProcessing(sparkSession, configBuilder, staticInputs, filters, outputs)
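A hedged sketch of that control flow; the ConfigBuilder factory-method names (createStaticInputs, createFilters, createOutputs) and baseCheckConfig are illustrative guesses, not verified against the source:

def entrypoint(configFilePath: String): Unit = {
  val configBuilder = new ConfigBuilder(configFilePath) // 1. load the config file
  val staticInputs  = configBuilder.createStaticInputs  // 2. build input plugins (batch)
  val filters       = configBuilder.createFilters       //    build filter plugins
  val outputs       = configBuilder.createOutputs       //    build output plugins
  baseCheckConfig(staticInputs, filters, outputs)       // 3. validate plugin configs
  // 4. dispatch to batch processing (the streaming branch is omitted here)
  batchProcessing(sparkSession, configBuilder, staticInputs, filters, outputs)
}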
Inside batchProcessing:
1. Take the head dataset (ds);
2. Call showWaterdropAsciiLogo(), marking the point where Waterdrop actually starts;
3. If the inputs are non-empty, run the filters and then write the outputs # (I actually have a question about this part)
Concretely, the source looks like:
private def batchProcessing(sparkSession: SparkSession,
                            configBuilder: ConfigBuilder,
                            staticInputs: List[BaseStaticInput],
                            filters: List[BaseFilter],
                            outputs: List[BaseOutput]): Unit = {
  val headDs = registerInputTempViewWithHead(staticInputs, sparkSession)
  if (staticInputs.nonEmpty) {
    var ds = headDs
    // Thread the head dataset through every filter, registering each
    // intermediate result as a temp view along the way.
    for (f <- filters) {
      ds = filterProcess(sparkSession, f, ds)
      registerFilterTempView(f, ds)
    }
    // Write the final filtered dataset to every configured output.
    outputs.foreach(p => {
      outputProcess(sparkSession, p, ds) // Why is only the head ds filtered? What about the other inputs?
    })
  } else {
    throw new ConfigRuntimeException("Input must be configured at least once.")
  }
}
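One plausible answer to the question in the comment, inferred from the name registerInputTempViewWithHead rather than verified against the full source: every input is probably registered as a Spark temporary view, so inputs other than the head stay reachable from SQL-style filters; only the head dataset is threaded through the filter chain directly. A hypothetical sketch of such a helper (the getDataset/getConfig calls are assumptions):

import org.apache.spark.sql.{Dataset, Row, SparkSession}

private def registerInputTempViewWithHead(staticInputs: List[BaseStaticInput],
                                          sparkSession: SparkSession): Dataset[Row] = {
  // Register every input under its configured table_name so later filters
  // (e.g. a sql filter) can refer to it by name.
  val datasets = staticInputs.map { input =>
    val ds = input.getDataset(sparkSession)
    ds.createOrReplaceTempView(input.getConfig.getString("table_name"))
    ds
  }
  datasets.head // only the head dataset enters the filter chain
}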