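With the growth of distributed systems and microservices, system complexity has risen sharply, and so have the demands on service robustness. This article builds things in-house where the code stays under our control and meets our needs. It uses that approach to explore trace monitoring, one part of distributed service monitoring, and also introduces related open-source products.
The first part surveys the mainstream ways of generating distributed IDs, which we use as the trace's traceId, and implements one based on a database plus the segment mode.
The later parts turn to trace monitoring. They explore how to give a distributed system observability and controllability, that is, "the degree to which its internal state can be inferred from its external outputs". This part is implemented mainly with TransmittableThreadLocal, MDC and similar tools.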
I. Distributed Unique IDs
1.1 Common Approaches
- UUID
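A UUID is 128 bits, written as 36 characters (32 hexadecimal digits plus 4 hyphens) in the 8-4-4-4-12 layout. Its advantage is simplicity, because the Java class library provides an API for it. It does have some performance and security issues. Random UUIDs are unordered, so using them as a MySQL (InnoDB) primary key causes page splits. Version-1 UUIDs also embed the MAC address of the generating machine.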
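- Database auto-increment primary key
Use an auto-increment primary key. It is equally convenient, but going to the database for every ID becomes a performance bottleneck.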
- SnowFlake ID
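A distributed unique ID based on the Snowflake algorithm. The classic layout is a 64-bit long made of 1 sign bit, 41 bits of millisecond timestamp, 10 bits of worker ID and 12 bits of sequence number.
The sign bit is normally fixed at 0, and the timestamp and sequence number keep IDs increasing. The timestamp is in milliseconds. The number of sequence bits determines how many IDs can be generated within one millisecond (4096 for 12 bits). The worker bits identify the machine and can be assigned through ZooKeeper persistent sequential nodes. Snowflake performs well, but it suffers from clock rollback: if the system clock moves backwards, duplicate IDs may be generated.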
- MongoDB ObjectID
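Similar to Snowflake ID. In its classic form an ObjectId is 12 bytes: a 4-byte timestamp (in seconds), a 3-byte machine identifier, a 2-byte process ID and a 3-byte incrementing counter. Newer drivers replace the machine and process parts with a 5-byte random value.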
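Here is a minimal single-process Java sketch that makes the Snowflake layout above concrete. It makes three assumptions. The epoch is a custom one set to 2020-01-01. The worker ID is passed in by the caller; in practice it would be assigned through ZooKeeper as described above. On clock rollback it simply throws. That is only one possible policy; Leaf, for instance, waits out small rollbacks (see 1.3).

```java
public class SnowflakeIdGenerator {
    // Assumed custom epoch: 2020-01-01T00:00:00Z, so 41 bits of milliseconds last about 69 years
    private static final long EPOCH = 1577836800000L;
    private static final long WORKER_BITS = 10L;
    private static final long SEQUENCE_BITS = 12L;
    private static final long MAX_WORKER_ID = (1L << WORKER_BITS) - 1;   // 1023
    private static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1; // 4095

    private final long workerId;
    private long lastTimestamp = -1L;
    private long sequence = 0L;

    public SnowflakeIdGenerator(long workerId) {
        if (workerId < 0 || workerId > MAX_WORKER_ID) {
            throw new IllegalArgumentException("workerId out of range: " + workerId);
        }
        this.workerId = workerId;
    }

    public synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now < lastTimestamp) {
            // Clock rollback: refuse to generate rather than risk duplicate ids
            throw new IllegalStateException("clock moved backwards by " + (lastTimestamp - now) + " ms");
        }
        if (now == lastTimestamp) {
            sequence = (sequence + 1) & SEQUENCE_MASK;
            if (sequence == 0) {
                // All 4096 sequence values of this millisecond are used: spin until the next one
                while (now <= lastTimestamp) {
                    now = System.currentTimeMillis();
                }
            }
        } else {
            sequence = 0L;
        }
        lastTimestamp = now;
        // [1 sign bit = 0][41 bits timestamp][10 bits worker id][12 bits sequence]
        return ((now - EPOCH) << (WORKER_BITS + SEQUENCE_BITS))
                | (workerId << SEQUENCE_BITS)
                | sequence;
    }

    /** Extracts the worker id from a generated id (handy when debugging). */
    public static long workerIdOf(long id) {
        return (id >> SEQUENCE_BITS) & MAX_WORKER_ID;
    }

    public static void main(String[] args) {
        SnowflakeIdGenerator generator = new SnowflakeIdGenerator(7);
        long first = generator.nextId();
        long second = generator.nextId();
        System.out.println(first + " < " + second + ", worker=" + workerIdOf(second));
    }
}
```

Because the timestamp occupies the high bits, ids from one generator are strictly increasing. Ids from different workers interleave only by time.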
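1.2 In-house Implementation
Given our own business needs and the controllability of the system, we implement the generator with a database plus the segment mode. Core flow: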
1.2.1 Create the table
Table structure:
CREATE TABLE `id_factory_config` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key id',
`business_code` varchar(60) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT 'business code, used as a semantic prefix',
`init_value` bigint(13) DEFAULT NULL COMMENT 'initial value',
`current_start` bigint(20) DEFAULT NULL COMMENT 'start of the current id segment',
`current_threshold` bigint(20) DEFAULT NULL COMMENT 'threshold (exclusive end) of the current id segment',
`step` int(11) DEFAULT NULL COMMENT 'step (segment length), adjustable at runtime',
`version` int(11) NOT NULL DEFAULT '0' COMMENT 'version number for optimistic locking',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
`create_user` varchar(60) DEFAULT NULL COMMENT 'created by',
`update_user` varchar(60) DEFAULT NULL COMMENT 'updated by',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_business_code` (`business_code`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
Corresponding entity:
@TableName("id_factory_config")
@Data
public class IdFactoryConfig {
@TableId(type = IdType.AUTO)
private Integer id;
private String businessCode;
private Long initValue;
private Long currentStart;
private Long currentThreshold;
private Integer step;
private Integer version;
private Date createTime;
private Date updateTime;
private String createUser;
private String updateUser;
}
For the meaning of each field, see the column comments and the code that follows.
Corresponding mapper:
/**
* @author WinsonWu
* @create 2022-08-26 9:39
* @desc queries for the distributed id configuration
*
**/
@Mapper
public interface IdFactoryConfigMapper extends BaseMapper<IdFactoryConfig> {
@Select("select * from id_factory_config")
List<IdFactoryConfig> selectAll();
// FOR UPDATE only holds the row lock inside a transaction; with autocommit it is released immediately,
// so the version check in updateCurrentThreshold is what actually stops two instances claiming the same segment
@Select("select * from id_factory_config where business_code=#{businessCode} limit 1 for update")
IdFactoryConfig selectOneForUpdate(@Param("businessCode") String businessCode);
// Optimistic (CAS) update: affects one row only if the version is unchanged since the row was read
@Update("UPDATE id_factory_config set current_threshold=#{currentThreshold},current_start=#{currentStart},version=version+1 where id=#{id} and version=#{version}")
Integer updateCurrentThreshold(@Param("currentThreshold") long currentThreshold,@Param("currentStart") long currentStart, @Param("id") int id, @Param("version") int version);
}
Database configuration:
1.2.2 Code implementation
1.2.2.1 Interface definition
/**
* @Description: ID creation factory
* @Author: WinsonWu
* @Date: 2022/8/25 16:38
**/
public interface IdFactory {
/**
* get distributed sequence id
* @param businessCode business code, also used as the id prefix
* @return id
*/
String getSeqId(String businessCode);
}
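1.2.2.2 Interface implementation
Implementation idea:
-> load the id segments at startup
-> claim segments with a CAS update on the database
-> hand out ids
-> refresh asynchronously ahead of time
-> blocking fallback under overload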
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
/**
* @Description: ID factory implementation
* @Author: WinsonWu
* @Date: 2022/8/25 16:41
**/
@Slf4j
@Component
public class SimpleIdFactory implements IdFactory {
/**
* Max CAS attempts when claiming a segment synchronously / asynchronously
*/
private static final int SYNC_RETRY = 10;
private static final int ASYNC_RETRY = 5;
/**
* Sequence dictionary: businessCode -> segment currently being consumed
*/
private final Map<String, LocalSeqId> localSeqMap = new ConcurrentHashMap<>();
/**
* Business codes that already have an asynchronous refresh in flight
*/
private final Set<String> refreshing = ConcurrentHashMap.newKeySet();
@Resource
private IdFactoryConfigMapper idFactoryConfigMapper;
@Autowired
@Qualifier("poolForUpdate")
private ThreadPoolExecutor poolForUpdate;
@PostConstruct
public void initFactoryConfig() {
// Load every configuration; on restart, discard what was left of the old segment and claim a fresh one.
// fetchNextSegment throws if no segment can be claimed, so startup fails fast instead of caching null.
for (IdFactoryConfig idFactoryConfig : idFactoryConfigMapper.selectAll()) {
String businessCode = idFactoryConfig.getBusinessCode();
localSeqMap.put(businessCode, fetchNextSegment(businessCode, SYNC_RETRY));
}
}
/**
* Claim the next id segment with an optimistic (version-based) CAS update.
* The segment handed out is [current_start, current_threshold) of the freshly read row;
* the row is then moved forward by one step so that no other instance can claim the same range.
*/
private LocalSeqId fetchNextSegment(String businessCode, int maxRetry) {
for (int retryCount = 0; retryCount < maxRetry; retryCount++) {
try {
// Always use the values just read, never a stale copy, so a retry cannot rewind the row
IdFactoryConfig config = idFactoryConfigMapper.selectOneForUpdate(businessCode);
long currentStart = config.getCurrentStart();
long currentThreshold = config.getCurrentThreshold();
int updateResult = idFactoryConfigMapper.updateCurrentThreshold(currentThreshold + config.getStep(),
currentThreshold, config.getId(), config.getVersion());
if (updateResult > 0) {
LocalSeqId localSeqId = new LocalSeqId();
localSeqId.setCurrentId(new AtomicLong(currentStart));
localSeqId.setCurrentThreshold(new AtomicLong(currentThreshold));
localSeqId.setStep(config.getStep());
localSeqId.setPrefix(businessCode);
return localSeqId;
}
// version changed: another instance won the race, so read again and retry
} catch (Exception e) {
// todo alert
log.error("error occurred while loading id segment for {}", businessCode, e);
}
}
// todo alert
throw new IllegalStateException("retry too much when loading id segment for " + businessCode);
}
@Override
public String getSeqId(String businessCode) {
while (true) {
// Take the id straight from the local cache
final LocalSeqId localSeqId = localSeqMap.get(businessCode);
if (Objects.isNull(localSeqId)) {
// todo alert
log.error("business code {} not exists in local cache", businessCode);
return null;
}
long threshold = localSeqId.getCurrentThreshold().get();
long id = localSeqId.getCurrentId().getAndIncrement();
if (id < threshold) {
// More than 80% used: refresh asynchronously, with at most one task per business code
// todo a second buffer (double buffering, as in Leaf) would avoid discarding the rest of this segment
if (threshold - id < 0.2 * localSeqId.getStep() && refreshing.add(businessCode)) {
asyncRefresh(businessCode, localSeqId);
}
return businessCode + id;
}
// Overload protection: the segment is exhausted, so block and load the next one from the database
synchronized (this) {
// Reload only if no other thread (or the async task) has already swapped in a new segment
if (localSeqMap.get(businessCode) == localSeqId) {
localSeqMap.put(businessCode, fetchNextSegment(businessCode, SYNC_RETRY));
// A refresh task dropped by the pool's DiscardPolicy would otherwise leave this flag set forever
refreshing.remove(businessCode);
}
}
}
}
private void asyncRefresh(String businessCode, LocalSeqId current) {
try {
poolForUpdate.execute(() -> {
try {
// Retry up to 5 times; if that fails, give up and let the blocking path take over
LocalSeqId next = fetchNextSegment(businessCode, ASYNC_RETRY);
// Swap only if the segment we were asked to refresh is still the current one
localSeqMap.replace(businessCode, current, next);
} catch (Exception e) {
log.error("async refresh failed for {}", businessCode, e);
} finally {
refreshing.remove(businessCode);
}
});
} catch (RejectedExecutionException e) {
refreshing.remove(businessCode);
}
}
}
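LocalSeqId:
@Data
public class LocalSeqId {
/**
* next id to hand out from the cached segment
*/
private AtomicLong currentId;
/**
* exclusive end of the cached segment; once reached, a new segment must be loaded
*/
private AtomicLong currentThreshold;
/**
* step (segment length)
*/
private int step;
/**
* prefix (the business code)
*/
private String prefix;
}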
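The segment mode is correct because of the version-based CAS in updateCurrentThreshold. Whoever wins the update owns [current_start, current_threshold), and everyone else re-reads the row and retries. The sketch below isolates that idea without a database. An AtomicReference holding an immutable row snapshot stands in for the id_factory_config row, which is an assumption made purely for illustration. Each thread plays the role of one service instance.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SegmentCasSketch {
    /** Immutable snapshot of one id_factory_config row (hypothetical in-memory stand-in). */
    static final class Row {
        final long currentStart;
        final long currentThreshold;
        final int step;
        final int version;

        Row(long currentStart, long currentThreshold, int step, int version) {
            this.currentStart = currentStart;
            this.currentThreshold = currentThreshold;
            this.step = step;
            this.version = version;
        }
    }

    private final AtomicReference<Row> row;

    SegmentCasSketch(long initValue, int step) {
        row = new AtomicReference<>(new Row(initValue, initValue + step, step, 0));
    }

    /** Claims [currentStart, currentThreshold) and moves the row forward by one step. */
    long[] claimSegment() {
        while (true) {
            Row old = row.get();
            Row next = new Row(old.currentThreshold, old.currentThreshold + old.step, old.step, old.version + 1);
            // compareAndSet on the snapshot plays the role of "... where id = ? and version = ?"
            if (row.compareAndSet(old, next)) {
                return new long[]{old.currentStart, old.currentThreshold};
            }
            // Lost the race (updateResult == 0): re-read and retry
        }
    }

    /** Lets `instances` threads each claim `segments` segments; returns how many distinct ids were handed out. */
    static int distinctIds(int instances, int segments, int step) {
        SegmentCasSketch allocator = new SegmentCasSketch(1, step);
        Set<Long> ids = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(instances);
        for (int t = 0; t < instances; t++) {
            pool.execute(() -> {
                for (int i = 0; i < segments; i++) {
                    long[] seg = allocator.claimSegment();
                    for (long id = seg[0]; id < seg[1]; id++) {
                        ids.add(id);
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ids.size();
    }

    public static void main(String[] args) {
        // With no overlap, 8 instances x 50 segments x 100 ids gives 40000 distinct ids
        System.out.println(distinctIds(8, 50, 100));
    }
}
```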
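There is still plenty of room for optimization, such as the alerting marked as todo in the code, wasted ids, and dynamic adjustment of the step. Meituan's Leaf is a good reference. The reference architecture diagram is attached here: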
1.2.3 Concurrency test
1.2.3.1 Define the thread pools
/**
* @author winsonWu
* @date 2022.08.26
* @desc thread pools for asynchronous id segment refresh and for the concurrency test
*/
@Configuration
public class ThreadPoolUtil {
/**
* core pool size: 3x the number of available processors
*/
private static int corePoolSize = Runtime.getRuntime().availableProcessors() * 3;
/**
* max thread num
*/
private static int maximumPoolSize = corePoolSize;
/**
* thread keep alive time
*/
private static long keepAliveTime = 1;
/**
* thread keep alive time unit
*/
private static TimeUnit unit = TimeUnit.HOURS;
/**
* use array blocking queue to avoid out of memory
*/
private static BlockingQueue<Runnable> queueForUpdate = new ArrayBlockingQueue<>(10);
/**
* use array blocking queue to avoid out of memory
*/
private static BlockingQueue<Runnable> queueForFuture = new ArrayBlockingQueue<>(500);
/**
* default thread factory
*/
private static ThreadFactory threadFactory = Executors.defaultThreadFactory();
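/**
* reject policy for segment refresh: silently drop surplus tasks (getSeqId falls back to a blocking load)
*/
private static RejectedExecutionHandler handlerForUpdate = new ThreadPoolExecutor.DiscardPolicy();
/**
* reject policy for the test pool: run the task on the caller thread, which throttles submission
*/
private static RejectedExecutionHandler handlerForFuture = new ThreadPoolExecutor.CallerRunsPolicy();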
@Bean("poolForCompletableFuture")
public ThreadPoolExecutor poolForCompletableFuture(){
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime, unit,
queueForFuture,
threadFactory,
handlerForFuture);
}
@Bean("poolForUpdate")
public ThreadPoolExecutor poolForUpdate(){
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime, unit,
queueForUpdate,
threadFactory,
handlerForUpdate);
}
}
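The two pools differ mainly in queue size and rejection policy. poolForUpdate uses DiscardPolicy, so surplus refresh tasks are dropped silently and getSeqId's blocking path acts as the safety net. poolForCompletableFuture uses CallerRunsPolicy, which throttles the submitter by running the task on its own thread. The standalone JDK-only sketch below saturates a one-thread pool to show the difference; the class and method names are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RejectPolicyDemo {
    /** Submits `tasks` counter increments while the single worker is busy; returns how many actually ran. */
    static int runWith(RejectedExecutionHandler handler, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        // One worker and a queue of one: the pool is saturated after two submissions
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), handler);
        // Keep the only worker busy until we release it
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // The first task is queued; the rest are rejected and handed to the policy
        for (int i = 0; i < tasks; i++) {
            pool.execute(ran::incrementAndGet);
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        // CallerRuns: rejected tasks run on the main thread, so all 5 run
        System.out.println("CallerRunsPolicy: " + runWith(new ThreadPoolExecutor.CallerRunsPolicy(), 5));
        // Discard: only the queued task runs, the other four are silently dropped
        System.out.println("DiscardPolicy: " + runWith(new ThreadPoolExecutor.DiscardPolicy(), 5));
    }
}
```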
1.2.3.2 Unit test code
...
@Autowired
@Qualifier("poolForCompletableFuture")
private ThreadPoolExecutor poolForCompletableFuture;
@Resource
private SimpleIdFactory simpleIdFactory;
@Test
public void testSeqId(){
StopWatch stopwatch = new StopWatch();
stopwatch.start("id generator");
String businessCode = "order";
List<CompletableFuture<String>> futures = new ArrayList<>(10000);
for (int i=0; i<10000; i++){
futures.add(CompletableFuture.supplyAsync(() -> simpleIdFactory.getSeqId(businessCode), poolForCompletableFuture));
}
// Wait for all 10,000 ids, then stop the clock before verifying them
List<String> results = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
stopwatch.stop();
// Deduplicate with a HashSet (O(n)) instead of List.contains (O(n^2))
Set<String> unique = new HashSet<>(results);
Assertions.assertFalse(unique.contains(null), "getSeqId returned null");
// After deduplication there must still be 10,000 distinct ids
Assertions.assertEquals(10000, unique.size());
System.out.println(stopwatch.prettyPrint());
}
1.2.3.3 Unit test results
In the author's run, generating 10,000 ids took 654 ms.
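1.3 Open-source implementations
There are many open-source implementations. Here we introduce Meituan's Leaf, which has many stars on GitHub and is easy to use.
Leaf implements both a segment mode and a Snowflake mode. The segment mode uses a double-buffer optimization (loading the next id segment in advance) to speed up id retrieval. The Snowflake mode adds clock-rollback checks (waiting, or raising an error and alerting) to avoid generating duplicate ids. The architecture diagrams below help illustrate this.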
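Below is a rough Java sketch of the double-buffer idea. It is not Leaf's actual code. While the current segment is being consumed, the next one is loaded in the background, so switching segments normally costs no database round trip. SegmentLoader is a hypothetical hook standing in for the database update from section 1.2. The 50% preload threshold is an arbitrary assumption.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class DoubleBufferIdGenerator {
    /** Hypothetical hook that claims the next [start, end) range, e.g. via the CAS update in 1.2. */
    public interface SegmentLoader {
        long[] load();
    }

    private static final class Segment {
        final long end, size;
        long cursor;
        Segment(long[] range) { cursor = range[0]; end = range[1]; size = range[1] - range[0]; }
    }

    private final SegmentLoader loader;
    private final ExecutorService preloadPool;
    private Segment current;
    private Segment next;     // the second buffer
    private boolean loading;  // a preload task is in flight

    public DoubleBufferIdGenerator(SegmentLoader loader, ExecutorService preloadPool) {
        this.loader = loader;
        this.preloadPool = preloadPool;
        this.current = new Segment(loader.load());
    }

    public synchronized long nextId() {
        while (current.cursor >= current.end) {
            if (next != null) {
                current = next;   // switch buffers without touching the database
                next = null;
            } else if (loading) {
                try {
                    wait();       // a preload is in flight: wait for it
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            } else {
                current = new Segment(loader.load()); // both buffers empty: load synchronously
            }
        }
        long id = current.cursor++;
        // Assumed policy: start preloading once less than half of the segment is left
        if (next == null && !loading && current.end - current.cursor < current.size / 2) {
            loading = true;
            try {
                preloadPool.execute(this::preload);
            } catch (RejectedExecutionException e) {
                loading = false;  // fall back to the synchronous path later
            }
        }
        return id;
    }

    private void preload() {
        Segment loaded = null;
        try {
            loaded = new Segment(loader.load());
        } finally {
            // on failure `next` stays empty and nextId() falls back to a synchronous load
            synchronized (this) { next = loaded; loading = false; notifyAll(); }
        }
    }

    /** Draws ids from several threads and returns how many distinct ids were produced. */
    static int distinctIds(int threads, int perThread, long step) {
        AtomicLong table = new AtomicLong(1);  // stands in for current_start in the table
        ExecutorService preload = Executors.newSingleThreadExecutor();
        DoubleBufferIdGenerator gen = new DoubleBufferIdGenerator(() -> {
            long start = table.getAndAdd(step);
            return new long[]{start, start + step};
        }, preload);
        Set<Long> ids = ConcurrentHashMap.newKeySet();
        ExecutorService workers = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            workers.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    ids.add(gen.nextId());
                }
            });
        }
        workers.shutdown();
        try {
            workers.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        preload.shutdownNow();
        return ids.size();
    }
}
```

For example, `distinctIds(4, 5000, 100)` draws 20,000 ids from four threads and should report 20,000 distinct values. Unlike the in-house version in 1.2, a preloaded segment is only switched in once the current one is used up, so no ids are discarded.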
Segment mode architecture diagram:
Snowflake mode architecture diagram:
II. Monitoring
Achieving observability and controllability covers three aspects: event logging, tracing and aggregated metrics.
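- Logging: the job of logging is to record discrete events so that the program's behaviour can be analysed afterwards, for example which methods were called and which data was touched. Printing logs is considered one of the simplest tasks in a program. When debugging, people often say "if only I had logged something here", which shows how little effort it takes. Emitting logs is indeed easy, but collecting and analysing them can be very complex. With thousands of cluster nodes, rapidly scrolling event streams and terabytes of text, even transporting and aggregating the logs is far from simple. For most programmers, log analysis is probably the most frequently encountered and most practical "big data system".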
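The common approach has four steps. Asynchronous threads collect the logs, and the logs are persisted to a storage medium. A search engine indexes them, and a visual interface is used to search and display them. The usual open-source stack for this is the Elastic Stack.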
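- Tracing: in the era of monolithic systems, tracing was largely limited to stack tracing (en.wikipedia.org/wiki/Stack_… Tracing). When you set a breakpoint in the IDE while debugging, the Call Stack view you see is a trace. When code handles an exception by calling Exception::printStackTrace(), the stack information it prints is also a trace. In the microservice era, tracing is no longer limited to the call stack. One external request requires several internal services to respond together, so the complete call path spans multiple services. It includes both the network transfers between services and the call stacks inside each service. For this reason, tracing in distributed systems is often called "full-link tracing" in China (hereafter simply "tracing"), and many sources call it "distributed tracing". The main purpose of tracing is troubleshooting. Typical questions are which part of the call chain or which method failed or blocked, and whether inputs and outputs match expectations.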
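For this part we build customized integrations for each system and middleware with TransmittableThreadLocal and MDC. Alternatively, a Java agent plus ASM can provide non-intrusive monitoring.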
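- Metrics: metrics are statistical aggregations of one class of information in a system. For example, every listed stock publishes periodic financial reports. Figures such as revenue, net profit, gross profit, assets and liabilities reflect the company's performance over the past fiscal period, and that is a form of information aggregation. Java comes with a basic kind of metrics out of the box: the JMX (Java Management Extensions) metrics provided directly by the virtual machine. Memory size, usage per generation, peak thread count, and garbage-collection throughput and frequency can all be obtained from JMX. The main purposes of metrics are monitoring and alerting. For example, an event fires when a metric reaches a risk threshold, so that the problem is handled automatically or an engineer is paged.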
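Within our own system, the data collected by asynchronous threads can be used to define business metrics. These metrics are aggregated with batch or stream processing and then displayed centrally or used for monitoring and alerting.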
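To illustrate the JMX metrics mentioned above, the platform MXBeans in java.lang.management can be read directly. The metric names used as map keys are made up for this sketch. A real collector would sample these values periodically and ship them asynchronously rather than print them.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.lang.management.ThreadMXBean;
import java.util.LinkedHashMap;
import java.util.Map;

public class JmxMetricsSnapshot {
    /** Reads a few JVM metrics from the platform MXBeans; the key names are made up for this sketch. */
    static Map<String, Long> snapshot() {
        Map<String, Long> metrics = new LinkedHashMap<>();
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        metrics.put("heap.used.bytes", heap.getUsed());
        metrics.put("heap.committed.bytes", heap.getCommitted());
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        metrics.put("threads.live", (long) threads.getThreadCount());
        metrics.put("threads.peak", (long) threads.getPeakThreadCount());
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Cumulative since JVM start; -1 means the collector does not report the value
            metrics.put("gc." + gc.getName() + ".count", gc.getCollectionCount());
            metrics.put("gc." + gc.getName() + ".time.ms", gc.getCollectionTime());
        }
        return metrics;
    }

    public static void main(String[] args) {
        // A real agent would sample periodically and ship the values to a time-series store
        snapshot().forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```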
Products in each category:
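2.1 Tracing
Modern distributed tracing is generally agreed to originate from Google's 2010 paper "Dapper, a Large-Scale Distributed Systems Tracing Infrastructure". The paper describes how Dapper works; Google had been running this distributed tracing system since 2004. Since then, all well-known tracing systems have been directly influenced by the Dapper paper. Abroad, that includes Twitter's Zipkin and Naver's Pinpoint (Naver is Line's parent company). The "Pinpoint" cited in the Dapper paper is an earlier research system of the same name, not Naver's project. In China, it includes Alibaba's EagleEye, Dianping's CAT, and SkyWalking, which started as a personal open-source project and was later incubated at, and graduated from, the Apache Software Foundation.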
Pinpoint tracing example:
2.1.1 Approach
The simulated environment in this article is:
Spring Boot gateway application -> HTTP application -> RPC application -> message queue
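- Gateway to HTTP application: implemented with an HttpClient interceptor;
- HTTP application to RPC application: implemented with a Dubbo filter;
- RPC application to RocketMQ: implemented with hook functions.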
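All three hops follow the same inject/extract pattern. The caller copies the traceId from its thread-local context into the outgoing carrier. The callee copies it back into its own context before running business code and clears it afterwards. The JDK-only sketch below simulates the chain, running each hop on a separate thread. Its simplifications are hypothetical: maps stand in for HTTP headers, Dubbo attachments and RocketMQ user properties, and a plain ThreadLocal stands in for CommonRequestContext.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TracePropagationSketch {
    // Stand-in for CommonRequestContext; a plain ThreadLocal is not inherited by new threads
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Caller side (HttpClient interceptor, Dubbo consumer filter, MQ send hook). */
    static void inject(Map<String, String> carrier) {
        String traceId = TRACE_ID.get();
        if (traceId != null) {
            carrier.put("traceId", traceId);
        }
    }

    /** Callee side (servlet filter, Dubbo provider filter, MQ consumer hook). */
    static void serve(Map<String, String> carrier, Runnable handler) {
        String traceId = carrier.get("traceId");
        if (traceId != null) {
            TRACE_ID.set(traceId);
        }
        try {
            handler.run();
        } finally {
            TRACE_ID.remove(); // server threads are reused, so never leave the id behind
        }
    }

    /** Runs a task on a fresh thread and waits for it, simulating a hop to another process. */
    static void onNewThread(Runnable task) {
        Thread thread = new Thread(task);
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** gateway -> HTTP app -> RPC app -> MQ consumer; returns the traceId observed at each hop. */
    static List<String> runChain(String traceId) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        TRACE_ID.set(traceId); // gateway, e.g. an id from getSeqId("order")
        seen.add(TRACE_ID.get());
        Map<String, String> headers = new HashMap<>();
        inject(headers);
        onNewThread(() -> serve(headers, () -> {                 // HTTP application
            seen.add(TRACE_ID.get());
            Map<String, String> attachments = new HashMap<>();
            inject(attachments);
            onNewThread(() -> serve(attachments, () -> {         // RPC application
                seen.add(TRACE_ID.get());
                Map<String, String> properties = new HashMap<>();
                inject(properties);
                onNewThread(() -> serve(properties, () -> seen.add(TRACE_ID.get()))); // MQ consumer
            }));
        }));
        TRACE_ID.remove();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runChain("order1001")); // [order1001, order1001, order1001, order1001]
    }
}
```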
2.1.2 Implementation code
2.1.2.1 HTTP service
Build a ThreadLocal-based context container:
public class CommonRequestContext {
private static final ThreadLocal<Map<Object,Object>> requestContentMap = new TransmittableThreadLocal<Map<Object, Object>>(){
@Override
protected Map<Object, Object> initialValue() {
return new HashMap<>();
}
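// copy() is called when TTL captures the value for a task; returning a new map keeps the two threads' maps independent
@Override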
public Map<Object, Object> copy(Map<Object, Object> parentValue) {
return parentValue != null ? new HashMap<>(parentValue) : null;
}
};
public static void put(Object key,Object value) {
requestContentMap.get().put(key,value);
}
public static Object get(Object key){
return requestContentMap.get().get(key);
}
public static void clear() {
requestContentMap.remove();
}
}
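Here we use Alibaba's open-source TransmittableThreadLocal (TTL). It makes passing values from parent to child threads easy and fixes a limitation of InheritableThreadLocal in thread pools. Pooled threads are created once and then reused, so InheritableThreadLocal copies values only when a thread is created and cannot pass them to later tasks. TransmittableThreadLocal extends InheritableThreadLocal and manages the values across threads through snapshots and a holder. The values are captured when a task is created, replayed before it runs and restored afterwards. For thread pools this only works when the tasks or the executor are decorated (TtlRunnable / TtlExecutors) or the TTL Java agent is used.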
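The thread-pool problem and the capture/replay/restore idea can be reproduced with the JDK alone. The sketch below is a simplified illustration, not TTL's implementation; in a real project you would use TTL's own wrappers or its Java agent. A reused pool thread keeps the InheritableThreadLocal value it inherited when it was created. A task wrapped at submit time sees the submitter's current value.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CaptureReplaySketch {
    static final InheritableThreadLocal<String> CONTEXT = new InheritableThreadLocal<>();

    /** Capture on the submitting thread, replay before the task runs, restore afterwards. */
    static Runnable wrap(Runnable task) {
        final String captured = CONTEXT.get();   // capture
        return () -> {
            String backup = CONTEXT.get();       // whatever the pooled thread held before
            CONTEXT.set(captured);               // replay
            try {
                task.run();
            } finally {
                if (backup == null) {            // restore
                    CONTEXT.remove();
                } else {
                    CONTEXT.set(backup);
                }
            }
        };
    }

    /** Returns {value seen by a plain task, value seen by a wrapped task} for the second request. */
    static String[] secondRequestView() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CONTEXT.set("trace-1");
            // The first task creates the worker thread, which inherits "trace-1" once, at creation
            pool.submit(() -> { }).get();
            CONTEXT.set("trace-2");
            Callable<String> read = CONTEXT::get;
            String plain = pool.submit(read).get();                       // still "trace-1"
            String[] wrapped = new String[1];
            pool.submit(wrap(() -> wrapped[0] = CONTEXT.get())).get();    // "trace-2"
            return new String[]{plain, wrapped[0]};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            CONTEXT.remove();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        String[] view = secondRequestView();
        System.out.println("plain=" + view[0] + ", wrapped=" + view[1]); // plain=trace-1, wrapped=trace-2
    }
}
```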
Build the HttpClient:
/**
* initialize HttpClient
*
*/
@Configuration
@ConditionalOnProperty(value = "ins.httpclient.enabled", matchIfMissing = true)
@EnableConfigurationProperties(HttpClientCfg.class)
public class HttpClientAutoConfiguration {
@ConditionalOnMissingBean(CloseableHttpClient.class)
@Bean(name = "defaultHttpClient", destroyMethod = "close")
public CloseableHttpClient apacheHttpClient(HttpClientCfg cfg) {
RequestConfig config =
RequestConfig.custom()
.setConnectTimeout(cfg.getConnectTimeout())
.setSocketTimeout(cfg.getSoTimeout())
.build();
HttpClientBuilder builder = HttpClients.custom()
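.addInterceptorFirst((HttpRequest request, HttpContext context) -> {
// Skip the header when there is no traceId; String.valueOf(null) would send the literal "null"
Object traceId = CommonRequestContext.get(RequestContentConstants.COMMON_REQUEST);
if (traceId != null) request.addHeader("traceId", String.valueOf(traceId));
})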
.setDefaultRequestConfig(config);
if (StringUtils.isNotEmpty(cfg.getUserAgent())) {
builder.setUserAgent(cfg.getUserAgent());
}
builder.setConnectionTimeToLive(cfg.getConnTimeToLive(), TimeUnit.MILLISECONDS);
builder.setMaxConnTotal(cfg.getMaxTotalConnections()).setMaxConnPerRoute(cfg.getDefaultMaxConnectionsPerHost());
return builder.build();
}
@DependsOn("defaultHttpClient")
@Bean
public HttpClientHelper initHttpClientHelper(CloseableHttpClient httpClient) {
return HttpClientHelper.setHttpClient(httpClient);
}
}
As you can see, this code passes the traceId to the HTTP service in a request header:
...
.addInterceptorFirst((HttpRequest request, HttpContext context) -> {
request.addHeader("traceId", String.valueOf(CommonRequestContext.get(RequestContentConstants.COMMON_REQUEST)));
...
Next, a filter receives and parses the traceId:
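/**
* @Description: reads the traceId header and binds it to MDC and CommonRequestContext for the current request
* @Author: WinsonWu
* @Date: 2022/8/27 12:22
**/
@Slf4j
public class TracingFilter implements Filter {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
String traceId = httpServletRequest.getHeader("traceId");
if (StringUtils.isNotEmpty(traceId)) {
MDC.put("traceId", traceId);
// Also keep it in the context container so the Dubbo consumer filter and the MQ hook can forward it
CommonRequestContext.put(RequestContentConstants.COMMON_REQUEST, traceId);
}
try {
filterChain.doFilter(servletRequest,servletResponse);
} finally {
// Servlet threads are pooled: clear the values so they do not leak into the next request
MDC.remove("traceId");
CommonRequestContext.clear();
}
}
}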
Register the filter:
@Configuration
public class WebConfig implements WebMvcConfigurer {
@Bean
public TracingFilter channelFilter(){
return new TracingFilter();
}
}
Add a test controller:
/**
* @Description: test endpoint for tracing
* @Author: WinsonWu
* @Date: 2022/8/27 12:00
**/
@RestController
@Slf4j
public class TestController {
// The value of @RestController is a bean name, not a path, so the path belongs on the mapping annotation
@GetMapping("/tracing")
public String helloTracing(){
log.info("trace id is coming");
return "hello tracing";
}
}
logback configuration:
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
<property name="logPath" value="/usr/local/simulation/log"/>
<!-- console output -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<!-- pattern: %X{traceId} is the traceId from MDC, %d the date, %thread the thread name, %-5level the level left-aligned in 5 characters, %msg the message, %n a newline -->
<pattern> %X{traceId} %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>
<!-- roll over to a new log file every day -->
<appender name="FILEINFOLOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level> <!-- drop ERROR events; they go to the error log -->
<onMatch>DENY</onMatch>
<onMismatch>ACCEPT</onMismatch>
</filter>
<encoder>
<pattern>
%X{traceId} %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
</pattern>
</encoder>
<!-- rolling policy -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- file name pattern -->
<fileNamePattern>${logPath}/simulation_info.%d.log</fileNamePattern>
</rollingPolicy>
</appender>
<appender name="FILEERRORLOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>ERROR</level> <!-- keep ERROR events only -->
</filter>
<encoder>
<pattern>
%X{traceId} %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
</pattern>
</encoder>
<!-- rolling policy -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- file name pattern -->
<fileNamePattern>${logPath}/simulation_error.%d.log</fileNamePattern>
</rollingPolicy>
</appender>
<!-- mybatis log configuration -->
<logger name="org.apache.ibatis" level="TRACE"/>
<logger name="java.sql.Connection" level="DEBUG"/>
<logger name="java.sql.Statement" level="DEBUG"/>
<logger name="java.sql.PreparedStatement" level="DEBUG"/>
<!-- root log level -->
<root level="INFO">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILEINFOLOG" />
<appender-ref ref="FILEERRORLOG" />
</root>
</configuration>
Note %X{traceId}: it reads the traceId entry from MDC and inserts it into the pattern.
Test the HTTP service:
@Test
public void testTracing() throws IOException {
// Generate the traceId with the id factory from part I and bind it to the current thread
final String orderSeqId = simpleIdFactory.getSeqId("order");
CommonRequestContext.put(RequestContentConstants.COMMON_REQUEST, orderSeqId);
MDC.put("traceId", orderSeqId);
try {
log.info("invoke");
String result = HttpClientHelper.getInstance().get("http://localhost:8080/tracing", null);
log.info("result: {}", result);
} finally {
MDC.remove("traceId");
CommonRequestContext.clear();
}
}
Test results:
Caller side:
Callee side:
2.1.2.2 RPC service
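For the RPC middleware we use Dubbo and pass the traceId through Dubbo filters. Dubbo loads filters through its SPI mechanism, so the filters also need to be declared in META-INF/dubbo/org.apache.dubbo.rpc.Filter. The core code is as follows:
Consumer-side filter: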
@Activate(group = CommonConstants.CONSUMER)
@Slf4j
public class DubboConsumerTraceFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String traceId = (String) CommonRequestContext.get(RequestContentConstants.COMMON_REQUEST);
try {
if (!Objects.isNull(traceId)) {
invocation.getAttachments().put(String.valueOf(RequestContentConstants.COMMON_REQUEST), traceId);
}
} catch (Exception e) {
// Tracing must never break the call itself: log and continue
log.warn("failed to attach traceId", e);
}
return invoker.invoke(invocation);
}
}
Provider-side filter:
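@Activate(group = CommonConstants.PROVIDER)
public class DubboProviderTraceFilter implements Filter {
private static final Logger LOGGER = LoggerFactory.getLogger(DubboProviderTraceFilter.class);
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
try {
String attachment = invocation.getAttachment(String.valueOf(RequestContentConstants.COMMON_REQUEST));
if (StringUtils.isNotEmpty(attachment)) {
MDC.put("traceId", attachment);
// Keep it in the context container too, so the RocketMQ send hook can forward it
CommonRequestContext.put(RequestContentConstants.COMMON_REQUEST, attachment);
}
} catch (Exception e) {
LOGGER.warn("failed to read traceId attachment", e);
}
try {
return invoker.invoke(invocation);
} finally {
// Dubbo provider threads are pooled: clear so the id does not leak into the next call
MDC.remove("traceId");
CommonRequestContext.clear();
}
}
}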
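Clearing the context in a finally block matters because Dubbo, like Tomcat, runs requests on pooled threads. In this JDK-only sketch, a single-thread executor stands in for the provider thread pool, and the second call carries no traceId attachment. ThreadLocalLeakSketch and handle are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalLeakSketch {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Mirrors the provider filter: bind the attachment if present, run, optionally clean up. */
    static String handle(String attachment, boolean cleanUp) {
        if (attachment != null) {
            TRACE_ID.set(attachment);
        }
        try {
            return TRACE_ID.get(); // what the business code would log
        } finally {
            if (cleanUp) {
                TRACE_ID.remove();
            }
        }
    }

    /** Two calls on the same pooled thread; returns the traceId seen by the second one. */
    static String secondCallSees(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(() -> handle("order1", cleanUp)).get();
            return pool.submit(() -> handle(null, cleanUp)).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + secondCallSees(false)); // leaked "order1"
        System.out.println("with cleanup: " + secondCallSees(true));     // null
    }
}
```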
2.1.2.3 Message queue
Here we demonstrate RocketMQ and pass the traceId through hooks. The core code is as follows:
producerHook:
public class ProducerSendMessageHook implements SendMessageHook {
@Override
public String hookName() {
return "CommonMessageHook";
}
@Override
public void sendMessageBefore(SendMessageContext sendMessageContext) {
String traceId = (String) CommonRequestContext.get(RequestContentConstants.COMMON_REQUEST);
if(StringUtils.isNotEmpty(traceId)){
sendMessageContext.getMessage().putUserProperty(RequestContentConstants.COMMON_REQUEST.name(), traceId);
}
}
@Override
public void sendMessageAfter(SendMessageContext sendMessageContext) {
}
}
consumerHook:
public class ConsumerPullMessageHook implements FilterMessageHook {
@Override
public String hookName() {
return "ConsumerPullMessageHook";
}
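// Caveat: filterMessage runs on the thread that pulls messages, not on the thread that runs the
// MessageListener, and a batch may carry several traceIds (the last one wins here). Restoring the id
// per message on the consuming thread, e.g. in ConsumeMessageHook#consumeMessageBefore, is more reliable.
@Override
public void filterMessage(FilterMessageContext filterMessageContext) {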
Iterator<MessageExt> iterator = filterMessageContext.getMsgList().iterator();
while (iterator.hasNext()) {
MessageExt messageExt = iterator.next();
String traceId = messageExt.getUserProperty(RequestContentConstants.COMMON_REQUEST.name());
if (StringUtils.isNotEmpty(traceId)) {
CommonRequestContext.put(RequestContentConstants.COMMON_REQUEST,traceId);
MDC.put("traceId", traceId);
}
}
}
}
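Whichever hook is used, the traceId is really a per-message property. The safest scope is therefore the processing of a single message, on the thread that processes it. The JDK-only sketch below shows that scoping without RocketMQ. Message is a hypothetical stand-in for MessageExt, and the property key COMMON_REQUEST mirrors RequestContentConstants.COMMON_REQUEST.name() from the hooks above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class PerMessageTraceSketch {
    static final String KEY = "COMMON_REQUEST";
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Hypothetical stand-in for MessageExt: a body plus user properties. */
    static final class Message {
        final String body;
        final Map<String, String> userProperties = new HashMap<>();

        Message(String body, String traceId) {
            this.body = body;
            if (traceId != null) {
                userProperties.put(KEY, traceId);
            }
        }
    }

    /** Binds each message's own traceId around its handler and clears it afterwards. */
    static void consumeBatch(List<Message> batch, Consumer<Message> handler) {
        for (Message msg : batch) {
            String traceId = msg.userProperties.get(KEY);
            if (traceId != null) {
                TRACE_ID.set(traceId);
            }
            try {
                handler.accept(msg);
            } finally {
                TRACE_ID.remove(); // the next message must not inherit this id
            }
        }
    }

    /** Returns "body=traceId" for each message as seen inside the handler. */
    static List<String> observe(List<Message> batch) {
        List<String> seen = new ArrayList<>();
        consumeBatch(batch, m -> seen.add(m.body + "=" + TRACE_ID.get()));
        return seen;
    }

    public static void main(String[] args) {
        List<Message> batch = List.of(new Message("a", "order1"), new Message("b", "order2"), new Message("c", null));
        System.out.println(observe(batch)); // [a=order1, b=order2, c=null]
    }
}
```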
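2.1.3 Summary
At this point we have implemented basic trace monitoring. When it meets business needs, building it in-house effectively improves the controllability of the code. For large distributed systems, open-source components such as SkyWalking are recommended.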
Author: WinsonWu
Link: https://juejin.cn/post/7136423390398644231
Source: Juejin (稀土掘金)