1. Word Count
- Sample input:
hello world hadoop
hadoop hbase hive
hadoop sqoop flue
hbase redis
- Expected output (the shuffle sorts keys, so the words come out in alphabetical order):
flue 1
hadoop 3
hbase 2
hello 1
hive 1
redis 1
sqoop 1
world 1
- Driver class:
package com.bjsxt.hdfs.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MainClass {

    public static void main(String[] args) {
        if (args == null || args.length != 2) {
            throw new RuntimeException("An <input path> and an <output path> must be specified");
        }
        Configuration configuration = new Configuration();
        try {
            Job job = Job.getInstance(configuration);
            job.setJarByClass(MainClass.class);
            // Map phase: emit a (word, 1) pair for every token.
            job.setMapperClass(WordCountMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            // Reduce phase: sum the counts of each word.
            job.setReducerClass(WordCountReducer.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Block until the job finishes; the flag enables progress logging.
            job.waitForCompletion(true);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
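One optional refinement, not present in the original driver: because summing counts is associative and commutative, the reducer can double as a combiner, pre-aggregating on the map side to shrink the shuffle. A single extra line inside the try block would enable it:

    // Optional: pre-aggregate (word, 1) pairs on the map side.
    // Safe because summing LongWritable counts is associative and commutative.
    job.setCombinerClass(WordCountReducer.class);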
- Mapper:
package com.bjsxt.hdfs.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // The key is the line's byte offset and is ignored; tokenize the line
        // on spaces and emit (word, 1) for every token.
        for (String s : value.toString().split(" ")) {
            context.write(new Text(s), new LongWritable(1));
        }
    }
}
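The mapper allocates a fresh Text and LongWritable for every token. A common Hadoop idiom, sketched below as an alternative rather than as the original code, reuses mutable output objects and avoids needless garbage on large inputs:

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    // Reused across calls: context.write() serializes the current contents
    // immediately, so mutating the objects on the next record is safe.
    private final Text word = new Text();
    private final LongWritable one = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        for (String s : value.toString().split(" ")) {
            word.set(s);
            context.write(word, one);
        }
    }
}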
- Reducer:
package com.bjsxt.hdfs.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> valueIterable, Context context) throws IOException, InterruptedException {
        // All counts for one word arrive in a single reduce call; add them up
        // and emit the word together with its total.
        long sum = 0L;
        for (LongWritable value : valueIterable) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}
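With the three classes packaged into a jar (the jar name and HDFS paths below are only placeholders), the job is submitted with the standard hadoop jar command:

hadoop jar wordcount.jar com.bjsxt.hdfs.wordcount.MainClass /wordcount/input /wordcount/output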
2. Temperature Statistics
- Requirement: for each year, find the two days with the highest temperature in every month.
- Sample input (the date-time and the temperature reading are separated by a tab, which is what the mapper splits on):
1949-10-01 14:21:02 34c
1949-10-01 19:21:02 38c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
1951-07-03 12:21:03 47c
1950-01-01 11:21:02 32c
1950-10-02 12:21:02 41c
1950-10-01 12:21:02 37c
1949-10-01 19:21:02 38c
1949-10-02 14:01:02 36c
1951-07-03 12:21:03 47c
1951-07-02 12:21:02 46c
1951-12-01 12:21:02 23c
This is the classic secondary-sort pattern: a composite key (Weather) carries year, month, and temperature; a sort comparator orders records by year and month ascending and by temperature descending; a grouping comparator merges all records of one (year, month) into a single reduce call; and a partitioner keeps every record of a year in the same reducer.
- Driver class:
package com.bjsxt.hdfs.weather;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MainClass {

    public static void main(String[] args) {
        if (args == null || args.length != 2) {
            throw new RuntimeException("An <input path> and an <output path> must be specified");
        }
        Configuration configuration = new Configuration();
        try {
            Job job = Job.getInstance(configuration);
            job.setJarByClass(MainClass.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Map phase: parse each line into a composite Weather key.
            job.setMapperClass(WeatherMapper.class);
            job.setMapOutputKeyClass(Weather.class);
            job.setMapOutputValueClass(Text.class);
            // Sort: year and month ascending, temperature descending.
            job.setSortComparatorClass(WeatherSortComparator.class);
            // Group: one reduce call per (year, month).
            job.setGroupingComparatorClass(WeatherGroupingComparator.class);
            // Partition: all records of one year go to the same reducer.
            job.setPartitionerClass(WeatherPartitioner.class);
            job.setReducerClass(WeatherReducer.class);
            job.setNumReduceTasks(2);
            job.waitForCompletion(true);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
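A small hardening note for this driver: it never declares the reducer's output types. TextOutputFormat simply calls toString() on whatever the reducer emits, so Text/Text happens to work, but two extra lines (an assumed addition, not in the original) would make the contract explicit:

    // Declare the final (reducer) output types explicitly.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);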
- Composite key:
package com.bjsxt.hdfs.weather;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class Weather implements WritableComparable<Weather> {

    private int year;
    private int month;
    private int temperature;
    // The date-time portion of the original input line, kept so the reducer
    // can write it back out.
    private String line;

    // Hadoop instantiates keys reflectively during deserialization, so the
    // no-argument constructor is required.
    public Weather() {}

    public Weather(int year, int month, int temperature, String line) {
        this.year = year;
        this.month = month;
        this.temperature = temperature;
        this.line = line;
    }

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getTemperature() {
        return temperature;
    }

    public void setTemperature(int temperature) {
        this.temperature = temperature;
    }

    public String getLine() {
        return line;
    }

    public void setLine(String line) {
        this.line = line;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // Serialization order must match readFields() exactly.
        dataOutput.writeInt(year);
        dataOutput.writeInt(month);
        dataOutput.writeInt(temperature);
        dataOutput.writeUTF(line);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        setYear(dataInput.readInt());
        setMonth(dataInput.readInt());
        setTemperature(dataInput.readInt());
        setLine(dataInput.readUTF());
    }

    @Override
    public int compareTo(Weather other) {
        // Fallback ordering by the raw date-time string; the job registers
        // explicit sort and grouping comparators, which take precedence.
        return line.compareTo(other.getLine());
    }
}
- Mapper:
package com.bjsxt.hdfs.weather;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WeatherMapper extends Mapper<LongWritable, Text, Weather, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // A line looks like "1949-10-01 14:21:02<TAB>34c".
        String[] ss = value.toString().split("\t");
        // sss[0] is the year, sss[1] the month.
        String[] sss = ss[0].split("-");
        int year = Integer.parseInt(sss[0]);
        int month = Integer.parseInt(sss[1]);
        // Strip the trailing "c" from readings such as "34c".
        int temperature = Integer.parseInt(ss[1].substring(0, ss[1].length() - 1));
        context.write(new Weather(year, month, temperature, ss[0]), new Text(ss[1]));
    }
}
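The mapper assumes every line is well-formed; a malformed record would throw and, after retries, fail the task. On real data it is common to count and skip bad lines instead. A sketch of such a guard at the top of map() (the counter group and name are invented for illustration):

    String[] ss = value.toString().split("\t");
    if (ss.length != 2 || ss[1].length() < 2) {
        // Count and skip malformed lines rather than failing the whole task.
        context.getCounter("Weather", "MALFORMED_RECORDS").increment(1);
        return;
    }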
- Sort comparator:
package com.bjsxt.hdfs.weather;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class WeatherSortComparator extends WritableComparator {

    public WeatherSortComparator() {
        // Passing true makes WritableComparator instantiate Weather objects,
        // so compare() receives fully deserialized keys.
        super(Weather.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable writableComparable1, WritableComparable writableComparable2) {
        Weather weather1 = (Weather) writableComparable1;
        Weather weather2 = (Weather) writableComparable2;
        // Year and month ascending, temperature descending, so each month's
        // hottest records come first.
        int result = Integer.compare(weather1.getYear(), weather2.getYear());
        if (result == 0) {
            result = Integer.compare(weather1.getMonth(), weather2.getMonth());
            if (result == 0) {
                result = Integer.compare(weather2.getTemperature(), weather1.getTemperature());
            }
        }
        return result;
    }
}
- Grouping comparator:
package com.bjsxt.hdfs.weather;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class WeatherGroupingComparator extends WritableComparator {

    public WeatherGroupingComparator() {
        super(Weather.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable writableComparable1, WritableComparable writableComparable2) {
        Weather weather1 = (Weather) writableComparable1;
        Weather weather2 = (Weather) writableComparable2;
        // Keys that match on year and month compare as equal, so all of one
        // month's records are fed to a single reduce() call.
        int result = Integer.compare(weather1.getYear(), weather2.getYear());
        if (result == 0) {
            result = Integer.compare(weather1.getMonth(), weather2.getMonth());
        }
        return result;
    }
}
- Partitioner:
package com.bjsxt.hdfs.weather;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class WeatherPartitioner extends Partitioner<Weather, Text> {

    @Override
    public int getPartition(Weather key, Text value, int numPartitions) {
        // Route by year so one year's months are never split across reducers;
        // with two reducers, even years land in partition 0 and odd years in 1.
        return key.getYear() % numPartitions;
    }
}
- Reducer: emits the two hottest distinct days of each (year, month) group. Simply taking the first two values would print the same day twice whenever a day has several readings, so the reducer remembers the first day it emitted.
package com.bjsxt.hdfs.weather;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WeatherReducer extends Reducer<Weather, Text, Text, Text> {

    @Override
    protected void reduce(Weather key, Iterable<Text> valueIterable, Context context) throws IOException, InterruptedException {
        // Values arrive hottest-first, and Hadoop re-deserializes the key for
        // every value, so "key" always matches the record being iterated.
        String firstDay = null;
        int emitted = 0;
        for (Text value : valueIterable) {
            String day = key.getLine().split(" ")[0];
            if (day.equals(firstDay)) {
                continue; // a cooler reading from the day already emitted
            }
            context.write(new Text(key.getLine()), value);
            if (firstDay == null) {
                firstDay = day;
            }
            if (++emitted == 2) {
                break;
            }
        }
    }
}
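Running the job on the sample input above (including its duplicate lines), and assuming the distinct-day handling in this reducer, the two output partitions work out to the following, with even years in partition 0 and odd years in partition 1:

part-r-00000:
1950-01-01 11:21:02 32c
1950-10-02 12:21:02 41c
1950-10-01 12:21:02 37c

part-r-00001:
1949-10-01 19:21:02 38c
1949-10-02 14:01:02 36c
1951-07-03 12:21:03 47c
1951-07-02 12:21:02 46c
1951-12-01 12:21:02 23c

1950-01 and 1951-12 contain readings from only one day, so only a single line is emitted for those months.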