美文网首页
用 Hystrix 构建高可用服务架构(中)

用 Hystrix 构建高可用服务架构(中)

作者: 久伴_不离 | 来源:发表于2019-05-24 17:33 被阅读0次

4.Hystrix 隔离策略细粒度控制

Hystrix 实现资源隔离,有两种策略:

            线程池隔离

            信号量隔离

对资源隔离这一块东西,其实可以做一定细粒度的一些控制。

execution.isolation.strategy

指定了 HystrixCommand.run() 的资源隔离策略:THREAD or SEMAPHORE,一种基于线程池,一种基于信号量。

// to use thread

isolationHystrixCommandProperties.Setter().withExecutionIsolationStrategy(ExecutionIsolation

Strategy.THREAD)

// to use semaphore

isolationHystrixCommandProperties.Setter().withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)

线程池机制,每个 command 运行在一个线程中,限流是通过线程池的大小来控制的;信号量机制,command是运行在调用线程中,通过信号量的容量来进行限流。

如何在线程池和信号量之间做选择?

默认的策略就是线程池。

线程池其实最大的好处就是对于网络访问请求,如果有超时的话,可以避免调用线程阻塞住。

而使用信号量的场景,通常是针对超大并发量的场景下,每个服务实例每秒都几百的 QPS,那么此时你用线程池的话,线程一般不会太多,可能撑不住那么高的并发,如果要撑住,可能要耗费大量的线程资源,那么就是用信号量,来进行限流保护。一般用信号量常见于那种基于纯内存的一些业务逻辑服务,而不涉及到任何网络访问请求。

command key & command group

我们使用线程池隔离,要怎么对依赖服务、依赖服务接口、线程池三者做划分呢?

每一个 command,都可以设置一个自己的名称 command key,同时可以设置一个自己的组 command group。

private static final Setter cachedSetter =

Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))

.andCommandKey(HystrixCommandKey.Factory.asK

ey("HelloWorld"));

public CommandHelloWorld(String name) {

super(cachedSetter);

this.name = name;

}

command group 是一个非常重要的概念,默认情况下,就是通过 command group 来定义一个线程池的,而且还会通过 command group 来聚合一些监控和报警信息。同一个 command group 中的请求,都会进入同一个线程池中。

command thread pool

ThreadPoolKey 代表了一个 HystrixThreadPool,用来进行统一监控、统计、缓存。默认的 ThreadPoolKey就是 command group 的名称。每个 command 都会跟它的 ThreadPoolKey 对应的 ThreadPool 绑定在一起。如果不想直接用 command group,也可以手动设置 ThreadPool 的名称。

private static final Setter cachedSetter =

Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")).andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")).andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool"));public CommandHelloWorld(String name) {super(cachedSetter);this.name = name;}
command key & command group & command thread pool

command key ,代表了一类 command,一般来说,代表了底层的依赖服务的一个接口。

command group ,代表了某一个底层的依赖服务,这是很合理的,一个依赖服务可能会暴露出来多个接口,每个接口就是一个 command key。command group 在逻辑上去组织起来一堆 command key 的调用、统计信息、成功次数、timeout 超时次数、失败次数等,可以看到某一个服务整体的一些访问情况。一般来说,推荐根据一个服务区划分出一个线程池,command key 默认都是属于同一个线程池的。比如说你以一个服务为粒度,估算出来这个服务每秒的所有接口加起来的整体 QPS 在 100 左右,你调用这个服务,当前这个服务部署了 10 个服务实例,每个服务实例上,其实用这个 command group 对应这个服务,给一个线程池,量大概在 10 个左右就可以了,你对整个服务的整体的访问 QPS 就大概在每秒 100左右。但是,如果说 command group 对应了一个服务,而这个服务暴露出来的几个接口,访问量很不一样,差异非常之大。你可能就希望在这个服务 command group 内部,包含的对应多个接口的 command key,做一些细粒度的资源隔离。就是说,对同一个服务的不同接口,使用不同的线程池。

command key -> command group

command key -> 自己的 thread pool key

逻辑上来说,多个 command key 属于一个 command group,在做统计的时候,会放在一起统计。每个 command key 有自己的线程池,每个接口有自己的线程池,去做资源隔离和限流。

