Storm | WordCount

Author: icebreakeros | Published: 2019-07-05 20:30

    wordcount

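    Application design

    # submit a topology to the cluster
    storm jar <jar-path> <topology-package>.<topology-class> <topology-name>
    # stop a running topology
    storm kill <topology-name>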

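    Message source (Spout): extend the BaseRichSpout class or implement the IRichSpout interface
    open(): one-time initialization
    nextTuple(): reads incoming messages and emits them as tuples
    ack(): called once a tuple emitted by this spout has been processed successfully
    fail(): called when processing of a tuple fails
    declareOutputFields(): declares the output fields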
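
The ack()/fail() contract above is easier to follow with a small model. The sketch below is plain Java and does not use the Storm API: ReplaySketch and its methods are hypothetical names that only mirror the spout callbacks, assuming a spout that keeps emitted sentences in a pending map until they are acked and re-queues them on failure.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Plain-Java model of a reliable spout's bookkeeping (hypothetical, not Storm API).
public class ReplaySketch {
    private final Queue<String> queue = new ArrayDeque<>();    // sentences waiting to be emitted
    private final Map<Long, String> pending = new HashMap<>(); // emitted but not yet acked
    private long nextId = 0;

    // Counterpart of open(): load the initial data once.
    public void open(String... sentences) {
        for (String s : sentences) {
            queue.add(s);
        }
    }

    // Counterpart of nextTuple(): emit one sentence and remember it by message id.
    // Returns the message id, or -1 when there is nothing to emit.
    public long nextTuple() {
        String s = queue.poll();
        if (s == null) {
            return -1;
        }
        long id = nextId++;
        pending.put(id, s);
        return id;
    }

    // Counterpart of ack(): the tuple was processed, so forget it.
    public void ack(long id) {
        pending.remove(id);
    }

    // Counterpart of fail(): put the sentence back so it is emitted again.
    public void fail(long id) {
        String s = pending.remove(id);
        if (s != null) {
            queue.add(s);
        }
    }

    public int pendingCount() { return pending.size(); }
    public int queuedCount() { return queue.size(); }

    public static void main(String[] args) {
        ReplaySketch spout = new ReplaySketch();
        spout.open("the cow jumped", "an apple a day");
        long first = spout.nextTuple();
        long second = spout.nextTuple();
        spout.ack(first);   // first sentence is done
        spout.fail(second); // second sentence goes back to the queue
        System.out.println(spout.pendingCount() + " pending, " + spout.queuedCount() + " queued");
    }
}
```

In a real spout the same bookkeeping would typically live inside the class that extends BaseRichSpout, with the message id passed along when the tuple is emitted.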

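    Processing unit (Bolt): extend the BaseBasicBolt class (which implements IBasicBolt and acks each tuple automatically) or implement the IRichBolt interface
    prepare(): called once when the bolt task is initialized on a worker
    execute(): receives one tuple, runs the processing logic, and emits the result
    cleanup(): called before the bolt shuts down (not guaranteed to run on a cluster)
    declareOutputFields(): declares the output fields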
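
The original post does not show the bolt sources, so the following is only a sketch of the per-tuple logic that bolts named WordNormalizer and WordCount might run inside execute(). It is plain Java without the Storm API; BoltLogicSketch, normalize, and count are hypothetical names, and splitting on whitespace plus lower-casing is an assumption about what "normalizing" means here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Plain-Java model of the bolt logic (hypothetical, not Storm API).
public class BoltLogicSketch {

    // What a normalizer-style execute() might do: split on whitespace and lower-case.
    public static List<String> normalize(String sentence) {
        List<String> words = new ArrayList<>();
        for (String w : sentence.trim().split("\\s+")) {
            if (!w.isEmpty()) {
                words.add(w.toLowerCase(Locale.ROOT));
            }
        }
        return words;
    }

    // State a counting bolt would keep between execute() calls.
    private final Map<String, Integer> counts = new HashMap<>();

    // What a counter-style execute() might do: bump the count and return the new total.
    public int count(String word) {
        return counts.merge(word, 1, Integer::sum);
    }

    public static void main(String[] args) {
        BoltLogicSketch counter = new BoltLogicSketch();
        for (String w : normalize("The cow jumped over the moon")) {
            System.out.println(w + " -> " + counter.count(w));
        }
    }
}
```

Keeping the logic in small methods like these makes it testable without starting a cluster; the bolt's execute() would then only unpack the tuple, call the method, and emit.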

    Implementation

    vim pom.xml
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.icebreakeros.boot.Application</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
    
    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
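        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.0</version>
            <!-- For cluster submission, add <scope>provided</scope> so storm-core is not bundled into the jar-with-dependencies; keep the default scope for local mode -->
        </dependency>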
    </dependencies>
    
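    # WordCountTopology.java
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;

    public class WordCountTopology {

        public static void main(String[] args) {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            Config config = new Config();

            // Spout with a parallelism hint of 2
            topologyBuilder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
            // Shuffle grouping: tuples are distributed randomly across the bolt's tasks
            topologyBuilder.setBolt("WordNormalizer",
                new WordNormalizerBolt(),
                2).shuffleGrouping("RandomSentence");

            // Fields grouping on "word": the same word always reaches the same task
            topologyBuilder.setBolt("WordCount",
                new WordCountBolt(),
                2).fieldsGrouping("WordNormalizer", new Fields("word"));

            topologyBuilder.setBolt("Print",
                new PrintBolt(), 1).shuffleGrouping("WordCount");
            config.setDebug(false);

            if (args != null && args.length > 0) {
                // Cluster mode: the first argument is the topology name
                config.setNumWorkers(1);
                try {
                    StormSubmitter.submitTopology(args[0],
                        config, topologyBuilder.createTopology());
                } catch (Exception e) {
                    // Do not swallow submission errors silently
                    throw new RuntimeException("Failed to submit topology " + args[0], e);
                }
            } else {
                // Local mode: runs in-process until the JVM is stopped
                config.setMaxTaskParallelism(1);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("wordcount",
                    config, topologyBuilder.createTopology());
            }
        }
    }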
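
The topology wires the WordCount bolt with fieldsGrouping on "word" rather than shuffleGrouping. The plain-Java sketch below illustrates why that matters when the bolt runs as 2 tasks: if every occurrence of a word lands on the same task, that task's local count is the complete count. It does not use the Storm API; FieldsGroupingSketch, taskFor, and route are hypothetical names, and String.hashCode() modulo the task count merely stands in for Storm's own hashing, which differs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of fields grouping (hypothetical, not Storm's actual hashing).
public class FieldsGroupingSketch {

    // Pick a task index from the grouping field value; equal words give equal indexes.
    public static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    // Route each word to its task and keep a separate count map per task.
    public static List<Map<String, Integer>> route(List<String> words, int numTasks) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new HashMap<>());
        }
        for (String w : words) {
            tasks.get(taskFor(w, numTasks)).merge(w, 1, Integer::sum);
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("the", "cow", "the", "moon", "the");
        List<Map<String, Integer>> tasks = route(words, 2);
        for (int i = 0; i < tasks.size(); i++) {
            System.out.println("task " + i + ": " + tasks.get(i));
        }
    }
}
```

With a random (shuffle) assignment instead, occurrences of one word could be split across both tasks and each task would report only a partial count.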
    
