Writing a MapReduce Program That Reads From and Writes To HBase Tables

Author: 明明德撩码 | Published 2018-05-07 23:08

    Reference: http://hbase.apache.org/book.html#mapreduce

    Required imports for the driver:
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.util.Tool;
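
    These two imports matter because the driver extends Configured and implements Tool, which is what lets ToolRunner (used in the main method later in this article) inject the HBase configuration and parse generic Hadoop options before run() is called. A minimal skeleton, using the driver name User2BasicMapReduce that appears in the main method below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class User2BasicMapReduce extends Configured implements Tool {

        @Override
        public int run(String[] args) throws Exception {
            // Job setup goes here; see the full run() example later in this article
            return 0;
        }

        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            int status = ToolRunner.run(configuration, new User2BasicMapReduce(), args);
            System.exit(status);
        }
    }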


    The three components

    Create the Mapper class
    Create the Reducer class
    Create the Driver

    Creating the Mapper Class

    In the map method, the logic proceeds as follows:
    • Get the rowkey: we need the rowkey to build a Put object, via String rowkey = Bytes.toString(key.get());. Here key is the method parameter of type ImmutableBytesWritable; when reading from an HBase table, the key handed to map() is the row's rowkey.
    • Build a Put object: Put put = new Put(key.get());, then iterate over the cells of the Result (value.rawCells()) and assemble the Put with put.add(cell). To copy only specific columns, check the column family first and then the qualifier (for example, name or age) before adding each cell; a sketch of that variant follows the class code below.
    • Finally, emit with context.write(mapOutputKey, put).
    Code for the class:
    // Reads rows from the 'user' table.  ImmutableBytesWritable: rowkey   Put: one row of data
        public static class ReadUserMapper extends TableMapper<ImmutableBytesWritable, Put> {
    
            @Override
            protected void map(ImmutableBytesWritable row, Result value,
                    Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Put>.Context context)
                            throws IOException, InterruptedException {
                // Re-emit the row: convert the scanned Result into a Put keyed by the same rowkey
                context.write(row, resultToPut(row, value));
            }
            // Equivalent to the shell command: put 'user','10001','info:address','shanghai'
            private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
                Put put = new Put(key.get());
                for (KeyValue kv : result.raw()) {
                    put.add(kv);
                }
                return put;
            }
        }
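
    If only certain columns should be copied, each cell can be checked before it is added, as mentioned in the steps above. A sketch of that variant, assuming the data lives in an 'info' column family with 'name' and 'age' qualifiers (names chosen for illustration, not confirmed by the original code):

        // Variant of resultToPut that copies only the info:name and info:age columns.
        // 'info', 'name', and 'age' are assumed names used for illustration.
        // Requires: import org.apache.hadoop.hbase.util.Bytes;
        private static Put resultToPutFiltered(ImmutableBytesWritable key, Result result) throws IOException {
            Put put = new Put(key.get());
            for (KeyValue kv : result.raw()) {
                // Keep the cell only if it belongs to the 'info' column family...
                if (Bytes.equals(kv.getFamily(), Bytes.toBytes("info"))) {
                    byte[] qualifier = kv.getQualifier();
                    // ...and its qualifier is 'name' or 'age'
                    if (Bytes.equals(qualifier, Bytes.toBytes("name"))
                            || Bytes.equals(qualifier, Bytes.toBytes("age"))) {
                        put.add(kv);
                    }
                }
            }
            return put;
        }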
    

    Creating the Reducer Class

    • Iterate over the Iterable<Put> values
    • Write each Put out with context.write(key, put) (TableOutputFormat ignores the key, so passing null would also work)
    • Code for the class:
    public static class WriteBasicReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {
    
            @Override
            protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,
                    Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, Mutation>.Context context)
                            throws IOException, InterruptedException {
                // Pass each Put through unchanged to the output table
                for(Put put:values){
                    context.write(key, put);
                }
            }
            
        }
    

    Creating the Driver

    • Import the required packages
      import org.apache.hadoop.hbase.client.Scan;
      import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    • Create a Job
    Job job = Job.getInstance(getConf(), this.getClass().getName());
    
    • Set the class the Job runs
    job.setJarByClass(this.getClass());
    
    • Configure the Job
    • Create a Scan object
    Scan scan = new Scan();
            scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
            scan.setCacheBlocks(false);  // don't set to true for MR jobs
    

    Scan here performs a full-table scan. setCaching sets how many rows each RPC fetches from the RegionServer; the default of 1 is far too low for a MapReduce job. setCacheBlocks controls whether scanned blocks are kept in the RegionServer's block cache; for MapReduce jobs it should always be false, because a one-pass scan would only evict useful cache entries.
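
    The Scan can also narrow which data the mappers see before the job is configured. A small sketch, assuming an 'info' column family (a name not given in the original code):

            // Restrict the scan to one column family ('info' is an assumed name)
            scan.addFamily(Bytes.toBytes("info"));
            // Or to a single column:
            // scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));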

    • Set the input and the mapper
      Use TableMapReduceUtil.initTableMapperJob() to set the input table and wire up the mapper. Note that the mapper output key class must match ReadUserMapper's declared output key type, ImmutableBytesWritable.
    TableMapReduceUtil.initTableMapperJob(
                      "user",        // input HBase table name
                      scan,             // Scan instance to control CF and attribute selection
                      ReadUserMapper.class,           // mapper
                      ImmutableBytesWritable.class,   // mapper output key
                      Put.class,                      // mapper output value
                      job);
    
    • Set the reducer and the output: use TableMapReduceUtil.initTableReducerJob() to set the output table, the reducer class, and the job.
        TableMapReduceUtil.initTableReducerJob(
                      "basic",      // output table
                      WriteBasicReducer.class,             // reducer class
                      job);
    

    • Set the number of reduce tasks to 0

    job.setNumReduceTasks(0);

    With zero reduce tasks the job is map-only: the mapper's Puts go straight to the output table and WriteBasicReducer never actually runs. Set this to 1 or more if the reduce phase should execute.
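
    For a purely map-only copy, the HBase reference guide's read/write example passes null as the reducer class instead of wiring one up, which makes the intent explicit:

        TableMapReduceUtil.initTableReducerJob(
                  "basic",    // output table
                  null,       // no reducer: the mapper's Puts are written directly
                  job);
        job.setNumReduceTasks(0);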
    
    • Submit the job
    boolean b = job.waitForCompletion(true);
            
    
    • The full run() method:
    public int run(String[] args) throws Exception {
            Job job = Job.getInstance(getConf(), this.getClass().getName());
            job.setJarByClass(this.getClass());
            
            Scan scan = new Scan();
            scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
            scan.setCacheBlocks(false);  // don't set to true for MR jobs
            
            
            TableMapReduceUtil.initTableMapperJob(
                      "user",        // input HBase table name
                      scan,             // Scan instance to control CF and attribute selection
                      ReadUserMapper.class,   // mapper
                  ImmutableBytesWritable.class,   // mapper output key
                      Put.class,             // mapper output value
                      job);
            
            
            TableMapReduceUtil.initTableReducerJob(
                      "basic",      // output table
                      WriteBasicReducer.class,             // reducer class
                      job);
            
            job.setNumReduceTasks(0);
            
            boolean b = job.waitForCompletion(true);
            
            return b?0:1;
        }
    

    Writing the main Method

    • Get a Configuration object
    Configuration configuration = HBaseConfiguration.create();
    
    • Run the Job
            int status = ToolRunner.run(configuration, new User2BasicMapReduce(), args); 
    
    • Exit with the job's status code
    System.exit(status);
    
    • The full main method:
    public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            int status = ToolRunner.run(configuration, new User2BasicMapReduce(), args);
            System.exit(status);
        }
    

    Verification

    • Export the program as a jar file
    • Upload the jar to a directory on the Linux server (for example, with FileZilla)
    • Run the jar with the following commands to verify that the MapReduce program behaves correctly:
    export HBASE_HOME=/opt/sofewares/hbase/hbase-0.98.6-hadoop2
    export HADOOP_HOME=/opt/cdh5.3.6/hadoop-2.5.0-cdh5.3.6
    HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`  $HADOOP_HOME/bin/yarn jar  $HADOOP_HOME/jars/hbase-mr-user2basic.jar
    
    • Once the job succeeds, inspect the data in the basic table; if the rows copied from user appear there, the steps above worked.
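
    A quick way to check is the HBase shell (a suggested verification; the original article shows the result only as a screenshot):

        $HBASE_HOME/bin/hbase shell
        hbase> scan 'basic'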


      [Screenshot: scan result of the 'basic' table]
