美文网首页
消息通信

消息通信

作者: 9283856ddec1 | 来源:发表于2020-02-18 22:25 被阅读0次

为什么需要消息通信?

Android的主线程即UI线程是非安全线程,只允许UI线程更新UI状态。其它任务线程处理完事务后期望更新UI状态,只能发消息通知UI线程进行更新操作。假如多个线程同时通知UI线程更新状态势必造成冲突,会导致更新事件的丢失。通过增加消息池将收到的消息缓存起来,如此能够保证收到的各个通知信息得到执行。

消息驱动机制

Looper & Handler机制.png
  • Runnalbe和Message可以被压入到MessageQueue中,形成一个集合。注意一般情况下某种类型的MessageQueue只允许保存相同类型的对象,图中为了叙述方便将它们混放在同一个MessageQueue中,实际源码对Runnable进行相应的转换。
  • Looper循环的取出消息,然后传给Handler进行处理,如此循环往复。假如队列为空,那么它会进入休眠。
  • Handler是真正“处理事情”的地方。

可以看出,上面的几个对象是缺一不可的,它们各司其职,就像一台计算机中CPU的工作方式:中央处理器(Looper)不断的从内存(MessageQueue)中读取指令(Message),执行指令(Handler),最终产生结果。Handler和Looper在同一个线程。

各部分介绍

消息--Message

消息用来表示一个可执行的任务,通常在消息内部会封装标识、执行时间和数据。Message本身是一个单向链表结构。

public final class Message implements Parcelable{
    public int what;  // 消息码
    public int arg1; 
    public int arg2;
    public Object obj; // 传递的对象
    public Messenger replyTo;
    long when; // 指定消息执行的时间
    Bundle data;
    Handler target;  // 目标Handler
    Runnable callback;
    Message next; // 下一条消息的引用
    private static final Object sPoolSync = new Object(); // 消息池的锁
    private static Message sPool;  // 消息池
    private static int sPoolSize = 0; // 消息池当前大小
    private static final int MAX_POOL_SIZE = 50; // 消息池最大值
    
    public static Message obtain(){
        synchronized(sPoolSync){
            if(sPool != null){
                Message m = sPool; // 指向消息池头部
                sPool = m.next;  // 消息池头部指向下一条消息
                m.next = null;
                sPoolSize--; // 消息池大小减1
                return m;
            }
            return new Message(); // 消息池为空时创建一个消息
        }
    }
    // 代码省略
    public void recycle(){
        clearForRecycle(); // 清空消息记录
        synchronized(sPoolSync){
            if(sPoolSize < MAX_POOL_SIZE){
                next = sPool;
                sPool = this; // 将当前消息加入到消息池头部
                sPoolSize++; // 消息池当前消息数加1
            }
        }
    }
}

说明: Message实现了Parcelable接口,因此Message对象可以写入Parcel中,也可以从Parcel中还原已写入的对象,这相当于Java的序列号和反序列化机制。可序列化保证Message对象可以通过Intent或者Binder远程传播。

消息队列--MessageQueue

public class MessageQueue{
    private final boolean mQuitAllowed;
    private int mPtr; // used by native code
    ...
    private natvie void nativeInit();

    MessageQueue(boolean quiteAllowed){
        mQuitAllowed = quitAllowed;
        nativeInit();
    }

   boolean enqueueMessage(Message msg, long when) {
        // 入队的消息必须制定target 
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }
        // 判断该消息的FLAG_IN_USE标记是否被设置,新消息不会设置该标记位
        if (msg.isInUse()) {
            throw new IllegalStateException(msg + " This message is already in use.");
        }
 
