美文网首页
Curator源码分析--创建client

Curator源码分析--创建client

作者: Audience0 | 来源:发表于2019-11-27 19:05 被阅读0次

    curator创建zookeeper连接
    1.初始化CuratorFramework client = CuratorFrameworkFactory.newClient()
    1)入参:服务器IP地址,session超时时间,连接超时时间,重试策略
    2)初始化ZookeeperFactory,实现newZooKeeper方法,该方法实现zookeeper的连接创建
    3)初始化CuratorZookeeperClient,传入Watcher,但这个并不是传给zookeeper的,zookeeper回调时,会调到这个watcher,这个watcher会回调我们创建连接时的client.getCuratorListenable().addListener(new LcpCuratorListener(ip));这个操作。可以用于我们对事件的监控与处理(eventReceived(CuratorFramework client, CuratorEvent event))。
    4)初始化ConnectionState,将3)中的watcher传给ConnectionState的parentWatchers属性,ConnectionState也是最终传给zookeeper的watcher
    2.创建连接client.start();
    1)连接状态管理 connectionStateManager.start()
    connectionStateManager中,BlockingQueue<ConnectionState>保存着连接状态的状态变化值,
    start()方法中循环获取队列中的状态值,然后执行在创建client时 client.getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));进行状态变化的监控或处理
    2)连接创建 client.start();
    该方法主要执行的是ConnectionState的reset()方法,reset ()主要完成老连接的关闭,和新连接的创建,此处创建连接即调用初始化ZookeeperFactory实现的newZooKeeper方法
    3.zookeeper回调watcher
    传入zookeeper的watcher是ConnectionState对象,则回调时,则先调用ConnectionState中的process方法,此处会判断连接状态,SyncConnected,ConnectedReadOnly则连接成功,Expired连接过期,则重新调用2中的reset()方法,其他状态则连接失败。如果连接状态有变化则通过AtomicBoolean进行保存。
    此处还会调用之前初始化进去的parentWatchers,回调到初始化CuratorZookeeperClient时传入的watcher。此处,会校验连接状态,并将连接状态加入ConnectionStateManager状态管理器重进行管理(ConnectionStateManager的BlockingQueue<ConnectionState>)。加入ConnectionStateManager管理的状态会在connectionStateManager.start()中获取到,并可以通过 client.getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));来监控或处理

    //创建Curator连接
    public void init(){
            Assert.hasText(zkServer,"zkServer is empty");
            Assert.hasText(zkPath,"zkPath is empty");
            ip = IpUtils.getOneIpV4();
            //RetryNTimes 重试策略,
            client = CuratorFrameworkFactory.newClient(zkServer,sessionTimeoutMs,connectionTimeoutMs,new RetryNTimes(maxRetries,sleepMsBetweenRetries));
            client.getCuratorListenable().addListener(new LcpCuratorListener(ip));
            client.getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));
            client.getUnhandledErrorListenable().addListener(new LcpErrorListener(ip));
            client.start();
        }
    

    client = CuratorFrameworkFactory.newClient(zkServer,sessionTimeoutMs,connectionTimeoutMs,new RetryNTimes(maxRetries,sleepMsBetweenRetries));

     public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
        {
            return builder().
                connectString(connectString).
                sessionTimeoutMs(sessionTimeoutMs).
                connectionTimeoutMs(connectionTimeoutMs).
                retryPolicy(retryPolicy).
                build();
        }
    //返回一个CuratorFrameworkImpl对象
    public CuratorFramework build()
            {
                return new CuratorFrameworkImpl(this);
            }
    
    public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
        {
            //初始化ZookeeperFactory
            ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
           初始化ConnectionState,HandleHolder
            this.client = new CuratorZookeeperClient
                (
                    localZookeeperFactory,
                    builder.getEnsembleProvider(),
                    builder.getSessionTimeoutMs(),
                    builder.getConnectionTimeoutMs(),
                    builder.getWaitForShutdownTimeoutMs(),
                    //这个watcher并不是真正传给zookeeper的watcher,传给zookeeper的是ConnectionState,
                    //ConnectionState中重写process(WatchedEvent event)方法中,会调用这个Watcher
                    new Watcher()
                    {
                        @Override
                        public void process(WatchedEvent watchedEvent)
                        {
                            CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                            //在这里,实现CuratorListener接口的listener重写eventReceived方法,接收zk事件信息
                            processEvent(event);
                        }
                    },
                    builder.getRetryPolicy(),
                    builder.canBeReadOnly(),
                    builder.getConnectionHandlingPolicy()
                );
    
           //zk连接状态的管理类, 状态发生变化时,回掉listener的
            connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator());
           //K节点默认值为本机IP,ZK本身是不允许创建没有value的节点的,但curator允许,就是使用了该默认值
            byte[] builderDefaultData = builder.getDefaultData();
           //省略其他变量赋值
        }
    

    CuratorFramework.start();

     @Override
        public void start()
        {
           try
              {
                //开启 连接状态管理
                connectionStateManager.start(); 
                //CuratorZookeeperClient中的start方法,真正与ZK建立连接
                client.start();
                }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                handleBackgroundOperationException(null, e);
            }
        }
    

    CuratorZookeeperClient

     public void start() throws Exception
        {
            log.debug("Starting");
    
            if ( !started.compareAndSet(false, true) )
            {
                throw new IllegalStateException("Already started");
            }
            //ConnectionState 的start方法
            state.start();
        }
    

    ConnectionState类

    void start() throws Exception
        {
            log.debug("Starting");
            ensembleProvider.start();
            reset();
        }
    
    synchronized void reset() throws Exception
        {
            log.debug("reset");
            //用来记录zookeeper实例创建次数
            instanceIndex.incrementAndGet();
            isConnected.set(false);
            //连接开始时间
            connectionStartMs = System.currentTimeMillis();
            //HandleHolder  关闭老的zookeeper实例,重新构建新的helper
            zooKeeper.closeAndReset();
            //调用zookeeperFactory.newZooKeeper创建原生zookeeper连接
            zooKeeper.getZooKeeper();  
        }
    

    HandleHolder类

    void closeAndReset() throws Exception
        {
            //如果有的话关闭之前的zookeeper实例,重构HandleHolder
            internalClose(0);
            helper = new Helper()
            {
                private volatile ZooKeeper zooKeeperHandle = null;
                private volatile String connectionString = null;
    
                @Override
                public ZooKeeper getZooKeeper() throws Exception
                {
                    synchronized(this)
                    {
                        if ( zooKeeperHandle == null )
                        {
                            connectionString = ensembleProvider.getConnectionString();
                          //这里创建zookeeper连接,传入的watcher就是 ConnectionState
                            zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);
                        }
    
                        helper = new Helper()
                        {
                            @Override
                            public ZooKeeper getZooKeeper() throws Exception
                            {
                                return zooKeeperHandle;
                            }
    
                            @Override
                            public String getConnectionString()
                            {
                                return connectionString;
                            }
    
                            @Override
                            public int getNegotiatedSessionTimeoutMs()
                            {
                                return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
                            }
                        };
    
                        return zooKeeperHandle;
                    }
                }
    
                @Override
                public String getConnectionString()
                {
                    return connectionString;
                }
    
                @Override
                public int getNegotiatedSessionTimeoutMs()
                {
                    return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
                }
            };
        }
    //通过上面的helper实现知道这里真正与ZK建立连接
    ZooKeeper getZooKeeper() throws Exception
        {
            return (helper != null) ? helper.getZooKeeper() : null;
        }
    

    创建zookeeper连接之后,watcher接收zookeeper返回的连接事件并进行处理,这里的watcher就是ConnectionState类,执行其中的process方法

        @Override
        public void process(WatchedEvent event)
        {
            //这里为None说明收到的事件是ZK连接状态改变的事件
            if ( event.getType() == Watcher.Event.EventType.None )
            {
                boolean wasConnected = isConnected.get();
                boolean newIsConnected = checkState(event.getState(), wasConnected);
                //连接状态发生变化
                if ( newIsConnected != wasConnected )
                {
                    isConnected.set(newIsConnected);
                    //记录连接开始时间
                    connectionStartMs = System.currentTimeMillis();
                    //连接状态变化为已连接则记录新协商的回话超市时间
                    if ( newIsConnected )
                    {
                        //重置session超时时间
                        lastNegotiatedSessionTimeoutMs.set(zooKeeper.getNegotiatedSessionTimeoutMs());
                        log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
                    }
                }
            }
            //回调CuratorZookeeperClient创建时的watcher,
            for ( Watcher parentWatcher : parentWatchers )
            {
                OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
                 //回调CuratorZookeeperClient创建时的watcher
                parentWatcher.process(event);
                trace.commit();
            }
        }
    // 获取当前连接状态
    private boolean checkState(Event.KeeperState state, boolean wasConnected)
        {
            // AtomicBoolean isConnected = new AtomicBoolean(false); 原子boolean保存连接状态
            boolean isConnected = wasConnected;
            boolean checkNewConnectionString = true;
            switch ( state )
            {
            default:
            //连接断开
            case Disconnected:
            {
                isConnected = false;
                break;
            }
          //连接成功
            case SyncConnected:
            case ConnectedReadOnly:
            {
                isConnected = true;
                break;
            }
            //权限验证失败连接失败
            case AuthFailed:
            {
                isConnected = false;
                log.error("Authentication failed");
                break;
            }
          //连接过期
            case Expired:
            {
                isConnected = false;
                checkNewConnectionString = false;
                //处理连接过期
                //调用ConnectionState.reset() 重新构建zookeeper连接
                handleExpiredSession();
                break;
            }
    
            case SaslAuthenticated:
            {
                // NOP
                break;
            }
            }
             //当连接状态发生改变且不是会话过期时,检查ZK地址是否发生变化
            if ( checkNewConnectionString )
            {
                String newConnectionString = zooKeeper.getNewConnectionString();
                if ( newConnectionString != null )
                {  //处理ZK地址发生变化
                    handleNewConnectionString(newConnectionString);
                }
            }
            return isConnected;
        }
    

    parentWatcher.process(event);回调初始化CuratorZookeeperClient时传入的watcher

    new Watcher()
                    {
                        @Override
                        public void process(WatchedEvent watchedEvent)
                        {  //将zookeeper的event包装成CuratorEvent
                            CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                            processEvent(event);
                        }
                    }
    
    
    private void processEvent(final CuratorEvent curatorEvent)
        {
            if ( curatorEvent.getType() == CuratorEventType.WATCHED )
            {
                //校验连接状态,并将状态加入connectionStateManager进行管理
                validateConnection(curatorEvent.getWatchedEvent().getState());
            }
    
            listeners.forEach(new Function<CuratorListener, Void>()
            {
                @Override
                public Void apply(CuratorListener listener)
                {
                    try
                    {
                        OperationTrace trace = client.startAdvancedTracer("EventListener");
                        //去回调创建client时的client.getCuratorListenable().addListener(new LcpCuratorListener(ip));
                        listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
                        trace.commit();
                    }
                    catch ( Exception e )
                    {
                        ThreadUtils.checkInterrupted(e);
                        logError("Event listener threw exception", e);
                    }
                    return null;
                }
            });
        }
    
    void validateConnection(Watcher.Event.KeeperState state)
        {
            if ( state == Watcher.Event.KeeperState.Disconnected )
            {
                internalConnectionHandler.suspendConnection(this);
            }
            else if ( state == Watcher.Event.KeeperState.Expired )
            {  //将状态加入 阻塞队列中,在connectionStateManager.start()中循环获取该队列中的状态数据,并执行我们初始化client时的getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));
                connectionStateManager.addStateChange(ConnectionState.LOST);
            }
            else if ( state == Watcher.Event.KeeperState.SyncConnected )
            {
                internalConnectionHandler.checkNewConnection(this);
                connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
                unSleepBackgroundOperations();
            }
            else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
            {
                internalConnectionHandler.checkNewConnection(this);
                connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
            }
        }
    
    

    connectionStateManager.start(); 开启连接状态管理

    public void start()
        {
            Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
    
            service.submit
                (
                    new Callable<Object>()
                    {
                        @Override
                        public Object call() throws Exception
                        {
                            processEvents();
                            return null;
                        }
                    }
                );
        }
    
     private void processEvents()
        {    //注意是个循环,一直在获取 上面那个阻塞队列中的状态值
            while ( state.get() == State.STARTED )
            {
                try
                {
                    //第一次ZK还没有建立连接,这里得到的就是用户指定的会话超时时间
                    int useSessionTimeoutMs = getUseSessionTimeoutMs();
                    long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
                    long pollMaxMs = useSessionTimeoutMs - elapsedMs;
    
                    //这个队列就是刚才放进去事件的队列
                    final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
                    if ( newState != null )
                    {
                        if ( listeners.size() == 0 )
                        {
                            log.warn("There are no ConnectionStateListeners registered.");
                        }
                         //这里仅仅就是回调监听器StandardListenerManager<ConnectionStateListener>
                        //client.getConnectionStateListenable().addListener(new ConnectionStateListener());
                        //连接状态变化
                        listeners.forEach(listener -> listener.stateChanged(client, newState));
                    }
                    //该值默认100,如果长时间没有收到事件变化就判断下会话是否过期
                    else if ( sessionExpirationPercent > 0 )
                    {
                        synchronized(this)
                        {
                            checkSessionExpiration();
                        }
                    }
                }
                catch ( InterruptedException e )
                {
                    // swallow the interrupt as it's only possible from either a background
                    // operation and, thus, doesn't apply to this loop or the instance
                    // is being closed in which case the while test will get it
                }
            }
        }
    

    相关文章

      网友评论

          本文标题:Curator源码分析--创建client

          本文链接:https://www.haomeiwen.com/subject/vbypwctx.html