一、Java线程状态
-
NEW
初始状态,线程被构建,但是还没有调用start
方法 -
RUNNABLE
运行状态,Java线程将操作系统中的 就绪 和 运行 两种状态笼统地称作 “运行中” -
BLOCK
阻塞状态,表示线程阻塞于锁。 -
WAITING
等待状态,表示线程进入等待状态,进入该状态表示当前线程需要等待其它线程做出一些动作(通知或中断) -
TIMED_WAITING
超时等待状态,该状态不同于WAITING,它是可以在指定的时间自行返回的 -
TERMINATED
终止状态,表示当前线程已执行完毕
二、等待/通知机制
一个线程修改了一个对象的值,而另一个线程感知到了变化。然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者是消费者。Java中如何实现类似的功能呢?
- notify 通知一个在对象上等待的线程,使其从wait方法返回,而返回的前提是 该线程获取到了对象的锁。
- notifyAll 通知所有等待在该对象上的线程
- wait 调用该方法的线程进入WAITING 状态,只有等待另外线程的通知或被中断才能返回,需要注意,调用wait 方法后,会释放对象的锁。
- wait(long) 超时等待一段时间,如果没有通知就超时返回
![](https://img.haomeiwen.com/i18224982/d92e52f217d3ddbc.png)
一个等待/通知的例子:
/**
* 1、使用wait 、 notify 和 notifyAll 需要现对对象加锁。
* 2、调用wait方法后,编程状态由running 转成 waiting, 放置到等待队列中。
* 3、调用notify/notifyAll后,线程依旧不会从wait返回。需要调用notify、notifyAll的线程释放锁之后,等待
* 线程才有机会从wait返回。
* 4、notify()方法将等待队列中一个等待线程从等待队列中移动到同步队列中,被移动的线程状态从waiting状态转为blocked状态(可通过 jps 和 jstack 命令查看。)
* 5、从wait方法返回的前提是获得了调用对象的锁。
*/
/**
* 必须先获取 某个对象的锁 ,之后才可以调用 这个对象的 等待方法
*/
var flag = true
val lock = java.lang.Object()
class Wait : Runnable{
override fun run() {
synchronized(lock){ //必须先获取锁
// 当条件不满足的时候,循环等待。
while (flag){
println("${Thread.currentThread().name}, flag is true. wait at ${SimpleDateFormat("HH:mm:ss").format(Date())}")
lock.wait() //必须先获取锁
}
//条件满足时继续工作
println("${Thread.currentThread().name}, flag is false. wait at ${SimpleDateFormat("HH:mm:ss").format(Date())}")
}
}
}
class Notify : Runnable{
override fun run() {
synchronized(lock){
println("${Thread.currentThread().name} hold lock. notify at ${SimpleDateFormat("HH:mm:ss").format(Date())}")
lock.notifyAll()
flag = false //设定条件满足
TimeUnit.SECONDS.sleep(5)
}
synchronized(lock){
println("${Thread.currentThread().name} hold lock again. sleep at ${SimpleDateFormat("HH:mm:ss").format(Date())}")
TimeUnit.SECONDS.sleep(5)
}
}
}
fun main() {
val waitThread = Thread(Wait(), "WaitThread")
waitThread.start()
TimeUnit.SECONDS.sleep(1)
val notifyThread = Thread(Notify(), "NotifyThread")
notifyThread.start()
}
运行结果:
WaitThread, flag is true. wait at 21:14:09
NotifyThread hold lock. notify at 21:14:10
NotifyThread hold lock again. sleep at 21:14:15
WaitThread, flag is false. wait at 21:14:20
这里可以提炼出 等待/通知 的经典范式,该范式分两部分,分别针对等待方(消费者)和 通知方(生产者)
等待方遵循如下原则:
- 获取对象的锁。
- 如果条件不满足,那么调用对象的wait 方法,通知后仍要检查条件。
- 条件满足则执行对应的逻辑。
伪代码如下:
synchronized(对象){
while(条件不满足){
对象.wait()
}
对应的处理逻辑
}
通知方遵循如下原则:
- 获得对象的锁
- 改变条件
- 通知所有等待在对象上的线程
伪代码如下:
synchronized(对象){
改变条件
对象.notifyAll()
}
三、ThreadLocal
ThreadLocal 即线程变量,是一个以ThreadLocal对象为键、任意对象为值的存储结构。这个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询绑定在这个线程上的一个值。
通过set(T)方法来设置一个值,在当前线程下再通过get()方法获取到原先设置的值。
val THREAD_LOCAL = ThreadLocal<String>()
class Run : Runnable {
override fun run() {
THREAD_LOCAL.set("${Thread.currentThread().name}")
println(THREAD_LOCAL.get())
}
}
fun main() {
val t1 = Thread(Run(), "Thread#1")
val t2 = Thread(Run(), "Thread#2")
val t3 = Thread(Run(), "Thread#3")
val t4 = Thread(Run(), "Thread#4")
t1.start()
t2.start()
t3.start()
t4.start()
}
运行结果:
Thread#1
Thread#4
Thread#3
Thread#2
以上显示了ThreadLocal 的用法,可以给每一个线程设置一些私有的变量。(具体的实现自己去看代码)
// 设置
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
//get
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
接下来看一重点看一下ThreadLocal的应用:Handler机制。熟悉这个机制的都知道,每一个线程会对应创建一个Looper对象,Looper对象同时管理一个 MessageQueue 消息队列。Looper通过不断轮训自己线程的消息队列获取消息,然后分发消息进行处理。是一个消息驱动的模型。 每一个线程都会对应一个Looper,就是使用ThreadLocal来实现的。
public final class Looper {
...
// sThreadLocal.get() will return null unless you've called prepare().
@UnsupportedAppUsage
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
}
Looper 中静态属性 sThreadLocal,就是为每个线程存储Looper而定义的。在非主线程使用Looper的时候需要自己的调用prepare方法,此时就会给当前线程创建一个Looper,存入线程私有变量的键值对里(key:sThreadLocal; value:Looper对象
)。
public static void prepare() {
prepare(true);
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
四、线程池
简单的一个线程池的实现:
/**
* 线程池的本质就是使用了一个线程安全的工作队列,连接工作者线程 和 客户端线程,
* 客户端线程将任务放入工作队列后便返回,而工作中线程则不断的从工作队列上取出
* 工作并执行。当工作队列为空是,所有的工作者线程均等待在工作队列上,当有客户端
* 提交了一个任务后会通知任意一个工作者线程,随着大量的任务被提交,更多的工作者
* 线程会被唤醒。
*/
/**
* 线程池中的线程并不是越多越好,具体的数量要评估每个任务的处理时间,
* 以及当前计算机的处理能力和数量。使用的线程过少,无法发挥处理器性能。
* 使用的线程过多,会增加系统的无故开销。
*/
/**
* 线程池的抽象。
* Job :类型参数
*/
interface ThreadPool<Job : Runnable> {
//执行一个任务
fun execute(job: Job)
//关闭线程池
fun shutdown()
// 增加工作线程数
fun addWorkers(num: Int)
// 减少工作线程数
fun removeWorkers(num: Int)
// 得到等待执行的任务数量
fun getJobSize(): Int
}
class DefaultThreadPool<Job : Runnable> : ThreadPool<Job> {
companion object {
// 线程池中线程数最大数
private val MAX_WORKER_NUMS = 10
// 默认线程池中线程个数
private val DEFAULT_WORKER_NUMS = 5
// 线程池中线程最小线程个数
private val MIN_WORKER_NUMS = 1
}
//待执行工作列表
private val jobs = LinkedList<Job>()
//工作线程列表
private val workers = Collections.synchronizedList(ArrayList<Worker>())
//工作线程数量
private var workerNum = DEFAULT_WORKER_NUMS
//线程编号生成
private val threadNum = AtomicInteger()
constructor() : this(DEFAULT_WORKER_NUMS)
constructor(num: Int) {
workerNum = when {
num > MAX_WORKER_NUMS -> MAX_WORKER_NUMS
num < MIN_WORKER_NUMS -> MIN_WORKER_NUMS
else -> num
}
initWorkers(workerNum)
}
private fun initWorkers(nums: Int) {
for (i in 0 until nums) {
val worker = Worker()
val thread = Thread(worker, "ThreadPool-Worker-${threadNum.getAndIncrement()}")
workers.add(worker)
thread.start()
}
}
override fun execute(job: Job) {
synchronized(jobs) {
jobs.addLast(job)
(jobs as Object).notify() // java 和 kotlin 互操作部分
}
}
override fun shutdown() {
workers.forEach {
it.shutdown()
}
synchronized(jobs){
(jobs as Object).notifyAll()
}
}
@Synchronized
override fun addWorkers(num: Int) {
val numOfAdd = if (num + workerNum > MAX_WORKER_NUMS) {
MAX_WORKER_NUMS - workerNum
} else {
num
}
initWorkers(numOfAdd)
workerNum += numOfAdd
}
@Synchronized
override fun removeWorkers(num: Int) {
if (num > this.workerNum) {
throw IllegalArgumentException("beyond workNum")
}
for (i in 0 until num) {
val worker = workers.removeAt(0)
worker.shutdown()
}
workerNum -= num
}
override fun getJobSize(): Int {
return jobs.size
}
private inner class Worker : Runnable {
@Volatile
private var running = true
override fun run() {
while (running) {
var job: Job?
synchronized(jobs) {
while (jobs.isEmpty()) {
/**
* 如果线程处于WAITING状态,此时调用shutdown方法,由于jobs一直为空,所以并不会结束线程,所以这里做一个判断。
*/
if (!running) return
try {
(jobs as Object).wait() // java 和 kotlin 互操作部分
} catch (e: InterruptedException) {
//当所在的workder线程被中断以后,会抛出异常,但是中断标识也被清空,所以再次标识一下告知上层。
Thread.currentThread().interrupt()
return
}
}
job = jobs.removeFirst()
}
try {
job?.run()
} catch (e: Exception) {
// 忽略 job 还行过程中的异常
}
}
}
/**
* 当再从任务队列中取数据的时候会退出线程;但是如果此时线程时WAITING的状态,就永远无法退出了。
*/
fun shutdown() {
running = false
}
}
}
class Runner : Runnable {
override fun run() {
println(Thread.currentThread().name)
TimeUnit.SECONDS.sleep(1)
}
}
fun main() {
val executors = DefaultThreadPool<Runnable>()
for (i in 0 until 10) {
executors.execute(Runner())
}
TimeUnit.SECONDS.sleep(1)
executors.shutdown()
}
通过上边写的一个简陋的线程池可以知道一个线程池大概的实现原理。
五 、一个生产者/消费者模型的 的实现
// 仓库
public class Repository {
private static final int MAX_VALUE = 100;
private final LinkedList produceQueue = new LinkedList(); // 商品队列
// 生产
public void push(int num) {
if (num <= 0) throw new IllegalArgumentException("num 错误");
synchronized (produceQueue) {
while (produceQueue.size() + num > MAX_VALUE) {
try {
System.out.println(" 生产者线程进入阻塞 ");
produceQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int i = 0; i < num; i++) {
produceQueue.offer(new Object()); // 队尾插入数据
}
System.out.println(" 生产者生产成功 ");
produceQueue.notifyAll();
}
}
// 消费
public void consume(int num) {
if (num <= 0) throw new IllegalArgumentException("num 错误");
synchronized (produceQueue) {
while (produceQueue.size() < num) {
try {
System.out.println(" 消费者线程进入阻塞 ");
produceQueue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int i = 0; i < num; i++) {
produceQueue.poll(); // 从队头取数据
}
System.out.println(" 消费者消费成功 ");
produceQueue.notifyAll();
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
Repository repository = new Repository();
executorService.execute(new Producer(repository));
executorService.execute(new Producer(repository));
executorService.execute(new Producer(repository));
executorService.execute(new Producer(repository));
executorService.execute(new Producer(repository));
executorService.execute(new Producer(repository));
executorService.execute(new Producer(repository));
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.execute(new Consumer(repository));
executorService.execute(new Consumer(repository));
}
static class Producer implements Runnable {
Repository repository;
public Producer(Repository repository) {
this.repository = repository;
}
@Override
public void run() {
repository.push(20);
}
}
static class Consumer implements Runnable {
Repository repository;
public Consumer(Repository repository) {
this.repository = repository;
}
@Override
public void run() {
repository.consume(10);
}
}
}
网友评论