A couple of days ago we migrated traffic, and after the CNAMEs were switched over we found that although the upstream pool was being addressed through the CNAME, the traffic never moved — it stubbornly stayed on the old pool. We needed to find out why the DNS was not being refreshed. All of the clients whose traffic did not switch were WebFlux WebClient users, whose implementation is built on Netty's reactor.netty.http.client.HttpClient.
idea.debugger.rt.GeneratedEvaluationClass.invoke(GeneratedEvaluationClass.java:5)
io.netty.resolver.dns.DnsResolveContext.query(DnsResolveContext.java:416)
io.netty.resolver.dns.DnsResolveContext.query(DnsResolveContext.java:1135)
io.netty.resolver.dns.DnsResolveContext.internalResolve(DnsResolveContext.java:358)
io.netty.resolver.dns.DnsResolveContext.doSearchDomainQuery(DnsResolveContext.java:284)
io.netty.resolver.dns.DnsAddressResolveContext.doSearchDomainQuery(DnsAddressResolveContext.java:96)
io.netty.resolver.dns.DnsResolveContext.resolve(DnsResolveContext.java:249)
io.netty.resolver.dns.DnsNameResolver.doResolveAllUncached0(DnsNameResolver.java:1150)
----- executor.execute
idea.debugger.rt.GeneratedEvaluationClass.invoke(GeneratedEvaluationClass.java:5)
io.netty.resolver.dns.DnsNameResolver.doResolveAllUncached(DnsNameResolver.java:1121)
io.netty.resolver.dns.DnsNameResolver.doResolveAll(DnsNameResolver.java:1072)
io.netty.resolver.dns.DnsNameResolver.doResolveAll(DnsNameResolver.java:1040)
io.netty.resolver.SimpleNameResolver.resolveAll(SimpleNameResolver.java:79)
io.netty.resolver.dns.InflightNameResolver.resolve(InflightNameResolver.java:96)
io.netty.resolver.dns.InflightNameResolver.resolveAll(InflightNameResolver.java:71)
io.netty.resolver.dns.InflightNameResolver.resolveAll(InflightNameResolver.java:56)
io.netty.resolver.InetSocketAddressResolver.doResolveAll(InetSocketAddressResolver.java:73)
io.netty.resolver.InetSocketAddressResolver.doResolveAll(InetSocketAddressResolver.java:31)
io.netty.resolver.AbstractAddressResolver.resolveAll(AbstractAddressResolver.java:158)
reactor.netty.transport.TransportConnector.doResolveAndConnect(TransportConnector.java:346)
reactor.netty.transport.TransportConnector.connect(TransportConnector.java:163)
reactor.netty.transport.TransportConnector.connect(TransportConnector.java:123)
reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnectionAllocator.lambda$connectChannel$0(DefaultPooledConnectionProvider.java:519)
reactor.netty.resources.DefaultPooledConnectionProvider.createPool(DefaultPooledConnectionProvider.java:95)
reactor.netty.resources.PooledConnectionProvider.lambda$acquire$0(PooledConnectionProvider.java:133)
reactor.netty.resources.PooledConnectionProvider$$Lambda$1791/283293993.apply(Unknown Source)
java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
reactor.netty.internal.util.MapUtils.computeIfAbsent(MapUtils.java:46)
reactor.netty.resources.PooledConnectionProvider.lambda$acquire$2(PooledConnectionProvider.java:128)
reactor.netty.resources.PooledConnectionProvider$$Lambda$1790/762174048.accept(Unknown Source)
reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:58)
reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:269)
The call stack looks roughly like the above; because this is reactive code, you cannot capture the whole path from a single thread. From this stack we can see that the DNS result is served from the DNS cache — so when does that cache get built?
io.netty.resolver.dns.DefaultDnsCache.cache(DefaultDnsCache.java:137)
io.netty.resolver.dns.DnsAddressResolveContext.cache(DnsAddressResolveContext.java:83)
io.netty.resolver.dns.DnsAddressResolveContext.cache(DnsAddressResolveContext.java:30)
io.netty.resolver.dns.DnsResolveContext.onExpectedResponse(DnsResolveContext.java:880)
io.netty.resolver.dns.DnsResolveContext.onResponse(DnsResolveContext.java:616)
io.netty.resolver.dns.DnsResolveContext.access$400(DnsResolveContext.java:66)
io.netty.resolver.dns.DnsResolveContext$2.operationComplete(DnsResolveContext.java:478)
Once a query succeeds, the DNS result is cached and the TTL is set at that moment; see DefaultDnsCache:
public DnsCacheEntry cache(String hostname, DnsRecord[] additionals, InetAddress address, long originalTtl, EventLoop loop) {
    ObjectUtil.checkNotNull(hostname, "hostname");
    ObjectUtil.checkNotNull(address, "address");
    ObjectUtil.checkNotNull(loop, "loop");
    DefaultDnsCache.DefaultDnsCacheEntry e = new DefaultDnsCache.DefaultDnsCacheEntry(hostname, address);
    if (this.maxTtl != 0 && emptyAdditionals(additionals)) {
        // the TTL comes from the DNS response (originalTtl) and is clamped into [minTtl, maxTtl]
        this.resolveCache.cache(appendDot(hostname), e, Math.max(this.minTtl, (int)Math.min((long)this.maxTtl, originalTtl)), loop);
        return e;
    } else {
        return e;
    }
}

public DnsCacheEntry cache(String hostname, DnsRecord[] additionals, Throwable cause, EventLoop loop) {
    ObjectUtil.checkNotNull(hostname, "hostname");
    ObjectUtil.checkNotNull(cause, "cause");
    ObjectUtil.checkNotNull(loop, "loop");
    DefaultDnsCache.DefaultDnsCacheEntry e = new DefaultDnsCache.DefaultDnsCacheEntry(hostname, cause);
    if (this.negativeTtl != 0 && emptyAdditionals(additionals)) {
        // failed resolutions are cached for negativeTtl seconds
        this.resolveCache.cache(appendDot(hostname), e, this.negativeTtl, loop);
        return e;
    } else {
        return e;
    }
}
Looking at the caching logic, the TTL is taken from the A record returned by the DNS server (originalTtl) and clamped into the [minTtl, maxTtl] range configured on the cache.
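If the server-side TTL is not what you want, those bounds can be overridden on the client. A minimal sketch, assuming Reactor Netty 1.0.x where HttpClient exposes resolver(...) and the NameResolverSpec cache* methods used here:

import java.time.Duration;
import reactor.netty.http.client.HttpClient;

public class ResolverTtlConfig {
    public static void main(String[] args) {
        HttpClient client = HttpClient.create()
                .resolver(spec -> spec
                        // clamp the server-provided TTL into [minTtl, maxTtl]
                        .cacheMinTimeToLive(Duration.ofSeconds(1))
                        .cacheMaxTimeToLive(Duration.ofSeconds(30))
                        // how long resolution failures stay cached (negativeTtl)
                        .cacheNegativeTimeToLive(Duration.ofSeconds(5)));

        client.get().uri("https://example.com").response().block();
    }
}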
II. The address resolver class hierarchy
1. AddressResolverGroup class diagram
AddressResolverGroup has two implementations. DnsAddressResolverGroup takes the TTL from the DNS server rather than reading it from the JVM, while DefaultAddressResolverGroup reads the TTL from the JVM.
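If you want the JVM to own DNS caching (with the TTL rules covered later), the client can be pointed at DefaultAddressResolverGroup. A minimal sketch using Reactor Netty's resolver(AddressResolverGroup) overload:

import io.netty.resolver.DefaultAddressResolverGroup;
import reactor.netty.http.client.HttpClient;

public class JvmResolverConfig {
    public static void main(String[] args) {
        // Replace Netty's async DNS resolver with the JDK/InetAddress-based one, so
        // caching and TTL follow the JVM rules instead of the DNS server's A-record TTL.
        HttpClient client = HttpClient.create()
                .resolver(DefaultAddressResolverGroup.INSTANCE);

        client.get().uri("https://example.com").response().block();
    }
}

Note that the JDK lookup is blocking, so this trades the non-blocking resolver for the JVM's cache semantics.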
2. AddressResolver class diagram
3. NameResolver class diagram
4. Relationship diagram
III. The address resolution process
Address resolution is implemented by the method below. Let's look at the logic of isSupported, isResolved, and doResolve in turn.
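A paraphrased sketch of the shape of io.netty.resolver.AbstractAddressResolver#resolve (not the verbatim Netty source):

// Paraphrased sketch of AbstractAddressResolver#resolve
public final Future<T> resolve(SocketAddress address) {
    if (!isSupported(ObjectUtil.checkNotNull(address, "address"))) {
        // address type not supported by this resolver
        return executor().newFailedFuture(new UnsupportedAddressTypeException());
    }
    if (isResolved(address)) {
        // already resolved; no lookup needed
        @SuppressWarnings("unchecked")
        T cast = (T) address;
        return executor().newSucceededFuture(cast);
    }
    try {
        @SuppressWarnings("unchecked")
        T cast = (T) address;
        Promise<T> promise = executor().newPromise();
        doResolve(cast, promise);   // the subclass performs the actual lookup
        return promise;
    } catch (Exception e) {
        return executor().newFailedFuture(e);
    }
}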
- isSupported mainly checks whether the given SocketAddress is an InetSocketAddress, implemented via the JDK's isInstance.
- doIsResolved combines isSupported with a null check: if the argument is non-null and is an InetSocketAddress, the address is marked as resolved.
- doResolve resolves a host name into an InetSocketAddress, implemented via InetAddress.getByName(hostname).
To summarize: address resolution mainly ensures the SocketAddress is valid and usable; if it is a host name, it is converted to an InetAddress via InetAddress.getByName by default.
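For the host-name case, the JDK-backed name resolver's doResolve is essentially a thin wrapper around InetAddress.getByName. A paraphrased sketch based on io.netty.resolver.DefaultNameResolver (not verbatim source):

// Paraphrased from DefaultNameResolver#doResolve
protected void doResolve(String inetHost, Promise<InetAddress> promise) throws Exception {
    try {
        // SocketUtils.addressByName delegates to InetAddress.getByName(inetHost)
        promise.setSuccess(SocketUtils.addressByName(inetHost));
    } catch (UnknownHostException e) {
        promise.setFailure(e);
    }
}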
IV. Cache expiration
Let's look at the basic connection-creation path. For DnsAddressResolverGroup, cache expiry and refresh are handled by io.netty.resolver.dns.Cache. However, the refresh timing is not deterministic: there is no guarantee an entry is refreshed exactly when its TTL is reached. As scheduleCacheExpirationIfNeeded below shows, an already scheduled expiration task is kept as long as it fires no later than the new TTL, so the entries for a hostname are dropped at roughly the earliest TTL seen.
private final class Entries extends AtomicReference<List<E>> implements Runnable {
private final String hostname;
volatile ScheduledFuture<?> expirationFuture;
Entries(String hostname) {
super(Collections.emptyList());
this.hostname = hostname;
}
void add(E e, int ttl, EventLoop loop) {
if (Cache.this.shouldReplaceAll(e)) {
this.set(Collections.singletonList(e));
this.scheduleCacheExpirationIfNeeded(ttl, loop);
} else {
List entries;
label59:
do {
while(true) {
while(true) {
entries = (List)this.get();
if (!entries.isEmpty()) {
E firstEntry = entries.get(0);
if (Cache.this.shouldReplaceAll(firstEntry)) {
assert entries.size() == 1;
continue label59;
}
List<E> newEntries = new ArrayList(entries.size() + 1);
int i = 0;
Object replacedEntry = null;
label54:
do {
E entry = entries.get(i);
if (Cache.this.equals(e, entry)) {
replacedEntry = entry;
newEntries.add(e);
++i;
while(true) {
if (i >= entries.size()) {
break label54;
}
newEntries.add(entries.get(i));
++i;
}
}
newEntries.add(entry);
++i;
} while(i < entries.size());
if (replacedEntry == null) {
newEntries.add(e);
}
Cache.this.sortEntries(this.hostname, newEntries);
if (this.compareAndSet(entries, Collections.unmodifiableList(newEntries))) {
this.scheduleCacheExpirationIfNeeded(ttl, loop);
return;
}
} else if (this.compareAndSet(entries, Collections.singletonList(e))) {
this.scheduleCacheExpirationIfNeeded(ttl, loop);
return;
}
}
}
} while(!this.compareAndSet(entries, Collections.singletonList(e)));
this.scheduleCacheExpirationIfNeeded(ttl, loop);
}
}
private void scheduleCacheExpirationIfNeeded(int ttl, EventLoop loop) {
while(true) {
ScheduledFuture<?> oldFuture = (ScheduledFuture)Cache.FUTURE_UPDATER.get(this);
if (oldFuture != null && oldFuture.getDelay(TimeUnit.SECONDS) <= (long)ttl) {
break;
}
ScheduledFuture<?> newFuture = loop.schedule(this, (long)ttl, TimeUnit.SECONDS);
if (Cache.FUTURE_UPDATER.compareAndSet(this, oldFuture, newFuture)) {
if (oldFuture != null) {
oldFuture.cancel(true);
}
break;
}
newFuture.cancel(true);
}
}
For DefaultAddressResolverGroup, cache expiry is handled by the JVM — see InetAddress.getAllByName0. From this method you can see that JVM-based TTL expiry is quite precise: expired entries are pruned on every lookup.
private static InetAddress[] getAllByName0(String host,
InetAddress reqAddr,
boolean check,
boolean useCache)
throws UnknownHostException {
/* If it gets here it is presumed to be a hostname */
/* make sure the connection to the host is allowed, before we
* give out a hostname
*/
if (check) {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkConnect(host, -1);
}
}
// remove expired addresses from cache - expirySet keeps them ordered
// by expiry time so we only need to iterate the prefix of the NavigableSet...
long now = System.nanoTime();
for (CachedAddresses caddrs : expirySet) {
// compare difference of time instants rather than
// time instants directly, to avoid possible overflow.
// (see System.nanoTime() recommendations...)
if ((caddrs.expiryTime - now) < 0L) {
// ConcurrentSkipListSet uses weakly consistent iterator,
// so removing while iterating is OK...
if (expirySet.remove(caddrs)) {
// ... remove from cache
cache.remove(caddrs.host, caddrs);
}
} else {
// we encountered 1st element that expires in future
break;
}
}
// look-up or remove from cache
Addresses addrs;
if (useCache) {
addrs = cache.get(host);
} else {
addrs = cache.remove(host);
if (addrs != null) {
if (addrs instanceof CachedAddresses) {
// try removing from expirySet too if CachedAddresses
expirySet.remove(addrs);
}
addrs = null;
}
}
if (addrs == null) {
// create a NameServiceAddresses instance which will look up
// the name service and install it within cache...
Addresses oldAddrs = cache.putIfAbsent(
host,
addrs = new NameServiceAddresses(host, reqAddr)
);
if (oldAddrs != null) { // lost putIfAbsent race
addrs = oldAddrs;
}
}
// ask Addresses to get an array of InetAddress(es) and clone it
return addrs.get().clone();
}
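The lifetime of entries in this JVM-level cache is governed by the standard networkaddress security properties, so when using the JVM resolver the TTL can be tuned like this (the values are only examples):

import java.security.Security;

public class JvmDnsTtl {
    public static void main(String[] args) {
        // Successful lookups are cached for networkaddress.cache.ttl seconds
        // (OpenJDK defaults to roughly 30s when no SecurityManager is installed);
        // failed lookups are cached for networkaddress.cache.negative.ttl seconds.
        Security.setProperty("networkaddress.cache.ttl", "30");
        Security.setProperty("networkaddress.cache.negative.ttl", "5");
    }
}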
Why DNS is not refreshed
@Override
public final Mono<? extends Connection> acquire(
TransportConfig config,
ConnectionObserver connectionObserver,
@Nullable Supplier<? extends SocketAddress> remote,
@Nullable AddressResolverGroup<?> resolverGroup) {
Objects.requireNonNull(config, "config");
Objects.requireNonNull(connectionObserver, "connectionObserver");
Objects.requireNonNull(remote, "remoteAddress");
Objects.requireNonNull(resolverGroup, "resolverGroup");
return Mono.create(sink -> {
SocketAddress remoteAddress = Objects.requireNonNull(remote.get(), "Remote Address supplier returned null");
PoolKey holder = new PoolKey(remoteAddress, config.channelHash());
PoolFactory<T> poolFactory = poolFactory(remoteAddress);
InstrumentedPool<T> pool = MapUtils.computeIfAbsent(channelPools, holder, poolKey -> {
if (log.isDebugEnabled()) {
log.debug("Creating a new [{}] client pool [{}] for [{}]", name, poolFactory, remoteAddress);
}
InstrumentedPool<T> newPool = createPool(config, poolFactory, remoteAddress, resolverGroup);
if (poolFactory.metricsEnabled || config.metricsRecorder() != null) {
// registrar is null when metrics are enabled on HttpClient level or
// with the `metrics(boolean metricsEnabled)` method on ConnectionProvider
String id = poolKey.hashCode() + "";
if (poolFactory.registrar != null) {
poolFactory.registrar.get().registerMetrics(name, id, remoteAddress,
new DelegatingConnectionPoolMetrics(newPool.metrics()));
}
else if (Metrics.isInstrumentationAvailable()) {
// work directly with the pool otherwise a weak reference is needed to ConnectionPoolMetrics
// we don't want to keep another map with weak references
registerDefaultMetrics(id, remoteAddress, newPool.metrics());
}
}
return newPool;
});
EventLoop eventLoop;
if (sink.contextView().hasKey(CONTEXT_CALLER_EVENTLOOP)) {
eventLoop = sink.contextView().get(CONTEXT_CALLER_EVENTLOOP);
}
else {
EventLoopGroup group = config.loopResources().onClient(config.isPreferNative());
if (group instanceof ColocatedEventLoopGroup) {
eventLoop = ((ColocatedEventLoopGroup) group).nextInternal();
}
else {
eventLoop = null;
}
}
Mono<PooledRef<T>> mono = pool.acquire(Duration.ofMillis(poolFactory.pendingAcquireTimeout));
if (eventLoop != null) {
mono = mono.contextWrite(ctx -> ctx.put(CONTEXT_CALLER_EVENTLOOP, eventLoop));
}
mono.subscribe(createDisposableAcquire(config, connectionObserver,
poolFactory.pendingAcquireTimeout, pool, sink));
});
}
The key is MapUtils.computeIfAbsent(channelPools, holder, poolKey -> {...}): once a pool has been created for a given PoolKey, later acquires simply reuse it, so no new pool — and therefore no new DNS resolution — is ever needed. In other words, once a long-lived connection is established, the TTL no longer matters.
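A toy illustration (not Reactor Netty code) of why that is: computeIfAbsent only runs the mapping function — the only place the resolver gets involved — the first time a key is seen.

import java.util.concurrent.ConcurrentHashMap;

public class PoolKeyDemo {
    static final ConcurrentHashMap<String, String> channelPools = new ConcurrentHashMap<>();

    // Stand-in for createPool(...): pretend we resolve DNS here and remember the IP.
    static String createPool(String host) {
        System.out.println("resolving " + host);   // only printed on the first acquire
        return "pool[" + host + " -> 10.0.0.1]";   // hypothetical resolved address
    }

    public static void main(String[] args) {
        // First acquire creates the pool (and "resolves" DNS)...
        System.out.println(channelPools.computeIfAbsent("api.example.com", PoolKeyDemo::createPool));
        // ...every later acquire for the same key reuses the existing pool, so no re-resolution.
        System.out.println(channelPools.computeIfAbsent("api.example.com", PoolKeyDemo::createPool));
    }
}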
So the question becomes: when does a channel time out and get removed?
PooledConnectionProvider does have a scheduled task that checks pool activity:
PooledConnectionProvider(Builder builder, @Nullable Clock clock) {
this.builder = builder;
this.name = builder.name;
this.inactivePoolDisposeInterval = builder.inactivePoolDisposeInterval;
this.poolInactivity = builder.poolInactivity;
this.disposeTimeout = builder.disposeTimeout;
this.defaultPoolFactory = new PoolFactory<>(builder, builder.disposeTimeout, clock);
for (Map.Entry<SocketAddress, ConnectionPoolSpec<?>> entry : builder.confPerRemoteHost.entrySet()) {
poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout));
maxConnections.put(entry.getKey(), entry.getValue().maxConnections);
}
this.onDispose = Mono.empty();
scheduleInactivePoolsDisposal();
}
But this only runs when !inactivePoolDisposeInterval.isZero(), i.e. when an interval has actually been configured. This mechanism disposes of whole connection pools:
final void scheduleInactivePoolsDisposal() {
    if (!inactivePoolDisposeInterval.isZero()) {
        Schedulers.parallel()
                  .schedule(this::disposeInactivePoolsInBackground, inactivePoolDisposeInterval.toMillis(), TimeUnit.MILLISECONDS);
    }
}

final void disposeInactivePoolsInBackground() {
    if (!channelPools.isEmpty()) {
        List<Map.Entry<PoolKey, InstrumentedPool<T>>> toDispose;

        toDispose = channelPools.entrySet()
                                .stream()
                                .filter(p -> p.getValue().metrics().isInactiveForMoreThan(poolInactivity))
                                .collect(Collectors.toList());

        toDispose.forEach(e -> {
            if (channelPools.remove(e.getKey(), e.getValue())) {
                if (log.isDebugEnabled()) {
                    log.debug("ConnectionProvider[name={}]: Disposing inactive pool for [{}]", name, e.getKey().fqdn);
                }
                e.getValue().dispose();
            }
        });
    }
    scheduleInactivePoolsDisposal();
}
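The interval behind inactivePoolDisposeInterval is configured on the ConnectionProvider builder. A sketch, assuming a recent Reactor Netty version that exposes disposeInactivePoolsInBackground (the method name mirrors the fields above — verify it against your version):

import java.time.Duration;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

public class InactivePoolDisposal {
    public static void main(String[] args) {
        // Check every 5 minutes and dispose pools that have been inactive for more than 10 minutes.
        ConnectionProvider provider = ConnectionProvider.builder("migrating-client")
                .disposeInactivePoolsInBackground(Duration.ofMinutes(5), Duration.ofMinutes(10))
                .build();

        HttpClient client = HttpClient.create(provider);
    }
}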
SimpleDequePool has a similar mechanism, but it too is only activated when !this.poolConfig.evictInBackgroundInterval().isZero(); this one removes individual connections:
void scheduleEviction() {
    if (!this.poolConfig.evictInBackgroundInterval().isZero()) {
        long nanosEvictionInterval = this.poolConfig.evictInBackgroundInterval().toNanos();
        this.evictionTask = this.poolConfig.evictInBackgroundScheduler().schedule(this::evictInBackground, nanosEvictionInterval, TimeUnit.NANOSECONDS);
    } else {
        this.evictionTask = Disposables.disposed();
    }
}
void evictInBackground() {
Queue<SimpleDequePool.QueuePooledRef<POOLABLE>> e = (Queue)IDLE_RESOURCES.get(this);
if (e != null) {
if (WIP.getAndIncrement(this) == 0) {
if (this.pendingSize == 0) {
BiPredicate<POOLABLE, PooledRefMetadata> evictionPredicate = this.poolConfig.evictionPredicate();
Iterator iterator = e.iterator();
while(iterator.hasNext()) {
SimpleDequePool.QueuePooledRef<POOLABLE> pooledRef = (SimpleDequePool.QueuePooledRef)iterator.next();
if (evictionPredicate.test(pooledRef.poolable, pooledRef) && pooledRef.markDestroy()) {
this.recordInteractionTimestamp();
iterator.remove();
this.decrementIdle();
this.destroyPoolable(pooledRef).subscribe((v) -> {
}, (destroyError) -> {
this.logger.warn("Error while destroying resource in background eviction:", destroyError);
});
}
}
}
if (WIP.decrementAndGet(this) > 0) {
this.drainLoop();
}
}
this.scheduleEviction();
}
}
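evictInBackgroundInterval is non-zero only when background eviction is configured on the provider; a minimal sketch:

import java.time.Duration;
import reactor.netty.resources.ConnectionProvider;

public class BackgroundEviction {
    public static void main(String[] args) {
        // Run the pool's eviction task every 120s so idle/expired connections are
        // removed even when nobody calls acquire().
        ConnectionProvider provider = ConnectionProvider.builder("migrating-client")
                .maxIdleTime(Duration.ofSeconds(60))
                .evictInBackground(Duration.ofSeconds(120))
                .build();
    }
}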
By default, however, neither of these is activated — so where do connections actually get removed? The starting point is SimpleDequePool.doAcquire, which kicks off the drain loop; the eviction predicate is applied there when an idle connection is pulled from the queue:
void doAcquire(Borrower<POOLABLE> borrower) {
    if (this.isDisposed()) {
        borrower.fail(new PoolShutdownException());
    } else {
        this.pendingOffer(borrower);
        this.drain();
    }
}

void drain() {
    if (WIP.getAndIncrement(this) == 0) {
        this.drainLoop();
    }
}
private void drainLoop() {
this.recordInteractionTimestamp();
int maxPending = this.poolConfig.maxPending();
while(true) {
Deque<SimpleDequePool.QueuePooledRef<POOLABLE>> resources = (Deque)IDLE_RESOURCES.get(this);
ConcurrentLinkedDeque<Borrower<POOLABLE>> borrowers = (ConcurrentLinkedDeque)PENDING.get(this);
if (resources == null || borrowers == TERMINATED) {
return;
}
int borrowersCount = this.pendingSize;
int resourcesCount = this.idleSize;
if (borrowersCount != 0) {
Borrower borrower;
if (resourcesCount > 0) {
SimpleDequePool.QueuePooledRef<POOLABLE> slot = this.idleResourceLeastRecentlyUsed ? (SimpleDequePool.QueuePooledRef)resources.pollFirst() : (SimpleDequePool.QueuePooledRef)resources.pollLast();
if (slot == null) {
continue;
}
this.decrementIdle();
if (this.poolConfig.evictionPredicate().test(slot.poolable, slot)) {
if (slot.markDestroy()) {
this.destroyPoolable(slot).subscribe((Consumer)null, (error) -> {
this.drain();
}, this::drain);
}
continue;
}
borrower = this.pendingPoll(borrowers);
if (borrower == null) {
if (this.idleResourceLeastRecentlyUsed) {
resources.offerFirst(slot);
} else {
resources.offerLast(slot);
}
this.incrementIdle();
continue;
}
if (this.isDisposed()) {
slot.invalidate().subscribe();
borrower.fail(new PoolShutdownException());
return;
}
borrower.stopPendingCountdown();
ACQUIRED.incrementAndGet(this);
this.poolConfig.acquisitionScheduler().schedule(() -> {
borrower.deliver(slot);
});
} else {
int permits = this.poolConfig.allocationStrategy().getPermits(1);
if (permits <= 0) {
if (maxPending >= 0) {
borrowersCount = this.pendingSize;
int toCull = borrowersCount - maxPending;
for(int i = 0; i < toCull; ++i) {
Borrower<POOLABLE> extraneous = this.pendingPoll(borrowers);
if (extraneous != null) {
if (maxPending == 0) {
extraneous.fail(new PoolAcquirePendingLimitException(0, "No pending allowed and pool has reached allocation limit"));
} else {
extraneous.fail(new PoolAcquirePendingLimitException(maxPending));
}
}
}
}
} else {
borrower = this.pendingPoll(borrowers);
if (borrower == null) {
continue;
}
if (this.isDisposed()) {
borrower.fail(new PoolShutdownException());
return;
}
borrower.stopPendingCountdown();
long start = this.clock.millis();
Mono<POOLABLE> allocator = this.allocatorWithScheduler();
Mono<POOLABLE> primary = allocator.doOnEach((sig) -> {
if (sig.isOnNext()) {
POOLABLE newInstance = sig.get();
assert newInstance != null;
ACQUIRED.incrementAndGet(this);
this.metricsRecorder.recordAllocationSuccessAndLatency(this.clock.millis() - start);
borrower.deliver(this.createSlot(newInstance));
} else if (sig.isOnError()) {
Throwable error = sig.getThrowable();
assert error != null;
this.metricsRecorder.recordAllocationFailureAndLatency(this.clock.millis() - start);
this.poolConfig.allocationStrategy().returnPermits(1);
borrower.fail(error);
}
}).contextWrite(borrower.currentContext());
if (permits == 1) {
primary.subscribe((alreadyPropagated) -> {
}, (alreadyPropagatedOrLogged) -> {
this.drain();
}, this::drain);
} else {
int toWarmup = permits - 1;
this.logger.debug("should warm up {} extra resources", new Object[]{toWarmup});
long startWarmupIteration = this.clock.millis();
Flux<Void> warmupFlux = Flux.range(1, toWarmup).flatMap((ix) -> {
return this.warmupMono(ix, toWarmup, startWarmupIteration, allocator);
});
primary.onErrorResume((e) -> {
return Mono.empty();
}).thenMany(warmupFlux).subscribe((aVoid) -> {
}, (alreadyPropagatedOrLogged) -> {
this.drain();
}, this::drain);
}
}
}
}
if (WIP.decrementAndGet(this) == 0) {
this.recordInteractionTimestamp();
return;
}
}
}
So, to let long-lived connections expire, you can set the following two parameters; maxLifeTime is usually the one used to bound how long a connection lives.
maxIdleTime — how long a connection may sit idle before it becomes eligible to be closed (resolution: ms). Default: no max idle time.
maxLifeTime — the total lifetime after which a connection becomes eligible to be closed (resolution: ms). Default: no max lifetime.
The configuration can look roughly like this (a sketch; the provider name and durations are illustrative):
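import java.time.Duration;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

public class BoundedLifetimePool {
    public static void main(String[] args) {
        // Bound the lifetime of pooled connections so that, at the latest, a connection is
        // re-created (and DNS re-resolved) after maxLifeTime.
        ConnectionProvider provider = ConnectionProvider.builder("webclient-pool")
                .maxConnections(500)
                .maxIdleTime(Duration.ofSeconds(20))      // close connections idle for 20s
                .maxLifeTime(Duration.ofSeconds(60))      // close connections older than 60s
                .pendingAcquireTimeout(Duration.ofSeconds(45))
                .build();

        HttpClient httpClient = HttpClient.create(provider);
    }
}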
If the explanation above were the whole story, traffic should hardly ever move to the new pool — so why did traffic still drift over slowly? It is because the client was configured with a ReadTimeoutHandler and a WriteTimeoutHandler:
HttpClient httpClient = (custClient == null ? HttpClient.create() : custClient)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, bean.getConnectTimeout())
        .responseTimeout(Duration.ofMillis(bean.getResponseTimeout()))
        .doOnConnected(conn -> conn
                .addHandlerLast(new ReadTimeoutHandler(500L, TimeUnit.MILLISECONDS))
                .addHandlerLast(new WriteTimeoutHandler(500L, TimeUnit.MILLISECONDS)));
ReadTimeoutHandler closes the connection when a read timeout fires, and when the client reconnects it picks up the IP from a fresh DNS resolution.
public class ReadTimeoutHandler extends IdleStateHandler {
    private boolean closed;

    public ReadTimeoutHandler(int timeoutSeconds) {
        this((long) timeoutSeconds, TimeUnit.SECONDS);
    }

    public ReadTimeoutHandler(long timeout, TimeUnit unit) {
        super(timeout, 0L, 0L, unit);
    }

    protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        assert evt.state() == IdleState.READER_IDLE;
        this.readTimedOut(ctx);
    }

    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!this.closed) {
            ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
            ctx.close();
            this.closed = true;
        }
    }
}
WriteTimeoutHandler works the same way:
protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
    if (!this.closed) {
        ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
        ctx.close();
        this.closed = true;
    }
}