美文网首页
温故知新:初识Seatunnel

温故知新:初识Seatunnel

作者: 灿烂的GL | 来源:发表于2024-07-29 16:21 被阅读0次

    本文为学习笔记,会随着学习深入持续更新,仅供参考

    1、Seatunnel是干什么的
    简化企业中多源、异构数据的集成过程。它能够每天稳定地同步数十亿条数据,支持多种数据格式和存储系统之间的高效数据集成。主要用于处理实时流式和离线批处理任务。


    2、实现原理和基本模块
    SeaTunnel的数据处理流水线由Source、Sink以及多个Transform构成。Source负责从各种数据源获取数据,Sink则负责将数据写入目标系统。在Source和Sink之间,可以通过多个Transform对数据进行清洗、转换和聚合等操作。


    3、在springboot中如何实现数据转换集成
    a)直接转换(数据源1 —》数据源2)
    参考官方文档
    b)需要进行数据处理后再同步(数据源1—》数据处理—》数据源2)
    简单的数据转换官方文档有提供配置参数(如:映射、过滤、替换等),复杂的数据转换需要重写transform模块(如涉及业务逻辑或官方没有的转换)

    官方转换方法.png
    重写transform模块方法有以下几种:
    方式一:工具和代码模块独立,可以直接将转换代码写在配置文件里
    参考:DynamicCompile
    方式二:集成在springboot项目中进行引用
    参考代码逻辑如下
    ### 1. **配置文件,放在resources文件夹下**
    env:
      execution.parallelism: 4
    
    source:
      Kafka:
        consumer.bootstrap.servers: "localhost:9092"
        consumer.group.id: "sea-group"
        topic: "input-topic"
        schema:
          field:
            - name: "value"
              type: "string"
                      
    transform:
      - class_name: com.example.transform.TLVToJsonTransform
        row_type: 
          - name: "value"
            type: "string"
    
    sink:
      HDFS:
        path: "hdfs://namenode:8020/user/data/output"
        file_format: "json"
        partition_by: ["date"]
        save_mode: "append"
        
    
    ### 2. **实现自定义Transform**
    创建一个自定义的Transform组件,用于解析TLV格式数据并转换为JSON格式。在SeaTunnel中,Transform是用来对数据进行中间处理的组件。
    
    ```java
    import org.apache.seatunnel.api.common.SeaTunnelTransform;
    import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
    import org.apache.seatunnel.api.table.type.SeaTunnelRow;
    import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
    import org.json.JSONObject;
    
    public class TLVToJsonTransform implements SeaTunnelTransform<SeaTunnelRow> {
    
        private final SeaTunnelRowType rowType;
    
        public TLVToJsonTransform(SeaTunnelRowType rowType) {
            this.rowType = rowType;
        }
    
        @Override
        public SeaTunnelRow map(SeaTunnelRow input) {
            String tlvData = (String) input.getField(0); // 假设TLV数据在第一列
            JSONObject json = parseTLVToJson(tlvData);
            return new SeaTunnelRow(new Object[]{json.toString()});
        }
    
        private JSONObject parseTLVToJson(String tlvData) {
            JSONObject json = new JSONObject();
            for (int i = 0; i < tlvData.length(); ) {
                String tag = tlvData.substring(i, i + 2);
                int length = Integer.parseInt(tlvData.substring(i + 2, i + 4));
                String value = tlvData.substring(i + 4, i + 4 + length * 2);
                json.put(tag, value);
                i += 4 + length * 2;
            }
            return json;
        }
        
        @Override
        public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
            return rowType;
        }
    }
    
    ### 3. **在Spring Boot应用中启动SeaTunnel任务:**
    .
    import org.apache.seatunnel.core.SeaTunnelCore;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
     
    @SpringBootApplication
    public class SeaTunnelApplication {
     
        public static void main(String[] args) {
            SpringApplication.run(SeaTunnelApplication.class, args);
            SeaTunnelCore.start(args);
    或者SeaTunnel.start("seatunnel-config.conf");
        }
    }
    

    备注:配置文件编写可以参考官网,如果有多个topic,可以配置多个文件


    参考文件
    1、3分钟搞懂 SeaTunnel CDC 数据同步
    2、官方例子
    3、使用 Apache SeaTunnel 实现 Kafka Source 解析复杂Json 案例

    相关文章

      网友评论

          本文标题:温故知新:初识Seatunnel

          本文链接:https://www.haomeiwen.com/subject/lttihjtx.html