美文网首页
利用MapReduce将文件内容写入Hbase

利用MapReduce将文件内容写入Hbase

作者: 小猪Harry | 来源:发表于2018-10-07 23:49 被阅读0次
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.3</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>1.3.1</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>1.3.1</version>
            </dependency>
    
    
    package com.neuedu;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    import java.io.IOException;
    
    public class WordCountHbase {
    
        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum","hadoop01,hadoop02,hadoop03");
            Job job = Job.getInstance(configuration);
            job.setJarByClass(WordCountHbase.class);
    
            job.setMapperClass(WordCountToBaseMap.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            //这个wordcount是在Hbase中建好的表, create 'wordcount','i'
            TableMapReduceUtil.initTableReducerJob("wordcount", WordCountToBaseReduce.class, job);
            //这是上传到hdfs上的文档
            FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:8020/liushishi.love"));
            boolean completion = job.waitForCompletion(true);
            System.out.println(completion);
        }
    
        //定义reducer对接输出到hbase
        //reduce的输入类型KEYIN, VALUEIN
        //reduce输出的key的类型KEYOUT,写入hbase中reduce的输出key并不重要,重要的是value,value的数据会被写入hbase表,key的数据不重要
        //只需要保证reduce的输出value是put类型就可以了
    
        public static class WordCountToBaseMap extends Mapper<LongWritable, Text, Text, IntWritable> {
            public static IntWritable ONE = new IntWritable(1);
            public Text outputKey = new Text();
            public String[] info;
    
            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                String[] readline = value.toString().split("\\s+");
                for (String word : readline) {
                    if (!word.equals("")) {
                        outputKey.set(word);
                        context.write(outputKey, ONE);
                    }
                }
            }
    
        }
    
        public static class WordCountToBaseReduce extends TableReducer<Text, IntWritable, NullWritable> {
            public static NullWritable OUT_PUT_KEY = NullWritable.get();
            public Put outputValue;
            public int sum;
    
            @Override
            protected void reduce(Text key, Iterable<IntWritable> value,
                                  Reducer<Text, IntWritable, NullWritable, Mutation>.Context context)
                    throws IOException, InterruptedException {
                sum = 0;
                for (IntWritable intWritable : value) {
                    sum += intWritable.get();
                }
                outputValue = new Put(Bytes.toBytes(key.toString()));
                outputValue.addColumn(Bytes.toBytes("i"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
                context.write(OUT_PUT_KEY, outputValue);
            }
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:利用MapReduce将文件内容写入Hbase

          本文链接:https://www.haomeiwen.com/subject/tiaraftx.html