MapReduce中使用Avro

作者: 名字想好没 | 来源:发表于2019-01-09 22:11 被阅读3次

原文地址:https://itweknow.cn/detail?id=71 ,欢迎大家访问。

上篇文章我们简要介绍了一下Avro是啥,以及其几种数据类型。那么通过这篇文章我们一起来实践一下Avro在MapReduce中的使用。

前提条件

一个maven项目
Hadoop集群,如果你还没有安装的话,请戳这里,查看之前的文章。

说明

本篇文章是一个简单的用例,使用的例子是一个txt文件中存储了大量的学生信息,这些学生有姓名、年龄、爱好和班级信息,我们要做的事情就是通过MapReduce程序找到各个班级年龄最大的学生。

项目依赖

我们需要hadoop以及avro相关的包。

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.8.5</version>
</dependency>

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-mapred</artifactId>
    <version>1.8.2</version>
</dependency>

Avro模式

前面也说到了每个学生有姓名、年龄、爱好、班级四个字段的信息,所以我们定义了如下的Avro模式来描述一个学生。命名为Student.avsc,存放在resources目录下。

{
    "type": "record",
    "name": "StudentRecord",
    "doc": "A student",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
        {"name": "hobby", "type": "string"},
        {"name": "class", "type": "string"}
    ]
}

Mapper和Reducer

  • Mapper
public class StudentAgeMaxMapper extends Mapper<LongWritable, Text,
        AvroKey<String>, AvroValue<GenericRecord>> {

    private GenericRecord record = new GenericData.Record(SchemaUtil.STUDENT_SCHEMA.getSchema());

    private StudentRecordParser parser = new StudentRecordParser();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException,
            InterruptedException {
        parser.parse(value);
        if (parser.isValid()) {
            // 数据合法。
            record.put("name", parser.getName());
            record.put("age", parser.getAge());
            record.put("hobby", parser.getHobby());
            record.put("class", parser.getClazz());
            context.write(new AvroKey<>(parser.getClazz()), new AvroValue<>(record));
        }
    }
}

上面的代码中你可以看到我们自定义了一个StudentRecordParser的类来解析一行记录,由于篇幅的原因这里就不展示了,你可以在后面提供的源码中找到。其实不难看出,Map程序主要做的事情就是将我们存放在txt中的记录解析成一个个的GenericRecord对戏,然后以班级名称为键,record为值传递给Reducer做进一步处理。

  • Reducer
public class StudentAgeMaxReducer extends Reducer<AvroKey<String>, AvroValue<GenericRecord>,
        AvroKey<GenericRecord>, NullWritable> {

    @Override
    protected void reduce(AvroKey<String> key, Iterable<AvroValue<GenericRecord>> values,
                          Context context) throws IOException, InterruptedException {

        GenericRecord max = null;
        for (AvroValue<GenericRecord> value : values) {
            GenericRecord record = value.datum();
            if (max == null || ((Integer)max.get("age") <
                    (Integer) record.get("age"))) {
                max = new GenericData.Record(SchemaUtil.STUDENT_SCHEMA.getSchema());
                max.put("name", record.get("name"));
                max.put("age", record.get("age"));
                max.put("hobby", record.get("hobby"));
                max.put("class", record.get("class"));
            }
        }
        context.write(new AvroKey<>(max), NullWritable.get());
    }
}

Reducer的逻辑其实也比较简单,就是通过循环比较的方式找到年龄最大的学生。

驱动程序

public class StudentAgeMaxDriver {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // 注释1:为了解决在Hadoop集群中运行时我们使用的Avro版本和集群中Avro版本不一致的问题。
        configuration.setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
        Job job = Job.getInstance(configuration);
        job.setJarByClass(StudentAgeMaxDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setMapOutputValueSchema(job, SchemaUtil.STUDENT_SCHEMA.getSchema());
        AvroJob.setOutputKeySchema(job, SchemaUtil.STUDENT_SCHEMA.getSchema());
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        job.setMapperClass(StudentAgeMaxMapper.class);
        job.setReducerClass(StudentAgeMaxReducer.class);
        System.exit(job.waitForCompletion(true)?0:1);
    }

}

和之前的MapReduce实战中实例比较,我们这里使用AvroJob来配置作业,AvroJob类主要用来给输入、map输出以及最后输出数据指定Avro模式。

项目打包

在打包的时候我们需要将依赖也打到jar包中,不然后面在集群中运行的时候会报找不到AvroJob类的错误。可通过在pom.xml中添加如下插件来解决打包的问题。

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
        <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>

运行

  1. 准备输入文件,input.txt。

    zhangsan    23  music   class1
    lisi    24  pingpong    class2
    wangwu  24  dance   class1
    liuyi   25  music   class1
    chener  25  dance   class2
    zhaoliu 22  dance   class2
    sunqi   22  pingpong    class1
    zhouba  23  music   class2
    wujiu   26  dance   class1
    zhengshi    21  dance   class2
    
  2. 将输入文件上传到HDFS上

    hadoop fs -mkdir /input
    hadoop fs -put input.txt /input
    
  3. 将jar拷贝到集群中任意一台Hadoop机器上。

  4. 运行下面的命令执行jar包

    export HADOOP_CLASSPATH=${你的jar包名}
    export HADOOP_USER_CLASSPATH_FIRST=true
    hadoop jar {你的jar包名} {主类路径} /input /output
    
  5. 将运行结果拷贝到本地

    hadoop fs -copyToLocal /output/part-r-00000.avro part-r-00000.avro
    
  6. 运行结果查看

    root@test:~# java -jar /root/extra-jar/avro-tools-1.8.2.jar tojson part-r-00000.avro
    {"name":"wujiu","age":26,"hobby":"dance","class":"class1"}
    {"name":"chener","age":25,"hobby":"dance","class":"class2"}
    
    

想要项目源码吗?戳这里就有哦

相关文章

  • MapReduce中使用Avro

    原文地址:https://itweknow.cn/detail?id=71 ,欢迎大家访问。 上篇文章我们简要介绍...

  • MongoDB mapreduce 使用

    MapReduce介绍 MongoDB 中的MapReduce 使用JavaScript来作为查询语言,因此能表达...

  • Hive调优

    Fetch抓取(Hive可以避免进行MapReduce)Hive中对某些情况的查询可以不必使用MapReduce计...

  • MapReduce

    MapReduce介绍 在Hadoop中计算模型使用的是MapReduce。Hadoop MapReduce计算编...

  • Kafka 中使用 Avro 序列化框架(一):使用传统的 av

    关于 avro 的 maven 工程的搭建以及 avro 的入门知识,可以参考: Apache Avro 入门 ...

  • avro源码阅读-写avro文件

    avro文件结构 avro文件格式大致如下(引用自avro官网Specification的Object Conta...

  • Flume-1.8.0(三)Source支持的类型

    avro 类型的 source 监听 Avro 端口来接收外部 avro 客户端的事件流。和 netcat 不同的...

  • avro源码阅读-读avro文件

    在实际工作中,会将avro文件读到一个类中。下面就看看代码是如何实现的(这里看的是avro-1.7.7版本)。 读...

  • Avro

    Avro[http://avro.apache.org/]是一种与编程语言无关的序列化格式,Avro 数据通过与语...

  • Avro介绍

    1. 介绍 Avro 是 Hadoop 中的一个子项目,也是 Apache 中一个独立的项目,Avro 是一个基于...

网友评论

    本文标题:MapReduce中使用Avro

    本文链接:https://www.haomeiwen.com/subject/idynrqtx.html