美文网首页
服务降级熔断 - Metrics

服务降级熔断 - Metrics

作者: 爱情小傻蛋 | 来源:发表于2019-10-23 14:52 被阅读0次

    在 Hystrix Command 执行过程(开始执行、结束执行、异常、超时)时会不断发出各类事件,通过收集这些数据,提供给消费者。如断路器、Hystrix Dashboard可以统计分析这些数据,从而完成特定的功能。

    本文学习了 Metrics 收集的源码,并整理成下图。由于 Hystrix 发出的事件种类很多,本文仅以命令结束执行事件作为学习实例。


    Subject简述

    Hystrix 基于 RxJava,本文涉及到 Subject 概念,这里提一下 rx.subjects.Subject。

    public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {}
    

    Subject 继承Observable,因此可作为被观察者、数据源,也就是一个数据发射器;

    实现了接口 Observer,因此可作为观察者,可以订阅其他Observable,处理Observable发射出的数据。

    因此,Subject既可以发射数据,也可以接收数据。类比于菜鸟驿站,可以收、发快递。

    Metrics 收集流程

    整个过程分成以下三步:

    1.使用HystrixCommandMetrics记录metrics

    每个Command的构造器中会获取一个HystrixCommandMetrics工具,用来记录metrics。

    // 构造器利用HystrixCommandMetrics获取命令key对应的对象
    HystrixCommandMetrics.getInstance(commandKey, groupKey, threadPoolKey, properties);
    // HystrixCommandMetrics 中存储HystrixCommandMetrics的数据结构
    private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics;
    

    也就是说,每个CommandKey会拥有一个对应的HystrixCommandMetrics工具。

    例如:A服务利用Feign远程调用B服务,那下面的 service-B 会自动作为命令的key。

    @FeignClient(name = "service-B")
    

    下面是利用HystrixCommandMetrics工具发射 标记命令结束 的事件代码:

    void markCommandDone(...) {
        HystrixThreadEventStream.getInstance().executionDone(...);
    }
    
    2.Per-Thread 事件处理者

    HystrixCommandMetrics提供了基础工具方法给Command使用,而HystrixCommandMetrics的实际使用的是HystrixThreadEventStream: Per-thread event stream。

    它是线程级别的数据处理者,每个线程拥有自己的HystrixThreadEventStream,HystrixThreadEventStream.getInstance() 是从ThreadLocal中获取对象。

    它包含了很多Subject<事件,事件>,用来接收和发射数据。下面是HystrixThreadEventStream类:

    public class HystrixThreadEventStream {
        // Per-thread 的HystrixThreadEventStream
        private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams;
        // 用来接收和发射HystrixCommandCompletion事件的Subject
        private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlyCommandCompletionSubject;
    }
    

    HystrixCommandCompletion是事件(HystrixCommandEvent)的一种,writeOnlyCommandCompletionSubject这个Subject的初始化方式如下:

    // 创建为一个数据发射器
    writeOnlyCommandCompletionSubject = PublishSubject.create();
    writeOnlyCommandCompletionSubject
            .onBackpressureBuffer()
            // 绑定发射数据时的处理者
            .doOnNext(writeCommandCompletionsToShardedStreams)
            .unsafeSubscribe(Subscribers.empty());
    

    writeCommandCompletionsToShardedStreams会怎么处理数据呢?下面是它的定义:

    // 它是一个可执行的实体,没有返回值,可以传入一个参数; 和 Runnable很像
    private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() {
        // 当接收到数据时, 又将数据发送给了command级别的处理者
        @Override
        public void call(HystrixCommandCompletion commandCompletion) {
            // 获取CommandKey对应的HystrixCommandCompletionStream
            HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());
            // 写入数据
            commandStream.write(commandCompletion);
            ...
        }
    };
    

    现在再回过来看HystrixThreadEventStream这个Per-thread的工具发射 标记命令结束事件 的代码:

    public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
        // 构建命令结束的数据对象
        HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
        // 利用上面的Subject发射数据, onNext()就是发射一条数据。
        writeOnlyCommandCompletionSubject.onNext(event);
    }
    

    由于writeOnlyCommandCompletionSubject绑定了数据处理者(上面的writeCommandCompletionsToShardedStreams这个Action1)。它会利用command级别的工具来发射数据。

    3.Per-Command 事件处理者

    通过上一步知道,每个线程有自己的工具(HystrixThreadEventStream)来处理数据,最终这个工具利用了命令级别的工具。上面的HystrixCommandCompletionStream 属于 HystrixEventStream 的一种,HystrixEventStream专门用于处理command级别的数据,它有如下几个子类:

    HystrixCommandCompletionStream
    HystrixCommandStartStream
    HystrixThreadPoolCompletionStream
    HystrixThreadPoolStartStream
    HystrixCollapserEventStream
    

    这几个子类都是用来处理特定类型事件的工具,以HystrixCommandCompletionStream为例子,这些子类的结构都很类似,可以接收数据,并将数据提供给其他消费者。

    public class HystrixCommandCompletionStream {
        // 一个用于接收和发射结束事件的Subject
        private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlySubject;
        // 一个Observable,将接收到的数据作为数据源发射给其他消费者
        private final Observable<HystrixCommandCompletion> readOnlyStream;
    }
    

    先看看这个Per-Command 的对象是怎么创建的?

    // 存储结构
    private static final ConcurrentMap<String, HystrixCommandCompletionStream> streams = new ConcurrentHashMap<String, HystrixCommandCompletionStream>();
    
    // 单例模式拿到HystrixCommandCompletionStream,以命令的key为索引存储在ConcurrentMap中
    public static HystrixCommandCompletionStream getInstance(HystrixCommandKey commandKey) {
        HystrixCommandCompletionStream initialStream = streams.get(commandKey.name());
        if (initialStream != null) {
            return initialStream;
        } else {
            synchronized (HystrixCommandCompletionStream.class) {
                ...
            }
        }
    }
    

    下面是它的构造函数:

    HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {
        this.commandKey = commandKey;
        // 创建可以发射数据的Subject
        this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create());
        // readOnlyStream是一个Observable, share()方法可以将上面Subject发射的数据全部广播给readOnlyStream,相当于拷贝了一份一模一样的数据
        this.readOnlyStream = writeOnlySubject.share();
    }
    

    这个类提供了很重要的两个方法:

    // 提供了接收数据的方法,其他工具(如HystrixThreadEventStream)可以将数据写进来
    public void write(HystrixCommandCompletion event) {
        writeOnlySubject.onNext(event);
    }
    
    // 实现HystrixEventStream的observe(方法), 其他消费者可以利用observe()拿到这个数据源,然后订阅它,处理它发射的所有数据
    @Override
    public Observable<HystrixCommandCompletion> observe() {
        return readOnlyStream;
    }
    
    小结

    通过上面三步,数据流向就很清楚了:

    • Command直接使用HystrixCommandMetrics来记录命令开始、结束等事件
    • HystrixCommandMetrics利用线程级别的HystrixThreadEventStream的来接收数据
    • HystrixThreadEventStream完成各种事件的封装(如将结束事件封装成HystrixCommandCompletion),再利用command级别的HystrixEventStream来接收数据(HystrixEventStream有不同的子类来处理不同的事件)
    • 最终消费者通过HystrixEventStream的observe()方法,拿到这个数据源,然后订阅它,从而源源不断的拿到Command发射出的各种数据

    谁在最终消费数据?

    通过上述步骤,将Hystrix Command执行过程的各种信息转化成了特定数据结构的事件,然后提供了一个Observable作为数据源。如果需要使用这些数据,各观察者只需要订阅Observable就可以拿到这些已经分门别类且结构化的数据了。

    例如:断路器就是利用这些信息,然后统计分析数据,最终提供断路器的功能。

    相关文章

      网友评论

          本文标题:服务降级熔断 - Metrics

          本文链接:https://www.haomeiwen.com/subject/hilkvctx.html