美文网首页
Seata 高性能 RPC 通信的实现- 巧用 reactor

Seata 高性能 RPC 通信的实现- 巧用 reactor

作者: Java程序员YY | 来源:发表于2023-04-02 15:51 被阅读0次

    一、Reactor 模式

    reactor 模式是一种事件驱动的应用层 I/O 处理模式,基于分而治之和事件驱动的思想,致力于构建一个高性能的可伸缩的 I/O 处理模式。维基百科对 Reactor pattern 的解释:

    The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers

    大致意思是说,reactor设计模式是一种事件处理模式,用于同时有一个或多个请求发送到事件处理器(service handler),这个事件处理器会采用多路分离(demultiplexes )的方式,同步的将这些请求分发到请求处理器(request handlers)。

    不难看出,上边介绍的 reactor 模式是一种抽象;从实现角度说,reactor 模式有许多变种,不同编程语言中的实现也有差异。就 java 而言,大师 Doug Lea 在其【Scalable IO in Java】中就讲述了几个reactor模式的演进,如单线程版本多线程版 ,阅读此文后,笔者对大师所讲reactor模式演进的理解与网络中一些描述稍有差异。

    reactor 单线程版中,只有一个reactor线程,线程中通过 select (I/O 多路复用接口) 监听所有 I/O 事件,收到 I/O 事件后通过 dispatch 进行分发给 Handlers 处理,此版本容易实现,也容易理解,但性能不高。为了适配多处理器,充分利用多核并行处理的优势,实现高性能的网络服务,可以采用分治策略,关键环节采用多线程模式,于是就出现了reactor多线程版本,而多线程的应用体现为worder线程和reactor线程,多线程应该被池化管理,这样才容易被调整和控制。线程池中的线程数会比客户端的数量少很多,实际数量可以根据程序本身是 CPU 密集型还是 I/O 密集型操作来进行合理的分配。

    • 多个 worder 线程(池化管理)

      • 属于网络 I/O 操作与业务处理的拆分,因为 reactors 监听到 I/O 事件后应该快速分发给 handlers 来处理程序;但如果 handler 中的非 I/O 操作慢了就会减慢 reactor 中的 I/O 事件响应速度,所以把非 I/O 操作从 reactors 的 I/O 线程转移到其他线程中,即由worker线程来分担非 I/O 逻辑的操作处理。
    • 多个 reactor 线程(池化管理)

      • 属于网络建连操作与网络 I/O 读写操作的拆分,因为由一个reactor在一个线程中完成所有 I/O 操作也会遇到性能瓶颈,可采取拆分并增加reactor策略,将 I/O 负载分配给多个 reactor(每个reactor都有自己的线程、选择器和调度循环)以达到负载平衡。这看起来挺不错,但谁来执行分配以达到负载均衡呢?或许是因为这个问题,将reactor拆分为两类角色,mainReactor负责接收连接,之后采用一定的负载均衡策略将新连接分配给其他subReactor来处理 I/O 读写,这样的拆分自然流畅。

    如此就演进出如上图中的主从reactor多线程模型。请注意,结合【Scalable IO in Java】原文中的用词和描述看,上图中的mainReactorsubReactor可以有多个并做池化管理,所有也有一些文章中会看到如主ReactorGroupmainReactorGroup从ReactorGroupsubReactorGroup等这类名词用 Group 后缀来强调 Reactor 是池化管理。 或许是不好布局,也或许是为了凸显主从reactor角色的协作关系,上图中都只展示了一个,另外服务端应用通常只暴露一个服务端口时,只需用一个 mainReactor 来监听端口上的连接事件并处理。

    二、Netty 主从 reactor 多线程模型

    Nettyreactor所对应的实现类是NioEventLoop,其核心逻辑如下:

    • 不同类型的 channel 向 Selector 注册所感兴趣的事件
    • 扫描是否有感兴趣的事件发生
    • 事件发生后做相应的处理

    客户端和服务端分别会有不同类型的channel,客户端创建SocketChannel向服务端发起连接请求,服务端创建ServerSocketChannel监听客户端连接,建连后创建SocketChannel与客户端的SocketChannel互相收发数据,这些channel分工不同,向 Selector 注册所感兴趣的事件情况也不同:

    客户端/服务端 channel OP_ACCEPT OP_CONNECT OP_WRITE OP_READ
    客户端 SocketChannel YES
    服务端 ServerSocketChannel YES
    服务端 SocketChannel YES YES

    Netty中 Nio 方式实现几种 reactor 模型如下:

    mainReactor 对应 Netty 中配置的 bossGroup 线程组(下图中的主ReactorGroup),主要负责接受客户端连接的建立。每 bind 一个端口就用掉一个bossGroup中的线程。

    subReactor 对应 Netty 中配置的 workerGroup 线程组(下图中的 reactorGroup),bossGroup 线程组接受完客户端的连接后,将 channel 转交给 workerGroup 线程组,在 workerGroup 线程组内选择一个线程,执行 I/O 读写的处理,workerGroup 线程组默认是 2 * CPU 核数个线程。

    主从 reactor 模式的核心流程:

    1. 如果只监听一个端口,那么只需一个主reactor干活儿,所以通常看到boosGroup只配置一个线程。主reactor运行在独立的线程中 ,该线程中只负责与客户端的连接请求

    2. reactor在服务器端可以不止一个, 通常运行多个从 reactor , 每个从 reactor 也运行在一个独立的线程中 ,负责与客户端的读写操作

    3. reactor 检测到客户端的链接后,创建 NioSocketChannel,按照一定的算法循环选取(负载均衡)一个从reactor,并把刚创建的NioSocketChannel 注册到这个从 reactor 中,这样建连和读写事件互不影响。

    4. 一个 reactor 中可被注册多个NioSocketChannel,这个 reactor 监听所有的被分配的 NioSocketChannel 的读写事件 , 如果监听到客户端的数据发送事件 , 将对应的业务逻辑转发给 NioSocketChannel 中的pipeline 里的 handler 链进行处理

    5. handler 最好只负责响应 I/O 事件,不处理具体的与客户端交互的业务逻辑 , 这样不会长时间阻塞 , 其 read 方法读取客户端数据后 , 将消息数据交给业务线程池去处理相关业务逻辑

    6. 业务线程池完成相关业务逻辑的处理后,将结果返回,通过NioSocketChannel的的pipeline 里的 handler 链将结果消息写回给客户端

    7. buffer不满足将结果消息写回给客户端时的条件时,注册写事件,等待可写时再写

    三、Seata Server 端 的 reactor 模式应用

    Seata Server 采用了 主从 reactor 多线程模型,对应这个模型的话是有四个线程池,其中自定义业务线程池是两个。

    功能 线程池对象 备注
    接收客户端连接 NettyServerBootstrap#eventLoopGroupBoss
    处理 IO 事件 NettyServerBootstrap#eventLoopGroupWorker 部分 RPC 消息在这里处理
    处理客户端的 request 消息 AbstractNettyRemoting#messageExecutor 客户端主动发给的消息
    处理客户端的 response 消息 NettyRemotingServer#branchResultMessageExecutor 服务端主动发给客户端消息,客户端处理后给服务端响应

    3.1、NettyServerBootstrap#eventLoopGroupBoss

    笔者的环境未启用 epoll,关键信息如下:

    • 线程数:1,只监听一个端口
    • 线程名前缀:“NettyBoss”
    
    this.eventLoopGroupBoss = new NioEventLoopGroup(
            //CONFIG.getInt("transport.threadFactory.bossThreadSize", 1);
            nettyServerConfig.getBossThreadSize(),
        new NamedThreadFactory(
                // CONFIG.getConfig("transport.threadFactory.bossThreadPrefix", "NettyBoss");
                nettyServerConfig.getBossThreadPrefix(),
                //CONFIG.getConfig("transport.threadFactory.bossThreadSize", 1);
                nettyServerConfig.getBossThreadSize())
    );
    

    3.2、NettyServerBootstrap#eventLoopGroupWorker

    笔者的环境未启用 epoll,关键信息如下:

    • 线程数:默认值是 cpu 核数 * 2
    • 线程名前缀:“NettyServerNIOWorker”
    this.eventLoopGroupWorker = new NioEventLoopGroup(
            // System.getProperty("transport.serverWorkerThreads", String.valueOf(WORKER_THREAD_SIZE)));//默认值cpu核数*2
            nettyServerConfig.getServerWorkerThreads(),
        new NamedThreadFactory(
                // CONFIG.getConfig("transport.threadFactory.workerThreadPrefix",
                //            enableEpoll() ? EPOLL_WORKER_THREAD_PREFIX : DEFAULT_NIO_WORKER_THREAD_PREFIX);
                // 默认值 NettyServerNIOWorker ,没有启用 epoll
                nettyServerConfig.getWorkerThreadPrefix(),
                //System.getProperty("transport.serverWorkerThreads", String.valueOf(WORKER_THREAD_SIZE)));//默认值 cpu核数*2
                nettyServerConfig.getServerWorkerThreads())
    );
    

    3.3、AbstractNettyRemoting#messageExecutor

    此线程池处理客户端的 request 消息,关键参数信息如下:

    • 线程数:50 ~ 500
    • keepAlive:500 秒
    • 线程名字前缀: "ServerHandlerThread"
    • 队列长度: 500
    • 拒绝策略:CallerRunsPolicy(),饱和的情况下,调用者来执行该任务,即 Netty 的 I/O 线程
    ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(
            //Integer.parseInt(System.getProperty("transport.minServerPoolSize", "50"));
            NettyServerConfig.getMinServerPoolSize(),
            //Integer.parseInt(System.getProperty("transport.maxServerPoolSize", "500"));
            NettyServerConfig.getMaxServerPoolSize(),
            //Integer.parseInt(System.getProperty("transport.keepAliveTime", "500"));
            NettyServerConfig.getKeepAliveTime(),
            TimeUnit.SECONDS,
            //Integer.parseInt(System.getProperty("transport.maxTaskQueueSize", "500"));
            new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
            new NamedThreadFactory(
                    "ServerHandlerThread",
                    //Integer.parseInt(System.getProperty("transport.maxServerPoolSize", "500"));
                    NettyServerConfig.getMaxServerPoolSize()),
                    //饱和的情况下,调用者来执行该任务,即Netty的IO线程
                    new ThreadPoolExecutor.CallerRunsPolicy()
    );
    

    3.4、NettyRemotingServer#branchResultMessageExecutor

    此线程池处理客户端的 response 消息,关键参数信息如下:

    • 线程数:cpu 核数2 ~ cpu 核数2
    • keepAlive:500 秒
    • 线程名字前缀: "BranchResultHandlerThread"
    • 队列长度: 20000
    • 拒绝策略:CallerRunsPolicy(),饱和的情况下,调用者来执行该任务,即 Netty 的 IO 线程
    private ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(
            //System.getProperty("transport.minBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),默认值 cpu核数*2
            NettyServerConfig.getMinBranchResultPoolSize(),
            //System.getProperty("transport.maxBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),默认值 cpu核数*2
            NettyServerConfig.getMaxBranchResultPoolSize(),
            // System.getProperty("transport.keepAliveTime", "500"),默认值500
            NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(
                    //System.getProperty("transport.maxTaskQueueSize", "20000"),默认值 20000
                    NettyServerConfig.getMaxTaskQueueSize()),
            new NamedThreadFactory(
                    // 分支响应消息的处理线程的名字前缀  BranchResultHandlerThread
                    "BranchResultHandlerThread",
                    // System.getProperty("transport.maxBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),默认值 cpu核数*2
                    NettyServerConfig.getMaxBranchResultPoolSize()
            ),
            //饱和的情况下,调用者来执行该任务,即Netty的IO线程
            new ThreadPoolExecutor.CallerRunsPolicy()
    );
    

    3.5、业务线程池如何处理消息

    3.5.1、登记消息处理器

    Seata 消息处理的核心逻辑是:定义好什么类型的消息,使用哪个消息处理器,这个消息处理器的消息处理逻辑在哪个线程池中执行。这个映射关系通过AbstractNettyRemoting#processorTable来存储。

    /**
     * 可以接收什么类型的消息,以及使用哪个消息处理器和线程池来处理消息
     * HashMap<消息类型, Pair<消息处理器, 线程池>>
     * processor type {@link MessageType}
     */
    protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
    

    各模块 Netty 组件启动前,通过AbstractNettyRemotingServer#registerProcessor方法登记到这个结构中。

    public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
        Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
        this.processorTable.put(messageType, pair);
    }
    

    拿 Seata Server 来说,如在ServerBootStrap启动前,通过NettyRemotingServer#registerProcessor注册好消息处理器。不同消息对应的处理器的线程池也不同,也有一些消息没有指定业务线程池(没必要),情况如下:

    private void registerProcessor() {
        // 1\. registry on request message processor
        ServerOnRequestProcessor onRequestProcessor =
            new ServerOnRequestProcessor(this, getHandler());
        ShutdownHook.getInstance().addDisposable(onRequestProcessor);
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
        // 2\. registry on response message processor
        ServerOnResponseProcessor onResponseProcessor =
            new ServerOnResponseProcessor(getHandler(), getFutures());
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
        // 3\. registry rm message processor
        RegRmProcessor regRmProcessor = new RegRmProcessor(this);
        super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
        // 4\. registry tm message processor
        RegTmProcessor regTmProcessor = new RegTmProcessor(this);
        super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
        // 5\. registry heartbeat message processor
        ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
    }
    

    3.5.2、处理消息

    当 Seata Server 收到客户端发送的 RPC 消息后,会进入AbstractNettyRemotingServer.ServerHandler#channelRead中,在这里对消息类型简单判断后,委托给processMessage处理。

    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof RpcMessage)) {
            return;
        }
        // 收到消息后,委托给 processMessage 处理
        processMessage(ctx, (RpcMessage) msg);
    }
    

    processMessage中通过消息类型找到消息处理器进行业务层处理:

    1. 如果消息处理器有指定的业务线程池,在指定的业务线程池中处理消息
    2. 若消息处理器没有指定的业务线程池,则在 I/O 线程中直接处理。
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        ...
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            // 通过消息类型找到消息处理器
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                // 如果消息处理器有指定的业务线程池
                if (pair.getSecond() != null) {
                    try {
                        // 在指定的业务线程池中处理消息
                        pair.getSecond().execute(() -> {
                            ...
                            pair.getFirst().process(ctx, rpcMessage);
                            ...
                        });
                    } catch (RejectedExecutionException e) {
                        ...
                    }
                } else {
                    try {
                        //若消息处理器没有指定的业务线程池,则在I/O现成中直接处理。
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                       ...
                    }
                }
            } else {
              ...
            }
        } else {
            ...
        }
    }
    

    四、Seata client 端的 reactor 模式应用

    Seata client 端也采用了 reactor 多线程模型,在初始化的时候有RmNettyRemotingClientTmNettyRemotingClient两个对象,分别会创建各自的 Bootstrap,RM 和 TM 各有自己的 I/O 线程池和业务线程池。

    功能 线程池对象 备注
    处理 IO 事件 NettyClientBootstrap#eventLoopGroupWorker
    处理业务消息 AbstractNettyRemoting#messageExecutor

    源码里还有个NettyClientBootstrap#defaultEventExecutorGroup,没看出来哪里有用。TmNettyRemotingClient#getInstance()中构建了 TM 的业务线程池,赋值给NettyClientBootstrap#messageExecutor,同样RmNettyRemotingClient#getInstance()中构建了 RM 的业务线程池

    4.1、NettyClientBootstrap#eventLoopGroupWorker

    客户端此线程池关键信息如下:

    • 线程数:1
    • 线程名字前缀:
      • TM:"NettyClientSelector_TMROLE"
      • RM:"NettyClientSelector_RMROLE"
    // 单I/O线程
    this.eventLoopGroupWorker = new NioEventLoopGroup(
    //CONFIG.getInt("transport.threadFactory.clientSelectorThreadSize", 1)
    selectorThreadSizeThreadSize,
    new NamedThreadFactory(
        // CONFIG.getConfig("transport.threadFactory.clientSelectorThreadPrefix", "NettyClientSelector");
        // 再拼上角色后默认值为:"NettyClientSelector_TMROLE"
        getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
        //CONFIG.getInt("transport.threadFactory.clientSelectorThreadSize", 1)
        selectorThreadSizeThreadSize)
    );
    

    4.2、AbstractNettyRemoting#messageExecutor

    TmNettyRemotingClient#getInstance()RmNettyRemotingClient#getInstance()创建各自的线程池,配置并不相同。

    1)TmNettyRemotingClient#getInstance()中所创建线程池的关键信息如下:

    • 线程数:默认值是 cpu 核数 _ 2 ~ cpu 核数 _ 2
    • keepAlive:Integer.MAX_VALUE 秒
    • 线程名字前缀:rpcDispatch_TMROLE
    • 队列长度: 2000
    • 拒绝策略:runsOldestTaskPolicy(),饱和的情况下,添加新任务并由投递任务的线程运行最早的任务。
    public static TmNettyRemotingClient getInstance() {
        if (instance == null) {
            synchronized (TmNettyRemotingClient.class) {
                if (instance == null) {
                    NettyClientConfig nettyClientConfig = new NettyClientConfig();
                    // 自定义TM业务线程池
                    final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                            nettyClientConfig.getClientWorkerThreads(), // 默认是cpu核数 * 2
                            nettyClientConfig.getClientWorkerThreads(), // 默认是cpu核数 * 2
                            KEEP_ALIVE_TIME, TimeUnit.SECONDS,//Integer.MAX_VALUE;
                            new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),//2000
                            new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),// TM的线程名是:rpcDispatch_TMROLE
                                    nettyClientConfig.getClientWorkerThreads()),// 默认是cpu核数 * 2
                            RejectedPolicies.runsOldestTaskPolicy());//添加新任务并由主线程运行最早的任务。
                    instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
                }
            }
        }
        return instance;
    }
    

    2)RmNettyRemotingClient#getInstance() 中所创建线程池的关键信息如下:

    • 线程数:默认是 cpu 核数 _ 2 ~ cpu 核数 _ 2
    • keepAlive:Integer.MAX_VALUE 秒
    • 线程名字前缀:rpcDispatch_RMROLE
    • 队列长度: 20000
    • 拒绝策略:CallerRunsPolicy(),饱和的情况下,调用者来执行该任务,即 Netty 的 IO 线程。
    public static RmNettyRemotingClient getInstance() {
        if (instance == null) {
            synchronized (RmNettyRemotingClient.class) {
                if (instance == null) {
                    NettyClientConfig nettyClientConfig = new NettyClientConfig();
                    // 自定义RM业务线程池
                    final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                            nettyClientConfig.getClientWorkerThreads(), // 默认是cpu核数 * 2
                            nettyClientConfig.getClientWorkerThreads(), // 默认是cpu核数 * 2
                            KEEP_ALIVE_TIME, TimeUnit.SECONDS,//Integer.MAX_VALUE;
                            new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),//20000
                            new NamedThreadFactory(
                                nettyClientConfig.getRmDispatchThreadPrefix(),// RM的线程名是:rpcDispatch_RMROLE,
                                nettyClientConfig.getClientWorkerThreads()),// 默认是cpu核数 * 2
                        new ThreadPoolExecutor.CallerRunsPolicy());////饱和的情况下,调用者来执行该任务,即Netty的IO线程
                    instance = new RmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
                }
            }
        }
        return instance;
    }
    

    4.3、消息处理

    TmNettyRemotingClientRmNettyRemotingClientinit()方法中会调用registerProcessor()方法注册各自的 RPC 消息处理器。收到 RPC 消息后就由这些处理器+对应的线程池做后续处理,消息的相关业务属性在后续的事务流程中介绍。

    五、支撑特殊能力的业务线程池

    1)AbstractNettyRemotingClient#mergeSendExecutorService

    用于批量发送请求,多个消息合并,减少通信次数。实现逻辑比较清晰,当允许发送批量消息时,消息首先分桶保存到 basketMap,在一个周期性的无线循环中,把 basketMap 中的消息队列取出来,把每个队列的消息都放到 mergeMessage 中,最后把 mergeMessage 发送出去。

    • 线程数:1
    • 线程名前缀:”rpcMergeMessageSend“
    • AbstractNettyRemotingClient中功能相关的属性介绍:
      • Object mergeLock:发送请求的锁对象。
      • Map<Integer, MergeMessage> mergeMsgMap:当发送消息的类型是 MergeMessage,那么就将消息保存到 mergeMsgMap。
      • ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap:当允许发送批量消息时,消息首先分桶保存到 basketMap,然后通过定时任务将保存 basketMap 的消息发送出去。basketMap 的是服务器的地址,value 是保存的发送个服务器的消息。按照地址分桶是将要发给同一个服务器的多个消息合并到一个MergedWarpMessage后发送。
    • 有配置开关,默认值如下:
    transport.enableTmClientBatchSendRequest=false
    transport.enableRmClientBatchSendRequest=true
    transport.enableTcServerBatchSendResponse=false
    

    对应的关键代码逻辑如下:

    1. AbstractNettyRemotingClient#sendSyncRequest中,同步发送时将消息缓存起来,默认配置看只有 RM 开启了消息合并发送,另外同步发送超时设定,默认 TM 30 秒,RM 15 秒。按照 IP 地址分桶,同一个目标实例的消息才可以合并发送
    public Object sendSyncRequest(Object msg) throws TimeoutException {
        String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
        // 同步发送超时设定,默认 TM 30秒,RM 15秒
        long timeoutMillis = this.getRpcRequestTimeout();
        RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
    
        // send batch message
        // put message into basketMap, @see MergedSendRunnable
        // 默认只有RM开启了消息合并发送,TM 并未开启批发送
        if (this.isEnableClientBatchSendRequest()) {
    
            // send batch message is sync request, needs to create messageFuture and put it in futures.
            MessageFuture messageFuture = new MessageFuture();
            messageFuture.setRequestMessage(rpcMessage);
            messageFuture.setTimeout(timeoutMillis);
            futures.put(rpcMessage.getId(), messageFuture);
    
            // put message into basketMap
            // 按照目标地址分桶,同一个TC实例的消息才可以合并发送
            BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
                key -> new LinkedBlockingQueue<>());
            if (!basket.offer(rpcMessage)) {
                LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
                        serverAddress, rpcMessage);
                return null;
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("offer message: {}", rpcMessage.getBody());
            }
            // 通知合并发送线程 有消息要发送,醒来干活儿
            if (!isSending) {
                synchronized (mergeLock) {
                    mergeLock.notifyAll();
                }
            }
    
            try {
                // 阻塞等待消息的响应。
                return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (Exception exx) {
                LOGGER.error("wait response error:{},ip:{},request:{}",
                    exx.getMessage(), serverAddress, rpcMessage.getBody());
                if (exx instanceof TimeoutException) {
                    throw (TimeoutException) exx;
                } else {
                    throw new RuntimeException(exx);
                }
            }
    
        } else {
            // 不合并发送的话,就获取指定IP的channel,并立即发送。
            Channel channel = clientChannelManager.acquireChannel(serverAddress);
            return super.sendSync(channel, rpcMessage, timeoutMillis);
        }
    
    }
    
    1. AbstractNettyRemotingClient#init中构建线程池mergeSendExecutorService,在这个线程池中执行消息的批处理(消息合并、消息发送)。
    public void init() {
        ...
        // 通过线程池有1个线程,执行消息合并发送
        if (this.isEnableClientBatchSendRequest()) {
            mergeSendExecutorService = new ThreadPoolExecutor(
                MAX_MERGE_SEND_THREAD,//1
                MAX_MERGE_SEND_THREAD,//1
                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(
                        //TM : rpcMergeMessageSend_TMROLE
                        //RM : rpcMergeMessageSend_RMROLE
                        //SERVER : rpcMergeMessageSend_SERVERROLE
                        getThreadPrefix(),
                        MAX_MERGE_SEND_THREAD)//1
            );
            mergeSendExecutorService.submit(new MergedSendRunnable());
        }
        super.init();
        clientBootstrap.start();
    }
    
    1. 批处理任务MergedSendRunnable中,实现了消息合并和消息发送
    private class MergedSendRunnable implements Runnable {
    
        @Override
        public void run() {
            while (true) {
                //mergeLock 用于生产-消费的协作
                synchronized (mergeLock) {
                    try {
                        // MAX_MERGE_SEND_MILLS = 1,还有线程休眠的效果
                        mergeLock.wait(MAX_MERGE_SEND_MILLS);
                    } catch (InterruptedException e) {
                    }
                }
                isSending = true;
                // 发送消息,消息是按照IP地址分组
                basketMap.forEach((address, basket) -> {
                    if (basket.isEmpty()) {
                        return;
                    }
    
                    MergedWarpMessage mergeMessage = new MergedWarpMessage();
                    //如果basket队列不为空,将其中的消息全取出来,添加到mergeMessage中
                    while (!basket.isEmpty()) {
                        RpcMessage msg = basket.poll();
                        mergeMessage.msgs.add((AbstractMessage) msg.getBody());
                        mergeMessage.msgIds.add(msg.getId());
                    }
                    // debug 打印本次发送的消息个数和每个消息的Id,以及此时在futures中做超时管控的所有消息的Id,
                    // 两个消息Id比对,可知道消息积压情况9666
                    if (mergeMessage.msgIds.size() > 1) {
                        printMergeMessageLog(mergeMessage);
                    }
                    Channel sendChannel = null;
                    try {
                        // 获取指定地址的channel对象,异步发送消息
                        // 发送批量消息是同步的请求,但是这里不需要得到返回的值,在消息保存到basketMap之前,已经创建了messageFuture了,
                        // 返回值将会从ClientOnResponseProcessor中得到
                        sendChannel = clientChannelManager.acquireChannel(address);
                        // 因为原始消息的发送已经加入过超时管控,所以批量发送环节不再需要加入额外的超时控制
                        AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
                    } catch (FrameworkException e) {
                        if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                            destroyChannel(address, sendChannel);
                        }
                        // fast fail
                        // 发生异常,快速将保存在mergeMessage的消息清理掉
                        for (Integer msgId : mergeMessage.msgIds) {
                            MessageFuture messageFuture = futures.remove(msgId);
                            if (messageFuture != null) {
                                messageFuture.setResultMessage(
                                    new RuntimeException(String.format("%s is unreachable", address), e));
                            }
                        }
                        LOGGER.error("client merge call failed: {}", e.getMessage(), e);
                    }
                });
                isSending = false;
            }
        }
    

    2)AbstractNettyRemoting#timerExecutor

    Netty 的 I/O 操作异步的,RPC 消息的发送操作会对应一个 Future 对象,在 Seata 中这个 Futrue 对象被封装为 MessageFuture,需同步发送的消息,其对应的 MessageFuture 被放入 map 缓存起来,当收到消息的 response 后,将消息从 map 中移除。AbstractNettyRemoting#timerExecutor里的这个线程定时巡检 map 中的消息,若超时未收到 response 则认定为发送超时。

    • 线程数:1
    • 线程名前缀:”timeoutChecker“
    • scheduleAtFixedRate :延迟 3 秒,频率 3 秒
    • AbstractNettyRemoting中的功能相关的属性介绍:
      • ScheduledExecutorService timerExecutor:执行定时任务,消息发送以后,到了过期时间还没有返回,则会对消息进行清理。
      • ConcurrentHashMap<Integer, MessageFuture> futures:保存着不同消息,timerExecutor 会清理 futures 中过期的消息。

    对应的关键代码逻辑如下:

    1. 构建定时任务的线程池AbstractNettyRemoting#timerExecutor,只用 1 个线程
    /**
     * 定时器,用于巡检消息的发送是否超时
     */
    protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1,
        new NamedThreadFactory("timeoutChecker", 1, true));
    复制代码
    
    1. 通过AbstractNettyRemoting#sendSync同步发送消息,构建MessageFuture并放入futures这个 map 中,发送过程配置监听器 用于处理 channel 异常,指定失败原因并从futures中移除,还要销毁 channel
    protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
        ...
        // 构建 MessageFuture
        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        // 放入 futures 这个map中
        futures.put(rpcMessage.getId(), messageFuture);
        //检查通道是否可以写
        channelWritableCheck(channel, rpcMessage.getBody());
        String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
        //在请求发送之前执行钩子
        doBeforeRpcHooks(remoteAddr, rpcMessage);
        // 发送请求,并配置监听器 用于处理channel异常
        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            // 这里响应不成功,基本是channel不正常了
            if (!future.isSuccess()) {
                //移除消息
                MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
                if (messageFuture1 != null) {
                    messageFuture1.setResultMessage(future.cause());
                }
                //响应不成功,则销毁channel
                destroyChannel(future.channel());
            }
        });
        ...
        //获取响应结果
        Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        //响应之后执行钩子
        doAfterRpcHooks(remoteAddr, rpcMessage, result);
        ...
    }
    
    1. 正常收到 response 后,给MessageFuture对象赋值,从futures中移除,如ClientOnResponseProcessor#process中的实现
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
      ...
      // 从futures中移除
      MessageFuture messageFuture = futures.remove(rpcMessage.getId());
      if (messageFuture != null) {
          // 赋值结果
          messageFuture.setResultMessage(rpcMessage.getBody());
      }
    }
    
    1. AbstractNettyRemoting#init中开启定时任务,巡检出futures 这个 map 中的超时对象后从 futures 中移除,不再检查,并指定结果为 TimeoutException
    public void init() {
        // 检测消息同步发送(sendSync(xxx))是否超时,
        // 定时任务默认是延迟3秒,间隔3秒
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
                    MessageFuture future = entry.getValue();
                    if (future.isTimeout()) {
                        // 如果过期了则将发送结果设置为TimeoutException
                        // 从futures中移除,不再检查
                        futures.remove(entry.getKey());
                        RpcMessage rpcMessage = future.getRequestMessage();
                        future.setResultMessage(new TimeoutException(String
                            .format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
                        }
                    }
                }
    
                nowMills = System.currentTimeMillis();
            }
        }, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
    }
    

    还有线程池跟服务注册发现和建连相关,会后边篇章再介绍。

    相关文章

      网友评论

          本文标题:Seata 高性能 RPC 通信的实现- 巧用 reactor

          本文链接:https://www.haomeiwen.com/subject/gddrddtx.html