基于linkedList的BlockingQueue的书写
//java中阻塞队列的应用
public class BlockingQueue {
private List queue = new LinkedList<>();
private int limit = 10;
public BlockingQueue(int limit){
this.limit = limit;
}
public synchronized void enqueue(Object item) throws Exception{
while(this.queue.size() == this.limit){
wait();
}
if(this.queue.size()==0){
notify();
}
this.queue.add(item);
}
public synchronized void dequeue(Object item) throws Exception{
while(this.queue.size() == this.limit){
notify();
}
if(this.queue.size()==0){
wait();
}
this.queue.remove(0);
}
}
异步消息队列
原始数据类
public class PCData {
private final int intData;
public PCData (int d ){
intData=d;
}
public PCData (String d ){
intData=Integer.valueOf(d);
}
public int getData(){
return intData;
}
@Override
public String toString() {
return "data: "+intData;
}
}
消费者
public class Consumer implements Runnable{
private BlockingQueue<PCData> queue;
private static final int SLEEPTIME = 1000;
public Consumer(BlockingQueue<PCData> queue){
this.queue = queue;
}
@Override
public void run() {
System.out.println("start Consumer id :"+Thread.currentThread().getId());
Random r = new Random();
try{
while(true){
//注意:take()都remove()都是删除元素并返回队列的头,但是take()操作为空不报错
PCData data = queue.take();
if(data != null)
{
int re = data.getData() * data.getData();
System.out.println("消费者消耗的数值: "+MessageFormat.format("{0}*{1}={2}", data.getData(),data.getData(),re));
Thread.sleep(r.nextInt(SLEEPTIME));
}
}
}catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
生产者
public class Producer implements Runnable{
private volatile boolean isRunning = true;
private BlockingQueue<PCData> queue;
private static AtomicInteger count = new AtomicInteger();
private static final int SLEEPTIME = 1000;
public Producer( BlockingQueue<PCData> queue){
this.queue = queue;
}
@Override
public void run() {
PCData data = null;
Random random = new Random();
System.out.println("start producting id:" + Thread.currentThread().getId());
try {
while(isRunning){
Thread.sleep(random.nextInt(SLEEPTIME));
data = new PCData(count.incrementAndGet());
System.out.println(data + "加入队列");
if(!queue.offer(data, 2, TimeUnit.SECONDS)){
System.out.println("加入队列失败");
}
}
} catch (Exception e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop(){
isRunning = false;
}
}
主测试方法
public class Main {
public static void main(String[] args) throws Exception {
BlockingQueue<PCData> queue = new LinkedBlockingDeque<>(10);
Producer p1 = new Producer(queue);
Producer p2 = new Producer(queue);
Producer p3 = new Producer(queue);
Consumer c1 = new Consumer(queue);
Consumer c2 = new Consumer(queue);
Consumer c3 = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(p1);
service.execute(p2);
service.execute(p3);
service.execute(c1);
service.execute(c2);
service.execute(c3);
Thread.sleep(10*1000);
p1.stop();
p2.stop();
p3.stop();
Thread.sleep(3000);
service.shutdown();
}
}
网友评论