flink sink hbase

Author: 邵红晓 | Published 2019-08-02 15:42 | Source: https://www.haomeiwen.com/subject/zpbldctx.html

    Test environment:

    • flink 1.7.2
    • hbase 1.3.1
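
    A minimal build definition consistent with these versions might look like the sbt sketch below (the Scala version and the exact artifact list are assumptions, not from the original post):

    // build.sbt -- a minimal sketch, assuming Scala 2.11 and the versions listed above
    scalaVersion := "2.11.12"
    
    libraryDependencies ++= Seq(
      "org.apache.flink" %% "flink-scala"           % "1.7.2" % "provided",
      "org.apache.flink" %% "flink-streaming-scala" % "1.7.2" % "provided",
      "org.apache.hbase"  % "hbase-client"          % "1.3.1"
    )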

    HBase util

    package com.xxx.etl
    
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
    import org.apache.hadoop.security.UserGroupInformation
    
    class HbaseUtil(getConnect: () => Connection) extends Serializable {
      lazy val connect = getConnect()
    }
    
    object HbaseUtil {
      val conf: Configuration = HBaseConfiguration.create
      conf.set("hbase.zookeeper.quorum", "pg-hadoop-xxx.xxxx.com.cn,pg-hadoop-xxx.xxx.com.cn,pg-hadoop-xxx.xxx.com.cn")
      conf.set("zookeeper.znode.parent", "/hbase-unsecure")
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      conf.setLong("hbase.rpc.timeout", 3000000L)
      conf.setInt("hbase.client.ipc.pool.size", 1)
      // work around "No FileSystem for scheme: hdfs" when the HDFS FileSystem implementation is not registered
      conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem")
    
      def apply(): HbaseUtil = {
        val f = () => {
          UserGroupInformation.setConfiguration(conf)
          val remoteUser = UserGroupInformation.createRemoteUser("hbase")
          UserGroupInformation.setLoginUser(remoteUser)
          val connection = ConnectionFactory.createConnection(conf)
          // release the connection before the executor's JVM shuts down -- never forget this
          sys.addShutdownHook {
            connection.close()
          }
          connection
        }
        new HbaseUtil(f)
      }
    }
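
    Why the connection is wrapped in a function: HbaseUtil stores a () => Connection factory plus a lazy val, so the instance can be serialized with the job graph and the actual Connection is only opened the first time connect is touched on a task manager. A minimal usage sketch (the table name "test" matches the sink below):

    // a minimal sketch of how HbaseUtil is used on the executor side
    val util  = HbaseUtil()
    val table = util.connect.getTable(TableName.valueOf("test"))  // the connection is created lazily here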
    
    

    HBase sink definition

    import org.apache.flink.api.common.io.RichOutputFormat
    import org.apache.flink.configuration.Configuration
    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.{Put, Table}
    import org.apache.hadoop.hbase.util.Bytes
    import org.slf4j.{Logger, LoggerFactory}
    
    class HBaseRichOutputFormat extends RichOutputFormat[Array[(String,Int)]]{
      val logger: Logger = LoggerFactory.getLogger(getClass)
      var table: Table = _
    
      override def configure(parameters: Configuration): Unit = {
        logger.info("configure open")
      }
      override def open(taskNumber: Int, numTasks: Int): Unit = {
        table = HbaseUtil().connect.getTable(TableName.valueOf("test"))
      }
      override def writeRecord(record: Array[(String,Int)]): Unit = {
        import scala.collection.JavaConverters._
        // write the whole batch of puts in a single call
        val list = record.map(d => {
          val put = new Put(Bytes.toBytes(d._1))
          put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes(d._2))
          put
        }).toList
        table.put(list.asJava)
      }
      override def close(): Unit = {
        // close the table when the job shuts down (for an unbounded stream this effectively never happens)
        table.close()
      }
    }
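
    Note that the value column is written with Bytes.toBytes(d._2), i.e. as a 4-byte big-endian Int, so it must be decoded with Bytes.toInt when read back; the HBase shell will only show the raw bytes. A hedged read-back sketch (not from the original post, assuming the same "test" table and "f:name" column):

    // a minimal read-back sketch for one of the row keys written by the driver program below
    import org.apache.hadoop.hbase.client.Get
    val table  = HbaseUtil().connect.getTable(TableName.valueOf("test"))
    val result = table.get(new Get(Bytes.toBytes("shx_2")))
    val count  = Bytes.toInt(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name")))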
    
    

    Driver program

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
    import org.apache.flink.util.Collector
    
    import scala.collection.mutable.ArrayBuffer
    
    /**
      * HBase Flink sink example
      */
    object HbaseSinkExample {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(2)
        import org.apache.flink.api.scala._
        val list = ArrayBuffer[String]()
        list.append("shx_2")
        list.append("shx_3")
        list.append("shx_4")
        list.append("shx_5")
        list.append("shx_6")
        env.fromCollection(list).map((_, 1))
          .keyBy(_._1).countWindow(1L)
            .process(new ProcessWindowFunction[(String,Int),Array[(String,Int)],String,GlobalWindow] {
              override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[Array[(String,Int)]]): Unit = {
                println(elements.mkString(","))
                out.collect(elements.toArray)
              }
            }).writeUsingOutputFormat(new HBaseRichOutputFormat())
    
        env.execute("Streaming WordCount")
      }
    }
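
    writeUsingOutputFormat does not take part in Flink's checkpointing, so it is mainly suited to test pipelines like this one. A hedged alternative sketch (the class name HBaseSinkFunction and its wiring are assumptions, not from the original post) issues the same Puts through addSink with a RichSinkFunction:

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.{Put, Table}
    import org.apache.hadoop.hbase.util.Bytes
    
    // a minimal sketch: the same writes issued from a RichSinkFunction instead of an OutputFormat
    class HBaseSinkFunction extends RichSinkFunction[Array[(String, Int)]] {
      @transient private var table: Table = _
    
      override def open(parameters: Configuration): Unit =
        table = HbaseUtil().connect.getTable(TableName.valueOf("test"))
    
      override def invoke(record: Array[(String, Int)]): Unit = {
        import scala.collection.JavaConverters._
        table.put(record.map { case (rowKey, count) =>
          new Put(Bytes.toBytes(rowKey)).addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes(count))
        }.toList.asJava)
      }
    
      override def close(): Unit = table.close()
    }
    
    // usage: replace .writeUsingOutputFormat(new HBaseRichOutputFormat()) with
    //   .addSink(new HBaseSinkFunction)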
    
