Using the Java API in HDFS

Author: lehuai | Published 2018-01-10 14:34

    Uploading a file

    PutFile.java

    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class PutFile {
    
        public static void main(String[] args) throws IOException,URISyntaxException {
            Configuration conf = new Configuration();
            URI uri = new URI("hdfs://192.168.56.31:9000");
            FileSystem fs = FileSystem.get(uri,conf);
            // Local source file
            Path src = new Path("D:\\scala\\文档\\63\\access.txt");
            // Destination directory on HDFS
            Path dst = new Path("/");
            fs.copyFromLocalFile(src, dst);
            System.out.println("Upload to " + conf.get("fs.defaultFS"));
            // The following is equivalent to running: hdfs dfs -ls /
            FileStatus[] files = fs.listStatus(dst);

            for (FileStatus file : files) {
                System.out.println(file.getPath());
            }
        }
    
    }
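
    As an alternative to copyFromLocalFile(), the same upload can be done by streaming bytes into the output stream returned by fs.create(). A minimal sketch under that assumption; the class name and the /access_stream.txt target path are made up for illustration:

    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    public class StreamPutFile {
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"), conf);
            // Local source file (same file as above)
            InputStream in = new BufferedInputStream(new FileInputStream("D:\\scala\\文档\\63\\access.txt"));
            // Hypothetical target path on HDFS
            FSDataOutputStream out = fs.create(new Path("/access_stream.txt"));
            // Copy in 4 KB chunks; the final argument closes both streams when the copy finishes
            IOUtils.copyBytes(in, out, 4096, true);
            fs.close();
        }
    
    }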
    
    

    Creating a file

    CreateFile.java

    
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class CreateFile {
    
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"),new Configuration());
            // Path of the new file
            Path dfs = new Path("/hdfsfile");
            // Create the new file, overwriting any existing one (true)
            FSDataOutputStream create = fs.create(dfs, true);

            create.writeBytes("Hello,HDFS !");
            // Close the stream so the data is actually flushed to HDFS
            create.close();
        }
    
    }
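
    To check the result, the file can be read back through fs.open(); a minimal sketch (the class name is illustrative, the path reuses /hdfsfile from above):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class ReadFile {
    
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"), new Configuration());
            // Open the file created above and print its contents line by line
            FSDataInputStream in = fs.open(new Path("/hdfsfile"));
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
            reader.close();
            fs.close();
        }
    
    }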
    
    

    Viewing detailed file information

    FileLocation.java

    import java.net.URI;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class FileLocation {
    
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"),new Configuration());
            Path fpath = new Path("/access.txt");
            FileStatus filestatus = fs.getFileStatus(fpath);
            /*
             * Locate the file's blocks in the HDFS cluster:
             * FileSystem.getFileBlockLocations(FileStatus file, long start, long len)
             * returns the block locations of the given file, where file is the file's status
             * and start and len mark the byte range of the file to look up.
             */
            BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());
            for (int i = 0; i < blkLocations.length; i++) {
                String[] hosts = blkLocations[i].getHosts();
                System.out.println("block_" + i + "_location:" + hosts[0]);
            }
            // Date formatter for the timestamps below
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            // Last access time of the file, returned as a long
            long accessTime = filestatus.getAccessTime();
            System.out.println("access:"+formatter.format(new Date(accessTime)));
            // Last modification time of the file, returned as a long
            long modificationTime = filestatus.getModificationTime();
            System.out.println("modification:"+formatter.format(new Date(modificationTime)));
            // Block size, in bytes
            long blockSize = filestatus.getBlockSize();
            System.out.println("blockSize:"+blockSize);
            // File length, in bytes
            long len = filestatus.getLen();
            System.out.println("length:"+len);
            // Group the file belongs to
            String group = filestatus.getGroup();
            System.out.println("group:"+group);
            // Owner of the file
            String owner = filestatus.getOwner();
            System.out.println("owner:"+owner);
            // Replication factor of the file
            short replication = filestatus.getReplication();
            System.out.println("replication:"+replication);
        }
    
    }
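
    When block locations are needed for every file under a directory, FileSystem.listFiles() returns LocatedFileStatus objects that already carry them, avoiding a separate getFileBlockLocations() call per file. A minimal sketch (the class name and the listed directory are illustrative):

    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    
    public class ListWithLocations {
    
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"), new Configuration());
            // Recursively list the files under / together with their block locations
            RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path("/"), true);
            while (it.hasNext()) {
                LocatedFileStatus status = it.next();
                System.out.println(status.getPath() + " (" + status.getLen() + " bytes)");
                for (BlockLocation blk : status.getBlockLocations()) {
                    System.out.println("  hosts: " + String.join(",", blk.getHosts()));
                }
            }
            fs.close();
        }
    
    }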
    
    

    Downloading a file

    GetFile.java

    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class GetFile {
    
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"),new Configuration());
            // Source file on HDFS
            Path src = new Path("/access.txt");
            // Local destination file
            Path dst = new Path("D:\\scala\\文档\\63\\newfile.txt");
            fs.copyToLocalFile(src, dst);
    
        }
    
    }
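
    On a Windows client without the Hadoop native libraries (winutils), copyToLocalFile() may fail while writing the local .crc checksum file; the four-argument overload can write through the raw local file system instead. A minimal sketch under that assumption (the class name is illustrative):

    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class GetFileRaw {
    
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"), new Configuration());
            Path src = new Path("/access.txt");
            Path dst = new Path("D:\\scala\\文档\\63\\newfile.txt");
            // delSrc = false keeps the HDFS copy; useRawLocalFileSystem = true skips the .crc sidecar file
            fs.copyToLocalFile(false, src, dst, true);
            fs.close();
        }
    
    }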
    
    

    RPC communication

    The reflection mechanism

    Student.java

    interface people {
        public void study();
    }

    public class Student implements people {
        private String name; // name
        private int age;     // age
        // Constructor 1
        public Student() {}
        // Constructor 2
        public Student(String name, int age) {
            this.name = name;
            this.age = age;
        }
        // Getters and setters
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public int getAge() {
            return age;
        }
        public void setAge(int age) {
            this.age = age;
        }

        public void study() {
            System.out.println("Studying");
        }
        // Main method of the program
        public static void main(String[] args) {
            Class<? extends Student> tmp = Student.class;
            String cName = tmp.getName();
            System.out.println("Class name: " + cName);
            try {
                // Dynamically load the class by its name
                Class c = Class.forName(cName);
                // Get the class's public methods
                java.lang.reflect.Method[] ms = c.getMethods();
                for (java.lang.reflect.Method m : ms) {
                    System.out.println("Method name: " + m.getName());
                    System.out.println("Return type: " + m.getReturnType().toString());
                    System.out.println("Parameter types: " + java.util.Arrays.toString(m.getParameterTypes()));
                }
                // Get the public fields
                java.lang.reflect.Field[] fields = c.getFields();
                for (java.lang.reflect.Field f : fields) {
                    System.out.println("Field type: " + f.getType());
                }
                // Get the implemented interfaces
                Class[] is = c.getInterfaces();
                for (Class s : is) {
                    System.out.println("Interface name: " + s.getName());
                }
                // Check whether it is an array type
                System.out.println("Array: " + c.isArray());
                String CLName = c.getClassLoader().getClass().getName();
                System.out.println("Class loader: " + CLName);
                // Look up a constructor and use it to create an instance
                java.lang.reflect.Constructor cons = c.getConstructor(String.class, int.class);
                Student stu = (Student) cons.newInstance("hadoop", 23);
                System.out.println(stu.getName() + ":" + stu.getAge());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
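
    The heading above mentions RPC communication, but the code only shows reflection, which Hadoop's RPC layer uses internally to build dynamic proxies. Below is a minimal sketch of a Hadoop RPC server and client, assuming Hadoop 2.x and the default WritableRpcEngine; the EchoProtocol interface, the port 12345, and the echo method are made up for illustration:

    import java.net.InetSocketAddress;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.ipc.Server;
    
    public class RpcDemo {
    
        // The RPC protocol: an ordinary interface plus a version number
        public interface EchoProtocol {
            long versionID = 1L;
            String echo(String message);
        }
    
        // Server-side implementation of the protocol
        public static class EchoServer implements EchoProtocol {
            public String echo(String message) {
                return "echo: " + message;
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Start an RPC server bound to localhost:12345
            Server server = new RPC.Builder(conf)
                    .setProtocol(EchoProtocol.class)
                    .setInstance(new EchoServer())
                    .setBindAddress("localhost")
                    .setPort(12345)
                    .build();
            server.start();
    
            // Obtain a client-side proxy and invoke the remote method
            EchoProtocol proxy = RPC.getProxy(EchoProtocol.class, EchoProtocol.versionID,
                    new InetSocketAddress("localhost", 12345), conf);
            System.out.println(proxy.echo("hello RPC"));
    
            RPC.stopProxy(proxy);
            server.stop();
        }
    
    }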
    
    

    MapReduce implementation

    WordMapper.java

    package wordcount;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    // WordMapper extends the Mapper base class
    public class WordMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        // The core method of the Mapper class, with three parameters
        public void map(Object key,       // byte offset of the line within the file
                        Text value,       // one line of the input file
                        Context context)  // map-side context, similar in role to OutputCollector and Reporter
                    throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    
    }
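
    For example, for the input line "Hello HDFS Hello", this mapper emits the pairs (Hello, 1), (HDFS, 1) and (Hello, 1); the framework then groups them by key before the reduce phase.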
    
    

    WordReducer.java

    package wordcount;
    
    
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    // WordReducer extends the Reducer base class
    public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();   // holds the count for the current word
        // The core method of the Reducer class, with three parameters
        public void reduce(Text key,                      // key emitted by the map side
                Iterable<IntWritable> values,             // collection of values emitted for that key
                Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values)  // iterate over the values and add them up
            {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
        
    }
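
    Continuing the example, the reducer receives the key Hello with the values [1, 1] and writes (Hello, 2), and the key HDFS with [1] and writes (HDFS, 1).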
    
    

    WordMain.java

    package wordcount;
    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class WordMain {
    
        public static void main(String[] args) throws Exception {
            // Configuration reads Hadoop's configuration files, such as core-site.xml;
            // values can also be set (and overridden) in code: conf.set("fs.defaultFS", "hdfs://xxxx:9000")
            Configuration conf = new Configuration();

            // Copy any generic command-line options into conf and keep the remaining arguments
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            //      conf.set("fs.defaultFS", "hdfs://192.168.56.31:9000");
            //      conf.set("hadoop.job.user", "root");
            //      conf.set("mapreduce.framework.name", "yarn");
            //      conf.set("mapreduce.jobtracker.address", "192.168.56.31:9001");
            //      conf.set("yarn.resourcemanager.hostname", "192.168.56.31");
            //      conf.set("yarn.resourcemanager.admin.address", "192.168.56.31:8033");
            //      conf.set("yarn.resourcemanager.address", "192.168.56.31:8032");
            //      conf.set("yarn.resourcemanager.resource-tracker.address", "192.168.56.31:8031");
            //      conf.set("yarn.resourcemanager.scheduler.address", "192.168.56.31:8030");

            if (otherArgs.length != 2) {
                System.err.println("Usage: wordcount <in> <out>");
                System.exit(2);
            }
            
            Job job = new Job(conf, "word count");  // create a job with the configuration
            job.setJarByClass(WordMain.class);  // main class of the job
            job.setMapperClass(WordMapper.class);  // Mapper class
            job.setCombinerClass(WordReducer.class);  // combiner class
            job.setReducerClass(WordReducer.class);  // Reducer class
            job.setOutputKeyClass(Text.class);  // output key class
            job.setOutputValueClass(IntWritable.class);  // output value class
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  // input path
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  // output path
            System.exit(job.waitForCompletion(true) ? 0 : 1);  // wait for completion, then exit
        }
    
    }
    
    

    Packaging and uploading

    Package the compiled classes into wordcount.jar (for example, by exporting a JAR from the IDE), upload the input files to HDFS, and run the job:

    hdfs dfs -mkdir /user/hadoop
    hdfs dfs -mkdir /user/hadoop/input
    hdfs dfs -put file* /user/hadoop/input
    hdfs dfs -ls /user/hadoop/input
    hadoop jar wordcount.jar wordcount.WordMain /user/hadoop/input/file* /user/hadoop/output
    hdfs dfs -ls /user/hadoop/output
    hdfs dfs -text /user/hadoop/output/part-r-00000
    
    The corresponding input and output paths on HDFS:

    hdfs://192.168.56.31:9000/user/hadoop/input
    hdfs://192.168.56.31:9000/user/hadoop/output2
    

    WordCount2.java

    package wordcount2;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class WordCount2 {
        
        public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
            // Must be named map (lower case); otherwise Hadoop falls back to the identity mapper
            @Override
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }
        
        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
            @Override
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }
        public static void main(String[] args) throws Exception{
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://192.168.56.31:9000");
            conf.set("hadoop.job.user", "root");
            conf.set("mapreduce.framework.name", "yarn");
            conf.set("mapreduce.jobtracker.address", "192.168.56.31:9001");
            conf.set("yarn.resourcemanager.hostname", "192.168.56.31");
            conf.set("yarn.resourcemanager.admin.address", "192.168.56.31:8033");
            conf.set("yarn.resourcemanager.address", "192.168.56.31:80312");
            conf.set("yarn.resourcemanager.resource-tracker.address", "192.168.56.31:8031");
            conf.set("yarn.resourcemanager.scheduler.address", "192.168.56.31:8030");
            
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2)
            {
                System.err.println("Usage:wordcount <in><out>");
                System.exit(2);
            }
            
            Job job = new Job(conf, "word count2");  // create a job with the configuration
            job.setJarByClass(WordCount2.class);  // main class of the job
            job.setMapperClass(TokenizerMapper.class);  // Mapper class
            job.setCombinerClass(IntSumReducer.class);  // combiner class
            job.setReducerClass(IntSumReducer.class);  // Reducer class
            job.setOutputKeyClass(Text.class);  // output key class
            job.setOutputValueClass(IntWritable.class);  // output value class
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  // input path
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  // output path
            boolean flag = job.waitForCompletion(true);
            System.out.println("SUCCEED! " + flag);  // completion message
            System.exit(flag ? 0 : 1);
        }
    
    }
    
    
