美文网首页
flink 写数据到hbase

flink 写数据到hbase

作者: 邵红晓 | 来源:发表于2019-07-31 20:06 被阅读0次

flink 写入数据到hbase

  • 连接数等于并行度,用法同RichFunction
  • 可以实现查询hbase,存入hbase
    欢迎批评指正
    hbase 依赖
 <dependency>
            <groupId>org.apache.hbase</groupId>
            <!-- The shaded client is used mainly to avoid jar (dependency) conflicts -->
            <artifactId>hbase-shaded-client</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
 /**
   * Demo job: for every input key, reads one cell from the HBase table "test"
   * and then writes a fixed cell back to the same table.
   *
   * The HBase table handle is obtained in `open` and released in `close`,
   * i.e. once per ProcessFunction instance rather than once per record.
   *
   * @param args command-line arguments (unused)
   */
 def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // Single-element bounded source; an immutable Seq is enough here.
    val text = env.fromCollection(Seq("my-key-1"))
    text.map((_, 1)).process(new ProcessFunction[(String, Int), String] {
      // Assigned in open(); stays null if open() fails before the assignment.
      var table: Table = _

      override def open(parameters: Configuration): Unit = {
        // NOTE(review): the connection returned by HbaseUtil().connect is never
        // closed here. If HbaseUtil creates a new connection per call, it should
        // be kept in a field and closed in close() — confirm against HbaseUtil.
        table = HbaseUtil().connect.getTable(TableName.valueOf("test"))
      }

      override def processElement(value: (String, Int), ctx: ProcessFunction[(String, Int), String]#Context, out: Collector[String]): Unit = {
        // Read: fetch column a:key11 of the row whose key is the incoming string.
        val readFamily = Bytes.toBytes("a")
        val readQualifier = Bytes.toBytes("key11")
        val get = new Get(Bytes.toBytes(value._1))
        get.addColumn(readFamily, readQualifier)
        val result = table.get(get)
        // v is presumably null when the row/column does not exist — verify
        // against the HBase Result.getValue / Bytes.toString contracts.
        val v = Bytes.toString(result.getValue(readFamily, readQualifier))
        println(v)
        // Write: row key is "shx_" followed by the tuple's Int component.
        val put = new Put(Bytes.toBytes("shx_" + value._2))
        put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("lisi"))
        table.put(put)
      }

      override def close(): Unit = {
        // Guard against open() having failed, so a NullPointerException here
        // does not hide the original error.
        Option(table).foreach(_.close())
      }
    })

    // The pipeline above is only a lazily built graph; nothing runs until execute().
    env.execute("flink write to hbase")
  }

相关文章

网友评论

      本文标题:flink 写数据到hbase

      本文链接:https://www.haomeiwen.com/subject/ftkxdctx.html