美文网首页
用 RSocket 解决响应式服务之间的通讯-Part 1

用 RSocket 解决响应式服务之间的通讯-Part 1

作者: 涤生YQ | 来源:发表于2019-10-04 10:53 被阅读0次

用 RSocket 解决响应式服务之间的通讯-Part 1

作者:Rafał Kowalski,原文 Reactive Service-to-Service Communication With RSocket

本文是《用 RSocket 解决响应式服务之间的通讯》微型系列的第一篇文章,它将帮助你熟悉 RSocket——一种可能会彻底改变机器之间通讯的新二进制协议。在以下各段中,我们首先讨论当前分布式系统的问题,然后说明如何使用 RSocket 解决这些问题。本文聚焦于微服务之间的通信与 RSocket 交互模型。

分布式系统中的通讯问题

确实,微服务无处不在。从部署和维护非常麻烦的单体应用程序到完全分布式、微型、可扩展的微服务,我们经历了漫长的过程。微服务架构设计有很多好处。但是,它也有缺点。首先,为了向客户交付最终产品,服务之间必须交换大量数据。在单体应用程序中这不是问题,因为它整个通信都在单个 JVM 进程中进行。而在“微服务架构”中,部署在单独的容器中服务需要通过内部或外部网络进行通信。此时,“网络”是一等公民。如果在云上运行应用程序,事情将变得更加复杂。在这种情况下,网络问题和延迟增加将是不可避免的事情。与其尝试解决网络问题,不如设计具有弹性的体系结构,让其即使在网络抖动的情况下也能完全正常运行,这样岂不是更好。

我们来更深入地研究下微服务、数据、通信和云的概念。试想一下,对于一般的企业级系统,外部可以通过网站和移动 App 访问,或者通过小型外部设备(如家用加热控制器)与其进行交互。这些系统都是由多个微服务组成,这些微服务大多数是用 Java 编写的,其中一小部分是 Python 和 node.js 实现的组件,另外,为了确保整个系统高度可用,所有服务之间的传输数据都需要跨多个可用区进行复制备份。

IaaS 层是不可控的,为了改善开发人员体验,一般需要将应用程序运行在 PaaS 平台之上。对于 PaaS 平台,我们可以选择:Cloud FoundryKubernetes 或两者结合使用都是可行的。在服务之间的通信方面,设计比较简单,每个组件都暴露普通的 REST API,如下图所示。

System architecture

乍一看,组件都被分散在云中运行,这样的体系结构看起来还不错。额,它可能出什么问题?主要有两个问题:它们都与通讯有关。

第一个问题HTTP 的请求/响应交互模型。尽管使用 HTTP 的案例有很多,但它并不是为机器之间的通信而设计的。微服务在不关心操作结果的情况下将某些数据发送到另一个组件是很常见的(即发即弃),或者在数据可用时自动流传输数据(数据流)。使用 HTTP 请求/响应交互模型难以用优雅、有效的方式实现这些交互模式。例如,在使用请求/响应交互模型时,执行简单的即发即弃操作也会产生副作用,会出现即使客户端对处理响应不感兴趣,服务器也必须将响应发送回客户端的问题。

第二个问题性能。假设我们的系统被客户大量使用,流量增加了,并且我们注意到我们正在努力处理每秒数百个请求。借助容器和云,我们可以轻松扩展我们的服务;但是,如果我们关注下资源消耗的情况,则会发现一些问题。例如,当机器内存会出现不足时,可能 VM 的 CPU 还几乎处于空闲状态。这个问题主要来自于使用 HTTP 1.x 协议通常处理每个请求需要一个线程,致使每个请求都存在堆栈内存。在这种情况下,我们可以利用反应式编程模型和非阻塞 IO。它将在在不增加延迟的情况下大大减少内存使用量。 HTTP 1.x 是基于文本的协议,因此与二进制协议相比,需要传输的数据大小显著增大。

在机器之间的通信中,我们不应将自己局限于 HTTP(尤其是 1.x 版本,请求/响应交互模型以及性能低下)。在市场上还有许多更合适、更强大的解决方案。例如,基于 RabbitMQ、gRPC 或者 HTTP 2(支持多路复用和二进制化负载)进行信息传输在性能和效率方面会比纯 HTTP 1.x 更好。

System architecture

