今天看了下需求,网上找了好久,基本上文章都没有直接要害,要么azkaban直接传变量到shell去直接输出,要么就是链接关系型数据库,直接用shell调用hive脚本并且根据时间传进去的没找到,今天搞出来了,特地做了下总结。
需求:
1.将hbase表数据同步到hive的外部表
2.根据时间将外部表同步到内部表中
根据需求我们将操作如下步骤进行实现:
1.hbase中创建数据表并插入数据
-- 创建表
create 'hbase_emp_table','info'
--向表中插入数据
hbase(main):054:0* put 'hbase_emp_table','101','info:ename','tang tang'
hbase(main):057:0* put 'hbase_emp_table','101','info:edate','2021-02-23 11:11:11'
hbase(main):061:0* put 'hbase_emp_table','102','info:ename','kong kong'
hbase(main):064:0* put 'hbase_emp_table','102','info:edate','2021-02-23 12:12:12'
--查询表数据
hbase(main):068:0* scan 'hbase_emp_table'
ROW COLUMN+CELL
101 column=info:edate, timestamp=1614047162006, value=2021-02-23 11:11:11
101 column=info:ename, timestamp=1614047124996, value=tang tang
102 column=info:edate, timestamp=1614047207389, value=2021-02-23 12:12:12
102 column=info:ename, timestamp=1614047192853, value=kong kong
2 row(s)
Took 0.0479 seconds
2.hive中创建同步表和结果表
--ods层: 这里是同步hbase表过来的即直接操作hbase表中的数据
CREATE EXTERNAL TABLE IF NOT EXISTS ods_emp_table(
eId string,
eName string,
eDate string -- 2021-02-23 12:12:12
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:edate")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
--fdm层:
CREATE TABLE IF NOT EXISTS fdm_emp_table(
eId string,
eName string,
eDate string
) COMMENT '员工模型层表'
PARTITIONED BY(dt STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS ORC
LOCATION '/user/hive/warehouse/test';
3.创建作业文件脚本
test_ods_to_fdm.job :
# ods to fdm job
type=command
job_param=${a}
command=sh /opt/module/azkaban/test/test_ods_to_fdm.sh "${job_param}"
4.创建 shell脚本
test_ods_to_fdm.sh :
#!/bin/bash
echo "azkaban传入的参数为: $1"
hive=/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/hive/bin/hive
APP=nfdw
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="use nfdw;
insert overwrite table ${APP}.fdm_emp_table partition(dt='$do_date')
select eId,eName,SUBSTR(eDate,1,10) as eDate from ${APP}.ods_emp_table where SUBSTR(eDate,1,10) = '$do_date';"
$hive -e "$sql"
5.打包任务脚本
[root@cdh101 test]# zip chenggong.zip test_ods_to_fdm.job
如图:
image.png
6.启动azkaban并访问web调度页面
[root@cdh101 azkaban-exec-server-3.84.4]# bin/start-exec.sh
[root@cdh101 azkaban-exec-server-3.84.4]# curl -G "localhost:$(<./executor.port)/executor?action=activate" && echo
{"status":"success"}
[root@cdh101 azkaban-web-server-3.84.4]# bin/start-web.sh
访问地址:http://cdh101:8081/
7.在web页面中创建项目 image.png
8.进入刚刚创建的项目并上传zip包 image.png
9.在任务执行前设置传输时间变量
image.png10.查看调度任务的执行结果 image.png
11.任务执行成功则查询hive表数据是否同步过来
image.png目前为止,已完成整个任务的调度,其实还挺简单的。。。
12. 数据的同步
Datax读取Hive数据写入到Mysql,并传递日期参数 :https://blog.csdn.net/wFitting/article/details/107467099
网友评论