RocketMQ namesrv metadata storage

Author: 晴天哥_王志 | Published: 2020-05-01 21:26

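    The namesrv trilogy

    Introduction

    • This article is based on rocketmq-all-4.6.1 and analyzes RocketMQ's namesrv. The core namesrv topics are the startup flow, metadata storage, and the interaction flow.

    • This article focuses on the namesrv interaction flow.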

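    The role of namesrv

    • namesrv acts as a registry: it holds the routing information of the broker nodes, plus some simple k/v configuration.
    • namesrv can be deployed as a cluster, but the namesrv instances are independent and never communicate with each other. Fault tolerance comes from the producer/consumer polling the namesrv list when fetching information (if the current node fails, the client moves on to the next one).
    • As the registry, namesrv receives the brokers' periodic registration requests and keeps them in memory. namesrv has no persistence at all; all data lives in memory. Each broker registers by looping over every namesrv.
    • namesrv exposes an interface through which producers and consumers query broker routing information; it is implemented on top of netty.
    • namesrv detects broker liveness in two ways. Heartbeats: namesrv, as the server side, periodically receives broker heartbeats and removes a broker when no heartbeat arrives within the timeout. Connection monitoring: a dropped connection is detected through the channel events of the underlying netty transport (epoll where available).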
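    The heartbeat-timeout idea from the last bullet can be sketched with nothing but an in-memory map. This is a minimal illustration, not the RocketMQ implementation: the class name BrokerLivenessSketch, its method names, and the timeout value passed in by the caller are all assumptions made for this example.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory liveness table; not the actual RocketMQ class. */
final class BrokerLivenessSketch {
    // broker address -> timestamp (ms) of the last heartbeat received
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();
    // Assumed timeout, chosen by the caller of this sketch.
    private final long expireMillis;

    BrokerLivenessSketch(long expireMillis) {
        this.expireMillis = expireMillis;
    }

    /** Called whenever a broker registers or sends a heartbeat. */
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    /** Periodic scan: drops every broker whose last heartbeat is too old. */
    int scanNotActive(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > expireMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }
}
```

    Because the table lives only in memory, a restarted instance starts empty and is repopulated by the brokers' next round of registrations, which matches the no-persistence design described above.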

    The namesrv interaction flow

    public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    
        public void start() {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactory() {
    
                    private AtomicInteger threadIndex = new AtomicInteger(0);
    
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                    }
                });
    
            prepareSharableHandlers();
    
            ServerBootstrap childHandler =
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                    .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_KEEPALIVE, false)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                    .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                    .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                                .addLast(defaultEventExecutorGroup,
                                    encoder,
                                    new NettyDecoder(),
                                    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                    connectionManageHandler,
                                    serverHandler
                                );
                        }
                    });
    
            if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
                childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            }
    
            try {
                ChannelFuture sync = this.serverBootstrap.bind().sync();
                InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
                this.port = addr.getPort();
            } catch (InterruptedException e1) {
            }
    
            if (this.channelEventListener != null) {
                this.nettyEventExecutor.start();
            }
        }
    }
    
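    • NettyRemotingServer#start() starts the netty server listener and installs the handlers for each accepted channel.
    • NettyDecoder decodes inbound bytes into RemotingCommand objects; the encoder handler covers the outbound direction.
    • serverHandler (a NettyServerHandler, an inner class of NettyRemotingServer) handles incoming requests and responses.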

    NettyDecoder

    public class NettyDecoder extends LengthFieldBasedFrameDecoder {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
    
        private static final int FRAME_MAX_LENGTH =
            Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
    
        public NettyDecoder() {
            super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
        }
    
        @Override
        public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            ByteBuf frame = null;
            try {
                frame = (ByteBuf) super.decode(ctx, in);
                if (null == frame) {
                    return null;
                }
    
                ByteBuffer byteBuffer = frame.nioBuffer();
    
                return RemotingCommand.decode(byteBuffer);
            } catch (Exception e) {
                log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
                RemotingUtil.closeChannel(ctx.channel());
            } finally {
                if (null != frame) {
                    frame.release();
                }
            }
    
            return null;
        }
    }
    
    
    public class RemotingCommand {
    
        public static RemotingCommand decode(final ByteBuffer byteBuffer) {
            int length = byteBuffer.limit();
            // The first 4 bytes encode the header length (and the serialization type)
            int oriHeaderLen = byteBuffer.getInt();
            int headerLength = getHeaderLength(oriHeaderLen);
            // Read the header bytes
            byte[] headerData = new byte[headerLength];
            byteBuffer.get(headerData);
            // Deserialize the header into a RemotingCommand object
            RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
    
            int bodyLength = length - 4 - headerLength;
            byte[] bodyData = null;
            if (bodyLength > 0) {
                bodyData = new byte[bodyLength];
                byteBuffer.get(bodyData);
            }
            // Attach the body to the RemotingCommand
            cmd.body = bodyData;
    
            return cmd;
        }
    
    
        private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
            switch (type) {
                case JSON:
                    RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
                    resultJson.setSerializeTypeCurrentRPC(type);
                    return resultJson;
                case ROCKETMQ:
                    RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
                    resultRMQ.setSerializeTypeCurrentRPC(type);
                    return resultRMQ;
                default:
                    break;
            }
    
            return null;
        }
    }
    
    • NettyDecoder#decode delegates the actual parsing to RemotingCommand#decode.
    • The frame is parsed header first, then body, into a RemotingCommand object.
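    The header-then-body layout can be reproduced with a plain ByteBuffer. The sketch below is a simplified model, not the RocketMQ code: the class FrameLayoutSketch and its Decoded holder are hypothetical, and the bit layout of the header-length word (serialization type in the high byte, header length in the low 24 bits) is an assumption about what getHeaderLength and getProtocolType do, since the listing above does not show them. The leading 4-byte total length plays the role of the field that NettyDecoder reads and strips via LengthFieldBasedFrameDecoder(FRAME_MAX_LENGTH, 0, 4, 0, 4).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical model of the wire frame; not the actual RemotingCommand. */
final class FrameLayoutSketch {
    static final class Decoded {
        final int serializeType;
        final byte[] header;
        final byte[] body; // null when the frame carries no body

        Decoded(int serializeType, byte[] header, byte[] body) {
            this.serializeType = serializeType;
            this.header = header;
            this.body = body;
        }
    }

    /** Builds [total length][type + header length][header][body]. */
    static ByteBuffer encode(int serializeType, byte[] header, byte[] body) {
        int total = 4 + header.length + body.length; // excludes the length field itself
        ByteBuffer wire = ByteBuffer.allocate(4 + total);
        wire.putInt(total);
        // Assumed packing: type in the high byte, header length in the low 24 bits.
        wire.putInt((serializeType << 24) | (header.length & 0xFFFFFF));
        wire.put(header);
        wire.put(body);
        wire.flip();
        return wire;
    }

    static Decoded decode(ByteBuffer wire) {
        // Step 1: what the length-field frame decoder does (read and strip 4 bytes).
        int total = wire.getInt();
        ByteBuffer frame = wire.slice();
        frame.limit(total);

        // Step 2: same order as RemotingCommand#decode in the listing above.
        int length = frame.limit();
        int oriHeaderLen = frame.getInt();
        int headerLength = oriHeaderLen & 0xFFFFFF;
        int serializeType = (oriHeaderLen >> 24) & 0xFF;
        byte[] header = new byte[headerLength];
        frame.get(header);

        int bodyLength = length - 4 - headerLength;
        byte[] body = null;
        if (bodyLength > 0) {
            body = new byte[bodyLength];
            frame.get(body);
        }
        return new Decoded(serializeType, header, body);
    }

    public static void main(String[] args) {
        byte[] header = "{\"code\":1}".getBytes(StandardCharsets.UTF_8);
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        Decoded d = decode(encode(0, header, body));
        System.out.println(new String(d.header, StandardCharsets.UTF_8)
            + " / " + new String(d.body, StandardCharsets.UTF_8));
    }
}
```

    Note that the body length is never sent explicitly: it is whatever remains after the 4-byte header-length word and the header itself, which is why decode computes length - 4 - headerLength.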

    NettyRemotingServer

    public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    
        @ChannelHandler.Sharable
        class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
    
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
                processMessageReceived(ctx, msg);
            }
        }
    }
    
    
    public abstract class NettyRemotingAbstract {
    
        public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            final RemotingCommand cmd = msg;
    
            if (cmd != null) {
    
                switch (cmd.getType()) {
                    case REQUEST_COMMAND:
                        processRequestCommand(ctx, cmd);
                        break;
    
                    case RESPONSE_COMMAND:
                        processResponseCommand(ctx, cmd);
                        break;
    
                    default:
                        break;
                }
            }
        }
    
        public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
            final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
            final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
            final int opaque = cmd.getOpaque();
    
            if (pair != null) {
                Runnable run = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                            // On namesrv, pair.getObject1() is the DefaultRequestProcessor
                            final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
    
                            if (!cmd.isOnewayRPC()) {
                                if (response != null) {
                                    response.setOpaque(opaque);
                                    response.markResponseType();
                                    try {
                                        ctx.writeAndFlush(response);
                                    } catch (Throwable e) {
                                        log.error("process request over, but response failed", e);
                                        log.error(cmd.toString());
                                        log.error(response.toString());
                                    }
                                } else {
    
                                }
                            }
                        } catch (Throwable e) {
                            
                        }
                    }
                };
    
                if (pair.getObject1().rejectRequest()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[REJECTREQUEST]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                    return;
                }
    
                try {
                    final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                    // On namesrv, pair.getObject2() is the remotingExecutor of NamesrvController:
                    // Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
                    pair.getObject2().submit(requestTask);
                } catch (RejectedExecutionException e) {
                    
                }
            } else {
                String error = " request type " + cmd.getCode() + " not supported";
                final RemotingCommand response =
                    RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
            }
        }
    }
    
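    • NettyServerHandler#channelRead0 (NettyServerHandler is an inner class of NettyRemotingServer) ends up calling NettyRemotingAbstract#processMessageReceived.
    • NettyRemotingAbstract#processRequestCommand handles REQUEST_COMMAND messages.
    • processRequestCommand wraps the work in a task and submits it to an executor, which means namesrv processes requests on a thread pool rather than on the netty I/O thread.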
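    The lookup-then-submit pattern of processRequestCommand can be modelled without netty. The sketch below is a simplified illustration under stated assumptions: DispatchSketch, its Processor interface, and the String payloads are hypothetical stand-ins for NettyRequestProcessor and RemotingCommand, and the request code 105 is just an arbitrary number for the demo.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical model of the processor table; not the RocketMQ classes. */
final class DispatchSketch {
    interface Processor {
        String process(int code, String payload);
    }

    /** A processor together with the executor that runs it. */
    static final class Pair {
        final Processor processor;
        final ExecutorService executor;

        Pair(Processor processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Pair> processorTable = new HashMap<>();
    private Pair defaultProcessor;

    void registerProcessor(int code, Processor p, ExecutorService e) {
        processorTable.put(code, new Pair(p, e));
    }

    void registerDefaultProcessor(Processor p, ExecutorService e) {
        defaultProcessor = new Pair(p, e);
    }

    /** Returns null when neither a matching nor a default processor exists. */
    Future<String> dispatch(final int code, final String payload) {
        Pair matched = processorTable.get(code);
        final Pair pair = (matched == null) ? defaultProcessor : matched;
        if (pair == null) {
            return null; // the real server answers "request type ... not supported"
        }
        // The work runs on the processor's own pool, not on the calling thread.
        Callable<String> task = () -> pair.processor.process(code, payload);
        return pair.executor.submit(task);
    }

    /** Demo with only a default processor registered. */
    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            DispatchSketch server = new DispatchSketch();
            server.registerDefaultProcessor(
                (code, payload) -> "handled " + code + ":" + payload, pool);
            return server.dispatch(105, "TopicTest").get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

    Pairing each processor with its own executor lets different request types be isolated on different pools; in the listing above, every request falls through to the single default pair.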

    DefaultRequestProcessor

    public class DefaultRequestProcessor implements NettyRequestProcessor {
        private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    
        protected final NamesrvController namesrvController;
    
        public DefaultRequestProcessor(NamesrvController namesrvController) {
            this.namesrvController = namesrvController;
        }
    
        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
    
            if (ctx != null) {
                log.debug("receive request, {} {} {}",
                    request.getCode(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    request);
            }
    
    
            switch (request.getCode()) {
                case RequestCode.PUT_KV_CONFIG:
                    return this.putKVConfig(ctx, request);
    
                case RequestCode.GET_KV_CONFIG:
                    return this.getKVConfig(ctx, request);
    
                case RequestCode.DELETE_KV_CONFIG:
                    return this.deleteKVConfig(ctx, request);
    
                case RequestCode.QUERY_DATA_VERSION:
                    return queryBrokerTopicConfig(ctx, request);
    
                case RequestCode.REGISTER_BROKER:
                    Version brokerVersion = MQVersion.value2Version(request.getVersion());
                    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                        return this.registerBrokerWithFilterServer(ctx, request);
                    } else {
                        return this.registerBroker(ctx, request);
                    }
    
                case RequestCode.UNREGISTER_BROKER:
                    return this.unregisterBroker(ctx, request);
    
                case RequestCode.GET_ROUTEINTO_BY_TOPIC:
                    return this.getRouteInfoByTopic(ctx, request);
    
                case RequestCode.GET_BROKER_CLUSTER_INFO:
                    return this.getBrokerClusterInfo(ctx, request);
    
                case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                    return this.wipeWritePermOfBroker(ctx, request);
    
                case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                    return getAllTopicListFromNameserver(ctx, request);
    
                case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                    return deleteTopicInNamesrv(ctx, request);
    
                case RequestCode.GET_KVLIST_BY_NAMESPACE:
                    return this.getKVListByNamespace(ctx, request);
    
                case RequestCode.GET_TOPICS_BY_CLUSTER:
                    return this.getTopicsByCluster(ctx, request);
    
                case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
                    return this.getSystemTopicListFromNs(ctx, request);
    
                case RequestCode.GET_UNIT_TOPIC_LIST:
                    return this.getUnitTopicList(ctx, request);
    
                case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
                    return this.getHasUnitSubTopicList(ctx, request);
    
                case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
                    return this.getHasUnitSubUnUnitTopicList(ctx, request);
    
                case RequestCode.UPDATE_NAMESRV_CONFIG:
                    return this.updateConfig(ctx, request);
    
                case RequestCode.GET_NAMESRV_CONFIG:
                    return this.getConfig(ctx, request);
    
                default:
                    break;
            }
    
            return null;
        }
    }
    
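    • DefaultRequestProcessor holds the core processing logic of namesrv: it switches on the request code and delegates to one handler method per command.
    • The individual handler methods are left for the reader to explore as needed.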
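    As a rough illustration of the switch-on-code style, here is a minimal sketch of the k/v configuration commands backed by an in-memory map. Everything in it is an assumption made for the example: the class KvProcessorSketch, the string responses, and the request code values, which are placeholders and not the constants from RequestCode.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical k/v command handler; not the actual DefaultRequestProcessor. */
final class KvProcessorSketch {
    // Placeholder request codes for this sketch only; not taken from RequestCode.
    static final int PUT_KV = 1;
    static final int GET_KV = 2;
    static final int DELETE_KV = 3;

    // namespace -> (key -> value), held purely in memory
    private final Map<String, Map<String, String>> table = new HashMap<>();

    /** Returns a response string, or null for an unknown code (like the default branch). */
    synchronized String processRequest(int code, String namespace, String key, String value) {
        switch (code) {
            case PUT_KV:
                table.computeIfAbsent(namespace, ns -> new HashMap<>()).put(key, value);
                return "OK";
            case GET_KV: {
                Map<String, String> kv = table.get(namespace);
                String found = (kv == null) ? null : kv.get(key);
                return (found == null) ? "NOT_FOUND" : found;
            }
            case DELETE_KV: {
                Map<String, String> kv = table.get(namespace);
                if (kv != null) {
                    kv.remove(key);
                }
                return "OK";
            }
            default:
                return null;
        }
    }
}
```

    A call such as processRequest(KvProcessorSketch.PUT_KV, "ns", "k", "v") followed by a GET_KV on the same namespace and key returns the stored value; an unrecognized code yields null, mirroring the fall-through at the end of processRequest in the listing.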
