测试 Flink DataStream 的 broadcast() 分区方式：
代码:
package SpamFilter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import Bolts.TraWord;
/**
 * Small experiment showing how {@code DataStream#broadcast()} distributes elements.
 *
 * <p>A keyed stream of letters is connected to a broadcast stream of integers and processed by a
 * {@link CoFlatMapFunction} running with parallelism 2. Each parallel instance keeps its own
 * running sum of the integers it receives and tags every letter with the current sum. If
 * broadcasting reaches every instance, each instance prints the same sequence of running sums.
 *
 * <p>The two inputs of the co-flatmap are consumed with no ordering guarantee between them, so
 * letters can be processed before any integer has arrived; those letters are tagged with 0.
 */
public class test {
  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Default parallelism 1 (used by the sources); only the co-flatmap is raised to 2 below.
    env.setParallelism(1);

    String[] letters = new String[] {"A", "B", "C", "D", "E", "F", "A"};
    Integer[] increments = new Integer[] {1, 2, 3, 4};

    DataStream<String> letterStream = env.fromElements(letters);
    // broadcast() sends every element to every parallel instance of the downstream operator.
    DataStream<Integer> broadcastIncrements = env.fromElements(increments).broadcast();

    // No sink is attached in this experiment; output is observed via System.out.
    // Attach e.g. taggedLetters.print() to also see the emitted records.
    DataStream<String> taggedLetters = letterStream
        .keyBy(new KeySelector<String, String>() {
          @Override
          public String getKey(String in) throws Exception {
            // Each letter is its own key, so equal letters are routed to the same instance.
            return in;
          }
        })
        .connect(broadcastIncrements)
        .flatMap(new CoFlatMapFunction<String, Integer, String>() {
          // Plain instance field: one counter per parallel instance, shared by all keys handled
          // by that instance. It is not Flink keyed or checkpointed state.
          int count = 0;

          /** Tags a letter with the running sum seen so far on this instance. */
          @Override
          public void flatMap1(String input, Collector<String> out) throws Exception {
            String tagged = input + String.valueOf(count);
            System.out.println(tagged);
            // Emit downstream too, so the declared String output is not always empty.
            out.collect(tagged);
          }

          /** Adds a broadcast integer to this instance's running sum and prints the new sum. */
          @Override
          public void flatMap2(Integer input2, Collector<String> out) throws Exception {
            count += input2;
            System.out.println(count);
          }
        })
        .setParallelism(2);

    env.execute("test");
  }
}
结果:
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1234736298] with leader session id cbd2b49e-809d-459d-b884-248d50caad15.
11/09/2018 19:42:21 Job execution switched to status RUNNING.
11/09/2018 19:42:21 Source: Collection Source(1/1) switched to SCHEDULED
11/09/2018 19:42:21 Source: Collection Source(1/1) switched to SCHEDULED
11/09/2018 19:42:21 Co-Flat Map(1/2) switched to SCHEDULED
11/09/2018 19:42:21 Co-Flat Map(2/2) switched to SCHEDULED
11/09/2018 19:42:21 Source: Collection Source(1/1) switched to DEPLOYING
11/09/2018 19:42:21 Source: Collection Source(1/1) switched to DEPLOYING
11/09/2018 19:42:21 Co-Flat Map(1/2) switched to DEPLOYING
11/09/2018 19:42:21 Co-Flat Map(2/2) switched to DEPLOYING
11/09/2018 19:42:21 Co-Flat Map(1/2) switched to RUNNING
11/09/2018 19:42:21 Source: Collection Source(1/1) switched to RUNNING
11/09/2018 19:42:21 Co-Flat Map(2/2) switched to RUNNING
11/09/2018 19:42:21 Source: Collection Source(1/1) switched to RUNNING
B0
C0
E0
1
3
6
10
11/09/2018 19:42:21 Source: Collection Source(1/1) switched to FINISHED
11/09/2018 19:42:21 Source: Collection Source(1/1) switched to FINISHED
A0
D0
F0
A0
1
3
6
10
11/09/2018 19:42:21 Co-Flat Map(2/2) switched to FINISHED
11/09/2018 19:42:21 Co-Flat Map(1/2) switched to FINISHED
11/09/2018 19:42:21 Job execution switched to status FINISHED.
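结论：broadcast() 会把每个元素分发到下游算子的全部并行实例（partition）。从日志可见，两个 Co-Flat Map 实例都完整收到了 1、2、3、4，并各自打印出累加值 1、3、6、10。另外，两个输入之间没有先后顺序保证：本次运行中字母流先于广播流被处理，所以每个字母后面的计数都是 0。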
网友评论