美文网首页
Android消息处理机制工作原理

Android消息处理机制工作原理

作者: 未见哥哥 | 来源:发表于2019-06-03 16:21 被阅读0次

    Looper 的工作机制

    Looper 的创建与使用

    Looper 的使用流程

    • 创建 Looper 对象 Looper.prepare()
    • 开始循环 Looper.loop()
    • 在循环中获取 MessageQueue.next() 获取消息处理
    • Looper.quit() 结束

    在子线程中使用 Looper

    //在子线程中使用 Looper
    new Thread() {
        @Override
        public void run() {
            super.run();
            //在子线程中创建 Looper 对象
            Looper.prepare();
            final Handler handler = new Handler(Looper.myLooper());
            //发送消息到消息队列中
            handler.post(new Runnable() {
                @Override
                public void run() {
                    System.out.println("post message in " + Thread.currentThread().getName() + " thread");
                    handler.postDelayed(this, 1000);
                  
                }
            });
            //开始轮训
            Looper.loop();
        }
    }.start();
    

    Looper 是如何创建的,它与 ThreadLocal,Thread,MessageQueue 的是什么关系?

    对于 Looper 来说,每一个线程会保存一份实例,它是通过 ThreadLocal 来保证的,并且在 prepare 方法内部会在 Looper 创建前进行判断当前线程本地变量 ThreadLocal 是否存有 Looper,如果没有才会去创建一个 Looper,这就保证了一个线程只会有一个 Looper ,而对应的 mThread,mQueue 是在 Looper 的构造函数中实例化的,因此它也保证了一个 Looper 对应于一个线程 mThread和一个消息队列 mQueue。

    //SDK28 Looper
    public static void prepare() {
        prepare(true);
    }
    private static void prepare(boolean quitAllowed) {
      
        //从线程本地变量取,如果取到说明当前线程已经创建过了Looper了,不要重复创建,直接报错
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        //将创建的 Looper 保存到 ThreadLocal 中
        sThreadLocal.set(new Looper(quitAllowed));
    }
    
    
    
    private Looper(boolean quitAllowed) {
        mQueue = new MessageQueue(quitAllowed);
        mThread = Thread.currentThread();
    }
    

    Handler 通讯原理

    在 Handler 中不管是通过 sendXxx还是 postXxx 发送 Message 最终都会走到enqueueMessage 方法,也就是将 Message 放入到 MessageQueue 队列中。

    多个线程是通过共享内存的 Message 的方式来实现线程间数据通讯。

    //SDK 28 Handler#enqueueMessage
    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        //指定 msg 的 target 为当前 Handler 对象,之后在消息分发时会用到
        msg.target = this;
        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }
    

    线程间通讯是通过共享 Message 的方式,而 Message 则是通过 MessageQueue 来管理的 ,因此在 Message 入队出队就要考虑在多线程操作的情况可能出现线程安全问题,因此在消息入队 enqueueMessage() 和消息出队 next() 内部使用了 synchronized 锁确保线程安全。这里可以将其看成是一种生产者消费者模式

    //MessageQueue#enqueueMessage
    //===============================消息入队=====================================
    boolean enqueueMessage(Message msg, long when) {
        ...
        //加锁
        synchronized (this) {
            //...
            // msg 添加到队列中
        }
        return true;
    }
    
    
    //===============================消息出队=====================================
    
    Message next() {
            //...
        for (;;) {
            
            //加锁
            synchronized (this) {
                //...
                return msg;
            }
          //...
        }
    }
    

    MessageQueue的工作原理

    Message出队和入队的实现

    MessageQueue 是用来管理 Message 的,这里就设计到 Message 入队和出队两个操作

    入队:根据时间排序,时间优先级存储,那么就可以做到消息延迟执行的。

    1. 插入到队头
    2. 插入到队列中间
    //MessageQueue#enqueueMessage
    Message p = mMessages;//队头
    //当前消息 when 是比对头还早,那么就插入到队头
    if (p == null || when == 0 || when < p.when) {
        // New head, wake up the event queue if blocked.
        msg.next = p;
        //重新更新队头的指针指向为当前入队的消息
        mMessages = msg;
        needWake = mBlocked;
    } else {
        // Inserted within the middle of the queue.  Usually we don't have to wake
        // up the event queue unless there is a barrier at the head of the queue
        // and the message is the earliest asynchronous message in the queue.
        needWake = mBlocked && p.target == null && msg.isAsynchronous();
        Message prev;
        //从队头开始找,找到一个 when < p.when,也就是找到第一个 Message 比当前插入的消息还要晚执行的 
        for (;;) {//这里查找,使用链表的效率会比较低
            prev = p;
            p = p.next;
            if (p == null || when < p.when) {
                break;
            }
            if (needWake && p.isAsynchronous()) {
                needWake = false;
            }
        }
        //将该 msg 插入到队列中,这是一个优先级队列,存储的时候就会排序,它不是链表
        msg.next = p; // invariant: p == prev.next
        prev.next = msg;
    }
    

    消息出队的过程,因为消息在入队时已经是根据 Message.when 排好序了,因此会从队头 mMessages 开始取消息。

    队列中有消息:

    ​ 1.如果当前队头 mMessages 指向的消息还没有执行的时间,那么就开始等待

    ​ 2.如果当前队头 mMessages 指向的消息到了执行时间,那么就取出 mMessages 指向的 msg,并且将其从队列中移除,并标记 markInUse()。

    队列没有消息:

    ​ nextPollTimeoutMillis = -1;这里会进行阻塞,可以参考生产者消费者模式,当消费端没有数据可以消费时,那么就开始等待生成者生产数据。

    出队:由Looper.loop(),启动轮询器,对queue进行轮询。当消息达到执行时间就取出来。当 message queue为空的时候,队列阻塞,等消息队列调用enqueueMessage的时候,通知队列,可以取出消息,停止阻塞。

    //MessageQueue#next()
    synchronized (this) {
        // Try to retrieve the next message.  Return if found.
        final long now = SystemClock.uptimeMillis();
        Message prevMsg = null;
        Message msg = mMessages;//队头
        if (msg != null && msg.target == null) {
            // Stalled by a barrier.  Find the next asynchronous message in the queue.
            do {
                prevMsg = msg;
                msg = msg.next;
            } while (msg != null && !msg.isAsynchronous());
        }
        if (msg != null) {//队列中有消息
            if (now < msg.when) {//如果队头的消息还没有到执行时间,那么就等待...
                // Next message is not ready.  Set a timeout to wake up when it is ready
                nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE
            } else {
                // 获取一个消息
                // Got a message.
                mBlocked = false;
                if (prevMsg != null) {
                    prevMsg.next = msg.next;
                } else {
                    mMessages = msg.next;//更新队头
                }
                //将该消息从队列中移除
                msg.next = null;
                if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                //标记该消息已经使用过了
                msg.markInUse();
                return msg;
            }
        } else {//队列没有消息
            // No more messages.
            nextPollTimeoutMillis = -1;
        }
    

    MessageQueue的阻塞机制

    要了解 MessageQueue 等待唤醒机制之前要看看这个 nextPollTimeoutMillis 的作用,它表示阻塞等待的时间,它分别有以下3种赋值情况:

    • nextPollTimeoutMillis如果该值为0,表示不等待

    • nextPollTimeoutMillis如果该值为-1,表示一直等待,表示当前队列为空,所以需要一直等待,直接有消息入队才会被唤醒。

    • nextPollTimeoutMillis如果该值为指定要等待的时间(非0/-1),一般是当前消息还没有到执行的时间点,所以需要等待。

    消息出队的逻辑 next()

    Message next() {
      ...
      int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0; 
      for (;;) {
        //最终调用 Linux 层实现等待
        /*
        * 1.nextPollTimeoutMillis==0 表示不需要等待
        * 2.nextPollTimeoutMillis==-1,表示无限期等待,一般是队列为空
        * 3.nextPollTimeoutMillis==某一个等待值(不为0、-1)表示当前消息还没有到时间执行,需要等待
        */
        nativePollOnce(ptr, nextPollTimeoutMillis);
        
        if(消息还没到时间执行){
            //等待时间
            nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
        }else{//队列没有消息,队列为空
            nextPollTimeoutMillis = -1;
        }
        ...
        if (pendingIdleHandlerCount <= 0) {
                // No idle handlers to run.  Loop and wait some more.
            //标记为阻塞状态,这个值会在 enqueueMessage 中使用
                mBlocked = true;
                continue;
            }
        ...
      }
    }
    

    消息入队的逻辑 enqueueMessage

    boolean enqueueMessage(Message msg, long when) {
      
        if(队列为空||当前插入消息执行时间早于队头的时间){
            //mBlocked 就是在 next 中设置
            //在队列为空,消息还没有达到执行的时间点这个值 mBlocked 就为 true
            needWake = mBlocked;
        } else{
            //插入到队列中间,这时表示队列不为空,如果当前处于阻塞等待状态,系统会自动唤醒,因为此时肯定是队头的消息还没有到执行的时间点。nextPollTimeoutMillis==某一个等待值(不为0、-1)表示当前消息还没有到时间执行,需要等待
            ...
        }
            if (needWake) {//表示需要唤醒
                nativeWake(mPtr);
            } 
    }
    

    Handler 设计亮点

    消息如何分发呢?

    //1.取出消息
    Message msg = queue.next(); // might block
    //2.分发消息
    msg.target.dispatchMessage(msg);
    //3.回收消息
    msg.recycleUnchecked();
    

    消息分发

    public void dispatchMessage(Message msg) {
        //1.消息需要自己处理,通过msg.callback进行回调
        if (msg.callback != null) {
            handleCallback(msg);
        } else {
            //2.Handler 设置了 Callback
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            //回调 Handler#handleMessage 处理消息
            handleMessage(msg);
        }
    }
    
    private static void handleCallback(Message message) {
        message.callback.run();
    }
    
    1. 第一种一般是 Handler.postXxx 方法这种
    public final boolean post(Runnable r)
    {
       return  sendMessageDelayed(getPostMessage(r), 0);
    }
    
    private static Message getPostMessage(Runnable r) {
        Message m = Message.obtain();
        m.callback = r;//给 message 设置一个 callback
        return m;
    }
    
    1. 第二种是在创建 Handler 时指定的 Callback 接口
    public Handler(Callback callback, boolean async) {
        if (FIND_POTENTIAL_LEAKS) {
            final Class<? extends Handler> klass = getClass();
            if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                    (klass.getModifiers() & Modifier.STATIC) == 0) {
                Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                    klass.getCanonicalName());
            }
        }
        mLooper = Looper.myLooper();
        if (mLooper == null) {
            throw new RuntimeException(
                "Can't create handler inside thread " + Thread.currentThread()
                        + " that has not called Looper.prepare()");
        }
        mQueue = mLooper.mQueue;
        mCallback = callback;
        mAsynchronous = async;
    }
    
    1. 自定义类继承Handler 实现 handleMessage 方法。

    Message 享元模式的应用

    Message 是实现线程间通讯的共享资源,而 Message 是存储在 messagequeue 中的,当消息通过 Handler 进行分发之后,那么这个消息就会被回收 msg.recycleUnchecked,具体看如下代码。之前也写过一篇 Message 复用的博客 Android消息处理机制之Message是如何重复利用的?

    //SDK 28 Looper.java
    //1.取出消息
    Message msg = queue.next(); // might block
    //2.分发消息
    msg.target.dispatchMessage(msg);
    //3.回收消息
    msg.recycleUnchecked();
    

    将消息的属性都恢复为默认值状态,并且将其插入到回收 sPool 队列中,那么下次再通过 Message.obtain() 获取 Message 时,可以从缓存的 sPool 队列中获取,节省了内存的开销。

    这里的 recycleUnChecked() 是在 Looper 所在的线程中执行,而 obtain()有可能是在其他线程中执行,那么这里需要使用 synchronized 保证线程安全 。

    通过 recycleUnChecked() 回收,obtain() 获取,这样就减少了频繁创建 Message 的开销,这里推荐使用 Message.obtain() 来获取一个缓存中的 Message 对象,而不是直接去 new Message() 这里也是一个内存优化的方向之一。

    //SDK 28 Message.java
    void recycleUnchecked() {
        // Mark the message as in use while it remains in the recycled object pool.
        // Clear out all other details.
        flags = FLAG_IN_USE;
        what = 0;
        arg1 = 0;
        arg2 = 0;
        obj = null;
        replyTo = null;
        sendingUid = -1;
        when = 0;
        target = null;
        callback = null;
        data = null;
        synchronized (sPoolSync) {//保证取和存是线程安全的
            //这里做了一个队列大小的限制
            if (sPoolSize < MAX_POOL_SIZE) {
                //将 msg 插入到队列中
                next = sPool;
                //sPool 指向当前 msg
                sPool = this;
                sPoolSize++;
            }
        }
    }
    

    从 sPool 中取出一个 Message

    public static Message obtain() {
        synchronized (sPoolSync) {//保证取和存是线程安全的
            if (sPool != null) {
                Message m = sPool;
                sPool = m.next;//更新 sPool 的引用指向下一个 Message 节点
                m.next = null;
                m.flags = 0; // clear in-use flag
                sPoolSize--;
                return m;
            }
        }
        return new Message();
    }
    

    Looper什么时候退出?

    子线程的 Looper 没有退出导致内存泄露,因为 Looper 一直在运行着,那么对应的 Message 处理时可能其它对象的引用,那么就可能出现内存泄露。

    首先要确定,主线程的 Looper 在整个程序运行期间是不会被退出的,如果主线程 Looper 退出,那么表示程序已经退出了,所以这里只需要考虑子线程的 Looper 是如何退出的?

    //SDK 28 Looper.java
    Looper.myLooper().quit();
    //Looper.myLooper().quitSafely();//注意做版本兼容
    

    通知 MessageQueue 退出,MessageQueue 底层实现一套 Linux 等待通知机制,它需要通知 Linux 要退出消息轮训了。

    //SDK 28 Looper.java
    public void quit() {
        mQueue.quit(false);
    }
    
    //SDK 28 MessageQueue.java
    void quit(boolean safe) {
        if (!mQuitAllowed) {
            throw new IllegalStateException("Main thread not allowed to quit.");
        }
        synchronized (this) {
            //避免重复退出
            if (mQuitting) {
                return;
            }
            //标记当前正在退出
            mQuitting = true;
            //消息的回收
            if (safe) {
                removeAllFutureMessagesLocked();
            } else {
                removeAllMessagesLocked();
            }
            // 唤起对应的线程
            // We can assume mPtr != 0 because mQuitting was previously false.
            nativeWake(mPtr);
        }
    }
    

    内部的入队和出队是怎么停止的?

    • 出队的处理
    Message next() {
      
        final long ptr = mPtr;
            if (ptr == 0) {
                return null;
            }
        ...
        for(;;){
            ...
                    // Process the quit message now that all pending messages have been handled.
                    if (mQuitting) {
                        dispose();
                        return null;
                    }
          ...
        }
          ...
    }
    
    
    private void dispose() {
        if (mPtr != 0) {
            nativeDestroy(mPtr);
            mPtr = 0;//标识为0
        }
    }
    
    public static void loop() {
      
            for (;;) {
                //quit() 会返回 null,就退出 for 循环
                    Message msg = queue.next(); // might block
                    if (msg == null) {
                        // No message indicates that the message queue is quitting.
                        return;
                    }
          }
    }       
    
    • 入队的处理
    boolean enqueueMessage(Message msg, long when) {
        ....
        if (mQuitting) {//quit
                IllegalStateException e = new IllegalStateException(
                msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }
          ...
    }
    

    Looper.loop() 为什么不会导致主线程不会阻塞出现 ANR?

    Looper 的 loop() 函数设计如下,它是不断地在 for 循环去获取消息,分发消息,回收消息,而这个过程就是在主线程中进行的。

    public static void loop() {
            for(;;){
                Message msg = queue.next(); // might block
                msg.target.dispatchMessage(msg);
                msg.recycleUnchecked();
            }
    }
    

    而主线程的 Looper 是在 ActivityThread 中的 main 函数创建,具体代码如下:

    public static void main(String[] args) {
      
            Looper.prepareMainLooper();//创建主线程 Looper
      
            //...
      
            Looper.loop();
            //从源码可以看出,主线程 Looper 是不能退出,一旦退出,那么就出现
                throw new RuntimeException("Main thread loop unexpectedly exited");
    }  
    
    • 创建主线程 的 Looper Looper.prepareMainLooper();

    主线程的所有操作都会在这里执行,从这里也可以看出,正常情况下,主线程是不会被退出的。

    • 开始消息轮训 Looper.loop();

    如果手贱调用了 Looper.getMainLooper().quit(); 那么程序就会出现以下异常。

    06-03 15:39:46.274 5061-5079/com.example.async E/AndroidRuntime: FATAL EXCEPTION: Thread-297
        Process: com.example.async, PID: 5061
        java.lang.IllegalStateException: Main thread not allowed to quit.
            at android.os.MessageQueue.quit(MessageQueue.java:234)
            at android.os.Looper.quit(Looper.java:216)
            at com.example.async.MainActivity$1$2.run(MainActivity.java:44)
            at android.os.Handler.handleCallback(Handler.java:739)
            at android.os.Handler.dispatchMessage(Handler.java:95)
            at android.os.Looper.loop(Looper.java:135)
            at com.example.async.MainActivity$1.run(MainActivity.java:50)
    

    追踪这个异常是在哪里抛出的?

    //SDk 28 MessageQueue.quit()
    void quit(boolean safe) {
        if (!mQuitAllowed) {//这个值是在构造函数中指定,表明是否可以退出
            throw new IllegalStateException("Main thread not allowed to quit.");
        }
    }
    //SDK 28 MessageQueue构造 
    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        mPtr = nativeInit();
    }
    //SDk Looper.prepareMainLooper
    public static void prepareMainLooper() {
        //这里传入 false 指定当前对应的 MessageQueue 是不可以退出的。
        prepare(false);
        synchronized (Looper.class) {
            if (sMainLooper != null) {
                throw new IllegalStateException("The main Looper has already been prepared.");
            }
            sMainLooper = myLooper();
        }
    }
    

    本文是笔者学习之后的总结,方便日后查看学习,有任何不对的地方请指正。

    记录于 2019年6月3号

    相关文章

      网友评论

          本文标题:Android消息处理机制工作原理

          本文链接:https://www.haomeiwen.com/subject/beocxctx.html