03_HADOOP_06_HDFS and MR Client Java Programming

Author: 超级小小张 | Published 2019-08-29 17:31

    Create a Maven project

    Add the following dependency to pom.xml:
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.9.2</version>
    </dependency>
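
    For reference, a minimal pom.xml sketch (not from the original article) showing where this dependency sits; the project coordinates are placeholders, and a JUnit dependency is added because the test code below uses org.junit:

    <project xmlns="http://maven.apache.org/POM/4.0.0">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.zhanghh.train</groupId>
      <artifactId>train</artifactId>
      <version>1.0-SNAPSHOT</version>
      <dependencies>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.9.2</version>
        </dependency>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.12</version>
          <scope>test</scope>
        </dependency>
      </dependencies>
    </project>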
    

    Copy the configuration files from the Hadoop environment into the resources directory

    Copy the four configuration files (core-site.xml, hdfs-site.xml, mapred-site.xml, yarn-site.xml); a sample core-site.xml is sketched below
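
    A minimal core-site.xml sketch, for orientation only; in practice the files should be copied unchanged from the cluster's $HADOOP_HOME/etc/hadoop directory, and the NameNode host node01 here is just a placeholder:

    <configuration>
      <property>
        <name>fs.defaultFS</name>
        <value>hdfs://node01:9000</value>
      </property>
    </configuration>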

    Write the HDFS client

    package com.zhanghh.train;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.io.IOUtils;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.*;
    
    /**
     * HDFS client tests based on the FileSystem API
     */
    public class HdfsTest {
        Configuration conf;
        FileSystem fs;
    
        @Before
        public void before() throws Exception {
            // load the configuration; by default the four config files on the classpath are loaded
            conf = new Configuration(true);
            // obtain a FileSystem instance from the configuration
            fs = FileSystem.get(conf);
        }
    
        @After
        public void after() throws Exception {
            fs.close();
        }
    
        /**
         * Create a directory
         * @throws Exception
         */
        @Test
        public void mkdir() throws Exception {
            Path path = new Path("/tmp");
            if (fs.exists(path)){
                fs.delete(path,true);
            }
            fs.mkdirs(path);
        }
    
        /**
         * Upload a local file to a specified HDFS file
         * @throws Exception
         */
        @Test
        public void uploadFile() throws Exception {
            Path path = new Path("/tmp/test.txt");
            if(fs.exists(path)){
                fs.delete(path,false);
            }
            FSDataOutputStream outputStream = fs.create(path);
            File file = new File("C:\\Users\\zhanghh\\Desktop\\test.txt");
            InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
            IOUtils.copyBytes(inputStream,outputStream,conf,true);
        }
    
        /**
         * Download a file from HDFS
         * @throws Exception
         */
        @Test
        public void downFile() throws Exception {
            Path path = new Path("/tmp/test.txt");
            FSDataInputStream inputStream = fs.open(path);
            BufferedOutputStream outputStream = new BufferedOutputStream(new FileOutputStream(new File("C:\\Users\\zhanghh\\Desktop\\222.txt")));
            IOUtils.copyBytes(inputStream,outputStream,conf,true);
        }
    
        /**
         * View the block location information of a file
         * @throws Exception
         */
        @Test
        public void blockLocation() throws Exception {
            Path path = new Path("/tmp/test.txt");
            FileStatus fss=fs.getFileStatus(path);
            BlockLocation[] locations = fs.getFileBlockLocations(fss, 0, fss.getLen());
            for (BlockLocation obj:locations) {
                System.out.println(obj);
            }
        }
    
        /**
         * Delete a file or directory
         * @throws Exception
         */
        @Test
        public void delete() throws Exception{
            Path path = new Path("/tmp");
            if (fs.exists(path)){
                fs.delete(path,true);
            }
        }
    }
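
    Not part of the original article: a small extra test that can be added to the HdfsTest class above, sketching how to list files under a directory recursively. It reuses the fs field initialized in before(), and the existing org.apache.hadoop.fs.* import already covers RemoteIterator and LocatedFileStatus.

        /**
         * List the files under a directory recursively (illustrative addition)
         * @throws Exception
         */
        @Test
        public void listFiles() throws Exception {
            Path path = new Path("/tmp");
            // the second argument enables recursive listing
            RemoteIterator<LocatedFileStatus> it = fs.listFiles(path, true);
            while (it.hasNext()) {
                LocatedFileStatus status = it.next();
                System.out.println(status.getPath() + " length=" + status.getLen());
            }
        }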
    

    Write a simple WordCount computation with MapReduce

    package com.zhanghh.train;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * <pre>
     * MR test class (job driver)
     * </pre>
     *
     * @author zhanghh
     * @create 2019/8/25
     */
    public class MapperReduceTest {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration(true);
            Job job = Job.getInstance(conf);
            job.setJarByClass(MyWordCount.class); // program entry: the class used to locate the job jar
            job.setJobName("zhanghh_job");  // job name
    
            Path inputPath = new Path("/tmp/test.txt");
            FileInputFormat.addInputPath(job, inputPath); // input path
    
            Path outputPath = new Path("/tmp/output");
            if (outputPath.getFileSystem(conf).exists(outputPath)) {
                outputPath.getFileSystem(conf).delete(outputPath, true);
            }
            FileOutputFormat.setOutputPath(job, outputPath); // output path
    
            job.setMapperClass(MyWordCount.MyMapper.class);   // mapper class
            job.setMapOutputKeyClass(Text.class);             // mapper output key type
            job.setMapOutputValueClass(IntWritable.class);    // mapper output value type
            job.setReducerClass(MyWordCount.MyReducer.class); // reducer class
            job.setOutputKeyClass(Text.class);                // final output key type
            job.setOutputValueClass(IntWritable.class);       // final output value type
    
            // Submit the job, then poll for progress until the job is complete
            job.waitForCompletion(true);    // submit the job and wait for it to finish
        }
    }
    
    ---------------------------------------------- MyWordCount.java ----------------------------------------------
    package com.zhanghh.train;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    /**
     * <pre>
     * Word count
     * </pre>
     *
     * @author zhanghh
     * @create 2019/8/27
     */
    public class MyWordCount {
    
        public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
            private IntWritable one=new IntWritable(1);
            private Text word = new Text();
            @Override
            protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()){
                    word.set(itr.nextToken());
                    context.write(word,one);
                }
            }
        }
    
        public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
            private IntWritable result=new IntWritable();
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key,result);
            }
        }
    }
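
    Not in the original article: one way to run the WordCount job on the cluster is to package the project with Maven and submit it with hadoop jar. The jar file name below depends on the pom settings and is only a placeholder, and /tmp/test.txt must already exist on HDFS (e.g. uploaded by the uploadFile test above):

    mvn clean package
    hadoop jar target/train-1.0-SNAPSHOT.jar com.zhanghh.train.MapperReduceTest
    hdfs dfs -cat /tmp/output/part-r-00000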
    
