整理笔记记录
这是一段helper工具类, 可以放到 web controller里面, 利用了java.util.concurrent下多线程的相关内容, 以生产者/消费者模式来处理高度并发的逻辑, 另外,计划看看RabbitMq提供的类似功能
public class BlockingQueueHelper {
private static final Integer maxQueueSize = 10;
private static BlockingQueue blockingQueue = new LinkedBlockingQueue(maxQueueSize);
private static ExecutorService threadPool = Executors.newCachedThreadPool();
public static BlockingQueue<String> getBlockingQueue() {
if (blockingQueue == null) {
blockingQueue = new LinkedBlockingQueue(maxQueueSize);
}
return blockingQueue;
}
public static boolean requestQueue(Object o) throws InterruptedException {
if (blockingQueue.offer(o, 2, TimeUnit.SECONDS)) {
if (!LockFlag.getCustomerRunningFlag()) {
LockFlag.setCustomerRunningFlag(true);
Customer customer = new Customer(blockingQueue);
threadPool.execute(customer);
}
return true;
} else {
return false;
}
}
}
Consumer.java
/**
* 获取数据:
poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
取不到时返回null;
poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
BlockingQueue有新的数据被加入;
drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
*/
public class Consumer implements Runnable{
private BlockingQueue blockingQueue;
private AtomicInteger count = new AtomicInteger();
public Customer(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
System.out.println("Start Consumer...");
LockFlag.setCustomerRunningFlag(true);
try {
while (LockFlag.getCustomerRunningFlag()){
System.out.println(Thread.currentThread().getId()+" Queue current size="+blockingQueue.size());
String data = (String) blockingQueue.poll(10, TimeUnit.SECONDS);
if(data!=null){
System.out.println(Thread.currentThread().getId()+": consuming data="+data);
}else{
System.out.println(Thread.currentThread().getId()+" =====>all data was consumed, exit Consumer");
LockFlag.setCustomerRunningFlag(false);//正常退出
}
Thread.sleep(50);
}
System.out.println("Exited current Consumer");
} catch (InterruptedException e) {
e.printStackTrace();
System.err.println("exception in Consumer");
LockFlag.setCustomerRunningFlag(false);
Thread.currentThread().interrupt();
}
}
}
下面是个测试类
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = BlockingQueueHelper.getBlockingQueue();
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(producer1);
service.execute(producer2);
Thread.sleep(2 * 1000);
producer1.stop();
producer2.stop();
System.out.println("all Producers stopped");
Thread.sleep(2000);
System.out.println("thread pool shutdown!");
service.shutdown();
}
Producer.java
/**
* 放入数据:
offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,
则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中
加入BlockingQueue,则返回失败。
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断
直到BlockingQueue里面有空间再继续.
*/
public class Producer implements Runnable{
private BlockingQueue blockingQueue;
private static AtomicInteger count = new AtomicInteger();
public Producer(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
System.out.println("start Producer...");
LockFlag.setProducerRunningFlag(true);
try {
while (LockFlag.getProducerRunningFlag()){
String data = "data:"+count.incrementAndGet();
if(BlockingQueueHelper.requestQueue(data)) {
System.out.println("data produced="+data);
}else {
System.out.println("can not put data into queue, producer exception");
}
Thread.sleep(50);
}
} catch (InterruptedException e) {
e.printStackTrace();
System.err.println("Exit Producer with Exception:" + e.toString() );
LockFlag.setProducerRunningFlag(false);
Thread.currentThread().interrupt();
}
}
public void stop() {
LockFlag.setProducerRunningFlag(false);
}
}
REF:
网友评论