A Look at Flink's RestClusterClientConfigu

Author: go4it | Published 2019-03-08 09:47

    This article takes a look at Flink's RestClusterClientConfiguration.

    RestClusterClientConfiguration

    flink-release-1.7.2/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java

    public final class RestClusterClientConfiguration {
    
        private final RestClientConfiguration restClientConfiguration;
    
        private final long awaitLeaderTimeout;
    
        private final int retryMaxAttempts;
    
        private final long retryDelay;
    
        private RestClusterClientConfiguration(
                final RestClientConfiguration endpointConfiguration,
                final long awaitLeaderTimeout,
                final int retryMaxAttempts,
                final long retryDelay) {
            checkArgument(awaitLeaderTimeout >= 0, "awaitLeaderTimeout must be equal to or greater than 0");
            checkArgument(retryMaxAttempts >= 0, "retryMaxAttempts must be equal to or greater than 0");
            checkArgument(retryDelay >= 0, "retryDelay must be equal to or greater than 0");
    
            this.restClientConfiguration = Preconditions.checkNotNull(endpointConfiguration);
            this.awaitLeaderTimeout = awaitLeaderTimeout;
            this.retryMaxAttempts = retryMaxAttempts;
            this.retryDelay = retryDelay;
        }
    
        public RestClientConfiguration getRestClientConfiguration() {
            return restClientConfiguration;
        }
    
        /**
         * @see RestOptions#AWAIT_LEADER_TIMEOUT
         */
        public long getAwaitLeaderTimeout() {
            return awaitLeaderTimeout;
        }
    
        /**
         * @see RestOptions#RETRY_MAX_ATTEMPTS
         */
        public int getRetryMaxAttempts() {
            return retryMaxAttempts;
        }
    
        /**
         * @see RestOptions#RETRY_DELAY
         */
        public long getRetryDelay() {
            return retryDelay;
        }
    
        public static RestClusterClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
            RestClientConfiguration restClientConfiguration = RestClientConfiguration.fromConfiguration(config);
    
            final long awaitLeaderTimeout = config.getLong(RestOptions.AWAIT_LEADER_TIMEOUT);
            final int retryMaxAttempts = config.getInteger(RestOptions.RETRY_MAX_ATTEMPTS);
            final long retryDelay = config.getLong(RestOptions.RETRY_DELAY);
    
            return new RestClusterClientConfiguration(restClientConfiguration, awaitLeaderTimeout, retryMaxAttempts, retryDelay);
        }
    }
    
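    • Besides the RestClientConfiguration, RestClusterClientConfiguration has three properties: awaitLeaderTimeout, retryMaxAttempts, and retryDelay. awaitLeaderTimeout is read from rest.await-leader-timeout and defaults to 30 seconds (30000 ms); retryMaxAttempts is read from rest.retry.max-attempts and defaults to 20; retryDelay is read from rest.retry.delay and defaults to 3 seconds (3000 ms). The constructor rejects negative values for all three.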
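    To make the defaults and the non-negative checks concrete, here is a minimal plain-Java sketch of the same pattern. It is not Flink code: the class RestClusterSettingsSketch and its fromMap method are hypothetical stand-ins, a Map<String, String> takes the place of Flink's Configuration, and the default values are simply the ones quoted above (the two time settings in milliseconds).

```java
import java.util.Map;

/** Hypothetical plain-Java stand-in for the pattern above; not part of Flink. */
final class RestClusterSettingsSketch {
    // Configuration keys as named in the article.
    static final String AWAIT_LEADER_TIMEOUT_KEY = "rest.await-leader-timeout";
    static final String RETRY_MAX_ATTEMPTS_KEY = "rest.retry.max-attempts";
    static final String RETRY_DELAY_KEY = "rest.retry.delay";

    final long awaitLeaderTimeout; // milliseconds
    final int retryMaxAttempts;
    final long retryDelay;         // milliseconds

    private RestClusterSettingsSketch(long awaitLeaderTimeout, int retryMaxAttempts, long retryDelay) {
        // Same non-negative checks as the quoted constructor.
        checkArgument(awaitLeaderTimeout >= 0, "awaitLeaderTimeout must be equal to or greater than 0");
        checkArgument(retryMaxAttempts >= 0, "retryMaxAttempts must be equal to or greater than 0");
        checkArgument(retryDelay >= 0, "retryDelay must be equal to or greater than 0");
        this.awaitLeaderTimeout = awaitLeaderTimeout;
        this.retryMaxAttempts = retryMaxAttempts;
        this.retryDelay = retryDelay;
    }

    /** Reads the three settings, falling back to the defaults quoted in the article. */
    static RestClusterSettingsSketch fromMap(Map<String, String> config) {
        long awaitLeaderTimeout = Long.parseLong(config.getOrDefault(AWAIT_LEADER_TIMEOUT_KEY, "30000"));
        int retryMaxAttempts = Integer.parseInt(config.getOrDefault(RETRY_MAX_ATTEMPTS_KEY, "20"));
        long retryDelay = Long.parseLong(config.getOrDefault(RETRY_DELAY_KEY, "3000"));
        return new RestClusterSettingsSketch(awaitLeaderTimeout, retryMaxAttempts, retryDelay);
    }

    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }
}
```

    With an empty map, fromMap yields the three defaults; a negative value for any key fails fast with an IllegalArgumentException, as in the quoted constructor.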

    RestClusterClient

    flink-release-1.7.2/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java

    public class RestClusterClient<T> extends ClusterClient<T> implements NewClusterClient {
    
        private final RestClusterClientConfiguration restClusterClientConfiguration;
    
        private final RestClient restClient;
    
        private final ExecutorService executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
    
        private final WaitStrategy waitStrategy;
    
        private final T clusterId;
    
        private final LeaderRetrievalService webMonitorRetrievalService;
    
        private final LeaderRetrievalService dispatcherRetrievalService;
    
        private final LeaderRetriever webMonitorLeaderRetriever = new LeaderRetriever();
    
        private final LeaderRetriever dispatcherLeaderRetriever = new LeaderRetriever();
    
        /** ExecutorService to run operations that can be retried on exceptions. */
        private ScheduledExecutorService retryExecutorService;
    
        //......
    
        private <C> CompletableFuture<C> retry(
                CheckedSupplier<CompletableFuture<C>> operation,
                Predicate<Throwable> retryPredicate) {
            return FutureUtils.retryWithDelay(
                CheckedSupplier.unchecked(operation),
                restClusterClientConfiguration.getRetryMaxAttempts(),
                Time.milliseconds(restClusterClientConfiguration.getRetryDelay()),
                retryPredicate,
                new ScheduledExecutorServiceAdapter(retryExecutorService));
        }
    
        @VisibleForTesting
        CompletableFuture<URL> getWebMonitorBaseUrl() {
            return FutureUtils.orTimeout(
                    webMonitorLeaderRetriever.getLeaderFuture(),
                    restClusterClientConfiguration.getAwaitLeaderTimeout(),
                    TimeUnit.MILLISECONDS)
                .thenApplyAsync(leaderAddressSessionId -> {
                    final String url = leaderAddressSessionId.f0;
                    try {
                        return new URL(url);
                    } catch (MalformedURLException e) {
                        throw new IllegalArgumentException("Could not parse URL from " + url, e);
                    }
                }, executorService);
        }
    
        //......
    }
    
    • The RestClusterClient constructor builds its RestClusterClientConfiguration from the Configuration by calling RestClusterClientConfiguration.fromConfiguration(configuration).
    • The retry method delegates to FutureUtils.retryWithDelay, passing restClusterClientConfiguration.getRetryMaxAttempts() as the retries argument and Time.milliseconds(restClusterClientConfiguration.getRetryDelay()) as the retryDelay argument.
    • The getWebMonitorBaseUrl method delegates to FutureUtils.orTimeout, passing restClusterClientConfiguration.getAwaitLeaderTimeout() as the timeout argument, in milliseconds.

    Summary

    • Besides the RestClientConfiguration, RestClusterClientConfiguration has three properties: awaitLeaderTimeout, retryMaxAttempts, and retryDelay.
    • awaitLeaderTimeout is read from rest.await-leader-timeout and defaults to 30 seconds; retryMaxAttempts is read from rest.retry.max-attempts and defaults to 20; retryDelay is read from rest.retry.delay and defaults to 3 seconds.
    • RestClusterClient's retry method delegates to FutureUtils.retryWithDelay, passing restClusterClientConfiguration.getRetryMaxAttempts() as the retries argument and Time.milliseconds(restClusterClientConfiguration.getRetryDelay()) as the retryDelay argument; its getWebMonitorBaseUrl method delegates to FutureUtils.orTimeout, passing restClusterClientConfiguration.getAwaitLeaderTimeout() as the timeout argument.
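    As a closing illustration of the await-leader timeout, the following plain-Java sketch puts a deadline on a future that is expected to deliver the leader address and then parses it into a URL. It is an assumption-laden stand-in rather than Flink code: OrTimeoutSketch, toBaseUrl, and the explicit timer parameter are hypothetical, and the future of a plain String replaces the leader retriever's future.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical plain-Java sketch of a timeout on a future; not Flink's FutureUtils. */
final class OrTimeoutSketch {

    /** Fails the given future with a TimeoutException if it is not done in time. */
    static <T> CompletableFuture<T> orTimeout(
            CompletableFuture<T> future,
            long timeout,
            TimeUnit unit,
            ScheduledExecutorService timer) {
        if (!future.isDone()) {
            ScheduledFuture<?> timeoutTask = timer.schedule(
                    () -> { future.completeExceptionally(new TimeoutException()); },
                    timeout,
                    unit);
            // Once the future completes either way, the timer task is no longer needed.
            future.whenComplete((value, error) -> timeoutTask.cancel(false));
        }
        return future;
    }

    /** Waits for the leader address up to the timeout, then parses it into a URL. */
    static CompletableFuture<URL> toBaseUrl(
            CompletableFuture<String> leaderAddress,
            long awaitLeaderTimeoutMillis,
            ScheduledExecutorService timer) {
        return orTimeout(leaderAddress, awaitLeaderTimeoutMillis, TimeUnit.MILLISECONDS, timer)
                .thenApply(address -> {
                    try {
                        return new URL(address);
                    } catch (MalformedURLException e) {
                        throw new IllegalArgumentException("Could not parse URL from " + address, e);
                    }
                });
    }
}
```

    If the address never arrives, the returned future completes exceptionally with a TimeoutException as its cause; if it arrives in time, the timer task is cancelled and the parsed URL is delivered.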
