美文网首页大数据技术
大数据技术之MapReduce(四)

大数据技术之MapReduce(四)

作者: pauls | 来源:发表于2021-05-02 09:10 被阅读0次

    Hadoop 数据压缩
    概述
    1 )压缩的好处和坏处
    压缩的优点:以减少磁盘 IO、减少磁盘存储空间。
    压缩的缺点:增加 CPU 开销。
    2 ) 压缩原则
    (1)运算密集型的 Job,少用压缩
    (2)IO 密集型的 Job,多用压缩
    4.2 MR 支持的压缩编码
    1)压缩算法对比介绍

    压缩算法
    压缩算法
    2)压缩性能的比较
    比较
    4.3 压缩 方式 选择
    压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否
    可以支持切片。
    4.3.1 Gzip 压缩
    优点:压缩率比较高;
    缺点:不支持 Split;压缩/解压速度一般;
    4.3.2 Bzip2 压缩
    优点:压缩率高;支持 Split;
    缺点:压缩/解压速度慢。
    4.3.3 Lzo 压缩
    优点:压缩/解压速度比较快;支持 Split;
    缺点:压缩率一般;想支持切片需要额外创建索引。
    4.3.4 Snappy 压缩
    优点:压缩和解压缩速度快;
    缺点:不支持 Split;压缩率一般;
    4.3.5 压缩位置选择
    压缩可以在 MapReduce 作用的任意阶段启用。
    MapReduce数据压缩
    数据压缩
    4.4 压缩 参数 配置
    1)为了支持多种压缩/解压缩算法,Hadoop 引入了编码/解码器
    压缩格式

    2)要在 Hadoop 中启用压缩,可以配置如下参数


    参数
    参数续

    4.5 压缩实操案例
    4.5.1 Map 输出 端采用压缩
    即使你的 MapReduce 的输入输出文件都是未压缩的文件,你仍然可以对 Map 任务的中
    间结果输出做压缩,因为它要写在硬盘并且通过网络传输到 Reduce 节点,对其压缩可以提
    高很多性能,这些工作只要设置两个属性即可,我们来看下代码怎么设置。
    1 )给大家 提供的 Hadoop 源码支持的压缩格式有:BZip2Codec 、DefaultCodec

    package com.sl.mapreduce.compress;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.BZip2Codec; 
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import  org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class WordCountDriver {
          public static void main(String[] args) throws IOException,
    ClassNotFoundException, InterruptedException {
              Configuration conf = new Configuration();
              // 开启 map 端输出压缩
              conf.setBoolean("mapreduce.map.output.compress", true);
              // 设置 map 端输出压缩方式
              conf.setClass("mapreduce.map.output.compress.codec",
              BZip2Codec.class,CompressionCodec.class);
              Job job = Job.getInstance(conf);
              job.setJarByClass(WordCountDriver.class);
              job.setMapperClass(WordCountMapper.class);
              job.setReducerClass(WordCountReducer.class);
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(IntWritable.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
              FileInputFormat.setInputPaths(job, new Path(args[0]));
              FileOutputFormat.setOutputPath(job, new Path(args[1]));
              boolean result = job.waitForCompletion(true);
              System.exit(result ? 0 : 1);
        }
    }
    

    2 )Mapper

    package com.sl.mapreduce.compress;
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    public class WordCountMapper extends Mapper<LongWritable, Text, Text,IntWritable>{
            Text k = new Text();
            IntWritable v = new IntWritable(1);
            @Override
            protected void map(LongWritable key, Text value, Context
    context)throws IOException, InterruptedException {
                  // 1 获取一行
                  String line = value.toString();
                  // 2 切割
                  String[] words = line.split(" ");
                  // 3 循环写出
                  for(String word:words){
                        k.set(word);
                        context.write(k, v);
                    }
          }
    }
    

    3 )Reducer

    package com.sl.mapreduce.compress;
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    public class WordCountReducer extends Reducer<Text, IntWritable, Text,IntWritable>{
              IntWritable v = new IntWritable();
              @Override
              protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
                int sum = 0;
                // 1 汇总
                for(IntWritable value:values){
                      sum += value.get();
                }
                v.set(sum);
                // 2 输出
                context.write(key, v);
          }
    }
    

    4.5.2 Reduce 输出 端采用压缩
    基于 WordCount 案例处理。
    1 )修改驱动

    package com.sl.mapreduce.compress;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.BZip2Codec;
    import org.apache.hadoop.io.compress.DefaultCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.io.compress.Lz4Codec;
    import org.apache.hadoop.io.compress.SnappyCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class WordCountDriver {
          public static void main(String[] args) throws IOException,
    ClassNotFoundException, InterruptedException {
              Configuration conf = new Configuration();
              Job job = Job.getInstance(conf);
              job.setJarByClass(WordCountDriver.class);
              job.setMapperClass(WordCountMapper.class);
              job.setReducerClass(WordCountReducer.class);
              job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(IntWritable.class);
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);
              FileInputFormat.setInputPaths(job, new Path(args[0]));
              FileOutputFormat.setOutputPath(job, new Path(args[1]));
              // 设置 reduce 端输出压缩开启
              FileOutputFormat.setCompressOutput(job, true);
              // 设置压缩的方式
              FileOutputFormat.setOutputCompressorClass(job,BZip2Codec.class);
              // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
              // FileOutputFormat.setOutputCompressorClass(job,
    DefaultCodec.class);
              boolean result = job.waitForCompletion(true);
              System.exit(result?0:1);
        }
    }
    

    4.5.3 常见错误及 解决方案
    1)导包容易出错。尤其 Text 和 CombineTextInputFormat。
    2)Mapper 中第一个输入的参数必须是 LongWritable 或者 NullWritable,不可以是 IntWritable.
    报的错误是类型转换异常。
    3)java.lang.Exception: java.io.IOException: Illegal partition for 13926435656 (4),说明 Partition
    和 ReduceTask 个数没对上,调整 ReduceTask 个数。
    4)如果分区数不是 1,但是 reducetask 为 1,是否执行分区过程。答案是:不执行分区过程。
    因为在 MapTask 的源码中,执行分区的前提是先判断 ReduceNum 个数是否大于 1。不大于
    1 肯定不执行。
    5)在 Windows 环境编译的 jar 包导入到 Linux 环境中运行,
    hadoop jar wc.jar com.sl.mapreduce.wordcount.WordCountDriver
    /user/sl/
    /user/sl/output
    报如下错误:
    Exception in thread "main" java.lang.UnsupportedClassVersionError:
    com/atguigu/mapreduce/wordcount/WordCountDriver : Unsupported major.minor version 52.0
    原因是 Windows 环境用的 jdk1.7,Linux 环境用的 jdk1.8。
    解决方案:统一 jdk 版本。
    6)缓存 pd.txt 小文件案例中,报找不到 pd.txt 文件
    原因:大部分为路径书写错误。还有就是要检查 pd.txt.txt 的问题。还有个别电脑写相对路径
    找不到 pd.txt,可以修改为绝对路径。
    7)报类型转换异常。
    通常都是在驱动函数中设置 Map 输出和最终输出时编写错误。
    Map 输出的 key 如果没有排序,也会报类型转换异常。
    8)集群中运行 wc.jar 时出现了无法获得输入文件。
    原因:WordCount 案例的输入文件不能放用 HDFS 集群的根目录。
    9)出现了如下相关异常
    Exception in thread "main" java.lang.UnsatisfiedLinkError:
    org.apache.hadoop.io.nativeio.NativeIOWindows.access0(Ljava/lang/String;I)Z at org.apache.hadoop.io.nativeio.NativeIOWindows.access0(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609)
    at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
    java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:356)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:371)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:364)
    解决方案:拷贝 hadoop.dll 文件到 Windows 目录 C:\Windows\System32。个别同学电脑
    还需要修改 Hadoop 源码。
    方案二:创建如下包名,并将 NativeIO.java 拷贝到该包名下

    image.png
    10)自定义 Outputformat 时,注意在 RecordWirter 中的 close 方法必须关闭流资源。否则输出的文件内容中数据为空。
    @Override
    public  void  close(TaskAttemptContext  context)  throws  IOException,InterruptedException {
          if (atguigufos != null) {
            atguigufos.close();
          }
          if (otherfos != null) {
            otherfos.close();
          }
    }
    

    相关文章

      网友评论

        本文标题:大数据技术之MapReduce(四)

        本文链接:https://www.haomeiwen.com/subject/xscukltx.html