最近在研究全链路监控的实现方式,目的是计划在项目中加入全链路日志的支持,说到这个问题肯定有人会想到 APM,如:SkyWalking、Cat、Zipkin、Pinpoint 、Elastic APM 等,确实市面上已经存在现成的全链路监控框架可以直接使用,不过说实话,在免费领域 .NET 这方面的实现确实还不够成熟,当然和很多组件本身实现方式也有关,并没有预置埋点。SkyAPM-dotnet 是目前支持组件诊断分析较多的一个实现,如下图:
SkyApm.Diagnostics基于对 SkyWalking 的 SkyAPM-dotnet 和 Elastic APM 的 apm-agent-dotnet 源码阅读,它们本质上都是基于 DiagnosticSource
来实现的诊断跟踪,本身也定义了一套较规范的标准,如果需要实现更多组件的诊断跟踪,基本上是可以直接基于这套标准扩展即可,所以本文就主要介绍 DiagnosticSource
的使用,初步了解实现原理。
DiagnosticSource 是什么
简单来说 DiagnosticSource
一个基于观察者模式的日志模块,日志写入 DiagnosticSource
,然后供订阅者消费。DiagnosticSource
只是一个抽象类,它定义了记录事件所需的方法,实际核心的是 DiagnosticListener
实现类,每个 DiagnosticListener
都具有一个 Name
属性(诊断器名),一个应用程序中可包含多个 DiagnosticListener
,每个 DiagnosticListener
有自己唯一的诊断器名标识。 DiagnosticListener
充当发布者角色,通过 Write
向 DiagnosticSource
写入日志,同时提供了 Subscribe
方法设置订阅者来消费 DiagnosticSource
中的日志。
DiagnosticSource 事件发布
- 在事件发布前需要先创建
DiagnosticSource
,如下定义了一个诊断器名为TestDiagnosticListener
的DiagnosticListener
:private static readonly DiagnosticSource testDiagnosticListener = new DiagnosticListener("TestDiagnosticListener");
- 判断当前诊断器的某个事件名是否存在消费者监听:
bool IsEnabled(string name);
- 携带数据对象写入诊断器 DiagnosticSource 中:
使用示例:void Write(string name, object value);
if (testDiagnosticListener.IsEnabled("RequestStart")) { testDiagnosticListener.Write("RequestStart", "hello world"); }
DiagnosticSource 事件消费
-
定义
DiagnosticListener
事件消费处理接口,实现类中的ListenerName
必须与对应DiagnosticListener
的诊断器名一致:public interface IDiagnosticProcessor { string ListenerName { get; } }
-
定义诊断器名为
TestDiagnosticListener
的DiagnosticListener
事件消费处理逻辑:public class TestDiagnosticProcessor : IDiagnosticProcessor { public string ListenerName { get; } = "TestDiagnosticListener"; [DiagnosticName("RequestStart")] public void RequestStart([Object]string name) { Console.WriteLine(name); } }
-
创建
IObserver<DiagnosticListener>
实现类订阅所有类型的DiagnosticListener
,通过OnNext
方法的DiagnosticListener
对象获取当前的诊断器名,不同(诊断器名不同)DiagnosticListener
发布的事件设置不同的订阅者,主要代码如下(完整代码):public class DiagnosticListenerObserver : IObserver<DiagnosticListener> { private readonly IEnumerable<IDiagnosticProcessor> _diagnosticProcessors; public DiagnosticListenerObserver(IEnumerable<IDiagnosticProcessor> diagnosticProcessors) { _diagnosticProcessors = diagnosticProcessors; } public void OnNext(DiagnosticListener value) { var diagnosticProcessor = _diagnosticProcessors?.FirstOrDefault(_ => _.ListenerName == value.Name); if (diagnosticProcessor == null) return; value.Subscribe(new DiagnosticEventObserver(diagnosticProcessor)); } }
-
事件订阅者需要创建基于
IObserver<KeyValuePair<string, object>>
的实现类,根据触发的事件名(value.Key
)和已订阅的事件处理集合(_eventCollection
)进行匹对查找,匹配上的通过反射执行对应的消费方法,主要代码如下(完整代码):public class DiagnosticEventObserver : IObserver<KeyValuePair<string, object>> { private readonly DiagnosticEventCollection _eventCollection; public DiagnosticEventObserver(IDiagnosticProcessor diagnosticProcessor) { _eventCollection = new DiagnosticEventCollection(diagnosticProcessor); } public void OnNext(KeyValuePair<string, object> value) { var diagnosticEvent = _eventCollection.GetDiagnosticEvent(value.Key); if (diagnosticEvent == null) return; try { diagnosticEvent.Invoke(value.Value); } catch (Exception ex) { Console.WriteLine(ex.Message); } } }
-
最后需要通过
DiagnosticListener.AllListeners.Subscribe
设置DiagnosticListenerObserver
对象; -
执行结果:
result
总结
通过以上示例,我们完全可以参考基于这样的标准在组件封装(MongoDB、Dapper、Kafka、Redis ...)过程中自己埋点(创建相应的 DiagnosticListener
并发布事件),然后订阅者根据需求监听需要的事件,从而达到诊断日志全链路收集的目的。
注:本文涉及的代码主要是参考了 SkyAPM-dotnet 。
网友评论