美文网首页
MessageQueue源码分析

MessageQueue源码分析

作者: tiancijiaren | 来源:发表于2017-05-16 10:34 被阅读0次

    MessageQueue提供了什么?和Looper、Handler、ThreadLocal、Message共同提供消息发送分发机制。MessageQueue提供消息队列功能,队列中保存Message作为节点,以单向列表实现。

    MessageQueue提供了插入删除消息(包含普通消息和异步消息)、插入删除消息屏障、获取消息、增加删除idle状态的处理、队列清空。

    Paste_Image.png

    “同步分割栏”(消息屏障,具体叫什么都可以吧)可以被理解为一个特殊Message,它的target域为null。它不能通过sendMessageAtTime()等函数打入到消息队列里,而只能通过调用Looper的postSyncBarrier()来打入。
    什么情况下会用到消息屏障呢?当需用通过handler接受的消息和另外的特定事件无法确定执行顺序,并且handler的消息要在特定事件发生后才可以执行时,需要使用消息屏障。例如:当在activity启动时,需要加载界面,其中有一个图片要加载到一个ImageView中,为了提高效率,同时执行界面和图片的加载,那图片加载必须要在界面加载结束后才可以设置到ImageView中,此时可以通过消息屏障来实现。

    “同步分割栏”是起什么作用的呢?它就像一个卡子,卡在消息链表中的某个位置,当消息循环不断从消息链表中摘取消息并进行处理时,一旦遇到这种“同步分割栏”,那么即使在分割栏之后还有若干已经到时的普通Message,也不会摘取这些消息了。请注意,此时只是不会摘取“普通Message”了,如果队列中还设置有“异步Message”,那么还是会摘取已到时的“异步Message”的。

    在Android的消息机制里,“普通Message”和“异步Message”也就是这点儿区别啦,也就是说,如果消息列表中根本没有设置“同步分割栏”的话,那么“普通Message”和“异步Message”的处理就没什么大的不同了。

    Paste_Image.png

    /*

    • Copyright (C) 2006 The Android Open Source Project
    • Licensed under the Apache License, Version 2.0 (the "License");
    • you may not use this file except in compliance with the License.
    • You may obtain a copy of the License at
    •  http://www.apache.org/licenses/LICENSE-2.0
      
    • Unless required by applicable law or agreed to in writing, software
    • distributed under the License is distributed on an "AS IS" BASIS,
    • WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    • See the License for the specific language governing permissions and
    • limitations under the License.
      */

    package com.hfbank;

    import android.os.Binder;
    import android.os.Handler;
    import android.os.Looper;
    import android.os.Message;
    import android.os.SystemClock;
    import android.util.Log;
    import android.util.Printer;

    import java.util.ArrayList;

    /**

    • Low-level class holding the list of messages to be dispatched by a

    • {@link Looper}. Messages are not added directly to a MessageQueue,

    • but rather through {@link Handler} objects associated with the Looper.

    • <p>You can retrieve the MessageQueue for the current thread with

    • {@link Looper#myQueue() Looper.myQueue()}.

    • jxy 在主线程中的looper在loop中是一个死循环,那如何处理其他事件呢?
      */
      public final class MessageQueue {
      // True if the message queue can be quit.
      private final boolean mQuitAllowed;

      @SuppressWarnings("unused")
      private long mPtr; // used by native code

      // jxy Message类是一个"单项链表"的节点,这里mMessages引用指向的是链表的头节点实例
      // 这个链表是按时间排序的
      Message mMessages;
      private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();
      /**

      • jxy 暂时没有理解这个数组是什么作用
      • 当空闲时要执行IdleHandler接口数组,但是如果使用mIdleHandlers,在for循环中执行remove会导致for
        
      • 循环失败,mPendingIdleHandlers完全可以作为局部变量来实现,把mPendingIdleHandlers作为全部变量
        
      • 的含义应该是为了性能,这样有很多概率不用每次都要new一个数组。
        

      */
      private IdleHandler[] mPendingIdleHandlers;
      private boolean mQuitting;

      // Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout.
      private boolean mBlocked;

      // The next barrier token.
      // Barriers are indicated by messages with a null target whose arg1 field carries the token.
      private int mNextBarrierToken;

      // jxy native方法,链表的wake、idle状态是在native中维护的,此类中只是对数据的操作
      private native static long nativeInit();
      private native static void nativeDestroy(long ptr);
      private native static void nativePollOnce(long ptr, int timeoutMillis);
      private native static void nativeWake(long ptr);
      private native static boolean nativeIsIdling(long ptr);

      /**

      • Callback interface for discovering when a thread is going to block
      • waiting for more messages.
      • jxy Handler线程在执行完所有的Message消息,它会wait,进行阻塞,直到有新的Message到达。
      • 这个线程也太浪费了。可以利用接口IdleHandler增加在Handler线程空闲时执行一些后台操作。
      • 但是这些操作不应该太复杂,否则会影响handler消息的处理
        /
        public static interface IdleHandler {
        /
        *
        • Called when the message queue has run out of messages and will now
        • wait for more. Return true to keep your idle handler active, false
        • to have it removed. This may be called if there are still messages
        • pending in the queue, but they are all scheduled to be dispatched
        • after the current time.
          */
          boolean queueIdle();
          }

      /**

      • Add a new {@link IdleHandler} to this message queue. This may be
      • removed automatically for you by returning false from
      • {@link IdleHandler#queueIdle IdleHandler.queueIdle()} when it is
      • invoked, or explicitly removing it with {@link #removeIdleHandler}.
      • <p>This method is safe to call from any thread.
      • @param handler The IdleHandler to be added.
        */
        public void addIdleHandler(IdleHandler handler) {
        if (handler == null) {
        throw new NullPointerException("Can't add a null IdleHandler");
        }
        // jxy 本类中所有的需要增加同步锁的地方,都使用的是synchronized (this)
        // 这里不写成函数同步应该是为了尽量提高性能,尽量减少同步块中包含的代码
        synchronized (this) {
        mIdleHandlers.add(handler);
        }
        }

      /**

      • Remove an {@link IdleHandler} from the queue that was previously added
      • with {@link #addIdleHandler}. If the given object is not currently
      • in the idle list, nothing is done.
      • @param handler The IdleHandler to be removed.
        */
        public void removeIdleHandler(IdleHandler handler) {
        synchronized (this) {
        mIdleHandlers.remove(handler);
        }
        }

      MessageQueue(boolean quitAllowed) {
      mQuitAllowed = quitAllowed;
      mPtr = nativeInit();
      }

      @Override
      protected void finalize() throws Throwable {
      try {
      dispose();
      } finally {
      super.finalize();
      }
      }

      // Disposes of the underlying message queue.
      // Must only be called on the looper thread or the finalizer.
      private void dispose() {
      if (mPtr != 0) {
      nativeDestroy(mPtr);
      mPtr = 0;
      }
      }

      // jxy 还没有看
      Message next() {
      // Return here if the message loop has already quit and been disposed.
      // This can happen if the application tries to restart a looper after quit
      // which is not supported.
      final long ptr = mPtr;
      if (ptr == 0) {
      return null;
      }

       int pendingIdleHandlerCount = -1; // -1 only during first iteration
       int nextPollTimeoutMillis = 0;
       for (;;) {
           if (nextPollTimeoutMillis != 0) {
               Binder.flushPendingCommands();
           }
      
           nativePollOnce(ptr, nextPollTimeoutMillis);
      
           synchronized (this) {
               // Try to retrieve the next message.  Return if found.
               final long now = SystemClock.uptimeMillis();
               Message prevMsg = null;
               Message msg = mMessages;
               // jxy 当第一条消息是消息屏障时,忽略所有的同步消息, 找出queue中的异步消息
               if (msg != null && msg.target == null) {
                   // Stalled by a barrier.  Find the next asynchronous message in the queue.
                   do {
                       prevMsg = msg;
                       msg = msg.next;
                   } while (msg != null && !msg.isAsynchronous());
               }
               // jxy 走到这一步, 有两种可能,
               // 一种是遍历到队尾没有发现异步消息, 继续等待
               // 另一种是找到queue中的下一个消息,判断消息的时间,并处理链表
               if (msg != null) {
                   // 没有到消息的执行时间
                   if (now < msg.when) {
                       // Next message is not ready.  Set a timeout to wake up when it is ready.
                       nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                   } else {
                       // Got a message.
                       mBlocked = false;
                       if (prevMsg != null) {
                           prevMsg.next = msg.next;
                       } else {
                           mMessages = msg.next;
                       }
                       msg.next = null;
                       if (false) Log.v("MessageQueue", "Returning message: " + msg);
                       return msg;
                   }
               } else {
                   // No more messages.
                   nextPollTimeoutMillis = -1;
               }
      
               // Process the quit message now that all pending messages have been handled.
               if (mQuitting) {
                   dispose();
                   return null;
               }
      
               // If first time idle, then get the number of idlers to run.
               // Idle handles only run if the queue is empty or if the first message
               // in the queue (possibly a barrier) is due to be handled in the future.
               // jxy 如果queue中没有msg, 或者msg没到可执行的时间,
               // 那么现在线程就处于空闲时间了, 可以执行IdleHandler了
               if (pendingIdleHandlerCount < 0
                       && (mMessages == null || now < mMessages.when)) {
                   pendingIdleHandlerCount = mIdleHandlers.size();
               }
               if (pendingIdleHandlerCount <= 0) {
                   // No idle handlers to run.  Loop and wait some more.
                   mBlocked = true;
                   continue;
               }
      
               if (mPendingIdleHandlers == null) {
                   mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
               }
               mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
           }
      
           // Run the idle handlers.
           // We only ever reach this code block during the first iteration.
           // jxy 退出同步块, 接下来就可以执行IdleHandler的相关操作了
           /**
            * jxy 暂时没有理解这个数组是什么作用
            *     当空闲时要执行IdleHandler接口数组,但是如果使用mIdleHandlers,在for循环中执行remove会导致for
            *     循环失败,mPendingIdleHandlers完全可以作为局部变量来实现,把mPendingIdleHandlers作为全部变量
            *     的含义应该是为了性能,这样有很多概率不用每次都要new一个数组。
            */
           for (int i = 0; i < pendingIdleHandlerCount; i++) {
               final IdleHandler idler = mPendingIdleHandlers[i];
               mPendingIdleHandlers[i] = null; // release the reference to the handler
      
               boolean keep = false;
               try {
                   keep = idler.queueIdle();
               } catch (Throwable t) {
                   Log.wtf("MessageQueue", "IdleHandler threw exception", t);
               }
      
               if (!keep) {
                   synchronized (this) {
                       mIdleHandlers.remove(idler);
                   }
               }
           }
      
           // Reset the idle handler count to 0 so we do not run them again.
           pendingIdleHandlerCount = 0;
      
           // While calling an idle handler, a new message could have been delivered
           // so go back and look again for a pending message without waiting.
           nextPollTimeoutMillis = 0;
       }
      

      }

      void quit(boolean safe) {
      if (!mQuitAllowed) {
      throw new IllegalStateException("Main thread not allowed to quit.");
      }

       synchronized (this) {
           if (mQuitting) {
               return;
           }
           mQuitting = true;
      
           if (safe) {
               removeAllFutureMessagesLocked();
           } else {
               removeAllMessagesLocked();
           }
      
           // We can assume mPtr != 0 because mQuitting was previously false.
           nativeWake(mPtr);
       }
      

      }

      // jxy 只有使用Looper.postSyncBarrier来增加同步消息屏障,在使用时必须记录返回的int值,当条件
      // 到达时,删除同步消息屏障要用到返回的int值
      int enqueueSyncBarrier(long when) {
      // Enqueue a new sync barrier token.
      // We don't need to wake the queue because the purpose of a barrier is to stall it.
      synchronized (this) {
      final int token = mNextBarrierToken++;
      // jxy 这里没有为msg.target赋值,不赋值代表:target = null
      final Message msg = Message.obtain();
      msg.markInUse();
      msg.when = when;
      msg.arg1 = token;

           Message prev = null;
           Message p = mMessages;
           if (when != 0) {
               while (p != null && p.when <= when) {
                   prev = p;
                   p = p.next;
               }
           }
           if (prev != null) { // invariant: p == prev.next
               msg.next = p;
               prev.next = msg;
           } else {
               msg.next = p;
               mMessages = msg;
           }
           return token;
       }
      

      }

      void removeSyncBarrier(int token) {
      // Remove a sync barrier token from the queue.
      // If the queue is no longer stalled by a barrier then wake it.
      synchronized (this) {
      Message prev = null;
      Message p = mMessages;
      // jxy 顺序查找消息链表中的消息屏障
      while (p != null && (p.target != null || p.arg1 != token)) {
      prev = p;
      p = p.next;
      }
      // jxy 查找到链表末尾,没有找到
      if (p == null) {
      throw new IllegalStateException("The specified message queue synchronization "
      + " barrier token has not been posted or has already been removed.");
      }
      final boolean needWake;
      if (prev != null) {
      prev.next = p.next;
      // 在消息屏障之前如果还有消息,说明消息还没有执行到这里,不需要唤醒
      needWake = false;
      } else {
      mMessages = p.next;
      needWake = mMessages == null || mMessages.target != null;
      }
      p.recycleUnchecked();

           // If the loop is quitting then it is already awake.
           // We can assume mPtr != 0 when mQuitting is false.
           if (needWake && !mQuitting) {
               nativeWake(mPtr);
           }
       }
      

      }

      boolean enqueueMessage(Message msg, long when) {
      if (msg.target == null) {
      throw new IllegalArgumentException("Message must have a target.");
      }
      if (msg.isInUse()) {
      throw new IllegalStateException(msg + " This message is already in use.");
      }

       synchronized (this) {
           if (mQuitting) {
               IllegalStateException e = new IllegalStateException(
                       msg.target + " sending message to a Handler on a dead thread");
               Log.w("MessageQueue", e.getMessage(), e);
               msg.recycle();
               return false;
           }
      
           msg.markInUse();
           msg.when = when;
           Message p = mMessages;
           boolean needWake;
           if (p == null || when == 0 || when < p.when) {
               // New head, wake up the event queue if blocked.
               msg.next = p;
               mMessages = msg;
               needWake = mBlocked;
           } else {
               // Inserted within the middle of the queue.  Usually we don't have to wake
               // up the event queue unless there is a barrier at the head of the queue
               // and the message is the earliest asynchronous message in the queue.
               needWake = mBlocked && p.target == null && msg.isAsynchronous();
               Message prev;
               for (;;) {
                   prev = p;
                   p = p.next;
                   if (p == null || when < p.when) {
                       break;
                   }
                   if (needWake && p.isAsynchronous()) {
                       needWake = false;
                   }
               }
               msg.next = p; // invariant: p == prev.next
               prev.next = msg;
           }
      
           // We can assume mPtr != 0 because mQuitting is false.
           if (needWake) {
               nativeWake(mPtr);
           }
       }
       return true;
      

      }

      boolean hasMessages(Handler h, int what, Object object) {
      if (h == null) {
      return false;
      }

       synchronized (this) {
           Message p = mMessages;
           while (p != null) {
               if (p.target == h && p.what == what && (object == null || p.obj == object)) {
                   return true;
               }
               p = p.next;
           }
           return false;
       }
      

      }

      boolean hasMessages(Handler h, Runnable r, Object object) {
      if (h == null) {
      return false;
      }

       synchronized (this) {
           Message p = mMessages;
           while (p != null) {
               if (p.target == h && p.callback == r && (object == null || p.obj == object)) {
                   return true;
               }
               p = p.next;
           }
           return false;
       }
      

      }

      boolean isIdling() {
      synchronized (this) {
      return isIdlingLocked();
      }
      }

      private boolean isIdlingLocked() {
      // If the loop is quitting then it must not be idling.
      // We can assume mPtr != 0 when mQuitting is false.
      return !mQuitting && nativeIsIdling(mPtr);
      }

      void removeMessages(Handler h, int what, Object object) {
      if (h == null) {
      return;
      }

       synchronized (this) {
           Message p = mMessages;
      
           // Remove all messages at front.
           while (p != null && p.target == h && p.what == what
                  && (object == null || p.obj == object)) {
               Message n = p.next;
               mMessages = n;
               p.recycleUnchecked();
               p = n;
           }
      
           // Remove all messages after front.
           while (p != null) {
               Message n = p.next;
               if (n != null) {
                   if (n.target == h && n.what == what
                       && (object == null || n.obj == object)) {
                       Message nn = n.next;
                       n.recycleUnchecked();
                       p.next = nn;
                       continue;
                   }
               }
               p = n;
           }
       }
      

      }

      void removeMessages(Handler h, Runnable r, Object object) {
      if (h == null || r == null) {
      return;
      }

       synchronized (this) {
           Message p = mMessages;
      
           // Remove all messages at front.
           while (p != null && p.target == h && p.callback == r
                  && (object == null || p.obj == object)) {
               Message n = p.next;
               mMessages = n;
               p.recycleUnchecked();
               p = n;
           }
      
           // Remove all messages after front.
           while (p != null) {
               Message n = p.next;
               if (n != null) {
                   if (n.target == h && n.callback == r
                       && (object == null || n.obj == object)) {
                       Message nn = n.next;
                       n.recycleUnchecked();
                       p.next = nn;
                       continue;
                   }
               }
               p = n;
           }
       }
      

      }

      void removeCallbacksAndMessages(Handler h, Object object) {
      if (h == null) {
      return;
      }

       synchronized (this) {
           Message p = mMessages;
      
           // Remove all messages at front.
           while (p != null && p.target == h
                   && (object == null || p.obj == object)) {
               Message n = p.next;
               mMessages = n;
               p.recycleUnchecked();
               p = n;
           }
      
           // Remove all messages after front.
           while (p != null) {
               Message n = p.next;
               if (n != null) {
                   if (n.target == h && (object == null || n.obj == object)) {
                       Message nn = n.next;
                       n.recycleUnchecked();
                       p.next = nn;
                       continue;
                   }
               }
               p = n;
           }
       }
      

      }

      private void removeAllMessagesLocked() {
      Message p = mMessages;
      while (p != null) {
      Message n = p.next;
      p.recycleUnchecked();
      p = n;
      }
      mMessages = null;
      }

      private void removeAllFutureMessagesLocked() {
      final long now = SystemClock.uptimeMillis();
      Message p = mMessages;
      if (p != null) {
      if (p.when > now) {
      // jxy 第一个节点就比当前晚,删除所有的节点
      removeAllMessagesLocked();
      } else {
      // jxy 下面是典型的单向链表删除操作,可以画一下图
      Message n;
      // jxy 先找到第一个比当前时间晚的消息节点
      for (;;) {
      n = p.next;
      if (n == null) {
      return;
      }
      if (n.when > now) {
      break;
      }
      p = n;
      }
      // jxy 删除后面的所有节点
      p.next = null;
      do {
      p = n;
      n = p.next;
      p.recycleUnchecked();
      } while (n != null);
      }
      }
      }

      void dump(Printer pw, String prefix) {
      synchronized (this) {
      long now = SystemClock.uptimeMillis();
      int n = 0;
      for (Message msg = mMessages; msg != null; msg = msg.next) {
      pw.println(prefix + "Message " + n + ": " + msg.toString(now));
      n++;
      }
      pw.println(prefix + "(Total messages: " + n + ", idling=" + isIdlingLocked()
      + ", quitting=" + mQuitting + ")");
      }
      }
      }

    相关文章

      网友评论

          本文标题:MessageQueue源码分析

          本文链接:https://www.haomeiwen.com/subject/spzkxxtx.html