美文网首页rocketMq
rocketmq rpc 通信(二)

rocketmq rpc 通信(二)

作者: 左小星 | 来源:发表于2019-08-04 19:29 被阅读0次

  rocketMq底层使用netty作为网络通信框架,在上一篇 rocketmq rpc 通信(一) 编解码 介绍了rocketMq如何编解码,而本篇将重点介绍rocketMq netty底层的线程模型和通信方式。

1、为什么使用netty

  • netty封装了java底层的nio,而不是使用传统的bio,底层使用reactor线程模型,使用几个线程可以轻松处理上万socket连接,效率更加高效
  • netty底层解决了jdk epoll 空轮训的bug,同时提供了许多处理网络粘包拆包的工具类,开发更加高效
  • netty的api相对简单,开发人员不用被nio的各种事件困扰,底层优秀的pipeline机制更加利于各种handler的扩展,使开发人员更加注重业务逻辑的实现
  • netty是异步编程,可以处理更高的并发,很多优秀的中间件都是基于netty做网络通信

2、rocketMq 使用netty的线程模型介绍

  rocketMq中一个有4类线程,如图表所示

线程数 name 说明
1 BossThread reactor主线程,处理tcp连接建立
3 SelectorWorkerThread reactor worker线程,负责select网络事件,之后转发到编解码线程处理
8 CodecThread netty 的编解码线程,源码中数量为8
N ProcessorThread 业务处理线程池,负责各种业务处理,如读写消息

说明:在rocketMq 对应的netty 的worker线程,实际工作就是select socket 事件,之后转发给code线程,所以源码中的worker线程只设置了3个线程。上图中的线程处理时间长短是逐渐增大的。各个线程之间的交互如图所示

netty线程模型
还有一张图更加清晰的解释了 netty 的pipeline 和 server 端的如何做出处理的,此图来自rocketMq社区
image.png

3、server端流程

server端类继承图

3.1、server端启动流程

  服务端启动时,tcp参数说明

  1. ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列, 服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
  2. ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用
  3. Channeloption.SO_KEEPALIVE参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。由于rocketMq有心跳机制,所以此参数设置为false。
  4. ChannelOption.TCP_NODELAY 该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输
    server端构造方法,这些都是netty的一下api,需要对netty相对熟悉
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;

        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
        // 是否使用epoll
        if (useEpoll()) {
            this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });

            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });

            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }
        loadSslContext();
    }

服务端启动方法

@Override
    public void start() {
        // 处理编解码的线程池
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class :NioServerSocketChannel.class)// 是否使用epoll
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                                // 这些handler共用defaultEventExecutorGroup这个线程池
                            .addLast(defaultEventExecutorGroup,
                                new NettyEncoder(),
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                new NettyConnectManageHandler(),
                                new NettyServerHandler()// 具体处理
                            );
                    }
                });
        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    }

  说明:可以看到,在构造函数中初始化boss和worker线程池时,判断是否可以使用epoll,下面大体上来说明下epoll比select好在哪里~
  select的最大缺陷是能打开fd的个数是有限的,linux上默认为1024,对socket的扫描是线性扫描,就是采用轮训的方法效率低下,当socket比较多时并且不管哪个socket是活跃的,都会遍历一遍。select需要存储一个维护fd的数据结构,在用户态和内核态切换时需要拷贝,效率也比较低下。
  epoll支持水平触发和边缘触发,最大的特点在于边缘触发,它只告诉进程哪些fd刚刚变为就绪态,并且只会通知一次。还有一个特点是,epoll使用“事件”的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知。epoll没有最大连接数限制存储使用红黑树,没有用户态和内核态之间的内存拷贝,使用mmap做内存映射,效率更加高效。

3.2、server端处理请求

  由最上面类继承图可知,NettyRemotingServer继承了NettyRemotingAbstract,server只会处理request请求,处理完成后将response写回到channel返给客户端。

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            // 这个方法是NettyRemotingAbstract的方法
            processMessageReceived(ctx, msg);
        }
    }
