美文网首页
Flink常用的DataSet和DataStream API

Flink常用的DataSet和DataStream API

作者: 一生逍遥一生 | 来源:发表于2021-01-04 15:13 被阅读0次

    DataSet和DataStream的区别和联系

    DataSet部分来源于文件、表或者Java集合;DataStream的Source部分则一般是消息中间件。

    API介绍

    @Data
    @Getter
    @Setter
    @ToString
    public class Item implements Serializable {
        private String name;
        private Integer id;
    }
    
    public class CustomDataSource implements SourceFunction<Item> {
        private boolean isRunning = true;
    
        @Override
        public void run(SourceContext<Item> ctx) throws Exception {
            while (isRunning) {
                Item item = generateItem();
                ctx.collect(item);
                Thread.sleep(1000);
            }
        }
    
        @Override
        public void cancel() {
            isRunning = false;
        }
    
        private Item generateItem() {
            int i = new Random().nextInt(100);
            Item item = new Item();
            item.setName("name " + i);
            item.setId(i);
            return item;
        }
    }
    
    public class StreamingDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //获取数据源
            /**
             *         DataStreamSource<Item> text = env.addSource(new CustomDataSource()).setParallelism(1);
             *         DataStream<Item> item = text.map((MapFunction<Item, Item>) value -> value);
             *         item.print().setParallelism(1);
             *         String jobName = "user define streaming source";
             *         env.execute(jobName);
             */
            // Map方式:Map接收一个元素作为输入,
            /**
             *         DataStreamSource<Item> items = env.addSource(new CustomDataSource()).setParallelism(1);
             *         SingleOutputStreamOperator<Object> mapItems = items.map((MapFunction<Item, Object>) item -> item.getName());
             *         mapItems.print().setParallelism(1);
             *         String jobName = "user define streaming source";
             *         env.execute(jobName);
             */
            //FlatMap:FlatMap接收一个元素,返回零到多个元素。
            //Filter:过滤不需要的数据
            //KeyedStream:根据某个或者某种属性进行分组,进行不同的处理
            //KeyBy: 对数据进行分组,数据添加随机数,防止数据倾斜
            //Aggregations:聚合函数。
            //min与minBy的区别:min会返回限定字段的最小值,minBy会返回对应的元素
            List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
            data.add(new Tuple3<>(0,1,0));
            data.add(new Tuple3<>(0,1,1));
            data.add(new Tuple3<>(0,2,2));
            data.add(new Tuple3<>(0,1,3));
            data.add(new Tuple3<>(1,2,5));
            data.add(new Tuple3<>(1,2,9));
            data.add(new Tuple3<>(1,2,11));
            data.add(new Tuple3<>(1,2,13));
            DataStreamSource<Item> items = env.fromCollection(data);
            items.keyBy(0).max(2).printToErr();
            env.execute();
            //Reduce会在每一个分组的KeyedStream上生效,按照用户自定义的聚合逻辑进行分组聚合。
        }
    }
    

    相关文章

      网友评论

          本文标题:Flink常用的DataSet和DataStream API

          本文链接:https://www.haomeiwen.com/subject/ulpsoktx.html