美文网首页
从生产者-消费者模式开始说java锁

从生产者-消费者模式开始说java锁

作者: ExceptionIO | 来源:发表于2018-11-16 13:31 被阅读0次

一.生产者-消费者模式

一个生产数据到队列中,一个从队列中消费数据,这便是生产者-消费者模式。这里面涉及到两个问题:当队列已满时,生产者需要暂停工作,等待消费者进行消费;当队列为空时,消费者需要暂停工作,等待生产者进行生产。上述问题可以通过多种方式来解决,但都多少与阻塞、锁相关。比如以下方式:

1.通过BlockingQueue方式:

使用BlockingQueue之前,需要先了解它的几个方法:

add/remove:当队列已满/为空时,若执行该方法,会抛出异常;

offer/poll:当队列已满/为空时,若执行该方法,会返回false;

put/take:当队列已满/为空时,若执行该方法,当前线程会进入阻塞状态。

实现代码如下:

public class Producer implements Runnable {

public BlockingQueue<Integer> queue;

public Producer(BlockingQueue<Integer> queue)

{

    this.queue=queue;

}

@Override

public void run() {

    while(true) {

    Integer num = (int) (Math.random() * 10);

    try {

        queue.put(num);

        System.out.println("produce:"+num);

    } catch (InterruptedException e) {

        e.printStackTrace();

        }

    }

  }

}

public class Consumer implements Runnable {

public BlockingQueue<Integer> queue;

public Consumer(BlockingQueue<Integer> queue)

{

    this.queue=queue;

}

@Override

public void run() {

    while(true) {

    try {

        Integer num=queue.take();

        System.out.println("consumer:"+num);

      } catch (InterruptedException e) {

          e.printStackTrace();

      }

    }

  }

}

public static void main(String []args) throws Exception {

        BlockingQueue<Integer> bq=new LinkedBlockingDeque<>(1);

        ExecutorService service=Executors.newCachedThreadPool();

        service.execute(new Producer(bq));

        service.execute(new Consumer(bq));

}

2.通过wait/notify实现:

wait/notify(notifyall)是Object的方法,用于等待或者唤起其他线程,它们经常配对使用。wait会释放它所获得的锁,而Sleep不会释放。wait/notify必须在同步代码块内执行,不然会抛出异常!

Consumer:

synchronize(list)

{

    if(list.size()>value){

    list.wait();

  }

  list.add(......);//生产

  list.notifyall();

}

Producer:

synchronize(list)

{

if(list.size()==0){

list.wait();

}

list.remove(......);//消费

list.notifyall();

}

顺便说一下sleep,yield和join方法,这三个是Thread类的方法,sleep是让本线程进入休眠状态,不会释放自己所获得的锁。yield是本线程回到就绪状态,让其他同等优先级的线程获得被运行的机会(但也有可能自己马上被执行),yield不会释放自己获得的锁。join是主线程阻塞,直到调用join方法的线程执行完毕。

[图片上传失败...(image-2be36c-1542346245447)]

3.通过await/signal实现:

await/signal(signalall)是Condition接口的方法,作用等同于wait/notify。Lock提供了方法可以产生一个Condition对象。await/signal经常是和Lock配对使用,它比方法2中介绍的synchronize灵活很多,只是在这个模型中没有体现出来。

private Lock lock=new ReentrantLock();

private Condition condition=lock.newCondition();

Consumer:

lock=lock.lock();

try{

if(list.size()>value){

condition.await();

}

list.add(...);

condition.signal();

catch(Exception e){

...

}finally{

lock.unlock();

}

Producer:

lock=lock.lock();

try{

if(list.size()==0){

condition.await();

}

list.remove(...);

condition.signal();

catch(Exception e){

...

}finally{

lock.unlock();

}

二.Synchronized和Lock

二者是在多线程场景中经常使用到的,对于底层方法也需要加深了解。

1.synchronized关键字

该关键字可以用在多个地方,但对应的锁也是不同的:

1)普通同步方法,锁是当前实例对象

2)静态同步方法,锁是当前类的class对象

当synchronized关键字用于修饰方法时,通过javap反编译可以看到,方法的入口及出口新增了一个标记:ACC_SYNCHRONIZED。

3)同步方法块,锁是括号里面的对象

当synchronized关键字用于修饰方法块时,通过javap反编译可以看到,方法的入口及出口新增了两条指令:monitorenter,monitorexit。这两条指令保证了该方法只能同时被一个线程访问。

