美文网首页
10.信号量、栅栏和倒计数器

10.信号量、栅栏和倒计数器

作者: 强某某 | 来源:发表于2020-03-03 15:13 被阅读0次
  1. Semaphore
    又称“信号量”,控制多个线程争抢许可;还是通过AQS的形式实现的。
  • acquire:获取一个许可,如果没有就等待
  • release: 释放一个许可
  • availablePermits:方法得到可用的许可数目

典型场景:
1、代码并发处理限流

//使用jdk提供的信号量Semaphore
public class SemaphoreDemo {
    public static void main(String[] args) {
        SemaphoreDemo semaphoreDemo = new SemaphoreDemo();
        int N = 8;
        //手牌数量,限制请求数量
        Semaphore semaphore = new Semaphore(5);
        for (int i = 0; i < N; i++) {
            String vipNo = "vip-00" + i;
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    semaphoreDemo.service(vipNo);
                    semaphore.release();
                } catch (Exception e) {

                }
            }).start();
        }
    }

    public void service(String vipNo) throws InterruptedException {
        System.out.println("进入一位" + vipNo);
        Thread.sleep(new Random().nextInt(3000));
        System.out.println("出去一位" + vipNo);
    }
}
  • 自定义信号量

信号量基于aqs,此处使用自定义aqs

public class TonyAQS {
    //判断一个所得状态或者说拥有者
    volatile AtomicReference<Thread> owner = new AtomicReference<>();
    //保存正在等待的队列
    volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
    volatile AtomicInteger state=new AtomicInteger(0);

    //下面是共享资源的占用的逻辑,返回资源的占用情况
    public void releaseShared() {
        if (tryReleaseShared()) {
            Iterator<Thread> iterator=waiters.iterator();
            while (iterator.hasNext()) {
                LockSupport.unpark(iterator.next());
            }
        }
    }

    public boolean tryReleaseShared() {
        throw new UnsupportedOperationException();
    }


    public int tryAcquireShared() {
        throw new UnsupportedOperationException();
    }
    public void acquireShared() {
        boolean flag = true;
        while (tryAcquireShared()<0) {
            if (flag) {
                waiters.add(Thread.currentThread());
                flag = false;
            }else{
                LockSupport.park();//while循环,避免伪唤醒
            }
        }
        waiters.remove(Thread.currentThread());
    }

    public AtomicInteger getState() {
        return state;
    }

    public void setState(AtomicInteger state) {
        this.state = state;
    }

    //下面是独占资源相关的代码
    //交给使用者去实现,如果不实现,默认会抛异常
    public boolean tryAcquire() {
        throw new UnsupportedOperationException();
    }

    public void acquire() {
        boolean flag = true;
        while (!tryAcquire()) {
            if (flag) {
                waiters.add(Thread.currentThread());
                flag = false;
            }else{
                LockSupport.park();//while循环,避免伪唤醒
            }
        }
        waiters.remove(Thread.currentThread());
    }

    public boolean tryRelease() {
        throw new UnsupportedOperationException();
    }

    public void release() {
        if (tryRelease()) {
            Iterator<Thread> iterator=waiters.iterator();
            while (iterator.hasNext()) {
                LockSupport.unpark(iterator.next());
            }
        }
    }
}


//自定义信号量只使用了自定义aqs的共享资源代码
public class TonySemaphore {

    //构造函数
    public TonySemaphore(int count) {
        t.getState().set(count);
    }
    TonyAQS t=new TonyAQS(){
        @Override
        public int tryAcquireShared() {//信号量获取,数量-1
            for (; ; ) {
                int count=getState().get();
                int n=count-1;
                if (count<=0||n<0) {
                    return -1;
                }
                if ( getState().compareAndSet(count,n)) {
                    return 1;
                }
                return -1;
            }
        }

        @Override
        public boolean tryReleaseShared() {//state+1
            return getState().incrementAndGet()>=0;
        }
    };

    public void acquire() {
        t.acquireShared();
    }

    public void release() {
        t.releaseShared();
    }
}

