前言
相信大家对Hystrix都很熟悉,它的源码大量使用RxJava,正好笔者的老本行是Android开发工程师,以前也略微接触过,想分享下自己看完Hystix的请求合并与请求缓存部分源码的一些收获。
Hystrix简介
- Hystrix由Netflix开源,官方定义如下:
Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.
- 笔者理解:在分布式环境中,错误不可避免,Hystrix提供了一种隔离、降级、熔断等机制。
1、隔离:通过隔离,避免服务之间相互影响,一个服务不可用,不会影响别的服务,避免了服务雪崩。 2、降级:分布式环境中,服务不可用的情况无法避免,降级机制可以给出更加友好的交互(默认值、异常返回)。 3、熔断:熔断机制可以避免在服务不可用时,服务调用方还在调用不可用的服务,导致资源消耗、耗时增加。 4、提供可视化的监控,Hystrix Dashboard。 4、当然,还有笔者今天要讲的请求合并与请求缓存。
- 请求合并与请求缓存,对应于官方给出的What does it do?的第3项:
Parallel execution. Concurrency aware request caching. Automated batching through request collapsing.
- 以下都是通过官方给的测试用例作为入口,查找源码并进行分析。
1、请求缓存:CommandUsingRequestCache 2、请求合并:CommandCollapserGetValueForKey
请求缓存
- 请求缓存的例子在
CommandUsingRequestCache
,继承自HystrixCommand
,和一般的Command
一致。 - 那么,使用缓存和不使用缓存代码层面有何不同呢?
1、初始化
HystrixRequestContext
2、重写getCacheKey
HystrixRequestContext
-
HystrixRequestContext.initializeContext
代码在HystrixRequestContext
中,从类名可以看出这是个请求上下文,保存一些请求的信息。 -
从源码可以看出,new出一个
HystrixRequestContext
,塞入ThreadLocal
变量中。
private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<HystrixRequestContext>();
/**
* Call this at the beginning of each request (from parent thread)
* to initialize the underlying context so that {@link HystrixRequestVariableDefault} can be used on any children threads and be accessible from
* the parent thread.
* <p>
* <b>NOTE: If this method is called then <code>shutdown()</code> must also be called or a memory leak will occur.</b>
* <p>
* See class header JavaDoc for example Servlet Filter implementation that initializes and shuts down the context.
*/
public static HystrixRequestContext initializeContext() {
HystrixRequestContext state = new HystrixRequestContext();
requestVariables.set(state);
return state;
}
- 那么,
HystrixRequestContext
存储上下文的数据结构是怎样的呢?
// 每个HystrixRequestContext实例,都会有一个ConcurrentMap
ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>>();
/**
删除ConcurrentMap中存储的所有键值对,如果初始化了HystrixRequestContext对象,没有调用shutdown方法,确实会导致内存泄漏,因为state还在。
*/
public void shutdown() {
if (state != null) {
for (HystrixRequestVariableDefault<?> v : state.keySet()) {
// for each RequestVariable we call 'remove' which performs the shutdown logic
try {
HystrixRequestVariableDefault.remove(this, v);
} catch (Throwable t) {
HystrixRequestVariableDefault.logger.error("Error in shutdown, will continue with shutdown of other variables", t);
}
}
// null out so it can be garbage collected even if the containing object is still
// being held in ThreadLocals on threads that weren't cleaned up
state = null;
}
}
- 这个
ConcurrentHashMap
里存的HystrixRequestVariableDefault
及静态内部类HystrixRequestVariableDefault.LazyInitializer
又是什么呢?
HystrixRequestVariableDefault
-
HystrixRequestVariableDefault
其实就是存储了泛型T
的value
,并且封装了initialValue
、get
、set
方法。 -
LazyInitializer
顾名思义就是为了懒汉式初始化value
,而设计的内部类。
// 作用一:作为内部类调用HystrixRequestVariableDefault.initialValue方法,通过维护initialized布尔值,使HystrixRequestVariableDefault.initialValue方法只调用一次。
// 作用二:new一个LazyInitializer对象或LazyInitializer被垃圾回收时不会调用HystrixRequestVariableDefault.initialValue方法,也就是说对于业务初始化逻辑的影响被排除。
// 作用三:调用get方法时,可以通过CAS乐观锁的方式实现value的获取,具体请参照get方法。
static final class LazyInitializer<T> {
// @GuardedBy("synchronization on get() or construction")
private T value;
/*
* Boolean to ensure only-once initialValue() execution instead of using
* a null check in case initialValue() returns null
*/
// @GuardedBy("synchronization on get() or construction")
private boolean initialized = false;
private final HystrixRequestVariableDefault<T> rv;
// 不会调用HystrixRequestVariableDefault.initialValue,不会更新initialized值
private LazyInitializer(HystrixRequestVariableDefault<T> rv) {
this.rv = rv;
}
// 不会调用HystrixRequestVariableDefault.initialValue,只能通过set方式调用
private LazyInitializer(HystrixRequestVariableDefault<T> rv, T value) {
this.rv = rv;
this.value = value;
this.initialized = true;
}
// 如果未初始化(没有调用过set方法)过,则返回HystrixRequestVariableDefault.initialValue的值,初始化过则返回初始化的值
public synchronized T get() {
if (!initialized) {
value = rv.initialValue();
initialized = true;
}
return value;
}
}
- get方法,先从
ConcurrentHashMap
中取出对应的LazyInitializer
,如果为空则使用CAS乐观锁的方式,new一个LazyInitializer
并存入ConcurrentHashMap
,最后返回调用LazyInitializer.get()
并返回
public T get() {
// 当前线程的HystrixRequestContext为null 或 ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> 为null
if (HystrixRequestContext.getContextForCurrentThread() == null) {
throw new IllegalStateException(HystrixRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
}
ConcurrentHashMap<HystrixRequestVariableDefault<?>, LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state;
// short-circuit the synchronized path below if we already have the value in the ConcurrentHashMap
LazyInitializer<?> v = variableMap.get(this);
if (v != null) {
return (T) v.get();
}
/*
* 乐观锁方式(CAS)new一个LazyInitializer,放进ConcurrentHashMap
* 这里值得注意的是,不调用LazyInitializer.get方法是不会执行HystrixRequestVariableDefault.initialValue,故当putIfAbsent失败时,可以乐观地放弃该实例,使该实例被GC。
* 不管哪个LazyInitializer实例的get方法被调用,HystrixRequestVariableDefault.initialValue也只会被调用一次。
*/
LazyInitializer<T> l = new LazyInitializer<T>(this);
LazyInitializer<?> existing = variableMap.putIfAbsent(this, l);
if (existing == null) {
/*
* We won the thread-race so can use 'l' that we just created.
*/
return l.get();
} else {
/*
* We lost the thread-race so let 'l' be garbage collected and instead return 'existing'
*/
return (T) existing.get();
}
}
各类之间的关系
- 一个request(不局限于一个线程) -> HystrixRequestContext -> ConcurrentHashMap<HystrixRequestVariableDefault, HystrixRequestVariableDefault.LazyInitializer>
- 也就是说每个request都有一个ConcurrentHashMap<HystrixRequestVariableDefault, HystrixRequestVariableDefault.LazyInitializer> map。
获取缓存
-
getCacheKey
重写了AbstractCommand.getCacheKey
方法,AbstractCommand
为HystrixCommand
的基类。
- 根据上图,我们可以看出
execute
方法,最终调用toObservable
方法,而toObservable
方法在AbstractCommand
中,因此我们可以初步断定在AbstractCommand.toObservable
方法中,会与HystrixRequestVariableDefault
或者其实现的接口产生关联,进行缓存的读取和写入。
AbstractCommand.toObservable
的关键代码如下:
final String cacheKey = getCacheKey();
/* 如果开启了缓存功能,从缓存读取 */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
// 缓存对象
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// 放进缓存
if (requestCacheEnabled && cacheKey != null) {
// 包装成缓存Observable对象
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
- 接下来,我们就只要寻找
HystrixRequestCache
与HystrixRequestVariableDefault
之间的关联了,AbstractCommand
构造器中通过HystrixRequestCache.getInstance
构造了HystrixRequestCache
对象。
// 又是CAS,putIfAbsent。。。
private static HystrixRequestCache getInstance(RequestCacheKey rcKey, HystrixConcurrencyStrategy concurrencyStrategy) {
HystrixRequestCache c = caches.get(rcKey);
if (c == null) {
HystrixRequestCache newRequestCache = new HystrixRequestCache(rcKey, concurrencyStrategy);
HystrixRequestCache existing = caches.putIfAbsent(rcKey, newRequestCache);
if (existing == null) {
// we won so use the new one
c = newRequestCache;
} else {
// we lost so use the existing
c = existing;
}
}
return c;
}
- 来看
HystrixRequestCache
的值是怎么存储的,看HystrixRequestCache.putIfAbsent
。
HystrixCachedObservable<T> putIfAbsent(String cacheKey, HystrixCachedObservable<T> f) {
// 使用HystrixRequestCache.prefix + concurrencyStrategy + HystrixCommand.getCacheKey包装成缓存key
ValueCacheKey key = getRequestCacheKey(cacheKey);
if (key != null) {
// 寻找缓存,关键代码
ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> cacheInstance = requestVariableForCache.get(concurrencyStrategy);
if (cacheInstance == null) {
throw new IllegalStateException("Request caching is not available. Maybe you need to initialize the HystrixRequestContext?");
}
HystrixCachedObservable<T> alreadySet = (HystrixCachedObservable<T>) cacheInstance.putIfAbsent(key, f);
if (alreadySet != null) {
// someone beat us so we didn't cache this
return alreadySet;
}
}
// we either set it in the cache or do not have a cache key
return null;
}
-
requestVariableInstance.get(key)
为HystrixRequestVariableHolder
中的方法。
// 找到了关联。。。这里有HystrixRequestVariable
private static ConcurrentHashMap<RVCacheKey, HystrixRequestVariable<?>> requestVariableInstance = new ConcurrentHashMap<RVCacheKey, HystrixRequestVariable<?>>();
//
public T get(HystrixConcurrencyStrategy concurrencyStrategy) {
RVCacheKey key = new RVCacheKey(this, concurrencyStrategy);
HystrixRequestVariable<?> rvInstance = requestVariableInstance.get(key);
if (rvInstance == null) {
requestVariableInstance.putIfAbsent(key, concurrencyStrategy.getRequestVariable(lifeCycleMethods));
/*
* 内存泄漏检测,
*/
if (requestVariableInstance.size() > 100) {
logger.warn("Over 100 instances of HystrixRequestVariable are being stored. This is likely the sign of a memory leak caused by using unique instances of HystrixConcurrencyStrategy instead of a single instance.");
}
}
// HystrixRequestVariable.get取出ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>的map,再从ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>中根据重写的getCacheKey构造出ValueCacheKey,拿出缓存值。
return (T) requestVariableInstance.get(key).get();
}
获取缓存过程中各个对象的对应关系
- 一个commandKey
- 一个HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>
- 一个ConcurrentHashMap<RVCacheKey, HystrixRequestVariable> requestVariableInstance = new ConcurrentHashMap>()
请求缓存总结
最后,再总结下请求缓存机制,一个request对应一个HystrixRequestContext
、HystrixRequestVariable
中存储缓存值,通过重写getCacheKey
构造对应RVCacheKey
,通过HystrixRequestVariableHolder
拿到HystrixRequestVariable
的值。
总结
看了源码才发现,作者有如下感受:
1、各种ConcurrentHashMap 2、终于RxJava第一次看到在非Android领域运用 3、懒加载+CAS伴随整个流程,后续也会考虑这种非锁实现。
网友评论