美文网首页
Skywalking-09:OAL原理——如何通过动态生成的Cl

Skywalking-09:OAL原理——如何通过动态生成的Cl

作者: Switch_vov | 来源:发表于2021-08-23 15:31 被阅读0次

    OAL 如何通过动态生成的 Class 类,保存数据

    前置工作

    OAL 如何将动态生成的 SourceDispatcher 添加到 DispatcherManager

        // org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService#load
        public void load(OALDefine define) throws ModuleStartException {
            if (oalDefineSet.contains(define)) {
                // each oal define will only be activated once
                return;
            }
            try {
                OALEngine engine = loadOALEngine(define);
                // 设置Stream注解监听器,用来处理org.apache.skywalking.oap.server.core.analysis.Stream注解
                StreamAnnotationListener streamAnnotationListener = new StreamAnnotationListener(moduleManager);
                engine.setStreamListener(streamAnnotationListener);
                
                // org.apache.skywalking.oap.server.core.source.SourceReceiverImpl#getDispatcherDetectorListener
                // 获取的就是org.apache.skywalking.oap.server.core.analysis.DispatcherManager对象
                engine.setDispatcherListener(moduleManager.find(CoreModule.NAME)
                                                          .provider()
                                                          .getService(SourceReceiver.class)
                                                          .getDispatcherDetectorListener());
                
                // 调用的就是 org.apache.skywalking.oal.rt.OALRuntime#start
                engine.start(OALEngineLoaderService.class.getClassLoader());
                
                // 通知所有的监听器
                engine.notifyAllListeners();
    
                oalDefineSet.add(define);
            } catch (ReflectiveOperationException | OALCompileException e) {
                throw new ModuleStartException(e.getMessage(), e);
            }
        }
    

    org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService#load 方法做了如下操作:

    1. 设置 Stream 注解监听器,用来获取指标类的基本信息,并进行相应处理
    @Stream(
        name = "instance_jvm_class_loaded_class_count",
        scopeId = 11000,
        builder = InstanceJvmClassLoadedClassCountMetricsBuilder.class,
        processor = MetricsStreamProcessor.class
    )
    public class InstanceJvmClassLoadedClassCountMetrics extends LongAvgMetrics implements WithMetadata {
        // 省略
    }
    
    1. 通过模块管理器,先获取到 SourceReceiver 对象,借由此对象获取到 DispatcherManager 对象
    public class SourceReceiverImpl implements SourceReceiver {
        @Getter
        private final DispatcherManager dispatcherManager;
    
        @Override
        public DispatcherDetectorListener getDispatcherDetectorListener() {
            return getDispatcherManager();
        }
    }
    
    1. 启动 OAL 引擎
    2. 通知所有的监听器

    org.apache.skywalking.oal.rt.OALRuntime#notifyAllListeners

        @Override
        public void notifyAllListeners() throws ModuleStartException {
            for (Class metricsClass : metricsClasses) {
                try {
                    // 将动态生成的Metrics添加到MetricsStreamProcessor
                    streamAnnotationListener.notify(metricsClass);
                } catch (StorageException e) {
                    throw new ModuleStartException(e.getMessage(), e);
                }
            }
            for (Class dispatcherClass : dispatcherClasses) {
                try {
                    // 添加动态生成的SourceDispatch至DispatcherManager
                    dispatcherDetectorListener.addIfAsSourceDispatcher(dispatcherClass);
                } catch (Exception e) {
                    throw new ModuleStartException(e.getMessage(), e);
                }
            }
        }
    

    org.apache.skywalking.oap.server.core.analysis.DispatcherManager#addIfAsSourceDispatcher

        @Override
        public void addIfAsSourceDispatcher(Class aClass) throws IllegalAccessException, InstantiationException {
            if (!aClass.isInterface() && !Modifier.isAbstract(
                aClass.getModifiers()) && SourceDispatcher.class.isAssignableFrom(aClass)) {
                Type[] genericInterfaces = aClass.getGenericInterfaces();
                for (Type genericInterface : genericInterfaces) {
                    ParameterizedType anInterface = (ParameterizedType) genericInterface;
                    if (anInterface.getRawType().getTypeName().equals(SourceDispatcher.class.getName())) {
                        Type[] arguments = anInterface.getActualTypeArguments();
    
                        if (arguments.length != 1) {
                            throw new UnexpectedException("unexpected type argument number, class " + aClass.getName());
                        }
                        Type argument = arguments[0];
    
                        Object source = ((Class) argument).newInstance();
    
                        if (!Source.class.isAssignableFrom(source.getClass())) {
                            throw new UnexpectedException(
                                "unexpected type argument of class " + aClass.getName() + ", should be `org.apache.skywalking.oap.server.core.source.Source`. ");
                        }
    
                        Source dispatcherSource = (Source) source;
                        SourceDispatcher dispatcher = (SourceDispatcher) aClass.newInstance();
    
                        int scopeId = dispatcherSource.scope();
                        
                        // 使用scope做SourceDispatcher Map的key
                        List<SourceDispatcher> dispatchers = this.dispatcherMap.get(scopeId);
                        if (dispatchers == null) {
                            dispatchers = new ArrayList<>();
                            this.dispatcherMap.put(scopeId, dispatchers);
                        }
                        // 添加
                        dispatchers.add(dispatcher);
    
                        LOGGER.info("Dispatcher {} is added into DefaultScopeDefine {}.", dispatcher.getClass()
                                                                                                    .getName(), scopeId);
                    }
                }
            }
        }
    

    OAL 如何将动态生成的 Metrics 添加到 MetricsStreamProcessor

    与“ OAL 如何将动态生成的 SourceDispatcher 添加到 DispatcherManager ”流程基本一致,都是在 org.apache.skywalking.oal.rt.OALRuntime#notifyAllListeners 方法中处理的

        @Override
        public void notifyAllListeners() throws ModuleStartException {
            for (Class metricsClass : metricsClasses) {
                try {
                    // 将动态生成的Metrics添加到MetricsStreamProcessor
                    streamAnnotationListener.notify(metricsClass);
                } catch (StorageException e) {
                    throw new ModuleStartException(e.getMessage(), e);
                }
            }
            for (Class dispatcherClass : dispatcherClasses) {
                try {
                    // 添加动态生成的SourceDispatch至DispatcherManager
                    dispatcherDetectorListener.addIfAsSourceDispatcher(dispatcherClass);
                } catch (Exception e) {
                    throw new ModuleStartException(e.getMessage(), e);
                }
            }
        }
    

    org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener#notify

        @Override
        public void notify(Class aClass) throws StorageException {
            if (aClass.isAnnotationPresent(Stream.class)) {
                Stream stream = (Stream) aClass.getAnnotation(Stream.class);
    
                if (stream.processor().equals(RecordStreamProcessor.class)) {
                    RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
                } else if (stream.processor().equals(MetricsStreamProcessor.class)) {
                    // 因为所有的Metrics类上的@Stream注解的processor = MetricsStreamProcessor.class,所以只会走该分支
                    MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
                } else if (stream.processor().equals(TopNStreamProcessor.class)) {
                    TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
                } else if (stream.processor().equals(NoneStreamProcessor.class)) {
                    NoneStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
                } else if (stream.processor().equals(ManagementStreamProcessor.class)) {
                    ManagementStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
                } else {
                    throw new UnexpectedException("Unknown stream processor.");
                }
            } else {
                throw new UnexpectedException(
                        "Stream annotation listener could only parse the class present stream annotation.");
            }
        }
    

    org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor#create 中,通过一系列的处理,最后将 Worker (处理器)放入 map 中,等待后续被使用

        /**
         * Create the workers and work flow for every metrics.
         *
         * @param moduleDefineHolder pointer of the module define.
         * @param stream             definition of the metrics class.
         * @param metricsClass       data type of the streaming calculation.
         */
        public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Metrics> metricsClass) throws StorageException {
            this.create(moduleDefineHolder, StreamDefinition.from(stream), metricsClass);
        }
    
        @SuppressWarnings("unchecked")
        public void create(ModuleDefineHolder moduleDefineHolder,
                           StreamDefinition stream,
                           Class<? extends Metrics> metricsClass) throws StorageException {
            if (DisableRegister.INSTANCE.include(stream.getName())) {
                return;
            }
    
            StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
            IMetricsDAO metricsDAO;
            try {
                // 获取@Stream注解上的builder类,并创建Metrics存储DAO对象
                metricsDAO = storageDAO.newMetricsDao(stream.getBuilder().newInstance());
            } catch (InstantiationException | IllegalAccessException e) {
                throw new UnexpectedException("Create " + stream.getBuilder().getSimpleName() + " metrics DAO failure.", e);
            }
    
            ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
            DownSamplingConfigService configService = moduleDefineHolder.find(CoreModule.NAME)
                                                                        .provider()
                                                                        .getService(DownSamplingConfigService.class);
    
            MetricsPersistentWorker hourPersistentWorker = null;
            MetricsPersistentWorker dayPersistentWorker = null;
    
            MetricsTransWorker transWorker = null;
    
            final MetricsExtension metricsExtension = metricsClass.getAnnotation(MetricsExtension.class);
            /**
             * All metrics default are `supportDownSampling` and `insertAndUpdate`, unless it has explicit definition.
             */
            boolean supportDownSampling = true;
            boolean supportUpdate = true;
            if (metricsExtension != null) {
                supportDownSampling = metricsExtension.supportDownSampling();
                supportUpdate = metricsExtension.supportUpdate();
            }
            if (supportDownSampling) {
                if (configService.shouldToHour()) {
                    Model model = modelSetter.add(
                        metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Hour), false);
                    hourPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
                }
                if (configService.shouldToDay()) {
                    Model model = modelSetter.add(
                        metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Day), false);
                    dayPersistentWorker = downSamplingWorker(moduleDefineHolder, metricsDAO, model, supportUpdate);
                }
    
                transWorker = new MetricsTransWorker(
                    moduleDefineHolder, hourPersistentWorker, dayPersistentWorker);
            }
    
            Model model = modelSetter.add(
                metricsClass, stream.getScopeId(), new Storage(stream.getName(), DownSampling.Minute), false);
            MetricsPersistentWorker minutePersistentWorker = minutePersistentWorker(
                moduleDefineHolder, metricsDAO, model, transWorker, supportUpdate);
    
            String remoteReceiverWorkerName = stream.getName() + "_rec";
            IWorkerInstanceSetter workerInstanceSetter = moduleDefineHolder.find(CoreModule.NAME)
                                                                           .provider()
                                                                           .getService(IWorkerInstanceSetter.class);
            workerInstanceSetter.put(remoteReceiverWorkerName, minutePersistentWorker, metricsClass);
    
            MetricsRemoteWorker remoteWorker = new MetricsRemoteWorker(moduleDefineHolder, remoteReceiverWorkerName);
            MetricsAggregateWorker aggregateWorker = new MetricsAggregateWorker(
                moduleDefineHolder, remoteWorker, stream.getName());
    
            // private Map<Class<? extends Metrics>, MetricsAggregateWorker> entryWorkers = new HashMap<>();
            // 将指标类的Class与MetricsAggregateWorker放入map中
            // 当需要处理指标数据时,从map中获取即可
            entryWorkers.put(metricsClass, aggregateWorker);
        }
    

    SourceReceiver 处理 Source 相关流程

    在“从一个案例开始分析 OAL 原理”一节,聊到了 oap server 将从 agent 收到的指标信息,发送至 SourceReceive
    对应的坐标是:org.apache.skywalking.oap.server.analyzer.provider.jvm.JVMSourceDispatcher#sendToClassMetricProcess

        private void sendToClassMetricProcess(String service,
                String serviceId,
                String serviceInstance,
                String serviceInstanceId,
                long timeBucket,
                Class clazz) {
            // 拼装Source对象
            ServiceInstanceJVMClass serviceInstanceJVMClass = new ServiceInstanceJVMClass();
            serviceInstanceJVMClass.setId(serviceInstanceId);
            serviceInstanceJVMClass.setName(serviceInstance);
            serviceInstanceJVMClass.setServiceId(serviceId);
            serviceInstanceJVMClass.setServiceName(service);
            serviceInstanceJVMClass.setLoadedClassCount(clazz.getLoadedClassCount());
            serviceInstanceJVMClass.setUnloadedClassCount(clazz.getUnloadedClassCount());
            serviceInstanceJVMClass.setTotalLoadedClassCount(clazz.getTotalLoadedClassCount());
            serviceInstanceJVMClass.setTimeBucket(timeBucket);
            // 将Source对象发送至SourceReceive进行处理
            sourceReceiver.receive(serviceInstanceJVMClass);
        }
    

    SourceReceiver 的默认实现类 org.apache.skywalking.oap.server.core.source.SourceReceiverImpl ,将收集到的指标通过 org.apache.skywalking.oap.server.core.analysis.DispatcherManager#forward 进行分发

    package org.apache.skywalking.oap.server.core.source;
    
    import java.io.IOException;
    import lombok.Getter;
    import org.apache.skywalking.oap.server.core.analysis.DispatcherDetectorListener;
    import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;
    
    public class SourceReceiverImpl implements SourceReceiver {
        @Getter
        private final DispatcherManager dispatcherManager;
    
        public SourceReceiverImpl() {
            this.dispatcherManager = new DispatcherManager();
        }
    
        @Override
        public void receive(Source source) {
            // 通过调配器管理器进行转发
            dispatcherManager.forward(source);
        }
    
        @Override
        public DispatcherDetectorListener getDispatcherDetectorListener() {
            return getDispatcherManager();
        }
    
        public void scan() throws IOException, InstantiationException, IllegalAccessException {
            dispatcherManager.scan();
        }
    }
    
        // org.apache.skywalking.oap.server.core.analysis.DispatcherManager#forward
        public void forward(Source source) {
            if (source == null) {
                return;
            }
            // 通过source的scope找到对应的调度器
            List<SourceDispatcher> dispatchers = dispatcherMap.get(source.scope());
    
            /**
             * Dispatcher is only generated by oal script analysis result.
             * So these will/could be possible, the given source doesn't have the dispatcher,
             * when the receiver is open, and oal script doesn't ask for analysis.
             */
            if (dispatchers != null) {
                source.prepare();
                // 调度器进行分发,OAL动态生成的调度器,也会在这进行分发
                for (SourceDispatcher dispatcher : dispatchers) {
                    dispatcher.dispatch(source);
                }
            }
        }
    

    MetricsStreamProcessor 如何处理 SourceDispatcher 发送过来的指标数据

    完整代码请见“ OAL 如何动态生成 Class 类”下“案例”一节

    org.apache.skywalking.oap.server.core.source.oal.rt.dispatcher.ServiceInstanceJVMClassDispatcher#doInstanceJvmClassLoadedClassCount 发送数据至 org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor

    package org.apache.skywalking.oap.server.core.source.oal.rt.dispatcher;
    
    import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
    import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
    import org.apache.skywalking.oap.server.core.source.ServiceInstanceJVMClass;
    import org.apache.skywalking.oap.server.core.source.Source;
    import org.apache.skywalking.oap.server.core.source.oal.rt.metrics.InstanceJvmClassLoadedClassCountMetrics;
    
    public class ServiceInstanceJVMClassDispatcher implements SourceDispatcher<ServiceInstanceJVMClass> {
        private void doInstanceJvmClassLoadedClassCount(ServiceInstanceJVMClass var1) {
            InstanceJvmClassLoadedClassCountMetrics var2 = new InstanceJvmClassLoadedClassCountMetrics();
            var2.setTimeBucket(var1.getTimeBucket());
            var2.setEntityId(var1.getEntityId());
            var2.setServiceId(var1.getServiceId());
            var2.combine(var1.getLoadedClassCount(), (long)1);
            // 发送数据到指标流处理器
            MetricsStreamProcessor.getInstance().in(var2);
        }
    
        public void dispatch(Source var1) {
            ServiceInstanceJVMClass var2 = (ServiceInstanceJVMClass)var1;
            this.doInstanceJvmClassLoadedClassCount(var2);
        }
    
        public ServiceInstanceJVMClassDispatcher() {
        }
    }
    

    org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor#in 方法中,使用在 org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor#create 中创建的 Worker 对象,保存数据

        public void in(Metrics metrics) {
            MetricsAggregateWorker worker = entryWorkers.get(metrics.getClass());
            if (worker != null) {
                worker.in(metrics);
            }
        }
    

    PS:内部再细节一些的数据处理流程,相关的关键字有: DataCarrierWorkerStorageModule ,暂且不表,不是这篇文章的内容。

    总结

    Skywalking Metrics处理流程

    file

    参考文档

    分享并记录所学所见

    相关文章

      网友评论

          本文标题:Skywalking-09:OAL原理——如何通过动态生成的Cl

          本文链接:https://www.haomeiwen.com/subject/qyrliltx.html