美文网首页
MapReduce实现词频统计

MapReduce实现词频统计

作者: 扎西的德勒 | 来源:发表于2021-02-22 15:02 被阅读0次

    一、MapReduce编程指导思想

    MapReduce的开发一共有八个步骤其中map阶段分为2个步骤,shuffle阶段4个步骤,reduce阶段分为2个步骤。

    1. Map阶段2个步骤

    • 第一步:设置inputFormat类,将数据切分成key,value对,输入到第二步

    • 第二步:自定义map逻辑,处理我们第一步的输入kv对数据,然后转换成新的key,value对进行输出

    2. shuffle阶段4个步骤

    • 第三步:对上一步输出的key,value对进行分区。(相同key的kv对属于同一分区)

    • 第四步:对每个分区的数据按照key进行排序

    • 第五步:对分区中的数据进行规约(combine操作),降低数据的网络拷贝(可选步骤)

    • 第六步:对排序后的kv对数据进行分组;分组的过程中,key相同的kv对为一组;将同一组的kv对的所有value放到一个集合当中(每组数据调用一次reduce方法)

    3. reduce阶段2个步骤

    • 第七步:对多个map的任务进行合并,排序,写reduce函数自己的逻辑,对输入的key,value对进行处理,转换成新的key,value对进行输出

    • 第八步:设置将输出的key,value对数据保存到文件中

    二、MapReduce编程实现

    1. 源数据准备

    hello,hello
    world,world
    hadoop,hadoop
    hello,world
    hello,flume
    hadoop,hive
    hive,kafka
    flume,storm
    hive,oozie
    

    2. 代码实现

    创建maven工程导入jar包
     <properties>
            <hadoop.version>3.1.4</hadoop.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/junit/junit -->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.11</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.testng</groupId>
                <artifactId>testng</artifactId>
                <version>RELEASE</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                        <!--   <verbal>true</verbal>-->
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <minimizeJar>true</minimizeJar>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    定义WordCountMain主类

    固定写法,实现mapreduce8个步骤:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class WordCountMain extends Configured implements Tool {
    
        @Override
        public int run(String[] args) throws Exception {
            //定义MapReduce的8个步骤 然后通过Job组装
    
            //继承父类 Configured 获取conf
            Configuration conf = super.getConf();
            Job job = Job.getInstance(conf, "WordCount");
    
            //如果需要集群运行,需要设置程序运行主类;若本地运行,则不需要该设置
            job.setJarByClass(WordCountMain.class);
    
            //通过Job组装MpaReduce的8个步骤
            //第一步:读取文件,解析成为key,value对(k1,v1)
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job,new Path("file:///D:\\input\\数据文件"));
            //第二步:自定义map逻辑,接受(k1,v1) 转换成为新的 (k2,v2)
            job.setMapperClass(MyMapper.class);
            //设置map的输出的key,value对象(k2,v2)的类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            /**
             * 第三步:分区
             * 第四步:排序
             * 第五步:规约
             * 第六步:分组
             */
    
            //第七步:自定义reduce逻辑
            job.setReducerClass(MyReduce.class);
            //设置reduce的输出key,value对象(k3,v3)的类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            //第八步:
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job,new Path("file:///D:\\output"));
    
            //提交整个job任务,等到整个任务结束之后,如果任务执行成功,返回true;如果执行失败,返回false
            boolean b = job.waitForCompletion(true);
    
            //使用三余运算符
            return b?0:1;
    
        }
        /**
         * MapReduce进行WordCount的入口类
         */
        public static void main(String[] args) throws Exception {
            //获取配置文件
            Configuration configuration = new Configuration();
            //通过ToolRunner.run 执行程序的入口
            int run = ToolRunner.run(configuration, new WordCountMain(), args);
            //整个系统的退出
            System.exit(run);
        }
    
    }
    
    定义Mapper类
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * 使用TextInputFormat读取文件,一行一行进行读取
     *   key:行偏移量 一般为LongWritable
     *   value: 一行文本记录为 Text
     * 每个单词出现一次,记做一次
     *   key:单词  Text
     *   value: 1 IntWritable
     */
    
    public class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
    
        //准备好输出的: <Text,IntWritable>
        Text text = new Text();
        IntWritable intWritable = new IntWritable(1);
        /**
         *
         * @param key
         * @param value   一行文本记录
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //Text value: 对文件一行记录进行处理
            String line = value.toString();
            String[] split = line.split(",");
    
            for (String word : split){
                //context 为 <Text,IntWritable>
                text.set(word);
                context.write(text,intWritable);
            }
    
        }
    
    }
    
    定义Reducer类
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class MyReduce extends Reducer<Text, IntWritable, Text,IntWritable> {
    
        /**
         *
         * @param key
         * @param values
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int result = 0;
            for(IntWritable value:values){
                int i = value.get();
                result += i;
            }
            IntWritable intWritable = new IntWritable(result);
            context.write(key,intWritable);
        }
    
    }
    

    注意: 切记导入正确的jar包,开发中由于导入了错误的Text包,导致返回code1,也没有报错,切记切记。

    相关文章

      网友评论

          本文标题:MapReduce实现词频统计

          本文链接:https://www.haomeiwen.com/subject/mwtkfltx.html