Flink and HBase interaction

Author: 万州客 | Published 2022-05-12 07:51

    Because my HBase runs in Docker on a virtual machine, the mapped ports and hostnames were awkward to get right, so the read job failed; the write job I tried afterwards failed as well. This post is just a record of the attempt.
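
    Before touching Flink at all, it may be worth checking that the HBase client can reach the dockerised cluster from the machine the job runs on; in this setup that is exactly where the port mappings and hostname resolution tend to break. Below is a minimal standalone connectivity check, a sketch that assumes the same quorum hostname myhbase and table hbasesource as the Flink code that follows (the object name is made up).

    package org.bbk.flink
    
    import org.apache.hadoop.hbase.client.ConnectionFactory
    import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
    
    // Hypothetical pre-flight check: fails fast if Zookeeper or the RegionServers
    // behind the Docker port mappings are not reachable from this host.
    object HBaseConnectivityCheck {
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set(HConstants.ZOOKEEPER_QUORUM, "myhbase")   // must resolve on this host, e.g. via an /etc/hosts entry
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
        val conn = ConnectionFactory.createConnection(conf)
        try {
          val admin = conn.getAdmin
          val exists = admin.tableExists(TableName.valueOf("hbasesource"))
          println(s"hbasesource exists: $exists")
        } finally {
          conn.close()
        }
      }
    }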

    1. The read code

    package org.bbk.flink
    
    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.api.java.tuple
    import org.apache.flink.configuration.Configuration
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
    import org.apache.flink.addons.hbase.TableInputFormat
    
    object Demo {
      def main(args:Array[String]):Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        import org.apache.flink.api.scala._
    
        val hbaseData:DataSet[tuple.Tuple2[String, String]] = env
          .createInput(new TableInputFormat[tuple.Tuple2[String, String]]{
            override def configure(parameters:Configuration): Unit = {
              val conf = HBaseConfiguration.create()
              // the Zookeeper quorum hostname must be resolvable from where the job runs
              // (the awkward part when HBase sits behind Docker port mappings)
              conf.set(HConstants.ZOOKEEPER_QUORUM, "myhbase")
              conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
              val conn:Connection = ConnectionFactory.createConnection(conf)
              // TableInputFormat's protected `table` field expects an HTable instance
              table = classOf[HTable].cast(conn.getTable(TableName.valueOf("hbasesource")))
              // restrict the scan to column family f1
              scan = new Scan()
              scan.addFamily(Bytes.toBytes("f1"))
            }
            override def getScanner: Scan = {
              scan
            }
            override def getTableName:String ={
              "hbasesource"
            }
            override def mapResultToTuple(result: Result): tuple.Tuple2[String, String] = {
              val rowkey:String = Bytes.toString(result.getRow)
              val sb = new StringBuilder()
              for (cell:Cell <- result.rawCells()) {
                val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
                sb.append(value).append(",")
              }
              val valueString = sb.replace(sb.length()-1, sb.length(), "").toString
              val tuple2 = new org.apache.flink.api.java.tuple.Tuple2[String,String]
              tuple2.setField(rowkey, 0)
              tuple2.setField(valueString, 1)
              tuple2
            }
          })
        // print() on a DataSet triggers execution by itself; calling env.execute() afterwards
        // fails because no new sinks have been defined since that execution
        hbaseData.print()
      }
    }
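
    For the scan above to return anything, the table hbasesource with column family f1 has to exist and contain some rows. Below is a minimal seeding sketch using the plain hbase-client API; the sample rowkey and values are made up, and the older HTableDescriptor/HColumnDescriptor calls are assumed to match the HBase version that still exposes HTable.

    package org.bbk.flink
    
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HConstants, HTableDescriptor, TableName}
    
    // Hypothetical helper: creates 'hbasesource' with family 'f1' and inserts one sample row,
    // so the Flink read job above has something to scan.
    object PrepareHBaseSource {
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set(HConstants.ZOOKEEPER_QUORUM, "myhbase")
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
        val conn = ConnectionFactory.createConnection(conf)
        try {
          val admin = conn.getAdmin
          val name = TableName.valueOf("hbasesource")
          if (!admin.tableExists(name)) {
            val desc = new HTableDescriptor(name)
            desc.addFamily(new HColumnDescriptor("f1"))
            admin.createTable(desc)
          }
          val table = conn.getTable(name)
          val put = new Put(Bytes.toBytes("row1"))   // sample rowkey, purely illustrative
          put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"))
          table.put(put)
          table.close()
        } finally {
          conn.close()
        }
      }
    }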
    
    
    
    

    2. The write code

    package org.bbk.flink
    
    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.api.common.io.OutputFormat
    import org.apache.flink.configuration.Configuration
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
    
    object Demo {
      def main(args:Array[String]):Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        import org.apache.flink.api.scala._
    
        val sourceDataSet:DataSet[String] = env.fromElements("01, zhangsan, 28", "02, lisi, 30")
        sourceDataSet.output(new HBaseOutputFormat)
        env.execute()
      }
    }
    
    class HBaseOutputFormat extends OutputFormat[String] {
      val zkServer = "myhbase"
      val port = "2181"
      var conn:Connection = null
      var mutator:BufferedMutator = null
    
      override def configure(configuration: Configuration): Unit ={
    
      }
    
      override def open(i: Int, i1: Int): Unit = {
        val config:org.apache.hadoop.conf.Configuration = HBaseConfiguration.create()
        config.set(HConstants.ZOOKEEPER_QUORUM, zkServer)
        config.set(HConstants.ZOOKEEPER_CLIENT_PORT, port)
        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
        conn = ConnectionFactory.createConnection(config)
        // create one BufferedMutator per parallel task and reuse it for every record,
        // instead of opening a new (and never closed) one inside writeRecord()
        val params: BufferedMutatorParams = new BufferedMutatorParams(TableName.valueOf("hbasesource"))
        params.writeBufferSize(1024 * 1024)
        mutator = conn.getBufferedMutator(params)
      }
    
      override def writeRecord(it: String): Unit = {
        val cf1 = "f1"
        // each record looks like "01, zhangsan, 28": rowkey, name, age (trim the spaces after the commas)
        val array: Array[String] = it.split(",").map(_.trim)
        val put: Put = new Put(Bytes.toBytes(array(0)))
        put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
        put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
        mutator.mutate(put)
        // flushing per record keeps the example simple; normally the write buffer handles batching
        mutator.flush()
      }
    
      override def close(): Unit = {
        if (null != mutator) {
          mutator.close()
        }
        if (null != conn) {
          conn.close()
        }
      }
    }
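
    A quick way to see whether the writes actually landed (or, as in my case, to confirm that they did not) is to read one of the rows back outside of Flink. A minimal sketch, assuming the same quorum settings and the rowkey 01 from the sample data:

    package org.bbk.flink
    
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
    
    // Hypothetical verification: fetches back the row the write job should have produced.
    object VerifyHBaseWrite {
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set(HConstants.ZOOKEEPER_QUORUM, "myhbase")
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
        val conn = ConnectionFactory.createConnection(conf)
        try {
          val table = conn.getTable(TableName.valueOf("hbasesource"))
          val result = table.get(new Get(Bytes.toBytes("01")))
          val name = Bytes.toString(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")))
          val age = Bytes.toString(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("age")))
          println(s"row 01 -> name=$name, age=$age")
          table.close()
        } finally {
          conn.close()
        }
      }
    }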
    
    
    
    
