美文网首页
Trident Stream merge

Trident Stream merge

作者: 正居明阳 | 来源:发表于2018-07-11 23:18 被阅读0次

    /**

    • Created by tangchunsong on 2018/7/11.

    • merge就是把两个流的tuple合成一个

    • 需要注意,第一个流的fields数量要<=第二个流的,否则会报错

    • 如果小于,那么第二个流的fields会被截断,只保留和第一个流的相同数量的前多少个fields

    • 还有一个问题就是:两个batch的tuple合并之后,是放到一个batch里?
      */
      public class StreamMergeMain {
      public static StormTopology buildTopology() {
      // FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("key", "value1", "value5", "value6"), 3, new Values("a", "1", "1", "1"),
      // new Values("b", "2", "1", "1"), new Values("a", "3", "1", "1"), new Values("a", "4", "1", "1"));
      FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("key", "value1", "value5"), 3, new Values("a", "1", "1"),
      new Values("b", "2", "1"), new Values("a", "3", "1"), new Values("a", "4", "1"));

      spout1.setCycle(true);//Spout是否循环发送

      FixedBatchSpout spout2 = new FixedBatchSpout(new Fields("key", "value2", "value3"), 3, new Values("a", "1", "1"),
      new Values("b", "2", "1"), new Values("a", "3", "1"), new Values("a", "5", "1"), new Values("a", "6", "1"));
      spout2.setCycle(true);//Spout是否循环发送

      TridentTopology topology = new TridentTopology();
      Stream stream1 = topology.newStream("spout1", spout1).parallelismHint(2);
      Stream stream2 = topology.newStream("spout2", spout2).parallelismHint(2);

      topology.merge(stream1, stream2)
      .peek(new Consumer() {
      public void accept(TridentTuple input) {
      System.out.println(input.toString());
      try {
      Thread.sleep(1000);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      });

      return topology.build();

    }
    public static void main(String[] args) {
    Config conf = new Config();
    conf.setMaxSpoutPending(20);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("wordCounter", conf, buildTopology());
    }

    }

    相关文章

      网友评论

          本文标题:Trident Stream merge

          本文链接:https://www.haomeiwen.com/subject/aqnkpftx.html