这次不多比比,直接上代码,发现网上很多Demo都没有去测一下offer、add、put的方法的区别就乱用,完全没用到blockingqueue的特性,相当于用了一个普通queue而已。这里简单贴一下
offer:如果队列满,会返回false,
add:如果队列满,会报错,
put:如果队列满,会阻塞该线程,直到队列有位置放
remove:如果队列为空,会报错
poll:如果队列为空,返回null
take:如果队列为空,阻塞线程,直到队列中有值
知道了这些我们再来贴代码就好理解啦
实体类:
public class Data {
int num;
public int getNum() {
return num;
}
public Data(int num) {
super();
this.num = num;
}
}
生产者:
public class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue<Data> queue;
private static AtomicInteger count = new AtomicInteger();
private static final int SLEEPTIME = 1000;
public Producer(BlockingQueue<Data> queue) {
super();
this.queue = queue;
}
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("线程id为" + Thread.currentThread().getId() + "的生产者启动了");
Data data = null;
Random r = new Random();
while (isRunning) {
try {
//这边的睡就是用来模仿生产者生产过程啦,实际业务中哪有瞬间生产完的
Thread.sleep(r.nextInt(SLEEPTIME));
//多线程的情况下当然要用原子类来操作啦
data = new Data(count.incrementAndGet());
//这边只能用put(),用其他add,offer的话队列满了不会阻塞线程。
queue.put(data);
System.out.println("生产者" + Thread.currentThread().getId() + "把 data为:"
+ data.getNum() + "BlockingQueue里去啦,现在队列里还有"+queue.size()+"个");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
//这是外部用来调用停止生产的方法
public void stop() {
this.isRunning=false;
}
}
消费者:
public class Consumer implements Runnable {
private BlockingQueue <Data> queue;
private static final int SLEEPTIME =1000;
public Consumer(BlockingQueue<Data> queue) {
super();
this.queue = queue;
}
@Override
public void run() {
// TODO Auto-generated method stub
System.out.println("线程id为"+Thread.currentThread().getId()+"的消费者启动了");
Random r=new Random();
while(true) {
Data data;
try {
//注意哦 这里只能用take()方法,用其他的话如果队列空了BlockingQueue是不会阻塞线程的。
data = queue.take();
//模仿消费者工作,这里就是简单的把数组拿出来算出平方,然后随便睡一会。
System.out.println("消费者"+Thread.currentThread().getId()+"处理数据"+Math.pow(data.getNum(), 2)
+"现在队列里还有"+queue.size()+"个");
Thread.sleep(r.nextInt(SLEEPTIME));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
主方法:
public class start {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Data> queue = new LinkedBlockingQueue<>(1);
Producer p1 = new Producer(queue);
Producer p2 = new Producer(queue);
Producer p3 = new Producer(queue);
Consumer c1 = new Consumer(queue);
Consumer c2 = new Consumer(queue);
Consumer c3 = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(p1);
service.execute(p2);
service.execute(p3);
service.execute(c1);
service.execute(c2);
service.execute(c3);
//这边的sleep是停止主线程,也就是main方法执行的这个,是为了待会再关掉生产者
//否则主线程一路执行下来,咱们生产者还没造出来东西呢,就给stop了
Thread.sleep(3 * 1000);
p1.stop();
p2.stop();
p3.stop();
//这边是为了给消费者时间去消费队列里的东西。
Thread.sleep(1 * 1000);
//线程池使用完后如果不调用shutdown会导致线程池资源一直不会被释放,记得释放哦
service.shutdown();
}
}
网友评论