美文网首页
brave+kafka+zipkin+cassandra搭建分布

brave+kafka+zipkin+cassandra搭建分布

作者: yingyingguigui | 来源:发表于2017-07-21 11:15 被阅读0次

    摘要: 上一篇博客,介绍的是brave直接通过http发送请求到zipkin,完成最简单的rpc跟踪,适合理解zipkin的跟踪链。这次来完成生产环境真实架构的部署。

    211759_mxWH_223522.png

    先来浏览一下架构图(下图的scribe采用kafka替代)

    首先我们先完成brave+kafka+zipkin这一步,这时候zipkin使用的是内存存储。


    下载kafka后

    tar -xzf kafka_2.11-0.9.0.0.tgz

    cd kafka_2.11-0.9.0.0

    启动zookeeper:

    bin/zookeeper-server-start.sh config/zookeeper.properties

    (windows 启动 bin\windows\zookeeper-server-start.bat config\zookeeper.properties)

    启动kafka服务:

    bin/kafka-server-start.sh config/server.properties

    (windows 启动 bin\windows\kafka-server-start.bat config\server.properties)


    下载zipkin的jar包,

    wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec'

    运行: java -jar zipkin.jar

    用解压工具可以看到zipkin.jar里的BOOT-INF\classes\zipkin-server-shared.yml文件,所有的配置都在这个文件里。

    要改变配置里的值,可以在java启动命令里通过-D传入配置参数。

    现在我们通过kafka做数据通道的话就采用如下参数(jar文件名请以实际下载的为准,例如按上面的wget命令下载则为zipkin.jar):

    java -DKAFKA_ZOOKEEPER=localhost:2181 -jar zipkin-server-1.19.2-exec.jar

    打开ui查看 http://localhost:9411/


    接着附上测试插入数据到kafka的代码。

    import com.github.kristofa.brave.*;
    import com.twitter.zipkin.gen.Endpoint;
    import zipkin.Span;
    import zipkin.reporter.AsyncReporter;
    import zipkin.reporter.kafka08.KafkaSender;
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    public class BraveKafkaTest {
    
        private static Brave brave = null;
        private static Brave brave2 = null;
    
        private static void braveInit(){
    //        KafkaSender sender = KafkaSender.builder().bootstrapServers("localhost:9092").messageMaxBytes(10000).build();
            KafkaSender sender = KafkaSender.builder().bootstrapServers("localhost:9092").build();
            AsyncReporter<Span> report = AsyncReporter.builder(sender).build();
    
            brave = new Brave.Builder("appserver").reporter(report).build();
            brave2 = new Brave.Builder("datacenter").reporter(report).build();
        }
    
        static class Task {
            String name;
            SpanId spanId;
            public Task(String name, SpanId spanId) {
                super();
                this.name = name;
                this.spanId = spanId;
            }
        }
    
        public static void main(String[] args) throws Exception {
            braveInit();
    
            final BlockingQueue<Task> queue = new ArrayBlockingQueue<Task>(10);
            Thread thread = new Thread(){
                public void run() {
                    while (true) {
                        try {
                            Task task = queue.take();
                            dcHandle(task.name, task.spanId);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            };
            thread.start();
    
            for (int i = 0; i < 10; i++) {
                ServerRequestInterceptor serverRequestInterceptor = brave.serverRequestInterceptor();
                ServerResponseInterceptor serverResponseInterceptor = brave.serverResponseInterceptor();
                ClientRequestInterceptor clientRequestInterceptor = brave.clientRequestInterceptor();
                ClientResponseInterceptor clientResponseInterceptor = brave.clientResponseInterceptor();
    
                serverRequestInterceptor.handle(new ServerRequestAdapterImpl("group_data"));
    
                ClientRequestAdapterImpl clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_radio_list");
                clientRequestInterceptor.handle(clientRequestAdapterImpl);
                queue.offer(new Task("get_radio_list", clientRequestAdapterImpl.getSpanId()));
                Thread.sleep(10);
                clientResponseInterceptor.handle(new ClientResponseAdapterImpl());
    
                clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_user_list");
                clientRequestInterceptor.handle(clientRequestAdapterImpl);
                queue.offer(new Task("get_user_list", clientRequestAdapterImpl.getSpanId()));
                Thread.sleep(10);
                clientResponseInterceptor.handle(new ClientResponseAdapterImpl());
    
                clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_program_list");
                clientRequestInterceptor.handle(clientRequestAdapterImpl);
                queue.offer(new Task("get_program_list", clientRequestAdapterImpl.getSpanId()));
                Thread.sleep(10);
                clientResponseInterceptor.handle(new ClientResponseAdapterImpl());
    
                serverResponseInterceptor.handle(new ServerResponseAdapterImpl());
                Thread.sleep(10);
            }
        }
    
        public static void dcHandle(String spanName, SpanId spanId){
            ServerRequestInterceptor serverRequestInterceptor = brave2.serverRequestInterceptor();
            ServerResponseInterceptor serverResponseInterceptor = brave2.serverResponseInterceptor();
            serverRequestInterceptor.handle(new ServerRequestAdapterImpl(spanName, spanId));
            serverResponseInterceptor.handle(new ServerResponseAdapterImpl());
        }
    
    
        static class ServerRequestAdapterImpl implements ServerRequestAdapter {
    
            Random randomGenerator = new Random();
            SpanId spanId;
            String spanName;
    
            ServerRequestAdapterImpl(String spanName){
                this.spanName = spanName;
                long startId = randomGenerator.nextLong();
                SpanId spanId = SpanId.builder().spanId(startId).traceId(startId).parentId(startId).build();
                this.spanId = spanId;
            }
    
            ServerRequestAdapterImpl(String spanName, SpanId spanId){
                this.spanName = spanName;
                this.spanId = spanId;
            }
    
            public TraceData getTraceData() {
                if (this.spanId != null) {
                    return TraceData.builder().spanId(this.spanId).build();
                }
                long startId = randomGenerator.nextLong();
                SpanId spanId = SpanId.builder().spanId(startId).traceId(startId).parentId(startId).build();
                return TraceData.builder().spanId(spanId).build();
            }
    
            public String getSpanName() {
                return spanName;
            }
    
            public Collection<KeyValueAnnotation> requestAnnotations() {
                Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
                KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364");
                collection.add(kv);
                return collection;
            }
    
        }
    
        static class ServerResponseAdapterImpl implements ServerResponseAdapter {
    
            public Collection<KeyValueAnnotation> responseAnnotations() {
                Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
                KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364");
                collection.add(kv);
                return collection;
            }
    
        }
    
        static class ClientRequestAdapterImpl implements ClientRequestAdapter {
    
            String spanName;
            SpanId spanId;
    
            ClientRequestAdapterImpl(String spanName){
                this.spanName = spanName;
            }
    
            public SpanId getSpanId() {
                return spanId;
            }
    
            public String getSpanName() {
                return this.spanName;
            }
    
            public void addSpanIdToRequest(SpanId spanId) {
                //记录传输到远程服务
                System.out.println(spanId);
                if (spanId != null) {
                    this.spanId = spanId;
                    System.out.println(String.format("trace_id=%s, parent_id=%s, span_id=%s", spanId.traceId, spanId.parentId, spanId.spanId));
                }
            }
    
            public Collection<KeyValueAnnotation> requestAnnotations() {
                Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
                KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364");
                collection.add(kv);
                return collection;
            }
    
            public Endpoint serverAddress() {
                return null;
            }
    
        }
    
        static class ClientResponseAdapterImpl implements ClientResponseAdapter {
    
            public Collection<KeyValueAnnotation> responseAnnotations() {
                Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
                KeyValueAnnotation kv = KeyValueAnnotation.create("radioname", "火星人1");
                collection.add(kv);
                return collection;
            }
    
        }
    }
    

    maven依赖:

    <!-- Maven dependencies the article lists for building and running BraveKafkaTest. -->
    <dependencies>
            <!-- Brave core library; presumably supplies the com.github.kristofa.brave.* classes the test imports. -->
            <dependency>
                <groupId>io.zipkin.brave</groupId>
                <artifactId>brave-core</artifactId>
                <version>3.17.0</version>
            </dependency>
            <!-- NOTE(review): nothing in the test's imports references this HTTP span collector;
                 presumably left over from the earlier HTTP-based setup - confirm before removing. -->
            <dependency>
                <groupId>io.zipkin.brave</groupId>
                <artifactId>brave-spancollector-http</artifactId>
                <version>3.17.0</version>
            </dependency>
            <!-- NOTE(review): not imported directly by the test; presumably required by Brave - TODO confirm. -->
            <dependency>
                <groupId>com.google.auto.value</groupId>
                <artifactId>auto-value</artifactId>
                <version>1.3</version>
            </dependency>
            <!-- NOTE(review): the test sends spans through zipkin.reporter.kafka08.KafkaSender, not through
                 this span collector; it looks unused - confirm before removing. -->
            <dependency>
                <groupId>io.zipkin.brave</groupId>
                <artifactId>brave-spancollector-kafka</artifactId>
                <version>3.17.0</version>
            </dependency>
            <!-- Kafka sender for the Zipkin reporter; matches the zipkin.reporter.kafka08.KafkaSender import in the test. -->
            <dependency>
                <groupId>io.zipkin.reporter</groupId>
                <artifactId>zipkin-sender-kafka08</artifactId>
                <version>0.4.3</version>
            </dependency>
            <!-- Logging API and Logback modules; the test code itself does not log through them,
                 so these are presumably for the libraries' own logging - TODO confirm. -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.2</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-access</artifactId>
                <version>1.1.3</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>1.1.3</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-core</artifactId>
                <version>1.1.3</version>
            </dependency>
    
        </dependencies>
    

    运行完成后,插入了10条跟踪信息到kafka,打开 http://localhost:9411/ 即可看到效果。

    154606_vkkV_223522.png

    接下来,采用cassandra存储从kafka采集过来的数据。

    下载apache-cassandra-2.2.8-bin.tar.gz,解压后

    启动cassandra:bin\cassandra.bat

    重新运行zipkin(加入-DSTORAGE_TYPE=cassandra参数):

    java -DKAFKA_ZOOKEEPER=localhost:2181 -DSTORAGE_TYPE=cassandra -jar zipkin-server-1.19.2-exec.jar

    相关文章

      网友评论

          本文标题:brave+kafka+zipkin+cassandra搭建分布

          本文链接:https://www.haomeiwen.com/subject/kjdhkxtx.html