说白点,就是说如果你的 command key 要用自己的线程池,可以定义自己的 thread pool key,就 ok 了。

coreSize

设置线程池的大小,默认是 10。一般来说,用这个默认的 10 个线程大小就够了。

HystrixThreadPoolProperties.Setter().withCoreSize(int value);

queueSizeRejectionThreshold

如果说线程池中的 10 个线程都在工作中,没有空闲的线程来做其它的事情,此时再有请求过来,会先进入队列积压。如果说队列积压满了,再有请求过来,就直接 reject,拒绝请求,执行 fallback 降级的逻辑,快速返回。

控制 queue 满了之后 reject 的 threshold,因为 maxQueueSize 不允许热修改,因此提供这个参数可以热修改,控制队列的最大大小。

HystrixThreadPoolProperties.Setter().withQueueSizeRejectionThreshold(int value);

execution.isolation.semaphore.maxConcurrentRequests

设置使用 SEMAPHORE 隔离策略的时候允许访问的最大并发量,超过这个最大并发量,请求直接被 reject。

这个并发量的设置,跟线程池大小的设置,应该是类似的,但是基于信号量的话,性能会好很多,而且Hystrix 框架本身的开销会小很多。

默认值是 10,尽量设置的小一些,因为一旦设置的太大,而且有延时发生,可能瞬间导致 tomcat 本身的线程资源被占满。

HystrixCommandProperties.Setter().withExecutionIsolationSemaphoreMaxConcurrentRequests(int value);


5.深入 Hystrix 执行时内部原理

前面我们了解了 Hystrix 最基本的支持高可用的技术:资源隔离 + 限流。

创建 command;

执行这个 command;

配置这个 command 对应的 group 和线程池。

这里,我们要讲一下,你开始执行这个 command,调用了这个 command 的 execute() 方法之后,Hystrix

底层的执行流程和步骤以及原理是什么。

在讲解这个流程的过程中,我会带出来 Hystrix 其他的一些核心以及重要的功能。

这里是整个 8 大步骤的流程图,我会对每个步骤进行细致的讲解。学习的过程中,对照着这个流程图,相信思路会比较清晰。

步骤一:创建 command 

一个 HystrixCommand 或 HystrixObservableCommand 对象,代表了对某个依赖服务发起的一次请求或者调用。创建的时候,可以在构造函数中传入任何需要的参数。

             HystrixCommand 主要用于仅仅会返回一个结果的调用。

             HystrixObservableCommand 主要用于可能会返回多条结果的调用。

// 创建 HystrixCommandHystrixCommand hystrixCommand = new HystrixCommand(arg1, arg2);

// 创建 HystrixObservableCommandHystrixObservableCommand hystrixObservableCommand = new HystrixObservableCommand(arg1, arg2);

步骤二:调用 command 执行方法

执行 command,就可以发起一次对依赖服务的调用。

要执行 command,可以在 4 个方法中选择其中的一个:execute()、queue()、observe()、toObservable()。

其中 execute() 和 queue() 方法仅仅对 HystrixCommand 适用。

             execute():调用后直接 block 住,属于同步调用,直到依赖服务返回单条结果,或者抛出异常。

             queue():返回一个 Future,属于异步调用,后面可以通过 Future 获取单条结果。

             observe():订阅一个 Observable 对象,Observable 代表的是依赖服务返回的结果,获取到一个那个代表结果的 Observable 对象的拷贝对象。

             toObservable():返回一个 Observable 对象,如果我们订阅这个对象,就会执行 command 并且获取返回结果。

K value = hystrixCommand.execute();Future<K> fValue =

hystrixCommand.queue();Observable<K> oValue = hystrixObservableCommand.observe();Observable<K>

toOValue = hystrixObservableCommand.toObservable();

execute() 实际上会调用 queue().get() 方法,可以看一下 Hystrix 源码。

public R execute() {

try {

return queue().get();

} catch (Exception e) {

throw Exceptions.sneakyThrow(decomposeException(e));

}

}

而在 queue() 方法中,会调用 toObservable().toBlocking().toFuture()。

final Future<R> delegate = toObservable().toBlocking().toFuture();