        synchronized (this) {
            // 表示处理消息的目标端Handler所在线程已经异常退出
            if (mQuitting) {
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }
 
            msg.markInUse();
            // 记录消息何时开始处理
            msg.when = when;
            // 消息队列头部,p相当于工作指针,指向当前消息
            Message p = mMessages;
            boolean needWake;
            /*
             * 对应三种情况:(1) 消息队列为空;(2)新消息需要立刻处理;
             * (3)新消息处理时间早于消息队列头部消息的处理时间;
             */
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p; //消息队列头部的消息后移
                mMessages = msg; // 将新消息插入消息队列头部,这样可以立即处理
                needWake = mBlocked;
            } else {
                // 对应第四种情况:新消息处理时间晚于消息队列头部的处理时间
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                // 根据消息处理时间,检索消息应该插入消息队列哪个位置
                for (;;) {
                    prev = p;
                    p = p.next;
                    // 循环退出条件:到达消息队列末尾或者在消息队列中找到一个处理时间大 
                    // 于新消息处理时间的消息节点
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // 找到新消息插入位置
                prev.next = msg;
            }
 
            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }
}

说明:
1)成员变量mQuitAllowed表示当前消息循环是否允许退出,nativeInit会在本地创建一个NativeMessageQueue对象,然后直接赋给MessageQueue中的成员变量mPtr。
2)新消息加入消息队列时,如果新消息加入到消息队列头部,且此时处理消息的线程处于block状态,则需要调用nativeWake方法唤醒处理线程。JNI层是通过write系统调用向管道写入“W”字符串,这样处理消息的线程会因为I/O事件而被唤醒。

线程处于空闲状态符合下面两种情景:
1)线程的消息队列为空;
2)消息队列头部消息的处理时间未到;
空闲消息处理函数用于在线程暂无消息处理时做一些辅助工作,其重要应用之一是在GcIdler中完成空闲时内存垃圾回收,另个重要应用是在Home启动后进入空闲状态是发送BOOT_COMPLETED广播。

常见的方法:

  • 元素入队
final boolean enqueueMessage(Message msg, long when);
  • 元素出队
final Message next();
  • 删除元素
final void removeMessage(Handlder h, int what, Object object);
final void removeMessage(Handler h, Runnable r, Object object);
  • 销毁队列
    通过本地函数nativeDestory来销毁一个MessageQueue;

消息循环--Looper

主线程Looper
/*frameworks/base/core/java/android/app/ActivityThread.java*/
public static void main(String[] args){
    ...
    Looper.prepareMainLooper();
    ActivityThread thread = new ActivityThread();
    thread.attach(false);
    if(sMainThreadHandler == null){
        sMainThreadHandler = thread.getHander();
    }
    AsyncTask.init();
    Looper.loop();
}

/*frameworks/base/core/java/android/os/Looper.java*/
public static void prepareMainLooper(){
    // 线程不允许退出
    prepare(false);
    synchronized(Looper.class){
        if(sMainLooper != null){
            throw new IllegalStateException("The main Looper has already been prepared");
        }
        sMainLooper = myLooper();
    }
}

说明:prepareMainLooper需要用prepare,经过prepare后myLooper就可以得到一个本地线程<ThreadLocal>的Looper对象,最后赋值给sMainLooper。为了区分普通Looper,使用getMainLooper()获取主线程的Looper对象。

