美文网首页
第2章 关于MapReduce 学习笔记

第2章 关于MapReduce 学习笔记

作者: 主君_05c4 | 来源:发表于2019-04-12 20:35 被阅读0次

2.1 使用Hadoop分析数据

2.1.1 map和reduce

MapReduce包含map和reduce两个阶段,每阶段输入输出都为key-value

以下示例为计算每个月最高温度

1. 输入

温度原始数据input.txt,格式为日期,...,温度

2018-01-02,...,12
2018-01-03,...,8
2018-01-04,...,
2018-01-05,...,30
2018-02-01,...,8
2018-02-02,...,15
2018-02-03,...,12
2018-02-04,...,24
2.map
package com.zyf.study;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] values = line.split(",");
        if (values.length > 2) {
            int temperature = Integer.parseInt(values[2]);
            context.write(new Text(values[0].substring(0, 7)), new IntWritable(temperature));
        }
    }
}
  • map输入:key=行号,value=行内容,如key=1,value=2018-01-02,...,12
  • map输出:key=年月,value=温度,如key=2018-01,value=12
3.reduce
package com.zyf.study;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {


    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) 
                   throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (Iterator<IntWritable> it = values.iterator(); it.hasNext(); ) {
            int t = it.next().get();
            if (t > maxValue) {
                maxValue = t;
            }
        }

        context.write(key, new IntWritable(maxValue));
    }
}
  • reduce输入:key=年月,value=同一年月的温度数组,如key=2018-01,value=[12,8,30]
  • reduce输出:key=年月,value=温度,如key=2018-01,value=30
