美文网首页Ovirt程序员
【Ovirt 笔记】Reactive Streams 的实现原理

【Ovirt 笔记】Reactive Streams 的实现原理

作者: 58bc06151329 | 来源:发表于2018-05-29 16:00 被阅读19次

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

分析整理的版本为 Ovirt 4.2.3 版本。

1. 简介

1.1 流

  • 流是由生产者生产并由一个或多个消费者消费的元素(item)的序列。
    • 这种生产者 - 消费者模型也被称为 source/sink 模型或发布者 - 订阅者(Publisher-Subscriber )模型。
  • 在流的处理机制中,pull 模型和 push 模型最为常见。
    • push 模型中,发布者将元素推送给订阅者。
      • 采用 push 方式,可以尽可能快地将消息发送给消费者,但是若消费者的处理消息的能力较弱(一条消息长时间处理),发布者会不断地向订阅者发送消息,消费者的缓冲区可能会溢出。
    • pull 模型中,订阅者向发布者请求元素。
      • 采用 pull 方式,会增加消息的延迟,即消息到达消费者的时间变长。
    • 在理想的情况下,发布者和订阅者都以同样的速率工作。

1.1.1 发布订阅模式与观察者模式的区别

  • 在观察者模式中,观察者需要直接订阅目标事件。在目标发出内容改变的事件后,直接接收事件并作出响应。
╭─────────────╮  Fire Event  ╭──────────────╮
│               │────────────>│                 │
│   Subject     │             │   Observer      │
│               │<────────────│                 │
╰─────────────╯  Subscribe  ╰────────────────╯
  • 在发布订阅模式中,发布者和订阅者之间多了一个发布通道,一方面从发布者接收事件,另一方面向订阅者发布事件,订阅者需要从事件通道订阅事件,以此避免发布者和订阅者之间产生依赖关系。
╭─────────────╮                 ╭───────────────╮   Fire Event  ╭──────────────╮
│               │  Publish Event │                  │─────────────>│                │
│  Publisher    │───────────────>│ Event Channel    │              │  Subscriber    │
│               │                │                  │<─────────────│                │
╰─────────────╯                ╰─────────────────╯    Subscribe ╰──────────────╯

1.2 特殊情况处理

1.2.1 发布者与订阅者不按照同样的速率工作

  • 策略一,发布者比订阅者快,后者必须有一个无边界缓冲区来保存快速传入的元素或者丢弃无法处理的元素。
  • 策略二,使用背压(backpressure)策略,其中订阅者告诉发布者减慢速率并保存元素,直到订阅者做好准备。
    • 使用背压可确保更快的发布者不会压制较慢的订阅者。
    • 背压策略的几种实现方式。
      • 要求发布者拥有无限制的缓冲区,一直生成和保存元素。
      • 发布者可以实现有界缓冲区来保存有限数量的元素,如果缓冲区已满,可以选择放弃。
      • 发布者将发布元素重新发送到订阅者,这些元素发布时订阅者不能接受。

1.2.2 订阅者请求发布者的元素不可用

  • 发布者同步地向订阅者发送元素,并且订阅者同步处理它们,则发布者必须阻塞直到数据处理完成。
  • 两端进行异步处理,订阅者可以在从发布者请求元素之后继续处理其他任务。
    • 当更多的元素准备就绪时,发布者将它们异步发送给订阅者。

1.3 响应式流(Reactive Streams)

  • 响应式流从 2013 年开始,作为提供非阻塞背压的异步流处理标准的倡议,旨在解决处理元素流的问题。
    • 如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。
    • 响应式流模型非常简单。订阅者向发布者发送多个元素的异步请求。 发布者向订阅者异步发送多个或稍少的元素。
  • 响应式流在 pull 模型和 push 模型流处理机制之间动态切换。当订阅者较慢时,使用 pull 模型,当订阅者更快时使用 push 模型。
  • 响应式流的更多信息,可访问 http://www.reactive-streams.org/

2. 实现分析与整理

  • vdsm-jsonrpc-java-clientorg.reactivestreams 包中进行了以下接口的定义。
    • Publisher<T, S extends Subscriber<T>>
    • Subscriber<T>
    • Subscription
响应式流模型
  • 发布者(Publisher)是潜在的无限数量的有序元素的生产者。
    • 根据收到的要求向当前订阅者发布(或发送)元素。
  • 订阅者(Subscriber)从发布者那里订阅并接收元素。
    • 发布者向订阅者发送订阅令牌(Subscription)。
    • 使用订阅令牌,订阅者从发布者那里请求多个元素。
    • 当元素准备就绪时,发布者向订阅者发送多个或更少的元素。
  • 订阅者可以请求更多的元素。
  • 发布者可能有多个来自订阅者的待处理请求。

