美文网首页HBASE 精选文集HBaseHadoop
用mapreduce的方式将csv格式文件格式化处理并写入HBa

用mapreduce的方式将csv格式文件格式化处理并写入HBa

作者: 牛轧糖123 | 来源:发表于2016-10-22 23:25 被阅读1259次

    将数据导入HBase的方式有很多,其中之一就是采用mapreduce来批量写入,最近所在的小组有这样的需求,大家又都还属于学习阶段,于是查阅了很多的资料(感谢http://www.cnblogs.com/dongdone/p/5689370.html的作者为我的第一次尝试提供了宝贵的经验),在这里做个记录,希望对想我一样的初学者有帮助。

    要想做好这个事情大致要分4步:
    1、在Hbase中创建一张表(我的表名叫做pn,要写入的列族的名字叫做gcf)
    在hbase shell中执行一下命令:

    *** a、create 'pn', 'gcf'***
    2、上传要导入的csv到hdfs上面
    a、在Linux命令行中执行:hadoop fs -put 2222.csv /inputDir
    3、编写mapreduce程序:
    我是用idea写的java代码,关于idea打包jar包的方式网上有很多,我就不说了,只是有一点要注意的是关于打包方式的,在yarn上运行代码的时候,yarn上是没有hbase的jar包的,所以打包jar的时候要把你工程所依赖的jar包也一并打包进生成的jar里。我打包的时候不是直接打包进去的,而是先打包一个只有class文件的jar包,然后在随便一个地方新建一个lib文件夹并将工程所依赖的jar包都放进去,然后用winrar打开生成的jar包,将lib文件夹拖进去,这样jar包就生成好。
    还有一个要注意的就是我用的maven来构建工程,这里做依赖的有:

    org.apache.hadoop
    hadoop-common
    2.7.2
    org.apache.hadoop
    hadoop-hdfs
    2.7.2
    org.apache.hadoop
    hadoop-mapreduce-client-core
    2.7.2
    org.apache.hbase
    hbase
    1.2.0
    org.apache.hbase
    hbase-client
    1.2.0
    org.apache.hbase
    hbase-common
    1.2.0
    org.apache.hbase
    hbase-server
    1.2.0
    

    这是打包过程,代码在后面:


    打包一个只有class文件的jar包

    然后在随便一个地方新建一个lib文件夹并将工程所依赖的jar包都放进去

    然后用winrar打开生成的jar包

    这是生成好的jar的内容

    我是代码:

    packagecn.com.hbase;
    importcn.com.utils.MD5;
    importorg.apache.hadoop.conf.Configuration;
    importorg.apache.hadoop.hbase.HBaseConfiguration;
    importorg.apache.hadoop.hbase.client.Put;
    importorg.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    importorg.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    importorg.apache.hadoop.hbase.mapreduce.TableReducer;
    importorg.apache.hadoop.io.LongWritable;
    importorg.apache.hadoop.io.NullWritable;
    importorg.apache.hadoop.io.Text;
    importorg.apache.hadoop.mapreduce.Job;
    importorg.apache.hadoop.mapreduce.Mapper;
    importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    importjava.io.IOException;
    /**
    * Created by Asher on 2016/10/20.
    */
    public classHdfsToHBase {
    public static voidmain(String[] args)throwsException {
          Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum","master01:2181,worker01:2181,worker02:2181");//当然这些都可以作为参数传入,这里只是实验,所以写死在了代码里,实际成产过程中肯定要用参数的方式
    conf.set("hbase.rootdir","hdfs://master01:9000/hbase");
    conf.set(TableOutputFormat.OUTPUT_TABLE,"pn");
    Job job = Job.getInstance(conf,HdfsToHBase.class.getSimpleName());
    TableMapReduceUtil.addDependencyJars(job);
    job.setJarByClass(HdfsToHBase.class);
    job.setMapperClass(HdfsToHBaseMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setReducerClass(HdfsToHBaseReducer.class);
    FileInputFormat.setInputPaths(job,"hdfs://master01:9000/inputDir/2222.csv");
    job.setOutputFormatClass(TableOutputFormat.class);
    job.waitForCompletion(true);
    }
    public static classHdfsToHBaseMapperextendsMapper {
    privateTextoutKey=newText();
    privateTextoutValue=newText();
    @Override
    protected voidmap(LongWritable key,Text value,Context context)throwsIOException,InterruptedException {
    String[] splited = value.toString().split(",");
    outKey.set(MD5.GetMD5Code(splited[0]));//我将第一个分片的MD5值作为rowkey
    outValue.set(splited[1] +"\t"+ splited[2] +"\t"+ splited[3]);
    context.write(outKey,outValue);
    }
    }
    public static classHdfsToHBaseReducerextendsTableReducer {
    @Override
    protected voidreduce(Text k2,Iterable v2s,Context context)throwsIOException,InterruptedException {
    Put put =newPut(k2.getBytes());
    for(Text v2 : v2s) {
    String[] splited = v2.toString().split("\t");
    if(splited[0] !=null&& !"NULL".equals(splited[0])) {
    put.add("gcf".getBytes(),"name".getBytes(),splited[0].getBytes());
    }else{
    put.add("gcf".getBytes(),"name".getBytes(), null);
    }
    if(splited[1] !=null&& !"NULL".equals(splited[1])) {
    put.add("gcf".getBytes(),"sex".getBytes(),splited[1].getBytes());
    }else{
    put.add("gcf".getBytes(),"sex".getBytes(), null);
    }
    if(splited[2] !=null&& !"NULL".equals(splited[2])) {
    put.add("gcf".getBytes(),"mobile".getBytes(),splited[2].getBytes());
    }else{
    put.add("gcf".getBytes(),"mobile".getBytes(), null);
    }
    }
    context.write(NullWritable.get(),put);
    }
    }
    }
    

    4、打包并执行:打包在上面先说了,下面说说执行的过程:
    a、很简单,在Linux终端执行:hadoop jar /home/cat/hdfs2hbase.jar cn.com.hbase.HdfsToHBase
    就能正常运行了,运行的结果如下:

    111.png

    看到成功插入了数据

    本次笔记结束!

    相关文章

      网友评论

        本文标题:用mapreduce的方式将csv格式文件格式化处理并写入HBa

        本文链接:https://www.haomeiwen.com/subject/ecwhuttx.html