美文网首页Android异步处理
Android通信方式篇(一)-消息机制(Java层)

Android通信方式篇(一)-消息机制(Java层)

作者: Stan_Z | 来源:发表于2018-11-24 20:25 被阅读67次

    Android从某种意义上看是一个以消息驱动的系统,内部含有大量以消息驱动的当时进行的交互,比如四大组件的启动、又比如常见的将子线程的任务切换到Handler所在的主线程中执行等等,它属于进程内部的一种通信方式。这篇文章就对android的消息机制做一个简单的梳理。

    一、关键类简介

    消息机制主要涉及Looper/MessageQueue/Message/Handler/ThreadLocal这几个类。

    • Looper:消息泵。一个死循环,有消息来就处理,没有消息就等待。一个线程最多只能有一个Looper对象,一个Looper对应管理此线程的MessageQueue,两者一一对应。

    • MessageQueue:消息队列。内部存储了一组消息,以队列的形式对外提供向消息池投递消息(MessageQueue.enqueueMessage)和取走消息(MessageQueue.next)的工作,内部采用单链表的数据结构来存储消息列表。

    • Message:消息。分为硬件产生的消息(如按钮、触摸)和软件生成的消息。

    • Handler:消息处理者。主要向消息池发送各种消息事件(Handler.sendMessage)和处理相应消息事件(Handler.handleMessage);

    • ThreadLocal:一个线程内部的数据存储类。保证线程内部数据在各线程间相互独立。

    先看看消息机制整体流程:

    from gityuan

    二、源码分析

    2.1 Looper

    Looper的字面意思是“循环者”,它被设计用来使一个普通线程变成Looper线程。所谓Looper线程就是循环工作的线程。

    主要方法有两个:

    1) prepare() : 使Thread变成looper线程,具备循环的能力

    private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper(quitAllowed));
    }
    

    看这个方法,就是创建一个Looper放到当前Thread对应的sThreadLocal里去。ThreadLocal前面讲过,它主要作用就是让线程间存储数据相互独立,显然这里它保证的是线程对应的 Looper一一对应且相互独立。

    我们再看看Looper的构造方法:

    private Looper(boolean quitAllowed) {
        mQueue = new MessageQueue(quitAllowed);
        mThread = Thread.currentThread();
    }
    

    Looper内包含了一个初始化的MessageQueue,也关联了当前的Thread。
    那么总结下关系就是:一个Thread对应的ThreadLocal保持了对应的Looper,而Looper关联了MessageQueue与当前Thread。

    2) loop():

    public static void loop() {
        final Looper me = myLooper();//获取当前线程本地存储区存储的 Looper对象
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        final MessageQueue queue = me.mQueue;//获取Looper对象对应的消息队列
        Binder.clearCallingIdentity();
        final long ident = Binder.clearCallingIdentity();
        for (;;) {//looper主循环方法,是个死循环
            Message msg = queue.next(); // 取走消息池的消息去处理,可能会阻塞
            ...
            try {
                msg.target.dispatchMessage(msg);//分发Message
            } 
             ...
            msg.recycleUnchecked();//将消息放入消息池
        }
    }
    

    loop()进入循环模式,不断重复下面的操作,直到没有消息时退出循环

    • 读取MessageQueue的下一条Message; Message msg = queue.next();
    • 把Message分发给相应的target;msg.target.dispatchMessage(msg);
    • 再把分发后的Message回收到消息池,以便重复利用。msg.recycleUnchecked();

    总结: Looper主要就是让线程具备消息泵的循环能力,仅此而已。

    from Jeanboydev
    2.2MessageQueue

    MessageQueue看名字像是一个队列, 其实是以链表的形式保存message。其次,MessageQueue是消息机制的Java层和C++层的连接纽带,大部分核心方法都交给native层来处理。

    例如消息的阻塞处理:

    private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
    

    在此我先只关心几个方法:

    1) next():

    之前在Looper 的loop()死循环中见过此方法,作用是取出消息进行处理,而且有可能会阻塞。
    下面我们来看看源码,方法稍微有点长:

    Message next() {
        final long ptr = mPtr;
        if (ptr == 0) {//当消息循环已经退出,直接返回
            return null;
        }
        int pendingIdleHandlerCount = -1; // 循环迭代的首次为-1
        int nextPollTimeoutMillis = 0;
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }
           //阻塞操作,当等待nextPollTimeoutMillis时长,或者消息队列被唤醒,都会返回
            nativePollOnce(ptr, nextPollTimeoutMillis);
            synchronized (this) {
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                if (msg != null && msg.target == null) {
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                    if (now < msg.when) {
                        //当异步消息触发的时间大于当前时间,则设置下一次轮询的超时时长
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        //获取一条消息
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                    }
                } else {
                    //没有消息,当nextPollTimeoutMillis = -1时,表示消息队列中无消息,会一直等待下去
                    nextPollTimeoutMillis = -1;
                }
                // 消息正在退出,返回null
                if (mQuitting) {
                    dispose();
                    return null;
                }
                //当消息队列为空,或者是消息队列的第一个消息时
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) {
                   //没有idle handlers 需要运行,则循环并等待。
                    mBlocked = true;
                    continue;
                }
                if (mPendingIdleHandlers == null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }
            //只有第一次循环时,会运行idle handlers,执行完成后,重置pendingIdleHandlerCount为0.
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; //去掉handler的引用
                boolean keep = false;
                try {
                    keep = idler.queueIdle();//idle时执行的方法
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }
                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }
           //重置idle handler个数为0,以保证不会再次重复运行
            pendingIdleHandlerCount = 0;
           //当调用一个空闲handler时,一个新message能够被分发,因此无需等待可以直接查询pending message.
            nextPollTimeoutMillis = 0;
        }
    }
    

    nativePollOnce是阻塞操作,其中nextPollTimeoutMillis代表下一个消息到来前,还需要等待的时长;当nextPollTimeoutMillis = -1时,表示消息队列中无消息,会一直等待下去。

    当处于空闲时,往往会执行IdleHandler中的方法。当nativePollOnce()返回后,next()从mMessages中提取一个消息。
    nativePollOnce()在native做了大量的工作。

    2) enqueueMessage:

    该方法是添加一条消息到消息队列,下面具体来看看实现:

    boolean enqueueMessage(Message msg, long when) {
        // 每一个普通Message必须有一个target
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }
        if (msg.isInUse()) {
            throw new IllegalStateException(msg + " This message is already in use.");
        }
        synchronized (this) {
            if (mQuitting) {//正在退出时,回收msg,加入到消息池
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }
            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {
               //p==null代表MessageQueue没消息,或者msg的触发时间是队列中最早的,则进入该分支
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;//当阻塞时需要唤醒
            } else {
          //将消息按时间顺序插入到MessageQueue。一般地,不需要唤醒事件队列,除非   
         //消息队头存在barrier,并且同时Message是队列中最早的异步消息。
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }
            // 消息没有退出,我们认为此时mPtr != 0
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }
    

    MessageQueue是按照Message触发时间的先后顺序排列的,队头的消息是将要最早触发的消息。当有消息需要加入消息队列时,会从队列头开始遍历,直到找到消息应该插入的合适位置,以保证所有消息的时间顺序。

    3) removeMessage:

    方法顾名思义,从MessageQueue中移除消息:

    void removeMessages(Handler h, int what, Object object) {
        if (h == null) {
            return;
        }
        synchronized (this) {
            Message p = mMessages;
           //从消息队列头部开始,移除所有符合条件的消息
            while (p != null && p.target == h && p.what == what
                   && (object == null || p.obj == object)) {
                Message n = p.next;
                mMessages = n;
                p.recycleUnchecked();
                p = n;
            }
           //移除剩余的符合要求的消息
            while (p != null) {
                Message n = p.next;
                if (n != null) {
                    if (n.target == h && n.what == what
                        && (object == null || n.obj == object)) {
                        Message nn = n.next;
                        n.recycleUnchecked();
                        p.next = nn;
                        continue;
                    }
                }
                p = n;
            }
        }
    }
    

    这个移除消息的方法,采用了两个while循环,第一个循环是从队头开始,移除符合条件的消息,第二个循环是从头部移除完连续的满足条件的消息之后,再从队列后面继续查询是否有满足条件的消息需要被移除。

    4) postSyncBarrier和removeSyncBarrier

    拦截同步消息和取消拦截,可以自行研究。

    总结:MessageQueue就是一个链表结构的消息池,内部按照Message触发时间的先后顺序排列,提供基本的投递、获取、删除消息的功能。

    from Jeanboydev
    2.3 Message

    message 其实就是对消息封装的对象。针对它,主要了解的是消息池的概念,以及获取消息和回收消息的两个方法。

    1. 消息池:

    message引入了消息池,这样的好处是,当消息池不为空时,可以直接从消息池中获取Message对象,而不是直接创建,提高效率。类似于设计模式中享元模式这么一个概念。

    静态变量sPool的数据类型为Message,通过next成员变量,维护一个消息池;静态变量MAX_POOL_SIZE代表消息池的可用大小;消息池的默认大小为50。

    1. 消息池的主要操作obtain()和recycle()。
    • obtain方法:从消息池中获取消息
    public static Message obtain() {
        synchronized (sPoolSync) {
            if (sPool != null) {
                Message m = sPool;
                sPool = m.next;
                m.next = null;//从sPool中取出一个Message对象,并让消息链断开
                m.flags = 0; // 清除in-use flag
                sPoolSize—;//消息池的可用大小进行-1操作
                return m;
            }
        }
        return new Message();  //当消息池为空,则直接创建Message对象
    }
    

    obtain(): 从消息池取Message,都是把消息池表头的Message取走,再把表头指向next;

    • recycle方法:把不再使用的消息加入消息池
    public void recycle() {
        if (isInUse()) {//判断消息是否正在使用
            if (gCheckRecycle) {// 版本>Android5.0 = true  版本<=Android5.0 = false
                throw new IllegalStateException("This message cannot be recycled because it "
                        + "is still in use.");
            }
            return;
        }
        recycleUnchecked();
    }
    //对于不再使用的消息,加入消息池
    void recycleUnchecked() {
    //将消息标志位设为IN_USE,并清空消息所有参数
        flags = FLAG_IN_USE;
        what = 0;
        arg1 = 0;
        arg2 = 0;
        obj = null;
        replyTo = null;
        sendingUid = -1;
        when = 0;
        target = null;
        callback = null;
        data = null;
        synchronized (sPoolSync) {//当消息池没有满时,将Message对象加入消息池
            if (sPoolSize < MAX_POOL_SIZE) {
                next = sPool;
                sPool = this;
                sPoolSize++;//消息池可用大小+1
            }
        }
    }
    

    recycle(): 将Message加入到消息池的过程,都是把Message加到链表的表头;

    总结: Message 就是消息对象本身,内部有对象池,能一定程度优化频繁创建和销毁消息带来的性能问题。

    2.4 Handler

    Handler是消息处理者,由它发起消息操作。主要扮演了往MessageQueue上添加消息和处理消息的角色。

    首先看看Handler的构造方法:

    public class Handler {
    final Looper mLooper;
    final MessageQueue mQueue;
    final Callback mCallback;
    public Handler(Callback callback, boolean async) {
        if (FIND_POTENTIAL_LEAKS) {
            final Class<? extends Handler> klass = getClass();
            if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                    (klass.getModifiers() & Modifier.STATIC) == 0) {
                Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                    klass.getCanonicalName());
            }
        }
        mLooper = Looper.myLooper(); //默认关联当前线程的Looper
        if (mLooper == null) {
            throw new RuntimeException(
                "Can't create handler inside thread that has not called Looper.prepare()");
        }
        mQueue = mLooper.mQueue;//关联当前线程Looper对应的MessageQueue
        mCallback = callback;
        mAsynchronous = async;
       }
    }
    

    那么我们知道了,Handler在哪个线程被创建就属于哪个线程,就为哪个线程服务,因为关联的是当前线程的Looper,操作的也就是当前线程的MessageQueue

    Handler发送和处理消息

    Handler最核心的功能就两个,一个是向MessageQueue发送消息,一个是处理MessageQueue分发出来的消息。

    1) 发送消息:

    handler有许多发送消息的方法:

    post(Runnable)
    postAtTime(Runnable, long)
    postDelayed(Runnable, long)
    sendEmptyMessage(int)
    sendMessage(Message)
    sendMessageAtTime(Message, long)
    sendMessageDelayed(Message, long)
    

    光看这些方法参数你可能会觉得handler能发两种消息,一种是Runnable对象,一种是message对象,这是直观的理解,但其实post发出的Runnable对象最后都被封装成message对象了。

    以post()方法为例:

    private static Message getPostMessage(Runnable r) {
        Message m = Message.obtain();
        m.callback = r;
        return m;
    }
    

    最终还是封装成了Message,而且还是从Message的消息池里取的。

    另外,这些发送消息的方法最终都会走到如下方法:

    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        msg.target = this;//把当前msg的target与当前Handler一一对应
        if (mAsynchronous) {//这个值是在Handler的构造方法中设置是否异步
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);//最终发送消息
    }
    

    2) 处理消息

    消息的处理是通过核心方法dispatchMessage与钩子方法handleMessage完成的

    在Looper的 loop() 方法中:
    msg.target.dispatchMessage(msg); 也就是执行当前Hanlder的dispatchMessage(msg)方法:

    public void dispatchMessage(Message msg) {
        if (msg.callback != null) {
            handleCallback(msg); 
        } else {
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            handleMessage(msg);
        }
    }
    

    都是回调方法的处理,逻辑很明显,不赘述了。

    最后一张图看整个Java层面消息处理流程:


    from Jeanboydev

    相关文章

      网友评论

        本文标题:Android通信方式篇(一)-消息机制(Java层)

        本文链接:https://www.haomeiwen.com/subject/uhwyqqtx.html