转:http://throwable.coding.me/2018/12/16/zookeeper-curator-usage
提醒:
1.推荐使用ConnectionStateListener监控连接的状态,因为当连接LOST时你不再拥有锁
2.分布式的锁全局同步, 这意味着任何一个时间点不会有两个客户端都拥有相同的锁。
可重入共享锁—Shared Reentrant Lock
Shared意味着锁是全局可见的, 客户端都可以请求锁。 Reentrant和JDK的ReentrantLock类似,即可重入, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。 它是由类InterProcessMutex
来实现。 它的构造函数为:
public InterProcessMutex(CuratorFramework client, String path)
通过acquire()
获得锁,并提供超时机制:
public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()
public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()
Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not
通过release()
方法释放锁。 InterProcessMutex 实例可以重用。
Revoking ZooKeeper recipes wiki定义了可协商的撤销机制。 为了撤销mutex, 调用下面的方法:
public void makeRevocable(RevocationListener<T> listener)
将锁设为可撤销的. 当别的进程或线程想让你释放锁时Listener会被调用。
Parameters:
listener - the listener
如果你请求撤销当前的锁, 调用attemptRevoke()
方法,注意锁释放时RevocationListener
将会回调。
public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()
二次提醒:错误处理 还是强烈推荐你使用ConnectionStateListener
处理连接状态的改变。 当连接LOST时你不再拥有锁。
首先让我们创建一个模拟的共享资源, 这个资源期望只能单线程的访问,否则会有并发问题。
public class FakeLimitedResource {
private final AtomicBoolean inUse = new AtomicBoolean(false);
public void use() throws InterruptedException {
// 真实环境中我们会在这里访问/维护一个共享的资源
//这个例子在使用锁的情况下不会非法并发异常IllegalStateException
//但是在无锁的情况由于sleep了一段时间,很容易抛出异常
if (!inUse.compareAndSet(false, true)) {
throw new IllegalStateException("Needs to be used by one client at a time");
}
try {
Thread.sleep((long) (3 * Math.random()));
} finally {
inUse.set(false);
}
}
}
然后创建一个InterProcessMutexDemo
类, 它负责请求锁, 使用资源,释放锁这样一个完整的访问过程。
public class InterProcessMutexDemo {
private InterProcessMutex lock;
private final FakeLimitedResource resource;
private final String clientName;
public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
this.resource = resource;
this.clientName = clientName;
this.lock = new InterProcessMutex(client, lockPath);
}
public void doWork(long time, TimeUnit unit) throws Exception {
if (!lock.acquire(time, unit)) {
throw new IllegalStateException(clientName + " could not acquire the lock");
}
try {
System.out.println(clientName + " get the lock");
resource.use(); //access resource exclusively
} finally {
System.out.println(clientName + " releasing the lock");
lock.release(); // always release the lock in a finally block
}
}
private static final int QTY = 5;
private static final int REPETITIONS = QTY * 10;
private static final String PATH = "/examples/locks";
public static void main(String[] args) throws Exception {
final FakeLimitedResource resource = new FakeLimitedResource();
ExecutorService service = Executors.newFixedThreadPool(QTY);
final TestingServer server = new TestingServer();
try {
for (int i = 0; i < QTY; ++i) {
final int index = i;
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
try {
client.start();
final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
for (int j = 0; j < REPETITIONS; ++j) {
example.doWork(10, TimeUnit.SECONDS);
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
return null;
}
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
} finally {
CloseableUtils.closeQuietly(server);
}
}
}
代码也很简单,生成10个client, 每个client重复执行10次 请求锁–访问资源–释放锁的过程。每个client都在独立的线程中。 结果可以看到,锁是随机的被每个实例排他性的使用。
既然是可重用的,你可以在一个线程中多次调用acquire()
,在线程拥有锁时它总是返回true。
你不应该在多个线程中用同一个InterProcessMutex
, 你可以在每个线程中都生成一个新的InterProcessMutex实例,它们的path都一样,这样它们可以共享同一个锁。
不可重入共享锁—Shared Lock
这个锁和上面的InterProcessMutex
相比,就是少了Reentrant的功能,也就意味着它不能在同一个线程中重入。这个类是InterProcessSemaphoreMutex
,使用方法和InterProcessMutex
类似
public class InterProcessSemaphoreMutexDemo {
private InterProcessSemaphoreMutex lock;
private final FakeLimitedResource resource;
private final String clientName;
public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
this.resource = resource;
this.clientName = clientName;
this.lock = new InterProcessSemaphoreMutex(client, lockPath);
}
public void doWork(long time, TimeUnit unit) throws Exception {
if (!lock.acquire(time, unit))
{
throw new IllegalStateException(clientName + " 不能得到互斥锁");
}
System.out.println(clientName + " 已获取到互斥锁");
if (!lock.acquire(time, unit))
{
throw new IllegalStateException(clientName + " 不能得到互斥锁");
}
System.out.println(clientName + " 再次获取到互斥锁");
try {
System.out.println(clientName + " get the lock");
resource.use(); //access resource exclusively
} finally {
System.out.println(clientName + " releasing the lock");
lock.release(); // always release the lock in a finally block
lock.release(); // 获取锁几次 释放锁也要几次
}
}
private static final int QTY = 5;
private static final int REPETITIONS = QTY * 10;
private static final String PATH = "/examples/locks";
public static void main(String[] args) throws Exception {
final FakeLimitedResource resource = new FakeLimitedResource();
ExecutorService service = Executors.newFixedThreadPool(QTY);
final TestingServer server = new TestingServer();
try {
for (int i = 0; i < QTY; ++i) {
final int index = i;
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
try {
client.start();
final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
for (int j = 0; j < REPETITIONS; ++j) {
example.doWork(10, TimeUnit.SECONDS);
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
return null;
}
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
} finally {
CloseableUtils.closeQuietly(server);
}
Thread.sleep(Integer.MAX_VALUE);
}
}
运行后发现,有且只有一个client成功获取第一个锁(第一个acquire()
方法返回true),然后它自己阻塞在第二个acquire()
方法,获取第二个锁超时;其他所有的客户端都阻塞在第一个acquire()
方法超时并且抛出异常。
这样也就验证了InterProcessSemaphoreMutex
实现的锁是不可重入的。
可重入读写锁—Shared Reentrant Read Write Lock
类似JDK的ReentrantReadWriteLock。一个读写锁管理一对相关的锁。一个负责读操作,另外一个负责写操作。读操作在写锁没被使用时可同时由多个进程使用,而写锁在使用时不允许读(阻塞)。
此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁, 比如请求写锁 —>请求读锁—>释放读锁 —->释放写锁。从读锁升级成写锁是不行的。
可重入读写锁主要由两个类实现:InterProcessReadWriteLock
、InterProcessMutex
。使用时首先创建一个InterProcessReadWriteLock
实例,然后再根据你的需求得到读锁或者写锁,读写锁的类型是InterProcessMutex
。
public class ReentrantReadWriteLockDemo {
private final InterProcessReadWriteLock lock;
private final InterProcessMutex readLock;
private final InterProcessMutex writeLock;
private final FakeLimitedResource resource;
private final String clientName;
public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
this.resource = resource;
this.clientName = clientName;
lock = new InterProcessReadWriteLock(client, lockPath);
readLock = lock.readLock();
writeLock = lock.writeLock();
}
public void doWork(long time, TimeUnit unit) throws Exception {
// 注意只能先得到写锁再得到读锁,不能反过来!!!
if (!writeLock.acquire(time, unit)) {
throw new IllegalStateException(clientName + " 不能得到写锁");
}
System.out.println(clientName + " 已得到写锁");
if (!readLock.acquire(time, unit)) {
throw new IllegalStateException(clientName + " 不能得到读锁");
}
System.out.println(clientName + " 已得到读锁");
try {
resource.use(); // 使用资源
Thread.sleep(1000);
} finally {
System.out.println(clientName + " 释放读写锁");
readLock.release();
writeLock.release();
}
}
private static final int QTY = 5;
private static final int REPETITIONS = QTY ;
private static final String PATH = "/examples/locks";
public static void main(String[] args) throws Exception {
final FakeLimitedResource resource = new FakeLimitedResource();
ExecutorService service = Executors.newFixedThreadPool(QTY);
final TestingServer server = new TestingServer();
try {
for (int i = 0; i < QTY; ++i) {
final int index = i;
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
try {
client.start();
final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
for (int j = 0; j < REPETITIONS; ++j) {
example.doWork(10, TimeUnit.SECONDS);
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
return null;
}
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
} finally {
CloseableUtils.closeQuietly(server);
}
}
}
信号量—Shared Semaphore
一个计数的信号量类似JDK的Semaphore。 JDK中Semaphore维护的一组许可(permits),而Curator中称之为租约(Lease)。 有两种方式可以决定semaphore的最大租约数。第一种方式是用户给定path并且指定最大LeaseSize。第二种方式用户给定path并且使用SharedCountReader
类。如果不使用SharedCountReader, 必须保证所有实例在多进程中使用相同的(最大)租约数量,否则有可能出现A进程中的实例持有最大租约数量为10,但是在B进程中持有的最大租约数量为20,此时租约的意义就失效了。
这次调用acquire()
会返回一个租约对象。 客户端必须在finally中close这些租约对象,否则这些租约会丢失掉。 但是, 但是,如果客户端session由于某种原因比如crash丢掉, 那么这些客户端持有的租约会自动close, 这样其它客户端可以继续使用这些租约。 租约还可以通过下面的方式返还:
public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)
注意你可以一次性请求多个租约,如果Semaphore当前的租约不够,则请求线程会被阻塞。 同时还提供了超时的重载方法。
public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)
Shared Semaphore使用的主要类包括下面几个:
InterProcessSemaphoreV2
Lease
SharedCountReader
private static final int MAX_LEASE = 10;
private static final String PATH = "/examples/locks";
public static void main(String[] args) throws Exception {
FakeLimitedResource resource = new FakeLimitedResource();
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
Collection<Lease> leases = semaphore.acquire(5);
System.out.println("get " + leases.size() + " leases");
Lease lease = semaphore.acquire();
System.out.println("get another lease");
resource.use();
Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
System.out.println("Should timeout and acquire return " + leases2);
System.out.println("return one lease");
semaphore.returnLease(lease);
System.out.println("return another 5 leases");
semaphore.returnAll(leases);
}
}
}
首先我们先获得了5个租约, 最后我们把它还给了semaphore。 接着请求了一个租约,因为semaphore还有5个租约,所以请求可以满足,返回一个租约,还剩4个租约。 然后再请求一个租约,因为租约不够,阻塞到超时,还是没能满足,返回结果为null(租约不足会阻塞到超时,然后返回null,不会主动抛出异常;如果不设置超时时间,会一致阻塞)。
上面说讲的锁都是公平锁(fair)。 总ZooKeeper的角度看, 每个客户端都按照请求的顺序获得锁,不存在非公平的抢占的情况。
多共享锁对象 —Multi Shared Lock
Multi Shared Lock是一个锁的容器。 当调用acquire()
, 所有的锁都会被acquire()
,如果请求失败,所有的锁都会被release。 同样调用release时所有的锁都被release(失败被忽略)。 基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。
主要涉及两个类:
InterProcessMultiLock
InterProcessLock
它的构造函数需要包含的锁的集合,或者一组ZooKeeper的path。
public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)
用法和Shared Lock相同。
public class MultiSharedLockDemo {
private static final String PATH1 = "/examples/locks1";
private static final String PATH2 = "/examples/locks2";
public static void main(String[] args) throws Exception {
FakeLimitedResource resource = new FakeLimitedResource();
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);
InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
if (!lock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("could not acquire the lock");
}
System.out.println("has got all lock");
System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
try {
resource.use(); //access resource exclusively
} finally {
System.out.println("releasing the lock");
lock.release(); // always release the lock in a finally block
}
System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
}
}
}
新建一个InterProcessMultiLock
, 包含一个重入锁和一个非重入锁。 调用acquire()
后可以看到线程同时拥有了这两个锁。 调用release()
看到这两个锁都被释放了。
最后再重申一次, 强烈推荐使用ConnectionStateListener监控连接的状态,当连接状态为LOST,锁将会丢失。
分布式计数器
顾名思义,计数器是用来计数的, 利用ZooKeeper可以实现一个集群共享的计数器。 只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器, 一个是用int来计数(SharedCount
),一个用long来计数(DistributedAtomicLong
)。
分布式int计数器—SharedCount
这个类使用int类型来计数。 主要涉及三个类。
- SharedCount
- SharedCountReader
- SharedCountListener
SharedCount
代表计数器, 可以为它增加一个SharedCountListener
,当计数器改变时此Listener可以监听到改变的事件,而SharedCountReader
可以读取到最新的值, 包括字面值和带版本信息的值VersionedValue。
public class SharedCounterDemo implements SharedCountListener {
private static final int QTY = 5;
private static final String PATH = "/examples/counter";
public static void main(String[] args) throws IOException, Exception {
final Random rand = new Random();
SharedCounterDemo example = new SharedCounterDemo();
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
SharedCount baseCount = new SharedCount(client, PATH, 0);
baseCount.addListener(example);
baseCount.start();
List<SharedCount> examples = Lists.newArrayList();
ExecutorService service = Executors.newFixedThreadPool(QTY);
for (int i = 0; i < QTY; ++i) {
final SharedCount count = new SharedCount(client, PATH, 0);
examples.add(count);
Callable<Void> task = () -> {
count.start();
Thread.sleep(rand.nextInt(10000));
System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
return null;
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
for (int i = 0; i < QTY; ++i) {
examples.get(i).close();
}
baseCount.close();
}
Thread.sleep(Integer.MAX_VALUE);
}
@Override
public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
System.out.println("State changed: " + arg1.toString());
}
@Override
public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
System.out.println("Counter's value is changed to " + newCount);
}
}
这里我们使用trySetCount
去设置计数器。 第一个参数提供当前的VersionedValue,如果期间其它client更新了此计数值, 你的更新可能不成功, 但是这时你的client更新了最新的值,所以失败了你可以尝试再更新一次。 而setCount
是强制更新计数器的值。
注意计数器必须start
,使用完之后必须调用close
关闭它。
强烈推荐使用ConnectionStateListener
。 在本例中SharedCountListener
扩展ConnectionStateListener
。
分布式long计数器—DistributedAtomicLong
再看一个Long类型的计数器。 除了计数的范围比SharedCount
大了之外, 它首先尝试使用乐观锁的方式设置计数器, 如果不成功(比如期间计数器已经被其它client更新了), 它使用InterProcessMutex
方式来更新计数值。
可以从它的内部实现DistributedAtomicValue.trySet()
中看出:
AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception
{
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
tryOptimistic(result, makeValue);
if ( !result.succeeded() && (mutex != null) )
{
tryWithMutex(result, makeValue);
}
return result;
}
此计数器有一系列的操作:
- get(): 获取当前值
- increment(): 加一
- decrement(): 减一
- add(): 增加特定的值
- subtract(): 减去特定的值
- trySet(): 尝试设置计数值
- forceSet(): 强制设置计数值
你必须检查返回结果的succeeded()
, 它代表此操作是否成功。 如果操作成功, preValue()
代表操作前的值, postValue()
代表操作后的值。
public class DistributedAtomicLongDemo {
private static final int QTY = 5;
private static final String PATH = "/examples/counter";
public static void main(String[] args) throws IOException, Exception {
List<DistributedAtomicLong> examples = Lists.newArrayList();
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
ExecutorService service = Executors.newFixedThreadPool(QTY);
for (int i = 0; i < QTY; ++i) {
final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));
examples.add(count);
Callable<Void> task = () -> {
try {
AtomicValue<Long> value = count.increment();
System.out.println("succeed: " + value.succeeded());
if (value.succeeded())
System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
} catch (Exception e) {
e.printStackTrace();
}
return null;
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
Thread.sleep(Integer.MAX_VALUE);
}
}
}
网友评论