美文网首页
Trident join

Trident join

作者: 正居明阳 | 来源:发表于2018-07-11 22:54 被阅读0次

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.Consumer;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**

  • Created by tangchunsong on 2018/7/11.
  • 结论:
  • topology.join 是将两个stream内的两个batch的tuple进行join,join完了,这两个batch就pass了
  • 轮到这两个流的分别的下面的batch进行join了
  • 两个batch进行join是:
  •  只有key能join上,才会输出,inner join
    

*/
public class StreamJoinMain {

public static StormTopology buildTopology() {
FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("key", "value1"), 3, new Values("a", "1"),
new Values("b", "2"), new Values("a", "3"), new Values("a", "4"));
spout1.setCycle(true);//Spout是否循环发送

FixedBatchSpout spout2 = new FixedBatchSpout(new Fields("key", "value2"), 3, new Values("a", "1"),
        new Values("b", "2"), new Values("a", "3"), new Values("a", "5"), new Values("a", "6"));
spout2.setCycle(true);//Spout是否循环发送

TridentTopology topology = new TridentTopology();
Stream stream1 = topology.newStream("spout1", spout1).parallelismHint(2);
Stream stream2 = topology.newStream("spout2", spout2).parallelismHint(2);

topology.join(stream1, new Fields("key"), stream2, new Fields("key"), new Fields("key", "value1", "value2"))
        .peek(new Consumer() {
          public void accept(TridentTuple input) {
            System.out.println(input.toString());
            try {
              Thread.sleep(2000);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
        });

return topology.build();

}
public static void main(String[] args) {
Config conf = new Config();
conf.setMaxSpoutPending(20);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordCounter", conf, buildTopology());
}
}

相关文章

  • Trident join

    import org.apache.storm.Config;import org.apache.storm.Lo...

  • trident的另一种join

    如果这个流有先后顺序,那么先的那个流,先写到kv存储然后后的流,从kv中查找相同key的value,进行合并,然后...

  • 不同浏览器的内核

    Trident Trident内核代表产品Internet Explorer,又称其为IE内核。Trident(又...

  • Storm Trident之一spout和bolt

    Trident Spout Trident Spout特点 Trident中,定义Spout的接口为ITriden...

  • Apache Storm Trident

    Apache Storm Trident Trident是Storm的延伸。像Storm,Trident也是由Tw...

  • Trident

  • The Trident

    The fiery spirit whispered in the air, He told me there i...

  • storm笔记:Trident状态

    在storm笔记:Trident应用中说了下Trident的使用,这里说下Trident几种状态的变化及其对应AP...

  • 常用浏览器及其内核

    Trident Trident(IE内核):该内核程序在1997年的IE4中首次被采用 Trident内核的常见浏...

  • javascript 临时总结

    varbrowserData={//移动终端浏览器版本信息 trident:u.indexOf('Trident'...

网友评论

      本文标题:Trident join

      本文链接:https://www.haomeiwen.com/subject/nlwkpftx.html