美文网首页
整理分享一段处理web高并发的纯java代码

整理分享一段处理web高并发的纯java代码

作者: DONG999 | 来源:发表于2018-09-05 11:57 被阅读0次

整理笔记记录

这是一段helper工具类, 可以放到 web controller里面, 利用了java.util.concurrent下多线程的相关内容, 以生产者/消费者模式来处理高度并发的逻辑, 另外,计划看看RabbitMq提供的类似功能

public class BlockingQueueHelper {

    private static final Integer maxQueueSize = 10;
    private static BlockingQueue blockingQueue = new LinkedBlockingQueue(maxQueueSize);
    private static ExecutorService threadPool = Executors.newCachedThreadPool();

    public static BlockingQueue<String> getBlockingQueue() {
        if (blockingQueue == null) {
            blockingQueue = new LinkedBlockingQueue(maxQueueSize);
        }
        return blockingQueue;
    }

    public static boolean requestQueue(Object o) throws InterruptedException {
                
            if (blockingQueue.offer(o, 2, TimeUnit.SECONDS)) {            
                if (!LockFlag.getCustomerRunningFlag()) {                 
                    LockFlag.setCustomerRunningFlag(true);                   
                    Customer customer = new Customer(blockingQueue);
                    threadPool.execute(customer);
                }
                return true;
            } else {
                return false;             
            } 
    }
}

Consumer.java

/**
 * 获取数据:
  poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
    取不到时返回null;
  poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
    队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
  take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
    BlockingQueue有新的数据被加入; 
  drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数), 
    通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
 */

public class Consumer implements Runnable{
 
    private BlockingQueue blockingQueue;
    private AtomicInteger count = new AtomicInteger();
    public Customer(BlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }
    
    @Override
    public void run() {
        System.out.println("Start Consumer...");
        LockFlag.setCustomerRunningFlag(true);
        try {
            while (LockFlag.getCustomerRunningFlag()){
                System.out.println(Thread.currentThread().getId()+" Queue current size="+blockingQueue.size());
                String data = (String) blockingQueue.poll(10, TimeUnit.SECONDS);
                if(data!=null){
                    System.out.println(Thread.currentThread().getId()+": consuming data="+data);
                }else{
                  
                    System.out.println(Thread.currentThread().getId()+" =====>all data was consumed, exit Consumer");
                    LockFlag.setCustomerRunningFlag(false);//正常退出
                }
                Thread.sleep(50);
            }
            System.out.println("Exited current Consumer");
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.err.println("exception in Consumer");
            LockFlag.setCustomerRunningFlag(false); 
            Thread.currentThread().interrupt();
        }
    }
}

下面是个测试类

public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = BlockingQueueHelper.getBlockingQueue();
 
        Producer producer1 = new Producer(queue);
        Producer producer2 = new Producer(queue);
 
        ExecutorService service = Executors.newCachedThreadPool(); 
        service.execute(producer1);
        service.execute(producer2);
   
        Thread.sleep(2 * 1000);
        producer1.stop();
        producer2.stop();
 
        System.out.println("all Producers stopped");
        Thread.sleep(2000);
 
        System.out.println("thread pool shutdown!");
        service.shutdown();
    }

Producer.java

/**
 * 放入数据:
  offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,
    则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
  offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中
    加入BlockingQueue,则返回失败。
  put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断
    直到BlockingQueue里面有空间再继续.
 */
public class Producer implements Runnable{
 
    private BlockingQueue blockingQueue;
    private static AtomicInteger count = new AtomicInteger();
    public Producer(BlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

  
    @Override
    public void run() {
        System.out.println("start Producer...");
        LockFlag.setProducerRunningFlag(true);
        try {
            while (LockFlag.getProducerRunningFlag()){
                String data = "data:"+count.incrementAndGet();
                
                if(BlockingQueueHelper.requestQueue(data)) {                                  
                    System.out.println("data produced="+data);
                }else {                 
                    System.out.println("can not put data into queue, producer exception");
                }
                Thread.sleep(50);

            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.err.println("Exit Producer with Exception:" + e.toString() );
            LockFlag.setProducerRunningFlag(false); 
            Thread.currentThread().interrupt();
        }
    }
    
    
    public void stop() {
        LockFlag.setProducerRunningFlag(false);
    }
}

REF:

相关文章

网友评论

      本文标题:整理分享一段处理web高并发的纯java代码

      本文链接:https://www.haomeiwen.com/subject/ffcawftx.html