美文网首页大数据Spark_Flink_Hadoop
使用Flink批处理完成数据比对(对账)一

使用Flink批处理完成数据比对(对账)一

作者: 李不言被占用了 | 来源:发表于2020-04-10 21:47 被阅读0次

看了几天flink,刚入门。
简单说下对flink的感受,flink有4层(有些说3层,将Table API和SQL看成一层)API,越底层,对数据的操作就越精细,越高层完成功能所需要的代码就越少,而且代码越易读。


image.png

api使用起来很像java中的stream,这个其实很显然,都是为了对流数据进行处理。感觉就像flink是java中并行流的分布式版本,所以对stream熟悉的话,flink上手不难,或者说使用flink编写代码并不难。

Flink的编程模式:输入(source) -> 处理(转换transform) -> 输出(sink),3部分,相当清爽。

统一术语

数据比对一般针对两个数据集A/B,在选定一个基准方A后,定义如下:
F000:A/B两方数据相同
F113:A中存在,但B中没有,A比B多
F114:B中存在,但A中没有,B比A多
F115:A与B的关键字段相同,但毕竟字段不同,如A与B都有同一笔订单,但订单金额不同

新建工程

这里我们使用官方提供的quickstart做模板,如果是比较新版的idea(如2020.1)里面直接有flink的quickstart模板,旧版的idea的话,需要自己添加一下。


image.png image.png

下次使用的时候可以直接从这里看到:


image.png

如果你使用的是scala,ArtifactId则填flink-quickstart-scala。具体的版本信息可以根据最新版的填写。

添加Table API依赖

在pom.xml中添加Table API依赖。

<!-- Table API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Table API需要scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

编写代码

利用模板里的BatchJob来编写:

package com.flink;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

import java.util.List;

/**
 * Skeleton for a Flink Batch Job.
 *
 * <p>For a tutorial how to write a Flink batch application, check the
 * tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
 *
 * <p>To package your application into a JAR file for execution,
 * change the main class in the POM.xml file to this class (simply search for 'mainClass')
 * and run 'mvn clean package' on the command line.
 */
public class BatchJob {

    public static void main(String[] args) throws Exception {
        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Table Environment
        BatchTableEnvironment tableEnvironment = BatchTableEnvironment.getTableEnvironment(env);

        /**
         * 构造两个数据集,实际生产从自己需要的source中获取即可
         */
        DataSource<String> dataSourceA_unique = env.fromElements("orderId_1_f113", "orderId_2_f000", "orderId_3_f115");
        DataSource<String> dataSourceB_unique = env.fromElements("orderId_2_f000", "orderId_3_f115", "orderId_4_f114");


        // 转换成table
        Table tableA_unique = tableEnvironment.fromDataSet(dataSourceA_unique);
        Table tableB_unique = tableEnvironment.fromDataSet(dataSourceB_unique);


        /**
         * 核心比对(对账)逻辑
         */
        Table f113_table = tableA_unique.minusAll(tableB_unique);// 差集
        Table f114_table = tableB_unique.minusAll(tableA_unique);// 差集
        Table f000_table = tableA_unique.intersect(tableB_unique);// 交集

        // 转回DataSet用于输出
        DataSet<String> f000 = tableEnvironment.toDataSet(f000_table, String.class);
        DataSet<String> f113 = tableEnvironment.toDataSet(f113_table, String.class);
        DataSet<String> f114 = tableEnvironment.toDataSet(f114_table, String.class);


        /**
         * 输出,实际输出到自己需要的sink即可
         */
        List<String> f000_list = f000.collect();
        List<String> f113_list = f113.collect();
        List<String> f114_list = f114.collect();

        System.out.println("==============================");
        System.out.println("f000 ->" + f000_list);
        System.out.println("==============================");
        System.out.println("f113 ->" + f113_list);
        System.out.println("==============================");
        System.out.println("f114 ->" + f114_list);



        // 批处理不需要显示调用execute,否则会报错
        // env.execute("Flink Batch Java API Skeleton");
    }

}

简单说下几个关键点:

  1. 使用Table API需要创建对应的执行环境:
BatchTableEnvironment tableEnvironment = BatchTableEnvironment.getTableEnvironment(env);
  1. 模板代码中最后显式调用env.execute(),其实在批处理中不需要,显式调用反而会报错。

源码

源码

总结

本质上就是利用Table API中对数据集的处理函数(交集、差集)来完成数据比对。
如果你有更好的想法,欢迎留言,多多指教。
转载请注明出处

相关文章

网友评论

    本文标题:使用Flink批处理完成数据比对(对账)一

    本文链接:https://www.haomeiwen.com/subject/wgolmhtx.html