美文网首页
2022-04-20-Flink-45(四)

2022-04-20-Flink-45(四)

作者: 冰菓_ | 来源:发表于2022-04-21 00:15 被阅读0次

1. Transformation

map
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class mapTransformation {


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> source = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小米", "0001", 200000L) 
        );

        //1. 实现一个类
        source.map(new myMap()).print();
        //2. 使用匿名类
        source.map(new MapFunction<Event, String>() {
            @Override
            public String map(Event event) throws Exception {
                return event.url;
            }
        }).print();
        //3. Lambda表达式,这里已经帮你实现了没有类型擦除
        source.map(data -> data.timestamp.toString()).print();

        env.execute();
    }


    public static class myMap implements MapFunction<Event,String>{


        @Override
        public String map(Event event) throws Exception {
            return event.user;
        }
    }
}
filter
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class filterTransformation {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Event> source = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小红", "0002", 200000L),
                new Event("小黄", "0003", 200000L)

        );

        //1. 实现一个类
        source.filter(new myFilter()).print();
        //2. 使用匿名类
        source.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event event) throws Exception {
                return event.user.equals("小红");
            }
        }).print();
        //3. lambda表达式
        source.filter(data -> data.user.equals("小黄")).print();

        env.execute();
    }

    public static  class  myFilter implements FilterFunction<Event>{


        @Override
        public boolean filter(Event s) throws Exception {
            return s.user.equals("小米");
        }
    }
}
flatmap
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class flatmapTransformation {


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Event> source = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小红", "0002", 200000L)
        );

        //1.实现一个类
        source.flatMap(new myflatMap()).print("1");
        //2. 匿名类
        //3. 注意使用lambda类型擦除的问题
        source.flatMap((Event event ,Collector<String> collector) -> {
                  if (event.user.equals("小米")){
                      collector.collect(event.user);
                      collector.collect(event.url);
                      collector.collect(event.timestamp.toString());
                  }
                  else if (event.user.equals("小红")){
                         collector.collect("空");
                  }

        }).returns(TypeInformation.of(String.class)).print("2");
         //returns(new TypeHint<String>() {}).print("2");

        env.execute();
    }

    public static class myflatMap implements FlatMapFunction<Event , String>{


        @Override
        public void flatMap(Event event, Collector<String> collector) throws Exception {
              collector.collect(event.user);
              collector.collect(event.url);
              collector.collect(event.timestamp.toString());
        }
    }
}
keyBy

DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的。

max&maxBy

区别max返回最大值 maxBy 把最大值对应的元素全部返回

public class keyByAggregation {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<Event> source = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小米", "0002", 200000L),
                new Event("小米", "0003", 150000L),
                new Event("小红", "0001", 200000L)
        );

        source.keyBy(new KeySelector<Event, String>() {
            @Override
            public String getKey(Event event) throws Exception {
                return event.user;
            }
        }).max("timestamp").print("max");

       source.keyBy(data -> data.user).maxBy("timestamp").print("maxBy");


        env.execute();
    }
}
reduce
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class reduceAggregation {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> source = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小米", "0002", 200000L),
                new Event("小米", "0003", 150000L),
                new Event("小红", "0001", 200000L)
        );
        // 1.次数统计
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = source.map(data -> Tuple2.of(data.user, 1)).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(data -> data.f0).reduce((data1, data2) -> Tuple2.of(data1.f0, data1.f1 + data2.f1));

        reduce.print();


        //2.选取当前最活跃的用户
        reduce.keyBy(data -> "key").reduce((data1,data2) -> data1.f1 > data2.f1 ? data1 : data2 ).print();

        env.execute();
    }
}
用户自定义UDF

函数类(Function Classes
匿名函数(Lambda Functions

富函数(Rich Functions)

“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能

Rich Function 有一个生命周期的概念。典型的生命周期方法有:

  1. open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter被调用之前 open()会被调用。
  2. close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
  3. getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态
public class richMap {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);

            DataStreamSource<Event> source = env.fromElements(
                    new Event("小米", "0001", 100000L),
                    new Event("小米", "0002", 200000L),
                    new Event("小米", "0003", 150000L),
                    new Event("小红", "0001", 200000L)
            );
            source.map(new myRichMap()).print();

            env.execute();
        }

        public static  class  myRichMap extends RichMapFunction<Event,Tuple2<String,Integer>>{
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                System.out.println("open" + getRuntimeContext().getTaskNameWithSubtasks());
            }

            @Override
            public Tuple2<String, Integer> map(Event event) throws Exception {
                return Tuple2.of(event.user,1);
            }

            @Override
            public void close() throws Exception {
                super.close();
                System.out.println("close" + getRuntimeContext().getTaskName());
            }
        }
    }

相关文章

  • 2022-04-20-Flink-45(四)

    1. Transformation map filter flatmap keyBy DataStream → K...

  • 四郎四郎傅四郎(四)

    上一章-南城(三) 第二章•无头女婴(一) 聚园柳庄内一私人别墅。 “四个,这是刚刚丁探长让人整理的和9...

  • 四(四)资源

    资源在MQL4程序中使用图形和声音 MQL4中的程序允许处理声音和图形文件:PlaySound() // 播放声...

  • 小 四 (四)

    这一次见面过后,又好多年没有见过小四。但是她三十岁那年,听到家里人又谈起了小四。 小四前两年又生了个女儿。由于她生...

  • 四幺四

    四月十四日,我想起了大学宿舍里面的扑克游戏四幺四,当时风靡整个后楼二十二系势力范围,一时“不会四幺四,人缘肯定次”...

  • 四苦 四醒 四行 四喜 四悲 四得(经典!)

    人生四苦 一苦:看不透 看不透人际中的纠结,争斗后的隐伤 看不透喧嚣中的平淡,繁华后的宁静 二苦:舍不得 舍不得曾...

  • 四人 · 四城 · 四时 · 四地

    四人 · 四城 · 四时 · 四地 由于工作和生活的原因,四个闺密的开始四地分隔生活,不一样的生活环境,不尽相同的...

  • 四郎四郎傅四郎

    契子 你有没有试过 掏心掏肺的爱一个人 嫉妒他对另一个人的好 却在他厌恶自己的那一刻 才发现 原来自己...

  • 四懂四会四能力

    四懂 1.懂本岗位的火灾危害性 2.懂预防措施 3.懂扑救火灾的方法 4.懂疏散 四会 1.会报警 2.会使用消防...

  • 四套四

    一、种子四大定律 我们是融于自然界的,不然没有大自然怎会有我们?故自然界的法则完全可以存在于我们的意识中。日常我们...

网友评论

      本文标题:2022-04-20-Flink-45(四)

      本文链接:https://www.haomeiwen.com/subject/myuksrtx.html