Hystrix实现请求合并/请求缓存
Hystrix请求合并用于应对服务器的高并发场景,通过合并请求,减少线程的创建和使用,降低服务器请求压力,提高在高并发场景下服务的吞吐量和并发能力
请求合并的作用场景和原理
非合并模式(图片源自网络)在正常的分布式请求中,客户端发送请求,服务器在接受请求后,向服务提供者发送请求获取数据,这种模式在高并发的场景下就会导致线程池有大量的线程处于等待状态,从而导致响应缓慢,同时线程池的资源也是有限的,每一个请求都分配一个资源,也是无谓的消耗了很多服务器资源,在这种场景下我们可以通过使用hystrix的请求合并来降低对提供者过多的访问,减少线程池资源的消耗,从而提高系统的吞吐量和响应速度,如下图,采用请求合并后的服务模式
请求合并模式(图片源自网络)
请求合并可以通过构建类和添加注解的方式实现,这里我们先说通过构建合并类的方式实现请求合并
请求合并的实现类
请求合并的实现包含了两个主要实现类,一个是合并请求类,一个是批量处理类,合并请求类的作用是收集一定时间内的请求,将他们传递的参数汇总,然后调用批量处理类,通过向服务调用者发送请求获取批量处理的结果数据,最后在对应的request方法体中依次封装获取的结果,并返回回去
请求合并类的具体实现
public class UserCollapseCommand extends HystrixCollapser<List<UserInfo>,UserInfo,Integer> {
private CacheServiceImpl service;
private Integer userId;
/**
* 构造方法,主要是用来设置合并器的时间,多长时间合并一次请求
* @param cacheService 调用的服务
* @param userId 单次需要传递的业务id
*/
public UserCollapseCommand(CacheServiceImpl cacheService, Integer userId){
super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("userCollapseCommand")).
andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));
this.service = cacheService;
this.userId = userId;
}
@Override
public Integer getRequestArgument() {
return userId;
}
/**
* 获取传入的业务参数集合,调用执行方法执行批量请求
* @param collapsedRequests
* @return
*/
@Override
protected HystrixCommand<List<UserInfo>> createCommand(Collection<CollapsedRequest<UserInfo, Integer>> collapsedRequests) {
System.out.println("HystrixCommandHystrixCommand========>");
//按请求数声名UserId的集合
List<Integer> userIds = new ArrayList<>(collapsedRequests.size());
//通过请求将100毫秒中的请求参数取出来装进集合中
userIds.addAll(collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList()));
//返回UserBatchCommand对象,自动执行UserBatchCommand的run方法
HystrixCommand<List<UserInfo>> a=new UserBatchCommand(service, userIds);
return a;
}
/**
* 返回请求的执行结果,并根据对应的request将结果返回到对应的request相应体中
* 通过循环所有被合并的请求依次从批处理的结果集中获取对应的结果
* @param userInfos 这里是批处理后返回的结果集
* @param collection 所有被合并的请求
*/
@Override
protected void mapResponseToRequests(List<UserInfo> userInfos, Collection<CollapsedRequest<UserInfo, Integer>> collection) {
int count = 0 ;
for(CollapsedRequest<UserInfo,Integer> collapsedRequest : collection){
//从批响应集合中按顺序取出结果
UserInfo user = userInfos.get(count++);
//将结果放回原Request的响应体内
collapsedRequest.setResponse(user);
}
}
}
通过创建构造方法来设置合并器的收集时间,也就是合并器一次收集客户端请求的时间是多久,然后通过收集请求器获取在这段时间内收集的所有请求参数,在传递给批量执行程序去批量执行,mapResponseToRequests方法获取返回的结果,并根据对应的request请求将结果返回到对应的request请求中
批量处理类
public class UserBatchCommand extends HystrixCommand<List<UserInfo>> {
private static final Logger LOGGER = LoggerFactory.getLogger(UserBatchCommand.class);
private CacheServiceImpl service;
private List<Integer> ids;
public UserBatchCommand(CacheServiceImpl cacheService, List<Integer> ids){
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")).
andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
this.service =cacheService ;
this.ids = ids;
}
/**
* 调用批量处理的方法
* @return
*/
@Override
protected List<UserInfo> run() {
List<UserInfo> users = service.findAll(ids);
return users;
}
/**
* Fallback回调方法,如果没有会报错
*/
@Override
protected List<UserInfo> getFallback(){
System.out.println("UserBatchCommand的run方法,调用失败");
return null;
}
}
API调用实现请求合并
/**
* 模拟合并请求测试(非注解)
* 这里通过
*/
@GetMapping("/collapse")
public void collapseTest(){
System.out.println("==========>collapseTest方法执行了");
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
Future<UserInfo> f1 = new UserCollapseCommand(cacheService, 1).queue();
Future<UserInfo> f2 = new UserCollapseCommand(cacheService, 2).queue();
Future<UserInfo> f3 = new UserCollapseCommand(cacheService, 3).queue();
Thread.sleep(3000);
Future<UserInfo> f4 = new UserCollapseCommand(cacheService, 5).queue();
Future<UserInfo> f5 = new UserCollapseCommand(cacheService, 6).queue();
UserInfo u1 = f1.get();
UserInfo u2 = f2.get();
UserInfo u3 = f3.get();
UserInfo u4 = f4.get();
UserInfo u5 = f5.get();
System.out.println(u1.getName());
System.out.println(u2.getName());
System.out.println(u3.getName());
System.out.println(u4.getName());
System.out.println(u5.getName());
} catch (Exception e) {
e.printStackTrace();
}finally {
context.close();
}
}
请求结果
image.png请求缓存
请求缓存是对一次请求的数据进行缓存,当有新的请求进来的时候,将会进行初始化操作,保证读取到的数据是最新的而不是上次缓存的数据,hystrix就是去缓存一次请求中相同cachekey的查询结果
个人认为这个功能相对比较鸡肋,使用的场景并不多
请求缓存实现
对于请求缓存可以通过实现类和注解的方式进行实现,这里我并没有去研究创建实现类的方式,比较复杂,我这里是使用了注解的方式进行请求缓存的实现
相关注解释义
@CacheResult:标记这是一个执行缓存的方法,其结果将会被缓存,需要注意的是,该注解需要配合HystrixCommand注解使用
@CacheKey:这个注解会对请求参数进行标记,相同参数将会获取缓存得到的结果
具体实现逻辑
首先创建一个服务提供者的实现类,用于返回调用值,这里采用随机数的方式,保证每次生成的数值不重复,以便于验证我们的缓存是否起作用
/**
* hystrix请求缓存
*
* @param
* @return
*/
@RequestMapping("/hystrix/cache")
public Integer getRandomInteger(){
Random random = new Random();
int randomInt = random.nextInt(99999);
return randomInt;
}
请求缓存具体注解实现类
@Override
@CacheResult
@HystrixCommand(commandKey = "commandKey2")
public Integer getRandomInteger(@CacheKey int id) {
Integer a1=storeClient.getRandomInteger();
return a1;
}
最后调用接口
/**
* 请求缓存测试
* 请求方式处设置缓存相关设置
*/
@GetMapping("/getCache1")
public String getCache1() {
//初始化Hystrix请求上下文
HystrixRequestContext context =HystrixRequestContext.initializeContext();//初始化请求上下文
Integer a=cacheService.getRandomInteger(1);
System.out.println("第一次获取缓存值为"+a);
Integer a1=cacheService.getRandomInteger(1);
System.out.println("第二次获取缓存值为"+a1);
Integer a2=cacheService.getRandomInteger(1);
System.out.println("第三次获取缓存值为"+a2);
String show="第一次请求值:"+a+",第二次请求值为:"+a1+",第三次请求值为:"+a2;
//上下文环境用完需要进行关闭
context.close();
return show;
}
查看结果
网友评论