美文网首页storm
Storm坑集锦

Storm坑集锦

作者: Magia | 来源:发表于2016-10-26 19:20 被阅读132次

    KafkaSpout

    1.poutConfig继承KafkaConfig,可以通过SpoutConfig设置kafkaSpout基本属性
    spoutConfig.forceFromStart可以设置不从kafka初始位置消费,以免重复消费数据。
    2.Config.TOPOLOGY_MAX_SPOUT_PENDING配置可以动态对kafka消费进行限流。

    EsBolt

    1)向Es发数据时发生了NullPointerException:
    at org.codehaus.jackson.util.TextBuffer.findBuffer(TextBuffer.java:207)
    refer

    2)用户自定义esIndex

    builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("bolt1");
    builder.setBolt("testBolt", new EsBolt("{esIndex}/" + "test", conf), 2).shuffleGrouping("bolt2");
    

    在Bolt1中定义esIndex

    public class TimeBasedIndexNameBuilder {
        public static String build(String indexPrefix, Date collectTime) {
            return indexPrefix + "_" + new SimpleDateFormat("yyyy-MM-dd").format(collectTime);
        }
    }
    String esIndex = TimeBasedIndexNameBuilder.build("agentX", new Date());
    

    如上,EsBolt就会传入agentX_2016-10-26/test,在ES服务器上生成index:agentX_2016-10-26,type:test

    bolt继承多spout

    现有bolt需要接受来自spout1和spout2的数据流,可通过getSourceComponent来判断数据流来自哪个spout,然后做进一步处理。

    //spou1
    builder.setSpout("spout1", new Spout1(spoutConfig), 2);
    //spou2
    builder.setSpout("spout2", new Spout2(), 2);
    builder.setBolt("bolt1", new Bolt1(), 2).allGrouping("spout1").shuffleGrouping("spout2");
    

    Bolt1部分代码如下:

    @Override
    public void execute(Tuple input) {
        //判断数据流来自Spout1
        if(input.getSourceComponent().equals("spout1")) {
            ...
        } else {
            ...
        }
    }
    

    相关文章

      网友评论

        本文标题:Storm坑集锦

        本文链接:https://www.haomeiwen.com/subject/izxjuttx.html