普通Looper线程
class LooperThread extends Thread{
    public Handler mHandler;
    public void run(){
        // Looper线程准备阶段
        Looper.prepare();
        mHandler = new Handler(){
            public void handleMessage(Message msg){
                // 处理具体消息
            }
        };
        // Looper线程循环阶段
        Looper.loop();
    }
}
Looper循环
public static void loop(){
    Looper me = myLooper();
    if(me == null){
        throw new RuntimeException("No Looper; Looper.prepare() wasn't 
                                                        called on this thread.");
    }
    MessageQueue queue = me.mQueue;
    ...
    while(true){
        Message msg = queue.next();
        if(msg != null){
            if(msg.target == null){
                // No target is a magic identifier for the quit message.
                return;
            }
            ...
            msg.target.dispatchMessage(msg);  // 处理消息
            ...
            msg.recycle(); // 回收消息
        }
    }
}

loop方法实质就是建立一个死循环,然后通过从消息队列中逐个取出消息,最后处理消息的过程。

消息处理器--Handler

Handler与Looper、MessageQueue关联
public class Hander{
    ...
    final MessageQueue mQueue;
    final Looper mLooper;
    final Callback mCallback;
    IMessage mMessage; // 用于跨进程的消息发送
    
    public Handler(){
        // 通过sThreadLocal.get()来获取当前线程中的Looper实例
        mLooper = Looper.myLooper();
        if(mLooper == null){
            throw new RuntimeException("Can't create handler inside thread that has not called Looper.prepare()");
        }
        mQueue = mLooper.mQueue;
        mCallback = null;
    }
}

注意:使用UI线程的Looper,但是在子线程中创建Handler会抛出异常?因为Looper对象是ThreadLocal的,即每个线程都有自己的Looper,这个Looper可以为空。当在子线程中创建Handler对象时,此子线程的Looper为空则会抛出异常。

发送消息与处理消息
发送消息
  • post消息
final boolean post(Runnable r);
final boolean postDelayed(Runnable r, long delayMillis);
final boolean postAtTime(Runnable r, long uptimeMillis);
  • send消息
final boolean sendMessage(Message msg);
final boolean sendEmptyMessage(int what);
final boolean sendEmptyAtFrontOfQueue(Message msg);
boolean sendMessageAtTime(Message msg, long uptimeMillis);
final boolean sendMessageDelayed(Message msg, long delayMillis);

Post发送的信息需转换成Message,再调用Send系列函数发送出去。内部转换方法如下:

public final boolean post(Runnable r){
    return sendMessageDelayed(getPostMessage(r), 0);
}

private static Message getPostMessage(Runnable r){
    Message m = Message.obtain();
    // 将Runnable对象设置为Message的回调函数
    m.callback = r;
    return m;
}
处理消息
public void dispatchMessage(Message msg){
    if(msg.callback != null){ // 首先匹配消息中指定的回调方法
        handleCallback(msg);
    }else{
        if(mCallback != null){ // 然后匹配创建Handler时指定的回调方法
            if(mCallback.handleMessage(msg)){
                return;
            }
        }
        handleMessage(msg); // 最后匹配Handler的handleMessage方法
    }
}

从MessageQueue获取到Message通过dispatchMessage(Message msg)进行分发,处理优先级:消息中携带的Runnable对象 --> 创建Handler时指定的回调方法 --> 重载的handleMessage方法。
由此可见,Handler的拓展子类可以通过重载dispatchMessage或者handleMessage来改变它的默认行为。

AsyncQueryHandler

使用场景

AsyncQueryHandler是一个抽象类,主要是用来在异步线程中操作数据库,当访问结束后通知界面更新。

使用方式

使用起来相当方便,只需要两步:
1)继承AsyncQueryHandler类,并实现onXXXComplete方法;
onXXXComplete方法中主要是处理数据库操作结果,用以完成界面更新。如果不处理结果的话,也可以不实现onXXXComplete方法。
2)利用AsyncQueryHandler实例直接调用startxxx()方法;

各个参数说明:

名称 含义
token 令牌,同一个AsyncQueryHandler类对象中,startQuery()和onQueryComplete()方法的token应该是一致的。
cookie 在startQuery()中传入,想在onQueryComplete()中使用的对象,没有的话传递null即可。
uri 操作的数据库对应的uri
projection 想要查询的列
selection 限制条件
selectionArgs 限制条件的具体值
orderBy 排序条件

实现原理

AsyncQueryHandler就是利用分别在主线程和异步线程中的两个handler来实现异步访问数据库的功能的。其工作过程如下图所示[3]:


AsyncQueryHandler的工作过程.jpg

WorkHandler
WorkHandler是一个普通的Handler的子类,主要看下handlerMessage方法:

@Override
public void handleMessage(Message msg) {
    final ContentResolver resolver = mResolver.get();
    if (resolver == null) return;

    WorkerArgs args = (WorkerArgs) msg.obj;
    int token = msg.what;
    int event = msg.arg1;

    switch (event) {
        case EVENT_ARG_QUERY:
            Cursor cursor;
            try {
               cursor = resolver.query(args.uri, args.projection,
                                args.selection, args.selectionArgs,
                                args.orderBy);
                // Calling getCount() causes the cursor window to be filled,
                // which will make the first access on the main thread a lot faster.
                if (cursor != null) {
                    cursor.getCount();
                 }
             } catch (Exception e) {
                Log.w(TAG, "Exception thrown during handling EVENT_ARG_QUERY", e);
                cursor = null;
             }
            args.result = cursor;
            break;

            case EVENT_ARG_INSERT:
                args.result = resolver.insert(args.uri, args.values);
                break;

            case EVENT_ARG_UPDATE:
                args.result = resolver.update(args.uri, args.values, args.selection,
                            args.selectionArgs);
                break;

            case EVENT_ARG_DELETE:
                args.result = resolver.delete(args.uri, args.selection, args.selectionArgs);
                break;
        }

        // args.handler即为AsyncQueryHandler
        Message reply = args.handler.obtainMessage(token);
        reply.obj = args;
        reply.arg1 = msg.arg1;
        reply.sendToTarget();
    }

