namesrv三部曲
开篇
-
本文基于rocketmq-all-4.6.1的版本进行分析,主要分析rocketmq的namesrv功能,namesrv的核心功能包括启动流程、元数据存储、以及交互流程。
-
这篇文章主要是分析namesrv的交互流程。
namesrv的定位
- namesrv的定位是作为注册中心,保存broker节点的路由信息,保存一些简单的k/v配置信息。
- namesrv支持集群模式,但是每个namesrv之间相互独立不进行任何通信,它的多点容灾通过producer/consumer在访问namesrv的时候轮询获取信息(当前节点访问失败就转向下一个)。
- namesrv作为注册中心,负责接收broker定期的注册信息并维持在内存当中,没错namesrv是没有持久化功能的,所有数据都保存在内存当中,broker的注册过程也是循环遍历所有namesrv进行注册。
- namesrv提供对外接口给producer和consumer访问broker的路由信息,底层通过netty来实现。
- namesrv对broker的存活检测机制:心跳机制即namesrv作为broker的server端定期接收broker的心跳信息,超时无心跳就移除broker;连接异常检测机制即底层通过epoll的消息机制来检测连接的断开。
namesrv的交互流程
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
prepareSharableHandlers();
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
}
}
- NettyRemotingServer#start()方法启动NettyServer监听并指定对应的handler。
- NettyDecoder用于消息的编解码。
- serverHandler(实际为NettyRemotingServer),用于处理消息请求和响应。
NettyDecoder
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final int FRAME_MAX_LENGTH =
Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
public NettyDecoder() {
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = null;
try {
frame = (ByteBuf) super.decode(ctx, in);
if (null == frame) {
return null;
}
ByteBuffer byteBuffer = frame.nioBuffer();
return RemotingCommand.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
} finally {
if (null != frame) {
frame.release();
}
}
return null;
}
}
public class RemotingCommand {
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();
// 前4个字节指定的是HeaderLen
int oriHeaderLen = byteBuffer.getInt();
int headerLength = getHeaderLength(oriHeaderLen);
// 获取HeaderData
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
// 解析RemotingCommand对象
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
// 绑定RemotingCommand的body
cmd.body = bodyData;
return cmd;
}
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
switch (type) {
case JSON:
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
resultJson.setSerializeTypeCurrentRPC(type);
return resultJson;
case ROCKETMQ:
RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
resultRMQ.setSerializeTypeCurrentRPC(type);
return resultRMQ;
default:
break;
}
return null;
}
}
- NettyDecoder#decode真正执行的是RemotingCommand#decode方法。
- NettyDecoder解析按照先header后body的方法解析成RemotingCommand对象。
NettyRemotingServer
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}
}
public abstract class NettyRemotingAbstract {
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
// pair.getObject1()指的是DefaultRequestProcessor
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
} catch (Throwable e) {
}
}
};
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
// pair.getObject2()是指NamesrvController的remotingExecutor
// Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
}
- NettyRemotingServer#channelRead0最终执行NettyRemotingAbstract的
processMessageReceived方法。 - NettyRemotingAbstract#processRequestCommand用于处理REQUEST_COMMAND命令。
- processRequestCommand是通过submit提交线程任务来实现的,也就是说namesrv处理任务是通过线程池来实现的。
DefaultRequestProcessor
public class DefaultRequestProcessor implements NettyRequestProcessor {
private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
protected final NamesrvController namesrvController;
public DefaultRequestProcessor(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
return null;
}
}
DefaultRequestProcessor
- DefaultRequestProcessor作为namesrv的核心处理逻辑,提供了各类方法处理命令方法。
- DefaultRequestProcessor的各类处理方法按需求自行研究。
网友评论