美文网首页
Java线程池的使用

Java线程池的使用

作者: yehongyu_2018 | 来源:发表于2019-01-04 15:53 被阅读0次

    线程池类型:

    1. 固定大小线程池(FixedThreadPool)
    2. 缓存线程池(CachedThreadPool)
    3. 定时调度线程池(ScheduledThreadPool)

    固定线程池使用

    public class WorkerService {
    private ExecutorService executorService;
    Map<Integer, Worker> workerMap = new HashMap();
    public boolean init() {
        int workerNum = 3;
        executorService = 
        Executors.newFixedThreadPool(workerNum);
        IntStream.range(0, workerNum).forEach(id -> {
                        Worker worker = new Worker(id);
                        workerMap.put(id, worker);
                        executorService.submit(worker);
                    }
            );
        }
    public void close() {
            workerMap.forEach((k, v) -> v.stop());
            //shutdown the consumer thread
            if (executorService != null) {
                // disable new tasks from being submitted
                executorService.shutdown();
                try {
                    // wait a while for existing tasks to terminate
                    if (!executorService.awaitTermination(60000, TimeUnit.MILLISECONDS)) {
                        LOGGER.error("Still waiting...");
                        executorService.shutdownNow(); // cancel currently executing tasks
                        // Wait a while for tasks to respond to being cancelled
                        if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                            LOGGER.error("Pool did not terminate");
                        }
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("force to interrupted");
                    return;
                }
                LOGGER.info("Exiting normally...");
            }
        }
    }
    private class Worker implements Runnable {
            int id;
            private volatile boolean isRunning;
            Worker(int i) {
                id = i;
                isRunning = true;
            }
            @Override
            public void run() {
                    while (isRunning) {
                        try {
                              // TODO
                         } catch (Exception e) {
                         }
                    }
              }
            public void stop() {
                isRunning = false;
            }
        }
    
    

    cached线程池使用

    /**
     * Cached thread pool example: runs groups of {@code ConsumingWorker}s, tracked
     * per key so that a group can be added, replaced or removed as a unit.
     */
    public class ConsumingExecutor {
        private Map<String, List<ConsumingWorker>> workerMap = new HashMap<>();
        private ExecutorService executor;

        public ConsumingExecutor() {
        }

        /**
         * Creates the cached pool with named threads and registers a JVM shutdown
         * hook that calls {@link #shutdown()}.
         *
         * @param workerNum currently unused; the number of workers per key is taken
         *                  from {@code CONSUMING_WORKER_NUM} in {@code addWorker}
         */
        public void start(int workerNum) {
            ThreadFactory tf = new ThreadFactoryBuilder()
                    .setNameFormat("ConsumingWorker-thread-%d")
                    .setUncaughtExceptionHandler(new LoggingExceptionHandler())
                    .build();
            executor = Executors.newCachedThreadPool(tf);
            Runtime.getRuntime().addShutdownHook(new Thread(ConsumingExecutor.this::shutdown));
        }

        /**
         * Submits {@code CONSUMING_WORKER_NUM} new workers to the pool and records
         * them under {@code key}.
         */
        public void addWorker(String key, KafkaTopic topic, ElementBlockingQueue queue, PipelineHolder.PipelineType pipelineType) {
            for (int i = 0; i < CONSUMING_WORKER_NUM; ++i) {
                ConsumingWorker consumingWorker = new ConsumingWorker(i, topic, queue, pipelineType);
                executor.submit(consumingWorker);
                workerMap.computeIfAbsent(key, k -> new ArrayList<>()).add(consumingWorker);
            }
        }

        /**
         * Signals shutdown to every worker registered under {@code key} and forgets
         * the key. Does nothing when the key is unknown.
         */
        public void removeWorker(String key) {
            List<ConsumingWorker> workers = workerMap.remove(key);
            if (workers != null) {
                workers.forEach(worker -> worker.shutdown());
            }
        }

        /** Replaces the workers registered under {@code key} with a fresh group. */
        public void updateWorker(String key, KafkaTopic topic, ElementBlockingQueue queue, PipelineHolder.PipelineType pipelineType) {
            removeWorker(key);
            addWorker(key, topic, queue, pipelineType);
        }

        /**
         * Signals shutdown to all workers, then shuts the pool down in two phases:
         * orderly shutdown first, forced cancellation if the grace period expires.
         */
        public void shutdown() {
            workerMap.values().forEach(workers -> {
                if (workers != null) {
                    workers.forEach(worker -> worker.shutdown());
                }
            });

            if (null != executor && !executor.isTerminated() && !executor.isShutdown()) {
                executor.shutdown();
                try {
                    if (!executor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SEC, TimeUnit.SECONDS)) {
                        // the grace period expired: interrupt running tasks; calling
                        // shutdown() a second time here would change nothing
                        executor.shutdownNow();
                        if (!executor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SEC, TimeUnit.SECONDS)) {
                            LOGGER.error("ConsumingExecutor did not terminate");
                        }
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("Waiting active task termination fails.");
                    // stop waiting, cancel what is left and preserve the interrupt status
                    executor.shutdownNow();
                    Thread.currentThread().interrupt();
                }
            }
        }

    }
    

    定时调度线程池使用

    /**
     * Scheduled thread pool example: runs two cleaner tasks repeatedly with a
     * fixed delay between the end of one run and the start of the next.
     */
    public class CleanService {
        private ScheduledExecutorService executor;

        /**
         * Builds the scheduled pool and registers both cleaner tasks on it.
         *
         * @param serviceConfig service configuration (not read by this method)
         */
        public void init(ServiceConfig serviceConfig) {
            executor = Executors.newScheduledThreadPool(cleanerNumber);

            ACleanerTask aCleaner = new ACleanerTask(
                    expiryDate.longValue(), clientExpiryDate.longValue(), offset.longValue());
            executor.scheduleWithFixedDelay(aCleaner, initalDelay, scanInterval, TimeUnit.SECONDS);

            BCleanerTask bCleaner = new BCleanerTask(expiryDate);
            executor.scheduleWithFixedDelay(bCleaner, initalDelay, scanInterval, TimeUnit.SECONDS);
        }

        /**
         * Cancels the scheduled tasks immediately, unless the pool was never
         * created or has already been shut down.
         */
        public void close() {
            if (executor == null || executor.isTerminated() || executor.isShutdown()) {
                return;
            }
            LOGGER.info("close clean executor ..");
            executor.shutdownNow();
        }
    }
    

    相关文章

      网友评论

          本文标题:Java线程池的使用

          本文链接:https://www.haomeiwen.com/subject/wnzhrqtx.html