Exchanger(交换机)
交换机(Exchanger)主要用于线程之间数据交换的工具,它提供一个同步点,在这个同步点两个线程可以交换彼此的数据.如果第一个线程先执行exchange
方法,它会等待第二个线程也执行exchange
方法.当两个线程都到达同步点时,这个两个线程就可以交换数据.如下图所示:
例如生活中A和B约定地点进行商品交易,A是买家B是卖家.我们可以使用Exchanger来描述两个人的交易过程.
public class App11 {
public static void main(String[] args) {
//约定交易地点
Exchanger<String> exchanger = new Exchanger<>();
//买家带500块钱去买辣条
Thread buyer = new Thread(new Buyer("500元",exchanger));
//卖家带辣条去卖
Thread seller = new Thread(new Seller("500根辣条",exchanger));
//买家和卖家出发去交易
buyer.start();
seller.start();
}
}
/**
* 买家
*/
class Buyer implements Runnable{
private String amount;
private Exchanger<String> exchanger;
public Buyer(String amount, Exchanger<String> exchanger) {
this.amount = amount;
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println("买家去交易地点");
//模拟去交易地点耗时
Thread.sleep(2000);
//到了交易地点等卖家
String goods = exchanger.exchange(amount);
//买到东西
System.out.println(String.format("买家买到[%s]回家",goods));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 卖家
*/
class Seller implements Runnable{
private String goods;
private Exchanger<String> exchanger;
public Seller(String goods, Exchanger<String> exchanger) {
this.goods = goods;
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println("卖家去交易地点");
//模拟去交易地点
Thread.sleep(5000);
//到了交易地点等待买家
String data = exchanger.exchange(goods);
//卖出了东西
System.out.println(String.format("卖家卖了[%s]钱回家",data));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
最后的打印结果如下:
卖家去交易地点
买家去交易地点
买家买到[500根辣条]回家
卖家卖了[500元]钱回家
上面例子中,买家和卖家调用exchange没有设置超时时间.如果当前等待的人一直没有等待到对方,当前的人将会一直等待下去.所以exchange为我们提供了带有超时时间参数的方法,通过超时时间将会通过抛出TimeoutException
中断线程的等待.下面我们模拟买家中途钱被小偷偷了,然后卖家等了时间久了就放弃交易回去了.下面修改买家和卖家的代码如下:
/**
* 买家
*/
class Buyer implements Runnable{
private String amount;
private Exchanger<String> exchanger;
public Buyer(String amount, Exchanger<String> exchanger) {
this.amount = amount;
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println("买家去交易地点");
//模拟去交易地点耗时
Thread.sleep(2000);
//模拟中途可能遇到小偷钱被偷走
Random random = new Random();
int i = random.nextInt(10);
if (i % 2 == 0){
throw new RuntimeException("买家中途遇见小偷,钱丢了");
}
//到了交易地点等卖家
String goods = exchanger.exchange(amount);
//买到东西
System.out.println(String.format("买家买到[%s]回家",goods));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 卖家
*/
class Seller implements Runnable{
private String goods;
private Exchanger<String> exchanger;
public Seller(String goods, Exchanger<String> exchanger) {
this.goods = goods;
this.exchanger = exchanger;
}
@Override
public void run() {
try {
System.out.println("卖家去交易地点");
//模拟去交易地点
Thread.sleep(5000);
//到了交易地点等待买家
String data = exchanger.exchange(goods,2,TimeUnit.SECONDS);
//卖出了东西
System.out.println(String.format("卖家卖了[%s]钱回家",data));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
System.out.println("卖家等了太久了,不卖了回家");
e.printStackTrace();
}
}
}
多次执行代码,当买家的钱被小偷偷了,打印的结果语句如下:
卖家去交易地点
买家去交易地点
Exception in thread "Thread-0" java.lang.RuntimeException: 买家中途遇见小偷,钱丢了
at com.buydeem.Buyer.run(App11.java:49)
at java.lang.Thread.run(Thread.java:745)
java.util.concurrent.TimeoutException
卖家等了太久了,不卖了回家
at java.util.concurrent.Exchanger.exchange(Exchanger.java:626)
at com.buydeem.Seller.run(App11.java:80)
at java.lang.Thread.run(Thread.java:745)
Semaphore (信号量)
Semaphore
通常我们叫它信号量
,可以用来控制同时访问特定资源的线程数.它提供acquire
方法用来获取许可,在没有足够的许可之前将调用该方法的线程将一直阻塞知道有可用的许可.同时还提供release
方法用来添加许可,用来释放一个正在阻塞的获取者.
它的使用场景类似于我们平常去窗口买票.如果现在有10个人去买票.可是窗口只有三个.也就是说最多同时容纳三个人同时买票,而其他的人必须等待当前窗口买票的人离开了才能去窗口买票.这个例子中的三个窗口就可以理解为信号量初始的许可个数为3
,而去买票的用户理解为线程
.我们使用下面的代码来描述整个过程:
public class App12 {
public static void main(String[] args) throws InterruptedException {
//创建买票的窗口
Semaphore semaphore = new Semaphore(3);
//创建乘客买票
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
service.execute(new Passenger("乘客"+i+"号",semaphore));
}
service.shutdown();
}
}
/**
* 乘客
*/
class Passenger implements Runnable{
private String name;
private Semaphore semaphore;
public Passenger(String name, Semaphore semaphore) {
this.name = name;
this.semaphore = semaphore;
}
@Override
public void run() {
boolean success = false;
try {
//等待空余窗口买票
semaphore.acquire();
success = true;
System.out.println(String.format("[%s]开始买票",name));
//模拟当前用户买票耗时
Random random = new Random();
int second = random.nextInt(4) + 1;
Thread.sleep(second * 1000);
//当前用户买票完成
System.out.println(String.format("[%s]买票结束",name));
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if (success){
semaphore.release();
}
}
}
}
最后的打印结果如下:
[乘客1号]开始买票
[乘客2号]开始买票
[乘客0号]开始买票
[乘客0号]买票结束
[乘客3号]开始买票
[乘客1号]买票结束
[乘客4号]开始买票
[乘客2号]买票结束
[乘客5号]开始买票
[乘客5号]买票结束
[乘客6号]开始买票
[乘客3号]买票结束
[乘客7号]开始买票
[乘客4号]买票结束
[乘客8号]开始买票
[乘客6号]买票结束
[乘客9号]开始买票
[乘客9号]买票结束
[乘客8号]买票结束
[乘客7号]买票结束
acquire,acquireUninterruptibly和tryAcquire
这三个方法都是用来获取许可的,但是它们之间还是有部分不同之处.
方法 | 是否阻塞 | 是否响应中断 |
---|---|---|
acquire | 是 | 是 |
acquireUninterruptibly | 是 | 否 |
tryAcquire | 否 | 否 |
tryAcquire带超时时间 | 是 | 是 |
获取许可的方法基本上就是上面这几种,同时每次获取许可时还可以指定获取的个数.上面的所指的是否会响应中断指的是当线程因为无法获取许可而阻塞,该调用该线程的**interrupt**将会抛出**InterruptedException**
.
relase
该方法用来释放许可,将其返回到信号量.同时该方法也提供了释放许可数量的参数,可以一次释放多个信号量.
公平与非公平
在构建信号量时可以在构造方法中传入true
设置该信号量为公平模式.该公平性是针对获取许可的线程来说的.该模式能保证对于任何调用获取
相关方法的线程FIFO
.需要注意的是可能线程A先于线程B调用了acquire
,但是A线程却在B线程之后到达排序点,这将导致看上去不公平
.还有一点值得注意的就是公平设置对于tryAcquire
非阻塞方法无效.
网友评论