美文网首页
2021-08-27 flink测试小地方留意记录

2021-08-27 flink测试小地方留意记录

作者: 无来无去_A | 来源:发表于2021-08-27 16:02 被阅读0次

    flink socketTextStream window nc

    测试工程POM.xml

      <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.10.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.10.1</version>
            </dependency>
    

    在windows cmd使用如下命令准备连接 9000端口发送数据

    nc -lp 9000 -v
    

    flink demo

    package com.keke.test0827;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    public class StreamWordCount2 {
        public static void main(String[] args) throws Exception{
            // 创建流处理执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //        env.setParallelism(1);
    //        env.disableOperatorChaining();
    
    //        // 从文件中读取数据
    //        String inputPath = "hello.txt";
    //        DataStream<String> inputDataStream = env.readTextFile(inputPath);
    
            // 从socket文本流读取数据
            DataStream<String> inputDataStream = env.socketTextStream("127.0.0.1", 9000);
    
    //        DataStreamSource<String> inputDataStream = env.socketTextStream("127.0.0.1", 9000);
    
            // 基于数据流进行转换计算
            DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new MyFlatMapper())
                    .keyBy(0)
     //               .sum(1).setParallelism(2).slotSharingGroup("red");     
    //                .sum(1);
    
            resultStream.print();
    
            // 执行任务
            env.execute();
        }
    
        // 自定义类,实现FlatMapFunction接口
        public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                // 按空格分词
                String[] words = value.split(" ");
                // 遍历所有word,包成二元组输出
                for (String word : words) {
                    out.collect(new Tuple2<>(word, 1));
                }
    
    
            }
        }
    }
    
    

    如上例子:

    1 .sum(1).setParallelism(2).slotSharingGroup("red");

    2 .sum(1);

    快速,因为只启动flink案例后如果连接不到9000端口(没有先 执行cmd nc -lp 9000 -v 命令)程序将很快终止 使用 windows cmd中命令 netstat -ano|findstr "9000" 查看 9000端口的启动情况
    如果选择1 位置代码, 无结果显示。

    如果选择2 位置代码, 结果显示:
    TCP 0.0.0.0:9000 0.0.0.0:0 LISTENING 4612

    对于以上案例 setParallelism(2).slotSharingGroup("red") 此段代码加上以后 nc -lp 9000 -v 无法连接端口发送数据测试纪念

    相关文章

      网友评论

          本文标题:2021-08-27 flink测试小地方留意记录

          本文链接:https://www.haomeiwen.com/subject/eipmiltx.html