美文网首页
线程池+队列并发处理

线程池+队列并发处理

作者: 舒尔诚 | 来源:发表于2019-01-08 17:44 被阅读0次

package com.mqtt.thread;

import java.util.Hashtable;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;

/**

  • @author HeyS1

  • @date 2016/12/1

  • @description threadPool多线池,不返回子线程的运行状态

  • scheduler 调度线程池 用于处理订单线程池由于超出线程范围和队列容量而不能处理的订单
    /
    @Component
    public class ThreadPoolManager implements BeanFactoryAware {
    private static Logger log = LoggerFactory.getLogger(ThreadPoolManager.class);
    private BeanFactory factory;//用于从IOC里取对象
    // 线程池维护线程的最少数量
    private final static int CORE_POOL_SIZE = 2;
    // 线程池维护线程的最大数量
    private final static int MAX_POOL_SIZE = 10;
    // 线程池维护线程所允许的空闲时间
    private final static int KEEP_ALIVE_TIME = 0;
    // 线程池所使用的缓冲队列大小
    /
    *
    大于MAX_POOL_SIZE时,放入阻塞对列,
    大于》MAX_POOL_SIZE+WORK_QUEUE_SIZE:拒绝
    /
    private final static int WORK_QUEUE_SIZE =2/
    50*/;
    // 消息缓冲队列
    Queue<Object> msgQueue = new LinkedList<Object>();

    //用于储存在队列中的订单,防止重复提交
    Map<String, Object> cacheMap = new ConcurrentHashMap<>();

private  Hashtable<String, Object>  tables=new Hashtable<String, Object>();

/* @Autowired
DBThread dBThread;
*/

public Hashtable<String, Object> getTables() {
    return tables;
}

public void setTables(Hashtable<String, Object> tables) {
    this.tables = tables;
}

public ThreadPoolManager(Hashtable<String, Object>  tables ) {
    this.tables=tables;
}

public ThreadPoolManager() {
    super();
}


//由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序
final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("太忙了,把该订单交给调度线程池逐一处理" +r.toString()+"   "+
    //((DBThread) r).getMsg()+
    "--size="+msgQueue.size()+"---tsize="+threadPool.getQueue().size());
       
        
        //msgQueue.offer(dBThread.getMsg());
        msgQueue.offer(r);
    }
};

// 订单线程池
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
        TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);

// 调度线程池。此线程池支持定时以及周期性执行任务的需求。
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

// 访问消息缓存的调度线程,每秒执行一次
// 查看是否有待定请求,如果有,则创建一个新的AccessDBThread,并添加到线程池中
final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        if (!msgQueue.isEmpty()) {
            if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
                System.out.print("调度:");
               /* String orderId = (String) msgQueue.poll();
                DBThread accessDBThread = (DBThread) factory.getBean("dBThread");
                accessDBThread.setMsg(orderId);
                threadPool.execute(accessDBThread);*/
                DBThread dt = (DBThread) msgQueue.poll();
                threadPool.execute(dt);
                
            }
             /*while (msgQueue.peek() != null) {
             }*/
        }
    }
}, 0, 10, TimeUnit.MILLISECONDS);//.SECONDS

//终止订单线程池+调度线程池
public void shutdown() {
    //true表示如果定时任务在执行,立即中止,false则等待任务结束后再停止
    System.out.println(taskHandler.cancel(false));
    scheduler.shutdown();
    threadPool.shutdown();
}

public Queue<Object> getMsgQueue() {
    return msgQueue;
}


//将任务加入订单线程池
public void processOrders(String orderId) {
    if (cacheMap.get(orderId) == null) {
        cacheMap.put(orderId,new Object());
        
        DBThread dt = this.getNewThread(orderId, orderId);
        threadPool.execute(dt);
        
    }
}


/**
 * 无状态返回
 * @param topic
 * @param msg
 */
public void processOrders2(String topic,String msg ) {
 
    DBThread dt = this.getNewThread(topic, msg);
    threadPool.execute(dt);
    
        
}

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
    factory = beanFactory;
}

