A look at Flink's AbstractNonHaServices

Author: go4it | Published 2019-02-17 10:43

    This article takes a look at Flink's AbstractNonHaServices.

    HighAvailabilityServices

    flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java

    public interface HighAvailabilityServices extends AutoCloseable {
    
        // ------------------------------------------------------------------------
        //  Constants
        // ------------------------------------------------------------------------
    
        /**
         * This UUID should be used when no proper leader election happens, but a simple
         * pre-configured leader is used. That is for example the case in non-highly-available
         * standalone setups.
         */
        UUID DEFAULT_LEADER_ID = new UUID(0, 0);
    
        /**
         * This JobID should be used to identify the old JobManager when using the
         * {@link HighAvailabilityServices}. With the new mode every JobMaster will have a
         * distinct JobID assigned.
         */
        JobID DEFAULT_JOB_ID = new JobID(0L, 0L);
    
        // ------------------------------------------------------------------------
        //  Services
        // ------------------------------------------------------------------------
    
        /**
         * Gets the leader retriever for the cluster's resource manager.
         */
        LeaderRetrievalService getResourceManagerLeaderRetriever();
    
        /**
         * Gets the leader retriever for the dispatcher. This leader retrieval service
         * is not always accessible.
         */
        LeaderRetrievalService getDispatcherLeaderRetriever();
    
        /**
         * Gets the leader retriever for the job JobMaster which is responsible for the given job
         *
         * @param jobID The identifier of the job.
         * @return Leader retrieval service to retrieve the job manager for the given job
         * @deprecated This method should only be used by the legacy code where the JobManager acts as the master.
         */
        @Deprecated
        LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
    
        /**
         * Gets the leader retriever for the job JobMaster which is responsible for the given job
         *
         * @param jobID The identifier of the job.
         * @param defaultJobManagerAddress JobManager address which will be returned by
         *                              a static leader retrieval service.
         * @return Leader retrieval service to retrieve the job manager for the given job
         */
        LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);
    
        LeaderRetrievalService getWebMonitorLeaderRetriever();
    
        /**
         * Gets the leader election service for the cluster's resource manager.
         *
         * @return Leader election service for the resource manager leader election
         */
        LeaderElectionService getResourceManagerLeaderElectionService();
    
        /**
         * Gets the leader election service for the cluster's dispatcher.
         *
         * @return Leader election service for the dispatcher leader election
         */
        LeaderElectionService getDispatcherLeaderElectionService();
    
        /**
         * Gets the leader election service for the given job.
         *
         * @param jobID The identifier of the job running the election.
         * @return Leader election service for the job manager leader election
         */
        LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
    
        LeaderElectionService getWebMonitorLeaderElectionService();
    
        /**
         * Gets the checkpoint recovery factory for the job manager
         *
         * @return Checkpoint recovery factory
         */
        CheckpointRecoveryFactory getCheckpointRecoveryFactory();
    
        /**
         * Gets the submitted job graph store for the job manager
         *
         * @return Submitted job graph store
         * @throws Exception if the submitted job graph store could not be created
         */
        SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
    
        /**
         * Gets the registry that holds information about whether jobs are currently running.
         *
         * @return Running job registry to retrieve running jobs
         */
        RunningJobsRegistry getRunningJobsRegistry() throws Exception;
    
        /**
         * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
         *
         * @return Blob store
         * @throws IOException if the blob store could not be created
         */
        BlobStore createBlobStore() throws IOException;
    
        // ------------------------------------------------------------------------
        //  Shutdown and Cleanup
        // ------------------------------------------------------------------------
    
        /**
         * Closes the high availability services, releasing all resources.
         * 
         * <p>This method <b>does not delete or clean up</b> any data stored in external stores
         * (file systems, ZooKeeper, etc). Another instance of the high availability
         * services will be able to recover the job.
         * 
         * <p>If an exception occurs during closing services, this method will attempt to
         * continue closing other services and report exceptions only after all services
         * have been attempted to be closed.
         *
         * @throws Exception Thrown, if an exception occurred while closing these services.
         */
        @Override
        void close() throws Exception;
    
        /**
         * Closes the high availability services (releasing all resources) and deletes
         * all data stored by these services in external stores.
         * 
         * <p>After this method was called, any job or session that was managed by
         * these high availability services will be unrecoverable.
         * 
         * <p>If an exception occurs during cleanup, this method will attempt to
         * continue the cleanup and report exceptions only after all cleanup steps have
         * been attempted.
         * 
         * @throws Exception Thrown, if an exception occurred while closing these services
         *                   or cleaning up data stored by them.
         */
        void closeAndCleanupAllData() throws Exception;
    }
    
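    • HighAvailabilityServices defines the accessors for the services a highly available setup needs (leader retrieval and leader election services for the ResourceManager, Dispatcher, JobManager and web monitor, plus the checkpoint recovery factory, submitted job graph store, running jobs registry and BLOB store), together with close() and closeAndCleanupAllData(); it has two direct implementations, ZooKeeperHaServices and AbstractNonHaServices.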
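The close() Javadoc quoted above asks implementations to keep closing the remaining services when one of them fails, and to report exceptions only after every service has been attempted. A minimal plain-Java sketch of that contract follows; the CloseAll class and closeAll method are hypothetical names, not Flink API. It keeps the first exception and attaches later ones as suppressed exceptions.

```java
import java.util.List;

// Hypothetical helper (not Flink API): close every resource even if some fail,
// then rethrow the first failure with the later ones attached as suppressed.
final class CloseAll {
    private CloseAll() {}

    static void closeAll(List<? extends AutoCloseable> resources) throws Exception {
        Exception first = null;
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;                 // remember the first failure
                } else if (e != first) {
                    first.addSuppressed(e);    // keep the others for diagnostics
                }
            }
        }
        if (first != null) {
            throw first;                       // report only after every close() was attempted
        }
    }
}
```

Collecting instead of failing fast is what lets a partially broken shutdown still release the remaining resources.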

    AbstractNonHaServices

    flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java

    public abstract class AbstractNonHaServices implements HighAvailabilityServices {
        protected final Object lock = new Object();
    
        private final RunningJobsRegistry runningJobsRegistry;
    
        private final VoidBlobStore voidBlobStore;
    
        private boolean shutdown;
    
        public AbstractNonHaServices() {
            this.runningJobsRegistry = new StandaloneRunningJobsRegistry();
            this.voidBlobStore = new VoidBlobStore();
    
            shutdown = false;
        }
    
        // ----------------------------------------------------------------------
        // HighAvailabilityServices method implementations
        // ----------------------------------------------------------------------
    
        @Override
        public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
            synchronized (lock) {
                checkNotShutdown();
    
                return new StandaloneCheckpointRecoveryFactory();
            }
        }
    
        @Override
        public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
            synchronized (lock) {
                checkNotShutdown();
    
                return new StandaloneSubmittedJobGraphStore();
            }
        }
    
        @Override
        public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
            synchronized (lock) {
                checkNotShutdown();
    
                return runningJobsRegistry;
            }
        }
    
        @Override
        public BlobStore createBlobStore() throws IOException {
            synchronized (lock) {
                checkNotShutdown();
    
                return voidBlobStore;
            }
        }
    
        @Override
        public void close() throws Exception {
            synchronized (lock) {
                if (!shutdown) {
                    shutdown = true;
                }
            }
        }
    
        @Override
        public void closeAndCleanupAllData() throws Exception {
            // this stores no data, so this method is the same as 'close()'
            close();
        }
    
        // ----------------------------------------------------------------------
        // Helper methods
        // ----------------------------------------------------------------------
    
        @GuardedBy("lock")
        protected void checkNotShutdown() {
            checkState(!shutdown, "high availability services are shut down");
        }
    
        protected boolean isShutDown() {
            return shutdown;
        }
    }
    
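    • AbstractNonHaServices implements getCheckpointRecoveryFactory, getSubmittedJobGraphStore, getRunningJobsRegistry, createBlobStore, close and closeAndCleanupAllData from HighAvailabilityServices. Each getter runs while holding lock and first checks that the services have not been shut down; a new StandaloneCheckpointRecoveryFactory or StandaloneSubmittedJobGraphStore is created on every call, while the StandaloneRunningJobsRegistry and VoidBlobStore are single shared instances. Because nothing is stored externally, closeAndCleanupAllData simply calls close. It has two subclasses, EmbeddedHaServices and StandaloneHaServices.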
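The lock-plus-shutdown-flag guard used by AbstractNonHaServices can be sketched without Flink on the classpath. In the sketch below, GuardedServices is a hypothetical stand-in and plain Object instances replace the real registry and factory types; the IllegalStateException matches what Flink's Preconditions.checkState throws.

```java
// Minimal sketch of the guard pattern in AbstractNonHaServices.
// GuardedServices is hypothetical; Object stands in for the real service types.
class GuardedServices implements AutoCloseable {
    private final Object lock = new Object();
    private final Object runningJobsRegistry = new Object(); // one shared instance
    private boolean shutdown;                                 // guarded by lock

    Object getRunningJobsRegistry() {
        synchronized (lock) {
            checkNotShutdown();
            return runningJobsRegistry;   // same instance on every call
        }
    }

    Object getCheckpointRecoveryFactory() {
        synchronized (lock) {
            checkNotShutdown();
            return new Object();          // fresh instance on every call
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            shutdown = true;              // idempotent: closing twice is harmless
        }
    }

    void closeAndCleanupAllData() {
        close();                          // nothing is persisted, so cleanup equals close
    }

    private void checkNotShutdown() {
        if (shutdown) {
            throw new IllegalStateException("high availability services are shut down");
        }
    }
}
```

Holding the same lock in the getters and in close() means no caller can obtain a service after close() has returned.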

    EmbeddedHaServices

    flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java

    public class EmbeddedHaServices extends AbstractNonHaServices {
    
        private final Executor executor;
    
        private final EmbeddedLeaderService resourceManagerLeaderService;
    
        private final EmbeddedLeaderService dispatcherLeaderService;
    
        private final HashMap<JobID, EmbeddedLeaderService> jobManagerLeaderServices;
    
        private final EmbeddedLeaderService webMonitorLeaderService;
    
        public EmbeddedHaServices(Executor executor) {
            this.executor = Preconditions.checkNotNull(executor);
            this.resourceManagerLeaderService = new EmbeddedLeaderService(executor);
            this.dispatcherLeaderService = new EmbeddedLeaderService(executor);
            this.jobManagerLeaderServices = new HashMap<>();
            this.webMonitorLeaderService = new EmbeddedLeaderService(executor);
        }
    
        // ------------------------------------------------------------------------
        //  services
        // ------------------------------------------------------------------------
    
        @Override
        public LeaderRetrievalService getResourceManagerLeaderRetriever() {
            return resourceManagerLeaderService.createLeaderRetrievalService();
        }
    
        @Override
        public LeaderRetrievalService getDispatcherLeaderRetriever() {
            return dispatcherLeaderService.createLeaderRetrievalService();
        }
    
        @Override
        public LeaderElectionService getResourceManagerLeaderElectionService() {
            return resourceManagerLeaderService.createLeaderElectionService();
        }
    
        @Override
        public LeaderElectionService getDispatcherLeaderElectionService() {
            return dispatcherLeaderService.createLeaderElectionService();
        }
    
        @Override
        public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
            checkNotNull(jobID);
    
            synchronized (lock) {
                checkNotShutdown();
                EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
                return service.createLeaderRetrievalService();
            }
        }
    
        @Override
        public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
            return getJobManagerLeaderRetriever(jobID);
        }
    
        @Override
        public LeaderRetrievalService getWebMonitorLeaderRetriever() {
            return webMonitorLeaderService.createLeaderRetrievalService();
        }
    
        @Override
        public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
            checkNotNull(jobID);
    
            synchronized (lock) {
                checkNotShutdown();
                EmbeddedLeaderService service = getOrCreateJobManagerService(jobID);
                return service.createLeaderElectionService();
            }
        }
    
        @Override
        public LeaderElectionService getWebMonitorLeaderElectionService() {
            return webMonitorLeaderService.createLeaderElectionService();
        }
    
        // ------------------------------------------------------------------------
        // internal
        // ------------------------------------------------------------------------
    
        @GuardedBy("lock")
        private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) {
            EmbeddedLeaderService service = jobManagerLeaderServices.get(jobID);
            if (service == null) {
                service = new EmbeddedLeaderService(executor);
                jobManagerLeaderServices.put(jobID, service);
            }
            return service;
        }
    
        // ------------------------------------------------------------------------
        //  shutdown
        // ------------------------------------------------------------------------
    
        @Override
        public void close() throws Exception {
            synchronized (lock) {
                if (!isShutDown()) {
                    // stop all job manager leader services
                    for (EmbeddedLeaderService service : jobManagerLeaderServices.values()) {
                        service.shutdown();
                    }
                    jobManagerLeaderServices.clear();
    
                    resourceManagerLeaderService.shutdown();
    
                    webMonitorLeaderService.shutdown();
                }
    
                super.close();
            }
        }
    }
    
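    • EmbeddedHaServices extends AbstractNonHaServices and implements HighAvailabilityServices for the non-high-availability case in which the ResourceManager, JobManagers and TaskManagers all run in the same process; FlinkMiniCluster uses EmbeddedHaServices. Each component is backed by an EmbeddedLeaderService from which both its leader election service and its leader retrieval service are created, and the per-job services are created lazily, one per JobID, by getOrCreateJobManagerService. Note that in this version close() shuts down the job manager, ResourceManager and web monitor leader services but not dispatcherLeaderService.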
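Because getJobManagerLeaderElectionService(jobID) and getJobManagerLeaderRetriever(jobID) both go through getOrCreateJobManagerService(jobID), the election and retrieval sides of a given job share one EmbeddedLeaderService. The sketch below models just that per-job bookkeeping; LeaderServiceStub and PerJobLeaderServices are hypothetical names, String job ids replace JobID, and computeIfAbsent replaces the original get/put pair with the same effect.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for EmbeddedLeaderService: only records shutdown.
class LeaderServiceStub {
    private boolean shutDown;
    void shutdown() { shutDown = true; }
    boolean isShutDown() { return shutDown; }
}

// Sketch of EmbeddedHaServices' lazily created, per-job leader services.
class PerJobLeaderServices {
    private final Object lock = new Object();
    private final Map<String, LeaderServiceStub> jobManagerLeaderServices = new HashMap<>();
    private boolean shutdown; // guarded by lock

    LeaderServiceStub getOrCreate(String jobId) {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalStateException("high availability services are shut down");
            }
            // One service per job id, created on first use and reused afterwards.
            return jobManagerLeaderServices.computeIfAbsent(jobId, id -> new LeaderServiceStub());
        }
    }

    void close() {
        synchronized (lock) {
            if (!shutdown) {
                // Stop every per-job service, then forget them.
                for (LeaderServiceStub service : jobManagerLeaderServices.values()) {
                    service.shutdown();
                }
                jobManagerLeaderServices.clear();
                shutdown = true;
            }
        }
    }
}
```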

    StandaloneHaServices

    flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java

    public class StandaloneHaServices extends AbstractNonHaServices {
    
        /** The constant name of the ResourceManager RPC endpoint */
        private static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";
    
        /** The fixed address of the ResourceManager */
        private final String resourceManagerAddress;
    
        /** The fixed address of the Dispatcher */
        private final String dispatcherAddress;
    
        /** The fixed address of the JobManager */
        private final String jobManagerAddress;
    
        /** The fixed address of the web monitor */
        private final String webMonitorAddress;
    
        /**
         * Creates a new services class for the fixed, pre-defined leaders.
         *
         * @param resourceManagerAddress    The fixed address of the ResourceManager
         * @param dispatcherAddress         The fixed address of the Dispatcher
         * @param jobManagerAddress         The fixed address of the JobManager
         * @param webMonitorAddress         The fixed address of the web monitor
         */
        public StandaloneHaServices(
                String resourceManagerAddress,
                String dispatcherAddress,
                String jobManagerAddress,
                String webMonitorAddress) {
            this.resourceManagerAddress = checkNotNull(resourceManagerAddress, "resourceManagerAddress");
            this.dispatcherAddress = checkNotNull(dispatcherAddress, "dispatcherAddress");
            this.jobManagerAddress = checkNotNull(jobManagerAddress, "jobManagerAddress");
            this.webMonitorAddress = checkNotNull(webMonitorAddress, "webMonitorAddress");
        }
    
        // ------------------------------------------------------------------------
        //  Services
        // ------------------------------------------------------------------------
    
        @Override
        public LeaderRetrievalService getResourceManagerLeaderRetriever() {
            synchronized (lock) {
                checkNotShutdown();
    
                return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);
            }
    
        }
    
        @Override
        public LeaderRetrievalService getDispatcherLeaderRetriever() {
            synchronized (lock) {
                checkNotShutdown();
    
                return new StandaloneLeaderRetrievalService(dispatcherAddress, DEFAULT_LEADER_ID);
            }
        }
    
        @Override
        public LeaderElectionService getResourceManagerLeaderElectionService() {
            synchronized (lock) {
                checkNotShutdown();
    
                return new StandaloneLeaderElectionService();
            }
        }
    
        @Override
        public LeaderElectionService getDispatcherLeaderElectionService() {
            synchronized (lock) {
                checkNotShutdown();
    
                return new StandaloneLeaderElectionService();
            }
        }
    
        @Override
        public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
            synchronized (lock) {
                checkNotShutdown();
    
                return new StandaloneLeaderRetrievalService(jobManagerAddress, DEFAULT_LEADER_ID);
            }
        }
    
        @Override
        public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
            synchronized (lock) {
                checkNotShutdown();
    
                return new StandaloneLeaderRetrievalService(defaultJobManagerAddress, DEFAULT_LEADER_ID);
            }
        }
    
        @Override
        public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
            synchronized (lock) {
                checkNotShutdown();
    
                return new StandaloneLeaderElectionService();
            }
        }
    
        @Override
        public LeaderRetrievalService getWebMonitorLeaderRetriever() {
            synchronized (lock) {
                checkNotShutdown();
    
                return new StandaloneLeaderRetrievalService(webMonitorAddress, DEFAULT_LEADER_ID);
            }
        }
    
        @Override
        public LeaderElectionService getWebMonitorLeaderElectionService() {
            synchronized (lock) {
                checkNotShutdown();
    
                return new StandaloneLeaderElectionService();
            }
        }
    
    }
    
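    • StandaloneHaServices extends AbstractNonHaServices and implements HighAvailabilityServices for the non-high-availability case in which every leader has a fixed, pre-configured address. Every retriever is a StandaloneLeaderRetrievalService for the configured address with DEFAULT_LEADER_ID (getJobManagerLeaderRetriever(jobID, defaultJobManagerAddress) uses the address passed in instead of jobManagerAddress), and every election service is a StandaloneLeaderElectionService. ClusterEntrypoint uses StandaloneHaServices when highAvailabilityMode is NONE.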
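With no election taking place, a standalone retriever only has to hand its listener the configured address together with DEFAULT_LEADER_ID (the all-zero UUID). Our reading of Flink's StandaloneLeaderRetrievalService is that start(listener) does this immediately. The sketch below models that behaviour; LeaderListener and StaticLeaderRetrieval are hypothetical names loosely modelled on Flink's LeaderRetrievalListener, not the real classes.

```java
import java.util.UUID;

// Hypothetical listener, loosely modelled on Flink's LeaderRetrievalListener.
interface LeaderListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

// Sketch of a static (pre-configured) leader retrieval service.
class StaticLeaderRetrieval {
    // Same value as HighAvailabilityServices.DEFAULT_LEADER_ID.
    static final UUID DEFAULT_LEADER_ID = new UUID(0, 0);

    private final String leaderAddress;
    private final UUID leaderId;

    StaticLeaderRetrieval(String leaderAddress, UUID leaderId) {
        this.leaderAddress = leaderAddress;
        this.leaderId = leaderId;
    }

    void start(LeaderListener listener) {
        // No election: the configured leader is reported once, right away.
        listener.notifyLeaderAddress(leaderAddress, leaderId);
    }
}
```

Usage: `new StaticLeaderRetrieval("rm-address", StaticLeaderRetrieval.DEFAULT_LEADER_ID).start((address, id) -> System.out.println(address + " " + id));` prints the address followed by `00000000-0000-0000-0000-000000000000`. The address string here is a placeholder, not a real Flink RPC address.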

    Summary

    • HighAvailabilityServices defines the accessors for the services a highly available setup needs; it has two direct implementations, ZooKeeperHaServices and AbstractNonHaServices.
    • AbstractNonHaServices implements getCheckpointRecoveryFactory, getSubmittedJobGraphStore, getRunningJobsRegistry, createBlobStore, close and closeAndCleanupAllData from HighAvailabilityServices; it has two subclasses, EmbeddedHaServices and StandaloneHaServices.
    • EmbeddedHaServices extends AbstractNonHaServices and implements HighAvailabilityServices for the non-high-availability case in which the ResourceManager, JobManagers and TaskManagers run in the same process; FlinkMiniCluster uses EmbeddedHaServices.
    • StandaloneHaServices extends AbstractNonHaServices and implements HighAvailabilityServices for the non-high-availability case with fixed, pre-configured leader addresses; ClusterEntrypoint uses StandaloneHaServices when highAvailabilityMode is NONE.


          Article link: https://www.haomeiwen.com/subject/ipxgeqtx.html