美文网首页
Flink_Aysnc IO 异步IO

Flink_Aysnc IO 异步IO

作者: Eqo | 来源:发表于2022-08-25 09:26 被阅读0次

    异步IO为了解决与外部系统交互时网络延迟成为了系统瓶颈的问题.
    列如:
    我们再Flink处理数据的时候,很可能需要关联数据库,从数据库中获取对应信息然后再处理,如果数据量较大,每条数据都去请求数据库,等待数据库回应之后,才能处理数据,及其影响性能.
    举例 数据流处理数据时,加入我需要判断这条数据,是否存在于数据库中.

    Flink 就采取异步IO,所有的数据先发送请求,响不响应先不管,先继续处理数据流中的数据,谁先响应 先处理谁.

    image.png
    异步模式可以并发的处理多个请求和回复,可以连续的向数据库发送用户a、b、c、d等的请求,与此同时,哪个请求的回复先返回了就处理哪个回复,从而连续的请求之间不需要阻塞等待,这也正是Async I/O的实现原理。

    使用前提

    • 数据库(或kv类型数据存储系统)必须支持异步的请求clinent(如java的vetx)
    • 如果不支持异步客户端的话,也可以将同步客户端丢到线程池 中做为异步客户端

    使用步骤

    • 1.使用AysncDataStream工具类对数据流DataStream进行异步处理
    • 2.自定义类,继承RichAsyncFunction 富有的异步方法类 ,转换异步处理数据,其中需要异步请求外部存储系统,处理结果

    核心代码

    使用AsyncDataStream工具类中的unorderedWait()无需等待 方法设置要处理的数据流, 传入 异步处理方法

            // todo 3-2. 异步请求Mysql数据库,采用JDBC方式,由于Mysql不支持异步访问,以多线程方式实现
            // 使用异步工具类创建 使用无需等待
            /**
             *
             in – Input DataStream 需要进行异步请求的 数据流
             func – AsyncFunction    自定义类,转换异步处理数据,其中需要异步请求外部存储系统,处理结果
             timeout – for the asynchronous operation to complete 超时时间
             timeUnit – of the given timeout  单位
             capacity – The max number of async i/o operation that can be triggered 异步请求变化量
             */
            //in
            SingleOutputStreamOperator<String> resultDS = AsyncDataStream.unorderedWait(tupleStream, new AsyncMysqlRequestResult(),
                    10000, TimeUnit.MILLISECONDS, 10);
    

    异步处理方法类(重要)

    这个方法类 实现的是 出数据类中真实处理逻辑


    image.png
    package cn.itcast.async;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.Collections;
    import java.util.concurrent.*;
    
    /**
     * 义子类,实现函数接口:AsyncFunction,重写asyncInvoke方法,实现异步请求数据库,获取数据
     *
     * @author ado
     */
    // 实现富有的 异步请求方法类
    public class AsyncMysqlRequestResult extends RichAsyncFunction<Tuple2<String, String>, String> {
    
        // 定义mysql 连接变量
        private Connection connection = null ;
        private PreparedStatement pstmt = null ;
        private ResultSet result = null ;
        // 定义线程池  因为mysql 不支持异步请求, 所以需要线程池实现
        private ExecutorService executorService = null;
    
        // 初始化线程池
        @Override
        public void open(Configuration parameters) throws Exception {
    
            executorService= Executors.newFixedThreadPool(10);
            // a. 加载驱动类
            Class.forName("com.mysql.jdbc.Driver") ;
            // b. 获取连接
            connection = DriverManager.getConnection(
                    "jdbc:mysql://node1.itcast.cn:3306/?useSSL=false", "root", "123456"
            );
            // c. 实例化Statement对象
            pstmt = connection.prepareStatement("SELECT user_name FROM db_flink.tbl_user_info WHERE user_id = ?") ;
    
        }
    
        @Override
        public void asyncInvoke(Tuple2<String, String> input, ResultFuture<String> resultFuture) throws Exception {
    
               /*
                input -> 数据流中每条数据: (u_1009,     u_1009,click,2022-08-06 19:30:55.347)
                            |
                       wujiu,u_1009,click,2022-08-06 19:30:55.347
             */
            // 1.获取用户id
             String userID = input.f0;
    
            // 2.todo 异步请求mysql 到数据库中访问对应的用户 userName 一句ueseID 检索
    
            Future<String> future = executorService.submit(new Callable<String>() {
                // 相当于再线程池内启动一个线程 去访问数据库
                @Override
                public String call() throws Exception {
    
                    //todo 核心 编写jdbc代码 根据userid 获取username
                    // d. 设置占位符值,进行查询
                    pstmt.setString(1, userID);
                    result = pstmt.executeQuery();
    
                    String userName ="null";
    
                    // 为什么使用 while 因为我们查询的数据 有可能是多条数据
                    while (result.next()){
                        userName = result.getString("user_name");
    
                    }
    
                    return userName;
                }
            });
            //3. 获取返回结果的 username
            String userName = future.get();
            String output = userName + "," + input.f1 ;
    
            //4. 将查询数据结构异步返回
            resultFuture.complete(Collections.singletonList(output));
    
    
        }
        // 超时时间 这个方法是 如果请求数据库 超时了 数据库没有响应 该怎么办
        @Override
        public void timeout(Tuple2<String, String> input, ResultFuture<String> resultFuture) throws Exception {
            // 超时了 直接返回
    
            // 此时的逻辑是 根据 字段获取 数据库中的userid
            // 获取日志
            String log = input.f1;
            //输出数据
            String output ="unknown"+log;
            // 里面需要结束一个集合
            // 通过集合工具类 获取一个集合
            resultFuture.complete(Collections.singletonList(output));
    
    
    
        }
    
        @Override
        public void close() throws Exception {
            // 关闭线程池
            if(null != executorService) {
                executorService.shutdown();
            }
    
            // f. 关闭连接
            if(null != result){
                result.close();
            }
            if(null != pstmt){
                pstmt.close();
            }
            if(null != connection) {
                connection.close();
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:Flink_Aysnc IO 异步IO

          本文链接:https://www.haomeiwen.com/subject/svgogrtx.html