//使用
public class SemaphoreDemo {
    public static void main(String[] args) {
        SemaphoreDemo semaphoreDemo = new SemaphoreDemo();
        int N = 8;
        //手牌数量,限制请求数量--自定义信号量
        TonySemaphore semaphore = new TonySemaphore(5);
        for (int i = 0; i < N; i++) {
            String vipNo = "vip-00" + i;
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    semaphoreDemo.service(vipNo);
                    semaphore.release();
                } catch (Exception e) {

                }
            }).start();
        }
    }

    public void service(String vipNo) throws InterruptedException {
        System.out.println("进入一位" + vipNo);
        Thread.sleep(new Random().nextInt(3000));
        System.out.println("出去一位" + vipNo);
    }
}
  1. CountDownLatch
    java1.5被引入的一个工具类,常被称为:倒计数器;创建对象时,传入指定数值作为线程参与的数量
  • await: 方法等待计数器值变为0,在这之前,线程进入等待状态
  • countdown: 计数器数值减一,直到为0

使用场景示例:

  • 统计线程执行的情况
  • 压力测试中,使用countDownLatch实现最大程度的并发处理
  • 多个线程之间,相互通信,比如线程异步调用完接口,结果通知
//自定义aqs,因为下面的自定义倒计数器时基于这个aqs的
public class TonyAQS {
    //判断一个所得状态或者说拥有者
    volatile AtomicReference<Thread> owner = new AtomicReference<>();
    //保存正在等待的队列
    volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
    volatile AtomicInteger state=new AtomicInteger(0);

    //下面是共享资源的占用的逻辑,返回资源的占用情况
    public void releaseShared() {
        if (tryReleaseShared()) {
            Iterator<Thread> iterator=waiters.iterator();
            while (iterator.hasNext()) {
                LockSupport.unpark(iterator.next());
            }
        }
    }

    public boolean tryReleaseShared() {
        throw new UnsupportedOperationException();
    }


    public int tryAcquireShared() {
        throw new UnsupportedOperationException();
    }
    public void acquireShared() {
        boolean flag = true;
        while (tryAcquireShared()<0) {
            System.out.println(waiters.size());
            if (flag) {
                waiters.add(Thread.currentThread());
                flag = false;
            }else{
                LockSupport.park();//while循环,避免伪唤醒
            }
        }
        waiters.remove(Thread.currentThread());
    }

    public AtomicInteger getState() {
        return state;
    }

    public void setState(AtomicInteger state) {
        this.state = state;
    }

    //下面是独占资源相关的代码
    //交给使用者去实现,如果不实现,默认会抛异常
    public boolean tryAcquire() {
        throw new UnsupportedOperationException();
    }

    public void acquire() {
        boolean flag = true;
        while (!tryAcquire()) {
            if (flag) {
                waiters.add(Thread.currentThread());
                flag = false;
            }else{
                LockSupport.park();//while循环,避免伪唤醒
            }
        }
        waiters.remove(Thread.currentThread());
    }

    public boolean tryRelease() {
        throw new UnsupportedOperationException();
    }

    public void release() {
        if (tryRelease()) {
            Iterator<Thread> iterator=waiters.iterator();
            while (iterator.hasNext()) {
                LockSupport.unpark(iterator.next());
            }
        }
    }
}

//自定义倒计数器,并使用
public class CDLdemo {
    TonyAQS tonyAQS = new TonyAQS() {
        @Override
        public int tryAcquireShared() {//如果不等于0,代表当前还有线程没准备就绪,则认为需要等待
            return this.getState().get() == 0 ? 1 : -1;
        }

        @Override
        public boolean tryReleaseShared() {//如果不等于0,代表当前还有线程没准备就绪,则不会通知继续执行
            return this.getState().decrementAndGet() == 0;
        }
    };

    public CDLdemo(int count) {
        tonyAQS.setState(new AtomicInteger(count));
    }

    public void await() {
        tonyAQS.acquireShared();
    }

    public void countDown() {
        tonyAQS.releaseShared();
    }

    public static void main(String[] args) throws InterruptedException {
        CDLdemo cdLdemo = new CDLdemo(10);
        for (int i = 0; i < 9; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(2000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("我是" + Thread.currentThread() + "被执行了");
                cdLdemo.countDown();
            }).start();
        }
        //等待两秒,最后线程才启动
        cdLdemo.countDown();
        cdLdemo.await();
        System.out.println("我是最后一个线程了了");
    }
}