4. Job启动类
package com.zyf.study;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MaxTemperature {
    public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Job job = Job.getInstance();

        job.setJobName("MaxTemperature");
        job.setJarByClass(MaxTemperature.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
  • FileInputFormat.addInputPath为任务指定输入数据目录,可以是单个文件、目录、符合文件模式的一系列文件,FileInputFormat.addInputPath可传入用逗号隔开的多个路径;
  • FileOutputFormat.setOutputPath为任务指定输出目录,若指定输出目录已存在,任务将会报错,以防覆盖数据;
  • setMapperClass setReducerClass指定map类和reduce类,setCombinerClass指定合并类,一般与reduce相同,该类可在map计算节点预先合并输出,以减少数据网络传输;
  • setOutputKeyClass setOutputValueClass指定reduce输出结果的数据类型,map输出结果数据类型一般与reduce输入相同,不需要指定,若不同,需要通过setMapOutputKeyClasssetMapOutputValueClass指定;
  • job.waitForCompletion(verbose)入参标识是否打印job执行详细日志,返回值标识job执行成(true)败(false)。
5.执行日志

hadoop jar hadoop-first-1.0-SNAPSHOT.jar D:\temp\hadoop-testdata D:\temp\hadoop-testdata\output
截取部分日志如下:

  • JOB标识
19/04/12 20:07:58 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local281339793_0001
19/04/12 20:07:58 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
19/04/12 20:07:58 INFO mapreduce.Job: Running job: job_local281339793_0001
  • MAP任务部分日志
19/04/12 20:07:59 INFO mapred.LocalJobRunner: Waiting for map tasks
19/04/12 20:07:59 INFO mapred.LocalJobRunner: Starting task: attempt_local281339793_0001_m_000000_0
19/04/12 20:07:59 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
19/04/12 20:07:59 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
19/04/12 20:07:59 INFO mapred.Task:  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@6a58318a
19/04/12 20:07:59 INFO mapred.MapTask: Processing split: file:/D:/temp/hadoop-testdata/input.txt:0+146
.
.
19/04/12 20:07:59 INFO mapred.Task: Task:attempt_local281339793_0001_m_000000_0 is done. And is in the process of committing
19/04/12 20:07:59 INFO mapred.LocalJobRunner: map
19/04/12 20:07:59 INFO mapred.Task: Task 'attempt_local281339793_0001_m_000000_0' done.
19/04/12 20:07:59 INFO mapred.Task: Final Counters for attempt_local281339793_0001_m_000000_0: Counters: 18
        File System Counters
                FILE: Number of bytes read=6900
                FILE: Number of bytes written=305142
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
        Map-Reduce Framework
                Map input records=8
                Map output records=7
                Map output bytes=84
                Map output materialized bytes=34
                Input split bytes=104
                Combine input records=7
                Combine output records=2
                Spilled Records=2
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=0
                Total committed heap usage (bytes)=257425408
        File Input Format Counters
                Bytes Read=146
19/04/12 20:07:59 INFO mapred.LocalJobRunner: Finishing task: attempt_local281339793_0001_m_000000_0
  • Reduce任务部分日志
19/04/12 20:08:00 INFO mapred.Task: Task 'attempt_local281339793_0001_r_000000_0' done.
19/04/12 20:08:00 INFO mapred.Task: Final Counters for attempt_local281339793_0001_r_000000_0: Counters: 24
        File System Counters
                FILE: Number of bytes read=7000
                FILE: Number of bytes written=305210
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
        Map-Reduce Framework
                Combine input records=0
                Combine output records=0
                Reduce input groups=2
                Reduce shuffle bytes=34
                Reduce input records=2
                Reduce output records=2
                Spilled Records=2
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=0
                Total committed heap usage (bytes)=257425408
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Output Format Counters
                Bytes Written=34
19/04/12 20:08:00 INFO mapred.LocalJobRunner: Finishing task: attempt_local281339793_0001_r_000000_0

通过上述日志也可以看到,combiner将原先的7条map输出,合并为2条

6.执行结果

结果文件保存于output目录

._SUCCESS.crc
.part-r-00000.crc
_SUCCESS
part-r-00000

part-r-00000文件内容如下:

2018-01 30
2018-02 24

2.2 横向扩展

为了实现横向扩展,需要将数据存储在分布式文件系统,YARN分发MapReduce计算至数据存储节点;

2.2.1 数据流
  • JOB

包括输入流、MapReduce函数、配置信息
分成若干Map、Reduce任务,任务由YARN调度在集群运行,任务失败将在其他节点自动重新调度运行;

  • 输入流
  • 数据划分成等长小数据块,每个分片创建一个对应map任务,以实现更好的负载均衡,处理性能更高且空闲的机器,能处理的分片也更多,且map任务失败重试,单任务需要重新计算的数据量也会更少,分片越细,负载均衡质量越高,不过若切得太细,管理分片的时间越长,一般合理大小趋于HDFS块大小,默认128M.若分片跨越两个数据块,单HDFS节点不大可能同时存在这两个数据块,会存在网络传输问题。
  • 数据本地优化(data locality optimization),map任务调度至输入数据存储节点上执行,无须网络传输,以获得最佳性能,若数据副本所在节点非空闲,map任务将被调度至输入数据节点所在机架其他空闲map slot执行,非常少情况下,任务被调度至其他机架,这将导致机架间网络传输。
  • map输出流
  • map任务结果写入到本地磁盘,该结果作为reduce输入,Job完成,该结果被删除,无须存在HDFS备份,从而减少网络开销,若map输出传送给reduce之前失败,其他节点将重运行map任务以构建中间结果
本地数据、本地机架、跨机架 map任务
  • reduce输入与输出
  • 单个reduce的输入,是所有map的输出,排过序的map输出通过网络传输至reduce节点,不具备数据本地化优势;
  • 多个map输入,在reduce节点先合并,再由reduce函数处理;
  • reduce输出,存储于HDFS实现高可靠,第一个副本存储于当前节点,其他副本存储于同机架其他节点和其他机架随机节点;
一个reduce任务的MapReduce数据流.png
  • reduce任务数需要指定,而非基于输入数据量;
  • 多个reduce任务数据流如下,map任务根据reduce任务数量,创建对应数量的分区,将输出按分区函数放入对应的分区,默认partitioner通过哈希函数分区,也可自定义;
  • map与reduce之间的数据流称为shuffle(混洗),调整混洗参数对任务总执行时间影响很大;
  • 当数据处理可完全并行(即无需混洗),可能无需reduce,map将结果直接写入HDFS
多个reduce任务的MapReduce数据流.png 无reduce任务的MapReduce数据流.png
2.2.2 Combiner函数
  • 集群上可用带宽限制了MapReduce作业数量,最大化减少map和reduce间数据传输;
  • 可为map任务指定一个combiner以合并每个map输出,Hadoop无法确定对一个指定map任务输出记录调用多少次combiner,不论调多少次,不影响reduce输出;
  • 适用场景,如取最大值、最小值等,不适用场景包括取平均值等
1、
第一个map输出
    (2018-01, 12)
    (2018-01, 8)
    (2018-01, 30)
第二个map输出
    (2018-01, 16)
2、
reduce节点接收到上述4条记录,reduce调用时,输入合并为:
   (2018-01, [12, 8, 30,16])
3、
若调用了combiner且其逻辑与reduce一样
reduce节点只会接收到2条记录,内容如下,
  (2018-01, [30])
  (2018-01, [16])
reduce调用时,输入合并为:
  (2018-01, [30,16])
若取平均值,输出结果不符合预期
mean(12, 8, 30, 16) = 16.5
mean(mean(12, 8, 30), 16) = 16.3
2.2.3 运行分布式MapReduce作业

MapReduce可在本地开发调测,无须修改即可运行在集群,且集群可根据数据量水平扩展。

以上为《Hadoop权威指南》学习笔记

相关文章

  • 第2章 关于MapReduce 学习笔记

    2.1 使用Hadoop分析数据 2.1.1 map和reduce MapReduce包含map和reduce两个...

  • mapreduce框架详解

    参考:hadoop 学习笔记:mapreduce框架详解 [toc] 总结 Mapreduce是一个计算框架,既然...

  • Hadoop MapReduce 学习笔记

    前言 本文是个人之前纪录的MapReduce学习笔记,主要涉及到MapReduce基本概念、Hadoop 经典示例...

  • mapreduce学习笔记

    本文是对mapreduce技术的一个初步学习的总结,包括如下章节的内容: 概述 发展史 基本概念 程序编写 运行测...

  • MapReduce学习笔记

    wordcount: 统计文件中每个单词出现的次数需求:1) 文件内容小:shell2)文件内容很大:TB GB ...

  • MapReduce学习笔记

    MapReduce 一、什么是MapReduce 1.1 定义: MapReduce是Google提出的一个软件架...

  • 学习笔记—MapReduce

    MapReduce是什么 MapReduce是一种分布式计算编程框架,是Hadoop主要组成部分之一,可以让用户专...

  • MapReduce 6.824 学习笔记

    map函数和reduce函数 这两个函数是交给用户实现的,这两个函数定义了任务本身。 map函数:接受一个键值对(...

  • Hadoop之MapReduce And Yarn

    第1章 MapReduce概述 1.1 MapReduce定义 1.2 MapReduce优缺点 1.3 MapR...

  • python中的map和reduce

    什么是MapReduce 摘自wiki中关于MapReduce的说明 MapReduce是Google提出的一个软...

网友评论

      本文标题:第2章 关于MapReduce 学习笔记

      本文链接:https://www.haomeiwen.com/subject/djkhwqtx.html