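线程池类型:
- 固定大小线程池（Executors.newFixedThreadPool）
- cached线程池（Executors.newCachedThreadPool）
- 定时调度线程池（Executors.newScheduledThreadPool）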
固定线程池使用
/**
 * Example of running long-lived workers on a fixed-size thread pool.
 *
 * <p>{@link #init()} creates the pool and submits one {@code Worker} per pool thread;
 * {@link #close()} asks every worker to stop and then shuts the pool down in two stages.
 *
 * <p>NOTE(review): {@code LOGGER} is not declared in this snippet; it is assumed to be
 * provided by the surrounding file.
 */
public class WorkerService {
    private ExecutorService executorService;

    /** Workers keyed by their id, kept so that {@link #close()} can stop each of them. */
    Map<Integer, Worker> workerMap = new HashMap<>();

    /**
     * Creates a fixed thread pool and submits one worker per thread.
     *
     * @return {@code true} once all workers have been submitted
     */
    public boolean init() {
        final int workerNum = 3;
        executorService = Executors.newFixedThreadPool(workerNum);
        IntStream.range(0, workerNum).forEach(id -> {
            Worker worker = new Worker(id);
            workerMap.put(id, worker);
            executorService.submit(worker);
        });
        return true;
    }

    /**
     * Stops all workers and shuts the pool down.
     *
     * <p>First waits up to 60 seconds for an orderly shutdown; if the pool is still busy,
     * cancels running tasks and waits up to 10 more seconds. If the calling thread is
     * interrupted while waiting, running tasks are cancelled and the interrupt status is
     * restored before returning.
     */
    public void close() {
        workerMap.forEach((id, worker) -> worker.stop());

        if (executorService == null) {
            return;
        }

        // Disable new tasks from being submitted.
        executorService.shutdown();
        try {
            // Wait a while for existing tasks to terminate.
            if (!executorService.awaitTermination(60000, TimeUnit.MILLISECONDS)) {
                LOGGER.error("Still waiting...");
                // Cancel currently executing tasks.
                executorService.shutdownNow();
                // Wait a while for tasks to respond to being cancelled.
                if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                    LOGGER.error("Pool did not terminate");
                }
            }
        } catch (InterruptedException e) {
            LOGGER.error("force to interrupted");
            // Do not leave tasks running when the wait is cut short, and keep the
            // interrupt visible to the caller instead of swallowing it.
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
            return;
        }
        LOGGER.info("Exiting normally...");
    }

    /**
     * Task that loops until {@link #stop()} is called or its thread is interrupted.
     *
     * <p>Declared as a static nested class because it uses no state of the enclosing
     * {@code WorkerService} instance.
     */
    private static class Worker implements Runnable {
        final int id;

        /** Written by the stopping thread and read by the pool thread, hence volatile. */
        private volatile boolean isRunning;

        Worker(int i) {
            id = i;
            isRunning = true;
        }

        @Override
        public void run() {
            // Also honor interruption so that ExecutorService.shutdownNow() can end the loop.
            while (isRunning && !Thread.currentThread().isInterrupted()) {
                try {
                    // TODO
                } catch (Exception e) {
                    // TODO: report the failure once the unit of work above is implemented;
                    // the loop deliberately keeps running after a failed iteration.
                }
            }
        }

        /** Requests the run loop to exit after the current iteration. */
        public void stop() {
            isRunning = false;
        }
    }
}
cached线程池使用
/**
 * Runs groups of {@code ConsumingWorker}s on a cached thread pool, tracked per key.
 *
 * <p>NOTE(review): {@code workerMap} is a plain {@code HashMap}, while {@link #shutdown()}
 * is also registered as a JVM shutdown hook and therefore may run on another thread;
 * confirm that callers serialize access.
 *
 * <p>NOTE(review): {@code LOGGER}, {@code CONSUMING_WORKER_NUM} and
 * {@code EXECUTOR_TERMINATION_WAIT_SEC} are not declared in this snippet; they are
 * assumed to be provided by the surrounding file.
 */
public class ConsumingExecutor {
    /** Workers submitted so far, grouped by the key passed to {@code addWorker}. */
    private Map<String, List<ConsumingWorker>> workerMap = new HashMap<>();
    private ExecutorService executor;

    public ConsumingExecutor() {
    }

    /**
     * Creates the cached thread pool and registers {@link #shutdown()} as a JVM shutdown hook.
     *
     * @param workerNum currently unused by this method; the number of workers created per
     *                  key is taken from {@code CONSUMING_WORKER_NUM} in {@code addWorker}
     */
    public void start(int workerNum) {
        ThreadFactory tf = new ThreadFactoryBuilder()
                .setNameFormat("ConsumingWorker-thread-%d")
                .setUncaughtExceptionHandler(new LoggingExceptionHandler())
                .build();
        executor = Executors.newCachedThreadPool(tf);
        Runtime.getRuntime().addShutdownHook(new Thread(ConsumingExecutor.this::shutdown));
    }

    /**
     * Creates {@code CONSUMING_WORKER_NUM} workers, submits each to the pool and records
     * it under {@code key}.
     *
     * @param key          group under which the new workers are tracked
     * @param topic        passed through to each {@code ConsumingWorker}
     * @param queue        passed through to each {@code ConsumingWorker}
     * @param pipelineType passed through to each {@code ConsumingWorker}
     */
    public void addWorker(String key, KafkaTopic topic, ElementBlockingQueue queue, PipelineHolder.PipelineType pipelineType) {
        for (int i = 0; i < CONSUMING_WORKER_NUM; ++i) {
            ConsumingWorker consumingWorker = new ConsumingWorker(i, topic, queue, pipelineType);
            executor.submit(consumingWorker);
            // Record the worker only after it has been accepted by the pool.
            workerMap.computeIfAbsent(key, k -> new ArrayList<>()).add(consumingWorker);
        }
    }

    /**
     * Shuts down every worker tracked under {@code key} and forgets the key.
     * Does nothing when the key is unknown.
     *
     * @param key group whose workers should be stopped
     */
    public void removeWorker(String key) {
        List<ConsumingWorker> workers = workerMap.get(key);
        if (workers == null) {
            return;
        }
        workers.forEach(worker -> worker.shutdown());
        workerMap.remove(key);
    }

    /**
     * Replaces the workers tracked under {@code key} with a freshly created set.
     *
     * @param key          group to replace
     * @param topic        passed through to each new {@code ConsumingWorker}
     * @param queue        passed through to each new {@code ConsumingWorker}
     * @param pipelineType passed through to each new {@code ConsumingWorker}
     */
    public void updateWorker(String key, KafkaTopic topic, ElementBlockingQueue queue, PipelineHolder.PipelineType pipelineType) {
        removeWorker(key);
        addWorker(key, topic, queue, pipelineType);
    }

    /**
     * Shuts down all tracked workers and then the pool.
     *
     * <p>Waits {@code EXECUTOR_TERMINATION_WAIT_SEC} seconds for an orderly shutdown; if
     * tasks are still running, cancels them and waits the same amount of time again.
     */
    public void shutdown() {
        workerMap.values().forEach(workers -> {
            if (workers != null) {
                workers.forEach(worker -> worker.shutdown());
            }
        });

        if (executor == null || executor.isTerminated() || executor.isShutdown()) {
            return;
        }

        executor.shutdown();
        try {
            if (!executor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SEC, TimeUnit.SECONDS)) {
                // The orderly shutdown timed out: cancel the tasks that are still running.
                executor.shutdownNow();
                if (!executor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SEC, TimeUnit.SECONDS)) {
                    LOGGER.error("ConsumingExecutor did not terminate");
                }
            }
        } catch (InterruptedException e) {
            LOGGER.error("Waiting active task termination fails.");
            // Cancel remaining tasks and keep the interrupt visible to the caller.
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            // Shutdown stays best-effort: report and continue.
            LOGGER.error("Waiting active task termination fails.");
        }
    }
}
定时调度线程池使用
/**
 * Example of periodic work on a scheduled thread pool: two cleaner tasks are run
 * repeatedly with a fixed delay between the end of one run and the start of the next.
 *
 * <p>NOTE(review): {@code cleanerNumber}, {@code expiryDate}, {@code clientExpiryDate},
 * {@code offset}, {@code initalDelay}, {@code scanInterval} and {@code LOGGER} are not
 * declared in this snippet; they are assumed to be provided by the surrounding file.
 */
public class CleanService {
    private ScheduledExecutorService executor;

    /**
     * Creates the scheduled pool and registers both cleaner tasks on it.
     *
     * @param serviceConfig not read by the visible body of this method
     */
    public void init(ServiceConfig serviceConfig) {
        executor = Executors.newScheduledThreadPool(cleanerNumber);

        Runnable firstCleaner = new ACleanerTask(
                expiryDate.longValue(),
                clientExpiryDate.longValue(),
                offset.longValue());
        executor.scheduleWithFixedDelay(firstCleaner, initalDelay, scanInterval, TimeUnit.SECONDS);

        Runnable secondCleaner = new BCleanerTask(expiryDate);
        executor.scheduleWithFixedDelay(secondCleaner, initalDelay, scanInterval, TimeUnit.SECONDS);
    }

    /**
     * Cancels the scheduled tasks immediately. Does nothing when the pool was never
     * created or has already been shut down.
     */
    public void close() {
        if (executor == null || executor.isTerminated() || executor.isShutdown()) {
            return;
        }
        LOGGER.info("close clean executor ..");
        executor.shutdownNow();
    }
}
网友评论