实现一个线程池,该线程池可根据任务的 key 来决定由哪一个线程执行该任务。该线程池是固定大小的线程池,初始化后线程数量不可改变,主要由阻塞队列 BlockingQueue 及执行者 Work 构成,代码如下:
package com.sandy.util;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Fixed-size executor that routes every task to a worker thread chosen by the
 * task's shard key, so that all tasks sharing a key run sequentially on the
 * same thread in submission order.
 *
 * <p>The pool owns {@code poolSize} bounded queues (one per shard). A worker
 * thread for a shard is created lazily the first time a task for that shard is
 * submitted. Only {@link Command} instances are accepted by
 * {@link #execute(Runnable)}; note that the inherited {@code submit} and
 * {@code invokeAll} methods wrap tasks in a {@code FutureTask}, which is not a
 * {@code Command}, and are therefore rejected with
 * {@link IllegalArgumentException}.
 *
 * <p>This class is safe for use by multiple threads.
 *
 * @author: chengyu
 * @create: 2020-05-31 21:21
 **/
public class ShardThreadPoolExecutor extends AbstractExecutorService {

    /** Thread-name prefix used by the no-arg constructor. */
    private static final String DEFAULT_POOL_NAME = "shard-thread-pool-executor";

    /** Per-shard queue capacity used by the no-arg constructor. */
    private static final int DEFAULT_QUEUE_SIZE = 200;

    /** Guards worker creation/removal and the shutdown state transitions. */
    private final ReentrantLock mainLock = new ReentrantLock();

    /** Wait condition to support awaitTermination; signalled when the last worker exits. */
    private final Condition termination = mainLock.newCondition();

    /** Number of shards, which is also the maximum number of worker threads. */
    private final int poolSize;

    /** Prefix of the worker thread names. */
    private final String poolName;

    /** Capacity passed to the constructor of every shard queue. */
    private final int queueSize;

    /** Queue implementation; must expose a public constructor taking a single {@code int}. */
    @SuppressWarnings("rawtypes")
    private final Class<? extends BlockingQueue> clazz;

    /** One task queue per shard, indexed by shard index. */
    private final List<BlockingQueue<Command>> commanders;

    /** Live workers keyed by shard index; mutated only while holding {@link #mainLock}. */
    private final Map<Integer, Work> workers = new ConcurrentHashMap<>();

    /** Number of workers created so far; used for thread naming, guarded by {@link #mainLock}. */
    private int workerSequence;

    /** Set once shutdown has been requested: no new tasks are accepted. */
    private volatile boolean shutDown;

    /** Set once an abrupt shutdown has been requested: queued tasks are abandoned. */
    private volatile boolean shutDownNow;

    /**
     * Creates a pool with explicit sizing.
     *
     * @param poolSize  number of shards / maximum worker threads, must be positive
     * @param poolName  prefix for worker thread names, must not be blank
     * @param clazz     blocking queue class with a public {@code (int)} constructor
     * @param queueSize capacity of each shard queue, must be positive
     * @throws IllegalArgumentException if any argument is missing or out of range
     * @throws IllegalStateException    if a queue cannot be instantiated from {@code clazz}
     */
    @SuppressWarnings("rawtypes")
    public ShardThreadPoolExecutor(int poolSize, String poolName, Class<? extends BlockingQueue> clazz,
            Integer queueSize) {
        // poolSize 0 would later divide by zero when a shard index is computed.
        if (poolSize <= 0 || poolName == null || poolName.trim().isEmpty()
                || clazz == null || queueSize == null || queueSize <= 0) {
            throw new IllegalArgumentException(
                    "poolSize and queueSize must be positive, poolName non-blank and clazz non-null");
        }
        this.poolSize = poolSize;
        this.poolName = poolName;
        this.clazz = clazz;
        this.queueSize = queueSize;
        this.commanders = initCommanders();
    }

    /**
     * Creates a pool with one shard per available processor, backed by
     * {@link ArrayBlockingQueue}s of capacity 200.
     */
    public ShardThreadPoolExecutor() {
        this(Runtime.getRuntime().availableProcessors(), DEFAULT_POOL_NAME,
                ArrayBlockingQueue.class, DEFAULT_QUEUE_SIZE);
    }

    /**
     * Instantiates one blocking queue per shard through the {@code (int)}
     * constructor of the configured queue class.
     *
     * @return the list of shard queues, indexed by shard index
     * @throws IllegalStateException if the queue class cannot be instantiated
     */
    @SuppressWarnings("unchecked")
    private List<BlockingQueue<Command>> initCommanders() {
        List<BlockingQueue<Command>> queues = new ArrayList<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            try {
                queues.add((BlockingQueue<Command>) clazz.getConstructor(Integer.TYPE).newInstance(queueSize));
            } catch (ReflectiveOperationException | RuntimeException e) {
                throw new IllegalStateException("cannot create queue of type " + clazz.getName(), e);
            }
        }
        return queues;
    }

    /**
     * Starts an orderly shutdown: already queued tasks still run, new tasks are
     * rejected. Returns immediately; use {@link #awaitTermination} to wait.
     */
    @Override
    public void shutdown() {
        mainLock.lock();
        try {
            shutDown = true;
            // Wake up workers blocked on an empty queue; busy workers notice the flag themselves.
            for (Work work : workers.values()) {
                work.interruptIfIdle();
            }
            if (workers.isEmpty()) {
                termination.signalAll();
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Starts an abrupt shutdown: interrupts every worker and removes all tasks
     * that have not started yet.
     *
     * @return the tasks that were queued but never started
     */
    @Override
    public List<Runnable> shutdownNow() {
        List<Runnable> pending = new ArrayList<>();
        mainLock.lock();
        try {
            shutDown = true;
            shutDownNow = true;
            for (Work work : workers.values()) {
                work.thread.interrupt();
            }
            for (BlockingQueue<Command> queue : commanders) {
                queue.drainTo(pending);
            }
            if (workers.isEmpty()) {
                termination.signalAll();
            }
        } finally {
            mainLock.unlock();
        }
        return pending;
    }

    /** @return {@code true} once {@link #shutdown()} or {@link #shutdownNow()} has been called */
    @Override
    public boolean isShutdown() {
        return shutDown;
    }

    /** @return {@code true} if shutdown was requested and every worker thread has exited */
    @Override
    public boolean isTerminated() {
        return shutDown && workers.isEmpty();
    }

    /**
     * Blocks until the pool has terminated after a shutdown request, the
     * timeout elapses, or the calling thread is interrupted.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the pool terminated, {@code false} on timeout
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        mainLock.lock();
        try {
            while (!isTerminated()) {
                if (nanos <= 0) {
                    return false;
                }
                nanos = termination.awaitNanos(nanos);
            }
            return true;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Queues the command on the shard selected by its key, creating the shard's
     * worker thread on first use. Blocks while the shard queue is full.
     *
     * @param command the task; must implement {@link Command}
     * @throws IllegalArgumentException   if {@code command} is not a {@link Command}
     * @throws IllegalStateException      if the command's shard key is {@code null}
     * @throws RejectedExecutionException if the pool is shut down, or the calling
     *                                    thread is interrupted while waiting for queue space
     */
    @Override
    public void execute(Runnable command) {
        if (!(command instanceof Command)) {
            throw new IllegalArgumentException("param must be instance of ShardThreadPoolExecutor Command");
        }
        Command cmd = (Command) command;
        if (cmd.getShardKey() == null) {
            throw new IllegalStateException("shardObject is null");
        }
        int index = getBlockQueueIndex(cmd);
        BlockingQueue<Command> queue = commanders.get(index);
        ensureWorker(index, queue);
        try {
            queue.put(cmd);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently dropping the task.
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("interrupted while queueing command", e);
        }
        // A shutdown may have raced with the put; take the task back if no worker picked it up.
        if (shutDown && queue.remove(cmd)) {
            throw new RejectedExecutionException("executor has been shut down");
        }
    }

    /**
     * Makes sure a worker thread is serving the given shard.
     *
     * @throws RejectedExecutionException if the pool has been shut down
     */
    private void ensureWorker(int index, BlockingQueue<Command> queue) {
        if (!shutDown && workers.containsKey(index)) {
            return;
        }
        mainLock.lock();
        try {
            if (shutDown) {
                throw new RejectedExecutionException("executor has been shut down");
            }
            if (!workers.containsKey(index)) {
                Work work = new Work(index, queue, poolName + "-" + workerSequence++);
                // Start before registering so a failed start leaves no dead entry behind;
                // the worker cannot deregister itself until this lock is released.
                work.thread.start();
                workers.put(index, work);
            }
        } finally {
            mainLock.unlock();
        }
    }

    /** Maps a command to a shard index in {@code [0, poolSize)} from its key's hash code. */
    private int getBlockQueueIndex(Command command) {
        return Math.floorMod(command.getShardKey().hashCode(), poolSize);
    }

    /**
     * Fetches the next command for a worker.
     *
     * @return the next command, or {@code null} when the worker should exit
     */
    private Command getCommand(BlockingQueue<Command> commandDeque) {
        for (;;) {
            if (shutDownNow) {
                return null;
            }
            if (shutDown) {
                // Orderly shutdown: drain what is left without blocking.
                return commandDeque.poll();
            }
            try {
                return commandDeque.take();
            } catch (InterruptedException ignored) {
                // Woken up (normally by a shutdown request); loop to re-read the state flags.
            }
        }
    }

    /** Deregisters an exiting worker and wakes up termination waiters when it was the last one. */
    private void removeWork(Work work) {
        mainLock.lock();
        try {
            workers.remove(work.index, work);
            if (shutDown && workers.isEmpty()) {
                termination.signalAll();
            }
        } finally {
            mainLock.unlock();
        }
    }

    /** A task that names the shard it must run on. */
    public interface Command extends Runnable {
        /**
         * @return the key used to pick the worker thread; commands with equal
         *         keys always run on the same thread; must not be {@code null}
         */
        String getShardKey();
    }

    /** Worker bound to one shard queue; runs that shard's commands one at a time. */
    private final class Work implements Runnable {
        private final int index;
        private final BlockingQueue<Command> queue;
        private final Thread thread;
        /** Held while a command is running, so shutdown() can tell idle from busy. */
        private final ReentrantLock runLock = new ReentrantLock();

        Work(int index, BlockingQueue<Command> queue, String threadName) {
            this.index = index;
            this.queue = queue;
            this.thread = new Thread(this, threadName);
        }

        /** Interrupts the worker thread only if it is not currently running a command. */
        void interruptIfIdle() {
            // A command calling shutdown() from this very worker is by definition busy.
            if (thread != Thread.currentThread() && runLock.tryLock()) {
                try {
                    thread.interrupt();
                } finally {
                    runLock.unlock();
                }
            }
        }

        @Override
        public void run() {
            try {
                Command cmd;
                while ((cmd = getCommand(queue)) != null) {
                    runLock.lock();
                    try {
                        // Clear a stale wake-up interrupt before running the command, but make
                        // sure the thread stays interrupted once shutdownNow() was requested.
                        if ((shutDownNow || (Thread.interrupted() && shutDownNow))
                                && !thread.isInterrupted()) {
                            thread.interrupt();
                        }
                        cmd.run();
                    } catch (Throwable t) {
                        // A failing command must not kill the worker, or its shard would stall.
                        thread.getUncaughtExceptionHandler().uncaughtException(thread, t);
                    } finally {
                        runLock.unlock();
                    }
                }
            } finally {
                removeWork(this);
            }
        }
    }
}
这部分代码中,shutdown和shutdownNow目前还有些问题,后续将会继续修改一下。
使用此工具类很简单,测试代码:
// Submit 100 commands alternating between two shard keys; every command with
// the same key is expected to run on the same worker thread.
ShardThreadPoolExecutor executor = new ShardThreadPoolExecutor();
for (int i = 0; i < 100; i++) {
    boolean even = i % 2 == 0;
    String key = even ? "a" : "b";
    String value = even ? "a_a" : "b_b";
    ShadingCommand<String> command = new ShadingCommand<>(key, value);
    executor.execute(command);
}
executor.shutdown();
// Each command sleeps ~100 ms, so a shard needs about 5 s for its 50 commands
// (about 10 s if both keys land on the same shard); wait long enough and
// report a timeout instead of ignoring the result.
if (!executor.awaitTermination(30L, TimeUnit.SECONDS)) {
    System.err.println("executor did not terminate within 30 seconds");
}
其中 ShadingCommand 需要实现 Command 接口(此接口定义在 ShardThreadPoolExecutor 类中),否则 execute 方法会抛出 IllegalArgumentException。使用者可根据任务需要自定义实现类,ShadingCommand 的实现如下:
private class ShadingCommandimplements ShardThreadPoolExecutor.Command{
private Stringkey;
private T t;
public ShadingCommand(String key,T t){
this.key = key;
this.t = t;
}
@Override
public String getShardKey() {
return key;
}
@Override
public void run() {
try {
Thread.sleep(100L);
}catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+",key:"+key+",value:"+t);
}
}
测试结果:
shard-thread-pool-executor-1,key:b,value:b_b
shard-thread-pool-executor-0,key:a,value:a_a
shard-thread-pool-executor-0,key:a,value:a_a
shard-thread-pool-executor-1,key:b,value:b_b
shard-thread-pool-executor-0,key:a,value:a_a
shard-thread-pool-executor-1,key:b,value:b_b
shard-thread-pool-executor-1,key:b,value:b_b
shard-thread-pool-executor-0,key:a,value:a_a
shard-thread-pool-executor-0,key:a,value:a_a
shard-thread-pool-executor-1,key:b,value:b_b
可以看到,相同key的数据只会被相同线程处理。
网友评论