1. Word Top-K Selection
import java.io.IOException;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TopK {
public static final int K = 2;
public static class KMap extends Mapper<LongWritable, Text, IntWritable, Text> {
// TreeMap keeps its keys (counts) sorted ascending, so the smallest count is always at firstKey()
TreeMap<Integer, String> map = new TreeMap<>();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// read one "word<TAB>count" line
String line = value.toString();
// split the line into word and count on the first tab
if (line.trim().length() > 0 && line.indexOf("\t") != -1) {
String[] arr = line.split("\t", 2);
String name = arr[0];
Integer num = Integer.parseInt(arr[1]);
map.put(num, name); // note: words with equal counts share a key, so only one of them survives
// once more than K entries are buffered, evict the smallest key; the sorted TreeMap then holds the K largest counts
if (map.size() > K) {
map.remove(map.firstKey());
}
}
}
@Override
protected void cleanup(Mapper<LongWritable, Text, IntWritable, Text>.Context context)
throws IOException, InterruptedException {
// emit the buffered top-K entries for the reducer
for (Integer num : map.keySet()) {
context.write(new IntWritable(num), new Text(map.get(num)));
}
}
}
public static class KReduce extends Reducer<IntWritable, Text, IntWritable, Text> {
// TreeMap for the global top K, sorted by count
TreeMap<Integer, String> map = new TreeMap<Integer, String>();
public void reduce(IntWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
map.put(key.get(), values.iterator().next().toString()); // only the first word for each count is kept
// keep only the K largest counts: evict the smallest key when the map grows past K
if (map.size() > K) {
map.remove(map.firstKey());
}
}
@Override
protected void cleanup(Reducer<IntWritable, Text, IntWritable, Text>.Context context)
throws IOException, InterruptedException {
// write the final top-K result
for (Integer num : map.keySet()) {
context.write(new IntWritable(num), new Text(map.get(num)));
}
}
}
public static void main(String[] args) {
Configuration conf = new Configuration();
try {
Job job = Job.getInstance(conf, "top-K count");
job.setJarByClass(TopK.class);
job.setMapperClass(KMap.class);
// the top-K reducer also works as a combiner: it only drops entries outside the local top K
job.setCombinerClass(KReduce.class);
job.setReducerClass(KReduce.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://hadoopslave1:9000/input"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoopslave1:9000/output"));
System.out.println(job.waitForCompletion(true));
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
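The core trick in both KMap and KReduce is the bounded TreeMap. Below is a minimal, standalone sketch of that trick on made-up words and counts (illustration data only, not from the article); the MapReduce classes above apply exactly the same put-then-evict pattern per mapper and per reducer.
import java.util.Map;
import java.util.TreeMap;

public class TreeMapTopK {
    public static void main(String[] args) {
        final int K = 2;
        // hypothetical word/count pairs, e.g. parsed from "word<TAB>count" lines
        Object[][] counts = {{"hadoop", 7}, {"spark", 3}, {"hive", 9}, {"flink", 5}};
        TreeMap<Integer, String> top = new TreeMap<>();
        for (Object[] c : counts) {
            top.put((Integer) c[1], (String) c[0]); // keys stay sorted ascending
            if (top.size() > K) {
                top.remove(top.firstKey());         // drop the smallest, keep the K largest
            }
        }
        for (Map.Entry<Integer, String> e : top.entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
        // like the Mapper above, equal counts collide on the same key, so only one word per count survives
    }
}
Running the sketch prints the two largest counts (7 hadoop, 9 hive), which is what each KMap instance forwards to KReduce.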
2. Selecting the 10 Most Frequent Words in a Document
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
public class WordCountK {
/**
*
* @class TokenizerMapper
* @description word-count Mapper
*
*/
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
// current word
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString().toLowerCase(); // normalize to lower case
StringTokenizer itr = new StringTokenizer(line, " \t\n\r\f\".,:;?![]'-)("); // split on whitespace and common punctuation
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
/**
*
* @class TokenizerMapper2
* @description top-K Mapper: emits at most the first 10 records (note: defined here but not registered in main() below)
*
*/
public static class TokenizerMapper2 extends Mapper<Object, Text, IntWritable, Text> {
int c = 0; // number of records emitted so far by this mapper
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
IntWritable a = new IntWritable(Integer.parseInt(itr.nextToken()));
Text b = new Text(itr.nextToken());
if (c < 10) {
context.write(a, b);
c++;
}
}
}
/**
*
* @class IntSumReducer
* @description sums the counts for each word
*
*/
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
/**
*
* @class IntWritableDecreasingComparator
* @description decreasing-order comparator; the default implementation deserializes the bytes into objects before comparing them
*/
private static class IntWritableDecreasingComparator extends IntWritable.Comparator {
public int compare(WritableComparable a, WritableComparable b) {
// negate the parent's ascending comparison
return -super.compare(a, b);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
// negate the parent's ascending byte-level comparison
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Word-K count");
job.setJarByClass(WordCountK.class);
Path tempath = new Path("hdfs://hadoopslave1:9000/output");
//input and output paths (the output of this job is the temporary path above)
FileInputFormat.addInputPath(job, new Path("hdfs://hadoopslave1:9000/input"));
FileOutputFormat.setOutputPath(job, tempath);
//word-count phase
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
//output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//write the intermediate result as a binary SequenceFile so the second job
//can read the (Text, IntWritable) pairs back without type mismatches
job.setOutputFormatClass(SequenceFileOutputFormat.class);
if (!job.waitForCompletion(false)) {
System.exit(1); // stop if the counting job failed
}
//second job: sort the words by count
Job jobsort = Job.getInstance(conf, "Word-K sort");
//read the SequenceFile written by the first job
FileInputFormat.addInputPath(jobsort, tempath);
jobsort.setOutputKeyClass(IntWritable.class);
jobsort.setOutputValueClass(Text.class);
jobsort.setInputFormatClass(SequenceFileInputFormat.class);
//InverseMapper swaps (word, count) into (count, word)
jobsort.setMapperClass(InverseMapper.class);
jobsort.setNumReduceTasks(1);
Path result = new Path("hdfs://hadoopslave1:9000/result");
FileOutputFormat.setOutputPath(jobsort, result);
//sort the counts in decreasing order
jobsort.setSortComparatorClass(IntWritableDecreasingComparator.class);
jobsort.waitForCompletion(false);
}
}
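As wired above, the second job sorts every word by decreasing count but never actually cuts the list to 10: TokenizerMapper2 carries the c < 10 counter yet is not registered on either job. One way to enforce the limit, sketched here as a hypothetical addition (Top10Reducer is not part of the original code), is a reducer on the sort job that stops emitting after 10 records; with a single reduce task and the decreasing comparator, the first 10 records it sees are the 10 most frequent words.
    public static class Top10Reducer extends Reducer<IntWritable, Text, IntWritable, Text> {
        private int emitted = 0; // records written so far by the single reducer
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                if (emitted >= 10) {
                    return; // the top 10 have already been written
                }
                context.write(key, value);
                emitted++;
            }
        }
    }
Wiring it up would be one extra line before jobsort.waitForCompletion(false): jobsort.setReducerClass(Top10Reducer.class);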
3. Single-Table Join
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SingleJoin {
// Mapper: emits every record twice, once keyed by the parent and once keyed by the child
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
private static Text child = new Text();
private static Text parent = new Text();
private static Text tempChild = new Text();
private static Text tempParent = new Text();
protected void map(LongWritable key, Text value, Context context)
throws java.io.IOException, InterruptedException {
//split on runs of whitespace: \s+ covers spaces, tabs, newlines, form feeds, etc.
String[] splits = value.toString().split("\\s+");
if (splits.length != 2) {
return;
}
child.set(splits[0]);
parent.set(splits[1]);
//prefix the values: "1" marks a child of the key person, "2" marks a parent of the key person
tempChild.set("1" + splits[0]);
tempParent.set("2" + splits[1]);
context.write(parent, tempChild);
context.write(child, tempParent);
};
}
// Reducer: for each person, joins that person's children with that person's parents
public static class ReduceClass extends Reducer<Text, Text, Text, Text> {
private static Text child = new Text();
private static Text grand = new Text();
private static List<String> childs = new ArrayList<String>();
private static List<String> grands = new ArrayList<String>();
protected void reduce(Text key, Iterable<Text> values, Context context)
throws java.io.IOException, InterruptedException {
//sort the values into two lists: "1" prefix = children of the key person, "2" prefix = parents of the key person
for (Text value : values) {
String temp = value.toString();
if (temp.startsWith("1"))
childs.add(temp.substring(1));
else
grands.add(temp.substring(1));
}
// Cartesian product: pair each child with each grandparent
for (String c : childs) {
for (String g : grands) {
child.set(c);
grand.set(g);
context.write(child, grand);
}
}
// clear the static lists before the next key
childs.clear();
grands.clear();
};
}
public static void main(String[] args) throws Exception {
String inputPath="hdfs://hadoopslave1:9000/input";
String outputPath="hdfs://hadoopslave1:9000/output";
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "single-table join");
job.setJarByClass(SingleJoin.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(ReduceClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
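The essence of the job is a self-join on the middle person: group the records by that person, then cross the person's children with the person's parents. A minimal standalone sketch of the same idea on made-up names (illustration data only; the MapReduce version above performs the grouping through the shuffle instead of a HashMap):
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SingleJoinSketch {
    public static void main(String[] args) {
        // hypothetical "child parent" records, same layout as the input file
        String[] records = {"Tom Lucy", "Tom Jack", "Lucy Mary", "Jack Alice"};
        Map<String, List<String>> childrenOf = new HashMap<>();
        Map<String, List<String>> parentsOf = new HashMap<>();
        for (String r : records) {
            String[] cp = r.split("\\s+");
            childrenOf.computeIfAbsent(cp[1], k -> new ArrayList<>()).add(cp[0]);
            parentsOf.computeIfAbsent(cp[0], k -> new ArrayList<>()).add(cp[1]);
        }
        // join on the shared middle person: children x parents = (grandchild, grandparent)
        for (String person : childrenOf.keySet()) {
            for (String grandchild : childrenOf.get(person)) {
                for (String grandparent : parentsOf.getOrDefault(person, Collections.emptyList())) {
                    System.out.println(grandchild + "\t" + grandparent);
                }
            }
        }
    }
}
On these four records the sketch prints "Tom Mary" and "Tom Alice", exactly what the reducer's nested loop produces.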
4. Multi-Table Join
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MultiJoin {
public static int time = 0;
/*
* The map side first decides whether an input line belongs to the left (factory) or
* right (address) table, splits the two columns, puts the join column (the address ID)
* into the key, and puts the remaining column plus a left/right flag into the value.
*/
public static class MapClass extends Mapper<Object, Text, Text, Text> {
/* map implementation */
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString(); /* one input line */
String relationtype = new String(); /* left/right table flag */
/* skip the header line of either input file */
if (line.contains("factoryname") || line.contains("addressed")) {
return;
}
/* tokenize the line */
StringTokenizer itr = new StringTokenizer(line);
String mapkey = new String();
String mapvalue = new String();
int i = 0;
while (itr.hasMoreTokens()) {
/* read the next token */
String token = itr.nextToken();
/* a token starting with a digit is the address ID, which becomes the join key */
//if the ID appears after the name (i > 0) the line comes from the left (factory) table, otherwise from the right (address) table
if (token.charAt(0) >= '0' && token.charAt(0) <= '9') {
mapkey = token;
if (i > 0) {
relationtype = "1";
} else {
relationtype = "2";
}
continue;
}
/* accumulate the factory or address name */
mapvalue += token + " ";
i++;
}
/* emit: key = address ID, value = flag + "+" + name */
context.write(new Text(mapkey), new Text(relationtype + "+" + mapvalue));
}
}
/*
* The reduce side parses the map output, stores the values separately for the left and
* right tables, then writes out their Cartesian product.
*/
public static class ReduceClass extends Reducer<Text, Text, Text, Text> {
/* reduce implementation */
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
/* write the header row exactly once */
if (0 == time) {
context.write(new Text("factoryname"), new Text("addressname"));
time++;
}
int factorynum = 0;
String[] factory = new String[10]; // up to 10 factory names per address ID
int addressnum = 0;
String[] address = new String[10]; // up to 10 address names per address ID
Iterator<Text> ite = values.iterator();
while (ite.hasNext()) {
String record = ite.next().toString();
int len = record.length();
int i = 2;
if (0 == len) {
continue;
}
/* read the left/right flag */
char relationtype = record.charAt(0);
/* left table: factory name */
if ('1' == relationtype) {
factory[factorynum] = record.substring(i);
factorynum++;
}
/* right table: address name */
if ('2' == relationtype) {
address[addressnum] = record.substring(i);
addressnum++;
}
}
/* Cartesian product of the two sides */
if (0 != factorynum && 0 != addressnum) {
for (int m = 0; m < factorynum; m++) {
for (int n = 0; n < addressnum; n++) {
/* write one joined row */
context.write(new Text(factory[m]),
new Text(address[n]));
}
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String inputPath0="hdfs://hadoopslave1:9000/factoy";
String inputPath1="hdfs://hadoopslave1:9000/address";
Path[] paths=new Path[]{new Path(inputPath0),new Path(inputPath1)};
String outputPath="hdfs://hadoopslave1:9000/output";
Job job = Job.getInstance(conf, "multi-table join");
job.setJarByClass(MultiJoin.class);
/* set the Map and Reduce classes */
job.setMapperClass(MapClass.class);
job.setReducerClass(ReduceClass.class);
/* set the output types */
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
/* set the input and output directories */
// FileInputFormat.setInputPaths(job, new Path(inputPath0),new Path(inputPath1));
FileInputFormat.setInputPaths(job,paths );
FileOutputFormat.setOutputPath(job, new Path(outputPath));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
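The map function above infers which table a line belongs to by checking whether the digit column comes first or last. A hedged alternative sketch, not from the original article: Hadoop's MultipleInputs can bind a dedicated Mapper to each input path, so every record is tagged by where it came from. TaggedJoinMappers, FactoryMapper and AddressMapper are hypothetical names and the header strings and paths are assumptions; the value format ("1+name" / "2+name") is kept compatible with the ReduceClass above.
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class TaggedJoinMappers {
    // left table: "factoryname addressed" -> key = address ID, value = "1+factoryname"
    public static class FactoryMapper extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim().replaceAll("\\s+", " ");
            int split = line.lastIndexOf(' '); // the address ID is the last column
            if (split < 0 || line.startsWith("factoryname")) {
                return; // skip empty lines and the (assumed) header
            }
            context.write(new Text(line.substring(split + 1)),
                    new Text("1+" + line.substring(0, split)));
        }
    }
    // right table: "addressID addressname" -> key = address ID, value = "2+addressname"
    public static class AddressMapper extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim().replaceAll("\\s+", " ");
            int split = line.indexOf(' '); // the address ID is the first column
            if (split < 0 || line.startsWith("addressID")) {
                return; // skip empty lines and the (assumed) header
            }
            context.write(new Text(line.substring(0, split)),
                    new Text("2+" + line.substring(split + 1)));
        }
    }
    // wiring: each path gets its own mapper; the ReduceClass above can stay unchanged
    public static void addInputs(Job job, String factoryPath, String addressPath) {
        MultipleInputs.addInputPath(job, new Path(factoryPath),
                TextInputFormat.class, FactoryMapper.class);
        MultipleInputs.addInputPath(job, new Path(addressPath),
                TextInputFormat.class, AddressMapper.class);
    }
}
With this approach the left/right decision no longer depends on the data itself, which avoids misclassifying rows whose first character happens to be a digit.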