Dubbo 自 2011 年 10 月 27 日开源后,已被许多非阿里系的公司使用,其中既有当当网、网易考拉等互联网公司,也不乏中国人寿、青岛海尔等大型传统企业。
自去年 12 月开始,Dubbo 3.0 便已正式进入开发阶段,并备受社区和广大 Dubbo 用户的关注,本文将为您详细解读 3.0 预览版的新特性和新功能。
下面先解答一下两个有意思的与 Dubbo 相关的疑问。
- 为什么 Dubbo 一开源就是 2.0 版本?之前是否存在 1.0 版本?
- 笔者曾做过 Dubbo 协议的适配兼容,Dubbo 确实存在过 1.x 版本,而且从协议设计和模型设计上都与 2.0 的开源版本协议是完全不一样的。下图是关于 Dubbo 的发展路径:
- 阿里内部正在使用 Dubbo 开源版本吗?
- 是的,非常确定,当前开源版本的 Dubbo 在阿里巴巴被广泛使用,而阿里的电商核心部门是用的 HSF2.2 版本,这个版本是兼容了 Dubbo 使用方式和 Remoting 协议。当然,我们现在正在做 HSF2.2 的升级,直接依赖开源版本的 Dubbo 来做内核的统一。所以,Dubbo 是得到大规模线上系统验证的分布式服务框架,这一点毋容置疑。
Dubbo 3.0 预览版的要点
Dubbo 3.0 在设计和功能上的新增支持和改进,主要是以下四方面:
- Dubbo 内核之 Filter 链的异步化
- 这里要指出的是,3.0 中规划的异步去阻塞和 2.7 中提供的异步是两个层面的特性。2.7 中的异步是建立在传统 RPC 中 request – response 会话模型上的,而 3.0 中的异步将会从通讯协议层面由下向上构建,关注的是跨进程、全链路的异步问题。通过底层协议开始支持 streaming 方式,不单单可以支持多种会话模型,还可以在协议层面开始支持反压、限流等特性,使得整个分布式体系更具有弹性。综上所述,2.7 关注的异步更局限在点对点的异步(一个 consumer 调用一个 provider),3.0 关注的异步化,宽度上则关注整个调用链上的异步,高度上则向上又可以包装成 Rx 的编程模型。有趣的是,Spring 5.0 发布了对 Flux 的支持,随后开始解决跨进程的异步问题。
- 功能方面是 reactive(响应式)支持
- 最近几年, reactive programming这个词语的热度迅速提升,Wikipedia 上的 reactive programming 解释是 reactive programming is a programming paradigm oriented around data flows and the propagation of change. Dubbo3.0会实现Reactive Stream 的 rx 接口,从而能让用户享受到RP带来的响应性提升,甚至面向 RP 的架构升级。当然,我们希望 reactive 不单单能够带来事件(event)驱动的应用集成方式的升级,也希望在 Load Balance(选择最优的服务节点),fault tolerance(限流降级时最好做到自适应)等方面发挥其积极价值。
- 云原生/ ServiceMesh 方向的探索
- 我们定下的策略是进入 Envoy 社区来实现 Dubbo 融入 mesh 的理念思想,目前 Dubbo 协议已经被 Envoy 支持。当然,Dubbo Mesh 离真正可用还有很长一段距离,其在选址、负载均衡和服务治理方面的工作需要继续在数据面建设,另外,控制面板的建设在社区也没有提上日程。
- 融合并支持阿里内部
- Dubbo 3.0 定下了内外融合的策略,也就是说 3.0 的核心最终会在阿里巴巴的生产系统中部署,相信通过大流量、大规模的考验,Dubbo 用户可以获得一个性能、稳定、服务治理实践各方面俱佳的核心,用户在生产系统中采用 3.0 也会更加放心。这一点也是 Dubbo 3.0 最重要的使命。
Filter 链的异步化设计
Dubbo 最强大的一处设计是其在 Filter 链上的抽象设计,通过其扩展机制的开放性支持,用户可以对 Dubbo 做功能增强,并允许各个扩展点被定制来是否保留。
Dubbo 的 Filter 定义如下:
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">
@SPI
public
interface
Filter
{
/**
-
do invoke filter.
-
<p>
-
<code>
-
// before filter
-
Result result = invoker.invoke(invocation);
-
// after filter
-
return result;
-
</code>
-
@param invoker service
-
@param invocation invocation.
-
@return invoke result.
-
@throws RpcException
-
@see org.apache.dubbo.rpc.Invoker#invoke(Invocation)
*/
Result
invoke(
Invoker
<?> invoker,
Invocation
invocation)
throws
RpcException
;
}
</pre>
按照“调用一个远程服务的方法就像调用本地的方法一样”这种说法,这个直接返回 Result 响应的方式是非常好的,用起来是简单直接,问题是时代变换到了需要关注体验,需要走 Reactive 响应式的时代,也回到基本点:invoke一个 invocation 需要经过网络在不同的进程处理,天然就是异步的过程,也就是发送请求(invocation)与接收响应(Result)本身是两个不同的事件,是需要两个过程方法来在 Filter 链处理。那么如何改造这个关键的 SPI 呢?有两种方案:
第一种,把 invoke 的返回值改成 CompletableFuture, 好处是一目了然,Result 不在建议同步获取了;但基础接口的签名一改会导致代码改造量巨大,同时也会让原有的 SPI 扩展不在支持。
第二种,Result 接口直接继承 CompletationStage,是代表了响应的异步计算。这样能进避免第一种的劣势。所以,3.0.0 Preview 版本对内部调用链路实现做了一次重构:基于 CompletableFuture 实现了框架内部的全异步调用,而在外围编程上,同时支持同步、异步调用模式。
值得注意的是,此次重构仅限于框架内部实现,对使用方没有任何影响即接口上保持完全兼容。要了解 Dubbo 异步 API 如何使用,请参考《如何基于 Dubbo 实现全异步的调用链》(地址:http://dubbo.apache.org/zh-cn/blog/dubbo-new-async.html),这篇文章将着重对实现思路和原理做一些简单介绍。此次重构的要点有:
- 框架内部采用全异步调用模型,仅在外围做同步、异步适配;
- 内置Filter链支持异步回调;
基本工作流程
首先我们来看一个通用的跨网络异步调用的线程模型:
image通信框架异步发送请求消息,请求消息发送成功后,返回代表业务结果的 CompletableFuture 给业务线程。之后对于 Future 的处理,根据调用类型会有所区别:
- 对于同步请求(如上图体现的场景),业务线程会调用 future.get 同步阻塞等待结果,当收到网络层返回的业务结果后,future.get 返回并最终将结果传递给调用发起方。
- 对于异步请求,业务线程不会调用 future.get,而是将 future 保存在调用上下文或者直接返回给调用者,同时会为 future 注册回调监听器,以便当真正的业务结果从通信层返回时监听器可以对结果做进一步的处理。
接下来具体看一下一次异步 Dubbo RPC 请求的调用流程:
image- 消费方面向 Proxy 代理编程,发出调用请求,请求经过 Filter 链向下传递。
- Invoker.invoke() 将请求异步转发给网络层,并收到代表返回结果的 Future。
- Future 被包装到 Result,转而由 Result 代表这次远程调用的结果(由于 Result 的异步属性,此时它可能并不包含真正的返回值)。
- Result 继续沿着调用链返回,在经过每个 Filter 时,Filter 可选择注册 Listener 监听器,以便在业务结果返回时执行结果预处理。
- 最终 Proxy 调用 result.recreate() 将结果返回给消费者:
- 如果方法是 CompletableFuture 签名,则返回 Future;
- 如果方法是普通同步签名,则返回对象默认值,Future 可通过 RpcContext 拿到;
6. 调用方在拿到代表异步业务结果的 Future 后,可选择注册回调监听器,以监听真正的业务结果返回。
image同步调用和异步调用基本上是一致的,并且也是走的回调模式,只是在链路返回之前做了一次阻塞 get 调用,以确保在收到实际结果时再返回。Filter 在注册 Listener 时由于 Future 已处于 complete 状态,因此会同时触发回调 onResponse()/onError()。
关于流程图中提到的 Result,Result 在 Dubbo 的一次 RPC 调用中代表返回结果,在 3.0 中 Result 自身增加了代表状态的接口,类似 Future 现在 Result 可以代表一次未完成的调用。
要让 Result 具备代表异步返回结果的能力,有两中方式来实现:
1. Result is a Future,在 Java 8 中更合理的方式是继承 CompletionStage 接口。
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">
public
interface
Result
extends
CompletionStage
{
}
</pre>
2. 让 Result 实例持有 Future 实例,与 1 的区别即是设计中选用“继承”还是“组合”。
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">
public
class
AsyncRpcResult
implements
Result
{
private
CompletableFuture
<
RpcResult
resultFuture;
}
</pre>
同时,为了让 Result 更直观的体现其异步结果的特性,也为了方便面向 Result 接口编程,我们可以考虑为Result增加一些异步接口:
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">
public
interface
Result
extends
Serializable
{
Result
thenApplyWithContext(
Function
<
Result
,
Result
fn);
<U>
CompletableFuture
<U> thenApply(
Function
<
Result
, ?
extends
U> fn);
Result
get()
throws
InterruptedException
,
ExecutionException
;
}
</pre>
Filter SPI
Filter 是 Dubbo 预置的拦截器扩展 SPI,用来做请求的预处理、结果的后处理,框架本身内置了一些拦截器实现,而从用户层面,我相信这个 SPI 也应该是被扩展最多的一个。在 3.0 版本中,Filter 回归单一职责的设计模式,将回调接口单独提取到 Listener 中。
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">
@SPI
public
interface
Filter
{
Result
invoke(
Invoker
<?> invoker,
Invocation
invocation)
throws
RpcException
;
interface
Listener
{
void
onResponse(
Result
result,
Invoker
<?> invoker,
Invocation
invocation);
void
onError(
Throwable
t,
Invoker
<?> invoker,
Invocation
invocation);
}
}
</pre>
以上是 Filter 的 SPI 定义,Filter 的核心定义中只有一个 invoke() 方法用来传递调用请求。
同时,增加了一个新的回调接口 Listener,每个 Filter 实现可以定义自己的 Listenr 回调器,从而实现对返回结果的异步监听,参考以下是为 MonitorFilter 增加的 Listener 回调实现:
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">
class
MonitorListener
implements
Listener
{
@Override
public
void
onResponse(
Result
result,
Invoker
<?> invoker,
Invocation
invocation) {
if
(invoker.getUrl().hasParameter(
Constants
.MONITOR_KEY)) {
collect(invoker, invocation, result,
RpcContext
.getContext().getRemoteHost(),
Long
.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)),
false
);
getConcurrent(invoker, invocation).decrementAndGet();
// count down
}
}
@Override
public
void
onError(
Throwable
t,
Invoker
<?> invoker,
Invocation
invocation) {
if
(invoker.getUrl().hasParameter(
Constants
.MONITOR_KEY)) {
collect(invoker, invocation,
null
,
RpcContext
.getContext().getRemoteHost(),
Long
.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)),
true
);
getConcurrent(invoker, invocation).decrementAndGet();
// count down
}
}
}
</pre>
泛化调用异步接口支持
为了更直观的做异步调用,泛化接口新增了 CompletableFuture<Object>$invokeAsync(Stringmethod,String[]parameterTypes,Object[]args)接口:
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">
public
interface
GenericService
{
/**
-
Generic invocation
-
@param method Method name, e.g. findPerson. If there are overridden methods, parameter info is
-
required, e.g. findPerson(java.lang.String)
-
@param parameterTypes Parameter types
-
@param args Arguments
-
@return invocation return value
-
@throws GenericException potential exception thrown from the invocation
*/
Object
$invoke(
String
method,
String
[] parameterTypes,
Object
[] args)
throws
GenericException
;
default
CompletableFuture
<
Object
$invokeAsync(
String
method,
String
[] parameterTypes,
Object
[] args)
throws
GenericException
{
Object
object = $invoke(method, parameterTypes, args);
if
(object
instanceof
CompletableFuture
) {
return
(
CompletableFuture
<
Object
) object;
}
return
CompletableFuture
.completedFuture(object);
}
}
</pre>
这样,当我们想做异步调用时,就可以直接这样使用:
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">
CompletableFuture
<
Object
genericService.$invokeAsync(method, parameterTypes, args);
</pre>
更具体用例请参见《泛化调用示例》
异步与性能
组要注意的是,框架内部的异步实现本身并不能提高单次调用的性能,相反,由于线程切换和回调逻辑的存在,异步反而可能会导致单次调用性能的下降,但是异步带来的优势是能减少对资源的占用,提升整个系统的并发程度和吞吐量,这点对于 RPC 这种需要处理网络延迟的场景非常适用。更多关于异步化设计的好处,请参考其他异步化原理介绍相关文章。
响应式编程支持
响应式编程让开发者更方便地编写高性能的异步代码,很可惜,在之前很长一段时间里,dubbo 并不支持响应式编程,简单来说,dubbo 不支持在 rpc 调用时使用 Mono/Flux 这种流对象(reative-stream 里流的概念),给用户使用带来了不便。(关于响应式编程更详细的信息请参见这里:http://reactivex.io/)。
RSocket 是一个开源的支持 reactive-stream 语义的网络通信协议,他将 reative 语义的复杂逻辑封装起来了,使得上层可以方便实现网络程序。(RSocket详细资料请参见这里:http://rsocket.io/)。
dubbo 在 3.0.0-SNAPSHOT 版本里基于 RSocket 对响应式编程进行了简单的支持,用户可以在请求参数和返回值里使用 Mono 和 Flux 类型的对象。下面我们给出使用范例,(范例源码可以在这里获取:https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-rsocket)。
首先定义接口如下:
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">
public
interface
DemoService
{
Mono
<
String
requestMonoWithMonoArg(
Mono
<
String
m1,
Mono
<
String
m2);
Flux
<
String
requestFluxWithFluxArg(
Flux
<
String
f1,
Flux
<
String
f2);
}
</pre>
然后实现该 demo 接口:
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">
public
class
DemoServiceImpl
implements
DemoService
{
@Override
public
Mono
<
String
requestMonoWithMonoArg(
Mono
<
String
m1,
Mono
<
String
m2) {
return
m1.zipWith(m2,
new
BiFunction
<
String
,
String
,
String
() {
@Override
public
String
apply(
String
s,
String
s2) {
return
s+
" "
+s2;
}
});
}
@Override
public
Flux
<
String
requestFluxWithFluxArg(
Flux
<
String
f1,
Flux
<
String
f2) {
return
f1.zipWith(f2,
new
BiFunction
<
String
,
String
,
String
() {
@Override
public
String
apply(
String
s,
String
s2) {
return
s+
" "
+s2;
}
});
}
}
</pre>
然后配置并启动服务端,注意协议名字填写 rsocket:
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">
<beans
xmlns:xsi
"http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo
"http://dubbo.apache.org/schema/dubbo"
xmlns
"http://www.springframework.org/schema/beans"
xsi:schemaLocation
"http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsd
http://dubbo.apache.org/schema/dubbohttp://dubbo.apache.org/schema/dubbo/dubbo.xsd"
<dubbo:application
name
"demo-provider"
/>
<dubbo:registry
address
"zookeeper://127.0.0.1:2181"
/>
<dubbo:protocol
name
"rsocket"
port
"20890"
/>
<bean
id
"demoService"
class
"org.apache.dubbo.samples.basic.impl.DemoServiceImpl"
/>
<dubbo:service
interface
"org.apache.dubbo.samples.basic.api.DemoService"
ref
"demoService"
/>
</beans>
public
class
RsocketProvider
{
public
static
void
main(
String
[] args)
throws
Exception
{
new
EmbeddedZooKeeper
(
2181
,
false
).start();
ClassPathXmlApplicationContext
context =
new
ClassPathXmlApplicationContext
(
new
String
[]{
"spring/rsocket-provider.xml"
});
context.start();
System
.in.read();
// press any key to exit
}
}
</pre>
然后配置并启动消费者消费者如下, 注意协议名填写 rsocket:
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">
<beans
xmlns:xsi
"http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo
"http://dubbo.apache.org/schema/dubbo"
xmlns
"http://www.springframework.org/schema/beans"
xsi:schemaLocation
"http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsd
http://dubbo.apache.org/schema/dubbohttp://dubbo.apache.org/schema/dubbo/dubbo.xsd"
<dubbo:application
name
"demo-consumer"
/>
<dubbo:registry
address
"zookeeper://127.0.0.1:2181"
/>
<dubbo:reference
id
"demoService"
check
"true"
interface
"org.apache.dubbo.samples.basic.api.DemoService"
/>
</beans>
public
class
RsocketConsumer
{
public
static
void
main(
String
[] args) {
ClassPathXmlApplicationContext
context =
new
ClassPathXmlApplicationContext
(
new
String
[]{
"spring/rsocket-consumer.xml"
});
context.start();
DemoService
demoService = (
DemoService
) context.getBean(
"demoService"
);
// get remote service proxy
while
(
true
) {
try
{
Mono
<
String
monoResult = demoService.requestMonoWithMonoArg(
Mono
.just(
"A"
),
Mono
.just(
"B"
));
monoResult.doOnNext(
new
Consumer
<
String
() {
@Override
public
void
accept(
String
s) {
System
.out.println(s);
}
}).block();
Flux
<
String
fluxResult = demoService.requestFluxWithFluxArg(
Flux
.just(
"A"
,
"B"
,
"C"
),
Flux
.just(
"1"
,
"2"
,
"3"
));
fluxResult.doOnNext(
new
Consumer
<
String
() {
@Override
public
void
accept(
String
s) {
System
.out.println(s);
}
}).blockLast();
}
catch
(
Throwable
throwable) {
throwable.printStackTrace();
}
}
}
}
</pre>
可以看到配置上除了协议名使用 rsocket 以外其他并没有特殊之处。
实现原理
以前用户并不能在参数或者返回值里使用 Mono/Flux 这种流对象(reative-stream 里的流的概念)。因为流对象自带异步属性,当业务把流对象作为参数或者返回值传递给框架之后,框架并不能将流对象正确的进行序列化。
dubbo 基于 RSocket 实现了 reative 支持。RSocket 将 reative 语义的复杂逻辑封装起来了,给上层提供了简洁的抽象如下:
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">
/**
-
Fire and Forget interaction model of {@code RSocket}.
-
@param payload Request payload.
-
@return {@code Publisher} that completes when the passed {@code payload} is successfully
-
handled, otherwise errors.
*/
Mono
<
Void
fireAndForget(
Payload
payload);
/**
-
Request-Response interaction model of {@code RSocket}.
-
@param payload Request payload.
-
@return {@code Publisher} containing at most a single {@code Payload} representing the
-
response.
*/
Mono
<
Payload
requestResponse(
Payload
payload);
/**
-
Request-Stream interaction model of {@code RSocket}.
-
@param payload Request payload.
-
@return {@code Publisher} containing the stream of {@code Payload}s representing the response.
*/
Flux
<
Payload
requestStream(
Payload
payload);
/**
-
Request-Channel interaction model of {@code RSocket}.
-
@param payloads Stream of request payloads.
-
@return Stream of response payloads.
*/
Flux
<
Payload
requestChannel(
Publisher
<
Payload
payloads);
</pre>
我们只需要在此基础上添加我们的 rpc 逻辑即可。
- 从客户端视角看,框架建立连接之后,只需要将请求信息编码到 Payload 里,然后通过 requestStream 方法即可向服务端发起请求。
- 从服务端视角看,rsocket 收到请求之后,会调用我们实现的 requestStream 方法,我们从 Payload 里解码得到请求信息之后,调用业务方法,然后拿到 Flux 类型的返回值即可。
- 需要注意的是业务返回值一般是 Flux,而 RSocket 要求的是 Flux,所以我们需要通过 map operator 拦截业务数据,将 BizDO 编码为 Payload 才可以递交给我 RSocket。而 RSocket 会负责数据的传输和 reative 语义的实现。
经过上面的分析,我们知道了 Dubbo 如何基于 RSocket 实现了响应式编程的支持。有了响应式编程支持,业务可以更加方便的实现异步逻辑。
小结
当前 Dubbo 3.0 将提供具备当代特性(如响应性编程)的相关支持,同时汲取阿里内部 HSF 的设计长处来实现两者的融合,当前预览版的很多地方还在探讨中,希望大家能够积极反馈,我们都会虚心学习并参考。
网友评论