美文网首页
Skywalking:源码阅览

Skywalking:源码阅览

作者: 程序员王旺 | 来源:发表于2021-08-18 22:37 被阅读0次

    一、Compile

    编译源码请查看源码中的文档 How-to-build.md

    1. 首先确保你Maven的版本为3.6+,然后源码根目录执行下面命令

      mvn clean package -DskipTests
      
    1. 编译成功后,将工程导入到 IDEA中,因为里面用到了proto,默认情况下IDE是识别不到这些代码的,如果你使用的是 IDEA工具,请安装下面插件。

      确保target目录没有被隐藏,如果隐藏了,可通过 Setting - Editor - File Types 将 target 清除

    image-20210813162204334.png

    二、OAP

    OAP是SkyWalkingAgent 的服务端,启动一个基于H2存储简单的OAP服务器,运行 oap-server/server-starter中OAPServerStartUp类。

    可以看到OAPServerStartUp类又调用了一个 OAPServerBootstrap 的类,这个类才是真正的入口类。

    public class OAPServerStartUp {
        public static void main(String[] args) {
            OAPServerBootstrap.start();
        }
    }
    

    OAPServerBootstrap.start()方法主要工作就是解析application.yml文件,加载配置,初始化各个模块。

    public class OAPServerBootstrap {
        public static void start() {
            String mode = System.getProperty("mode");
            RunningMode.setMode(mode);
    
            ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
            ModuleManager manager = new ModuleManager();
            try {
                // 加载 application.yml 中的配置
                ApplicationConfiguration applicationConfiguration = configLoader.load();
                // 加载所有 Module 的实现类
                manager.init(applicationConfiguration);
    
                // 加载 Prometheus指标收集器
                manager.find(TelemetryModule.NAME)
                       .provider()
                       .getService(MetricsCreator.class)
                       .createGauge("uptime", "oap server start up time", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE)
                       // Set uptime to second
                       .setValue(System.currentTimeMillis() / 1000d);
    
                if (RunningMode.isInitMode()) {
                    log.info("OAP starts up in init mode successfully, exit now...");
                    System.exit(0);
                }
            } catch (Throwable t) {
                log.error(t.getMessage(), t);
                System.exit(1);
            }
        }
    }
    

    上面的 manager.init() 方法,加载模块时,采用 Java SPI 机制,读取每个模块的 META-INF.services 下配置,获取实现类。

    注意:值为 -的模块不会被加载,例如下面模块将不会被加载,如果不注意的话会让你抓狂。详细请看我的文章 《Skywalking:定制化》node-exporter章节。

    prometheus-fetcher:
      selector: ${SW_PROMETHEUS_FETCHER:-}
    
    public void init(
        ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException, ModuleStartException {
        // 获取所有可用模块列表
        String[] moduleNames = applicationConfiguration.moduleList();
        // 加载ModuleDefine的实现类
        ServiceLoader<ModuleDefine> moduleServiceLoader = ServiceLoader.load(ModuleDefine.class);
        // 加载ModuleProvider的实现类
        ServiceLoader<ModuleProvider> moduleProviderLoader = ServiceLoader.load(ModuleProvider.class);
    
        HashSet<String> moduleSet = new HashSet<>(Arrays.asList(moduleNames));
        for (ModuleDefine module : moduleServiceLoader) {
            if (moduleSet.contains(module.name())) {
                module.prepare(this, applicationConfiguration.getModuleConfiguration(module.name()), moduleProviderLoader);
                loadedModules.put(module.name(), module);
                moduleSet.remove(module.name());
            }
        }
        // Finish prepare stage
        isInPrepareStage = false;
    
        if (moduleSet.size() > 0) {
            throw new ModuleNotFoundException(moduleSet.toString() + " missing.");
        }
    
        BootstrapFlow bootstrapFlow = new BootstrapFlow(loadedModules);
    
        bootstrapFlow.start(this);
        bootstrapFlow.notifyAfterCompleted();
    }
    

    三、 Agent

    Agent 作为代理端插件 ,协助 OAP 收集应用的日志,作为一个字节码插件 skywalking-agent.jar 完成这个工作,上面提到,当业务应用端部署时除了这个jar包,还需依赖 agent.config配置和其他插件 jar包。

      public static void premain(String agentArgs, Instrumentation instrumentation) throws PluginException {
            final PluginFinder pluginFinder;
          
            //1.  初始化配置文件 agent.config,会在和skywalking-agent.jar同目录的config下查找
            try {
                SnifferConfigInitializer.initializeCoreConfig(agentArgs);
            } catch (Exception e) {
                // try to resolve a new logger, and use the new logger to write the error log here
                LogManager.getLogger(SkyWalkingAgent.class)
                        .error(e, "SkyWalking agent initialized failure. Shutting down.");
                return;
            } finally {
                // refresh logger again after initialization finishes
                LOGGER = LogManager.getLogger(SkyWalkingAgent.class);
            }
    
            //2. 第一步: 读取agent.config的plugin.mount参数,从plugins,activations两个目录下加载插件
            //   第二步: 每个插件下面都有一个skywalking-plugin.def文件(SPI模式),定义了此插件监控的各个点的类     
            try {
                pluginFinder = new PluginFinder(new PluginBootstrap().loadPlugins());
            } catch (AgentPackageNotFoundException ape) {
                LOGGER.error(ape, "Locate agent.jar failure. Shutting down.");
                return;
            } catch (Exception e) {
                LOGGER.error(e, "SkyWalking agent initialized failure. Shutting down.");
                return;
            }
    
            //3. 初始化Byte Buddy字节码工具,此工具是新一代的字节码工具,性能优于ASM、javasisst
            final ByteBuddy byteBuddy = new ByteBuddy().with(TypeValidation.of(Config.Agent.IS_OPEN_DEBUGGING_CLASS));
            AgentBuilder agentBuilder = new AgentBuilder.Default(byteBuddy).ignore(
                    nameStartsWith("net.bytebuddy.")
                            .or(nameStartsWith("org.slf4j."))
                            .or(nameStartsWith("org.groovy."))
                            .or(nameContains("javassist"))
                            .or(nameContains(".asm."))
                            .or(nameContains(".reflectasm."))
                            .or(nameStartsWith("sun.reflect"))
                            .or(allSkyWalkingAgentExcludeToolkit())
                            .or(ElementMatchers.isSynthetic()));
    
            JDK9ModuleExporter.EdgeClasses edgeClasses = new JDK9ModuleExporter.EdgeClasses();
            try {
                agentBuilder = BootstrapInstrumentBoost.inject(pluginFinder, instrumentation, agentBuilder, edgeClasses);
            } catch (Exception e) {
                LOGGER.error(e, "SkyWalking agent inject bootstrap instrumentation failure. Shutting down.");
                return;
            }
    
            try {
                agentBuilder = JDK9ModuleExporter.openReadEdge(instrumentation, agentBuilder, edgeClasses);
            } catch (Exception e) {
                LOGGER.error(e, "SkyWalking agent open read edge in JDK 9+ failure. Shutting down.");
                return;
            }
    
            if (Config.Agent.IS_CACHE_ENHANCED_CLASS) {
                try {
                    agentBuilder = agentBuilder.with(new CacheableTransformerDecorator(Config.Agent.CLASS_CACHE_MODE));
                    LOGGER.info("SkyWalking agent class cache [{}] activated.", Config.Agent.CLASS_CACHE_MODE);
                } catch (Exception e) {
                    LOGGER.error(e, "SkyWalking agent can't active class cache.");
                }
            }
    
            // 4. 初始化Agent工具
            agentBuilder.type(pluginFinder.buildMatch())
                        .transform(new Transformer(pluginFinder))
                        .with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION)
                        .with(new RedefinitionListener())
                        .with(new Listener())
                        .installOn(instrumentation);
    
            // 5. 启动所有插件服务,每个插件服务必须实现BootService接口类
            try {
                ServiceManager.INSTANCE.boot();
            } catch (Exception e) {
                LOGGER.error(e, "Skywalking agent boot failure.");
            }
    
            Runtime.getRuntime()
                    .addShutdownHook(new Thread(ServiceManager.INSTANCE::shutdown, "skywalking service shutdown thread"));
        }
    
    

    上面第5 步中 服务类,如果带@DefaultImplementor注解就是缺省的实现;如果带@OverrideImplementor注解,则会覆盖注解中的指定的某个实现类,从而产生一个新的实现类

    @OverrideImplementor(TraceSegmentServiceClient.class)
    public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, KafkaConnectionStatusListener {
        private static final ILog LOGGER = LogManager.getLogger(KafkaTraceSegmentServiceClient.class);
    

    ServiceManager类,用来管理所有插件

     private Map<Class, BootService> loadAllServices() {
            Map<Class, BootService> bootedServices = new LinkedHashMap<>();
            List<BootService> allServices = new LinkedList<>();
            load(allServices);
            for (final BootService bootService : allServices) {
                Class<? extends BootService> bootServiceClass = bootService.getClass();
                boolean isDefaultImplementor = bootServiceClass.isAnnotationPresent(DefaultImplementor.class);
                if (isDefaultImplementor) {//有@DefaultImplementor注解
                    if (!bootedServices.containsKey(bootServiceClass)) {
                        bootedServices.put(bootServiceClass, bootService);
                    } else {
                        //ignore the default service
                    }
                } else {//有@OverrideImplementor注解
                    OverrideImplementor overrideImplementor = bootServiceClass.getAnnotation(OverrideImplementor.class);
                    if (overrideImplementor == null) {
                        if (!bootedServices.containsKey(bootServiceClass)) {
                            bootedServices.put(bootServiceClass, bootService);
                        } else {
                            throw new ServiceConflictException("Duplicate service define for :" + bootServiceClass);
                        }
                    }
    

    发送数据

    大概分三类发送数据的客户端:

    1. 服务类客户端,如ServiceManagementClient

    2. 跟踪类客户端,如TraceSegmentServiceClient

    3. 指标类客户端,如MeterSender。

    这三类客户端除前面讲的实现了BootService外,还实现了GRPCChannelListener接口,这个接口大概作用就是创建用于和服务端通信的通道。大概流程是在调用ServiceManager.boot()方法时,会调用每个服务的prepare()方法,收集通道监听器,然后遍历每个通道的statusChanged方法,初始化一个XxxServiceStub服务存根类,此类是由Protobuf序列化而成。

    //实现了 GRPCChannelListener 类的prepare()、statusChanged()方法
    @DefaultImplementor
    public class MeterSender implements BootService, GRPCChannelListener {
        private static final ILog LOGGER = LogManager.getLogger(MeterSender.class);
        private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
        private volatile MeterReportServiceGrpc.MeterReportServiceStub meterReportServiceStub;
    
        @Override
        public void prepare() {
            ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
        }
    
        public void send(Map<MeterId, BaseMeter> meterMap, MeterService meterService) {
            if (status == GRPCChannelStatus.CONNECTED) {
                StreamObserver<MeterData> reportStreamObserver = null;
                final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
                try {
                    reportStreamObserver = meterReportServiceStub.withDeadlineAfter(
                   ......
        }
                        
        @Override
        public void statusChanged(final GRPCChannelStatus status) {
            if (GRPCChannelStatus.CONNECTED.equals(status)) {
                Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
                meterReportServiceStub = MeterReportServiceGrpc.newStub(channel);
            } else {
                meterReportServiceStub = null;
            }
            this.status = status;
        }
    }
    

    Agent Instance
    什么是Agent Instance呢?每通过skywalking-agent.jar启动一个应用,就会在OAP服务上注册一个服务,而这时就会在Agent端产生一个 Agent实例,它负责向OAP上报各种信息。ServiceManagementClient类就是做这些工作的,除了发送应用跟踪信息外,还会向OAP发送心跳等信息。

    @DefaultImplementor
    public class ServiceManagementClient implements BootService, Runnable, GRPCChannelListener {
        private static final ILog LOGGER = LogManager.getLogger(ServiceManagementClient.class);
        private static List<KeyStringValuePair> SERVICE_INSTANCE_PROPERTIES;
    
        private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
        private volatile ManagementServiceGrpc.ManagementServiceBlockingStub managementServiceBlockingStub;
        private volatile ScheduledFuture<?> heartbeatFuture;
        private volatile AtomicInteger sendPropertiesCounter = new AtomicInteger(0);
    
        @Override
        public void statusChanged(GRPCChannelStatus status) {
            if (GRPCChannelStatus.CONNECTED.equals(status)) {
                Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
                managementServiceBlockingStub = ManagementServiceGrpc.newBlockingStub(channel);
            } else {
                managementServiceBlockingStub = null;
            }
            this.status = status;
        }
    
        @Override
        public void prepare() {
            ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
    
            SERVICE_INSTANCE_PROPERTIES = new ArrayList<>();
    
            for (String key : Config.Agent.INSTANCE_PROPERTIES.keySet()) {
                SERVICE_INSTANCE_PROPERTIES.add(KeyStringValuePair.newBuilder()
                                                                  .setKey(key)
                                                                  .setValue(Config.Agent.INSTANCE_PROPERTIES.get(key))
                                                                  .build());
            }
        }
    
        @Override
        public void boot() {
            heartbeatFuture = Executors.newSingleThreadScheduledExecutor(
                new DefaultNamedThreadFactory("ServiceManagementClient")
            ).scheduleAtFixedRate(
                new RunnableWithExceptionProtection(
                    this,
                    t -> LOGGER.error("unexpected exception.", t)
                ), 0, Config.Collector.HEARTBEAT_PERIOD,
                TimeUnit.SECONDS
            );
        }
    
        @Override
        public void onComplete() {
        }
    
        @Override
        public void shutdown() {
            heartbeatFuture.cancel(true);
        }
    
        @Override
        public void run() {
            LOGGER.debug("ServiceManagementClient running, status:{}.", status);
    
            if (GRPCChannelStatus.CONNECTED.equals(status)) {
                try {
                    if (managementServiceBlockingStub != null) {
                        if (Math.abs(sendPropertiesCounter.getAndAdd(1)) % Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) {
    
                            managementServiceBlockingStub
                                .withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                                .reportInstanceProperties(InstanceProperties.newBuilder()
                                                                            .setService(Config.Agent.SERVICE_NAME)
                                                                            .setServiceInstance(Config.Agent.INSTANCE_NAME)
                                                                            .addAllProperties(OSUtil.buildOSInfo(
                                                                                Config.OsInfo.IPV4_LIST_SIZE))
                                                                            .addAllProperties(SERVICE_INSTANCE_PROPERTIES)
                                                                            .build());
                        } else {
                            final Commands commands = managementServiceBlockingStub.withDeadlineAfter(
                                GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
                            ).keepAlive(InstancePingPkg.newBuilder()
                                                       .setService(Config.Agent.SERVICE_NAME)
                                                       .setServiceInstance(Config.Agent.INSTANCE_NAME)
                                                       .build());
    
                            ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                        }
                    }
                } catch (Throwable t) {
                    LOGGER.error(t, "ServiceManagementClient execute fail.");
                    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
                }
            }
        }
    }
    

    四、 Plugins

    Skywalking中有大量的插件,正是这些插件帮助Skywalking建立起一个庞大的可观察森林。每个插件都有两个必须的类:

    • 拦截点类(XxxInstrumentation),在那些方法上干活。
    • 拦截器类(XxxInterceptor),具体干活的类。

    拦截点

    下面以Dubbo插件为例,DubboInstrumentation拦截点实现了三个方法:enhanceClass(增强类)、getConstructorsInterceptPoints(拦截构造方法)、getInstanceMethodsInterceptPoints(拦截实例方法)

    public class DubboInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
        //被拦截的类,也就是需要增强的类
        private static final String ENHANCE_CLASS = "com.alibaba.dubbo.monitor.support.MonitorFilter";
        //拦截器类
        private static final String INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.dubbo.DubboInterceptor";
    
        @Override
        protected ClassMatch enhanceClass() {
            return NameMatch.byName(ENHANCE_CLASS);
        }
    
        @Override
        public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
            return null;
        }
    
        @Override
        public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
            return new InstanceMethodsInterceptPoint[] {
                new InstanceMethodsInterceptPoint() {
                    // 获取匹配到的拦截方法,WitnessClass
                    @Override
                    public ElementMatcher<MethodDescription> getMethodsMatcher() {
                        return named("invoke");
                    }
    
                    // 返回拦截器
                    @Override
                    public String getMethodsInterceptor() {
                        return INTERCEPT_CLASS;
                    }
    
                    // 是否要对原方法参数做修改
                    @Override
                    public boolean isOverrideArgs() {
                        return false;
                    }
                }
            };
        }
    }
    

    如何解决多版本问题

    当应用存在多个版本时,Skywalking为应用的每个版本都写一个相对应插件版本,例如针对 spring mvc不同版本有以下插件:

    spring3.x => mvc-annotation-3.x-plugin
    spring4.x => mvc-annotation-4.x-plugin
    spring5.x => mvc-annotation-5.x-plugin

    那么有个问题来了,比如我用spring4.x时,这时会把3个插件都加载上,如何值加载 mvc-annotation-4.x-plugin插件呢?这时用到了一个技术叫 WitnessClass,原理很简单,就是在每个插件中定义 WITHNESS_CLASSES变量,并定义一个能区分出版本独特的类。然后在应用启动时,在加载类集合里是否能找到WITHNESS_CLASSES变量定义的类,如果找对了,那么也找到了对应的版本的插件。

    拦截器

    DubboInterceptor用来定义对增强类做什么处理

    public class DubboInterceptor implements InstanceMethodsAroundInterceptor {
        /**
         * <h2>Consumer:</h2> The serialized trace context data will
         * inject to the {@link RpcContext#attachments} for transport to provider side.
         * <p>
         * <h2>Provider:</h2> The serialized trace context data will extract from
         * {@link RpcContext#attachments}. current trace segment will ref if the serialize context data is not null.
         */
        @Override
        public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
            MethodInterceptResult result) throws Throwable {
            Invoker invoker = (Invoker) allArguments[0];
            Invocation invocation = (Invocation) allArguments[1];
            RpcContext rpcContext = RpcContext.getContext();
            boolean isConsumer = rpcContext.isConsumerSide();
            URL requestURL = invoker.getUrl();
    
            AbstractSpan span;
    
            final String host = requestURL.getHost();
            final int port = requestURL.getPort();
            if (isConsumer) {
                final ContextCarrier contextCarrier = new ContextCarrier();
                span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);
                //invocation.getAttachments().put("contextData", contextDataStr);
                //@see https://github.com/alibaba/dubbo/blob/dubbo-2.5.3/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcInvocation.java#L154-L161
                CarrierItem next = contextCarrier.items();
                while (next.hasNext()) {
                    next = next.next();
                    rpcContext.getAttachments().put(next.getHeadKey(), next.getHeadValue());
                    if (invocation.getAttachments().containsKey(next.getHeadKey())) {
                        invocation.getAttachments().remove(next.getHeadKey());
                    }
                }
            } else {
                ContextCarrier contextCarrier = new ContextCarrier();
                CarrierItem next = contextCarrier.items();
                while (next.hasNext()) {
                    next = next.next();
                    next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
                }
    
                span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
            }
    
            Tags.URL.set(span, generateRequestURL(requestURL, invocation));
            span.setComponent(ComponentsDefine.DUBBO);
            SpanLayer.asRPCFramework(span);
        }
    
        @Override
        public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
            Object ret) throws Throwable {
            Result result = (Result) ret;
            if (result != null && result.getException() != null) {
                dealException(result.getException());
            }
    
            ContextManager.stopSpan();
            return ret;
        }
    
        @Override
        public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
            Class<?>[] argumentsTypes, Throwable t) {
            dealException(t);
        }
    
        /**
         * Log the throwable, which occurs in Dubbo RPC service.
         */
        private void dealException(Throwable throwable) {
            AbstractSpan span = ContextManager.activeSpan();
            span.log(throwable);
        }
    
        /**
         * Format operation name. e.g. org.apache.skywalking.apm.plugin.test.Test.test(String)
         *
         * @return operation name.
         */
        private String generateOperationName(URL requestURL, Invocation invocation) {
            StringBuilder operationName = new StringBuilder();
            String groupStr = requestURL.getParameter(Constants.GROUP_KEY);
            groupStr = StringUtil.isEmpty(groupStr) ? "" : groupStr + "/";
            operationName.append(groupStr);
            operationName.append(requestURL.getPath());
            operationName.append("." + invocation.getMethodName() + "(");
            for (Class<?> classes : invocation.getParameterTypes()) {
                operationName.append(classes.getSimpleName() + ",");
            }
    
            if (invocation.getParameterTypes().length > 0) {
                operationName.delete(operationName.length() - 1, operationName.length());
            }
    
            operationName.append(")");
    
            return operationName.toString();
        }
    
        /**
         * Format request url. e.g. dubbo://127.0.0.1:20880/org.apache.skywalking.apm.plugin.test.Test.test(String).
         *
         * @return request url.
         */
        private String generateRequestURL(URL url, Invocation invocation) {
            StringBuilder requestURL = new StringBuilder();
            requestURL.append(url.getProtocol() + "://");
            requestURL.append(url.getHost());
            requestURL.append(":" + url.getPort() + "/");
            requestURL.append(generateOperationName(url, invocation));
            return requestURL.toString();
        }
    }
    

    五、GraphQL

    GraphQL 是一种新的API 查询语言,由Facebook开源,对前端提供少许接口,就可以查询整个系统的数据,且可以实现按需返回。

    后台需要先定义一个 schema.graphqls的文件,下面定义了两个接口:

    • findAuthorById 返回 Author对象,其中这个对象又关联了 Book对象
    • saveBook 保存Book接口,它的输入参数由 input 来定义
    #查询接口
    type Query {
        findAuthorById(id:Long!): Author
    }
    
    # 更新接口
    type Mutation {
        saveBook(input: BookInput!) : Book!
    }
    
    # Author 对象
    type Author {
        #作者Id
        id: Long!
        #创建时间
        createdTime: String
        #名
        firstName: String
        #姓
        lastName: String
        #该作者的所有书籍
        books: [Book]
    }
    
    # 输入参数
    input BookInput {
        title: String!
        isbn: String!
        pageCount: Int
        authorId: Long
    }
    
    # Book对象
    type Book {
        id: Long!
        title: String!
        isbn: String!
        pageCount: Int
        author: Author
    }
    

    对于查询类接口和 更新类接口继承的父类是不一样的,比如查询是继承GraphQLQueryResolver,更新是继承GraphQLMutationResolver

    @Component
    @AllArgsConstructor
    public class Query implements GraphQLQueryResolver {
        private AuthorRepo authorRepo;
    
        public Author findAuthorById(Long id) {
            return authorRepo.findAuthorById(id);
        }
    }
    
    @Component
    @AllArgsConstructor
    public class Mutation implements GraphQLMutationResolver {
        private BookRepo bookRepo;
     
        public Book saveBook(BookInput input) {
            Book book = new Book();
            book.setTitle(input.getTitle());
            book.setIsbn(input.getIsbn());
            book.setPageCount(input.getPageCount());
            book.setAuthorId(input.getAuthorId());
            return bookRepo.save(book);
        }
    }
    

    后台定义好了GraphQL 接口,前端请求 http://x.x.x.x/graphql ,输入请求参数:

    {
      findAuthorById(id: 3){
         id
         firstName
         lastName
      }
    }
    

    在Skywalking中,有一个插件: query-graphql-plugin,就相当于一个graphql,支撑整个 UI的查询,访问前端 Web时,F12看下发现调用的都是 …/graphql 的接口。

    六、OpenTelemeTry

    在了解OpenTelemeTry之前,先说说 OpenTracing 和 OpenCensus

    OpenTracing:是CNCF的第三个项目,其目的是制定一套标准的分布式追踪协议,可谓是Tracing界的slf4j。

    OpenCensus: 除了OpenTracing 链路跟踪外,还有一类 Metrics监控指标也是经常用到的,例如cpu、内存、硬盘、网络请求延迟、错误率、用户数、访问数、订单数等各类指标。没错OpenCensus就是将链路跟踪和指标监控都囊括了。

    所谓分久必合,OpenTelemetry的横空出世,结束了监控界的纷争乱世,并以“可观察性”全新定义了监控技术,重塑了监控规范,实现了Metrics、Tracing、Logging 的大融合,俨然成了监控界的大佬。

    image-20210816162849338.png

    Skywalking收集 VM (操作系统)指标,没有自己实现,是通过 prometheus 的 node-exporter 完成的,并借助OpenTelemeTry 将Skywalking 和 prometheus 整合到一块。


    image.png

    otel-collector-config.yaml

    Receiver做为采集端,Exporter作为数据的输出端,Opentelemetry-collector充当一个中间媒介将两者统一起来

    receivers:
      prometheus:
        config:
          scrape_configs:
            - job_name: 'otel-collector'
              scrape_interval: 10s
              static_configs:
                - targets: [ 'vm-1:9100' ]
                - targets: [ 'vm-2:9100' ]
                - targets: [ 'vm-3:9100' ]
    
    processors:
      batch:
    
    exporters:
      opencensus:
        endpoint: "oap:11800" # The OAP Server address
        insecure: true
      # Exports data to the console  
      logging:
        logLevel: debug
    
    service:
      pipelines:
        metrics:
          receivers: [prometheus]
          processors: [batch]
          exporters: [opencensus,logging]
    
    
    image-20210817164057977.png

    在Skywalking中是通过 proto文件方式来集成opentelemetry的,当然你可以通过 集成opentelemetry sdk来接收node-exporter的数据。Skywalking重写了 MetricsServiceGrpc.MetricsServiceImplBase的exporer方法,此方法就是专门接收collector发过来的数据,只不过它转了一次转换,将OpenTelemetry 数据格式转换为了Skywalking的meter格式。

    @Slf4j
    public class OCMetricHandler extends MetricsServiceGrpc.MetricsServiceImplBase implements Handler {
    
        private List<PrometheusMetricConverter> metrics;
    
        @Override public StreamObserver<ExportMetricsServiceRequest> export(
            StreamObserver<ExportMetricsServiceResponse> responseObserver) {
            return new StreamObserver<ExportMetricsServiceRequest>() {
                private Node node;
                private Map<String, String> nodeLabels = new HashMap<>();
    
                @Override
                public void onNext(ExportMetricsServiceRequest request) {
                    ....
                }
    
                @Override public void onError(Throwable throwable) {
    
                }
    
                @Override public void onCompleted() {
                    responseObserver.onCompleted();
                }
            };
        }
    
        .....
    
        @Override public void active(List<String> enabledRules,
            MeterSystem service, GRPCHandlerRegister grpcHandlerRegister) {
            List<Rule> rules;
            try {
                rules = Rules.loadRules("otel-oc-rules", enabledRules);
            } catch (ModuleStartException e) {
                log.warn("failed to load otel-oc-rules");
                return;
            }
            if (rules.isEmpty()) {
                return;
            }
            this.metrics = rules.stream().map(r ->
                new PrometheusMetricConverter(r, service))
                .collect(toList());
            grpcHandlerRegister.addHandler(this);
        }
    }
    

    Application

    在Skywalking 源码工程中创建一个模块apm-webapp(已经存在的有问题,我就删掉了),然后把skywalking-ui工程下的dist目录拷贝到 resources下面,并改名为public,这样这个工程即可以访问 UI ,又能集成skywalking-agent.jar测试 。此工程已经上传到gitee上,请点击这里下载。

    在工程启动 VM 参数添加

    -javaagent:skywalking-agent\skywalking-agent.jar -Dskywalking.agent.service_name=webapp -Dskywalking.agent.instance_name=webapp  -Dskywalking.collector.backend_service=127.0.0.1:11800
    

    访问 UI

    http://localhost:8080/

    相关文章

      网友评论

          本文标题:Skywalking:源码阅览

          本文链接:https://www.haomeiwen.com/subject/tpmnbltx.html