Android 中的多线程

作者: a57ecf3aaaf2 | 来源:发表于2019-08-25 20:49 被阅读0次

    一、线程初探

    Android 中的线程和 Java 中的线程没有本质区别,可通过 Runnable 或 Thread 类构造新的线程执行任务。

    1.1 Thread 核心方法

    对于新建一个线程的工作,操作起来并没有那么麻烦,实例化 Thread 即可。但是,关于线程同步、加锁、等待等相关操作实现起来并没有启动一个线程那样简单。

    1.1.1 wait() 方法

    当一个线程执行到该方法时,它即进入到一个与该对象相关的等待池中,同时释放该对象锁,使得其他线程可以获取该对象的访问权限。

    开发人员可通过 Object 对象的 notify、notifyAll 方法来唤醒当前正在等待的线程,或通过 wait(long millis) 指定睡眠时间来唤醒。

    wait() 方法必须放在 synchronized block 中:

    public Looper getLooper() {
        if (!isAlive()) {
            return null;
        }
    
        // If the thread has been started, wait until the looper has been created.
        synchronized (this) {
            while (isAlive() && mLooper == null) {
                try {
                    wait();
                } catch (InterruptedException e) {
                }
            }
        }
        return mLooper;
    }
    

    wait() 调用后即刻释放对象锁,前提是已经获得了锁,所以必须放在 synchronized block 中,否则会抛出 IllegalMonitorStateException。

    /**
     * @throws IllegalMonitorStateException
     *             if the thread calling this method is not the owner of this
     *             object's monitor.
     */
    public final native void wait() throws InterruptedException;
    

    通过 wait、notify/notifyAll 共同作用,可实现线程等待机制。当条件不满足时,调用 wait 进入等待状态;一旦条件满足,调用 notify/notifyAll 唤醒等待中的线程池,继续执行。

    1.1.2 sleep() 方法

    该函数的作用是使当前调用线程处于睡眠状态,与 wait() 不同的是,若在 synchronized block 中执行 sleep(),虽然线程睡眠了,但是对象锁没有释放,其他线程依然无法访问该对象。

    1.1.3 join() 方法

    该方法表示,等待目标线程执行完毕后再继续执行。其内部是利用 wait() 完成这一功能的:

      /**
        * Blocks the current Thread (<code>Thread.currentThread()</code>) until
        * the receiver finishes its execution and dies.
        *
        * @throws InterruptedException if the current thread has been interrupted.
        *         The interrupted status of the current thread will be cleared before the exception is
        *         thrown.
        * @see Object#notifyAll
        * @see java.lang.ThreadDeath
        */
       public final void join() throws InterruptedException {
           synchronized (lock) {
               while (isAlive()) {
                   lock.wait();
               }
           }
       }
    

    也就是,等待调用该方法的线程对象启动新线程并运行完成后 ,再继续执行调用该方法(只是一个方法,并不与调用的线程对象有关)所在的线程

    1.1.4 yield() 方法

    该方法是一个 native 方法,涉及到系统平台底层逻辑,表示调用该静态方法的线程由运行状态变为就绪状态,让出执行权限供其他线程优先执行(能否优先执行是未知的)。

        /**
         * Causes the calling Thread to yield execution time to another Thread that
         * is ready to run. The actual scheduling is implementation-dependent.
         */
        public static native void yield();
    

    二、线程池

    虽然 Thread、Runnable 为建立新线程提供了便捷,但是这种便捷也会带来各种安全隐患。如果不加控制的肆意新建线程,可能会导致系统资源浪费、资源紧张、系统崩溃等情况的发生。

    而线程池就是为了解决这一问题而产生的,线程池提供了一套完整的管理线程的解决方案,为线程的创建、销毁,以及资源调度提供了有力工具。

    2.1 Callback

    Callback 接口与 Runnable 功能类似,Callback 接口用于线程池中,与 Runnable 不同的是,Callback 是一个包含方法返回值的泛型接口。

    /**
     * A task that returns a result and may throw an exception.
     * Implementors define a single method with no arguments called
     * {@code call}.
     *
     * <p>The {@code Callable} interface is similar to {@link
     * java.lang.Runnable}, in that both are designed for classes whose
     * instances are potentially executed by another thread.  A
     * {@code Runnable}, however, does not return a result and cannot
     * throw a checked exception.
     *
     * <p>The {@link Executors} class contains utility methods to
     * convert from other common forms to {@code Callable} classes.
     *
     * @see Executor
     * @since 1.5
     * @author Doug Lea
     * @param <V> the result type of method {@code call}
     */
    public interface Callable<V> {
        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }
    

    Runnable 与 Callback 只是简单的提供了线程的运行框架,但是不能很好的管理线程,为此,Future 的出现解决了线程无法管理的问题。

    2.2 Future

    Future 也是一个接口,该接口提供了几个管理线程的方法:

    public interface Future<V> {
    
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws InterruptedException, ExecutionException;
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    cancel 用于取消 Callback 或 Runnable 任务,isCancelled 用于判断任务是否已取消,isDone 用于判断任务是否已完成,get 用于获取任务完成后的结果,该方法会阻塞,直到任务返回结果,或到达超时时间。

    2.3 FutureTask

    RunnableFuture 是 Runnable 和 Future 的实现类,具备两者的所有能力。而真正的角色 FutureTask 继承于 RunnableFuture。

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }
    

    FutureTask 真正地实现了线程的管理,对 Callback、 Runnable 的执行起到了辅助管理的作用。Runnable 最终被被包装成 Callback 进行执行。

        public FutureTask(Runnable runnable, V result) {
            this.callable = Executors.callable(runnable, result);
            this.state = NEW;       // ensure visibility of callable
        }
    
       public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
        }
    
      static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }
    

    从源码中可以看出,FutureTask 既是 Runnable 又是 Future,所以可以通过 Thread 包装 FutureTask 执行,并且可以通过 get 方法获取执行后的结果。

    FutureTask 真正的伯乐是 ExecutorService,FutureTask 虽然提供了各种管理线程的方法,但是真正使得这些方法得以施展的功臣其实是 ExecutorService。

    2.4 ThreadPoolExecutor

    ThreadPoolExecutor 是 ExecutorService 的一个实现,构造方法中通过传入不同参数来创建不同功能的线程池。Executors 中的 newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool 等方法都利用了该类构造了指定功能的线程池。

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
    

    corePoolSize 线程池中的核心线程数。线程池默认是空的,直到有新的任务才会创建线程,prestartAllCoreThreads 方法可以在线程池启动后,立即启动所有核心线程以等待任务。

       /**
         * Starts all core threads, causing them to idly wait for work. This
         * overrides the default policy of starting core threads only when
         * new tasks are executed.
         *
         * @return the number of threads started
         */
        public int prestartAllCoreThreads() {
            int n = 0;
            while (addWorker(null, true))
                ++n;
            return n;
        }
    

    默认情况下,核心线程被创建后会在线程池中一直存活,即便处于闲置状态。allowCoreThreadTimeOut(boolean value) 方法设置为 true,则闲置的核心线程在等待新任务到来时会有超时等待策略,一旦超过等待时间(keepAliveTime),限制核心线程就会被终止。

    maximumPoolSize 线程池中允许创建的最大线程数。当活动线程达到这个数值后,后续的新任务将会被阻塞。当使用无界队列(如
    LinkedBlockingQueue)时,该参数无效。

    keepAliveTime 非核心线程闲置时的超时时长,超过该时长后非核心线程就会被回收。一旦核心线程将 allowCoreThreadTimeOut 方法的入参设置为 true,keepAliveTime 同样会作用于核心线程。unit 为 keepAliveTime 的时间单位。

    workQueue 任务队列。当活动线程数量超过 corePoolSize 指定的数量时,新加入的任务(execute 提交的 Runnable)都会放在该队列中。

    threadFactory 线程工厂,以构造不同的新线程。

    handler 当线程池队列已满,或其他原因导致的无法执行新任务时,会利用该对象处理新加进来的任务。提供的策略如下:
    (1)AbortPolicy 拒绝任务,直接抛出异常,默认策略;
    (2)其他策略:CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy,具体使用规则参见 ThreadPoolExecutor 类中的对应内部类声明。

    2.5 ScheduledThreadPoolExecutor

    除了 ThreadPoolExecutor 可以构建线程池外,ScheduledThreadPoolExecutor 也是一个实现了 ExecutorService,且支持创建定时任务的线程池工具。

    通过 Executors 的 newScheduledThreadPool 方法可以轻松创建定时任务的线程池对象。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public static ScheduledExecutorService newScheduledThreadPool(
                int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
    

    三、线程同步

    3.1 原子数据

    在 Java 并发操作时,基本的数据类型,如 Long、Integer,并没有加锁,如果多个线程并发修改该类型的值,就会导致最后的结果不一致,这就是并发带来的问题。

    在 Java 的 java.util.concurrent.atomic 包下,提供了基于基本数据类型的原子类,这些类处理了并发请求导致的数据不一致的问题,相当于对基本数据类型加了 synchronized 关键字。

    public class AtomicLong extends Number implements Serializable
    public class AtomicBoolean implements Serializable
    public class AtomicInteger extends Number implements Serializable
    // ...
    

    在同样的包下,还有几个以 “Adder”结尾的特殊类,如 LongAdder,这些类采用了高竞正资源的分布式处理机制,在高并发场景下的效率高于其基本数据类型的原子类。

    3.1 同步集合

    3.1.1 CopyOnWrite

    CopyOnWrite 机制是一种以空间换时间的机制,这种机制在高并发场景下非常高效,但是因为采用了读写分离的思想,在空间上会占用相对过多的内存。

    实现该机制的类有 CopyOnWriteArrayList、CopyOnWriteArraySet,其中
    CopyOnWriteArrayList 中的 get 方法如下:

    public E get(int index) {
        return (E) elements[index];
    }
    

    调用 get 方法进行读的时候并不需要加锁,若此时有多个线程对该数组添加数据,则读取的仍然是旧的数据,因为写的时候并不会锁住旧的元素数组:

    public synchronized boolean add(E e) {
        Object[] newElements = new Object[elements.length + 1];
        System.arraycopy(elements, 0, newElements, 0, elements.length);
        newElements[elements.length] = e;
        elements = newElements;
        return true;
    }
    

    从上面的 add 方法中可以看出 COW 的读写分离思想:

    当某个线程想要修改这个数组中的元素时,会通过
    System.arraycopy() 方法 copy 一份新的元素数组进行修改,修改完成后再将新的元素设置给该数组列表。

    在新的 JDK 版本中,add 方法使用显示锁了 ReentrantLock,和 synchronized 相比更加高效。

    3.1.2 ConcurrentHashMap

    ConcurrentHashMap 是一种采用分段加锁机制的并发容器。HashMap 不是线程安全的,但是基于 HashMap 的线程安全实现 HashTable,在高并发场景下效率非常低,一个线程调用 put 的同时,其他线程并不能获取 put 权限。

    然而,ConcurrentHashMap 却可以做到,只要当前线程 put 的数据段与另一个线程 put 的数据段不在同一个段内,就可以继续访问 put 权限,使得数据的修改更加高效。

    3.1.3 BlockingQueue

    实现了 BlockingQueue 的类有 ArrayBlockingQueue、LinkedBlockingQueue、LinkedTransferQueue、PriorityBlockingQueue、LinkedBlockingDeque 等,这些类除了应用于线程池,还可以利用阻塞队列的特性解决数据并发读写的问题。

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
    

    3.2 同步锁

    线程的安全离不开锁,不管是 ConcurrentHashMap,还是 CopyOnWriteArrayList 等线程安全的容器,都或多或少地使用到了线程加锁机制,这也是线程安全的第一要素。

    除了传统的 synchronized 关键字之外,Java 也为我们新增了一些其它的加锁方式。

    3.2.1 ReentrantLock

    ReentrantLock 是 Java 6.0 提供的新的加锁方式,与 synchronized 相比,实现了大致相同的使用方法,但是更加灵活、高效。

    lock // 获取锁
    tryLock // 尝试获取锁
    unlock // 释放锁
    newCondition // 获取锁的条件
    

    以上是显示锁 ReentrantLock 的主要几个方法,一般而言,lock、tryLock 与 unlock 需成对出现,且 unlock 必须放在 finally 中,否则可能导致死锁等问题。

    newCondition 方法返回一个 Condition 对象,可以获取 lock 上的条件,功能与 Object 中的 wait()、notify()、notifyAll() 类似,其主要方法如下:

    await // 线程等待
    signal // 线程唤醒
    signalAll // 唤醒所有等待中的线程
    

    ReentrantLock 一般与 Condition 配合使用,使用方法和 synchronized 配合
    Object 的 wait()、notify() 的使用方法大同小异。

    3.2.2 Semaphore

    信号量,是一个计数型的共享锁,可以通过 acquire、release 方法来获取和释放操作许可。Semaphore 类似一个阻塞队列,只有这些队列中存在空闲位置,其他等待中的线程才能获取许可。这个“许可”是指 acquire 方法节点后面代码的执行权限。

    Semaphore 的作用是控制线程或代码块的并发数量,直到许可权限被释放后才允许新的许可,若许可数量总数为 1,则可以实现和 wait、notify 一样的功能;或者使许可数量总数为 0,若等待线程不满足条件则通过调用 acquire 方法一进入便进入阻塞状态,直到其他线程释放许可。

    3.2.3 CountDownLatch

    CountDownLatch,闭锁,是一个线程同步辅助类,在完成当前线程中的操作之前,允许其他线程一直等待,直到条件满足。

    CountDownLatch 其实是一个倒计数锁,当所有线程启动后,等待线程需调用 await 方法进入等待状态,其他线程通过 countDown 使计数器递减,一旦计数器达到 0,等待线程即刻获取执行权限。

    通过闭锁,我们也可以实现传统 wait、notify 线程等待、唤醒的功能,一旦条件达到随即唤醒等待中的线程。将 CountDownLatch 倒计总数 count 设置为 1,在条件未达到时调用 await 等待,条件达到时调用 countDown 唤醒等待中的线程。

    3.2.4 CyclicBarrier

    CyclicBarrier,循环栅栏,允许一组线程互相等待,直到到达某个屏障点,与 CountDownLatch 不同的是,当达到某个屏障点后对 CyclicBarrier 进行重置,可以重复利用,所以称为“循环” Barrier。

    CyclicBarrier 的其中一个构造函数有两个入参,第一个入参表示计数总数,第二个参数表示计数归 0 后立即执行的 Runnable。

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    

    通过调用 await 方法,使计数器递减,同时线程处于等待状态,一旦归 0,则第二个参数的 Runnable 就会执行,同时相互等待的线程继续执行。

    同样地,循环栅栏也可以实现类似 Object 中 wait、notify 的线程等待、唤醒功能。将 CyclicBarrier 总计数设置为 1,当某个条件不满足时调用 await 使得 Runnable 执行,在 Runnable 中修改数据使得条件满足,在等待线程中利用 while 循环,当相同的条件不满足时继续循环,直到条件满足。

    由于同一个循环序列中,Runnable 只会执行一次,所以无需考虑并发问题,直接可以修改条件。

    3.3 其他同步工具

    除了上面介绍的同步工具外,Java 还提供了其他线程同步辅助工具。比如,JDK 1.7 中新增的 Phaser,线程间数据交换工具类 Exchanger,等等。

    四、总结

    Android 开发中涉及到线程的场景比比皆是,不管是 I/O 操作,还是网络请求,类似这些耗时较久的场景,合理使用多线程,能够为程序提供高效的运行环境,提高程序流畅性,保持数据一致性。

    本文由 Fynn_ 原创,未经许可,不得转载!

    相关文章

      网友评论

        本文标题:Android 中的多线程

        本文链接:https://www.haomeiwen.com/subject/atahectx.html