import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class MessageQueueTest {
public static void main(String[] args) {
MessageQueue<String> messageQueue = new MessageQueue<>(5);
//生产者
for (int i = 0; i < 10; i++) {
int id = i;
new Thread(() -> {
messageQueue.put("content-" + id);
}, "生产者" + id).start();
}
//消费者
for (int i = 0; i < 2; i++) {
new Thread(() -> {
while (true) {
System.out.println(messageQueue.take());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者" + i).start();
}
}
}
class MessageQueue<T> {
private final List<T> queue = new LinkedList<>();
private final int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
}
public T take() {
synchronized (queue) {
while (queue.isEmpty()) {
try {
System.out.println("队列空,消费者线程:【" + Thread.currentThread().getName() + "】等待");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T message = queue.remove(0);
queue.notifyAll();
return message;
}
}
public void put(T message) {
synchronized (queue) {
while (queue.size() == capacity) {
try {
System.out.println("队列满,生产者线程:【" + Thread.currentThread().getName() + "】等待");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.add(message);
queue.notifyAll();
}
}
}
网友评论