上一篇文章中说到要继续讨论关于“Registry”注册器的实现。然而我反悔了。注册器的实现涉及到了客户端程序,而客户端是属于consumer的部分。因此我决定将这个部分稍微放一放。这篇文章开始介绍“注册中心”的实现。
public static void main(String[] args) {
RegistryServer registryServer = RegistryServer.Default.createRegistryServer(20001, 1);
MonitorServer monitor = new MonitorServer(19998);
try {
monitor.setRegistryMonitor(registryServer);
monitor.start();
registryServer.startRegistryServer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
启动一个注册中心server很简单。注册中心中有一个监控“MonitorServer”,这个不仅仅在注册中心中有,在provider中也有。
public interface RegistryServer extends RegistryMonitor {
void startRegistryServer();
}
public interface RegistryMonitor {
/**
* Returns the address list of publisher.
*/
List<String> listPublisherHosts();
/**
* Returns the address list of subscriber.
*/
List<String> listSubscriberAddresses();
/**
* Returns to the service of all the specified service provider's address.
*/
List<String> listAddressesByService(String group, String serviceProviderName, String version);
/**
* Finds the address(host, port) of the corresponding node and returns all
* the service names it provides.
*/
List<String> listServicesByAddress(String host, int port);
}
注册中心抽象接口中只提供了一个操作--启动注册中心。其超级继承自Monitor,而这个Monitor提供的操作也很简单。默认的实现是由DefaultRegistryServer
来完成的。这个实例的创建并不是显示的,而是使用反射方式去实例化。在某些情况下不想使用默认的注册中心而是使用别的如zk,如果jupiter-registry-default
依赖包没有显示的添加进来,编译会直接报错的。这种思路在很多框架中都有体现。
注册中心的作用除了保存provider的一些信息外,还有一个重要的职责--监听注册信息的变化。当一个provider断线了,那么注册中心就得及时将这个provide发布的信息给清理掉(下线)并且告诉consumer:你订阅的服务不可用了。如果重连后,provider会重新发布服务信息,注册中心又得将这些变化告诉consumer:你订阅的服务又能使用了。对于consumer来说,当consumer断线了注册中心将consumer订阅的服务给清理掉,连接重建后又会自动订阅。
和provider一样,注册中心也是一个server。因此需要关心的核心逻辑其实就是handler的实现。
boot.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new IdleStateChecker(timer, JConstants.READER_IDLE_TIME_SECONDS, 0, 0),
idleStateTrigger,
new MessageDecoder(),
encoder,
ackEncoder,
handler);
}
});
这里还多了一个ackEncoder
编码器。
protected void encode(ChannelHandlerContext ctx, Acknowledge ack, ByteBuf out) throws Exception {
out.writeShort(JProtocolHeader.MAGIC)
.writeByte(JProtocolHeader.ACK)
.writeByte(0)
.writeLong(ack.sequence())
.writeInt(0);
}
从具体的实现逻辑中可以看出,这个编码器实际上仅仅只做了一件事:构造一个ack的消息包。
解码器的逻辑和之前provider的类似,只不过接收的消息格式类型不同。
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state()) {
case MAGIC:
checkMagic(in.readShort()); // MAGIC
checkpoint(State.SIGN);
case SIGN:
header.sign(in.readByte()); // 消息标志位
checkpoint(State.STATUS);
case STATUS:
in.readByte(); // no-op
checkpoint(State.ID);
case ID:
header.id(in.readLong()); // 消息id
checkpoint(State.BODY_SIZE);
case BODY_SIZE:
header.bodySize(in.readInt()); // 消息体长度
checkpoint(State.BODY);
case BODY:
byte s_code = header.serializerCode();
switch (header.messageCode()) {
case JProtocolHeader.HEARTBEAT:
break;
case JProtocolHeader.PUBLISH_SERVICE:
case JProtocolHeader.PUBLISH_CANCEL_SERVICE:
case JProtocolHeader.SUBSCRIBE_SERVICE:
case JProtocolHeader.OFFLINE_NOTICE: {
byte[] bytes = new byte[header.bodySize()];
in.readBytes(bytes);
Serializer serializer = SerializerFactory.getSerializer(s_code);
Message msg = serializer.readObject(bytes, Message.class);
msg.messageCode(header.messageCode());
out.add(msg);
break;
}
case JProtocolHeader.ACK:
out.add(new Acknowledge(header.id()));
break;
default:
throw IoSignals.ILLEGAL_SIGN;
}
checkpoint(State.MAGIC);
}
}
实际上有三种消息类型:心跳、发布/订阅和ack消息。对应的处理逻辑也不相同,心跳消息不处理,发布/订阅消息单独去构造消息实体,ack消息也如此,只不过内容非常简单。
将消息解码成特定对象之后就得对这些对象进行处理。也就是MessageHandler
要干的活儿。
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (msg instanceof Message) {
Message obj = (Message) msg;
switch (obj.messageCode()) {
case JProtocolHeader.PUBLISH_SERVICE:
case JProtocolHeader.PUBLISH_CANCEL_SERVICE:
RegisterMeta meta = (RegisterMeta) obj.data();
if (Strings.isNullOrEmpty(meta.getHost())) {
SocketAddress address = ch.remoteAddress();
if (address instanceof InetSocketAddress) {
meta.setHost(((InetSocketAddress) address).getAddress().getHostAddress());
} else {
logger.warn("Could not get remote host: {}, info: {}", ch, meta);
return;
}
}
if (obj.messageCode() == JProtocolHeader.PUBLISH_SERVICE) {
handlePublish(meta, ch);
} else if (obj.messageCode() == JProtocolHeader.PUBLISH_CANCEL_SERVICE) {
handlePublishCancel(meta, ch);
}
ch.writeAndFlush(new Acknowledge(obj.sequence())) // 回复ACK
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
break;
case JProtocolHeader.SUBSCRIBE_SERVICE:
handleSubscribe((RegisterMeta.ServiceMeta) obj.data(), ch);
ch.writeAndFlush(new Acknowledge(obj.sequence())) // 回复ACK
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
break;
case JProtocolHeader.OFFLINE_NOTICE:
handleOfflineNotice((RegisterMeta.Address) obj.data());
break;
}
} else if (msg instanceof Acknowledge) {
handleAcknowledge((Acknowledge) msg, ch);
} else {
logger.warn("Unexpected message type received: {}, channel: {}.", msg.getClass(), ch);
ReferenceCountUtil.release(msg);
}
}
这里处理的也只会有两种类型--ack消息包和Message消息包,也就是之前解码器的产物。
对于发布/取消发布服务操作来说,消息的发送者一定是provider,而消息的内容之前也说到,一定是RegisterMeta
类型的。不信可以看看代码:
public void publish(ServiceWrapper serviceWrapper) {
ServiceMetadata metadata = serviceWrapper.getMetadata();
RegisterMeta meta = new RegisterMeta();
meta.setPort(acceptor.boundPort());
meta.setGroup(metadata.getGroup());
meta.setServiceProviderName(metadata.getServiceProviderName());
meta.setVersion(metadata.getVersion());
meta.setWeight(serviceWrapper.getWeight());
meta.setConnCount(JConstants.SUGGESTED_CONNECTION_COUNT);
registryService.register(meta);
}
因此这里使用强制类型转换是没有问题的。可以看到,provider发布注册消息的时候并没有将address给set进去,因此这里是通过channel来获取provider的host。没有一点毛病。我猜是因为provider获取自己的host比较麻烦,所以干脆直接让注册中心来处理这件事情。
接下来就是区分是发布还是取消发布。没有直接在case中判断可能是因为如果使用那种方式上面的代码得重写一遍,也很累人。干脆直接用if来判断得了。处理完了之后还得发送一个ack的消息到provider/consumer。这种逻辑也是能够理解的。作为注册中心肯定是有责任告诉provider/consumer:你们的请求都完成了。不然provider/consumer怎么能保证自己发布/订阅的消息是不是成功的被注册中心给处理了呢?作为注册中心,还是得让人信任的。
看看发布服务的具体逻辑。
private void handlePublish(RegisterMeta meta, Channel channel) {
logger.info("Publish {} on channel{}.", meta, channel);
attachPublishEventOnChannel(meta, channel);
final RegisterMeta.ServiceMeta serviceMeta = meta.getServiceMeta();
ConfigWithVersion<ConcurrentMap<RegisterMeta.Address, RegisterMeta>> config =
registerInfoContext.getRegisterMeta(serviceMeta);
synchronized (registerInfoContext.publishLock(config)) {
// putIfAbsent和config.newVersion()需要是原子操作, 所以这里加锁
if (config.getConfig().putIfAbsent(meta.getAddress(), meta) == null) {
registerInfoContext.getServiceMeta(meta.getAddress()).add(serviceMeta);
final Message msg = new Message(serializerType.value());
msg.messageCode(JProtocolHeader.PUBLISH_SERVICE);
msg.version(config.newVersion()); // 版本号+1
msg.data(Pair.of(serviceMeta, meta));
subscriberChannels.writeAndFlush(msg, new ChannelMatcher() {
@Override
public boolean matches(Channel channel) {
boolean doSend = isChannelSubscribeOnServiceMeta(serviceMeta, channel);
if (doSend) {
MessageNonAck msgNonAck = new MessageNonAck(serviceMeta, msg, channel);
// 收到ack后会移除当前key(参见handleAcknowledge), 否则超时超时重发
messagesNonAck.put(msgNonAck.id, msgNonAck);
}
return doSend;
}
});
}
}
}
这句代码attachPublishEventOnChannel(meta, channel);
的方法名直译过来意思就是在channel上绑定发布服务的信息。具体实现如下:
private static final AttributeKey<ConcurrentSet<RegisterMeta>> S_PUBLISH_KEY =
AttributeKey.valueOf("server.published");
private static boolean attachPublishEventOnChannel(RegisterMeta meta, Channel channel) {
Attribute<ConcurrentSet<RegisterMeta>> attr = channel.attr(S_PUBLISH_KEY);
ConcurrentSet<RegisterMeta> registerMetaSet = attr.get();
if (registerMetaSet == null) {
ConcurrentSet<RegisterMeta> newRegisterMetaSet = new ConcurrentSet<>();
registerMetaSet = attr.setIfAbsent(newRegisterMetaSet);
if (registerMetaSet == null) {
registerMetaSet = newRegisterMetaSet;
}
}
return registerMetaSet.add(meta);
}
这样就很容易理解了,意思就是一个channel上是能够保存信息的,怎么保存呢?通过S_PUBLISH_KEY
,就将这个玩意作为一个记号,刻在channel上,这个记号是能够指定类型的,这里指定的是ConcurrentSet
类型,当然也能指定别的如String、Integer啥的。然后就是一段骚操作--将这个类型给实例化出来,并且将meta消息给添加进去。第一次在这个channel去获取S_PUBLISH_KEY
标记的内容,肯定是不存在的,那么就得将其中的内容给实例化出来,单线程环境什么都好说,简单粗暴去赋值即可,但是在并发情况下鬼知道啥时候某个线程就悄咪咪的把这个标记好的set给实例化了,而这时候你却不知道,再去实例化赋值一次,这就产生了并发问题。这里的操作是使用setIfAbsent,如果这个标记中已经有了,那就直接返回这个存在的值,没有就将其set进去,并返回null。
“打标记”这种概念很抽象,很纳闷,为什么一个channel能够被打上标记。其实也容易理解,无非就是netty将这个channel进行单独的“处理”。从本质上来说在服务端的内存中分配了一块区域,与channel相关联。这个channel也比较抽象,不容易理解,其实就是一个“连接”,再具体点就是一个客户端。具体的实现细节得去查阅netty的源码。
接下来使用一个config
变量来保存address和RegisterMeta的映射.这个config是通过serviceMeta
来获取的,也就是通过服务名来获取映射--指定的服务有哪些节点注册。这里同时也得维护节点-服务的映射关系。最后构造消息报文,这个报文发给所有的订阅者。
仅仅发送出消息还不算完事
private final ConcurrentMap<String, MessageNonAck> messagesNonAck = Maps.newConcurrentMap();
这里还维护一个map变量,保存着“未回应”的消息。这么做的目的也是为了保证可靠。也就是说,当provider发布一条注册信息给注册中心了,注册中心得回应provider:我收到了你的请求了。同时也得处理这个注册消息--告诉订阅者,但是不能一直等着订阅者回复确认收到的消息。于是在本地保存一个“未回应”消息,当收到订阅者的响应报文时再将这些“未回应”报文给移除掉。当然也得考虑永远收不到或者很长时间都收不到的情况,内部有一个守护线程,定时去清理超时的消息。不然的话这个map肯定会爆炸,oom肯定是不允许发生的。
至于取消发布的逻辑基本上和发布一样,只不过是反着来的,一个是添加,一个是移除而已。不啰嗦了。
服务订阅和服务下线留到下篇接着讲。
网友评论