Talking about flink's jobstore configuration

Author: go4it | Published 2019-03-09 11:17

    This article takes a look at flink's jobstore configuration.

    JobManagerOptions

    flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java

    @PublicEvolving
    public class JobManagerOptions {
        //......
    
        /**
         * The job store cache size in bytes which is used to keep completed
         * jobs in memory.
         */
        public static final ConfigOption<Long> JOB_STORE_CACHE_SIZE =
            key("jobstore.cache-size")
            .defaultValue(50L * 1024L * 1024L)
            .withDescription("The job store cache size in bytes which is used to keep completed jobs in memory.");
    
        /**
         * The time in seconds after which a completed job expires and is purged from the job store.
         */
        public static final ConfigOption<Long> JOB_STORE_EXPIRATION_TIME =
            key("jobstore.expiration-time")
            .defaultValue(60L * 60L)
            .withDescription("The time in seconds after which a completed job expires and is purged from the job store.");
    
        //......
    }
    
    • jobstore.cache-size defaults to 50 MB; jobstore.expiration-time defaults to 1 hour (3600 seconds); the sketch below shows how to override them
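
    As an aside, here is a minimal sketch of overriding the two defaults programmatically through flink's Configuration API (the class name JobStoreConfigExample is invented for this example; in a real deployment the same keys would normally be set in flink-conf.yaml):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.JobManagerOptions;

    public class JobStoreConfigExample {

        public static void main(String[] args) {
            // equivalent to setting jobstore.cache-size / jobstore.expiration-time in flink-conf.yaml
            Configuration configuration = new Configuration();
            configuration.setLong(JobManagerOptions.JOB_STORE_CACHE_SIZE, 100L * 1024L * 1024L); // 100 MB instead of 50 MB
            configuration.setLong(JobManagerOptions.JOB_STORE_EXPIRATION_TIME, 30L * 60L);       // 30 minutes instead of 1 hour

            System.out.println(configuration.getLong(JobManagerOptions.JOB_STORE_CACHE_SIZE));
        }
    }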

    SessionClusterEntrypoint

    flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java

    public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
    
        public SessionClusterEntrypoint(Configuration configuration) {
            super(configuration);
        }
    
        @Override
        protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
                Configuration configuration,
                ScheduledExecutor scheduledExecutor) throws IOException {
            final File tmpDir = new File(ConfigurationUtils.parseTempDirectories(configuration)[0]);
    
            final Time expirationTime = Time.seconds(configuration.getLong(JobManagerOptions.JOB_STORE_EXPIRATION_TIME));
            final long maximumCacheSizeBytes = configuration.getLong(JobManagerOptions.JOB_STORE_CACHE_SIZE);
    
            return new FileArchivedExecutionGraphStore(
                tmpDir,
                expirationTime,
                maximumCacheSizeBytes,
                scheduledExecutor,
                Ticker.systemTicker());
        }
    }
    
    • SessionClusterEntrypoint's createSerializableExecutionGraphStore method reads the JobManagerOptions.JOB_STORE_EXPIRATION_TIME and JobManagerOptions.JOB_STORE_CACHE_SIZE settings, then creates a FileArchivedExecutionGraphStore rooted in the first configured temp directory (see the construction sketch below)
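
    To make the wiring concrete, here is a test-style sketch that constructs the store directly with the defaults spelled out. ScheduledExecutorServiceAdapter and Time are actual flink 1.7 classes; the shaded Guava package for Ticker (flink-shaded-guava 18) and the temp directory are assumptions of this sketch:

    import java.io.File;
    import java.io.IOException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;

    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
    import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore;
    import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;

    public class GraphStoreConstructionSketch {

        public static void main(String[] args) throws IOException {
            // adapter turning a JDK ScheduledExecutorService into flink's ScheduledExecutor
            ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
            ScheduledExecutorServiceAdapter scheduledExecutor = new ScheduledExecutorServiceAdapter(executorService);

            // the same five arguments as in SessionClusterEntrypoint, defaults spelled out
            FileArchivedExecutionGraphStore store = new FileArchivedExecutionGraphStore(
                new File(System.getProperty("java.io.tmpdir")), // the entrypoint uses the first io.tmp.dirs entry
                Time.seconds(60L * 60L),                        // jobstore.expiration-time default
                50L * 1024L * 1024L,                            // jobstore.cache-size default
                scheduledExecutor,
                Ticker.systemTicker());

            System.out.println("stored jobs: " + store.size());

            // close() cancels the cleanup task and deletes the storage directory
            store.close();
            executorService.shutdown();
        }
    }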

    FileArchivedExecutionGraphStore

    flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java

    public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphStore {
    
        private static final Logger LOG = LoggerFactory.getLogger(FileArchivedExecutionGraphStore.class);
    
        private final File storageDir;
    
        private final Cache<JobID, JobDetails> jobDetailsCache;
    
        private final LoadingCache<JobID, ArchivedExecutionGraph> archivedExecutionGraphCache;
    
        private final ScheduledFuture<?> cleanupFuture;
    
        private final Thread shutdownHook;
    
        private int numFinishedJobs;
    
        private int numFailedJobs;
    
        private int numCanceledJobs;
    
        public FileArchivedExecutionGraphStore(
                File rootDir,
                Time expirationTime,
                long maximumCacheSizeBytes,
                ScheduledExecutor scheduledExecutor,
                Ticker ticker) throws IOException {
    
            final File storageDirectory = initExecutionGraphStorageDirectory(rootDir);
    
            LOG.info(
                "Initializing {}: Storage directory {}, expiration time {}, maximum cache size {} bytes.",
                FileArchivedExecutionGraphStore.class.getSimpleName(),
                storageDirectory,
                expirationTime.toMilliseconds(),
                maximumCacheSizeBytes);
    
            this.storageDir = Preconditions.checkNotNull(storageDirectory);
            Preconditions.checkArgument(
                storageDirectory.exists() && storageDirectory.isDirectory(),
                "The storage directory must exist and be a directory.");
            this.jobDetailsCache = CacheBuilder.newBuilder()
                .expireAfterWrite(expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS)
                .removalListener(
                    (RemovalListener<JobID, JobDetails>) notification -> deleteExecutionGraphFile(notification.getKey()))
                .ticker(ticker)
                .build();
    
            this.archivedExecutionGraphCache = CacheBuilder.newBuilder()
                .maximumWeight(maximumCacheSizeBytes)
                .weigher(this::calculateSize)
                .build(new CacheLoader<JobID, ArchivedExecutionGraph>() {
                    @Override
                    public ArchivedExecutionGraph load(JobID jobId) throws Exception {
                        return loadExecutionGraph(jobId);
                    }});
    
            this.cleanupFuture = scheduledExecutor.scheduleWithFixedDelay(
                jobDetailsCache::cleanUp,
                expirationTime.toMilliseconds(),
                expirationTime.toMilliseconds(),
                TimeUnit.MILLISECONDS);
    
            this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG);
    
            this.numFinishedJobs = 0;
            this.numFailedJobs = 0;
            this.numCanceledJobs = 0;
        }
    
        @Override
        public int size() {
            return Math.toIntExact(jobDetailsCache.size());
        }
    
        @Override
        @Nullable
        public ArchivedExecutionGraph get(JobID jobId) {
            try {
                return archivedExecutionGraphCache.get(jobId);
            } catch (ExecutionException e) {
                LOG.debug("Could not load archived execution graph for job id {}.", jobId, e);
                return null;
            }
        }
    
        @Override
        public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
            final JobStatus jobStatus = archivedExecutionGraph.getState();
            final JobID jobId = archivedExecutionGraph.getJobID();
            final String jobName = archivedExecutionGraph.getJobName();
    
            Preconditions.checkArgument(
                jobStatus.isGloballyTerminalState(),
                "The job " + jobName + '(' + jobId +
                    ") is not in a globally terminal state. Instead it is in state " + jobStatus + '.');
    
            switch (jobStatus) {
                case FINISHED:
                    numFinishedJobs++;
                    break;
                case CANCELED:
                    numCanceledJobs++;
                    break;
                case FAILED:
                    numFailedJobs++;
                    break;
                default:
                    throw new IllegalStateException("The job " + jobName + '(' +
                        jobId + ") should have been in a globally terminal state. " +
                        "Instead it was in state " + jobStatus + '.');
            }
    
            // write the ArchivedExecutionGraph to disk
            storeArchivedExecutionGraph(archivedExecutionGraph);
    
            final JobDetails detailsForJob = WebMonitorUtils.createDetailsForJob(archivedExecutionGraph);
    
            jobDetailsCache.put(jobId, detailsForJob);
            archivedExecutionGraphCache.put(jobId, archivedExecutionGraph);
        }
    
        @Override
        public JobsOverview getStoredJobsOverview() {
            return new JobsOverview(0, numFinishedJobs, numCanceledJobs, numFailedJobs);
        }
    
        @Override
        public Collection<JobDetails> getAvailableJobDetails() {
            return jobDetailsCache.asMap().values();
        }
    
        @Nullable
        @Override
        public JobDetails getAvailableJobDetails(JobID jobId) {
            return jobDetailsCache.getIfPresent(jobId);
        }
    
        @Override
        public void close() throws IOException {
            cleanupFuture.cancel(false);
    
            jobDetailsCache.invalidateAll();
    
            // clean up the storage directory
            FileUtils.deleteFileOrDirectory(storageDir);
    
            // Remove shutdown hook to prevent resource leaks
            ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
        }
    
        // --------------------------------------------------------------
        // Internal methods
        // --------------------------------------------------------------
    
        private int calculateSize(JobID jobId, ArchivedExecutionGraph serializableExecutionGraph) {
            final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
    
            if (archivedExecutionGraphFile.exists()) {
                return Math.toIntExact(archivedExecutionGraphFile.length());
            } else {
                LOG.debug("Could not find archived execution graph file for {}. Estimating the size instead.", jobId);
                return serializableExecutionGraph.getAllVertices().size() * 1000 +
                    serializableExecutionGraph.getAccumulatorsSerialized().size() * 1000;
            }
        }
    
        private ArchivedExecutionGraph loadExecutionGraph(JobID jobId) throws IOException, ClassNotFoundException {
            final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
    
            if (archivedExecutionGraphFile.exists()) {
                try (FileInputStream fileInputStream = new FileInputStream(archivedExecutionGraphFile)) {
                    return InstantiationUtil.deserializeObject(fileInputStream, getClass().getClassLoader());
                }
            } else {
                throw new FileNotFoundException("Could not find file for archived execution graph " + jobId +
                    ". This indicates that the file either has been deleted or never written.");
            }
        }
    
        private void storeArchivedExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) throws IOException {
            final File archivedExecutionGraphFile = getExecutionGraphFile(archivedExecutionGraph.getJobID());
    
            try (FileOutputStream fileOutputStream = new FileOutputStream(archivedExecutionGraphFile)) {
                InstantiationUtil.serializeObject(fileOutputStream, archivedExecutionGraph);
            }
        }
    
        private File getExecutionGraphFile(JobID jobId) {
            return new File(storageDir, jobId.toString());
        }
    
        private void deleteExecutionGraphFile(JobID jobId) {
            Preconditions.checkNotNull(jobId);
    
            final File archivedExecutionGraphFile = getExecutionGraphFile(jobId);
    
            try {
                FileUtils.deleteFileOrDirectory(archivedExecutionGraphFile);
            } catch (IOException e) {
                LOG.debug("Could not delete file {}.", archivedExecutionGraphFile, e);
            }
    
            archivedExecutionGraphCache.invalidate(jobId);
            jobDetailsCache.invalidate(jobId);
        }
    
        private static File initExecutionGraphStorageDirectory(File tmpDir) throws IOException {
            final int maxAttempts = 10;
    
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                final File storageDirectory = new File(tmpDir, "executionGraphStore-" + UUID.randomUUID());
    
                if (storageDirectory.mkdir()) {
                    return storageDirectory;
                }
            }
    
            throw new IOException("Could not create executionGraphStorage directory in " + tmpDir + '.');
        }
    
        // --------------------------------------------------------------
        // Testing methods
        // --------------------------------------------------------------
    
        @VisibleForTesting
        File getStorageDir() {
            return storageDir;
        }
    
        @VisibleForTesting
        LoadingCache<JobID, ArchivedExecutionGraph> getArchivedExecutionGraphCache() {
            return archivedExecutionGraphCache;
        }
    }
    
    • FileArchivedExecutionGraphStore implements the ArchivedExecutionGraphStore interface; its constructor uses Guava caches to build jobDetailsCache and archivedExecutionGraphCache
    • jobDetailsCache's expireAfterWrite uses expirationTime, i.e. the jobstore.expiration-time setting; archivedExecutionGraphCache's maximumWeight uses maximumCacheSizeBytes, i.e. the jobstore.cache-size setting
    • FileArchivedExecutionGraphStore also schedules a recurring task that runs every expirationTime and invokes jobDetailsCache's cleanUp method to evict expired entries (the interplay of the two caches is reproduced in the standalone sketch below)
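
    The following standalone sketch reproduces the two-cache pattern outside flink. It uses plain (unshaded) Guava, with String placeholders standing in for JobID/JobDetails/ArchivedExecutionGraph; the removal listener mirrors deleteExecutionGraphFile and the weigher mirrors calculateSize:

    import java.util.concurrent.TimeUnit;

    import com.google.common.base.Ticker;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import com.google.common.cache.RemovalListener;
    import com.google.common.cache.Weigher;

    public class TwoLevelJobStoreSketch {

        public static void main(String[] args) {
            long expirationMillis = TimeUnit.HOURS.toMillis(1); // jobstore.expiration-time
            long maximumCacheSizeBytes = 50L * 1024L * 1024L;   // jobstore.cache-size

            // small cache of job details: entries expire a fixed time after write,
            // and the removal listener plays the role of deleteExecutionGraphFile(jobId)
            Cache<String, String> jobDetailsCache = CacheBuilder.newBuilder()
                .expireAfterWrite(expirationMillis, TimeUnit.MILLISECONDS)
                .removalListener(
                    (RemovalListener<String, String>) notification ->
                        System.out.println("removal of " + notification.getKey() + ": deleting on-disk graph"))
                .ticker(Ticker.systemTicker())
                .build();

            // large cache of graphs: bounded by total weight in bytes (the weigher plays
            // the role of calculateSize), and misses reload from disk (loadExecutionGraph)
            LoadingCache<String, String> archivedExecutionGraphCache = CacheBuilder.newBuilder()
                .maximumWeight(maximumCacheSizeBytes)
                .weigher((Weigher<String, String>) (jobId, graph) -> graph.length())
                .build(new CacheLoader<String, String>() {
                    @Override
                    public String load(String jobId) {
                        return "graph-loaded-from-disk-for-" + jobId; // stands in for deserialization
                    }
                });

            jobDetailsCache.put("job-1", "details");
            System.out.println(archivedExecutionGraphCache.getUnchecked("job-1"));
        }
    }

    Note that Guava evicts expired entries lazily during reads and writes rather than with a background thread, which is exactly why the real store schedules periodic jobDetailsCache.cleanUp() calls on the ScheduledExecutor.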

    Summary

    • flink's jobstore has two settings: jobstore.cache-size, defaulting to 50 MB, and jobstore.expiration-time, defaulting to 1 hour
    • SessionClusterEntrypoint's createSerializableExecutionGraphStore method reads the JobManagerOptions.JOB_STORE_EXPIRATION_TIME and JobManagerOptions.JOB_STORE_CACHE_SIZE settings, then creates a FileArchivedExecutionGraphStore
    • FileArchivedExecutionGraphStore implements the ArchivedExecutionGraphStore interface; its constructor uses Guava caches to build jobDetailsCache and archivedExecutionGraphCache; jobDetailsCache's expireAfterWrite uses expirationTime (the jobstore.expiration-time setting) while archivedExecutionGraphCache's maximumWeight uses maximumCacheSizeBytes (the jobstore.cache-size setting); it also schedules a recurring task that runs every expirationTime and invokes jobDetailsCache's cleanUp method to evict expired entries