其实简单说await会进入while,但是不是死循环,因为内部有park。然后每次countDown其实就是总计数建议,当最后一个countDown调用的时候,触发releaseShared内部的tryReleaseShared()的调用通过,而后进入唤醒(unpark)。则之前的park处被唤醒,继续执行,然后正确结束;从而实现等待所有预期都执行完毕才执行后续代码的功能

  1. CyclicBarrier
    1.5加入,又称为"线程栅栏";创建对象时,指定栅栏线程数据。
  • await:等指定数量的线程都处于等待状态时,继续执行后续代码
  • barrierAction: 线程数量到了指定量之后,自动触发执行指定任务
  • 和CounDownLatch重要区别在于,CyclicBarrier对象可多次触发执行

典型场景:

  • 数据量比较大时,实现批量插入数据到数据库
  • 数据统计,30个线程统计30天数据,全部统计完毕后,执行汇总
//基本使用案例
public class CyclicBarrierTest {
    public static void main(String[] args) throws InterruptedException {
        ArrayList<String> sqls=new ArrayList<>();
        CyclicBarrier barrier=new CyclicBarrier(4,()->{
            System.out.println("有四个线程执行了"+Thread.currentThread());
            for (String sql:sqls){
                // System.out.println(sql);
            }
        });

        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                try {
                    sqls.add("data-"+Thread.currentThread());
                    Thread.sleep(1000L);
                    barrier.await();//等待屏障打开,只有四个线程都执行到这段代码的时候,才继续往下执行
                    System.out.println(Thread.currentThread()+"插入完毕");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
        Thread.sleep(2000L);
    }
}
/**
有四个线程执行了Thread[Thread-3,5,main]
Thread[Thread-3,5,main]插入完毕
有四个线程执行了Thread[Thread-6,5,main]
Thread[Thread-6,5,main]插入完毕
Thread[Thread-5,5,main]插入完毕
Thread[Thread-1,5,main]插入完毕
Thread[Thread-7,5,main]插入完毕
Thread[Thread-2,5,main]插入完毕
Thread[Thread-9,5,main]插入完毕
Thread[Thread-8,5,main]插入完毕
*/
//可知,虽然4个一组,但是每组也是并行的

说明:程序不会结束,每4个线程执行到await则上面的回调执行,总共两次,8个插入完毕,但是还有两个,不够四个,所以这两个永远不会继续向下执行

相关文章

  • 10.信号量、栅栏和倒计数器

    Semaphore又称“信号量”,控制多个线程争抢许可;还是通过AQS的形式实现的。 acquire:获取一个许可...

  • 1.3.1.2 信号量和栅栏和倒计数器

    Semaphore 又称“信号量”,控制多个线程争抢许可。 acquire: 获取一个许可,如果没有就等待。 re...

  • 理解semaphore

    Semaphore(信号量) 信号量可以简单的概括为:一个计数器,一个等待队列,三个方法。在信号量模型里,计数器和...

  • Smaphor(信号量)实现限流器

    Semaphore信号量 在管程被提出来之前用的是信号量。 信号量模型 一个计数器,一个等待队列,三个方法。计数器...

  • iOS底层--GCD应用

    手动目录GCD简介信号量信号量的理解信号量的使用信号量的代码实操信号量阻塞哪里的任务?栅栏函数dispatch_b...

  • iOS中信号量在网络请求中的使用

    信号量介绍 信号量就是一个资源计数器,对信号量有两个操作来达到互斥,分别是P和V操作。 我们平常说的加锁其实就是信...

  • 第十章 Posix信号量

    知乎讨论 通俗的描述信号量,是这么个东西:每个信号量拥有一个计数器,sem_post()可以使该计数器+1(V操作...

  • 多线程的运用

    同步串行队列 同步并发队列 异步串行队列 异步并发队列 队列组 栅栏 队列组和栅栏的组合 信号量 死锁主线程 分析...

  • GCD的几种使用方法

    1.栅栏方法: 2.快速迭代 3.信号量

  • redisson 应用(三)

    Lock 锁 Semaphore 信号量 CountDownLatch 栅栏 远程服务调用 Redisson 提供...

网友评论

      本文标题:10.信号量、栅栏和倒计数器

      本文链接:https://www.haomeiwen.com/subject/fhhhlhtx.html