Problem: in a file, each line is a set of items. For every line, form all pairwise combinations of that line's elements, then find the top-3 most frequent pairs across all records, along with their counts.
Sample file data:
53
36 81 65 85 11
65 55 76 92 72
21 68 48 91 81
29 81 36 5 86
41 17 0 59 26
18 30 11 94 16
96 75 27 0 86
0 48 74 86 82
82 24 57 97 49
30 70 89 75 40
7 83 59 38 45
7 60 32 68 53
45 3 59 15 1
61 42 84 88 53
69 12 64 10 78
45 66 26 56 10
85 38 58 82 70
21 15 92 99 74
56 99 89 80 29
41 25 82 81 33
30 48 40 57 17
33 63 86 83 49
30 87 24 83 79
1 77 41 80 19
71 0 55 84 43
4 61 54 47 87
52 94 67 62 59
98 85 10 61 1
83 17 50 57 55
34 10 19 85 62
98 30 33 93 96
90 15 73 69 9
63 54 15 25 27
63 62 2 49 73
55 26 44 13 31
…… (many more rows, omitted here)
Approach:
Step 1: generate every pairwise combination within each line, e.g. 36-81, 36-65, 36-85, 36-11, ……
Step 2: each time a pair appears, emit a count of 1 for it.
Step 3: sum the counts per pair.
Step 4: sort the pairs by total count and keep the top 3. Rather than belabor it, the full code follows with detailed comments inline; first, a quick single-machine sketch of the same logic.
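Before the MapReduce version, here is a minimal single-machine sketch of the whole pipeline; the class name PairCountSketch and the three inline sample rows are illustrative, not part of the original code:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

public class PairCountSketch {
    public static void main(String[] args) {
        // A few sample rows; the real jobs stream these from the input file
        String[] lines = { "36 81 65 85 11", "29 81 36 5 86", "41 25 82 81 33" };
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String line : lines) {
            // Deduplicate and sort so each pair has one canonical form (36-81, never 81-36)
            TreeSet<String> items = new TreeSet<String>(Arrays.asList(line.split(" ")));
            String[] w = items.toArray(new String[0]);
            for (int i = 0; i < w.length - 1; i++)
                for (int j = i + 1; j < w.length; j++)
                    counts.merge(w[i] + "-" + w[j], 1, Integer::sum);
        }
        // Sort by count descending and print the top 3
        counts.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(3)
                .forEach(e -> System.out.println(e.getKey() + "\t" + e.getValue()));
    }
}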
Code:
The first MapReduce job (counts each pair's occurrences):
package cn.analysys.test;
import java.io.IOException;
import java.util.Arrays;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Requirement: in a file, each line is a set of items. Form all pairwise
 * combinations of each line's elements, then find the top-3 most frequent
 * pairs across all records, along with their counts.
 * 36 81 65 85 11
 * 65 55 76 92 72
 * 21 68 48 91 81
 * 29 81 36 5 86
 * 41 17 0 59 26
 * 18 30 11 94 16
 * 96 75 27 0 86
 * 0 48 74 86 82
 * 82 24 57 97 49
 * 30 70 89 75 40
 * 7 83 59 38 45 …… (more rows omitted)
 * @author XiangBoyu
 */
public class MainTestTwoStep1 {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MainTestTwoStep1 <input path> <output path>");
            return;
        }
        Configuration configuration = new Configuration();
        // Build the job object
        Job job = Job.getInstance(configuration);
        // Note: the class whose main method drives the job
        job.setJarByClass(MainTestTwoStep1.class);
        // Configure the Mapper
        job.setMapperClass(MainTestTwoMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Configure the Reducer
        job.setReducerClass(MainTestTwoReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job and exit with its completion status
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    /**
     * Mapper implementation
     * @author XiangBoyu
     */
    public static class MainTestTwoMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        /**
         * map() is called once for each line of the split;
         * key: the byte offset of the line within the file
         * value: the line itself
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Read one line as a string, e.g. line = "36 81 65 85 11"
            String line = value.toString();
            // Split the line into items on spaces
            String[] friend_persons = line.split(" ");
            // Deduplicate and sort the items in one step. The sort gives every
            // pair a single canonical form: we always emit 36-81 and never 81-36,
            // which would otherwise be counted as a different pair. (A TreeSet
            // orders the strings lexicographically, e.g. "11" < "36" < "5";
            // any order works as long as it is applied consistently.)
            // Deduplication matters because a line such as "1 1 1 1 2" should
            // contribute each distinct pair once, not once per duplicate.
            TreeSet<String> items = new TreeSet<String>(Arrays.asList(friend_persons));
            String[] word = items.toArray(new String[0]);
            // Emit every pairwise combination with a count of 1
            for (int i = 0; i < word.length - 1; i++) {
                for (int j = i + 1; j < word.length; j++) {
                    context.write(new Text(word[i] + "-" + word[j]), new IntWritable(1));
                }
            }
        }
    }
    /**
     * Reducer implementation
     * @author XiangBoyu
     */
    public static class MainTestTwoReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> value, Context context)
                throws IOException, InterruptedException {
            // Sum the 1s to get the total occurrence count of this word-word pair
            int count = 0;
            for (IntWritable val : value) {
                count += val.get();
            }
            // Emit <word-word, count>; the next MR job sorts these and picks the top 3
            context.write(key, new IntWritable(count));
        }
    }
}
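Assuming the classes are packaged into a jar (the jar name and HDFS paths below are placeholders, not from the original post), the first job would be launched along these lines:

hadoop jar pairs.jar cn.analysys.test.MainTestTwoStep1 /data/itemsets /data/pair-counts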
The second MapReduce job (sorts the pairs by count and keeps the top 3):
package cn.analysys.test;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Output of the first MapReduce job (input to this one):
 * 0-1 379
 * 0-10 395
 * 0-11 374
 * 0-12 418
 * 0-13 357
 * 0-14 365
 * 0-15 376
 * 0-16 388
 * 0-17 356
 * 0-18 401
 * 0-19 384
 * 0-2 376
 * 0-20 384
 * 0-21 385
 * …… (more output omitted)
 * @author XiangBoyu
 */
public class MainTestTwoStep2 {
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MainTestTwoStep2 <input path> <output path>");
            return;
        }
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(MainTestTwoStep2.class);
        job.setMapperClass(MainTestTwoStep2Mapper.class);
        job.setMapOutputKeyClass(TextIntWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setReducerClass(MainTestTwoStep2Reducer.class);
        // The reducer emits TextIntWritable keys, so declare that type here;
        // declaring Text would fail at runtime with a wrong-key-class error
        job.setOutputKeyClass(TextIntWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // A single reducer sees all pairs, so its top 3 is the global top 3
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    /**
     * Mapper implementation
     * @author XiangBoyu
     */
    public static class MainTestTwoStep2Mapper extends Mapper<LongWritable, Text, TextIntWritable, NullWritable> {
        TextIntWritable k = new TextIntWritable();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is a tab-separated <word-word, count> record from the first job
            String[] line = value.toString().split("\t");
            String word = line[0];
            int count = Integer.parseInt(line[1]);
            // Pack both fields into a composite key: the framework sorts map
            // output by key during the shuffle, and TextIntWritable.compareTo
            // orders by count descending, so the reducer receives the pairs
            // from most frequent to least frequent
            k.set(new Text(word), new IntWritable(count));
            context.write(k, NullWritable.get());
        }
    }
    /**
     * Reducer implementation
     * @author XiangBoyu
     */
    public static class MainTestTwoStep2Reducer extends Reducer<TextIntWritable, NullWritable, TextIntWritable, NullWritable> {
        // Counts records written so far; reduce() is called once per key, so this
        // must be an instance field to keep counting across calls
        private int written = 0;
        @Override
        protected void reduce(TextIntWritable key, Iterable<NullWritable> value, Context context)
                throws IOException, InterruptedException {
            // Keys arrive sorted by count descending, so the first three
            // records written are the global top-3 pairs and their counts
            for (NullWritable v : value) {
                if (written < 3) {
                    context.write(key, v);
                    written++;
                }
            }
        }
    }
}
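The second job is then pointed at the first job's output directory (paths again placeholders):

hadoop jar pairs.jar cn.analysys.test.MainTestTwoStep2 /data/pair-counts /data/top3

The top-3 logic depends on a single reducer seeing every pair in sorted order, which is why the driver pins the reducer count to 1.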
The TextIntWritable composite key class:
package cn.analysys.test;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class TextIntWritable implements WritableComparable<TextIntWritable> {
    Text word;          // the word-word pair
    IntWritable count;  // its occurrence count
    public TextIntWritable() {
        set(new Text(), new IntWritable());
    }
    public void set(Text word, IntWritable count) {
        this.word = word;
        this.count = count;
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        word.readFields(in);
        count.readFields(in);
    }
    @Override
    public void write(DataOutput out) throws IOException {
        word.write(out);
        count.write(out);
    }
    @Override
    public String toString() {
        return word.toString() + " " + count.toString();
    }
    @Override
    public int hashCode() {
        return this.word.hashCode() + this.count.hashCode();
    }
    @Override
    public boolean equals(Object o) {
        // Keep equals consistent with hashCode and compareTo
        if (!(o instanceof TextIntWritable)) return false;
        TextIntWritable other = (TextIntWritable) o;
        return this.word.equals(other.word) && this.count.equals(other.count);
    }
    @Override
    public int compareTo(TextIntWritable o) {
        int result = -1 * this.count.compareTo(o.count); // compare counts first, descending
        if (result != 0)
            return result;
        return this.word.compareTo(o.word); // equal counts: tie-break lexicographically on the pair
    }
}
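A quick local check of the comparator (this small test class is an addition for illustration, not part of the original post): Collections.sort uses compareTo, so the list comes out ordered by count descending.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class TextIntWritableTest {
    public static void main(String[] args) {
        List<TextIntWritable> list = new ArrayList<TextIntWritable>();
        TextIntWritable a = new TextIntWritable();
        a.set(new Text("0-1"), new IntWritable(379));
        TextIntWritable b = new TextIntWritable();
        b.set(new Text("0-12"), new IntWritable(418));
        TextIntWritable c = new TextIntWritable();
        c.set(new Text("0-10"), new IntWritable(395));
        list.add(a);
        list.add(b);
        list.add(c);
        Collections.sort(list); // count descending, then pair name
        for (TextIntWritable t : list) {
            System.out.println(t); // prints 0-12 418, then 0-10 395, then 0-1 379
        }
    }
}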
That completes the task.