美文网首页
Java并发——Lock

Java并发——Lock

作者: 坠尘_ae94 | 来源:发表于2020-08-05 21:27 被阅读0次

CAS

Compare and Swap,语义为“我认为V的值应该为A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少”

想了解CAS算法,可以看看这个Java Compare and Swap Example – CAS Algorithm

CountDownLatch

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

成员变量

private final Sync sync;

Sync为CountDownLatch的一个内部类:

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

Sync类中有点锁的味道了,一个tryAcquireShared方法尝试在共享模式下获取对象状态,一个tryReleaseShared方法在共享模式下设置状态。

可以看到这里使用了compareAndSetState方法,底层调用:

    public final native
    @MethodHandle.PolymorphicSignature
    @HotSpotIntrinsicCandidate
    boolean compareAndSet(Object... args);

也就是CAS算法。

构造方法

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

CountDownLatch只有一个构造方法,在此构造sync属性.

一些方法

await

CountDownLatch中主要方法就俩,一个await,一个countDown.

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

await方法调用sync的acquireSharedInterruptibly方法,一般这种以xxxInterruptibly形式命名的方法都是可中断的,并且如果被中断会抛出InterruptedException:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

事实也确实如此.继续进入doAcquireSharedInterruptibly,此方法由Sync的父类AQS提供

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //添加节点至等待队列
        //构造了一个Node,将当前的线程存进去了,模式是共享模式。
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    //试图在共享模式下获取对象状态
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //设置头节点并繁殖
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

这里有调用Sync的tryAcquireShared方法:

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

getState方法返回AQS的state属性值,这里也就是判断AQS的state是否为0.

如果获取到的state仍然不为0,也就是说程序会进入shouldParkAfterFailedAcquire方法:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }

compareAndSetWaitStatus 将 prev 的 waitStatus 设置为 Node.SIGNAL 。

Node.SIGNAL 表示后续结点中的线程需要被unparking(类似被唤醒的意思)。该方法返回false。

写不下去了..

countDown

countDown方法调用了releaseShared方法

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

算了,一下子实在看不明白

应用一 线程间通信

当前有两个任务A、B正在进行,A的中期依赖于B前期的结果,此时可以使用CountDwonLatch进行阻止A线程(其实是所有其他线程)继续执行。

public static void testAwait() {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    new Thread(() -> {
        try {
            countDownLatch.await();
            System.out.println("Using Data");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    new Thread(() -> {
        System.out.println("prepare for data");
        try {
            TimeUnit.SECONDS.sleep(3);
            System.out.println("data is done");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            countDownLatch.countDown();
        }
    }).start();
}

结果:

prepare for data
data is done
Using Data

扩展一 增加回调方法

public class MyCountDownLatchWithCallBack extends CountDownLatch {
    private Runnable runnable;

    public MyCountDownLatchWithCallBack(int count, Runnable runnable) {
        super(count);
        this.runnable = runnable;
    }

    @Override
    public void countDown() {
        super.countDown();
        if (getCount() <= 0) {
            runnable.run();
        }
    }
}

如此即可实现在执行完countDown之后判断count是否为0,若是则执行Runnable接口传入的任务.

CyclicBarrier

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called <em>cyclic</em> because it can be re-used after the waiting threads are released.

同步屏障,可以让一组线程达到一个屏障时被阻塞,直到最后一个线程达到屏障时,此后被阻塞的线程才能继续执行。

成员变量

/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/** The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();

/**
 * Number of parties still waiting. Counts down from parties to 0
 * on each generation.  It is reset to parties on each new
 * generation or when broken.
 */
private int count;

Generation是CyclicBarrier的一个内部类:

    private static class Generation {
        Generation() {}                 // prevent access constructor creation
        boolean broken;                 // initially false
    }

构造方法

public CyclicBarrier(int parties) {}
public CyclicBarrier(int parties, Runnable barrierAction) {}

两个参数的构造方法用于指定回调方法。

API

await

最重要的方法了:

        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println("T1 finished");
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println("T2 finished");
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }).start();

当两个线程同时到达屏障时,第一个等待的线程才可以和第二个线程一样,执行接下来的代码。

reset

CyclicBarrie为啥会有Cyclic?就是因为reset方法。

比如,在await示例代码中再加个线程:

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println("T3 finished");
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }).start();

就会发现,程序完全停不下来了。

await方法想要获取parties,但是此时parties被上面的程序调用了两次已经归0了,所以它不可能还能获取。

cyclicBarrier.reset();

它可以使CyclicBarrier内部维护的parties恢复原值,从而达到可复用的目的。

Exchanger

