美文网首页
整理分享一段处理web高并发的纯java代码

整理分享一段处理web高并发的纯java代码

作者: DONG999 | 来源:发表于2018-09-05 11:57 被阅读0次

    整理笔记记录

    这是一段helper工具类, 可以放到 web controller里面, 利用了java.util.concurrent下多线程的相关内容, 以生产者/消费者模式来处理高度并发的逻辑, 另外,计划看看RabbitMq提供的类似功能

    public class BlockingQueueHelper {
    
        private static final Integer maxQueueSize = 10;
        private static BlockingQueue blockingQueue = new LinkedBlockingQueue(maxQueueSize);
        private static ExecutorService threadPool = Executors.newCachedThreadPool();
    
        public static BlockingQueue<String> getBlockingQueue() {
            if (blockingQueue == null) {
                blockingQueue = new LinkedBlockingQueue(maxQueueSize);
            }
            return blockingQueue;
        }
    
        public static boolean requestQueue(Object o) throws InterruptedException {
                    
                if (blockingQueue.offer(o, 2, TimeUnit.SECONDS)) {            
                    if (!LockFlag.getCustomerRunningFlag()) {                 
                        LockFlag.setCustomerRunningFlag(true);                   
                        Customer customer = new Customer(blockingQueue);
                        threadPool.execute(customer);
                    }
                    return true;
                } else {
                    return false;             
                } 
        }
    }
    

    Consumer.java

    /**
     * 获取数据:
      poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
        取不到时返回null;
      poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
        队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
      take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
        BlockingQueue有新的数据被加入; 
      drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数), 
        通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
     */
    
    public class Consumer implements Runnable{
     
        private BlockingQueue blockingQueue;
        private AtomicInteger count = new AtomicInteger();
        public Customer(BlockingQueue blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
        
        @Override
        public void run() {
            System.out.println("Start Consumer...");
            LockFlag.setCustomerRunningFlag(true);
            try {
                while (LockFlag.getCustomerRunningFlag()){
                    System.out.println(Thread.currentThread().getId()+" Queue current size="+blockingQueue.size());
                    String data = (String) blockingQueue.poll(10, TimeUnit.SECONDS);
                    if(data!=null){
                        System.out.println(Thread.currentThread().getId()+": consuming data="+data);
                    }else{
                      
                        System.out.println(Thread.currentThread().getId()+" =====>all data was consumed, exit Consumer");
                        LockFlag.setCustomerRunningFlag(false);//正常退出
                    }
                    Thread.sleep(50);
                }
                System.out.println("Exited current Consumer");
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.err.println("exception in Consumer");
                LockFlag.setCustomerRunningFlag(false); 
                Thread.currentThread().interrupt();
            }
        }
    }
    

    下面是个测试类

    public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> queue = BlockingQueueHelper.getBlockingQueue();
     
            Producer producer1 = new Producer(queue);
            Producer producer2 = new Producer(queue);
     
            ExecutorService service = Executors.newCachedThreadPool(); 
            service.execute(producer1);
            service.execute(producer2);
       
            Thread.sleep(2 * 1000);
            producer1.stop();
            producer2.stop();
     
            System.out.println("all Producers stopped");
            Thread.sleep(2000);
     
            System.out.println("thread pool shutdown!");
            service.shutdown();
        }
    

    Producer.java

    /**
     * 放入数据:
      offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,
        则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
      offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中
        加入BlockingQueue,则返回失败。
      put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断
        直到BlockingQueue里面有空间再继续.
     */
    public class Producer implements Runnable{
     
        private BlockingQueue blockingQueue;
        private static AtomicInteger count = new AtomicInteger();
        public Producer(BlockingQueue blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
    
      
        @Override
        public void run() {
            System.out.println("start Producer...");
            LockFlag.setProducerRunningFlag(true);
            try {
                while (LockFlag.getProducerRunningFlag()){
                    String data = "data:"+count.incrementAndGet();
                    
                    if(BlockingQueueHelper.requestQueue(data)) {                                  
                        System.out.println("data produced="+data);
                    }else {                 
                        System.out.println("can not put data into queue, producer exception");
                    }
                    Thread.sleep(50);
    
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.err.println("Exit Producer with Exception:" + e.toString() );
                LockFlag.setProducerRunningFlag(false); 
                Thread.currentThread().interrupt();
            }
        }
        
        
        public void stop() {
            LockFlag.setProducerRunningFlag(false);
        }
    }
    

    REF:

    相关文章

      网友评论

          本文标题:整理分享一段处理web高并发的纯java代码

          本文链接:https://www.haomeiwen.com/subject/ffcawftx.html