美文网首页
单词统计实现

单词统计实现

作者: 长孙俊明 | 来源:发表于2019-10-12 10:27 被阅读0次
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>mutil-project</artifactId>
        <groupId>com.springcloudtest</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>storm</artifactId>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-to-slf4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.0</version>
        </dependency>

    </dependencies>
</project>
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 单词计数拓扑
 * 
 * 我认识很多java工程师,都是会一些大数据的技术的,不会太精通,没有那么多的时间去研究
 * storm的课程,我就只是讲到,最基本的开发,就够了,java开发广告计费系统,大量的流量的引入和接入,就是用storm做得
 * 用storm,主要是用它的成熟的稳定的易于扩容的分布式系统的特性
 * java工程师,来说,做一些简单的storm开发,掌握到这个程度差不多就够了
 * 
 * @author Administrator
 *
 */
public class WordCountTopology {
    
    /**
     * spout
     * 
     * spout,继承一个基类,实现接口,这个里面主要是负责从数据源获取数据
     * 
     * 我们这里作为一个简化,就不从外部的数据源去获取数据了,只是自己内部不断发射一些句子
     * 
     * @author Administrator
     *
     */
    public static class RandomSentenceSpout extends BaseRichSpout {

        private static final long serialVersionUID = 3699352201538354417L;
        
        private static final Logger LOGGER = LoggerFactory.getLogger(RandomSentenceSpout.class);

        private SpoutOutputCollector collector;
        private Random random;
        
        /**
         * open方法
         * 
         * open方法,是对spout进行初始化的
         * 
         * 比如说,创建一个线程池,或者创建一个数据库连接池,或者构造一个httpclient
         * 
         */
        @SuppressWarnings("rawtypes")
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            // 在open方法初始化的时候,会传入进来一个东西,叫做SpoutOutputCollector
            // 这个SpoutOutputCollector就是用来发射数据出去的
            this.collector = collector;
            // 构造一个随机数生产对象
            this.random = new Random();
        }
        
        /**
         * nextTuple方法
         * 
         * 这个spout类,之前说过,最终会运行在task中,某个worker进程的某个executor线程内部的某个task中
         * 那个task会负责去不断的无限循环调用nextTuple()方法
         * 只要的话呢,无限循环调用,可以不断发射最新的数据出去,形成一个数据流
         * 
         */
        public void nextTuple() {
            Utils.sleep(100); 
            String[] sentences = new String[]{"the cow jumped over the moon", "an apple a day keeps the doctor away",
                    "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"};
            String sentence = sentences[random.nextInt(sentences.length)];
            LOGGER.info("【发射句子】sentence=" + sentence);
            // 这个values,你可以认为就是构建一个tuple
            // tuple是最小的数据单位,无限个tuple组成的流就是一个stream
            collector.emit(new Values(sentence)); 
        }

        /**
         * declareOutputFielfs这个方法
         * 
         * 很重要,这个方法是定义一个你发射出去的每个tuple中的每个field的名称是什么
         * 
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));   
        }
        
    }

    /**
     * 写一个bolt,直接继承一个BaseRichBolt基类
     * 
     * 实现里面的所有的方法即可,每个bolt代码,同样是发送到worker某个executor的task里面去运行
     * 
     * @author Administrator
     *
     */
    public static class SplitSentence extends BaseRichBolt {
        
        private static final long serialVersionUID = 6604009953652729483L;
        
        private OutputCollector collector;
        
        /**
         * 对于bolt来说,第一个方法,就是prepare方法
         * 
         * OutputCollector,这个也是Bolt的这个tuple的发射器
         * 
         */
        @SuppressWarnings("rawtypes")
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
        
        /**
         * execute方法
         * 
         * 就是说,每次接收到一条数据后,就会交给这个executor方法来执行
         * 
         */
        public void execute(Tuple tuple) {
            String sentence = tuple.getStringByField("sentence"); 
            String[] words = sentence.split(" "); 
            for(String word : words) {
                collector.emit(new Values(word)); 
            }
        }

