美文网首页
flink Async I/O

flink Async I/O

作者: 邵红晓 | 来源:发表于2019-08-01 17:29 被阅读0次

Hbase端代码示例

  • 这里使用线程池模拟异步IO,本人Hbase实验版本为1.30,Hbase 2.0才开始支持异步客户端
import java.util.concurrent.{ExecutorService, Executors}

import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Get, Table}
import org.apache.hadoop.hbase.util.Bytes

class AsyncHbaseClient extends AsyncFunction[(String, Int), (String, Int, String)] {

  lazy val  table: Table = HbaseUtil().connect.getTable(TableName.valueOf("test"))
  lazy val executorService: ExecutorService = Executors.newFixedThreadPool(30)

  override def asyncInvoke(input: (String, Int), resultFuture: ResultFuture[(String, Int, String)]): Unit = {
    executorService.submit(new Runnable {
      override def run(): Unit = {
        val get = new Get(Bytes.toBytes(input._1))
        get.addColumn(Bytes.toBytes("a"), Bytes.toBytes("key11"))
        val result = table.get(get)
        val value = Bytes.toString(result.getValue(Bytes.toBytes("a"), Bytes.toBytes("key11")))
        // 一定要记得放回 resultFuture,不然数据全部是timeout 的
        resultFuture.complete(Iterable((input._1, input._2, value)))
      }
    })
  }

  override def timeout(input: (String, Int), resultFuture: ResultFuture[(String, Int, String)]): Unit = {
    resultFuture.complete(Iterable((input._1, input._2, "timeout")))
  }
}

主函数调用示例

 def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    import org.apache.flink.api.scala._
    val list = ArrayBuffer[String]()
    list.append("my-key-1")
    list.append("my-key-1")
    list.append("my-key-1")
    list.append("my-key-1")
    val text = env.fromCollection(list).map((_, 1))
    // 添加一个 async I/O 的转换
    val resultStream: DataStream[(String, Int,String)] = AsyncDataStream.orderedWait[(String, Int),(String, Int,String)](
      text,new AsyncHbaseClient(),1000L,TimeUnit.MILLISECONDS,10
    )
    resultStream.print()
    env.execute("Streaming WordCount")
  }

相关文章

  • 聊聊flink的Async I/O

    序 本文主要研究一下flink的Async I/O 实例 本实例展示了flink Async I/O的基本用法,首...

  • flink Async I/O

    Hbase端代码示例 这里使用线程池模拟异步IO,本人Hbase实验版本为1.30,Hbase 2.0才开始支持异...

  • Flink异步之矛盾-锋利的Async I/O

    维表JOIN-绕不过去的业务场景 在Flink 流处理过程中,经常需要和外部系统进行交互,用维度表补全事实表中的字...

  • Flink异步I/O

    1 概述 流计算系统中经常需要与外部系统进行交互,我们通常的做法如向数据库发送用户a的查询请求,然后等待结果返回,...

  • Flink 异步I/O

    1.为什么需要异步IO flink在做实时处理时,有时候需要和外部数据交互,但是通常情况下这个交互过程是同步的,这...

  • Async I/O 的实现原理

    在使用 Flink 处理实时数据流时,经常需要和外部系统进行交互。例如,在构建实时数据仓库的时候,通常需要将消息和...

  • 浅谈MySQL连接的“8小时问题”(wait_timeout与空

    前言 一个月前,笔者写了篇Flink维表关联方面的文章,其中将Flink异步I/O与Vert.x JDBC Cli...

  • Flink 异步IO实战

    基本概念 首先通过官网的一个图片了解一下Asynchronous I/O Operation Flink sour...

  • Python并发之异步I/O(async,await)

    Python并发之异步I/O(async,await) 背景 Python有很长一段的异步编程历史,特别是twis...

  • Flink 原理与实现:Aysnc I/O

    背景 Async I/O 是阿里巴巴贡献给社区的一个呼声非常高的特性,于1.2版本引入。主要目的是为了解决与外部系...

网友评论

      本文标题:flink Async I/O

      本文链接:https://www.haomeiwen.com/subject/ugycdctx.html