美文网首页
flink elasticsearch sink连接报错:Fai

flink elasticsearch sink连接报错:Fai

作者: 不将就51y | 来源:发表于2021-08-03 11:19 被阅读0次

    异常堆栈

    java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:197)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
        at org.apache.http.impl.nio.reactor.SessionInputBufferImpl.fill(SessionInputBufferImpl.java:231)
        at org.apache.http.impl.nio.codecs.AbstractMessageParser.fillBuffer(AbstractMessageParser.java:136)
        at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:241)
        at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
        at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
        at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
        at java.lang.Thread.run(Thread.java:748)
        Suppressed: java.io.IOException: Connection reset by peer
            ... 18 more
            Suppressed: java.io.IOException: Connection reset by peer
                ... 18 more
    

    <meta charset="utf-8">

    1.长连接请求

    以浏览器发起请求为例,在Request Headers中会包含Connection: keep-alive信息

    image

    当tomcat收到浏览器端的长连接请求后,如在限制范围内,则会在Response Headers中返回Connection: keep-alive以保持连接不断开

    image

    如要断开长连接则在Request Headers中的信息为Connection: close

    2、了解了长连接原理后查看源码ElasticsearchSinkBase.java的创建restclient的open

        @Override
        public void open(Configuration parameters) throws Exception {
            client = callBridge.createClient(userConfig);
            bulkProcessor = buildBulkProcessor(new BulkProcessorListener());
            requestIndexer = callBridge.createBulkProcessorIndexer(bulkProcessor, flushOnCheckpoint, numPendingRequests);
            failureRequestIndexer = new BufferingNoOpRequestIndexer();
        }
    
        @Override
        public RestHighLevelClient createClient(Map<String, String> clientConfig) throws IOException {
            RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
            restClientFactory.configureRestClientBuilder(builder);
    
            RestHighLevelClient rhlClient = new RestHighLevelClient(builder);
    
            if (LOG.isInfoEnabled()) {
                LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts);
            }
    
            if (!rhlClient.ping(RequestOptions.DEFAULT)) {
                throw new RuntimeException("There are no reachable Elasticsearch nodes!");
            }
    
            if (LOG.isInfoEnabled()) {
                LOG.info("Created Elasticsearch RestHighLevelClient connected to {}", httpHosts.toString());
            }
    
            return rhlClient;
        }
    

    可以看到这里创建了一个RestHighLevelClient,继续查看构造函数

     /**
         * Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
         * {@link RestClient} to be used to perform requests.
         */
        public RestHighLevelClient(RestClientBuilder restClientBuilder) {
            this(restClientBuilder, Collections.emptyList());
        }
    
        /**
         * Creates a {@link RestHighLevelClient} given the low level {@link RestClientBuilder} that allows to build the
         * {@link RestClient} to be used to perform requests and parsers for custom response sections added to Elasticsearch through plugins.
         */
        protected RestHighLevelClient(RestClientBuilder restClientBuilder, List<NamedXContentRegistry.Entry> namedXContentEntries) {
            this(restClientBuilder.build(), RestClient::close, namedXContentEntries);
        }
    
        //进入restClientBuilder.build()
    
       /**
         * Creates a new {@link RestClient} based on the provided configuration.
         */
        public RestClient build() {
            if (failureListener == null) {
                failureListener = new RestClient.FailureListener();
            }
            CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(
                (PrivilegedAction<CloseableHttpAsyncClient>) this::createHttpClient);
            RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes,
                    pathPrefix, failureListener, nodeSelector, strictDeprecationMode);
            httpClient.start();
            return restClient;
        }
        //进入 this::createHttpClient
            private CloseableHttpAsyncClient createHttpClient() {
            //default timeouts are all infinite
            RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
                    .setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
                    .setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS);
            if (requestConfigCallback != null) {
                requestConfigBuilder = requestConfigCallback.customizeRequestConfig(requestConfigBuilder);
            }
    
            try {
                HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create().setDefaultRequestConfig(requestConfigBuilder.build())
                    //default settings for connection pooling may be too constraining
                    .setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE).setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL)
                    .setSSLContext(SSLContext.getDefault())
                    .setTargetAuthenticationStrategy(new PersistentCredentialsAuthenticationStrategy());
                if (httpClientConfigCallback != null) {
                    httpClientBuilder = httpClientConfigCallback.customizeHttpClient(httpClientBuilder);
                }
    
                final HttpAsyncClientBuilder finalBuilder = httpClientBuilder;
                return AccessController.doPrivileged((PrivilegedAction<CloseableHttpAsyncClient>) finalBuilder::build);
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException("could not create the default ssl context", e);
            }
        }
        
        // finalBuilder::build可以看到
       ConnectionKeepAliveStrategy keepAliveStrategy = this.keepAliveStrategy;
            if (keepAliveStrategy == null) {
                keepAliveStrategy = DefaultConnectionKeepAliveStrategy.INSTANCE;
            }
    

    如果客户端使用的是apache httpclient 4.x版本,默认的keep-alive是读取response heade中Keep-Alive字段,没有的话就是无限(代码中返回-1)。"If the Keep-Alive header is not present in the response, HttpClient assumes the connection can be kept alive indefinitely",详细代码DefaultConnectionKeepAliveStrategy.class类设置的,代码如下:

    @Contract(threading = ThreadingBehavior.IMMUTABLE)
    public class DefaultConnectionKeepAliveStrategy implements ConnectionKeepAliveStrategy {
    
        public static final DefaultConnectionKeepAliveStrategy INSTANCE = new DefaultConnectionKeepAliveStrategy();
    
        @Override
        public long getKeepAliveDuration(final HttpResponse response, final HttpContext context) {
            Args.notNull(response, "HTTP response");
            final HeaderElementIterator it = new BasicHeaderElementIterator(
                    response.headerIterator(HTTP.CONN_KEEP_ALIVE));
            while (it.hasNext()) {
                final HeaderElement he = it.nextElement();
                final String param = he.getName();
                final String value = he.getValue();
                if (value != null && param.equalsIgnoreCase("timeout")) {
                    try {
                        return Long.parseLong(value) * 1000;
                    } catch(final NumberFormatException ignore) {
                    }
                }
            }
            return -1;
        }
    }
    

    网上搜了一下这个的含义,具体如下


    image.png

    这里需要自定义实现ConnectionKeepAliveStrategy

    public class CustomConnectionKeepAliveStrategy extends DefaultConnectionKeepAliveStrategy {
        /**
         * 最大keep alive的时间(分钟)
         * 这里默认为10分钟,可以根据实际情况设置。可以观察客户端机器状态为TIME_WAIT的TCP连接数,如果太多,可以增大此值。
         */
        private final static long MAX_KEEP_ALIVE_MINUTES = 10;
    
        public static final CustomConnectionKeepAliveStrategy INSTANCE = new CustomConnectionKeepAliveStrategy();
    
        private CustomConnectionKeepAliveStrategy() {
            super();
        }
    
        @Override
        public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
            long keepAliveDuration = super.getKeepAliveDuration(response, context);
            // <0 为无限期keepalive,将无限期替换成一个默认的时间
            if (keepAliveDuration < 0) {
                return TimeUnit.MINUTES.toMillis(MAX_KEEP_ALIVE_MINUTES);
            }
            return keepAliveDuration;
        }
    }
    
    // 在客户端创建的时候设置KeepAlive策略为自定义策略即可
    esSinkBuilder.setRestClientFactory(restClientBuilder -> {
                restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
    httpClientBuilder.setKeepAliveStrategy(CustomConnectionKeepAliveStrategy.INSTANCE);
                    return httpClientBuilder;
                });
            });
    

    参考链接:【Elasticsearch】解决Elasticsearch HTTP方式查询报SocketTimeoutException的问题 Connection reset by peer
    两种由java http长连接(keep-alive)导致的问题
    Connection keep alive strategy

    相关文章

      网友评论

          本文标题:flink elasticsearch sink连接报错:Fai

          本文链接:https://www.haomeiwen.com/subject/rwosvltx.html