05_hadoop_wordcounts: YARN configuration and MapReduce

Author: 会摄影的程序员 | Published 2019-02-03 23:15

1. Add the Maven dependency

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.0</version>
        </dependency>
    </dependencies>

2. Extend Mapper

From org.apache.hadoop.mapreduce.Mapper:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  1. KEYIN
    The type of the key the map task reads: the byte offset of the start of the line (Long)
  2. VALUEIN
    The type of the value the map task reads: the content of the line (String)
  3. KEYOUT
    The key type of the key-value pairs returned by the user-defined map method
  4. VALUEOUT
    The value type of the key-value pairs returned by the user-defined map method

In MapReduce, the data produced by map has to be shipped to reduce, which requires serialization and deserialization. The JDK's native serialization produces fairly bloated output, which makes data transfer during a MapReduce job inefficient. Hadoop therefore designed its own serialization mechanism and provides Hadoop-serializable wrappers for the common data types:

Java type:   Long          String  Integer      Float
Hadoop type: LongWritable  Text    IntWritable  FloatWritable
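To see the overhead concretely, here is a small self-contained comparison using only the JDK (no Hadoop dependency needed; the class name `SerializationSize` is mine, for illustration): serializing a single boxed `Long` with `ObjectOutputStream` carries a stream header plus class metadata, while writing the value the way `LongWritable` does takes exactly 8 bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class SerializationSize {
    // Size of a value serialized with the JDK's ObjectOutputStream
    static int jdkBytes(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.size();
    }

    // Size of the same value written as a raw long, as LongWritable writes it
    static int rawBytes(long v) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new DataOutputStream(bos).writeLong(v);
        return bos.size();
    }

    public static void main(String[] args) throws IOException {
        // The JDK stream embeds a header and full class metadata,
        // so a boxed Long costs dozens of bytes; the raw form is 8.
        System.out.println("JDK: " + jdkBytes(Long.valueOf(42L)) + " bytes");
        System.out.println("raw: " + rawBytes(42L) + " bytes");
    }
}
```

Multiplied across millions of shuffled records, that per-record metadata is exactly the redundancy the Writable types avoid.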

core:

public class WorkcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line on spaces and emit (word, 1) for every word
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

Then another class extends Reducer

Reducer<Text, IntWritable, Text, IntWritable>
core:

public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        // Sum the 1s emitted by the mappers for this word
        int sum = 0;
        Iterator<IntWritable> iterator = values.iterator();
        while (iterator.hasNext()){
            IntWritable va = iterator.next();
            sum += va.get();
        }

        context.write(key, new IntWritable(sum));
    }
}

3 Configuring YARN

Configure every machine.

Physically, the node manager should be co-located with the data node, while the resource manager should run on its own dedicated machine. Since this is a demo, the namenode and resourcemanager are placed on the same server here.

  1. Edit the config file on every machine:
    vi yarn-site.xml
<property>
  <name>yarn.resourcemanager.hostname</name>
  <value>vm01</value>
</property>

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle</value>
</property>

<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>2048</value>
</property>

<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>2</value>
</property>

Here vm01 is the resource manager host.
mapreduce_shuffle is a fixed value.
2048 is the memory (in MB) given to the node manager. Use at least 2048, because yarn.app.mapreduce.am.resource.mb defaults to 1536 MB, and a node with less than 1536 MB available makes YARN jobs fail at startup. Note this is a cap on what containers may use, not memory reserved up front.
2 is the number of CPU vcores; the minimum is 1.

  2. Start the YARN cluster: start-yarn.sh (note: run this command on the resource manager's machine; otherwise the resource manager starts on whichever machine runs the command)
  3. Check the YARN processes with jps, then open the YARN web console in a browser:
    http://vm01:8088

4 Running

4.1 A program written on Windows and submitted to YARN on Linux

package com.looc.main;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName JobSubmitter.java
 * @Description TODO
 * @createTime 2019-02-01 16:55:00
 */
public class JobSubmitter {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // Set the HDFS and YARN user identity in code
        System.setProperty("HADOOP_USER_NAME", "root");

        // Job runtime configuration
        Configuration conf = new Configuration();

        //1. The default filesystem the job will access
        conf.set("fs.defaultFS", "hdfs://vm01:9000");
        //2. Where the job gets submitted
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "vm01");
        //3. When submitting from Windows, enable cross-platform mode (Windows paths and jar launch commands differ); not needed when running on Linux
        conf.set("mapreduce.app-submission.cross-platform","true");

        // Configure the job
        Job job = Job.getInstance(conf);
        //1. Parameter: where the jar lives. Hard-coding the path suits launching from a Windows IDE (build the jar first); setJarByClass locates the jar automatically and suits running on Linux
        job.setJar("H:\\BaiduNetdiskDownload\\bigDateTempJar\\wr.jar");
        //job.setJarByClass(JobSubmitter.class);

        //2. Parameters: the Mapper and Reducer implementation classes this job calls
        job.setMapperClass(WorkcountMapper.class);
        job.setReducerClass(WordcountReduce.class);

        //3. Parameters: the key/value types produced by the Mapper and Reducer implementations
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Demo only: delete the output path if it already exists
        Path outPutPath = new Path("/wordCount/outPut");
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://vm01:9000"),conf,"root");
        if (fileSystem.exists(outPutPath)){
            fileSystem.delete(outPutPath, true);
        }


        //4. Parameters: the input path of the data this job processes, and the output path
        FileInputFormat.setInputPaths(job, new Path("/wordCount/resource"));
        FileOutputFormat.setOutputPath(job, outPutPath);

        //5. Parameter: how many reduce tasks to launch
        job.setNumReduceTasks(2);

        //6. Submit the job to YARN. waitForCompletion blocks until YARN finishes and echoes progress to the console; job.submit() submits without waiting
        boolean res = job.waitForCompletion(true);

        System.exit(res? 0:1);

    }
}

4.2 Simulated local run on Windows

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class JobSubWin {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        // With no cluster config on the classpath, this runs in the local job runner
        Job job = Job.getInstance(conf);

        job.setJarByClass(JobSubWin.class);

        job.setMapperClass(WorkcountMapper.class);
        job.setReducerClass(WordcountReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("H:\\BaiduNetdiskDownload\\bigDateTempJar\\input"));
        FileOutputFormat.setOutputPath(job, new Path("H:\\BaiduNetdiskDownload\\bigDateTempJar\\output"));

        job.setNumReduceTasks(3);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

4.3 Uploading the jar to a Linux server and running it

  1. Put the jar on any Linux machine that has Hadoop installed
  2. Run:
hadoop jar xxx.jar xxx.xxx.xxx.JobSubLinux

core:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class JobSubLinux {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        // On the cluster, setJarByClass locates the jar automatically
        job.setJarByClass(JobSubLinux.class);

        job.setMapperClass(WorkcountMapper.class);
        job.setReducerClass(WordcountReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("/wordCount/input"));
        FileOutputFormat.setOutputPath(job, new Path("/wordCount/output"));

        job.setNumReduceTasks(3);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

5 Example demo

For each phone number, compute its total upload traffic, total download traffic, and the combined upload + download total.
If you need a custom value object, the class must implement the Writable interface and override write and readFields; it must also have a no-arg constructor, because deserialization needs one.
Note: readFields must read the fields in exactly the same order and with the same methods that write used.
As follows:

  1. The bean class
package com.looc.流量数据分析demo;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName FlowBean.java
 * @Description TODO
 * @createTime 
 */
public class FlowBean implements Writable {
    private Integer upFlow;
    private Integer dFlow;
    private Integer amountFlow;
    private String phone;

    public FlowBean() {}
    
    public FlowBean(Integer upFlow, Integer dFlow, String phone) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.amountFlow = upFlow + dFlow;
        this.phone = phone;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public Integer getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Integer upFlow) {
        this.upFlow = upFlow;
    }

    public Integer getdFlow() {
        return dFlow;
    }

    public void setdFlow(Integer dFlow) {
        this.dFlow = dFlow;
    }

    public Integer getAmountFlow() {
        return amountFlow;
    }

    public void setAmountFlow(Integer amountFlow) {
        this.amountFlow = amountFlow;
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(upFlow);
        dataOutput.writeInt(dFlow);
        dataOutput.writeInt(amountFlow);
        dataOutput.writeUTF(phone);
    }

    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readInt();
        dFlow = dataInput.readInt();
        amountFlow = dataInput.readInt();
        phone = dataInput.readUTF();
    }
}
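The ordering contract above (readFields must mirror write) can be sketched with plain JDK streams, no Hadoop dependency required. The class `WritableOrderDemo` and its field names are illustrative only; the write/read order mirrors FlowBean's:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableOrderDemo {
    // Mirrors a write method: int, int, UTF string -- in that order
    static byte[] write(int upFlow, int dFlow, String phone) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutput out = new DataOutputStream(bos);
        out.writeInt(upFlow);
        out.writeInt(dFlow);
        out.writeUTF(phone);
        return bos.toByteArray();
    }

    // Mirrors readFields: must read in exactly the order written,
    // because the stream carries no field names, only raw bytes
    static String read(byte[] bytes) throws IOException {
        DataInput in = new DataInputStream(new ByteArrayInputStream(bytes));
        int upFlow = in.readInt();
        int dFlow = in.readInt();
        String phone = in.readUTF();
        return upFlow + " " + dFlow + " " + phone;
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = write(100, 200, "137****0503");
        // The round trip succeeds only because read order matches write order;
        // reading the UTF string first would throw or return garbage.
        System.out.println(read(bytes));  // prints "100 200 137****0503"
    }
}
```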

  2. The mapper class
package com.looc.流量数据分析demo;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName FlowCountMapper.java
 * @Description TODO
 * @createTime 
 */
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();

        // Fields are tab-separated; the phone number is the 2nd column,
        // upload and download traffic are the 3rd- and 2nd-from-last columns
        String[] fields = line.split("\t");

        String phone = fields[1];
        Integer upFlow = Integer.parseInt(fields[fields.length - 3]);
        Integer dFlow = Integer.parseInt(fields[fields.length - 2]);

        context.write(new Text(phone), new FlowBean(upFlow, dFlow, phone));
    }
}

  3. The reducer class
package com.looc.流量数据分析demo;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName FlowCountReducer.java
 * @Description TODO
 * @createTime 
 */
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

        // Sum the upload and download traffic for this phone number
        int upSum = 0;
        int dSum = 0;

        for (FlowBean value : values) {
            upSum += value.getUpFlow();
            dSum += value.getdFlow();
        }

        context.write(key, new FlowBean(upSum, dSum, key.toString()));
    }
}

  4. The submitter class
package com.looc.流量数据分析demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName FlowJobSub.java
 * @Description TODO
 * @createTime 
 */
public class FlowJobSub {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(FlowJobSub.class);

        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path("E:\\soft\\java\\ideaProject\\hadoop\\file\\数据流量分析demo\\input"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\soft\\java\\ideaProject\\hadoop\\file\\数据流量分析demo\\output"));

        job.waitForCompletion(true);

    }
}

  5. Sample input data
1363157985066   137****0503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157995052   138****4101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4            4   0   264 0   200
1363157991076   139****5656 20-10-7A-28-CC-0A:CMCC  120.196.100.99          2   4   132 1512    200
1363154400022   139****1106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4            4   0   240 0   200
1363157993044   182****5961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99  iface.qiyi.com  视频网站    15  12  1527    2106    200
1363157995074   841****3    5C-0E-8B-8C-E8-20:7DaysInn  120.197.40.4    122.72.52.12        20  16  4116    1432    200
1363157993055   135****9658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200
1363157995033   159****3257 5C-0E-8B-C7-BA-20:CMCC  120.197.40.4    sug.so.360.cn   信息安全    20  20  3156    2936    200
1363157983019   137****9419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82          4   0   240 0   200
1363157984041   136****7991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4    s19.cnzz.com    站点统计    24  9   6960    690 200
1363157973098   150****5858 5C-0E-8B-C7-F7-90:CMCC  120.197.40.4    rank.ie.sogou.com   搜索引擎    28  27  3659    3538    200
1363157986029   159****2119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99  www.umeng.com   站点统计    3   3   1938    180 200
1363157992093   135****9658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          15  9   918 4938    200
1363157986041   134****3104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4            3   3   180 180 200
1363157984040   136****6565 5C-0E-8B-8B-B6-00:CMCC  120.197.40.4    2052.flash2-http.qq.com 综合门户    15  12  1938    2910    200
1363157995093   139****4466 00-FD-07-A2-EC-BA:CMCC  120.196.100.82  img.qfc.cn      12  12  3008    3720    200
1363157982040   135****8823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99  y0.ifengimg.com 综合门户    57  102 7335    110349  200
1363157986072   183****3382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99  input.shouji.sogou.com  搜索引擎    21  18  9531    2412    200
1363157990043   139****7413 00-1F-64-E1-E6-9A:CMCC  120.196.100.55  t3.baidu.com    搜索引擎    69  63  11058   48243   200
1363157988072   137****8710 00-FD-07-A4-7B-08:CMCC  120.196.100.82          2   2   120 120 200
1363157985066   137****8888 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157993055   135****6666 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200
  6. Output
134****3104 180 180 360
135****8823 7335 110349 117684
135****6666 1116 954 2070
135****9658 2034 5892 7926
136****6565 1938 2910 4848
136****7991 6960 690 7650
137****9419 240 0 240
137****0503 2481 24681 27162
137****8888 2481 24681 27162
137****8710 120 120 240
138****4101 264 0 264
139****4466 3008 3720 6728
139****7413 11058 48243 59301
139****1106 240 0 240
139****5656 132 1512 1644
150****5858 3659 3538 7197
159****3257 3156 2936 6092
159****2119 1938 180 2118
182****5961 1527 2106 3633
183****3382 9531 2412 11943
841****3    4116 1432 5548

Original post: https://www.haomeiwen.com/subject/aliusqtx.html