Interaction between Flink and HBase

Author: 万州客 | Published 2022-05-12 07:51

Because my HBase runs in Docker on a virtual machine, the mapped ports and hostnames were tricky to get right: the read job failed, and when I tried the write job it failed too. This post is just a record of the attempt.
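
With HBase in Docker, the region server typically registers itself in ZooKeeper under the container's hostname, so the client machine must be able to resolve that name (here myhbase) and reach the mapped ports. Before running either Flink job, a standalone connectivity check like the one below can separate networking problems from Flink problems. This is a minimal sketch, not from the original article; it only reuses the article's hostname, port, and the standard HBase client API.

package org.bbk.flink

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.ConnectionFactory

// Standalone smoke test: if this cannot list tables, the Flink jobs
// below will fail for the same networking reason.
object HBaseSmokeTest {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "myhbase")   // must resolve from this machine
    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val admin = conn.getAdmin
      admin.listTableNames().foreach(t => println(t.getNameAsString))
    } finally {
      conn.close()
    }
  }
}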

1. Read code

package org.bbk.flink

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.java.tuple
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import org.apache.flink.addons.hbase.TableInputFormat

object Demo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // TableInputFormat scans the HBase table and maps every Result to a
    // (rowkey, concatenated cell values) tuple.
    val hbaseData: DataSet[tuple.Tuple2[String, String]] = env
      .createInput(new TableInputFormat[tuple.Tuple2[String, String]] {
        override def configure(parameters: Configuration): Unit = {
          val conf = HBaseConfiguration.create()
          conf.set(HConstants.ZOOKEEPER_QUORUM, "myhbase")
          conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
          val conn: Connection = ConnectionFactory.createConnection(conf)
          table = classOf[HTable].cast(conn.getTable(TableName.valueOf("hbasesource")))
          // Restrict the scan to the f1 column family.
          scan = new Scan()
          scan.addFamily(Bytes.toBytes("f1"))
        }
        override def getScanner: Scan = scan
        override def getTableName: String = "hbasesource"
        override def mapResultToTuple(result: Result): tuple.Tuple2[String, String] = {
          val rowkey: String = Bytes.toString(result.getRow)
          val sb = new StringBuilder()
          for (cell: Cell <- result.rawCells()) {
            val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
            sb.append(value).append(",")
          }
          // Strip the trailing comma.
          val valueString = sb.replace(sb.length() - 1, sb.length(), "").toString
          val tuple2 = new tuple.Tuple2[String, String]
          tuple2.setField(rowkey, 0)
          tuple2.setField(valueString, 1)
          tuple2
        }
      })
    // print() is an eager sink in the DataSet API and triggers execution by
    // itself; a trailing env.execute() would fail with "No new data sinks
    // have been defined since the last execution".
    hbaseData.print()
  }
}
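
The TableInputFormat used above comes from the legacy flink-hbase addon module, which must be on the classpath alongside the HBase client. A minimal build.sbt sketch follows; the Flink 1.9.3 / Scala 2.11 / HBase 1.3.1 versions are assumptions, not stated in the article, so align them with your own cluster.

// build.sbt -- version numbers are assumptions; match them to your cluster
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"  % "1.9.3",
  "org.apache.flink" %% "flink-hbase"  % "1.9.3", // provides org.apache.flink.addons.hbase.TableInputFormat
  "org.apache.hbase" %  "hbase-client" % "1.3.1"
)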



2. Write code

package org.bbk.flink

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.flink.api.common.io.OutputFormat

object Demo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    val sourceDataSet: DataSet[String] = env.fromElements("01, zhangsan, 28", "02, lisi, 30")
    sourceDataSet.output(new HBaseOutputFormat)
    // Unlike print(), output() is a lazy sink, so execute() is required here.
    env.execute()
  }
}

class HBaseOutputFormat extends OutputFormat[String] {
  val zkServer = "myhbase"
  val port = "2181"
  var conn: Connection = null
  var mutator: BufferedMutator = null

  override def configure(configuration: Configuration): Unit = {}

  // Called once per parallel task: open the connection and one buffered
  // mutator for the target table, instead of creating (and leaking) a new
  // mutator for every record.
  override def open(taskNumber: Int, numTasks: Int): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create()
    config.set(HConstants.ZOOKEEPER_QUORUM, zkServer)
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, port)
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    conn = ConnectionFactory.createConnection(config)
    val params: BufferedMutatorParams = new BufferedMutatorParams(TableName.valueOf("hbasesource"))
    params.writeBufferSize(1024 * 1024)
    mutator = conn.getBufferedMutator(params)
  }

  // Each record "rowkey, name, age" becomes one Put with two columns in f1.
  // trim removes the spaces after the commas in the input strings.
  override def writeRecord(it: String): Unit = {
    val cf1 = "f1"
    val array: Array[String] = it.split(",").map(_.trim)
    val put: Put = new Put(Bytes.toBytes(array(0)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
    mutator.mutate(put)
  }

  override def close(): Unit = {
    if (null != mutator) {
      mutator.flush()
      mutator.close()
    }
    if (null != conn) {
      conn.close()
    }
  }
}
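
If the write job does run through, a plain HBase client scan outside Flink is an easy way to confirm that the two rows landed in hbasesource. A minimal sketch, not part of the original article, reusing its hostname, table, and column family:

package org.bbk.flink

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

// Scans hbasesource and prints every cell the write job inserted.
object VerifyWrite {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "myhbase")
    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val table = conn.getTable(TableName.valueOf("hbasesource"))
      val scanner = table.getScanner(new Scan())
      for (result <- scanner.asScala; cell <- result.rawCells()) {
        val row = Bytes.toString(result.getRow)
        val qualifier = Bytes.toString(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
        val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
        println(s"$row f1:$qualifier = $value")
      }
      scanner.close()
    } finally {
      conn.close()
    }
  }
}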