在给定场景下,使用多种协议可以使我们最有效、最合适地连接微服务;但是,采用多种协议迫使我们一次又一次地重新发明轮子,另外,为了保证保证通讯的安全性,我们不得不用安全性相关的额外信息来丰富我们的数据;并且还需要创建多个适配器来处理协议之间的转换。在某些情况下,数据传输可能需要依赖外部资源(代理、服务等),这些服务必须高度可用。因此,尽管我们所需要的只是基于消息的简单“即发即弃”操作,但 HTTP 请求/响应交互模型由于其性能比较差,产生额外的资源会带来额外的成本。此外,多种不同的协议可能会引入与应用程序治理相关的严重问题,尤其是如果我们的系统包含数百个微服务时。

上面提到的两个核心问题是推出 RSocket 的原因,同时也是它可能彻底改变云通讯的原因。通过其反应式和内置的强大交互模型,RSocket可以应用于各种业务场景中,并可能最终统一我们在分布式系统中使用的通信模式。

RSocket 如何解决

RSocket 是一种新的、消息驱动的二进制协议,它规范了云中的通讯方式。它有助于以一致的方式解决常见的应用程序问题,并且它支持多种语言(例如 Java、JS、Python)和传输协议(TCP、WebSocket、Aeron)。在下面的部分中,我们将深入探讨协议内部实现并讨论交互模型。

基于帧和消息驱动

RSocket 中的传输的信息可以分解为一个个的帧。每个帧都包含一个帧头,其中包含流 ID、帧类型定义和特定于该帧类型的其他数据。帧头部后紧跟着元数据和有效负载(这些部分承载用户指定的数据)。

RSocket Frame

有多种类型的帧,它们表示不同的行为和交互模型的可用方法。我们将不讨论相关所有内容,因为它们的详细内容在官方文档中已有描述。不过,值得关注的信息可能不多,其中比较重要的有:客户端在通信开始时需要给服务器发送“设置帧”——该“设置帧”在连接初始化期间可以自定义,自定义的内容包括添加自己的安全规则或所需的其他信息。应当注意,在建立连接之后,RSocket 不会区分客户端和服务端。每一侧都可以开始将数据发送到另一侧(这使协议几乎完全对称)。

性能

帧作为“字节流”发送。它使 RSocket 方式比典型的基于文本的协议更有效。从开发人员的角度来看,通过 JSON 格式在网络中传输数据时,调试系统更容易,但是它对性能是有影响的。RSocket 的协议不强加任何特定的序列化/反序列化机制,而是将帧视为可以转换为任何东西的一串比特。这样就可以使用 JSON 序列化或更高效的其他方案,如 Protobuf 或 AVRO。

影响 RSocket 性能的第二个因素是“多路复用”。该协议在单个物理连接上创建“逻辑流”(通道)。每个流都有其唯一的 ID,在某种程度上,可以将其理解为类似消息系统的消息队列。这种设计解决了 HTTP 1.x 版本中已知的主要问题(每个请求模型独占连接以及“流水线”的性能较弱)。此外,RSocket 原生支持大型有效负载的传输。在这种情况下,“有效载荷的帧”会被分成带有额外标志的多个帧(给定片段的序号)。

反应式和流量控制

RSocket 协议完全包含《反应式宣言》中所述的原则。它在异步特性和从某种意义上资源节约帮助用户减少了所经历的延迟以及基础设施的成本。由于流式传输,我们无需将数据从一个服务拉到另一个服务,而是在数据可用时将其推送到相关服务。这是一个非常强大的机制,但它也可能具有风险。让我们考虑一个简单的场景:在我们的系统中,我们将事件从服务 A 传输到服务 B。在接收方 B 执行的操作很简单,但需要一定的计算时间。如果服务 A 推送事件的速度快于 B 处理事件的速度,则最终 B 将耗尽资源(发送方将终止接收方)。RSocket 是 Reactor 模式的,它内置了对“流控制”的支持,这有助于避免这种情况。

通过“背压机制”的实现,我们可以轻松根据需要进行调整。接收方可以指定要消费多少数据,而不会收到更多数据,除非它通知发送方准备处理更多数据。另一方面,为了限制来自请求者的传入帧数,RSocket 实现了一种“租约机制”。响应者可以指定请求者可以在定义的时间范围内发送多少个请求。

RSocket 接口