也就是说,先通过 toObservable() 获得 Future 对象,然后调用 Future 的 get() 方法。那么,其实无论是哪种方式执行 command,最终都是依赖于 toObservable() 去执行的。

步骤三:检查是否开启缓存

从这一步开始,就进入到 Hystrix 底层运行原理啦,看一下 Hystrix 一些更高级的功能和特性。

如果这个 command 开启了请求缓存 Request Cache,而且这个调用的结果在缓存中存在,那么直接从缓存中返回结果。否则,继续往后的步骤。

步骤四:检查是否开启了断路器

检查这个 command 对应的依赖服务是否开启了断路器。如果断路器被打开了,那么 Hystrix 就不会执行这个 command,而是直接去执行 fallback 降级机制,返回降级结果。

步骤五:检查线程池/队列/信号量是否已满

如果这个 command 线程池和队列已满,或者 semaphore 信号量已满,那么也不会执行 command,而是直接去调用 fallback 降级机制,同时发送 reject 信息给断路器统计。

步骤六:执行 command

调用 HystrixObservableCommand 对象的 construct() 方法,或者 HystrixCommand 的 run() 方法来实际执行这个 command。             HystrixCommand.run() 返回单条结果,或者抛出异常。// 通过 command 执行,获取最新一条商品数据 ProductInfo productInfo =getProductInfoCommand.execute();             HystrixObservableCommand.construct() 返回一个 Observable 对象,可以获取多条结果。

Observable<ProductInfo> observable = getProductInfosCommand.observe();

// 订阅获取多条结果observable.subscribe(new Observer<ProductInfo>() {@Overridepublic void onCompleted() {System.out.println("获取完了所有的商品数据");}@Overridepublic void onError(Throwable e) {e.printStackTrace();}/** * 获取完一条数据,就回调一次这个方法 * @param productInfo */

@Override

public void onNext(ProductInfo productInfo) {System.out.println(productInfo);}});

如果是采用线程池方式,并且 HystrixCommand.run() 或者HystrixObservableCommand.construct() 的执行时间超过了 timeout 时长的话,那么 command 所在的线程会抛出一个TimeoutException,这时会执行 fallback 降级机制,不会去管 run() 或 construct() 返回的值了。另一种情况,如果 command 执行出错抛出了其它异常,那么也会走 fallback 降级。这两种情况下,Hystrix 都会发送异常事件给断路器统计。

注意,我们是不可能终止掉一个调用严重延迟的依赖服务的线程的,只能说给你抛出来一个TimeoutException。

如果没有 timeout,也正常执行的话,那么调用线程就会拿到一些调用依赖服务获取到的结果,然后Hystrix 也会做一些 logging 记录和 metric 度量统计。

步骤七:断路健康检查

Hystrix 会把每一个依赖服务的调用成功、失败、Reject、Timeout 等事件发送给 circuit breaker 断路器。断路器就会对这些事件的次数进行统计,根据异常事件发生的比例来决定是否要进行断路(熔断)。

如果打开了断路器,那么在接下来一段时间内,会直接断路,返回降级结果。

如果在之后,断路器尝试执行 command,调用没有出错,返回了正常结果,那么 Hystrix 就会把断路器关闭。

步骤八:调用 fallback 降级机制

在以下几种情况中,Hystrix 会调用 fallback 降级机制。

             断路器处于打开状态;

             线程池/队列/semaphore 满了;

             command 执行超时;

             run() 或者 construct() 抛出异常。

一般在降级机制中,都建议给出一些默认的返回值,比如静态的一些代码逻辑,或者从内存中的缓存中提取一些数据,在这里尽量不要再进行网络请求了。

在降级中,如果一定要进行网络调用的话,也应该将那个调用放在一个 HystrixCommand 中进行隔离。

             HystrixCommand 中,实现 getFallback() 方法,可以提供降级机制。

             HystrixObservableCommand 中,实现 resumeWithFallback() 方法,返回一个 Observable 对象,可以提供降级结果。

如果没有实现 fallback,或者 fallback 抛出了异常,Hystrix 会返回一个 Observable,但是不会返回任何数据。

