rocketmq rpc communication (part 2)


Author: 左小星 | Published 2019-08-04 19:29

  RocketMQ uses Netty as its underlying network communication framework. The previous post, rocketmq rpc communication (part 1): encoding and decoding, covered how RocketMQ encodes and decodes on the wire; this post focuses on the thread model and communication patterns of RocketMQ's Netty layer.

1. Why Netty?

• Netty wraps the JDK's NIO (rather than traditional blocking I/O) behind a Reactor thread model, so a handful of threads can comfortably serve tens of thousands of socket connections.
• Netty works around the JDK NIO epoll empty-poll (busy-spin) bug and ships utilities for handling TCP packet sticking and splitting, which makes development more productive.
• Netty's API is comparatively simple: developers are shielded from raw NIO events, and the excellent pipeline mechanism makes it easy to extend with handlers, so developers can concentrate on business logic.
• Netty is asynchronous, which supports higher concurrency; many well-known middleware projects build their network layer on Netty.

2. RocketMQ's Netty thread model

  There are four classes of threads in RocketMQ's remoting layer, as the table shows:

    Threads   Name                  Description
    1         BossThread            Reactor main thread; accepts incoming TCP connections
    3         SelectorWorkerThread  Reactor worker threads; select network events and hand them to the codec threads
    8         CodecThread           Netty encode/decode threads; the source fixes the count at 8
    N         ProcessorThread       Business thread pools; do the actual work, e.g. reading and writing messages

  Note: in RocketMQ the Netty worker threads do little more than select socket events and forward them to the codec threads, which is why the source configures only 3 of them. Going down the table, the time each thread class spends per task grows. The interaction between the threads is shown in the figure below.

    (Figure: Netty thread model)
    Another figure, from the RocketMQ community, shows even more clearly how the Netty pipeline and the server-side handling fit together:
    (Figure: Netty pipeline and server-side processing)
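
The thread counts in the table map to NettyServerConfig fields. Below is a minimal sketch for inspecting them; the defaults quoted in the comments come from the RocketMQ 4.x source and may differ between versions:

    import org.apache.rocketmq.remoting.netty.NettyServerConfig;

    public class ThreadModelDemo {
        public static void main(String[] args) {
            NettyServerConfig config = new NettyServerConfig();
            // SelectorWorkerThread group: defaults to 3
            System.out.println(config.getServerSelectorThreads());
            // CodecThread group (the defaultEventExecutorGroup): defaults to 8
            System.out.println(config.getServerWorkerThreads());
            // publicExecutor threads: defaults to 0, which the constructor bumps to 4
            System.out.println(config.getServerCallbackExecutorThreads());
            // The boss group is hard-coded to 1 thread; ProcessorThread pools are
            // supplied per request code via registerProcessor(...), covered below.
        }
    }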

3. Server-side flow

    (Figure: server-side class hierarchy)

3.1 Server startup

  The TCP options set at server startup:

    1. ChannelOption.SO_BACKLOG corresponds to the backlog argument of the TCP listen(int socketfd, int backlog) call, which initializes the server's pending-connection queue. The server accepts connection requests sequentially, handling only one at a time, so concurrent connection attempts wait in this queue; backlog caps its size.
    2. ChannelOption.SO_REUSEADDR corresponds to the SO_REUSEADDR socket option and allows the local address and port to be reused. For example, once a server process is bound to TCP port 80, binding to that port again would normally fail; with this option the port can be shared, which is common in server programs.
    3. ChannelOption.SO_KEEPALIVE corresponds to the SO_KEEPALIVE socket option and enables TCP-level liveness probing, intended for connections that may go long periods without traffic: once set, TCP sends a keep-alive probe after two hours without data. RocketMQ has its own heartbeat mechanism, so this option is set to false.
    4. ChannelOption.TCP_NODELAY disables Nagle's algorithm, suited to low-latency transmission of small payloads.
  The server constructor follows. This is all standard Netty API, so some familiarity with Netty helps:
    public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
            final ChannelEventListener channelEventListener) {
            super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
            this.serverBootstrap = new ServerBootstrap();
            this.nettyServerConfig = nettyServerConfig;
            this.channelEventListener = channelEventListener;
    
            int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
            if (publicThreadNums <= 0) {
                publicThreadNums = 4;
            }
    
            this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
    
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
                }
            });
            // decide between the epoll and NIO transports
            if (useEpoll()) {
                this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
                    private AtomicInteger threadIndex = new AtomicInteger(0);
    
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
                    }
                });
    
                this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                    private AtomicInteger threadIndex = new AtomicInteger(0);
                    private int threadTotal = nettyServerConfig.getServerSelectorThreads();
    
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                    }
                });
            } else {
                this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
                    private AtomicInteger threadIndex = new AtomicInteger(0);
    
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
                    }
                });
    
                this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                    private AtomicInteger threadIndex = new AtomicInteger(0);
                    private int threadTotal = nettyServerConfig.getServerSelectorThreads();
    
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                    }
                });
            }
            loadSslContext();
        }
    

    The server start() method:

    @Override
        public void start() {
        // thread pool for the codec (and other shared) handlers
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactory() {
                    private AtomicInteger threadIndex = new AtomicInteger(0);
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                    }
                });
    
            ServerBootstrap childHandler =
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) // epoll or NIO transport
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_KEEPALIVE, false)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                    .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                    .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                    new HandshakeHandler(TlsSystemConfig.tlsMode))
                                // these handlers all share the defaultEventExecutorGroup pool
                                .addLast(defaultEventExecutorGroup,
                                    new NettyEncoder(),
                                    new NettyDecoder(),
                                    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                    new NettyConnectManageHandler(),
                                new NettyServerHandler() // the actual request dispatch
                                );
                        }
                    });
            if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
                childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            }
    
            try {
                ChannelFuture sync = this.serverBootstrap.bind().sync();
                InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
                this.port = addr.getPort();
            } catch (InterruptedException e1) {
                throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
            }
    
            if (this.channelEventListener != null) {
                this.nettyEventExecutor.start();
            }
        }
    

  Note: as the constructor shows, when the boss and worker event loop groups are initialized the code checks whether epoll can be used. Roughly, here is why epoll beats select.
  select's biggest limitation is the cap on the number of fds it can watch (1024 by default on Linux), and it scans the sockets linearly: every call walks the entire fd set whether or not a socket is active, which is inefficient once there are many sockets. select also keeps the fd set in a user-space structure that must be copied across the user/kernel boundary on each call, adding further cost.
  epoll supports both level-triggered and edge-triggered modes, edge triggering being its hallmark: it reports only the fds that have just become ready, and reports each readiness change once. epoll is also event-driven: fds are registered with epoll_ctl, and when an fd becomes ready the kernel activates it through a callback-like mechanism so that epoll_wait picks it up. epoll has no hard connection limit (it keeps fds in a red-black tree) and avoids re-copying the whole fd set between user and kernel space on every call, making it considerably more efficient.
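
The check itself is short; in NettyRemotingServer it looks essentially like this (paraphrased from the 4.x source):

    private boolean useEpoll() {
        // epoll is used only on Linux, only when enabled in the config,
        // and only when Netty's native epoll transport actually loads
        return RemotingUtil.isLinuxPlatform()
            && nettyServerConfig.isUseEpollNativeSelector()
            && Epoll.isAvailable();
    }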

3.2 Server-side request handling

  As the class hierarchy above shows, NettyRemotingServer extends NettyRemotingAbstract. The server only processes request commands; when processing completes, it writes the response back to the channel to return it to the client.

    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            // this method is defined in NettyRemotingAbstract
                processMessageReceived(ctx, msg);
            }
        }
    // how the server processes a request; the code lives in NettyRemotingAbstract
    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        // processors, each paired with its executor, are registered at server startup
            final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
            final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        // the request id (opaque) generated by the client; the server echoes it back
            final int opaque = cmd.getOpaque();
    
            if (pair != null) {
                Runnable run = new Runnable() {
                    @Override
                    public void run() {
                        try {
                        // run the before-RPC hooks
                            doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        // dispatch to the matched processor
                            final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                        // run the after-RPC hooks
                            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
    
                            if (!cmd.isOnewayRPC()) {
                                if (response != null) {
                                // echo the client's opaque back; the client uses it to look up the matching ResponseFuture so its thread stops waiting
                                    response.setOpaque(opaque);
                                    response.markResponseType();
                                    try {
                                        ctx.writeAndFlush(response);
                                    } catch (Throwable e) {
                                        log.error("process request over, but response failed", e);
                                        log.error(cmd.toString());
                                        log.error(response.toString());
                                    }
                                } else {
    
                                }
                            }
                        } catch (Throwable e) {
                            log.error("process request exception", e);
                            log.error(cmd.toString());
    
                            if (!cmd.isOnewayRPC()) {
                                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                    RemotingHelper.exceptionSimpleDesc(e));
                                response.setOpaque(opaque);
                                ctx.writeAndFlush(response);
                            }
                        }
                    }
                };
            // ask the processor whether to reject the request; this runs on the codec thread, so it must not be expensive
                if (pair.getObject1().rejectRequest()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[REJECTREQUEST]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                    return;
                }
    
                try {
                    final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                // submit to the executor paired with the processor
                    pair.getObject2().submit(requestTask);
                } catch (RejectedExecutionException e) {
                    if ((System.currentTimeMillis() % 10000) == 0) {
                        log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                            + ", too many requests and system thread pool busy, RejectedExecutionException "
                            + pair.getObject2().toString()
                            + " request code: " + cmd.getCode());
                    }
                // oneway RPC clients do not care about the result, so nothing is returned
                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                            "[OVERLOAD]system busy, start flow control for a while");
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            } else {
            // unsupported request code: return an error to the client
                String error = " request type " + cmd.getCode() + " not supported";
                final RemotingCommand response =
                    RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
            }
        }
    
    

  Note: when the server receives a client request, it is handled on the defaultEventExecutorGroup registered above, i.e. the codec threads. The RemotingCommand's code is used to look up the matching pair (key: the processor; value: the executor paired with that processor); the codec thread submits the task to that executor, and when it completes, the result is written back into the pipeline for downstream handling.
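
For reference, this is how a processor/executor pair ends up in processorTable, modeled on the broker's startup registration; the pool size and the anonymous processor below are illustrative, and remotingServer is assumed to be an initialized NettyRemotingServer:

    // illustrative registration, not the broker's actual SendMessageProcessor
    ExecutorService sendMessageExecutor = Executors.newFixedThreadPool(16);

    remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, new NettyRequestProcessor() {
        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
            // runs on sendMessageExecutor, never on the codec threads
            return RemotingCommand.createResponseCommand(RemotingSysResponseCode.SUCCESS, null);
        }

        @Override
        public boolean rejectRequest() {
            return false; // invoked on the codec thread, so keep it cheap
        }
    }, sendMessageExecutor);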

4. Client-side flow

The client-side class hierarchy is shown below.


    (Figure: client-side class hierarchy)

4.1 Client RPC invocation modes

    1. sync: the calling thread blocks until the response arrives
    2. async: the calling thread does not wait; Netty's asynchronous callback delivers the result
    3. oneway: the client only fires the request at the server and does not care whether it succeeds (a usage sketch of all three modes follows below)
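
A usage sketch of the three modes against the RemotingClient interface; the address, the request code 0, and the timeouts are placeholders, and a listening server is assumed:

    import org.apache.rocketmq.remoting.InvokeCallback;
    import org.apache.rocketmq.remoting.RemotingClient;
    import org.apache.rocketmq.remoting.netty.NettyClientConfig;
    import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
    import org.apache.rocketmq.remoting.netty.ResponseFuture;
    import org.apache.rocketmq.remoting.protocol.RemotingCommand;

    public class InvokeModesDemo {
        public static void main(String[] args) throws Exception {
            RemotingClient client = new NettyRemotingClient(new NettyClientConfig());
            client.start();
            String addr = "127.0.0.1:8888";

            // 1. sync: blocks on the ResponseFuture until the reply arrives or the timeout fires
            RemotingCommand response =
                client.invokeSync(addr, RemotingCommand.createRequestCommand(0, null), 3000);

            // 2. async: returns immediately; Netty runs the callback on completion
            client.invokeAsync(addr, RemotingCommand.createRequestCommand(0, null), 3000,
                new InvokeCallback() {
                    @Override
                    public void operationComplete(ResponseFuture responseFuture) {
                        RemotingCommand resp = responseFuture.getResponseCommand(); // null on timeout
                    }
                });

            // 3. oneway: fire-and-forget; no ResponseFuture is ever registered
            client.invokeOneway(addr, RemotingCommand.createRequestCommand(0, null), 3000);

            client.shutdown();
        }
    }
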
4.1.1 Client-side handler

  As the class hierarchy shows, NettyRemotingClient also extends NettyRemotingAbstract. NettyRemotingAbstract#processMessageReceived declares the RESPONSE_COMMAND branch, which is exactly the client-side logic for handling a server response.

    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            final RemotingCommand cmd = msg;
            if (cmd != null) {
                switch (cmd.getType()) {
                    case REQUEST_COMMAND:
                        processRequestCommand(ctx, cmd);
                        break;
                // the client takes this branch
                    case RESPONSE_COMMAND:
                        processResponseCommand(ctx, cmd);
                        break;
                    default:
                        break;
                }
            }
        }
    
    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
            final int opaque = cmd.getOpaque();
            final ResponseFuture responseFuture = responseTable.get(opaque);
            if (responseFuture != null) {
                responseFuture.setResponseCommand(cmd);
    
                responseTable.remove(opaque);
    
                if (responseFuture.getInvokeCallback() != null) {
                // async mode: run the registered callback
                    executeInvokeCallback(responseFuture);
                } else {
                // sync mode: wake the client's sender thread, which then returns the corresponding response RemotingCommand
                    responseFuture.putResponse(cmd);
                    responseFuture.release();
                }
            } else {
                log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
                log.warn(cmd.toString());
            }
        }
    
4.1.2 sync invocation

The call flow is shown below.


    (Figure: sync invocation flow)

The code:

    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
            final long timeoutMillis)
            throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
            final int opaque = request.getOpaque();
    
            try {
            // on construction, ResponseFuture initializes an internal CountDownLatch countDownLatch = new CountDownLatch(1)
                final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            // map the client's opaque to the ResponseFuture so it can be looked up when the server's reply triggers the listener
                this.responseTable.put(opaque, responseFuture);
                final SocketAddress addr = channel.remoteAddress();
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                        // write succeeded: mark sendRequestOK on the future and return
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }
                    // everything below is the failure path: drop the future for this opaque and wake the client's sender thread
                        responseTable.remove(opaque);
                        responseFuture.setCause(f.cause());
                    // putResponse() calls countDownLatch.countDown(), which wakes the client's sender thread
                        responseFuture.putResponse(null);
                        log.warn("send a request command to channel <" + addr + "> failed.");
                    }
                });
            // the client's sender thread waits here, i.e. on the ResponseFuture's countDownLatch.await
                RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
                if (null == responseCommand) {
                    if (responseFuture.isSendRequestOK()) {
                        throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                            responseFuture.getCause());
                    } else {
                        throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                    }
                }
    
                return responseCommand;
            } finally {
                this.responseTable.remove(opaque);
            }
        }
    

  Note: during a sync call, two places perform the countdown on the ResponseFuture's latch, and either one wakes the client's sender thread: the listener registered on the write, taken when the RPC send fails, and NettyClientHandler, taken when the request succeeds and the response arrives.
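
The rendezvous lives in ResponseFuture. Condensed to the two methods the sync path relies on, it looks roughly like this (a simplified sketch, not the full class):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import org.apache.rocketmq.remoting.protocol.RemotingCommand;

    class ResponseFutureSketch {
        private final CountDownLatch countDownLatch = new CountDownLatch(1);
        private volatile RemotingCommand responseCommand;

        // called by the sender thread: parks until putResponse() fires or the timeout elapses
        public RemotingCommand waitResponse(long timeoutMillis) throws InterruptedException {
            countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return responseCommand; // still null if we timed out
        }

        // called by the write-failure listener (with null) or by NettyClientHandler
        // via processResponseCommand when the matching response arrives
        public void putResponse(RemotingCommand cmd) {
            this.responseCommand = cmd;
            countDownLatch.countDown(); // wakes the sender exactly once
        }
    }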

4.1.3 async invocation

The call flow is shown below.


    (Figure: async invocation flow)

The code:

    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
           final InvokeCallback invokeCallback)
           throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
           long beginStartTime = System.currentTimeMillis();
           final int opaque = request.getOpaque();
       // acquire a semaphore permit to keep the client from sending too fast; it is released after the callback has run
           boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
           if (acquired) {
           // once.release() returns the permit, and does so at most once
               final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
               long costTime = System.currentTimeMillis() - beginStartTime;
               if (timeoutMillis < costTime) {
                   once.release();
                   throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
               }
    
               final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
               this.responseTable.put(opaque, responseFuture);
               try {
                   channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                       @Override
                       public void operationComplete(ChannelFuture f) throws Exception {
                           if (f.isSuccess()) {
                           // write succeeded: mark sendRequestOK and return; the callback will run later in NettyClientHandler
                               responseFuture.setSendRequestOK(true);
                               return;
                           }
                           // write failed: hand the failure to the callback
                           requestFail(opaque);
                           log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                       }
                   });
               } catch (Exception e) {
                   responseFuture.release();
                   log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                   throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
               }
           } else {
               if (timeoutMillis <= 0) {
                   throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
               } else {
                   String info =
                       String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                           timeoutMillis,
                           this.semaphoreAsync.getQueueLength(),
                           this.semaphoreAsync.availablePermits()
                       );
                   log.warn(info);
                   throw new RemotingTimeoutException(info);
               }
           }
       }
    

  Note: an async call simply carries a callback, which spares the client's calling thread from blocking; after the server finishes processing, Netty's asynchronous machinery invokes the callback, making the call fully asynchronous and raising throughput. Caution: an async call can lose data (if the server crashes suddenly, the response is gone). To keep a fast client from overloading the server, RocketMQ guards async calls with a client-side Semaphore: each call acquires a permit, which is released after the callback has successfully run.
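
SemaphoreReleaseOnlyOnce exists to guard against double release: the permit could otherwise be returned both in the callback path and in an exception path. The class is essentially:

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class SemaphoreReleaseOnlyOnce {
        private final AtomicBoolean released = new AtomicBoolean(false);
        private final Semaphore semaphore;

        public SemaphoreReleaseOnlyOnce(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        public void release() {
            // the CAS guarantees the permit goes back exactly once,
            // no matter how many code paths call release()
            if (this.semaphore != null && this.released.compareAndSet(false, true)) {
                this.semaphore.release();
            }
        }
    }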

4.1.4 oneway invocation

  Note: with a oneway call the client does not care whether the server succeeds, which suits unimportant calls; correspondingly, when the server hits an exception and finds the request is oneway, it does not send any result back to the client.
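
The implementation bears this out: invokeOnewayImpl in NettyRemotingAbstract (condensed below; the full version also maps the semaphore-timeout cases to further exceptions) marks the command as oneway, registers no ResponseFuture at all, and uses its own semaphore purely for flow control, releasing the permit as soon as the write completes either way:

    public void invokeOnewayImpl(final Channel channel, final RemotingCommand request,
            final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException {
        request.markOnewayRPC(); // the server checks isOnewayRPC() and skips the response
        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    once.release(); // flow control only; the outcome is not reported back
                    if (!f.isSuccess()) {
                        log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    }
                }
            });
        } else {
            // fast-fail when no permit can be acquired within the timeout
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        }
    }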

4.1.5 Periodic cleanup for async calls

  With an async call the server may crash, in which case the client never receives a response: the ResponseFuture put into the map would then sit in client memory forever, and, just as important, the semaphore permit acquired for the call would never be released, so eventually async calls could no longer be issued at all. This stale state has to be swept out; the code follows.

    public void scanResponseTable() {
            final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
            Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Integer, ResponseFuture> next = it.next();
                ResponseFuture rep = next.getValue();
    
                if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
                // release the semaphore permit
                    rep.release();
                // remove the entry from the response table
                    it.remove();
                    rfList.add(rep);
                    log.warn("remove timeout request, " + rep);
                }
            }
    
            for (ResponseFuture rf : rfList) {
                try {
                // run the callback for the timed-out future
                    executeInvokeCallback(rf);
                } catch (Throwable e) {
                    log.warn("scanResponseTable, operationComplete Exception", e);
                }
            }
        }
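
scanResponseTable is driven by a plain java.util.Timer that NettyRemotingClient#start() schedules, roughly as follows: first run after 3 seconds, then once per second.

    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);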
    

5. Client connection management

  At server startup the TCP option ChannelOption.SO_KEEPALIVE is set to false, disabling TCP keep-alive; RocketMQ's own heartbeat mechanism makes it unnecessary.
When the server pipeline is initialized, an IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()) is added; the idle limit defaults to 120 seconds, meaning that if no read or write occurs for 120 seconds, Netty instantiates an IdleStateEvent and sends it along the pipeline. NettyConnectManageHandler listens for this event and removes the channel; the source is shown below.
IdleStateHandler is a Netty class whose internal logic updates lastReadTime and lastWriteTime whenever the channel has a read or write event, and runs a scheduled task that checks whether the gap to the current time exceeds the 120 seconds configured via allIdleTimeNanos.

    // IdleStateHandler is constructed with readerIdleTime = 0, writerIdleTime = 0, allIdleTime = 120
    // this task fires once 120 seconds pass without a read or write
    public void run() {
                if (!ctx.channel().isOpen()) {
                    return;
                }
    
                long nextDelay = allIdleTimeNanos;
                if (!reading) {
                    nextDelay -= System.nanoTime() - Math.max(lastReadTime, lastWriteTime);
                }
                if (nextDelay <= 0) {
                    // Both reader and writer are idle - set a new timeout and
                    // notify the callback.
                    allIdleTimeout = ctx.executor().schedule(
                            this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
                    try {
                        IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, firstAllIdleEvent);
                        if (firstAllIdleEvent) {
                            firstAllIdleEvent = false;
                        }
                    // fire the IdleStateEvent into the pipeline; NettyConnectManageHandler will handle it
                        channelIdle(ctx, event);
                    } catch (Throwable t) {
                        ctx.fireExceptionCaught(t);
                    }
                } else {
                    // Either read or write occurred before the timeout - set a new
                    // timeout with shorter delay.
                    allIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
                }
            }
    // handling in NettyConnectManageHandler
    class NettyConnectManageHandler extends ChannelDuplexHandler {
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
                super.channelRegistered(ctx);
            }
    
            @Override
            public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
                final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
                super.channelUnregistered(ctx);
            }
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
                super.channelActive(ctx);
    
                if (NettyRemotingServer.this.channelEventListener != null) {
                // channel connect event
                    NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
                }
            }
    
            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
                super.channelInactive(ctx);
    
                if (NettyRemotingServer.this.channelEventListener != null) {
                // channel close event
                    NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
                }
            }
    
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            // handle the event fired by IdleStateHandler
                if (evt instanceof IdleStateEvent) {
                    IdleStateEvent event = (IdleStateEvent) evt;
                    if (event.state().equals(IdleState.ALL_IDLE)) {
                        final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                        log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
                        RemotingUtil.closeChannel(ctx.channel());
                        if (NettyRemotingServer.this.channelEventListener != null) {
                        // wrap a NettyEvent and enqueue it; another thread takes events off the queue and handles them
                            NettyRemotingServer.this
                                .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                        }
                    }
                }
    
                ctx.fireUserEventTriggered(evt);
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
                log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
    
                if (NettyRemotingServer.this.channelEventListener != null) {
                // channel exception event
                    NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
                }
    
                RemotingUtil.closeChannel(ctx.channel());
            }
        }
    // the thread that drains the event queue
    class NettyEventExecutor extends ServiceThread {
            private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
            private final int maxSize = 10000;
    
            public void putNettyEvent(final NettyEvent event) {
                if (this.eventQueue.size() <= maxSize) {
                    this.eventQueue.add(event);
                } else {
                    log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
                }
            }
    
            @Override
            public void run() {
                log.info(this.getServiceName() + " service started");
    
                final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
    
                while (!this.isStopped()) {
                    try {
                        NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
                        if (event != null && listener != null) {
                            switch (event.getType()) {
                                case IDLE:
                                    listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
                                    break;
                                case CLOSE:
                                    listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
                                    break;
                                case CONNECT:
                                    listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
                                    break;
                                case EXCEPTION:
                                    listener.onChannelException(event.getRemoteAddr(), event.getChannel());
                                    break;
                                default:
                                    break;
    
                            }
                        }
                    } catch (Exception e) {
                        log.warn(this.getServiceName() + " service has exception. ", e);
                    }
                }
    
                log.info(this.getServiceName() + " service end");
            }
    
            @Override
            public String getServiceName() {
                return NettyEventExecutor.class.getSimpleName();
            }
        }
    
    

  Note: when a client connects, disconnects, or goes without reads and writes past the threshold, NettyConnectManageHandler catches the corresponding event and puts it on a blocking queue; a dedicated NettyEventExecutor thread takes events off the queue and calls the matching listener methods.
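
The callbacks a listener must supply mirror the four NettyEventType branches above. A minimal sketch (for context, the broker's real listener is ClientHousekeepingService, which cleans up producer/consumer registrations on close and idle events):

    ChannelEventListener listener = new ChannelEventListener() {
        @Override
        public void onChannelConnect(String remoteAddr, Channel channel) {
            System.out.println("client connected: " + remoteAddr);
        }

        @Override
        public void onChannelClose(String remoteAddr, Channel channel) {
            System.out.println("client closed: " + remoteAddr); // e.g. drop its registrations
        }

        @Override
        public void onChannelException(String remoteAddr, Channel channel) {
            System.out.println("client exception: " + remoteAddr);
        }

        @Override
        public void onChannelIdle(String remoteAddr, Channel channel) {
            System.out.println("idle 120s, channel closed: " + remoteAddr);
        }
    };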

6. Summary

  This post covered the thread model RocketMQ builds on Netty: the client sends data; the server receives it, finds the processor matching the request code, submits the work to that processor's thread pool, and writes the result back to the client when done. Client invocations come in three modes, and we also looked at how channels are managed. Netty's internals are genuinely complex and reward deeper study. All of the above comes from reading RocketMQ's RPC module code; if you spot mistakes, corrections and discussion are welcome~
