ZooKeeper Source Code Analysis (9): An Introduction to Curator


Author: Monica2333 | Published 2019-03-31 19:31

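There are three commonly used Java clients for ZooKeeper: the native ZooKeeper client, Apache Curator, and the open-source zkclient. The Curator website describes Curator as follows:

[Image: description of Curator from the Curator website]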

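In production we generally use Curator. It mainly solves three kinds of problems:
1. It encapsulates connection handling between the ZooKeeper client and the ZooKeeper server, and reconnects automatically when the session times out.
2. It provides a fluent-style API for operations.
3. It provides abstractions for the various distributed coordination scenarios built on ZooKeeper (recipes, such as leader election, distributed locks, and distributed caches).
Drawing on how Curator is used, this article briefly covers Curator's startup process, its session management, its notification mechanisms, and how the recipes are implemented.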

Using Curator

Curator consists of the following components:

[Figure: Curator components]
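curator-recipes is built on top of the Curator Framework and provides techniques for ZooKeeper-based distributed coordination. Most of the time, this one jar is the only dependency we need.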

Startup and loading

//1. initialization
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectionString)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                // etc
                .build();
//2. startup
client.start();

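The process can be divided into an initialization phase and a startup phase.
Initialization
The builder pattern above creates a CuratorFrameworkImpl client instance. The main part of its initialization is the constructor: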
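The split between building a client and starting it can be illustrated with a small, self-contained sketch. MiniClientConfig and its Builder are hypothetical classes, not Curator's, and the default values are illustrative only. In this sketch, build() only validates and assembles settings. It also emits a warning modeled on the session/connection timeout check in the CuratorZookeeperClient constructor. Nothing connects until a separate start step.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the builder phase: collect settings, validate them, and return an
// immutable configuration. Nothing is connected here; that would happen in a separate start().
final class MiniClientConfig {
    final String connectString;
    final int sessionTimeoutMs;
    final int connectionTimeoutMs;
    final List<String> warnings;

    private MiniClientConfig(Builder b, List<String> warnings) {
        this.connectString = b.connectString;
        this.sessionTimeoutMs = b.sessionTimeoutMs;
        this.connectionTimeoutMs = b.connectionTimeoutMs;
        this.warnings = Collections.unmodifiableList(warnings);
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String connectString;
        private int sessionTimeoutMs = 60_000;    // illustrative default
        private int connectionTimeoutMs = 15_000; // illustrative default

        Builder connectString(String connectString) { this.connectString = connectString; return this; }
        Builder sessionTimeoutMs(int ms) { this.sessionTimeoutMs = ms; return this; }
        Builder connectionTimeoutMs(int ms) { this.connectionTimeoutMs = ms; return this; }

        MiniClientConfig build() {
            if (connectString == null || connectString.isEmpty()) {
                throw new IllegalArgumentException("connectString cannot be empty");
            }
            List<String> warnings = new ArrayList<>();
            // Same idea as the check in CuratorZookeeperClient: warn, but do not fail.
            if (sessionTimeoutMs < connectionTimeoutMs) {
                warnings.add(String.format("session timeout [%d] is less than connection timeout [%d]",
                        sessionTimeoutMs, connectionTimeoutMs));
            }
            return new MiniClientConfig(this, warnings);
        }
    }
}
```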

public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
    {
        //factory that creates the native ZooKeeper client instance
        ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
        //wrapper around ZooKeeper that handles Curator's lower-level session maintenance, synchronous requests, etc.
        this.client = new CuratorZookeeperClient
            (
                localZookeeperFactory,
                builder.getEnsembleProvider(),
                builder.getSessionTimeoutMs(),
                builder.getConnectionTimeoutMs(),
                new Watcher()
                {
                    @Override
                    public void process(WatchedEvent watchedEvent)
                    {
                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                        processEvent(event);
                    }
                },
                builder.getRetryPolicy(),
                builder.canBeReadOnly(),
                builder.getConnectionHandlingPolicy()
            );
        //detects disconnection and connection-timeout states, sets Curator's connection state, and fires connection state notifications through connectionStateManager
        internalConnectionHandler = isClassic ? new ClassicInternalConnectionHandler() : new StandardInternalConnectionHandler();
        //receives event notifications; fired by background operation events and connection state events
        listeners = new ListenerContainer<CuratorListener>();
        //fired when a background thread or a handler throws an exception
        unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
        //queue of operations executed by the background thread
        backgroundOperations = new DelayQueue<OperationAndData<?>>();
        //namespace
        namespace = new NamespaceImpl(this, builder.getNamespace());
        //thread factory, used when the background thread pool is created
        threadFactory = getThreadFactory(builder);
        //publishes notifications when the connection state changes
        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent());
        //state of CuratorFrameworkImpl: LATENT before start() is called, STARTED after start(), STOPPED after close()
        state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
        //connection error policy
        connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");
        //guaranteed delete: keeps retrying until the delete succeeds (implemented with recursive calls)
        failedDeleteManager = new FailedDeleteManager(this);
        //guaranteed watcher removal
        failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
        //tracks available server nodes; re-fetches the server list after the first connection and after every successful reconnection
        ensembleTracker = new EnsembleTracker(this, builder.getEnsembleProvider());
    }

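As shown, the constructor mainly initializes the ZooKeeper client wrapper CuratorZookeeperClient; the listener containers for background operations, connection events, and errors; the namespace; and load balancing (the ensemble tracker). All of these are closely tied to Curator's features. Let us look more closely at how CuratorZookeeperClient and ConnectionStateManager are initialized.
CuratorZookeeperClient initialization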

public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
    {
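        //Since 3.0.0 the default is StandardInternalConnectionHandler. Before that, a session Expired event was only fired when the server reported it.
        //After a Disconnect event, StandardConnectionHandler proactively fires an Expired event if the client cannot reconnect to a server within the allotted time.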
        this.connectionHandlingPolicy = connectionHandlingPolicy;
        if ( sessionTimeoutMs < connectionTimeoutMs )
        {
            log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
        }
        //retry policy for reconnecting
        retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
        ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");
        this.connectionTimeoutMs = connectionTimeoutMs;
        //the defaultWatcher Curator registers on the native client: receives connection-state events and handles reconnection after timeouts
        state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
        setRetryPolicy(retryPolicy);
    }

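It is mainly responsible for creating the connection and keeping it healthy. In addition, when client-server operations are called synchronously through it, it uses the retryPolicy to keep the connection available during those synchronous operations. ConnectionState is the defaultWatcher registered on the native client: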
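The retry behavior for synchronous calls can be sketched as a loop driven by a retry policy. DoublingRetryPolicy, RetryableConnectionException, and SyncRetry are hypothetical names. Curator's own RetryPolicy interface and RetryLoop have different signatures. Its ExponentialBackoffRetry also randomizes the sleep, which is left out here to keep the schedule deterministic.

```java
import java.util.concurrent.Callable;

// Hypothetical retry policy: allow up to maxRetries retries and sleep baseSleepMs * 2^retryCount
// between attempts, capped at maxSleepMs. Deterministic on purpose (no random jitter).
final class DoublingRetryPolicy {
    private final int maxRetries;
    private final long baseSleepMs;
    private final long maxSleepMs;

    DoublingRetryPolicy(int maxRetries, long baseSleepMs, long maxSleepMs) {
        this.maxRetries = maxRetries;
        this.baseSleepMs = baseSleepMs;
        this.maxSleepMs = maxSleepMs;
    }

    boolean allowRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    long sleepMs(int retryCount) {
        long ms = baseSleepMs << Math.min(retryCount, 20); // cap the shift to avoid overflow
        return Math.min(ms, maxSleepMs);
    }
}

// Hypothetical stand-in for a "connection lost" failure that is worth retrying.
final class RetryableConnectionException extends Exception {
    RetryableConnectionException(String message) { super(message); }
}

final class SyncRetry {
    // Runs the operation; retryable failures are retried per the policy, anything else propagates.
    static <T> T call(DoublingRetryPolicy policy, Callable<T> operation) throws Exception {
        int retryCount = 0;
        while (true) {
            try {
                return operation.call();
            } catch (RetryableConnectionException e) {
                if (!policy.allowRetry(retryCount)) {
                    throw e; // retries exhausted: surface the last failure
                }
                Thread.sleep(policy.sleepMs(retryCount));
                retryCount++;
            }
        }
    }
}
```

With maxRetries = 3, an operation is attempted at most four times: the first call plus three retries.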

ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
    {
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.tracer = tracer;
        this.connectionHandlingPolicy = connectionHandlingPolicy;
        if ( parentWatcher != null )
        {
            //only one defaultWatcher is allowed, so parentWatchers let the defaultWatcher call back further watchers when it receives an event
            parentWatchers.offer(parentWatcher);
        }

        zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
    }
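The native client holds a single default watcher, so ConnectionState forwards each event to its parentWatchers. The following is a minimal model of that fan-out. It uses plain Consumer<String> callbacks and hypothetical class names instead of ZooKeeper's Watcher and WatchedEvent.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical client that accepts exactly one default watcher.
final class SingleDefaultWatcherClient {
    private final Consumer<String> defaultWatcher;

    SingleDefaultWatcherClient(Consumer<String> defaultWatcher) {
        this.defaultWatcher = defaultWatcher;
    }

    // Simulates the native client delivering a connection-state event.
    void deliver(String event) {
        defaultWatcher.accept(event);
    }
}

// Hypothetical default watcher that fans each event out to any number of "parent" watchers,
// mirroring the parentWatchers queue kept by ConnectionState.
final class FanOutWatcher implements Consumer<String> {
    private final Queue<Consumer<String>> parentWatchers = new ConcurrentLinkedQueue<>();
    private volatile String lastEvent; // stands in for the state the real watcher tracks itself

    void addParentWatcher(Consumer<String> watcher) {
        parentWatchers.offer(watcher);
    }

    @Override
    public void accept(String event) {
        lastEvent = event;                 // 1. update own bookkeeping first
        for (Consumer<String> w : parentWatchers) {
            w.accept(event);               // 2. then forward to every parent watcher
        }
    }

    String lastEvent() { return lastEvent; }
}
```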

ConnectionStateManager initialization
It handles and publishes Curator's connection states. To listen for connection state changes, register a ConnectionStateListener on its listeners.

//queue of connection state events to publish
private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
//listeners to notify
private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
//running state of ConnectionStateManager
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);

public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent)
    {
        this.client = client;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.sessionExpirationPercent = sessionExpirationPercent;
        if ( threadFactory == null )
        {
            threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
        }
        //single-thread executor that processes the event queue
        service = Executors.newSingleThreadExecutor(threadFactory);
    }
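The constructor keeps sessionTimeoutMs and sessionExpirationPercent, which are passed in from getSimulatedSessionExpirationPercent() in the CuratorFrameworkImpl constructor. These values feed the proactive expiry described in the CuratorZookeeperClient comment above: if the client stays disconnected for too long, the session is treated as expired without waiting for the server. The class below is a hypothetical sketch of that rule. It assumes that a negotiated session timeout is preferred when one is known and that the configured value is the fallback.

```java
// Hypothetical sketch of simulated session expiration: once the client has been disconnected for
// sessionTimeout * expirationPercent / 100 ms, treat the session as expired locally instead of
// waiting for the server to report Expired. Timestamps are passed in so the logic is testable.
final class SimulatedExpiry {
    private final int configuredSessionTimeoutMs;
    private final int expirationPercent;       // 0 disables the simulation in this sketch
    private int negotiatedSessionTimeoutMs;    // 0 until a connection has negotiated a value
    private long disconnectedSinceMs = -1;     // -1 means "currently connected"

    SimulatedExpiry(int configuredSessionTimeoutMs, int expirationPercent) {
        this.configuredSessionTimeoutMs = configuredSessionTimeoutMs;
        this.expirationPercent = expirationPercent;
    }

    void onConnected(int negotiatedSessionTimeoutMs) {
        this.negotiatedSessionTimeoutMs = negotiatedSessionTimeoutMs;
        this.disconnectedSinceMs = -1;
    }

    void onDisconnected(long nowMs) {
        if (disconnectedSinceMs < 0) {
            disconnectedSinceMs = nowMs; // keep the start of the first disconnect, not later ones
        }
    }

    // True when the caller should inject an Expired event.
    boolean shouldExpire(long nowMs) {
        if (disconnectedSinceMs < 0 || expirationPercent <= 0) {
            return false;
        }
        int timeoutMs = negotiatedSessionTimeoutMs > 0 ? negotiatedSessionTimeoutMs : configuredSessionTimeoutMs;
        long thresholdMs = (long) timeoutMs * expirationPercent / 100;
        return nowMs - disconnectedSinceMs >= thresholdMs;
    }
}
```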

Startup
Establishes the session with the server and starts the related features.
CuratorFrameworkImpl.start

public void start()
    {
        log.info("Starting");
        if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
        {
            throw new IllegalStateException("Cannot be started more than once");
        }

        try
        {
            //start connectionStateManager
            connectionStateManager.start(); // ordering dependency - must be called before client.start()

            final ConnectionStateListener listener = new ConnectionStateListener()
            {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState newState)
                {
                    if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
                    {
                        logAsErrorConnectionErrors.set(true);
                    }
                }
            };

            this.getConnectionStateListenable().addListener(listener);
            //establish the connection to the server
            client.start();

            executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    backgroundOperationsLoop();
                    return null;
                }
            });

            ensembleTracker.start();
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            handleBackgroundOperationException(null, e);
        }
    }

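As shown, start() starts ConnectionStateManager first and then CuratorZookeeperClient. Starting ConnectionStateManager prepares it to deliver connection event notifications, and starting CuratorZookeeperClient establishes the session with the server. After that, the background operation loop and the ensembleTracker are started.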
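The guard at the top of start() is a compare-and-set on an AtomicReference. This is what makes a second start() fail even when several threads call it concurrently. The following is a stripped-down, hypothetical model of that lifecycle. LifecycleGuard and ClientLifecycle are illustrative names, and the close() transition is based on the state comment in the constructor.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative lifecycle states, modeled on LATENT -> STARTED -> STOPPED.
enum ClientLifecycle { LATENT, STARTED, STOPPED }

final class LifecycleGuard {
    private final AtomicReference<ClientLifecycle> state = new AtomicReference<>(ClientLifecycle.LATENT);

    void start() {
        // Only one caller can move LATENT -> STARTED; every other caller fails.
        if (!state.compareAndSet(ClientLifecycle.LATENT, ClientLifecycle.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        // The real start() would now start the state manager, connect, and launch the background loop.
    }

    // Returns true only for the call that actually performed the shutdown.
    boolean close() {
        return state.compareAndSet(ClientLifecycle.STARTED, ClientLifecycle.STOPPED);
    }

    ClientLifecycle state() {
        return state.get();
    }
}
```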

Session management

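Curator's session management is a wrapper around the native client's session management. In the startup and loading section above, we saw three responsibilities. ConnectionState handles reconnecting after timeouts. ConnectionStateManager handles connection state changes and notifications. connectionHandlingPolicy proactively fires connection timeouts. In addition, when the client detects a disconnection while performing an operation, it can also reconnect proactively. Below we look at how Curator builds on the native client's session management to deliver session state notifications and to reconnect after session expiry.
Session connection state events have the event type Watcher.Event.EventType.None and are delivered to every Watcher on the client. As the defaultWatcher, ConnectionState handles such an event as follows: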

 public void process(WatchedEvent event)
    {
        if ( LOG_EVENTS )
        {
            log.debug("ConnectState watcher: " + event);
        }

        if ( event.getType() == Watcher.Event.EventType.None )
        {
          //isConnected: the client's current connection state; true means connected (SyncConnected or ConnectedReadOnly)
            boolean wasConnected = isConnected.get();
            boolean newIsConnected = checkState(event.getState(), wasConnected);
            if ( newIsConnected != wasConnected )
            {
                //if the connection state changed, update it
                isConnected.set(newIsConnected);
                connectionStartMs = System.currentTimeMillis();
                if ( newIsConnected )
                {
                    //connection (re)established: update the negotiated session timeout
                    lastNegotiatedSessionTimeoutMs.set(zooKeeper.getNegotiatedSessionTimeoutMs());
                    log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
                }
            }
        }
        //notify parentWatchers; note that one parentWatcher was passed in during initialization, and it calls CuratorFrameworkImpl.processEvent
        for ( Watcher parentWatcher : parentWatchers )
        {
            TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get());
            parentWatcher.process(event);
            timeTrace.commit();
        }
    }

As you can see, connection state events are mainly handled by the checkState method:

private boolean checkState(Event.KeeperState state, boolean wasConnected)
    {
        boolean isConnected = wasConnected;
        boolean checkNewConnectionString = true;
        switch ( state )
        {
        default:
        case Disconnected:
        {
            isConnected = false;
            break;
        }

        case SyncConnected:
        case ConnectedReadOnly:
        {
            isConnected = true;
            break;
        }

        case AuthFailed:
        {
            isConnected = false;
            log.error("Authentication failed");
            break;
        }

        case Expired:
        {
            isConnected = false;
            checkNewConnectionString = false;
            handleExpiredSession();
            break;
        }

        case SaslAuthenticated:
        {
            // NOP
            break;
        }
        }

        if ( checkNewConnectionString )
        {
            //if the server list has changed, apply the new connection string
            String newConnectionString = zooKeeper.getNewConnectionString();
            if ( newConnectionString != null )
            {
                handleNewConnectionString(newConnectionString);
            }
        }

        return isConnected;
    }

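checkState decides from the session state whether the connection is healthy; isConnected = true means it is. When the session expires (Expired), handleExpiredSession performs a reset: it closes the connection and establishes a new session. This is the passive reconnection after session expiry. While connecting, the client uses its configured retry mechanism, retryPolicy, to check whether the reconnection has timed out.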
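The decision table in checkState can be isolated into a small sketch. SessionEvent is a simplified, hypothetical stand-in for Watcher.Event.KeeperState. resetCount only counts the places where the real code would call handleExpiredSession(). Note that the SASL event leaves the previous value unchanged.

```java
// Simplified stand-ins for the KeeperState values handled by checkState (not the real enum).
enum SessionEvent { DISCONNECTED, SYNC_CONNECTED, CONNECTED_READ_ONLY, AUTH_FAILED, EXPIRED, SASL_AUTHENTICATED }

// Sketch of checkState's decision table: which events mean "connected", and which one forces
// a session reset. resetCount stands in for handleExpiredSession() closing and reopening the session.
final class ConnectedFlag {
    private boolean connected;
    private int resetCount;

    boolean onEvent(SessionEvent event) {
        switch (event) {
            case SYNC_CONNECTED:
            case CONNECTED_READ_ONLY:
                connected = true;
                break;
            case EXPIRED:
                connected = false;
                resetCount++; // passive reconnect: the old session is gone, start a new one
                break;
            case SASL_AUTHENTICATED:
                break; // no change, like the NOP branch
            case DISCONNECTED:
            case AUTH_FAILED:
            default:
                connected = false;
                break;
        }
        return connected;
    }

    int resetCount() { return resetCount; }
}
```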

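  1. Callbacks to parentWatchers
    When the CuratorFramework client is initialized, it creates a watcher and adds it to ConnectionState's parentWatchers. This watcher handles the session state change whenever the connection state changes.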
//the parentWatcher created during initialization
new Watcher()
                {
                    @Override
                    public void process(WatchedEvent watchedEvent)
                    {
                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                        processEvent(event);
                    }
                }

When the session state changes, the process method of the watcher above is called, which in turn calls CuratorFrameworkImpl.processEvent:

private void processEvent(final CuratorEvent curatorEvent)
    {
        if ( curatorEvent.getType() == CuratorEventType.WATCHED )
        {
            //state transition
            validateConnection(curatorEvent.getWatchedEvent().getState());
        }
        //notify all registered CuratorListeners
        listeners.forEach(new Function<CuratorListener, Void>()
        {
            @Override
            public Void apply(CuratorListener listener)
            {
                try
                {
                    TimeTrace trace = client.startTracer("EventListener");
                    listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
                    trace.commit();
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    logError("Event listener threw exception", e);
                }
                return null;
            }
        });
    }

Here, validateConnection handles the connection state transitions:

void validateConnection(Watcher.Event.KeeperState state)
    {
        if ( state == Watcher.Event.KeeperState.Disconnected )
        {
            internalConnectionHandler.suspendConnection(this);
        }
        else if ( state == Watcher.Event.KeeperState.Expired )
        {
            connectionStateManager.addStateChange(ConnectionState.LOST);
        }
        else if ( state == Watcher.Event.KeeperState.SyncConnected )
        {
            internalConnectionHandler.checkNewConnection(this);
            connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
        }
        else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
        {
            internalConnectionHandler.checkNewConnection(this);
            connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
        }
    }

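As shown, ConnectionStateManager is responsible for updating Curator's connection state. The native client's connection states map to Curator's wrapped connection states as follows:

[Figure: mapping between native ZooKeeper connection states and Curator connection states]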
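Reading validateConnection, the mapping can be sketched as below. Two parts are assumptions rather than something visible in the code above. The first is that suspendConnection ends up publishing SUSPENDED. The second is that ConnectionStateManager reports the first successful connection as CONNECTED instead of RECONNECTED and does not publish a repeated state twice. Both enums are simplified, hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the native states handled by validateConnection and for
// Curator's ConnectionState values (hypothetical enums, not the real types).
enum NativeKeeperState { DISCONNECTED, EXPIRED, SYNC_CONNECTED, CONNECTED_READ_ONLY }
enum FrameworkConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

final class StateTranslator {
    private FrameworkConnectionState current;       // null until the first event
    private boolean initialConnectPublished = false;
    private final List<FrameworkConnectionState> published = new ArrayList<>();

    void onNativeState(NativeKeeperState nativeState) {
        FrameworkConnectionState next;
        switch (nativeState) {
            case DISCONNECTED:        next = FrameworkConnectionState.SUSPENDED; break; // assumed result of suspendConnection
            case EXPIRED:             next = FrameworkConnectionState.LOST; break;
            case SYNC_CONNECTED:      next = FrameworkConnectionState.RECONNECTED; break;
            case CONNECTED_READ_ONLY: next = FrameworkConnectionState.READ_ONLY; break;
            default:                  return;
        }
        if (next == current) {
            return; // repeated states are not published again (assumption)
        }
        current = next;
        boolean negative = next == FrameworkConnectionState.SUSPENDED
                || next == FrameworkConnectionState.LOST
                || next == FrameworkConnectionState.READ_ONLY;
        FrameworkConnectionState toPublish = next;
        if (!negative && !initialConnectPublished) {
            initialConnectPublished = true;
            toPublish = FrameworkConnectionState.CONNECTED; // first good connection reported as CONNECTED (assumption)
        }
        published.add(toPublish);
    }

    List<FrameworkConnectionState> published() {
        return published;
    }
}
```

Under these assumptions, the native sequence SyncConnected, Disconnected, SyncConnected, SyncConnected, Expired, SyncConnected is published as CONNECTED, SUSPENDED, RECONNECTED, LOST, RECONNECTED.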


At the same time, ConnectionStateManager puts the current ConnectionState into its own event queue and notifies every ConnectionStateListener registered in its listeners.
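Because of the queue plus a single worker thread, ConnectionStateListeners run on their own thread rather than on the thread that reported the change. The following is a hypothetical sketch of such a dispatcher. StateDispatcher is an illustrative name. The queue size of 25 and the policy of dropping the oldest event when the queue is full are assumptions of this sketch.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical ConnectionStateManager-style dispatcher: producers enqueue state changes into a
// bounded queue; a single background thread drains it and notifies every listener.
final class StateDispatcher {
    private final BlockingQueue<String> eventQueue = new ArrayBlockingQueue<>(25);
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService service = Executors.newSingleThreadExecutor();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void start() {
        service.submit(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String newState = eventQueue.take(); // blocks until a state change arrives
                    for (Consumer<String> listener : listeners) {
                        try {
                            listener.accept(newState);
                        } catch (RuntimeException e) {
                            System.err.println("Listener threw exception: " + e); // keep the loop alive
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // close() interrupts the worker to stop it
            }
        });
    }

    void addStateChange(String newState) {
        // In this sketch a full queue drops its oldest entry to make room for the newest state.
        while (!eventQueue.offer(newState)) {
            eventQueue.poll();
        }
    }

    void close() {
        service.shutdownNow();
    }
}
```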

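In addition, when an operation (synchronous, or running on the background thread, such as getData) finds that the connection is down, it also calls client.getZooKeeper() to reconnect. Creating the underlying client connection is done under a lock, which ensures that only one thread per client can successfully create the session. For example, here is the background-thread task of CuratorFrameworkImpl: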

 void performBackgroundOperation(OperationAndData<?> operationAndData)
    {
        try
        {
            if ( !operationAndData.isConnectionRequired() || client.isConnected() )
            {
                operationAndData.callPerformBackgroundOperation();
            }
            else
            {
                client.getZooKeeper();  // important - allow connection resets, timeouts, etc. to occur
                if ( operationAndData.getElapsedTimeMs() >= client.getConnectionTimeoutMs() )
                {
                    throw new CuratorConnectionLossException();
                }
                operationAndData.sleepFor(1, TimeUnit.SECONDS);
                queueOperation(operationAndData);
            }
        }
        catch ( Throwable e )
        {
            ThreadUtils.checkInterrupted(e);

            /**
             * Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException
             * when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy
             * and callbacks need to get invoked, etc.
             */
            if ( e instanceof CuratorConnectionLossException )
            {
                WatchedEvent watchedEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
                CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null, null);
                if ( checkBackgroundRetry(operationAndData, event) )
                {
                    queueOperation(operationAndData);
                }
                else
                {
                    logError("Background retry gave up", e);
                }
            }
            else
            {
                handleBackgroundOperationException(operationAndData, e);
            }
        }
    }
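The core of performBackgroundOperation is: if not connected, wait a little and requeue, unless the connection timeout has already passed. The sketch below reproduces that pattern with java.util.concurrent.DelayQueue, the same queue type used for backgroundOperations. PendingOp and BackgroundLoop are hypothetical, and the connection check is a plain BooleanSupplier. A string result replaces the real exception and retry-policy handling.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical queued operation that remembers when it was created and when it may run next.
final class PendingOp implements Delayed {
    final String name;
    final long createdNanos = System.nanoTime();
    private long readyAtNanos = createdNanos;
    int attempts;

    PendingOp(String name) { this.name = name; }

    void sleepFor(long ms) { readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(ms); }

    long elapsedMs() { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - createdNanos); }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

final class BackgroundLoop {
    private final DelayQueue<PendingOp> queue = new DelayQueue<>();
    private final BooleanSupplier isConnected;
    private final long connectionTimeoutMs;
    private final long retryDelayMs;

    BackgroundLoop(BooleanSupplier isConnected, long connectionTimeoutMs, long retryDelayMs) {
        this.isConnected = isConnected;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.retryDelayMs = retryDelayMs;
    }

    // Drives one operation to completion; returns "performed" or "connection loss".
    String process(PendingOp op) throws InterruptedException {
        queue.add(op);
        while (true) {
            PendingOp next = queue.take(); // waits until the op's delay has expired
            next.attempts++;
            if (isConnected.getAsBoolean()) {
                return "performed";        // real code: callPerformBackgroundOperation()
            }
            // real code calls client.getZooKeeper() here so that a reconnect can happen
            if (next.elapsedMs() >= connectionTimeoutMs) {
                return "connection loss";  // real code throws CuratorConnectionLossException
            }
            next.sleepFor(retryDelayMs);
            queue.add(next);               // corresponds to queueOperation(...)
        }
    }
}
```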

Notification mechanism

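The notification mechanism simply invokes the callbacks of the registered listeners at the point where an event occurs (the observer pattern). A CuratorFrameworkImpl client can register listeners in the following ways: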

  • One-time watch
client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground().forPath(path);

As with the native client's watches, it fires only once and must be registered again after each notification.

  • Registering a CuratorListener
// this is one method of getting event/async notifications
        CuratorListener listener = new CuratorListener() {
            @Override
            public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                // examine event for details
            }
        };
        client.getCuratorListenable().addListener(listener);

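This adds the listener to CuratorFrameworkImpl.listeners. When the background thread completes an operation, the corresponding event notifies the listener; for example, creating a path asynchronously fires a CuratorEventType.CREATE event. In addition, when a connection state event fires, the parentWatcher also calls back these listeners.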

  • Registering a ConnectionStateListener
ConnectionStateListener connectionStateListener = new ConnectionStateListener()
    {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState)
        {
          //Some details
        }
    };
client.getConnectionStateListenable().addListener(connectionStateListener);

This adds connectionStateListener to connectionStateManager.listeners, and it is notified whenever the connection state changes.

  • Registering an UnhandledErrorListener
UnhandledErrorListener unhandledErrorListener = new UnhandledErrorListener() {
            @Override
            public void unhandledError(String message, Throwable e) {
                //
            }
        };
        client.getUnhandledErrorListenable().addListener(unhandledErrorListener);

This adds unhandledErrorListener to CuratorFrameworkImpl.unhandledErrorListeners. It is notified when a background operation or a handler throws an exception.

  • Callback when a background operation completes
    public static void setDataAsyncWithCallback(CuratorFramework client, BackgroundCallback callback, String path, byte[] payload) throws Exception {
        // this is another method of getting notification of an async completion
        client.setData().inBackground(callback).forPath(path, payload);
    }

For operations such as setData, you can pass a callback through the chained call inBackground(callback), and the callback runs when the operation completes.

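  • Caches (repeated registration)
    Curator provides three kinds of caches: Path Cache, Node Cache, and Tree Cache. Each one effectively tracks the node data on the server. Whenever the data changes, the watch mechanism fires, a background callback updates the local cache, and the corresponding watch is registered again. The watches are also re-registered after every successful reconnection, so no watch is lost.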
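An in-memory model shows the difference between a one-shot watch and a cache that re-registers its watch. OneShotStore and NodeCacheSketch are hypothetical classes, not ZooKeeper or Curator APIs. Re-registration after reconnects is not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory store with ZooKeeper-style one-shot watches: each registered watcher
// fires at most once, on the next change to the path, and is then forgotten.
final class OneShotStore {
    private final Map<String, String> data = new HashMap<>();
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    String getData(String path, Consumer<String> watcher) {
        if (watcher != null) {
            watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
        }
        return data.get(path);
    }

    void setData(String path, String value) {
        data.put(path, value);
        List<Consumer<String>> fired = watches.remove(path); // remove first: watches are one-shot
        if (fired != null) {
            fired.forEach(w -> w.accept(path));
        }
    }
}

// Cache that re-arms its watch every time it is notified, so it keeps tracking every change.
final class NodeCacheSketch {
    private final OneShotStore store;
    private final String path;
    private String cached;
    int refreshes;

    NodeCacheSketch(OneShotStore store, String path) {
        this.store = store;
        this.path = path;
    }

    void start() { refresh(); }

    private void refresh() {
        refreshes++;
        cached = store.getData(path, changedPath -> refresh()); // read data and register a new watch
    }

    String current() { return cached; }
}
```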

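Combining the session management and notification sections above, the threading of each notification path is as follows:
  • Native client watchers are notified synchronously, although a particular watcher can of course do its own processing asynchronously.
  • connectionStateManager.listeners are notified asynchronously by its internal thread pool.
  • CuratorFrameworkImpl.listeners are notified synchronously on the watcher notification thread for connection state events, and asynchronously when notified from the background thread.
If too many watchers are registered on the client, watches may be lost after a reconnect, because a reconnect clears the sendThread's outgoing and pending queues. Reconnection may even fail. In version 3.1.0, the version analyzed in this article, any call to client.getZooKeeper() triggers a reconnect regardless of the current connection state, so I think this behavior depends on the version; see Reference 2.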
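Whether a listener is notified synchronously or asynchronously comes down to which thread executes it. A hypothetical registry that pairs each listener with an Executor makes the difference visible. As far as I know, the Listenable interface in older Curator versions also accepts an executor per listener, but the class below is only an illustration, not Curator's ListenerContainer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical listener container in which every listener is registered together with the
// Executor that runs it. A direct executor (Runnable::run) gives synchronous delivery on the
// notifying thread; a thread pool gives asynchronous delivery.
final class ListenerRegistry<T> {
    private final Map<Consumer<T>, Executor> listeners = new ConcurrentHashMap<>();

    void addListener(Consumer<T> listener) {
        addListener(listener, Runnable::run); // default: run on the caller's thread
    }

    void addListener(Consumer<T> listener, Executor executor) {
        listeners.put(listener, executor);
    }

    void removeListener(Consumer<T> listener) {
        listeners.remove(listener);
    }

    void publish(T event) {
        listeners.forEach((listener, executor) -> executor.execute(() -> {
            try {
                listener.accept(event);
            } catch (RuntimeException e) {
                // one failing listener must not prevent the others from being notified
                System.err.println("Listener threw exception: " + e);
            }
        }));
    }
}
```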

Recipes

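The recipes implemented by Curator mainly include distributed locks, leader election, barriers, counters, caches, ephemeral nodes, queues, and transactions. See Reference 1 for how to use them. Their implementation principles are briefly introduced below.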

TODO

  • Distributed locks

  • Leader election

  • Barriers

  • Counters

  • Caches

  • Ephemeral nodes

  • Queues

  • Transactions

References:
1. Learning ZooKeeper by Example: Article Index
2. ZooKeeper Client Architecture Analysis: Troubleshooting ZK Reconnection Failures


    Article link: https://www.haomeiwen.com/subject/djoysqtx.html