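ZooKeeper has three commonly used Java clients: the native ZooKeeper client, Apache Curator, and the open-source zkclient. The Curator website describes itself as follows:
[image: quotation from the Curator website]
Production environments generally use Curator. It mainly solves three kinds of problems:
1. It wraps connection handling between the ZooKeeper client and the ZooKeeper server, reconnecting automatically when the session times out.
2. It provides a fluent-style API for operations.
3. It provides abstractions for common ZooKeeper distributed-coordination scenarios (recipes, such as leader election, distributed locks, and distributed caches).
Using Curator as the running example, this article briefly covers how Curator starts up, how it manages sessions, how it delivers notifications, and how its recipes are implemented.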
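Using Curator
Curator is split into several components.
Among them, curator-recipes is built on top of Curator Framework and provides the ZooKeeper distributed-coordination recipes. Most of the time, depending on this one jar is enough.
Startup and loading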
// 1. Initialization
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectionString)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(connectionTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
// etc
.build();
// 2. Startup
client.start();
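The process divides into an initialization phase and a startup phase.
Initialization
The builder shown above creates a CuratorFrameworkImpl client instance. Its main initialization method is: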
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
// Factory used to create the native ZooKeeper client instance
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
// Wrapper around ZooKeeper; handles Curator's lower-level session keeping, synchronous requests, and so on
this.client = new CuratorZookeeperClient
(
localZookeeperFactory,
builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(),
builder.getConnectionTimeoutMs(),
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
processEvent(event);
}
},
builder.getRetryPolicy(),
builder.canBeReadOnly(),
builder.getConnectionHandlingPolicy()
);
// Detects disconnects and connection timeouts, sets Curator's connection state, and triggers connection state notifications through connectionStateManager
internalConnectionHandler = isClassic ? new ClassicInternalConnectionHandler() : new StandardInternalConnectionHandler();
// Receives event notifications; triggered by background operation events and connection state events
listeners = new ListenerContainer<CuratorListener>();
// Triggered when a background thread or a handler throws an exception
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
// Queue of operations executed by the background thread
backgroundOperations = new DelayQueue<OperationAndData<?>>();
// Namespace
namespace = new NamespaceImpl(this, builder.getNamespace());
// Thread factory, used when the background thread pool is initialized
threadFactory = getThreadFactory(builder);
// Responsible for notifications when the connection state changes
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent());
// State of CuratorFrameworkImpl: LATENT before start(), STARTED after start(), STOPPED after close()
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
// Connection state error policy
connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");
// Guaranteed delete: keeps retrying until the delete succeeds, implemented by recursive calls
failedDeleteManager = new FailedDeleteManager(this);
// Guaranteed removal of watches
failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
// Tracks the available server nodes; the server list is fetched again after the first connection and after every successful reconnect
ensembleTracker = new EnsembleTracker(this, builder.getEnsembleProvider());
}
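As shown, the constructor mainly initializes the ZooKeeper client wrapper CuratorZookeeperClient, the listener containers for background operations, connection events, and errors, the namespace, load balancing, and so on. All of these are closely tied to Curator's features. The following looks at the initialization of CuratorZookeeperClient and ConnectionStateManager in detail.
CuratorZookeeperClient initialization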
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
{
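// Since 3.0.0 the default is StandardInternalConnectionHandler. Before that, a session Expired event was raised only when the server reported it.
// With the standard handler, if the client has not reconnected to the server within the allotted time after a Disconnected event, the Expired event is raised proactively.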
this.connectionHandlingPolicy = connectionHandlingPolicy;
if ( sessionTimeoutMs < connectionTimeoutMs )
{
log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
}
// Policy for retrying the connection
retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");
this.connectionTimeoutMs = connectionTimeoutMs;
// The defaultWatcher that Curator registers on the native client; it receives connection-state events and handles reconnecting after a timeout
state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
setRetryPolicy(retryPolicy);
}
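As shown, it is mainly responsible for creating the connection and keeping it healthy. In addition, when client-server operations are invoked synchronously, it also uses retryPolicy to guarantee the connection for those synchronous operations. ConnectionState is the defaultWatcher registered on the native client: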
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
{
this.ensembleProvider = ensembleProvider;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
this.tracer = tracer;
this.connectionHandlingPolicy = connectionHandlingPolicy;
if ( parentWatcher != null )
{
// There can be only one defaultWatcher, so parentWatchers lets the defaultWatcher call back the parent watchers whenever it receives an event
parentWatchers.offer(parentWatcher);
}
zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}
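The forwarding idea behind parentWatchers can be sketched in isolation. This is a simplified model using only the JDK, not Curator's code: the class name DefaultWatcherSketch is hypothetical, events are reduced to plain strings, and every event is treated as a session event.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Simplified model: a single default watcher that fans events out to parent watchers.
final class DefaultWatcherSketch {
    // Hypothetical stand-in for ZooKeeper's Watcher: a consumer of event names.
    private final Queue<Consumer<String>> parentWatchers = new ConcurrentLinkedQueue<>();
    private volatile boolean connected;

    void addParentWatcher(Consumer<String> watcher) {
        parentWatchers.offer(watcher);
    }

    // Called once per session event by the (imaginary) native client.
    void process(String event) {
        // Update the default watcher's own view of the connection first.
        connected = event.equals("SyncConnected") || event.equals("ConnectedReadOnly");
        // Then forward the same event to every parent watcher.
        for (Consumer<String> parent : parentWatchers) {
            parent.accept(event);
        }
    }

    boolean isConnected() {
        return connected;
    }
}
```

Because only the default watcher is registered with the native client, any number of interested parties can still observe session events by being added as parents.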
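ConnectionStateManager initialization
It mainly handles Curator's connection states and sends the related notifications. To listen for connection state changes, register a ConnectionStateListener on its listeners.
// Queue of connection state events waiting to be delivered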
private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
// Listeners to notify
private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
// Running state of the ConnectionStateManager itself
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent)
{
this.client = client;
this.sessionTimeoutMs = sessionTimeoutMs;
this.sessionExpirationPercent = sessionExpirationPercent;
if ( threadFactory == null )
{
threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
}
// Thread pool that processes the event queue
service = Executors.newSingleThreadExecutor(threadFactory);
}
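The queue-plus-single-thread design above can be modeled with JDK classes alone. The sketch below is an illustration under assumptions, not Curator's implementation: StateNotifierSketch, its method names, and the queue size of 25 are hypothetical, and states are plain strings.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Simplified model of "state changes go into a queue, one thread notifies listeners".
final class StateNotifierSketch implements AutoCloseable {
    private final BlockingQueue<String> eventQueue = new ArrayBlockingQueue<>(25);
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService service = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "StateNotifierSketch");
        t.setDaemon(true); // do not keep the JVM alive for this sketch
        return t;
    });

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void start() {
        service.execute(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String state = eventQueue.take(); // blocks until a state change arrives
                    for (Consumer<String> listener : listeners) {
                        listener.accept(state);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdown requested
            }
        });
    }

    // Returns false when the queue is full and the change could not be enqueued.
    boolean addStateChange(String newState) {
        return eventQueue.offer(newState);
    }

    @Override
    public void close() {
        service.shutdownNow();
    }
}
```

The caller of addStateChange never runs listener code itself, which is why a slow listener cannot stall the thread that detected the state change.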
Startup
This phase establishes the session with the server and starts the related features.
CuratorFrameworkImpl.start
public void start()
{
log.info("Starting");
if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
{
throw new IllegalStateException("Cannot be started more than once");
}
try
{
// Start connectionStateManager
connectionStateManager.start(); // ordering dependency - must be called before client.start()
final ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
{
logAsErrorConnectionErrors.set(true);
}
}
};
this.getConnectionStateListenable().addListener(listener);
// Establish the connection to the server
client.start();
executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
executorService.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
backgroundOperationsLoop();
return null;
}
});
ensembleTracker.start();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleBackgroundOperationException(null, e);
}
}
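As shown, start() launches ConnectionStateManager and then CuratorZookeeperClient. Starting ConnectionStateManager prepares connection event notification; starting CuratorZookeeperClient establishes the session with the server.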
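The start-once guard and the ordering of the startup steps can be illustrated with a small JDK-only model. LifecycleSketch and the recorded step names are hypothetical; the real start() does the actual work where this sketch only records a label.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of a LATENT -> STARTED -> STOPPED lifecycle guarded by compare-and-set.
final class LifecycleSketch {
    enum State { LATENT, STARTED, STOPPED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
    private final List<String> steps = new ArrayList<>();

    void start() {
        // Only one caller can move LATENT -> STARTED; everyone else fails fast.
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        // Order matters: the notifier must be ready before the connection is opened.
        steps.add("connectionStateManager.start");
        steps.add("client.start");
        steps.add("backgroundOperationsLoop");
    }

    void close() {
        state.compareAndSet(State.STARTED, State.STOPPED);
    }

    State state() {
        return state.get();
    }

    List<String> steps() {
        return steps;
    }
}
```

Using compareAndSet rather than a plain check-then-set means two threads racing to call start() cannot both succeed.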
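Session management
Curator's session management is a wrapper around the native client's session management. As the startup section showed, ConnectionState handles reconnecting after a timeout, ConnectionStateManager handles connection state changes and their notification, and connectionHandlingPolicy handles proactively triggering a connection timeout. In addition, when the client performs an operation and notices that the connection is down, it can also reconnect on its own initiative. The following describes how Curator builds session state notification and reconnection after session expiry on top of the native client's session management.
Session connection state events have the type Watcher.Event.EventType.None and are delivered to every Watcher on the client. ConnectionState acts as the defaultWatcher, and its event callback is as follows: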
public void process(WatchedEvent event)
{
if ( LOG_EVENTS )
{
log.debug("ConnectState watcher: " + event);
}
if ( event.getType() == Watcher.Event.EventType.None )
{
// isConnected: the client's current connection state; true means connected (SyncConnected or ConnectedReadOnly)
boolean wasConnected = isConnected.get();
boolean newIsConnected = checkState(event.getState(), wasConnected);
if ( newIsConnected != wasConnected )
{
// The connection state changed, so update it
isConnected.set(newIsConnected);
connectionStartMs = System.currentTimeMillis();
if ( newIsConnected )
{
// The connection was (re)established: update the negotiated session timeout
lastNegotiatedSessionTimeoutMs.set(zooKeeper.getNegotiatedSessionTimeoutMs());
log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
}
}
}
// Notify parentWatchers. Note that a parentWatcher was passed in at initialization; it calls CuratorFrameworkImpl.processEvent
for ( Watcher parentWatcher : parentWatchers )
{
TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get());
parentWatcher.process(event);
timeTrace.commit();
}
}
As shown, connection state events are mainly handled by the checkState method:
private boolean checkState(Event.KeeperState state, boolean wasConnected)
{
boolean isConnected = wasConnected;
boolean checkNewConnectionString = true;
switch ( state )
{
default:
case Disconnected:
{
isConnected = false;
break;
}
case SyncConnected:
case ConnectedReadOnly:
{
isConnected = true;
break;
}
case AuthFailed:
{
isConnected = false;
log.error("Authentication failed");
break;
}
case Expired:
{
isConnected = false;
checkNewConnectionString = false;
handleExpiredSession();
break;
}
case SaslAuthenticated:
{
// NOP
break;
}
}
if ( checkNewConnectionString )
{
// If the server list has changed, update it
String newConnectionString = zooKeeper.getNewConnectionString();
if ( newConnectionString != null )
{
handleNewConnectionString(newConnectionString);
}
}
return isConnected;
}
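As shown, the method decides from the session state whether the connection is healthy; isConnected = true means healthy. When the session has expired (Expired), it calls handleExpiredSession to perform a reset, that is, it closes the connection and establishes a new session. This is the passive reconnect on session expiry. While connecting, the client's configured retryPolicy is used to check whether the reconnect has timed out.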
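To make the role of a retry policy concrete, here is a minimal sketch of bounded retries with exponential backoff. It is an illustration only and not Curator's RetryPolicy interface or algorithm: RetrySketch, backoffMs, and callWithRetry are hypothetical names, and the doubling schedule is an assumption.

```java
import java.util.concurrent.Callable;

// Simplified model of "retry an operation a bounded number of times, sleeping longer each time".
final class RetrySketch {
    // Sleep before retry number retryCount (0-based): baseMs * 2^retryCount, capped at maxMs.
    static long backoffMs(long baseMs, long maxMs, int retryCount) {
        long sleep = baseMs << Math.min(retryCount, 20); // cap the shift to avoid overflow
        return Math.min(sleep, maxMs);
    }

    // Runs op; on failure sleeps and retries, rethrowing once maxRetries is exhausted.
    static <T> T callWithRetry(Callable<T> op, int maxRetries, long baseMs, long maxMs) throws Exception {
        int retryCount = 0;
        while (true) {
            try {
                return op.call();
            } catch (Exception e) {
                if (retryCount >= maxRetries) {
                    throw e; // the policy gives up
                }
                Thread.sleep(backoffMs(baseMs, maxMs, retryCount));
                retryCount++;
            }
        }
    }
}
```

A call such as RetrySketch.callWithRetry(() -> fetch(), 3, 100, 1000), where fetch is whatever operation may hit a lost connection, attempts the operation at most four times before the last exception propagates.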
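- parentWatchers callback
When the CuratorFramework client is initialized, it creates a watcher and adds it to ConnectionState's parentWatchers. This watcher is responsible for updating the session state when the connection state changes.
// The parentWatcher created during initialization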
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
processEvent(event);
}
}
When the session state changes, the process method of the watcher above is invoked, which in turn calls CuratorFrameworkImpl.processEvent:
private void processEvent(final CuratorEvent curatorEvent)
{
if ( curatorEvent.getType() == CuratorEventType.WATCHED )
{
// State transition
validateConnection(curatorEvent.getWatchedEvent().getState());
}
// Notify all registered CuratorListeners
listeners.forEach(new Function<CuratorListener, Void>()
{
@Override
public Void apply(CuratorListener listener)
{
try
{
TimeTrace trace = client.startTracer("EventListener");
listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
trace.commit();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
logError("Event listener threw exception", e);
}
return null;
}
});
}
Here, validateConnection is responsible for the connection state transition:
void validateConnection(Watcher.Event.KeeperState state)
{
if ( state == Watcher.Event.KeeperState.Disconnected )
{
internalConnectionHandler.suspendConnection(this);
}
else if ( state == Watcher.Event.KeeperState.Expired )
{
connectionStateManager.addStateChange(ConnectionState.LOST);
}
else if ( state == Watcher.Event.KeeperState.SyncConnected )
{
internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
}
else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
{
internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
}
}
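As shown, ConnectionStateManager is responsible for updating Curator's connection state. The native client's connection states map to Curator's wrapped connection states as follows (read off the validateConnection code above):
Disconnected: handled by internalConnectionHandler.suspendConnection (SUSPENDED)
Expired: LOST
SyncConnected: RECONNECTED (reported as CONNECTED for the first connection)
ConnectedReadOnly: READ_ONLY
At the same time, ConnectionStateManager puts the current ConnectionState into its own event queue and notifies every ConnectionStateListener registered on its listeners.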
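The state translation performed by validateConnection and ConnectionStateManager can be modeled as a small state machine. This is a sketch under assumptions rather than Curator's code: StateMappingSketch and its two enums are hypothetical, and the rule that the first successful connection is reported as CONNECTED instead of RECONNECTED reflects how addStateChange is understood to behave, which the listing above does not show.

```java
// Simplified model: translate native session states into Curator-style connection states.
final class StateMappingSketch {
    enum KeeperState { Disconnected, SyncConnected, ConnectedReadOnly, Expired, AuthFailed }
    enum CuratorState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    private boolean initialConnectSent = false; // assumption: tracks the very first connect
    private CuratorState current = null;

    // Returns the state listeners should be told about, or null when nothing changes.
    synchronized CuratorState onKeeperState(KeeperState state) {
        CuratorState next;
        switch (state) {
            case Disconnected:      next = CuratorState.SUSPENDED;   break;
            case Expired:           next = CuratorState.LOST;        break;
            case SyncConnected:     next = CuratorState.RECONNECTED; break;
            case ConnectedReadOnly: next = CuratorState.READ_ONLY;   break;
            default:                return null; // states with no mapping in this sketch
        }
        if (next == current) {
            return null; // duplicate state, no notification
        }
        current = next;
        if (next == CuratorState.RECONNECTED && !initialConnectSent) {
            initialConnectSent = true;
            return CuratorState.CONNECTED; // first successful connection
        }
        return next;
    }
}
```

Feeding it SyncConnected, Disconnected, SyncConnected in that order yields CONNECTED, SUSPENDED, RECONNECTED, which is the sequence a ConnectionStateListener would typically observe across one brief network interruption.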
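In addition, when an operation (synchronous or background, such as getData) finds that the connection is down, it also calls client.getZooKeeper() to reconnect. (Note that the underlying creation of the client connection is locked, so only one thread per client can successfully create the session.) For example, the background task of CuratorFrameworkImpl: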
void performBackgroundOperation(OperationAndData<?> operationAndData)
{
try
{
if ( !operationAndData.isConnectionRequired() || client.isConnected() )
{
operationAndData.callPerformBackgroundOperation();
}
else
{
client.getZooKeeper(); // important - allow connection resets, timeouts, etc. to occur
if ( operationAndData.getElapsedTimeMs() >= client.getConnectionTimeoutMs() )
{
throw new CuratorConnectionLossException();
}
operationAndData.sleepFor(1, TimeUnit.SECONDS);
queueOperation(operationAndData);
}
}
catch ( Throwable e )
{
ThreadUtils.checkInterrupted(e);
/**
* Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException
* when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy
* and callbacks need to get invoked, etc.
*/
if ( e instanceof CuratorConnectionLossException )
{
WatchedEvent watchedEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null, null);
if ( checkBackgroundRetry(operationAndData, event) )
{
queueOperation(operationAndData);
}
else
{
logError("Background retry gave up", e);
}
}
else
{
handleBackgroundOperationException(operationAndData, e);
}
}
}
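The requeue-with-delay pattern in performBackgroundOperation relies on java.util.concurrent.DelayQueue. The following JDK-only sketch shows that pattern in a reduced form; BackgroundQueueSketch, Op, and step are hypothetical names, and the retry-policy handling of the real method is left out (the sketch simply reports that it gave up).

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Simplified model: run queued operations when connected, otherwise delay and requeue them.
final class BackgroundQueueSketch {
    static final class Op implements Delayed {
        final Runnable action;
        private final long createdNanos = System.nanoTime();
        private volatile long readyAtNanos = createdNanos; // ready immediately at first

        Op(Runnable action) {
            this.action = action;
        }

        void sleepFor(long time, TimeUnit unit) {
            readyAtNanos = System.nanoTime() + unit.toNanos(time);
        }

        long elapsedMs() {
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - createdNanos);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<Op> queue = new DelayQueue<>();
    private final BooleanSupplier connected;
    private final long connectionTimeoutMs;

    BackgroundQueueSketch(BooleanSupplier connected, long connectionTimeoutMs) {
        this.connected = connected;
        this.connectionTimeoutMs = connectionTimeoutMs;
    }

    void submit(Runnable action) {
        queue.add(new Op(action));
    }

    // Processes one operation and returns "ran", "requeued", or "gave-up".
    String step() throws InterruptedException {
        Op op = queue.take(); // blocks until the head operation's delay has elapsed
        if (connected.getAsBoolean()) {
            op.action.run();
            return "ran";
        }
        if (op.elapsedMs() >= connectionTimeoutMs) {
            return "gave-up"; // the real code raises a connection-loss error here
        }
        op.sleepFor(10, TimeUnit.MILLISECONDS);
        queue.add(op);
        return "requeued";
    }
}
```

Putting the delay on the operation itself, rather than sleeping in the loop, lets other operations that are already due be taken from the queue first.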
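Notification mechanism
The notification mechanism simply invokes the callbacks of already registered listeners at the point where an event occurs (the observer pattern). A CuratorFrameworkImpl client can register listeners in the following ways:
- One-shot watch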
client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground().forPath(path);
This is the same as a native client watch: it fires only once and has to be registered again each time.
- Registering a CuratorListener
// this is one method of getting event/async notifications
CuratorListener listener = new CuratorListener() {
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
// examine event for details
}
};
client.getCuratorListenable().addListener(listener);
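This adds the listener to CuratorFrameworkImpl.listeners. When the background thread completes an operation, it fires the corresponding event to this listener; for example, asynchronously creating a path fires a CuratorEventType.CREATE event. In addition, when a connection state event fires, the parentWatcher also calls back these listeners.
- Registering a ConnectionStateListener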
ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
//Some details
}
};
client.getConnectionStateListenable().addListener(connectionStateListener);
This adds connectionStateListener to connectionStateManager.listeners; it is notified whenever the connection state changes.
- Registering an UnhandledErrorListener
UnhandledErrorListener unhandledErrorListener = new UnhandledErrorListener() {
@Override
public void unhandledError(String message, Throwable e) {
//
}
};
client.getUnhandledErrorListenable().addListener(unhandledErrorListener);
This adds unhandledErrorListener to CuratorFrameworkImpl.unhandledErrorListeners; it is notified when a background operation or a handler throws an exception.
- Callback when a background operation completes
public static void setDataAsyncWithCallback(CuratorFramework client, BackgroundCallback callback, String path, byte[] payload) throws Exception {
// this is another method of getting notification of an async completion
client.setData().inBackground(callback).forPath(path, payload);
}
For each kind of operation, such as setData, a callback can be passed in through the chained call inBackground(callback); it is executed once the operation completes.
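The shape of a background call with a completion callback can be shown without a ZooKeeper server. The sketch below is a JDK-only model, not Curator's API: BackgroundCallbackSketch and setDataInBackground are hypothetical, an in-memory map stands in for the server, and the result code 0 is an assumed "success" value.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Simplified model of "perform the write on a background thread, then invoke the callback".
final class BackgroundCallbackSketch {
    private final Map<String, byte[]> store = new ConcurrentHashMap<>();
    private final ExecutorService background = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "BackgroundCallbackSketch");
        t.setDaemon(true); // do not keep the JVM alive for this sketch
        return t;
    });

    // The caller returns immediately; callback receives (path, resultCode) when the write is done.
    void setDataInBackground(String path, byte[] payload, BiConsumer<String, Integer> callback) {
        background.execute(() -> {
            store.put(path, payload.clone());
            callback.accept(path, 0); // 0 models a successful result code (assumption)
        });
    }

    byte[] get(String path) {
        return store.get(path);
    }

    void close() {
        background.shutdown();
    }
}
```

The callback runs on the background thread, so anything it touches must be safe to use from a thread other than the caller's.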
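- Caches, with repeated registration
Curator provides three kinds of cache: Path Cache, Node Cache, and Tree Cache. In effect, the local copy is compared against the node data on the server: whenever the data differs, the watch fires, a background callback updates the local cache, and the corresponding watch is registered again. Watches are also re-registered after every successful reconnect, which ensures that they are not lost.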
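The re-registration loop that keeps a cache fresh on top of one-shot watches can be sketched as follows. This is a JDK-only model with hypothetical names (NodeCacheSketch, FakeServer); the in-memory FakeServer only imitates the one-shot behavior of ZooKeeper watches, and reconnect handling is not modeled.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model: a local cache that re-registers its one-shot watch on every change.
final class NodeCacheSketch {
    // Hypothetical in-memory "server" whose watches fire once and are then dropped.
    static final class FakeServer {
        private final Map<String, String> data = new ConcurrentHashMap<>();
        private final Map<String, Runnable> watches = new ConcurrentHashMap<>();

        // Reads a node and, if a watcher is given, registers it as a one-shot watch.
        String getData(String path, Runnable watcher) {
            if (watcher != null) {
                watches.put(path, watcher);
            }
            return data.get(path);
        }

        void setData(String path, String value) {
            data.put(path, value);
            Runnable watcher = watches.remove(path); // one-shot: removed before it fires
            if (watcher != null) {
                watcher.run();
            }
        }
    }

    private final FakeServer server;
    private final String path;
    private volatile String cached;

    NodeCacheSketch(FakeServer server, String path) {
        this.server = server;
        this.path = path;
    }

    void start() {
        refresh();
    }

    // Reads the latest value and registers the watch again in the same call.
    private void refresh() {
        cached = server.getData(path, this::refresh);
    }

    String current() {
        return cached;
    }
}
```

Reading the data and setting the watch in a single call is what closes the gap in which a change could otherwise slip through unnoticed between the notification and the next registration.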
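Putting session management and the notification mechanism together: native client watchers are notified synchronously, although an individual watcher can of course hand its work off asynchronously. connectionStateManager.listeners are notified asynchronously by an internal thread pool. CuratorFrameworkImpl.listeners are notified of connection state events synchronously on the watcher notification thread, and asynchronously when the notification comes from the background thread. If the client registers too many watchers, watches may be lost after a reconnect (a reconnect clears the sendThread's send and receive queues, which may drop watches), and the reconnect may even fail. (In version 3.1.0, which this article analyzes, any call to client.getZooKeeper() triggers a reconnect regardless of the connection state at that moment, so I think this depends on the version; see reference 2.)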
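Recipes
The recipes that Curator implements mainly include distributed locks, leader election, barriers, counters, caches, ephemeral nodes, queues, and transactions. See reference 1 for usage. The implementation principles are briefly introduced below.
TODO
- Distributed locks
- Leader election
- Barriers
- Counters
- Caches
- Ephemeral nodes
- Queues
- Transactions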
References:
1. Learning ZooKeeper usage by example: article index
2. ZooKeeper client architecture analysis: troubleshooting ZK reconnection failures