高并发≠多线程,高并发是指系统短时间内遇到大量操作请求的情况,而多线程是指一种处理方式。
1. 实现高并发需要考虑的因素:
* 系统的架构设计,如何在架构层面上减少不必要的处理(网络请求、数据库操作等)
* 网路拓扑优化减少网路请求时间,如何设计拓扑结构、分布式如何实现
* 系统代码级别的优化,使用什么设计模式进行工作?哪些类需要单例,哪些需要减少new操作
* 提高代码层面的运行效率,如何选取合适的数据结构进行数据存取?如何设计算法
* 任务执行方式级别的同异步操作,该在什么地方使用哪种方式
* JVM调优,是以server模式还是以clien模式运行,如何设置Heap、Stack、Eden的大小,如何选择GC策略、控制Full GC的频率
* 数据库优化减少查询修改时间。数据库的选取?数据库引擎的选取?表结构的设计?数据库索引、触发器等设计、是否使用读写分离、还是使用数据库仓库
* 缓存数据库的使用,如何选择缓存数据库 设计缓存机制
* 数据通信问题,如何选择通信方式TCP还是UDP,长短连接等
* 操作系统选取
* 硬件配置等
* .....
2. 多线程的难点:
多线程的难点在于对共享数据的读写顺序的问题,保证多个线程对同一数据的操作不会产生混乱。在多个线程同事访问共享数据的时候,由于线程的读取/写入的时机不对而导致数据出错,进而影响业务。
3. 线程的简单引用:
启用线程通常是通过Thread或其子类通过调用start()方法启动。常见的线程有两种,实现Runnable和Thread。而继承Thread或使用TimerTask其底层依旧实现了Runnable接口。考虑到java的单继承的限制,所以在开发过程中大部分情况在使用线程的时候是通过实现Runnabel接口或者Runnbel匿名类来实现的
public class ThreadTest {
public void testLambda(){
new Thread(()->System.out.print("testLambda")).start();
}
public void testRunnableWithAnonymousThread(){ //匿名Thread类开启线程
new Thread(){
@Override
public void run() {
System.out.println("testRunnableWithAnonymousThread");
}
}.start();
}
public void testRunnableWithAnonymousRunnable(){ //匿名Runnable类开启线程
Thread thread=new Thread(new Runnable() {
@Override
public void run() {
System.out.println("testRunnableWithAnonymousRunnable");
}
});
thread.start();
}
public void testRunnable(){ //实现Runable接口
MyRunnable runabel=new MyRunnable();
Thread thread=new Thread(runabel);
}
public void testMyThread(){ //继承Thread
MyThread thread=new MyThread();
thread.setName("MyThread");
thread.start();
}
}
class MyRunnable implements Runnable{
@Override
public void run() {
System.out.println("MyRunnable");
}
}
class MyThread extends Thread{
@Override
public void run() {
System.out.println("MyThread");
}
}
注意,直接调用run()方法的方式执行线程体并未开启新线程,只是在main方法中调用了一个普通方法而已。而使用start()方法则会开启一个新的线程执行。两者的区别主要表现在前者是阻塞式的,而后者为非阻塞式。
3. 线程的停止
遵循的规则是让线程自己停止自己: 两种方法:1、线程任务执行完成,顺利结束退出。2、设置终止标志位,在循环的时候进行终止标志位检测,如果设置为终止状态则return结束线程。
boolean isStop = false;//终止标志位 当需要结束线程时,更改为true
public void testInterrupt(){
Thread th = new Thread(new Runnable() {
@Override
public void run() {
while(true){
if(isStop){//当需要结束线程的时候终止线程
//doSomething 进行一些收尾工作
return;
}
System.out.println(Thread.currentThread().getName());
}
}
});
4. 线程锁
- 锁的分类
- 共享锁/排他锁
是同一时刻是否允许多个线程同时持有该锁的角度来划分。共享锁主要指对数据库读操作的读锁,在读写资源的时候如果没有线程持有写锁和请求写锁,则允许多个线程持有读锁。
- 悲观锁/乐观锁
主要用于数据库的操作中,比较少见。
- 可重入锁/不可重入锁
同一个线程已持有锁的前提下来区分。可重入锁,如果该线程已经获取到锁并且未释放的情况下允许再次访问临界资源。
- 公平锁/不公平锁
获取锁的顺序角度来区分。公平锁,所有等待的线程按照请求锁的先后循序分别获取锁。非公平锁测试随机的获取锁或者其他的策略获取锁。ReentrantLock通过构造方法来选择实现公平锁还是非公平锁。
- 自旋锁/非自旋锁
从线程等待的处理机制来区分的。自旋锁在进行锁请求的时候,不进行wait挂起,不释放cpu资源,执行while空循环。适用于等待所时间较短的情况。
常用锁的使用方法
- synchronized锁:
每一个对象实例在对象头中都会有monitor record列表记录持有该锁的线程,底层通过对列表的查询来判断是否有线程访问临界资源。
- reentranLock
synchronized加锁机制基于JVM层面的加锁,而reentranLock是基于jdk层面的加锁机制。reentranLock可重入所,提供的构造方法可以指定公平锁或非公平锁。
- 读写锁
- 自旋锁
- 信号量实现锁效果
.5. 线程间的通信与协作
- wait和notify/notifyAll
- await和signal/signalAll
- sleep/yield/join
yield使当前线程从运行状态转变为就绪状态,由线程调度重新选择就绪状态的线程分配cpu资源。对于join方法,作用是暂停当前线程,等待被调用线程指向结束后再继续执行。调用join不会释放掉锁,如果调用线程也需要该锁就会导致死锁。join不会启动线程,必须先调用start,底层是自旋锁
- CyclicBarrier 栅栏
当指定数量的线程执行到指定位置的时候,才能触发后续动作的进行,其最终的目的是让所有的线程同时开始后续的工作。
- CountDownLatch 闭锁
CyclicBarrier与CountDownLatch都属于线程计数器,但是CountDownLatch是线程等待其他线程执行到某一个位置时开始执行。
- 6、Semaphore 信号量
主要控制访问临界资源的线程数量,只有当有线程释放资源之后,才会有新的线程获取到资源。即控制了同一时间访问临界区资源的线程数量。当Semaphore(1)设置为1的时候,此时可以当做锁来使用。
6. JDK原生线程池
线程的创建、切换与销毁都会耗费大量的系统资源,为了复用创建好的线程,减少频繁的创建线程的次数,提高线程利用率可以应用线程池技术。优势如下:
* 保持一定数量的线程,减少了线程频繁创建和销毁资源浪费。
* 使用线程的时候直接由线程池中取出线程,省去了创建线程的时间,侧面提高了系统的响应时间。
* 减少人为的不合理创建线程。
- newCachedThreadPool
/**
* Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.
* 创建一个线程池,需要的时候会创建新的线程,如果有可用的线程则会复用以前已经创建好的线程。
* These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.
* 这些线程池通常情况下可以提升哪些短期异步任务的性能
* Calls to {@code execute} will reuse previously constructed threads if available.
* 如果以创建的线程状态可用的话,调用execute可以复用他们
* If no existing thread is available, a new thread will be created and added to the pool.
* 如果不存在可用状态的线程,那么将会创建一个新线程同时会把该线程添加到线程池中
* Threads that have not been used for sixty seconds are terminated and removed from the cache.
*那些超过60s没用的线程将会被销毁同时从缓存中移除
* Thus, a pool that remains idle for long enough will not consume any resources.
*因此长时间空闲的线程池不会消耗任何资源
* Note that pools with similar properties but different details (for example, timeout parameters) may be created using {@link ThreadPoolExecutor} constructors.
*可以使用ThreadPoolExecutor创建性质相似但实现细节不同的线程池
* @return the newly created thread pool
*/
- newFixedThreadPool
/**
* Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
*创建一个可重用、固定数量线程的线程池
* At any point, at most {@code nThreads} threads will be active processing tasks.
*任何时间最多只有 nThreads 个线程被激活来执行任务
* If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available.
* 当无可用空闲线程的时候,如果有新任务被提交,这些新任务将会一直等待直至有可用线程来执行。
* If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.
*如果任何线程正常关闭之前在执行过程中因失败而提前终止,那么如果有未被执行的后续任务,则会创建新的线程来继续执行。
* The threads in the pool will exist until it is explicitly {@link ExecutorService#shutdown shutdown}.
* 线程池中的所有线程在明确掉用shutdown之后将会退出
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
- 3.newScheduledThreadPool
/**
* Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.
* 创建一个线程池,该线程在延迟指定时间之后可以周期性的执行线程体
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool 注意返回值类型是ScheduledExecutorService,不要使用ExecutorService来接收,否则找不到schedule执行方法
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
- newSingleThreadExector
/**
* Creates an Executor that uses a single worker thread operating off an unbounded queue.
*创建一个Executor,使用一个线程来工作,该线程存储在LinkedBlockingQueue中
* (Note however that if this single thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.)
*注,如果任何线程正常关闭之前在执行过程中因失败而提前终止,那么如果有未被执行的后续任务,则会创建新的线程来继续执行。
* Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time.
* 任务是按顺序执行的,任何时间都只有一个线程来执行任务
* Unlike the otherwise equivalent {@code newFixedThreadPool(1)} the returned executor is guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
- newSingleThreadScheduledExecutor
/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically.
* (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newScheduledThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
* @return the newly created scheduled executor
*/
线程池的使用:
package com.essential.controller;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Paranoid
* @description 线程池
* @date 2018/3/2
* @company 美衫
**/
public class testPool {
private static ThreadFactory myFactory=new MyThreadFactory();
private static void testSingleThreadFactory(){
//一个一个的依次执行
doHandle(Executors.newSingleThreadExecutor(myFactory));
}
private static void testFixThreadPool(){
//两个两个的执行
doHandle(Executors.newFixedThreadPool(2,myFactory));
}
private static void testCachedThreadPool(){
//10个一起一次性的执行完
doHandle(Executors.newCachedThreadPool(myFactory));
}
private static void testScheduledThreadPool(){
//定时周期执行
Executors.newScheduledThreadPool(1,myFactory).scheduleAtFixedRate(runnable,500,2000, TimeUnit.MILLISECONDS);
}
private static Runnable runnable=()->{
sleep(1000);
System.out.println(Thread.currentThread().getName()+" work!!!");
};
private static void doHandle(ExecutorService executorService){
for (int i = 0; i < 10; i++) {
executorService.execute(runnable);
}
executorService.shutdown();
}
private static void sleep(int time){
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
// testCachedThreadPool();
// testSingleThreadFactory();
// testFixThreadPool();
testScheduledThreadPool();
}
}
class MyThreadFactory implements ThreadFactory{
private static final AtomicInteger poolNumber=new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber=new AtomicInteger(1);
private final String namePrefix;
public MyThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "☆☆☆--" + poolNumber.getAndIncrement() + "-****-";
}
@Override
public Thread newThread(Runnable r) {
Thread t=new Thread(group,r,namePrefix+threadNumber.getAndIncrement(),0);
if(t.isDaemon()){
t.setDaemon(false);
}
if(t.getPriority()!=Thread.NORM_PRIORITY){
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
8.常用的线程模型
*1. Future模型
直接从子线程得到处理结果。
*2. fork&join 模型
把一个大任务逐级拆分为多个子任务,然后分别在子线程中执行,当每次子线程执行结形式束后逐级回溯,返回结果并汇总。
*3.actor消息模型
以消息的形式进行线程间数据的传输,避免全局变量的使用,进而避免数据同步错误的隐患。
- 4.生产者消费者模型
核心是使用一个缓存来保存任务
- 5.master-worker模型
master-worker模型类似于任务分发策略,开启一个master线程接收任务,然后在master中根据任务的具体情况进行分发给其它worker子线程,然后由子线程处理任务。如需返回结果,则worker处理结束之后把处理结果返回给master。
网友评论