美文网首页
Flink 常用算子

Flink 常用算子

作者: Eqo | 来源:发表于2022-08-18 15:58 被阅读0次
  • map 算子 接收 t o t是传入参数的数据类型 o是接收的数据类型
    如果返回是Tuple Tuple中也要指定泛型
        //todo 此时数据流内是一个一个 单词, 将单词转换成 tuple元组
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordTupleDS = wordDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);

            }
        });
  • keyby
    在Flink中如果是批处理,分组使用函数:groupBy,从Flink 1.12以后开始,由于流批一体,无论是流计算还是批处理,分组函数:keyBy

    使用:
    如果是元组 可以指定下标
    如果是 javabean 可以指定对象属性
    指定select 选择器
    官方建议 使用 lambda 表达式


    image.png


import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 *
 * 使用keyby算子 对数据流分组
 * @author ado
 */
public class TransformatKeyByDemo {

    public static void main(String[] args) throws Exception {
        // 1. 执行环境-env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. 数据源-source
        // 使用 可变参数 数据源 传入元素为tuple 元组对象
        DataStreamSource<Tuple2<String, Integer>> inputDataStream = env.fromElements(
                Tuple2.of("spark", 1), Tuple2.of("flink", 1), Tuple2.of("hadoop", 1),
                Tuple2.of("spark", 1), Tuple2.of("flink", 1), Tuple2.of("hadoop", 1)
        );


        // 3. 数据转换-transformation
        // keyby算子有三种使用方式
        //todo 数据为元组时,指定key下标
        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = inputDataStream.keyBy(0).sum(1);
//        ds1.printToErr();


        //todo 数据为 javabean对象时 指定属性
        SingleOutputStreamOperator<Tuple2<String, Integer>> ds2 = inputDataStream.keyBy("f0").sum("f1");
        ds2.printToErr();

        //todo 数据为 keyselector接口对象时 (最标准)
        // IN 是传入的元素 key 是元素key的类型
        SingleOutputStreamOperator<Tuple2<String, Integer>> ds3 = inputDataStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {

            // z这个方法其实就是 获取key
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).sum(1);
        ds3.printToErr();
        // todo 官方建议使用 lmbad表达式
        inputDataStream.keyBy((Tuple2<String, Integer> value) ->{
            return value.f0;
        } ).sum(1);

        inputDataStream.keyBy(value -> value.f0);


        // 4. 数据终端-sink

        // 5. 触发执行-execute
        env.execute("TransformatKeyByDemo");
    }

}  

reduce 算子

  • 对分组流中每个组内数据的聚合,来一条数据,聚合一条数据
    比sum功能全面

  • 需要指定 两个变量 tmp item
    tmp 表示聚合的中间变量
    itrem 表示要聚合的元素
    第一次计算的时候 只会赋值 不会计算

  • todo: KeyedStream使用reduce算子对分组流中各个组内数据聚合时,要求组内数据聚合结果类型与流中数据类型相同

package cn.itcast.flink.transformation;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 测试reduce 算子
 * reduce 的返回值必须跟传入的数据流 结果一致
 * reduce 接收两个参数 一个是tmp 聚合中间变量 一个是 iteam 要聚合的元素
 *  // 第一次计算的时候 只会赋值 不会计算
 *                 System.out.println("tmp = " + tmp + ", item = " + item);
 * @author ado
 */
public class TransformationReduceDemo {

    public static void main(String[] args) throws Exception {
        // 1. 执行环境-env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 数据源-source
        DataStreamSource<Tuple2<String, Integer>> inputDataStream = env.fromElements(
                Tuple2.of("flink", 1), Tuple2.of("spark", 1), Tuple2.of("flink", 1),
                Tuple2.of("flink", 1), Tuple2.of("flink", 1), Tuple2.of("spark", 1)
        );


        // 3. 数据转换-transformation
        KeyedStream<Tuple2<String, Integer>, Tuple> stream = inputDataStream.keyBy(0);

        SingleOutputStreamOperator<Tuple2<String, Integer>> reduceDS= stream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> tmp,
                                                  Tuple2<String, Integer> item) throws Exception {

                // 第一次计算的时候 只会赋值 不会计算
                System.out.println("tmp = " + tmp + ", item = " + item);
                Integer historyValue = tmp.f1;
                Integer nowValue = item.f1;

                historyValue += nowValue;
                return Tuple2.of(tmp.f0, historyValue);


            }
        });
        // 4. 数据终端-sink

        reduceDS.printToErr();
        // 5. 触发执行-execute
        env.execute("TransformationReduceDemo");
    }

}  
  • max/maxby
  • max或min只会求出最大或最小的那个字段,其他的字段不管
  • maxBy或minBy会求出最大或最小的那个字段和对应的其他的字段