何为monitor?与一切皆对象一样,所有的Java对象是天生的Monitor,每一个Java对象都有成为Monitor的潜质,因为在Java的设计中 ,每一个Java对象天生就带了一把看不见的锁(保存在对象头中),它叫做内部锁或者Monitor锁。每次执行同步方法时,都会先尝试去获取这个锁,获取到了的话,计数加1.如果没有获取到,就进行等待。此后,获取到锁的线程每执行一次monitorexit,这把锁的计数就会减1,直到变成0。这个时候,其他线程就可以重新获得锁。

从上面的细节可以看到,正因为计数加1,所以synchronized是一个可重入锁;释放锁之后,其他线程随机获取这把锁,因此,synchronized是一个非公平锁(并不是按照申请顺序来获得,而是随机分配)。

在早期版本的jdk中,synchronized是一个重量级锁,利用这个关键字进行同步时会有性能损耗,主要是因为底层是使用mutex技术实现,这需要从用户态切换到核心态。但是,后来引入了锁的分级以及锁的消除技术,使得它得到了很大优化。锁的分级,即偏向锁,轻量级锁,自旋锁以及重量级锁,它们是逐步加上去的;锁消除,即jit会对那些不必要加锁的代码进行优化。

当有多个线程读写文件时,读操作和写操作会发生冲突现象,写操作和写操作会发生冲突现象,但是读操作和读操作不会发生冲突现象。采用synchronized关键字来实现同步的话,就会导致一个问题:如果多个线程都只是进行读操作,所以当一个线程在进行读操作时,其他线程只能等待无法进行读操作。因此就需要一种机制来使得多个线程都只是进行读操作时,线程之间不会发生冲突,通过Lock就可以办到。

另外,通过Lock可以知道线程有没有成功获取到锁。这个是synchronized无法办到的。

2.Lock

Lock不是关键字,而是一个接口,常用方式如下:

Lock lock = ...;

lock.lock();

try{

    //处理任务

}catch(Exception ex){

}finally{

    lock.unlock();   //释放锁

}

Lock的本质是AQS,AQS自己维护的队列是当前等待资源的队列,AQS会在被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行,直到队列为空。

1)ReentrantLock

顾名思义,即可重入锁,同一个线程已经获得该对象的锁,再进去其他需要该锁的方法时,可以直接进入。这个是比较好理解的,就不必扩展讲了。需要注意的是,该锁是一个独占锁,即同一时刻只允许一个线程持有该锁。

2)ReadWriteLock

读写锁。在实际应用场景中,读远大于写,如果还是用上面的锁,就会导致一个问题,同一时刻只有一个线程能进行读和写。但实际上,多个线程同时读是不会影响数据(当然,不能允许有线程在写)。因此,ReadWriteLock应运而生。

在ReadWriteLock中,它维护了两把锁,分别是readLock和writeLock。多个线程可以同时持有readLock,但是只要有一个线程持有writeLock,其他线程尝试获取readLock或者writeLock时都会失败,因为写锁的优先级要高于读锁。因此,ReadWriteLock不是一个独占锁,因为它里面的读锁是允许被多个线程同时持有的。

三.AQS(AbstractQueuedSynchronizer)

AQS是JUC中很多同步组件的构建基础,简单来讲,它内部实现主要是状态变量state和一个FIFO队列来完成,同步队列的头结点是当前获取到同步状态的结点,获取同步状态state失败的线程,会被构造成一个结点(或共享式或独占式)加入到同步队列尾部(采用自旋CAS来保证此操作的线程安全),随后线程会阻塞;释放时唤醒头结点的后继结点,使其加入对同步状态的争夺中。

对于独占锁,比如ReentrantLock,该锁只能被一个线程持有,获取时,会判断该锁的计数,如果为0,才有机会获得;获取锁后,计数加1;释放锁时,计数减1。对于共享锁,比如ReadLock,CountDownLatch,该锁可以被多个线程持有,获取时,会判断该锁的计数,如果大于等于0,才能获得;获取后,计数减1;释放锁时,计数加1。二者在操作思路上刚好相反。

Condition是一个接口,在执行await操作时,会将对应的线程从上述AQS的同步队列中转移到等待队列,即这个线程无法被AQS同步队列中的其他线程唤醒;当对应的Condition执行完signal操作时,对应的线程会被从等待队列转移到同步队列,这个时候该线程就可以通过竞争获取相应的锁。具体应用详见上述生产者-消费者模式3.

相关文章

网友评论

      本文标题:从生产者-消费者模式开始说java锁

      本文链接:https://www.haomeiwen.com/subject/dbcmfqtx.html