/**
-
Created by tangchunsong on 2018/7/11.
-
merge就是把两个流的tuple合成一个
-
需要注意,第一个流的fields数量要<=第二个流的,否则会报错
-
如果小于,那么第二个流的fields会被截断,只保留和第一个流的相同数量的前多少个fields
-
还有一个问题就是:两个batch的tuple合并之后,是放到一个batch里?
*/
public class StreamMergeMain {
public static StormTopology buildTopology() {
// FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("key", "value1", "value5", "value6"), 3, new Values("a", "1", "1", "1"),
// new Values("b", "2", "1", "1"), new Values("a", "3", "1", "1"), new Values("a", "4", "1", "1"));
FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("key", "value1", "value5"), 3, new Values("a", "1", "1"),
new Values("b", "2", "1"), new Values("a", "3", "1"), new Values("a", "4", "1"));spout1.setCycle(true);//Spout是否循环发送
FixedBatchSpout spout2 = new FixedBatchSpout(new Fields("key", "value2", "value3"), 3, new Values("a", "1", "1"),
new Values("b", "2", "1"), new Values("a", "3", "1"), new Values("a", "5", "1"), new Values("a", "6", "1"));
spout2.setCycle(true);//Spout是否循环发送TridentTopology topology = new TridentTopology();
Stream stream1 = topology.newStream("spout1", spout1).parallelismHint(2);
Stream stream2 = topology.newStream("spout2", spout2).parallelismHint(2);topology.merge(stream1, stream2)
.peek(new Consumer() {
public void accept(TridentTuple input) {
System.out.println(input.toString());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});return topology.build();
}
public static void main(String[] args) {
Config conf = new Config();
conf.setMaxSpoutPending(20);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordCounter", conf, buildTopology());
}
}
// (The reader-comments section of the original web page followed here.)