响应式编程
1 spring mvc与spring webflux
- 两个框架都可以使用注解方式,都运行在 Tomet 等容器中;SpringMVC 采用命令式编程,Webflux 采用异步响应式编程
- SpringMVC 方式实现,同步阻塞的方式,基于 SpringMVC+Servlet+Tomcat;SpringWebflux 方式实现,异步非阻塞 方式,基于 SpringWebflux+Reactor+Netty
2 响应式编程(Reactor 实现)
- 响应式编程操作中,Reactor 是满足 Reactive 规范框架
- Reactor 有两个核心类,Mono 和 Flux,这两个类实现接口 Publisher,提供丰富操作符。Flux 对象实现发布者,返回 N 个元素;Mono 实现发布者,返回 0 或者 1 个元素
- Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉
- 订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者
3 SpringWebflux(基于注解编程模型)
- 使用注解编程模型方式,和之前 SpringMVC 使用相似的,只需要把相关依赖配置到项目中,SpringBoot 自动配置相关运行容器,默认情况下使用 Netty 服务器
- SpringWebflux 里面 DispatcherHandler,负责请求的处理
- HandlerMapping:请求查询到处理的方法
- HandlerAdapter:真正负责请求处理
- HandlerResultHandler:响应结果处理
- service:
//添加用户
@Override
public Mono<Void> saveUserInfo(Mono<User> userMono) {
return userMono.doOnNext(person -> {
//向map集合里面放值
int id = users.size()+1;
users.put(id,person);
}).thenEmpty(Mono.empty());
}
//添加
@PostMapping("/saveuser")
public Mono<Void> saveUser(@RequestBody User user) {
Mono<User> userMono = Mono.just(user);
return userService.saveUserInfo(userMono);
}
4 SpringWebflux(基于函数式编程模型)
- 在使用函数式编程模型操作时候,需要自己初始化服务器
- 基于函数式编程模型时候,有两个核心接口:RouterFunction(实现路由功能,请求转发给对应的 handler)和 HandlerFunction(处理请求生成响应的函数)。核心任务定义两个函数式接口的实现并且启动需要的服务器。
- SpringWebflux请求和响应不再是ServletRequest 和 ServletResponse ,而是ServerRequest 和 ServerResponse
- 创建服务
public void createReactorServer() {
//路由和handler适配
RouterFunction<ServerResponse> route = routingFunction();
HttpHandler httpHandler = toHttpHandler(route);
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
//创建服务器
HttpServer httpServer = HttpServer.create();
httpServer.handle(adapter).bindNow();
}
public RouterFunction<ServerResponse> routingFunction() {
//创建hanler对象
UserService userService = new UserServiceImpl();
UserHandler handler = new UserHandler(userService);
//设置路由
return RouterFunctions.route(
GET("/users/{id}").and(accept(APPLICATION_JSON)),handler::getUserById)
.andRoute(GET("/users").and(accept(APPLICATION_JSON)),handler::getAllUsers);
}
public Mono<ServerResponse> getAllUsers(ServerRequest request) {
//调用service得到结果
Flux<User> users = this.userService.getAllUser();
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(users,User.class);
}
public Flux<User> getAllUser() {
return Flux.fromIterable(this.users.values());
}
网关gateway
1 核心核心概念
- Route:是构建网关的基本模块,它是由ID、目标URL、一系列的断言和过滤器组成,如果断言为true则匹配改路由
- Predicate:参考的是java8的java.util.function.Predicate,开发人员可以匹配http请求中的所有内容,如果请求与断言相匹配则进行路由
- Filter:指spring框架中GatewayFilter实例,使用过滤器可以在请求被路由前或者之后对请求进行修改。自定义filter:全局过滤器需要实现GlobalFilter, Ordered,局部服务应用过滤器需要继承AbstractGatewayFilterFactory<CacheRequestFilter.Config>
2 动态路由服务
cloud:
gateway:
discovery:
locator:
enabled: true #开启从注册中心动态创建路由的功能,利用微服务名称j进行路由
#表示将请求路径的服务名配置改成小写 因为服务注册的时候,向注册中心注册时将服务名转成大写的了
lower-case-service-id: true
3 lb负载均衡
routes:
# 认证中心
- id: ry-auth
uri: lb://ry-auth #从注册中心获取服务,且以lb(load-balance)负载均衡方式转发
predicates:
- Path=/auth/**
filters:
- CacheRequestFilter #gateway自定义filter
- ValidateCodeFilter
#截取路径的个数,请求/hello/HiController/aaa后端匹配到的请求路径,就会变成http://localhost:8762/HiController/aaa
- StripPrefix=1
4 多过滤器形成chain,流不能多次读取
DataBufferFactory dataBufferFactory = exchange.getResponse().bufferFactory();
ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
if (bytes.length > 0) {
return Flux.just(dataBufferFactory.wrap(bytes));
}
return Flux.empty();
}
};
5 gateway与sentinel限流
@Bean
@Order(Ordered.HIGHEST_PRECEDENCE)
public GlobalFilter sentinelGatewayFilter() {
return new SentinelGatewayFilter();
}
@PostConstruct
public void initGatewayRules(){
Set<GatewayFlowRule> rules = new HashSet<>();
rules.add(
new GatewayFlowRule("provider_route") //路由名称,serviceid
.setCount(1) //数量
.setIntervalSec(1) //每秒
);
GatewayRuleManager.loadRules(rules);
}
public class SentinelFallbackHandler implements WebExceptionHandler{
private Mono<Void> writeResponse(ServerResponse response, ServerWebExchange exchange) {
return ServletUtils.webFluxResponseWriter(exchange.getResponse(), "请求超过最大数,请稍候再试");
}
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
if (exchange.getResponse().isCommitted()) {
return Mono.error(ex);
}
if (!BlockException.isBlockException(ex)) {
return Mono.error(ex);
}
return handleBlockedRequest(exchange, ex).flatMap(response -> writeResponse(response, exchange));
}
private Mono<ServerResponse> handleBlockedRequest(ServerWebExchange exchange, Throwable throwable) {
return GatewayCallbackManager.getBlockHandler().handleRequest(exchange, throwable);
}
}
断路器sentinel
1 sentinel与hystrix对比
sentinel hystrix
- 隔离策略 信号量隔离 线程池/信号量隔离
- 熔断降级 基于响应时间或失败比率 基于失败比率
- 限流 基于qps、调用关系 不支持
- 流量整形 支持慢启动、匀速器模式 不支持
- 系统负载保护 支持 不支持
- 实时指标 滑动窗口 滑动窗口rxjava
- 控制台 支持 不支持
2 流量控制
- 模式:直接、关联、链路;
- 运行指标: QPS、线程池;
- 控制的效果:直接限流、冷启动、排队。
2.1 模式
- 关联:当关联的资源大道阈值时,就限流自己
- 链路:多个请求调用同一个微服务
2.2 效果:
- 直接:直接失败抛出异常;
- 冷启动:默认的coldFactor为3,即请求qps从threshold/3开始,经过预热时长(dashboard设置)逐渐升至设定的qps阈值
应用场景:秒杀系统在开启瞬间,会有很多流量上来,很有可能把系统打死,预热方式就是为了把系统保护起来,可慢慢
的把流量放进来,慢慢的把阈值增长到设置的阈值
- 匀速排队:让请求以均匀的速度通过,阈值类型必须设成qps,否则无效。如/test每秒1次请求,超过的话就排队等待,等待的超时时间为2000毫秒
3 熔断降级:
- 当调用链路中某个资源出现不稳定,例如timeout,异常比例升高的时候,则对这个资源的调用进行限制,并让请求快速失败,避免影响到其它的资源,最终产生雪崩的效果
3.1 降级方式
- Hystrix 通过线程池的方式,来对依赖(在我们的概念中对应资源)进行了隔离。这样做的好处是资源和资源之间做到了最彻底的隔离。缺点是除了增加了线程切换的成本,还需要预先给各个资源做线程池大小的分配。
- Sentinel 对这个问题采取了两种手段:
- 通过并发线程数进行限制:和资源池隔离的方法不同,Sentinel 通过限制资源并发线程的数量,来减少不稳定资源对其它资源的影响。这样不但没有线程切换的损耗,也不需要您预先分配线程池的大小。当某个资源出现不稳定的情况下,例如响应时间变长,对资源的直接影响就是会造成线程数的逐步堆积。当线程数在特定资源上堆积到一定的数量之后,对该资源的新请求就会被拒绝。堆积的线程完成任务后才开始继续接收请求。
- 通过响应时间对资源进行降级:除了对并发线程数进行控制以外,Sentinel 还可以通过响应时间来快速降级不稳定的资源。当依赖的资源出现响应时间过长后,所有对该资源的访问都会被直接拒绝,直到过了指定的时间窗口之后才重新恢复。
- sentinel没有半开状态
3.2 降级策略
- RT(平均响应时间,秒级)
平均响应时间 超出阈值 且 在时间窗口内通过的请求>=5,两个条件同时满足后触发降级
窗口期过后关闭断路器
RT最大4900(更大的需要通过-Dcsp.sentinel.statistic.max.rt=XXXX才能生效
- 异常比列(秒级)
QPS >= 5 且异常比例(秒级统计)超过阈值时,触发降级;时间窗口结束后,关闭降级
- 异常数(分钟级)
异常数(分钟统计)超过阈值时,触发降级;时间窗口结束后,关闭降级
3 热点key
- 限流模式只支持QPS模式,固定写死了(这才叫热点)
- @SentinelResource注解的方法参数索引,0代表第一个参数,1代表第二个参数,以此类推单机阀值以及统计窗口时长表示在此窗口时间超过阀值就限流。如1秒的QPS为1,超过就限流,限流后调用blockHandler支持方法。
- 热点参数必须是基本类型或者string
4 自定义异常
@GetMapping("/rateLimit/customerBlockHandler")
@SentinelResource(value = "customerBlockHandler", blockHandlerClass = CustomerBlockHandler.class, blockHandler = "handlerException2")
public CommonResult customerBlockHandler() {
return new CommonResult(200,"按客戶自定义",new Payment(2020L,"serial003"));
}
public class CustomerBlockHandler {
public static CommonResult handlerException(BlockException exception) {
return new CommonResult(4444,"按客戶自定义,global handlerException----1");
}
public static CommonResult handlerException2(BlockException exception) {
return new CommonResult(4444,"按客戶自定义,global handlerException----2");
}
}
网友评论