2.1 发布者与订阅者的交互

  • 发布者可以拥有零个或多个订阅者。

发布者与订阅者的交互交互步骤,以虚拟机迁移功能模块代码为例(发布者 EventPublisher 与订阅者 VmMigrationProgressMonitoring 的交互,订阅令牌为 Subscription)。

  1. 创建发布者和订阅者,它们分别是 PublisherSubscriber 接口的实例。
    • EventSubscriber 是订阅者的基类。
    • VmMigrationProgressMonitoring 是订阅者的其中一个子类。
    • EventPublisher 是发布者实现类。
  2. 订阅者通过调用发布者的 subscribe() 方法来尝试订阅(绑定)发布者。
@PostConstruct
private void subscribe() {
     resourceManager.subscribe(this);
}
public void subscribe(EventSubscriber subscriber) {
     log.debug("subscribe called with subscription id: {}", subscriber.getSubscriptionId());
     ReactorFactory.getWorker(this.parallelism).getPublisher().subscribe(subscriber);
}
  1. 订阅成功,发布者 EventPublisher 使用 Subscription 异步调用订阅者的 onSubscribe(Subscription s) 方法。
@Override
public void subscribe(final EventSubscriber subscriber) {
    final AtomicInteger count = new AtomicInteger();
    final SubscriptionHolder holder = new SubscriptionHolder(subscriber, count);
    Subscription subscription = new Subscription() {

            @Override
            public void request(int n) {
                count.addAndGet(n);
                process(holder);
            }

            @Override
            public void cancel() {
                clean(holder);
                subscriber.onComplete();
            }
     };
     subscriber.onSubscribe(subscription);
     this.matcher.add(holder);
}
  1. 订阅者可以通过调用 Subscriptionrequest(int n) 方法向发布者发送多个元素的请求。订阅者可以向发布者发送更多元素的多个请求,而不必等待其先前请求是否完成。
@Override
public void onSubscribe(Subscription sub) {
      subscription = sub;
      subscription.request(1);
}
  1. 发布者在所有先前的请求中调用订阅者的 onNext(T t) 方法,直到订阅者请求的元素数量上限,在每次调用中向订阅者发送一个元素(这里的元素为 Map<String, Object>)。
    • 如果发布者没有更多的元素要发送给订阅者,则发布者调用订阅者的 onComplete() 方法来发信号通知流,从而结束发布者与订阅者交互。
    • 如果订阅者请求 Long.MAX_VALUE 元素,则实际上是无限制的请求,并且流实际上是推送流。
    • 如果请求处理失败,则使用调用订阅者的 onError(Throwable t) 方法,并且发布者与订阅者交互结束。