说明:对数据库的增删改查操作放在WorkHandler对象所在的异步线程中,在handlerMessage方法的最后将处理结果封装到Message对象中,发送给AsyncQueryHandler进行处理。

AsyncQueryHandler
AsyncQueryHandler位于主线程中,负责对异步线程处理后的结果进行处理,其代码如下:

public void handleMessage(Message msg) {
    WorkerArgs args = (WorkerArgs) msg.obj;

    if (localLOGV) {
        LogUtil.i(TAG, "QueryHandler.handleMessage: msg.what=" + msg.what
                    + ", msg.arg1=" + msg.arg1);
    }

    int token = msg.what;
    int event = msg.arg1;

    // pass token back to caller on each callback.
    switch (event) {
    case EVENT_ARG_QUERY:
        onQueryComplete(token, args.cookie, (Cursor) args.result);
        break;

    case EVENT_ARG_BULK_INSERT:
        onBulkInsertComplete(token, args.cookie, (Integer) args.result);
        break;
        
    case EVENT_ARG_INSERT:
        onInsertComplete(token, args.cookie, (Uri) args.result);
        break;

    case EVENT_ARG_UPDATE:
        onUpdateComplete(token, args.cookie, (Integer) args.result);
        break;

    case EVENT_ARG_DELETE:
        onDeleteComplete(token, args.cookie, (Integer) args.result);
        break;
    }
}

说明:AsyncQueryHandler的handlerMessage中,调用onXXXComplete方法,就可以完成界面的一些更新以及其它不耗时操作。

参考资料

[1] 深入理解Android内核设计思想,林学森
[2] Android的设计与实现-卷I,杨云君
[3] AsyncQueryHandler详解及使用,(https://blog.csdn.net/weixin_42193691/article/details/82469627)

相关文章

  • 通信:消息

    背景 你已经使用了微服务架构。服务需要处理来自应用客户端的请求。将来,服务有时必须协作起来处理这些请求。他们必须采...

  • 消息通信

    为什么需要消息通信? Android的主线程即UI线程是非安全线程,只允许UI线程更新UI状态。其它任务线程处理完...

  • 04、HTML5-跨文档消息通信

    一、跨文档消息通信(同域名) 二、跨文档消息通信(不同域名) postMessage 三、跨文档消息通信(不同域名...

  • Redis 入门(三):订阅/发布、事务、脚本

    一、Redis 消息通信模式 -- 发布/订阅 Publish/Subscribe 是 Redis 的消息通信模式...

  • CEPH消息通信

    1.CEPH通信连接模型: 首先通信双方建立socket连接,然后server端会向client发送banner和...

  • Handler消息通信

    主线程 在程序启动的时候,就调用Looper.prepareMainLooper方法 可以看到上面的代码主要做了这...

  • 组件间通信

    组件间通信 EventBus实现通信在Activity注册EventBus,在Activity写入消息订阅接收消息...

  • 进程的通信方式

    进程通信 1.进程通信的类型 高级通信机制主要分为三大类:共享存储器系统、消息传递系统、管道通信系统。 其中,消息...

  • React笔记 -- 组件传值

    通信问题 组件会发生三种通信。 向子组件发消息 向父组件发消息 向其他组件发消息 React 只提供了一种通信手段...

  • 计算机网络(二)物理层

    2.1 通信基础 通信的目的是传送消息(消息:语音、文字、图像、视频等)。 2.1.1 基本概念 2.1.1.1 ...

网友评论

      本文标题:消息通信

      本文链接:https://www.haomeiwen.com/subject/fpewxhtx.html