本文为学习笔记,会随着学习深入持续更新,仅供参考
1、Seatunnel是干什么的
简化企业中多源、异构数据的集成过程。它能够每天稳定地同步数十亿条数据,支持多种数据格式和存储系统之间的高效数据集成。主要用于处理实时流式和离线批处理任务。
2、实现原理和基本模块
SeaTunnel的数据处理流水线由Source、Sink以及多个Transform构成。Source负责从各种数据源获取数据,Sink则负责将数据写入目标系统。在Source和Sink之间,可以通过多个Transform对数据进行清洗、转换和聚合等操作。
3、在springboot中如何实现数据转换集成
a)直接转换(数据源1 —》数据源2)
参考官方文档
b)需要进行数据处理后再同步(数据源1—》数据处理—》数据源2)
简单的数据转换官方文档有提供配置参数(如:映射、过滤、替换等),复杂的数据转换需要重写transform模块(如涉及业务逻辑或官方没有的转换)
重写transform模块方法有以下几种:
方式一:工具和代码模块独立,可以直接将转换代码写在配置文件里
参考:DynamicCompile
方式二:集成在springboot项目中进行引用
参考代码逻辑如下
### 1. **配置文件,放在resources文件夹下**
env:
execution.parallelism: 4
source:
Kafka:
consumer.bootstrap.servers: "localhost:9092"
consumer.group.id: "sea-group"
topic: "input-topic"
schema:
field:
- name: "value"
type: "string"
transform:
- class_name: com.example.transform.TLVToJsonTransform
row_type:
- name: "value"
type: "string"
sink:
HDFS:
path: "hdfs://namenode:8020/user/data/output"
file_format: "json"
partition_by: ["date"]
save_mode: "append"
### 2. **实现自定义Transform**
创建一个自定义的Transform组件,用于解析TLV格式数据并转换为JSON格式。在SeaTunnel中,Transform是用来对数据进行中间处理的组件。
```java
import org.apache.seatunnel.api.common.SeaTunnelTransform;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.json.JSONObject;
public class TLVToJsonTransform implements SeaTunnelTransform<SeaTunnelRow> {
private final SeaTunnelRowType rowType;
public TLVToJsonTransform(SeaTunnelRowType rowType) {
this.rowType = rowType;
}
@Override
public SeaTunnelRow map(SeaTunnelRow input) {
String tlvData = (String) input.getField(0); // 假设TLV数据在第一列
JSONObject json = parseTLVToJson(tlvData);
return new SeaTunnelRow(new Object[]{json.toString()});
}
private JSONObject parseTLVToJson(String tlvData) {
JSONObject json = new JSONObject();
for (int i = 0; i < tlvData.length(); ) {
String tag = tlvData.substring(i, i + 2);
int length = Integer.parseInt(tlvData.substring(i + 2, i + 4));
String value = tlvData.substring(i + 4, i + 4 + length * 2);
json.put(tag, value);
i += 4 + length * 2;
}
return json;
}
@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return rowType;
}
}
### 3. **在Spring Boot应用中启动SeaTunnel任务:**
.
import org.apache.seatunnel.core.SeaTunnelCore;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SeaTunnelApplication {
public static void main(String[] args) {
SpringApplication.run(SeaTunnelApplication.class, args);
SeaTunnelCore.start(args);
或者SeaTunnel.start("seatunnel-config.conf");
}
}
备注:配置文件编写可以参考官网,如果有多个topic,可以配置多个文件
参考文件
1、3分钟搞懂 SeaTunnel CDC 数据同步
2、官方例子
3、使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例
网友评论