1. Import the jar dependency
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.2.0</version>
    </dependency>
</dependencies>
2. Extend Mapper
The class to extend is org.apache.hadoop.mapreduce.Mapper:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
- KEYIN: the key type of the data the map task reads, i.e. the starting byte offset of the line (a Long)
- VALUEIN: the value type of the data the map task reads, i.e. the content of the line (a String)
- KEYOUT: the key type of the key-value pairs returned by the user-defined map method
- VALUEOUT: the value type of the key-value pairs returned by the user-defined map method
In MapReduce, the key-value data produced by map has to be transferred to reduce, which requires serialization and deserialization. The data produced by the JDK's native serialization mechanism is rather verbose, which would make the data transfer during a MapReduce job inefficient. Hadoop therefore designed its own serialization mechanism and provides wrapper types that implement Hadoop serialization for the common data types (a small round-trip example follows the table below):
Java type | Long | String | Integer | Float |
---|---|---|---|---|
Hadoop type | LongWritable | Text | IntWritable | FloatWritable |
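As a minimal sketch of why this is more compact (not from the original post; the class name WritableRoundTrip and the sample values are made up for illustration): a Writable writes only its payload bytes, with no class metadata, and is read back in exactly the order it was written.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        // serialize: each Writable writes only its own field bytes
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        new LongWritable(42L).write(out);
        new Text("hello").write(out);

        // deserialize: read the values back in the same order they were written
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        LongWritable offset = new LongWritable();
        Text line = new Text();
        offset.readFields(in);
        line.readFields(in);

        System.out.println(offset.get() + " " + line);  // prints: 42 hello
    }
}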
core:
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // key is the byte offset of the line, value is the line itself
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            // emit <word, 1> for every word in the line
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
3. Write another class that extends Reducer
Reducer<Text, IntWritable, Text, IntWritable>
core:
public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum up all the 1s the mappers emitted for this word
        int sum = 0;
        Iterator<IntWritable> iterator = values.iterator();
        while (iterator.hasNext()) {
            IntWritable va = iterator.next();
            sum += va.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
4. Configure YARN
Do this on every machine.
The node managers should physically be deployed on the same machines as the data nodes, while the resource manager should normally be deployed on a dedicated machine of its own. Since this is a demo, the namenode and the resourcemanager are placed on the same server here.
- Edit the configuration file on every machine:
vi yarn-site.xml
<property>
    <name>yarn.resourcemanager.hostname</name>
    <value>vm01</value>
</property>
<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
</property>
<property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>2048</value>
</property>
<property>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>2</value>
</property>
Here vm01 is the resource manager host.
mapreduce_shuffle is a fixed value.
yarn.nodemanager.resource.memory-mb is the memory the node manager may allocate to containers. Use at least 2048 MB, because yarn.app.mapreduce.am.resource.mb defaults to 1536 MB, and if less than 1536 MB is available the YARN job will fail with an error (see the mapred-site.xml snippet below). This is an upper limit, not memory that is occupied as soon as YARN starts.
yarn.nodemanager.resource.cpu-vcores is the number of CPU vcores; the minimum is 1 (set to 2 here).
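For reference, the application-master memory mentioned above is controlled by the following MapReduce property (a sketch; it belongs in mapred-site.xml, and 1536 is its default value):

<!-- mapred-site.xml -->
<property>
    <name>yarn.app.mapreduce.am.resource.mb</name>
    <value>1536</value>
</property>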
- Start the YARN cluster: start-yarn.sh (note: run this command on the machine that is supposed to host the resource manager; otherwise the resource manager will be started on whichever machine the command is run on)
- Check the YARN processes with jps, and open the YARN web console in a browser:
http://vm01:8088
5. Run
5.1 A driver written on Windows that submits the job to YARN on Linux
package com.looc.main;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* @author chenPeng
* @version 1.0.0
* @ClassName JobSubmitter.java
* @Description TODO
* @createTime 2019-02-01 16:55:00
*/
public class JobSubmitter {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // set, in code, the user identity used to access HDFS and YARN
        System.setProperty("HADOOP_USER_NAME", "root");

        // parameters for running the job
        Configuration conf = new Configuration();
        // 1. the default file system the job accesses at runtime
        conf.set("fs.defaultFS", "hdfs://vm01:9000");
        // 2. where the job is submitted to
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "vm01");
        // 3. when running from Windows, enable cross-platform submission (not needed on Linux);
        //    Windows differs in how paths are handled and in the command used to launch the jar
        conf.set("mapreduce.app-submission.cross-platform", "true");

        // set up the job
        Job job = Job.getInstance(conf);
        // 1. parameter: location of the jar. Hard-coding the path suits running from an IDE on Windows
        //    (the jar must be built beforehand); setJarByClass resolves it automatically when running on Linux
        job.setJar("H:\\BaiduNetdiskDownload\\bigDateTempJar\\wr.jar");
        //job.setJarByClass(JobSubmitter.class);
        // 2. parameter: the Mapper and Reducer implementation classes this job uses
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReduce.class);
        // 3. parameter: the key/value types of the results produced by the Mapper and Reducer implementations
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // demo only: delete the output path if it already exists
        Path outPutPath = new Path("/wordCount/outPut");
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://vm01:9000"), conf, "root");
        if (fileSystem.exists(outPutPath)) {
            fileSystem.delete(outPutPath, true);
        }
        // 4. parameter: the path of the input data this job processes, and the output path
        FileInputFormat.setInputPaths(job, new Path("/wordCount/resource"));
        FileOutputFormat.setOutputPath(job, outPutPath);
        // 5. parameter: the number of reduce tasks to start
        job.setNumReduceTasks(2);
        // 6. submit the job to YARN: waitForCompletion waits for the job to finish and prints
        //    progress to the console; job.submit() can be used to submit without waiting
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
5.2 Simulated run in a Windows environment
public class JobSubWin {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // no cluster settings: with a default Configuration the job runs with the
        // local job runner against the local file system
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(JobSubWin.class);
        // reuse the WordCount Mapper/Reducer and output types from the sections above
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path("H:\\BaiduNetdiskDownload\\bigDateTempJar\\input"));
        FileOutputFormat.setOutputPath(job, new Path("H:\\BaiduNetdiskDownload\\bigDateTempJar\\output"));
        job.setNumReduceTasks(3);
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
5.3 Copy the jar to a Linux server and run it there
- Place the jar on any Linux machine that has Hadoop installed
- Run it with
hadoop jar xxx.jar xxx.xxx.xxx.JobSubLinux
core:
public class JobSubLinux {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // no explicit cluster settings: they are picked up from the configuration files
        // on the machine where the job is launched with `hadoop jar`
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(JobSubLinux.class);
        // reuse the WordCount Mapper/Reducer and output types from the sections above
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/wordCount/input"));
        FileOutputFormat.setOutputPath(job, new Path("/wordCount/output"));
        job.setNumReduceTasks(3);
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
6. Example demo
Compute, for each phone number, the total upload traffic, the total download traffic, and the combined upload plus download traffic.
If a custom object is used as a key or value, the class must implement the Writable interface and override the write and readFields methods. It must also have a no-argument constructor, because one is needed during deserialization.
Note: write and readFields must handle the fields in exactly the same order.
For example:
- The bean class
package com.looc.流量数据分析demo;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @author chenPeng
* @version 1.0.0
* @ClassName FlowBean.java
* @Description TODO
* @createTime
*/
public class FlowBean implements Writable {
    private Integer upFlow;
    private Integer dFlow;
    private Integer amountFlow;
    private String phone;

    // the no-argument constructor is required: Hadoop creates the bean by reflection before calling readFields
    public FlowBean() {}

    public FlowBean(Integer upFlow, Integer dFlow, String phone) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.amountFlow = upFlow + dFlow;
        this.phone = phone;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public Integer getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Integer upFlow) {
        this.upFlow = upFlow;
    }

    public Integer getdFlow() {
        return dFlow;
    }

    public void setdFlow(Integer dFlow) {
        this.dFlow = dFlow;
    }

    public Integer getAmountFlow() {
        return amountFlow;
    }

    public void setAmountFlow(Integer amountFlow) {
        this.amountFlow = amountFlow;
    }

    // serialization: write the fields in a fixed order
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(upFlow);
        dataOutput.writeInt(dFlow);
        dataOutput.writeInt(amountFlow);
        dataOutput.writeUTF(phone);
    }

    // deserialization: read the fields in exactly the order they were written
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readInt();
        dFlow = dataInput.readInt();
        amountFlow = dataInput.readInt();
        phone = dataInput.readUTF();
    }
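    // Not in the original listing: TextOutputFormat writes value.toString() for each record,
    // so a toString override along these lines (assumed format: up, down, total separated by tabs)
    // is what makes the reducer output match the results shown below.
    @Override
    public String toString() {
        return upFlow + "\t" + dFlow + "\t" + amountFlow;
    }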
}
- The mapper class
package com.looc.流量数据分析demo;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author chenPeng
* @version 1.0.0
* @ClassName FlowCountMapper.java
* @Description TODO
* @createTime
*/
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        // the phone number is the second field; the upload and download flows are counted
        // from the end of the line because the records do not all have the same number of columns
        String phone = fields[1];
        Integer upFlow = Integer.parseInt(fields[fields.length - 3]);
        Integer dFlow = Integer.parseInt(fields[fields.length - 2]);
        context.write(new Text(phone), new FlowBean(upFlow, dFlow, phone));
    }
}
- The reducer class
package com.looc.流量数据分析demo;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
/**
* @author chenPeng
* @version 1.0.0
* @ClassName FlowCountReducer.java
* @Description TODO
* @createTime
*/
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // accumulate the upload and download flow of all records for this phone number
        int upSum = 0;
        int dSum = 0;
        for (FlowBean value : values) {
            upSum += value.getUpFlow();
            dSum += value.getdFlow();
        }
        context.write(key, new FlowBean(upSum, dSum, key.toString()));
    }
}
- The submitter class
package com.looc.流量数据分析demo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @author chenPeng
* @version 1.0.0
* @ClassName FlowJobSub.java
* @Description TODO
* @createTime
*/
public class FlowJobSub {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(FlowJobSub.class);

        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path("E:\\soft\\java\\ideaProject\\hadoop\\file\\数据流量分析demo\\input"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\soft\\java\\ideaProject\\hadoop\\file\\数据流量分析demo\\output"));

        job.waitForCompletion(true);
    }
}
- Source data
1363157985066 137****0503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 138****4101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 139****5656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 139****1106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 182****5961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157995074 841****3 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 135****9658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 159****3257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 137****9419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 136****7991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 150****5858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 159****2119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 135****9658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 134****3104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 136****6565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 139****4466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 135****8823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 183****3382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 139****7413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 137****8710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 137****8888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 135****6666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
- Output
134****3104 180 180 360
135****8823 7335 110349 117684
135****6666 1116 954 2070
135****9658 2034 5892 7926
136****6565 1938 2910 4848
136****7991 6960 690 7650
137****9419 240 0 240
137****0503 2481 24681 27162
137****8888 2481 24681 27162
137****8710 120 120 240
138****4101 264 0 264
139****4466 3008 3720 6728
139****7413 11058 48243 59301
139****1106 240 0 240
139****5656 132 1512 1644
150****5858 3659 3538 7197
159****3257 3156 2936 6092
159****2119 1938 180 2118
182****5961 1527 2106 3633
183****3382 9531 2412 11943
841****3 4116 1432 5548