// 看下server端如何处理请求,代码在NettyRemotingAbstract
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        // 服务端启动时会注册相应的processor和处理processor对应的线程池
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        // client生成的一个请求码,server端原路返回
        final int opaque = cmd.getOpaque();

        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        // 执行before钩子方法
                        doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        // 具体的processor处理
                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                        // 执行after钩子方法
                        doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

                        if (!cmd.isOnewayRPC()) {
                            if (response != null) {
                                // 将客户端请求码原路返回,之后client根据opaque从map中获取相应的ResponseFuture,让client端的线程不在等待
                                response.setOpaque(opaque);
                                response.markResponseType();
                                try {
                                    ctx.writeAndFlush(response);
                                } catch (Throwable e) {
                                    log.error("process request over, but response failed", e);
                                    log.error(cmd.toString());
                                    log.error(response.toString());
                                }
                            } else {

                            }
                        }
                    } catch (Throwable e) {
                        log.error("process request exception", e);
                        log.error(cmd.toString());

                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };
            // 执行对应的processor方法,看是否拒绝,这里是在code线程中执行的,所以这个方法不能太耗时
            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }

            try {
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                // 提交到对应的processor线程池中执行
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                if ((System.currentTimeMillis() % 10000) == 0) {
                    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }
                // 由于Oneway 类型的rpc 客户端不关心结果,所以不用返回客户端信息
                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
            // 非法的code码,返回客户端错误
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }

  说明:服务端在接收到客户端的请求后,由上面注册的defaultEventExecutorGroup线程也就是 编解码线程执行,通过RemotingCommand的code码获取相应的pair key : processor,value : processor对应的执行线程池,编解码线程提交到processor对应的执行线程池中执行,线程池执行完成之后,将结果写入pipeline中,做后续处理。

4、client端流程

client端的类继承图如图所示


client端类继承图

4.1、client端rpc通信方式

  1. sync,客户端线程同步等待
  2. async,客户端线程不同步等待,通过netty异步编程的callback进行通知
  3. oneway,客户端只像server端请求,不关心请求是否成功
4.1.1、client端handler处理

  由上面的类继承图,NettyRemotingClient也继承了NettyRemotingAbstract,在NettyRemotingAbstract#processMessageReceived方法中声明了RESPONSE_COMMAND,就是客户端收到server端的响应具体处理的逻辑

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                // client端调用这个方法
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        final ResponseFuture responseFuture = responseTable.get(opaque);
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);

            responseTable.remove(opaque);

            if (responseFuture.getInvokeCallback() != null) {
                // async模式下,执行回调方法
                executeInvokeCallback(responseFuture);
            } else {
                // sync模式下,唤醒client端发送线程,发送线程返回RemotingCommand 对应的response
                responseFuture.putResponse(cmd);
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }
4.1.2、sync调用

调用流程图如图所示


同步调用流程

代码分析如下

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();

        try {
            // 初始化时,ResponseFuture 内部成员变量 CountDownLatch countDownLatch = new CountDownLatch(1);
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            // 将客户端请求码和ResponseFuture put 到map中,在server端返回结果回调Listener时可以根据客户端请求码获取ResponseFuture
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        // 成功,将future设置SendRequestOK为true,直接返回
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }
                    // 以下操作是失败的情况,删除客户端请求码对应的future,唤醒client端发送线程
                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    // 这个方法执行了countDownLatch.countDown(),因此会唤醒client端发送线程。
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });
            // 客户端发送线程同步等待,就是调用了ResponseFuture 中的 countDownLatch.await
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }

  说明:在同步调用过程中,有两个地方执行了responseFuture中countDownLatch的countdown操作,一个是在发送数据注册的Listener中,一个是在NettyClientHandler中,这两个地方都会唤醒client端发送线程,Listener中对应的是rpc通信失败,NettyClientHandler对应的是请求成功

