关键方法,
put和take
这俩个方法在入队列的时候如果队列已满,就会等待队列有了空位置再入,出队的时候如果队列为空就会阻塞等待队列有了值再出。
package com.ghw.springboot.A0307;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 阻塞队列测试
*/
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
// 声明一个容量为5的缓存队列
BlockingQueue<String> queue = new LinkedBlockingQueue<>(5);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer = new Consumer(queue);
// 借助Executors
ExecutorService service = Executors.newCachedThreadPool();
// 启动线程
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer);
// 执行3s
Thread.sleep(3 * 1000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(2000);
// 退出Executor
service.shutdown();
}
}
class Consumer implements Runnable {
private BlockingQueue<String> queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("启动消费者线程!" + Thread.currentThread().getName());
Random r = new Random();
boolean isRunning = true;
try {
while (true) {
System.out.println("队列:" + queue + "队列大小:" + queue.size());
String data = queue.take();
System.out.println("正在消费数据:" + data);
TimeUnit.MILLISECONDS.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出消费者线程!");
}
}
}
class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue queue;
private static AtomicInteger count = new AtomicInteger();
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
String data;
Random r = new Random();
System.out.println("启动生产者线程!" + Thread.currentThread().getName());
try {
while (isRunning) {
TimeUnit.MILLISECONDS.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
data = "data:" + count.incrementAndGet();
System.out.println(data + "入队");
queue.put(data);
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出生产者线程!");
}
}
public void stop() {
isRunning = false;
}
}
运行结果:
启动生产者线程!pool-1-thread-2
启动消费者线程!pool-1-thread-4
队列:[]队列大小:0
启动生产者线程!pool-1-thread-1
启动生产者线程!pool-1-thread-3
data:1入队
正在消费数据:data:1
队列:[]队列大小:0
data:2入队
正在消费数据:data:2
data:3入队
data:4入队
data:5入队
队列:[data:3, data:4, data:5]队列大小:3
正在消费数据:data:3
data:6入队
data:7入队
data:8入队
data:9入队
队列:[data:4, data:5, data:6, data:7, data:8]队列大小:5
正在消费数据:data:4
data:10入队
data:11入队
data:12入队
队列:[data:5, data:6, data:7, data:8, data:9]队列大小:5
正在消费数据:data:5
data:13入队
队列:[data:6, data:7, data:8, data:9, data:10]队列大小:5
正在消费数据:data:6
队列:[data:7, data:8, data:9, data:10, data:11]队列大小:5
退出生产者线程!
正在消费数据:data:7
data:14入队
队列:[data:8, data:9, data:10, data:11, data:12]队列大小:5
正在消费数据:data:8
退出生产者线程!
队列:[data:9, data:10, data:11, data:12, data:13]队列大小:5
正在消费数据:data:9
退出生产者线程!
队列:[data:10, data:11, data:12, data:13, data:14]队列大小:5
正在消费数据:data:10
队列:[data:11, data:12, data:13, data:14]队列大小:4
正在消费数据:data:11
队列:[data:12, data:13, data:14]队列大小:3
正在消费数据:data:12
队列:[data:13, data:14]队列大小:2
正在消费数据:data:13
队列:[data:14]队列大小:1
正在消费数据:data:14
队列:[]队列大小:0
网友评论