美文网首页
Zipkin部署

Zipkin部署

作者: neilspears | 来源:发表于2017-11-30 09:59 被阅读0次

    背景

    系统中已存在以下三种类型的应用:

    - 采用Spring boot开发
    - 采用传统的Spring Web开发
    - 采用CXF开发
    

    随着系统复杂度的增加,现在需要引入Zipkin来监测系统中各节点的链路耗时。

    功能特性

    1. 采用Kafka作为数据收集器
      • 支持Sleuth采集方式
      • 支持Brave采集方式
    2. 采用ElasticSearch作为数据源

    搭建Zipkin服务器

    1. maven 依赖
    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-sleuth-zipkin-stream</artifactId>
            </dependency>
            <dependency>
                <groupId>io.zipkin.java</groupId>
                <artifactId>zipkin-autoconfigure-ui</artifactId>
                <version>${zipkin.version}</version>
            </dependency>
            <dependency>
                <groupId>io.zipkin.java</groupId>
                <artifactId>zipkin-collector-kafka10</artifactId>
                <version>${zipkin.version}</version>
            </dependency>
            <dependency>
                <groupId>io.zipkin.java</groupId>
                <artifactId>zipkin</artifactId>
                <version>${zipkin.version}</version>
            </dependency>
            <dependency>
                <groupId>io.zipkin.java</groupId>
                <artifactId>zipkin-server</artifactId>
                <version>${zipkin.version}</version>
            </dependency>
            <dependency>
                <groupId>io.zipkin.java</groupId>
                <artifactId>zipkin-autoconfigure-storage-elasticsearch-http</artifactId>
                <version>${zipkin.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>${zookeeper.version}</version>
            </dependency>
    
    1. Zipkin应用实例
    /**
     * Spring Boot entry point of the Zipkin server.
     *
     * <p>{@code @EnableZipkinStreamServer} enables the stream-based (Sleuth) collection path, while
     * the {@link #kafka()} bean below adds a second Kafka collector for spans reported by Brave.
     */
    @EnableZipkinStreamServer
    @SpringBootApplication
    public class Application {

      /** Storage backend the Brave Kafka collector writes received spans to. */
      @Autowired
      private StorageComponent storageComponent;

      /** Settings (bootstrap servers and topic) for the Brave collection path. */
      @Autowired
      private BraveKafka braveKafka;

      /**
       * Builds and starts the Kafka collector that consumes Brave-reported spans.
       *
       * @return the collector returned by {@code start()}
       */
      @Bean
      public KafkaCollector kafka() {
        String servers = braveKafka.getBootstrapServers();
        String spanTopic = braveKafka.getTopic();
        KafkaCollector collector = KafkaCollector.builder()
            .bootstrapServers(servers)
            .topic(spanTopic)
            .storage(storageComponent)
            .build();
        return collector.start();
      }

      /**
       * Launches the Spring application.
       *
       * @param args command-line arguments forwarded to Spring Boot
       */
      public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
      }
    }
    

    部署配置

    spring:
      application:
        name: qianfan-zipkin
      cloud:
        stream:
          kafka:
            binder:
              brokers: pgpool.hd.com:19092,pgdb1.hd.com:19092,pgdb2.hd.com:19092 #Sleuth方式收集
              zkNodes: pgpool.hd.com:12181,pgdb1.hd.com:12181,pgdb2.hd.com:12181
          bindings:
            sleuth:
              destination: sleuth-test #Sleuth方式收集
    braveKafka:
      bootstrapServers: ${spring.cloud.stream.kafka.binder.brokers}
      topic: sleuth-test2 #Brave方式收集
    zipkin:
      storage:
        elasticsearch:
          cluster: xxxxxx
          index: xxxxx
          hosts: 118.xxx.xx.xx:9200
          username: xxxxx
          password: xxxxxxx
    

    应用接入

    Spring Boot接入

    1. 在maven中增加依赖
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-sleuth-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </dependency>
    
    1. 在application.yml中增加以下配置
    spring:
      cloud:
        stream:
          kafka:
            binder:
              brokers: pgpool.hd.com:19092,pgdb1.hd.com:19092,pgdb2.hd.com:19092
              zkNodes: pgpool.hd.com:12181,pgdb1.hd.com:12181,pgdb2.hd.com:12181
          bindings:
            sleuth:
              destination: sleuth-test
    logging:
      level:
        org.springframework.web: DEBUG
    
    1. 建议应用本身不再指定日志输出格式,默认使用sleuth的日志格式,与zipkin统一。

    Spring Web与CXF接入

    理论上CXF可以直接使用Brave接入,但是官方最新的Brave只支持jax-rs版本为2.0.1且要求JDK>=1.8。而我本地的JDK=1.7,jax-rs版本为2.0-m10,所以采取了折衷方案,只使用brave jaxrs2中的两个过滤器。

    1. maven中新增:
          <properties>  
            <brave.version>4.7.2</brave.version>
            <zipkin-reporter.version>1.1.2</zipkin-reporter.version>
          </properties>
          
          <dependency>
            <groupId>io.zipkin.brave</groupId>
            <artifactId>brave</artifactId>
            <version>${brave.version}</version>
          </dependency>
          <dependency>
            <groupId>io.zipkin.reporter</groupId>
            <artifactId>zipkin-reporter</artifactId>
            <version>${zipkin-reporter.version}</version>
          </dependency>
          <dependency>
            <groupId>io.zipkin.reporter</groupId>
            <artifactId>zipkin-sender-kafka10</artifactId>
            <version>${zipkin-reporter.version}</version>
          </dependency>
          <dependency>
            <groupId>io.zipkin.brave</groupId>
            <artifactId>brave-context-log4j2</artifactId>
            <version>${brave.version}</version>
          </dependency>
          
          <!-- 拦截spring web必须 -->
          <dependency>
            <groupId>io.zipkin.brave</groupId>
            <artifactId>brave-instrumentation-spring-web</artifactId>
            <version>${brave.version}</version>
          </dependency>
          <dependency>
            <groupId>io.zipkin.brave</groupId>
            <artifactId>brave-instrumentation-spring-webmvc</artifactId>
            <version>${brave.version}</version>
          </dependency>
          
          
          <!-- 拦截jax-rs必须 -->
          <dependency>
            <groupId>io.zipkin.brave</groupId>
            <artifactId>brave-instrumentation-jaxrs2</artifactId>
            <version>${brave.version}</version>
          </dependency>
    
    1. 引入配置定义
    import brave.Tracing;
    import brave.context.log4j2.ThreadContextCurrentTraceContext;
    import brave.http.HttpTracing;
    import brave.sampler.Sampler;
    import brave.spring.web.TracingClientHttpRequestInterceptor;
    import brave.spring.webmvc.TracingHandlerInterceptor;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.http.client.ClientHttpRequestInterceptor;
    import org.springframework.web.client.RestTemplate;
    import zipkin.Span;
    import zipkin.reporter.AsyncReporter;
    import zipkin.reporter.Sender;
    import zipkin.reporter.kafka10.KafkaSender;
    import javax.annotation.PostConstruct;
    import java.util.ArrayList;
    import java.util.List;
    @Configuration
    public class WebTracingConfiguration {
        @Bean
        Sender sender() {
            return KafkaSender.builder().bootstrapServers("pgpool.hd.com:19092,pgdb1.hd.com:19092,pgdb2.hd.com:19092").topic("sleuth-test2").build();
        }
    
        /**
         * 用什么方式显示span信息
         */
        @Bean
        AsyncReporter<Span> spanReporter() {
            return AsyncReporter.create(sender());
        }
    
        @Bean
        Tracing tracing() {
            return Tracing.newBuilder()
                    .localServiceName("dpos-web").reporter(spanReporter())
                    .currentTraceContext(ThreadContextCurrentTraceContext.create()) // puts trace IDs into logs
                    .sampler(Sampler.create(1f)).build();
        }
    
        // decides how to name and tag spans. By default they are named the same as the http method.
        @Bean
        HttpTracing httpTracing(Tracing tracing) {
            return HttpTracing.create(tracing);
        }
    
        // 拦截spring web
        @Bean
        TracingHandlerInterceptor serverInterceptor(HttpTracing tracing) {
            return (TracingHandlerInterceptor) TracingHandlerInterceptor.create(tracing);
        }
        
        // 拦截jax-rs 服务端
        @Bean(name = "tracingContainerFilter")
        TracingContainerFilter tracingContainerFilter(HttpTracing tracing) {
            return new TracingContainerFilter(tracing);
        }
        
        // 拦截jax-rs 客户端
        @Bean(name = "tracingClientFilter")
        TracingClientFilter tracingClientFilter(HttpTracing tracing) {
            return new TracingClientFilter(tracing);
        }
        
    
        // 拦截spring template
        @Bean
        TracingClientHttpRequestInterceptor clientInterceptor(HttpTracing tracing) {
            return (TracingClientHttpRequestInterceptor) TracingClientHttpRequestInterceptor.create(tracing);
        }
        
        @Autowired
        private RestTemplate restTemplate;
    
        /**
         * adds tracing to the application-defined rest template
         */
        @PostConstruct
        public void init() {
            List<ClientHttpRequestInterceptor> interceptors =
                    new ArrayList<>(restTemplate.getInterceptors());
            interceptors.add(clientInterceptor(httpTracing(tracing())));
            restTemplate.setInterceptors(interceptors);
        }
    }
    
    1. 在spring web xml中加入拦截
        <mvc:interceptors>
            <bean class="brave.spring.webmvc.TracingHandlerInterceptor"/>
        </mvc:interceptors>
    
    1. 在jax-rs服务中加入拦截
        <jaxrs:server id="dpos-auth-service.rsServices" address="/">
            <jaxrs:serviceBeans>
            <bean class="com.hd123.dpos.auth.rs.service.notification.RSSubscriptionConfigServiceImpl"
                p:service-ref="subscriptionConfigServiceImpl"
                p:codecBean-ref="dpos-auth-service.codecBean" />
            </jaxrs:serviceBeans>
            <jaxrs:providers>
                <ref bean="tracingContainerFilter" /> <!-- 拦截 -->
            </jaxrs:providers>
        </jaxrs:server>
    
    1. 在jax-rs客户端中加入拦截
      <jaxrs-client:client id="dpos-auth-api.rs.shopService"
        address="${dpos-auth-api.server.url:http://localhost:8080/dpos-auth-web}"
        serviceClass="com.hd123.dpos.auth.rs.api.shop.RSShopService">
        <jaxrs-client:providers>
          <ref bean="dpos-auth-api.jsonProvider" />
          <ref bean="tracingClientFilter"/> <!-- 拦截 -->
        </jaxrs-client:providers>
      </jaxrs-client:client>
    
    1. 实现jax-rs过滤器,来自官方的brave jaxrs2包中。
    import brave.Span;
    import brave.Tracer;
    import brave.Tracer.SpanInScope;
    import brave.http.HttpClientHandler;
    import brave.http.HttpTracing;
    import brave.propagation.Propagation;
    import brave.propagation.TraceContext;
    import javax.ws.rs.ConstrainedTo;
    import javax.ws.rs.client.ClientRequestContext;
    import javax.ws.rs.client.ClientRequestFilter;
    import javax.ws.rs.client.ClientResponseContext;
    import javax.ws.rs.client.ClientResponseFilter;
    import javax.ws.rs.core.MultivaluedMap;
    import javax.ws.rs.ext.Provider;
    import static javax.ws.rs.ConstrainedTo.Type.CLIENT;
    /**
     * JAX-RS client filter that starts a client span when a request is sent and completes it when
     * the response is received.
     *
     * <p>The request filter injects the trace context into the outgoing headers and stores the
     * {@link SpanInScope} as a request property; the response filter closes that scope and reports
     * the span.
     */
    @Provider
    @ConstrainedTo(CLIENT)
    public class TracingClientFilter implements ClientRequestFilter, ClientResponseFilter {
        /** Writes a propagation header into the outgoing request headers, replacing any existing value. */
        static final Propagation.Setter<MultivaluedMap, String> SETTER = new Propagation.Setter<MultivaluedMap, String>() {
            @Override
            public void put(MultivaluedMap carrier, String key, String value) {
                carrier.putSingle(key, value);
            }
        };

        final Tracer tracer;
        final HttpClientHandler<ClientRequestContext, ClientResponseContext> handler;
        final TraceContext.Injector<MultivaluedMap> injector;

        /**
         * @param httpTracing tracing component used to create the tracer, handler and injector
         * @throws NullPointerException if {@code httpTracing} is null
         */
        public TracingClientFilter(HttpTracing httpTracing) {
            if (httpTracing == null) throw new NullPointerException("HttpTracing == null");
            tracer = httpTracing.tracing().tracer();
            handler = HttpClientHandler.create(httpTracing, new HttpAdapter());
            injector = httpTracing.tracing().propagation().injector(SETTER);
        }

        /**
         * Starts the client span, injects its context into the request headers and remembers the
         * opened scope on the request so the response filter can close it.
         */
        @Override
        public void filter(ClientRequestContext request) {
            Span span = handler.handleSend(injector, request.getHeaders(), request);
            request.setProperty(SpanInScope.class.getName(), tracer.withSpanInScope(span));
        }

        /**
         * Closes the scope opened by the request filter and completes the span.
         *
         * <p>Does nothing when there is no current span, or when the request carries no scope from
         * this filter (for example when an earlier filter aborted the request before this filter's
         * request side ran). In the latter case the current span was not started here, so it must
         * be neither closed nor reported by this filter.
         */
        @Override
        public void filter(ClientRequestContext request, ClientResponseContext response) {
            Span span = tracer.currentSpan();
            if (span == null) return;
            Object scope = request.getProperty(SpanInScope.class.getName());
            // Guard against a missing property: the unchecked cast-and-close would otherwise throw.
            if (!(scope instanceof SpanInScope)) return;
            ((SpanInScope) scope).close();
            handler.handleReceive(response, null, span);
        }

        /** Maps JAX-RS client request/response objects onto the values Brave reads for HTTP spans. */
        static final class HttpAdapter
                extends brave.http.HttpClientAdapter<ClientRequestContext, ClientResponseContext> {

            @Override
            public String method(ClientRequestContext request) {
                return request.getMethod();
            }

            @Override
            public String path(ClientRequestContext request) {
                return request.getUri().getPath();
            }

            @Override
            public String url(ClientRequestContext request) {
                return request.getUri().toString();
            }

            @Override
            public String requestHeader(ClientRequestContext request, String name) {
                return request.getHeaderString(name);
            }

            @Override
            public Integer statusCode(ClientResponseContext response) {
                return response.getStatus();
            }
        }
    }
    
    import brave.Span;
    import brave.Tracer;
    import brave.http.HttpServerHandler;
    import brave.http.HttpTracing;
    import brave.jaxrs2.ContainerAdapter;
    import brave.propagation.Propagation;
    import brave.propagation.TraceContext;
    import javax.ws.rs.ConstrainedTo;
    import javax.ws.rs.container.*;
    import javax.ws.rs.core.Context;
    import javax.ws.rs.ext.Provider;
    import java.lang.annotation.Annotation;
    import static javax.ws.rs.ConstrainedTo.Type.SERVER;
    /**
     * JAX-RS server filter that starts a server span when a request is received and completes it
     * when the response is sent.
     *
     * <p>For requests known not to be asynchronous the span is put in scope and the scope is kept
     * as a request property; otherwise only the span itself is stored on the request.
     */
    @Provider
    @ConstrainedTo(SERVER)
    public class TracingContainerFilter implements ContainerRequestFilter, ContainerResponseFilter {

        final Tracer tracer;
        final HttpServerHandler<ContainerRequestContext, ContainerResponseContext> handler;
        final TraceContext.Extractor<ContainerRequestContext> extractor;

        /**
         * @param httpTracing tracing component used to create the tracer, handler and extractor
         * @throws NullPointerException if {@code httpTracing} is null
         */
        public TracingContainerFilter(HttpTracing httpTracing) {
            // Explicit check, matching TracingClientFilter, so the failure names the argument.
            if (httpTracing == null) throw new NullPointerException("HttpTracing == null");
            tracer = httpTracing.tracing().tracer();
            handler = HttpServerHandler.create(httpTracing, new ContainerAdapter());
            extractor = httpTracing.tracing().propagation()
                    .extractor(new Propagation.Getter<ContainerRequestContext, String>() {
                        @Override
                        public String get(ContainerRequestContext carrier, String key) {
                            return carrier.getHeaderString(key);
                        }
                    });
        }

        /**
         * This implementation peeks to see if the request is async or not, which means {@link
         * PreMatching} cannot be used: pre-matching doesn't inject the resource info!
         */
        @Context
        ResourceInfo resourceInfo;

        /**
         * Starts the server span from the incoming request and stores either the opened scope
         * (when the request is known not to be asynchronous) or the span itself on the request.
         */
        @Override
        public void filter(ContainerRequestContext request) {
            // The resource info is exposed as a request property only while the handler runs.
            if (resourceInfo != null) request.setProperty(ResourceInfo.class.getName(), resourceInfo);
            Span span = handler.handleReceive(extractor, request);
            request.removeProperty(ResourceInfo.class.getName());
            if (shouldPutSpanInScope(resourceInfo)) {
                request.setProperty(Tracer.SpanInScope.class.getName(), tracer.withSpanInScope(span));
            } else {
                request.setProperty(Span.class.getName(), span);
            }
        }

        /**
         * Completes the span for this request. The span is taken from the request property, from
         * the current scope, or, for a 404 response with neither, freshly started from the request.
         * Returns without reporting in any other state.
         */
        @Override
        public void filter(ContainerRequestContext request, ContainerResponseContext response) {
            Span span = (Span) request.getProperty(Span.class.getName());
            Tracer.SpanInScope spanInScope = (Tracer.SpanInScope) request.getProperty(Tracer.SpanInScope.class.getName());
            if (span != null) { // asynchronous response or we couldn't figure it out
            } else if (spanInScope != null) { // synchronous response
                span = tracer.currentSpan();
                spanInScope.close();
            } else if (response.getStatus() == 404) {
                span = handler.handleReceive(extractor, request);
            } else {
                return; // unknown state
            }
            handler.handleSend(response, null, span);
        }

        /**
         * We shouldn't put a span in scope unless we know for sure the request is not async. That's
         * because we cannot detach if from the calling thread when async is used.
         *
         * @param resourceInfo matched resource information, possibly null
         * @return false when the resource info or its resource method is unavailable, or when any
         *     parameter of the resource method is annotated with {@link Suspended}; true otherwise
         */
        // TODO: add benchmark and cache if slow
        static boolean shouldPutSpanInScope(ResourceInfo resourceInfo) {
            // The resource method may be unavailable; without it we cannot prove the request is
            // synchronous, so take the conservative path instead of dereferencing null.
            if (resourceInfo == null || resourceInfo.getResourceMethod() == null) return false;
            for (Annotation[] annotations : resourceInfo.getResourceMethod().getParameterAnnotations()) {
                for (Annotation annotation : annotations) {
                    if (annotation.annotationType().equals(Suspended.class)) {
                        return false;
                    }
                }
            }
            return true;
        }
    }
    

    相关文章

      网友评论

          本文标题:Zipkin部署

          本文链接:https://www.haomeiwen.com/subject/jxnmbxtx.html