1.手写模拟Dubbo
- 定义一个接口,服务提供者需要实现该接口。该接口需要打成一个jar包,然后服务提供者和调用者都要依赖该接口的jar包。
- Dubbo需要处理Http请求(HttpServer)、Dubbo请求
- 服务提供者需要启动HttpServer,比如Tomcat还要添加DispatchServlet
- DispatchServlet#service调用Handler处理请求,服务调用者RPC调用服务提供者,需要传递的参数包括(封装成Invocation,序列化与反序列化统一为JDK序列化方式):
接口名
方法名
方法参数类型列表
方法参数值列表
版本号 - 服务提供者从请求里面拿到Invocation对象后,需要找到对应接口实现类的执行方法。那到底可以调用哪些实现类呢?服务提供者这边需要对可以向外暴露的实现类进行本地注册。
然后根据Invocation中的接口拿到实现类,然后使用Invocation方法名、参数调用实现类对应的方法。 - 如果服务提供者对应接口有多个版本的实现类,则Invocation需要加入版本号,然后通过指定版本号执行指定版本的实现类。
- 服务调用者需要远程调用,比如HttpClient,对Invocation进行JDK序列化,发送请求,然后接收响应。
- (服务调用者升级)可直接拿到接口的动态代理实现类,然后直接通过该实现类进行RPC调用。把RPC调用逻辑封装到该代理实现类里面。ProxyFactory#getProxy获取JDK动态代理,将RPC逻辑封装到InvocationHandler#invoke里面去。
- (注册中心+负载均衡)服务提供者地址:ip + port需要可配置,而不是写死。需要根据服务名称可以直接获取到ip + port。这其实就是注册中心的功能,服务提供者需要向注册中心进行远程注册,存入Map<接口名,List<URL>>。然后服务调用者从注册中心拉取服务提供者信息,然后进行负载均衡选取一个服务提供者的URL,然后进行RPC调用。
- (服务容错+重试服务)调用服务出错,调用指定的容错类方法,该方法的逻辑非常灵活,可以返回错误说明等。某个服务调用失败,可以进行指定次数的重试,下次重试调用要避开失败的服务实例。
- (Mock机制)模拟、伪造服务提供者。
- (支持多协议)比如支持Tomcat以及Netty。抽象出一个接口Protocol,start()启动协议,send()基于协议发送数据。然后DubboProtocol(基于Netty)和HttpProtocol(基于Tomcat)都要实现Protocol接口。然后再提供一个ProtocolFactory工厂,根据传入的类型创建对应的协议。最后通过配置动态切换协议,程序代码不用变。(更进一步)服务提供者向注册中心注册的时候,带上协议,也即URL里面加入一个protocol。服务调用者可以根据拉取到的服务实例中URL的protocol创建对应的协议进行RPC调用。
- (优化升级,向Dubbo靠拢)
1)URL除了protocol,hostname,port,再增加两个信息:interfaceName以及implClass。
2)Protocol接口更改:export和refer方法。
3)实现类DubboProtocol和HttpProtocol。
服务提供者调用DubboProtocol#export向本地以及远程注册URL,并且启动Netty;服务调用者调用DubboProtocol#refer生成DubboInvoker对象。
HttpProtocol#export向本地及远程注册URL,并且启动HttpServer;HttpProtocol#refer生成HttpInvoker对象。
4)Invoker接口:invoke(Invocation);
5)DubboInvoker及HttpInvoker实现Invoker接口。DubboInvoker通过netty实现调用,参数包括url及Invocation。HttpInvoker通过HttpClient实现调用,参数包括url及Invocation。
6)CulsterInvoker也实现了Invoker接口,持有List<Invoker>。CulsterInvoker#join,根据服务接口名称获取List<URL>,然后对每一个URL生成对应的Invoker,加入到List<Invoker>里面。然后在CulsterInvoker#invoke里面进行负载均衡调用。
总结:
- 1)核心是Protocol
- 2)服务提供者,构建URL,然后根据URL中的Protocol获取实际的Protocol,调用其export进行服务导出。向本地以及远程注册URL,并且启动相应的服务器(Netty或者Tomcat)。
- 3)服务调用者,动态代理,其InvocationHandler#invoke中首先创建Invocation,然后根据Invocation中的服务接口名从注册中心拿到对应的List<URL>,然后对每个URL根据其协议生成对应的Invoker(DubboInvoker或者HttpInvoker)。Invoker会根据对应的协议使用对应的客户端进行RPC调用,传递的参数就是Invocation。
- 4)服务提供者,NettyServerHandler#channelRead、HttpServerHandler#handle会从参数Invocation根据服务接口名、版本号获取对应的实现服务(本地服务注册表),然后根据方法名、方法参数进行实际调用。
2.服务导出
几个核心步骤:
- 1)构建服务URL(协议、ip、port)
- 2)启动Tomcat/Netty,要先启动,后注册,否则调用端拉到URL,提供端没启动,就没法调通
- 3)向注册中心注册(接口名:URL)
Dubbo接口级的服务注册 vs SpringCloud应用级的服务注册:
@EnableDubbo
-> @EnableDubboConfig
-> @Import(DubboConfigConfigurationRegistrar.class)
-> DubboConfigConfigurationRegistrar是个ImportBeanDefinitionRegistrar,调用registerBeanDefinitions()
-> DubboSpringInitializer#initialize
-> DubboBeanUtils#registerCommonBeans
-> 注册DubboDeployApplicationListener监听器监听ContextRefreshedEvent事件。
-> DubboDeployApplicationListener#onContextRefreshedEvent
-> DefaultModuleDeployer#start
1)服务导出exportServices();
2)服务引入referServices();
3)应用级注册onModuleStarted();
DubboDeployApplicationListener#onApplicationEvent,监听ContextRefreshedEvent事件。
public void onApplicationEvent(ApplicationContextEvent event) {
if (nullSafeEquals(applicationContext, event.getSource())) {
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
}
2.1 Dubbo3.0之前接口级的服务注册
注册过程:
- 1)@DubboService,解析注解得到实现类UserServiceImpl和接口UserService,生成一个对象ServiceConfig(包含UserService、UserServiceImpl、version等)
- 2)ServiceConfig#export进行服务导出
2-1)确定协议(应用配置),生成URL;
2-2)启动Netty、Jetty;
2-3)URL存到注册中心;
应用的每个接口都会注册到注册中心里面:
2.2 Dubbo3.0应用级的服务注册
Dubbo3.0为了兼容之前版本,既会进行接口级服务注册,也会进行应用级的服务注册。
注册过程:
- 1)所有接口服务注册完成;(可以关掉register-mode:instance,只进行应用级注册);
- 2)所有接口会向注册中心mapping注册一个<接口名:实例名(应用名)>
- 3)(默认没有进行注册,metadata-type改为remote才启用)会将应用元数据信息(应用提供了哪些Dubbo服务接口)存到元数据中心的metadata(元数据中心和注册中心可以共用一个,也可以分开);(metadata-type为local启动服务接口,在这里进行导出)另外一个方法,服务提供者会提供一个元数据Dubbo服务接口MetadataService,消费者可以调用该服务的getMetadataInfo获取元数据信息。
- 4)然后才进行应用级注册,只用注册一次,应用名: 实例ip+port,这里port默认取Dubbo协议端口,如果没有Dubbo则取第一个协议端口(ServiceInstanceHostPortCustomizer#customize);这里还会存值,包括dubbo.endpoints(port, protocol)。
如果服务暴露了Triple协议+20880端口,并且metadata-type为local,则应用级注册为:ip + 20881(优先是Dubbo的ip+port,这里是MetadataService)。因为此时MetadataService会暴露为Dubbo协议+20881(默认的20880被占用)。
怎么判断端口是否被占用?
- MetadataService -> ServiceConfig#export -> NetUtils#getAvailablePort会对port进行遍历,new ServerSocket(i),如果抛异常,则表示被占用,则尝试i++。
只根据应用级注册的信息,服务消费者(调用者)怎么办?
- 1)根据服务名(接口)可以拿到实例 ip+port。有两个问题,问题一,你怎么根据接口名找到应用名(实例名);问题二,你怎么确定该服务提供者是否导出了?
- 2)问题一,就是通过注册时,注册<接口名:实例名(应用名)>解决;
- 3)问题二,通过元数据中心metadata可以查到该应用实例提供了哪些接口服务、协议、超时时间(然后就可以生成对应的Invoker,比如DubboInvoker、TripleInvoker)
那这里,如果应用配置相同的协议(dubbo)有两个端口,在dubbo.endpoints里怎么处理?
- 只会存一个,这里是个bug,放到Map<Protocol, Port>,这里会覆盖。
- ProtocolPortsMetadataCustomizer#customize
- 因此服务消费者只会生成一个URL,但是服务导出有两个URL。
如果配置Triple协议,有两个端口,强制走接口FORCE_INTERFACE,会有另外一个Bug,也只会调用一个端口,另一个端口无法调用。
- ServiceConfig#doExportUrl,根据接口、实现类对象ref、服务URL生成一个Invoker;JavassistProxyFactory,执行服务接口方法时,最后都会调用到具体实现类ref的相应方法。
- TripleProtocol#export服务导出时,会进行本地注册,放到path2Invoker.put(path, invoker),接口名字 -> invoker(实现类),这里如果协议相同、端口不同,第二个URL生成的Invoker会覆盖第一个URL的Invoker,本地注册有问题。
- 那DubboProtocol#export为什么没有问题,因为本地注册时,它的key生成时,包含了端口号,而不仅仅是接口名字。
元数据中心特点:
- 1)数据不怎么变化
- 2)这里面的数据跟服务消费者(调用者)没有太大的关系
如上图,应用级注册的值里面也会标识metadata是local还是remote,也就是指示服务消费者是到元数据中心查询还是通过Dubbo调用服务提供者的MetadataService来获取元数据信息。
另外还有一个重要应用,通过元数据信息只能查到<接口/服务, 协议>,是没有端口的,那这个端口在哪里?就在应用级注册的值里面:
- metadata -> dubbo.endpoints里面,会有port以及对应的协议。
1)元数据默认方式:
-
服务提供者提供MetadataService,会向注册中心进行注册,用的老一套的接口级注册
2)元数据向元数据中心注册的方式:
3.服务引入
@DubboReference
-> ReferenceConfig
-> ReferenceConfig#get 生成代理对象
-> RegistryProtocol#refer
-> RegistryProtocol#doRefer 生成MigrationInvoker
A)FORCE_INTERFACE,强制使用接口级服务引入
B)FORCE_APPLICATION,强制使用应用级服务引入
C)APPLICATION_FIRST,智能选择是接口级还是应用级,默认就是这个
总结一下整体服务消费者(调用者)的查找流程:
- step1.根据服务名/接口名DemoService,去dubbo/mapping查找应用名
- step2.根据应用名dubbo-springboot-demo-provider去应用级注册信息services查找实例,可能有多个(192.168.65.61:20881),然后找出实例值里面的dubbo.metadata.storage-type(local后者remote)以及dubbo.endpoints(port, protocol)。
- step3.根据dubbo.metadata.storage-type查找实例元数据信息(远程就是元数据中心,如果共用zk,就是dubbo/metadata;如果是local,就是调用dubbo服务MetadataService),获取到服务(接口)对应的协议,例如DemoService:tri
- step4.找到服务接口 + 协议后,根据step2中获取到的dubbo.endpoints,就能获取到服务接口+协议对应的端口
- step5.最后遍历所有实例,生成URL(跟dubbo2.7一样),然后过滤(比如@DubboReference(protocol = "dubbo")),最终生成对应的Invoker(TripleInvoke和DubooInvoker),包装成ClusterInvoker,最后生成接口DemoServcie的代理对象。
tri://实例ip:对应端口/DemoService
dubbo://实例ip:对应端口/DemoService - step6.调用
ClusterInvoker#invoke -> TripleInvoker#invoke -> 使用指定的协议发送请求到服务提供者:实例ip + triple协议端口
ServiceDiscoveryRegistryDirectory#subscribe
-> ServiceDiscoveryRegistry#doSubscribe
-> serviceNameMapping.getAndListen(),获取接口对应的应用名
-> serviceDiscovery.getInstances() 根据应用名,从/services/应用名节点查出所有实例
-> serviceDiscovery.getRemoteMetadata(),获取应用的元数据(从元数据中心或元数据服务获取)
服务消费者监听什么变化?
- 监听 /dubbo/services/应用名节点 下面的变化,就是服务实例重启后发生的变化
- ServiceInstancesChangedListener#doOnEvent
调用serviceDiscovery.getRemoteMetadata(),重新去获取应用的元数据 - 另外缓存metaCacheManager有过期机制,会更新
MigrationInvoker
# dubbo.application.service-discovery.migration 仅支持通过 -D 以及 全局配置中心 两种方式进行配置。
dubbo.application.service-discovery.migration=APPLICATION_FIRST
# 可选值
# FORCE_INTERFACE,强制使用接口级服务引入
# FORCE_APPLICATION,强制使用应用级服务引入
# APPLICATION_FIRST,智能选择是接口级还是应用级,默认就是这个
事实上,在进行某个服务的服务引入时,会统一利用InterfaceCompatibleRegistryProtocol的refer来生成一个MigrationInvoker对象,在MigrationInvoker中有三个属性:
private volatile ClusterInvoker<T> invoker; // 用来记录接口级ClusterInvoker
private volatile ClusterInvoker<T> serviceDiscoveryInvoker; // 用来记录应用级的ClusterInvoker
private volatile ClusterInvoker<T> currentAvailableInvoker; // 用来记录当前使用的ClusterInvoker,要么是接口级,要么应用级
RegistryProtocol#doRefer
- migrationInvoke = getMigrationInvoker()
- RegistryProtocol#interceptInvoker()
-> listener.onRefer()
-> MigrationRuleListener#onRefer
-> MigrationRuleHandler#doMigrate
-> MigrationInvoker#migrateToApplicationFirstInvoker
-> refreshInterfaceInvoker() 接口级ClusterInvoker (2.7版本)
refreshServiceDiscoveryInvoker 应用级ClusterInvoker (3.0版本)
calcPreferredInvoker()选择哪个ClusterInvoker的逻辑
MigrationInvoker#invoke
@Override
public Result invoke(Invocation invocation) throws RpcException {
// currentAvailableInvoker要么是接口级ClusterInvoker,要么是应用级ClusterInvoker
if (currentAvailableInvoker != null) {
if (step == APPLICATION_FIRST) {
// call ratio calculation based on random value
// 在同时支持接口级和应用级的情况下,如果promotion小于100,则每次调用时,生成一个100以内的随机数,如果随机数大于promotion,则走接口级ClusterInvoker进行服务调用
// 表示支持部分走接口级调用,部分走应用级调用,看随机数
// promotion默认等于100,所以默认不会支持部分
if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion) {
return invoker.invoke(invocation);
}
}
return currentAvailableInvoker.invoke(invocation);
}
switch (step) {
case APPLICATION_FIRST:
if (checkInvokerAvailable(serviceDiscoveryInvoker)) {
currentAvailableInvoker = serviceDiscoveryInvoker;
} else if (checkInvokerAvailable(invoker)) {
currentAvailableInvoker = invoker;
} else {
currentAvailableInvoker = serviceDiscoveryInvoker;
}
break;
case FORCE_APPLICATION:
currentAvailableInvoker = serviceDiscoveryInvoker;
break;
case FORCE_INTERFACE:
default:
currentAvailableInvoker = invoker;
}
return currentAvailableInvoker.invoke(invocation);
}
4.服务调用
4.1 服务导出TripleProtocol协议的流程
TripleProtocol#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
checkProtobufVersion(url);
// 本地注册--->实例类
String key = serviceKey(url);
// 服务导出器,用来卸载服务时做一些善后处理
final AbstractExporter<T> exporter = new AbstractExporter<T>(invoker) {
@Override
public void afterUnExport() {
pathResolver.remove(url.getServiceKey());
pathResolver.add(url.getServiceModel().getServiceModel().getInterfaceName(),
invoker);
// set service status
triBuiltinService.getHealthStatusManager()
.setStatus(url.getServiceKey(), ServingStatus.NOT_SERVING);
triBuiltinService.getHealthStatusManager()
.setStatus(url.getServiceInterface(), ServingStatus.NOT_SERVING);
exporterMap.remove(key);
}
};
exporterMap.put(key, exporter);
invokers.add(invoker);
pathResolver.add(url.getServiceKey(), invoker); // url 20882
pathResolver.add(url.getServiceModel().getServiceModel().getInterfaceName(), invoker);
// set service status
triBuiltinService.getHealthStatusManager()
.setStatus(url.getServiceKey(), HealthCheckResponse.ServingStatus.SERVING);
triBuiltinService.getHealthStatusManager()
.setStatus(url.getServiceInterface(), HealthCheckResponse.ServingStatus.SERVING);
// 启动服务器,用来处理HTTP2的请求
PortUnificationExchanger.bind(invoker.getUrl());
return exporter;
}
PortUnificationExchanger#bind
public static void bind(URL url) {
// servers表示可以同时运行多个PortUnificationServer,只需要绑定的host+port不一样即可
servers.computeIfAbsent(url.getAddress(), addr -> {
final PortUnificationServer server = new PortUnificationServer(url);
// 运行NettyServer,并绑定ip和port
server.bind();
return server;
});
}
public void bind() {
if (channel == null) {
doOpen();
}
}
protected void doOpen() {
bootstrap = new ServerBootstrap();
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
EVENT_LOOP_WORKER_POOL_NAME);
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
// ch是Socket连接
final ChannelPipeline p = ch.pipeline();
// p.addLast(new LoggingHandler(LogLevel.DEBUG));
final boolean enableSsl = getUrl().getParameter(SSL_ENABLED_KEY, false);
if (enableSsl) {
p.addLast("negotiation-ssl", new SslServerTlsHandler(getUrl()));
}
// 初始化SocketChannel,并在pipeline中绑定PortUnificationServerHandler
final PortUnificationServerHandler puHandler = new PortUnificationServerHandler(url, protocols);
p.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS));
p.addLast("negotiation-protocol", puHandler);
channelGroup = puHandler.getChannels();
}
});
// bind
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
InetSocketAddress bindAddress = new InetSocketAddress(bindIp, bindPort);
ChannelFuture channelFuture = bootstrap.bind(bindAddress);
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
核心是PortUnificationServerHandler。
ByteToMessageDecoder#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
first = cumulation == null;
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
firedChannelRead |= out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
} finally {
out.recycle();
}
}
} else {
ctx.fireChannelRead(msg);
}
}
ByteToMessageDecoder#callDecode
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
decode(ctx, in, out);
} finally {
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
fireChannelRead(ctx, out, out.size());
out.clear();
handlerRemoved(ctx);
}
}
}
这里会调用PortUnificationServerHandler#decode
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// ctx.channel() 拿到的是NioSocketChannel 接收到数据ByteBuf后,会先解码,然后再触发Handler的channelRead
// 根据前5个字节确定对应的协议
// Will use the first five bytes to detect a protocol.
if (in.readableBytes() < 5) {
return;
}
// 看是不是HTTP2.0
for (final WireProtocol protocol : protocols) {
in.markReaderIndex();
final ProtocolDetector.Result result = protocol.detector().detect(ctx, in);
in.resetReaderIndex();
switch (result) {
case UNRECOGNIZED:
continue;
case RECOGNIZED:
// 符合个某个协议后,再給Socket连接对应的pipeline绑定Handler
protocol.configServerPipeline(url, ctx.pipeline(), sslCtx);
ctx.pipeline().remove(this);
case NEED_MORE_DATA:
return;
default:
return;
}
}
// Unknown protocol; discard everything and close the connection.
in.clear();
ctx.close();
}
总结一下后续流程:
- Http2ProtocolDetector#detect,如何要建立的是一个HTTP2连接,那么在建立完Socket连接后,客户端会发送一个连接前言,也就是一串字节(对应的字符串为:“PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n”),给到服务端,服务端从而知道要建立的是一个HTTP2的连接
- 符合HTTP2协议后,会删除本身的Handler,通过TripleHttp2Protocol#configServerPipeline添加处理Triple协议的Handler
1)Http2FrameCodec,配置HTTP2,比如max-concurrent-streams、max-frame-size;
2)TripleServerConnectionHandler用来处理Http2PingFrame、Http2GoAwayFrame;
3)Http2MultiplexHandler是用来创建子Channel的,并且ChannelInitializer是用来初始化子Channel,一个Socket连接对应一个NioSocketChannel,下层可以设置多个子Channel,每个子Channel对应一个HTTP2Stream(TripleCommandOutBoundHandler:将QueuedCommand转换成Http2StreamFrame,然后再发出去;TripleHttp2FrameServerHandler:处理的是HTTP2Stream所对应的子Channel,核心Handler, 用来处理Http2HeadersFrame、Http2DataFrame, lookupExecutor会根据服务url得到一个线程池,每个子Channel对应一个线程池?还是共享一个线程池?默认是共享一个。)
4)TripleTailHandler释放ByteBuf的内存空间
重点看一下TripleHttp2Protocol#configServerPipeline
// Http2MultiplexHandler是用来创建子Channel的,并且ChannelInitializer是用来初始化子Channel
// 一个Socket连接对应一个NioSocketChannel,下层可以设置多个子Channel,每个子Channel对应一个HTTP2Stream
final Http2MultiplexHandler handler = new Http2MultiplexHandler(
// Channel初始化器,用来初始化传入进来的Channel,比如HTTP2Stream所对应的Channel
new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
final ChannelPipeline p = ch.pipeline();
// 将QueuedCommand转换成Http2StreamFrame,然后再发出去
p.addLast(new TripleCommandOutBoundHandler());
// TripleHttp2FrameServerHandler处理的是HTTP2Stream所对应的子Channel
// 核心Handler, 用来处理Http2HeadersFrame、Http2DataFrame, lookupExecutor会根据服务url得到一个线程池
// 每个子Channel对应一个线程池?还是共享一个线程池?默认是共享一个
p.addLast(new TripleHttp2FrameServerHandler(frameworkModel, lookupExecutor(url),
filters));
}
});
核心是TripleHttp2FrameServerHandler。
4.2 服务引入TripleProtocol协议的流程
public TripleInvoker(Class<T> serviceType,
URL url,
String acceptEncodings,
ConnectionManager connectionManager,
Set<Invoker<?>> invokers,
ExecutorService streamExecutor) {
super(serviceType, url, new String[]{INTERFACE_KEY, GROUP_KEY, TOKEN_KEY});
this.invokers = invokers;
// 与服务提供者建立Socket连接
this.connection = connectionManager.connect(url);
this.acceptEncodings = acceptEncodings;
this.streamExecutor = streamExecutor;
}
MultiplexProtocolConnectionManager#connect
public Connection connect(URL url) {
// 协议相同的URL将对应同一个ConnectionManager
final ConnectionManager manager = protocols.computeIfAbsent(url.getProtocol(), this::createSingleProtocolConnectionManager);
return manager.connect(url);
}
SingleProtocolConnectionManager#connect
public Connection connect(URL url) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
// 不同的address对应不同的Connection对象
return connections.compute(url.getAddress(), (address, conn) -> {
if (conn == null) {
final Connection created = new Connection(url);
created.getClosePromise().addListener(future -> connections.remove(address, created));
return created;
} else {
conn.retain();
return conn;
}
});
}
public Connection(URL url) {
url = ExecutorUtil.setThreadName(url, "DubboClientHandler");
url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
this.url = url;
this.protocol = ExtensionLoader.getExtensionLoader(WireProtocol.class).getExtension(url.getProtocol());
this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
this.remote = getConnectAddress();
// 只是创建一个Bootstrap对象,并不会建立Socket连接
this.bootstrap = create();
}
private Bootstrap create() {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(NettyEventLoopFactory.NIO_EVENT_LOOP_GROUP.get())
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.remoteAddress(remote)
.channel(socketChannelClass());
final ConnectionHandler connectionHandler = new ConnectionHandler(this);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
final ChannelPipeline pipeline = ch.pipeline();
SslContext sslContext = null;
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
pipeline.addLast("negotiation", new SslClientTlsHandler(url));
}
//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
// TODO support IDLE
// int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
pipeline.addLast(connectionHandler);
protocol.configClientPipeline(url, pipeline, sslContext);
// TODO support Socks5
}
});
return bootstrap;
}
构造TripleInvoker,创建connection只是创建一个Bootstrap对象,并不会建立Socket连接。
4.3 调用逻辑
4.3.1 服务消费者发送请求
那Socket连接什么时候创建呢?
- 第一次使用TripleInvoker#invoke
-> TripleInvoker#doInvoke
-> Connection#isAvailable
-> Connection#connect
-> Bootstrap#connect()
StreamObserver<String> streamObserver = demoService.sayHelloBiStream(new ZhouyuResultStreamObserver());
streamObserver.onNext("zhouyu1");
streamObserver.onCompleted();
TripleInvoker#doInvoke
- 1)检查Socket连接是否可用,如果不可用并且没有初始化,那就连接服务端创建Socket连接
- 2)拿到服务接口信息和当前调用的方法信息
- 3)根据方法区分调用类型
UNARY -> invokeUnary();
SERVER_STREAM -> invokeServerStream();
CLIENT_STREAM/ BI_STREAM -> invokeBiOrClientStream();
TripleInvoker#invokeBiOrClientStream
- 1)拿到处理响应的responseObserver,也就是上面调用方法时传入的ZhouyuResultStreamObserver;
- 2)创建requestObserver,streamCall方法中就会创建一个Stream,并且返回一个StreamObserver对象,可以利用这个requestObserver来向Stream中发送数据,将responseObserver封装为ClientCall.Listener,ClientCall.Listener是用来接收响应数据的。
TripleInvoker#streamCall
- 将responseObserver封装为ClientCall.Listener
- call.start()返回一个ClientStreamObserver,用来发送数据
ClientCall#start
- this.stream = new ClientStream(),在构造方法里面, this.writeQueue = createWriteQueue(parent);
ClientStream#createWriteQueue
private WriteQueue createWriteQueue(Channel parent) {
// 利用Netty开启一个Http2StreamChannel,也就是HTTP2中的流
final Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(parent);
final Future<Http2StreamChannel> future = bootstrap.open().syncUninterruptibly();
if (!future.isSuccess()) {
throw new IllegalStateException("Create remote stream failed. channel:" + parent);
}
// 并绑定两个Handler,一个工作在发送数据时,一个工作在接收数据时
final Http2StreamChannel channel = future.getNow();
channel.pipeline()
.addLast(new TripleCommandOutBoundHandler())
// TripleHttp2ClientResponseHandler是用来接收响应的
.addLast(new TripleHttp2ClientResponseHandler(createTransportListener()));
// 基于Http2StreamChannel创建一个WriteQueue
// 后续把要发送的数据,添加到WriteQueue中就能发送出去了
return new WriteQueue(channel);
}
当用户程序调用streamObserver.onNext()发送数据时,实际调用的是:
- ClientCallToObserverAdapter#onNext
- ClientCall#sendMessage
1)发送请求头
2)发送请求体
把要发送的message进行序列化,得到字节数组;
看是否需要压缩数据;
stream.writeMessage(compress, compressed);发送数据,发送的是请求体,在请求体的最开始最记录当前请求体是否被压缩,压缩只会的数据长度是多少(DataQueueCommand,表示HTTP2中的Http2DataFrame,用的就是gRPC发送请求体的格式);
发送的核心逻辑WriteQueue#enqueue(QueuedCommand):
- queue.add(command);这里的queue是ConcurrentLinkedQueue
- scheduleFlush();调用channel.eventLoop().execute(this::flush);
- 发送逻辑在WriteQueue#flush
1)while循环从queue.poll()获取发送数据cmd
2)cmd.run(channel);将数据帧添加到Http2StreamChannel中,添加并不会立马发送,调用了flush才发送
3)连续从队列中取到了128个数据帧就flush一次,channel.flush();这里处理高并发情况,128就发送一次
4)如果i != 0,但是没有达到128,也会发送
具体的发送:
- 1)TripleCommandOutBoundHandler#write
- 2)QueuedCommand#send
- 3)DataQueueCommand#doSend
DataQueueCommand#doSend
public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
if (data == null) {
ctx.write(new DefaultHttp2DataFrame(endStream), promise);
} else {
ByteBuf buf = ctx.alloc().buffer();
// 第一个字节记录请求体是否被压缩
buf.writeByte(compressFlag);
// 后四个字节记录请求体的长度
buf.writeInt(data.length);
// 真实的数据
buf.writeBytes(data);
// 发送
ctx.write(new DefaultHttp2DataFrame(buf, endStream), promise);
}
}
总结一下:
- 1)ClientCallToObserverAdapter#onNext
1-1)ClientStream#sendHeader发送请求头,HeaderQueueCommand,endStream是false;
1-2)ClientStream#writeMessage发送请求体,DataQueueCommand,endStream是false; - 2)ClientCallToObserverAdapter#onCompleted,DefaultHttp2DataFrame,endStream是true。
4.3.2 服务提供者处理请求逻辑
TripleHttp2FrameServerHandler#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof Http2HeadersFrame) {
onHeadersRead(ctx, (Http2HeadersFrame) msg);
} else if (msg instanceof Http2DataFrame) {
onDataRead(ctx, (Http2DataFrame) msg);
} else if (msg instanceof ReferenceCounted) {
// ignored
ReferenceCountUtil.release(msg);
}
}
处理请求头TripleHttp2FrameServerHandler#onHeadersRead
- 1)收到一个Http2HeadersFrame时,生成一个ServerStream,和ClientStream对应,此处ctx.channel()拿到的是子Channel, 对应的是Http2StreamChannel,表示流
- 2)ServerStream.ServerTransportObserver#processHeader
A)各种校验
B)获取接口名(服务名)serviceName以及方法名originalMethodName,根据(serviceName, group, version)从本地注册表pathResolver中取出Invoker。
C)得到数据解压器TriDecoder,会解压数据,并把解压后的数据交给ServerDecoderListener进行处理
D)创建ReflectionServerCall,传入进来的executor是SerializingExecutor,但是在构造方法里会再包一层,以call里的executor为 SerializingExecutor(SerializingExecutor(ThreadPoolExecutor))SerializingExecutor;并调用ReflectionServerCall#startCall
D-1)ReflectionServerCall#doStartCall
D-2)调用至ReflectionServerCall.ServerStreamListenerImpl#startCall
D-3)调用至ReflectionServerCall.ServerStreamListenerImpl#trySetListener
D-4)调用至ServerCall#startInternalCall,这里会创建ServerCall.Listener:
UNARY:UnaryServerCallListener
SERVER_STREAM:ServerStreamServerCallListener
BI_STREAM或CLIENT_STREAM:BiStreamServerCallListener
处理请求体TripleHttp2FrameServerHandler#onDataRead
- 1)ServerStream.ServerTransportObserver#doOnData
- 2)deframer.deframe(data);这里deframer就是TriDecoder
- 3)TriDecoder#deframe处理请求体
- 4)TriDecoder#deliver
4-1)processHeader();处理请求体的前5个字节,第一个字节表示是否压缩;后四个字节表示请求体长度。
4-2)processBody();处理实际发过来的数据。如果压缩了,先解压缩。
ServerCall.ServerStreamListenerBase#onMessage
ReflectionServerCall.ServerStreamListenerImpl#doOnMessage
A)obj = packableMethod.getRequestUnpack().unpack(message);把解压之后的字节数据进行反序列化
B)listener.onMessage(obj);
UNARY:UnaryServerCallListener
SERVER_STREAM:ServerStreamServerCallListener
BI_STREAM或CLIENT_STREAM:BiStreamServerCallListener
UNARY调用逻辑(同步阻塞):
- 1)ClientCallToObserverAdapter#onNext发送请求头和请求体。
服务提供者接收到请求体才执行:UnaryServerCallListener#onMessage这里invocation.setArguments()会把接收到的数据作为方法参数,但是还没有调用Invoker#invoke方法。 - 2)ClientCallToObserverAdapter#onCompleted发送空的请求体,但是endStream是true。
服务提供者:ServerStream.ServerTransportObserver#doOnData中,endStream是true,会调用deframer.close();
TriDecoder#close,这里closing = true;
-> TriDecoder#deliver()
-> ServerStream.ServerTransportObserver.ServerDecoderListener#close
-> ReflectionServerCall.ServerStreamListenerImpl#complete
-> UnaryServerCallListener#onComplete 这里会调用invoke()
A)会执行final Result response = invoker.invoke(invocation);
B)onReturn(r.getValue());实际为UnaryServerCallListener#onReturn方法执行完后,把方法结果写回给客户端。
B-1)responseObserver.onNext(value);
B-2)responseObserver.onCompleted(TriRpcStatus.OK);
SERVER_STREAM调用逻辑(异步):
- 1)ClientCallToObserverAdapter#onNext发送请求头和请求体。
服务提供者接收到请求体才执行:ServerStreamServerCallListener#onMessage这里invocation.setArguments(new Object[]{message, responseObserver});把接收到的数据和ServerCallToObserverAdapter作为方法调用的参数,ServerCallToObserverAdapter可以用来可以客户端发送数据。 - 2)ServerStreamServerCallListener#onComplete也是调用invoke()。
在Invoker#invoker里面就可以调用responseObserver向客户端发送响应。
public void sayHelloServerStream(String name, StreamObserver<String> response) {
response.onNext(name + " hello");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
response.onNext(name + " world");
response.onCompleted();
}
BI_STREAM或CLIENT_STREAM(异步):
- 1)ClientCallToObserverAdapter#onNext发送请求头;
服务提供者接收到请求头就执行,在BiStreamServerCallListener构造方法里面:
1-1)invocation.setArguments(new Object[]{responseObserver});构造监听器的时候,把服务端流对象设置为业务方法参数;
1-2)invoke(); 执行业务方法-->onReturn,在onReturn里面this.requestObserver = (StreamObserver<Object>) value;会将程序员创建的StreamObserver赋值给requestObserver。 - 2)ClientCallToObserverAdapter#onNext发送请求体:
BiStreamServerCallListener#onMessage调用requestObserver.onNext(message);也即程序员写的StreamObserver.onNext()。
public StreamObserver<String> sayHelloBiStream(StreamObserver<String> response) {
return new StreamObserver<String>() {
@Override
public void onNext(String name) {
System.out.println(name);
response.onNext("hello: "+name);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
System.out.println("completed");
}
};
}
4.3.3 客户端接收到响应TripleHttp2ClientResponseHandler
TripleHttp2ClientResponseHandler#channelRead0
protected void channelRead0(ChannelHandlerContext ctx, Http2StreamFrame msg) throws Exception {
if (msg instanceof Http2HeadersFrame) {
final Http2HeadersFrame headers = (Http2HeadersFrame) msg;
transportListener.onHeader(headers.headers(), headers.isEndStream());
} else if (msg instanceof Http2DataFrame) {
final Http2DataFrame data = (Http2DataFrame) msg;
transportListener.onData(data.content(), data.isEndStream());
} else {
super.channelRead(ctx, msg);
}
}
ClientStream.ClientTransportListener#onData
public void onData(ByteBuf data, boolean endStream) {
executor.execute(() -> {
// transportError不等于null,表示处理响应头时就有问题了
if (transportError != null) {
transportError.appendDescription(
"Data:" + data.toString(StandardCharsets.UTF_8));
// 释放内存空间
ReferenceCountUtil.release(data);
//
if (transportError.description.length() > 512 || endStream) {
handleH2TransportError(transportError);
}
return;
}
if (!headerReceived) {
handleH2TransportError(TriRpcStatus.INTERNAL.withDescription(
"headers not received before payload"));
return;
}
// 接收到响应体数据后,把数据添加到accumulate中进行保存
deframer.deframe(data);
});
}
ClientStream.ClientTransportListener#onData
-> TriDecoder#deframe
-> TriDecoder#deliver
-> TriDecoder#processBody
-> ClientCall.ClientStreamListenerImpl#onMessage
-> listener.onMessage(unpacked);有两种方式:同步和异步
UnaryClientCallListener#onMessage同步
ObserverToClientCallListenerAdapter#onMessage异步
ObserverToClientCallListenerAdapter#onMessage异步
public void onMessage(Object message) {
// 接收到一个响应结果,回调StreamObserver
delegate.onNext(message);
// 继续处理下一个响应结果
if (call.isAutoRequestN()) {
call.requestN(1);
}
}
上面的delegate就是服务消费者这边创建的ZhouyuResultStreamObserver:
static class ZhouyuResultStreamObserver implements StreamObserver<String> {
@Override
public void onNext(String data) {
System.out.println(data);
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable);
}
@Override
public void onCompleted() {
System.out.println("complete");
}
}
梳理一下客户端AbstractInvoker#invoke整个流程
- 1)AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
1-1)TripleInvoker#doInvoke
A)UNARY,invokeUnary()返回是一个正常的CompletableFuture,未完成,后面就会阻塞。
B)SERVER_STREAM,invokeServerStream()返回return new AsyncRpcResult(CompletableFuture.completedFuture(new AppResponse()), invocation);直接返回一个已经完成了的CompletableFuture,外层invoke阻塞的地方将直接通过,异步的效果。
C)CLIENT_STREAM和BI_STREAM,invokeBiOrClientStream()返回的也是一个已经完成了的CompletableFuture。 - 2) waitForResultIfSync(asyncResult, invocation);如果需要同步就阻塞
2-1)responseFuture.get(timeout, unit),这里就会判断responseFuture,如果没有完成就阻塞。
同步阻塞什么时候唤醒?
- 服务提供者调用StreamObserver#onCompleted后,服务消费者就会调用到UnaryClientCallListener#onClose
- 这里future.received(status, result);触发DeadlineFuture完毕(调用CompletableFuture#complete),解开TripleInvoker的invoke方法中的同步阻塞
4.3.4 SERVER_STREAM的一个BUG
服务提供者逻辑如下。
public void sayHelloServerStream(String name, StreamObserver<String> response) {
response.onNext(name + " hello");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
response.onNext(name + " world");
response.onCompleted();
}
预期是服务消费者拿到一个数据帧后,停3s,然后再拿到后续的数据。但是Dubbo这里是直接等3s,然后一下子拿到了所有三个数据。
整体流程:
- 服务提供者处理请求TripleHttp2FrameServerHandler#onDataRead
1)executor.execute(() -> doOnData(data, endStream));这里线程池是SerializingExecutor
2)利用一个线程去执行SerializingExecutor中的run方法,从runQueue获取任务执行,也就是执行上面的lambda表达式。 - 服务提供者获取到请求体后就会调用invoker.invoke(invocation)执行具体的方法;
- 在服务方法里面会调用responseObserver.onNext,也即ServerCallToObserverAdapter#onNext向客户端发送数据。
1)executor.execute(writeMessage);将Runnable添加到队列中,此处的executor为SerializingExecutor(SerializingExecutor(ThreadPoolExecutor)),内部的SerializingExecutor(ThreadPoolExecutor)就是用来接收请求数据的。
执行效果:
A)首先将writeMessage加入到外面的SerializingExecutor队列中;
B)然后执行里面SerializingExecutor的execute时,将writeMessage加入到里面SerializingExecutor的队列中,然后由于里面SerializingExecutor的状态已经改为了true,所以就没法执行下去,直接返回了,所以这了数据并没有发送出去。 - 那什么时候数据才能发出去呢?就是要里面SerializingExecutor的状态改为false才可以,这时候需要整个方法sayHelloServerStream()都执行完后才行。
建议更改:
- ServerCall#writeMessage中executor.execute(writeMessage)直接改为doWriteMessage(message),不需要添加到线程池,直接发送即可。
SerializingExecutor#execute
public void execute(Runnable r) {
// SerializingExecutor会对加入到runQueue中的Runnable用一个线程进行串行处理
// 将Runnable任务添加到队列
runQueue.add(r);
// 使用内部线程池executor中的一个线程来运行
schedule(r);
}
SerializingExecutor#schedule
private void schedule(Runnable removable) {
if (atomicBoolean.compareAndSet(false, true)) {
boolean success = false;
try {
// SerializingExecutor内部保护了一个线程池executor,这个线程池是根据服务url创建出来的
// 注意:这里并不是把runQueue队列中的Runnable任务拿出来用线程去执行
// 而是把SerializingExecutor自己作为一个Runnable交给线程池中的一个线程去执行
// 这里其实就是利用一个线程去执行SerializingExecutor中的run方法,从而获取runQueue中的任务进行执行
executor.execute(this);
success = true;
} finally {
// It is possible that at this point that there are still tasks in
// the queue, it would be nice to keep trying but the error may not
// be recoverable. So we update our state and propagate so that if
// our caller deems it recoverable we won't be stuck.
if (!success) {
if (removable != null) {
// This case can only be reached if 'this' was not currently running, and we failed to
// reschedule. The item should still be in the queue for removal.
// ConcurrentLinkedQueue claims that null elements are not allowed, but seems to not
// throw if the item to remove is null. If removable is present in the queue twice,
// the wrong one may be removed. It doesn't seem possible for this case to exist today.
// This is important to run in case of RejectedExecutionException, so that future calls
// to execute don't succeed and accidentally run a previous runnable.
runQueue.remove(removable);
}
atomicBoolean.set(false);
}
}
}
}
SerializingExecutor#run
public void run() {
Runnable r;
try {
while ((r = runQueue.poll()) != null) {
try {
r.run();
} catch (RuntimeException e) {
LOGGER.error("Exception while executing runnable " + r, e);
}
}
} finally {
atomicBoolean.set(false);
}
// 如果队列中不为空,则继续获取一个线程执行run(),继续获取队列中的任务进行执行
if (!runQueue.isEmpty()) {
// we didn't enqueue anything but someone else did.
schedule(null);
}
}
网友评论