Map<String, Object> map = this.decomposer.decompose(event);
......
subscriber.onNext(map);
if (map.containsKey(JsonRpcEvent.ERROR_KEY)) {
    subscriber.onError(new ClientConnectionException((String) map.get(JsonRpcEvent.ERROR_KEY)));
......
  1. 如果发布者随时遇到错误,会调用订阅者的 onError() 方法。
  2. 订阅者可以通过调用其 Subscriptioncancel() 方法来取消订阅。
    • 一旦订阅被取消,发布者与订阅者交互结束。然而,如果在请求取消之前存在未决请求,订阅者可以在取消订阅之后接收元素。
@Override
public void cancel() {
   clean(holder);
   subscriber.onComplete();
}

2.2 响应式流总结

用例的响应式流模型
  • 一旦在订阅者上调用了 onComplete()onError() 方法,订阅者就不再收到发布者的通知。
  • 发布者的 subscribe() 方法被调用之后,如果订阅者不取消其订阅,则保证以下订阅方法调用序列(正则表达式)。
    • 符号 * 表示零次或多次, ? 表示零次或一次。
onSubscribe onNext* (onError | onComplete)?
  • 在订阅者上的第一个方法调用是 onSubscribe() 方法,它是成功订阅(绑定)发布者的通知。
  • 订阅者的 onNext() 方法可以被调用零次或多次,每次调用指定元素发布。onComplete()
    onError() 方法可以被调用零次或一次来指示终止状态,只要订阅者不取消其订阅,就会调用这些方法。

3. 结合业务实现分析

  • 以虚拟机迁移为例,engine 与 vdsm 采用了 JSON-RPC 进行通信。
  • engine 下发迁移命令
public StatusOnlyReturn migrate(Map<String, Object> migrationInfo) {
    JsonRpcRequest request = new RequestBuilder("VM.migrate").withParameter("vmID", getVmId(migrationInfo)).withParameter("params", migrationInfo).build();
    Map<String, Object> response = new FutureMap(this.client, request);
    return new StatusOnlyReturn(response);
}
  • vdsm 收到命令后进行迁移处理,创建调度任务,向 engine 发送迁移进度。
def migrate(self, params):
        self._acquireCpuLockWithTimeout()
        try:
            # It is unlikely, but we could receive migrate()
            # request right after a VM was started or right
            # after a VM just went down
            if self._lastStatus in (vmstatus.WAIT_FOR_LAUNCH,
                                    vmstatus.DOWN):
                raise exception.NoSuchVM()
            if self.hasTransientDisks():
                return response.error('transientErr')
            self._migration_downtime = None
            self._migrationSourceThread = migration.SourceThread(
                self, **params)
            self._migrationSourceThread.start()
            self._migrationSourceThread.getStat()
            self.send_status_event()
            return self._migrationSourceThread.status
        finally:
            self._guestCpuLock.release()
def monitor_migration(self):
......
progress = Progress.from_job_stats(job_stats)
self._vm.send_migration_status_event()
def send_migration_status_event(self):
        migrate_status = self.migrateStatus()
        postcopy = self._post_copy == migration.PostCopyPhase.RUNNING
        status = {
            'progress': migrate_status['progress'],
            'postcopy': postcopy,
        }
        if 'downtime' in migrate_status:
            status['downtime'] = migrate_status['downtime']
        self._notify('VM_migration_status', status)
def _notify(self, operation, params):
        sub_id = '|virt|%s|%s' % (operation, self.id)
        self.cif.notify(sub_id, {self.id: params})
  • notify 方法是向 engine 发送迁移进度的通知。
def notify(self, event_id, params=None):
        """
        Send notification using provided subscription id as
        event_id and a dictionary as event body. Before sending
        there is notify_time added on top level to the dictionary.

        Please consult event-schema.yml in order to build an appropriate event.
        https://github.com/oVirt/vdsm/blob/master/lib/api/vdsm-events.yml

        Args:
            event_id (string): unique event name
            params (dict): event content
        """
        if not params:
            params = {}

        if not self.ready:
            self.log.warning('Not ready yet, ignoring event %r args=%r',
                             event_id, params)
            return

        json_binding = self.servers['jsonrpc']

        def _send_notification(message):
            json_binding.reactor.server.send(
                message, config.get('addresses', 'event_queue'))

        try:
            notification = Notification(event_id, _send_notification,
                                        json_binding.bridge.event_schema)
            notification.emit(params)
            self.log.debug("Sending notification %s with params %s ",
                           event_id, params)
        except KeyError:
            self.log.warning("Attempt to send an event when jsonrpc binding"
                             " not available")
class Notification(object):
    """
    Represents jsonrpc notification message. It builds proper jsonrpc
    notification and pass it a callback which is responsible for
    sending it.
    """
    log = logging.getLogger("jsonrpc.Notification")

    def __init__(self, event_id, cb, event_schema):
        self._event_id = event_id
        self._cb = cb
        self._event_schema = event_schema

    def emit(self, params):
        """
        emit method, builds notification message and sends it.

        Args:
            params(dict): event content

        Returns: None
        """
        self._add_notify_time(params)
        self._event_schema.verify_event_params(self._event_id, params)
        notification = json.dumps({'jsonrpc': '2.0',
                                   'method': self._event_id,
                                   'params': params})

        self.log.debug("Sending event %s", notification)
        self._cb(notification)

    def _add_notify_time(self, body):
        body['notify_time'] = int(monotonic_time() * 1000)
  • 向 engine 发送的通知。
{"jsonrpc": "2.0", "method": "|virt|VM_migration_status|<vmId>", "params": {<vmId>: {"progress": "0", "postcopy": "2"}}}
  • engine 的 ResponseWorker 类收到通知,进行解析。
if (id == null || NullNode.class.isInstance(id)) {
      JsonRpcEvent event = JsonRpcEvent.fromJsonNode(node);
      String method = client.getHostname() + event.getMethod();
      event.setMethod(method);
      if (log.isDebugEnabled()) {
          log.debug("Event arrived from " + client.getHostname() + " containing " + event.getParams());
      }
      processNotifications(event);
      return;
}
  • 组装的 JsonRpcEvent 对象。

    • method 为 " rhvh.cetc-cloud.com|virt|VM_migration_status|<vmId> "。
  • 解析

public void process(JsonRpcEvent event) {
        Set<SubscriptionHolder> holders = matcher.match(event);
        holders.stream()
                .peek(holder -> holder.putEvent(event))
                .filter(holder -> holder.canProcess())
                .forEach(holder -> this.executorService.submit(new EventCallable(holder, this.decomposer)));
}
  • VmMigrationProgressMonitoring 中定义 subscriptionId 为 " *|*|VM_migration_status|* "。
    • 初始化 VmMigrationProgressMonitoring 时,根据 subscriptionId 生成了 SubscriptionHolder 订阅处理人(与发布者 EventPublisher 进行一对一关联,保存至缓存中)。
    • 把 vdsm 发送的通知进行分析与比对,从缓存中确认最终的 SubscriptionHolder 订阅处理人(这个过程确认了之前 vdsm 发送的通知是针对 VM_migration_status 迁移状态的)。
public Set<SubscriptionHolder> match(JsonRpcEvent event) {
        String[] ids = parse(event.getMethod());
        Set<SubscriptionHolder> subscriptions = new HashSet<>();
        SubscriptionHolder holder = this.unique_id.get(ids[3]);
        if (holder != null) {
            subscriptions.add(holder);
        }
        Predicate predicate = new Predicate() {

            @Override
            public boolean apply(int one, int two) {
                return one == two;
            }
        };
        addHolders(subscriptions, this.operation, 2, ids, predicate);
        addHolders(subscriptions, this.component, 1, ids, predicate);
        addHolders(subscriptions, this.receiver, 0, ids, predicate);
        return subscriptions;
}
  • 通过 SubscriptionHolder 能够得到 EventPublisher
    • SubscriptionHolder 订阅处理人开始处理订阅的消息。
public Void call() throws Exception {
            Subscriber<Map<String, Object>> subscriber = this.holder.getSubscriber();
            JsonRpcEvent event = null;
            while ((event = this.holder.canProcessMore()) != null) {
                Map<String, Object> map = this.decomposer.decompose(event);
                if (map.containsKey(JsonRpcEvent.ERROR_KEY)) {
                    subscriber.onError(new ClientConnectionException((String) map.get(JsonRpcEvent.ERROR_KEY)));
                } else {
                    subscriber.onNext(map);
                }
            }
            return null;
}
  • 该过程就是解析订阅消息(通知中 params 属性)并处理的过程。
    • 将 {<vmId>: {"progress": "0", "postcopy": "2"}} 转为 Map 对象。
public Map<String, Object> decompose(JsonRpcEvent event) {
        try {
            return mapper.readValue(event.getParams(),
                    new TypeReference<Map<String, Object>>() {
                    });
        } catch (IOException e) {
            logException(log, "Event decomposition failed", e);
            return null;
        }
}
  • 之前发布者 EventPublisher 与订阅者 VmMigrationProgressMonitoring 已经通过订阅令牌 Subscription 进行了绑定。
    • 这里的发布者直接调用订阅者 VmMigrationProgressMonitoringonNext(T t) 方法。
    • 根据 vmId(虚拟机 ID)更新迁移的百分比。
@Override
    public void onNext(Map<String, Object> map) {
        try {
            map.remove(VdsProperties.notify_time);
            map.entrySet().forEach(vmInfo -> {
                Guid vmId = new Guid(vmInfo.getKey());
                Map<?, ?> properties = (Map<?, ?>) vmInfo.getValue();
                int progress = Integer.valueOf(properties.get(VdsProperties.vm_migration_progress).toString());
                VmStatistics vmStatistics = resourceManager.getVmManager(vmId).getStatistics();
                vmStatistics.setMigrationProgressPercent(progress);
                resourceManager.getEventListener().migrationProgressReported(vmId, progress);
                Integer actualDowntime = (Integer) properties.get(VdsProperties.MIGRATION_DOWNTIME);
                if (actualDowntime != null) {
                    resourceManager.getEventListener().actualDowntimeReported(vmId, actualDowntime);
                }
            });
        } finally {
            subscription.request(1);
        }
}
响应流执行过程

相关文章

网友评论

    本文标题:【Ovirt 笔记】Reactive Streams 的实现原理

    本文链接:https://www.haomeiwen.com/subject/vxxajftx.html