ZooKeeper Source Code Analysis: The Curator Client


Author: 1d96ba4c1912 | Published 2018-09-16 22:16

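    Curator is a wrapper around the native ZooKeeper client that makes it much more convenient to use. This article summarizes how it works. Some familiarity with how the native ZooKeeper client is used will help you follow it, so I recommend reading "ZooKeeper Source Code Analysis: Demystifying the Client Source" first.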

    I. Overview of the Execution Flow

    First, a typical piece of code that uses Curator to connect to ZK and create a node:

    //Initialize the client
    CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("localhost:2181")
            .sessionTimeoutMs(60 * 1000)
            .connectionTimeoutMs(3 * 1000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 29, 60 * 1000))
            .build();
    //Start connecting to the ZK ensemble
    client.start();
    //Block until connected to the ZK ensemble, for at most 5 seconds
    boolean success = client.blockUntilConnected(5, TimeUnit.SECONDS);
    if (success){
        //Create the ZK node; Curator paths must start with "/"
        client.create().creatingParentsIfNeeded().forPath("/test", "test".getBytes("UTF-8"));
    }
    

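    This demo shows that using Curator means first creating an instance, then calling start() to open the ZK connection, and only then performing create/read/update/delete operations. Let's follow the source code along the same lines.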

    II. Source Code Analysis

    First, the source for initializing CuratorFramework:

    //build() at the end simply returns a CuratorFrameworkImpl object
    public CuratorFramework build()
    {
        return new CuratorFrameworkImpl(this);
    }
    
    //The CuratorFrameworkImpl constructor
    public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
    {
        //ZooKeeper factory; the default is DefaultZookeeperFactory, which simply calls the native client constructor new ZooKeeper(...)
        ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
        //Note the type of client here
        this.client = new CuratorZookeeperClient
            (
                localZookeeperFactory,
                builder.getEnsembleProvider(),
                builder.getSessionTimeoutMs(),
                builder.getConnectionTimeoutMs(),
                builder.getWaitForShutdownTimeoutMs(),
                //When this watcher receives a native ZK event, it wraps it in Curator's event type CuratorEvent and processes it
                new Watcher()
                {
                    @Override
                    public void process(WatchedEvent watchedEvent)
                    {
                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                        processEvent(event);
                    }
                },
                builder.getRetryPolicy(),
                builder.canBeReadOnly(),
                builder.getConnectionHandlingPolicy()
            );
        //Manages the ZK connection state; the logic that calls back listeners when the state changes lives here
        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent());
        //Default node data is this host's IP address; when forPath(path) is called without data, Curator stores this value
        byte[] builderDefaultData = builder.getDefaultData();
        //Initialization of many other fields is omitted for clarity
    }
    //The CuratorZookeeperClient constructor called above
    public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
            int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,
            RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
    {
        //Wraps the ZK client; the watcher that was passed in is handed further down
        state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
        //Initialization of the other fields is again omitted
    }
    //The ConnectionState constructor
    ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
    {
        //ensembleProvider supplies the ZK addresses; a plain connection string is wrapped in a FixedEnsembleProvider
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.tracer = tracer;
        this.connectionHandlingPolicy = connectionHandlingPolicy;
        //parentWatcher is the watcher passed in from the CuratorFrameworkImpl constructor
        if ( parentWatcher != null )
        {
            parentWatchers.offer(parentWatcher);
        }
    
        //Passing `this` shows that this class implements ZooKeeper's Watcher interface, i.e. it is itself a watcher
        zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
    }
    //The last constructor does nothing except assign fields
    HandleHolder(ZookeeperFactory zookeeperFactory, Watcher watcher, EnsembleProvider ensembleProvider, int sessionTimeout, boolean canBeReadOnly)
    {
        this.zookeeperFactory = zookeeperFactory;
        this.watcher = watcher;
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeout = sessionTimeout;
        this.canBeReadOnly = canBeReadOnly;
    }
    

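    From the chain of initializations above:

    1. CuratorFrameworkImpl exposes the API for operating on ZK (create, delete, update, read); the client reference it holds is of type CuratorZookeeperClient.
    2. CuratorZookeeperClient offers a smaller API; the client reference it holds is of type ConnectionState.
    3. ConnectionState itself implements ZooKeeper's Watcher interface and handles ZooKeeper events such as session expiration; the client reference it holds is of type HandleHolder.
    4. HandleHolder encapsulates the logic for opening and closing the ZK connection.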

    Once the fields are initialized, actually connecting to ZK requires calling start() on CuratorFramework. Here is the source, again with some code omitted:

    public void start()
    {
        //This state is CuratorFrameworkImpl's own lifecycle state; don't confuse it with the others
        if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
        {
            throw new IllegalStateException("Cannot be started more than once");
        }
    
        try
        {
            //As the class name suggests, this starts the connection-state management thread
            connectionStateManager.start(); 
            //Calls CuratorZookeeperClient.start(), which actually connects to ZK
            client.start();
            //Start a single background thread that runs the loop for background (asynchronous) operations
            executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    backgroundOperationsLoop();
                    return null;
                }
            });
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            handleBackgroundOperationException(null, e);
        }
    }
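
    The first lines of start() use a compare-and-set on an atomic state so that only one caller can move the framework from LATENT to STARTED; CuratorZookeeperClient and ConnectionStateManager apply the same idea with their own flags. A minimal sketch of the pattern, using a hypothetical StartOnceLifecycle class that is not part of Curator:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the "start at most once" guard (not Curator's class).
final class StartOnceLifecycle {
    enum State { LATENT, STARTED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);

    void start() {
        // Exactly one caller can win the LATENT -> STARTED transition, even under races.
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        // ...the real start() would now start the state manager, connect, etc.
    }

    State getState() {
        return state.get();
    }
}
```

    Because compareAndSet is atomic, two threads calling start() concurrently cannot both pass the check; the loser gets the exception.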
    

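    From the source of the native ZooKeeper API we know that establishing a connection requires registering a watcher that handles the connection-state and node-change events returned by ZK. Although the code above starts connectionStateManager first, let's look at CuratorZookeeperClient.start() first: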

    public void start() throws Exception
    {
        //Another similar started flag, this one belonging to CuratorZookeeperClient; don't confuse them
        if ( !started.compareAndSet(false, true) )
        {
            throw new IllegalStateException("Already started");
        }
        //This calls ConnectionState.start()
        state.start();
    }
    //ConnectionState.start()
    void start() throws Exception
    {
        //Hook for preparation before the ZK address is fetched; does nothing by default
        ensembleProvider.start();
        reset();
    }
    
    synchronized void reset() throws Exception
    {
        log.debug("reset");
    
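        //Counts how many times a ZooKeeper instance has been built; this makes it possible to tell whether the connection changed even when the state did not, e.g. a disconnect immediately followed by a reconnect
        instanceIndex.incrementAndGet();
        //Mark as not connected
        isConnected.set(false);
        //Connection start time
        connectionStartMs = System.currentTimeMillis();
        //Close the old ZooKeeper instance and build a new helper
        zooKeeper.closeAndReset();
        //Connect to ZK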
        zooKeeper.getZooKeeper();   // initiate connection
    }
    //The HandleHolder methods called above
    void closeAndReset() throws Exception
    {
        //Close the previous ZooKeeper instance, if any, then rebuild the helper
        internalClose(0);
        //Initialize the helper object
        helper = new Helper()
        {
            private volatile ZooKeeper zooKeeperHandle = null;
            private volatile String connectionString = null;
    
            @Override
            public ZooKeeper getZooKeeper() throws Exception
            {
                synchronized(this)
                {
                    if ( zooKeeperHandle == null )
                    {
                        connectionString = ensembleProvider.getConnectionString();
                        //The watcher passed here is the ConnectionState
                        zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);
                    }
    
                    //Once the ZooKeeper instance exists, swap in a helper that just returns it, so later reads need no lock
                    //An unusual trick, but a sound one
                    helper = new Helper()
                    {
                        @Override
                        public ZooKeeper getZooKeeper() throws Exception
                        {
                            return zooKeeperHandle;
                        }
    
                        @Override
                        public String getConnectionString()
                        {
                            return connectionString;
                        }
    
                        @Override
                        public int getNegotiatedSessionTimeoutMs()
                        {
                            return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
                        }
                    };
    
                    return zooKeeperHandle;
                }
            }
    
            @Override
            public String getConnectionString()
            {
                return connectionString;
            }
    
            @Override
            public int getNegotiatedSessionTimeoutMs()
            {
                return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
            }
        };
    }
    //Given the helper above, this is where the ZooKeeper instance is created and the connection is initiated
    ZooKeeper getZooKeeper() throws Exception
    {
        return (helper != null) ? helper.getZooKeeper() : null;
    }
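
    The helper-swapping trick in closeAndReset() reduces to a generic lazy-holder pattern: the first call creates the handle under a lock, then replaces the helper with one that only returns the cached handle, so later reads take no lock. The sketch below assumes a hypothetical LazyHandleHolder class, with a Supplier standing in for zookeeperFactory.newZooKeeper(...); it illustrates the idea and is not Curator's code.

```java
import java.util.function.Supplier;

// Hypothetical sketch of HandleHolder's self-replacing helper (not Curator's code).
final class LazyHandleHolder<T> {
    private interface Helper<H> {
        H get();
    }

    private final Supplier<T> factory;   // stands in for zookeeperFactory.newZooKeeper(...)
    private volatile Helper<T> helper;

    LazyHandleHolder(Supplier<T> factory) {
        this.factory = factory;
        reset();
    }

    // Like closeAndReset(): install a fresh, lock-protected helper.
    void reset() {
        helper = new Helper<T>() {
            private T handle; // only touched inside the synchronized get()

            @Override
            public synchronized T get() {
                if (handle == null) {
                    handle = factory.get();
                }
                final T created = handle;
                // Swap in a lock-free helper that just returns the cached handle.
                helper = () -> created;
                return created;
            }
        };
    }

    T get() {
        return helper.get();
    }
}
```

    The volatile field is what makes the swap safe: a thread that reads the new helper also sees the handle it returns.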
    

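    The code above creates the native ZooKeeper instance, which starts connecting to the ensemble in the background. Following the flow, the next step is receiving the connection events ZK sends back through a watcher, and that watcher is the ConnectionState class. To see how it handles events, look at its implementation of the Watcher interface's process method: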

    public void process(WatchedEvent event)
    {
        //EventType.None means this is a connection-state change event
        if ( event.getType() == Watcher.Event.EventType.None )
        {
            boolean wasConnected = isConnected.get();
            boolean newIsConnected = checkState(event.getState(), wasConnected);
            //The connection state changed
            if ( newIsConnected != wasConnected )
            {
                isConnected.set(newIsConnected);
                //Record the connection start time
                connectionStartMs = System.currentTimeMillis();
                //If now connected, record the newly negotiated session timeout
                if ( newIsConnected )
                {
                    lastNegotiatedSessionTimeoutMs.set(zooKeeper.getNegotiatedSessionTimeoutMs());
                }
            }
        }
    
        //Call back Curator's own watchers; note that one parentWatcher was already added during initialization
        for ( Watcher parentWatcher : parentWatchers )
        {
            OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
            parentWatcher.process(event);
            trace.commit();
        }
    }
    

    First, the key part: how connection states are handled, i.e. the checkState method called above:

    private boolean checkState(Event.KeeperState state, boolean wasConnected)
    {
        boolean isConnected = wasConnected;
        boolean checkNewConnectionString = true;
        switch ( state )
        {
            default:
            case Disconnected:
            {
                isConnected = false;
                break;
            }
            case SyncConnected:
            case ConnectedReadOnly:
            {
                isConnected = true;
                break;
            }
            case AuthFailed:
            {
                isConnected = false;
                break;
            }
            case Expired:
            {
                isConnected = false;
                checkNewConnectionString = false;
                //Handle session expiration, which in practice means rebuilding the ZK connection
                handleExpiredSession();
                break;
            }
            case SaslAuthenticated:
            {
                break;
            }
        }
        //Unless the session expired, check whether the ZK address has changed
        if ( checkNewConnectionString )
        {
            String newConnectionString = zooKeeper.getNewConnectionString();
            if ( newConnectionString != null )
            {
                //Handle the changed ZK address
                handleNewConnectionString(newConnectionString);
            }
        }

        return isConnected;
    }
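
    The switch in checkState() boils down to a small mapping from the native KeeperState to a new "connected" flag plus two side effects. A sketch restating it as a pure function; the enum only mirrors the names of ZooKeeper's Watcher.Event.KeeperState values, and the Outcome class is hypothetical:

```java
// Hypothetical restatement of checkState() as a pure function (not Curator's code).
final class CheckStateSketch {
    // Mirrors the names of ZooKeeper's Watcher.Event.KeeperState values used above.
    enum KeeperState { Disconnected, SyncConnected, ConnectedReadOnly, AuthFailed, Expired, SaslAuthenticated }

    static final class Outcome {
        final boolean connected;
        final boolean rebuildSession;           // Expired -> handleExpiredSession() -> reset()
        final boolean checkNewConnectionString; // look for a changed ensemble address

        Outcome(boolean connected, boolean rebuildSession, boolean checkNewConnectionString) {
            this.connected = connected;
            this.rebuildSession = rebuildSession;
            this.checkNewConnectionString = checkNewConnectionString;
        }
    }

    static Outcome checkState(KeeperState state, boolean wasConnected) {
        switch (state) {
            case SyncConnected:
            case ConnectedReadOnly:
                return new Outcome(true, false, true);
            case Expired:
                // The session is gone: rebuild it and skip the address check.
                return new Outcome(false, true, false);
            case SaslAuthenticated:
                // Authentication notice only: the connected flag is unchanged.
                return new Outcome(wasConnected, false, true);
            case Disconnected:
            case AuthFailed:
            default:
                return new Outcome(false, false, true);
        }
    }
}
```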
    

    Handling session expiration is simple: discard the current ZK instance and build a new one:

    private void handleExpiredSession()
    {
        try
        {
            //On session expiry the ZK connection is rebuilt; reset() was shown above and is omitted here
            reset();
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            //Reconnecting to ZK failed; queue the exception for background handling
            queueBackgroundException(e);
        }
    }
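
    reset() is the same routine for the first connection and for recovery: bump instanceIndex, close the old handle, build a new one. The sketch below models that, together with the use of instanceIndex described in the comment inside reset(): an observer that remembers the last index it saw can tell that the session was rebuilt even if the connected flag looks unchanged. ResettableConnection, Handle and InstanceWatcher are hypothetical names, not Curator classes.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of reset() plus an instanceIndex observer (not Curator's code).
final class ResettableConnection {
    static final class Handle {
        final long instance;
        boolean closed;

        Handle(long instance) {
            this.instance = instance;
        }
    }

    private final AtomicLong instanceIndex = new AtomicLong();
    private Handle current;

    // Like reset(): bump the index, close the old handle, build a new one.
    synchronized void reset() {
        long index = instanceIndex.incrementAndGet();
        if (current != null) {
            current.closed = true;       // stands in for closing the old ZooKeeper
        }
        current = new Handle(index);     // stands in for new ZooKeeper(...)
    }

    synchronized Handle current() {
        return current;
    }

    long getInstanceIndex() {
        return instanceIndex.get();
    }
}

// Remembers the last index seen and reports when it changes.
final class InstanceWatcher {
    private long lastSeen = -1; // -1: nothing seen yet, so the first observation is not a change

    boolean changed(long index) {
        long previous = lastSeen;
        lastSeen = index;
        return previous >= 0 && previous != index;
    }
}
```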
    

    Next, the case where the ZK address changes, i.e. the handleNewConnectionString method above:

    private void handleNewConnectionString(String newConnectionString)
    {
        try
        {
            ZooKeeper zooKeeper = this.zooKeeper.getZooKeeper();
            if ( zooKeeper == null )
            {
                //No ZooKeeper instance yet; nothing to do
            }
            else
            {
                if ( ensembleProvider.updateServerListEnabled() )
                {
                    //Native ZK API that updates the server list; the client may then reconnect to a different server to rebalance load
                    zooKeeper.updateServerList(newConnectionString);
                }
                else
                {
                    //The ZK address changed; reset the ZK connection
                    reset();
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            queueBackgroundException(e);
        }
    }
    

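    With connection-state handling covered, back to the process method above. It ends by calling parentWatcher.process(event), and that watcher is the one passed in at the very beginning during initialization. Here is how it processes events: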

    new Watcher()
    {
        @Override
        public void process(WatchedEvent watchedEvent)
        {
            //Wrap the native WatchedEvent in Curator's own event type
            CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
            processEvent(event);
        }
    }
    
    private void processEvent(final CuratorEvent curatorEvent)
    {
        if ( curatorEvent.getType() == CuratorEventType.WATCHED )
        {
            //Translate and broadcast the connection-state change
            validateConnection(curatorEvent.getWatchedEvent().getState());
        }
    
        //Call back the listeners
        listeners.forEach(new Function<CuratorListener, Void>()
        {
            @Override
            public Void apply(CuratorListener listener)
            {
                try
                {
                    OperationTrace trace = client.startAdvancedTracer("EventListener");
                    listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
                    trace.commit();
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    logError("Event listener threw exception", e);
                }
                return null;
            }
        });
    }
    //Converts ZK's native KeeperState into Curator's own ConnectionState event
    //Note: this ConnectionState is an enum, not the same class as the ConnectionState that wraps the ZK connection above;
    //the two merely share a name
    void validateConnection(Watcher.Event.KeeperState state)
    {
        if ( state == Watcher.Event.KeeperState.Disconnected )
        {
            internalConnectionHandler.suspendConnection(this);
        }
        else if ( state == Watcher.Event.KeeperState.Expired )
        {
            //Broadcast the LOST event
            connectionStateManager.addStateChange(ConnectionState.LOST);
        }
        else if ( state == Watcher.Event.KeeperState.SyncConnected )
        {
            //If the underlying ZooKeeper instance changed, broadcast a LOST event first
            internalConnectionHandler.checkNewConnection(this);
            connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
            unSleepBackgroundOperations();
        }
        else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
        {
            internalConnectionHandler.checkNewConnection(this);
            connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
        }
    }
    
    public synchronized boolean addStateChange(ConnectionState newConnectionState)
    {
        if ( state.get() != State.STARTED )
        {
            return false;
        }
    
        ConnectionState previousState = currentConnectionState;
        if ( previousState == newConnectionState )
        {
            return false;
        }
        setCurrentConnectionState(newConnectionState);
    
        ConnectionState localState = newConnectionState;
        boolean isNegativeMessage = ((newConnectionState == ConnectionState.LOST) || (newConnectionState == ConnectionState.SUSPENDED) || (newConnectionState == ConnectionState.READ_ONLY));
        //For the initial connection, RECONNECTED is turned into CONNECTED; after the check above, only CONNECTED and RECONNECTED can reach this point
        if ( !isNegativeMessage && initialConnectMessageSent.compareAndSet(false, true) )
        {
            localState = ConnectionState.CONNECTED;
        }
        //Broadcast the event
        postState(localState);
        return true;
    }
    
    private void postState(ConnectionState state)
    {
        notifyAll();
        //Put the ConnectionState into the queue; if the queue is full, drop the oldest entry
        while ( !eventQueue.offer(state) )
        {
            eventQueue.poll();
        }
    }
    

    This sequence of code shows that the watcher passed in at the start only converts the event, updates the current connection state, and puts the event into a queue.
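
    That conversion-plus-queue behaviour condenses into a small, dependency-free sketch of addStateChange() and postState(). It assumes a hypothetical ConnState enum mirroring Curator's ConnectionState values, omits the STARTED check and the notifyAll(), and uses an ArrayBlockingQueue with an arbitrary capacity for the bounded event queue:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of addStateChange() + postState() (not Curator's code).
final class StateChangeSketch {
    enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    private final BlockingQueue<ConnState> eventQueue;
    private ConnState current;
    private boolean initialConnectMessageSent;

    StateChangeSketch(int capacity) {
        eventQueue = new ArrayBlockingQueue<>(capacity);
    }

    synchronized boolean addStateChange(ConnState newState) {
        if (newState == current) {
            return false; // repeated state: nothing to broadcast
        }
        current = newState;
        ConnState local = newState;
        boolean negative = newState == ConnState.LOST
                || newState == ConnState.SUSPENDED
                || newState == ConnState.READ_ONLY;
        if (!negative && !initialConnectMessageSent) {
            initialConnectMessageSent = true;
            local = ConnState.CONNECTED; // first positive event is reported as CONNECTED
        }
        // postState(): if the bounded queue is full, drop the oldest event.
        while (!eventQueue.offer(local)) {
            eventQueue.poll();
        }
        return true;
    }

    // What the consumer thread would see, in order.
    List<ConnState> drain() {
        List<ConnState> out = new ArrayList<>();
        eventQueue.drainTo(out);
        return out;
    }
}
```

    Feeding it RECONNECTED, RECONNECTED, SUSPENDED, RECONNECTED yields the events CONNECTED, SUSPENDED, RECONNECTED: the duplicate is swallowed and the first positive event is reported as CONNECTED.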

    Back where the ZK connection is started, we skipped connectionStateManager.start(); now we can look at it:

    public void start()
    {
        //Set this state to STARTED; not to be confused with CuratorFrameworkImpl's state
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
        //This simply runs processEvents() asynchronously
        service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        processEvents();
                        return null;
                    }
                }
            );
    }
    
    private void processEvents()
    {
        //start() has already set this to STARTED
        while ( state.get() == State.STARTED )
        {
            try
            {
                //Before the first connection there is no negotiated timeout yet, so this is the user-configured session timeout
                int useSessionTimeoutMs = getUseSessionTimeoutMs();
                long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
                long pollMaxMs = useSessionTimeoutMs - elapsedMs;
    
                //The same queue that the events were just put into
                final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
                if ( newState != null )
                {
                    if ( listeners.size() == 0 )
                    {
                        log.warn("There are no ConnectionStateListeners registered.");
                    }
                    //Simply call back the listeners
                    listeners.forEach
                        (
                            new Function<ConnectionStateListener, Void>()
                            {
                                @Override
                                public Void apply(ConnectionStateListener listener)
                                {
                                    listener.stateChanged(client, newState);
                                    return null;
                                }
                            }
                        );
                }
                //Defaults to 100; if no event arrives within the poll window, check whether the session should be considered expired
                else if ( sessionExpirationPercent > 0 )
                {
                    synchronized(this)
                    {
                        checkSessionExpiration();
                    }
                }
            }
            catch ( InterruptedException e )
            {
            }
        }
    }
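
    One iteration of processEvents() can be written as two small functions: the poll-timeout calculation and the dispatch step. A sketch under those assumptions, with the hypothetical class EventLoopSketch and plain Consumer objects standing in for ConnectionStateListener:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of one processEvents() iteration (not Curator's code).
final class EventLoopSketch {
    // startOfSuspendedEpoch == 0 means "not currently suspended".
    static long pollMaxMs(int useSessionTimeoutMs, long startOfSuspendedEpoch, long nowMs) {
        long elapsedMs = (startOfSuspendedEpoch == 0)
                ? useSessionTimeoutMs / 2
                : nowMs - startOfSuspendedEpoch;
        return useSessionTimeoutMs - elapsedMs;
    }

    // Returns true if an event was dispatched; false means the poll timed out,
    // which is where the real loop runs its session-expiration check.
    static <E> boolean runOnce(BlockingQueue<E> queue, long pollMaxMs, List<Consumer<E>> listeners) {
        E event;
        try {
            event = queue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return false;
        }
        if (event == null) {
            return false;
        }
        for (Consumer<E> listener : listeners) {
            listener.accept(event);
        }
        return true;
    }
}
```

    With a 60 000 ms session timeout and no suspension in progress, pollMaxMs returns 30 000, so the loop wakes at least every half session timeout to run the expiration check.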
    

    With connecting to ZK and registering the watcher covered, let's look at how a ZK node is created, starting with the create method:

    public CreateBuilder create()
    {
        checkState();
        return new CreateBuilderImpl(this);
    }
    

    create() returns a CreateBuilderImpl object, an application of the builder pattern, so let's go straight to forPath:

    public String forPath(String path) throws Exception
    {
        //Uses the default data, i.e. the client's IP address
        return forPath(path, client.getDefaultData());
    }
    
    public String forPath(final String givenPath, byte[] data) throws Exception
    {
        //Compress the data if requested
        if ( compress )
        {
            data = client.getCompressionProvider().compress(givenPath, data);
        }
    
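        //Build the full path, with the namespace applied
        final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));
        //Resolve the ACLs and, if a schema is configured, validate the create against it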
        List<ACL> aclList = acling.getAclList(adjustedPath);
        client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);
    
        String returnPath = null;
        //Asynchronous execution
        if ( backgrounding.inBackground() )
        {
            pathInBackground(adjustedPath, data, givenPath);
        }
        else
        {
            //Synchronous execution
            String path = protectedPathInForeground(adjustedPath, data, aclList);
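            //Strip the namespace so the caller gets back a path in its own terms; e.g. if a namespace was configured at startup, it was prepended when the node was created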
            returnPath = client.unfixForNamespace(path);
        }
        return returnPath;
    }
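
    Namespace handling in forPath() (fixForNamespace before the create, unfixForNamespace on the returned path) amounts to adding and stripping a path prefix. The helpers below are a hypothetical sketch of that idea only; Curator's own implementation also validates paths and covers more edge cases.

```java
// Hypothetical namespace helpers (not Curator's implementation).
final class NamespaceSketch {
    // With namespace "app": "/test" -> "/app/test", "/" -> "/app".
    static String fixForNamespace(String namespace, String path) {
        if (!path.startsWith("/")) {
            throw new IllegalArgumentException("Path must start with / character");
        }
        if (namespace == null || namespace.isEmpty()) {
            return path;
        }
        return "/" + namespace + ("/".equals(path) ? "" : path);
    }

    // With namespace "app": "/app/test" -> "/test", "/app" -> "/".
    static String unfixForNamespace(String namespace, String path) {
        if (namespace == null || namespace.isEmpty()) {
            return path;
        }
        String prefix = "/" + namespace;
        if (path.equals(prefix)) {
            return "/";
        }
        if (path.startsWith(prefix + "/")) {
            return path.substring(prefix.length());
        }
        return path; // not under the namespace: leave it alone
    }
}
```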
    

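    Finally, the synchronous path. protectedPathInForeground delegates to pathInForeground, which contains the actual logic: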

    private String pathInForeground(final String path, final byte[] data, final List<ACL> aclList) throws Exception
    {
        final AtomicBoolean firstTime = new AtomicBoolean(true);
        //The core logic is wrapped in the retry loop
        String returnPath = RetryLoop.callWithRetry
            (
                client.getZookeeperClient(),
                new Callable<String>()
                {
                    @Override
                    public String call() throws Exception
                    {
                        boolean localFirstTime = firstTime.getAndSet(false) && !debugForceFindProtectedNode;
    
                        String createdPath = null;
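                        //doProtected defaults to false; when enabled, a retry first looks for a node this client already created and skips creating it again if found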
                        if ( !localFirstTime && doProtected )
                        {
                            debugForceFindProtectedNode = false;
                            createdPath = findProtectedNodeInForeground(path);
                        }
    
                        if ( createdPath == null )
                        {
                            try
                            {
                                //Create the node in the way that matches the ZK version (3.4 compatibility mode or not)
                                if ( client.isZk34CompatibilityMode() )
                                {
                                    createdPath = client.getZooKeeper().create(path, data, aclList, createMode);
                                }
                                else
                                {
                                    createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
                                }
                            }
                            catch ( KeeperException.NoNodeException e )
                            {
                                //If the parent does not exist and createParentsIfNeeded is set, create the parents one by one, then create the node again
                                if ( createParentsIfNeeded )
                                {
                                    ZKPaths.mkdirs(client.getZooKeeper(), path, false, acling.getACLProviderForParents(), createParentsAsContainers);
                                    if ( client.isZk34CompatibilityMode() )
                                    {
                                        createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
                                    }
                                    else
                                    {
                                        createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
                                    }
                                }
                                else
                                {
                                    throw e;
                                }
                            }
                            catch ( KeeperException.NodeExistsException e )
                            {
                                //Likewise, if setDataIfExists is set, turn the create into an update
                                if ( setDataIfExists )
                                {
                                    Stat setStat = client.getZooKeeper().setData(path, data, setDataIfExistsVersion);
                                    if(storingStat != null)
                                    {
                                        DataTree.copyStat(setStat, storingStat);
                                    }
                                    createdPath = path;
                                }
                                else
                                {
                                    throw e;
                                }
                            }
                        }
                        return createdPath;
                    }
                }
            );
        return returnPath;
    }
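
    The NoNodeException branch above is what creatingParentsIfNeeded() buys: try the create, and if the parent is missing, create the parents top-down and try again. The sketch below replays that control flow against a hypothetical in-memory tree instead of a real ZooKeeper; its exceptions are unchecked stand-ins for KeeperException.NoNodeException and NodeExistsException, and in this sketch parents are created with empty data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory tree replaying the creatingParentsIfNeeded() flow.
final class CreateWithParentsSketch {
    // Unchecked stand-ins for KeeperException.NoNodeException / NodeExistsException.
    static final class NoNodeException extends RuntimeException {
        NoNodeException(String path) { super("NoNode: " + path); }
    }

    static final class NodeExistsException extends RuntimeException {
        NodeExistsException(String path) { super("NodeExists: " + path); }
    }

    private final Map<String, byte[]> nodes = new HashMap<>();

    CreateWithParentsSketch() {
        nodes.put("/", new byte[0]); // the root always exists
    }

    // Like ZooKeeper.create(): fails if the node exists or its parent is missing.
    String create(String path, byte[] data) {
        if (nodes.containsKey(path)) {
            throw new NodeExistsException(path);
        }
        if (!nodes.containsKey(parentOf(path))) {
            throw new NoNodeException(path);
        }
        nodes.put(path, data);
        return path;
    }

    // Like the NoNodeException branch of pathInForeground() with createParentsIfNeeded set.
    String createParentsIfNeeded(String path, byte[] data) {
        try {
            return create(path, data);
        } catch (NoNodeException e) {
            for (String parent : parentsOf(path)) {
                nodes.putIfAbsent(parent, new byte[0]); // parents get empty data here
            }
            return create(path, data);
        }
    }

    boolean exists(String path) {
        return nodes.containsKey(path);
    }

    static String parentOf(String path) {
        int i = path.lastIndexOf('/');
        return i <= 0 ? "/" : path.substring(0, i);
    }

    // "/a/b/c" -> ["/a", "/a/b"]
    static List<String> parentsOf(String path) {
        List<String> parents = new ArrayList<>();
        for (int i = path.indexOf('/', 1); i > 0; i = path.indexOf('/', i + 1)) {
            parents.add(path.substring(0, i));
        }
        return parents;
    }
}
```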
    

    Now the key part, how the retry policy is applied:

    public static<T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception
    {
        return client.getConnectionHandlingPolicy().callWithRetry(client, proc);
    }
    
    public <T> T callWithRetry(CuratorZookeeperClient client, Callable<T> proc) throws Exception
    {
        //Wait for the ZK connection, for at most connectionTimeoutMs
        client.internalBlockUntilConnectedOrTimedOut();
    
        T result = null;
        //Create a retry loop from the retry policy
        RetryLoop retryLoop = client.newRetryLoop();
        //Keep going until the operation is done
        while ( retryLoop.shouldContinue() )
        {
            try
            {
                //Run the actual operation
                result = proc.call();
                //Mark success
                retryLoop.markComplete();
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                //Handle the exception; it is rethrown if no retry is allowed
                retryLoop.takeException(e);
            }
        }
    
        return result;
    }
    

    Next, the key points of retrying, starting with retryLoop.shouldContinue():

    //Simple: keep retrying until done
    public boolean shouldContinue()
    {
        return !isDone;
    }
    

    From this, the logic of retryLoop.markComplete() follows:

    public void markComplete()
    {
        isDone = true;
    }
    

    So the crucial part must be the exception handling in retryLoop.takeException(e):

    public void takeException(Exception exception) throws Exception
    {
        boolean rethrow = true;
        //Only specific exceptions are retried
        if ( isRetryException(exception) )
        {
            //Ask the (user-configurable) retry policy whether another attempt is allowed
            if ( retryPolicy.allowRetry(retryCount++, System.currentTimeMillis() - startTimeMs, sleeper) )
            {
                //If a retry is allowed, swallow the exception
                rethrow = false;
            }
        }
        //Otherwise rethrow it
        if ( rethrow )
        {
            throw exception;
        }
    }
    
    public static boolean isRetryException(Throwable exception)
    {
        if ( exception instanceof KeeperException )
        {
            KeeperException keeperException = (KeeperException)exception;
            return shouldRetry(keeperException.code().intValue());
        }
        return false;
    }
    
    //Only connection-related errors are retried
    public static boolean shouldRetry(int rc)
    {
        return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
            (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
            (rc == KeeperException.Code.SESSIONMOVED.intValue()) ||
            (rc == KeeperException.Code.SESSIONEXPIRED.intValue()) ||
            (rc == -13); //-13 is NEWCONFIGNOQUORUM, hard-coded for ZK 3.4 compatibility
    }
    

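    From the logic above we can draw an important conclusion:
    Curator wraps every single ZK operation in the retry policy we specify, although only connection-related errors (the codes listed in shouldRetry) actually trigger a retry!
    This may run against intuition; I myself long assumed retries were only needed when the connection dropped.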
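
    Putting callWithRetry(), takeException() and shouldRetry() together with an exponential backoff policy gives the sketch below. It is modeled on, not copied from, Curator's RetryLoop and ExponentialBackoffRetry: the ZkError exception and the Backoff class are hypothetical, error codes are plain ints (ZooKeeper's numeric values for the codes listed in shouldRetry), and the policy records its sleep times instead of sleeping so the sketch runs instantly. The constructor arguments follow the demo's ExponentialBackoffRetry(1000, 29, 60 * 1000): base sleep, maximum retries, maximum sleep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;

// Hypothetical model of RetryLoop + exponential backoff (not Curator's code).
final class RetrySketch {
    // Stand-in for KeeperException: carries a ZooKeeper result code.
    static final class ZkError extends Exception {
        final int code;

        ZkError(int code) {
            super("rc=" + code);
            this.code = code;
        }
    }

    // ZooKeeper's numeric codes for the errors listed in shouldRetry().
    static final int CONNECTIONLOSS = -4;
    static final int OPERATIONTIMEOUT = -7;
    static final int SESSIONEXPIRED = -112;
    static final int SESSIONMOVED = -118;
    static final int NEWCONFIGNOQUORUM = -13;

    static boolean shouldRetry(int rc) {
        return rc == CONNECTIONLOSS || rc == OPERATIONTIMEOUT
                || rc == SESSIONMOVED || rc == SESSIONEXPIRED || rc == NEWCONFIGNOQUORUM;
    }

    // Exponential backoff with jitter; records sleeps instead of sleeping.
    static final class Backoff {
        final int baseSleepMs;
        final int maxRetries;
        final int maxSleepMs;
        final Random random;
        final List<Long> sleeps = new ArrayList<>();

        Backoff(int baseSleepMs, int maxRetries, int maxSleepMs, Random random) {
            this.baseSleepMs = baseSleepMs;
            this.maxRetries = maxRetries;
            this.maxSleepMs = maxSleepMs;
            this.random = random;
        }

        boolean allowRetry(int retryCount) {
            if (retryCount >= maxRetries) {
                return false;
            }
            // Sleep base * max(1, random in [0, 2^(n+1))); the shift is capped so it cannot overflow.
            int shift = Math.min(retryCount, 29) + 1;
            long sleepMs = (long) baseSleepMs * Math.max(1, random.nextInt(1 << shift));
            sleeps.add(Math.min(sleepMs, (long) maxSleepMs));
            return true;
        }
    }

    // callWithRetry + takeException folded into one loop.
    static <T> T callWithRetry(Backoff policy, Callable<T> proc) throws Exception {
        int retryCount = 0;
        while (true) {
            try {
                return proc.call(); // success ends the loop (markComplete)
            } catch (ZkError e) {
                // Retry only connection-related codes, and only while the policy allows it.
                if (!shouldRetry(e.code) || !policy.allowRetry(retryCount++)) {
                    throw e;
                }
            }
        }
    }
}
```

    Non-retryable codes such as NONODE (-101) are rethrown on the first attempt, which is why a create under a missing parent fails immediately instead of being retried.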

    III. Summary

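    This article focused on the interaction between Curator and ZK, using only the create API as the example. Beyond basic create/read/update/delete, many people also use higher-level constructs such as NodeCache and TreeCache. These are all built by Curator on top of the same low-level interaction APIs, so once you understand the low-level logic, that code becomes much easier to read.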

        Article link: https://www.haomeiwen.com/subject/xbfvwftx.html