十一、并发,线程
多线程,数据错误的原因
-
多处理器计算机能使用寄存器或本地内存缓冲区,来保存内存中的数值,运行在不用处理器上的线程可能会对同一内存取到不同的值A 线程从内存取出值,运算后放到寄存器或本地内存缓冲区,另一处理器的 B 线程,也从内存中取值,取到的是运算前的值,从而导致数据错误
-
编译器可以改变指令顺序来优化,认为内存的值只有代码中有修改的指令时,内存值才会变化,但是内存的值可能被其他线程修改
a=5; b=a+2; c=a+2; 优化成了 b=5+2 c=5+2 但是 a 可能会被其他线程修改
线程,线程中断标志
Runnable r=()->{
int i=20;
while (i>0){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print(i--+",");
}
};
Thread t=new Thread(r);
// 启动线程
t.start();
// 只是单纯的调用 Thread.run() , 就是一般的函数调用,不会启动新线程
//t.run();
// 获取当前线程对象
Thread currentThread = Thread.currentThread();
// 关于中断线程,每个线程都有一个 boolean 标志中断状态,仅仅是个标志,并不是说标志置为 true 线程就中断
// 程序员应该在线程中,适当的时候时不时的检查这个标志,判断线程是否被中断,做出相应的处理,在线程内主动的结束线程
// 将进程中断标志置为 true
// 当线程被阻塞时(线程内调用 sleep、wait、join 中),
// 在线程上调用 interrupt() 方法,把阻塞标志置为 true后 ,阻塞调用会抛出InterruptedException(比如 sleep() 方法抛出异常,并且会导致阻塞调用sleep()方法终止)
// 而且中断标志会置为 false ,
// 推测其逻辑是:阻塞调用会监视 线程的中断标志,当中断标志为true时,抛出异常并吧中断标志置为false
t.interrupt();
// 如果想要避免阻塞调用导致中断标志设置失败,可以在捕获到异常后,调用 Thread.currentThread().interrupt();
// 注意不要在循环中这样做,一但触发此异常,中断标志通过Thread.currentThread().interrupt()置为 true 又会导致下次循环时,阻塞调用抛出异常
Runnable r2=()->{
//while(true){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
//}
};
// 静态方法,返回当前线程的状态,并清除中断状态,置为 false
Thread.interrupted();
// 返回线程的中断状态
t.isInterrupted();
线程状态
-
New ( 新创建)
new Thread 后,但是线程还没有运行的时候
-
Runnable ( 可运行)
线程调用 start 方法后,变为此状态
正在运行的时候也为此状态,在抢占式系统中,线程时间片由系统分配,start了之后,实际上线程不一定是处于运行状态还是等待时间片的状态,所以将这种状态称为可运行状态,而不是运行状态
-
Blocked ( 被阻塞)
当线程试图获取一个内部的对象锁,但是没拿到的时候,将进入被阻塞的状态
-
Waiting ( 等待)
当线程等待其他线程通知的时候,进入等待状态
Object.wait、Thread.join、Lock、Condition
-
Timed waiting ( 计时等待)
当线程调用等待的方法,有超时参数的时候,进入计时等待状态
Thread.sleep Thread.join Object.wait Lock.tryLock Condition.await
-
Terminated ( 被终止)
当线程正常运行完,或者因未捕获异常中断时,会进入被终止状态
测试代码
package org.example.并发测试;
import java.util.concurrent.locks.Lock;
public class 线程状态 {
public static void main(String[] args) throws InterruptedException {
//New_Runnable_Terminated();
// Bolcked();
// Waiting();
TimedWaiting();
}
public static void New_Runnable_Terminated() throws InterruptedException {
Runnable r=()->{
int i=2000;
while (i>0){
i--;
}
};
Thread t=new Thread(r);
// 线程刚刚创建还没开始运行,此时状态为 NEW
System.out.println(t.getState()); // NEW
t.start();
// 线程调用start 方法后,线程处于 Runable 状态
// 这里在 t.start() 后立即输出 t.getState ,一般来说 t 还没有运行结束
System.out.println(t.getState()); // RUNNABLE
// 等待 2s ,保证 t 运行结束
Thread.sleep(2000);
// 线程 t 运行结束正常退出,此时线程 t 的状态为 Terminated
System.out.println(t.getState()); // TERMINATED
}
public static void Bolcked() throws InterruptedException {
// 用作锁
String syn="syn";
// 用死循环 把持住锁
new Thread(()->{
synchronized (syn){while (true);}
}).start();
Runnable r=()->{
synchronized (syn){
int i;
}
};
Thread t=new Thread(r);
t.start();
// 等待一段时间让线程启动完成
Thread.sleep(1000);
// 锁一直被死循环线程持有, t 拿不到锁,处于阻塞状态 Blocked
System.out.println(t.getState()); // BLOCKED
}
public static void Waiting() throws InterruptedException {
// 用作锁
String syn="syn";
Runnable r=()->{
synchronized (syn){
try {
syn.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Thread t=new Thread(r);
t.start();
Thread.sleep(1000);
// 没有 notify ,t 一直处于 wait 中,此时状态为 waiting
System.out.println(t.getState());// WAITING
}
public static void TimedWaiting() throws InterruptedException {
Runnable r=()->{
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Thread t=new Thread(r);
t.start();
Thread.sleep(1000);
// 此时 t 处于 sleep(5000) 中,在执行带有超时时间参数的方法时,线程状态为 Timed waiting
//
System.out.println(t.getState());// TIMED_WAITING
// 其他带有超时参数的方法
// Thread.sleep
// Thread.join
// Object.wait
// Lock.tryLock
// Condition.await
}
}
线程优先级
Runnable r=()->{
// 让掉当前线程的时间片,可以在重要工作完成后调用此方法,让其他线程有更多的机会
Thread.yield();
};
Thread t=new Thread(r);
// 设置线程优先级,范围是 1最低 到 10 最高 ,必须要在 调用start方法前设置,
t.setPriority(Thread.MAX_PRIORITY); // 10
t.setPriority(Thread.NORM_PRIORITY);// 5
t.setPriority(Thread.MIN_PRIORITY); // 1
t.start();
守护线程
设置线程为守护线程,在其他线程都结束时,守护线程会自动结束,虚拟机退出
不要在守护线程中访问固有资源,如数据库,文件,毕竟不知道什么时候就退出了
可以用来为其他线程提供服务,比如说时不时的检查一个线程是否存在,不存在就创建这个线程
public class 守护线程 {
public static void main(String[] args) {
Runnable r=()->{
while (true);
};
Thread t=new Thread(r);
// 设置线程为守护线程,在其他线程都结束时,守护线程会自动结束,虚拟机退出
// 不要在守护线程中访问固有资源,如数据库,文件,毕竟不知道什么时候就退出了
t.setDaemon(true);
t.start();
// 这段代码到这里就会结束退出,
}
}
线程未捕获异常处理器
可以自定义一个异常处理器,当线程发生未捕获异常时,进行自定义处理
setUncaughtExceptionHandler
Runnable r=()->{
int i=1/0;
};
Thread t=new Thread(r);
// 设置异常处理器
t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println(e);
}
});
t.start();
// 为全局的线程 设置一个默认的异常处理器
//Thread.setDefaultUncaughtExceptionHandler(xxx);
同步-锁对象
在需要同步的代码执行前,通过或 Lock.lock 让线程持有锁对象,拿不到则一直去争取,拿到后在执行后续代码
执行结束后,通过 Lock.unlock 释放锁对象,使其他线程有机会拿到锁对象
锁使用完后,一定要 释放,否则其他线程一直请求不到锁,会一直无法继续执行
// 创建一个锁对象
Lock reentrantLock=new ReentrantLock();
// 是否构建一个带有公平策略的锁,公平锁会偏爱等待时间最长的线程,但是会导致性能大大降低,一般不会用到
// new ReentrantLock(true);
reentrantLock.lock();
try{
//做需要保证同步的部分
}finally {
reentrantLock.unlock();
}
同步-条件对象
Condition 是一个条件对象
-
调用 condition.await() 会释放锁,并进入等待状态,加入该条件的等待进程集,等待被唤醒
就是说 我这个线程 把锁释放了,我也先不抢锁了,等别人叫我来抢
-
condition.single() 会唤醒一个等待中的线程
就是把等待别人叫他的线程中的一个,叫起来,告诉它快来抢锁了
-
condition.singleAll() 会唤醒全部等待中的线程
-
condition 的操作 要夹在 lock() 和 unlock() 中间,没有拿到锁怎么释放,一会要释放锁了才叫醒别人
Lock reentrantLock=new ReentrantLock();
Condition condition = reentrantLock.newCondition();
// nums[0] 用来自增 , nums[1] 用来控制自增线程的停止 num[2] 用来统计 线程一被唤醒次数
int[] nums={1,0,0};
new Thread(()->{
reentrantLock.lock();
while (nums[0]<999999){
try {
nums[2]++;
// 这里把锁释放掉了,想当于 unlock() ,
// 等其他线程唤醒后就去抢锁,相当于被唤醒就 lock()
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("过来了");
nums[1]=1;
reentrantLock.unlock();
}).start();
new Thread(()->{
while (true){
if (nums[1]==1){
System.out.println("线程一被唤醒了"+nums[2]+"次");
break;
}
reentrantLock.lock();
nums[0]++;
condition.signal();
reentrantLock.unlock();
}
}).start();
synchronized
synchronized
的用法与 Lock 相似, synchronize 包裹的代码块 对应lock() 与 unlock() 中间 所夹的代码段
wait() 对应 await() , notify 和 notifyAll 对应 single 和 signalAll
synchronized
的用法可以这么理解
在执行到被synchronized
划定范围的代码时,会去查看一个标识
①如果这个标识没有被上锁,将这个标识锁上,待执行完毕后解锁
②如果这个标识被上锁了,则等待解锁
synchronized 的三种应用方式(以什么作为了标识)
实际上,每个对象都有一个内部锁,使用 synchronized 关键字使用的就是这个内部锁。
1.修饰代码块
//括号内的参数应该是一个具体对象
synchronized(Object object){
}
例:
class Timer{
private static int num=0;
public void funSyn1() {
int i;
xxxxx;
synchronized (this) {
//synchronized (object) {
num++;
}
xxxxx;
xxxxx;
}
public void funSyn2() {
int i;
xxxxx;
synchronized (this) {
//synchronized (object) {
num++;
}
xxxxx;
xxxxx;
}
public void funSyn3() {
int i;
xxxxx;
num++;
xxxxx;
xxxxx;
}
}
在执行 synchronized 包围的代码块时,会检查 括号内的 参数对象是否被上锁(也就是将括号内的对象作为标识)
如果括号内是 this
关键字,则检查 调用此方法的类的实例 是否被上锁(也就是将调用该方法的这个此类的实例作为标识)
Timer t1,t2;
不同线程执行 t1.funSyn1 与 t1.funSyn1 与 t1.funSyn2 中synchronized修饰块内代码的时候,会同步(因为它们执行的时候都会先去 检查 t1 有没有锁住)
不同线程执行 t1.funSyn1 与 t2.funSyn1 与 t1.funSyn3 中synchronized修饰块内代码的时候,不会用不(它们执行的时候分别是 检查 t1 与 检查 t2 与 不检查直接执行)
2.修饰类的方法
Class A{
public synchronized void fun(){
xxx;
xxx;
}
}
等效
Class A{
public void fun(){
synchronized(this){
xxx;
xxx;
}
}
}
在修饰类的方法时,相当于将this
作为参数,将整个方法作为代码块,与 上种情况类似
- 修饰静态方法
Class T{
public synchronized static funSyn1(){
}
public synchronized funSyn2(){
}
}
修饰静态方法时,调用该静态方法时,会先检查该类是否上锁(可以理解为 将 类T 作为 标识)(实际上是对 T.class 这个对象的内部锁上锁)
T t1;
不同线程执行 T.funSyn1 与 T.funSyn1 与 t1.funSyn1 时,会同步(因为执行的时候会先检查 类T 与 类T 与 类T 是否上锁)
不同线程执行 t1.funSyn1 与 t1.funSyn2 时,不会同步(因为执行的时候会先检查类T 与 实例 t1 是否上锁)
应该使用哪种锁
Java 核心技术
内部锁和条件存在一些局限。
包括 :
•不能中断一个正在试图获得锁的线程。
•试图获得锁时不能设定超时。
•每个锁仅有单一的条件,可能是不够的。
在代码中应该使用哪一种? Lock 和 Condition 对象还是同步方法?下面是一些建议 :
•最好既不使用 Lock/Condition 也不使用 synchronized 关键字。
在许多情况下你可以使用 java . util.concurrent 包中的一种机制,它会为你处理所有的加锁。
•如果 synchronized 关键字适合你的程序,那么请尽量使用它,这样可以减少编写的代码数量,减少出错的几率。
•如果特别需要 Lock/Condition 结构提供的独有特性时,才使用 Lock/Condition 。
在复杂应用中,尽量考虑使用 lock ,性能更好
悲观锁:
- synchronized
- ReentrantLock
乐观锁:
- ReentrantReadWriteLock
volatile
如果仅仅为读写一个实例域,就使用同步开销较大,此时可以使用volatile
class TestVolatile{
public volatile int i=1;
public static volatile int q=1;
}
被 volatile 修饰,编译器和虚拟机就会知道,对象可能会被其他线程并发更新
volatile int i=1;
int geti;
ini seti=9;
可以理解为
需要读写的时候
读:
synchronized (一个独立的锁){
geti=i;
}
写:
synchronized (一个独立的锁){
i=seti;
}
保证的是读的过程不会被中断,写的过程不会被中断
像 i=-i; 这样的过程是无法保证的
这段代码要
读取 i ,
翻转 i,
把 -i 写入 i
比如在翻转i 之后,可能会有其他线程就会读写 i
final 变量
还有一种情况可以安全地访问一个共享域,即这个域声明为 final 时。
以下声明 :
final Map<String, Double> accounts = new HashKap<>();
其他线程会在构造函数完成构造之后才看到这个 accounts 变量。
如果不使用 final,就不能保证其他线程看到的是 accounts 更新后的值,
它们可能都只是看到 null , 而不是新构造的 HashMap 。
当然,对这个映射表的操作并不是线程安全的。
如果多个线程在读写这个映射表,仍然需要进行同步。
上面这段没明白应该是在什么场景下使用,揣测是下面这个意思
class T {
final Map<String, Double> accounts = new HashKap<>();
}
t=null;
线程一{
t=new T();
}
线程二{
t 不等于 null 的时候, t.accounts 也不等于 null,保证 t 可用的时候,accounts 也一定构建完成可以使用
if(t!=null){
t.accounts.get(xxx)
}
}
T t=new T();
原子性
java 提供了一些类,可以当做 安全的变量来使用,
比如 可以保证 自增并赋值给其他变量的过程不被打断
//Atomiclnteger、AtomicIntegerArray、AtomicIntegerFieldUpdater、AtomicLongArray、
//AtomicLongFieldUpdater 、AtomicReference、AtomicReferenceArray 和 AtomicReferenceFieldUpdater
// 创建一个原子整型对象,可以安全的做一些数值操作,可以视作 一个安全的 Integer 来使用
// 初始为 0
AtomicInteger nextNum= new AtomicInteger();
// 设定一个初始值
nextNum= new AtomicInteger(5);
nextNum.incrementAndGet();
nextNum.getAndIncrement();
nextNum.getAndDecrement();
nextNum.set(6);
// observed 是要与之比较的值
int oldValue,newValue,observed=7;
AtomicInteger largest=new AtomicInteger(3);
// 这段代码的作用,根据 largest 的原值,做出一些运算,将结果赋给 largest
do {
// 先获取到 largest 原来的值
oldValue=largest.get();
// 获取希望赋给 largest 的新值,这里新值和旧值之间是由关联的,所以才采用这种方式,否则直接用 set 就可以了
newValue=Math.max(oldValue,observed);
// 如果这期间没有其他进程改变largest的值, oldValue 和 largest 的相同,则更新 largest 为 newValue,并返回true
// 如果对比出值 不同,则返回 false,进入下次循环,再次尝试更新值
}while (!largest.compareAndSet(oldValue,newValue));
// 在 java SE 8 中,可以通过如下四种方式,效果同上
// 根据 largest 原值,做出一些运算,并更新 largest
// 返回新值
largest.updateAndGet(x->Math.max(x,observed));
largest.accumulateAndGet(observed,Math::max);
// 返回原值
largest.getAndUpdate(x->Math.max(x,observed));
largest.getAndAccumulate(observed,Math::max);
// AtomicXXX 使用的是 cas ,在线程量较大的时候,太多次的重试会导致性能下降
// LongAdder 和 LongAccumulator 可以解决这种问题,
// 它是为每个线程分配一个线程分配一个变量,以供做数值改变操作,在需要取值的时候才统计所有变量
LongAdder longAdder=new LongAdder();
longAdder.add(9);
longAdder.add(-5);
longAdder.increment();
longAdder.decrement();
// 取得值
longAdder.sum();
LongAccumulator longAccumulator = new LongAccumulator(Long::sum,0);
// 注意这里 可以使用 a+b ,a * b
// a-b, a/b ,a*b+1 这样的都不能获得预期的结果,详细参考 LongAccumulator 和 DoubleAccumulator 的参数 DoubleBinaryOperator 和 LongBinaryOperator 这篇
LongAccumulator longAccumulator2 = new LongAccumulator((a,b)->{
return a+b;
},0);
// 传入一个参数并,应用计算规则计算
longAccumulator.accumulate(18);
// 取得值
longAccumulator.get();
// 小数 可以使用这两个类
new DoubleAdder();
new DoubleAccumulator(Double::sum,0);
ThreadLocal
线程局部变量
使用 ThreadLocal 可以为 每个线程提供各自的实例
这样各自线程使用各自的实例,互不冲突,也避免了在线程中每当需要使用的时候就创建实例
// 创建一个 ThreadLocal ,参数是 如何创建一个对象并将其 return 的过程
public static ThreadLocal<Random> randomThreadLocal = ThreadLocal.withInitial(() -> new Random());
// 在需要使用的时候调用此方法获取实例,根据线程获得对象,如果线程中有这个对象直接拿来用,没有的时候根据上面的参数,创建一个并返回
Random random = randomThreadLocal.get();
关于 Random , Java SE 提供了一个便利的类
// public class ThreadLocalRandom extends Random
ThreadLocalRandom current = ThreadLocalRandom.current();
current.nextInt();
测试代码
将两个线程使用的 Random 实例记录下来,最后比较一下
int times=50000;
Object[][] objects=new Object[2][times];
ThreadLocal<Random> randomThreadLocal = ThreadLocal.withInitial(() -> new Random());
new Thread(()->{
for (int i = 0; i < times; i++) {
Random random = randomThreadLocal.get();
objects[0][i]=random;
}
}).start();
new Thread(()->{
for (int i = 0; i < times; i++) {
Random random = randomThreadLocal.get();
objects[1][i]=random;
}
}).start();
Thread.sleep(1000);
for (int i = 0; i < times; i++) {
if (objects[0][i]!=objects[0][0])
System.out.println("存在同一个线程不是一个实例");
}
for (int i = 0; i < times; i++) {
if (objects[1][i]!=objects[1][0])
System.out.println("存在同一个线程不是一个实例");
}
if (objects[1][0]!=objects[0][0])
System.out.println("俩线程不是一个实例");
// 最后只会输出 俩线程不是一个实例,说明 两个线程各自使用一个实例
tryLock
可以尝试获得锁,不成功可以做一些其他事情
Lock lock = new ReentrantLock();
new Thread(()->{
// 尝试获得锁,返回一个 Boolean 值 ,表示是否成功获得锁,没得到锁可以去先做一些其他事情
// 这个方法会抢夺可用的锁,即使该锁有公平加锁策略,即便其他线程已经等待很久也是如此
if (lock.tryLock()){
try {
// 做一些事
}finally {
lock.unlock();
}
}
lock.lock();
}).start();
Thread t=new Thread(()->{
try {
// 可以设置一个超时参数,带有超时参数的tryLock,在等待期间可以通过 t.interrupt() 方法被中断,并抛出 InterruptedException
if (lock.tryLock(1000, TimeUnit.SECONDS)){
try {
// 做一些事
}finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t.start();
// 打断tryLock的等待
t.interrupt();
// 想当于一个超时无线的方法
lock.lockInterruptibly();
读写锁 ReentrantReadWriteLock
可以允许读线程共享访问,对于经常读很少写的时候,可以提升性能
ReentrantReadWriteLock lock=new ReentrantReadWriteLock();
// 得到一个用于读的锁,排斥所有写操作
ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
// 得到一个用于写的锁,排斥所有其他的读操作和写操作
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
int i[]=new int[]{0};
void readSome(){
readLock.lock();
try {
System.out.println(i[0]);
}finally {
readLock.unlock();
}
}
void writeSome(){
writeLock.lock();
try {
i[0]++;
}finally {
writeLock.unlock();
}
}
阻塞队列
多线程问题可以通过队列改变形式,如银行转账问题,可以维护一个队列,将转账指令存入队列,然后由一个单独的线程取出转账指令,访问银行数据,这样只要保证这个队列线程同步即可,而不需要同步银行数据
阻塞队列就是这样一个队列
以下类实现了 BlockingQueue 接口
-
LinkedBlockingQueue
默认容量没有界限,也可以指定容量上限,是一个双端队列
-
ArrayBlockingQueue
构造时需要指定容量,并且有一个可选参数指定是否需要公平性
-
PriorityBlockingQueue
带优先级的队列,插入的元素会按优先级排序,移出的时候按照优先级顺序移出
-
DelayQueue
队列的元素 ,只有在延迟用完的情况下才可以移除,并且带有顺序
-
LinkedTransferQueue
添加元素时允许等待,直到其他线程将元素移除
// interface BlockingQueue<E> extends Queue<E>
// 继承了 Queue 接口,所以Queue 的方法 BlockingQueue 也有,此外还有特有的 put 和 take 方法
BlockingQueue blockingQueue;
blockingQueue =new LinkedBlockingQueue<>();
blockingQueue =new ArrayBlockingQueue<>(20);
blockingQueue =new PriorityBlockingQueue();
// 添加元素,如果队列满,则阻塞
blockingQueue.put(2);
// 移出并返回头元素,如果队列空,则阻塞
blockingQueue.take();
DelayQueue<Delayed> delayQueue = new DelayQueue<>();
// 添加的元素,需要实现 Delayed 接口的 getDelay 和 compareTo 方法
// DelayQueue 内部使用了 PriorityQueue ,因而插入的对象也需要实现 compareTo
delayQueue.add(new Delayed() {
@Override
public long getDelay(TimeUnit unit) {
return 0;
}
@Override
public int compareTo(Delayed o) {
return 0;
}
});
//
TransferQueue linkedTransferQueue = new LinkedTransferQueue<>();
// 添加一个元素,并使线程阻塞直到有线程取出这个元素,
//在其他线程取出(注意是取到,并移出, 使用peek仍会继续阻塞下去)这个元素的时候,直接交给它
linkedTransferQueue.transfer(1);
网友评论