1、线程安全与锁
线程安全的本质,在于 存在了共享的可变状态 status, 在多线程共同操作状态变量时,当计算的正确性依赖于运行时相关的时序(比如i++,先取出i,再+1,然后赋值),就会出现竞争(竞态)
无状态对象永远是线程安全的.
所以线程安全 三步骤:a、无共享状态;b、共享状态不可变;c、共享状态同步
要做到状态同步,就必须要通过锁 or volatile;本章节先讨论锁,被锁保护的代码块可以认为是一间屋子,只有唯一的锁可以打开,即同时只能只有一个线程进入。其他进入的线程必须拿到这把锁。这样就保证了数据的安全性。
I、内部锁 synchronized
三种用法:a、锁住变量 (这个变量就是锁);b、锁住方法 (拥有这个方法的对象是锁) ;c、锁住静态方法 (这个类的Class对象是锁)
每个java对象都自带隐式锁(也称为监视器锁)
可重入 - 每个线程可以对同一个锁多次获取,比如 下面这个例子 a 获取一次,b还要再获取,获取时计数\退出时减少(像不像早期的垃圾回收)
public synchronized a(){
b();
}
public synchronized b(){
}
II、显式锁 ReentrantLock
ReentrantLock + Condition 替换了原先的 synchronized + object
Condition 实现了 资源的 竞争与释放 ; Lock 实现了 代码的隔离
Lock lock = new ReentrantLock();
Waits = lock.newConidition();
Lock.lock();
Try{
Waits.await() – 等同于 原先的锁wait
}finnaly{
Lock.unlock
}
显示锁有什么好处?
a、tryLock( xxx ) -- 可以设置超时,无法获取的时候,这样可以避免死锁
b、lockInterruptibly(); -- 可以响应中断的锁
c、性能提升? -- java6 以后的版本已经没有性能优势了
d、读写分离
显式锁里面,有专门的 ReentrantReadWriteLock 读写分离锁。好吧,这里又看到类似数据库的设计思路了。
读锁可以多线程同时进入,是共享的。写锁只能单线程进入,是排他的
e、公平
初始化 ReentrantLock 时,可以指定是否是公平的。何解? 因为线程调度唤醒线程时,是随机的,有可能先来的线程未获取到,这就不公平了。指定公平性,可以按等待顺序获取锁
有啥缺点?
每次都需要显式手工finnaly 关闭
内部锁 or 显式锁?
如何选择两者。jdk 推荐 默认使用 内部锁,因为简单、方便;如果有特殊需求的,如上显示锁描述的好处。
III、死锁
a、出现死锁的条件:
互斥 - 资源只能被一个线程占有
请求与保持 - 占住的不想放,想要的非要
不可剥夺 - 占住的不能抢走
循环等待 - 只要有人放手就解开了,就是没人放手
b、如果预防死锁?
排好队 - 如果非要获取多个锁,那所有的获取锁方式都要 按照一定顺序
中断(抢夺) - 显式锁可以中断某个锁
超时(放弃) - 显式锁可以设置超时放弃锁
c、如何检测死锁?
打印堆栈
win( Ctrl + Break ) linux (kill -3)
Jmap+jhat+jstatck
监视工具
JConsole | Visual VM
IV、使用锁的建议
a、如何选择内部锁和显式锁,默认内部锁,特殊情况比如定时、中断使用显式。(参见上文描述)
b、尽量减少同步的范围、 不要在 方法上加锁,而是在方法内部
c、多个锁按顺序获取,可以把获取的地方统一隔离,放在一起,通过对象hash值identityHashCode安排顺序
2、线程安全与对象(非阻塞式实现)
同步是 保护 对象(数据) 的一种方式,除了用锁这种同步方式,还有其他效率更高的方式。
I、volatile - 共享变量
系统会监视这个变量,对它的操作不会和其他内存操作重排序,也就是说对volatile 变量的操作当成是原子操作;所有的修改其他线程都可见,也就不需要加锁同步了。
volatile int a;
对volatile的使用有一些限制,主要是自增类的操作不能使用(比如i++)、只能单独使用、不能组合使用
II、CAS 与原子类型 -- CAS = 比较并设置CompareAndSet
实现思路很简单,就是当修改的值和 现在内存的值比较,如果一致了,就修改生效;如果不一致就一直循环检查,直到生效为止。
public final long getAndDecrement() {
while (true) {
long current = get();
long next = current - 1;
if (compareAndSet(current, next))
return current;
}
}
volatile和 CAS 都是CPU层面提供的底层技术,不是语言本身提供的技术,其中CAS的实现还需要调用 JNI 本地化实现。
volatile + CAS 构成了java的原子类型库 java\util\concurrent\atomic
a、基本类型的原子包装器 AtomicInteger | AtomicLong | AtomicBoolean
b、数组类型 AtomicIntegerArray ...
c、对象类型 AtomicReference (包装一个类) | 域更新器 AtomicReferenceFieldUpdater(利用反射,更细粒度的操作)
-- java自身的cocurrent类库也有使用原子类型的,比如ConcurrentLinkedQueue 就使用了 域更新器实现 具体链表 字段的更新
该如何选择 锁 or 原子类型+volatile?
一般需要用在控制变量、标记值等等应用场景应该选用 原子类型这些非阻塞的同步方式;如果有大段代码复杂逻辑需要保护,则采用同步方式。
III、封闭线程Ad-hoc 与ThreadLocal
把数据封闭在线程内部,就不存在共享状态的问题,也就是线程安全了,最典型的应用就是 ThreadLocal,网上对ThreadLocal很多理解都有误区
a、每个线程绑定一个 ThreadLocal.ThreadLocalMap 对象,线程内部的局部变量。
b、存放对象 set(T) 实际上就是 把 ThreadLocal 对象作为key ,T 作为value put 到 线程的map对象中
c、获取对象 get,实际上就是把 this自身的对象 作为key 从 线程map中取出数据
几个关键问题
a、多个线程的 ThreadLocal 对象虽然是同一个,但Map 是线程自己的,所以在调用 set get 时,获取的是自己Map存储的 value,实现了数据封闭
b、设置的value 必须是新的对象或者基本类型,否则如果设置的 是同一个 对象的引用的话,取出来还是同一个引用,就达到不到隔离的效果。
c、如果需要 在所有线程共享一个初始数据,可以 继承ThreadLocal,扩展实现 initialValue 方法,这样所有线程都能看到初始化数据
private static ThreadLocal> threadLog = new ThreadLocal>(){
protected List initialValue(){
return new ArrayList();
}
};
总结:ThreadLocal本质上是一个 空间换时间的 实现,通过在多个线程中 存储不同的拷贝,实现了线程安全
3、线程生命周期
I、五种状态
新建 - 两种方法创建多线程:继承java.lang.Thread,覆盖run方法 | 实现java.lang.Runnable接口,实现run方法
就绪 - Thread.start - 对应run 方法
运行 - 线程抢到CPU开始运行
阻塞 - 遇到 锁、休眠、IO等情况线程处于阻塞态 (等待、休眠)
终止 - 正常结束,被中断 Thread.interrrupt
II、改变状态
a、sleep - 当前线程休眠
b、yield - 当前线程让渡CPU给同级别的线程
c、wait - 阻塞等待其他资源的锁
d、join - 傻傻等其他线程结束后再结束
sleep vs wait
sleep 只是线程休眠,并不会释放占有的资源,wait会释放所有的资源;wait 是资源上的方法,而休眠是线程方法
sleep vs yield
sleep 线程切入休眠状态,等待一定时间才唤醒抢占CPU;yield 只是暂时让渡CPU
在构造函数中启动线程是危险的,因为可能构造函数还未构造完成,如果回调或者其他就会出现异常。避免在构造函数启动线程。
III、线程优先级
调度器根据线程优先级作为参考,决定多长时间调度该线程 ;Thread类中定义了默认3个级别 1 MIN ,5 NORMAL ,10 MAX
4、线程间通讯
严格来说 join 方法也算是一种线程间通讯,一个线程等待另一个线程处理结束,才继续。
线程间通讯,不像进程间通讯Socket、共享内存、系统级别信号量,线程间因为共享资源,所以通讯方式大都是通过共享的资源。
传统方式
I、经典通讯方式 wait/notify/notifyAll
使用wait/notify的正确姿势
线程A
synchronized(list){
while(list.size == 0)
list.wait();
}
线程B
synchronized(list){
list.add(xxx);
list.notifyAll();
}
a、wait 方法一般都要写在synchronized 的循环里
b、synchronized 锁住的对象和 wait的对象一般是同一个对象
c、使用notifyAll 而不是 notify
wait 有无参数? 有参数只是多了一个唤醒条件,唤醒后和wait一样继续抢占资源
notify 和 notifyAll ? notifyAll唤醒所有线程,只有一个线程抢到,notify只随机唤醒一个线程
线程屏障
II、闭锁 CountDownLatch 和 关卡 CyclicBarrier
a、从功能上看,两者很类似,都是用作阻塞的线程控制。
b、从语义上看,略有区别,闭锁等待的是事件、关卡等待的是线程,两者都是在await等待;闭锁通过 countDown 这个事件、触发等待的线程,而关卡需要互相等待、等所有线程处理完成后、触发等待的线程。
c、更大的区别则是:CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。
III、信号量 Semaphore
和操作系统的信号量概念一样,允许一个资源被被操作使用的次数
Semaphore s = new Semaphore(100);
申请 s.acquire()
释放 s.release()
以上三种使用方法,wait/闭锁/信号量 某种程度上都属于资源申请;而闭锁,则是单纯的线程等待控制。下面介绍另一种方式,数据交换.
IV、Exchanger 数据交换
只能两个线程进行数据交换
Exchange exc = new Exchange();
A线程执行 IN = exc.exchange(OUT) ,此时A线程阻塞
B线程执行 OUT = exc.exchange(IN) ,此时A、线程解除阻塞,双方数据发生了交换
5、线程池
线程池一般有两种实现方式:一种是 任务池,各个线程循环去池里取任务;一种是 线程池,提交任务后,从池里取出空闲线程处理。
I、ThreadPoolExecutor
任务队列BlockingQueue - 使用了阻塞队列作为提交的任务,接受继承 Runnable 的任务
线程池HashSet - 最少corePoolSize 最大 maximumPoolSize 空闲时间 keepAliveTime ;这三个参数决定 线程数目在 最小和最大之间弹性处理(当 > 最少,且 > 空闲时间,释放一部分空闲线程)
线程工厂ThreadFactory - 提供创建线程的方法 (还是为了扩展性, 给应用更多的创建线程前后的处理工作),Executors框架提供了一些线程工厂的实现
工作线程Worker - 实现了Runnable ,绑定了一个线程(ThreadFactory 创建线程时,传入),执行提交的任务外,还有beforeExecute afterExecute 的钩子函数
队列管理
ThreadPoolExecutor 的队列管理是开放出来由外界提供 BlockingQueue的实现,具体参见下面 并发框架的 Executors 提供的一些默认队列
饱和策略
当有限队列充满后(即无法把任务提交到 BlockingQueue中去),就要有饱和策略。ThreadPoolExecutor的饱和策略,可以通过调用setRejectedExecutionHandler来修改
中止(abort)
遗弃(discard)
遗弃最旧的(discard-oldest)
调用者运行(caller-runs)
II、Executors/ExecutorServie(Executor) 框架
a、Executor 框架 -- 其实就是一个 Command模式
接口非常简单,把一个Runnable线程作为参数
void execute(Runnable command);
一般情况下,不需要自己定义一个线程,而是提交任务给Executor框架
b、ExecutorService 框架 -- Active Object 模式(主动对象模式)
调用线程 方法的调用 和被调用线程的 方法执行 进行解耦
被调用线程内部绑定一个 线程对象,即ThredPoolExecutor 的Worker,自己管理线程的状态
获取被调用线程的引用
ExecutorService service = Executors.newFixedThreadPool(NTHREADS);
提交
service.execute(task);
service.submit(task);
还可以管理executor的生命周期
service.shutdown();
c、队列
使用无限队列 LinkedBlockingQueue (对于ThreadPoolExecutor 来说这些队列都是有限的,可以把指定容量的队列给他,但Executors 默认的都是无限未指定容量的队列)
newFixedThreadPool -- 定长的线程池
newSingleThreadExecutor -- 单线程化的 executor
使用同步移交队列 SynchronousQueue (没有内存的队列,直接把队列从 提交者 给 消费者,据说等待性能更好?少了数据结构的操作开销?)
newCachedThreadPool -- 可缓存的线程池
周期线程池
newScheduledThreadPool - 定长线程池,支持定时和周期的任务执行
Timer和TimerTask 的改进版本
原先的Timer存在一些缺陷,Timer调度是基于绝对时间,对系统时间改变很敏感。(ScheduledThreadPoolExecutor 支持相对时间);所有timer任务在一个线程执行,如果耗时操作导致其他任务延迟,会出现一些不可预料的错误Timer中如果timetask执行失败,整个timer的任务都失败
// 定时器
public class Timer {
public static final int TIME_DEFAULT = 15 * 60;
protected ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
protected int period;
protected Runnable runnable;
public Timer(Runnable runnable){
this(TIME_DEFAULT, runnable);
}
public Timer(int period, Runnable runnable){
this.period = period;
this.runnable = runnable;
}
public void start(){
service.scheduleAtFixedRate(runnable, 0, period, TimeUnit.SECONDS);
}
public void destroy(){
service.shutdown();
}
// 绝对定时器,从下一个粒度整点开始
public static class AbsoluteTimer extends Timer{
public AbsoluteTimer(Runnable runnable) {
super(runnable);
}
public AbsoluteTimer(int period, Runnable runnable) {
super(period, runnable);
}
@Override
public void start(){
service.scheduleAtFixedRate(runnable, Utils.getNextWholeTime(new Date(), period), period, TimeUnit.SECONDS);
}
}
}
d、线程工厂
DefaultThreadFactory
PrivilegedThreadFactory
III、任务管理
任务的执行,很容易操作,直接Thread 的start,或者提交到 任务执行框架 Executor执行
任务取消和停止,如何控制? 怎么知道任务执行的结果? 怎么隔离一个耗时任务 ?
取消的方法一:通过共享一个变量标志来进行任务的协调。- cancellation requested
中断的方法二:通过interrupt 中断方法,取消一个线程的执行
获取返回值
通过 局部变量,或者返回函数 -- 不可行,因为主线程和子线程并没有先后顺序的约束,受限于CPU、IO等复杂因素,结果是未知的
轮询 -- 可行,采用类似阻塞方式,不停的去检查是否完成,但多做了很多无用功,占用了CPU
回调 -- 正解
Future/FutureTask + Callable - 增强版的任务执行
Callable 提供 Runnable的增强功能 可以支持返回值
非阻塞的模型:Promise,Future 和 Callback 典型架构,Future 是异步执行方式
下面是一个简版的 任务执行框架,可以把任务提交给Executors框架,通过 Future 来
// 执行任务
static interface Function{
Out action() throws Exception ;
}
// 提交任务框架, 任何一次性执行的, 有可能延迟的都可以使用
private static Out submit(final Function in){
final ExecutorService exec = Executors.newSingleThreadExecutor();
Future future = exec.submit(new Callable(){
@Override
public Out call() throws Exception {
return in.action();
}
});
Out result = null;
try {
result = future.get(STREAM_TIMEOUT_PER_TIME, TimeUnit.SECONDS);
} catch (Exception e) {
log.error(String.format("Task Thread %d Get submit error : %s ", Thread.currentThread().getId(), e.getMessage()), e.getCause());
}finally{
exec.shutdown();
}
return result;
}
Fork/Join 框架?
IV、CompletionService:Executor 和 BlockingQueue的组合
整合了 Executor 和 BlockingQueue的功能。调用时,把Callable任务提交给他,同时像quue一样取出take和poll方法,返回的是一个Future
CompletionService 和 直接调用ExecutorService 区别在于,如果ExecutorService 启动多个任务,获取返回值时,多个Future需要管理起来,现在CompletionService直接用BlockingQueque对Future进行管理。
V、其他
线程增加 UncaughtExceptionHandler 捕获未知的线程异常,通知框架和应用
6、其他
JVM线程关闭 时,会有一个钩子函数
Runtime.getRuntime().addShutdownHook(new Thread(){
public void run(){
// 在这里做一些清理的工作。
}
});
守护线程 - 精灵线程 (dameon)
后台服务线程,和JVM线程同在
用户线程和守护线程区别不大,用户线程随时可以退出,守护线程一直到虚拟机的生命周期。 没有用户线程,守护线程和JVM一起退出。
一个完整的实际使用案例 - 这个可以用 JDK7的 Fork/Join 改写
// 下载调度线程
public class DownLoadTask {
public int MAX_THREAD ;
public int MAX_RECORD_PER_THREAD ;
public static final int DEFAULT_TIME_OUT = 120000;
private FileWriter writer;
private List list;
private ExecutorService downPool ;
private PriorityBlockingQueue container;
private CyclicBarrier oneTimes;
private AtomicInteger interrupt = new AtomicInteger(0);
private AtomicInteger timeout = new AtomicInteger(0);
private static Logger log = Logger.getLogger(DownLoadTask.class);
public DownLoadTask(List list, FileWriter writer){
this.list = list;
this.writer = writer;
}
public void setMAX_THREAD(int MAX_THREAD) {
this.MAX_THREAD = MAX_THREAD;
}
public void setMAX_RECORD_PER_THREAD(int MAX_RECORD_PER_THREAD) {
this.MAX_RECORD_PER_THREAD = MAX_RECORD_PER_THREAD;
}
public void run(){
int ONE_SORT_TOTAL_NUM = MAX_THREAD * MAX_RECORD_PER_THREAD;
downPool = Executors.newFixedThreadPool(MAX_THREAD);
container = new PriorityBlockingQueue(ONE_SORT_TOTAL_NUM);
int recordsSize = list.size();
int repeatCount = recordsSize / ONE_SORT_TOTAL_NUM;
int lastRecords = recordsSize % ONE_SORT_TOTAL_NUM;
int lastRecordsWorkerNum = (int) ceil((double)lastRecords / (double) MAX_RECORD_PER_THREAD);
int lastRecordsLastWorkNum = lastRecords % MAX_RECORD_PER_THREAD;
log.info("repeatCount size:"+ repeatCount);
log.info("lastRecords size:"+ lastRecords);
log.info("lastRecordsWorkerNum:"+ lastRecordsWorkerNum);
log.info("lastRecordsLastWorkNum:"+ lastRecordsLastWorkNum);
if(repeatCount==0){
oneTimes = new CyclicBarrier(lastRecordsWorkerNum + 1, new Combine());
}else if(repeatCount > 0){
oneTimes = new CyclicBarrier(MAX_THREAD + 1, new Combine());
}
int i = 0;
int count = 0;
int index = 0;
List listfile = new ArrayList();
for(int j=0;j
SortFileData sortfile = new SortFileData();
sortfile.setFilestruct(list.get(j));
sortfile.setIndex(j);
listfile.add(sortfile);
i++;
if(listfile.size()== MAX_RECORD_PER_THREAD){
downPool.submit(new Worker(index, listfile));
index++;
if(i % ONE_SORT_TOTAL_NUM == 0 ){
count++;
waitForComplete();
if(count == repeatCount && lastRecords != 0){
oneTimes = new CyclicBarrier(lastRecordsWorkerNum + 1, new Combine());
}
}
listfile = new ArrayList();
}
}
if (lastRecordsLastWorkNum>0){
downPool.submit(new Worker(index, listfile));
}
if(lastRecordsWorkerNum != 0){
waitForComplete();
}
log.info(String.format("result [interrup = %d, timeout = %d]", interrupt.get(), timeout.get()));
}
// 线程阻塞
public void waitForComplete(){
try {
oneTimes.await(DEFAULT_TIME_OUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Thread interrupted.", e.getCause());
interrupt.incrementAndGet();
} catch (BrokenBarrierException e) {
log.info("CyclicBarrier reset", e.getCause());
} catch (TimeoutException e) {
log.error("Thread timeout.", e.getCause());
timeout.incrementAndGet();
}
}
// 用于排序的查询结果
static class SortFileData implements Comparable{
int index;
byte[] data;
private FileStruct filestruct;
public void setIndex(int index) {
this.index = index;
}
public void setData(byte[] data) {
this.data = data;
}
public FileStruct getFilestruct() {
return filestruct;
}
public int getIndex(){
return index;
}
public byte[] getData(){
return data;
}
public void setFilestruct(FileStruct filestruct) {
this.filestruct = filestruct;
}
@Override
public int compareTo(SortFileData o) {
SortFileData that = o;
if(this.getIndex() > that.getIndex()) return 1;
else if(this.getIndex() < that.getIndex()) return -1;
return 0;
}
}
//工作线程
class Worker implements Runnable{
private FileStruct struct;
private int index;
private List listfile;
private DsuQuery query;
Worker(int index, List listfile){
this.index = index;
this.listfile = listfile;
}
public void setQuery(DsuQuery query) {
this.query = query;
}
@Override
public void run() {
query = new DsuQuery();
for (SortFileData file:listfile) {
byte[] data = Decode.decode(query.queryOnce(file.getFilestruct()),file.getFilestruct());
//byte[] data =new byte[0];
file.setData(data);
container.offer(file);
}
waitForComplete();
}
}
//合并线程,取出数据,并合并写入文件
class Combine implements Runnable{
@Override
public void run() {
log.info("container size is "+ container.size());
while(!container.isEmpty()){
SortFileData result = container.poll();
writer.write(result.data);
//log.info("start write size is "+ result.data.length);
}
oneTimes.reset();
}
}
public static void main(String[] args) throws IOException {
List list = new ArrayList();
for(int i = 0 ; i < 14; i++){
FileStruct f = new FileStruct();
f.fileName = "test";
f.start = i;
f.length = 1;
f.timestab=i;
list.add(f);
}
LocalFileWriter filewriter = new LocalFileWriter(1);
DownLoadTask down=new DownLoadTask(list,filewriter);
down.run();
filewriter.close();
}
}
网友评论