不同的 command 执行方式,其 fallback 为空或者异常时的返回结果不同。

 对于 execute(),直接抛出异常。

 对于 queue(),返回一个 Future,调用 get() 时抛出异常。

 对于 observe(),返回一个 Observable 对象,但是调用 subscribe() 方法订阅它时,立即抛出调用者的 onError() 方法。

 对于 toObservable(),返回一个 Observable 对象,但是调用 subscribe() 方法订阅它时,立即抛出调用者的 onError() 方法。

不同的执行方式

 execute(),获取一个 Future.get(),然后拿到单个结果。

 queue(),返回一个 Future。

 observe(),立即订阅 Observable,然后启动 8 大执行步骤,返回一个拷贝的 Observable,订阅时立即回调给你结果。

 toObservable(),返回一个原始的 Observable,必须手动订阅才会去执行 8 大步骤。


6.基于 request cache 请求缓存技术优化批量商品数据查询接口

Hystrix command 执行时 8 大步骤第三步,就是检查 Request cache 是否有缓存。

首先,有一个概念,叫做 Request Context 请求上下文,一般来说,在一个 web 应用中,如果我们用到了 Hystrix,我们会在一个 filter 里面,对每一个请求都施加一个请求上下文。就是说,每一次请求,就是一次请求上下文。然后在这次请求上下文中,我们会去执行 N 多代码,调用 N 多依赖服务,有的依赖服务可能还会调用好几次。

在一次请求上下文中,如果有多个 command,参数都是一样的,调用的接口也是一样的,而结果可以认为也是一样的。那么这个时候,我们可以让第一个 command 执行返回的结果缓存在内存中,然后这个请求上下文后续的其它对这个依赖的调用全部从内存中取出缓存结果就可以了。

这样的话,好处在于不用在一次请求上下文中反复多次执行一样的 command,避免重复执行网络请求,提升整个请求的性能。

举个栗子。比如说我们在一次请求上下文中,请求获取 productId 为 1 的数据,第一次缓存中没有,那么会从商品服务中获取数据,返回最新数据结果,同时将数据缓存在内存中。后续同一次请求上下文中,如果还有获取 productId 为 1 的数据的请求,直接从缓存中取就好了。

HystrixCommand 和 HystrixObservableCommand 都可以指定一个缓存 key,然后 Hystrix 会自动进行缓存,接着在同一个 request context 内,再次访问的话,就会直接取用缓存。

下面,我们结合一个具体的业务场景,来看一下如何使用 request cache 请求缓存技术。当然,以下代码只作为一个基本的 Demo 而已。

现在,假设我们要做一个批量查询商品数据的接口,在这个里面,我们是用 HystrixCommand 一次性批量查询多个商品 id 的数据。但是这里有个问题,如果说 Nginx 在本地缓存失效了,重新获取一批缓存,传递过来的 productIds 都没有进行去重,比如 productIds=1,1,1,2,2,那么可能说,商品 id 出现了重复,如果按照我们之前的业务逻辑,可能就会重复对 productId=1 的商品查询三次,productId=2 的商品查询两次。

我们对批量查询商品数据的接口,可以用 request cache 做一个优化,就是说一次请求,就是一次 request context,对相同的商品查询只执行一次,其余重复的都走 request cache。

实现 Hystrix 请求上下文过滤器并注册

定义 HystrixRequestContextFilter 类,实现 Filter 接口。

/** * Hystrix 请求上下文过滤器 */public class HystrixRequestContextFilter implements Filter {

@Override

public void init(FilterConfig filterConfig) throws ServletException {

}

@Override

public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse,

FilterChain filterChain) {

HystrixRequestContext context = HystrixRequestContext.initializeContext();

try {

filterChain.doFilter(servletRequest, servletResponse);

} catch (IOException | ServletException e) {

e.printStackTrace();

} finally {

context.shutdown();

}

}

@Override

public void destroy() {

}

}

然后将该 filter 对象注册到 SpringBoot Application 中。

@SpringBootApplicationpublic class EshopApplication {

public static void main(String[] args) {

SpringApplication.run(EshopApplication.class, args);

}

@Bean

public FilterRegistrationBean filterRegistrationBean() {

FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean(new

HystrixRequestContextFilter());

filterRegistrationBean.addUrlPatterns("/*");

return filterRegistrationBean;

}

}

