java线程状态
-
初始(NEW)
新创建了一个线程对象,但还没有调用start()方法 -
运行(RUNNABLE)
Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。
线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取CPU的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得CPU时间片后变为运行中状态(running) -
阻塞(BLOCKED)
表示线程阻塞于锁 -
等待(WAITING)
进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断) -
超时等待(TIMED_WAITING)
该状态不同于WAITING,它可以在指定的时间后自行返回 -
终止(TERMINATED)
表示该线程已经执行完毕
实现多线程
有三种方式实现多线程,分别是继承Thread类、实现Runnable接口、实现Callable接口;
因为java 类不支持多继承,继承Thread类之后就不能继承其它类了,因此继承Thread类方式比较少用
实现Runnable接口方式,实现Runnable接口的类可以继承其它类,且可以在多个线程共享
实现Callable接口方式,call方法可以返回值和抛出异常
- 继承Thread类
public class MyThread extends Thread {
public MyThread() {
}
public void run() {
for(int i=0;i<10;i++) {
System.out.println(Thread.currentThread()+":"+i);
}
}
public static void main(String[] args) {
MyThread mThread1=new MyThread();
MyThread mThread2=new MyThread();
MyThread myThread3=new MyThread();
mThread1.start();
mThread2.start();
myThread3.start();
}
}
- 实现Runnable接口
public class MyTarget implements Runnable{
public static int count=20;
public void run() {
while(count>0) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"-当前剩余票数:"+count--);
}
}
public static void main(String[] args) {
MyThread target=new MyTarget();
Thread mThread1=new Thread(target,"线程1");
Thread mThread2=new Thread(target,"线程2");
Thread mThread3=new Thread(target,"线程3");
mThread1.start();
mThread2.start();
myThread3.start();
}
}
- 实现Callable接口
public class MyTarget implements Callable<String> {
private int count = 20;
@Override
public String call() throws Exception {
for (int i = count; i > 0; i--) {
// Thread.yield();
System.out.println(Thread.currentThread().getName()+"当前票数:" + i);
}
return "sale out";
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
Callable<String> callable =new MyTarget();
FutureTask <String>futureTask=new FutureTask<>(callable);
Thread mThread=new Thread(futureTask);
Thread mThread2=new Thread(futureTask);
Thread mThread3=new Thread(futureTask);
// mThread.setName("hhh");
mThread.start();
mThread2.start();
mThread3.start();
System.out.println(futureTask.get());
}
}
线程常用方法
-
obj.wait
当前线程调用对象的wait()方法,当前线程释放对象锁,进入等待队列。依靠notify()/notifyAll()唤醒或者wait(long timeout) timeout时间到自动唤醒
只能用在sychonize同步方法或sychonize同步代码块中 -
obj.notify()/obj.notfiyAll
唤醒在此对象监视器上等待的单个线程,选择是任意性的。notifyAll()唤醒在此对象监视器上等待的所有线程。
只能用在sychonize同步方法或sychonize同步代码块中 -
Thread.sleep(long millis)
一定是当前线程调用此方法,当前线程进入TIMED_WAITING状态,但不释放对象锁,millis后线程自动苏醒进入就绪状态。作用:给其它线程执行机会的最佳方式 -
Thread.yield()
一定是当前线程调用此方法,当前线程放弃获取的CPU时间片,但不释放锁资源,由运行状态变为就绪状态,让OS再次选择线程。作用:让相同优先级的线程轮流执行,但并不保证一定会轮流执行。实际中无法保证yield()达到让步目的,因为让步的线程还有可能被线程调度程序再次选中。Thread.yield()不会导致阻塞。该方法与sleep()类似,只是不能由用户指定暂停多长时间
yield方法只是将Running状态转变为Runnable状态
很少有场景要用到该方法,主要使用的地方是调试和测试 -
thread.join()/thread.join(long millis)
当前线程里调用其它线程T的join方法,当前线程进入WAITING/TIMED_WAITING状态,当前线程不会释放已经持有的对象锁。线程T执行完毕或者millis时间到,当前线程一般情况下进入RUNNABLE状态,也有可能进入BLOCKED状态(因为join是基于wait实现的) -
LockSupport.park()/LockSupport.parkNanos(long nanos)/LockSupport.parkUntil(long deadlines)
当前线程进入WAITING/TIMED_WAITING状态。对比wait方法,不需要获得锁就可以让线程进入WAITING/TIMED_WAITING状态,需要通过LockSupport.unpark(Thread thread)唤醒。
当调用LockSupport.park时,表示当前线程将会被阻塞,直到获得许可
当调用LockSupport.unpark时,必须把等待获得许可的线程作为参数进行传递,好让此线程继续运行
有两种方式可以让LockSupport.park解除阻塞:
(1)调用LockSupport.unpark,在LockSupport.park执行前或后调用都可以
(2)中断调用LockSupport.park的线程
(3)LockSupport.parkNanos(long nanos)/LockSupport.parkUntil(long deadlines),设置的时间到了
而obj.notify方法必须在obj.await方法后调用才能唤醒执行obj.await方法的线程 -
condition.await()/condition.signal()/condition.signalAll()
-
thread.interrupt()/thread.interrupted()
thread.interrupt()会设置线程的中断标志
thread.interrupted()获取线程是否处于中断 -
thread.Daemon()
守护线程是程序运行时在后台提供服务的线程,不属于程序中不可或缺的部分。
当所有非守护线程结束时,程序也就终止,同时会杀死所有守护线程。
main() 属于非守护线程。
线程池
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
}
-
corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize, 即使有其他空闲线程能够执行新来的任务, 也会继续创建线程;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。 -
workQueue
用来保存等待被执行的任务的阻塞队列. 在JDK中提供了如下阻塞队列-
ArrayBlockingQueue
: 基于数组结构的有界阻塞队列,按FIFO排序任务; -
LinkedBlockingQueue
: 基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQueue; -
SynchronousQueue
: 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue; -
PriorityBlockingQueue
: 具有优先级的无界阻塞队列;
-
LinkedBlockingQueue
比ArrayBlockingQueue
在插入删除节点性能方面更优,但是二者在put()
, take()
任务的时均需要加锁,SynchronousQueue
使用无锁算法,根据节点的状态判断执行,而不需要用到锁,其核心是Transfer.transfer()
.
-
maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;当阻塞队列是无界队列, 则maximumPoolSize则不起作用, 因为无法提交至核心线程池的线程会一直持续地放入workQueue. -
keepAliveTime
线程空闲时的存活时间,即当线程没有任务执行时,该线程继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用, 超过这个时间的空闲线程将被终止; -
unit
keepAliveTime的单位 -
threadFactory
创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为DefaultThreadFactory -
handler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:-
AbortPolicy
: 直接抛出异常,默认策略; -
CallerRunsPolicy
: 用调用者所在的线程来执行任务; -
DiscardOldestPolicy
: 丢弃阻塞队列中靠最前的任务,并执行当前任务; -
DiscardPolicy
: 直接丢弃任务;
-
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务
调用线程池的execute方法的执行过程:
- 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
线程池运行状态
线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount),变量的高3位保存runState,低29位保存workerCount
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
-
RUNNING
高3位为111
能接收新提交的任务,并且也能处理阻塞队列中的任务
-
SHUTDOWN
高3位为000
关闭状态,不再接收新提交的任务,但会继续处理阻塞队列中已保存的任务 -
STOP
高3位为001
不能接收新任务,也不处理队列中的任务,会中断正在处理任务的线程 -
TIDYING
高3位为010
所有任务都已终止,workCount(有效线程数)为0 -
TERMINATED
高3位为011
在terminate()方法执行完后进入该状态
线程池配置
- CPU密集型: 尽可能少的线程,Ncpu+1
- IO密集型: 尽可能多的线程, Ncpu*2,比如数据库连接池
- 混合型: CPU密集型的任务与IO密集型任务的执行时间差别较小,拆分为两个线程池;否则没有必要拆分
- 最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
ThreadLocal
每个线程都维护了一个threadLocals属性,该属性的类型是ThreadLocal.ThreadLocalMap,为一个类似Map的数据结构,ThreadLocalMap的key为ThreadLocal对象,value为ThreadLocal变量设置的value。
ThreadLocal在每个线程中对该变量会创建一个副本,即每个线程内部都会有一个该变量,且在线程内部任何地方都可以使用,线程之间互不影响,这样一来就不存在线程安全问题。
如果需要对创建ThreadLocal对象对ThreadLocal的value进行初始化,需要继承ThreadLocal类重写initialValue方法。
ThreadLocalMap作为一个HashMap和java.util.HashMap的实现是不同的。对于java.util.HashMap使用的是链表法来处理冲突,而对于ThreadLocalMap,它使用的是简单的线性探测法,如果发生了元素冲突,那么就使用下一个槽位存放。
ThreadLocalMap 中使用的 key 为 ThreadLocal 的弱引用,弱引用的特点是,如果这个对象只存在弱引用,那么在下一次垃圾回收的时候必然会被清理掉。
所以如果 ThreadLocal 没有被外部强引用的情况下,在垃圾回收的时候会被清理掉的,这样一来 ThreadLocalMap中使用这个 ThreadLocal 的 key 也会被清理掉。但是,value 是强引用,不会被清理,这样一来就会出现 key 为 null 的 value。
需要手动调用ThreadLocal的remove方法是否,否则在线程池场景使用ThreadLocal,由于线程是复用的一直存活,导致通过Thread->ThreadLocalMap->ThreadLocal设置的value强引用一直存在,可能出现内存泄漏
ThreadLocal变量定义为private static final,可以保持对ThreadLocal变量的强引用,避免被在ThreadLocalMap 中被回收,而是通过自己手动从ThreadLocalMap中remove
public class Thread implements Runnable {
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;
}
public class ThreadLocal<T> {
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
if (this instanceof TerminatingThreadLocal) {
TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);
}
return value;
}
protected T initialValue() {
return null;
}
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
return new SuppliedThreadLocal<>(supplier);
}
static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {
private final Supplier<? extends T> supplier;
SuppliedThreadLocal(Supplier<? extends T> supplier) {
this.supplier = Objects.requireNonNull(supplier);
}
@Override
protected T initialValue() {
return supplier.get();
}
}
}
public class DateUtils {
public static final ThreadLocal<DateFormat> df = new ThreadLocal<DateFormat>(){
@Override
protected DateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd");
}
};
}
private ThreadLocal<Integer> localInt = ThreadLocal.withInitial(() -> 6);
InheritableThreadLocal
主线程开了一个子线程,我们希望在子线程中可以访问主线程中的ThreadLocal对象,也就是说有些数据需要进行父子线程间的传递
public static void main(String[] args) {
ThreadLocal threadLocal = new ThreadLocal();
IntStream.range(0,10).forEach(i -> {
//每个线程的序列号,希望在子线程中能够拿到
threadLocal.set(i);
//这里来了一个子线程,我们希望可以访问上面的threadLocal
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + ":" + threadLocal.get());
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
由于每个线程都自己的ThreadLocalMap类属性,线程之间的ThreadLocal变量是隔离的
如果需要在子线程访问到父线程设置的ThreadLoca的值,需要使用InheritableThreadLocal(ThreadLocal的子类)
- 变量的传递是发生在线程创建的时候,如果不是新建线程,而是用了线程池里的线程,就不灵了
- 变量的赋值就是从主线程的map复制到子线程,它们的value是同一个对象,如果这个对象本身不是线程安全的,那么就会有线程安全问题
InheritableThreadLocal使用Thread类定义另外一个Map
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
protected T childValue(T parentValue) {
return parentValue;
}
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}
public class Thread implements Runnable {
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
private Thread(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
// 省略一段代码
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
//拷贝父类的Map
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
/* Stash the specified stack size in case the VM cares */
this.stackSize = stackSize;
/* Set thread ID */
this.tid = nextThreadID();
}
}
public class ThreadLocal<T> {
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
return new ThreadLocalMap(parentMap);
}
private ThreadLocalMap(ThreadLocalMap parentMap) {
Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);
table = new Entry[len];
for (Entry e : parentTable) {
if (e != null) {
@SuppressWarnings("unchecked")
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
Object value = key.childValue(e.value);
Entry c = new Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
}
}
}
}
网友评论