- Semaphore
又称“信号量”,控制多个线程争抢许可;还是通过AQS的形式实现的。
- acquire:获取一个许可,如果没有就等待
- release: 释放一个许可
- availablePermits:方法得到可用的许可数目
典型场景:
1、代码并发处理限流
//使用jdk提供的信号量Semaphore
public class SemaphoreDemo {
public static void main(String[] args) {
SemaphoreDemo semaphoreDemo = new SemaphoreDemo();
int N = 8;
//手牌数量,限制请求数量
Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < N; i++) {
String vipNo = "vip-00" + i;
new Thread(() -> {
try {
semaphore.acquire();
semaphoreDemo.service(vipNo);
semaphore.release();
} catch (Exception e) {
}
}).start();
}
}
public void service(String vipNo) throws InterruptedException {
System.out.println("进入一位" + vipNo);
Thread.sleep(new Random().nextInt(3000));
System.out.println("出去一位" + vipNo);
}
}
- 自定义信号量
信号量基于aqs,此处使用自定义aqs
public class TonyAQS {
//判断一个所得状态或者说拥有者
volatile AtomicReference<Thread> owner = new AtomicReference<>();
//保存正在等待的队列
volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
volatile AtomicInteger state=new AtomicInteger(0);
//下面是共享资源的占用的逻辑,返回资源的占用情况
public void releaseShared() {
if (tryReleaseShared()) {
Iterator<Thread> iterator=waiters.iterator();
while (iterator.hasNext()) {
LockSupport.unpark(iterator.next());
}
}
}
public boolean tryReleaseShared() {
throw new UnsupportedOperationException();
}
public int tryAcquireShared() {
throw new UnsupportedOperationException();
}
public void acquireShared() {
boolean flag = true;
while (tryAcquireShared()<0) {
if (flag) {
waiters.add(Thread.currentThread());
flag = false;
}else{
LockSupport.park();//while循环,避免伪唤醒
}
}
waiters.remove(Thread.currentThread());
}
public AtomicInteger getState() {
return state;
}
public void setState(AtomicInteger state) {
this.state = state;
}
//下面是独占资源相关的代码
//交给使用者去实现,如果不实现,默认会抛异常
public boolean tryAcquire() {
throw new UnsupportedOperationException();
}
public void acquire() {
boolean flag = true;
while (!tryAcquire()) {
if (flag) {
waiters.add(Thread.currentThread());
flag = false;
}else{
LockSupport.park();//while循环,避免伪唤醒
}
}
waiters.remove(Thread.currentThread());
}
public boolean tryRelease() {
throw new UnsupportedOperationException();
}
public void release() {
if (tryRelease()) {
Iterator<Thread> iterator=waiters.iterator();
while (iterator.hasNext()) {
LockSupport.unpark(iterator.next());
}
}
}
}
//自定义信号量只使用了自定义aqs的共享资源代码
public class TonySemaphore {
//构造函数
public TonySemaphore(int count) {
t.getState().set(count);
}
TonyAQS t=new TonyAQS(){
@Override
public int tryAcquireShared() {//信号量获取,数量-1
for (; ; ) {
int count=getState().get();
int n=count-1;
if (count<=0||n<0) {
return -1;
}
if ( getState().compareAndSet(count,n)) {
return 1;
}
return -1;
}
}
@Override
public boolean tryReleaseShared() {//state+1
return getState().incrementAndGet()>=0;
}
};
public void acquire() {
t.acquireShared();
}
public void release() {
t.releaseShared();
}
}
//使用
public class SemaphoreDemo {
public static void main(String[] args) {
SemaphoreDemo semaphoreDemo = new SemaphoreDemo();
int N = 8;
//手牌数量,限制请求数量--自定义信号量
TonySemaphore semaphore = new TonySemaphore(5);
for (int i = 0; i < N; i++) {
String vipNo = "vip-00" + i;
new Thread(() -> {
try {
semaphore.acquire();
semaphoreDemo.service(vipNo);
semaphore.release();
} catch (Exception e) {
}
}).start();
}
}
public void service(String vipNo) throws InterruptedException {
System.out.println("进入一位" + vipNo);
Thread.sleep(new Random().nextInt(3000));
System.out.println("出去一位" + vipNo);
}
}
- CountDownLatch
java1.5被引入的一个工具类,常被称为:倒计数器;创建对象时,传入指定数值作为线程参与的数量
- await: 方法等待计数器值变为0,在这之前,线程进入等待状态
- countdown: 计数器数值减一,直到为0
使用场景示例:
- 统计线程执行的情况
- 压力测试中,使用countDownLatch实现最大程度的并发处理
- 多个线程之间,相互通信,比如线程异步调用完接口,结果通知
//自定义aqs,因为下面的自定义倒计数器时基于这个aqs的
public class TonyAQS {
//判断一个所得状态或者说拥有者
volatile AtomicReference<Thread> owner = new AtomicReference<>();
//保存正在等待的队列
volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
volatile AtomicInteger state=new AtomicInteger(0);
//下面是共享资源的占用的逻辑,返回资源的占用情况
public void releaseShared() {
if (tryReleaseShared()) {
Iterator<Thread> iterator=waiters.iterator();
while (iterator.hasNext()) {
LockSupport.unpark(iterator.next());
}
}
}
public boolean tryReleaseShared() {
throw new UnsupportedOperationException();
}
public int tryAcquireShared() {
throw new UnsupportedOperationException();
}
public void acquireShared() {
boolean flag = true;
while (tryAcquireShared()<0) {
System.out.println(waiters.size());
if (flag) {
waiters.add(Thread.currentThread());
flag = false;
}else{
LockSupport.park();//while循环,避免伪唤醒
}
}
waiters.remove(Thread.currentThread());
}
public AtomicInteger getState() {
return state;
}
public void setState(AtomicInteger state) {
this.state = state;
}
//下面是独占资源相关的代码
//交给使用者去实现,如果不实现,默认会抛异常
public boolean tryAcquire() {
throw new UnsupportedOperationException();
}
public void acquire() {
boolean flag = true;
while (!tryAcquire()) {
if (flag) {
waiters.add(Thread.currentThread());
flag = false;
}else{
LockSupport.park();//while循环,避免伪唤醒
}
}
waiters.remove(Thread.currentThread());
}
public boolean tryRelease() {
throw new UnsupportedOperationException();
}
public void release() {
if (tryRelease()) {
Iterator<Thread> iterator=waiters.iterator();
while (iterator.hasNext()) {
LockSupport.unpark(iterator.next());
}
}
}
}
//自定义倒计数器,并使用
public class CDLdemo {
TonyAQS tonyAQS = new TonyAQS() {
@Override
public int tryAcquireShared() {//如果不等于0,代表当前还有线程没准备就绪,则认为需要等待
return this.getState().get() == 0 ? 1 : -1;
}
@Override
public boolean tryReleaseShared() {//如果不等于0,代表当前还有线程没准备就绪,则不会通知继续执行
return this.getState().decrementAndGet() == 0;
}
};
public CDLdemo(int count) {
tonyAQS.setState(new AtomicInteger(count));
}
public void await() {
tonyAQS.acquireShared();
}
public void countDown() {
tonyAQS.releaseShared();
}
public static void main(String[] args) throws InterruptedException {
CDLdemo cdLdemo = new CDLdemo(10);
for (int i = 0; i < 9; i++) {
new Thread(() -> {
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我是" + Thread.currentThread() + "被执行了");
cdLdemo.countDown();
}).start();
}
//等待两秒,最后线程才启动
cdLdemo.countDown();
cdLdemo.await();
System.out.println("我是最后一个线程了了");
}
}
其实简单说await会进入while,但是不是死循环,因为内部有park。然后每次countDown其实就是总计数建议,当最后一个countDown调用的时候,触发releaseShared内部的tryReleaseShared()的调用通过,而后进入唤醒(unpark)。则之前的park处被唤醒,继续执行,然后正确结束;从而实现等待所有预期都执行完毕才执行后续代码的功能
- CyclicBarrier
1.5加入,又称为"线程栅栏";创建对象时,指定栅栏线程数据。
- await:等指定数量的线程都处于等待状态时,继续执行后续代码
- barrierAction: 线程数量到了指定量之后,自动触发执行指定任务
- 和CounDownLatch重要区别在于,CyclicBarrier对象可多次触发执行
典型场景:
- 数据量比较大时,实现批量插入数据到数据库
- 数据统计,30个线程统计30天数据,全部统计完毕后,执行汇总
//基本使用案例
public class CyclicBarrierTest {
public static void main(String[] args) throws InterruptedException {
ArrayList<String> sqls=new ArrayList<>();
CyclicBarrier barrier=new CyclicBarrier(4,()->{
System.out.println("有四个线程执行了"+Thread.currentThread());
for (String sql:sqls){
// System.out.println(sql);
}
});
for (int i = 0; i < 10; i++) {
new Thread(()->{
try {
sqls.add("data-"+Thread.currentThread());
Thread.sleep(1000L);
barrier.await();//等待屏障打开,只有四个线程都执行到这段代码的时候,才继续往下执行
System.out.println(Thread.currentThread()+"插入完毕");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(2000L);
}
}
/**
有四个线程执行了Thread[Thread-3,5,main]
Thread[Thread-3,5,main]插入完毕
有四个线程执行了Thread[Thread-6,5,main]
Thread[Thread-6,5,main]插入完毕
Thread[Thread-5,5,main]插入完毕
Thread[Thread-1,5,main]插入完毕
Thread[Thread-7,5,main]插入完毕
Thread[Thread-2,5,main]插入完毕
Thread[Thread-9,5,main]插入完毕
Thread[Thread-8,5,main]插入完毕
*/
//可知,虽然4个一组,但是每组也是并行的
说明:程序不会结束,每4个线程执行到await则上面的回调执行,总共两次,8个插入完毕,但是还有两个,不够四个,所以这两个永远不会继续向下执行
网友评论