先看接口
public interface Sampler {
/**
* @param operation The operation name set on the span
* @param id The traceId on the span
* @return whether or not the new trace should be sampled
*/
SamplingStatus sample(String operation, long id);
/**
* Release any resources used by the sampler.
*/
void close();
}
两个方法,注释说的很清楚了,不废话。close 基本都是空实现,因为实在是没有用到啥需要关闭的资源。
image.png
官方6个实现类,一个一个来
ConstSampler
- 一句话:要么全采样,要么不采样
public static final String TYPE = "const";
public ConstSampler(boolean decision) {
this.decision = decision;
...
}
构造 ConstSampler 时需要传入 decision 参数
public SamplingStatus sample(String operation, long id) {
return SamplingStatus.of(decision, tags);
}
sample 方法中直接就返回这个 decision 了。是个 boolean,所以要么全采样,要么不采样。只会在测试环境用到。
RateLimitingSampler
- 基于漏桶( LeakyBucket )算法的采样器
public static final String TYPE = "ratelimiting";
public RateLimitingSampler(double maxTracesPerSecond) {
this.maxTracesPerSecond = maxTracesPerSecond;
double maxBalance = maxTracesPerSecond < 1.0 ? 1.0 : maxTracesPerSecond;
this.rateLimiter = new RateLimiter(maxTracesPerSecond, maxBalance);
...
}
一个参数 maxTracesPerSecond,每秒最大采样数
public SamplingStatus sample(String operation, long id) {
return SamplingStatus.of(this.rateLimiter.checkCredit(1.0), tags);
}
sample 方法是否采样取决于漏桶满没满。
- RateLimiter 的实现
private final double creditsPerNanosecond;
private final Clock clock;
private double balance;
private double maxBalance;
private long lastTick;
public RateLimiter(double creditsPerSecond, double maxBalance, Clock clock) {
this.clock = clock;
this.balance = maxBalance;
this.maxBalance = maxBalance;
this.creditsPerNanosecond = creditsPerSecond / 1.0e9;
}
参数说明:
-- clock 封装 System 时间操作
-- balance 桶中资源(后续分析中已水代替资源,便于理解)数量
-- maxBalance 桶的最大容量
-- creditsPerNanosecond 放行速率
-- lastTick 上次加水时间
桶最大容量就是每秒采样数,然后以纳秒级的速率均匀放过请求(这里说的均匀并不是说严格按照每 creditsPerNanosecond 纳秒放过一个采样请求,见下面分析)
public boolean checkCredit(double itemCost) {
long currentTime = clock.currentNanoTicks();
// 距上次加水的时间
double elapsedTime = currentTime - lastTick;
// 更新加水时间
lastTick = currentTime;
// 尝试加水,剩余水量 + 过去这段时间漏出去的水量
balance += elapsedTime * creditsPerNanosecond;
// 如果尝试加水会导致桶满溢出,就把桶加到满
if (balance > maxBalance) {
balance = maxBalance;
}
// 剩的水比要漏出去的水多,就漏过去,即放过这个请求
if (balance >= itemCost) {
balance -= itemCost;
return true;
}
// 否则不漏,拒绝请求
return false;
}
这个漏桶有点奇怪,反而有点像令牌桶。反复往桶里加水,加到足够漏出一次才漏出。。另外注意这个 RateLimiter 不是线程安全的,Jaeger 之所以直接用,是因为他在 synchronized 修饰的方法里。如果想拿出来另做他用的话,需要注意。
ProbabilisticSampler
随机采样,每个请求都有一定的概率被采样,掷硬币
public ProbabilisticSampler(double samplingRate) {
if (samplingRate < 0.0 || samplingRate > 1.0) {
throw new IllegalArgumentException(
"The sampling rate must be greater than 0.0 and less than 1.0");
}
this.samplingRate = samplingRate;
this.positiveSamplingBoundary = (long) (((1L << 63) - 1) * samplingRate);
this.negativeSamplingBoundary = (long) ((1L << 63) * samplingRate);
...
}
构造参数为采样率,正边界为最大 long * 采样率,负边界为最小 long * 采样率,有什么用?
public SamplingStatus sample(String operation, long id) {
if (id > 0) {
return SamplingStatus.of(id <= this.positiveSamplingBoundary, tags);
} else {
return SamplingStatus.of(id >= this.negativeSamplingBoundary, tags);
}
}
id 跟采样边界比较,决定是否采样。我们已经知道 sample 的入参 id 是 span 的 id,而它是通过 ThreadlocalRandom 生成的一个随机 long,所以这个比较就相当于掷硬币,也就实现了随机采样。
GuaranteedThroughputSampler
从名字可以看出来,它会保证 Throughput 并采样,这是什么东西?继续看
public static final String TYPE = "lowerbound";
private ProbabilisticSampler probabilisticSampler;
private RateLimitingSampler lowerBoundSampler;
private Map<String, Object> tags;
public GuaranteedThroughputSampler(double samplingRate, double lowerBound) {
....
probabilisticSampler = new ProbabilisticSampler(samplingRate);
lowerBoundSampler = new RateLimitingSampler(lowerBound);
}
很明显,这是一个复合采样器。内部同时持有概率采样器和漏桶采样器,他们的特性前文已经说过了
public synchronized SamplingStatus sample(String operation, long id) {
SamplingStatus probabilisticSamplingStatus = probabilisticSampler.sample(operation, id);
SamplingStatus lowerBoundSamplingStatus = lowerBoundSampler.sample(operation, id);
if (probabilisticSamplingStatus.isSampled()) {
return probabilisticSamplingStatus;
}
return SamplingStatus.of(lowerBoundSamplingStatus.isSampled(), tags);
}
看 sample 方法,优先使用概率采样器采集,概率采样器没采集到的会漏到漏桶采样器,然后再由漏桶采样器来控制采样比率,这样基本上可以保证每种 operation(即一类调用,一般是接口/方法名) 都可以被采样到。
PerOperationSampler
还是顾名思义,每种操作一个 sampler ?
private final int maxOperations;
private final HashMap<String, GuaranteedThroughputSampler> operationNameToSampler;
private ProbabilisticSampler defaultSampler;
private double lowerBound;
public PerOperationSampler(int maxOperations, OperationSamplingParameters strategies) {
this(maxOperations,
new HashMap<String, GuaranteedThroughputSampler>(),
new ProbabilisticSampler(strategies.getDefaultSamplingProbability()),
strategies.getDefaultLowerBoundTracesPerSecond());
update(strategies);
}
属性说明:
-- maxOperations 最大支持的 operation 数
-- operationNameToSampler operation to simpler 的缓存 map,可以看出最终都是GuaranteedThroughputSampler 这个复合采样器
-- defaultSampler 默认采样器
-- lowerBound GuaranteedThroughputSampler 的 lowerBound((ノ`Д)ノ 实在不知道这个翻成什么合适,反正就是复合采样器里漏桶采样器的每秒采样数)
构造参数说明:
-- maxOperations 最大支持的 operation 数
-- strategies 需要提前构造好的一堆采样器
构造方法中,首先初始化采样器 operation - sampler 的映射 map 和 默认的概率采样器
然后在 update 方法中按照 strategies 传入的数据统一初始化,最终都是GuaranteedThroughput复合采样器,并放入 map,这个不详说了
public synchronized SamplingStatus sample(String operation, long id) {
GuaranteedThroughputSampler sampler = operationNameToSampler.get(operation);
if (sampler != null) {
return sampler.sample(operation, id);
}
if (operationNameToSampler.size() < maxOperations) {
sampler = new GuaranteedThroughputSampler(defaultSampler.getSamplingRate(), lowerBound);
operationNameToSampler.put(operation, sampler);
return sampler.sample(operation, id);
}
return defaultSampler.sample(operation, id);
}
按照 operation 从映射 map 取采样器并由它判断是否采样,没有提前创建采样器的就新建并放入缓存,但不得超过最大值,否则不缓存,不抛异常。
这种采样器自己编码用不到。
RemoteControlledSampler
远程控制的采样器,这个是默认采样器,由 jaeger-controller 的 strategies.json 配置,并下发到 jaeger-agent,然后 jaeger-client 会从 jaeger-agent 拉取。
void updateSampler() {
SamplingStrategyResponse response;
try {
response = manager.getSamplingStrategy(serviceName);
metrics.samplerRetrieved.inc(1);
} catch (SamplingStrategyErrorException e) {
metrics.samplerQueryFailure.inc(1);
return;
}
if (response.getOperationSampling() != null) {
updatePerOperationSampler(response.getOperationSampling());
} else {
updateRateLimitingOrProbabilisticSampler(response);
}
}
主要看一下 updateSampler 方法就行了,启动一个 RemoteControlledSampler 后,他会定时去 jaeger-agent 拉取配置,更新采样器,然后由配置的采样器规则决定是否采样。
网友评论