美文网首页
spark bulk load hbase 推数

spark bulk load hbase 推数

作者: 认真的等待觉醒 | 来源:发表于2018-07-12 21:38 被阅读143次

    spark环境下使用bluk load方式推hbase

    背景介绍

    Hbase作为一种常用的数据存储工具,对应解决大数据实时查询问题,有良好的解决方案,但是在使用用过程中发现在短时间内推入大规模数据存在性能瓶颈,使用API调研方式并不利于周期性(如每天)推入大规模数据的使用。因此调研了使用bulk load的方式完成大规模数据写入。

    基本数据量

    每天推入数据约在几十亿条的规模,每个数据拥有8个列。最后结果验证了此规模数据使用bulk load方式推入约使用2个小时左右

    bulk load优势说明

    写入hbase的方式有以下几种:

    1. 调用Hbase API,使用Table.put方式单条写入
    2. MapReduce方式,使用TableOutputFormat作为输出
    3. Bulk Load方式,先将要推入的数据按照格式持久化成HFile文件,然后使用Hbase对该文件进行load

    在spark环境下使用bulk load方式写入hbase有以下几点优势:

    1. spark环境能给无缝的与hive、hdfs等工具结合使用,方便数据加工与HFile文件的格式化
    2. spark是集群环境可根据数据量的规模自行扩展资源,加快文件格式化
    3. bulk load方式避免了程序频繁读写API,没有接口频繁读写的消耗。减轻Hbase侧的服务压力

    基本步骤

    image.png
    1. 从hive读取数据
    2. 利用spark集群对数据进行加工,并定义rowKey,并对数据进行排序
    3. 对数据排序后,利用ImmutableBytesWritable组长数据
    4. 利用HFileOutputFormat2将数据以HFile文件的方式写到HDFS目录上
    5. 利用LoadIncrementalHFiles工具将HFile 加载到hbase上

    代码实现

            
            // 创建sparkSession
            val builder = SparkSession.builder()
            val spark = builder.appName(s"${this.getClass.getSimpleName}")
                .enableHiveSupport()
                .getOrCreate()
            
            
            // 要写入文件的路径
            val orcFilePath = args(0)
            // 准备数据,将表中数据转换成hbase可以使用数据
            val dataRdd = spark.read.format("orc").load(args(0))
            
            // columnFamily
            val columnFamily = args(1)
            val rdd = dataRdd.rdd.sortBy(line => line.toString()).map(line => {
                val rowkey = line.toString()
                val kv : KeyValue = new KeyValue(Bytes.toBytes(rowkey), columnFamily.getBytes(), "column1".getBytes(), rowkey.getBytes() )
                (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), kv)
            })
            
            // 设置hbase配置
            val tableName = args(2)  // tableName
            val erp = args(3)
            val instanceName = args(4)
            val assessKey = args(5)
            val hbaseConfig = HBaseConfiguration.create
            hbaseConfig.set("bdp.hbase.erp", erp)
            hbaseConfig.set("bdp.hbase.instance.name", instanceName)
            hbaseConfig.set("bdp.hbase.accesskey", assessKey)
            
            //
            val job = Job.getInstance(hbaseConfig)
            job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
            job.setMapOutputValueClass(classOf[KeyValue])
            
    
            
            // 获取hbase连接
            val connection = ConnectionFactory.createConnection(hbaseConfig)
            val admin = connection.getAdmin
            val hbTableName =TableName.valueOf(tableName)
            // 获取指定表
            val realTable = connection.getTable(hbTableName)
            
            // 获取hbase的region
            val regionLocator = connection.getRegionLocator(hbTableName)
            
            // 配置HFile输出
            HFileOutputFormat2.configureIncrementalLoad (job, realTable, regionLocator)
            
            // 将数据保存成hfile 到指定目录
            val timeLong = System.currentTimeMillis()
            val hFilePath = s"hdfs://172.21.2.110/user/mart/dev.db/han/hbase/$timeLong"
            val networkPath = s"hdfs://172.21.2.110/user/mart/dev.db/han/hbase/$timeLong"
            
            try {
                rdd.saveAsNewAPIHadoopFile(hFilePath, classOf[ImmutableBytesWritable],
                    classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)
                
                val bulkloader = new LoadIncrementalHFiles(hbaseConfig)
                
                bulkloader.doBulkLoad(new Path(networkPath), admin, realTable, regionLocator)
                
                val hdfs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
                if (hdfs.exists(new Path(hFilePath))) {
                    hdfs.delete(new Path(hFilePath), true)
                }
            } catch {
                case e: Exception =>{
                    val error = e.getCause
                    println(s"写hbase失败 $error")
                }
            } finally {
                val hdfs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
                if (hdfs.exists(new Path(hFilePath))) {
                    hdfs.delete(new Path(hFilePath), true)
                }
            }
        }
    
    

    相关文章

      网友评论

          本文标题:spark bulk load hbase 推数

          本文链接:https://www.haomeiwen.com/subject/dxcdpftx.html