Flink broadcast

Author: 大大大大大大大熊 | Published: 2018-11-09 19:44

Testing broadcast:
Code:

package SpamFilter;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;


public class test {

    public static void main(String[] args) throws Exception {
        // Default parallelism is 1; only the Co-Flat Map below overrides it.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        String[] NUMS = new String[] {"1","2","3","4","5","6"};
        String[] ZI = new String[] {"A","B","C","D","E","F","A"};
        Integer[] a = new Integer[] {1,2,3,4};
        DataStream<String> text1;
        DataStream<String> text2;
        DataStream<Integer> text3;
        // text1 is created but not used by any operator in this test.
        text1 = env.fromElements(NUMS);
        text2 = env.fromElements(ZI);
        // broadcast() sends every element of text3 to all parallel instances downstream.
        text3 = env.fromElements(a).broadcast();
        DataStream<String> TraWordProbability = text2
                // Key the letter stream by the element itself, so each key goes to one instance.
                .keyBy(new KeySelector<String, String>() {

                            @Override
                            public String getKey(String in) throws Exception {
                                return in;
                            }
                        })
                .connect(text3)
                .flatMap(new CoFlatMapFunction<String, Integer, String>() {

                    // Plain field: each parallel instance keeps its own copy (not Flink managed state).
                    int count = 0;
                    @Override
                    public void flatMap1(String input, Collector<String> out) throws Exception {
                        // Keyed input: print the letter together with the sum seen so far.
                        String b = input + String.valueOf(count);
                        System.out.println(b);
                    }

                    @Override
                    public void flatMap2(Integer input2, Collector<String> out) throws Exception {
                        // Broadcast input: add the number to this instance's running sum and print it.
                        count +=input2;
                        System.out.println(count);
                    }

                    })
                // Two parallel instances, so the effect of broadcast() shows up in the output.
                .setParallelism(2);
        
        env.execute("test");
    }
    
}

Result:

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1234736298] with leader session id cbd2b49e-809d-459d-b884-248d50caad15.
11/09/2018 19:42:21 Job execution switched to status RUNNING.
11/09/2018 19:42:21 Source: Collection Source(1/1) switched to SCHEDULED 
11/09/2018 19:42:21 Source: Collection Source(1/1) switched to SCHEDULED 
11/09/2018 19:42:21 Co-Flat Map(1/2) switched to SCHEDULED 
11/09/2018 19:42:21 Co-Flat Map(2/2) switched to SCHEDULED 
11/09/2018 19:42:21 Source: Collection Source(1/1) switched to DEPLOYING 
11/09/2018 19:42:21 Source: Collection Source(1/1) switched to DEPLOYING 
11/09/2018 19:42:21 Co-Flat Map(1/2) switched to DEPLOYING 
11/09/2018 19:42:21 Co-Flat Map(2/2) switched to DEPLOYING 
11/09/2018 19:42:21 Co-Flat Map(1/2) switched to RUNNING 
11/09/2018 19:42:21 Source: Collection Source(1/1) switched to RUNNING 
11/09/2018 19:42:21 Co-Flat Map(2/2) switched to RUNNING 
11/09/2018 19:42:21 Source: Collection Source(1/1) switched to RUNNING 
B0
C0
E0
1
3
6
10
11/09/2018 19:42:21 Source: Collection Source(1/1) switched to FINISHED 
11/09/2018 19:42:21 Source: Collection Source(1/1) switched to FINISHED 
A0
D0
F0
A0
1
3
6
10
11/09/2018 19:42:21 Co-Flat Map(2/2) switched to FINISHED 
11/09/2018 19:42:21 Co-Flat Map(1/2) switched to FINISHED 
11/09/2018 19:42:21 Job execution switched to status FINISHED.

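Conclusion: broadcast() delivers every element to all downstream partitions. Both Co-Flat Map instances received 1, 2, 3, 4 and printed the running sums 1, 3, 6, 10, while each letter of the keyed stream reached only one instance. The letters are printed with a count of 0 because, in this run, each instance processed its keyed input before the broadcast elements arrived; Flink does not guarantee the relative order of the two connected inputs.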
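The routing behind this result can also be modeled without Flink. The following is a minimal plain-Java sketch, not Flink code: `BroadcastSimulation`, `Subtask`, and `run` are hypothetical names made up for illustration, and the `hashCode` modulo routing is a simplified stand-in for Flink's real key-group assignment, so the letters will not necessarily land on the same instance as in the log above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the test above; no Flink classes are used.
class BroadcastSimulation {

    // Hypothetical stand-in for one parallel instance of the Co-Flat Map operator.
    static class Subtask {
        int count = 0;                                  // same role as `count` in the test
        final List<String> keyed = new ArrayList<>();   // elements seen by flatMap1
        final List<Integer> sums = new ArrayList<>();   // running sums printed by flatMap2

        void flatMap1(String input) { keyed.add(input); }

        void flatMap2(int input) { count += input; sums.add(count); }
    }

    // Routes each keyed element to exactly one subtask and each broadcast element to all of them.
    static List<Subtask> run(String[] keyedInput, int[] broadcastInput, int parallelism) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask());
        }
        for (String s : keyedInput) {
            // Assumption: simplified routing; Flink's real key-group hashing differs.
            int target = Math.floorMod(s.hashCode(), parallelism);
            subtasks.get(target).flatMap1(s);
        }
        for (int n : broadcastInput) {
            for (Subtask t : subtasks) {
                t.flatMap2(n);                          // every subtask gets every element
            }
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<Subtask> result = run(
                new String[] {"A", "B", "C", "D", "E", "F", "A"},
                new int[] {1, 2, 3, 4},
                2);
        for (int i = 0; i < result.size(); i++) {
            Subtask t = result.get(i);
            System.out.println("subtask " + i + ": keyed=" + t.keyed + ", sums=" + t.sums);
        }
    }
}
```

In this model every subtask ends with the sums [1, 3, 6, 10], which matches the two identical number sequences in the log, while the seven letters are split between the two subtasks.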