package cn.itcast.flink.transformation;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 *
 * 测试Flink 流式计算 max min /maxby minby 算子
 * 功能 求分组后的 最大最小值
 * - **max或min**:`只会求出最大或最小的那个字段`,其他的字段不管
 * - **maxBy或minBy**:`会求出最大或最小的那个字段和对应的其他的字段`
 * @author ado
 */
public class TransformationMaxMinDemo {

    public static void main(String[] args) throws Exception {
        // 1. 执行环境-env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        // 2. 数据源-source
        DataStreamSource<Tuple3<String, String, Integer>> inputDS = env.fromElements(
                Tuple3.of("上海", "浦东新区", 777),
                Tuple3.of("上海", "闵行区", 999),
                Tuple3.of("上海", "杨浦区", 666),

                Tuple3.of("北京", "东城区", 567),
                Tuple3.of("北京", "西城区", 987),
                Tuple3.of("上海", "静安区", 888),
                Tuple3.of("北京", "海淀区", 9999)

        );


        // 3. 数据转换-transformation
        inputDS.keyBy(0).max(2).printToErr();

        inputDS.keyBy(0).maxBy(2).print();



        // 4. 数据终端-sink

        // 5. 触发执行-execute
        env.execute("TransformationMaxMinDemo");
    }

}  

union collect 算子

  • union函数:可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。
  • connect函数:与union函数功能类似,用来连接两个数据流,且2个数据流数据类型可不一样
  1. connect只能连接两个数据流,union可以连接多个数据流;

  2. connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致;

  3. todo:2个流进行连接connect,数据类型可以不相同,通常应用于大表数据流关联小表数据流
           当将2个流连接以后,得到连接流,必须调用map、flatMap等算子,对流中数据处理,否则不能输出
    
package cn.itcast.flink.transformation;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import scala.Tuple2;


/**
 * @author ado
 */
public class TransformationUnionConnectDemo {

    public static void main(String[] args) throws Exception {
        // 1. 执行环境-env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. 数据源-source
        DataStream<String> dataStream01 = env.fromElements("A", "B", "C", "D");
        DataStream<String> dataStream02 = env.fromElements("aa", "bb", "cc", "dd");
        DataStream<Integer> dataStream03 = env.fromElements(1, 2, 3, 4);

        // 3. 数据转换-transformation
        // todo: 2个流进行union,要求流中数据类型必须相同
        //将两个流的数据合到一个流中 列如 DS1(a,b,c ) +DS2(A,B,C) -> DS3(a,b,c,A,B,C)
        DataStream<String> unionDS = dataStream01.union(dataStream02);
        unionDS.print();

        // 4. 数据终端-sink
         /*
            todo:2个流进行连接connect,数据类型可以不相同,通常应用于大表数据流关联小表数据流
                   当将2个流连接以后,得到连接流,必须调用map、flatMap等算子,对流中数据处理,否则不能输出
         */
        ConnectedStreams<String, Integer> connectDS = dataStream01.connect(dataStream03);

        SingleOutputStreamOperator<String> output = connectDS.map(new CoMapFunction<String, Integer, String>() {

            // COMAP接收的是三个参数  1 2是两个流的输入参数 3 是out 输出
            @Override
            public String map1(String value) throws Exception {
                return value;
            }

            @Override
            public String map2(Integer value) throws Exception {

                return value+"";
            }
        });
        output.printToErr("connect");




        // 5. 触发执行-execute
        env.execute("TransformationUnionConnectDemo");
    }

}  

相关文章

网友评论

      本文标题:Flink 常用算子

      本文链接:https://www.haomeiwen.com/subject/zthugrtx.html