HBase Study Notes (6): Using HBase with MapReduce


Author: 做个合格的大厂程序员 | Published on 2020-08-11 20:43

    Requirement: use MapReduce to read data from an HBase table, keep only the data of one column family, and write it into another table. Concretely, the example below copies the name and age columns of family f1 from the table myuser into the table myuser2.
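
    The job assumes that the source table myuser and the target table myuser2 both exist and both have the column family f1. If the target table has not been created yet, a minimal sketch like the following can create it through the Admin API (the helper class name CreateTargetTable is made up for illustration; the table and family names are taken from the code below):

    package hbaseMR;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class CreateTargetTable {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");

            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Admin admin = connection.getAdmin()) {
                TableName tableName = TableName.valueOf("myuser2");
                if (!admin.tableExists(tableName)) {
                    // target table with the same column family f1 as the source table
                    HTableDescriptor descriptor = new HTableDescriptor(tableName);
                    descriptor.addFamily(new HColumnDescriptor("f1"));
                    admin.createTable(descriptor);
                }
            }
        }
    }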

    Main

    package hbaseMR;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class HbaseMain extends Configured implements Tool {
    
        public int run(String[] args) throws Exception {
        /**
         * The 8 steps of a MapReduce job:
         * 1. Read the input and parse it into key/value pairs (k1, v1).
         * 2. Custom mapper logic: take (k1, v1) and emit (k2, v2).
         * 3. Partitioning: records with the same key are sent to the same reducer.
         * 4. Sorting: the data is sorted by k2.
         * 5. Combining (an optional tuning step): pre-merge values with the same k2 on the map side.
         * 6. Grouping: values with the same key are grouped together.
         * 7. Custom reducer logic: take (k2, v2) and emit (k3, v3).
         * 8. Write out (k3, v3) to the output.
         */
    
            Scan scan = new Scan();
    
            Job job = Job.getInstance(super.getConf(), "hbase_mapreduce");
    
    
        /*
         * Parameters of TableMapReduceUtil.initTableMapperJob:
         *   table (String or TableName),
         *   Scan scan,
         *   Class<? extends TableMapper> mapper,
         *   Class<?> outputKeyClass,
         *   Class<?> outputValueClass,
         *   Job job
         */
            TableMapReduceUtil.initTableMapperJob("myuser",scan,HbaseReadMapper.class, Text.class, Put.class,job);
    
    
            TableMapReduceUtil.initTableReducerJob("myuser2",HbaseWriteReducer.class,job);
    
            job.setNumReduceTasks(1);
    
            boolean b = job.waitForCompletion(true);
    
            return b?0:1;
    
        }
    
    
    
        public static void main(String[] args) throws Exception {
            // create an HBase configuration
            Configuration configuration = HBaseConfiguration.create();
    
            configuration.set("hbase.zookeeper.quorum","hadoop01:2181,hadoop02:2181,hadoop03:2181");
    
            int run = ToolRunner.run(configuration,new HbaseMain(),args);
    
            System.exit(run);
        }
    }
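
    A note on the Scan above: as written it ships every column family of myuser to the mappers, and the narrowing down to f1 happens client-side in the mapper. If only f1 (or only the name and age columns) is needed, the scan can be restricted server-side before initTableMapperJob is called. A minimal sketch, assuming org.apache.hadoop.hbase.util.Bytes is also imported in the driver:

        Scan scan = new Scan();
        // only request the name and age columns of family f1 from the region servers
        scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"));
        scan.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"));
        // common settings for MapReduce scans: bigger RPC batches, no block-cache pollution
        scan.setCaching(500);
        scan.setCacheBlocks(false);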
    
    

    Mapper

    package hbaseMR;
    
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;
    
    import java.io.IOException;
    import java.util.List;
    
    public class HbaseReadMapper extends TableMapper<Text, Put> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
    
            // get the row key as a byte array
            byte[] rowkeyBytes = key.get();
            String rowkey = Bytes.toString(rowkeyBytes);
    
            Put put = new Put(rowkeyBytes);
    
    
            // go through all cells of this row; we only keep the name and age columns of family f1
            List<Cell> cells = value.listCells();
    
            for (Cell cell : cells){

                // get the column family name
                byte[] family = CellUtil.cloneFamily(cell);

                // get the column qualifier
                byte[] qualifier = CellUtil.cloneQualifier(cell);

                if ("f1".equals(Bytes.toString(family))){
                    if ("name".equals(Bytes.toString(qualifier)) || "age".equals(Bytes.toString(qualifier))){
    
                        // this cell belongs to one of the columns we want to keep
                        put.add(cell);
                    }
                }
            }
    
    
            // only emit the Put if it actually contains cells
            if (!put.isEmpty()){
                context.write(new Text(rowkey),put);
            }
    
        }
    }
    
    

    Reducer

    package hbaseMR;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.io.Text;
    
    import java.io.IOException;
    
    public class HbaseWriteReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
    
            ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();
            immutableBytesWritable.set(key.toString().getBytes());
    
    
            for (Put put : values){
                context.write(immutableBytesWritable,put);
            }
        }
    }
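
    After the job finishes, the copy can be spot-checked with a plain client-side Get against myuser2. The fragment below is only a verification sketch (not part of the original job), meant to be dropped into a main method with the usual client imports (Connection, Table, Get, Result, Bytes and so on); the row key "0001" is an assumed example and has to be replaced with a row key that actually exists in myuser:

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("myuser2"))) {
            Result result = table.get(new Get(Bytes.toBytes("0001")));
            // name was copied as a plain string; age may be int-encoded depending on how it was written
            System.out.println("name = " + Bytes.toString(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"))));
        }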
    
    

    Importing data from HDFS into HBase and exporting from HBase to HDFS

    Main

    package Hdfs2Hbase;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class Hbase2HDFS extends Configured implements Tool {
    
    
        public int run(String[] args) throws Exception {
    
            Job job = Job.getInstance(super.getConf(),"hdfs2Hbase");
    
            job.setInputFormatClass(TextInputFormat.class);
    
            TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/hbase/input"));
    
            job.setMapperClass(HDFSMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            TableMapReduceUtil.initTableReducerJob("myuser2",HbaseReducer.class,job);
    
            job.setNumReduceTasks(1);
    
            boolean bl = job.waitForCompletion(true);
    
            return bl?0:1;
        }
    
        public static void main(String[] args) throws Exception {
    
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum","hadoop01:2181,hadoop02:2181,hadoop03:2181");
            int run = ToolRunner.run(configuration,new Hbase2HDFS(),args);
            System.exit(run);
        }
    }
    
    

    Mapper

    package Hdfs2Hbase;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class HDFSMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
            context.write(value,NullWritable.get());
        }
    }
    
    

    Reducer

    package Hdfs2Hbase;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    
    import java.io.IOException;
    
    public class HbaseReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
    
            String[] split = key.toString().split("\t");
    
            // split[0] is used as the row key
            Put put = new Put(Bytes.toBytes(split[0]));
    
            put.addColumn("f1".getBytes(),"name".getBytes(),split[1].getBytes());
    
            put.addColumn("f1".getBytes(),"age".getBytes(),split[2].getBytes());
    
            context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])),put);
        }
    }
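
    Both this reducer and the bulk-load mapper in the next section assume that every input line under /hbase/input has exactly three tab-separated fields: row key, name and age. A slightly more defensive version of the parsing (a sketch, not from the original post) skips blank or malformed lines instead of failing the whole job with an ArrayIndexOutOfBoundsException:

            String[] split = key.toString().split("\t");

            // skip blank or malformed lines instead of failing the whole job
            if (split.length < 3 || split[0].isEmpty()) {
                return;
            }

            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn("f1".getBytes(), "name".getBytes(), split[1].getBytes());
            put.addColumn("f1".getBytes(), "age".getBytes(), split[2].getBytes());

            context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])), put);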
    
    

    Using BulkLoad: writing HFiles directly for HBase to import

    Main

    package bulkLoad;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    
    public class BulkLoadHbase extends Configured implements Tool {
    
        @Override
        public int run(String[] args) throws Exception {
            final String INPUT_PATH= "hdfs://hadoop01:9000/hbase/input";
            final String OUTPUT_PATH= "hdfs://hadoop01:9000/hbase/output_hfile";
            Configuration conf = HBaseConfiguration.create();
            Connection connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TableName.valueOf("myuser2"));
            Job job= Job.getInstance(conf);
            job.setJarByClass(BulkLoadHbase.class);
            job.setMapperClass(BulkLoadMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            job.setOutputFormatClass(HFileOutputFormat2.class);
            HFileOutputFormat2.configureIncrementalLoad(job,table,connection.getRegionLocator(TableName.valueOf("myuser2")));
            FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
            FileOutputFormat.setOutputPath(job,new Path(OUTPUT_PATH));
            boolean b = job.waitForCompletion(true);
            return b?0:1;
        }
    
        public static void main(String[] args) throws Exception {
            Configuration configuration = HBaseConfiguration.create();
            int run = ToolRunner.run(configuration, new BulkLoadHbase(), args);
            System.exit(run);
        }
    }
    
    

    Mapper

    package bulkLoad;
    
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn("f1".getBytes(),"name".getBytes(),split[1].getBytes());
            put.addColumn("f1".getBytes(),"age".getBytes(),Bytes.toBytes(Integer.parseInt(split[2])));
    
            // split[0] is the row key
            context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])),put);
        }
    }
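
    The job above only writes HFiles to /hbase/output_hfile; they still have to be handed over to HBase. A minimal sketch of that completion step, assuming an HBase 1.x client where LoadIncrementalHFiles lives in org.apache.hadoop.hbase.mapreduce (in HBase 2.x it moved to org.apache.hadoop.hbase.tool); the class name LoadHFiles is made up for illustration:

    package bulkLoad;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class LoadHFiles {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");

            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Admin admin = connection.getAdmin();
                 Table table = connection.getTable(TableName.valueOf("myuser2"))) {
                LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                // move the generated HFiles from the job's output directory into the regions of myuser2
                loader.doBulkLoad(new Path("hdfs://hadoop01:9000/hbase/output_hfile"),
                        admin, table, connection.getRegionLocator(TableName.valueOf("myuser2")));
            }
        }
    }

    The same step can also be run from the command line with the bundled tool, e.g. hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hfile-dir> <table>.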
    


Original link: https://www.haomeiwen.com/subject/uxxudktx.html