Flink: Incrementally Reading a File to Simulate a Real-Time Data Stream

Author: Jorvi | Published 2020-01-06 10:37

    Code

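    // SourceFromFile.java
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

    /** Emits the lines of a local file, then keeps polling it for lines appended later. */
    public class SourceFromFile extends RichSourceFunction<String> {
        // primitive volatile flag: cancel() is called from a different thread than run()
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // try-with-resources closes the file when the source stops
            try (BufferedReader bufferedReader = new BufferedReader(new FileReader("E:\\documents\\test.txt"))) {
                while (isRunning) {
                    String line = bufferedReader.readLine();
                    if (line == null) {
                        // end of file for now: wait before retrying instead of busy-spinning
                        TimeUnit.SECONDS.sleep(1);
                        continue;
                    }
                    if (line.trim().isEmpty()) {
                        continue; // skip blank lines
                    }
                    ctx.collect(line);
                    // throttle emission so the file content arrives like a live stream
                    TimeUnit.SECONDS.sleep(10);
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }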
    
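    // WindowWordCount.java
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    public class WindowWordCount {
        public static void main(String[] args) throws Exception {

            final ParameterTool parameters = ParameterTool.fromArgs(args);
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setGlobalJobParameters(parameters);

            // checkpoint every 5 s; note SourceFromFile keeps no state, so after a restart it rereads the file from the start
            env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);

            // parallelism 1: a single reader for the single file
            DataStreamSource<String> dataStream = env.addSource(new SourceFromFile()).setParallelism(1);

            dataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // split a comma-separated line and emit (word, 1) for each word
                        for (String item : value.split(",")) {
                            out.collect(new Tuple2<>(item.trim(), 1));
                        }
                    }
                })
                // key by the word (field f0)
                .keyBy(t -> t.f0)
                // one-minute tumbling windows on processing time
                .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                // sum the counts (field 1) per word per window
                .sum(1)
                .print();

            env.execute("WindowWordCount");
        }
    }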
    
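
SourceFromFile works incrementally because of a plain java.io property: after BufferedReader.readLine() has returned null at end of file, a later call on the same reader still returns lines appended to the file in the meantime, since the underlying file stream keeps its position. The sketch below shows this without Flink. TailDemo, append and drain are hypothetical names, a temporary file stands in for E:\documents\test.txt, and it assumes the file is appended to rather than replaced by an editor that rewrites it.

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public class TailDemo {
    // Appends one complete line (with trailing newline) to the end of the file.
    static void append(Path file, String line) throws IOException {
        Files.write(file, (line + "\n").getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
    }

    // Reads every line currently available; stops at the first null (EOF for now).
    static List<String> drain(BufferedReader reader) throws IOException {
        List<String> lines = new ArrayList<>();
        String line;
        while ((line = reader.readLine()) != null) {
            lines.add(line);
        }
        return lines;
    }

    public static List<List<String>> run() throws IOException {
        Path file = Files.createTempFile("test", ".txt"); // hypothetical stand-in for the real input file
        List<List<String>> batches = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(file.toFile()))) {
            append(file, "aaa,bbb,ccc");
            batches.add(drain(reader)); // reads the first line, then hits EOF
            append(file, "bbb,ccc,ddd");
            batches.add(drain(reader)); // same reader after EOF: only the newly appended line
            batches.add(drain(reader)); // nothing appended since: empty batch
        } finally {
            // the reader is already closed here, so deletion also works on Windows
            Files.deleteIfExists(file);
        }
        return batches;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(run());
    }
}
```

Running main prints `[[aaa,bbb,ccc], [bbb,ccc,ddd], []]`: the second batch contains only the new line, which is exactly what the source's polling loop relies on.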

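    Test

    Input written to the file:
    aaa,bbb,ccc

    Output (the "n>" prefix is the number of the parallel print subtask that emitted the record):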
    2> (bbb,1)
    3> (aaa,1)
    4> (ccc,1)


    Appended input:
    bbb,ccc,ddd

    Output:
    4> (ccc,1)
    2> (bbb,1)
    4> (ddd,1)
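
Here bbb and ccc are counted as 1 again, not 2. Each window result covers only the records whose processing time falls inside that one-minute window, so the second line must have landed in a later window than the first. The plain-Java sketch below (no Flink dependency) mimics that assignment: a record with timestamp ts goes to the window starting at ts - ts % 60000, which is how Flink computes a tumbling window's start for a zero offset and non-negative timestamps, and words are then summed per window, like keyBy plus sum(1). TumblingWindowDemo, count and the arrival times are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowDemo {
    static final long WINDOW_MS = 60_000L; // one-minute tumbling window

    // Start of the tumbling window containing ts (zero offset, ts >= 0).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_MS);
    }

    // Counts words per (window start, word); TreeMaps keep the printed output ordered.
    public static Map<Long, Map<String, Integer>> count(long[] arrivalMs, String[] lines) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (int i = 0; i < lines.length; i++) {
            Map<String, Integer> window = result.computeIfAbsent(windowStart(arrivalMs[i]), k -> new TreeMap<>());
            for (String word : lines[i].split(",")) {
                window.merge(word.trim(), 1, Integer::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical processing times: the two lines arrive 70 s apart.
        long[] arrival = {5_000L, 75_000L};
        String[] lines = {"aaa,bbb,ccc", "bbb,ccc,ddd"};
        System.out.println(count(arrival, lines));
    }
}
```

main prints `{0={aaa=1, bbb=1, ccc=1}, 60000={bbb=1, ccc=1, ddd=1}}`, matching the two separate result groups. With both arrival times inside the same minute (for example 5 000 and 30 000 ms), the same call would instead return a single window with aaa=1, bbb=2, ccc=2, ddd=1.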


    Appended input:
    aaa,aaa,aaa

    Output:
    3> (aaa,3)
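
Instead of editing the file by hand, the three test lines can be appended from a small program while the job runs. This is a hypothetical helper (AppendTestLines, its optional path argument and the 70-second pause are assumptions, not part of the original test); the pause is chosen longer than the one-minute window so that each line lands in its own window, as in the results above. Appending, rather than letting an editor rewrite the file, is the safest way to feed SourceFromFile, since it keeps reading from its current position in the file it opened.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class AppendTestLines {
    static final List<String> TEST_LINES = Arrays.asList("aaa,bbb,ccc", "bbb,ccc,ddd", "aaa,aaa,aaa");

    // Appends each line plus "\n", pausing between lines; creates the file if missing.
    // Assumes the existing file is empty or already ends with a newline.
    public static void appendAll(Path file, List<String> lines, long pauseMillis)
            throws IOException, InterruptedException {
        for (int i = 0; i < lines.size(); i++) {
            if (i > 0) {
                TimeUnit.MILLISECONDS.sleep(pauseMillis);
            }
            Files.write(file, (lines.get(i) + "\n").getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        }
    }

    public static void main(String[] args) throws Exception {
        // defaults to the path hard-coded in SourceFromFile; pass another path as the first argument
        Path file = Paths.get(args.length > 0 ? args[0] : "E:\\documents\\test.txt");
        appendAll(file, TEST_LINES, 70_000L); // > 1 minute, so each line falls in its own window
    }
}
```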

Article link: https://www.haomeiwen.com/subject/oyihactx.html