美文网首页
分布式链路追踪skywalking(一)-agent客户端

分布式链路追踪skywalking(一)-agent客户端

作者: 后来丶_a24d | 来源:发表于2022-02-14 22:40 被阅读0次

    思维导图

    思维导图.png

    分布式链路追踪skywalking系列


    前置知识

    背景

    • 随着微服务的划分,一次请求往往需要涉及到多个服务。不同微服务负责的团队也不同,所以需要一些可以帮助理解系统行为、用于分析性能问题的工具,以便发生故障的时候,能够快速定位和解决问题。全链路监控组件就在这样的问题背景下产生
    • 全链路性能监控 从整体维度到局部维度展示各项指标,将跨应用的所有调用链性能信息集中展现,可方便度量整体和局部性能,并且方便找到故障产生的源头,生产上可极大缩短故障排除时间

    Java agent

    • java agent本质上可以理解为一个插件,agent是一个精心提供的jar包,这个jar包通过JVMTI(JVM Tool Interface)完成加载,并对目标代码的修改
    • 可以无入侵的修改代码
    • 类似这种加启动参数方式来增强:
    -javaagent:***(增强功能的jar包).jar -jar ***(待增强的jar包).jar
    
    场景
    • 可以在加载java文件之前做拦截把字节码做修改
    • 可以在运行期将已经加载的类的字节码做变更
    原理
    • 启动时修改


      image.png
    • 可以看到jvm启动时,进行类加载会有hook钩子,调用InstrumentationImpl的loadClassAndCallPremain方法,在这个方法里会去调用javaagent里MANIFEST.MF里指定的Premain-Class类的premain方法
    • 运行时修改


      运行时修改.png
    • 可以看到运行时也是通过hook调用进行更改
    实战
    1. 引用bytebuddy(动态生成类,skywalking就是用这个框架实现)
     <dependencies>
            <!-- https://mvnrepository.com/artifact/net.bytebuddy/byte-buddy -->
            <dependency>
                <groupId>net.bytebuddy</groupId>
                <artifactId>byte-buddy</artifactId>
                <version>1.11.0</version>
            </dependency>
        </dependencies>
    
    1. 定义/resources/META-INF/MANIFEST.MF
    Manifest-Version: 1.0
    Can-Redefine-Classes: true
    Can-Retransform-Classes: true
    Premain-Class: com.seeger.demo.agent.Agent
    
    1. 定义premain
    
    public class Agent {
        // 需要增强的类,这个是开源elastic-job的核心类,用户可自定义其他类
        private static final String ENHANCE_CLASS = "com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor";
    
        public static void premain(String agentArgs, Instrumentation instrumentation) {
    
            new AgentBuilder.Default()
                    .type(ElementMatchers.named(ENHANCE_CLASS))
                    .transform((builder, type, classLoader, module) ->
                            builder.method(ElementMatchers.named("execute").and(ModifierReviewable.OfByteCodeElement::isPublic))
                                    .intercept(MethodDelegation.to(new DelegateTemplate(new DemoServiceInterceptor())))
                    ).installOn(instrumentation);
    
            System.out.println("一点不萌");
        }
    
    }
    
    • 增强处理
    public class DelegateTemplate {
        private InstMethodAroundInterceptor interceptor;
    
        public DelegateTemplate(InstMethodAroundInterceptor interceptor) {
            this.interceptor = interceptor;
        }
    
        /**
         * 拦截增强主方法
         *
         * @param inst:                              被拦截对象本身
         * @param allArguments:被代理方法原参数
         * @param zuper:被代理方法的包装对象,zuper.call()调用原方法
         * @param method:原方法对象
         * @return
         */
        public Object interceptor(@This Object inst, @AllArguments Object[] allArguments,
                                  @SuperCall Callable<?> zuper, @Origin Method method) {
            ResultWrapper rw = new ResultWrapper();
            if (this.interceptor != null) {
                try {
                    // 调用前拦截处理
                    this.interceptor.beforeMethod(inst, method,
                            allArguments, method.getParameterTypes(), rw);
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
            
    
            Object result = null;
            try {
                // 被代理方法调用
                result = zuper.call();
                if (this.interceptor != null) {
                    try {
                        // 调用后拦截处理
                        result = this.interceptor.afterMethod(inst, method,
                                allArguments, method.getParameterTypes(), result);
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                }
            } catch (Exception e) {
                if (this.interceptor != null) {
                    try {
                        // 调用异常拦截处理
                        this.interceptor.handleMethodException(inst, method,
                                allArguments, method.getParameterTypes(), e);
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                }
            }
    
            return result;
        }
    
    }
    
    public class DemoServiceInterceptor implements InstMethodAroundInterceptor {
        @Override
        public void beforeMethod(Object inst, Method interceptPoint, Object[] allArguments,
                                 Class<?>[] argumentsTypes, ResultWrapper result) {
            System.out.println("DemoService Interceptor in haha ...");
        }
    
        @Override
        public Object afterMethod(Object inst, Method interceptPoint, Object[] allArguments,
                                  Class<?>[] argumentsTypes, Object ret) {
            System.out.println("DemoService Interceptor out haha ...");
            return ret;
        }
    
        @Override
        public void handleMethodException(Object inst, Method method, Object[] allArguments,
                                          Class<?>[] argumentsTypes, Throwable t) {
            System.out.println("DemoService Interceptor error handle ...");
        }
    }
    
    public interface InstMethodAroundInterceptor {
        /**
         * 拦截点前
         *
         * @param inst:                    被增强类实例
         * @param interceptPoint:被增强方法
         * @param allArguments:被增强方法入参
         * @param argumentsTypes:被增强方法入参类型
         * @param result:result            包装类
         */
        void beforeMethod(Object inst, Method interceptPoint,
                          Object[] allArguments, Class<?>[] argumentsTypes,
                          ResultWrapper result);
    
        Object afterMethod(Object inst, Method interceptPoint,
                           Object[] allArguments, Class<?>[] argumentsTypes,
                           Object ret);
    
        void handleMethodException(Object inst, Method method, Object[] allArguments,
                                   Class<?>[] argumentsTypes, Throwable t);
    }
    
    public class ResultWrapper {
        private boolean isContinue;
        private Object result;
    
        public boolean isContinue() {
            return isContinue;
        }
    
        public void setContinue(boolean aContinue) {
            isContinue = aContinue;
        }
    
        public Object getResult() {
            return result;
        }
    
        public void setResult(Object result) {
            this.result = result;
        }
    }
    
    1. 可在有elastic-job的demo项目加入启动项
    -javaagent:***(增强功能的jar包).jar -jar ***(待增强的jar包).jar
    
    1. 运行到elastic-job的job时就会无入侵式增强

    APM

    • APM (Application Performance Management) 即应用性能管理(监控), 最出名的是谷歌公开的论文提到的 Google Dapper
    • 一个请求的调用链


      请求的调用链.png
    • 各个模块定义:
    Span
    • Span是最最基本的单元,一次RPC或者数据库等调用都会创建Span, Span有uuid标识它,还有:
    1. 描述信息
    2. 时间戳
    3. Annotation的tag信息
    4. parent_id(可追溯用)
    • 典型图示: Frontend.Request请求过来span id是1,然后调用Backend.Call,span id是2,这时候parent id就是1。然后调Backend.Dosomething这时候span id是3,parent id是1。在继续两次调用Helper.Call同理。这样就能梳理起整个分布式链路调用


      典型图示.png
    Trace
    • 类似于 树结构的Span集合,表示一次完整的跟踪,从请求到服务器开始,服务器返回response结束,跟踪每次rpc调用的耗时,存在唯一标识trace_id
    • 图示可以看到每次调用都是相同的trace_id,以便整体的看整个调用链。每种颜色的note标注了一个span,一条链路通过TraceId唯一标识,Span标识发起的请求信息。树节点是整个架构的基本单元,而每一个节点又是对span的引用


      Trace.png
    Annotation
    • 用来记录请求特定事件相关信息(例如时间),一个span中会有多个annotation注解描述。key-value结构

    Agent启动流程

    • 流程图


      aw_agent_启动流程.png
    • skywalking采用微内核架构也被称为插件化架构,是一种面向功能进行拆分的可扩展性架构。内核功能是比较稳定的,只负责管理插件的生命周期,外部的插件不断变动不影响核心功能。美团到家的不少业务平台架构就是微内核架构,各个业务侧可插拔,不同业务侧数据隔离


      微内核框架.png
    • SkyWalking Agent 源码从SkyWalkingAgent.premain方法作为入口
    • 核心源码如下,省略了不必要代码
    public static void premain(String agentArgs, Instrumentation instrumentation) throws PluginException {
        // 步骤1、初始化配置信息
        SnifferConfigInitializer.initialize(agentArgs); 
        // 步骤2~4、查找并解析skywalking-plugin.def插件文件;
        // AgentClassLoader加载插件类并进行实例化;PluginFinder提供插件匹配的功能
        final PluginFinder pluginFinder = new PluginFinder(
           new PluginBootstrap().loadPlugins());
        // 步骤5、使用 Byte Buddy 库创建 AgentBuilder
        final ByteBuddy byteBuddy = new ByteBuddy()
           .with(TypeValidation.of(Config.Agent.IS_OPEN_DEBUGGING_CLASS));
        new AgentBuilder.Default(byteBuddy)...installOn(instrumentation);
        // 这里省略创建 AgentBuilder的具体代码,后面展开详细说
        // 步骤6、使用 JDK SPI加载的方式并启动 BootService 服务。
        ServiceManager.INSTANCE.boot();
        // 步骤7、添加一个JVM钩子
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
          public void run() { ServiceManager.INSTANCE.shutdown(); }
        }, "skywalking service shutdown thread"));
    }
    
    1. 步骤1初始化配置信息: 将配置信息解析到Config类中,Config类中有很多静态类


      初始化配置信息.png
    2. 步骤2~4、查找并解析skywalking-plugin.def插件文件,解析自定的apm-sniffer底下apm-sdk-plugin插件,以收集各个中间件,rpc, mq的监控信息,这里使用AgentClassLoader自定义的类加载器,方便将不在应用的 Classpath 中引入 SkyWalking 的插件 jar 包:


      插件.png
    3. 步骤5、使用 Byte Buddy 库创建 AgentBuilder
    4. 步骤6、使用 JDK SPI加载的方式并启动 BootService 服务,BootService的SPI实现有Jvm, Grpc,Kafka,还有将数据发送到server的BootService
    public void boot() {
            bootedServices = loadAllServices();
            // 准备
            prepare();
           // 开始
            startup();
           // 完成
            onComplete();
        }
    
    BootService插件.png
    1. 步骤7、添加一个JVM钩子

    Agent发送数据

    agent客户端与服务端连接

    • GRPCChannelManager 负责维护 Agent 与后端 OAP 集群通信时使用的网络连接。由BootService服务统一负责启动
    • Agent 启动过程中会依次调用 BootService 实现的 prepare() 方法 → boot() 方法 → onComplete() 方法之后
    • GRPCChannelManager 的 prepare() 方法 、onComplete() 方法都是空实现,在 boot() 方法中首先会解析 agent.config 配置文件指定的后端 OAP 实例地址初始化 grpcServers 字段,然后会初始化这个定时任务,初次会立即执行,之后每隔 30s 执行一次, boot方法
    public void boot() {
            grpcServers = Arrays.asList(Config.Collector.BACKEND_SERVICE.split(","));
            connectCheckFuture = Executors.newSingleThreadScheduledExecutor(
                new DefaultNamedThreadFactory("GRPCChannelManager")
            ).scheduleAtFixedRate(
                new RunnableWithExceptionProtection(
                    this,
                    t -> LOGGER.error("unexpected exception.", t)
                ), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS
            );
        }
    
    • run方法
    public void run() {
        if (reconnect && grpcServers.size() > 0) {
            // 根据配置,连接指定OAP实例的IP和端口
            managedChannel = GRPCChannel.newBuilder(ipAndPort[0], 
                    Integer.parseInt(ipAndPort[1]))
                .addManagedChannelBuilder(new StandardChannelBuilder())
                .addManagedChannelBuilder(new TLSChannelBuilder())
                .addChannelDecorator(new AuthenticationDecorator())
                .build();
            // notify()方法会循环调用所有注册在当前连接上的GRPCChannelListener实
            // 例(记录在listeners集合中)的statusChanged()方法,通知它们连接创建
            // 成功的事件
            notify(GRPCChannelStatus.CONNECTED);
            // 设置 reconnect字段为false,暂时不会再重建连接了
            reconnect = false;
        }
    }
    

    默认grpc异步发送

    • 连接注册心跳好了之后,发送JVM等监控数据时,默认采用grpc发送

    使用Kafka发送

    • 跟同步方式比,使用Kafka发送能提高吞吐量

    Agent自定义发送方式,比如RocketMQ

    • 可以根据公司采用Mq自定义RocketMQ插件,或自己公司自研的Mq框架

    DataCarrier

    • DataCarrier 是一个轻量级的生产者-消费者模式的实现库, SkyWalking Agent 在收集到 Trace 数据之后,会先写入到 DataCarrier 中的缓存,然后由后台线程定时发送到后端的 OAP


      DataCarrier.png
    • DataCarrier 底层使用多个定长数组作为存储缓冲区, 即Buffer类
    • Channels 底层管理了多个 Buffer 对象,提供了 IDataPartitioner 选择器用于确定一个数据元素写入到底层的哪个 Buffer 对象中,类似Kafka Producer 在发送数据时也会有相应的分区策略
    • DataCarrier 消费者的具体行为都定义在 IConsumer 接口之中
    • 每个 ConsumerThread 线程可以消费多个 DataSource,这里的 DataSource 是 Buffer 的一部分或是完整的 Buffer


      消费者.png

    学习总结

    Agent初始化

    • 初始化配置都放到配置Config, 利用ConfigInitializer.initialize反射进行初始化把配置一个个放进去,是一个相对抽象的比较好的地方。
    • 通过读取所有插件配置类,实现一一拦截,每个对应的插件都增加,遇到相关插件类,会走增强方法,比如JobExecutorInstrumentation -> ClassInstanceMethodsEnhancePluginDefine -> ClassEnhancePluginDefine的enhanceInstance抽出了公共方法,非常统一,所有插件一起用,不同插件有不同用法实现,从而达到增强目的

    发送trace的grpc

    trace.png
    • TraceSegmentServiceClient实现多个接口功能,但最终都是为了实现发送GRPC,DataCarrier是数据搬运工,起到门面设计功能。Channel里面对应多个Buffer, 多个buffer还是实现了不同分区策略,类似Kafka分区, Driver管理线程对应消费的Buffer。Buffer分为ArrayBlockingQueue实现和Skywalking自己实现的,自己实现是基于cas,在客户端用,ArrayBQ是JDK实现在服务端用

    参考文章

    相关文章

      网友评论

          本文标题:分布式链路追踪skywalking(一)-agent客户端

          本文链接:https://www.haomeiwen.com/subject/tquysltx.html