美文网首页消息中间件
RocketMQ源码-基于Netty的通信层设计

RocketMQ源码-基于Netty的通信层设计

作者: persisting_ | 来源:发表于2019-07-18 22:54 被阅读7次


    1 概述
    2 Netty通信服务端
    3 Netty通信客户端

    1 概述

    看RocketMQ的源码可以知道,RocketMQ各组件的通信主要基于Netty实现,这里用“主要”是因为RocketMQ的通信也有采用原生Java NIO的实现方式,比如Master Broker和Slave Broker之间HA实现就采用原生Java NIO实现,具体可参考HAService以及HAConnection等相关类的实现(后面有机会会有专文介绍RocketMQ HA实现)。

    下面我们会直接介绍RocketMQ使用Netty通信的相关类实现原理,如果对Netty不太了解,可以看本人Netty系列文章。在介绍通信原理时,我们重点介绍RocketMQ Netty通信服务端的实现原理,客户端的实现和服务端类似,只做简单描述。

    2 Netty通信服务端

    在RocketMQ中,Producer、Consumer会向Broker发送请求,进行消息存储和消费,Producer、Consumer、Broker也都会从Namesrv进行注册或查询路由信息。所以作为服务提供端的Broker和Namesrv在启动时都会实例化Netty服务端组件。这里我们主要介绍BrokerController对象持有的NettyRemotingServer类。

    RocketMQ服务端接口为RemotingServer,其提供的默认实现为NettyRemotingServer,其在BrokerController.initialize方法中被实例化,源码如下:

    //BrokerController
    public boolean initialize() throws CloneNotSupportedException {
        ···
         this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
        ...
    }
    

    通过上述源码可知,BrokerController持有两个NettyRemotingServer类实例,分别为remotingServerfastRemotingServerremotingServer监听通过参数listenPort(默认为10911)配置的端口号,而fastRemotingServer则监听remotingServer监听的端口号-2,这两个Netty服务类实例的区别我们会在后面章节介绍NettyRequestProcessor时进行介绍。

    下面我们继续看NettyRemotingServer构造函数:

    //NettyRemotingServer
    public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
        //首先调用父类构造函数,传入参数为oneway(客户端单向访问无返回)
        //以及客户端异步访问两种请求类型的并发数,这里不做过多介绍
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        //熟悉的Netty服务端启动类
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;
    
        //从配置中获取服务端事件处理线程池线程数量,注意这个线程池和
        //Netty中的线程池没有关系,Netty线程读到网络事件之后,会调用
        //RocketMQ自定义的Processor(此Processor不要和Netty的Handler
        //混淆)进行逻辑处理,Processor的逻辑处理会被封装为Runnable
        //放入下面的publicExecutor线程池。publicThreadNums则定义
        //线程池publicExecutor线程池线程数量
        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }
    
        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
    
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });
        //根据是否使用Epoll创建不同的EventloopGroup,一个用于Accept
        //一个用于接收的客户端channel的select
        //如果平台为Linux且配置使用epoo且检测平台可以使用epoll,则
        //useEpoll返回true
        if (useEpoll()) {
            this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
    
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });
    
            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();
    
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
    
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });
    
            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();
    
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }
        //有关安全套接字的上下文加载,本文不做介绍
        loadSslContext();
    }
    

    好了,NettyRemotingServer的构造函数已经介绍完毕,下面我们看其start方法实现,在start方法中机会见到我们熟悉的handler的注册等:

    //NettyRemotingServer
    @Override
    public void start() {
        //defaultEventExecutorGroup用于执行定时任务、用户任务等,具体可见
        //本文Netty文件集中channelpipiline和channelhandler相关介绍
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
    
                private AtomicInteger threadIndex = new AtomicInteger(0);
    
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });
    
        ServerBootstrap childHandler =
            //accept线程池和select线程池
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                //相关option配置
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                //配置本地监听端口号
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                //用于初始化接受的客户端连接pipeline的初始化handler
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        //对于每个客户端连接,其pipeline注册了如下的
                        //handler
                        ch.pipeline()
                        //此handler用于ssl握手,第一次完成握手后会从
                        //pipeline中被移除
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                            .addLast(defaultEventExecutorGroup,
                            //编码器
                                new NettyEncoder(),
                                //解码器
                                new NettyDecoder(),
                                //空闲通道检测handler
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                //连接管理器,看其源码主要用于处理
                                //IDLE事件并在active/registered方法中
                                //记录info日志
                                new NettyConnectManageHandler(),
                                //RocketMQ的主要处理逻辑就在此handler中
                                //完成,比如如何响应客户端的请求等
                                new NettyServerHandler()
                            );
                    }
                });
        //是否使用池化bytebuf分配器
        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }
    
        //进行端口绑定,开始客户端连接的实际监听
        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }
    
        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    
        //定时任务,用于扫描那些自己发出正在等待服务端响应(如broker
        //向Namesrv发出的请求)的请求,如果已经超时,则进行超时处理
        //扫描时间间隔为1秒/次
        this.timer.scheduleAtFixedRate(new TimerTask() {
    
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
    

    RocketMQ中NettyRemotingServer的初始化和启动已经介绍完毕,本文不分析RocketMQ使用Netty时的编码解码方案,下面主要看下RocketMQ是如何处理客户端请求的,经过上文的介绍,我们知道注册在客户端channel中的Handler NettyServerHandler是该逻辑所在,下面我们看其源码:

    //NettyServerHandler是NettyRemotingServer的内部类
    //其在channelRead0方法中调用了NettyRemotingServer父类
    //NettyRemotingAbstract的方法processMessageReceived
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }
    

    NettyRemotingAbstract.processMessageReceived定义如下:

    //主要根据收到报文的类型为请求或者响应进行分别处理
    //下面重点介绍对请求报文的处理,响应报文的处理比较简单
    //根据报文中的id(RocketMQ为opaque)从等待响应的本地请求中
    //取得其Future对象,设置返回结果,并解除阻塞
    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }
    

    下面重点看NettyRemotingAbstract.processRequestCommand函数的实现:

    //NettyRemotingAbstract
    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        //根据请求报文中的请求代码cmd.getCode()查询注册的processor,
        //processor的注册我们后面介绍,如果没有匹配的,则使用默认的
        //processor,即defaultRequestProcessor,这里返回的Pair则是
        //processor和线程池的组合体,线程池就是NettyRemotingServer
        //构造函数中定义的publicExecutor
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        //获取请求的id,放入响应结果中
        final int opaque = cmd.getOpaque();
    
        if (pair != null) {
            //定义Runnable对象,Runnable.run方法则主要调用
            //processor.processRequest方法进行请求处理,
            //定义的Runnable对象会放到线程池publicExecutor进行
            //处理
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                        doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
    
                        if (!cmd.isOnewayRPC()) {
                            if (response != null) {
                                response.setOpaque(opaque);
                                response.markResponseType();
                                try {
                                    ctx.writeAndFlush(response);
                                } catch (Throwable e) {
                                    log.error("process request over, but response failed", e);
                                    log.error(cmd.toString());
                                    log.error(response.toString());
                                }
                            } else {
    
                            }
                        }
                    } catch (Throwable e) {
                        log.error("process request exception", e);
                        log.error(cmd.toString());
    
                        if (!cmd.isOnewayRPC()) {
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        }
                    }
                }
            };
    
            if (pair.getObject1().rejectRequest()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                return;
            }
    
            try {
                //上面定义的Runnable被封装为RequestTask放入线程池中处理
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
            } catch (RejectedExecutionException e) {
                if ((System.currentTimeMillis() % 10000) == 0) {
                    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                }
    
                if (!cmd.isOnewayRPC()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                }
            }
        } else {
            //处理没有找到processor的异常情况
            String error = " request type " + cmd.getCode() + " not supported";
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
        }
    }
    

    到这里我们对RocketMQ服务端的介绍已经基本完毕,下面我们看下服务端的最后一个问题,就是processor的注册,通过阅读源码可以发现,服务端processor的注册是在Controller中进行的,我们这里主要看下BrokerController的实现,通过前文我们知道BrokerControllerinitialize方法中实例化了两个NettyRemotingServer实例:remotingServerfastRemotingServer,在实例化两个NettyRemotingServer实例之后,在initialize方法中调用了registerProcessor方法:

    //NettyRemotingServer
    public void registerProcessor() {
        //在介绍NettyRemotingAbstract.processRequestCommand
        //方法时说道会根据请求报文中的请求类型代码查找processor,
        //这里就为每个请求类型代码注册processor
        //下面不再展开介绍,BrokerController注册了生产者发送消息、
        //消费者拉取消息、消息查询、客户端管理(心跳、注册等)、消费管理
        //事务消息处理等处理对应的processor
        /**
            * SendMessageProcessor
            */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        //remotingServer和fastRemotingServer的不同就在这里,
        //fastRemotingServer没有注册客户端拉取消息的processor,
        //所以fastRemotingServer不能响应客户端拉取消息的请求报文
        /**
            * PullMessageProcessor
            */
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
    
        /**
            * QueryMessageProcessor
            */
        NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    
        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
    
        /**
            * ClientManageProcessor
            */
        ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    
        this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
    
        /**
            * ConsumerManageProcessor
            */
        ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    
        this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
    
        /**
            * EndTransactionProcessor
            */
        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
    
        //注册默认的processor
        /**
            * Default
            */
        AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
        this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
        this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    }
    
    

    从上面的介绍中可以知道remotingServer和fastRemotingServer的不同就是fastRemotingServer没有注册客户端拉取消息的processor,所以fastRemotingServer不能响应客户端拉取消息的请求报文,fast的含义应该就是fastRemotingServer不能响应客户端拉取消息的请求报文所以能够快速响应其他请求。

    3 Netty通信客户端

    RocketMQ中Netty通信客户端实现类为NettyRemotingClient

    因为消息消费或者生产需要向Namesrv请求Broker信息,也需要连接Broker进行消息拉取、存储等,所以消息消费实现类DefaultMQPushConsumerDefaultMQPullConsumer;消息生产者DefaultMQProducer等,都会持有Netty客户端。

    同样地,应为Broker需要从Namesrv发送注册等请求,所以也需要持有Netty客户端。

    在了解了Netty框架的基本原理,结合着上面Netty通信服务端NettyRemotingServer的介绍,NettyRemoingClient的实现原理就十分容易理解,其实现和NettyRemotingServer基本一致,无非是:配置线程池、Netty客户端Channel实现、配置handler、注册processor等,这里不再展开介绍。

    相关文章

      网友评论

        本文标题:RocketMQ源码-基于Netty的通信层设计

        本文链接:https://www.haomeiwen.com/subject/rkhtqctx.html