美文网首页
一种有回调的ThreadPool的实现方式

一种有回调的ThreadPool的实现方式

作者: 王岩_shang | 来源:发表于2016-06-30 18:47 被阅读156次

    直接上源码

    
    
    import android.util.Log;
    import java.util.concurrent.Executor;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class ThreadPool {
        @SuppressWarnings("unused")
        private static final String TAG = "ThreadPool";
        private static final int CORE_POOL_SIZE = 4;
        private static final int MAX_POOL_SIZE = 8;
        private static final int KEEP_ALIVE_TIME = 10; // 10 seconds
    
        // Resource type
        public static final int MODE_NONE = 0;
        public static final int MODE_CPU = 1;
        public static final int MODE_NETWORK = 2;
    
        public static final JobContext JOB_CONTEXT_STUB = new JobContextStub();
    
        ResourceCounter mCpuCounter = new ResourceCounter(2);
        ResourceCounter mNetworkCounter = new ResourceCounter(2);
    
        // A Job is like a Callable, but it has an addition JobContext parameter.
        public interface Job<T> {
            public T run(JobContext jc);
        }
    
        public interface JobContext {
            boolean isCancelled();
            void setCancelListener(CancelListener listener);
            boolean setMode(int mode);
        }
    
        private static class JobContextStub implements JobContext {
            @Override
            public boolean isCancelled() {
                return false;
            }
    
            @Override
            public void setCancelListener(CancelListener listener) {
            }
    
            @Override
            public boolean setMode(int mode) {
                return true;
            }
        }
    
        public interface CancelListener {
            public void onCancel();
        }
    
        private static class ResourceCounter {
            public int value;
            public ResourceCounter(int v) {
                value = v;
            }
        }
    
        private final Executor mExecutor;
    
        public ThreadPool() {
            this(CORE_POOL_SIZE, MAX_POOL_SIZE);
        }
    
        public ThreadPool(int initPoolSize, int maxPoolSize) {
            mExecutor = new ThreadPoolExecutor(
                    initPoolSize, maxPoolSize, KEEP_ALIVE_TIME,
                    TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
                    new PriorityThreadFactory("thread-pool",
                    android.os.Process.THREAD_PRIORITY_BACKGROUND));
        }
    
        // Submit a job to the thread pool. The listener will be called when the
        // job is finished (or cancelled).
        public <T> Future<T> submit(Job<T> job, FutureListener<T> listener) {
            Worker<T> w = new Worker<T>(job, listener);
            mExecutor.execute(w);
            return w;
        }
    
        public <T> Future<T> submit(Job<T> job) {
            return submit(job, null);
        }
    
        private class Worker<T> implements Runnable, Future<T>, JobContext {
            @SuppressWarnings("hiding")
            private static final String TAG = "Worker";
            private Job<T> mJob;
            private FutureListener<T> mListener;
            private CancelListener mCancelListener;
            private ResourceCounter mWaitOnResource;
            private volatile boolean mIsCancelled;
            private boolean mIsDone;
            private T mResult;
            private int mMode;
    
            public Worker(Job<T> job, FutureListener<T> listener) {
                mJob = job;
                mListener = listener;
            }
    
            // This is called by a thread in the thread pool.
            @Override
            public void run() {
                T result = null;
    
                // A job is in CPU mode by default. setMode returns false
                // if the job is cancelled.
                if (setMode(MODE_CPU)) {
                    try {
                        result = mJob.run(this);
                    } catch (Throwable ex) {
                        Log.w(TAG, "Exception in running a job", ex);
                    }
                }
    
                synchronized(this) {
                    setMode(MODE_NONE);
                    mResult = result;
                    mIsDone = true;
                    notifyAll();
                }
                if (mListener != null) mListener.onFutureDone(this);
            }
    
            // Below are the methods for Future.
            @Override
            public synchronized void cancel() {
                if (mIsCancelled) return;
                mIsCancelled = true;
                if (mWaitOnResource != null) {
                    synchronized (mWaitOnResource) {
                        mWaitOnResource.notifyAll();
                    }
                }
                if (mCancelListener != null) {
                    mCancelListener.onCancel();
                }
            }
    
            @Override
            public boolean isCancelled() {
                return mIsCancelled;
            }
    
            @Override
            public synchronized boolean isDone() {
                return mIsDone;
            }
    
            @Override
            public synchronized T get() {
                while (!mIsDone) {
                    try {
                        wait();
                    } catch (Exception ex) {
                        Log.w(TAG, "ingore exception", ex);
                        // ignore.
                    }
                }
                return mResult;
            }
    
            @Override
            public void waitDone() {
                get();
            }
    
            // Below are the methods for JobContext (only called from the
            // thread running the job)
            @Override
            public synchronized void setCancelListener(CancelListener listener) {
                mCancelListener = listener;
                if (mIsCancelled && mCancelListener != null) {
                    mCancelListener.onCancel();
                }
            }
    
            @Override
            public boolean setMode(int mode) {
                // Release old resource
                ResourceCounter rc = modeToCounter(mMode);
                if (rc != null) releaseResource(rc);
                mMode = MODE_NONE;
    
                // Acquire new resource
                rc = modeToCounter(mode);
                if (rc != null) {
                    if (!acquireResource(rc)) {
                        return false;
                    }
                    mMode = mode;
                }
    
                return true;
            }
    
            private ResourceCounter modeToCounter(int mode) {
                if (mode == MODE_CPU) {
                    return mCpuCounter;
                } else if (mode == MODE_NETWORK) {
                    return mNetworkCounter;
                } else {
                    return null;
                }
            }
    
            private boolean acquireResource(ResourceCounter counter) {
                while (true) {
                    synchronized (this) {
                        if (mIsCancelled) {
                            mWaitOnResource = null;
                            return false;
                        }
                        mWaitOnResource = counter;
                    }
    
                    synchronized (counter) {
                        if (counter.value > 0) {
                            counter.value--;
                            break;
                        } else {
                            try {
                                counter.wait();
                            } catch (InterruptedException ex) {
                                // ignore.
                            }
                        }
                    }
                }
    
                synchronized (this) {
                    mWaitOnResource = null;
                }
    
                return true;
            }
    
            private void releaseResource(ResourceCounter counter) {
                synchronized (counter) {
                    counter.value++;
                    counter.notifyAll();
                }
            }
        }
    }
    
    

    相关文章

      网友评论

          本文标题:一种有回调的ThreadPool的实现方式

          本文链接:https://www.haomeiwen.com/subject/bjkfjttx.html