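Example:
Let's start with a sample Map/Reduce application to get a first impression of how Map/Reduce works.
This demo counts how many records in an access log come from each browser.
Before running the demo, prepare the following:
I. Prepare the input data and upload it to HDFS
1. Create the input and output directories on HDFS
Current directory: /hadoop-2.7.3
Create them as follows:
① Create the input directory
$ hadoop fs -mkdir /input
② Upload the local log file to input
First download access.20120104.log from https://git.oschina.net/jeetpan/Hadoop
Then copy the local access.20120104.log file into the input directory just created.
$ hadoop fs -put xxx/access.20120104.log /input
③ List the files in the input directory
$ hadoop fs -ls /input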
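④ Create the output directory
$ hadoop fs -mkdir /output
Note: FileOutputFormat normally rejects an output directory that already exists, so this step is only appropriate if the driver clears /output before submitting the job.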
II. Demo program
1. KpiBrowser, the main class that runs the jobs
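import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 * Counts the client programs (browsers) used by visitors.
 * JobInitModel and BaseDriver are not Hadoop classes; their source is not shown here.
 */
public class KpiBrowser {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Must match the file system URL configured in core-site.xml
        String[] inPath = new String[]{"hdfs://localhost:9000/input/*"};
        String outPath = "hdfs://localhost:9000/output";
        Configuration conf = new Configuration();
        String jobName = "browser-pv";
        // First job: count the records per browser
        JobInitModel job = new JobInitModel(inPath, outPath, conf, null, jobName
                , KpiBrowser.class, null, Mapper.class, Text.class, IntWritable.class, null, null, Reducer.class
                , Text.class, IntWritable.class);
        // Second job: reads the part-* files of the first job and writes to output/sort
        JobInitModel sortJob = new JobInitModel(new String[]{outPath + "/part-*"}, outPath + "/sort", conf, null
                , jobName + "sort", KpiBrowser.class, null, Mapper.class, Text.class, IntWritable.class, null, null, null, null, null);
        BaseDriver.initJob(new JobInitModel[]{job, sortJob});
    }
}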
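2. Mapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

/**
 * Reads the records of the log file and emits one (browser, 1) pair per valid record.
 * Kpi is the demo project's own log-record parser; its source is not shown here.
 */
public class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused across calls to avoid allocating a new object per record
    private final Text browser = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Kpi kpi = Kpi.parse(value.toString());
        if (kpi.getIs_validate()) {
            browser.set(kpi.getUser_agent());
            context.write(browser, one); // emit the browser type with a count of 1
        }
    }
}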
3. Reducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 * Sums the counts emitted for each browser.
 */
public class Reducer extends org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable resCount = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0; // primitive int avoids boxing on every addition
        for (IntWritable i : values) {
            sum += i.get();
        }
        resCount.set(sum);
        context.write(key, resCount);
    }
}
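To see what the Mapper and Reducer above compute together without a cluster, here is a minimal plain-Java sketch of the same counting logic. It is not the project's code: userAgent() is a hypothetical stand-in for Kpi.parse(), assuming the record is split on spaces and the 12th field is taken (an assumption that happens to agree with the quote-prefixed keys in the results), and the sample lines are made up.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class BrowserCountSketch {

    // Hypothetical stand-in for Kpi.parse(): split on spaces and take the
    // 12th field; lines with too few fields count as invalid (null).
    static String userAgent(String line) {
        String[] fields = line.split(" ");
        return fields.length > 11 ? fields[11] : null;
    }

    // Map phase: emit (browser, 1) for every valid line.
    static List<Map.Entry<String, Integer>> map(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            String browser = userAgent(line);
            if (browser != null) {
                pairs.add(new SimpleEntry<String, Integer>(browser, 1));
            }
        }
        return pairs;
    }

    // Reduce phase: sum the values per key. The TreeMap keeps the keys
    // sorted, which is the order in which a reducer would see them.
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return counts;
    }

    // Made-up log lines in combined log format, for illustration only.
    static List<String> sampleLines() {
        return Arrays.asList(
            "1.2.3.4 - - [04/Jan/2012:00:00:02 +0800] \"GET /a HTTP/1.1\" 200 100 \"-\" \"Mozilla/4.0 (compatible)\"",
            "5.6.7.8 - - [04/Jan/2012:00:00:03 +0800] \"GET /b HTTP/1.1\" 200 50 \"-\" \"Mozilla/5.0 (X11)\"",
            "9.9.9.9 - - [04/Jan/2012:00:00:04 +0800] \"GET /c HTTP/1.1\" 200 70 \"-\" \"Mozilla/4.0 (compatible)\"",
            "truncated line");
    }

    public static void main(String[] args) {
        reduce(map(sampleLines()))
            .forEach((browser, n) -> System.out.println(browser + "\t" + n));
    }
}
```

The truncated line is dropped in the map phase, which mirrors the getIs_validate() check in the real Mapper.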
III. Results
Output of running KpiBrowser
/Library/Java/JavaVirtualMachines/jdk1.8.0_111.jdk/Contents/Home/bin/java -Didea.launcher.port=7532 "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath "..." com.intellij.rt.execution.application.AppMain com.jeet.hadoop.mapreduce.kpi.browser.KpiBrowser
(classpath abbreviated: JDK 1.8.0_111 runtime jars, the project's target/classes, and the Hadoop 2.7.3 dependencies from the local Maven repository)
16/12/30 15:12:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/12/30 15:12:08 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/12/30 15:12:08 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/12/30 15:12:08 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/12/30 15:12:08 WARN mapreduce.JobResourceUploader: No job jar file set. User classes may not be found. See Job or Job#setJar(String).
16/12/30 15:12:08 INFO input.FileInputFormat: Total input paths to process : 1
16/12/30 15:12:09 INFO mapreduce.JobSubmitter: number of splits:6
16/12/30 15:12:09 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1040540662_0001
16/12/30 15:12:09 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
16/12/30 15:12:09 INFO mapreduce.Job: Running job: job_local1040540662_0001
16/12/30 15:12:09 INFO mapred.LocalJobRunner: OutputCommitter set in config null
16/12/30 15:12:09 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/12/30 15:12:09 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
16/12/30 15:12:09 INFO mapred.LocalJobRunner: Waiting for map tasks
16/12/30 15:12:09 INFO mapred.LocalJobRunner: Starting task: attempt_local1040540662_0001_m_000000_0
16/12/30 15:12:09 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/12/30 15:12:09 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
16/12/30 15:12:09 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
16/12/30 15:12:09 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/access.20120104.log:0+134217728
16/12/30 15:12:09 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/12/30 15:12:09 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/12/30 15:12:09 INFO mapred.MapTask: soft limit at 83886080
16/12/30 15:12:09 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/12/30 15:12:09 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/12/30 15:12:09 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/12/30 15:12:10 INFO mapreduce.Job: Job job_local1040540662_0001 running in uber mode : false
16/12/30 15:12:10 INFO mapreduce.Job: map 0% reduce 0%
16/12/30 15:12:11 INFO mapred.LocalJobRunner:
16/12/30 15:12:11 INFO mapred.MapTask: Starting flush of map output
16/12/30 15:12:11 INFO mapred.MapTask: Spilling map output
16/12/30 15:12:11 INFO mapred.MapTask: bufstart = 0; bufend = 2092378; bufvoid = 104857600
16/12/30 15:12:11 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 25724936(102899744); length = 489461/6553600
16/12/30 15:12:12 INFO mapred.MapTask: Finished spill 0
16/12/30 15:12:12 INFO mapred.Task: Task:attempt_local1040540662_0001_m_000000_0 is done. And is in the process of committing
16/12/30 15:12:12 INFO mapred.LocalJobRunner: map
16/12/30 15:12:12 INFO mapred.Task: Task 'attempt_local1040540662_0001_m_000000_0' done.
16/12/30 15:12:12 INFO mapred.LocalJobRunner: Finishing task: attempt_local1040540662_0001_m_000000_0
16/12/30 15:12:12 INFO mapred.LocalJobRunner: Starting task: attempt_local1040540662_0001_m_000001_0
16/12/30 15:12:12 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/12/30 15:12:12 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
16/12/30 15:12:12 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
16/12/30 15:12:12 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/access.20120104.log:134217728+134217728
16/12/30 15:12:12 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/12/30 15:12:12 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/12/30 15:12:12 INFO mapred.MapTask: soft limit at 83886080
16/12/30 15:12:12 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/12/30 15:12:12 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/12/30 15:12:12 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/12/30 15:12:12 INFO mapreduce.Job: map 100% reduce 0%
16/12/30 15:12:13 INFO mapred.LocalJobRunner:
16/12/30 15:12:13 INFO mapred.MapTask: Starting flush of map output
16/12/30 15:12:13 INFO mapred.MapTask: Spilling map output
16/12/30 15:12:13 INFO mapred.MapTask: bufstart = 0; bufend = 2657832; bufvoid = 104857600
16/12/30 15:12:13 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 25592192(102368768); length = 622205/6553600
16/12/30 15:12:13 INFO mapred.MapTask: Finished spill 0
16/12/30 15:12:13 INFO mapred.Task: Task:attempt_local1040540662_0001_m_000001_0 is done. And is in the process of committing
16/12/30 15:12:13 INFO mapred.LocalJobRunner: map
16/12/30 15:12:13 INFO mapred.Task: Task 'attempt_local1040540662_0001_m_000001_0' done.
16/12/30 15:12:13 INFO mapred.LocalJobRunner: Finishing task: attempt_local1040540662_0001_m_000001_0
16/12/30 15:12:13 INFO mapred.LocalJobRunner: Starting task: attempt_local1040540662_0001_m_000002_0
16/12/30 15:12:13 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/12/30 15:12:13 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
16/12/30 15:12:13 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
16/12/30 15:12:13 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/access.20120104.log:268435456+134217728
16/12/30 15:12:13 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/12/30 15:12:13 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/12/30 15:12:13 INFO mapred.MapTask: soft limit at 83886080
16/12/30 15:12:13 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/12/30 15:12:13 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/12/30 15:12:13 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/12/30 15:12:15 INFO mapred.LocalJobRunner:
.....
.....
.....
16/12/30 15:12:18 INFO mapred.LocalJobRunner: 6 / 6 copied.
16/12/30 15:12:18 INFO reduce.MergeManagerImpl: finalMerge called with 6 in-memory map-outputs and 0 on-disk map-outputs
16/12/30 15:12:18 INFO mapred.Merger: Merging 6 sorted segments
16/12/30 15:12:18 INFO mapred.Merger: Down to the last merge-pass, with 6 segments left of total size: 14852536 bytes
16/12/30 15:12:18 INFO reduce.MergeManagerImpl: Merged 6 segments, 14852761 bytes to disk to satisfy reduce memory limit
16/12/30 15:12:18 INFO reduce.MergeManagerImpl: Merging 1 files, 14852755 bytes from disk
16/12/30 15:12:18 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
16/12/30 15:12:18 INFO mapred.Merger: Merging 1 sorted segments
16/12/30 15:12:18 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 14852748 bytes
16/12/30 15:12:18 INFO mapred.LocalJobRunner: 6 / 6 copied.
16/12/30 15:12:18 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
16/12/30 15:12:19 INFO mapred.Task: Task:attempt_local1040540662_0001_r_000000_0 is done. And is in the process of committing
16/12/30 15:12:19 INFO mapred.LocalJobRunner: 6 / 6 copied.
16/12/30 15:12:19 INFO mapred.Task: Task attempt_local1040540662_0001_r_000000_0 is allowed to commit now
16/12/30 15:12:19 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1040540662_0001_r_000000_0' to hdfs://localhost:9000/output/_temporary/0/task_local1040540662_0001_r_000000
16/12/30 15:12:19 INFO mapred.LocalJobRunner: reduce > reduce
16/12/30 15:12:19 INFO mapred.Task: Task 'attempt_local1040540662_0001_r_000000_0' done.
16/12/30 15:12:19 INFO mapred.LocalJobRunner: Finishing task: attempt_local1040540662_0001_r_000000_0
16/12/30 15:12:19 INFO mapred.LocalJobRunner: reduce task executor complete.
16/12/30 15:12:20 INFO mapreduce.Job: map 100% reduce 100%
16/12/30 15:12:20 INFO mapreduce.Job: Job job_local1040540662_0001 completed successfully
16/12/30 15:12:20 INFO mapreduce.Job: Counters: 35
File System Counters
FILE: Number of bytes read=29724390
FILE: Number of bytes written=88981738
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=3389790430
HDFS: Number of bytes written=934
HDFS: Number of read operations=92
HDFS: Number of large read operations=0
HDFS: Number of write operations=16
Map-Reduce Framework
Map input records=2929645
Map output records=778116
Map output bytes=13296517
Map output materialized bytes=14852785
Input split bytes=672
Combine input records=0
Combine output records=0
Reduce input groups=44
Reduce shuffle bytes=14852785
Reduce input records=778116
Reduce output records=44
Spilled Records=1556232
Shuffled Maps =6
Failed Shuffles=0
Merged Map outputs=6
GC time elapsed (ms)=453
Total committed heap usage (bytes)=11341922304
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=688231535
File Output Format Counters
Bytes Written=934
16/12/30 15:12:20 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/12/30 15:12:20 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/12/30 15:12:20 WARN mapreduce.JobResourceUploader: No job jar file set. User classes may not be found. See Job or Job#setJar(String).
.....
.....
.....
16/12/30 15:12:21 INFO mapreduce.Job: Job job_local558846411_0002 running in uber mode : false
16/12/30 15:12:21 INFO mapreduce.Job: map 100% reduce 100%
16/12/30 15:12:21 INFO mapreduce.Job: Job job_local558846411_0002 completed successfully
16/12/30 15:12:21 INFO mapreduce.Job: Counters: 35
File System Counters
FILE: Number of bytes read=59419410
FILE: Number of bytes written=60554086
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1376464938
HDFS: Number of bytes written=1868
HDFS: Number of read operations=63
HDFS: Number of large read operations=0
HDFS: Number of write operations=18
Map-Reduce Framework
Map input records=44
Map output records=0
Map output bytes=0
Map output materialized bytes=6
Input split bytes=106
Combine input records=0
Combine output records=0
Reduce input groups=0
Reduce shuffle bytes=6
Reduce input records=0
Reduce output records=0
Spilled Records=0
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=0
Total committed heap usage (bytes)=3979345920
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=934
File Output Format Counters
Bytes Written=0
Process finished with exit code 0
Open http://localhost:50070/ to browse the results
Download part-r-00000 to view the counts
6
"HTCT9188_TD/1.0 3
"HuaweiSymantecSpider/1.0+DSE-support@huaweisymantec.com+(compatible; 1200
"MQQBrowser/1.9.1 4
"MQQBrowser/2.1 8
"MQQBrowser/28 13
"MQQBrowser/29 4
"Microsoft 80
"Mozilla/4.0 668020
"Mozilla/5.0 108539
"Nokia5230/21.0.004 4
"Nokia5233/40.1.003 2
"Nokia5233/50.1.001 1
"Nokia5800w/51.0.006 1
"Nokia6120c/3.83.2 2
"Nokia6120c/4.21 15
"Nokia6120ci/7.10 1
"NokiaC5-03/20.0.024 1
"NokiaC7-00/022.014 1
"NokiaC7-00/CU/022.014 15
"NokiaE63/200.21.005 2
"NokiaE72-1/031.023 1
"NokiaE72-1/081.003 1
"NokiaE72/031.023 3
.......
.......
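The part-r-00000 file is ordered by key, not by count. To rank the browsers by how often they appear, the downloaded file could be post-processed locally. The following is only a sketch: it assumes each line is a key followed by whitespace and a count (the default text output layout), and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RankBrowsersSketch {

    static final class Row {
        final String browser;
        final long count;

        Row(String browser, long count) {
            this.browser = browser;
            this.count = count;
        }
    }

    // The count is the token after the last space or tab; everything before
    // it is the key. A line holding only a number has an empty key.
    static Row parse(String line) {
        String trimmed = line.trim();
        int cut = Math.max(trimmed.lastIndexOf(' '), trimmed.lastIndexOf('\t'));
        String browser = cut < 0 ? "" : trimmed.substring(0, cut).trim();
        return new Row(browser, Long.parseLong(trimmed.substring(cut + 1)));
    }

    // Sort the rows by count, largest first.
    static List<Row> rank(List<String> lines) {
        List<Row> rows = new ArrayList<>();
        for (String line : lines) {
            rows.add(parse(line));
        }
        rows.sort((a, b) -> Long.compare(b.count, a.count));
        return rows;
    }

    public static void main(String[] args) {
        // A few lines copied from the results above.
        List<String> lines = Arrays.asList(
            "\"Microsoft\t80",
            "\"Mozilla/4.0\t668020",
            "\"Mozilla/5.0\t108539",
            "\"MQQBrowser/28\t13");
        for (Row row : rank(lines)) {
            System.out.println(row.count + "\t" + row.browser);
        }
    }
}
```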
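IV. Core functionality
Applications typically implement the Mapper and Reducer interfaces to provide the map and reduce methods. These form the core of the job. The description below follows the older org.apache.hadoop.mapred API (JobConf, OutputCollector, Reporter); the demo above uses the newer org.apache.hadoop.mapreduce API, in which Job and Context play those roles.
Mapper
Mapper maps input key/value pairs to a set of intermediate key/value pairs.
Maps are the individual tasks that transform input records into intermediate records. The intermediate records do not need to be of the same type as the input records.
A given input pair may map to zero or many output pairs.
The Hadoop Map/Reduce framework spawns one map task for each InputSplit, and the InputSplits are generated by the InputFormat of the job.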
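The run log above reports "number of splits:6" and therefore six map tasks. That figure can be reproduced with a simplified sketch of how a file-based InputFormat carves one file into splits. The assumptions: the file size is taken from the "Bytes Read=688231535" counter, the split size of 134217728 bytes is taken from the split offsets in the log, and the 10% slack for the last split follows FileInputFormat. Non-splittable and zero-length files are not modeled.

```java
class SplitCountSketch {

    // The last split may be up to 10% larger than the split size, so that a
    // small tail does not get a map task of its own.
    static final double SPLIT_SLOP = 1.1;

    static int countSplits(long fileBytes, long splitBytes) {
        int splits = 0;
        long remaining = fileBytes;
        // Cut full-size splits while clearly more than one split remains.
        while ((double) remaining / splitBytes > SPLIT_SLOP) {
            splits++;
            remaining -= splitBytes;
        }
        // Whatever is left becomes the final split.
        if (remaining > 0) {
            splits++;
        }
        return splits;
    }

    public static void main(String[] args) {
        // Sizes taken from the run log above.
        System.out.println(countSplits(688231535L, 134217728L)); // prints 6
    }
}
```

Five full splits cover 671088640 bytes, and the remaining 17142895 bytes form the sixth split.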
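Reducer
Reducer reduces a set of intermediate values that share a key to a smaller set of values.
The number of reduce tasks for a job is set by the user via JobConf.setNumReduceTasks(int).
Overall, Reducer implementations are passed the JobConf of the job via the JobConfigurable.configure(JobConf) method and can override it to initialize themselves.
The framework then calls the reduce(WritableComparable, Iterator, OutputCollector, Reporter) method once for each <key, (list of values)> pair in the grouped input.
Afterwards, applications can override Closeable.close() to perform any required cleanup.
Reducer has 3 primary phases: shuffle, sort and reduce.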
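Shuffle
The input to the Reducer is the sorted output of the mappers. In this phase the framework fetches, via HTTP, the relevant partition of the output of every mapper for each Reducer.
Sort
In this phase the framework groups the Reducer input by key (since different mappers may have emitted the same key).
The shuffle and sort phases run simultaneously; map outputs are merged while they are being fetched.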
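The outcome of shuffle and sort can be pictured with a small in-memory sketch: the outputs of several mappers are combined and grouped by key, so that each key ends up with the list of all its values. This only illustrates the result; the real framework merges already-sorted segments fetched over HTTP rather than inserting into a map, and the names used here are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

class ShuffleSortSketch {

    // Convenience factory for one (key, value) pair of map output.
    static Map.Entry<String, Integer> kv(String key, int value) {
        return new SimpleEntry<String, Integer>(key, value);
    }

    // Combine the outputs of all mappers and group the values by key.
    // The TreeMap yields the keys in sorted order.
    static SortedMap<String, List<Integer>> mergeAndGroup(
            List<List<Map.Entry<String, Integer>>> mapOutputs) {
        SortedMap<String, List<Integer>> grouped = new TreeMap<>();
        for (List<Map.Entry<String, Integer>> oneMapOutput : mapOutputs) {
            for (Map.Entry<String, Integer> pair : oneMapOutput) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                       .add(pair.getValue());
            }
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapper0 =
            Arrays.asList(kv("Mozilla/4.0", 1), kv("Nokia5230", 1));
        List<Map.Entry<String, Integer>> mapper1 =
            Arrays.asList(kv("Mozilla/4.0", 1));
        // The key emitted by both mappers ends up with two values.
        System.out.println(mergeAndGroup(Arrays.asList(mapper0, mapper1)));
    }
}
```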
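Secondary Sort
If the rule for grouping intermediate keys needs to differ from the rule for grouping keys before reduction, a Comparator can be specified via JobConf.setOutputValueGroupingComparator(Class). Since JobConf.setOutputKeyComparatorClass(Class) controls how the intermediate keys are ordered, the two can be combined to simulate a secondary sort on values.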
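The interplay of the two comparators can be sketched in plain Java. In this hypothetical model, a composite key carries a natural part and a secondary part; SORT plays the role of the output key comparator (full ordering) and GROUP plays the role of the value grouping comparator (natural part only). Each returned string stands for one reduce() call. None of these names come from Hadoop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class SecondarySortSketch {

    static final class Key {
        final String natural;
        final int secondary;

        Key(String natural, int secondary) {
            this.natural = natural;
            this.secondary = secondary;
        }
    }

    // Full ordering: natural part first, then the secondary part.
    static final Comparator<Key> SORT =
        Comparator.comparing((Key k) -> k.natural).thenComparingInt(k -> k.secondary);

    // Grouping: keys with the same natural part belong to one reduce call.
    static final Comparator<Key> GROUP = Comparator.comparing((Key k) -> k.natural);

    // Returns one line per simulated reduce call: "natural: v1 v2 ...".
    static List<String> reduceCalls(List<Key> keys) {
        List<Key> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        List<String> calls = new ArrayList<>();
        StringBuilder current = null;
        Key groupHead = null;
        for (Key k : sorted) {
            if (groupHead == null || GROUP.compare(groupHead, k) != 0) {
                if (current != null) {
                    calls.add(current.toString());
                }
                groupHead = k;
                current = new StringBuilder(k.natural + ":");
            }
            current.append(' ').append(k.secondary);
        }
        if (current != null) {
            calls.add(current.toString());
        }
        return calls;
    }

    public static void main(String[] args) {
        List<Key> keys = Arrays.asList(
            new Key("b", 2), new Key("a", 3), new Key("a", 1), new Key("b", 1));
        System.out.println(reduceCalls(keys)); // prints [a: 1 3, b: 1 2]
    }
}
```

Within each group the secondary values arrive already sorted, which is the effect a secondary sort is after.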
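Reduce
In this phase the framework calls the reduce(WritableComparable, Iterator, OutputCollector, Reporter) method once for each <key, (list of values)> pair in the grouped input.
The output of the reduce task is typically written to the FileSystem via OutputCollector.collect(WritableComparable, Writable).
Applications can use the Reporter to report progress, set application-level status messages, update Counters, or simply indicate that they are alive.
The output of the Reducer is not sorted.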
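How Many Reduces?
The recommended number of reduces is 0.95 or 1.75 multiplied by (<no. of nodes> * mapred.tasktracker.reduce.tasks.maximum).
With 0.95, all of the reduces can launch immediately and start transferring map outputs as the maps finish. With 1.75, the faster nodes finish their first round of reduces and launch a second round, which gives much better load balancing.
Increasing the number of reduces increases the framework overhead, but it improves load balancing and lowers the cost of failures.
The scaling factors above are slightly less than whole numbers in order to reserve a few reduce slots in the framework for speculative tasks and failed tasks.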
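As a worked example of the rule of thumb above, the following sketch computes both recommendations for a hypothetical cluster of 10 nodes with 2 reduce slots each. The cluster size is made up, and rounding down is an assumption of this sketch; the text does not say how to round.

```java
class ReduceCountSketch {

    // factorPercent is 95 or 175; integer arithmetic avoids floating-point
    // rounding surprises, and the division rounds down.
    static int recommendedReduces(int nodes, int reduceSlotsPerNode, int factorPercent) {
        return nodes * reduceSlotsPerNode * factorPercent / 100;
    }

    public static void main(String[] args) {
        int nodes = 10;             // hypothetical cluster size
        int reduceSlotsPerNode = 2; // mapred.tasktracker.reduce.tasks.maximum
        System.out.println(recommendedReduces(nodes, reduceSlotsPerNode, 95));  // prints 19
        System.out.println(recommendedReduces(nodes, reduceSlotsPerNode, 175)); // prints 35
    }
}
```

With 19 reduces and 20 slots, every reduce gets a slot in a single round and one slot stays free; with 35 reduces the faster nodes pick up a second round.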
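No Reducer
It is legal to set the number of reduce tasks to zero if no reduction is needed.
In this case the output of the map tasks is written directly to the output path set by setOutputPath(Path). The framework does not sort the map output before writing it to the FileSystem.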
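Partitioner
Partitioner partitions the key space.
Partitioner controls the partitioning of the keys of the intermediate map output. The key (or a subset of the key) is used to derive the partition, typically by a hash function. The total number of partitions is the same as the number of reduce tasks for the job. Hence it controls which of the m reduce tasks an intermediate key (and hence the record) is sent to for reduction.
HashPartitioner is the default Partitioner.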
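The hash-based assignment can be sketched in a few lines. The formula below is the one HashPartitioner uses: the sign bit of the key's hash code is masked off and the remainder modulo the number of reduce tasks is taken. The sketch works on a raw hash code, so it has no Hadoop dependency; note that a real Text key has its own hashCode(), which differs from String.hashCode().

```java
class HashPartitionSketch {

    // Masking with Integer.MAX_VALUE clears the sign bit, so the result is
    // never negative, even for negative hash codes.
    static int partition(int keyHash, int numReduceTasks) {
        return (keyHash & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reduces = 4; // hypothetical number of reduce tasks
        for (String key : new String[]{"Mozilla/4.0", "Mozilla/5.0", "Nokia5230"}) {
            // String.hashCode() is used here purely for illustration.
            System.out.println(key + " -> reduce " + partition(key.hashCode(), reduces));
        }
    }
}
```

Because the partition depends only on the key, all records with the same key reach the same reduce task.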
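Reporter
Reporter is the facility through which Map/Reduce applications report progress, set application-level status messages and update Counters.
Mapper and Reducer implementations can use the Reporter to report progress or simply to indicate that they are alive. This matters when an application takes a long time to process individual key/value pairs, because the framework might otherwise assume that the task has timed out and kill it. Another way to avoid this is to set the configuration parameter mapred.task.timeout to a high enough value (or to zero, which disables the timeout).
Applications can also update Counters using the Reporter.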
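The timeout rule described above can be modeled with a tiny check. This is a simplified sketch and not the framework's code: it assumes the framework remembers when a task last reported progress and compares the silence against mapred.task.timeout (in milliseconds, 600000 by default), with zero meaning that the timeout is switched off.

```java
class TaskTimeoutSketch {

    // Returns true when a task has been silent for longer than the timeout.
    // A timeout of zero disables the check entirely.
    static boolean shouldKill(long lastProgressMillis, long nowMillis, long timeoutMillis) {
        if (timeoutMillis == 0) {
            return false;
        }
        return nowMillis - lastProgressMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        long timeout = 600_000L; // ten minutes
        // Silent for 11 minutes: the task would be killed.
        System.out.println(shouldKill(0L, 660_000L, timeout));
        // Progress was reported one minute ago: the task keeps running.
        System.out.println(shouldKill(600_000L, 660_000L, timeout));
    }
}
```

Each progress report effectively resets lastProgressMillis, which is why a slow but healthy task should report regularly.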
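OutputCollector
OutputCollector is a general facility provided by the Map/Reduce framework to collect the data output by the Mapper or the Reducer (both intermediate output and the output of the job).
Hadoop Map/Reduce also comes bundled with a library of generally useful mappers, reducers and partitioners.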
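Job Configuration
JobConf represents the configuration of a Map/Reduce job.
JobConf is the primary interface through which a user describes to the Hadoop framework how a Map/Reduce job should be executed. The framework tries to execute the job faithfully as described by the JobConf, however:
Some configuration parameters may have been marked as final by administrators, which means they cannot be altered.
While some job parameters are straightforward to set (e.g. setNumReduceTasks(int)), other parameters interact subtly with the rest of the framework or with other job parameters and are more complex to set (e.g. setNumMapTasks(int)).
JobConf is typically used to specify the Mapper, Combiner (if any), Partitioner, Reducer, InputFormat and OutputFormat implementations. JobConf also specifies the set of input files (setInputPaths(JobConf, Path...) / addInputPath(JobConf, Path)) and (setInputPaths(JobConf, String) / addInputPaths(JobConf, String)) and where the output files should be written (setOutputPath(Path)).
Optionally, JobConf is used to set advanced options for the job, such as: the Comparator to use; files to put in the DistributedCache; whether and how intermediate and/or job outputs are compressed; debugging via user-provided scripts (setMapDebugScript(String) / setReduceDebugScript(String)); whether tasks may be executed speculatively (setMapSpeculativeExecution(boolean) / setReduceSpeculativeExecution(boolean)); the maximum number of attempts per task (setMaxMapAttempts(int) / setMaxReduceAttempts(int)); the percentage of task failures a job can tolerate (setMaxMapTaskFailuresPercent(int) / setMaxReduceTaskFailuresPercent(int)); and so on.
Of course, users can use set(String, String) / get(String, String) to set or get arbitrary parameters needed by their applications. For large amounts of read-only data, however, use the DistributedCache.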
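The combination of final parameters and free-form set/get can be pictured with a toy model. This is not Hadoop's Configuration class; it is a hypothetical sketch of the behavior described above, in which resources are loaded in order and a value marked final by an earlier resource is not replaced by a later one. The parameter name my.app.flag is made up.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class FinalParamSketch {

    private final Map<String, String> values = new HashMap<>();
    private final Set<String> finals = new HashSet<>();

    // Load one resource. Keys already marked final keep their value; keys
    // listed in markFinal become final once they have been stored.
    void load(Map<String, String> resource, Set<String> markFinal) {
        for (Map.Entry<String, String> e : resource.entrySet()) {
            if (finals.contains(e.getKey())) {
                continue; // attempt to override a final parameter is ignored
            }
            values.put(e.getKey(), e.getValue());
            if (markFinal.contains(e.getKey())) {
                finals.add(e.getKey());
            }
        }
    }

    // Look up a parameter, falling back to the given default.
    String get(String name, String defaultValue) {
        return values.getOrDefault(name, defaultValue);
    }

    public static void main(String[] args) {
        FinalParamSketch conf = new FinalParamSketch();
        // Administrator-level resource: the timeout is final.
        conf.load(Collections.singletonMap("mapred.task.timeout", "600000"),
                  Collections.singleton("mapred.task.timeout"));
        // Job-level resource: tries to change the timeout, adds its own key.
        Map<String, String> job = new HashMap<>();
        job.put("mapred.task.timeout", "0");
        job.put("my.app.flag", "on");
        conf.load(job, Collections.<String>emptySet());
        System.out.println(conf.get("mapred.task.timeout", "unset")); // prints 600000
        System.out.println(conf.get("my.app.flag", "off"));           // prints on
    }
}
```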