public DBThread getNewThread(String topic,String msg ){
    DBThread db=new DBThread(tables);
    db.setMsg(msg);
    db.setTopic(topic);
    return db;
}

public static void main(String[] agrs){

noReturnTest();

}



public static void noReturnTest(){

    Hashtable<String, Object>  tables=new Hashtable<String, Object>();
        ThreadPoolManager tmp2=new ThreadPoolManager(tables);
        int size=1000;
        long start = System.currentTimeMillis();
        
        for (int i = 0; i < size; i++) {
            //模拟并发500条记录            
            tmp2.processOrders2("for-"+i, i+"");
        
        }
        
        //等待执行完,处理完的个数等于总数则完成
        while(tables.size()<size){
            
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            
            
            //等待
            System.out.println("----等待完成,已处理:"+tables.size()+"~~~"+tmp2.getTables().size());
            
        }
        for(Map.Entry<String, Object> entry: tables.entrySet()){
            System.out.println("key---------"+entry.getKey()+"  "+"value--------"+entry.getValue());
        }
        long end = System.currentTimeMillis();
        
        System.out.println("-----------处理完毕----------------"+(end-start));
        tmp2.shutdown();
        System.out.println("----------------00000000000000000--------------");
    
}

}

-----------------线程类--------------------------------------------
package com.mqtt.thread;

import java.io.IOException;
import java.util.Hashtable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import com.mqtt.AlibabaMQTTSendRecive;

//线程池中工作的线程
@Component
@Scope("prototype")//spring 多例
public class DBThread implements Runnable {
private String msg;
private String topic ;
private Logger log = LoggerFactory.getLogger(DBThread.class);
private Hashtable<String, Object> tables=new Hashtable<String, Object>();

public DBThread(Hashtable<String, Object>tables ) {
this.tables=tables;
}
public DBThread() {
super();
}

public DBThread(String msg, String topic) {
super();
this.msg = msg;
this.topic = topic;
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

@Override
public void run() {
//模拟在数据库插入数据
log.info("发送消息线程进程 DBThread= "+ this.toString()+ " ---- ->" + msg+"");
System.out.println("发送消息线程进程 DBThread= "+ this.toString()+ " ---- ->" + msg+"");

  try {
        Thread.sleep(3000);
                    //TODO dosomething
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
  
  tables.put(topic, msg);//线程执行完毕,消息反馈

}

public String getMsg() {
return msg;
}

public void setMsg(String msg) {
this.msg = msg;
}
}

相关文章

  • 线程池+队列并发处理

    package com.mqtt.thread; import java.util.Hashtable;impor...

  • 线程池很容易理解的

    线程池介绍 并发队列 线程池原理分析 自定义线程池 文中部分代码使用 lambda 表达式以简化代码。 线程池 什...

  • 多线程 | 4.线程池

    Java并发编程:线程池的使用 线程池基础 请求队列 线程池维护一定数量的线程,当线程池在运行状态的线程数量达上...

  • 面试刷题21:java并发工具中的队列有哪些?

    java的线程池的工作队列用到了并发队列。 队列一般用在生产者消费者的场景中,处理需要排队的需求。 你好,我是...

  • 在并发编程中线程池的使用

    在并发编程中线程池的使用 一、为什么要使用线程池 当需要处理的任务较少时,我们可以自己创建线程去处理,但在高并发...

  • 面试题系列:并发编程之线程池及队列

    并发编程之线程池及队列 问题: 线程池作用,主要实现类,并说出实现类场景以及区别。 ThreadPoolExecu...

  • 线程池

    [TOC] 线程池 1. 并发队列:阻塞队列和非阻塞队列 区别如下: 入队: 非阻塞队列:当队列中满了的时候,放入...

  • Java线程池

    什么是线程池 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程...

  • 线程池

    线程池,是什么? 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池...

  • 聊聊Java线程池原理

    线程池是很常用的并发框架,几乎所有需要异步和并发处理任务的程序都可用到线程池。使用线程池的好处如下: 降低资源消耗...

网友评论

      本文标题:线程池+队列并发处理

      本文链接:https://www.haomeiwen.com/subject/vxubrqtx.html