A look at Flink's RestClientConfiguration

Author: go4it | Published 2019-03-07 13:58

    This article takes a look at Flink's RestClientConfiguration.

    RestClientConfiguration

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java

    public final class RestClientConfiguration {
    
        @Nullable
        private final SSLHandlerFactory sslHandlerFactory;
    
        private final long connectionTimeout;
    
        private final long idlenessTimeout;
    
        private final int maxContentLength;
    
        private RestClientConfiguration(
                @Nullable final SSLHandlerFactory sslHandlerFactory,
                final long connectionTimeout,
                final long idlenessTimeout,
                final int maxContentLength) {
            checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength);
            this.sslHandlerFactory = sslHandlerFactory;
            this.connectionTimeout = connectionTimeout;
            this.idlenessTimeout = idlenessTimeout;
            this.maxContentLength = maxContentLength;
        }
    
        /**
         * Returns the {@link SSLEngine} that the REST client endpoint should use.
         *
         * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
         */
        @Nullable
        public SSLHandlerFactory getSslHandlerFactory() {
            return sslHandlerFactory;
        }
    
        /**
         * {@see RestOptions#CONNECTION_TIMEOUT}.
         */
        public long getConnectionTimeout() {
            return connectionTimeout;
        }
    
        /**
         * {@see RestOptions#IDLENESS_TIMEOUT}.
         */
        public long getIdlenessTimeout() {
            return idlenessTimeout;
        }
    
        /**
         * Returns the max content length that the REST client endpoint could handle.
         *
         * @return max content length that the REST client endpoint could handle
         */
        public int getMaxContentLength() {
            return maxContentLength;
        }
    
        /**
         * Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}.
         *
         * @param config configuration from which the REST client endpoint configuration should be created from
         * @return REST client endpoint configuration
         * @throws ConfigurationException if SSL was configured incorrectly
         */
    
        public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
            Preconditions.checkNotNull(config);
    
            final SSLHandlerFactory sslHandlerFactory;
            if (SSLUtils.isRestSSLEnabled(config)) {
                try {
                    sslHandlerFactory = SSLUtils.createRestClientSSLEngineFactory(config);
                } catch (Exception e) {
                    throw new ConfigurationException("Failed to initialize SSLContext for the REST client", e);
                }
            } else {
                sslHandlerFactory = null;
            }
    
            final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);
    
            final long idlenessTimeout = config.getLong(RestOptions.IDLENESS_TIMEOUT);
    
            int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);
    
            return new RestClientConfiguration(sslHandlerFactory, connectionTimeout, idlenessTimeout, maxContentLength);
        }
    }
    
    • RestClientConfiguration has four fields: sslHandlerFactory, connectionTimeout, idlenessTimeout, and maxContentLength.
    • fromConfiguration builds the SSLHandlerFactory from the Configuration only when REST SSL is enabled; otherwise the field is null. The relevant options are security.ssl.rest.enabled (default false), security.ssl.protocol (default TLSv1.2), security.ssl.algorithms (default TLS_RSA_WITH_AES_128_CBC_SHA), and security.ssl.rest.authentication-enabled (default false).
    • connectionTimeout is read from rest.connection-timeout (default 15000 ms); idlenessTimeout is read from rest.idleness-timeout (default 5 minutes); maxContentLength is read from rest.client.max-content-length (default 104_857_600 bytes, i.e. 100 MiB).
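
The construction pattern described above can be sketched without any Flink dependency. The snippet below is a simplified stand-in, not Flink's actual class: the name `RestClientSettings`, its `fromMap` and `readLong` methods, the `sslEnabled` flag, and the use of a plain `Map<String, String>` in place of Flink's `Configuration` are all assumptions made for illustration. Only the option keys and their defaults are taken from the text above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, dependency-free stand-in for RestClientConfiguration.
// A Map<String, String> plays the role of Flink's Configuration.
final class RestClientSettings {
    // Defaults as listed above: 15 seconds, 5 minutes, 100 MiB.
    static final long DEFAULT_CONNECTION_TIMEOUT_MS = 15_000L;
    static final long DEFAULT_IDLENESS_TIMEOUT_MS = 5L * 60L * 1_000L;
    static final int DEFAULT_MAX_CONTENT_LENGTH = 104_857_600;

    // The real class holds a nullable SSLHandlerFactory; a flag is enough here.
    final boolean sslEnabled;
    final long connectionTimeout;
    final long idlenessTimeout;
    final int maxContentLength;

    private RestClientSettings(
            boolean sslEnabled, long connectionTimeout, long idlenessTimeout, int maxContentLength) {
        // Mirrors the checkArgument call in the private constructor shown above.
        if (maxContentLength <= 0) {
            throw new IllegalArgumentException(
                "maxContentLength must be positive, was: " + maxContentLength);
        }
        this.sslEnabled = sslEnabled;
        this.connectionTimeout = connectionTimeout;
        this.idlenessTimeout = idlenessTimeout;
        this.maxContentLength = maxContentLength;
    }

    // Static factory: read each option, fall back to its default, then validate.
    static RestClientSettings fromMap(Map<String, String> config) {
        Objects.requireNonNull(config, "config");
        boolean sslEnabled =
            Boolean.parseBoolean(config.getOrDefault("security.ssl.rest.enabled", "false"));
        long connectionTimeout =
            readLong(config, "rest.connection-timeout", DEFAULT_CONNECTION_TIMEOUT_MS);
        long idlenessTimeout =
            readLong(config, "rest.idleness-timeout", DEFAULT_IDLENESS_TIMEOUT_MS);
        int maxContentLength = Math.toIntExact(
            readLong(config, "rest.client.max-content-length", DEFAULT_MAX_CONTENT_LENGTH));
        return new RestClientSettings(sslEnabled, connectionTimeout, idlenessTimeout, maxContentLength);
    }

    private static long readLong(Map<String, String> config, String key, long defaultValue) {
        String raw = config.get(key);
        return raw == null ? defaultValue : Long.parseLong(raw.trim());
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("rest.connection-timeout", "30000"); // override a single option
        RestClientSettings settings = fromMap(config);
        System.out.println(settings.connectionTimeout + " "
            + settings.idlenessTimeout + " " + settings.maxContentLength);
    }
}
```

As in the original class, the constructor is private and all fields are final, so the only way to obtain an instance is through the validating factory method.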

    RestClient

    flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java

    public class RestClient implements AutoCloseableAsync {
        private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
    
        private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
    
        // used to open connections to a rest server endpoint
        private final Executor executor;
    
        private final Bootstrap bootstrap;
    
        private final CompletableFuture<Void> terminationFuture;
    
        private final AtomicBoolean isRunning = new AtomicBoolean(true);
    
        public RestClient(RestClientConfiguration configuration, Executor executor) {
            Preconditions.checkNotNull(configuration);
            this.executor = Preconditions.checkNotNull(executor);
            this.terminationFuture = new CompletableFuture<>();
    
            final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory();
            ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) {
                    try {
                        // SSL should be the first handler in the pipeline
                        if (sslHandlerFactory != null) {
                            socketChannel.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler());
                        }
    
                        socketChannel.pipeline()
                            .addLast(new HttpClientCodec())
                            .addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
                            .addLast(new ChunkedWriteHandler()) // required for multipart-requests
                            .addLast(new IdleStateHandler(configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), TimeUnit.MILLISECONDS))
                            .addLast(new ClientHandler());
                    } catch (Throwable t) {
                        t.printStackTrace();
                        ExceptionUtils.rethrow(t);
                    }
                }
            };
            NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));
    
            bootstrap = new Bootstrap();
            bootstrap
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout()))
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(initializer);
    
            LOG.info("Rest client endpoint started.");
        }
    
        @Override
        public CompletableFuture<Void> closeAsync() {
            return shutdownInternally(Time.seconds(10L));
        }
    
        public void shutdown(Time timeout) {
            final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout);
    
            try {
                shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                LOG.info("Rest endpoint shutdown complete.");
            } catch (Exception e) {
                LOG.warn("Rest endpoint shutdown failed.", e);
            }
        }
    
        private CompletableFuture<Void> shutdownInternally(Time timeout) {
            if (isRunning.compareAndSet(true, false)) {
                LOG.info("Shutting down rest endpoint.");
    
                if (bootstrap != null) {
                    if (bootstrap.group() != null) {
                        bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
                            .addListener(finished -> {
                                if (finished.isSuccess()) {
                                    terminationFuture.complete(null);
                                } else {
                                    terminationFuture.completeExceptionally(finished.cause());
                                }
                            });
                    }
                }
            }
            return terminationFuture;
        }
    
        //......
    }
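
The shutdown path in the listing above relies on two things: `isRunning.compareAndSet(true, false)` lets only the first caller start the shutdown, and every caller gets back the same `terminationFuture`. A minimal JDK-only sketch of that pattern follows. The class name `IdempotentShutdown` and the `Runnable` that stands in for shutting down the Netty event loop group are assumptions for illustration, and the sketch completes the future synchronously, whereas the real code completes it from a Netty listener.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for RestClient's shutdown logic.
final class IdempotentShutdown {
    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    // Stands in for bootstrap.group().shutdownGracefully(...).
    private final Runnable releaseResources;

    IdempotentShutdown(Runnable releaseResources) {
        this.releaseResources = releaseResources;
    }

    CompletableFuture<Void> closeAsync() {
        // Only the first caller flips the flag and performs the shutdown.
        if (isRunning.compareAndSet(true, false)) {
            try {
                releaseResources.run();
                terminationFuture.complete(null);
            } catch (Throwable t) {
                terminationFuture.completeExceptionally(t);
            }
        }
        // Every caller, first or not, observes the same future.
        return terminationFuture;
    }

    public static void main(String[] args) {
        IdempotentShutdown client =
            new IdempotentShutdown(() -> System.out.println("releasing resources"));
        client.closeAsync();
        client.closeAsync(); // second call does not release again
    }
}
```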
    
    • The RestClient constructor takes a RestClientConfiguration and an Executor and builds a Netty Bootstrap. ChannelOption.CONNECT_TIMEOUT_MILLIS is set from configuration.getConnectionTimeout(); the IdleStateHandler's readerIdleTime, writerIdleTime, and allIdleTime all come from configuration.getIdlenessTimeout(); the HttpObjectAggregator's maxContentLength comes from configuration.getMaxContentLength(); and the SSL handler, when SSL is enabled, is created by configuration.getSslHandlerFactory().
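
One detail in the constructor is easy to miss: `connectionTimeout` is a `long`, while Netty's `ChannelOption.CONNECT_TIMEOUT_MILLIS` takes an `Integer`, so the value passes through `Math.toIntExact`, which throws an `ArithmeticException` instead of silently truncating. The JDK-only snippet below shows that behaviour. The names `ConnectTimeoutConversion` and `toNettyTimeout` are made up for this illustration and do not exist in Flink.

```java
// Illustrative only: the long -> int conversion applied to the connect timeout.
final class ConnectTimeoutConversion {

    // Same conversion the RestClient constructor applies before passing the value to Netty.
    static int toNettyTimeout(long connectionTimeoutMillis) {
        return Math.toIntExact(connectionTimeoutMillis);
    }

    public static void main(String[] args) {
        // The default rest.connection-timeout fits comfortably in an int.
        System.out.println(toNettyTimeout(15_000L));
        try {
            // One past Integer.MAX_VALUE cannot be represented as an int.
            toNettyTimeout(Integer.MAX_VALUE + 1L);
        } catch (ArithmeticException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In practice this means a rest.connection-timeout above roughly 24.8 days (Integer.MAX_VALUE milliseconds) would make the RestClient constructor fail rather than wrap around to a negative timeout.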

    Summary

    • RestClientConfiguration has four fields: sslHandlerFactory, connectionTimeout, idlenessTimeout, and maxContentLength. fromConfiguration builds the SSLHandlerFactory from the Configuration when REST SSL is enabled; the relevant options are security.ssl.rest.enabled (default false), security.ssl.protocol (default TLSv1.2), security.ssl.algorithms (default TLS_RSA_WITH_AES_128_CBC_SHA), and security.ssl.rest.authentication-enabled (default false).
    • connectionTimeout is read from rest.connection-timeout (default 15000 ms); idlenessTimeout is read from rest.idleness-timeout (default 5 minutes); maxContentLength is read from rest.client.max-content-length (default 104_857_600 bytes).
    • The RestClient constructor takes a RestClientConfiguration and an Executor and builds a Netty Bootstrap: ChannelOption.CONNECT_TIMEOUT_MILLIS uses configuration.getConnectionTimeout(); the IdleStateHandler's readerIdleTime, writerIdleTime, and allIdleTime use configuration.getIdlenessTimeout(); the HttpObjectAggregator's maxContentLength uses configuration.getMaxContentLength(); and the SSL handler comes from configuration.getSslHandlerFactory().
