1. MapReduce Counters
Counters record a job's progress and status during execution and can be thought of as a lightweight log: by incrementing a counter at chosen points in the program, you can track how the data or the job's progress changes.
package Mapreduce_Counters;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Counters {

    public static class MyCounterMap extends Mapper<LongWritable, Text, Text, Text> {

        // This mapper emits no key/value pairs; it only increments counters
        // for records that do not have exactly 3 tab-separated fields.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] arr_valueString = value.toString().split("\t");
            if (arr_valueString.length > 3) {
                // "ErrorCounter" is the counter group name, "toolong" is the counter name
                Counter ct = context.getCounter("ErrorCounter", "toolong");
                ct.increment(1);
            } else if (arr_valueString.length < 3) {
                Counter ct = context.getCounter("ErrorCounter", "tooshort");
                ct.increment(1);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Counters <in> <out>");
            System.exit(2);
        }
        // new Job(conf, "MyJob") is deprecated; use the factory method instead
        Job job = Job.getInstance(conf, "MyJob");
        job.setJarByClass(Counters.class);
        job.setMapperClass(MyCounterMap.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
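When the job finishes, the counter totals are printed in the client's job summary under the ErrorCounter group. They can also be read programmatically in the driver once waitForCompletion returns, via Job.getCounters(). The following is a minimal sketch, assuming the job object from main() above; because the driver class here is itself named Counters, the Hadoop Counters type is written with its fully qualified name to avoid a clash. The group and counter names ("ErrorCounter", "toolong", "tooshort") match those used in the mapper.

// Sketch: replace the last line of main() with the following to log the
// counter totals before exiting.
boolean success = job.waitForCompletion(true);
org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
long tooLong = counters.findCounter("ErrorCounter", "toolong").getValue();
long tooShort = counters.findCounter("ErrorCounter", "tooshort").getValue();
System.out.println("toolong = " + tooLong + ", tooshort = " + tooShort);
System.exit(success ? 0 : 1);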