4.1.3、async调用

调用流程图如图所示


异步调用流程

代码如下

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
       final InvokeCallback invokeCallback)
       throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
       long beginStartTime = System.currentTimeMillis();
       final int opaque = request.getOpaque();
       // 获取一个信号量,防止发送太快,release的逻辑在执行完callback方法后
       boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
       if (acquired) {
           // 在 once 的release 方法里,会将信号量release 减一
           final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
           long costTime = System.currentTimeMillis() - beginStartTime;
           if (timeoutMillis < costTime) {
               once.release();
               throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
           }

           final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
           this.responseTable.put(opaque, responseFuture);
           try {
               channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                   @Override
                   public void operationComplete(ChannelFuture f) throws Exception {
                       if (f.isSuccess()) {
                           // 这里是执行成功了,设置为sendRequestOK为true,直接返回,回调方法在NettyClientHandler执行了
                           responseFuture.setSendRequestOK(true);
                           return;
                       }
                       // 这里执行失败,将失败结果传递给回调方法执行
                       requestFail(opaque);
                       log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                   }
               });
           } catch (Exception e) {
               responseFuture.release();
               log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
               throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
           }
       } else {
           if (timeoutMillis <= 0) {
               throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
           } else {
               String info =
                   String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                       timeoutMillis,
                       this.semaphoreAsync.getQueueLength(),
                       this.semaphoreAsync.availablePermits()
                   );
               log.warn(info);
               throw new RemotingTimeoutException(info);
           }
       }
   }

  说明:异步调用其实就是传入了一个callback,这样做的好处是不用阻塞client端的调用线程,在server端处理成功后,利用netty的异步编程,调用回调函数,完全实现了异步化,吞吐量有所上升,注意:异步调用有可能会丢失数据(server端突然宕机,数据会丢失)。rocketMq异步调用为了防止发送过快服务端压力过大,在client端设置了信号量 Semaphore,调用时先获取一个,在回调成功执行完callback函数后会将信号量 release。

4.1.3、oneway调用

  说明:oneway调用客户端不关心server端是否处理成功,这种模式适合哪种不重要的调用,同时server端发生异常时发现是oneway调用,也不会向client端发送结果数据

4.1.4、async调用定时清理

  异步调用时,有可能服务端宕机,这时候客户端会永远接收不到服务端的响应数据,因此async调用时put到map中的ResponseFuture就会一直在客户端的内存中,还有重要的一点是async调用时获取了一个信号量也得不到释放,最终async有可能调用不了,这时候需要把这些数据清除掉,代码如下

public void scanResponseTable() {
        final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
        Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Integer, ResponseFuture> next = it.next();
            ResponseFuture rep = next.getValue();

            if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
                // 释放信号量
                rep.release();
                // 从map中remove
                it.remove();
                rfList.add(rep);
                log.warn("remove timeout request, " + rep);
            }
        }

        for (ResponseFuture rf : rfList) {
            try {
                // 异步执行callback方法
                executeInvokeCallback(rf);
            } catch (Throwable e) {
                log.warn("scanResponseTable, operationComplete Exception", e);
            }
        }
    }

5、客户端连接管理

  在server端启动时,设置了一个TCP选项 ChannelOption.SO_KEEPALIVE, false,表示禁用tcp的keepalive,由于rocketMq有心跳机制,所以取消了这个选项。
在server端初始化pipeline时,添加了一个 IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),nettyServerConfig.getServerChannelMaxIdleTimeSeconds()默认是120秒,表示120秒没有发生读写请求,netty 会实例化一个 IdleStateEvent 沿着pipeline传输,NettyConnectManageHandler中监听了这个事件,会将这个channel remove掉,具体看下源码。
IdleStateHandler是netty里面的一个类,内部逻辑就是在channel有读或写事件时,更新一下lastReadTime和lastWriteTime,然后起一个定时任务检测和当前时间比较是否超过了allIdleTimeNanos设置的120秒。

