美文网首页
Hbase - 表导出CSV数据

Hbase - 表导出CSV数据

作者: kikiki5 | 来源:发表于2019-06-28 19:32 被阅读28次

    新鲜文章,昨天刚经过线上验证过的,使用它导出了3亿的用户数据出来,花了半个小时,性能还是稳稳的,好了不吹牛皮了,直接上代码吧。

    MR

    考查了Hbase的各种MR,没有发现哪一个是能实现的,如果有请通知我,我给他发红包。
    所以我们只能自己来写一个MR了,编写一个Hbase的MR,官方文档上也有相应的例子。
    我们用来加以化妆就得到我们想要的了。

    导出的CSV格式为

    admin,22,北京
    admin,23,天津
    

    依赖 hbase-mapreduce

    撸scala代码了

    定义Map转换类

    class MyMapper extends TableMapper[Text, Text] {
    
      val keyText = new Text()
      val valueText = new Text()
    
      override def map(key: ImmutableBytesWritable, value: Result, context: Mapper[ImmutableBytesWritable, Result, Text, Text]#Context): Unit = {
        val maps = result2Map(value)
        keyText.set(maps.get("userId"))
        valueText.set(s"${maps.get("regTime")}")
        context.write(keyText, valueText)
      }
    
      //将Result转换为Map
      def result2Map(result: Result): util.HashMap[lang.String, lang.String] = {
        val map = new util.HashMap[lang.String, lang.String]()
        result.rawCells().foreach {
          cell =>
            val column: Array[Byte] = CellUtil.cloneQualifier(cell)
            val value: Array[Byte] = CellUtil.cloneValue(cell)
            val qualifierByte = cell.getQualifierArray
            if (qualifierByte != null && qualifierByte.nonEmpty) {
              if (value == null || value.length == 0) {
                map.put(Bytes.toString(column), "")
              } else {
                map.put(Bytes.toString(column), Bytes.toString(value))
              }
            }
        }
        map
      }
    
    }
    

    定义Reducer类

    class MyReducer extends Reducer[Text, Text, Text, Text] {
      override def reduce(key: Text, values: lang.Iterable[Text], context: Reducer[Text, Text, Text, Text]#Context): Unit = {
        val iter = values.iterator()
        while (iter.hasNext) {
         //这样可以只保留下Key字段,也就只有一行数据了
          val tmpText = iter.next()
          val mergeKey = new Text()
          mergeKey.set(key.toString + "," + tmpText.toString)
          val v = new Text()
          v.set("")
          context.write(mergeKey, v)
        }
      }
    }
    

    ExportCsv核心

    class ExportCsv extends Configured with Tool {
    
      override def run(args: Array[String]): Int = {
        val conf = HBaseConfiguration.create()
        conf.addResource(new FileInputStream(new File("/etc/hbase/conf/hbase-site.xml")))
        conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, "/tmp/hbasecsv")
        conf.set("mapreduce.job.running.map.limit", "8") //最多有多少个Task同时跑
    
        val job = Job.getInstance(conf, "HbaseExportCsv")
        job.setJarByClass(classOf[ExportCsv])
    
        val scan = new Scan()
    
        //过滤我们想要的数据
        scan.addFamily(Bytes.toBytes("ext"))
        scan.addColumn(Bytes.toBytes("ext"), Bytes.toBytes("userId"))
        scan.addColumn(Bytes.toBytes("ext"), Bytes.toBytes("regTime"))
    
        scan.setBatch(1000)
        scan.setCacheBlocks(false)
    
        TableMapReduceUtil.initTableMapperJob(
          "USER_TABLE",
          scan,
          classOf[MyMapper],
          classOf[Text],
          classOf[Text],
          job
        )
        job.setReducerClass(classOf[MyReducer])
        val jobConf = new JobConf(job.getConfiguration)
        FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/hbasecsv"))
        val isDone = job.waitForCompletion(true)
        if (isDone) 0 else 1
      }
    }
    

    要跑了任务了

    hadoop jar ExportCsv.jar
    

    相关文章

      网友评论

        本文标题:Hbase - 表导出CSV数据

        本文链接:https://www.haomeiwen.com/subject/pszecctx.html