I. Data Preparation
MySQL (relatively fixed reference data is usually stored in MySQL):
city_info: city information table
product_info: product information table
CREATE TABLE `city_info` (
`city_id` int(11) DEFAULT NULL,
`city_name` varchar(255) DEFAULT NULL,
`area` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into `city_info`(`city_id`,`city_name`,`area`) values (1,'BEIJING','NC');
create table product_info(
product_id int(11),
product_name varchar(50),
extend_info varchar(50)
);
insert into product_info(product_id,product_name,extend_info) values (1,'product1','{"product_status":1}');
For brevity, only one sample row is shown for each table.
Hive:
user_click: user behavior (click) log
create table user_click(
user_id int,
session_id string,
action_time string,
city_id int,
product_id int
) partitioned by (date string)
row format delimited fields terminated by ',';
load data local inpath '/home/hadoop/data/topn/init/user_click.txt' overwrite into table user_click partition(date='2016-05-05');
[hadoop@Hadoop001 data]$ head -10 user_click.txt
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:01:56,1,72
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:52:26,1,68
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:17:03,1,40
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:32:07,1,21
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:26:06,1,63
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:03:11,1,60
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:43:43,1,30
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:09:58,1,96
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:18:45,1,71
95,2bf501a7637549c89cf55342331b15db,2016-05-05 21:42:39,1,8
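To confirm the partition was loaded, a quick check in the Hive CLI (not part of the original steps, just a sanity check):
show partitions user_click;
select count(*) from user_click where date='2016-05-05';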
II. Requirements Analysis
The Hive table currently has no area information;
the area information lives in MySQL.
So we need Sqoop to import the MySQL tables into Hive.
Steps:
1) Import the city_info and product_info tables into Hive
2) Join user_click with city_info and product_info in Hive
3) Use a window function to compute the Top N within each group
4) Export the result back to MySQL
III. Installing Sqoop
1. Download the tarball
[hadoop@Hadoop001 software]$ wget http://archive.cloudera.com/cdh5/cdh/5/sqoop-1.4.6-cdh5.7.0.tar.gz
2. Unpack the tarball
[hadoop@Hadoop001 software]$ ll
-rw-r--r-- 1 hadoop hadoop 29966286 Jul 22 2019 sqoop-1.4.6-cdh5.7.0.tar.gz
[hadoop@Hadoop001 software]$ tar -zxvf sqoop-1.4.6-cdh5.7.0.tar.gz
[hadoop@Hadoop001 software]$ ll
drwxr-xr-x 10 hadoop hadoop 4096 Mar 24 2016 sqoop-1.4.6-cdh5.7.0
-rw-r--r-- 1 hadoop hadoop 29966286 Jul 22 2019 sqoop-1.4.6-cdh5.7.0.tar.gz
3. Create a symlink under the app directory
[hadoop@Hadoop001 software]$ ln -s /home/hadoop/software/sqoop-1.4.6-cdh5.7.0 /home/hadoop/app/sqoop
4. Update the user's environment variables
[hadoop@Hadoop001 ~]$ vi .bash_profile
export SQOOP_HOME=/home/hadoop/app/sqoop
export PATH=$SQOOP_HOME/bin:$PATH
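After editing, reload the profile so the new PATH takes effect (a standard step, not shown in the original), then confirm Sqoop is reachable:
[hadoop@Hadoop001 ~]$ source ~/.bash_profile
[hadoop@Hadoop001 ~]$ sqoop version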
5. Edit the Sqoop configuration file
[hadoop@Hadoop001 conf]$ cp sqoop-env-template.sh sqoop-env.sh
[hadoop@Hadoop001 conf]$ vi sqoop-env.sh
export HADOOP_COMMON_HOME=/home/hadoop/app/hadoop
export HADOOP_MAPRED_HOME=/home/hadoop/app/hadoop
export HIVE_HOME=/home/hadoop/app/hive
6. Copy the MySQL JDBC driver (mysql-connector-java-5.1.38.jar here), along with java-json.jar, hive-shims.jar and hive-common.jar, into the sqoop/lib directory
[hadoop@Hadoop001 lib]$ cp /home/hadoop/app/hive/lib/mysql-connector-java-5.1.38.jar .
7. Using sqoop help
[hadoop@Hadoop001 sqoop]$ sqoop help
See 'sqoop help COMMAND' for information on a specific command.
IV. Using Sqoop Commands
1. List the MySQL databases
[hadoop@Hadoop001 sqoop]$ sqoop list-databases --connect jdbc:mysql://Hadoop001:3306 --password 123456 --username root
2. List the tables in a MySQL database
[hadoop@Hadoop001 sqoop]$ sqoop list-tables --connect jdbc:mysql://Hadoop001:3306/ruozedata --password 123456 --username root
3. Import a MySQL table into HDFS
By default the output is written to the /user/hadoop/city_info directory on HDFS.
# --table: source table name
# --mapreduce-job-name: name of the underlying MapReduce job
# --delete-target-dir: delete the target directory first if it already exists
# --split-by: splits default to the primary key; set this column if the table has no primary key
# -m: number of map tasks (default is 4)
sqoop import \
--connect jdbc:mysql://Hadoop001:3306/ruozedata \
--password 123456 \
--username root \
--table city_info \
--mapreduce-job-name MySQL2HDFS_CITY_INFO \
--delete-target-dir \
--split-by city_id \
-m 2
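Once the import finishes, the output can be checked on HDFS (the part-file pattern below is the usual Sqoop naming, stated here as an assumption):
hdfs dfs -ls /user/hadoop/city_info
hdfs dfs -cat /user/hadoop/city_info/part-m-*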
Commonly used import options:
--columns: choose which columns to import
--target-dir: explicit HDFS output directory
Mapping NULLs to a value of your choice:
  string columns ("" or ''): --null-string
  numeric columns (0, -99, ...): --null-non-string
Field delimiter: --fields-terminated-by
Filter condition: --where "city_id > 5"
Free-form SQL: --query / -e (cannot be combined with --table)
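As a sketch of how these options combine (the filter and target directory below are made up for illustration): --query requires that $CONDITIONS appear in the WHERE clause and that --target-dir be set, and with more than one mapper a --split-by column is still needed.
sqoop import \
--connect jdbc:mysql://Hadoop001:3306/ruozedata \
--username root \
--password 123456 \
--query 'select city_id, city_name, area from city_info where city_id > 5 and $CONDITIONS' \
--target-dir /user/hadoop/city_info_filtered \
--null-string '' \
--fields-terminated-by '\t' \
--split-by city_id \
-m 2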
4. Importing MySQL tables into Hive
1) Create the corresponding Hive tables
create table city_info(
city_id int,
city_name string,
area string
)
row format delimited fields terminated by '\t';
create table product_info(
product_id int,
product_name string,
extend_info string
)
row format delimited fields terminated by '\t';
2) Import the data into Hive
sqoop import \
--connect jdbc:mysql://Hadoop001:3306/ruozedata \
--password 123456 \
--username root \
--table product_info \
--hive-database default \
--hive-table product_info \
--hive-import \
--hive-overwrite \
--delete-target-dir \
--fields-terminated-by '\t' \
--split-by product_id \
-m 2
sqoop import \
--connect jdbc:mysql://Hadoop001:3306/ruozedata \
--password 123456 \
--username root \
--table city_info \
--hive-database default \
--hive-table city_info \
--hive-import \
--hive-overwrite \
--delete-target-dir \
--fields-terminated-by '\t' \
--split-by city_id \
-m 2
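Optionally verify in the Hive CLI that the two dimension tables landed (not part of the original steps):
select * from city_info limit 5;
select * from product_info limit 5;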
V. Computing the Top N Most Popular Products in Each Area
The idea: join the day's clicks with city_info to attach the area, count clicks per (product, area), attach the product name, and finally rank products within each area with row_number().
create TEMPORARY table tmp_product_click_basic_info
as
select u.product_id, u.city_id, c.city_name, c.area
from
(select product_id, city_id from user_click where date='2016-05-05' ) u
join
(select city_id, city_name, area from city_info ) c
on u.city_id = c.city_id;
create TEMPORARY table tmp_area_product_click_count
as
select product_id,area,count(1) click_count from tmp_product_click_basic_info group by product_id,area;
create TEMPORARY table tmp_area_product_click_count_full_info
as
select
a.product_id,a.area,a.click_count,b.product_name
from tmp_area_product_click_count a join product_info b
on a.product_id = b.product_id;
drop table if exists area_product_click_count_top3;
create table area_product_click_count_top3
row format delimited fields terminated by '\t'
as
select * from
(
select
"2016-05-05" day,product_id,product_name,area,click_count,
row_number() over(partition by area order by click_count desc) r
from tmp_area_product_click_count_full_info
) t where t.r<=3;
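As an optional check before exporting, the ranked result can be inspected in Hive:
select * from area_product_click_count_top3;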
VI. Exporting the Hive Results to MySQL
1. Create the MySQL table
create table area_product_click_count_top3(
day varchar(15),
product_id int(11),
product_name varchar(50),
area varchar(10),
click_count int(11),
r int(10)
);
2. Export to MySQL with Sqoop
sqoop export \
--connect jdbc:mysql://localhost:3306/ruozedata \
--password 123456 \
--username root \
--table area_product_click_count_top3 \
--export-dir /user/hive/warehouse/area_product_click_count_top3 \
--columns "day,product_id,product_name,area,click_count,r" \
--fields-terminated-by '\t' \
-m 2
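A quick sanity check on the MySQL side (assuming the mysql client is available to the current user):
[hadoop@Hadoop001 ~]$ mysql -uroot -p123456 -e "select * from ruozedata.area_product_click_count_top3 limit 10;"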
VII. Wrapping the Whole Pipeline in a Shell Script
Approach:
- The offline job runs once a day: in the early morning we process the previous day's data.
- Use a Linux date command to get yesterday's date in the 2016-05-05 format (today minus one day).
- city_info and product_info are imported in their own Sqoop steps.
- All of the SQL is wrapped inside the shell script.
Points to note:
a) Any temporary table created by the job must be dropped before it is re-created, or created with if not exists
b) Key steps should write log output
c) The result export must be idempotent (re-running the same day must not produce duplicate rows)
All of the business logic is now wrapped in the shell script, but how is it triggered?
Scheduling tools such as Oozie or Azkaban could be used; here we simply use crontab, starting the job every day at 2 a.m.
The script:
#!/bin/bash
#get yesterday's date
the_day_before=`date -d '1 days ago' "+%Y-%m-%d"`
echo ${the_day_before}
#import the city_info and product_info tables into Hive
#-------------------------------------------------------
echo 'start import table product_info from mysql to hive'
sqoop import \
--connect jdbc:mysql://Hadoop001:3306/ruozedata \
--password 123456 \
--username root \
--table product_info \
--hive-database default \
--hive-table product_info \
--hive-import \
--hive-overwrite \
--delete-target-dir \
--fields-terminated-by '\t' \
--split-by product_id \
-m 2
echo 'end import table product_info'
#-------------------------------------------------------
echo 'start import table city_info from mysql to hive'
sqoop import \
--connect jdbc:mysql://Hadoop001:3306/ruozedata \
--password 123456 \
--username root \
--table city_info \
--hive-database default \
--hive-table city_info \
--hive-import \
--hive-overwrite \
--delete-target-dir \
--fields-terminated-by '\t' \
--split-by city_id \
-m 2
echo 'end import table city_info'
#-------------------------------------------------------
#load the daily user-click log into the partitioned table
echo "start import user_click.txt into hive"
hive -e "create table if not exists user_click(user_id int, session_id string,action_time string,city_id int, product_id int
) partitioned by (date string) row format delimited fields terminated by ',';"
hive -e "alter table user_click drop if exists partition(date='${the_day_before}');"
hive -e "load data local inpath '/home/hadoop/data/user_click.txt' overwrite into table user_click partition(date='${the_day_before}');"
echo "end import user_click.txt into hive"
#-------------------------------------------------------
#compute the Top N most popular products in each area
echo "start calculate topN"
hive -e "create TEMPORARY table if not exists tmp_product_click_basic_info as select u.product_id, u.city_id, c.city_name, c.area from (select product_id, city_id from user_click where date='${the_day_before}' ) u join (select city_id, city_name, area from city_info ) c on u.city_id = c.city_id;create TEMPORARY table if not exists tmp_area_product_click_count as select product_id,area,count(1) click_count from tmp_product_click_basic_info group by product_id,area;create TEMPORARY table if not exists tmp_area_product_click_count_full_info as select a.product_id,a.area,a.click_count,b.product_name from tmp_area_product_click_count a join product_info b on a.product_id = b.product_id;drop table if exists area_product_click_count_top3;create table area_product_click_count_top3 row format delimited fields terminated by '\t' as select * from ( select '${the_day_before}' day,product_id,product_name,area,click_count,row_number() over(partition by area order by click_count desc) r from tmp_area_product_click_count_full_info) t where t.r<=3;"
echo "end calculate topN"
#-------------------------------------------------------
#export the area_product_click_count_top3 table from Hive to MySQL
#first delete any rows in MySQL whose day column equals yesterday's date
#this keeps the export idempotent
cmd="use ruozedata;delete from area_product_click_count_top3 where day='${the_day_before}'"
sudo su - mysqladmin -c "mysql -uroot -p123456 -e \"${cmd}\""
echo "success delete mysql data"
echo "start export area_product_click_count_top3 from hive to mysql"
sqoop export \
--connect jdbc:mysql://localhost:3306/ruozedata \
--password 123456 \
--username root \
--table area_product_click_count_top3 \
--export-dir /user/hive/warehouse/area_product_click_count_top3 \
--columns "day,product_id,product_name,area,click_count,r" \
--fields-terminated-by '\t' \
-m 2
echo "end export area_product_click_count_top3 from hive to mysql"
[hadoop@Hadoop001 shell]$ crontab -e
0 2 * * * sh /home/hadoop/shell/test.sh >> /tmp/test.log
The MySQL password should be passed into the script as a parameter instead of being hard-coded.
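One way to do this, as a minimal sketch (the argument handling below is an assumption, not part of the original script); Sqoop can also read the password from a file via --password-file:
#!/bin/bash
#hypothetical wrapper: read the MySQL password from the first argument
#instead of hard-coding it in the script
mysql_pwd="$1"
if [ -z "${mysql_pwd}" ]; then
  echo "usage: $0 <mysql_password>" >&2
  exit 1
fi
#every place that previously hard-coded 123456 then uses ${mysql_pwd}, e.g.
sqoop export \
--connect jdbc:mysql://localhost:3306/ruozedata \
--password "${mysql_pwd}" \
--username root \
--table area_product_click_count_top3 \
--export-dir /user/hive/warehouse/area_product_click_count_top3 \
--columns "day,product_id,product_name,area,click_count,r" \
--fields-terminated-by '\t' \
-m 2
The crontab entry would then pass the password along as an argument to the script.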