A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the {@link #exchange exchange} method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a {@link SynchronousQueue}. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.

一个线程在完成一定的事务后想与另一个线程交换数据,则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据。

构造方法

    public Exchanger() {
        participant = new Participant();
    }

Participant是一个内部类,继承自ThreadLocal

应用

两个线程分享数据:

Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
    System.out.println(System.currentTimeMillis() + " before exchange A");
    try {
        TimeUnit.SECONDS.sleep(1);
        System.out.println("A " + exchanger.exchange("From A"));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();
new Thread(() -> {
    System.out.println(System.currentTimeMillis() + " before exchange B");
    try {
        TimeUnit.SECONDS.sleep(3);
        System.out.println("B " + exchanger.exchange("From B"));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

结果:

1596614639211 before exchange B
1596614639210 before exchange A
B From A
A From B

exchange方法本身是阻塞方法,也就是说如果线程A内执行了Exchanger的exchange方法,并且线程B尚未执行exchange方法,则A就会被阻塞。

线程交换的数据为原数据。

Semaphore

A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each {@link #acquire} blocks if necessary until a permit is available, and then takes it. Each {@link #release} adds a permit, potentially releasing a blocking acquirer.
However, no actual permit objects are used; the {@code Semaphore} just keeps a count of the number available and acts accordingly.

Semaphore也叫信号量,在JDK1.5被引入,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

构造方法

public Semaphore(int permits) {}
public Semaphore(int permits, boolean fair) {}

这里的fair指的是:if this semaphore will guarantee first-in first-out granting of permits under contention ,也就是公平与否。

permits就是Semaphore内部维护的一组虚拟的许可。

  • 访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。
  • 访问资源后,使用release释放许可。

这个特性可以用来做锁。

应用

public class SemaphoreLock {
    private Semaphore semaphore = new Semaphore(1);
    public void lock() throws InterruptedException {
        semaphore.acquire();
    }
    public void unlock(){
        semaphore.release();
    }
}

测试:

        SemaphoreLock lock = new SemaphoreLock();
        IntStream.range(0, 2).forEach(i -> new Thread( () -> {
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName() + " get the semaphore");
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
                System.out.println(Thread.currentThread().getName() + " release the semaphore");
            }
        },"Thread-" + i).start());

那么这个锁与synchronized有何区别?

答:最明显的区别是Semaphore可以指定许可证的数量(permits),实现同时允许多个线程取得锁,而synchronized只可以被一个线程获取锁

APIs

acquireUniterruptibly(int)

不理会中断异常

availablePermints()

当前可用的许可证数目

getQueueLength()

调用acquire()方法被阻塞的线程(评估值,不一定正确,因为线程在不断改变)

drainPermits()

排干所有许可证

hasQueuedThread()

当前有多少个block的线程(仅供参考)

tryAcquire()

尝试获取许可,不阻塞,拿不到就算了

ReentrantLock

A reentrant mutual exclusion {@link Lock} with the same basic behavior and semantics as the implicit monitor lock accessed using {@code synchronized} methods and statements, but with extended capabilities.

一个在内存语义上和synchronized一样效果的Java类,同时还扩展了其他一些高级特性,比如定时的锁等待、可中断的锁等待和公平性等.

同样,他也是可重入锁(可重入锁,也叫做递归锁,指的是在同一线程内,外层函数获得锁之后,内层递归函数仍然可以获取到该锁。换一种说法:同一个线程再次进入同步代码时,可以使用自己已获取到的锁,详细信息可参考此博客)。

构造方法

public ReentrantLock() {}
public ReentrantLock(boolean fair) {}       

这里的fair:if this lock should use a fair ordering policy,用来区分公平锁和非公平锁。

应用

得益于Lock接口的tryLock方法,ReentrantLock可作为不可重入锁:

        ReentrantLock lock = new ReentrantLock();
        IntStream.range(0,2).forEach(i -> new Thread(()->{
            if (lock.tryLock()){
                try {
                    Optional.of("The thread-"+Thread.currentThread().getName()+" get lock..").ifPresent(System.out::println);
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName()+" finished");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }else {
                Optional.of("The thread-"+Thread.currentThread().getName()+" not get lock..").ifPresent(System.out::println);
            }
        }).start());

与synchronized一般使用:

ReentrantLock lock = new ReentrantLock();
try {
        lock.lock();
     //……
 }finally {
     lock.unlock();
 }

ReadWriteLock

A {@code ReadWriteLock} maintains a pair of associated {@link Lock locks}, one for read-only operations and one for writing. The {@linkplain #readLock read lock} may be held simultaneously by multiple reader threads, so long as there are no writers. The {@linkplain #writeLock write lock} is exclusive.

这是一个接口,其中定义了两个方法:

    Lock readLock();
    Lock writeLock();

应用

这里主要看它的实现类ReentrantReadWriteLock。

private final static ReadWriteLock lock = new ReentrantReadWriteLock();
private final static Lock readLock = lock.readLock();
private final static Lock writeLock = lock.writeLock();
private static final List<Long > data =new ArrayList<>();
public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(ReadWriteLockTest::read);
    t1.start();
    TimeUnit.SECONDS.sleep(1);
    Thread t2 = new Thread(ReadWriteLockTest::read);
    t2.start();
    t1.join();
    t2.join();
    data.forEach(System.out::println);
}
public static void write(){
    try {
        writeLock.lock();
        data.add(System.currentTimeMillis());
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        writeLock.unlock();
    }
}
public static void read(){
    try {
        readLock.lock();
        data.forEach(System.out::println);
        TimeUnit.SECONDS.sleep(3);
        System.out.println(Thread.currentThread().getName()+":============================");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        readLock.unlock();
    }
}

读写锁,线程同时读数据不加锁,一旦写数据就会加锁

如下表:W代表写,R代表读

T1 T2 Allow
W W N
W R N
R R Y
R W N

Condition

{@code Condition} factors out the {@code Object} monitor methods ({@link Object#wait() wait}, {@link Object#notify notify} and {@link Object#notifyAll notifyAll}) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary {@link Lock} implementations.
Where a {@code Lock} replaces the use of {@code synchronized} methods and statements, a {@code Condition} replaces the use of the Object monitor methods.

在java中,对于任意一个java对象,它都拥有一组定义在java.lang.Object上监视器方法,包括wait()wait(long timeout)notify()notifyAll(),这些方法配合synchronized关键字一起使用可以实现等待/通知模式。

同样,Condition接口也提供了类似Object监视器的方法,通过与Lock配合来实现等待/通知模式。

Condition对象一般通过Lock得出,直接用new新建的对象与Lock没有关系。

应用

比如:当前有两个线程T1和T2,T1负责生产数据,T2负责消费(输出)数据,T1没数据,T2就不能消费。类似生产者消费者模型。


    
    private final static ReentrantLock lock = new ReentrantLock();
    
    private final static Condition condition = lock.newCondition();
    
    private static int data =0;
    
    //  使用volatile保证变量的可见性
    private static volatile boolean noUse = true;
    
    private static void buildData(){
        
        try {
//            synchronized key word #monitor enter
            lock.lock();
            while (noUse){
//                monitor.wait()
                condition.await();
            }
            data ++;
            Optional.of("P:"+data).ifPresent(System.out::println);
            TimeUnit.SECONDS.sleep(1);
            noUse = true;
//            monitor.notify()
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
//            synchronized end #monitor end
            lock.unlock();
        }
    }
    private static void useData(){
        try {
            lock.lock();
            while (!noUse){
                condition.await();
            }
            TimeUnit.SECONDS.sleep(1);
            Optional.of("C:"+data).ifPresent(System.out::println);
            noUse = false;
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) {
        new Thread(() -> {
            while (true){
                buildData();
            }
        }).start();

        new Thread(() -> {
            while (true){
                useData();
            }
        }).start();
    }

StampedLock

A capability-based lock with three modes for controlling read/write access. The state of a StampedLock consists of a version and mode. Lock acquisition methods return a stamp that represents and controls access with respect to a lock state; "try" versions of these methods may instead return the special value zero to
represent failure to acquire access. Lock release and conversion methods require stamps as arguments, and fail if they do not match the state of the lock.

StampedLock控制锁有三种模式,一个StampedLock状态是由版本和模式两个部分组成,锁获取方法返回一个数字作为票据stamp,它用相应的锁状态表示并控制访问,数字0表示没有写锁被授权访问。在读锁上分为悲观锁和乐观锁。

在ReadWriteLock中写和读是互斥的,也就是如果有一个线程在写共享变量的话,其他线程读共享变量都会阻塞。

StampedLock把读分为了悲观读和乐观读,悲观读就等价于ReadWriteLock的读,而乐观读在一个线程写共享变量时,不会被阻塞,乐观读是不加锁的

构造方法

public StampedLock() {}

应用

使用StampedLock实现读(悲观锁)写锁:

private final static StampedLock lock =new StampedLock();
private final static List<Long> DATA = new ArrayList<>();
public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    Runnable readTask =()->{
        while (true){
            read();
        }
    };
    Runnable writeTask =()->{
        while (true){
            write();
        }
    };
    executorService.submit(readTask);
    executorService.submit(readTask);
    executorService.submit(writeTask);
}
private static void read(){
    long stamped =-1;
    try {
        stamped = lock.readLock();
        Optional.of(              DATA.stream().map(String::valueOf).collect(Collectors.joining("#","R-","")))
                .ifPresent(System.out::println);
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlockRead(stamped);
    }
}
private static void write(){
    long stamped =-1;
    try {
        stamped = lock.writeLock();
        DATA.add(System.currentTimeMillis());
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlockWrite(stamped);
    }
}

乐观读(重写读方法)

    private static void read(){
        long stamp = lock.tryOptimisticRead();
        if (lock.validate(stamp)){
            try {
                stamp = lock.readLock();
                Optional.of(
                        DATA.stream().map(String::valueOf).collect(Collectors.joining("#","R-","")))
                        .ifPresent(System.out::println);
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlockRead(stamp);
            }
        }
    }

参考

相关文章

网友评论

      本文标题:Java并发——Lock

      本文链接:https://www.haomeiwen.com/subject/vtxarktx.html