美文网首页
实现根据key分片的线程池-ShardThreadPoolExe

实现根据key分片的线程池-ShardThreadPoolExecutor

作者: sandy_cheng | 来源:发表于2020-05-31 22:17 被阅读0次

    实现一个线程池,该线程池可根据任务的key来确定由哪一个线程执行。它是固定大小的线程池,初始化后线程数量不可改变,主要由阻塞队列(BlockingQueue)和执行者(Work)构成,代码如下:

    package com.sandy.util;

    import java.util.ArrayList;

    import java.util.List;

    import java.util.Map;

    import java.util.concurrent.*;

    import java.util.concurrent.locks.Condition;

    import java.util.concurrent.locks.LockSupport;

    import java.util.concurrent.locks.ReentrantLock;

    /**

    * @author: chengyu

    * @create: 2020-05-31 21:21

    **/

    /**
     * Fixed-size executor that routes every task to a worker thread chosen by the
     * task's shard key, so that tasks sharing a key run sequentially on one thread
     * in submission order.
     *
     * <p>The pool owns {@code poolSize} bounded queues (one per shard). A worker
     * thread for a shard is created lazily when the first task for that shard is
     * submitted and then lives until the executor is shut down.
     *
     * <p>Only {@link Command} instances can be executed. The {@code submit} and
     * {@code invokeAll}/{@code invokeAny} methods inherited from
     * {@link AbstractExecutorService} wrap their argument in a {@code FutureTask},
     * which is not a {@code Command}, so they are rejected by {@link #execute}.
     *
     * <p>This class is safe for use by multiple threads.
     *
     * @author: chengyu
     * @create: 2020-05-31 21:21
     **/
    public class ShardThreadPoolExecutor extends AbstractExecutorService {

        /** Thread-name prefix used by the no-argument constructor. */
        private static final String DEFAULT_POOL_NAME = "shard-thread-pool-executor";

        /** Per-shard queue capacity used by the no-argument constructor. */
        private static final int DEFAULT_QUEUE_SIZE = 200;

        /** How long an idle worker waits on its queue before re-checking the shutdown flags. */
        private static final long IDLE_POLL_MILLIS = 50L;

        /** Guards worker creation/removal and the shutdown transition. */
        private final ReentrantLock mainLock = new ReentrantLock();

        /** Wait condition to support awaitTermination. */
        private final Condition termination = mainLock.newCondition();

        /** Number of shards (and therefore the maximum number of worker threads). */
        private final int poolSize;

        /** Thread pool name; worker threads are named {@code poolName-N}. */
        private final String poolName;

        /** Blocking queue implementation; must expose a public {@code (int capacity)} constructor. */
        private final Class<?> clazz;

        /** Capacity passed to every shard queue. */
        private final int queueSize;

        /** Tasks waiting to be executed, one queue per shard index. */
        private final List<BlockingQueue<Command>> commanders;

        /** Live worker threads keyed by shard index. */
        private final Map<Integer, Work> workers = new ConcurrentHashMap<>();

        /** Number of workers created so far; used for thread names. Guarded by {@code mainLock}. */
        private int workerSequence;

        /** Set once shutdown() or shutdownNow() has been called. */
        private volatile boolean shutDown;

        /** Set once shutdownNow() has been called. */
        private volatile boolean shutDownNow;

        /**
         * Creates an executor with an explicit configuration.
         *
         * @param poolSize  number of shards / worker threads, must be positive
         * @param poolName  thread-name prefix, must not be null or blank
         * @param clazz     {@link BlockingQueue} implementation with a public {@code (int)} constructor
         * @param queueSize capacity of every shard queue, must not be null or negative
         * @throws IllegalArgumentException if any argument is invalid
         * @throws IllegalStateException    if a queue cannot be instantiated reflectively
         */
        public ShardThreadPoolExecutor(int poolSize, String poolName, Class<?> clazz,
                Integer queueSize) {
            // poolSize == 0 is rejected too: it would make the shard index computation divide by zero.
            if (poolSize <= 0 || poolName == null || poolName.trim().isEmpty() || clazz == null
                    || queueSize == null || queueSize < 0) {
                throw new IllegalArgumentException();
            }
            if (!BlockingQueue.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(clazz.getName() + " is not a BlockingQueue");
            }
            this.poolSize = poolSize;
            this.poolName = poolName;
            this.clazz = clazz;
            this.queueSize = queueSize;
            this.commanders = initCommanders();
        }

        /**
         * Creates an executor with one shard per available processor, the default
         * pool name and {@link ArrayBlockingQueue}s of capacity 200.
         */
        public ShardThreadPoolExecutor() {
            this(Runtime.getRuntime().availableProcessors(), DEFAULT_POOL_NAME,
                    ArrayBlockingQueue.class, DEFAULT_QUEUE_SIZE);
        }

        /**
         * Instantiates one blocking queue per shard.
         *
         * @return the list of shard queues, indexed by shard index
         * @throws IllegalStateException if the queue class cannot be instantiated
         */
        @SuppressWarnings("unchecked") // clazz was verified to be a BlockingQueue in the constructor
        private List<BlockingQueue<Command>> initCommanders() {
            List<BlockingQueue<Command>> queues = new ArrayList<>(poolSize);
            for (int i = 0; i < poolSize; i++) {
                try {
                    queues.add((BlockingQueue<Command>) clazz.getConstructor(Integer.TYPE)
                            .newInstance(queueSize));
                } catch (ReflectiveOperationException | RuntimeException e) {
                    throw new IllegalStateException(e);
                }
            }
            return queues;
        }

        /**
         * Starts an orderly shutdown: queued tasks are still executed, new tasks are
         * rejected. Does not wait for completion; use {@link #awaitTermination}.
         */
        @Override
        public void shutdown() {
            mainLock.lock();
            try {
                shutDown = true;
                signalIfTerminated();
            } finally {
                mainLock.unlock();
            }
            // Idle workers notice the flag on their next poll timeout; running tasks are not interrupted.
        }

        /**
         * Stops the executor immediately: interrupts all workers and removes the
         * tasks that have not started yet.
         *
         * @return the tasks that were queued but never started
         */
        @Override
        public List<Runnable> shutdownNow() {
            List<Runnable> pending = new ArrayList<>();
            mainLock.lock();
            try {
                shutDown = true;
                shutDownNow = true;
                for (BlockingQueue<Command> queue : commanders) {
                    queue.drainTo(pending);
                }
                for (Work work : workers.values()) {
                    work.thread.interrupt();
                }
                signalIfTerminated();
            } finally {
                mainLock.unlock();
            }
            return pending;
        }

        @Override
        public boolean isShutdown() {
            return shutDown;
        }

        /**
         * @return {@code true} once shutdown was requested and every worker has exited
         */
        @Override
        public boolean isTerminated() {
            return shutDown && workers.isEmpty();
        }

        /**
         * Blocks until the executor has terminated after a shutdown request, the
         * timeout elapses, or the calling thread is interrupted.
         *
         * @return {@code true} if the executor terminated, {@code false} on timeout
         * @throws InterruptedException if interrupted while waiting
         */
        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            mainLock.lock();
            try {
                while (!isTerminated()) {
                    if (nanos <= 0) {
                        return false;
                    }
                    nanos = termination.awaitNanos(nanos);
                }
                return true;
            } finally {
                mainLock.unlock();
            }
        }

        /**
         * Queues the command on the shard selected by its key, creating the shard's
         * worker thread if necessary. Blocks while the shard queue is full.
         *
         * @param command the task; must implement {@link Command}
         * @throws IllegalArgumentException   if {@code command} is not a {@link Command}
         * @throws IllegalStateException      if the command's shard key is null
         * @throws RejectedExecutionException if the executor has been shut down, or the
         *                                    calling thread is interrupted while waiting
         *                                    for queue space
         */
        @Override
        public void execute(Runnable command) {
            if (!(command instanceof Command)) {
                throw new IllegalArgumentException("param must be instance of ShardThreadPoolExecutor Command");
            }
            Command cmd = (Command) command;
            if (cmd.getShardKey() == null) {
                throw new IllegalStateException("shardObject is null");
            }
            int index = getBlockQueueIndex(cmd);
            BlockingQueue<Command> queue = commanders.get(index);
            // The worker must exist before put(): with a full queue and no consumer, put() would block forever.
            ensureWorker(index, queue);
            try {
                queue.put(cmd);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while queueing task", e);
            }
            // Re-check: if a shutdown raced with the put and nobody will consume the task, take it back.
            if (shutDown && queue.remove(cmd)) {
                throw new RejectedExecutionException("executor has been shut down");
            }
        }

        /**
         * Creates and starts the worker for the given shard unless one is already registered.
         *
         * @throws RejectedExecutionException if the executor has been shut down
         */
        private void ensureWorker(int index, BlockingQueue<Command> queue) {
            mainLock.lock();
            try {
                if (shutDown) {
                    throw new RejectedExecutionException("executor has been shut down");
                }
                if (!workers.containsKey(index)) {
                    Work work = new Work(index, queue, poolName + "-" + workerSequence++);
                    // Start first so a failed start() leaves no dead entry behind; the worker cannot
                    // deregister before the put below because deregistration needs mainLock.
                    work.thread.start();
                    workers.put(index, work);
                }
            } finally {
                mainLock.unlock();
            }
        }

        /** Maps a command to its shard index in {@code [0, poolSize)}. */
        private int getBlockQueueIndex(Command command) {
            int hashCode = command.getShardKey().hashCode();
            // |hashCode % poolSize| < poolSize, so Math.abs cannot overflow here.
            return Math.abs(hashCode % poolSize);
        }

        /**
         * Fetches the next command for a worker.
         *
         * @return the next command, or {@code null} when the worker should exit
         */
        private Command getCommand(BlockingQueue<Command> commandDeque) {
            for (;;) {
                if (shutDownNow || (shutDown && commandDeque.isEmpty())) {
                    return null;
                }
                try {
                    Command next = commandDeque.poll(IDLE_POLL_MILLIS, TimeUnit.MILLISECONDS);
                    if (next != null) {
                        return next;
                    }
                } catch (InterruptedException ignored) {
                    // Interrupted by shutdownNow() or by a task that left the flag set;
                    // the flags are re-evaluated at the top of the loop.
                }
            }
        }

        /** Deregisters an exiting worker and wakes awaitTermination() callers if it was the last one. */
        private void removeWork(Work work) {
            mainLock.lock();
            try {
                workers.remove(work.index, work);
                signalIfTerminated();
            } finally {
                mainLock.unlock();
            }
        }

        /** Signals the termination condition when terminated; caller must hold {@code mainLock}. */
        private void signalIfTerminated() {
            if (isTerminated()) {
                termination.signalAll();
            }
        }

        /** A task that carries the key used to pick its worker thread. */
        public interface Command extends Runnable {

            /**
             * @return the shard key; commands with equal keys run on the same thread
             */
            String getShardKey();
        }

        /** Worker bound to one shard queue; runs that queue's commands one at a time. */
        private final class Work implements Runnable {

            private final int index;

            private final BlockingQueue<Command> queue;

            private final Thread thread;

            Work(int index, BlockingQueue<Command> queue, String name) {
                this.index = index;
                this.queue = queue;
                this.thread = new Thread(this, name);
            }

            @Override
            public void run() {
                try {
                    Command cmd;
                    while ((cmd = getCommand(queue)) != null) {
                        runCommand(cmd);
                    }
                } finally {
                    // Always deregister, even if the thread dies from an Error.
                    removeWork(this);
                }
            }

            /** Runs one command; a failing command is reported but does not kill the worker. */
            private void runCommand(Command cmd) {
                try {
                    cmd.run();
                } catch (RuntimeException e) {
                    Thread current = Thread.currentThread();
                    current.getUncaughtExceptionHandler().uncaughtException(current, e);
                }
            }
        }
    }

    这部分代码中,shutdown和shutdownNow目前还有些问题,后续将会继续修改一下。

    使用此工具类很简单,测试代码:

    // Submit 100 commands that alternate between the shard keys "a" and "b",
    // then request shutdown and wait up to one second for termination.
    ShardThreadPoolExecutor executor = new ShardThreadPoolExecutor();
    for (int n = 0; n < 100; n++) {
        final String shardKey;
        final String payload;
        if (n % 2 == 0) {
            shardKey = "a";
            payload = "a_a";
        } else {
            shardKey = "b";
            payload = "b_b";
        }
        ShadingCommand<String> task = new ShadingCommand<>(shardKey, payload);
        executor.execute(task);
    }
    executor.shutdown();
    executor.awaitTermination(1L, TimeUnit.SECONDS);

    其中ShadingCommand需要实现Command接口(此接口在ShardThreadPoolExecutor类中),否则会抛出异常,使用者可根据任务需要自定义实现类,ShadingCommand实现:

    private class ShadingCommandimplements ShardThreadPoolExecutor.Command{

    private Stringkey;

    private T t;

    public ShadingCommand(String key,T t){

    this.key = key;

    this.t = t;

    }

    @Override

        public String getShardKey() {

    return key;

    }

    @Override

        public void run() {

    try {

    Thread.sleep(100L);

    }catch (InterruptedException e) {

    e.printStackTrace();

    }

    System.out.println(Thread.currentThread().getName()+",key:"+key+",value:"+t);

    }

    }

    测试结果:

    shard-thread-pool-executor-1,key:b,value:b_b

    shard-thread-pool-executor-0,key:a,value:a_a

    shard-thread-pool-executor-0,key:a,value:a_a

    shard-thread-pool-executor-1,key:b,value:b_b

    shard-thread-pool-executor-0,key:a,value:a_a

    shard-thread-pool-executor-1,key:b,value:b_b

    shard-thread-pool-executor-1,key:b,value:b_b

    shard-thread-pool-executor-0,key:a,value:a_a

    shard-thread-pool-executor-0,key:a,value:a_a

    shard-thread-pool-executor-1,key:b,value:b_b

    可以看到,相同key的数据只会被相同线程处理。

    相关文章

      网友评论

          本文标题:实现根据key分片的线程池-ShardThreadPoolExe

          本文链接:https://www.haomeiwen.com/subject/jpgrzhtx.html