一、手机号流量统计(按运营商号段分区)
1)数据实体类
/**
 * Traffic record for one phone number, used as the map/reduce value.
 *
 * <p>Serialization contract: {@link #write} and {@link #readFields} must handle the fields with
 * the same types and in the same order (telNo, upPayLoad, downPayLoad, totalPayLoad).
 */
public class DataBean implements Writable {
    /** Phone number. */
    private String telNo;
    /** Upstream traffic. */
    private long upPayLoad;
    /** Downstream traffic. */
    private long downPayLoad;
    /** Total traffic; the 3-argument constructor sets it to up + down. */
    private long totalPayLoad;

    /** Empty bean, to be filled by {@link #readFields} during deserialization. */
    public DataBean() {
    }

    /**
     * Builds a record and derives the total from the two directions.
     *
     * @param telNo phone number
     * @param upPayLoad upstream traffic
     * @param downPayLoad downstream traffic
     */
    public DataBean(String telNo, long upPayLoad, long downPayLoad) {
        this.telNo = telNo;
        this.upPayLoad = upPayLoad;
        this.downPayLoad = downPayLoad;
        this.totalPayLoad = upPayLoad + downPayLoad;
    }

    /** Serializes the fields: the phone number as modified UTF-8, then the three counters. */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(telNo);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
        out.writeLong(totalPayLoad);
    }

    /** Restores the fields in exactly the order {@link #write} emitted them. */
    public void readFields(DataInput in) throws IOException {
        telNo = in.readUTF();
        upPayLoad = in.readLong();
        downPayLoad = in.readLong();
        totalPayLoad = in.readLong();
    }

    /**
     * Tab-separated "up, down, total". The phone number is omitted because it is written as the
     * output key.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(upPayLoad).append('\t');
        sb.append(downPayLoad).append('\t');
        sb.append(totalPayLoad);
        return sb.toString();
    }

    public String getTelNo() {
        return telNo;
    }

    public void setTelNo(String telNo) {
        this.telNo = telNo;
    }

    public long getUpPayLoad() {
        return upPayLoad;
    }

    public void setUpPayLoad(long upPayLoad) {
        this.upPayLoad = upPayLoad;
    }

    public long getDownPayLoad() {
        return downPayLoad;
    }

    public void setDownPayLoad(long downPayLoad) {
        this.downPayLoad = downPayLoad;
    }

    public long getTotalPayLoad() {
        return totalPayLoad;
    }

    public void setTotalPayLoad(long totalPayLoad) {
        this.totalPayLoad = totalPayLoad;
    }
}
2)分区类
/**
 * Partitions traffic records by the 3-digit prefix of the phone number (the output key).
 *
 * <p>Known prefixes map to partitions 1-3 via {@link #providerMap}; everything else goes to
 * partition 0. One output file per group therefore needs 4 reduce tasks.
 */
public class ProviderPartitioner extends Partitioner<Text, DataBean> {
    /** Phone-number prefix to partition index. Unlisted prefixes use partition 0. */
    public static Map<String, Integer> providerMap = new HashMap<String, Integer>();

    static {
        providerMap.put("135", 1);
        providerMap.put("136", 1);
        providerMap.put("137", 1);
        providerMap.put("138", 1);
        providerMap.put("139", 1);
        providerMap.put("150", 2);
        providerMap.put("159", 2);
        providerMap.put("182", 3);
        providerMap.put("183", 3);
    }

    /**
     * Chooses the reduce task for a record.
     *
     * @param key the phone number
     * @param value the traffic record (not used)
     * @param numPartitions number of reduce tasks; the result must lie in [0, numPartitions)
     * @return the partition index for the key's prefix group
     */
    @Override
    public int getPartition(Text key, DataBean value, int numPartitions) {
        // With a single (or no) reducer the only valid partition is 0.
        if (numPartitions <= 1) {
            return 0;
        }
        String telNo = key.toString();
        // Keys shorter than a prefix cannot be classified; treat them as "other".
        if (telNo.length() < 3) {
            return 0;
        }
        Integer num = providerMap.get(telNo.substring(0, 3));
        if (num == null) {
            return 0;
        }
        // Partitions 1..3 are only valid with >= 4 reducers; fold them into range otherwise
        // instead of failing the job with an illegal-partition error.
        return num < numPartitions ? num : num % numPartitions;
    }
}
3)Map类
public class MapperClass extends Mapper<LongWritable, Text, Text, DataBean> {
Text text = null;
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\t");
// 我们要使用的数据的第二列(列索引号为1)就是手机号,第9列(列索引号8)是上行流量,
//第10列(列索引号9)是下行流量
String telNo = fields[1];
long up = Long.parseLong(fields[8]);
long down = Long.parseLong(fields[9]);
DataBean bean = new DataBean(telNo, up, down);
text = new Text(telNo);
context.write(text, bean);
}
4)Reduce类
/**
 * Aggregates all records of one phone number into a single DataBean whose up/down values are the
 * sums over the group (the total is recomputed by the DataBean constructor).
 */
public class ReduceClass extends Reducer<Text, DataBean, Text, DataBean> {
    @Override
    protected void reduce(Text key, Iterable<DataBean> v2s, Context context) throws IOException, InterruptedException {
        long totalUp = 0L;
        long totalDown = 0L;
        for (DataBean record : v2s) {
            totalUp += record.getUpPayLoad();
            totalDown += record.getDownPayLoad();
        }
        DataBean aggregated = new DataBean(key.toString(), totalUp, totalDown);
        context.write(key, aggregated);
    }
}
5)测试类
/** Driver for the per-phone-number traffic statistics job. */
public class DataCount {
    /**
     * Configures and submits the job, then exits with the job's status (0 on success).
     *
     * @param args {@code <input path> <output path> <number of reduce tasks>}
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 3) {
            System.err.println("Usage: DataCount <in> <out> <numReduceTasks>");
            System.exit(2);
        }
        int numReduceTasks;
        try {
            numReduceTasks = Integer.parseInt(args[2]);
        } catch (NumberFormatException e) {
            System.err.println("numReduceTasks must be an integer, got: " + args[2]);
            System.exit(2);
            return;
        }
        if (numReduceTasks < 1) {
            System.err.println("numReduceTasks must be >= 1, got: " + numReduceTasks);
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Locate the job jar through this class.
        job.setJarByClass(DataCount.class);

        job.setMapperClass(MapperClass.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DataBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Custom partitioning by phone-number prefix.
        job.setPartitionerClass(ProviderPartitioner.class);

        job.setReducerClass(ReduceClass.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DataBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // One output file per reducer. ProviderPartitioner produces partitions 0-3, so 4 reducers
        // give one file per prefix group.
        job.setNumReduceTasks(numReduceTasks);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
二、年最高气温
(一)年最高气温
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/** Computes the maximum temperature per key (presumably the year) from whitespace-separated data. */
public class TemperatureTest {

    /**
     * Emits (column 0, column 4 as int) for each input line. Blank, short or non-numeric lines are
     * skipped and counted in {@code TemperatureTest/SKIPPED_LINES}.
     */
    public static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Trim first: with leading whitespace, split("\\s+") yields an empty first element and
            // shifts every column index by one. The variable-width spacing is why \\s+ is used.
            String line = value.toString().trim();
            String[] data = line.split("\\s+");
            if (data.length < 5) {
                context.getCounter("TemperatureTest", "SKIPPED_LINES").increment(1);
                return;
            }
            int airTemperature;
            try {
                airTemperature = Integer.parseInt(data[4]);
            } catch (NumberFormatException e) {
                context.getCounter("TemperatureTest", "SKIPPED_LINES").increment(1);
                return;
            }
            context.write(new Text(data[0]), new IntWritable(airTemperature));
        }
    }

    /** Emits the maximum value seen for each key. */
    public static class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
            }
            context.write(key, new IntWritable(maxValue));
        }
    }

    /** Submits the job against the hard-coded HDFS paths and exits with the job status. */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // NOTE(review): legacy MRv1 JobTracker key, and port 9000 is the HDFS port used below;
        // confirm this is intended for the target cluster.
        conf.set("mapred.job.tracker", "192.168.71.129:9000");
        Job job = Job.getInstance(conf);
        // Important: locate the jar that contains this job.
        job.setJarByClass(TemperatureTest.class);
        // Mapper class and its output key/value types.
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Reducer class and the final output key/value types.
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input data and output directory.
        FileInputFormat.setInputPaths(job, "hdfs://192.168.71.129:9000/013470-99999-2016");
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.71.129:9000/Temperature_out"));
        // Submit to the cluster and report success/failure through the exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
(二)年最高气温分组排序
1)键值对类
/**
 * Composite map-output key (year, temperature) for the secondary-sort job.
 *
 * <p>The natural order is year, then temperature, both ascending. Equality and hashing use both
 * fields, consistent with {@link #compareTo}.
 */
public class KeyPair implements WritableComparable<KeyPair> {
    private int year; // year
    private int hot;  // temperature

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getHot() {
        return hot;
    }

    public void setHot(int hot) {
        this.hot = hot;
    }

    /** Reads the fields in the same order {@link #write} writes them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.hot = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(hot);
    }

    /** Ascending by year; ties broken by ascending temperature. */
    @Override
    public int compareTo(KeyPair o) {
        int result = Integer.compare(year, o.year);
        if (result != 0) {
            return result;
        }
        return Integer.compare(hot, o.hot);
    }

    /** Equal when both year and temperature match (consistent with compareTo). */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof KeyPair)) {
            return false;
        }
        KeyPair other = (KeyPair) obj;
        return year == other.year && hot == other.hot;
    }

    /**
     * Sum of year and temperature; equal keys have equal sums, so this agrees with equals().
     * Because the temperature contributes, a hash-based partitioner would spread one year over
     * several reducers. RunTempJob therefore partitions by year with HotPartition.
     */
    @Override
    public int hashCode() {
        return year + hot;
    }

    @Override
    public String toString() {
        return year + "\t" + hot;
    }
}
2)排序类
/**
 * Sort comparator for {@link KeyPair}: ascending by year, then descending by temperature, so the
 * records of each year are ordered hottest first.
 */
public class HotSort extends WritableComparator {
    public HotSort() {
        // true = create KeyPair instances so compare() receives deserialized keys.
        super(KeyPair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPair left = (KeyPair) a;
        KeyPair right = (KeyPair) b;
        int byYear = Integer.compare(left.getYear(), right.getYear());
        // Swapped arguments give the descending temperature order.
        return byYear != 0 ? byYear : Integer.compare(right.getHot(), left.getHot());
    }
}
3)分区类
/**
 * Routes all records of the same year to the same reducer, so that the year grouping done by the
 * grouping comparator sees the whole year in one task.
 */
public class HotPartition extends Partitioner<KeyPair, Text> {
    /**
     * @param key the (year, temperature) key; only the year is used
     * @param value the input line (not used)
     * @param num number of reduce tasks
     * @return a partition in [0, num)
     */
    @Override
    public int getPartition(KeyPair key, Text value, int num) {
        // Masking the sign bit (as Hadoop's HashPartitioner does) keeps the result non-negative
        // even if year * 127 is negative; plain % could return a negative, illegal partition.
        return ((key.getYear() * 127) & Integer.MAX_VALUE) % num;
    }
}
4)分组类
/**
 * Grouping comparator: treats keys with the same year as one group, so a single reduce() call
 * receives every record of that year regardless of temperature.
 */
public class HotGroup extends WritableComparator {
    public HotGroup() {
        // true = create KeyPair instances so compare() receives deserialized keys.
        super(KeyPair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        int yearA = ((KeyPair) a).getYear();
        int yearB = ((KeyPair) b).getYear();
        return Integer.compare(yearA, yearB);
    }
}
5)Mapper类
/**
 * Parses lines of the form {@code <date>\t<temperature>°C} into a (year, temperature) key, where
 * the year is the first 4 characters of the date column. The original line is kept as the value.
 */
public class HotMapper extends Mapper<LongWritable, Text, KeyPair, Text> {
    /** Unit suffix that terminates the temperature value in the second column. */
    private static final String CELSIUS = "°C";

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // A UTF-8 byte-order mark at the start of the file decodes to U+FEFF and would make the
        // year of the first line unparseable.
        if (!line.isEmpty() && line.charAt(0) == '\uFEFF') {
            line = line.substring(1);
        }
        String[] ss = line.split("\t");
        if (ss.length != 2) {
            return;
        }
        // indexOf returns -1 when the unit is missing (e.g. the file was not decoded as UTF-8);
        // substring(0, -1) would then throw and fail the whole task.
        int unitPos = ss[1].indexOf(CELSIUS);
        if (ss[0].length() < 4 || unitPos < 0) {
            skip(context);
            return;
        }
        int year;
        int hot;
        try {
            year = Integer.parseInt(ss[0].substring(0, 4));
            hot = Integer.parseInt(ss[1].substring(0, unitPos).trim());
        } catch (NumberFormatException e) {
            skip(context);
            return;
        }
        KeyPair kp = new KeyPair();
        kp.setYear(year);
        kp.setHot(hot);
        context.write(kp, value);
    }

    /** Counts a malformed input line instead of failing the task. */
    private void skip(Context context) {
        context.getCounter("HotMapper", "MALFORMED_LINES").increment(1);
    }
}
6)HotReduce类
/**
 * Writes every (key, line) pair of a year group unchanged.
 *
 * <p>With HotSort as the sort comparator (as RunTempJob configures), the values of a year arrive
 * in descending temperature order, so the first line written for each year is its maximum.
 * NOTE(review): {@code kp} is written once per value; Hadoop refreshes the key object while a
 * group is iterated, so each line presumably carries its own record's temperature.
 */
public class HotReduce extends Reducer<KeyPair, Text, KeyPair, Text> {
    @Override
    protected void reduce(KeyPair kp, Iterable<Text> i, Context context)
            throws IOException, InterruptedException {
        for (Text line : i) {
            context.write(kp, line);
        }
    }
}
7)测试类
/** Driver for the secondary-sort job that lists each year's records hottest first. */
public class RunTempJob {
    /**
     * Configures and runs the job, exiting with 0 on success and 1 on failure.
     *
     * @param args generic Hadoop options followed by {@code <in> <out>}
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        Configuration conf = new Configuration();
        // Apply generic command-line options (-D, -fs, ...) to conf and keep the remaining args.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("使用: temp <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "temp");
        // 1. Locate the job jar through this class.
        job.setJarByClass(RunTempJob.class);
        // 2. Mapper and reducer.
        job.setMapperClass(HotMapper.class);
        job.setReducerClass(HotReduce.class);
        // 3. Map output types.
        job.setMapOutputKeyClass(KeyPair.class);
        job.setMapOutputValueClass(Text.class);
        // 4. Final output types: HotReduce emits (KeyPair, Text). Without these calls the job
        //    keeps the defaults (LongWritable, Text), which do not match the reducer.
        job.setOutputKeyClass(KeyPair.class);
        job.setOutputValueClass(Text.class);
        // 5. Input and output directories.
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        // 6. One reduce task per year; the sample data presumably spans 3 years.
        job.setNumReduceTasks(3);
        // 7. Secondary sort: partition and group by year, sort by year then temperature (desc).
        job.setPartitionerClass(HotPartition.class);
        job.setSortComparatorClass(HotSort.class);
        job.setGroupingComparatorClass(HotGroup.class);
        // 8. Submit, wait and print progress; an unloadable class counts as failure.
        boolean isSuccess = false;
        try {
            isSuccess = job.waitForCompletion(true);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        System.exit(isSuccess ? 0 : 1);
    }
}
三、计算奇数行与偶数行数据之和
1.代码一
1)Partitioner类
/**
 * Sends even line numbers to reducer 0 and odd line numbers to reducer 1.
 *
 * <p>Side effect: the key is overwritten with the partition number (0 or 1), so all lines of the
 * same parity share one key and reach the reducer as a single group to be summed.
 * NOTE(review): this relies on the framework serializing the key after getPartition returns.
 */
public class MyPartitioner extends Partitioner<LongWritable, IntWritable> {
    @Override
    public int getPartition(LongWritable key, IntWritable value, int arg2) {
        int parity = (key.get() % 2 == 0) ? 0 : 1;
        key.set(parity);
        return parity;
    }
}
2)Map类
/**
 * Emits (line number, integer value) for each input line.
 *
 * <p>Line numbers are counted per map task only (multi-mapper numbering is deliberately out of
 * scope in this partitioner example), so parity is globally correct only with a single mapper.
 */
public class MyMapper extends Mapper<LongWritable, Text, LongWritable, IntWritable> {
    /** Number of lines seen so far by this task; 1-based once incremented. */
    private long lineNum = 0;
    /** Reused output key. When MyPartitioner is configured (as in JobMain) it rewrites it to 0/1. */
    private LongWritable okey = new LongWritable();
    /** Reused output value. */
    private IntWritable ovalue = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Count every physical line, blank ones included, so odd/even matches the file layout.
        lineNum++;
        String text = value.toString();
        // Drop a UTF-8 byte-order mark (U+FEFF); trim() does not remove it.
        if (!text.isEmpty() && text.charAt(0) == '\uFEFF') {
            text = text.substring(1);
        }
        text = text.trim();
        // Blank lines carry no number and contribute nothing to the sums.
        if (text.isEmpty()) {
            return;
        }
        okey.set(lineNum);
        ovalue.set(Integer.parseInt(text));
        context.write(okey, ovalue);
    }
}
3)Reduce类
/**
 * Sums the values of one parity group. The key is 0 for even lines and 1 for odd lines, as
 * rewritten by the partitioner; any other key produces no output.
 */
public class MyReducer extends Reducer<LongWritable, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable n : value) {
            total += n.get();
        }
        long parity = key.get();
        String label = null;
        if (parity == 0) {
            label = "偶数行之和为:";
        } else if (parity == 1) {
            label = "奇数行之和为:";
        }
        if (label != null) {
            context.write(new Text(label), new IntWritable(total));
        }
    }
}
4)测试类
/** Driver for the odd/even line sum job (code 1). */
public class JobMain {
    /** HDFS input directory. */
    static final Path inPath = new Path("hdfs://192.168.146.136:9000/input");
    /** HDFS output directory; it must not already exist, or job submission fails. */
    static final Path outPath = new Path("hdfs://192.168.146.136:9000/output");

    /** Configures the job with the custom partitioner and 2 reducers, then exits with its status. */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "partitioner-job");
        job.setJarByClass(JobMain.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Custom partitioner splitting map output into even (0) and odd (1) lines.
        job.setPartitionerClass(MyPartitioner.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Two reducers: one per parity.
        job.setNumReduceTasks(2);
        FileInputFormat.addInputPath(job, inPath);
        // To re-run, delete the old output first, e.g.
        // outPath.getFileSystem(configuration).delete(outPath, true).
        // Using outPath.getFileSystem (rather than FileSystem.get(configuration)) targets the
        // hdfs:// authority in the path instead of the configured default file system.
        FileOutputFormat.setOutputPath(job, outPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
2.代码二
1)Recordreader类
/**
 * Record reader that keys each line by its 1-based line number instead of its byte offset.
 *
 * <p>It seeks to the split start and then reads until end of file, ignoring the split end. It is
 * therefore only correct when files are not split (MyFileInputFormat.isSplitable returns false).
 */
public class MyRecordReader extends RecordReader<LongWritable, Text> {
    private long start; // byte offset where the split begins
    private long end; // byte offset where the split ends
    private long pos; // line number assigned to the next record (starts at 1)
    private long bytesConsumed; // bytes read since start, used by getProgress()
    private FSDataInputStream fin = null;
    private LongWritable key = null;
    private Text value = null;
    private LineReader reader = null;

    /** Closes the input stream; safe even if initialize() never opened it. */
    @Override
    public void close() throws IOException {
        if (fin != null) {
            fin.close();
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /** Fraction of the split consumed so far, clamped to [0, 1]. */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (end == start) {
            return 0.0f;
        }
        return Math.min(1.0f, bytesConsumed / (float) (end - start));
    }

    /** Opens the split's file, seeks to the split start and resets the line counter. */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) inputSplit;
        start = fileSplit.getStart();
        end = start + fileSplit.getLength();
        Configuration conf = context.getConfiguration();
        Path path = fileSplit.getPath();
        FileSystem fs = path.getFileSystem(conf);
        fin = fs.open(path);
        fin.seek(start);
        reader = new LineReader(fin);
        pos = 1;
        bytesConsumed = 0;
    }

    /**
     * Reads the next line into the reused value and sets the key to its line number.
     *
     * @return false once the end of the stream is reached
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (key == null) {
            key = new LongWritable();
        }
        key.set(pos);
        if (value == null) {
            value = new Text();
        }
        // readLine returns the number of bytes consumed (including the newline); 0 means EOF.
        int consumed = reader.readLine(value);
        if (consumed == 0) {
            return false;
        }
        bytesConsumed += consumed;
        pos++;
        return true;
    }
}
2)MyFileInputFormat类
/** Input format that reads each file whole, keyed by line number via MyRecordReader. */
public class MyFileInputFormat extends FileInputFormat<LongWritable, Text> {
    /**
     * Disables splitting, so each file is handled by exactly one reader and line numbers stay
     * consistent across the whole file.
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    /** Supplies the line-numbering record reader for every split. */
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new MyRecordReader();
    }
}
3)分区类
/**
 * Sends even line numbers to partition 1 and odd line numbers to partition 0.
 *
 * <p>Side effect: the key is overwritten with the partition number, so each parity reaches the
 * reducer as a single group. NOTE(review): this relies on the framework serializing the key after
 * getPartition returns.
 */
public class MyPartitioner extends Partitioner<LongWritable, Text> {
    @Override
    public int getPartition(LongWritable key, Text value, int numPartitions) {
        boolean even = key.get() % 2 == 0;
        int target = even ? 1 : 0;
        key.set(target);
        return target;
    }
}
4)Mapper类
/**
 * Identity mapper: forwards each (line number, line text) pair unchanged. The line-number keys
 * presumably come from MyRecordReader.
 */
public class MapperClass extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        LongWritable lineNumber = key;
        Text line = value;
        context.write(lineNumber, line);
    }
}
5)Reducer类
/**
 * Sums the integer lines of one parity group. Key 0 is labelled as the odd-line sum and any other
 * key as the even-line sum, matching MyPartitioner (code 2), which rewrites odd keys to 0.
 */
public class ReducerClass extends Reducer<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void reduce(LongWritable key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (Text val : value) {
            String s = val.toString();
            // Drop a UTF-8 byte-order mark (U+FEFF) on the first line; trim() does not remove it.
            if (!s.isEmpty() && s.charAt(0) == '\uFEFF') {
                s = s.substring(1);
            }
            s = s.trim();
            // Blank lines carry no number; skip them instead of failing on parseInt("").
            if (s.isEmpty()) {
                continue;
            }
            sum += Integer.parseInt(s);
        }
        Text writeKey = new Text(key.get() == 0 ? "奇数行之和:" : "偶数行之和:");
        context.write(writeKey, new IntWritable(sum));
    }
}
四、常见错误:
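如果出现数据格式不正确的错误(例如第一行数据报 NumberFormatException),通常是因为数据文件以"带 BOM 的 UTF-8"格式保存(例如 Windows 记事本的"UTF-8"选项),文件开头的 BOM 会被当作第一行的字符。请将数据另存为不带 BOM 的 UTF-8;如果数据只含 ASCII 字符,也可以用 ANSI 格式保存。注意:Hadoop 的 Text 按 UTF-8 解码,含中文或"°C"等非 ASCII 字符的数据若以 ANSI(GBK)保存会出现乱码。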
网友评论