美文网首页
Flink之Iterate

Flink之Iterate

作者: 万州客 | 来源:发表于2022-04-11 13:58 被阅读0次

今天已经记录超过三篇了

代码:

package com.intsmaze.flink.streaming.helloworld;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SplitStream;

import org.apache.flink.api.common.functions.MapFunction;


import java.util.ArrayList;
import java.util.List;

public class WorldCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        List<Tuple2<String, Integer>> list = new ArrayList();
        list.add(new Tuple2("flink",33));
        list.add(new Tuple2("storm",32));
        list.add(new Tuple2("spark",15));
        list.add(new Tuple2("java",18));
        list.add(new Tuple2("python",31));
        list.add(new Tuple2("scala",29));

        DataStream<Tuple2<String, Integer>> inputStream = env.fromCollection(list);
        IterativeStream<Tuple2<String, Integer>> it = inputStream.iterate(5000);

        SplitStream<Tuple2<String, Integer>> split = it.map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                Thread.sleep(1000);
                System.out.println("在迭代流中调用逻辑处理方法, 参数为: " + value);
                return new Tuple2(value.f0, --value.f1);
            }
        }).split(new OutputSelector<Tuple2<String, Integer>>() {
            public Iterable<String> select(Tuple2<String, Integer> value) {
                List<String> output = new ArrayList();
                if (value.f1 > 30) {
                    System.out.println("返回迭代数据: " + value);
                    output.add("iterate");
                } else {
                    output.add("output");
                }
                return output;
            }
        });
        it.closeWith(split.select("iterate"));
        split.select("output").print("输出结果: ");


        env.execute("Iterate Template.");
    }
}

输出:

D:\jdk1.8.0_271\bin\java.exe "-javaagent:D:\JetBrains\IntelliJ IDEA Community Edition 2020.2.3\lib\idea_rt.jar=57758:D:\JetBrains\IntelliJ IDEA Community Edition 2020.2.3\bin" -Dfile.encoding=UTF-8 -classpath D:\jdk1.8.0_271\jre\lib\charsets.jar;D:\jdk1.8.0_271\jre\lib\deploy.jar;D:\jdk1.8.0_271\jre\lib\ext\access-bridge-64.jar;D:\jdk1.8.0_271\jre\lib\ext\cldrdata.jar;D:\jdk1.8.0_271\jre\lib\ext\dnsns.jar;D:\jdk1.8.0_271\jre\lib\ext\jaccess.jar;D:\jdk1.8.0_271\jre\lib\ext\jfxrt.jar;D:\jdk1.8.0_271\jre\lib\ext\localedata.jar;D:\jdk1.8.0_271\jre\lib\ext\nashorn.jar;D:\jdk1.8.0_271\jre\lib\ext\sunec.jar;D:\jdk1.8.0_271\jre\lib\ext\sunjce_provider.jar;D:\jdk1.8.0_271\jre\lib\ext\sunmscapi.jar;D:\jdk1.8.0_271\jre\lib\ext\sunpkcs11.jar;D:\jdk1.8.0_271\jre\lib\ext\zipfs.jar;D:\jdk1.8.0_271\jre\lib\javaws.jar;D:\jdk1.8.0_271\jre\lib\jce.jar;D:\jdk1.8.0_271\jre\lib\jfr.jar;D:\jdk1.8.0_271\jre\lib\jfxswt.jar;D:\jdk1.8.0_271\jre\lib\jsse.jar;D:\jdk1.8.0_271\jre\lib\management-agent.jar;D:\jdk1.8.0_271\jre\lib\plugin.jar;D:\jdk1.8.0_271\jre\lib\resources.jar;D:\jdk1.8.0_271\jre\lib\rt.jar;D:\Code\helloworld\target\classes;C:\Users\ccc\.m2\repository\org\apache\flink\flink-java\1.9.2\flink-java-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-core\1.9.2\flink-core-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-annotations\1.9.2\flink-annotations-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-metrics-core\1.9.2\flink-metrics-core-1.9.2.jar;C:\Users\ccc\.m2\repository\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;C:\Users\ccc\.m2\repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;C:\Users\ccc\.m2\repository\org\objenesis\objenesis\2.1\objenesis-2.1.jar;C:\Users\ccc\.m2\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;C:\Users\ccc\.m2\repository\org\apache\commons\commons-compress\1.18\commons-compress-1.18.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-shaded-asm-6\6.2.1-7.0
\flink-shaded-asm-6-6.2.1-7.0.jar;C:\Users\ccc\.m2\repository\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;C:\Users\ccc\.m2\repository\org\apache\commons\commons-math3\3.5\commons-math3-3.5.jar;C:\Users\ccc\.m2\repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;C:\Users\ccc\.m2\repository\org\apache\flink\force-shading\1.9.2\force-shading-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-streaming-java_2.12\1.9.2\flink-streaming-java_2.12-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-runtime_2.12\1.9.2\flink-runtime_2.12-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-queryable-state-client-java\1.9.2\flink-queryable-state-client-java-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-hadoop-fs\1.9.2\flink-hadoop-fs-1.9.2.jar;C:\Users\ccc\.m2\repository\commons-io\commons-io\2.4\commons-io-2.4.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-shaded-netty\4.1.32.Final-7.0\flink-shaded-netty-4.1.32.Final-7.0.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-shaded-jackson\2.10.1-9.0\flink-shaded-jackson-2.10.1-9.0.jar;C:\Users\ccc\.m2\repository\commons-cli\commons-cli\1.3.1\commons-cli-1.3.1.jar;C:\Users\ccc\.m2\repository\org\javassist\javassist\3.19.0-GA\javassist-3.19.0-GA.jar;C:\Users\ccc\.m2\repository\org\scala-lang\scala-library\2.12.7\scala-library-2.12.7.jar;C:\Users\ccc\.m2\repository\com\typesafe\akka\akka-actor_2.12\2.5.21\akka-actor_2.12-2.5.21.jar;C:\Users\ccc\.m2\repository\com\typesafe\config\1.3.3\config-1.3.3.jar;C:\Users\ccc\.m2\repository\org\scala-lang\modules\scala-java8-compat_2.12\0.8.0\scala-java8-compat_2.12-0.8.0.jar;C:\Users\ccc\.m2\repository\com\typesafe\akka\akka-stream_2.12\2.5.21\akka-stream_2.12-2.5.21.jar;C:\Users\ccc\.m2\repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;C:\Users\ccc\.m2\repository\com\typesafe\ssl-config-core_2.12\0.3.7\ssl-config-core_2.12-0.3.7.jar;C:\Users\ccc\.m2\repository\org\scala-lang\
modules\scala-parser-combinators_2.12\1.1.1\scala-parser-combinators_2.12-1.1.1.jar;C:\Users\ccc\.m2\repository\com\typesafe\akka\akka-protobuf_2.12\2.5.21\akka-protobuf_2.12-2.5.21.jar;C:\Users\ccc\.m2\repository\com\typesafe\akka\akka-slf4j_2.12\2.5.21\akka-slf4j_2.12-2.5.21.jar;C:\Users\ccc\.m2\repository\org\clapper\grizzled-slf4j_2.12\1.3.2\grizzled-slf4j_2.12-1.3.2.jar;C:\Users\ccc\.m2\repository\com\github\scopt\scopt_2.12\3.5.0\scopt_2.12-3.5.0.jar;C:\Users\ccc\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\ccc\.m2\repository\com\twitter\chill_2.12\0.7.6\chill_2.12-0.7.6.jar;C:\Users\ccc\.m2\repository\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-clients_2.12\1.9.2\flink-clients_2.12-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-optimizer_2.12\1.9.2\flink-optimizer_2.12-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-shaded-guava\18.0-7.0\flink-shaded-guava-18.0-7.0.jar;C:\Users\ccc\.m2\repository\org\slf4j\slf4j-api\1.7.21\slf4j-api-1.7.21.jar;C:\Users\ccc\.m2\repository\org\slf4j\slf4j-log4j12\1.7.21\slf4j-log4j12-1.7.21.jar;C:\Users\ccc\.m2\repository\log4j\log4j\1.2.17\log4j-1.2.17.jar com.intsmaze.flink.streaming.helloworld.WorldCount
在迭代流中调用逻辑处理方法, 参数为: (flink,33)
返回迭代数据: (flink,32)
在迭代流中调用逻辑处理方法, 参数为: (storm,32)
返回迭代数据: (storm,31)
在迭代流中调用逻辑处理方法, 参数为: (spark,15)
输出结果: > (spark,14)
在迭代流中调用逻辑处理方法, 参数为: (java,18)
输出结果: > (java,17)
在迭代流中调用逻辑处理方法, 参数为: (python,31)
输出结果: > (python,30)
在迭代流中调用逻辑处理方法, 参数为: (scala,29)
输出结果: > (scala,28)
在迭代流中调用逻辑处理方法, 参数为: (flink,32)
返回迭代数据: (flink,31)
在迭代流中调用逻辑处理方法, 参数为: (storm,31)
输出结果: > (storm,30)

Process finished with exit code 0

相关文章

网友评论

      本文标题:Flink之Iterate

      本文链接:https://www.haomeiwen.com/subject/vvthvltx.html