MapReduce Split Mechanism

Author: 手扶拖拉机_6e4d | Published 2019-11-10 11:28
  • FileInputFormat split mechanism

Split mechanism:
Splitting is done simply according to the byte length of the file content.
The split size defaults to the block size.
Splitting does not consider the data set as a whole; each file is split on its own, one file at a time.

  • Case analysis

The input data consists of two files:
file1.txt 320M
file2.txt 10M

After FileInputFormat's split calculation, the resulting splits are:
file1.txt.split1 -- 0~128M
file1.txt.split2 -- 128~256M
file1.txt.split3 -- 256~320M
file2.txt.split1 -- 0~10M
Because each file is split independently, file2.txt (10M) still gets its own split even though it is far smaller than the 128M block size.

  • Formula used in the source code to compute the split size

Math.max(minSize, Math.min(maxSize, blockSize))
mapreduce.input.fileinputformat.split.minsize=1 (default: 1)
mapreduce.input.fileinputformat.split.maxsize=Long.MAX_VALUE (default: Long.MAX_VALUE)
Tuning the split size:
maxsize (maximum split size): if set smaller than blockSize, the split becomes smaller and is exactly the configured value.
minsize (minimum split size): if set larger than blockSize, the split can become larger than blockSize.
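
To make the interaction of these three values concrete, here is a minimal standalone sketch (plain Java, not Hadoop source code; the 128M block size is an assumed typical HDFS value) that evaluates the formula above:

public class SplitSizeDemo {
    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;   // assumed HDFS block size: 128M
        long minSize = 1L;                     // mapreduce.input.fileinputformat.split.minsize (default 1)
        long maxSize = Long.MAX_VALUE;         // mapreduce.input.fileinputformat.split.maxsize (default Long.MAX_VALUE)

        // The formula from the source code
        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
        // With the defaults, splitSize == blockSize == 128M.
        // Lowering maxSize below blockSize shrinks the split to maxSize;
        // raising minSize above blockSize grows the split to minSize.
        System.out.println("splitSize = " + splitSize);
    }
}

In a real job the two parameters can also be set in the driver through FileInputFormat.setMinInputSplitSize(job, ...) and FileInputFormat.setMaxInputSplitSize(job, ...).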

  • API for obtaining split information:

// Get the split information for the file (cast to FileSplit)
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// Get the file name of the split
String name = inputSplit.getPath().getName();
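
As a sketch of where these two lines usually live, the following illustrative Mapper (class and field names are hypothetical, not from this post) reads the split's file name once in setup():

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SplitNameMapper extends Mapper<LongWritable, Text, Text, Text> {
    private String fileName;

    @Override
    protected void setup(Context context) {
        // Cast the generic InputSplit to FileSplit to reach the file path
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        fileName = inputSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit the source file name together with the line (for illustration only)
        context.write(new Text(fileName), value);
    }
}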

  • CombineTextInputFormat split mechanism:

The framework's default TextInputFormat plans splits file by file: no matter how small a file is, it becomes its own split and is handed to its own MapTask. With a large number of small files, this produces a large number of MapTasks.

Use case: CombineTextInputFormat is meant for scenarios with many small files. It can logically group multiple small files into one split, so that several small files are handed to a single MapTask.

Setting the maximum virtual-storage split size:

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4M
Note: the maximum virtual-storage split size is best chosen according to the actual sizes of the small files.
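
As a driver-side sketch (assuming a Job object named job, as in the drivers later in this post, plus the import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat), enabling CombineTextInputFormat looks roughly like this:

// Replace the default TextInputFormat with CombineTextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);
// Maximum virtual-storage split size: 4M (tune to the actual small-file sizes)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);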

  • KeyValueTextInputFormat:

Each line is one record, split by a separator into key and value. The separator can be set in the driver class with conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
The default separator is a tab (\t).
Below is an example: the input is a split containing three records, where -> represents a horizontal tab character.

line1 -> Rich Learning from
line2 -> Intelligent learning engine
line3 -> from the china

Each record is represented as the following key/value pairs:
(line1,Rich Learning from)
(line2,Intelligent learning engine)
(line3,from the china)

  • Case analysis:
    1. Requirement: count how many lines in the input file share the same first word.
    2. Input data
wudy reading books
xihuan china
wudy is stronger
xihuan huawei mate30 pro

3. Expected output data
wudy 2
xihuan 2

4. Map phase
wudy reading books

4.1> Set the key and value
<wudy,1>

5. Reduce phase
<wudy,1>
<xihuan,1>
<wudy,1>
<xihuan,1>

Aggregated result:
<wudy,2>
<xihuan,2>

6. Driver
6.1> Set the separator
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
6.2> Set the input format
job.setInputFormatClass(KeyValueTextInputFormat.class);

The code for this case is as follows:

  • KVTextMapper
package com.mr.kv;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class KVTextMapper extends Mapper<Text, Text, Text, IntWritable> {
    // 1. Reusable output value object (each occurrence counts as 1)
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // 2. Write out the key (first word of the line) with a count of 1
        context.write(key, v);
    }
}

  • KVTextReducer
package com.mr.kv;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class KVTextReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    IntWritable v = new IntWritable(1);

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // 1. Sum up the counts
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);
        // 2. Write out the aggregated result
        context.write(key, v);
    }
}
  • KVTextDriver
package com.mr.kv;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class KVTextDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        args = new String[]{
            "/Users/XXX/Desktop/word/input/kvtext.txt", "/Users/XXX/Desktop/word/kvoutput"
        };

        Configuration conf = new Configuration();

        // Set the separator; the default separator is a tab (\t)
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");

        // 1. Get the Job object
        Job job = Job.getInstance(conf);

        // 2. Set the jar location
        job.setJarByClass(KVTextDriver.class);

        // 3. Associate the Mapper and Reducer classes
        job.setMapperClass(KVTextMapper.class);
        job.setReducerClass(KVTextReducer.class);

        // 4. Set the key and value types of the Mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Set the key and value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the input format
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
  • Output after running (screenshot omitted); it matches the expected output above: wudy 2, xihuan 2.

NLineInputFormat

  • With NLineInputFormat, the InputSplit handled by each map process is no longer divided by HDFS block, but by the number of lines N specified for NLineInputFormat.
    That is, number of splits = total lines of the input file / N; if it does not divide evenly, number of splits = quotient + 1.

Here is an example:

line1 Rich Learning from
line2 Intelligent learning engine
line3 from thc china
line4 Tomorrow is another day

For example, if N is 2, each input split contains two lines and two MapTasks are started. The first mapper receives two lines:

(0,line1 Rich Learning from)
(25,line2 Intelligent learning engine)

The other mapper receives the remaining two lines:

(59,line3 from thc china)
(80,line4 Tomorrow is another day)

The keys and values here are the same as those generated by TextInputFormat: the key is the byte offset of the line within the file.

Case:
Count the occurrences of each word, with the number of splits determined by the number of lines in each input file. This case requires every three lines to be placed in one split; with the 8 input lines below, 8 / 3 = 2 with remainder 2, so 2 + 1 = 3 splits are produced and 3 MapTasks are started.

  • Input data
    wudy nihao
    wudy like hadoop and flink
    xihuan you
    wudy reading books
    xihuan china
    wudy is stronger
    xihuan huawei mate30 pro
    xihuan is love

  • Code implementation

  • Mapper phase

package com.nline.input.format;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class NLineInputFormatMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Text k = new Text();

        // 1. Get one line
        String line = value.toString();

        // 2. Split the line into words
        String[] words = line.split(" ");

        // 3. Write out each word with a count of 1
        for (String word : words) {
            k.set(word);
            IntWritable v = new IntWritable(1);
            context.write(k, v);
        }
    }
}
  • Reduce phase
package com.nline.input.format;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class NLineInputFormatReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        IntWritable v = new IntWritable(1);
        // 1. Sum up the counts
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }

        v.set(sum);

        // 2. Write out the aggregated result
        context.write(key, v);
    }
}
  • Driver
package com.nline.input.format;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class NLineInputFormatDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{
        args = new String[]{
            "/Users/yuliang/Desktop/word/input/nlineformat.txt", "/Users/yuliang/Desktop/word/nline_output"
        };

        Configuration conf = new Configuration();

        // 1. Get the Job object
        Job job = Job.getInstance(conf);

        // 2. Set the jar location
        job.setJarByClass(NLineInputFormatDriver.class);

        // 3. Associate the Mapper and Reducer classes
        job.setMapperClass(NLineInputFormatMapper.class);
        job.setReducerClass(NLineInputFormatReducer.class);

        // 4. Set the key and value types of the Mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Set the key and value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Put three records (lines) into each InputSplit
        NLineInputFormat.setNumLinesPerSplit(job, 3);

        // Use NLineInputFormat to control the records per split (the default is TextInputFormat)
        job.setInputFormatClass(NLineInputFormat.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
  • Output:
and 1
books   1
china   1
flink   1
hadoop  1
huawei  1
is  2
like    1
love    1
mate30  1
nihao   1
pro 1
reading 1
stronger    1
wudy    4
xihuan  4
you 1
