美文网首页
SpringCloud升级之路2020.0.x版-35. 验证线

SpringCloud升级之路2020.0.x版-35. 验证线

作者: 干货满满张哈希 | 来源:发表于2021-11-16 23:51 被阅读0次
    image

    本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent

    上一节我们通过单元测试验证了重试的正确性,这一节我们来验证我们线程隔离的正确性,主要包括:

    1. 验证配置正确加载:即我们在 Spring 配置(例如 application.yml)中的加入的 Resilience4j 的配置被正确加载应用了。
    2. 相同微服务调用不同实例的时候,使用的是不同的线程(池)。

    验证配置正确加载

    与之前验证重试类似,我们可以定义不同的 FeignClient,之后检查 resilience4j 加载的线程隔离配置来验证线程隔离配置的正确加载。

    并且,与重试配置不同的是,通过系列前面的源码分析,我们知道 spring-cloud-openfeign 的 FeignClient 其实是懒加载的。所以我们实现的线程隔离也是懒加载的,需要先调用,之后才会初始化线程池。所以这里我们需要先进行调用之后,再验证线程池配置。

    首先定义两个 FeignClient,微服务分别是 testService1 和 testService2,contextId 分别是 testService1Client 和 testService2Client

    @FeignClient(name = "testService1", contextId = "testService1Client")
    public interface TestService1Client {
        @GetMapping("/anything")
        HttpBinAnythingResponse anything();
    }
    @FeignClient(name = "testService2", contextId = "testService2Client")
        public interface TestService2Client {
            @GetMapping("/anything")
            HttpBinAnythingResponse anything();
    }
    

    然后,我们增加 Spring 配置,并且给两个微服务都添加一个实例,使用 SpringExtension 编写单元测试类:

    //SpringExtension也包含了 Mockito 相关的 Extension,所以 @Mock 等注解也生效了
    @ExtendWith(SpringExtension.class)
    @SpringBootTest(properties = {
            //默认请求重试次数为 3
            "resilience4j.retry.configs.default.maxAttempts=3",
            // testService2Client 里面的所有方法请求重试次数为 2
            "resilience4j.retry.configs.testService2Client.maxAttempts=2",
            //默认线程池配置
            "resilience4j.thread-pool-bulkhead.configs.default.coreThreadPoolSize=10",
            "resilience4j.thread-pool-bulkhead.configs.default.maxThreadPoolSize=10",
            "resilience4j.thread-pool-bulkhead.configs.default.queueCapacity=1" ,
            //testService2Client 的线程池配置
            "resilience4j.thread-pool-bulkhead.configs.testService2Client.coreThreadPoolSize=5",
            "resilience4j.thread-pool-bulkhead.configs.testService2Client.maxThreadPoolSize=5",
            "resilience4j.thread-pool-bulkhead.configs.testService2Client.queueCapacity=1",
    })
    @Log4j2
    public class OpenFeignClientTest {
        @SpringBootApplication
        @Configuration
        public static class App {
            @Bean
            public DiscoveryClient discoveryClient() {
                //模拟两个服务实例
                ServiceInstance service1Instance1 = Mockito.spy(ServiceInstance.class);
                ServiceInstance service2Instance2 = Mockito.spy(ServiceInstance.class);
                Map<String, String> zone1 = Map.ofEntries(
                        Map.entry("zone", "zone1")
                );
                when(service1Instance1.getMetadata()).thenReturn(zone1);
                when(service1Instance1.getInstanceId()).thenReturn("service1Instance1");
                when(service1Instance1.getHost()).thenReturn("www.httpbin.org");
                when(service1Instance1.getPort()).thenReturn(80);
                when(service2Instance2.getInstanceId()).thenReturn("service1Instance2");
                when(service2Instance2.getHost()).thenReturn("httpbin.org");
                when(service2Instance2.getPort()).thenReturn(80);
                DiscoveryClient spy = Mockito.spy(DiscoveryClient.class);
                Mockito.when(spy.getInstances("testService1"))
                        .thenReturn(List.of(service1Instance1));
                Mockito.when(spy.getInstances("testService2"))
                        .thenReturn(List.of(service2Instance2));
                return spy;
            }
        }
    }
    

    编写测试代码,验证配置正确:

    @Test
    public void testConfigureThreadPool() {
        //防止断路器影响
        circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
        //调用下这两个 FeignClient 确保对应的 NamedContext 被初始化
        testService1Client.anything();
        testService2Client.anything();
        //验证线程隔离的实际配置,符合我们的填入的配置
        ThreadPoolBulkhead threadPoolBulkhead = threadPoolBulkheadRegistry.getAllBulkheads().asJava()
                .stream().filter(t -> t.getName().contains("service1Instance1")).findFirst().get();
        Assertions.assertEquals(threadPoolBulkhead.getBulkheadConfig().getCoreThreadPoolSize(), 10);
        Assertions.assertEquals(threadPoolBulkhead.getBulkheadConfig().getMaxThreadPoolSize(), 10);
        threadPoolBulkhead = threadPoolBulkheadRegistry.getAllBulkheads().asJava()
                .stream().filter(t -> t.getName().contains("service1Instance2")).findFirst().get();
        Assertions.assertEquals(threadPoolBulkhead.getBulkheadConfig().getCoreThreadPoolSize(), 5);
        Assertions.assertEquals(threadPoolBulkhead.getBulkheadConfig().getMaxThreadPoolSize(), 5);
    }
    

    相同微服务调用不同实例的时候,使用的是不同的线程(池)。

    我们需要确保,最后调用(也就是发送 http 请求)的执行的线程池,必须是对应的 ThreadPoolBulkHead 中的线程池。这个需要我们对 ApacheHttpClient 做切面实现,添加注解 @EnableAspectJAutoProxy(proxyTargetClass = true)

    //SpringExtension也包含了 Mockito 相关的 Extension,所以 @Mock 等注解也生效了
    @ExtendWith(SpringExtension.class)
    @SpringBootTest(properties = {
            //默认请求重试次数为 3
            "resilience4j.retry.configs.default.maxAttempts=3",
            // testService2Client 里面的所有方法请求重试次数为 2
            "resilience4j.retry.configs.testService2Client.maxAttempts=2",
            //默认线程池配置
            "resilience4j.thread-pool-bulkhead.configs.default.coreThreadPoolSize=10",
            "resilience4j.thread-pool-bulkhead.configs.default.maxThreadPoolSize=10",
            "resilience4j.thread-pool-bulkhead.configs.default.queueCapacity=1" ,
            //testService2Client 的线程池配置
            "resilience4j.thread-pool-bulkhead.configs.testService2Client.coreThreadPoolSize=5",
            "resilience4j.thread-pool-bulkhead.configs.testService2Client.maxThreadPoolSize=5",
            "resilience4j.thread-pool-bulkhead.configs.testService2Client.queueCapacity=1",
    })
    @Log4j2
    public class OpenFeignClientTest {
        @SpringBootApplication
        @Configuration
        @EnableAspectJAutoProxy(proxyTargetClass = true)
        public static class App {
            @Bean
            public DiscoveryClient discoveryClient() {
                //模拟两个服务实例
                ServiceInstance service1Instance1 = Mockito.spy(ServiceInstance.class);
                ServiceInstance service2Instance2 = Mockito.spy(ServiceInstance.class);
                Map<String, String> zone1 = Map.ofEntries(
                        Map.entry("zone", "zone1")
                );
                when(service1Instance1.getMetadata()).thenReturn(zone1);
                when(service1Instance1.getInstanceId()).thenReturn("service1Instance1");
                when(service1Instance1.getHost()).thenReturn("www.httpbin.org");
                when(service1Instance1.getPort()).thenReturn(80);
                when(service2Instance2.getInstanceId()).thenReturn("service1Instance2");
                when(service2Instance2.getHost()).thenReturn("httpbin.org");
                when(service2Instance2.getPort()).thenReturn(80);
                DiscoveryClient spy = Mockito.spy(DiscoveryClient.class);
                Mockito.when(spy.getInstances("testService1"))
                        .thenReturn(List.of(service1Instance1));
                Mockito.when(spy.getInstances("testService2"))
                        .thenReturn(List.of(service2Instance2));
                return spy;
            }
        }
    }
    

    拦截 ApacheHttpClientexecute 方法,这样可以拿到真正负责 http 调用的线程池,将线程其放入请求的 Header:

    @Aspect
    public static class ApacheHttpClientAop {
        //在最后一步 ApacheHttpClient 切面
        @Pointcut("execution(* com.github.jojotech.spring.cloud.webmvc.feign.ApacheHttpClient.execute(..))")
        public void annotationPointcut() {
        }
    
        @Around("annotationPointcut()")
        public Object around(ProceedingJoinPoint pjp) throws Throwable {
            //设置 Header,不能通过 Feign 的 RequestInterceptor,因为我们要拿到最后调用 ApacheHttpClient 的线程上下文
            Request request = (Request) pjp.getArgs()[0];
            Field headers = ReflectionUtils.findField(Request.class, "headers");
            ReflectionUtils.makeAccessible(headers);
            Map<String, Collection<String>> map = (Map<String, Collection<String>>) ReflectionUtils.getField(headers, request);
            HashMap<String, Collection<String>> stringCollectionHashMap = new HashMap<>(map);
            stringCollectionHashMap.put(THREAD_ID_HEADER, List.of(String.valueOf(Thread.currentThread().getName())));
            ReflectionUtils.setField(headers, request, stringCollectionHashMap);
            return pjp.proceed();
        }
    }
    

    这样,我们就能拿到具体承载请求的线程的名称,从名称中可以看出他所处于的线程池(格式为“bulkhead-线程隔离名称-n”,例如 bulkhead-testService1Client:www.httpbin.org:80-1),接下来我们就来看下不同的实例是否用了不同的线程池进行调用:

    @Test
    public void testDifferentThreadPoolForDifferentInstance() throws InterruptedException {
        //防止断路器影响
        circuitBreakerRegistry.getAllCircuitBreakers().asJava().forEach(CircuitBreaker::reset);
        Set<String> threadIds = Sets.newConcurrentHashSet();
        Thread[] threads = new Thread[100];
        //循环100次
        for (int i = 0; i < 100; i++) {
            threads[i] = new Thread(() -> {
                Span span = tracer.nextSpan();
                try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
                    HttpBinAnythingResponse response = testService1Client.anything();
                    //因为 anything 会返回我们发送的请求实体的所有内容,所以我们能获取到请求的线程名称 header
                    String threadId = response.getHeaders().get(THREAD_ID_HEADER);
                    threadIds.add(threadId);
                }
            });
            threads[i].start();
        }
        for (int i = 0; i < 100; i++) {
            threads[i].join();
        }
        //确认实例 testService1Client:httpbin.org:80 线程池的线程存在
        Assertions.assertTrue(threadIds.stream().anyMatch(s -> s.contains("testService1Client:httpbin.org:80")));
        //确认实例 testService1Client:httpbin.org:80 线程池的线程存在
        Assertions.assertTrue(threadIds.stream().anyMatch(s -> s.contains("testService1Client:www.httpbin.org:80")));
    }
    

    这样,我们就成功验证了,实例调用的线程池隔离。

    相关文章

      网友评论

          本文标题:SpringCloud升级之路2020.0.x版-35. 验证线

          本文链接:https://www.haomeiwen.com/subject/iegktrtx.html