一般我们多线程的通信方式
-
我们可以根据加锁进行限制线程的先后,比如, thread.join() 方法用synchronize加锁后,能让我们的线程根据谁先拿到锁的次序来执行,一般局限性很大
-
我们可以使用传统的object.wait(), object.notify(),他们都是属于Object类
-
我们可以使用Condition控制线程通信lock + condition + await + signal
-
使用阻塞队列(BlockingQueue)控制线程通信
-
有时候线程需要返回数值,我们可以通过Callable 、Future、FutureTask来满足需求。
方法很多,我们还可以设置一个线程之间共享的变量的他通知每个线程的执行顺序等等,当然,这个线程共享的变量的必须要满足多线程的要求,我们可以使用synchronize,使用原子类AtomicInteger等等来满足要求。
使用object.wait(), object.notify() 来实现两个线程交替执行
例如我们像交替打印奇偶数
public class ThreadUseObjectFunctionDemo implements Runnable {
static int value = 0;
@Override
public void run() {
while (value <= 100) {
synchronized (ThreadUseObjectFunctionDemo.class) {
System.out.println(Thread.currentThread().getName() + ":" + value++);
ThreadUseObjectFunctionDemo.class.notify();
try {
ThreadUseObjectFunctionDemo.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
new Thread(new ThreadUseObjectFunctionDemo(), "1").start();
new Thread(new ThreadUseObjectFunctionDemo(), "1").start();
}
}
四个线程 abc,其中 d 要等到 abc全执行完毕后才执行,而且abc是同步运行的
int worker = 3;
CountDownLatch countDownLatch = new CountDownLatch(worker);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("D is waiting for other three threads");
try {
countDownLatch.await();
System.out.println("All done, D starts working");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
for (char threadName='A'; threadName <= 'C'; threadName++) {
final String tN = String.valueOf(threadName);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(tN + "is working");
try {
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(tN + "finished");
countDownLatch.countDown();
}
}).start();
}
BlockingQueue
一般我们使用BlockingQueue于生产者,消费者问题。这里就不载赘述了。就是可以充当多线程的队列。
通过Callable 、Future、FutureTask来返回结果值
计算一个数组的之和
package threadDemo;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
public class CallableDemo implements Callable<Integer> {
//实现一个数组的求和
int start;
int end;
public CallableDemo (int start, int end){
this.start = start;
this.end = end;
}
private static volatile int[] nums;
{
nums = new int[1000];
for (int i = 0; i < 1000; i++)
nums[i] = i;
}
@Override
public Integer call() throws Exception {
int tmpResult = 0;
for (int i = start; i < end; i++){
tmpResult += nums[i];
// System.out.println(Thread.currentThread().getName() + " 正在工作 " );
}
return tmpResult;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer>[] result = new FutureTask[4];
for (int i = 0; i < 4; i++){
result[i] = new FutureTask<>(new CallableDemo(i*250,(i+1)*250));
new Thread(result[i]).start();
}
int res = 0;
for (int i = 0;i < 4; i++)
res += result[i].get();
System.out.println("result " + res);
}
}
CountDownLatch
可以使用激活多个线程启动,也可以等待多个线程执行完后呼唤线程
public class CountDownLatchDemo {
private static final int threadCount = 550;
public static void main(String[] args) throws InterruptedException {
// 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
ExecutorService threadPool = Executors.newFixedThreadPool(300);
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadnum = i;
threadPool.execute(() -> {// Lambda 表达式的运用
try {
test(threadnum);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
countDownLatch.countDown(); // 表示一个请求已经被完成
}
});
}
countDownLatch.await(); //等待countDownLatch的计数器为0时 才可以呼唤起主线程
threadPool.shutdown();
System.out.println("finish");
}
public static void test(int threadnum) throws InterruptedException {
Thread.sleep(1000);// 模拟请求的耗时操作
System.out.println("threadnum:" + threadnum);
Thread.sleep(1000);// 模拟请求的耗时操作
}
}
Semaphore
public class SemaphoreDome2 {
static int result = 0;
public static void main(String[] args) throws InterruptedException {
int N = 3;
Thread[] threads = new Thread[N];
final Semaphore[] syncObjects = new Semaphore[N];
for (int i = 0; i < N; i++) {
syncObjects[i] = new Semaphore(1);
if (i != N-1){
syncObjects[i].acquire();
}
}
for (int i = 0; i < N; i++) {
final Semaphore lastSemphore = i == 0 ? syncObjects[N - 1] : syncObjects[i - 1];
//上一个线程 有下一个线程的信号
final Semaphore curSemphore = syncObjects[i];
final int index = i;
threads[i] = new Thread(new Runnable() {
public void run() {
try {
while (true) { //先让上一个线程请求执行 这个线程在释放许可 就可以控制
lastSemphore.acquire(); //acquire 是一个堵塞的方法
System.out.println("thread" + index + ": " + result++);
if (result > 100){
System.exit(0);
}
curSemphore.release();
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
threads[i].start();
}
}
}
参考博客
JAVA GUIDE
https://juejin.im/post/5c89b9515188257e5b2befdd?utm_source=gold_browser_extension
其中还有对顺序控制多线程的部分可以好好看一遍。
网友评论