Hadoop Java Client Setup & WordCount

Author: 持而盈 | Published 2017-09-15 12:26

    Java client & development environment setup

    Setting up the development environment on Windows 7

    1. Download Hadoop from the official website, then set the HADOOP_HOME environment variable.

    2. Replace the bin directory under HADOOP_HOME with the Windows native package downloaded from CSDN (typically winutils.exe, hadoop.dll and a few related files).
    I have also kept a copy of this package on my cloud drive.
    Make sure its version matches your Hadoop release.
    These files are OS-dependent: on Linux everything works out of the box, but on Windows 7 this extra step is needed. An in-code alternative to the environment variable is sketched right after this list.
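
    If you would rather not change the system environment, Hadoop also honours the hadoop.home.dir system property, which can be set from code before the first Hadoop call. A minimal sketch (the install path below is only an example), placed inside the client class such as the HdfsClientTest shown later:

    // Equivalent of the HADOOP_HOME environment variable, set from code.
    // The directory must contain the replaced bin folder (winutils.exe, hadoop.dll).
    static {
        System.setProperty("hadoop.home.dir", "D:/hadoop-2.6.5");  // example path
    }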

    Uploading a file to HDFS

    The other HDFS operations look much the same; just follow the same pattern (a couple of extra examples are sketched after the test class below).

    package com.example;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    
    /**
     * Created by v_zhangbing on 2017/7/4.
     */
    public class HdfsClientTest {
    
        private Configuration conf;
        private FileSystem fs;
    
        @Before
        public void init() throws URISyntaxException, IOException, InterruptedException {
            conf = new Configuration();
            //conf.set("fs.defaultFS","hdfs://ubuntu:9000");

            // Connect to the NameNode at ubuntu:9000 as user "zb"
            fs = FileSystem.get(new URI("hdfs://ubuntu:9000"), conf, "zb");
        }

        @Test
        public void testUploadFile() throws IOException {
            // Copy a local file up to /java/aaa on HDFS
            fs.copyFromLocalFile(new Path("C:/Users/v_zhangbing/Downloads/aaa"), new Path("/java/aaa"));
            fs.close();
        }
    
        
    }
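
    Download, mkdir and listing follow the same pattern. A minimal sketch of two more test methods for the same HdfsClientTest class (the paths are only examples; listFiles additionally needs imports for org.apache.hadoop.fs.RemoteIterator and org.apache.hadoop.fs.LocatedFileStatus):

        @Test
        public void testDownloadFile() throws IOException {
            // Copy /java/aaa from HDFS back to a local path (example path)
            fs.copyToLocalFile(new Path("/java/aaa"), new Path("C:/Users/v_zhangbing/Downloads/aaa-copy"));
            fs.close();
        }

        @Test
        public void testMkdirAndList() throws IOException {
            // Create a directory, then recursively list the files under /java
            fs.mkdirs(new Path("/java/subdir"));
            RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("/java"), true);
            while (files.hasNext()) {
                System.out.println(files.next().getPath());
            }
            fs.close();
        }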
    
    

    Local debugging of MapReduce

    The most important part of running locally is getting the environment right.
    1. The Hadoop version referenced in Maven must match the locally installed Hadoop version.
    2. Download the Windows tool package (the handful of files that go under bin) and use it to replace /hadoop/bin. I keep a copy on my cloud drive, and it is easy to find online.

    maven:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.example</groupId>
        <artifactId>hadoop</artifactId>
        <version>1.0-SNAPSHOT</version>
        <packaging>jar</packaging>
    
    
        <dependencies>
        <!-- Hadoop HDFS (distributed file system) libraries -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>2.6.5</version>
            </dependency>
    
        <!-- Hadoop common libraries -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.6.5</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.6.5</version>
            </dependency>
        </dependencies>
    
    
    </project>
    

    Setting local mode in conf

    The driver below runs the job with the local job runner against the local file system (imports are the same as in the full WordcountDriver shown later). Run it with two program arguments: a local input directory and an output directory that does not exist yet.

    public class WordcountDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();

            // Debug MapReduce locally: local job runner + local file system
            conf.set("mapreduce.framework.name","local");
            conf.set("fs.defaultFS","file:///");

            Job job = Job.getInstance(conf);

            // Local path of the jar that contains this program
            job.setJarByClass(WordcountDriver.class);

            // Mapper and Reducer classes for this job
            job.setMapperClass(WordcountMapper.class);
            job.setReducerClass(WordcountReducer.class);

            // Key/value types of the mapper output
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            // Key/value types of the final output
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // Input directory of the job
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            // Output directory of the job
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // job.submit();
            boolean result = job.waitForCompletion(true);

            // Exit code reflects whether the job succeeded
            System.exit(result ? 0 : 1);
        }
    }
    

    WordCount - the word count program

    A MapReduce program that counts how many times each word occurs.

    maven

    The pom.xml is the same as the one shown in the local-debugging section above.

    WordcountMapper

    package wordcount;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * The Mapper of a word count MapReduce program.
     * The generic parameters mean the following:
     *
     * KeyIn: by default, the byte offset of the line the MR framework has read; logically a Long,
     * but Hadoop has its own leaner serialization interface, so LongWritable is used instead of Long.
     * ValueIn: by default, the content of one line of text read by the MR framework; Text instead of String, as above.
     *
     * KeyOut: the key of the output produced by the user-defined logic; here it is the word, so Text instead of String.
     * ValueOut: the value of the output produced by the user-defined logic; here it is the word count, so IntWritable instead of Integer.
     *
     * Created by zb on 2017/7/13.
     */
    public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        /**
         * The business logic of the map phase goes into this overridden map().
         * The map task calls our map() once for every line of input.
         *
         * @param key     byte offset of the current line
         * @param value   content of the current line
         * @param context used to emit the output key/value pairs
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Convert the Text the map task handed us into a String
            String line = value.toString();
            // Split the line into words on spaces
            String[] words = line.split(" ");

            // Emit each word as <word, 1>
            for (String word : words) {
                // The word is the key and the count is the value; pairs with the
                // same key end up in the same reduce task
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }
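
    A side note on the mapper above: allocating a new Text and IntWritable for every word works, but a common MapReduce idiom is to reuse the output objects, since Hadoop serializes them on every write(). A small sketch of the same map() with object reuse (not in the original code, just an optional optimization):

        // Reused output objects; mutating them between write() calls is safe
        // because the framework copies/serializes them immediately.
        private final Text outKey = new Text();
        private final IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            for (String word : value.toString().split(" ")) {
                outKey.set(word);
                context.write(outKey, one);
            }
        }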
    
    

    WordcountReducer

    package wordcount;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /**
     * The Reducer of the word count job. The generic parameters are:
     *
     * KeyIn/ValueIn correspond to the Mapper's KeyOut/ValueOut.
     *
     * KeyOut/ValueOut are the output of the custom Reducer logic:
     * KeyOut is the word, ValueOut is its total count.
     *
     * Created by zb on 2017/7/13.
     */
    public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
        /**
         * Called once per distinct key, with all of that key's values.
         *
         * @param key     the key shared by a group of KV pairs with the same word
         * @param values  the collection of counts for that word
         * @param context used to emit the output key/value pair
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;

            // Sum the counts rather than just counting the values, so the logic
            // also stays correct if a combiner pre-aggregates on the map side
            for (IntWritable value : values) {
                count += value.get();
            }

            context.write(key, new IntWritable(count));
        }
    }
    
    

    WordcountDriver

    package wordcount;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * Acts as a client of the YARN cluster: it wraps up the run parameters of our
     * MapReduce program, points at the jar, and finally submits the job to YARN.
     *
     * Created by zb on 2017/7/14.
     */
    public class WordcountDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
        // Local path of the jar that contains this program
            job.setJarByClass(WordcountDriver.class);
    
        // Mapper and Reducer classes for this job
            job.setMapperClass(WordcountMapper.class);
            job.setReducerClass(WordcountReducer.class);
    
        // Key/value types of the mapper output
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
        // Key/value types of the final output
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
        // Input directory of the job
            FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Output directory of the job
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // job.submit();
            boolean result = job.waitForCompletion(true);
    
        // Exit code reflects whether the job succeeded
            System.exit(result ? 0 : 1);
    
        }
    }
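
    One optional tweak, not in the original driver: because the reducer sums its input values, it can also be registered as a combiner so counts are pre-aggregated on the map side before the shuffle. A one-line sketch to add next to the other job.set* calls:

        // Optional: pre-aggregate <word, 1> pairs on the map side
        job.setCombinerClass(WordcountReducer.class);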
    
    

    Launching the program

    hadoop jar hadoop-1.0-SNAPSHOT.jar wordcount.WordcountDriver /wordcount/input /wordcount/output
    

    After the jar name come the main class to run and its two arguments (the input and output directories on HDFS).
    hadoop jar is essentially an ordinary java invocation with the jars under HADOOP_HOME added to the classpath.