        /**
         * 定义发射出去的tuple,每个field的名称
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));   
        }
        
    }
    
    public static class WordCount extends BaseRichBolt {

        private static final long serialVersionUID = 7208077706057284643L;
        
        private static final Logger LOGGER = LoggerFactory.getLogger(WordCount.class);

        private OutputCollector collector;
        private Map<String, Long> wordCounts = new HashMap<String, Long>();
        
        @SuppressWarnings("rawtypes")
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
        
        public void execute(Tuple tuple) {
            String word = tuple.getStringByField("word");
            
            Long count = wordCounts.get(word);
            if(count == null) {
                count = 0L;
            }
            count++;
            
            wordCounts.put(word, count);
            
            LOGGER.info("【单词计数】" + word + "出现的次数是" + count);
            
            collector.emit(new Values(word, count));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));    
        }
        
    }
    
    public static void main(String[] args) {
        // 在main方法中,会去将spout和bolts组合起来,构建成一个拓扑
        TopologyBuilder builder = new TopologyBuilder();
    
        // 这里的第一个参数的意思,就是给这个spout设置一个名字
        // 第二个参数的意思,就是创建一个spout的对象
        // 第三个参数的意思,就是设置spout的executor有几个
        builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
        builder.setBolt("SplitSentence", new SplitSentence(), 5)
                .setNumTasks(10)
                .shuffleGrouping("RandomSentence");
        // 这个很重要,就是说,相同的单词,从SplitSentence发射出来时,一定会进入到下游的指定的同一个task中
        // 只有这样子,才能准确的统计出每个单词的数量
        // 比如你有个单词,hello,下游task1接收到3个hello,task2接收到2个hello
        // 5个hello,全都进入一个task
        builder.setBolt("WordCount", new WordCount(), 10)
                .setNumTasks(20)
                .fieldsGrouping("SplitSentence", new Fields("word"));  
        
        Config config = new Config();
    
        // 说明是在命令行执行,打算提交到storm集群上去
        if(args != null && args.length > 0) {
            config.setNumWorkers(3);  
            try {
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());  
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            // 说明是在eclipse里面本地运行
            config.setMaxTaskParallelism(20);  
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("WordCountTopology", config, builder.createTopology());  
            
            Utils.sleep(60000); 
            
            cluster.shutdown();
        }
    }
    
}

相关文章

  • 单词统计实现

  • 分布式计算(一)Hadoop例子

    用Mapreduce实现单词个数统计 计算向量的和输入为A 11 0.3 统计词的个数 其他资料

  • MapReduce 编写 wordcount (二)

    今天我们开始对 mapreduce 进行操作,实现一个功能:编写 mapreduce 实现单词的统计功能,并使用 ...

  • 用Mapreduce实现单词个数统计

    首先有三个文件 red_new.py map_new.py run.sh代码如下map_new.py red_ne...

  • Spark Streaming

    spark streaming 接受socket数据 实现单词统计 运行spark自带的example: 这样一来...

  • 单词统计

    有一个文本文件,被分成了4份,分别放到了4台服务器中存储 Text1:the weather is goodTex...

  • MR编程实例之单词次数统计

    本文讲解利用MR实现简单的单词统计功能。 创建com.test.hadoop.wordcount包,然后在包中创建...

  • python面试题-2018-01-29

    用python实现统计一篇英文文章内每个单词的出现频率,并返回出现频率最高的前10个单词及其出现次数,并解答以下问...

  • MapReduce案例

    一、单词统计 需求分析统计每个单词出现的次数 输入样例 输出样例 示例代码 二、温度统计 需求分析统计每一年的每个...

  • Python3.5+PyQt5词频统计(一)

    一、需求分析 统计给定英文文档中单词出现的频率,要求速度快、准确率高,有良好的交互界面,初期实现简单的分词、统计功...

网友评论

      本文标题:单词统计实现

      本文链接:https://www.haomeiwen.com/subject/qnzfmctx.html