本文将一步步的探索Android中线程相关的知识,思路线索:单线程 - > 多线程 - > 线程池 - > 线程间通讯 - > 线程锁。
对应出现类的顺序
Thread -> Runnable -> Callable -> Future -> FutureTask -> Executor -> ExecutorService -> AbstractExecutorService -> ThreadPoolExecutor -> Executors -> AsyncTask -> Handler -> Looper -> MessageQueue -> Message -> Object -> Lock -> ReadWriteLock -> Condition -> Semaphore -> CountDownLatch
单线程
Thread、Runnable
使用多线程时,耗时操作放到run方法内,这样就简单的创建了一个子线程。
//方式一
new Thread() {
@Override
public void run() {
//do some thing
}
}.start();
//方式二
new Thread(new Runnable() {
@Override
public void run() {
//do some thing
}
}).start();
这两种方式有什么区别吗?
看看Thread源码
Thread自身实现了Runnable,而Runnable是一个接口,Thread的run方法做了什么呢?
//Thread源码
/**
* If this thread was constructed using a separate
* <code>Runnable</code> run object, then that
* <code>Runnable</code> object's <code>run</code> method is called;
* otherwise, this method does nothing and returns.
* <p>
* Subclasses of <code>Thread</code> should override this method.
*
* @see #start()
* @see #stop()
* @see #Thread(ThreadGroup, Runnable, String)
*/
@Override
public void run() {
if (target != null) {//此处的target为构造函数传入的Runnable对象
target.run();
}
}
方式一重写了此run方法,方法二则为target赋值。
thread对象通过start()方法将自身传递到native层开启了一个线程,最终调用thread对象的run方法。
如果使用thread.run(),则会在创建thread对象的线程内执行run(),并没有创建新的线程。
使用Thread+ Runnable的这种方法有个弊端,比如:当我数据库插入操作的时候,我想知道数据最后到底插入成功了没有我是无法被通知到的。
Runnable是个哑巴,就只会干活,干好干坏干完没有它是不会告诉你的。
Callable
Callable是在JDK1.5时才有的,它解决了任务完成后没有返回值的问题
public interface Callable<V> {
V call() throws Exception;
}
源码中包含的泛型即返回值类型,但使用的时候callable.call(),由于在子线程内执行耗时操作,并不能即时的得到响应返回数据,因此会造成阻塞。
如果这时能够知道任务结束了,就方便很多。当然源码做了!!
Future
同Callable也是在JDK1.5时才有的
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);//取消当前任务,分几种情况:
//当前任务等待 - 无视传入参数 取消任务
//当前任务执行 - mayInterruptIfRunning=true 中断任务
//当前任务执行 - mayInterruptIfRunning=false 不会中断任务
//当前任务完成此方法无效
boolean isCancelled();//当前任务是否被取消
boolean isDone();//当前任务是否完成
V get() throws InterruptedException, ExecutionException;//获取任务的结果,会产生阻塞
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;//获取任务的结果有超时时间
}
这个接口是用来对任务进行操作的。
有了能够返回任务完成结果的Callable,有了操作任务的Future,怎么结合使用呢?
FutureTask
FutureTask是集Runnable、Callable、Future一起使用的类,FutureTask实现了Runnable的run方法,将传入的Callable对象在此方法进行执行,同时将任务的返回结果通过done()方法回调。FutureTask同时实现了Future接口,用于控制任务的执行过程,由于所有工作都在在子线程执行所以不会对主线程造成阻塞。
FutureTask的构造函数
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);//将Runnable转为Callable
this.state = NEW; // ensure visibility of callable
}
FutureTask的run
public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();//获取Callable的返回值,此处阻塞
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);//保存结果
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
FutureTask的结果
protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
U.putOrderedInt(this, STATE, NORMAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
//忽略代码
done();//回调,通知结束
}
FutureTask的逻辑了解了,具体怎么使用呢??
因为FutureTask实现了Runnable接口,所以用法类似Runnable。
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1;
}
};
FutureTask<Integer> futureTask = new FutureTask<Integer>(callable) {
@Override
protected void done() {
try {
get();//任务完成回调方法,获取任务返回结果会抛异常
} catch (Exception e) {
e.printStackTrace();
}
}
};
new Thread(futureTask).start();
多线程
一份工作合理地分给多个人干,比全部交给一个人干,完成的时间肯定要少,这里的多个人即为多线程。
一份工作开始1个人做,后来为了赶时间招了1个共2个人做,发现时间还是不够又要招1个。做这份工作的时候除了实际的工作时间,花费最多的应该就是招人的时间了。
这时有了劳务派遣公司,我要人的时候立刻就会送人过来,这样就不用花时间去招人。
线程也是一样,由于线程的创建与销毁都需要耗费资源,所以多线程都会用到线程池,当工作需要线程的时候就去线程池中取。
线程池
常规的几种
- FixedThreadPool 固定线程数量的线程池
- SingleThreadExecutor 单个线程数量的线程池
- CachedThreadPool 可缓存线程的线程池
- SingleThreadScheduledExecutor 单线程计划线程池
- ScheduledThreadPool 计划线程池
非常规(待细分析)
- WorkStealingPool(工作窃取模式,一个任务太大可以fork成多个任务,每个小任务执行完成后join形成最后的结果)
它们是通过策略模式(接口Executor、ExecutorService、AbstractExecutorService:策略,类ThreadPoolExecutor:各种策略的实现,策略的切换类:Executors)产生的。策略模式是一种通过接口统一行为,继承接口的类来实现具体的操作,通常会有一个策略的切换类用以更换策略,是一种创建型的设计模式。
通过源码追踪看到Executors只是创建线程池的一个工具类,内除了静态方法就是一个
所以此类不能够实例化。
除ForkJoinPool外其余线程池都是ThreadPoolExecutor实例化出来的。
ThreadPoolExecutor
关键的构造函数
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//池内最大线程数
long keepAliveTime,//线程的存活时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//线程排列顺序
ThreadFactory threadFactory,//线程生成器
RejectedExecutionHandler handler) //当线程数超出限制时的处理
由ThreadPoolExecutor实例化出来的线程池即劳务派遣公司,内部有人员配置指标,合同工(核心线程)为corePoolSize人,工人总数(总线程数)maximumPoolSize,所有人员均来自threadFactory。
公司之初为了减少人工成本,没有工人,来一个任务签一个合同工,此时即使有合同工闲着也要招一个合同工,直到达到指标。
合同工的合同期限为永久,当合同工全部在忙又有新任务时,还要招些临时工(一般线程),要符合人员配置指标。但临时工的合同时间很短为keepAliveTime单位为unit。
当有人需要劳务派遣时,优先合同工,其次才是临时工,任务多到现有maximumPoolSize人数做不完时,先将任务保存到workQueue等待处理,任务继续增加到workQueue存不下了handler就会来处理。
淡季到来时由于旺季人员全招满了有maximumPoolSize之多,所以轮到与临时工解除合同了,凡到期keepAliveTime的临时工将离开劳务派遣公司,只剩合同工。
这就是劳务派遣公司的运营模式,但不同地区情况不同,不能按这种模式去适应所有情况,所以在此运营模式的基础上,各地有自己特色的劳务派遣公司
FixedThreadPool
固定线程数量的线程池
核心线程数即总线程数,keepAliveTime=0。此劳务派遣公司不招临时工,工作全部由合同工做。
SingleThreadExecutor
单个线程数量的线程池,其他线程池后缀为pool,但它却是executor也表示出了单数的意思。
此劳务派遣公司不仅不招临时工,而且就只有一个合同工,且工作全部由他做。
CachedThreadPool
可缓存线程的线程池
此劳务派遣公司没有合同工全部是临时工,且临时工数量非常大Integer.MAX_VALUE。
多线程的使用
//Callable
Executors.newSingleThreadExecutor().submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 0;
}
});
//Runnable
Executors.newSingleThreadExecutor().submit(new Runnable() {
@Override
public void run() {
}
});
//FutureTask
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 0;
}
};
Executors.newSingleThreadExecutor().submit(new FutureTask<Integer>(callable) {
@Override
protected void done() {
//任务结束可以取结果了
}
});
通过Executors.newSingleThreadExecutor()获取适合的线程池,将FutureTask任务submit到线程池,当任务完成回调done方法,如果我们在done方法内再起一个线程或回到主线程,不就实现了线程间到通讯了吗??
线程间通讯
根据上面的思路,ThreadPool执行完FutureTask会回调done方法,在done方法内任务的执行已经有了结果,此时回到主线程就形成了真正意义上的后台执行效果,官方提供的AsyncTask就是此思路。
AsyncTask
由于存在版本差异,这里只是线程间通讯的大体思路,源码来自API26
AsyncTask的使用
AsyncTask<String, Integer, Long> asyncTask = new AsyncTask<String, Integer, Long>() {//三个泛型分别为:参数类型、进度返回类型、结果类型
@Override
protected void onPreExecute() {//主线程
super.onPreExecute();
}
@Override
protected Long doInBackground(String... strings) {//子线程
publishProgress(10);//通知处理过程,回调到主线程的onProgressUpdate
return null;
}
@Override
protected void onProgressUpdate(Integer... values) {//主线程
super.onProgressUpdate(values);//接收处理过程
}
@Override
protected void onPostExecute(Long aLong) {//主线程
super.onPostExecute(aLong);
}
};
asyncTask.execute("");//开始执行
内部线程间通讯逻辑
//AsyncTask的构造函数
public AsyncTask() {
this((Looper) null);
}
public AsyncTask(@Nullable Handler handler) {
this(handler != null ? handler.getLooper() : null);
}
public AsyncTask(@Nullable Looper callbackLooper) {
mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
? getMainHandler()
: new Handler(callbackLooper);//Handler、Looper后面介绍,通过它回mHandler所在的线程,此处理解为主线程即可
mWorker = new WorkerRunnable<Params, Result>() {//实现Callable的类
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
result = doInBackground(mParams);//调用doInBackground方法,即自己重写的耗时操作
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);//返回结果
}
return result;//返回结果
}
};
mFuture = new FutureTask<Result>(mWorker) {//任务对象
@Override
protected void done() {
try {
postResultIfNotInvoked(get());//调用postResult返回结果
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
//任务执行
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);//第一个参数为默认的线程池
}
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;//修改任务状态
onPreExecute();//调用onPreExecute方法,即自己重写的准备工作方法
mWorker.mParams = params;
exec.execute(mFuture);//将任务丢给线程池,进行异步执行
return this;
}
//将结果给Handler
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
//内部的Handler
private static class InternalHandler extends Handler {
public InternalHandler(Looper looper) {
super(looper);
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);////调用onPostExecute方法,即自己重写的获取结果方法
}
mStatus = Status.FINISHED;
}
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}
总结:
1、创建FutureTask
2、交给线程池处理
3、回调done时,将任务执行结果组装成Message传给Handler
4、Handler在主线程处理
为什么Handler的处理就是在主线程了呢?
Handler、Looper、Message、MessageQueue
Handler
构造函数
主要构造函数
public Handler(Callback callback, boolean async) {
if (FIND_POTENTIAL_LEAKS) {
final Class<? extends Handler> klass = getClass();
if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
klass.getCanonicalName());
}
}
mLooper = Looper.myLooper();//获取当前线程的Looper
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;//从Looper里拿出消息队列
mCallback = callback;
mAsynchronous = async;
}
使用
public final boolean sendMessage(Message msg)
{
return sendMessageDelayed(msg, 0);
}
public final boolean sendMessageDelayed(Message msg, long delayMillis)
{
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;//注意这里的target
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
到此Message被存到MessageQueue中,MessageQueue又是在Looper中取得的。
Looper
Looper的构造函数为private,因此外部不能new出实例。那Looper怎么来的,此处不深入直接跳出答案。
在Android中所有进程的入口如Java一样是main函数,此函数在ActivityThread
图中看出Looper的实例化。
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();//当前线程
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();//取出当前线程的Looper设为主Looper
}
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));//为当前线程设置Looper
}
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}
main方法里还有个Looper.loop()开启循环
public static void loop() {
final Looper me = myLooper();//拿到Looper
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;//拿到Looper的MessageQueue
// Make sure the identity of this thread is that of the local process,
// and keep track of what that identity token actually is.
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
for (;;) {//无限循环
Message msg = queue.next(); // might block在MessageQueue中取出Message
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
// This must be in a local variable, in case a UI event sets the logger
final Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " " +
msg.callback + ": " + msg.what);
}
final long traceTag = me.mTraceTag;
if (traceTag != 0) {
Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
}
try {
msg.target.dispatchMessage(msg);//Message被送往target,前面Handler在送Message入MessageQueue时有提醒,此处的target就是Handler
} finally {
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
// Make sure that during the course of dispatching the
// identity of the thread wasn't corrupted.
final long newIdent = Binder.clearCallingIdentity();
if (ident != newIdent) {
Log.wtf(TAG, "Thread identity changed from 0x"
+ Long.toHexString(ident) + " to 0x"
+ Long.toHexString(newIdent) + " while dispatching to "
+ msg.target.getClass().getName() + " "
+ msg.callback + " what=" + msg.what);
}
msg.recycleUnchecked();
}
}
到此知道Looper在主线程不停的从MessageQueue中取出由Handler送来的Message去Handler处理。
总结:MessageQueue只是个暂存处,是Looper的成员变量。每一个线程有且仅有一个Looper,意味着有且仅有一个MessageQueue。Handler创建的时候指定了Looper,当多个Handler对象sendMessage时,message其实都去了同一个MessageQueue,然后到时间Looper又将其取出来送回原Handler。
线程间通讯时,将主线程创建的Handler对象作为变量传递到子线程,其发出的Message又会回到主线程的Handler进行处理,因此实现来线程间的通讯。
public void dispatchMessage(Message msg) {//由Looper送回的Message
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);//为重写方法
}
}
MessageQueue
是如何存储Message的呢?
在Looper循环的时候,看到Message msg = queue.next()。没错!MessageQueue就是个单链表。
到此android的线程间通讯就告一段落了,android主要以消息队列的方式进行线程间通讯,runOnUiThread也是Handler做的,当然通讯的方式还有管道、共享内存等。
多线程操作带来的不仅仅是效率,控制不好也会出现问题。
a=10
线程一取a=10加5得15
线程二取a=10减5得5存a=5
此时线程一才反应过来存a=15
那具体的操作到底谁先谁后,a最后等于几?
线程锁
synchronized关键字
synchronized应该是最常见的控制线程同步问题的解决方法,通过获取指定Object的锁来达到同步的目的。
public class MyClass {
public void method1(Object object){//获取由MyClass产生对象的锁
synchronized (this){
//todo
}
}
public void method2(Object object){//获取object对象的锁
synchronized (object){
//todo
}
}
public void method3(Object object){//获取MyClass这个类对象的锁
synchronized (MyClass.class){
//todo
}
}
public synchronized void method4(Object object){//获取由MyClass产生对象的锁
//todo
}
public static synchronized void method5(Object object){//获取MyClass这个类对象的锁
//todo
}
}
Object
android中所有的类的基类,synchronized也是通过Object才可以给类对象和对象加锁,可以看到相关的方法是native层的。
wait()、notify()、notifyAll()
这三个方法是 java.lang.Object 的 final native 方法,任何继承 java.lang.Object 的类都有这三个方法。它们是Java语言提供的实现线程间阻塞和控制进程内调度的底层机制,平时我们会很少用到的。
wait():
导致线程进入等待状态,直到它被其他线程通过notify()或者notifyAll唤醒,该方法只能在同步方法中调用。
notify():
随机选择一个在该对象上调用wait方法的线程,解除其阻塞状态,该方法只能在同步方法或同步块内部调用。
notifyAll():
解除所有那些在该对象上调用wait方法的线程的阻塞状态,同样该方法只能在同步方法或同步块内部调用。
调用这三个方法中任意一个,当前线程必须是锁的持有者,如果不是会抛出一个 IllegalMonitorStateException 异常。
wait()释放锁 Thread.sleep(long time)不释放锁
Lock
Lock本身是一个接口,真正的实现类是ReentrankLock。它的使用与synchronized类似,获取ReentrankLock对象的锁来实现线程的同步,但多了锁的一些操作。synchronized为native层控制而Lock为代码控制,所以加锁与解锁必须成对出现。简单场景synchronized效率高,到了多线程复杂操作时ReentrankLock稳定到效率得以展现。
这里有个公平的问题,锁分为公平锁与不公平锁,默认构造非公平锁。此处的公平并不是优先级,而是线程来的时间先后。公平锁按线程FIFO的顺序处理,非公平锁则随机从线程队列中取,若刚好释放锁的一刻来了个线程则此线程插队了线程队列里正在等待的其他线程。
public void method(){
ReentrantLock lock=new ReentrantLock();
try{
lock.lock();
//todo
}finally {
lock.unlock();//切记释放锁
}
}
ReadWriteLock
上面的锁好用,但某些场景却效率低下。
一本书
线程一加锁:读
线程二来等待
线程一读完解锁
线程二加锁:读
这种场景下书的内容没有变化,完全可以线程一二共同读不必让线程二消耗时间等待,因此就有了ReadWriteLock它的实现类是ReentrantReadWriteLock。
其内部也是实现了Lock接口,针对读写的特性内部控制了是否放行线程。
Condition
用来代替Object的
public interface Condition {
//让当前线程等待,直到线程发出信号signal或被线程中断
void await() throws InterruptedException;
//让当前线程等待,直到线程发出信号signal
void awaitUninterruptibly();
//让当前线程等待,直到线程发出信号signal、被线程中断或等超时
long awaitNanos(long nanosTimeout) throws InterruptedException;
//让当前线程等待,直到线程发出信号signal、被线程中断或等超时
boolean await(long time, TimeUnit unit) throws InterruptedException;
//让当前线程等待,直到线程发出信号signal、被线程中断或到终止时间
boolean awaitUntil(Date deadline) throws InterruptedException;
//唤醒一个等待线程
void signal();
//唤醒所有等待线程
void signalAll();
}
看个例子来自https://www.2cto.com/kf/201609/549780.html
/*
* 生产者与消费者问题
*/
public class ProduceConsume {
public static void main(String[] args) {
SyncStack ss = new SyncStack();
Produce pd = new Produce(ss);
Consume cs = new Consume(ss);
Thread t1 = new Thread(pd);
Thread t2 = new Thread(cs);
t1.start();
t2.start();
}
}
/*
* 馒头实体类
*/
class ManTou {
private int id;
public ManTou(int id) {
this.id = id;
}
public int getId() {
return this.id;
}
@Override
public String toString() {
// TODO Auto-generated method stub
return "ManTou " + getId();
}
}
/*
* 馒头框类
*/
class SyncStack {
Lock lock = new ReentrantLock();
Condition full = lock.newCondition();
Condition empty = lock.newCondition();
int index = 0;
ManTou[] mtArray = new ManTou[6];
public void push(ManTou mt) {
lock.lock();
try {
while (index == mtArray.length) {
try {
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
empty.signal();
mtArray[index] = mt;
index++;
System.out.println("生产了" + mt);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void pop() {
lock.lock();
try {
while (index == 0) {
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
full.signal();
index--;
System.out.println("消费了" + mtArray[index]);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
/*
* 生产者
*/
class Produce implements Runnable {
SyncStack ss = null;
public Produce(SyncStack ss) {
this.ss = ss;
}
@Override
public void run() {
// TODO Auto-generated method stub
for (int i = 0; i < 20; i++) {
ManTou mt = new ManTou(i);
if (ss != null) {
ss.push(mt);
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/*
* 消费者
*/
class Consume implements Runnable {
SyncStack ss = null;
public Consume(SyncStack ss) {
this.ss = ss;
}
@Override
public void run() {
// TODO Auto-generated method stub
for (int i = 0; i < 20; i++) {
if (ss != null) {
ss.pop();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Semaphore
上面这些锁在某一时刻只能有一个线程进入,实际场景可能并不适用。
公共厕所共3个坑位
来1个人占1个
这时来了3个人全部占满了
第4个人就在等着
直到有1个人离开,第4个人才进入
private Semaphore semaphore=new Semaphore(3);
public void bathroom(SomeOne someOne){
semaphore.acquire();
someOne.doSomething();
semaphore.release();
}
CountDownLatch
学习最好的资料就是源码,看看官方的例子:
//例子一
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish 等doneSignal变0
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();//等待startSignal变为0
doWork();
doneSignal.countDown();//完成工作 计数
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
//例子二
class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // wait for all to finish等doneSignal变为0
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();//doneSignal计数减一
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
看过一个问题:5个线程打印hello world,先打印hello再打印world
CountDownLatch countDownLatch=new CountDownLatch(5);
class Worker implements Runnable{
@Override
public void run() {
try {
System.out.print("hello");
countDownLatch.countDown();
countDownLatch.await();
System.out.print("world");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
线程中的其他问题
volatile关键字
作用:
-
保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
-
禁止进行指令重排序(即不能将在对volatile变量访问的语句放在其后面执行,也不能把volatile变量后面的语句放到其前面执行)。
join()方法
在Thread类中有join()方法,它能够将并行改为串行。如何操作的呢?说白了就是插队。
public void method() {
Thread t1 = new Thread();
Thread t2= new Thread();
t1.start();//主线程启动t1,当前线程为主线程
t1.join();//t1插队主线程,主线程等待t1完成
t2.start();//主线程拿回控制权,开启t2
}
Thread.yield()方法
让当前线程从运行态转为就绪态,此时线程进入等待队列,如果此线程优先级较高,则又会被调用,并不是Thread.yield()后就一定会将cpu交给其他线程。
网友评论