NettyClient DNS refresh

Author: 程序员札记 | Published: 2023-03-28 14:40

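A couple of days ago we did a traffic migration and found that, after the CNAMEs had been switched, the upstream pools kept calling the CNAME but their traffic never moved over; it stubbornly stayed on the old pool. We needed to find out why DNS was not being refreshed. All the clients whose traffic did not switch use WebFlux's WebClient, which is implemented internally on top of Netty via reactor.netty.http.client.HttpClient.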

idea.debugger.rt.GeneratedEvaluationClass.invoke(GeneratedEvaluationClass.java:5)
io.netty.resolver.dns.DnsResolveContext.query(DnsResolveContext.java:416)
io.netty.resolver.dns.DnsResolveContext.query(DnsResolveContext.java:1135)
io.netty.resolver.dns.DnsResolveContext.internalResolve(DnsResolveContext.java:358)
io.netty.resolver.dns.DnsResolveContext.doSearchDomainQuery(DnsResolveContext.java:284)
io.netty.resolver.dns.DnsAddressResolveContext.doSearchDomainQuery(DnsAddressResolveContext.java:96)
io.netty.resolver.dns.DnsResolveContext.resolve(DnsResolveContext.java:249)
io.netty.resolver.dns.DnsNameResolver.doResolveAllUncached0(DnsNameResolver.java:1150)
----- executor.execute
idea.debugger.rt.GeneratedEvaluationClass.invoke(GeneratedEvaluationClass.java:5)
io.netty.resolver.dns.DnsNameResolver.doResolveAllUncached(DnsNameResolver.java:1121)
io.netty.resolver.dns.DnsNameResolver.doResolveAll(DnsNameResolver.java:1072)
io.netty.resolver.dns.DnsNameResolver.doResolveAll(DnsNameResolver.java:1040)
io.netty.resolver.SimpleNameResolver.resolveAll(SimpleNameResolver.java:79)
io.netty.resolver.dns.InflightNameResolver.resolve(InflightNameResolver.java:96)
io.netty.resolver.dns.InflightNameResolver.resolveAll(InflightNameResolver.java:71)
io.netty.resolver.dns.InflightNameResolver.resolveAll(InflightNameResolver.java:56)
io.netty.resolver.InetSocketAddressResolver.doResolveAll(InetSocketAddressResolver.java:73)
io.netty.resolver.InetSocketAddressResolver.doResolveAll(InetSocketAddressResolver.java:31)
io.netty.resolver.AbstractAddressResolver.resolveAll(AbstractAddressResolver.java:158)
reactor.netty.transport.TransportConnector.doResolveAndConnect(TransportConnector.java:346)
connect:163, TransportConnector (reactor.netty.transport)
connect:123, TransportConnector (reactor.netty.transport)
lambda$connectChannel$0:519, DefaultPooledConnectionProvider$PooledConnectionAllocator (reactor.netty.resources)
createPool:95, DefaultPooledConnectionProvider (reactor.netty.resources)
lambda$acquire$0:133, PooledConnectionProvider (reactor.netty.resources)
apply:-1, 283293993 (reactor.netty.resources.PooledConnectionProvider$$Lambda$1791)
computeIfAbsent:1705, ConcurrentHashMap (java.util.concurrent)
computeIfAbsent:46, MapUtils (reactor.netty.internal.util)
lambda$acquire$2:128, PooledConnectionProvider (reactor.netty.resources)
accept:-1, 762174048 (reactor.netty.resources.PooledConnectionProvider$$Lambda$1790)
subscribe:58, MonoCreate (reactor.core.publisher)
lambda$subscribe$0:269, HttpClientConnect$MonoHttpConnect (reactor.netty.http.client)

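The call stack looks roughly like this. Because this is reactive programming, a single thread's stack cannot capture the whole path. The stack shows that name resolution goes through the DNS cache, so the next question is: when is that cache populated?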

cache:137, DefaultDnsCache (io.netty.resolver.dns)
cache:83, DnsAddressResolveContext (io.netty.resolver.dns)
cache:30, DnsAddressResolveContext (io.netty.resolver.dns)
onExpectedResponse:880, DnsResolveContext (io.netty.resolver.dns)
onResponse:616, DnsResolveContext (io.netty.resolver.dns)
access$400:66, DnsResolveContext (io.netty.resolver.dns)
operationComplete:478, DnsResolveContext$2 (io.netty.resolver.dns)

When a query succeeds, the result is cached, and the TTL is set at the same time. See DefaultDnsCache:

    public DnsCacheEntry cache(String hostname, DnsRecord[] additionals, InetAddress address, long originalTtl, EventLoop loop) {
        ObjectUtil.checkNotNull(hostname, "hostname");
        ObjectUtil.checkNotNull(address, "address");
        ObjectUtil.checkNotNull(loop, "loop");
        DefaultDnsCache.DefaultDnsCacheEntry e = new DefaultDnsCache.DefaultDnsCacheEntry(hostname, address);
        if (this.maxTtl != 0 && emptyAdditionals(additionals)) {
            this.resolveCache.cache(appendDot(hostname), e, Math.max(this.minTtl, (int)Math.min((long)this.maxTtl, originalTtl)), loop);
            return e;
        } else {
            return e;
        }
    }

    public DnsCacheEntry cache(String hostname, DnsRecord[] additionals, Throwable cause, EventLoop loop) {
        ObjectUtil.checkNotNull(hostname, "hostname");
        ObjectUtil.checkNotNull(cause, "cause");
        ObjectUtil.checkNotNull(loop, "loop");
        DefaultDnsCache.DefaultDnsCacheEntry e = new DefaultDnsCache.DefaultDnsCacheEntry(hostname, cause);
        if (this.negativeTtl != 0 && emptyAdditionals(additionals)) {
            this.resolveCache.cache(appendDot(hostname), e, this.negativeTtl, loop);
            return e;
        } else {
            return e;
        }
    }

Judging from the cache code, the TTL comes from the A record returned by the DNS server, clamped to the range [minTtl, maxTtl].
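To make the clamping concrete, here is a minimal sketch that reuses the same expression as the `cache(...)` method above. The class name `TtlClampSketch` and the sample values are assumptions for illustration only; they are not Netty code or Netty defaults.

```java
// Hypothetical helper that mirrors the TTL expression used in DefaultDnsCache.cache(...).
class TtlClampSketch {

    // Returns the TTL (seconds) that would be used for the cache entry.
    static int effectiveTtl(int minTtl, int maxTtl, long originalTtl) {
        return Math.max(minTtl, (int) Math.min((long) maxTtl, originalTtl));
    }

    public static void main(String[] args) {
        // Record TTL inside the allowed range: used as-is.
        System.out.println(effectiveTtl(0, Integer.MAX_VALUE, 300L)); // 300
        // Record TTL above maxTtl: capped.
        System.out.println(effectiveTtl(0, 30, 300L));                // 30
        // Record TTL below minTtl: raised.
        System.out.println(effectiveTtl(60, Integer.MAX_VALUE, 5L));  // 60
    }
}
```

So a resolver configured with a small maxTtl re-queries more often than the record's own TTL would require, and a large minTtl keeps entries longer than the DNS server asked for.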

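II. Address resolver landscape

1. AddressResolverGroup class diagram

Two AddressResolverGroup implementations matter here. DnsAddressResolverGroup takes the TTL from the DNS server's response and does not read it from the JVM. DefaultAddressResolverGroup resolves through the JVM, so the JVM's TTL applies.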

image.png

2. AddressResolver class diagram

image.png

3. NameResolver class diagram

image.png

4. Relationship diagram

image.png

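III. The address resolution process

Address resolution is implemented by the methods below. Let's look at the logic of isSupported, isResolved, and doResolve in turn.


image.png
  • isSupported checks whether the given socket address is an InetSocketAddress, using the JDK's Class.isInstance.
  • isResolved first applies the isSupported and non-null checks; doIsResolved then reports an InetSocketAddress as resolved when it is not marked unresolved.
  • doResolve resolves the host name into an InetSocketAddress, using InetAddress.getByName(hostname).

Summary: address resolution mainly ensures that the SocketAddress is valid and usable; if it holds a host name, the default is to convert it to an InetAddress via InetAddress.getByName.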
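The three steps can be sketched with plain JDK classes. This is only an approximation of the flow described above: the class `ResolveStepsSketch` and its method bodies are assumptions written for illustration, not Netty's actual source.

```java
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;

// Hypothetical, JDK-only model of isSupported / isResolved / doResolve.
class ResolveStepsSketch {

    // Step 1: only InetSocketAddress is supported (null is rejected too).
    static boolean isSupported(SocketAddress address) {
        return InetSocketAddress.class.isInstance(address);
    }

    // Step 2: a supported address counts as resolved once it carries an InetAddress.
    static boolean isResolved(SocketAddress address) {
        if (!isSupported(address)) {
            throw new IllegalArgumentException("unsupported address type: " + address);
        }
        return !((InetSocketAddress) address).isUnresolved();
    }

    // Step 3: turn the host name into an InetAddress through the JVM resolver.
    static InetSocketAddress resolve(InetSocketAddress address) {
        if (!address.isUnresolved()) {
            return address;
        }
        try {
            InetAddress ip = InetAddress.getByName(address.getHostString());
            return new InetSocketAddress(ip, address.getPort());
        } catch (UnknownHostException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // An IP literal keeps the example independent of any DNS server.
        InetSocketAddress unresolved = InetSocketAddress.createUnresolved("127.0.0.1", 8080);
        System.out.println(isResolved(unresolved));          // false
        System.out.println(isResolved(resolve(unresolved))); // true
    }
}
```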

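IV. Cache expiry
First, a look at the basic connection creation flow.


image.png

For DnsAddressResolverGroup, cache expiry is handled by io.netty.resolver.dns.Cache. However, the timing of this expiry is not deterministic; there is no guarantee that an entry is refreshed as soon as its TTL elapses.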

image.png
    private final class Entries extends AtomicReference<List<E>> implements Runnable {
        private final String hostname;
        volatile ScheduledFuture<?> expirationFuture;

        Entries(String hostname) {
            super(Collections.emptyList());
            this.hostname = hostname;
        }

        void add(E e, int ttl, EventLoop loop) {
            if (Cache.this.shouldReplaceAll(e)) {
                this.set(Collections.singletonList(e));
                this.scheduleCacheExpirationIfNeeded(ttl, loop);
            } else {
                List entries;
                label59:
                do {
                    while(true) {
                        while(true) {
                            entries = (List)this.get();
                            if (!entries.isEmpty()) {
                                E firstEntry = entries.get(0);
                                if (Cache.this.shouldReplaceAll(firstEntry)) {
                                    assert entries.size() == 1;
                                    continue label59;
                                }

                                List<E> newEntries = new ArrayList(entries.size() + 1);
                                int i = 0;
                                Object replacedEntry = null;

                                label54:
                                do {
                                    E entry = entries.get(i);
                                    if (Cache.this.equals(e, entry)) {
                                        replacedEntry = entry;
                                        newEntries.add(e);
                                        ++i;

                                        while(true) {
                                            if (i >= entries.size()) {
                                                break label54;
                                            }

                                            newEntries.add(entries.get(i));
                                            ++i;
                                        }
                                    }

                                    newEntries.add(entry);
                                    ++i;
                                } while(i < entries.size());

                                if (replacedEntry == null) {
                                    newEntries.add(e);
                                }

                                Cache.this.sortEntries(this.hostname, newEntries);
                                if (this.compareAndSet(entries, Collections.unmodifiableList(newEntries))) {
                                    this.scheduleCacheExpirationIfNeeded(ttl, loop);
                                    return;
                                }
                            } else if (this.compareAndSet(entries, Collections.singletonList(e))) {
                                this.scheduleCacheExpirationIfNeeded(ttl, loop);
                                return;
                            }
                        }
                    }
                } while(!this.compareAndSet(entries, Collections.singletonList(e)));

                this.scheduleCacheExpirationIfNeeded(ttl, loop);
            }
        }

        private void scheduleCacheExpirationIfNeeded(int ttl, EventLoop loop) {
            while(true) {
                ScheduledFuture<?> oldFuture = (ScheduledFuture)Cache.FUTURE_UPDATER.get(this);
                if (oldFuture != null && oldFuture.getDelay(TimeUnit.SECONDS) <= (long)ttl) {
                    break;
                }

                ScheduledFuture<?> newFuture = loop.schedule(this, (long)ttl, TimeUnit.SECONDS);
                if (Cache.FUTURE_UPDATER.compareAndSet(this, oldFuture, newFuture)) {
                    if (oldFuture != null) {
                        oldFuture.cancel(true);
                    }
                    break;
                }

                newFuture.cancel(true);
            }

        }
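The scheduling rule in scheduleCacheExpirationIfNeeded is easier to see in isolation. The sketch below is a simplified, JDK-only model: `ExpiryScheduleSketch`, its executor, and `remainingSeconds()` are assumptions for illustration, and it leaves out what happens when the expiry task actually fires. It keeps only the core rule: an existing expiry task is kept when it fires no later than the new TTL, otherwise it is replaced by an earlier one.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of "keep the earliest expiry" for one host name's entries.
class ExpiryScheduleSketch {
    private final ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
    private final AtomicReference<ScheduledFuture<?>> expiration = new AtomicReference<>();

    void scheduleExpirationIfNeeded(int ttlSeconds, Runnable clearEntries) {
        while (true) {
            ScheduledFuture<?> old = expiration.get();
            // The pending task already fires soon enough: nothing to do.
            if (old != null && old.getDelay(TimeUnit.SECONDS) <= ttlSeconds) {
                return;
            }
            ScheduledFuture<?> created = loop.schedule(clearEntries, ttlSeconds, TimeUnit.SECONDS);
            if (expiration.compareAndSet(old, created)) {
                if (old != null) {
                    old.cancel(true); // replaced by the earlier expiry
                }
                return;
            }
            created.cancel(true); // lost a race with another thread; retry
        }
    }

    // Whole seconds until the pending expiry, or -1 when none is scheduled.
    long remainingSeconds() {
        ScheduledFuture<?> f = expiration.get();
        return f == null ? -1 : f.getDelay(TimeUnit.SECONDS);
    }

    void shutdown() {
        loop.shutdownNow();
    }

    public static void main(String[] args) {
        ExpiryScheduleSketch sketch = new ExpiryScheduleSketch();
        Runnable clear = () -> System.out.println("entries cleared");
        sketch.scheduleExpirationIfNeeded(100, clear);
        sketch.scheduleExpirationIfNeeded(10, clear);  // earlier: replaces the 100 s task
        sketch.scheduleExpirationIfNeeded(50, clear);  // later: ignored
        System.out.println(sketch.remainingSeconds() <= 10); // true
        sketch.shutdown();
    }
}
```

One consequence of this rule is that all entries for a host name are cleared by a single task at the shortest TTL seen so far, and that task runs on an event loop that may be busy, which is one plausible reading of why the expiry time is not exact.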

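For DefaultAddressResolverGroup, cache expiry is handled by the JVM; see InetAddress.getAllByName0. This method shows that JVM-based TTL expiry is very precise: expired entries are removed on every lookup before the cache is consulted.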

    private static InetAddress[] getAllByName0(String host,
                                               InetAddress reqAddr,
                                               boolean check,
                                               boolean useCache)
        throws UnknownHostException  {

        /* If it gets here it is presumed to be a hostname */

        /* make sure the connection to the host is allowed, before we
         * give out a hostname
         */
        if (check) {
            SecurityManager security = System.getSecurityManager();
            if (security != null) {
                security.checkConnect(host, -1);
            }
        }

        // remove expired addresses from cache - expirySet keeps them ordered
        // by expiry time so we only need to iterate the prefix of the NavigableSet...
        long now = System.nanoTime();
        for (CachedAddresses caddrs : expirySet) {
            // compare difference of time instants rather than
            // time instants directly, to avoid possible overflow.
            // (see System.nanoTime() recommendations...)
            if ((caddrs.expiryTime - now) < 0L) {
                // ConcurrentSkipListSet uses weakly consistent iterator,
                // so removing while iterating is OK...
                if (expirySet.remove(caddrs)) {
                    // ... remove from cache
                    cache.remove(caddrs.host, caddrs);
                }
            } else {
                // we encountered 1st element that expires in future
                break;
            }
        }

        // look-up or remove from cache
        Addresses addrs;
        if (useCache) {
            addrs = cache.get(host);
        } else {
            addrs = cache.remove(host);
            if (addrs != null) {
                if (addrs instanceof CachedAddresses) {
                    // try removing from expirySet too if CachedAddresses
                    expirySet.remove(addrs);
                }
                addrs = null;
            }
        }

        if (addrs == null) {
            // create a NameServiceAddresses instance which will look up
            // the name service and install it within cache...
            Addresses oldAddrs = cache.putIfAbsent(
                host,
                addrs = new NameServiceAddresses(host, reqAddr)
            );
            if (oldAddrs != null) { // lost putIfAbsent race
                addrs = oldAddrs;
            }
        }

        // ask Addresses to get an array of InetAddress(es) and clone it
        return addrs.get().clone();
    }
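Since this path depends on the JVM's own cache, its TTL is controlled through the `networkaddress.cache.ttl` and `networkaddress.cache.negative.ttl` security properties. The sketch below shows one way to set them programmatically; the class name `JvmDnsTtlSketch` and the values 30 and 5 are illustrative assumptions. The JDK reads these properties once, so they need to be set before the first name lookup in the process.

```java
import java.security.Security;

// Hypothetical startup helper; call it before any InetAddress lookup happens.
class JvmDnsTtlSketch {

    static void configure(int positiveTtlSeconds, int negativeTtlSeconds) {
        // TTL for successful lookups, in seconds.
        Security.setProperty("networkaddress.cache.ttl", Integer.toString(positiveTtlSeconds));
        // TTL for failed lookups, in seconds.
        Security.setProperty("networkaddress.cache.negative.ttl", Integer.toString(negativeTtlSeconds));
    }

    public static void main(String[] args) {
        configure(30, 5);
        System.out.println(Security.getProperty("networkaddress.cache.ttl"));          // 30
        System.out.println(Security.getProperty("networkaddress.cache.negative.ttl")); // 5
    }
}
```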

Why DNS is not refreshed

@Override
    public final Mono<? extends Connection> acquire(
            TransportConfig config,
            ConnectionObserver connectionObserver,
            @Nullable Supplier<? extends SocketAddress> remote,
            @Nullable AddressResolverGroup<?> resolverGroup) {
        Objects.requireNonNull(config, "config");
        Objects.requireNonNull(connectionObserver, "connectionObserver");
        Objects.requireNonNull(remote, "remoteAddress");
        Objects.requireNonNull(resolverGroup, "resolverGroup");
        return Mono.create(sink -> {
            SocketAddress remoteAddress = Objects.requireNonNull(remote.get(), "Remote Address supplier returned null");
            PoolKey holder = new PoolKey(remoteAddress, config.channelHash());
            PoolFactory<T> poolFactory = poolFactory(remoteAddress);
            InstrumentedPool<T> pool = MapUtils.computeIfAbsent(channelPools, holder, poolKey -> {
                if (log.isDebugEnabled()) {
                    log.debug("Creating a new [{}] client pool [{}] for [{}]", name, poolFactory, remoteAddress);
                }

                InstrumentedPool<T> newPool = createPool(config, poolFactory, remoteAddress, resolverGroup);

                if (poolFactory.metricsEnabled || config.metricsRecorder() != null) {
                    // registrar is null when metrics are enabled on HttpClient level or
                    // with the `metrics(boolean metricsEnabled)` method on ConnectionProvider
                    String id = poolKey.hashCode() + "";
                    if (poolFactory.registrar != null) {
                        poolFactory.registrar.get().registerMetrics(name, id, remoteAddress,
                                new DelegatingConnectionPoolMetrics(newPool.metrics()));
                    }
                    else if (Metrics.isInstrumentationAvailable()) {
                        // work directly with the pool otherwise a weak reference is needed to ConnectionPoolMetrics
                        // we don't want to keep another map with weak references
                        registerDefaultMetrics(id, remoteAddress, newPool.metrics());
                    }
                }
                return newPool;
            });

            EventLoop eventLoop;
            if (sink.contextView().hasKey(CONTEXT_CALLER_EVENTLOOP)) {
                eventLoop = sink.contextView().get(CONTEXT_CALLER_EVENTLOOP);
            }
            else {
                EventLoopGroup group = config.loopResources().onClient(config.isPreferNative());
                if (group instanceof ColocatedEventLoopGroup) {
                    eventLoop = ((ColocatedEventLoopGroup) group).nextInternal();
                }
                else {
                    eventLoop = null;
                }
            }

            Mono<PooledRef<T>> mono = pool.acquire(Duration.ofMillis(poolFactory.pendingAcquireTimeout));
            if (eventLoop != null) {
                mono = mono.contextWrite(ctx -> ctx.put(CONTEXT_CALLER_EVENTLOOP, eventLoop));
            }
            mono.subscribe(createDisposableAcquire(config, connectionObserver,
                    poolFactory.pendingAcquireTimeout, pool, sink));
        });
    }

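The key reason is MapUtils.computeIfAbsent(channelPools, holder, poolKey -> { ... }): once the pool for a remote address has been created, later requests are served from it and do not go back through name resolution as long as a pooled connection can be reused. Put differently, once a long-lived connection is established, the TTL no longer matters.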

image.png
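The effect can be reproduced with a deliberately small model. In the sketch below, `StickyConnectionSketch`, the fake DNS map, and the IP addresses are all assumptions made for illustration, and the pool and its single connection are collapsed into one map entry. It shows that a DNS change is only noticed after the cached connection is closed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical model: one kept-alive connection per host, resolved only when it is opened.
class StickyConnectionSketch {
    final AtomicInteger resolveCalls = new AtomicInteger();
    private final Map<String, String> connectedIpByHost = new ConcurrentHashMap<>();
    private final Function<String, String> resolver;

    StickyConnectionSketch(Function<String, String> resolver) {
        this.resolver = resolver;
    }

    // Returns the IP the (reused) connection for this host is attached to.
    String acquire(String host) {
        return connectedIpByHost.computeIfAbsent(host, h -> {
            resolveCalls.incrementAndGet(); // resolution happens only for a new connection
            return resolver.apply(h);
        });
    }

    // Simulates the connection being closed (timeout, max life time, ...).
    void close(String host) {
        connectedIpByHost.remove(host);
    }

    public static void main(String[] args) {
        Map<String, String> dns = new ConcurrentHashMap<>();
        dns.put("svc.example", "10.0.0.1");
        StickyConnectionSketch client = new StickyConnectionSketch(dns::get);

        System.out.println(client.acquire("svc.example")); // 10.0.0.1
        dns.put("svc.example", "10.0.0.2");                // the CNAME is migrated
        System.out.println(client.acquire("svc.example")); // 10.0.0.1 (still the old target)
        client.close("svc.example");
        System.out.println(client.acquire("svc.example")); // 10.0.0.2
    }
}
```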

So the question becomes: when does a channel time out?
PooledConnectionProvider schedules a task that checks pool activity:


    PooledConnectionProvider(Builder builder, @Nullable Clock clock) {
        this.builder = builder;
        this.name = builder.name;
        this.inactivePoolDisposeInterval = builder.inactivePoolDisposeInterval;
        this.poolInactivity = builder.poolInactivity;
        this.disposeTimeout = builder.disposeTimeout;
        this.defaultPoolFactory = new PoolFactory<>(builder, builder.disposeTimeout, clock);
        for (Map.Entry<SocketAddress, ConnectionPoolSpec<?>> entry : builder.confPerRemoteHost.entrySet()) {
            poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout));
            maxConnections.put(entry.getKey(), entry.getValue().maxConnections);
        }
        this.onDispose = Mono.empty();
        scheduleInactivePoolsDisposal();
    }

However, this task only runs when !inactivePoolDisposeInterval.isZero(). It disposes of entire connection pools.



    final void scheduleInactivePoolsDisposal() {
        if (!inactivePoolDisposeInterval.isZero()) {
            Schedulers.parallel()
                      .schedule(this::disposeInactivePoolsInBackground, inactivePoolDisposeInterval.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    final void disposeInactivePoolsInBackground() {
        if (!channelPools.isEmpty()) {
            List<Map.Entry<PoolKey, InstrumentedPool<T>>> toDispose;

            toDispose = channelPools.entrySet()
                                    .stream()
                                    .filter(p -> p.getValue().metrics().isInactiveForMoreThan(poolInactivity))
                                    .collect(Collectors.toList());

            toDispose.forEach(e -> {
                if (channelPools.remove(e.getKey(), e.getValue())) {
                    if (log.isDebugEnabled()) {
                        log.debug("ConnectionProvider[name={}]: Disposing inactive pool for [{}]", name, e.getKey().fqdn);
                    }
                    e.getValue().dispose();
                }
            });
        }
        scheduleInactivePoolsDisposal();
    }


SimpleDequePool has a similar background task, but it too is only activated when !this.poolConfig.evictInBackgroundInterval().isZero(). This one removes individual connections.

  void scheduleEviction() {
        if (!this.poolConfig.evictInBackgroundInterval().isZero()) {
            long nanosEvictionInterval = this.poolConfig.evictInBackgroundInterval().toNanos();
            this.evictionTask = this.poolConfig.evictInBackgroundScheduler().schedule(this::evictInBackground, nanosEvictionInterval, TimeUnit.NANOSECONDS);
        } else {
            this.evictionTask = Disposables.disposed();
        }

    }

    void evictInBackground() {
        Queue<SimpleDequePool.QueuePooledRef<POOLABLE>> e = (Queue)IDLE_RESOURCES.get(this);
        if (e != null) {
            if (WIP.getAndIncrement(this) == 0) {
                if (this.pendingSize == 0) {
                    BiPredicate<POOLABLE, PooledRefMetadata> evictionPredicate = this.poolConfig.evictionPredicate();
                    Iterator iterator = e.iterator();

                    while(iterator.hasNext()) {
                        SimpleDequePool.QueuePooledRef<POOLABLE> pooledRef = (SimpleDequePool.QueuePooledRef)iterator.next();
                        if (evictionPredicate.test(pooledRef.poolable, pooledRef) && pooledRef.markDestroy()) {
                            this.recordInteractionTimestamp();
                            iterator.remove();
                            this.decrementIdle();
                            this.destroyPoolable(pooledRef).subscribe((v) -> {
                            }, (destroyError) -> {
                                this.logger.warn("Error while destroying resource in background eviction:", destroyError);
                            });
                        }
                    }
                }

                if (WIP.decrementAndGet(this) > 0) {
                    this.drainLoop();
                }
            }

            this.scheduleEviction();
        }
    }


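By default neither of these two tasks is activated, so where do connections actually get removed? The starting point is SimpleDequePool.doAcquire, which calls drain(); inside drainLoop, each idle connection is tested against evictionPredicate before it is handed to a borrower.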

 void doAcquire(Borrower<POOLABLE> borrower) {
        if (this.isDisposed()) {
            borrower.fail(new PoolShutdownException());
        } else {
            this.pendingOffer(borrower);
            this.drain();
        }
    }

    void drain() {
        if (WIP.getAndIncrement(this) == 0) {
            this.drainLoop();
        }

    }

    private void drainLoop() {
        this.recordInteractionTimestamp();
        int maxPending = this.poolConfig.maxPending();

        while(true) {
            Deque<SimpleDequePool.QueuePooledRef<POOLABLE>> resources = (Deque)IDLE_RESOURCES.get(this);
            ConcurrentLinkedDeque<Borrower<POOLABLE>> borrowers = (ConcurrentLinkedDeque)PENDING.get(this);
            if (resources == null || borrowers == TERMINATED) {
                return;
            }

            int borrowersCount = this.pendingSize;
            int resourcesCount = this.idleSize;
            if (borrowersCount != 0) {
                Borrower borrower;
                if (resourcesCount > 0) {
                    SimpleDequePool.QueuePooledRef<POOLABLE> slot = this.idleResourceLeastRecentlyUsed ? (SimpleDequePool.QueuePooledRef)resources.pollFirst() : (SimpleDequePool.QueuePooledRef)resources.pollLast();
                    if (slot == null) {
                        continue;
                    }

                    this.decrementIdle();
                    if (this.poolConfig.evictionPredicate().test(slot.poolable, slot)) {
                        if (slot.markDestroy()) {
                            this.destroyPoolable(slot).subscribe((Consumer)null, (error) -> {
                                this.drain();
                            }, this::drain);
                        }
                        continue;
                    }

                    borrower = this.pendingPoll(borrowers);
                    if (borrower == null) {
                        if (this.idleResourceLeastRecentlyUsed) {
                            resources.offerFirst(slot);
                        } else {
                            resources.offerLast(slot);
                        }

                        this.incrementIdle();
                        continue;
                    }

                    if (this.isDisposed()) {
                        slot.invalidate().subscribe();
                        borrower.fail(new PoolShutdownException());
                        return;
                    }

                    borrower.stopPendingCountdown();
                    ACQUIRED.incrementAndGet(this);
                    this.poolConfig.acquisitionScheduler().schedule(() -> {
                        borrower.deliver(slot);
                    });
                } else {
                    int permits = this.poolConfig.allocationStrategy().getPermits(1);
                    if (permits <= 0) {
                        if (maxPending >= 0) {
                            borrowersCount = this.pendingSize;
                            int toCull = borrowersCount - maxPending;

                            for(int i = 0; i < toCull; ++i) {
                                Borrower<POOLABLE> extraneous = this.pendingPoll(borrowers);
                                if (extraneous != null) {
                                    if (maxPending == 0) {
                                        extraneous.fail(new PoolAcquirePendingLimitException(0, "No pending allowed and pool has reached allocation limit"));
                                    } else {
                                        extraneous.fail(new PoolAcquirePendingLimitException(maxPending));
                                    }
                                }
                            }
                        }
                    } else {
                        borrower = this.pendingPoll(borrowers);
                        if (borrower == null) {
                            continue;
                        }

                        if (this.isDisposed()) {
                            borrower.fail(new PoolShutdownException());
                            return;
                        }

                        borrower.stopPendingCountdown();
                        long start = this.clock.millis();
                        Mono<POOLABLE> allocator = this.allocatorWithScheduler();
                        Mono<POOLABLE> primary = allocator.doOnEach((sig) -> {
                            if (sig.isOnNext()) {
                                POOLABLE newInstance = sig.get();

                                assert newInstance != null;

                                ACQUIRED.incrementAndGet(this);
                                this.metricsRecorder.recordAllocationSuccessAndLatency(this.clock.millis() - start);
                                borrower.deliver(this.createSlot(newInstance));
                            } else if (sig.isOnError()) {
                                Throwable error = sig.getThrowable();

                                assert error != null;

                                this.metricsRecorder.recordAllocationFailureAndLatency(this.clock.millis() - start);
                                this.poolConfig.allocationStrategy().returnPermits(1);
                                borrower.fail(error);
                            }

                        }).contextWrite(borrower.currentContext());
                        if (permits == 1) {
                            primary.subscribe((alreadyPropagated) -> {
                            }, (alreadyPropagatedOrLogged) -> {
                                this.drain();
                            }, this::drain);
                        } else {
                            int toWarmup = permits - 1;
                            this.logger.debug("should warm up {} extra resources", new Object[]{toWarmup});
                            long startWarmupIteration = this.clock.millis();
                            Flux<Void> warmupFlux = Flux.range(1, toWarmup).flatMap((ix) -> {
                                return this.warmupMono(ix, toWarmup, startWarmupIteration, allocator);
                            });
                            primary.onErrorResume((e) -> {
                                return Mono.empty();
                            }).thenMany(warmupFlux).subscribe((aVoid) -> {
                            }, (alreadyPropagatedOrLogged) -> {
                                this.drain();
                            }, this::drain);
                        }
                    }
                }
            }

            if (WIP.decrementAndGet(this) == 0) {
                this.recordInteractionTimestamp();
                return;
            }
        }
    }
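Stripped of the concurrency machinery, the eviction check in drainLoop amounts to "test the connection when someone asks for it". The sketch below is a simplified model with hypothetical names (`EvictOnAcquireSketch`, `Conn`), a max-life-time rule standing in for evictionPredicate, and a fake clock; it is not reactor-pool's implementation.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

// Hypothetical model of lazy eviction: stale connections are dropped only during acquire().
class EvictOnAcquireSketch {

    static final class Conn {
        final String ip;
        final long createdAtMillis;
        boolean closed;

        Conn(String ip, long createdAtMillis) {
            this.ip = ip;
            this.createdAtMillis = createdAtMillis;
        }
    }

    private final Deque<Conn> idle = new ArrayDeque<>();
    private final long maxLifeMillis;
    private final LongSupplier clock;

    EvictOnAcquireSketch(Duration maxLifeTime, LongSupplier clock) {
        this.maxLifeMillis = maxLifeTime.toMillis();
        this.clock = clock;
    }

    void release(Conn conn) {
        idle.offerLast(conn);
    }

    // Returns a usable idle connection, or null when the caller has to open a new one
    // (which is the moment name resolution would run again).
    Conn acquire() {
        Conn conn;
        while ((conn = idle.pollFirst()) != null) {
            boolean expired = clock.getAsLong() - conn.createdAtMillis >= maxLifeMillis;
            if (expired) {
                conn.closed = true; // destroyed instead of being handed out
                continue;
            }
            return conn;
        }
        return null;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        EvictOnAcquireSketch pool = new EvictOnAcquireSketch(Duration.ofSeconds(60), () -> now[0]);
        Conn conn = new Conn("10.0.0.1", 0L);
        pool.release(conn);
        now[0] = 61_000L;                            // the connection outlives its max life time
        System.out.println(pool.acquire() == null);  // true
        System.out.println(conn.closed);             // true
    }
}
```

Without any eviction rule configured, the `expired` check never fires and the same connection keeps being reused.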



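Therefore, to make long-lived connections expire, you can set the two parameters below. Usually maxLifeTime is the one used to bound how long a connection lives.
maxIdleTime - the Duration after which the channel is eligible to be closed when idle (resolution: ms). Default: max idle time is not specified.

maxLifeTime - the total lifetime after which the channel is eligible to be closed (resolution: ms). Default: max life time is not specified.

The code can look like this: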

image.png
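A configuration along these lines would be one way to do it with Reactor Netty's ConnectionProvider builder. This is a hedged fragment, not the author's original code: the pool name, the durations, and the class name `ExpiringPoolConfigSketch` are assumptions, and it needs the reactor-netty dependency on the classpath.

```java
import java.time.Duration;

import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

// Hypothetical configuration helper; all values are illustrative.
class ExpiringPoolConfigSketch {

    static HttpClient expiringHttpClient() {
        ConnectionProvider provider = ConnectionProvider.builder("expiring-pool")
                // Close connections that have been idle for this long.
                .maxIdleTime(Duration.ofSeconds(30))
                // Close connections after this total lifetime, even if they are busy and healthy.
                .maxLifeTime(Duration.ofMinutes(5))
                // Optional: also evict in the background instead of only on acquire.
                .evictInBackground(Duration.ofSeconds(60))
                .build();
        return HttpClient.create(provider);
    }
}
```

With a bounded lifetime, every connection is eventually replaced, and each replacement goes through name resolution again, so a CNAME change is picked up within roughly maxLifeTime plus the DNS cache TTL.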

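Given the explanation above, traffic should never have reached the new pool, so why did it gradually move over anyway? The reason is that the client is configured with ReadTimeoutHandler and WriteTimeoutHandler: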

               HttpClient httpClient = (custClient == null ? HttpClient.create() : custClient)
                       .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, bean.getConnectTimeout())
                       .responseTimeout(Duration.ofMillis(bean.getResponseTimeout()))
                       // Both handlers close the channel when their timeout fires.
                       .doOnConnected(conn -> conn
                               .addHandlerLast(new ReadTimeoutHandler(500L, TimeUnit.MILLISECONDS))
                               .addHandlerLast(new WriteTimeoutHandler(500L, TimeUnit.MILLISECONDS)));

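On a read timeout, ReadTimeoutHandler closes the connection; when the client reconnects, it resolves the name again and gets the IP the DNS record now points to.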

public class ReadTimeoutHandler extends IdleStateHandler {
    private boolean closed;

    public ReadTimeoutHandler(int timeoutSeconds) {
        this((long)timeoutSeconds, TimeUnit.SECONDS);
    }

    public ReadTimeoutHandler(long timeout, TimeUnit unit) {
        super(timeout, 0L, 0L, unit);
    }

    protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        assert evt.state() == IdleState.READER_IDLE;

        this.readTimedOut(ctx);
    }

    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!this.closed) {
            ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
            ctx.close();
            this.closed = true;
        }

    }
}

WriteTimeoutHandler works the same way:


    protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!this.closed) {
            ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
            ctx.close();
            this.closed = true;
        }

    }

