美文网首页
Flink之Connect和CoMap

Flink之Connect和CoMap

作者: 万州客 | 来源:发表于2022-04-11 13:59 被阅读0次

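    connect 和 CoMap(CoMapFunction)一般成对使用:先用 connect 把两条元素类型可以不同的流连接成 ConnectedStreams,再用 CoMapFunction 的 map1、map2 分别处理第一条流和第二条流的元素,并输出为同一种类型的流。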

    代码:

    package com.intsmaze.flink.streaming.helloworld;
    
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import org.apache.flink.streaming.api.functions.co.CoMapFunction;
    
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class WorldCount {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            List<Long> listLong = new ArrayList<Long>();
            listLong.add(1L);
            listLong.add(2L);
            List<String> listStr = new ArrayList<String>();
            listStr.add("www cnblogs com sky");
            listStr.add("hello sky");
            listStr.add("hello flink");
            listStr.add("hello java");
            DataStream<Long> longDataStream = env.fromCollection(listLong);
            DataStream<String> stringDataStream = env.fromCollection(listStr);
    
            ConnectedStreams<Long, String> connectedStreams = longDataStream.connect(stringDataStream);
    
            DataStream<String> connectedMap = connectedStreams.map(new CoMapFunction<Long, String, String>() {
                public String map1(Long aLong) throws Exception {
                    return "数据来自元素类型为Long的流: " + aLong;
                }
    
                public String map2(String s) throws Exception {
                    return "数据来自元素类型为String的流: " + s;
                }
            });
    
            connectedMap.print("CoMapFunction输出结果: ");
    
            env.execute("CoMapFunction Template.");
        }
    }
    

    输出(作业并行执行,各行的先后顺序每次运行可能不同;前缀中的 “:8>” 等数字是输出该行的并行子任务编号):

    D:\jdk1.8.0_271\bin\java.exe "-javaagent:D:\JetBrains\IntelliJ IDEA Community Edition 2020.2.3\lib\idea_rt.jar=64082:D:\JetBrains\IntelliJ IDEA Community Edition 2020.2.3\bin" -Dfile.encoding=UTF-8 -classpath D:\jdk1.8.0_271\jre\lib\charsets.jar;D:\jdk1.8.0_271\jre\lib\deploy.jar;D:\jdk1.8.0_271\jre\lib\ext\access-bridge-64.jar;D:\jdk1.8.0_271\jre\lib\ext\cldrdata.jar;D:\jdk1.8.0_271\jre\lib\ext\dnsns.jar;D:\jdk1.8.0_271\jre\lib\ext\jaccess.jar;D:\jdk1.8.0_271\jre\lib\ext\jfxrt.jar;D:\jdk1.8.0_271\jre\lib\ext\localedata.jar;D:\jdk1.8.0_271\jre\lib\ext\nashorn.jar;D:\jdk1.8.0_271\jre\lib\ext\sunec.jar;D:\jdk1.8.0_271\jre\lib\ext\sunjce_provider.jar;D:\jdk1.8.0_271\jre\lib\ext\sunmscapi.jar;D:\jdk1.8.0_271\jre\lib\ext\sunpkcs11.jar;D:\jdk1.8.0_271\jre\lib\ext\zipfs.jar;D:\jdk1.8.0_271\jre\lib\javaws.jar;D:\jdk1.8.0_271\jre\lib\jce.jar;D:\jdk1.8.0_271\jre\lib\jfr.jar;D:\jdk1.8.0_271\jre\lib\jfxswt.jar;D:\jdk1.8.0_271\jre\lib\jsse.jar;D:\jdk1.8.0_271\jre\lib\management-agent.jar;D:\jdk1.8.0_271\jre\lib\plugin.jar;D:\jdk1.8.0_271\jre\lib\resources.jar;D:\jdk1.8.0_271\jre\lib\rt.jar;D:\Code\helloworld\target\classes;C:\Users\ccc\.m2\repository\org\apache\flink\flink-java\1.9.2\flink-java-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-core\1.9.2\flink-core-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-annotations\1.9.2\flink-annotations-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-metrics-core\1.9.2\flink-metrics-core-1.9.2.jar;C:\Users\ccc\.m2\repository\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;C:\Users\ccc\.m2\repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;C:\Users\ccc\.m2\repository\org\objenesis\objenesis\2.1\objenesis-2.1.jar;C:\Users\ccc\.m2\repository\commons-collections\commons-collections\3.2.2\commons-collections-3.2.2.jar;C:\Users\ccc\.m2\repository\org\apache\commons\commons-compress\1.18\commons-compress-1.18.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-shaded-asm-6\6.2.1
-7.0\flink-shaded-asm-6-6.2.1-7.0.jar;C:\Users\ccc\.m2\repository\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;C:\Users\ccc\.m2\repository\org\apache\commons\commons-math3\3.5\commons-math3-3.5.jar;C:\Users\ccc\.m2\repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;C:\Users\ccc\.m2\repository\org\apache\flink\force-shading\1.9.2\force-shading-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-streaming-java_2.12\1.9.2\flink-streaming-java_2.12-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-runtime_2.12\1.9.2\flink-runtime_2.12-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-queryable-state-client-java\1.9.2\flink-queryable-state-client-java-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-hadoop-fs\1.9.2\flink-hadoop-fs-1.9.2.jar;C:\Users\ccc\.m2\repository\commons-io\commons-io\2.4\commons-io-2.4.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-shaded-netty\4.1.32.Final-7.0\flink-shaded-netty-4.1.32.Final-7.0.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-shaded-jackson\2.10.1-9.0\flink-shaded-jackson-2.10.1-9.0.jar;C:\Users\ccc\.m2\repository\commons-cli\commons-cli\1.3.1\commons-cli-1.3.1.jar;C:\Users\ccc\.m2\repository\org\javassist\javassist\3.19.0-GA\javassist-3.19.0-GA.jar;C:\Users\ccc\.m2\repository\org\scala-lang\scala-library\2.12.7\scala-library-2.12.7.jar;C:\Users\ccc\.m2\repository\com\typesafe\akka\akka-actor_2.12\2.5.21\akka-actor_2.12-2.5.21.jar;C:\Users\ccc\.m2\repository\com\typesafe\config\1.3.3\config-1.3.3.jar;C:\Users\ccc\.m2\repository\org\scala-lang\modules\scala-java8-compat_2.12\0.8.0\scala-java8-compat_2.12-0.8.0.jar;C:\Users\ccc\.m2\repository\com\typesafe\akka\akka-stream_2.12\2.5.21\akka-stream_2.12-2.5.21.jar;C:\Users\ccc\.m2\repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;C:\Users\ccc\.m2\repository\com\typesafe\ssl-config-core_2.12\0.3.7\ssl-config-core_2.12-0.3.7.jar;C:\Users\ccc\.m2\repository\org\scala-l
ang\modules\scala-parser-combinators_2.12\1.1.1\scala-parser-combinators_2.12-1.1.1.jar;C:\Users\ccc\.m2\repository\com\typesafe\akka\akka-protobuf_2.12\2.5.21\akka-protobuf_2.12-2.5.21.jar;C:\Users\ccc\.m2\repository\com\typesafe\akka\akka-slf4j_2.12\2.5.21\akka-slf4j_2.12-2.5.21.jar;C:\Users\ccc\.m2\repository\org\clapper\grizzled-slf4j_2.12\1.3.2\grizzled-slf4j_2.12-1.3.2.jar;C:\Users\ccc\.m2\repository\com\github\scopt\scopt_2.12\3.5.0\scopt_2.12-3.5.0.jar;C:\Users\ccc\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\ccc\.m2\repository\com\twitter\chill_2.12\0.7.6\chill_2.12-0.7.6.jar;C:\Users\ccc\.m2\repository\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-clients_2.12\1.9.2\flink-clients_2.12-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-optimizer_2.12\1.9.2\flink-optimizer_2.12-1.9.2.jar;C:\Users\ccc\.m2\repository\org\apache\flink\flink-shaded-guava\18.0-7.0\flink-shaded-guava-18.0-7.0.jar;C:\Users\ccc\.m2\repository\org\slf4j\slf4j-api\1.7.21\slf4j-api-1.7.21.jar;C:\Users\ccc\.m2\repository\org\slf4j\slf4j-log4j12\1.7.21\slf4j-log4j12-1.7.21.jar;C:\Users\ccc\.m2\repository\log4j\log4j\1.2.17\log4j-1.2.17.jar com.intsmaze.flink.streaming.helloworld.WorldCount
    CoMapFunction输出结果: :8> 数据来自元素类型为String的流: hello flink
    CoMapFunction输出结果: :6> 数据来自元素类型为String的流: www cnblogs com sky
    CoMapFunction输出结果: :9> 数据来自元素类型为String的流: hello java
    CoMapFunction输出结果: :12> 数据来自元素类型为Long的流: 1
    CoMapFunction输出结果: :1> 数据来自元素类型为Long的流: 2
    CoMapFunction输出结果: :7> 数据来自元素类型为String的流: hello sky
    
    Process finished with exit code 0
    

    相关文章

      网友评论

          本文标题:Flink之Connect和CoMap

          本文链接:https://www.haomeiwen.com/subject/pfacvltx.html