在 Hystrix Command 执行过程(开始执行、结束执行、异常、超时)时会不断发出各类事件,通过收集这些数据,提供给消费者。如断路器、Hystrix Dashboard可以统计分析这些数据,从而完成特定的功能。
本文学习了 Metrics 收集的源码,并整理成下图。由于 Hystrix 发出的事件种类很多,本文仅以命令结束执行事件作为学习实例。
Subject简述
Hystrix 基于 RxJava,本文涉及到 Subject 概念,这里提一下 rx.subjects.Subject。
public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {}
Subject 继承Observable,因此可作为被观察者、数据源,也就是一个数据发射器;
实现了接口 Observer,因此可作为观察者,可以订阅其他Observable,处理Observable发射出的数据。
因此,Subject既可以发射数据,也可以接收数据。类比于菜鸟驿站,可以收、发快递。
Metrics 收集流程
整个过程分成以下三步:
1.使用HystrixCommandMetrics记录metrics
每个Command的构造器中会获取一个HystrixCommandMetrics工具,用来记录metrics。
// 构造器利用HystrixCommandMetrics获取命令key对应的对象
HystrixCommandMetrics.getInstance(commandKey, groupKey, threadPoolKey, properties);
// HystrixCommandMetrics 中存储HystrixCommandMetrics的数据结构
private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics;
也就是说,每个CommandKey会拥有一个对应的HystrixCommandMetrics工具。
例如:A服务利用Feign远程调用B服务,那下面的 service-B 会自动作为命令的key。
@FeignClient(name = "service-B")
下面是利用HystrixCommandMetrics工具发射 标记命令结束 的事件代码:
void markCommandDone(...) {
HystrixThreadEventStream.getInstance().executionDone(...);
}
2.Per-Thread 事件处理者
HystrixCommandMetrics提供了基础工具方法给Command使用,而HystrixCommandMetrics的实际使用的是HystrixThreadEventStream: Per-thread event stream。
它是线程级别的数据处理者,每个线程拥有自己的HystrixThreadEventStream,HystrixThreadEventStream.getInstance() 是从ThreadLocal中获取对象。
它包含了很多Subject<事件,事件>,用来接收和发射数据。下面是HystrixThreadEventStream类:
public class HystrixThreadEventStream {
// Per-thread 的HystrixThreadEventStream
private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams;
// 用来接收和发射HystrixCommandCompletion事件的Subject
private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlyCommandCompletionSubject;
}
HystrixCommandCompletion是事件(HystrixCommandEvent)的一种,writeOnlyCommandCompletionSubject这个Subject的初始化方式如下:
// 创建为一个数据发射器
writeOnlyCommandCompletionSubject = PublishSubject.create();
writeOnlyCommandCompletionSubject
.onBackpressureBuffer()
// 绑定发射数据时的处理者
.doOnNext(writeCommandCompletionsToShardedStreams)
.unsafeSubscribe(Subscribers.empty());
writeCommandCompletionsToShardedStreams会怎么处理数据呢?下面是它的定义:
// 它是一个可执行的实体,没有返回值,可以传入一个参数; 和 Runnable很像
private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() {
// 当接收到数据时, 又将数据发送给了command级别的处理者
@Override
public void call(HystrixCommandCompletion commandCompletion) {
// 获取CommandKey对应的HystrixCommandCompletionStream
HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());
// 写入数据
commandStream.write(commandCompletion);
...
}
};
现在再回过来看HystrixThreadEventStream这个Per-thread的工具发射 标记命令结束事件 的代码:
public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
// 构建命令结束的数据对象
HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
// 利用上面的Subject发射数据, onNext()就是发射一条数据。
writeOnlyCommandCompletionSubject.onNext(event);
}
由于writeOnlyCommandCompletionSubject绑定了数据处理者(上面的writeCommandCompletionsToShardedStreams这个Action1)。它会利用command级别的工具来发射数据。
3.Per-Command 事件处理者
通过上一步知道,每个线程有自己的工具(HystrixThreadEventStream)来处理数据,最终这个工具利用了命令级别的工具。上面的HystrixCommandCompletionStream 属于 HystrixEventStream 的一种,HystrixEventStream专门用于处理command级别的数据,它有如下几个子类:
HystrixCommandCompletionStream
HystrixCommandStartStream
HystrixThreadPoolCompletionStream
HystrixThreadPoolStartStream
HystrixCollapserEventStream
这几个子类都是用来处理特定类型事件的工具,以HystrixCommandCompletionStream为例子,这些子类的结构都很类似,可以接收数据,并将数据提供给其他消费者。
public class HystrixCommandCompletionStream {
// 一个用于接收和发射结束事件的Subject
private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlySubject;
// 一个Observable,将接收到的数据作为数据源发射给其他消费者
private final Observable<HystrixCommandCompletion> readOnlyStream;
}
先看看这个Per-Command 的对象是怎么创建的?
// 存储结构
private static final ConcurrentMap<String, HystrixCommandCompletionStream> streams = new ConcurrentHashMap<String, HystrixCommandCompletionStream>();
// 单例模式拿到HystrixCommandCompletionStream,以命令的key为索引存储在ConcurrentMap中
public static HystrixCommandCompletionStream getInstance(HystrixCommandKey commandKey) {
HystrixCommandCompletionStream initialStream = streams.get(commandKey.name());
if (initialStream != null) {
return initialStream;
} else {
synchronized (HystrixCommandCompletionStream.class) {
...
}
}
}
下面是它的构造函数:
HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {
this.commandKey = commandKey;
// 创建可以发射数据的Subject
this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create());
// readOnlyStream是一个Observable, share()方法可以将上面Subject发射的数据全部广播给readOnlyStream,相当于拷贝了一份一模一样的数据
this.readOnlyStream = writeOnlySubject.share();
}
这个类提供了很重要的两个方法:
// 提供了接收数据的方法,其他工具(如HystrixThreadEventStream)可以将数据写进来
public void write(HystrixCommandCompletion event) {
writeOnlySubject.onNext(event);
}
// 实现HystrixEventStream的observe(方法), 其他消费者可以利用observe()拿到这个数据源,然后订阅它,处理它发射的所有数据
@Override
public Observable<HystrixCommandCompletion> observe() {
return readOnlyStream;
}
小结
通过上面三步,数据流向就很清楚了:
- Command直接使用HystrixCommandMetrics来记录命令开始、结束等事件
- HystrixCommandMetrics利用线程级别的HystrixThreadEventStream的来接收数据
- HystrixThreadEventStream完成各种事件的封装(如将结束事件封装成HystrixCommandCompletion),再利用command级别的HystrixEventStream来接收数据(HystrixEventStream有不同的子类来处理不同的事件)
- 最终消费者通过HystrixEventStream的observe()方法,拿到这个数据源,然后订阅它,从而源源不断的拿到Command发射出的各种数据
谁在最终消费数据?
通过上述步骤,将Hystrix Command执行过程的各种信息转化成了特定数据结构的事件,然后提供了一个Observable作为数据源。如果需要使用这些数据,各观察者只需要订阅Observable就可以拿到这些已经分门别类且结构化的数据了。
例如:断路器就是利用这些信息,然后统计分析数据,最终提供断路器的功能。
网友评论