大数据常用数据采集工具
离线
Sqoop: 基于MR的大数据量分布式的离线数据采集工具
DataX:阿里开源离线数据采集工具,单机版,数据量比sqoop少,可部署多台实现并行
实时
flume:实时日志采集工具
Maxwell:mysql
[大数据技术之 Maxwell(1.29.2版本)(最新最全教程)_maxwell教程_莱恩大数据的博客-CSDN博客](https://blog.csdn.net/cxl_shelly/article/details/122132124)
FlinkCDC:RDBMS数据库监控工具,数据库中的数据发生变化时,会将数据的变化收集起来 发送kafka或其他数据处理接口
[TOC]
DataX简介
Datax是阿里开源的支持多数据源的数据采集工具,能够实现 RDBMS/nosql数据库之间数据导入导出
支持MySQL、Oracle 、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高
效的数据同步功能。
DataX原理
image.png image.png分析:每个job 实际上就是作业采集模块,每个task是一个线程. datax是一个java单进程的应用
并发量默认是5
过程:
DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
DataX于sqoop的区别
datax提供精确一致性语义
采集工具选型
日志问题
选型条件:
各种数据源比较多 选datax
单纯的 rdbms->hdfs 选sqoop
sqoop的底层是mr ,要到yarn的jobhistory上面查看日志
datax的日志 直接打印控制台或者在log里面查看
mysql2hdfs
image.png-- querysql 是最重要的 优先级高
where 增量全量
splitPK 主键或者递增的数 防止数据倾斜 一般是整形
fs: hdfs url 端口号(8020/9820)
任务一 实现datax导入到hive表(成功)
- 数据准备
create table insurance.zengliang(
id int,
name varchar(50),
dt varchar(50)
);
INSERT into insurance.zengliang VALUES
(1,'小明','2023-05-28 12:50:33'),
(2,'小明','2023-05-28 11:50:33'),
(3,'小明','2023-05-28 11:50:33'),
(4,'小明','2023-05-28 11:50:33'),
(5,'小明','2023-05-28 21:50:33'),
(6,'小明','2023-05-28 12:50:33'),
(7,'小明','2023-05-28 11:50:33'),
(8,'小明','2023-05-28 11:50:33'),
(9,'小明','2023-05-28 11:50:33'),
(10,'小明','2023-05-28 11:50:33'),
(11,'小明','2023-05-28 11:50:33'),
(12,'小明','2023-05-29 11:50:33'),
(13,'小明','2023-05-27 11:50:33')
任务二 实现datax 增量导入hive表
--更改 mysql w 2 hdfs r 的json文件
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["id", "name","dt"],
"querySql": [
"select id,name,dt from zengliang "
],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://node1:3306/insurance"],
"table": ["zengliang"]
}
],
"password": "123456",
"username": "root",
"where": "dt >='${start_time}' and dt < '${end_time}'"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "varchar"
},
{
"name": "dt",
"type": "varchar"
},
],
"compress": "",
"defaultFS": "hdfs://node1:8020",
"fieldDelimiter": "\t",
"fileName": "zengliang",
"fileType": "text",
"path": "/user/hive/warehouse/datax.db/zengliang",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
执行调度命令
python ../bin/datax.py ./mysql2hive.json -p "-Dstart_time='2018-06-17 00:00:00' -Dend_time='2018-06-18 23:59:59'"
注意 datax3.0 之后,where 参数需要单独写 且权限高于 querysql(坑)
writeMode 两种模式 append over
datax导入hive分区表
网友评论