一、什么是高并发
高并发(High Concurrency)是互联网分布式系统架构设计中必须考虑的因素之一。它通常是指,通过设计保证系统能够同时并行处理很多请求。
高并发指标通常有四点:响应时间(RT)、吞吐量(Throughput)、QPS(Queries-per-second)、并发用户数。
注:一个用户一秒内狂点按钮10次,这不是并发。只有10次请求在同一时刻同时向服务器发送请求(比如用CountDownLatch模拟实现),这才是并发。
1.1 响应时间(RT)
响应时间是指系统对请求作出响应的时间,一般来讲,用户能接受的响应时间是小于5秒。
1.2 吞吐量(Throughput)
吞吐量是指系统在单位时间内处理请求的数量。对于无并发的应用系统而言,吞吐量与响应时间成严格的反比关系,实际上此时吞吐量就是响应时间的倒数。
一个系统的吞度量(承压能力)与request对CPU的消耗、外部接口、IO等等紧密关联。
系统吞吐量几个重要参数:QPS(TPS)、并发数、响应时间
- QPS(TPS):每秒钟request/事务 数量
- 并发数: 系统同时处理的request/事务数
- 响应时间: 一般取平均响应时间
理解了上面三个要素的意义之后,就能推算出它们之间的关系:
QPS(TPS)= 并发数/平均响应时间
一个系统吞吐量通常由QPS(TPS)、并发数两个因素决定,每套系统这两个值都有一个相对极限值,在应用场景访问压力下,只要某一项达到系统最高值,系统的吞吐量就上不去了,如果压力继续增大,系统的吞吐量反而会下降,原因是系统超负荷工作,上下文切换、内存等等其它消耗导致系统性能下降。
11.3 QPS(Queries-per-second)
每秒响应请求数
1.4 并发用户数
并发用户数是指系统可以同时承载的正常使用系统功能的用户的数量。与吞吐量相比,并发用户数是一个更直观但也更笼统的性能指标。实际上,并发用户数是一个非常不准确的指标,因为用户不同的使用模式会导致不同用户在单位时间发出不同数量的请求。
二、高并发带来的问题
2.1 服务端
-
某一时间片刻系统流量异常高,系统濒临阀值;
-
服务器CPU,内存爆满,磁盘IO繁忙;
-
系统雪崩:分布式系统中经常会出现某个基础服务不可用造成整个系统不可用的情况,这种现象被称为服务雪崩效应。服务雪崩效应是一种因服务提供者的不可用导致服务调用者的不可用,并将不可用逐渐放大的过程。A为服务提供者,B为A的服务调用者,C和D是B的服务调用者。当A的不可用,引起B的不可用,并将不可用逐渐放大C和D时,服务雪崩就形成了。
2.2 用户角度
使用体验差
三、并发解决方案-四大法定
- 缓存:Redis
- 异步:消息中间件MQ
- 并发编程
- 分布式
四、优化方案——并发编程
4.1阿里笔试题
支付宝登录后,请问你可以从哪些角度优化提升性能?
支付宝-我的页面 业务场景架构图
4.2模拟业务场景
采用SpringBoot创建两个项目,一个项目(remote-server)为模拟远程服务接口项目,提供用户基本信息、用户余额查询等API。另外一个项目(app-server)调用远程服务接口,然后用fastjson组装数据后向APP端提供统一的用户信息API。
4.2.1 remote-server项目
1、端口设置8081
server:
port: 8081
2、导入我们需要用到的fastjson
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
3、创建两个实体类,分别为:用户基本信息
public class UserBaseInfo {
private String userId;
private String userName;
private String sex;
private int age;
// get set 省略
}
用户余额
public class UserMoney {
private String userId;
private String money;
// get set 省略
}
4、分别提供两个API供远程调用,分别为:用户基本信息API
@RestController
@RequestMapping("users-base")
public class UserBaseInfoController {
@GetMapping("/{userId}")
public UserBaseInfo getUserBaseInfo(@PathVariable String userId){
System.out.println("userId:"+userId);
try {
// 模拟业务逻辑,等待2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
UserBaseInfo userBaseInfo = new UserBaseInfo();
userBaseInfo.setUserId("1");
userBaseInfo.setUserName("AlanChen");
userBaseInfo.setSex("男");
userBaseInfo.setAge(18);
return userBaseInfo;
}
}
用户余额API
@RestController
@RequestMapping("users-money")
public class UserMoneyController {
@GetMapping("/{userId}")
public UserMoney getUserMoney(@PathVariable String userId){
System.out.println("userId:"+userId);
try {
// 模拟业务逻辑,等待2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
UserMoney userMoney = new UserMoney();
userMoney.setUserId("1");
userMoney.setMoney("1000");
return userMoney;
}
}
5、用Postman测试接口如下
用户基本信息 用户余额
4.2.2 app-server项目
1、端口设置为8888
server:
port: 8888
2、导入我们需要用到的fastjson
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
3、用Spring的RestTemplate接口实现远程调用服务
/**
* @author Alan Chen
* @description 远程接口
* @date 2020-07-20
*/
@Service
public class RemoteService {
/**
* 获取用户基本信息
* @param userId
* @return
*/
public String getUserInfo(String userId){
RestTemplate restTemplate = new RestTemplate();
long t1 = System.currentTimeMillis();
String result = restTemplate.getForObject("http://127.0.0.1:8081/users-base/{userId}",String.class,userId);
System.out.println("获取用户基本信息时间为:"+(System.currentTimeMillis()-t1));
return result;
}
/**
* 获取用户余额
* @param userId
* @return
*/
public String getUserMoney(String userId){
RestTemplate restTemplate = new RestTemplate();
long t1 = System.currentTimeMillis();
String result = restTemplate.getForObject("http://127.0.0.1:8081/users-money/{userId}",String.class,userId);
System.out.println("获取用户余额时间为:"+(System.currentTimeMillis()-t1));
return result;
}
}
4、调用远程接口,数据组装后返回给客户端
@Service
public class UserService {
@Autowired
RemoteService remoteService;
ExecutorService task = Executors.newFixedThreadPool(10);
/**
* 串行调用远程接口
* @param userId
* @return
*/
public String getUserInfo(String userId){
long t1 = System.currentTimeMillis();
// 分别调用两个接口
String v1 = remoteService.getUserInfo(userId);
JSONObject userInfo = JSONObject.parseObject(v1);
String v2 = remoteService.getUserMoney(userId);
JSONObject moneyInfo = JSONObject.parseObject(v2);
// 结果集进行合并
JSONObject result = new JSONObject();
result.putAll(userInfo);
result.putAll(moneyInfo);
System.out.println("执行总时间为:"+(System.currentTimeMillis()-t1));
return result.toString();
}
/**
* 线程并行调用远程接口(Thread+FutureTask+Callable)
* @param userId
* @return
*/
public String getUserInfoThread(String userId){
// Runnable没有返回值,Callable有返回值
long t1 = System.currentTimeMillis();
Callable<JSONObject> queryUserInfo = new Callable<JSONObject>() {
@Override
public JSONObject call() throws Exception {
String v1 = remoteService.getUserInfo(userId);
JSONObject userInfo = JSONObject.parseObject(v1);
return userInfo;
}
};
Callable<JSONObject> queryUserMoney = new Callable<JSONObject>() {
@Override
public JSONObject call() throws Exception {
String v2 = remoteService.getUserMoney(userId);
JSONObject moneyInfo = JSONObject.parseObject(v2);
return moneyInfo;
}
};
FutureTask <JSONObject> queryUserInfoFutureTask = new FutureTask<>(queryUserInfo);
FutureTask <JSONObject> queryUserMoneyFutureTask = new FutureTask<>(queryUserMoney);
new Thread(queryUserInfoFutureTask).start();
new Thread(queryUserMoneyFutureTask).start();
// 结果集进行合并
JSONObject result = new JSONObject();
try {
result.putAll(queryUserInfoFutureTask.get());// 阻塞方法拿结果
result.putAll(queryUserMoneyFutureTask.get());// 阻塞方法拿结果
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("执行总时间为:"+(System.currentTimeMillis()-t1));
return result.toString();
}
/**
* 线程并行调用远程接口(Thread+Callable+自定义FutureTask)
* @param userId
* @return
*/
public String getUserInfoThreadMyFutureTask(String userId){
// Runnable没有返回值,Callable有返回值
long t1 = System.currentTimeMillis();
Callable<JSONObject> queryUserInfo = new Callable<JSONObject>() {
@Override
public JSONObject call() throws Exception {
String v1 = remoteService.getUserInfo(userId);
JSONObject userInfo = JSONObject.parseObject(v1);
return userInfo;
}
};
Callable<JSONObject> queryUserMoney = new Callable<JSONObject>() {
@Override
public JSONObject call() throws Exception {
String v2 = remoteService.getUserMoney(userId);
JSONObject moneyInfo = JSONObject.parseObject(v2);
return moneyInfo;
}
};
AlanChenFutureTask<JSONObject> queryUserInfoFutureTask = new AlanChenFutureTask<>(queryUserInfo);
AlanChenFutureTask <JSONObject> queryUserMoneyFutureTask = new AlanChenFutureTask<>(queryUserMoney);
new Thread(queryUserInfoFutureTask).start();
new Thread(queryUserMoneyFutureTask).start();
// 结果集进行合并
JSONObject result = new JSONObject();
try {
result.putAll(queryUserInfoFutureTask.get());// 阻塞方法拿结果
result.putAll(queryUserMoneyFutureTask.get());// 阻塞方法拿结果
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("执行总时间为:"+(System.currentTimeMillis()-t1));
return result.toString();
}
/**
* 线程并行调用远程接口(Thread+FutureTask+Callable+ExecutorService)
* @param userId
* @return
*/
public String getUserInfoThreadPool(String userId){
// Runnable没有返回值,Callable有返回值
long t1 = System.currentTimeMillis();
Callable<JSONObject> queryUserInfo = new Callable<JSONObject>() {
@Override
public JSONObject call() throws Exception {
String v1 = remoteService.getUserInfo(userId);
JSONObject userInfo = JSONObject.parseObject(v1);
return userInfo;
}
};
Callable<JSONObject> queryUserMoney = new Callable<JSONObject>() {
@Override
public JSONObject call() throws Exception {
String v2 = remoteService.getUserMoney(userId);
JSONObject moneyInfo = JSONObject.parseObject(v2);
return moneyInfo;
}
};
FutureTask <JSONObject> queryUserInfoFutureTask = new FutureTask<>(queryUserInfo);
FutureTask <JSONObject> queryUserMoneyFutureTask = new FutureTask<>(queryUserMoney);
//用线程池执行
task.submit(queryUserInfoFutureTask);
task.submit(queryUserMoneyFutureTask);
// 结果集进行合并
JSONObject result = new JSONObject();
try {
result.putAll(queryUserInfoFutureTask.get());// 阻塞方法拿结果
result.putAll(queryUserMoneyFutureTask.get());// 阻塞方法拿结果
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("执行总时间为:"+(System.currentTimeMillis()-t1));
return result.toString();
}
/**
* 异步请求(节省tomcat线程池线程) Callable或DeferredResult
* @param userId
* @return
*/
public Callable<String> getUserInfoAsync(@PathVariable String userId){
long t = System.currentTimeMillis();
System.out.println("主线程开始..."+Thread.currentThread());
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
long t1 = System.currentTimeMillis();
System.out.println("子线程开始..."+Thread.currentThread());
String result = getUserInfoThreadPool(userId);
System.out.println("子线程结束..."+Thread.currentThread()+"---->"+(System.currentTimeMillis()-t1));
return result;
}
};
System.out.println("主线程结束..."+Thread.currentThread()+"---->"+(System.currentTimeMillis()-t));
return callable;
}
}
5、提供Controller接口给客户端访问
@RestController
@RequestMapping("users")
public class UserController {
@Autowired
UserService userService;
@GetMapping("/{userId}")
public String getUserInfo(@PathVariable String userId){
return userService.getUserInfo(userId);
}
@GetMapping("/thread/{userId}")
public String getUserInfoThread(@PathVariable String userId){
return userService.getUserInfoThread(userId);
}
@GetMapping("/thread/pool/{userId}")
public String getUserInfoThreadPool(@PathVariable String userId){
return userService.getUserInfoThreadPool(userId);
}
@GetMapping("/thread/my_future_task/{userId}")
public String getUserInfoThreadMyFutureTask(@PathVariable String userId){
return userService.getUserInfoThreadMyFutureTask(userId);
}
@GetMapping("/thread/async/{userId}")
public Callable<String> getUserInfoAsync(@PathVariable String userId){
return userService.getUserInfoAsync(userId);
}
}
4.3 优化方案详解
所有的优化方案都在app-server的UserService实现类里
4.3.1 常规实现方式:串行调用远程接口
/**
* 串行调用远程接口
* @param userId
* @return
*/
public String getUserInfo(String userId){
long t1 = System.currentTimeMillis();
// 分别调用两个接口
String v1 = remoteService.getUserInfo(userId);
JSONObject userInfo = JSONObject.parseObject(v1);
String v2 = remoteService.getUserMoney(userId);
JSONObject moneyInfo = JSONObject.parseObject(v2);
// 结果集进行合并
JSONObject result = new JSONObject();
result.putAll(userInfo);
result.putAll(moneyInfo);
System.out.println("执行总时间为:"+(System.currentTimeMillis()-t1));
return result.toString();
}
执行结果
postman执行结果 控制台打印执行时间
我看可以看到,执行时间大约是4秒钟。这种串行调用远程接口的方式,时间执行为各远程接口调用的时间总和,接口响应慢,不可取。
4.3.2 线程并行调用远程接口:Thread+FutureTask+Callable
/**
* 线程并行调用远程接口(Thread+FutureTask+Callable)
* @param userId
* @return
*/
public String getUserInfoThread(String userId){
// Runnable没有返回值,Callable有返回值
long t1 = System.currentTimeMillis();
Callable<JSONObject> queryUserInfo = new Callable<JSONObject>() {
@Override
public JSONObject call() throws Exception {
String v1 = remoteService.getUserInfo(userId);
JSONObject userInfo = JSONObject.parseObject(v1);
return userInfo;
}
};
Callable<JSONObject> queryUserMoney = new Callable<JSONObject>() {
@Override
public JSONObject call() throws Exception {
String v2 = remoteService.getUserMoney(userId);
JSONObject moneyInfo = JSONObject.parseObject(v2);
return moneyInfo;
}
};
FutureTask <JSONObject> queryUserInfoFutureTask = new FutureTask<>(queryUserInfo);
FutureTask <JSONObject> queryUserMoneyFutureTask = new FutureTask<>(queryUserMoney);
new Thread(queryUserInfoFutureTask).start();
new Thread(queryUserMoneyFutureTask).start();
// 结果集进行合并
JSONObject result = new JSONObject();
try {
result.putAll(queryUserInfoFutureTask.get());// 阻塞方法拿结果
result.putAll(queryUserMoneyFutureTask.get());// 阻塞方法拿结果
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("执行总时间为:"+(System.currentTimeMillis()-t1));
return result.toString();
}
控制台打印执行时间
我看可以看到,执行时间大约是2秒钟,执行时间减少了一半。利用Thread+FutureTask+Callable,可以同时发送多个远程调用请求,再用FutureTask的get()方法阻塞拿到各异步请求的结果集,再进行合并。这种方案,接口执行的总时间取决于各异步远程接口调用的最长的那个时间。
线程并行调用接口4.3.3 线程并行调用远程接口:Thread+Callable+自定义FutureTask
FutureTask除了用JDK自带的接口外,我们自己同样也可以实现一个简单的FutureTask。
/**
* @author Alan Chen
* @description 自定义FutureTask
* @date 2020-07-20
*/
public class AlanChenFutureTask<V> implements Runnable, Future<V> {
Callable<V> callable;
V result;
public AlanChenFutureTask(Callable<V> callable){
this.callable = callable;
}
@Override
public void run() {
try {
result = callable.call();
synchronized (this){
this.notifyAll();
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public V get() throws InterruptedException, ExecutionException {
if(result!=null){
return result;
}
synchronized (this){
//阻塞等待获取返回值
this.wait();
}
return result;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return false;
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
}
/**
* 线程并行调用远程接口(Thread+Callable+自定义FutureTask)
* @param userId
* @return
*/
public String getUserInfoThreadMyFutureTask(String userId){
// Runnable没有返回值,Callable有返回值
long t1 = System.currentTimeMillis();
Callable<JSONObject> queryUserInfo = new Callable<JSONObject>() {
@Override
public JSONObject call() throws Exception {
String v1 = remoteService.getUserInfo(userId);
JSONObject userInfo = JSONObject.parseObject(v1);
return userInfo;
}
};
Callable<JSONObject> queryUserMoney = new Callable<JSONObject>() {
@Override
public JSONObject call() throws Exception {
String v2 = remoteService.getUserMoney(userId);
JSONObject moneyInfo = JSONObject.parseObject(v2);
return moneyInfo;
}
};
AlanChenFutureTask<JSONObject> queryUserInfoFutureTask = new AlanChenFutureTask<>(queryUserInfo);
AlanChenFutureTask <JSONObject> queryUserMoneyFutureTask = new AlanChenFutureTask<>(queryUserMoney);
new Thread(queryUserInfoFutureTask).start();
new Thread(queryUserMoneyFutureTask).start();
// 结果集进行合并
JSONObject result = new JSONObject();
try {
result.putAll(queryUserInfoFutureTask.get());// 阻塞方法拿结果
result.putAll(queryUserMoneyFutureTask.get());// 阻塞方法拿结果
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("执行总时间为:"+(System.currentTimeMillis()-t1));
return result.toString();
}
控制台打印执行时间
4.3.4 线程池并行调用远程接口:Thread+FutureTask+Callable+ExecutorService
我们可以进一步进行优化,将线程换成线程池
/**
* 线程池并行调用远程接口(Thread+FutureTask+Callable+ExecutorService)
* @param userId
* @return
*/
public String getUserInfoThreadPool(String userId){
// Runnable没有返回值,Callable有返回值
long t1 = System.currentTimeMillis();
Callable<JSONObject> queryUserInfo = new Callable<JSONObject>() {
@Override
public JSONObject call() throws Exception {
String v1 = remoteService.getUserInfo(userId);
JSONObject userInfo = JSONObject.parseObject(v1);
return userInfo;
}
};
Callable<JSONObject> queryUserMoney = new Callable<JSONObject>() {
@Override
public JSONObject call() throws Exception {
String v2 = remoteService.getUserMoney(userId);
JSONObject moneyInfo = JSONObject.parseObject(v2);
return moneyInfo;
}
};
FutureTask <JSONObject> queryUserInfoFutureTask = new FutureTask<>(queryUserInfo);
FutureTask <JSONObject> queryUserMoneyFutureTask = new FutureTask<>(queryUserMoney);
//用线程池执行
task.submit(queryUserInfoFutureTask);
task.submit(queryUserMoneyFutureTask);
// 结果集进行合并
JSONObject result = new JSONObject();
try {
result.putAll(queryUserInfoFutureTask.get());// 阻塞方法拿结果
result.putAll(queryUserMoneyFutureTask.get());// 阻塞方法拿结果
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("执行总时间为:"+(System.currentTimeMillis()-t1));
return result.toString();
}
控制台打印执行时间
4.3.5 异步请求(节省tomcat线程池线程) :Callable或DeferredResult
我们知道tomcat的请求连接数是有限的,如果接口响应时间过长,会占用和消耗tomcat的连接数。如果tomcat主线程接收到请求后,立即开启一个子线程异步去执行业务逻辑,然后tomcat主线程快速返回释放连接,等有结果后子线程再返回给前端客户端,这样就可以大大节省tomcat的连接数,提高tomcat连接利用率。具体实现如下:
异步请求架构图/**
* 异步请求(节省tomcat线程池线程) Callable或DeferredResult
* @param userId
* @return
*/
public Callable<String> getUserInfoAsync(@PathVariable String userId){
long t = System.currentTimeMillis();
System.out.println("主线程开始..."+Thread.currentThread());
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
long t1 = System.currentTimeMillis();
System.out.println("子线程开始..."+Thread.currentThread());
String result = getUserInfoThreadPool(userId);
System.out.println("子线程结束..."+Thread.currentThread()+"---->"+(System.currentTimeMillis()-t1));
return result;
}
};
System.out.println("主线程结束..."+Thread.currentThread()+"---->"+(System.currentTimeMillis()-t));
return callable;
}
postman执行测试
控制台打印结果
我们从打印结果中可以看到,tomcat主线程几乎只用了0毫秒就快速返回了,等子线程取到结果后再返回给客户端,消耗时间大约为2秒。
资料来源:享学堂 网络课程
网友评论