线程不安全的表现
- 操作共享数据,导致数据错误(非原子操作)
- i++ (Java 多线程原理内有详细代码例子)
- if-then-do 从以下结果的代码来看,相同的数字被加入
map
了不止一次。
class Main {
private static final Map<Integer, Integer> map = new HashMap<>();
public static void main(String[] args) {
for (int i = 0; i < 1000; i++) {
new Thread(Main::putARandomNumberIntoMap).start();
}
}
public static void putARandomNumberIntoMap() {
int n = new Random().nextInt(10);
if (!map.containsKey(n)) {
map.put(n, n);
System.out.println("put " + n);
}
}
}
put 8 put 1 put 9 put 0 put 5 put 0 put 6 put 6 put 2 put 4 put 5 put 4 put 0 put 3 put 7 put 6 put 1 put 5
- 死锁
- 重现死锁
以下代码是一个死锁的例子。首先先对synchronized
关键词作出解释:synchronized
能够保证同一时刻最多只有一个线程执行该段代码。它就像一把锁,只有获得了某项资源,该线程才能执行synchronized
所包的代码块。
在代码中,Thread1
获得了lock1
后,睡了500毫秒,然后申请获得lock2
。而Thread1
获得lock2
后,只睡了100毫秒,申请lock1
。于是Thread1
在等待被Thread2
占用的lock2
,同样的Thread2
在等待被Thread1
占用的lock1
,无限等待,造成了死锁。
class Main {
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();
public static void main(String[] args) {
new Thread1().start();
new Thread2().start();
}
static class Thread1 extends Thread {
@Override
public void run() {
synchronized (lock1) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock2) {
}
}
}
}
static class Thread2 extends Thread {
@Override
public void run() {
synchronized (lock2) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock1) {
}
}
}
}
}
- 死锁问题的排查
使用jps
命令查看 java 所有进程,可以得到相应的进程编号。
使用jstack 编号
查看进程状态。
线程安全
实现线程安全的基本手段
- synchronized 同步块
- synchronized(⼀个对象),把这个对象当成锁。
class Main {
private static final Object lock = new Object();
static int i = 0;
public static void main(String[] args) {
for (int j = 0; j < 1000; j++) {
new Thread(Main::addOne).start();
}
}
static void addOne() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock) {
i++;
System.out.println(i);
}
}
}
- Static synchronized⽅法,把Class对象当成锁。 如下例子中的实例
main
。这里简单介绍下什么是Class。Class则用于存放每个类的类型信息,可以简单的理解为类的说明书。
static synchronized void addOne() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
System.out.println(i);
}
- 实例的synchronnized⽅法把该实例当成锁。
class Main {
private int i = 0;
public static void main(String[] args) {
final Main main = new Main();
for (int j = 0; j < 1000; j++) {
new Thread(main::addOne).start();
}
}
synchronized void addOne() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
System.out.println(i);
}
}
- Collections.synchronized
- JUC包
- ConcurrentHashMap
任何使⽤HashMap有线程安全问题的地⽅,都⽆脑地使⽤ConcurrentHashMap
替换即可。 - AtomicInteger
对于Integer的操作将变成原子的。同样的还有AtomicBoolean
和AtomicLong
。
class Main {
private final AtomicInteger i = new AtomicInteger(0);
public static void main(String[] args) {
final Main main = new Main();
for (int j = 0; j < 1000; j++) {
new Thread(main::addOne).start();
}
}
public void addOne() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i.addAndGet(1));
}
}
- ReentrantLock
首先解释什么是可重入锁。aaa拿到锁后执行bbb,发现需要同一把锁。可重入锁就是同一把锁你可以重新进入一次。
private synchronized void aaa {
bbb()
}
private synchronized void bbb { }
下面是ReentrantLock
官方示例:
class X {
private final ReentrantLock lock = new ReentrantLock();
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}
Object类里的线程方法
为什么Java中的所有对象都可以成为锁?Java从⼀开始就把线程作为语⾔特性,提供语⾔级的⽀持。Object.wait()
/notify()
/notifyAll()
⽅法,实现了我们自由地调度线程(synchronized
称为同步,这三个则实现协同)。总的来说:wait()使线程停止运行,notify()使停止运行的线程继续运行。再举一个直观的例子:
• 线程:⼀个⼯⼈
• 代码:⼀份说明书
• 同步对象:⼀个印章
• wait():拿到印章,把⾃⼰的名字加⼊等待队列,然后把印章
放回去
• notify():拿到印章,挑⼀个等待队列的⼈通知⼀下
• notifyAll():拿到印章,把等待队列的⼈全通知⼀下
- Object.wait()
作用是使当前执行代码的线程进行等待,并且wait()所在的代码处停止执行,直到被notify换新。在调用wait()之前,线程必须获得该对象的锁,因此只能在同步方法/同步代码块中调用wait()方法。即:
private void aaa() {
sysynchronized (lock) {
lock.wait()
}
}
- Object.notify()
作用是,如果有多个线程等待,那么JVM随机挑选出一个wait的线程,对其发出通知notify(),并等待notify()方法的线程释放锁。notify()也要在同步块中调用。 - Object.notifyAll()
作用是唤醒所有线程,让所有线程竞争锁。
三种方法实现多线程的经典问题——⽣产者/消费者模型
具体需求是这样的:开启两个线程,生产者生产一个数字,消费者消费这个数字。生产者必须等待消费者消费完这一个数字才能生产,消费者必须等待生产者生产出一个数字才能消费。也就是说放数字的容器内最多只有一个待消费的数字。
- wait/notify/notifyAll
可以参考Object.wait
文档里的写法
synchronized (obj) {
while (condition does not hold)
obj.wait();
... // Perform action appropriate to condition
}
容器:我们选择Optional
,Optional 类是一个可以为null的容器对象。如果值存在则isPresent()方法会返回true,调用get()方法会返回该对象。
生产者:如果容器内还有值,则等待。反之,则生产一个数字,加入容器,并唤醒其他线程。
消费者:如果容器为空,则等待。反之,消费一个数字,并唤醒其他线程。
class Main {
public static void main(String[] args) {
final Container container = new Container();
final Object lock = new Object();
new Producer(container, lock).start();
new Consumer(container, lock).start();
}
static class Container {
private Optional<Integer> value = Optional.empty();
public Optional<Integer> getValue() {
return value;
}
public void setValue(Optional<Integer> value) {
this.value = value;
}
}
static class Producer extends Thread {
private final Container container;
private final Object lock;
Producer(Container container, Object lock) {
this.container = container;
this.lock = lock;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
synchronized (lock) {
while (container.getValue().isPresent()) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int randomNumber = new Random().nextInt();
System.out.println("producing:" + randomNumber);
container.setValue(Optional.of(randomNumber));
lock.notify();
}
}
}
}
static class Consumer extends Thread {
private final Container container;
private final Object lock;
public Consumer(Container container, Object lock) {
this.container = container;
this.lock = lock;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
synchronized (lock) {
while (!container.value.isPresent()) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("consuming:" + container.getValue().get());
container.setValue(Optional.empty());
lock.notify();
}
}
}
}
}
- Lock/Condition
Condition
在管程模型中代表的就是等待的条件。Condition
在Lock
的基础上使用,在原来Lock
的基础上实现了可以基于多种条件来让线程实现同步的效果增加了多个条件后,我们可以更有针对性,也更灵活的协调多种条件下的线程协调。同样的,我们参考Condition
文档中给的例子(由于篇幅原因,这里就不给出了),来完成我们的代码。
class Main {
public static void main(String[] args) {
final ReentrantLock lock = new ReentrantLock();
final Container container = new Container(lock);
new Producer(container, lock).start();
new Consumer(container, lock).start();
}
static class Container {
private final Condition notConsumeYet;
private final Condition notProduceYet;
private Optional<Integer> value = Optional.empty();
public Container(ReentrantLock lock) {
this.notConsumeYet = lock.newCondition();
this.notProduceYet = lock.newCondition();
}
public Condition getNotConsumeYet() {
return notConsumeYet;
}
public Condition getNotProduceYet() {
return notProduceYet;
}
public Optional<Integer> getValue() {
return value;
}
public void setValue(Optional<Integer> value) {
this.value = value;
}
}
static class Producer extends Thread {
private final Container container;
private final ReentrantLock lock;
Producer(Container container, ReentrantLock lock) {
this.container = container;
this.lock = lock;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
lock.lock();
try {
while (container.getValue().isPresent()) {
try {
container.getNotConsumeYet().await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int randomNumber = new Random().nextInt();
System.out.println("producing:" + randomNumber);
container.setValue(Optional.of(randomNumber));
container.getNotProduceYet().signal();
} finally {
lock.unlock();
}
}
}
}
static class Consumer extends Thread {
private final Container container;
private final ReentrantLock lock;
public Consumer(Container container, ReentrantLock lock) {
this.container = container;
this.lock = lock;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
lock.lock();
try {
while (!container.value.isPresent()) {
try {
container.getNotProduceYet().await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("consuming:" + container.getValue().get());
container.setValue(Optional.empty());
container.getNotConsumeYet().signal();
} finally {
lock.unlock();
}
}
}
}
}
- BlockingQueue
java.util.concurrent.BlockingQueue
接口(Queue的子接口),它的主要作用是作为线程同步的工具。下面代码中加入signal
的原因是为了防止连续输出两个producing xxx
。如果没有控制信号,则会立刻开始下一次循环,再打印一次producing xxx
。
class Main {
public static void main(String[] args) {
LinkedBlockingDeque<Integer> blockingQueue = new LinkedBlockingDeque<>(1);
LinkedBlockingDeque<Integer> signal = new LinkedBlockingDeque<>(1);
new Producer(blockingQueue, signal).start();
new Consumer(blockingQueue, signal).start();
}
static class Producer extends Thread {
private final BlockingQueue<Integer> queue;
private final BlockingQueue<Integer> signal;
public Producer(BlockingQueue<Integer> queue, BlockingQueue<Integer> signal) {
this.queue = queue;
this.signal = signal;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
final int number = new Random().nextInt();
System.out.println("producing:" + number);
queue.put(number);
signal.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Consumer extends Thread {
private final BlockingQueue<Integer> queue;
private final BlockingQueue<Integer> signal;
public Consumer(BlockingQueue<Integer> queue, BlockingQueue<Integer> signal) {
this.queue = queue;
this.signal = signal;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("consuming:" + queue.take());
signal.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
线程池
线程池就是预先定义好的若⼲个线程。线程是昂贵的(Java线程模型的缺陷),线程池的出现实现了线程的重用。java中的线程池是由Executors创建的,Executors是一个工厂、工具方法。
data:image/s3,"s3://crabby-images/5531d/5531d9ae6b268d9a26eb906fa068141d43f2b211" alt=""
我们用
Executors.newFixedThreadPool(int)
创建了一个线程池,以下是线程池提供的一些方法。注意到submit
方法的参数有Runnable
和Callable<T>
两种。Runnable
最大的问题就是它没有返回值,那么我们就必须操作共享变量,这是一个很烦人的事情。不同的是,Callable
可以有<T>
类型的返回值,也可以抛出异常。 值得注意的是,这两种submit
方法返回的都是Future
类型的值。Future
表示的是异步返回的结果,根据他的名字,我们也可以理解为“未来返回的结果”。data:image/s3,"s3://crabby-images/ca535/ca53565f4bbaab3915267f8b2502d29780c65f47" alt=""
以下是一个简单的线程池使用示例和运行结果:
class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final ExecutorService threadPool = Executors.newFixedThreadPool(10);
final Future<Integer> future1 = threadPool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1;
}
});
final Future<String> future2 = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "Ben";
}
});
final Future<Object> future3 = threadPool.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
throw new RuntimeException();
}
});
System.out.println(future1.get());
System.out.println(future2.get());
System.out.println(future3.get());
}
}
data:image/s3,"s3://crabby-images/2288e/2288e9d59c088cb3b13f04973c607b36f7da243d" alt=""
当然我们也可以写成Lamda表达式:
final Future<Integer> future1 = threadPool.submit(() -> 1);
final Future<String> future2 = threadPool.submit(() -> "Ben");
final Future<Object> future3 = threadPool.submit(() -> { throw new RuntimeException(); });
练习
特别地,下面是我练习多线程的两个程序:
- 多线程练习程序——统计文件中各单词数量。
- 多线程实战——200行代码实现多线程网络聊天室
网友评论