美文网首页
实现一个阻塞的List

实现一个阻塞的List

作者: 内沐 | 来源:发表于2021-06-30 17:55 被阅读0次

在并发编程中,阻塞队列非常重要,所谓的阻塞队列就是支持两个附加操作的队列.这两个附加操作是阻塞的插入与移除.阻塞的插入是指当队列满的之后,后续的插入线程会被阻塞直到队列不满,阻塞的移除是指当队列为空时后续的移除线程会被阻塞直接队列非空.

这里面就需要用到线程间的等待/通知,通常有两种实现,一种是synchronized,wait(),noftify()结合使用,一种是Lock,Condition结合使用.比如:ArrayBlockingQueue就是使用的第二种方式. 这种更灵活些.

那借助阻塞队列的思想也可以实现一个阻塞的list.

下面会通过两种方式实现,最后通过生产者与消费者这个场景来进行测试.

第一种:

 package concurrent.aqs;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 实现阻塞的arrylist
 *
 * 1.通过 synchronized, wait(),notify()实现等待通知模型
 * 2. 通过 lock, condition 实现等待通知模型
 *
 */
public class BlockList<T> {


    private static int size = 3;
    private List<T> list;

    public BlockList() {
        if (size <= 0)
            throw new IllegalArgumentException();

        list = new ArrayList(size);
    }

    public void put(T t) {
        if (null == t) {
            throw new NullPointerException();
        }
        synchronized (BlockList.class) {
            while (size == list.size()) {
                try {
                    System.out.println("list已满,thread:"+Thread.currentThread().getName());
                    BlockList.class.wait();
                } catch (InterruptedException e) {
                    System.out.println("put error !!!");
                }
            }
            list.add(t);
            //通知
            BlockList.class.notify();
        }

    }


    public T get() {

        T t;
        synchronized (BlockList.class) {
            while (0 == list.size()) {
                try {
                    System.out.println("list为空,thread:"+Thread.currentThread().getName());
                    BlockList.class.wait();
                } catch (InterruptedException e) {
                    System.out.println("get error!!!");
                }
            }

            t = list.remove(0);

            //通知
            BlockList.class.notify();
        }

        return t;
    }


    public static void main(String[] args) throws InterruptedException {

        BlockList blockList = new BlockList();

        Thread produce = new Thread(() -> {

            for (;;){
                int i = 1;
                blockList.put(i);
                System.out.println("生产者生产:"+ i);

                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

        },"生产者");

        Thread customer =  new Thread(() -> {

           for (;;){
               Object o = blockList.get();
               System.out.println("消费者得到:" + o);
               try {
                   Thread.sleep(500);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }

           }

        },"消费者");

        Thread monitor =  new Thread(() -> {
            for (; ; ) {

                System.out.println("queue 长度:[" + blockList.list.size() + "] 当前线程为:" + Thread.currentThread().getName());
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }, "监控线程");


        produce.start();
        customer.start();
        monitor.start();

        //阻止主线程先执行完毕,可加可不加
        produce.join();
    }
}

第二种:

package concurrent.aqs;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BlockListByLock<T>{

    private static int size = 10;
    private List<T> list;

    private Lock lock ;

    private Condition fullCondition ;
    private Condition emptyCondition;

    public BlockListByLock() {

        if (size <= 0)
            throw new IllegalArgumentException();

        list = new ArrayList<>(size);
        lock = new ReentrantLock();
        fullCondition = lock.newCondition();
        emptyCondition = lock.newCondition();
    }

    public void put(T t) throws InterruptedException {
        if (null == t) {
            throw new NullPointerException();
        }

        lock.lockInterruptibly();
     try {
            while (size == list.size()) {
                fullCondition.await();
            }

            list.add(t);

            emptyCondition.signal();
        }finally {

            //释放锁
            lock.unlock();
        }
    }


    public T get() throws InterruptedException {

        T t;
        lock.lockInterruptibly();
        try {
            while (0 == list.size()) {
                emptyCondition.await();
            }

            t = list.remove(0);

            fullCondition.signal();
        }finally {

            lock.unlock();
        }
        return t;
    }


    public static void main(String[] args) throws InterruptedException {

        BlockListByLock blockList = new BlockListByLock();

        Thread produce = new Thread(() -> {

            for (;;){
                int i = 1;
                try {
                    blockList.put(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("生产者生产:"+ i);

                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

        },"生产者");

        Thread customer =  new Thread(() -> {

            for (;;){
                Object o = null;
                try {
                    o = blockList.get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者得到:" + o);
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

        },"消费者");

        Thread monitor =  new Thread(() -> {
            for (; ; ) {

                System.out.println("queue 长度:[" + blockList.list.size() + "] 当前线程为:" + Thread.currentThread().getName());
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }, "监控线程");


        produce.start();
        customer.start();
        monitor.start();
   //阻止主线程先执行完毕,可加可不加
        produce.join();
    }

}

END

相关文章

  • 实现一个阻塞的List

    在并发编程中,阻塞队列非常重要,所谓的阻塞队列就是支持两个附加操作的队列.这两个附加操作是阻塞的插入与移除.阻塞的...

  • php redis list

    list add list 查找 list 删除 list 修改 list 阻塞操作

  • 常见的5种数据类型

    string list 插入有序 可重复 同时可以实现栈、队列、数组及阻塞单播队列 hash set sorted...

  • 阻塞队列(一)(BlockingQueue)

    阻塞队列概要 阻塞队列与我们平常接触的普通队列(list)最大的不同点,在于阻塞队列支持阻塞添加和阻塞删除方法。 ...

  • 挂面09

    1.使用list队列/wait()/notify()实现生产者消费者 其中一种是通过阻塞队列(BlockingQu...

  • 多线程之非阻塞队列

    ConcurrentLinkedQueue 相对于阻塞队列加锁实现阻塞,非阻塞队列采用无锁CAS的方式来实现。

  • Java中List和ArrayList的区别

    List是一个接口,而ArrayList是List接口的一个实现类。 ArrayList类继承并实现了List接口...

  • Redis List

    在 redis 可以把 list 用作 栈、队列、阻塞队列

  • 阻塞队列--ArrayBlockingQueue

    什么是阻塞队列----阻塞队列概述 ArrayBlockingQueue是一个用数组实现的有界阻塞队列,按先进先出...

  • Java IO

    Before IO 分为:同步、异步阻塞、非阻塞 同步和异步是目的,阻塞和非阻塞是实现方式。 一个IO操作其实分成...

网友评论

      本文标题:实现一个阻塞的List

      本文链接:https://www.haomeiwen.com/subject/xiwrultx.html