1. Lock接口
JDK 1.5之前,synchronized几乎是使用锁的唯一选择。JDK 1.5之后,Doug Lea贡献了java并发工具包java.util.concurrent。工具包中提供了一个接口: java.util.concurrent.locks.Lock,以及一些接口的实现类(如:可重入锁、读写锁等等)。
2. Lock VS synchronized
synchronized是Java语言内置的关键字,调用native方法实现。而Lock是一个接口,这个接口的实现类在Java代码层面实现了锁的功能。
形象地说,synchronized关键字是自动档,可以满足一切日常驾驶需求。但是如果你想要玩漂移或者各种骚操作,就需要手动档了——各种Lock的实现类。
synchronized 代码简洁。调用时不用关心锁的获取与释放。
Lock:操作灵活可控。适合使用Lock的场景:1) 获取锁的过程可以被中断 2) 获取锁需要设置超时时间 3) 需要尝试获取锁:不管成功与否,立即返回结果 4) 需要使用共享锁(限定同时获得锁的线程数量)(如:读锁)
3. Lock的核心方法
Lock这里简单说明一下
- void lock
获取锁。如果锁被其它线程占用。则当前线程自旋等待或休眠。直到锁被获取到。 - void lockInterruptibly
获取锁。如果锁被其它线程占用。则当前线程自旋等待或休眠。直到锁被获取到或当前线程被中断。 - boolean tryLock
尝试获取锁:不管成功与否,立即返回结果 - boolean tryLock(Long,TimeUnit)
尝试获取锁:如果成功立即返回;在给能时间内:如果失败,则当前线程自旋等待或休眠,直到获取到锁;如果超过给定时间仍未能得到锁,则返回失败。 - unlock
释放锁。 - newCondition
创建一个新的条件。用于控制多线程之间,基于条件的等待与通知。
AQS有两个核心结构:同步队列、等待队列。一个Condition对应一个等待队列。相关问题会在AQS章节会详细说明。
Lock经典用法
Lock l = ...;
l.lock();
try {
// access the resource protected by this lock
} finally {
l.unlock();
}}
unlock() 操作放到finally是为了发生异常时能够释放锁
4. ReentrantLock 可重入锁
可重入锁又叫递归锁。所谓可重入即允许同一个线程多次获取同一把锁。比如一个递归函数里有加锁操作,它不会回阻塞自己。因为这个原因可重入锁也叫做递归锁。
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class TestReentrantLock {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Staff Staff = new Staff(lock);
for (int i = 0; i < 5; i++)
new Thread(() -> {
try {
Staff.method1();
Staff.method2();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
class Staff {
private final ReentrantLock lock;
public Staff(ReentrantLock lock) {
this.lock = lock;
}
public void method1() throws InterruptedException {
Random random = new Random();
TimeUnit.MILLISECONDS.sleep(random.nextInt(10));
lock.lock(); // block until condition holds
try {
System.out.println(Thread.currentThread().getName() + "正在执行method1");
} finally {
lock.unlock();
}
}
public void method2() throws InterruptedException {
lock.lock(); // block until condition holds
try {
System.out.println(Thread.currentThread().getName() + "正在执行method2");
} finally {
lock.unlock();
}
}
}
执行结果
Thread-1正在执行method1
Thread-1正在执行method2
Thread-2正在执行method1
Thread-2正在执行method2
Thread-0正在执行method1
Thread-0正在执行method2
Thread-3正在执行method1
Thread-3正在执行method2
Thread-4正在执行method1
Thread-4正在执行method2
ReentrantLock可以看做是用Java代码实现了synchronized关键字,并进行了扩展。见本文1.1. Lock VS synchronized章节。
上面的示例同时展示了偏向锁的概念。见各种锁的概念 四、synchronized锁升级:偏向锁 → 轻量级锁 → 重量级锁章节
ReentrantLock和synchronized关键字,都是排它锁/互斥锁。
4.1. 核心方法
ReentrantLock通过上图可以发现:ReentrantLock实现了Lock接口,并额外实现了一些同步队列、等待队列相关的方法。ReentrantLock的核心方法主要通过成员属性Sync sync同步器实现。Sync继承自AQS。
4.2. 成员类及成员属性
成员类Sync:ReentrantLock同步控制的基础。继承自抽象类AQS。
成员类NonfairSync、FairSync:是Sync的子类。字面直译,一个是非公平同步器、一个是公平同步器。实现公平锁、非公平锁相关功能。
成员属性Sync sync :可以是公平的(FairSync),也可以是非公平的(NonfairSync),构造时确定。关于同步器AQS章节会详细介绍。
ReentrantLock有两个构造函数
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
通过源码可以发现:ReentrantLock默认是非公平锁。通过设定构造参数fair=true,可创建公平锁。关于公平锁、非公平锁:如果在时间上,先对锁进行获取的请求,一定先被满足,这个锁就是公平的,不满足,就是非公平的。非公平的效率一般来讲更高。
5. ReadWriteLock接口、ReentrantReadWriteLock
- ReadWriteLock接口:同一时刻允许多个读线程同时访问,但是写线程访问的时候,所有的读和写都被阻塞。
- ReentrantReadWriteLock实现了ReadWriteLock接口。
使用示例:模拟多线程读写商品库存
public interface IStockService {
/**
* 获取库存
* @return
*/
ProductStock getStock();
/**
* 减少库存
* @param number
*/
void reduceStock(int number);
}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class StockServiceImpl implements IStockService {
private ProductStock productStock;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock getLock = lock.readLock();//读锁
private final Lock setLock = lock.writeLock();//写锁
public StockServiceImpl(ProductStock productStock) {
this.productStock = productStock;
}
@Override
public ProductStock getStock() {
getLock.lock();
try {
TimeUnit.MILLISECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
getLock.unlock();
return this.productStock;
}
}
@Override
public void reduceStock(int number) {
setLock.lock();
try {
TimeUnit.MILLISECONDS.sleep(5);
System.out.println(Thread.currentThread().getName()+" 写库存前:"+productStock.toString());
productStock.reduceStock(number);
System.out.println(Thread.currentThread().getName()+" 减少数量:"+number+",写库存后:"+productStock.toString());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
setLock.unlock();
}
}
}
public class ProductStock {
private final String productCode;
private double amount;//库存数量
private final String unit;//单位
public ProductStock(String productCode, int amount, String unit) {
this.productCode = productCode;
this.amount = amount;
this.unit = unit;
}
public String getProductCode(){return productCode;}
public double getAmount() {
return amount;
}
public String getUnit() {
return unit;
}
public void reduceStock(double sellNumber){
this.amount -= sellNumber;
}
@Override
public String toString(){
return String.format("商品名称[%s],商品数量[%s %s]",productCode,amount,unit);
}
}
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class TestReadWriteLock {
static final int readWriteRatio = 10;//读写线程的比例
static final int writeThreadNum = 3;//写线程数量
//读操作
private static class QueryThread implements Runnable {
private IStockService IStockService;
public QueryThread(IStockService IStockService) {
this.IStockService = IStockService;
}
@Override
public void run() {
long start = System.currentTimeMillis();
ProductStock stock = IStockService.getStock();
System.out.println(Thread.currentThread().getName()+" 读取库存信息:" + stock.toString() + "读取耗时:"
+ (System.currentTimeMillis() - start) + "ms");
}
}
//写操做
private static class SaleThread implements Runnable {
private IStockService IStockService;
public SaleThread(IStockService IStockService) {
this.IStockService = IStockService;
}
@Override
public void run() {
long start = System.currentTimeMillis();
Random r = new Random();
IStockService.reduceStock(r.nextInt(10));
System.out.println(Thread.currentThread().getName()
+ " 写入耗时:" + (System.currentTimeMillis() - start) + "ms");
}
}
public static void main(String[] args) throws InterruptedException {
ProductStock productStock = new ProductStock("飞模", 100000, "PC");
IStockService IStockService = new StockServiceImpl(productStock);/*new UseSyn(productStock);*/
for (int i = 0; i < writeThreadNum; i++) {
Thread saleThread = new Thread(new SaleThread(IStockService));
for (int j = 0; j < readWriteRatio; j++) {
Thread getT = new Thread(new QueryThread(IStockService));
getT.start();
}
TimeUnit.MILLISECONDS.sleep(100);
saleThread.start();
}
}
}
执行结果
Thread-2 读取库存信息:商品名称[飞模],商品数量[100000.0 PC]读取耗时:32ms
Thread-7 读取库存信息:商品名称[飞模],商品数量[100000.0 PC]读取耗时:33ms
Thread-10 读取库存信息:商品名称[飞模],商品数量[100000.0 PC]读取耗时:31ms
Thread-1 读取库存信息:商品名称[飞模],商品数量[100000.0 PC]读取耗时:33ms
Thread-5 读取库存信息:商品名称[飞模],商品数量[100000.0 PC]读取耗时:33ms
Thread-9 读取库存信息:商品名称[飞模],商品数量[100000.0 PC]读取耗时:32ms
Thread-4 读取库存信息:商品名称[飞模],商品数量[100000.0 PC]读取耗时:32ms
Thread-8 读取库存信息:商品名称[飞模],商品数量[100000.0 PC]读取耗时:32ms
Thread-3 读取库存信息:商品名称[飞模],商品数量[100000.0 PC]读取耗时:32ms
Thread-6 读取库存信息:商品名称[飞模],商品数量[100000.0 PC]读取耗时:32ms
Thread-13 读取库存信息:商品名称[飞模],商品数量[100000.0 PC]读取耗时:7ms
Thread-14 读取库存信息:商品名称[飞模],商品数量[100000.0 PC]读取耗时:6ms
Thread-12 读取库存信息:商品名称[飞模],商品数量[100000.0 PC]读取耗时:7ms
Thread-15 读取库存信息:商品名称[飞模],商品数量[100000.0 PC]读取耗时:6ms
Thread-0 写库存前:商品名称[飞模],商品数量[100000.0 PC]
Thread-0 减少数量:9,写库存后:商品名称[飞模],商品数量[99991.0 PC]
Thread-0 写入耗时:13ms
Thread-21 读取库存信息:商品名称[飞模],商品数量[99991.0 PC]读取耗时:18ms
Thread-20 读取库存信息:商品名称[飞模],商品数量[99991.0 PC]读取耗时:18ms
Thread-16 读取库存信息:商品名称[飞模],商品数量[99991.0 PC]读取耗时:18ms
Thread-19 读取库存信息:商品名称[飞模],商品数量[99991.0 PC]读取耗时:18ms
Thread-17 读取库存信息:商品名称[飞模],商品数量[99991.0 PC]读取耗时:18ms
Thread-18 读取库存信息:商品名称[飞模],商品数量[99991.0 PC]读取耗时:18ms
Thread-11 写库存前:商品名称[飞模],商品数量[99991.0 PC]
Thread-11 减少数量:3,写库存后:商品名称[飞模],商品数量[99988.0 PC]
Thread-11 写入耗时:6ms
Thread-23 读取库存信息:商品名称[飞模],商品数量[99988.0 PC]读取耗时:12ms
Thread-26 读取库存信息:商品名称[飞模],商品数量[99988.0 PC]读取耗时:12ms
Thread-32 读取库存信息:商品名称[飞模],商品数量[99988.0 PC]读取耗时:11ms
Thread-27 读取库存信息:商品名称[飞模],商品数量[99988.0 PC]读取耗时:12ms
Thread-25 读取库存信息:商品名称[飞模],商品数量[99988.0 PC]读取耗时:12ms
Thread-24 读取库存信息:商品名称[飞模],商品数量[99988.0 PC]读取耗时:12ms
Thread-29 读取库存信息:商品名称[飞模],商品数量[99988.0 PC]读取耗时:11ms
Thread-28 读取库存信息:商品名称[飞模],商品数量[99988.0 PC]读取耗时:12ms
Thread-30 读取库存信息:商品名称[飞模],商品数量[99988.0 PC]读取耗时:11ms
Thread-31 读取库存信息:商品名称[飞模],商品数量[99988.0 PC]读取耗时:11ms
Thread-22 写库存前:商品名称[飞模],商品数量[99988.0 PC]
Thread-22 减少数量:1,写库存后:商品名称[飞模],商品数量[99987.0 PC]
Thread-22 写入耗时:6ms
6. Condition 接口
Condition接口每个condition对象都包含着一个等待队列,实现等待/通知等关键功能。下文AQS章节会详细介绍。Lock接口中的newCondition()方法:声明一个新的条件和等待队列。线程可以在Condition上等待和被通知。
6.1.栗子:用ReentrantLock演示Condition的用法:实现线程间等待通知。
拿前文JAVA并发编程(二)线程的基本用法
5.3章节的wait/notify栗子进行改造
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class FlightCondition {
private String origin;
private String destination;
private String location;
private Lock lock = new ReentrantLock();
private Condition ferryConditon = lock.newCondition();
private Condition pickUpCondition = lock.newCondition();
private static List<String> demoCityList = new LinkedList<>();
private static List<Integer> demoDisList = new LinkedList<>();
static {
demoCityList.add("厦门");
demoCityList.add("南昌");
demoCityList.add("济南");
demoCityList.add("天津");
demoDisList.add(3000);
demoDisList.add(2000);
demoDisList.add(1000);
demoDisList.add(0);
}
public void changeLocation(int i) throws InterruptedException {
lock.lock();
try {
location = demoCityList.get(i);
System.out.println("当前位置变化为:" + location);
Thread.currentThread().sleep(100);
ferryConditon.signal();
pickUpCondition.signal();
} finally {
lock.unlock();
}
}
/**
* 乱写的计算剩余航程的DEMO方法
*
* @return
*/
private int getRemainDistance() {
if (demoCityList.contains(location)) {
int i = demoCityList.indexOf(location);
return demoDisList.get(i);
} else {
return 3000;
}
}
public FlightCondition(String origin, String destination) {
this.location = origin;
this.origin = origin;
this.destination = destination;
}
public void pickUpWait() throws InterruptedException {
lock.lock();
try {
System.out.println("接机线程[" + Thread.currentThread().getId() + "]判断是否等待");
Thread.currentThread().sleep(1000);
while (!Thread.currentThread().isInterrupted() && getRemainDistance() > 1000) {
//wait 会释放锁
System.out.println("接机线程[" + Thread.currentThread().getId()
+ "]在等待.");
pickUpCondition.await();
System.out.println("接机线程[" + Thread.currentThread().getId()
+ "]被唤醒,重新判断剩余航程是否大于1000");
System.out.println("剩余航程:" + getRemainDistance());
Thread.currentThread().sleep(1000);
}
System.out.println("剩余航程:" + getRemainDistance() + ",接机人员已就位.");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
public void ferryWait() throws InterruptedException {
lock.lock();
try {
System.out.println("摆渡车线程[" + Thread.currentThread().getId() + "]判断是否等待");
Thread.currentThread().sleep(1000);
while (!Thread.currentThread().isInterrupted() && !location.equals(this.destination)) {
//wait 会释放锁
System.out.println("摆渡车线程[" + Thread.currentThread().getId()
+ "]在等待.");
ferryConditon.await();
System.out.println("摆渡车线程[" + Thread.currentThread().getId()
+ "]被唤醒,重新判断当前城市与目的地是否相同.");
System.out.println("当前城市:" + location + ",目的城市:" + destination);
Thread.currentThread().sleep(1000);
}
System.out.println("当前城市:" + this.location + ",摆渡车已就位");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
}
执行结果
摆渡车线程[19]判断是否等待
摆渡车线程[19]在等待.
接机线程[20]判断是否等待
接机线程[20]在等待.
当前位置变化为:厦门
摆渡车线程[19]被唤醒,重新判断当前城市与目的地是否相同.
当前城市:厦门,目的城市:天津
摆渡车线程[19]在等待.
接机线程[20]被唤醒,重新判断剩余航程是否大于1000
剩余航程:3000
接机线程[20]在等待.
当前位置变化为:南昌
摆渡车线程[19]被唤醒,重新判断当前城市与目的地是否相同.
当前城市:南昌,目的城市:天津
摆渡车线程[19]在等待.
接机线程[20]被唤醒,重新判断剩余航程是否大于1000
剩余航程:2000
接机线程[20]在等待.
当前位置变化为:济南
摆渡车线程[19]被唤醒,重新判断当前城市与目的地是否相同.
当前城市:济南,目的城市:天津
摆渡车线程[19]在等待.
接机线程[20]被唤醒,重新判断剩余航程是否大于1000
剩余航程:1000
剩余航程:1000,接机人员已就位.
当前位置变化为:天津
摆渡车线程[19]被唤醒,重新判断当前城市与目的地是否相同.
当前城市:天津,目的城市:天津
当前城市:天津,摆渡车已就位
需要注意的是:一般情况下,实现线程等待通知使用wait()和notifyAll()方法,而不用notify()方法。或者ConditionObject实现的await()、signal()、signalAll()方法。
这是因为:
1.使用原生关键字synchronized,代码中无从得知有多少种类型的线程。不同类型的线程获取对象的锁之后,判定是否可执行的条件并不相同。如接机线程、摆渡车线程,一个是判断城市、一个判断行距。如果仅通知一个:极端情况下,notify的都是不符合执行条件的线程,而这些线程又马上进入阻塞状态。符合执行条件的线程永远不会被唤醒。故需要通知所有在这个对象资源上等待的线程。
2.Condition是代码可控的条件。如我们可以声明一个接机的Condition、一个摆渡车的Condition。两个Condition下分别对应一个等待队列。当位置变化时,我们分别通知接机Condition和摆渡车Condition等待队列中第一个线程。这样就能够保障符合执行条件的线程能够被唤醒。优雅地实现基于多个条件的等待与通知操作。
7. LockSupport工具
LockSupport定义了一组以park开头的方法来阻塞当前线程,unpark来唤醒被阻塞的线程。
它是构建同步组件的基础工具。这里不展开说。
LockSupport
网友评论