JUC中原子类
JUC中原子类介绍
什么是原子操作?
atomic 翻译成中文是原子的意思。在化学上,我们知道原子是构成一般物质的最小单位,在化学反应中是不可分割的。在我们这里 atomic 是指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰,所以,所谓原子类说简单点就是具有原子操作特征的类,原子操作类提供了一些修改数据的方法,这些方法都是原子操作的,在多线程情况下可以确保被修改数据的正确性。
JUC中对原子操作提供了强大的支持,这些类位于java.util.concurrent.atomic包中.
JUC中原子类思维导图
基本类型原子类
使用原子的方式更新基本类型
- AtomicInteger:int类型原子类
- AtomicLong:long类型原子类
- AtomicBoolean :boolean类型原子类
上面三个类提供的方法几乎相同,这里以 AtomicInteger 为例子来介绍。
AtomicInteger 类常用方法
public final int get() //获取当前的值
public final int getAndSet(int newValue)//获取当前的值,并设置新的值
public final int getAndIncrement()//获取当前的值,并自增
public final int getAndDecrement() //获取当前的值,并自减
public final int getAndAdd(int delta) //获取当前的值,并加上预期的值
boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
public final void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
部分源码
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
2个关键字段说明:
value:使用volatile修饰,可以确保value在多线程中的可见性。
valueOffset:value属性在AtomicInteger中的偏移量,通过这个偏移量可以快速定位到value字段,这个是实现AtomicInteger的关键。
getAndIncrement源码:
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
内部调用的是Unsafe类中的getAndAddInt方法,我们看一下getAndAddInt源码:
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
说明:
this.getIntVolatile:可以确保从主内存中获取变量最新的值。
compareAndSwapInt:CAS操作,CAS的原理是拿期望的值和原本的值作比较,如果相同则更新成新的值,可以确保在多线程情况下只有一个线程会操作成功,不成功的返回false。
上面有个do-while循环,compareAndSwapInt返回false之后,会再次从主内存中获取变量的值,继续做CAS操作,直到成功为止。
getAndAddInt操作相当于线程安全的count++操作,如同:
synchronize(lock){
count++;
}
count++操作实际上是被拆分为3步骤执行:
- 获取count的值,记做A:A=count
- 将A的值+1,得到B:B = A+1
- 让B赋值给count:count = B
多线程情况下会出现线程安全的问题,导致数据不准确。
synchronize的方式会导致占时无法获取锁的线程处于阻塞状态,性能比较低。CAS的性能比synchronize要快很多。
数组类型原子类介绍
使用原子的方式更新数组里的某个元素,可以确保修改数组中数据的线程安全性。
- AtomicIntegerArray:整形数组原子操作类
- AtomicLongArray:长整形数组原子操作类
- AtomicReferenceArray :引用类型数组原子操作类
上面三个类提供的方法几乎相同,所以我们这里以 AtomicIntegerArray 为例子来介绍。
AtomicIntegerArray 类常用方法
public final int get(int i) //获取 index=i 位置元素的值
public final int getAndSet(int i, int newValue)//返回 index=i 位置的当前的值,并将其设置为新值:newValue
public final int getAndIncrement(int i)//获取 index=i 位置元素的值,并让该位置的元素自增
public final int getAndDecrement(int i) //获取 index=i 位置元素的值,并让该位置的元素自减
public final int getAndAdd(int i, int delta) //获取 index=i 位置元素的值,并加上预期的值
boolean compareAndSet(int i, int expect, int update) //如果输入的数值等于预期值,则以原子方式将 index=i 位置的元素值设置为输入值(update)
public final void lazySet(int i, int newValue)//最终 将index=i 位置的元素设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
示例
统计网站页面访问量,假设网站有10个页面,现在模拟100个人并行访问每个页面10次,然后将每个页面访问量输出,应该每个页面都是1000次,代码如下:
public class AtomicIntegerArrayTest {
private static AtomicIntegerArray array = new AtomicIntegerArray(10);
private static void request(int page) throws InterruptedException {
//模拟耗时5毫秒
TimeUnit.MILLISECONDS.sleep(5);
array.getAndIncrement(page-1);
}
public static void main(String[] args) throws InterruptedException {
long starTime = System.currentTimeMillis();
ExecutorService threadPool = Executors.newCachedThreadPool();
int userCount = 100;
CountDownLatch latch = new CountDownLatch(100);
for (int i = 0; i < userCount; i++) {
threadPool.execute(() -> {
try {
for (int j = 0; j < 10; j++) {
for (int k = 1; k <= 10; k++) {
AtomicIntegerArrayTest.request(k);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
latch.await();
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ",耗时:" + (endTime - starTime) + ",array=" + array.toString());
threadPool.shutdown();
}
}
输出:
main,耗时:672,array=[1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000, 1000]
引用类型原子类介绍
基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用 引用类型原子类。
- AtomicReference:引用类型原子类
- AtomicStampedRerence:原子更新引用类型里的字段原子类
- AtomicMarkableReference :原子更新带有标记位的引用类型
AtomicReference 和 AtomicInteger 非常类似,不同之处在于 AtomicInteger是对整数的封装,而AtomicReference则是对应普通的对象引用,它可以确保你在修改对象引用时的线程安全性。在介绍AtomicReference的同时,我们先来了解一个有关原子操作逻辑上的不足。
ABA问题
之前我们说过,线程判断被修改对象是否可以正确写入的条件是对象的当前值和期望值是否一致。这个逻辑从一般意义上来说是正确的,但是可能出现一个小小的例外,就是当你获得当前数据后,在准备修改为新值钱,对象的值被其他线程连续修改了两次,而经过这2次修改后,对象的值又恢复为旧值,这样,当前线程就无法正确判断这个对象究竟是否被修改过,这就是所谓的ABA问题,可能会引发一些问题。
举个例子
有一家蛋糕店,为了挽留客户,决定为贵宾卡客户一次性赠送20元,刺激客户充值和消费,但条件是,每一位客户只能被赠送一次,现在我们用AtomicReference来实现这个功能,代码如下:
public class AtomicReferenceTest1 {
//账户原始余额
static int accountMoney = 19;
//用于对账户余额做原子操作
static AtomicReference<Integer> money = new AtomicReference<>(accountMoney);
/**
* 模拟2个线程同时更新后台数据库,为用户充值
*/
static void recharge() {
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) {
Integer m = money.get();
if (m == accountMoney) {
if (money.compareAndSet(m, m + 20)) {
System.out.println("当前余额:" + m + ",充值20元成功,余额:" + money.get() + "元");
}
}
//休眠100ms
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
/**
* 模拟用户消费
*/
static void consume() throws InterruptedException {
for (int i = 0; i < 5; i++) {
Integer m = money.get();
if (m > 20) {
if (money.compareAndSet(m, m - 20)) {
System.out.println("当前余额:" + m + ",成功消费10元,余额:" + money.get() + "元");
}
}
//休眠50ms
TimeUnit.MILLISECONDS.sleep(50);
}
}
public static void main(String[] args) throws InterruptedException {
recharge();
consume();
}
}
输出:
当前余额:19,充值20元成功,余额:39元
当前余额:39,成功消费10元,余额:19元
当前余额:39,成功消费10元,余额:19元
当前余额:19,充值20元成功,余额:19元
当前余额:19,充值20元成功,余额:39元
当前余额:39,成功消费10元,余额:19元
当前余额:19,充值20元成功,余额:39元
从输出中可以看到,这个账户被先后反复多次充值。其原因是账户余额被反复修改,修改后的值和原有的数值19一样,使得CAS操作无法正确判断当前数据是否被修改过(是否被加过20)。虽然这种情况出现的概率不大,但是依然是有可能出现的,因此,当业务上确实可能出现这种情况时,我们必须多加防范。JDK也为我们考虑到了这种情况,使用AtomicStampedReference可以很好地解决这个问题。
AtomicStampedReference内部不仅维护了对象的值,还维护了一个版本号(我们这里把他称为时间戳,实际上它可以使用任何一个整形来表示状态值),当AtomicStampedReference对应的数值被修改时,除了更新数据本身外,还必须要更新版本号。当AtomicStampedReference设置对象值时,对象值及版本号都必须满足期望值,写入才会成功。因此,即使对象值被反复读写,写回原值,只要版本号发生变化,就能防止不恰当的写入。
AtomicStampedReference的几个Api在AtomicStampedReference的基础上新增了有关版本号的信息。
//比较设置,参数依次为:期望值、写入新值、期望版本号、新版本号
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp);
//获得当前对象引用
public V getReference();
//获得当前版本号
public int getStamp();
//设置当前对象引用和版本号
public void set(V newReference, int newStamp);
AtomicStampedReference内部维护了一个Pair对象存放值,绑定了当前值和版本号。
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
现在我们使用AtomicStampedRerence来修改一下上面充值的问题,代码如下:
public class AtomicStampedReferenceTest1 {
//账户原始余额
static int accountMoney = 19;
//用于对账户余额做原子操作
static AtomicStampedReference<Integer> money = new AtomicStampedReference<>(accountMoney, 0);
/**
* 模拟2个线程同时更新后台数据库,为用户充值
*/
static void recharge() {
for (int i = 0; i < 2; i++) {
int stamp = money.getStamp();
new Thread(() -> {
for (int j = 0; j < 50; j++) {
Integer m = money.getReference();
if (m == accountMoney) {
if (money.compareAndSet(m, m + 20, stamp, stamp + 1)) {
System.out.println("当前时间戳:" + money.getStamp() + ",当前余额:" + m + ",充值20元成功,余额:" + money.getReference() + "元");
}
}
//休眠100ms
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
/**
* 模拟用户消费
*/
static void consume() throws InterruptedException {
for (int i = 0; i < 50; i++) {
Integer m = money.getReference();
int stamp = money.getStamp();
if (m > 20) {
if (money.compareAndSet(m, m - 20, stamp, stamp + 1)) {
System.out.println("当前时间戳:" + money.getStamp() + ",当前余额:" + m + ",成功消费20元,余额:" + money.getReference() + "元");
}
}
//休眠50ms
TimeUnit.MILLISECONDS.sleep(50);
}
}
public static void main(String[] args) throws InterruptedException {
recharge();
consume();
}
}
输出:
当前时间戳:1,当前余额:19,充值20元成功,余额:39元
当前时间戳:2,当前余额:39,成功消费20元,余额:19元
关于这个时间戳的,在数据库修改数据中也有类似的用法,比如2个编辑同时编辑一篇文章,同时提交,只允许一个用户提交成功,提示另外一个用户:博客已被其他人修改,如何实现呢?
数据库对于乐观锁的ABA问题也是同样的道理,加个版本号或者时间戳解决ABA问题。
博客表:t_blog(id,content,stamp),stamp默认值为0,每次更新+1
A、B 二个编辑同时对一篇文章进行编辑,stamp都为0,当点击提交的时候,将stamp和id作为条件更新博客内容,执行的sql如下:
update t_blog set content = 更新的内容,stamp = stamp+1 where id = 博客id and stamp = 0;
这条update会返回影响的行数,只有一个会返回1,表示更新成功,另外一个提交者返回0,表示需要修改的数据已经不满足条件了,被其他用户给修改了。这种修改数据的方式也叫乐观锁。
对象的属性修改原子类介绍
如果需要原子更新某个类里的某个字段时,需要用到对象的属性修改原子类。
- AtomicIntegerFieldUpdater:原子更新整形字段的值
- AtomicLongFieldUpdater:原子更新长整形字段的值
- AtomicReferenceFieldUpdater :原子更新应用类型字段的值
要想原子地更新对象的属性需要两步:
- 第一步,因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
- 第二步,更新的对象属性必须使用 public volatile 修饰符。
上面三个类提供的方法几乎相同,所以我们这里以AtomicReferenceFieldUpdater为例子来介绍。
调用AtomicReferenceFieldUpdater静态方法newUpdater创建AtomicReferenceFieldUpdater对象
public static <U, W> AtomicReferenceFieldUpdater<U, W> newUpdater(Class<U> tclass, Class<W> vclass, String fieldName)
说明:
三个参数
tclass:需要操作的字段所在的类
vclass:操作字段的类型
fieldName:字段名称
示例
多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作,要求只能初始化一次
public class AtomicReferenceFieldUpdaterTest {
static AtomicReferenceFieldUpdaterTest updaterTest = new AtomicReferenceFieldUpdaterTest();
//不能操作static修饰的字段,会报Caused by: java.lang.IllegalArgumentException错误。compareAndSet操作的是对象实例的偏移值字段,static修饰的字段不属于对象实例
//必须被volatile修饰
private volatile Boolean isInit = Boolean.FALSE;
private static AtomicReferenceFieldUpdater referenceFieldUpdater = AtomicReferenceFieldUpdater.
newUpdater(AtomicReferenceFieldUpdaterTest.class, Boolean.class, "isInit");
public static void init() {
if (referenceFieldUpdater.compareAndSet(updaterTest, false, true)) {
System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",开始初始化!");
//模拟休眠3秒
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",初始化完毕!");
} else {
System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",有其他线程已经执行了初始化!");
}
}
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(()->{
AtomicReferenceFieldUpdaterTest.init();
}).start();
}
}
}
输出:
1599030805588,Thread-0,开始初始化!
1599030805588,Thread-1,有其他线程已经执行了初始化!
1599030805588,Thread-2,有其他线程已经执行了初始化!
1599030805589,Thread-3,有其他线程已经执行了初始化!
1599030805589,Thread-4,有其他线程已经执行了初始化!
1599030808590,Thread-0,初始化完毕!
说明:
- isInit属性必须要volatille修饰,可以确保变量的可见性
- 可以看出多线程同时执行init()方法,只有一个线程执行了初始化的操作,其他线程跳过了。多个线程同时到达updater.compareAndSet,只有一个会成功。
ThreadLocal、InheritableThreadLocal
使用技巧,可以用static方法包装ThreadLocal的get/set方法,这样就可以直接调用了。也可以在抽象类中定义ThreadLocal,这样所有的继承类也能调用到。
ThreadLocal
线程就相当于一个人一样,每个请求相当于一个任务,任务来了,人来处理,处理完毕之后,再处理下一个请求任务。人身上是不是有很多口袋,人刚开始准备处理任务的时候,我们把任务的编号放在处理者的口袋中,然后处理中一路携带者,处理过程中如果需要用到这个编号,直接从口袋中获取就可以了。那么刚好java中线程设计的时候也考虑到了这些问题,Thread对象中就有很多口袋,用来放东西。Thread类中有这么一个变量:
ThreadLocal.ThreadLocalMap threadLocals = null;
这个就是用来操作Thread中所有口袋的东西,ThreadLocalMap源码中有一个数组(有兴趣的可以去看一下源码),对应处理者身上很多口袋一样,数组中的每个元素对应一个口袋。
如何来操作Thread中的这些口袋呢,java为我们提供了一个类ThreadLocal,ThreadLocal对象用来操作Thread中的某一个口袋,可以向这个口袋中放东西、获取里面的东西、清除里面的东西,这个口袋一次性只能放一个东西,重复放东西会将里面已经存在的东西覆盖掉。
常用的3个方法:
//向Thread中某个口袋中放东西
public void set(T value);
//获取这个口袋中目前放的东西
public T get();
//清空这个口袋中放的东西
public void remove()
ThreadLocal的官方API解释为:
“该类提供了线程局部 (thread-local) 变量。这些变量不同于它们的普通对应物,因为访问某个变量(通过其 get 或 set 方法)的每个线程都有自己的局部变量,它独立于变量的初始化副本。ThreadLocal 实例通常是类中的 private static 字段,它们希望将状态与某一个线程(例如,用户 ID 或事务 ID)相关联。”
InheritableThreadLocal
如果一个线程还做并发处理开启多个线程时,这时候子线程如果也想要父线程保留在口袋里的东西,就要使用InheritableThreadLocal来代替ThreadLocal。
父线程相当于主管,子线程相当于干活的小弟,主管让小弟们干活的时候,将自己兜里面的东西复制一份给小弟们使用,主管兜里面可能有很多牛逼的工具,为了提升小弟们的工作效率,给小弟们都复制一个,丢到小弟们的兜里,然后小弟就可以从自己的兜里拿去这些东西使用了,也可以清空自己兜里面的东西。
Thread对象中有个inheritableThreadLocals变量,代码如下:
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
inheritableThreadLocals相当于线程中另外一种兜,这种兜有什么特征呢,当创建子线程的时候,子线程会将父线程这种类型兜的东西全部复制一份放到自己的inheritableThreadLocals兜中,使用InheritableThreadLocal对象可以操作线程中的inheritableThreadLocals兜。
InheritableThreadLocal常用的方法也有3个:
//向Thread中某个口袋中放东西
public void set(T value);
//获取这个口袋中目前放的东西
public T get();
//清空这个口袋中放的东西
public void remove()
实例:
@Slf4j
public class ThreadLocalTest {
//private static ThreadLocal<String> threadLocal = new ThreadLocal<>();
//
private static InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
//自定义包含策略
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
//需要插入的数据
List<String> dataList = new ArrayList<>();
for (int i = 0; i < 3; i++) {
dataList.add("数据" + i);
}
for (int i = 0; i < 5; i++) {
String traceId = String.valueOf(i);
executor.execute(() -> {
threadLocal.set(traceId);
try {
ThreadLocalTest.controller(dataList);
} finally {
threadLocal.remove();
}
});
}
}
//模拟controller
public static void controller(List<String> dataList) {
log.error("接受请求: " + "traceId:" + threadLocal.get());
service(dataList);
}
//模拟service
public static void service(List<String> dataList) {
log.error("执行业务:" + "traceId:" + threadLocal.get());
//dao(dataList);
daoMuti(dataList);
}
//模拟dao
public static void dao(List<String> dataList) {
log.error("执行数据库操作" + "traceId:" + threadLocal.get());
//模拟插入数据
for (String s : dataList) {
log.error("插入数据" + s + "成功" + "traceId:" + threadLocal.get());
}
}
//模拟dao--多线程
public static void daoMuti(List<String> dataList) {
CountDownLatch countDownLatch = new CountDownLatch(dataList.size());
log.error("执行数据库操作" + "traceId:" + threadLocal.get());
String threadName = Thread.currentThread().getName();
//模拟插入数据
for (String s : dataList) {
new Thread(() -> {
try {
//模拟数据库操作耗时100毫秒
TimeUnit.MILLISECONDS.sleep(100);
log.error("插入数据" + s + "成功" + threadName + ",traceId:" + threadLocal.get());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}).start();
}
//等待上面的dataList处理完毕
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输出:
17:35:30.465 [From DemoThreadFactory's 订单创建组-Worker-2] ERROR com.self.current.ThreadLocalTest - 接受请求: traceId:1
17:35:30.465 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.ThreadLocalTest - 接受请求: traceId:0
17:35:30.465 [From DemoThreadFactory's 订单创建组-Worker-3] ERROR com.self.current.ThreadLocalTest - 接受请求: traceId:2
17:35:30.471 [From DemoThreadFactory's 订单创建组-Worker-3] ERROR com.self.current.ThreadLocalTest - 执行业务:traceId:2
17:35:30.471 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.ThreadLocalTest - 执行业务:traceId:0
17:35:30.471 [From DemoThreadFactory's 订单创建组-Worker-2] ERROR com.self.current.ThreadLocalTest - 执行业务:traceId:1
17:35:30.471 [From DemoThreadFactory's 订单创建组-Worker-3] ERROR com.self.current.ThreadLocalTest - 执行数据库操作traceId:2
17:35:30.471 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.ThreadLocalTest - 执行数据库操作traceId:0
17:35:30.471 [From DemoThreadFactory's 订单创建组-Worker-2] ERROR com.self.current.ThreadLocalTest - 执行数据库操作traceId:1
17:35:30.574 [Thread-3] ERROR com.self.current.ThreadLocalTest - 插入数据数据2成功From DemoThreadFactory's 订单创建组-Worker-3,traceId:2
17:35:30.574 [Thread-4] ERROR com.self.current.ThreadLocalTest - 插入数据数据0成功From DemoThreadFactory's 订单创建组-Worker-2,traceId:1
17:35:30.574 [Thread-1] ERROR com.self.current.ThreadLocalTest - 插入数据数据0成功From DemoThreadFactory's 订单创建组-Worker-3,traceId:2
17:35:30.574 [Thread-2] ERROR com.self.current.ThreadLocalTest - 插入数据数据1成功From DemoThreadFactory's 订单创建组-Worker-3,traceId:2
17:35:30.574 [From DemoThreadFactory's 订单创建组-Worker-3] ERROR com.self.current.ThreadLocalTest - 接受请求: traceId:3
17:35:30.574 [From DemoThreadFactory's 订单创建组-Worker-3] ERROR com.self.current.ThreadLocalTest - 执行业务:traceId:3
17:35:30.574 [From DemoThreadFactory's 订单创建组-Worker-3] ERROR com.self.current.ThreadLocalTest - 执行数据库操作traceId:3
17:35:30.575 [Thread-9] ERROR com.self.current.ThreadLocalTest - 插入数据数据2成功From DemoThreadFactory's 订单创建组-Worker-1,traceId:0
17:35:30.575 [Thread-8] ERROR com.self.current.ThreadLocalTest - 插入数据数据1成功From DemoThreadFactory's 订单创建组-Worker-1,traceId:0
17:35:30.575 [Thread-7] ERROR com.self.current.ThreadLocalTest - 插入数据数据0成功From DemoThreadFactory's 订单创建组-Worker-1,traceId:0
17:35:30.575 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.ThreadLocalTest - 接受请求: traceId:4
17:35:30.575 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.ThreadLocalTest - 执行业务:traceId:4
17:35:30.575 [From DemoThreadFactory's 订单创建组-Worker-1] ERROR com.self.current.ThreadLocalTest - 执行数据库操作traceId:4
17:35:30.575 [Thread-6] ERROR com.self.current.ThreadLocalTest - 插入数据数据2成功From DemoThreadFactory's 订单创建组-Worker-2,traceId:1
17:35:30.575 [Thread-5] ERROR com.self.current.ThreadLocalTest - 插入数据数据1成功From DemoThreadFactory's 订单创建组-Worker-2,traceId:1
17:35:30.682 [Thread-10] ERROR com.self.current.ThreadLocalTest - 插入数据数据0成功From DemoThreadFactory's 订单创建组-Worker-3,traceId:3
17:35:30.682 [Thread-13] ERROR com.self.current.ThreadLocalTest - 插入数据数据0成功From DemoThreadFactory's 订单创建组-Worker-1,traceId:4
17:35:30.682 [Thread-14] ERROR com.self.current.ThreadLocalTest - 插入数据数据1成功From DemoThreadFactory's 订单创建组-Worker-1,traceId:4
17:35:30.682 [Thread-12] ERROR com.self.current.ThreadLocalTest - 插入数据数据2成功From DemoThreadFactory's 订单创建组-Worker-3,traceId:3
17:35:30.683 [Thread-15] ERROR com.self.current.ThreadLocalTest - 插入数据数据2成功From DemoThreadFactory's 订单创建组-Worker-1,traceId:4
17:35:30.683 [Thread-11] ERROR com.self.current.ThreadLocalTest - 插入数据数据1成功From DemoThreadFactory's 订单创建组-Worker-3,traceId:3
JUC中的阻塞队列
Queue接口
队列是一种先进先出(FIFO)的数据结构,java中用Queue接口来表示队列。
Queue接口中定义了6个方法:
public interface Queue<E> extends Collection<E> {
boolean add(e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
}
每个Queue方法都有两种形式:
(1)如果操作失败则抛出异常,
(2)如果操作失败,则返回特殊值(null或false,具体取决于操作),接口的常规结构如下表所示。
操作类型 抛出异常 返回特殊值
插入 add(e) offer(e)
移除 remove() poll()
检查 element() peek()
Queue从Collection继承的add方法插入一个元素,除非它违反了队列的容量限制,在这种情况下它会抛出IllegalStateException;offer方法与add不同之处仅在于它通过返回false来表示插入元素失败。
remove和poll方法都移除并返回队列的头部,确切地移除哪个元素是由具体的实现来决定的,仅当队列为空时,remove和poll方法的行为才有所不同,在这些情况下,remove抛出NoSuchElementException,而poll返回null。
element和peek方法返回队列头部的元素,但不移除,它们之间的差异与remove和poll的方式完全相同,如果队列为空,则element抛出NoSuchElementException,而peek返回null。
队列一般不要插入空元素。
BlockingQueue接口
BlockingQueue位于juc中,熟称阻塞队列, 阻塞队列首先它是一个队列,继承Queue接口,是队列就会遵循先进先出(FIFO)的原则,又因为它是阻塞的,故与普通的队列有两点区别:
- 当一个线程向队列里面添加数据时,如果队列是满的,那么将阻塞该线程,暂停添加数据
- 当一个线程从队列里面取出数据时,如果队列是空的,那么将阻塞该线程,暂停取出数据
BlockingQueue相关方法:
操作类型 抛出异常 返回特殊值 一直阻塞 超时退出
插入 add(e) offer(e) put(e) offer(e,timeuout,unit)
移除 remove() poll() take() poll(timeout,unit)
检查 element() peek() 不支持 不支持
重点,再来解释一下,加深印象:
- 3个可能会有异常的方法,add、remove、element;这3个方法不会阻塞(是说队列满或者空的情况下是否会阻塞);队列满的情况下,add抛出异常;队列为空情况下,remove、element抛出异常
- offer、poll、peek 也不会阻塞(是说队列满或者空的情况下是否会阻塞);队列满的情况下,offer返回false;队列为空的情况下,pool、peek返回null
- 队列满的情况下,调用put方法会导致当前线程阻塞
- 队列为空的情况下,调用take方法会导致当前线程阻塞
- offer(e,timeuout,unit),超时之前,插入成功返回true,否者返回false
- poll(timeout,unit),超时之前,获取到头部元素并将其移除,返回true,否者返回false
BlockingQueue常见的实现类
ArrayBlockingQueue
基于数组的阻塞队列实现,其内部维护一个定长的数组,用于存储队列元素。线程阻塞的实现是通过ReentrantLock来完成的,数据的插入与取出共用同一个锁,因此ArrayBlockingQueue并不能实现生产、消费同时进行。而且在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
LinkedBlockingQueue
基于单向链表的阻塞队列实现,在初始化LinkedBlockingQueue的时候可以指定大小,也可以不指定,默认类似一个无限大小的容量(Integer.MAX_VALUE),不指队列容量大小也是会有风险的,一旦数据生产速度大于消费速度,系统内存将有可能被消耗殆尽,因此要谨慎操作。另外LinkedBlockingQueue中用于阻塞生产者、消费者的锁是两个(锁分离),因此生产与消费是可以同时进行的。
PriorityBlockingQueue
一个支持优先级排序的无界阻塞队列,进入队列的元素会按照优先级进行排序
SynchronousQueue
同步阻塞队列,SynchronousQueue没有容量,与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue,每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然
DelayQueue
DelayQueue是一个支持延时获取元素的无界阻塞队列,里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行,也就是说只有在延迟期到时才能够从队列中取元素
LinkedTransferQueue
LinkedTransferQueue是基于链表的FIFO无界阻塞队列,它出现在JDK7中,Doug Lea 大神说LinkedTransferQueue是一个聪明的队列,它是ConcurrentLinkedQueue、SynchronousQueue(公平模式下)、无界的LinkedBlockingQueues等的超集,LinkedTransferQueue包含了ConcurrentLinkedQueue、SynchronousQueue、LinkedBlockingQueues三种队列的功能
ArrayBlockingQueue
有界阻塞队列,内部使用数组存储元素,有2个常用构造方法:
//capacity表示容量大小,默认内部采用非公平锁
public ArrayBlockingQueue(int capacity)
//capacity:容量大小,fair:内部是否是使用公平锁
public ArrayBlockingQueue(int capacity, boolean fair)
注意:ArrayBlockingQueue如果队列容量设置的太小,消费者发送的太快,消费者消费的太慢的情况下,会导致队列空间满,调用put方法会导致发送者线程阻塞,所以注意设置合理的大小,协调好消费者的速度。
LinkedBlockingQueue
内部使用单向链表实现的阻塞队列,3个构造方法:
//默认构造方法,容量大小为Integer.MAX_VALUE
public LinkedBlockingQueue();
//创建指定容量大小的LinkedBlockingQueue
public LinkedBlockingQueue(int capacity);
//容量为Integer.MAX_VALUE,并将传入的集合丢入队列中
public LinkedBlockingQueue(Collection<? extends E> c);
LinkedBlockingQueue的用法和ArrayBlockingQueue类似,建议使用的时候指定容量,如果不指定容量,插入的太快,移除的太慢,可能会产生OOM。
PriorityBlockingQueue
无界的优先级阻塞队列,内部使用数组存储数据,达到容量时,会自动进行扩容,放入的元素会按照优先级进行排序,4个构造方法:
//默认构造方法,默认初始化容量是11
public PriorityBlockingQueue();
//指定队列的初始化容量
public PriorityBlockingQueue(int initialCapacity);
//指定队列的初始化容量和放入元素的比较器
public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator);
//传入集合放入来初始化队列,传入的集合可以实现SortedSet接口或者PriorityQueue接口进行排序,如果没有实现这2个接口,按正常顺序放入队列
public PriorityBlockingQueue(Collection<? extends E> c);
优先级队列放入元素的时候,会进行排序,所以我们需要指定排序规则,有2种方式:
- 创建PriorityBlockingQueue指定比较器Comparator
- 放入的元素需要实现Comparable接口
上面2种方式必须选一个,如果2个都有,则走第一个规则排序。
示例:
public class PriorityBlockingQueueTest {
private static PriorityBlockingQueue<Msg> priorityBlockingQueue = new PriorityBlockingQueue<>();
private static class Msg implements Comparable<Msg>{
private String msg;
private int priority;
public Msg(String msg, int priority) {
this.msg = msg;
this.priority = priority;
}
@Override
public String toString() {
return "Msg{" +
"priority=" + priority +
", msg='" + msg + '\'' +
'}';
}
@Override
public int compareTo(Msg o) {
//return this.priority-o.priority;
return o.priority-this.priority;
//return Integer.compare(this.priority,o.priority);
}
}
public static void putMsg(Msg msg) throws InterruptedException {
priorityBlockingQueue.put(msg);
System.out.println("已推送"+msg);
}
static {
new Thread(() -> {
while (true) {
Msg msg;
try {
long starTime = System.currentTimeMillis();
//获取一条推送消息,此方法会进行阻塞,直到返回结果
msg = priorityBlockingQueue.take();
long endTime = System.currentTimeMillis();
//模拟推送耗时
TimeUnit.MILLISECONDS.sleep(500);
System.out.println(String.format("[%s,%s,take耗时:%s],%s,发送消息:%s", starTime, endTime, (endTime - starTime), Thread.currentThread().getName(), msg));
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
PriorityBlockingQueueTest.putMsg(new Msg("消息" + i,i));
}
}
}
输出:
已推送Msg{priority=0, msg='消息0'}
已推送Msg{priority=1, msg='消息1'}
已推送Msg{priority=2, msg='消息2'}
已推送Msg{priority=3, msg='消息3'}
已推送Msg{priority=4, msg='消息4'}
[1599459824306,1599459824307,take耗时:1],Thread-0,发送消息:Msg{priority=0, msg='消息0'}
[1599459824829,1599459824829,take耗时:0],Thread-0,发送消息:Msg{priority=4, msg='消息4'}
[1599459825330,1599459825330,take耗时:0],Thread-0,发送消息:Msg{priority=3, msg='消息3'}
[1599459825831,1599459825831,take耗时:0],Thread-0,发送消息:Msg{priority=2, msg='消息2'}
[1599459826331,1599459826331,take耗时:0],Thread-0,发送消息:Msg{priority=1, msg='消息1'}
SynchronousQueue
同步阻塞队列,SynchronousQueue没有容量,与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue,每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。SynchronousQueue 在现实中用的不多,线程池中有用到过,Executors.newCachedThreadPool()实现中用到了这个队列,当有任务丢入线程池的时候,如果已创建的工作线程都在忙于处理任务,则会新建一个线程来处理丢入队列的任务。
调用queue.put方法向队列中丢入一条数据,调用的时候产生了阻塞,从输出结果中可以看出,直到take方法被调用时,put方法才从阻塞状态恢复正常。
DelayQueue
DelayQueue是一个支持延时获取元素的无界阻塞队列,里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行,也就是说只有在延迟期到时才能够从队列中取元素。
- DelayQueue是一个内部依靠AQS队列同步器所实现的无界延迟阻塞队列。
- 延迟对象需要覆盖 getDelay()与compareTo()方法,并且要注意 getDelay()的时间单位的统一,compareTo()根据业务逻辑进行合理的比较逻辑重写。
- DelayQueue中内聚的重入锁是非公平的。
- DelayQueue是实现定时任务的关键,ScheduledThreadPoolExecutor中就用到了DelayQueue。
示例:
public class DelayQueueTest {
//推送信息封装
static class Msg implements Delayed {
//优先级,越小优先级越高
private int priority;
//推送的信息
private String msg;
//定时发送时间,毫秒格式
private long sendTimeMs;
public Msg(int priority, String msg, long sendTimeMs) {
this.priority = priority;
this.msg = msg;
this.sendTimeMs = sendTimeMs;
}
@Override
public String toString() {
return "Msg{" +
"priority=" + priority +
", msg='" + msg + '\'' +
", sendTimeMs=" + sendTimeMs +
'}';
}
//@Override
//public long getDelay(TimeUnit unit) {
// return unit.convert(this.sendTimeMs - Calendar.getInstance().getTimeInMillis(), TimeUnit.MILLISECONDS);
//}
/**
* 需要实现的接口,获得延迟时间 用过期时间-当前时间
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.sendTimeMs - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
}
/**
* 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
}
}
//推送队列
static DelayQueue<Msg> pushQueue = new DelayQueue<Msg>();
static {
//启动一个线程做真实推送
new Thread(() -> {
while (true) {
Msg msg;
try {
//获取一条推送消息,此方法会进行阻塞,直到返回结果
msg = pushQueue.take();
//此处可以做真实推送
long endTime = System.currentTimeMillis();
System.out.println(String.format("定时发送时间:%s,实际发送时间:%s,发送消息:%s", msg.sendTimeMs, endTime, msg));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
//推送消息,需要发送推送消息的调用该方法,会将推送信息先加入推送队列
public static void pushMsg(int priority, String msg, long sendTimeMs) throws InterruptedException {
pushQueue.put(new Msg(priority, msg, sendTimeMs));
}
public static void main(String[] args) throws InterruptedException {
for (int i = 5; i >= 1; i--) {
String msg = "一起来学java高并发,第" + i + "天";
DelayQueueTest.pushMsg(i, msg, Calendar.getInstance().getTimeInMillis() + i * 2000);
}
}
}
输出:
定时发送时间:1599462287589,实际发送时间:1599462287590,发送消息:Msg{priority=1, msg='一起来学java高并发,第1天', sendTimeMs=1599462287589}
定时发送时间:1599462289589,实际发送时间:1599462289589,发送消息:Msg{priority=2, msg='一起来学java高并发,第2天', sendTimeMs=1599462289589}
定时发送时间:1599462291589,实际发送时间:1599462291590,发送消息:Msg{priority=3, msg='一起来学java高并发,第3天', sendTimeMs=1599462291589}
定时发送时间:1599462293588,实际发送时间:1599462293589,发送消息:Msg{priority=4, msg='一起来学java高并发,第4天', sendTimeMs=1599462293588}
定时发送时间:1599462295571,实际发送时间:1599462295571,发送消息:Msg{priority=5, msg='一起来学java高并发,第5天', sendTimeMs=1599462295571}
LinkedTransferQueue
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
LinkedTransferQueue类继承自AbstractQueue抽象类,并且实现了TransferQueue接口:
public interface TransferQueue<E> extends BlockingQueue<E> {
// 如果存在一个消费者已经等待接收它,则立即传送指定的元素,否则返回false,并且不进入队列。
boolean tryTransfer(E e);
// 如果存在一个消费者已经等待接收它,则立即传送指定的元素,否则等待直到元素被消费者接收。
void transfer(E e) throws InterruptedException;
// 在上述方法的基础上设置超时时间
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// 如果至少有一位消费者在等待,则返回true
boolean hasWaitingConsumer();
// 获取所有等待获取元素的消费线程数量
int getWaitingConsumerCount();
}
再看一下上面的这些方法,transfer(E e)方法和SynchronousQueue的put方法类似,都需要等待消费者取走元素,否者一直等待。其他方法和ArrayBlockingQueue、LinkedBlockingQueue中的方法类似。
总结
- 重点需要了解BlockingQueue中的所有方法,以及他们的区别
- 重点掌握ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue的使用场景
- 需要处理的任务有优先级的,使用PriorityBlockingQueue
- 处理的任务需要延时处理的,使用DelayQueue
疑问:
Q:有界阻塞队列和无界阻塞队列的区别?是不是以是否定义队列大小来作为区分,没有定义的就是无界的,有定义的就算是容量(Integer.MAX_VALUE)也是有界的?
JUC中常见的集合
JUC集合框架图
图可以看到,JUC的集合框架也是从Map、List、Set、Queue、Collection等超级接口中继承而来的。所以,大概可以知道JUC下的集合包含了一一些基本操作,并且变得线程安全。
Map
ConcurrentHashMap
功能和HashMap基本一致,内部使用红黑树实现的。
特性:
- 迭代结果和存入顺序不一致
- key和value都不能为空
- 线程安全的
ConcurrentSkipListMap
内部使用跳表实现的,放入的元素会进行排序,排序算法支持2种方式来指定:
- 通过构造方法传入一个Comparator
- 放入的元素实现Comparable接口
上面2种方式必选一个,如果2种都有,走规则1。
特性:
- 迭代结果和存入顺序不一致
- 放入的元素会排序
- key和value都不能为空
- 线程安全的
List
CopyOnWriteArrayList
实现List的接口的,一般我们使用ArrayList、LinkedList、Vector,其中只有Vector是线程安全的,可以使用Collections静态类的synchronizedList方法对ArrayList、LinkedList包装为线程安全的List,不过这些方式在保证线程安全的情况下性能都不高。
CopyOnWriteArrayList是线程安全的List,内部使用数组存储数据,集合中多线程并行操作一般存在4种情况:读读、读写、写写、写读,这个只有在写写操作过程中会导致其他线程阻塞,其他3种情况均不会阻塞,所以读取的效率非常高。
可以看一下这个类的名称:CopyOnWrite,意思是在写入操作的时候,进行一次自我复制,换句话说,当这个List需要修改时,并不修改原有内容(这对于保证当前在读线程的数据一致性非常重要),而是在原有存放数据的数组上产生一个副本,在副本上修改数据,修改完毕之后,用副本替换原来的数组,这样也保证了写操作不会影响读。
特性:
- 迭代结果和存入顺序一致
- 元素不重复
- 元素可以为空
- 线程安全的
- 读读、读写、写读3种情况不会阻塞;写写会阻塞
- 无界的
Set
ConcurrentSkipListSet
有序的Set,内部基于ConcurrentSkipListMap实现的,放入的元素会进行排序,排序算法支持2种方式来指定:
- 通过构造方法传入一个Comparator
- 放入的元素实现Comparable接口
上面2种方式需要实现一个,如果2种都有,走规则1
特性:
- 迭代结果和存入顺序不一致
- 放入的元素会排序
- 元素不重复
- 元素不能为空
- 线程安全的
- 无界的
CopyOnWriteArraySet
内部使用CopyOnWriteArrayList实现的,将所有的操作都会转发给CopyOnWriteArrayList。
特性:
- 迭代结果和存入顺序不一致
- 元素不重复
- 元素可以为空
- 线程安全的
- 读读、读写、写读 不会阻塞;写写会阻塞
- 无界的
Queue
Queue接口中的方法,我们再回顾一下:
操作类型 抛出异常 返回特殊值
插入 add(e) offer(e)
移除 remove() poll()
检查 element() peek()
3种操作,每种操作有2个方法,不同点是队列为空或者满载时,调用方法是抛出异常还是返回特殊值,大家按照表格中的多看几遍,加深记忆。
ConcurrentLinkedQueue
高效并发队列,内部使用链表实现的。
特性:
- 线程安全的
- 迭代结果和存入顺序一致
- 元素可以重复
- 元素不能为空
- 线程安全的
- 无界队列
Deque
先介绍一下Deque接口,双向队列(Deque)是Queue的一个子接口,双向队列是指该队列两端的元素既能入队(offer)也能出队(poll),如果将Deque限制为只能从一端入队和出队,则可实现栈的数据结构。对于栈而言,有入栈(push)和出栈(pop),遵循先进后出原则。
一个线性 collection,支持在两端插入和移除元素。名称 deque 是“double ended queue(双端队列)”的缩写,通常读为“deck”。大多数 Deque 实现对于它们能够包含的元素数没有固定限制,但此接口既支持有容量限制的双端队列,也支持没有固定大小限制的双端队列。
此接口定义在双端队列两端访问元素的方法。提供插入、移除和检查元素的方法。每种方法都存在两种形式:一种形式在操作失败时抛出异常,另一种形式返回一个特殊值(null 或 false,具体取决于操作)。插入操作的后一种形式是专为使用有容量限制的 Deque 实现设计的;在大多数实现中,插入操作不能失败。
下表总结了上述 12 种方法:
此接口扩展了 Queue接口。在将双端队列用作队列时,将得到 FIFO(先进先出)行为。将元素添加到双端队列的末尾,从双端队列的开头移除元素。从 Queue 接口继承的方法完全等效于 Deque 方法,如下表所示:
Queue 方法 等效 Deque 方法
add(e) addLast(e)
offer(e) offerLast(e)
remove() removeFirst()
poll() pollFirst()
element() getFirst()
peek() peekFirst()
ConcurrentLinkedDeque
实现了Deque接口,内部使用链表实现的高效的并发双端队列。
特性:
- 线程安全的
- 迭代结果和存入顺序一致
- 元素可以重复
- 元素不能为空
- 线程安全的
- 无界队列
BlockingQueue
关于阻塞队列,上一篇有详细介绍。
疑问:
Q:跳表是什么?
接口性能提升实战篇
需求:电商app的商品详情页,需要给他们提供一个接口获取商品相关信息:
- 商品基本信息(名称、价格、库存、会员价格等)
- 商品图片列表
- 商品描述信息(描述信息一般是由富文本编辑的大文本信息)
普通接口实现伪代码如下:
public Map<String,Object> detail(long goodsId){
//创建一个map
//step1:查询商品基本信息,放入map
map.put("goodsModel",(select * from t_goods where id = #gooldsId#));
//step2:查询商品图片列表,返回一个集合放入map
map.put("goodsImgsModelList",(select * from t_goods_imgs where goods_id = #gooldsId#));
//step3:查询商品描述信息,放入map
map.put("goodsExtModel",(select * from t_goods_ext where goods_id = #gooldsId#));
return map;
}
上面这种写法应该很常见,代码很简单,假设上面每个步骤耗时200ms,此接口总共耗时>=600毫秒
整个过程是按顺序执行的,实际上3个查询之间是没有任何依赖关系,所以说3个查询可以同时执行,那我们对这3个步骤采用多线程并行执行实现如下:
示例:
public class GetProductDetailTest {
//自定义包含策略
private ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
/**
* 获取商品基本信息
*
* @param goodsId 商品id
* @return 商品基本信息
* @throws InterruptedException
*/
public String goodsDetailModel(long goodsId) throws InterruptedException {
//模拟耗时,休眠200ms
TimeUnit.MILLISECONDS.sleep(200);
return "商品id:" + goodsId + ",商品基本信息....";
}
/**
* 获取商品图片列表
*
* @param goodsId 商品id
* @return 商品图片列表
* @throws InterruptedException
*/
public List<String> goodsImgsModelList(long goodsId) throws InterruptedException {
//模拟耗时,休眠200ms
TimeUnit.MILLISECONDS.sleep(200);
return Arrays.asList("图1", "图2", "图3");
}
/**
* 获取商品描述信息
*
* @param goodsId 商品id
* @return 商品描述信息
* @throws InterruptedException
*/
public String goodsExtModel(long goodsId) throws InterruptedException {
//模拟耗时,休眠200ms
TimeUnit.MILLISECONDS.sleep(200);
return "商品id:" + goodsId + ",商品描述信息......";
}
public Map<String,Object> getGoodsDetail(long goodsId) throws ExecutionException, InterruptedException {
Map<String, Object> result = new HashMap<>();
Future<String> gooldsDetailModelFuture = executor.submit(() -> goodsDetailModel(goodsId));
Future<List<String>> goodsImgsModelFuture = executor.submit(() -> goodsImgsModelList(goodsId));
//异步获取商品描述信息
Future<String> goodsExtModelFuture = executor.submit(() -> goodsExtModel(goodsId));
result.put("gooldsDetailModel", gooldsDetailModelFuture.get());
result.put("goodsImgsModelList", goodsImgsModelFuture.get());
result.put("goodsExtModel", goodsExtModelFuture.get());
return result;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
GetProductDetailTest detailTest = new GetProductDetailTest();
long starTime = System.currentTimeMillis();
Map<String, Object> map = detailTest.getGoodsDetail(1L);
System.out.println(map);
System.out.println("耗时(ms):" + (System.currentTimeMillis() - starTime));
}
}
输出:
{goodsImgsModelList=[图1, 图2, 图3], gooldsDetailModel=商品id:1,商品基本信息...., goodsExtModel=商品id:1,商品描述信息......}
耗时(ms):255
可以看出耗时200毫秒左右,性能提升了2倍,假如这个接口中还存在其他无依赖的操作,性能提升将更加显著,上面使用了线程池并行去执行3次查询的任务,最后通过Future获取异步执行结果。
整个优化过程:
- 先列出无依赖的一些操作
- 将这些操作改为并行的方式
总结
- 对于无依赖的操作尽量采用并行方式去执行,可以很好的提升接口的性能
解决微服务日志的痛点
日志有什么用?
- 系统出现故障的时候,可以通过日志信息快速定位问题,修复bug,恢复业务
- 提取有用数据,做数据分析使用
本文主要讨论通过日志来快速定位并解决问题。
日志存在的痛点
先介绍一下多数公司采用的方式:目前比较流行的是采用springcloud(或者dubbo)做微服务,按照业务拆分为多个独立的服务,服务采用集群的方式部署在不同的机器上,当一个请求过来的时候,可能会调用到很多服务进行处理,springcloud一般采用logback(或者log4j)输出日志到文件中。当系统出问题的时候,按照系统故障的严重程度,严重的会回退版本,然后排查bug,轻的,找运维去线上拉日志,然后排查问题。
这个过程中存在一些问题:
- 日志文件太大太多,不方便查找
- 日志分散在不同的机器上,也不方便查找
- 一个请求可能会调用多个服务,完整的日志难以追踪(没有完整的链路日志)
- 系统出现了问题,只能等到用户发现了,自己才知道(没有报错预警)
本文要解决上面的几个痛点,构建我们的日志系统,达到以下要求:
- 方便追踪一个请求完整的日志
- 方便快速检索日志
- 系统出现问题自动报警,通知相关人员
构建日志系统
方便追踪一个请求完整的日志
当一个请求过来的时候,可能会调用多个服务,多个服务内部可能又会产生子线程处理业务,所以这里面有两个问题需要解决:
- 多个服务之间日志的追踪
- 服务内部子线程和主线程日志的追踪,这个地方举个例子,比如一个请求内部需要给10000人发送推送,内部开启10个线程并行处理,处理完毕之后响应操作者,这里面有父子线程,我们要能够找到这个里面所有的日志
需要追踪一个请求完整日志,我们需要给每个请求设置一个全局唯一编号,可以使用UUID或者其他方式也行。
多个服务之间日志追踪的问题:当一个请求过来的时候,在入口处生成一个trace_id,然后放在ThreadLocal中,如果内部设计到多个服务之间相互调用,调用其他服务的时,将trace_id顺便携带过去。
父子线程日志追踪的问题:可以采用InheritableThreadLocal来存放trace_id,这样可以在线程中获取到父线程中的trace_id。
所以此处我们需要使用InheritableThreadLocal来存储trace_id。
使用了线程池处理请求的,由于线程池中的线程采用的是复用的方式,所以需要对执行的任务Runable做一些改造 包装。
public class TraceRunnable implements Runnable {
private String tranceId;
private Runnable target;
public TraceRunnable(Runnable target) {
this.tranceId = TraceUtil.get();
this.target = target;
}
@Override
public void run() {
try {
TraceUtil.set(this.tranceId);
MDC.put(TraceUtil.MDC_TRACE_ID, TraceUtil.get());
this.target.run();
} finally {
MDC.remove(TraceUtil.MDC_TRACE_ID);
TraceUtil.remove();
}
}
public static Runnable trace(Runnable target) {
return new TraceRunnable(target);
}
}
需要用线程池执行的任务使用TraceRunnable封装一下就可以了。
TraceUtil代码:
public class TraceUtil {
public static final String REQUEST_HEADER_TRACE_ID = "com.ms.header.trace.id";
public static final String MDC_TRACE_ID = "trace_id";
private static InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();
/**
* 获取traceid
*
* @return
*/
public static String get() {
String traceId = inheritableThreadLocal.get();
if (traceId == null) {
traceId = IDUtil.getId();
inheritableThreadLocal.set(traceId);
}
return traceId;
}
public static void set(String trace_id) {
inheritableThreadLocal.set(trace_id);
}
public static void remove() {
inheritableThreadLocal.remove();
}
}
日志输出中携带上trace_id,这样最终我们就可以通过trace_id找到一个请求的完整日志了。
方便快速检索日志
日志分散在不同的机器上,如果要快速检索,需要将所有服务产生的日志汇集到一个地方。
关于检索日志的,列一下需求:
- 我们将收集日志发送到消息中间件中(可以是kafka、rocketmq),消息中间件这块不介绍,选择玩的比较溜的就可以了
- 系统产生日志尽量不要影响接口的效率
- 带宽有限的情况下,发送日志也尽量不要去影响业务
- 日志尽量低延次,产生的日志,尽量在生成之后1分钟后可以检索到
- 检索日志功能要能够快速响应
关于上面几点,我们需要做的:日志发送的地方进行改造,引入消息中间件,将日志异步发送到消息中间件中,查询的地方采用elasticsearch,日志系统需要订阅消息中间件中的日志,然后丢给elasticsearch建索引,方便快速检索,咱们来一点点的介绍。
日志发送端的改造
日志是由业务系统产生的,一个请求过来的时候会产生很多日志,日志产生时,我们尽量减少日志输出对业务耗时的影响,我们的过程如下:
- 业务系统内部引用一个线程池来异步处理日志,线程池内部可以使用一个容量稍微大一点的阻塞队列
- 业务系统将日志丢给线程池进行处理
- 线程池中将需要处理的日志先压缩一下,然后发送至mq
线程池的使用可以参考:JAVA线程池,这一篇就够了
引入mq存储日志
业务系统将日志先发送到mq中,后面由其他消费者订阅进行消费。日志量比较大的,对mq的要求也比较高,可以选择kafka,业务量小的,也可以选取activemq。
使用elasticsearch来检索日志
elasticsearch(以下简称es)是一个全文检索工具,具体详情可以参考其官网相关文档。使用它来检索数据效率非常高。日志系统中需要我们开发一个消费端来拉取mq中的消息,将其存储到es中方便快速检索,关于这块有几点说一下:
- 建议按天在es中建立数据库,日质量非常大的,也可以按小时建立数据库。查询的时候,时间就是必选条件了,这样可以快速让es定位到日志库进行检索,提升检索效率
- 日志常见的需要收集的信息:trace_id、时间、日志级别、类、方法、url、调用的接口开始时间、调用接口的结束时间、接口耗时、接口状态码、异常信息、日志信息等等,可以按照这些在es中建立索引,方便检索。
日志监控报警——可自定义配置报警
日志监控报警是非常重要的,这个必须要有,日志系统中需要开发监控报警功能,这块我们可以做成通过页面配置的方式,支持报警规则的配置,如日志中产生了某些异常、接口响应时间大于多少、接口返回状态码404等异常信息的时候能够报警,具体的报警可以是语音电话、短信通知、钉钉机器人报警等等,这些也做成可以配置的。
日志监控模块从mq中拉取日志,然后去匹配我们启用的一些规则进行报警。
日志处理结构图如下:
高并发中常见的限流方式
常见的限流的场景
- 秒杀活动,数量有限,访问量巨大,为了防止系统宕机,需要做限流处理
- 国庆期间,一般的旅游景点人口太多,采用排队方式做限流处理
- 医院看病通过发放排队号的方式来做限流处理。
常见的限流算法
- 通过控制最大并发数来进行限流
- 使用漏桶算法来进行限流
- 使用令牌桶算法来进行限流
通过控制最大并发数来进行限流
以秒杀业务为例,10个iphone,100万人抢购,100万人同时发起请求,最终能够抢到的人也就是前面几个人,后面的基本上都没有希望了,那么我们可以通过控制并发数来实现,比如并发数控制在10个,其他超过并发数的请求全部拒绝,提示:秒杀失败,请稍后重试。
单机中的JUC中提供了这样的工具类:Semaphore:如果是集群,则可以用redis或者zk代替Semaphore
示例:
public class MaxAccessLimiter {
private static Semaphore limiter = new Semaphore(5);
//自定义包含策略
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 60,
TimeUnit.SECONDS, new SynchronousQueue(),
new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
executor.submit(() -> {
boolean flag = false;
try {
flag = limiter.tryAcquire(100, TimeUnit.MICROSECONDS);
if (flag) {
//休眠2秒,模拟下单操作
System.out.println(Thread.currentThread() + ",尝试下单中。。。。。");
TimeUnit.SECONDS.sleep(2);
} else {
System.out.println(Thread.currentThread() + ",秒杀失败,请稍微重试!");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (flag) {
limiter.release();
}
}
});
}
executor.shutdown();
}
}
输出:
Thread[From DemoThreadFactory's 订单创建组-Worker-1,5,main],尝试下单中。。。。。
Thread[From DemoThreadFactory's 订单创建组-Worker-2,5,main],尝试下单中。。。。。
Thread[From DemoThreadFactory's 订单创建组-Worker-3,5,main],尝试下单中。。。。。
Thread[From DemoThreadFactory's 订单创建组-Worker-4,5,main],尝试下单中。。。。。
Thread[From DemoThreadFactory's 订单创建组-Worker-5,5,main],尝试下单中。。。。。
Thread[From DemoThreadFactory's 订单创建组-Worker-9,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-14,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-16,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-17,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-18,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-20,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-12,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-11,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-7,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-8,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-6,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-10,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-19,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-15,5,main],秒杀失败,请稍微重试!
Thread[From DemoThreadFactory's 订单创建组-Worker-13,5,main],秒杀失败,请稍微重试!
使用漏桶算法来进行限流
国庆期间比较火爆的景点,人流量巨大,一般入口处会有限流的弯道,让游客进去进行排队,排在前面的人,每隔一段时间会放一拨进入景区。排队人数超过了指定的限制,后面再来的人会被告知今天已经游客量已经达到峰值,会被拒绝排队,让其明天或者以后再来,这种玩法采用漏桶限流的方式。
漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
漏桶算法示意图:
示例:代码中BucketLimit.build(10, 60, TimeUnit.MINUTES);创建了一个容量为10,流水为60/分钟的漏桶。
public class BucketLimitTest {
public static class BucketLimit {
static AtomicInteger threadNum = new AtomicInteger(1);
//容量
private int capcity;
//流速
private int flowRate;
//流速时间单位
private TimeUnit flowRateUnit;
private BlockingQueue<Node> queue;
//漏桶流出的任务时间间隔(纳秒)
private long flowRateNanosTime;
public BucketLimit(int capcity, int flowRate, TimeUnit flowRateUnit) {
this.capcity = capcity;
this.flowRate = flowRate;
this.flowRateUnit = flowRateUnit;
this.bucketThreadWork();
}
//漏桶线程
public void bucketThreadWork() {
this.queue = new ArrayBlockingQueue<Node>(capcity);
//漏桶流出的任务时间间隔(纳秒)
this.flowRateNanosTime = flowRateUnit.toNanos(1) / flowRate;
System.out.println(TimeUnit.NANOSECONDS.toSeconds(this.flowRateNanosTime));
Thread thread = new Thread(this::bucketWork);
thread.setName("漏桶线程-" + threadNum.getAndIncrement());
thread.start();
}
//漏桶线程开始工作
public void bucketWork() {
while (true) {
Node node = this.queue.poll();
if (Objects.nonNull(node)) {
//唤醒任务线程
LockSupport.unpark(node.thread);
}
//阻塞当前线程,最长不超过nanos纳秒
//休眠flowRateNanosTime
LockSupport.parkNanos(this.flowRateNanosTime);
}
}
//返回一个漏桶
public static BucketLimit build(int capcity, int flowRate, TimeUnit flowRateUnit) {
if (capcity < 0 || flowRate < 0) {
throw new IllegalArgumentException("capcity、flowRate必须大于0!");
}
return new BucketLimit(capcity, flowRate, flowRateUnit);
}
//当前线程加入漏桶,返回false,表示漏桶已满;true:表示被漏桶限流成功,可以继续处理任务
public boolean acquire() {
Thread thread = Thread.currentThread();
Node node = new Node(thread);
if (this.queue.offer(node)) {
LockSupport.park();
return true;
}
return false;
}
//漏桶中存放的元素
class Node {
private Thread thread;
public Node(Thread thread) {
this.thread = thread;
}
}
}
//自定义包含策略
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(15, 15, 60,
TimeUnit.SECONDS, new SynchronousQueue(),
new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
//容量为10,流速为1个/秒,即60/每分钟
BucketLimit bucketLimit = BucketLimit.build(10, 60, TimeUnit.MINUTES);
for (int i = 0; i < 15; i++) {
executor.submit(() -> {
boolean acquire = bucketLimit.acquire();
System.out.println(Thread.currentThread().getName()+ " ," +System.currentTimeMillis() + " " + acquire);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
输出:
From DemoThreadFactory's 订单创建组-Worker-11 ,1599545066963 false
From DemoThreadFactory's 订单创建组-Worker-12 ,1599545066963 false
From DemoThreadFactory's 订单创建组-Worker-13 ,1599545066963 false
From DemoThreadFactory's 订单创建组-Worker-14 ,1599545066964 false
From DemoThreadFactory's 订单创建组-Worker-15 ,1599545066964 false
From DemoThreadFactory's 订单创建组-Worker-3 ,1599545067961 true
From DemoThreadFactory's 订单创建组-Worker-1 ,1599545068962 true
From DemoThreadFactory's 订单创建组-Worker-2 ,1599545069963 true
From DemoThreadFactory's 订单创建组-Worker-4 ,1599545070964 true
From DemoThreadFactory's 订单创建组-Worker-5 ,1599545071965 true
From DemoThreadFactory's 订单创建组-Worker-6 ,1599545072966 true
From DemoThreadFactory's 订单创建组-Worker-7 ,1599545073966 true
From DemoThreadFactory's 订单创建组-Worker-8 ,1599545074967 true
From DemoThreadFactory's 订单创建组-Worker-9 ,1599545075967 true
From DemoThreadFactory's 订单创建组-Worker-10 ,1599545076968 true
使用令牌桶算法来进行限流
令牌桶算法的原理是系统以恒定的速率产生令牌,然后把令牌放到令牌桶中,令牌桶有一个容量,当令牌桶满了的时候,再向其中放令牌,那么多余的令牌会被丢弃;当想要处理一个请求的时候,需要从令牌桶中取出一个令牌,如果此时令牌桶中没有令牌,那么则拒绝该请求。从原理上看,令牌桶算法和漏桶算法是相反的,一个“进水”,一个是“漏水”。这种算法可以应对突发程度的请求,因此比漏桶算法好。
令牌桶算法示意图:
限流工具类RateLimiter
Google开源工具包Guava提供了限流工具类RateLimiter,可以非常方便的控制系统每秒吞吐量.
示例:RateLimiter.create(5)创建QPS为5的限流对象,后面又调用rateLimiter.setRate(10);将速率设为10,输出中分2段,第一段每次输出相隔200毫秒,第二段每次输出相隔100毫秒,可以非常精准的控制系统的QPS。
public class RateLimiterTest {
public static void main(String[] args) {
//permitsPerSecond=1 即QPS=1
RateLimiter rateLimiter = RateLimiter.create(1);
for (int i = 0; i < 10; i++) {
//调用acquire会根据QPS计算需要睡眠多久,返回耗时时间
double acquire = rateLimiter.acquire();
System.out.println(System.currentTimeMillis()+"耗时"+acquire);
}
System.out.println("----------");
//可以随时调整速率,我们将qps调整为10
rateLimiter.setRate(10);
for (int i = 0; i < 10; i++) {
//rateLimiter.acquire();
double acquire = rateLimiter.acquire();
System.out.println(System.currentTimeMillis()+"耗时"+acquire);
}
}
}
输出:
1599545866820耗时0.0
1599545867820耗时0.998552
1599545868819耗时0.997836
1599545869820耗时0.999819
1599545870820耗时0.998723
1599545871819耗时0.999232
1599545872819耗时0.999328
1599545873819耗时1.000024
1599545874819耗时0.99995
1599545875820耗时0.999597
----------
1599545876819耗时0.998575
1599545876920耗时0.099593
1599545877020耗时0.098779
1599545877119耗时0.098661
1599545877220耗时0.099558
1599545877319耗时0.098965
1599545877419耗时0.099139
1599545877520耗时0.099768
1599545877620耗时0.098729
1599545877720耗时0.0986
网友评论