Getting Started with Hadoop: MapReduce Examples (Part 1)

Author: 文贞武毅 | Published 2019-07-29 17:23

    Develop the MapReduce program locally, then package it and upload it to the Hadoop server to run.

    I. Getting started: WordCount
    1. Create a new Maven project with the following pom.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

        <modelVersion>4.0.0</modelVersion>

        <groupId>com.luo</groupId>
        <artifactId>wordcount</artifactId>
        <version>1.0-SNAPSHOT</version>
        <packaging>jar</packaging>

        <name>wordcount</name>
        <url>http://maven.apache.org</url>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>

        <dependencies>
            <dependency>
                <groupId>commons-beanutils</groupId>
                <artifactId>commons-beanutils</artifactId>
                <version>1.9.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.7</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>2.7.7</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-common</artifactId>
                <version>2.7.7</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>2.7.7</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
            </dependency>
        </dependencies>
    </project>

    Then create the source directory: src/main/java

    2. Write the mapper: src/main/java/WordCountMapper

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Get the current input line
            String line = value.toString();
            // Split the line on spaces
            String[] words = line.split(" ");
            // Emit (word, 1) for every word on the line
            for (String word : words) {
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }
    

    Because a MapReduce program has to ship data between machines, everything it passes around must be serializable. Hadoop defines its own serializable types: Long corresponds to LongWritable in Hadoop, String to Text, and int to IntWritable.
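    As a quick, self-contained illustration of these wrapper types (a standalone sketch, not part of the job itself), each Writable wraps a plain Java value and hands it back via get() or toString():

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class WritableDemo {

        public static void main(String[] args) {
            // Wrap plain Java values in Hadoop's serializable wrapper types
            LongWritable offset = new LongWritable(42L);
            Text word = new Text("hello");
            IntWritable one = new IntWritable(1);

            // Unwrap them again
            long l = offset.get();        // 42
            String s = word.toString();   // "hello"
            int n = one.get();            // 1
            System.out.println(l + " " + s + " " + n);
        }
    }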

    The MapReduce framework calls the map method defined here once for every line of input it reads. map receives a key-value pair; the type parameters of Mapper<LongWritable, Text, Text, IntWritable> are, in order, the input key type, the input value type, the output key type, and the output value type. By default, the input key is the byte offset at which the line begins in the file, and the input value is the content of the line itself.

    The output key-value pair is whatever the user chooses to emit.

    3. Write the reducer: src/main/java/WordCountReducer

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum the counts collected for this word
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            // Emit (word, total)
            context.write(key, new IntWritable(count));
        }
    }
    

    The type parameters of Reducer<Text, IntWritable, Text, IntWritable> are likewise the input key type, input value type, output key type, and output value type. Note, however, that reduce receives its values as an Iterable: before map results reach the reducer, they pass through a sort-and-merge (shuffle) phase, so map output such as (good,1) (good,1) (good,1) arrives at reduce merged into key good with values (1,1,1).
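    As a concrete (made-up) example, suppose the input file contains the two lines shown below; the data then flows through the job like this:

    Input file (two lines):
        good good study
        day day up

    Map output (one map call per line):
        (good,1) (good,1) (study,1)
        (day,1) (day,1) (up,1)

    After the sort-and-merge (shuffle) phase, grouped by key:
        day   -> (1,1)
        good  -> (1,1)
        study -> (1)
        up    -> (1)

    Reduce output:
        (day,2) (good,2) (study,1) (up,1)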

    4. Write the driver: src/main/java/WordCountMapReduce

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class WordCountMapReduce {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // Create the configuration object
            Configuration conf = new Configuration();
            // Create the job
            Job job = Job.getInstance(conf, "wordcount");
            // Set the class that identifies the job's jar
            job.setJarByClass(WordCountMapReduce.class);
            // Set the mapper class
            job.setMapperClass(WordCountMapper.class);
            // Set the reducer class
            job.setReducerClass(WordCountReducer.class);
            // Set the map output key/value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // Set the final (reduce) output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // Set the input and output paths
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Submit the job and wait for it to finish
            boolean b = job.waitForCompletion(true);
            if (!b) {
                System.out.println("wordcount task fail!");
            }
        }
    }
    
    

    Then package the jar, upload it to the Hadoop server, and run it. Success!
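    For reference, a typical build-and-run sequence looks like the following. The local file words.txt and the HDFS paths /input and /output are just placeholders; also note that the output directory must not already exist, or the job will refuse to start.

    # Build the jar locally
    mvn clean package

    # Upload some sample text to HDFS
    hdfs dfs -mkdir -p /input
    hdfs dfs -put words.txt /input

    # Run the job; the driver class is named explicitly on the command line,
    # since the jar's manifest does not declare a Main-Class
    hadoop jar target/wordcount-1.0-SNAPSHOT.jar WordCountMapReduce /input /output

    # Inspect the result
    hdfs dfs -cat /output/part-r-00000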
