美文网首页
线程池+队列并发处理

线程池+队列并发处理

作者: 舒尔诚 | 来源:发表于2019-01-08 17:44 被阅读0次

    package com.mqtt.thread;

    import java.util.Hashtable;
    import java.util.LinkedList;
    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.BeanFactoryAware;
    import org.springframework.stereotype.Component;

    /**

    • @author HeyS1

    • @date 2016/12/1

    • @description threadPool多线池,不返回子线程的运行状态

    • scheduler 调度线程池 用于处理订单线程池由于超出线程范围和队列容量而不能处理的订单
      /
      @Component
      public class ThreadPoolManager implements BeanFactoryAware {
      private static Logger log = LoggerFactory.getLogger(ThreadPoolManager.class);
      private BeanFactory factory;//用于从IOC里取对象
      // 线程池维护线程的最少数量
      private final static int CORE_POOL_SIZE = 2;
      // 线程池维护线程的最大数量
      private final static int MAX_POOL_SIZE = 10;
      // 线程池维护线程所允许的空闲时间
      private final static int KEEP_ALIVE_TIME = 0;
      // 线程池所使用的缓冲队列大小
      /
      *
      大于MAX_POOL_SIZE时,放入阻塞对列,
      大于》MAX_POOL_SIZE+WORK_QUEUE_SIZE:拒绝
      /
      private final static int WORK_QUEUE_SIZE =2/
      50*/;
      // 消息缓冲队列
      Queue<Object> msgQueue = new LinkedList<Object>();

      //用于储存在队列中的订单,防止重复提交
      Map<String, Object> cacheMap = new ConcurrentHashMap<>();

    private  Hashtable<String, Object>  tables=new Hashtable<String, Object>();
    

    /* @Autowired
    DBThread dBThread;
    */

    public Hashtable<String, Object> getTables() {
        return tables;
    }
    
    public void setTables(Hashtable<String, Object> tables) {
        this.tables = tables;
    }
    
    public ThreadPoolManager(Hashtable<String, Object>  tables ) {
        this.tables=tables;
    }
    
    public ThreadPoolManager() {
        super();
    }
    
    
    //由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序
    final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("太忙了,把该订单交给调度线程池逐一处理" +r.toString()+"   "+
        //((DBThread) r).getMsg()+
        "--size="+msgQueue.size()+"---tsize="+threadPool.getQueue().size());
           
            
            //msgQueue.offer(dBThread.getMsg());
            msgQueue.offer(r);
        }
    };
    
    // 订单线程池
    final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
            TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);
    
    // 调度线程池。此线程池支持定时以及周期性执行任务的需求。
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
    
    // 访问消息缓存的调度线程,每秒执行一次
    // 查看是否有待定请求,如果有,则创建一个新的AccessDBThread,并添加到线程池中
    final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            if (!msgQueue.isEmpty()) {
                if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
                    System.out.print("调度:");
                   /* String orderId = (String) msgQueue.poll();
                    DBThread accessDBThread = (DBThread) factory.getBean("dBThread");
                    accessDBThread.setMsg(orderId);
                    threadPool.execute(accessDBThread);*/
                    DBThread dt = (DBThread) msgQueue.poll();
                    threadPool.execute(dt);
                    
                }
                 /*while (msgQueue.peek() != null) {
                 }*/
            }
        }
    }, 0, 10, TimeUnit.MILLISECONDS);//.SECONDS
    
    //终止订单线程池+调度线程池
    public void shutdown() {
        //true表示如果定时任务在执行,立即中止,false则等待任务结束后再停止
        System.out.println(taskHandler.cancel(false));
        scheduler.shutdown();
        threadPool.shutdown();
    }
    
    public Queue<Object> getMsgQueue() {
        return msgQueue;
    }
    
    
    //将任务加入订单线程池
    public void processOrders(String orderId) {
        if (cacheMap.get(orderId) == null) {
            cacheMap.put(orderId,new Object());
            
            DBThread dt = this.getNewThread(orderId, orderId);
            threadPool.execute(dt);
            
        }
    }
    
    
    /**
     * 无状态返回
     * @param topic
     * @param msg
     */
    public void processOrders2(String topic,String msg ) {
     
        DBThread dt = this.getNewThread(topic, msg);
        threadPool.execute(dt);
        
            
    }
    
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        factory = beanFactory;
    }
    
    public DBThread getNewThread(String topic,String msg ){
        DBThread db=new DBThread(tables);
        db.setMsg(msg);
        db.setTopic(topic);
        return db;
    }
    

    public static void main(String[] agrs){

    noReturnTest();
    
    }
    
    
    
    public static void noReturnTest(){
    
        Hashtable<String, Object>  tables=new Hashtable<String, Object>();
            ThreadPoolManager tmp2=new ThreadPoolManager(tables);
            int size=1000;
            long start = System.currentTimeMillis();
            
            for (int i = 0; i < size; i++) {
                //模拟并发500条记录            
                tmp2.processOrders2("for-"+i, i+"");
            
            }
            
            //等待执行完,处理完的个数等于总数则完成
            while(tables.size()<size){
                
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                
                
                //等待
                System.out.println("----等待完成,已处理:"+tables.size()+"~~~"+tmp2.getTables().size());
                
            }
            for(Map.Entry<String, Object> entry: tables.entrySet()){
                System.out.println("key---------"+entry.getKey()+"  "+"value--------"+entry.getValue());
            }
            long end = System.currentTimeMillis();
            
            System.out.println("-----------处理完毕----------------"+(end-start));
            tmp2.shutdown();
            System.out.println("----------------00000000000000000--------------");
        
    }
    

    }

    // ----------------- 线程类(DBThread.java,单独的源文件)-----------------
    package com.mqtt.thread;

    import java.io.IOException;
    import java.util.Hashtable;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Scope;
    import org.springframework.stereotype.Component;

    import com.mqtt.AlibabaMQTTSendRecive;

    /**
     * Worker task run by the thread pool.
     *
     * <p>Each instance carries one topic/message pair. {@link #run()} simulates the
     * work with a fixed sleep and then records {@code topic -> msg} in the result table
     * it was constructed with, which is how completion is reported back.
     */
    @Component
    @Scope("prototype") // Spring prototype scope: a new instance per lookup
    public class DBThread implements Runnable {
        private static final Logger log = LoggerFactory.getLogger(DBThread.class);

        private String msg;
        private String topic;

        /** Result table; private to this instance unless one is passed in. */
        private Hashtable<String, Object> tables = new Hashtable<String, Object>();

        /**
         * Creates a worker that reports into a shared result table.
         *
         * @param tables table that receives {@code topic -> msg} when the work is done
         */
        public DBThread(Hashtable<String, Object> tables) {
            this.tables = tables;
        }

        /** Creates a worker with its own result table and no topic/message yet. */
        public DBThread() {
            super();
        }

        /**
         * Creates a worker with its own result table.
         *
         * @param msg   message payload (note: first argument)
         * @param topic key under which the result is stored
         */
        public DBThread(String msg, String topic) {
            super();
            this.msg = msg;
            this.topic = topic;
        }

        public String getTopic() {
            return topic;
        }

        public void setTopic(String topic) {
            this.topic = topic;
        }

        /**
         * Logs the message, simulates three seconds of work (e.g. a database insert) and
         * then stores {@code topic -> msg} in the result table.
         *
         * <p>If the sleep is interrupted the interrupt flag is restored and the result is
         * still stored, as before. A worker whose topic or message was never set skips
         * the store, because {@link Hashtable} rejects null keys and values.
         */
        @Override
        public void run() {
            String line = "发送消息线程进程 DBThread= " + this.toString() + " ---- ->" + msg + "";
            log.info(line);
            System.out.println(line);

            try {
                Thread.sleep(3000);
                // TODO do the real work here
            } catch (InterruptedException e) {
                // keep the interruption visible to the pool instead of swallowing it
                Thread.currentThread().interrupt();
                log.warn("DBThread interrupted while processing topic={}", topic, e);
            }

            if (topic == null || msg == null) {
                log.warn("Skipping result for incomplete task: topic={}, msg={}", topic, msg);
                return;
            }
            tables.put(topic, msg); // work finished: publish the result
        }

        public String getMsg() {
            return msg;
        }

        public void setMsg(String msg) {
            this.msg = msg;
        }
    }

    相关文章

      网友评论

          本文标题:线程池+队列并发处理

          本文链接:https://www.haomeiwen.com/subject/vxubrqtx.html