如上所述,RSocket 是 Reactor 模式的,因此在 API 级别上,我们主要在MonoFlux对象上进行操作。它也完全支持反应性信号–我们可以轻松地对不同事件(onNextonErroronClose等)执行“反应”。

以下各段将介绍 RSocket 的 API 可用的每个交互选项。下面介绍将以代码片段和所有示例的描述为根本。在讲解交互模型之前,有必要介绍一下API基础知识,因为它将在多个代码示例中提出。

  RSocketFactory.connect()
 .transport(TcpClientTransport.create(HOST, PORT))
 .start()
 .subscribe();
 RSocketFactory.receive()
 .acceptor(new HelloWorldSocketAcceptor())
 .transport(TcpServerTransport.create(HOST, PORT))
 .start()
 .subscribe();

对于响应方,我们必须创建一个套接字接受器实例。 SocketAcceptor 是提供两方之间契约的接口。它只有一个方法,该方法接受 RSocket 发送请求,并返回一个 RSocket 实例,该实例将用于处理来自对方的请求。除了提供契约外,SocketAcceptor 还使我们能够访问配置帧的内容。在 API 级别,它由 ConnectionSetupPayload 类型的参数来设置。

public interface SocketAcceptor {
 Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket);
}

如上所示,在双方之间建立连接是相对非常容易,特别是对于之前使用过 WebSocket 的同学来说,就 API 而言,两种解决方案都非常相似。

交互模型

建立连接后,我们可以继续了解其交互模型。 RSocket 支持以下操作:

Interaction Model

“即发即忘(fire and forget)”,或者“元数据推送(metadata push)”,旨在将数据从发送方推送到接收方。在这两种场景下,发送者都不在乎操作的结果(在 API 上它的返回类型是Mono)。这两者之间的区别在于帧的处理。“即发即忘”,将完整的帧发送到接收方,而对于元数据推送操作,该帧不具有有效负载-它仅含有头部和元数据。此类轻量级消息可用于将通知发送到点对点通信的 IoT 设备或者移动设备。 RSocket 还能够模仿 HTTP 行为。它支持请求 / 响应(request-response)场景,这可能是你使用 RSocket 主要交互类型。在流式场景中,此类操作可以表示为由单个对象组成的流。在这种情况下,客户端正在等待响应帧,但是它是以完全“非阻塞”的方式进行响应的。

更有趣的是,云平台上应用程序通常通过“请求流(request stream)”“请求通道(request channel)”的方式对数据流(通常是无限的)进行操作。在请求流方式下,请求方将单个帧发送到响应方,并获取数据流。这种交互方式使服务能够从“拉数据”切换为“推数据”策略。无需向响应者发送定期请求,请求方可以订阅流并对收到的数据做出反应(当数据可用时,它将自动到达)。

由于多路复用和双向数据传输的支持,我们未来可以使用“请求通道(request channel)”方式。RSocket 可以使用单个物理连接将数据从请求方传输到响应方,反之亦然。当请求方更新订阅时(如,更改订阅规则),这种交互方式可能很有用。如果没有双向通道,客户端将不得不取消流并使用新参数重新请求它。

在 API 中,交互模型的所有操作均由下面显示的 RSocket 接口的方法表示。

public interface RSocket extends Availability, Closeable {
 Mono<Void> fireAndForget(Payload payload);
 Mono<Payload> requestResponse(Payload payload);
 Flux<Payload> requestStream(Payload payload);
 Flux<Payload> requestChannel(Publisher<Payload> payloads);
 Mono<Void> metadataPush(Payload payload);
}

为了改善开发人员的体验并避免实现 RSocket 接口的每个接口方法,API 提供了我们可以扩展的抽象接口类AbstractRSocket。通过将 SocketAcceptorAbstractRSocket 放在一起,我们可以得到了服务器端的基本实现,如下:

@Slf4j
public class HelloWorldSocketAcceptor implements SocketAcceptor {
 @Override
 public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
 log.info("Received connection with setup payload: [{}] and meta-data: [{}]", setup.getDataUtf8(), setup.getMetadataUtf8());
 return Mono.just(new AbstractRSocket() {
 @Override
 public Mono<Void> fireAndForget(Payload payload) {
 log.info("Received 'fire-and-forget' request with payload: [{}]", payload.getDataUtf8());
 return Mono.empty();
 @Override
 public Mono<Payload> requestResponse(Payload payload) {
 log.info("Received 'request response' request with payload: [{}] ", payload.getDataUtf8());
 return Mono.just(DefaultPayload.create("Hello " + payload.getDataUtf8()));
 @Override
 public Flux<Payload> requestStream(Payload payload) {
 log.info("Received 'request stream' request with payload: [{}] ", payload.getDataUtf8());
 return Flux.interval(Duration.ofMillis(1000))
 .map(time -> DefaultPayload.create("Hello " + payload.getDataUtf8() + " @ " + Instant.now()));
 @Override
 public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
 return Flux.from(payloads)
 .doOnNext(payload -> {
 log.info("Received payload: [{}]", payload.getDataUtf8());
 })
 .map(payload -> DefaultPayload.create("Hello " + payload.getDataUtf8() + " @ " + Instant.now()))
 .subscribeOn(Schedulers.parallel());
 @Override
 public Mono<Void> metadataPush(Payload payload) {
 log.info("Received 'metadata push' request with metadata: [{}]", payload.getMetadataUtf8());
 return Mono.empty();
 }
 });
 }
}

在发送方,使用交互模型非常简单,我们需要做的就是在我们使用 RSocketFactory 创建的 RSocket 实例上调用特定方法,例如

socket.fireAndForget(DefaultPayload.create("Hello world!"));</pre>

有关 RSocket 交互模型中可用方法的更多示例,请访问 GitHub

发送方更有趣的是背压机制的实现。我们来看下发送方实现示例:

public class RequestStream {
 public static void main(String[] args) {
 RSocket socket = RSocketFactory.connect()
 .transport(TcpClientTransport.create(HOST, PORT))
 .start()
 .block();
 socket.requestStream(DefaultPayload.create("Jenny", "example-metadata"))
 .limitRequest(100)
 .subscribe(new BackPressureSubscriber());
 socket.dispose();
 }
 @Slf4j
 private static class BackPressureSubscriber implements Subscriber<Payload> {
 private static final Integer NUMBER_OF_REQUESTS_TO_PROCESS = 5;
 private Subscription subscription;
 int receivedItems;
 @Override
 public void onSubscribe(Subscription s) {
 this.subscription = s;
 subscription.request(NUMBER_OF_REQUESTS_TO_PROCESS);
 }
 @Override
 public void onNext(Payload payload) {
 receivedItems++;
 if (receivedItems % NUMBER_OF_REQUESTS_TO_PROCESS == 0) {
 log.info("Requesting next [{}] elements");
 subscription.request(NUMBER_OF_REQUESTS_TO_PROCESS);
 }
 }
 @Override
 public void onError(Throwable t) {
 log.error("Stream subscription error [{}]", t);
 }
 @Override
 public void onComplete() {
 log.info("Completing subscription");
 }
 }
}

在此示例中,我们正在请求数据流,但是为了确保返回的帧数据不会压垮请求方,我们采用了背压机制。为了实现这种机制,我们使用指定请求数量的方式,它在API级别上由subscription.request(n)方法反映出来。在订阅 onSubscribe(Subscription s) 方法的开始处,我们请求 5 个数据,然后在onNext(Payload payload)中统计接收到的数量。当所有预期帧都到达请求方时,我们再请求接下来的 5 个数据(再次使用subscription.request(n)方法)。下图显示了该订户的流程:

Backpressure Mechanism

本段介绍的背压机制的实现非常基础。在生产中,我们应提供一个基于更准确的统计指标的完善解决方案。例如,预测/平均消费时间等。毕竟,背压机制不会让响应方生产过剩的问题消失。它只是将问题转移到响应方,来更好地解决问题。有关背压的更多信息,请参见这里

总结

在本文中,我们讨论了微服务体系结构中的通信问题,以及如何通过 RSocket 解决这些问题。我们以一个简单的“ hello world”示例和基本的背压机制实现为背景,介绍了其 API 和交互模型。

在本系列的后续文章中,我们将介绍 RSocket 的更高级的特性,包括负载均衡和可恢复性,以及我们将讨论基于 RSocket 进行抽象,实现 RPC and Spring Reactor。


个人微信公共号,感兴趣的关注下,获取更多技术文章

涤生-微信公共号

相关文章

网友评论

      本文标题:用 RSocket 解决响应式服务之间的通讯-Part 1

      本文链接:https://www.haomeiwen.com/subject/mtojpctx.html