// 初始化IdleStateHandler时,readerIdleTime = 0,writerIdleTime = 0,allIdleTime = 120
// 在超过120秒没有读写请求后,这是
public void run() {
            if (!ctx.channel().isOpen()) {
                return;
            }

            long nextDelay = allIdleTimeNanos;
            if (!reading) {
                nextDelay -= System.nanoTime() - Math.max(lastReadTime, lastWriteTime);
            }
            if (nextDelay <= 0) {
                // Both reader and writer are idle - set a new timeout and
                // notify the callback.
                allIdleTimeout = ctx.executor().schedule(
                        this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
                try {
                    IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, firstAllIdleEvent);
                    if (firstAllIdleEvent) {
                        firstAllIdleEvent = false;
                    }
                    // 发送IdleStateEvent事件到pipeline中,NettyConnectManageHandler会处理这个事件
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Either read or write occurred before the timeout - set a new
                // timeout with shorter delay.
                allIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
// NettyConnectManageHandler处理
class NettyConnectManageHandler extends ChannelDuplexHandler {
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
            super.channelRegistered(ctx);
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
            super.channelUnregistered(ctx);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
            super.channelActive(ctx);

            if (NettyRemotingServer.this.channelEventListener != null) {
                // channel 连接事件
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
            }
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
            super.channelInactive(ctx);

            if (NettyRemotingServer.this.channelEventListener != null) {
                // channel关闭事件
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
            }
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            // 这里处理IdleStateHandler 中发出来的事件
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state().equals(IdleState.ALL_IDLE)) {
                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
                    RemotingUtil.closeChannel(ctx.channel());
                    if (NettyRemotingServer.this.channelEventListener != null) {
                        // 封装NettyEvent,放入队列,别的线程从队列中取相应的事件做处理
                        NettyRemotingServer.this
                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                    }
                }
            }

            ctx.fireUserEventTriggered(evt);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
            log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);

            if (NettyRemotingServer.this.channelEventListener != null) {
                // 发生异常事件
                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
            }

            RemotingUtil.closeChannel(ctx.channel());
        }
    }
// 从队列中取事件的线程
class NettyEventExecutor extends ServiceThread {
        private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
        private final int maxSize = 10000;

        public void putNettyEvent(final NettyEvent event) {
            if (this.eventQueue.size() <= maxSize) {
                this.eventQueue.add(event);
            } else {
                log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
            }
        }

        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");

            final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();

            while (!this.isStopped()) {
                try {
                    NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
                    if (event != null && listener != null) {
                        switch (event.getType()) {
                            case IDLE:
                                listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
                                break;
                            case CLOSE:
                                listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
                                break;
                            case CONNECT:
                                listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
                                break;
                            case EXCEPTION:
                                listener.onChannelException(event.getRemoteAddr(), event.getChannel());
                                break;
                            default:
                                break;

                        }
                    }
                } catch (Exception e) {
                    log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            log.info(this.getServiceName() + " service end");
        }

        @Override
        public String getServiceName() {
            return NettyEventExecutor.class.getSimpleName();
        }
    }

  说明:当有客户端连接或关闭或超过一定时间没有读写请求时,NettyConnectManageHandler会捕捉到相应的事件,之后放到阻塞队列中,有一个单独的NettyEventExecutor从队列中获取事件,调用相应的listener做处理。

6、总结

  本文介绍了rocketMq 使用netty的线程模型,client端发送数据,server端接收数据并根据相应的请求code码找到相应的processor提交到线程池中处理,处理完成后将结果写会客户端。客户端调用又分为3中模式,还介绍了channel的管理机制,netty底层其实还是挺复杂的,还需要深入学习。以上就是通过看rocketMq rpc模块相关代码总结出来的,如有错误,欢迎指出讨论~

相关文章

网友评论

    本文标题:rocketmq rpc 通信(二)

    本文链接:https://www.haomeiwen.com/subject/uoqydctx.html