command 重写 getCacheKey() 方法

在 GetProductInfoCommand 中,重写 getCacheKey() 方法,这样的话,每一次请求的结果,都会放在

Hystrix 请求上下文中。下一次同一个 productId 的数据请求,直接取缓存,无须再调用 run() 方法。

public class GetProductInfoCommand extends HystrixCommand<ProductInfo> {

private Long productId;

private static final HystrixCommandKey KEY =

HystrixCommandKey.Factory.asKey("GetProductInfoCommand");

public GetProductInfoCommand(Long productId) {

super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ProductInfoService"))

.andCommandKey(KEY));

this.productId = productId;

}

@Override

protected ProductInfo run() {

String url = "http://localhost:8081/getProductInfo?productId=" + productId;

String response = HttpClientUtils.sendGetRequest(url);

System.out.println("调用接口查询商品数据,productId=" + productId);

return JSONObject.parseObject(response, ProductInfo.class);

}

/** * 每次请求的结果,都会放在 Hystrix 绑定的请求上下文上 * * @return cacheKey

缓存 key */

@Override

public String getCacheKey() {

return "product_info_" + productId;

}

/** * 将某个商品 id 的缓存清空 * * @param productId 商品 id */

public static void flushCache(Long productId) {

HystrixRequestCache.getInstance(KEY,

HystrixConcurrencyStrategyDefault.getInstance()).clear("product_info_" +

productId);

}

}

这里写了一个 flushCache() 方法,用于我们开发手动删除缓存。

controller 调用 command 查询商品信息

在一次 web 请求上下文中,传入商品 id 列表,查询多条商品数据信息。对于每个 productId,都创建一个 command。

如果 id 列表没有去重,那么重复的 id,第二次查询的时候就会直接走缓存。

@Controllerpublic class CacheController {

/** * 一次性批量查询多条商品数据的请求 * * @param productIds 以,分隔的商品 id

列表 * @return 响应状态 */

@RequestMapping("/getProductInfos")

@ResponseBody

public String getProductInfos(String productIds) {

for (String productId : productIds.split(",")) {

// 对每个 productId,都创建一个 command

GetProductInfoCommand getProductInfoCommand = new

GetProductInfoCommand(Long.valueOf(productId));

ProductInfo productInfo = getProductInfoCommand.execute();

System.out.println("是否是从缓存中取的结果:" +

getProductInfoCommand.isResponseFromCache());

}

return "success";

}

}

发起请求

调用接口,查询多个商品的信息。

http://localhost:8080/getProductInfos?productIds=1,1,1,2,2,5

在控制台,我们可以看到以下结果。

调用接口查询商品数据,productId=1

是否是从缓存中取的结果:false

是否是从缓存中取的结果:true

是否是从缓存中取的结果:true

调用接口查询商品数据,productId=2

是否是从缓存中取的结果:false

是否是从缓存中取的结果:true

调用接口查询商品数据,productId=5

是否是从缓存中取的结果:false

第一次查询 productId=1 的数据,会调用接口进行查询,不是从缓存中取结果。而随后再出现查询

productId=1 的请求,就直接取缓存了,这样的话,效率明显高很多。

删除缓存

我们写一个 UpdateProductInfoCommand,在更新商品信息之后,手动调用之前写的 flushCache(),手动将缓存删除。

public class UpdateProductInfoCommand extends HystrixCommand<Boolean> {

private Long productId;

public UpdateProductInfoCommand(Long productId) {

super(HystrixCommandGroupKey.Factory.asKey("UpdateProductInfoGroup"));

this.productId = productId;

}

@Override

protected Boolean run() throws Exception {

// 这里执行一次商品信息的更新

// ...

// 然后清空缓存

GetProductInfoCommand.flushCache(productId);

return true;

}

}

这样,以后查询该商品的请求,第一次就会走接口调用去查询最新的商品信息。


相关文章

网友评论

      本文标题:用 Hystrix 构建高可用服务架构(中)

      本文链接:https://www.haomeiwen.com/subject/rndnzqtx.html