2022-03-22-Flink-43 (Part 2)

Author: 冰菓_ | Published: 2022-03-22 15:58
Environment setup (Maven pom.xml properties and dependencies)
    <properties>
        <maven.compiler.source>14</maven.compiler.source>
        <maven.compiler.target>14</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
Batch processing: a word count example with the DataSet API

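The DataSet API is being phased out; stream processing is the core of Flink. To run a batch job on the streaming (DataStream) API instead, pass one extra parameter at submission time that selects the BATCH execution mode: -Dexecution.runtime-mode=BATCH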
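Besides the submission parameter, Flink 1.12+ also lets the mode be set in code with `env.setRuntimeMode(RuntimeExecutionMode.BATCH)`. For this word count, the visible difference is in the output: a rolling `keyBy(...).sum(1)` emits an updated count for every record in STREAMING mode, but only the final count per key in BATCH mode. The plain-Java sketch below has no Flink dependency (the class and method names are made up for illustration) and only simulates that difference on a fixed list of words.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink): how keyBy(word).sum(1) reports counts
// in STREAMING vs BATCH runtime mode. Names are illustrative only.
final class RuntimeModeDemo {

    // STREAMING: each record updates the per-key state and an updated
    // (word,count) pair is emitted immediately.
    static List<String> streaming(List<String> words) {
        Map<String, Integer> state = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (String w : words) {
            int count = state.merge(w, 1, Integer::sum);
            out.add("(" + w + "," + count + ")");
        }
        return out;
    }

    // BATCH: the input is bounded, so only the final count per key is emitted.
    // Flink does not guarantee key order; this sketch uses first-seen order.
    static List<String> batch(List<String> words) {
        Map<String, Integer> state = new LinkedHashMap<>();
        for (String w : words) {
            state.merge(w, 1, Integer::sum);
        }
        List<String> out = new ArrayList<>();
        state.forEach((w, c) -> out.add("(" + w + "," + c + ")"));
        return out;
    }

    public static void main(String[] args) {
        List<String> words = List.of("hello", "flink", "hello", "world", "hello");
        System.out.println("STREAMING: " + streaming(words));
        System.out.println("BATCH:     " + batch(words));
    }
}
```

With this input, the STREAMING list contains five rolling updates ending in `(hello,3)`, while the BATCH list contains just the three final counts.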

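import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {

    public static void main(String[] args) throws Exception {
        // Create the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the input file line by line
        DataSource<String> source = env.readTextFile("src/main/resources/a.txt");
        // Split each line on whitespace and emit (word, 1)
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = source.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String word : line.split("\\s+")) {
                out.collect(Tuple2.of(word, 1));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT)); // lambda generics are erased, so declare the output type

        // Group by the word (field 0) and sum the counts (field 1);
        // in the DataSet API, print() also triggers job execution
        wordAndOne.groupBy(0).sum(1).print();
    }
}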
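Why the `.returns(Types.TUPLE(Types.STRING, Types.INT))` call is needed: Java erases the generic type arguments of a lambda, so Flink cannot tell from the lambda alone that the `Collector` receives `Tuple2<String, Integer>`. The stdlib-only sketch below uses a hypothetical `Splitter` interface as a stand-in for Flink's `FlatMapFunction` and checks, via reflection, that an anonymous class keeps its generic interface signature at runtime while a lambda does not.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Consumer;

// Hypothetical stand-in for Flink's FlatMapFunction<IN, OUT>;
// a Consumer plays the role of the Collector.
interface Splitter<IN, OUT> {
    void flatMap(IN value, Consumer<OUT> out);
}

final class ErasureDemo {

    // True if the runtime class still records the interface's type arguments.
    static boolean keepsTypeArguments(Object fn) {
        Type iface = fn.getClass().getGenericInterfaces()[0];
        return iface instanceof ParameterizedType;
    }

    // Anonymous class: javac records Splitter<String, String> in the class file.
    static Splitter<String, String> anonymousSplitter() {
        return new Splitter<String, String>() {
            @Override
            public void flatMap(String line, Consumer<String> out) {
                for (String word : line.split("\\s+")) {
                    out.accept(word);
                }
            }
        };
    }

    // Lambda: the generated class only implements the raw Splitter type.
    static Splitter<String, String> lambdaSplitter() {
        return (line, out) -> {
            for (String word : line.split("\\s+")) {
                out.accept(word);
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("anonymous class keeps types: " + keepsTypeArguments(anonymousSplitter())); // true
        System.out.println("lambda keeps types:          " + keepsTypeArguments(lambdaSplitter()));    // false
    }
}
```

This is the reason the flatMap lambdas in this post end with `.returns(...)`; an anonymous `FlatMapFunction` class would normally let Flink infer the output type without it.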
Stream processing: bounded stream
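import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BoundedWordCount {

    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A file is a bounded source: the job finishes once the file has been read
        DataStreamSource<String> streamSource = env.readTextFile("src/main/resources/a.txt");
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = streamSource.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String word : line.split("\\s+")) {
                out.collect(Tuple2.of(word, 1));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        // keyBy(0) (key by field position) is deprecated; key by the word with a KeySelector
        wordAndOne.keyBy(data -> data.f0).sum(1).print();

        // Unlike the DataSet API, a DataStream job only runs when execute() is called
        env.execute();
    }
}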
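One caveat shared by all the flatMap functions here: `line.split("\\s+")` returns a leading empty string when a line starts with whitespace, and an empty line yields a single empty string, so blank input gets counted as the "word" `""`. A possible fix is to trim the line and skip empty tokens. The plain-Java sketch below (the helper names `naive` and `tokenize` are hypothetical) compares the two behaviours.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TokenizeDemo {

    // What the original flatMap bodies do.
    static List<String> naive(String line) {
        return Arrays.asList(line.split("\\s+"));
    }

    // Hypothetical helper: trim first and drop empty tokens, so leading
    // whitespace and blank lines produce no words.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String w : line.trim().split("\\s+")) {
            if (!w.isEmpty()) {
                words.add(w);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(naive("  hello flink"));    // [, hello, flink]
        System.out.println(tokenize("  hello flink")); // [hello, flink]
        System.out.println(tokenize(""));              // []
    }
}
```

Inside the Flink lambda, the same idea would be to loop over `line.trim().split("\\s+")` and call `out.collect(Tuple2.of(word, 1))` only for non-empty words.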
Stream processing: unbounded stream

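To try this out locally, install nc (netcat; version 1.11 was used here) and start listening on the port before launching the job, e.g. nc -lk 6666 with OpenBSD/macOS netcat, or nc -l -p 6666 with traditional netcat such as the Windows 1.11 port. Each line typed into nc is then sent to the job.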
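If netcat is not available, a small Java program can play the same role: listen on a port, accept one client (the Flink socket source) and forward lines to it, each terminated by `\n`, which is the default delimiter of `socketTextStream(host, port)`. This is a sketch under simplifying assumptions, not part of Flink: the class name `LineServer` is hypothetical, and it serves a single connection, then closes it when input ends.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for "nc -lk 6666": serves one client only.
final class LineServer {

    // Accepts one client and writes each line followed by '\n', then closes the connection.
    static void serveOnce(ServerSocket server, Iterable<String> lines) throws IOException {
        try (Socket client = server.accept();
             Writer out = new BufferedWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                out.write(line);
                out.write('\n');
                out.flush(); // push each line immediately, like typing into nc
            }
        }
    }

    // Usage: java LineServer 6666, then type lines; end standard input to stop.
    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 6666;
        BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
        try (ServerSocket server = new ServerSocket(port)) {
            System.out.println("Listening on port " + port);
            // BufferedReader.lines() is lazy, so each typed line is forwarded as soon as Enter is pressed
            Iterable<String> typed = () -> stdin.lines().iterator();
            serveOnce(server, typed);
        }
    }
}
```

Note that once this server closes the connection, the Flink socket source sees the end of the stream, so the job stops as well.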

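import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class UnboundedWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read text lines from a socket; start nc on this port before the job
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 6666);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne =
                streamSource.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.INT));

        // Running count per word, printed every time the word arrives again
        wordAndOne.keyBy(data -> data.f0).sum(1).print();

        // The socket source is unbounded: the job runs until it is cancelled or the socket closes
        env.execute();
    }
}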

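Reading parameters from args
Use ParameterTool.fromArgs(args), provided by Flink, to parse the program arguments.
Note that keys and values are separated by a space, not a colon: --host localhost --port 6666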
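`ParameterTool.fromArgs` expects keys starting with `-` or `--`, each followed by its value. When an argument may be missing, the real API also offers defaults, e.g. `fromArgs.get("host", "localhost")` and `fromArgs.getInt("port", 6666)`. To make the pairing convention concrete without a Flink dependency, here is a deliberately simplified, hypothetical parser; `SimpleArgs` is not Flink code and skips ParameterTool's validation and edge cases (for instance, values starting with `-` are not supported).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified illustration of the "--key value" convention.
final class SimpleArgs {
    private final Map<String, String> values = new HashMap<>();

    SimpleArgs(String[] args) {
        for (int i = 0; i < args.length; i++) {
            String a = args[i];
            if (!a.startsWith("-")) {
                throw new IllegalArgumentException("Expected a key, got: " + a);
            }
            String key = a.startsWith("--") ? a.substring(2) : a.substring(1);
            // A key followed by another key (or by nothing) gets no value.
            boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("-");
            values.put(key, hasValue ? args[++i] : null);
        }
    }

    String get(String key, String defaultValue) {
        String v = values.get(key);
        return v != null ? v : defaultValue;
    }

    int getInt(String key, int defaultValue) {
        String v = values.get(key);
        return v != null ? Integer.parseInt(v) : defaultValue;
    }

    public static void main(String[] args) {
        SimpleArgs p = new SimpleArgs(new String[] {"--host", "localhost", "--port", "6666"});
        System.out.println(p.get("host", "?") + ":" + p.getInt("port", -1)); // localhost:6666
        // With a colon, the whole token becomes one key, so "host" is not found.
        SimpleArgs colon = new SimpleArgs(new String[] {"--host:localhost"});
        System.out.println(colon.get("host", "<missing>"));                  // <missing>
    }
}
```

The second case shows why the colon form does not work in this simplified model: `--host:localhost` is read as a single key named `host:localhost` with no value.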

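import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class UnboundedWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Parse program arguments such as: --host localhost --port 6666
        ParameterTool params = ParameterTool.fromArgs(args);
        String host = params.get("host");
        int port = params.getInt("port");
        DataStreamSource<String> streamSource = env.socketTextStream(host, port);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne =
                streamSource.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.INT));

        // Running count per word, keyed with a KeySelector instead of the deprecated keyBy(0)
        wordAndOne.keyBy(data -> data.f0).sum(1).print();

        env.execute();
    }
}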

Article link: https://www.haomeiwen.com/subject/mxbmdrtx.html