Android Framework | 消息机制

Android Framework | 消息机制

Android小彩虹2021-08-21 6:20:20270A+A-

原文链接:banshan.tech/Android%E6%…

本文分析基于Android P(9.0) 源码

1 概述

消息机制是Android中重要的线程间通信手段。

它的存在可以让一个线程通知另一个线程去工作。那么一个线程为什么会有让另一个线程工作的需求呢?

可以看一个常见的应用场景——UI更新。Google官方有一句话解释了UI更新的规则:The Android UI toolkit is not thread-safe and the view must always be manipulated on the UI thread。 因为UI更新并不是线程安全的,所以Android为了规避开发者可能的不安全操作,干脆将所有UI更新都放在了主线程中进行。在这种场景下,就会出现其他线程请求主线程来帮忙更新UI的需求。

除了UI更新,某些设计模式的实现也离不开消息机制。

下图便是消息机制最基本的工作方式。A线程发送消息到B线程的消息队列中,B线程不断从消息队列中取出新的消息进行处理。

消息机制示意图

线程A在这里表现的就像是一个甩手掌柜,只负责发送消息,却从不干活。而线程B就像是一个苦力,不断地处理到来的消息。

2 详细过程

下图便是消息机制的详细过程,主要分为两个部分:

  1. 消息发送过程
  2. 消息处理过程

消息通过Handler发送到另一个线程的MessageQueue中。另一个线程通过Looper不断轮询消息队列,取出其中的消息,并交给当初发送它的Handler进行处理。

消息机制详细工作模式

上述详细过程有一个前提假设,也即线程B中存在Looper和MessageQueue。事实上,这两样东西并不是天生存在的。所以真正完整的详细过程包含以下三个部分:

  1. 消息队列准备过程
  2. 消息发送过程
  3. 消息处理过程

2.1 消息队列准备过程

在Android应用中,主线程自带Looper和MessageQueue,其他线程如果想具备消息机制的功能,则必须首先调用Looper.prepare()。

主线程为什么会自带Looper和MessageQueue呢?

所有Android应用的主线程都对应一个ActivityThread,正是由于所有Activity的回调方法都运行在主线程,所以Google便用ActivityThread来对应主线程。

ActivityThread的main方法是每个Android应用启动时的入口。通过6642行代码可知,主线程并非自带了Looper和MessageQueue,而是在ActivityThread的main方法中提前为我们创建好了而已。6642行创建了主线程的Looper和MessageQueue(下文有详述),6669行便开始了Looper的循环工作:不断从MessageQueue中取出消息并执行,消息队列为空时就将所在线程挂起休息,有新的消息到来时再起来继续工作。周而复始,永不停歇。

以上就是Android主线程的基本工作模型。至于我们所熟知的onCreate、onDestroy,其实背后也都是消息机制在起作用(当然还有Binder的身影)。

/frameworks/base/core/java/android/app/ActivityThread.java

6623    public static void main(String[] args) {
6624        Trace.traceBegin(Trace.TRACE_TAG_ACTIVITY_MANAGER, "ActivityThreadMain");
6625
6626        // CloseGuard defaults to true and can be quite spammy. We
6627        // disable it here, but selectively enable it later (via
6628        // StrictMode) on debug builds, but using DropBox, not logs.
6629        CloseGuard.setEnabled(false);
6630
6631        Environment.initForCurrentUser();
6632
6633        // Set the reporter for event logging in libcore
6634        EventLogger.setReporter(new EventLoggingReporter());
6635
6636        // Make sure TrustedCertificateStore looks in the right place for CA certificates
6637        final File configDir = Environment.getUserConfigDirectory(UserHandle.myUserId());
6638        TrustedCertificateStore.setDefaultUserDirectory(configDir);
6639
6640        Process.setArgV0("<pre-initialized>");
6641
6642        Looper.prepareMainLooper();
6643
6644        // Find the value for {@link #PROC_START_SEQ_IDENT} if provided on the command line.
6645        // It will be in the format "seq=114"
6646        long startSeq = 0;
6647        if (args != null) {
6648            for (int i = args.length - 1; i >= 0; --i) {
6649                if (args[i] != null && args[i].startsWith(PROC_START_SEQ_IDENT)) {
6650                    startSeq = Long.parseLong(
6651                            args[i].substring(PROC_START_SEQ_IDENT.length()));
6652                }
6653            }
6654        }
6655        ActivityThread thread = new ActivityThread();
6656        thread.attach(false, startSeq);
6657
6658        if (sMainThreadHandler == null) {
6659            sMainThreadHandler = thread.getHandler();
6660        }
6661
6662        if (false) {
6663            Looper.myLooper().setMessageLogging(new
6664                    LogPrinter(Log.DEBUG, "ActivityThread"));
6665        }
6666
6667        // End of event ActivityThreadMain.
6668        Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER);
6669        Looper.loop();
6670
6671        throw new RuntimeException("Main thread loop unexpectedly exited");
6672    }

Looper.prepareMainLooper是一个静态方法,它的作用是为主线程创建一个Looper和MessageQueue。其最终调用了prepare方法,创建了一个新的Looper并将它写入sThreadLocal字段。

sThreadLocal字段是一个静态变量,按照常理它应该在内存中独一份,且各个线程均可访问的。但这里sThreadLocal利用了TLS(ThreadLocalStorage)的机制,每个线程访问到的sThreadLocal是相互独立的,并不是同一个。所以,主线程调用prepareMainLooper方法,相当于创建了一个线程独有的Looper,并且将这个Looper赋值给名为sMainLooper的静态变量(方便其他线程获取主线程的Looper)。

/frameworks/base/core/java/android/os/Looper.java

114    public static void prepareMainLooper() {
115        prepare(false);
116        synchronized (Looper.class) {
117            if (sMainLooper != null) {
118                throw new IllegalStateException("The main Looper has already been prepared.");
119            }
120            sMainLooper = myLooper();
121        }
122    }

/frameworks/base/core/java/android/os/Looper.java

97    public static void prepare() {
98        prepare(true);
99    }
100
101    private static void prepare(boolean quitAllowed) {
102        if (sThreadLocal.get() != null) {
103            throw new RuntimeException("Only one Looper may be created per thread");
104        }
105        sThreadLocal.set(new Looper(quitAllowed));
106    }

消息队列准备过程

Looper的构造方法中会创建一个MessageQueue,所以调用Looper.prepare方法便会创建与线程唯一对应的Looper和MessageQueue。

/frameworks/base/core/java/android/os/Looper.java

267    private Looper(boolean quitAllowed) {
268        mQueue = new MessageQueue(quitAllowed);
269        mThread = Thread.currentThread();
270    }

MessageQueue的构造方法如下,它会调用nativeInit方法在native层做一些初始化的工作。

/frameworks/base/core/java/android/os/MessageQueue.java

70    MessageQueue(boolean quitAllowed) {
71        mQuitAllowed = quitAllowed;
72        mPtr = nativeInit();
73    }
63    private native static long nativeInit();

nativeInit对应的JNI方法为android_os_MessageQueue_nativeInit,其中创建了一个NativeMessageQueue对象,并将该对象的指针转化为long型传递给java层。在Android的世界中,存在大量java层对象和native层对象一一映射的关系,通常都是在java层对象中设立一个long型的字段,用于记录native对象的指针值。

/frameworks/base/core/jni/android_os_MessageQueue.cpp

172static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
173    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
174    if (!nativeMessageQueue) {
175        jniThrowRuntimeException(env, "Unable to allocate native queue");
176        return 0;
177    }
178
179    nativeMessageQueue->incStrong(env);
180    return reinterpret_cast<jlong>(nativeMessageQueue);
181}

在NativeMessageQueue的构造函数中创建一个native层的Looper,并通过TLS的机制和线程绑定。

/frameworks/base/core/jni/android_os_MessageQueue.cpp

78NativeMessageQueue::NativeMessageQueue() :
79        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
80    mLooper = Looper::getForThread();
81    if (mLooper == NULL) {
82        mLooper = new Looper(false);
83        Looper::setForThread(mLooper);
84    }
85}

在native层Looper的构造过程中,67行的代码非常关键。它用于mWakeEventFd的初始化,创建出来的eventfd将会在rebuildEpollLocked函数中被epoll监听(151行)。Epoll机制是Linux内核中一种事件触发的机制,可以同时监听多个文件描述符。在调用epoll_wait将线程挂起的时候,如果有被监测的事件产生,则线程从挂起状态恢复,重新恢复运行。这其实是一种中断式的wait/notify机制。如果想了解这个机制的详细内容,可以参考这两篇博客:博客1博客2。博客1中对epoll的基本概念讲述较多,博客2对epoll中的Level Trigger和Edge Trigger讲的非常清楚。

我们以149行到151行的代码为例,EPOLLIN表示监测mWakeEventFd上的可读事件,当该线程调用epoll_wait时,如果mWakeEventFd上有可读事件,则线程直接返回,否则挂起。在该线程挂起的时候,如果有其他线程往mWakeEventFd上写入新的数据,则该线程会接收到事件,并从挂起状态恢复为运行状态。

/system/core/libutils/Looper.cpp

63Looper::Looper(bool allowNonCallbacks) :
64        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
65        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
66        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
67    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
68    LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
69                        strerror(errno));
70
71    AutoMutex _l(mLock);
72    rebuildEpollLocked();
73}

/system/core/libutils/Looper.cpp

134void Looper::rebuildEpollLocked() {
135    // Close old epoll instance if we have one.
136    if (mEpollFd >= 0) {
137#if DEBUG_CALLBACKS
138        ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
139#endif
140        close(mEpollFd);
141    }
142
143    // Allocate the new epoll instance and register the wake pipe.
144    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
145    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
146
147    struct epoll_event eventItem;
148    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
149    eventItem.events = EPOLLIN;
150    eventItem.data.fd = mWakeEventFd;
151    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
152    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
153                        strerror(errno));
154
155    for (size_t i = 0; i < mRequests.size(); i++) {
156        const Request& request = mRequests.valueAt(i);
157        struct epoll_event eventItem;
158        request.initEventItem(&eventItem);
159
160        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
161        if (epollResult < 0) {
162            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
163                  request.fd, strerror(errno));
164        }
165    }
166}

综上所述,一个可以处理消息的线程,必然会有一个唯一的Looper和唯一的MessageQueue。

2.2 消息发送过程

消息通过Handler进行发送。

通过调用Handler类的sendMessage方法,我们可以发送一个消息。sendMessage最终调用的是sendMessageAtTime方法。参数uptimeMillis表示希望消息发送的时间点距离开机时间点的毫秒数,譬如手机15:00:00开机,消息发送者希望这条消息15:00:01准时发送,那么传入的uptimeMillis就是1000。

/frameworks/base/core/java/android/os/Handler.java

602    public final boolean sendMessage(Message msg) 603 {
604        return sendMessageDelayed(msg, 0);
605    }

662    public final boolean sendMessageDelayed(Message msg, long delayMillis) 663 {
664        if (delayMillis < 0) {
665            delayMillis = 0;
666        }
667        return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
668    }

689    public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
690        MessageQueue queue = mQueue;
691        if (queue == null) {
692            RuntimeException e = new RuntimeException(
693                    this + " sendMessageAtTime() called with no mQueue");
694            Log.w("Looper", e.getMessage(), e);
695            return false;
696        }
697        return enqueueMessage(queue, msg, uptimeMillis);
698    }

sendMessageAtTime方法取出Handler的mQueue字段,并调用enqueueMessage方法。enqueueMessage的作用就是将消息加入到消息队列中。首先,将消息的target字段设置为发送时的Handler,表明这个消息被接收后依然由此Handler进行处理。其后根据Handler是否异步来决定发送的消息是否异步。最后调用MessageQueue的enqueueMessage方法。

/frameworks/base/core/java/android/os/Handler.java

740    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
741        msg.target = this;
742        if (mAsynchronous) {
743            msg.setAsynchronous(true);
744        }
745        return queue.enqueueMessage(msg, uptimeMillis);
746    }

745行的queue是从Handler的sendMessageAtTime方法中传递过来的,它是Handler对象的mQueue字段,在Handler对象的构造方法中被赋值。为了搞清楚mQueue的来及,我们有必要看一看Handler的构造方法。

2.2.1 消息被发送到哪个线程?

Handler的构造方法被重载了很多个,但底层其实都是这两个:

/frameworks/base/core/java/android/os/Handler.java

192    public Handler(Callback callback, boolean async) {
193        if (FIND_POTENTIAL_LEAKS) {
194            final Class<? extends Handler> klass = getClass();
195            if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
196                    (klass.getModifiers() & Modifier.STATIC) == 0) {
197                Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
198                    klass.getCanonicalName());
199            }
200        }
201
202        mLooper = Looper.myLooper();
203        if (mLooper == null) {
204            throw new RuntimeException(
205                "Can't create handler inside thread " + Thread.currentThread()
206                        + " that has not called Looper.prepare()");
207        }
208        mQueue = mLooper.mQueue;
209        mCallback = callback;
210        mAsynchronous = async;
211    }

232    public Handler(Looper looper, Callback callback, boolean async) {
233        mLooper = looper;
234        mQueue = looper.mQueue;
235        mCallback = callback;
236        mAsynchronous = async;
237    }

二者最大的区别就在于,一个传入了Looper,另一个没有传入Looper。

传入Looper的话,Handler对象的mQueue就等于looper.mQueue。假设Handler对象在线程A中创建,其构造时传入的是线程B的Looper,那么通过这个Handler发送的消息就将由线程B来处理。

没有传入Looper的话,Handler对象的mQueue就等于其创建线程的Looper。依然假设Handler对象在线程A中创建,此时构造Handler时没有传入Looper,那么通过这个Handler发送的消息就将由线程A来处理。

请仔细体会上述两种情况的区别。

针对没有传入Looper的情况,这里还要多提几句。Handler对象创建之后,由于它存在于Java堆上,所以可以被任何线程访问、使用。任何线程通过它发送的消息,最终都将汇总到其创建线程的MessageQueue中,包括在它的创建线程中发送消息。

下面看看MessageQueue的enqueueMessage方法做了哪些工作。

/frameworks/base/core/java/android/os/MessageQueue.java

536    boolean enqueueMessage(Message msg, long when) {
537        if (msg.target == null) {
538            throw new IllegalArgumentException("Message must have a target.");
539        }
540        if (msg.isInUse()) {
541            throw new IllegalStateException(msg + " This message is already in use.");
542        }
543
544        synchronized (this) {
545            if (mQuitting) {
546                IllegalStateException e = new IllegalStateException(
547                        msg.target + " sending message to a Handler on a dead thread");
548                Log.w(TAG, e.getMessage(), e);
549                msg.recycle();
550                return false;
551            }
552
553            msg.markInUse();
554            msg.when = when;
555            Message p = mMessages;
556            boolean needWake;
557            if (p == null || when == 0 || when < p.when) {
558                // New head, wake up the event queue if blocked.
559                msg.next = p;
560                mMessages = msg;
561                needWake = mBlocked;
562            } else {
563                // Inserted within the middle of the queue. Usually we don't have to wake
564                // up the event queue unless there is a barrier at the head of the queue
565                // and the message is the earliest asynchronous message in the queue.
566                needWake = mBlocked && p.target == null && msg.isAsynchronous();
567                Message prev;
568                for (;;) {
569                    prev = p;
570                    p = p.next;
571                    if (p == null || when < p.when) {
572                        break;
573                    }
574                    if (needWake && p.isAsynchronous()) {
575                        needWake = false;
576                    }
577                }
578                msg.next = p; // invariant: p == prev.next
579                prev.next = msg;
580            }
581
582            // We can assume mPtr != 0 because mQuitting is false.
583            if (needWake) {
584                nativeWake(mPtr);
585            }
586        }
587        return true;
588    }

跳过enqueueMessage方法中的异常判断,其核心的作用只有一个:将新消息加入MessageQueue中的消息链表中。MessageQueue中的Message通过链表的方式进行管理,其中的消息按照发送时间的先后顺序排列。在管理链表的过程中,只需持有头部对象就可以遍历所有的对象。因此MessageQueue只用了一个字段(mMessages)来记录消息链表的头部消息。

2.2.2 消息应该被插入到链表的什么位置?

557行和562行分别表示对新消息的两种处理方式,第一种是将新消息插入到链表头部,第二种是将新消息插入到链表中间(或尾部)。

先分析插入链表头部的情况。

  • p == null 表示MessageQueue的消息链表为空,也即所有消息发送完毕,新加入的消息理所应当插入到头部。
  • when == 0表示消息通过sendMessageAtTime方法发送,且传入的uptime为0,此类消息优先级最高,不管消息链表中是何种情况,新加入的消息都要插入到头部。
  • when < p.when表示新消息预设的发送时间要早于现有头部消息的发送时间,根据时间越早越靠前的原则,新加入的消息要插入到头部。

除了插入到头部的三种情况外,其他情况下消息都将插入到链表中间(或尾部)。568行的for循环其实就是遍历消息链表,根据发送时间的先后顺序将消息插入到链表中。

2.2.3 消息加入链表后是否应该主动唤醒线程?

除了需要将新消息插入到链表的合适位置,enqueueMessage还要决定是否唤醒MessageQueue所在的线程。MessageQueue的mBlocked字段记录了其所属线程是否已经发生阻塞(被挂起),该字段在消息处理的过程中被赋值。

当新消息插入到链表头部时,needWake = mBlocked:

  1. 如果MessageQueue此时已经发生阻塞,则新消息插入头部时,需要唤醒阻塞线程,以便让它根据头部的新消息重新决定处理逻辑(可能是立即处理,也可能是延时处理)。
  2. 如果MessageQueue此时未发生阻塞,则新消息插入头部后无需做多余处理。它只需要静静地等在那里,线程处理完手中的消息后自然会同它碰面。

当新消息插入到链表中间(或尾部)时,needWake的赋值变得复杂起来。这主要是由于异步消息和同步屏障的存在。

同步屏障像是一个守卫,当消息链表的头部是一个同步屏障时,后续的同步消息都无法被放行,即便这些消息已经满足发送的时间要求。此时,链表上的异步消息却不受影响,它们照常按照发送时间的逻辑,顺利地被处理。

同步屏障是一种特殊的Message,它的target为null,表明这个消息是不需要被处理的,而普通消息的target都是最终来处理该消息的Handler。通过MessageQueue的postSyncBarrier方法可以放置同步屏障,只不过这个方法是hide的,而且从Android P开始,反射调用非 SDK 的接口被限制了。虽然网上有一些手段可以绕开这种限制,但Google的本意应该是不想让开发者再使用同步屏障了。与之对应,撤除同步屏障的方法是removeSyncBarrier。

/frameworks/base/core/java/android/os/MessageQueue.java

461    public int postSyncBarrier() {
462        return postSyncBarrier(SystemClock.uptimeMillis());
463    }

465    private int postSyncBarrier(long when) {
466        // Enqueue a new sync barrier token.
467        // We don't need to wake the queue because the purpose of a barrier is to stall it.
468        synchronized (this) {
469            final int token = mNextBarrierToken++;
470            final Message msg = Message.obtain();
471            msg.markInUse();
472            msg.when = when;
473            msg.arg1 = token;
474
475            Message prev = null;
476            Message p = mMessages;
477            if (when != 0) {
478                while (p != null && p.when <= when) {
479                    prev = p;
480                    p = p.next;
481                }
482            }
483            if (prev != null) { // invariant: p == prev.next
484                msg.next = p;
485                prev.next = msg;
486            } else {
487                msg.next = p;
488                mMessages = msg;
489            }
490            return token;
491        }
492    }

同步消息和异步消息的唯一差异在于Message的flag是否被置上FLAG_ASYNCHRONOUS标志位。这个标志位只在setAsynchronous方法中被改变。如果Handler的mAsynchronous为true,则通过该Handler发送的消息默认都是异步;反之,默认都是同步。除此以外,我们也可以通过消息的setAsynchronous方法来单独地给某个方法设置是否异步。

/frameworks/base/core/java/android/os/Message.java

447    public boolean isAsynchronous() {
448        return (flags & FLAG_ASYNCHRONOUS) != 0;
449    }

477    public void setAsynchronous(boolean async) {
478        if (async) {
479            flags |= FLAG_ASYNCHRONOUS;
480        } else {
481            flags &= ~FLAG_ASYNCHRONOUS;
482        }
483    }

回到新消息插入到链表中间(或尾部)时needWake的赋值,needWake在遍历之初被赋值如下:

/frameworks/base/core/java/android/os/MessageQueue.java

566                needWake = mBlocked && p.target == null && msg.isAsynchronous();

只有当MessageQueue所在的线程阻塞,链表头部为同步屏障,且新消息为异步消息时,needWake才为true。三者缺一不可。

  1. mBlocked为false,表明线程未阻塞,自然不需要唤醒。
  2. p.target != null,表明头部消息有效,此时即便mBlocked为true,这时候的阻塞也是有超时的,超时时间到达后,线程自动唤醒,无需外部唤醒。
  3. msg.isAsynchronous() = false,表明新消息为同步消息,此时若头部消息为同步屏障,则新消息也无法被放行,唤醒线程也没用,干脆不唤醒。

另外在遍历的过程中,如果发现新消息的前面有另一个消息为异步消息,则needWake重新置为false。这种情况表明原有的异步消息为线程设置了有超时的阻塞,当下时间未达到异步消息的发送时间,所以mBlocked为true。但由于此次阻塞设有超时,所以并不需要外不唤醒。

线程的阻塞相当于人类的睡眠,从阻塞状态中恢复有两种可能,一种是超时唤醒,另一个是外部唤醒。类比到人类的睡眠,人从睡梦中被叫醒也有两种可能,一种是自己定闹钟,闹钟响后将自己叫醒,另一种是被别人拍醒(不考虑自然醒,因为自然醒本质也是闹钟叫醒,只不过这个闹钟是生物钟)。

2.2.4 如何主动唤醒线程?

上面介绍了是否应该主动唤醒线程,如果回答“需要”的话,那我们又该怎样去唤醒线程呢?

/frameworks/base/core/java/android/os/MessageQueue.java

584                nativeWake(mPtr);

通过nativeWake的native方法,我们就可以实现唤醒MessageQueue所在线程的目的。它对应的JNI方法是android_os_MessageQueue_nativeWake。传入的mPtr实际上是native对象的指针,它被存在一个Java的字段中,用于Java层和native层的互动。

mPtr被转换成NativeMessageQueue对象(c++对象)的指针,紧接着调用NativeMessageQueue对象的wake方法。

/frameworks/base/core/jni/android_os_MessageQueue.cpp

194static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
195    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
196    nativeMessageQueue->wake();
197}

再追踪下去,发现调用的是NativeMessageQueue中mLooper变量的wake方法。最终只干了一件事:往native层的Looper对象的mWakeEventFd中写一个“1”。结合2.1中对Epoll机制的描述,mWakeEventFd上有可读数据时,epfd将会监测到该事件,并将线程从挂起状态恢复为运行状态。

/frameworks/base/core/jni/android_os_MessageQueue.cpp

121void NativeMessageQueue::wake() {
122    mLooper->wake();
123}

/system/core/libutils/Looper.cpp

398void Looper::wake() {
399#if DEBUG_POLL_AND_WAKE
400    ALOGD("%p ~ wake", this);
401#endif
402
403    uint64_t inc = 1;
404    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
405    if (nWrite != sizeof(uint64_t)) {
406        if (errno != EAGAIN) {
407            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",
408                    mWakeEventFd, strerror(errno));
409        }
410    }
411}

2.3 消息处理过程

对于想要运行消息机制的线程而言,除了需要通过Looper.prepare来创建属于自己的Looper和MessageQueue,还需要调用Looper.loop来真正的轮询、处理消息。

/frameworks/base/core/java/android/os/Looper.java

127    public static Looper getMainLooper() {
128        synchronized (Looper.class) {
129            return sMainLooper;
130        }
131    }
132
133    /** 134 * Run the message queue in this thread. Be sure to call 135 * {@link #quit()} to end the loop. 136 */
137    public static void loop() {
138        final Looper me = myLooper();
139        if (me == null) {
140            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
141        }
142        final MessageQueue queue = me.mQueue;
143
144        // Make sure the identity of this thread is that of the local process,
145        // and keep track of what that identity token actually is.
146        Binder.clearCallingIdentity();
147        final long ident = Binder.clearCallingIdentity();
148
149        // Allow overriding a threshold with a system prop. e.g.
150        // adb shell 'setprop log.looper.1000.main.slow 1 && stop && start'
151        final int thresholdOverride =
152                SystemProperties.getInt("log.looper."
153                        + Process.myUid() + "."
154                        + Thread.currentThread().getName()
155                        + ".slow", 0);
156
157        boolean slowDeliveryDetected = false;
158
159        for (;;) {
160            Message msg = queue.next(); // might block
161            if (msg == null) {
162                // No message indicates that the message queue is quitting.
163                return;
164            }
165
166            // This must be in a local variable, in case a UI event sets the logger
167            final Printer logging = me.mLogging;
168            if (logging != null) {
169                logging.println(">>>>> Dispatching to " + msg.target + " " +
170                        msg.callback + ": " + msg.what);
171            }
172
173            final long traceTag = me.mTraceTag;
174            long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;
175            long slowDeliveryThresholdMs = me.mSlowDeliveryThresholdMs;
176            if (thresholdOverride > 0) {
177                slowDispatchThresholdMs = thresholdOverride;
178                slowDeliveryThresholdMs = thresholdOverride;
179            }
180            final boolean logSlowDelivery = (slowDeliveryThresholdMs > 0) && (msg.when > 0);
181            final boolean logSlowDispatch = (slowDispatchThresholdMs > 0);
182
183            final boolean needStartTime = logSlowDelivery || logSlowDispatch;
184            final boolean needEndTime = logSlowDispatch;
185
186            if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
187                Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
188            }
189
190            final long dispatchStart = needStartTime ? SystemClock.uptimeMillis() : 0;
191            final long dispatchEnd;
192            try {
193                msg.target.dispatchMessage(msg);
194                dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
195            } finally {
196                if (traceTag != 0) {
197                    Trace.traceEnd(traceTag);
198                }
199            }
200            if (logSlowDelivery) {
201                if (slowDeliveryDetected) {
202                    if ((dispatchStart - msg.when) <= 10) {
203                        Slog.w(TAG, "Drained");
204                        slowDeliveryDetected = false;
205                    }
206                } else {
207                    if (showSlowLog(slowDeliveryThresholdMs, msg.when, dispatchStart, "delivery",
208                            msg)) {
209                        // Once we write a slow delivery log, suppress until the queue drains.
210                        slowDeliveryDetected = true;
211                    }
212                }
213            }
214            if (logSlowDispatch) {
215                showSlowLog(slowDispatchThresholdMs, dispatchStart, dispatchEnd, "dispatch", msg);
216            }
217
218            if (logging != null) {
219                logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
220            }
221
222            // Make sure that during the course of dispatching the
223            // identity of the thread wasn't corrupted.
224            final long newIdent = Binder.clearCallingIdentity();
225            if (ident != newIdent) {
226                Log.wtf(TAG, "Thread identity changed from 0x"
227                        + Long.toHexString(ident) + " to 0x"
228                        + Long.toHexString(newIdent) + " while dispatching to "
229                        + msg.target.getClass().getName() + " "
230                        + msg.callback + " what=" + msg.what);
231            }
232
233            msg.recycleUnchecked();
234        }
235    }

159行开始的for循环在正常状态下永远不会退出,除非调用Looper或MessageQueue的quit方法。在每一次循环的过程中,都做了以下三件事:

  1. 取出消息链表中可被处理的头部消息。
  2. 执行消息所对应的Handler的dispatchMessage方法,并记录消息处理的delivery时间和dispatch时间,用于监测消息队列的运转状态是否正常。
  3. 回收此消息。

在这三个步骤中,需要详细分析的是1和2。1需要较多的篇幅来阐述,因此我们先分析2的过程。

2.3.1 Delievery Time和Dispatch Time分别指的是什么?

Delievery Time:

Delievery Time示意图

待发送的消息通常都有一个预设的发送时间点,也即message的when字段。当这个消息从消息链表中被取出时,记录另一个时间点,称之为dispatchStart。正常情况下,dispatchStart和msg.when相同,表明消息按照预设的时间点被取出。非正常情况下,如果前面消息处理时间过长,将会延误后续消息的发送(因为消息链表是串行发送的)。这个道理和排队的情形很相似。

DelieveryTime = dispatchStart - msg.when,表示消息被取出的时间点和预设的时间点之间的差距。差值较小,表明消息基本是按照预设的时间来取出的。差值较大,则表明消息队列有些拥堵,可能是前面的消息过多,也可能是前面某个消息的处理耗时过长。总之,当前这个消息并没有按照预设的时间被取出,而是有些滞后了。

Dispatch Time:

消息的处理时间,也即消息所对应Handler的dispatchMessage方法的运行时间。每个消息都有属于自己的处理方法,其中可能包含某些耗时操作。因此记录下dispatch time,当这个时间超过某个阈值时给出相应的警告,可以帮助开发者了解程序的性能以及运行时的压力。

2.3.2 消息处理最终执行哪个方法?

消息处理会调用Handler的dispatchMessage方法来对消息进行处理。在这个方法内部,我们可以看出一个消息会有三种处理方式。三种处理方式并非随机选择,而是具有一定的优先级的。

  1. 当message本身的callback字段不为空时,按照callback指定的方式对消息进行处理。
  2. 当条件1不满足,且Handler对象的mCallback字段不为空时,按照mCallback指定的方式对消息进行处理。
  3. 当条件1、2均不满足时,按照Handler类的handleMessage方法对消息进行处理。

/frameworks/base/core/java/android/os/Handler.java

97    public void dispatchMessage(Message msg) {
98        if (msg.callback != null) {
99            handleCallback(msg);
100        } else {
101            if (mCallback != null) {
102                if (mCallback.handleMessage(msg)) {
103                    return;
104                }
105            }
106            handleMessage(msg);
107        }
108    }

以下分别列举满足3种处理方式的例子:

  1. 当message本身的callback字段不为空时,按照callback指定的方式对消息进行处理。

/frameworks/base/core/java/android/speech/tts/TextToSpeechService.java

579            Runnable runnable = new Runnable() {
580                @Override
581                public void run() {
582                    if (setCurrentSpeechItem(speechItem)) {
583                        speechItem.play();
584                        removeCurrentSpeechItem();
585                    } else {
586                        // The item is alreadly flushed. Stopping.
587                        speechItem.stop();
588                    }
589                }
590            };
591            Message msg = Message.obtain(this, runnable);

  1. 当条件1不满足,且Handler对象的mCallback字段不为空时,按照mCallback指定的方式对消息进行处理。

/frameworks/base/services/core/java/com/android/server/GraphicsStatsService.java

110        mWriteOutHandler = new Handler(bgthread.getLooper(), new Handler.Callback() {
111            @Override
112            public boolean handleMessage(Message msg) {
113                switch (msg.what) {
114                    case SAVE_BUFFER:
115                        saveBuffer((HistoricalBuffer) msg.obj);
116                        break;
117                    case DELETE_OLD:
118                        deleteOldBuffers();
119                        break;
120                }
121                return true;
122            }
123        });

  1. 当条件1、2均不满足时,按照Handler类的handleMessage方法对消息进行处理。

/frameworks/base/services/core/java/com/android/server/pm/ProcessLoggingHandler.java

35public final class ProcessLoggingHandler extends Handler {
......
......
47    @Override
48    public void handleMessage(Message msg) {
49        switch (msg.what) {
50            case LOG_APP_PROCESS_START_MSG: {
51                Bundle bundle = msg.getData();
52                String processName = bundle.getString("processName");
53                int uid = bundle.getInt("uid");
54                String seinfo = bundle.getString("seinfo");
55                String apkFile = bundle.getString("apkFile");
56                int pid = bundle.getInt("pid");
57                long startTimestamp = bundle.getLong("startTimestamp");
58                String apkHash = computeStringHashOfApk(apkFile);
59                SecurityLog.writeEvent(SecurityLog.TAG_APP_PROCESS_START, processName,
60                        startTimestamp, uid, pid, seinfo, apkHash);
61                break;
62            }
63            case INVALIDATE_BASE_APK_HASH_MSG: {
64                Bundle bundle = msg.getData();
65                mProcessLoggingBaseApkHashes.remove(bundle.getString("apkFile"));
66                break;
67            }
68        }
69    }

开发者定义的都是Handler的子类(譬如上面的ProcessingLoggingHandler),如果需要最终由Handler类的handleMessage来对消息进行处理,则子类中必须覆盖父类的handleMessage方法。否则将不会对消息进行处理,因为父类(Handler)的handleMessage方法是一个空方法。

这种阶梯式处理消息的设计,可以给予开发者更大的自由度。

2.3.3 如何取出下一个消息?

接下来重点讲述如何取出消息链表中可被处理的头部消息。让我们走进MessageQueue的next方法。

/frameworks/base/core/java/android/os/Looper.java

160            Message msg = queue.next(); // might block

/frameworks/base/core/java/android/os/MessageQueue.java

310    Message next() {
311        // Return here if the message loop has already quit and been disposed.
312        // This can happen if the application tries to restart a looper after quit
313        // which is not supported.
314        final long ptr = mPtr;
315        if (ptr == 0) {
316            return null;
317        }
318
319        int pendingIdleHandlerCount = -1; // -1 only during first iteration
320        int nextPollTimeoutMillis = 0;
321        for (;;) {
322            if (nextPollTimeoutMillis != 0) {
323                Binder.flushPendingCommands();
324            }
325
326            nativePollOnce(ptr, nextPollTimeoutMillis);
327
328            synchronized (this) {
329                // Try to retrieve the next message. Return if found.
330                final long now = SystemClock.uptimeMillis();
331                Message prevMsg = null;
332                Message msg = mMessages;
333                if (msg != null && msg.target == null) {
334                    // Stalled by a barrier. Find the next asynchronous message in the queue.
335                    do {
336                        prevMsg = msg;
337                        msg = msg.next;
338                    } while (msg != null && !msg.isAsynchronous());
339                }
340                if (msg != null) {
341                    if (now < msg.when) {
342                        // Next message is not ready. Set a timeout to wake up when it is ready.
343                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
344                    } else {
345                        // Got a message.
346                        mBlocked = false;
347                        if (prevMsg != null) {
348                            prevMsg.next = msg.next;
349                        } else {
350                            mMessages = msg.next;
351                        }
352                        msg.next = null;
353                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
354                        msg.markInUse();
355                        return msg;
356                    }
357                } else {
358                    // No more messages.
359                    nextPollTimeoutMillis = -1;
360                }
361
362                // Process the quit message now that all pending messages have been handled.
363                if (mQuitting) {
364                    dispose();
365                    return null;
366                }
367
368                // If first time idle, then get the number of idlers to run.
369                // Idle handles only run if the queue is empty or if the first message
370                // in the queue (possibly a barrier) is due to be handled in the future.
371                if (pendingIdleHandlerCount < 0
372                        && (mMessages == null || now < mMessages.when)) {
373                    pendingIdleHandlerCount = mIdleHandlers.size();
374                }
375                if (pendingIdleHandlerCount <= 0) {
376                    // No idle handlers to run. Loop and wait some more.
377                    mBlocked = true;
378                    continue;
379                }
380
381                if (mPendingIdleHandlers == null) {
382                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
383                }
384                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
385            }
386
387            // Run the idle handlers.
388            // We only ever reach this code block during the first iteration.
389            for (int i = 0; i < pendingIdleHandlerCount; i++) {
390                final IdleHandler idler = mPendingIdleHandlers[i];
391                mPendingIdleHandlers[i] = null; // release the reference to the handler
392
393                boolean keep = false;
394                try {
395                    keep = idler.queueIdle();
396                } catch (Throwable t) {
397                    Log.wtf(TAG, "IdleHandler threw exception", t);
398                }
399
400                if (!keep) {
401                    synchronized (this) {
402                        mIdleHandlers.remove(idler);
403                    }
404                }
405            }
406
407            // Reset the idle handler count to 0 so we do not run them again.
408            pendingIdleHandlerCount = 0;
409
410            // While calling an idle handler, a new message could have been delivered
411            // so go back and look again for a pending message without waiting.
412            nextPollTimeoutMillis = 0;
413        }
414    }

首先分析326行的nativePollOnce方法,它的作用是设定下一次发送的时间或挂起线程。其对应的JNI方法为android_os_MessageQueue_nativePollOnce。内部调用NativeMessageQueue的pollOnce函数。

/frameworks/base/core/jni/android_os_MessageQueue.cpp

188static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, 189 jlong ptr, jint timeoutMillis) {
190    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
191    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
192}

NativeMessageQueue的pollOnce函数进一步调用Looper的pollOnce函数,并传入timeoutMills参数。

/frameworks/base/core/jni/android_os_MessageQueue.cpp

107void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
108    mPollEnv = env;
109    mPollObj = pollObj;
110    mLooper->pollOnce(timeoutMillis);
111    mPollObj = NULL;
112    mPollEnv = NULL;
113
114    if (mExceptionObj) {
115        env->Throw(mExceptionObj);
116        env->DeleteLocalRef(mExceptionObj);
117        mExceptionObj = NULL;
118    }
119}

一层层往下走,发现最终调用的是Looper的pollInner函数,最终通过系统调用epoll_wait陷入内核态。

/system/core/libutils/Looper.cpp

242    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

传入epoll_wait的timeoutMillis参数将直接决定epoll的行为。这里可以分为三种情况:

  1. timeoutMillis = 0,意味着无需等待。检测epfd上是否有事件,有或没有都将直接返回,继续执行后面的操作。
  2. timeoutMillis > 0,意味着epoll_wait有超时时间。对于Level Trigger的fd事件(这里是这种情况),在调用epoll_wait的时候会首先查看该事件是否已经存在。如果存在则直接返回,否则线程被挂起呈现阻塞状态,等待超时时间到达后恢复至运行状态。在超时等待的这段时间内,如果有新的消息被加入到链表头部,发送线程将会唤醒此线程以重新决定timeoutMillis的值。
  3. timeoutMillis = -1,epoll_wait会首先查看监测事件是否已经存在,如果存在则直接返回,否则将无限期地等待下去,直到有新消息到来,其他线程唤醒此线程。

通过320行可知,nextPollTimeoutMillis在第一次循环时被设置为0,意味着第一次循环将跳过epoll_wait的等待,直接去检查消息链表的状态。

/frameworks/base/core/java/android/os/MessageQueue.java

320        int nextPollTimeoutMillis = 0;
321        for (;;) {
322            if (nextPollTimeoutMillis != 0) {
323                Binder.flushPendingCommands();
324            }
325
326            nativePollOnce(ptr, nextPollTimeoutMillis);

330-339行的主要工作是取出链表中第一个可被处理的消息。上文提到,MessageQueue只用了一个字段(mMessages)来记录消息链表的头部消息,所以通过332行便可以取到头部消息。如果链表头部是同步屏障,那么就要遍历去寻找链表中第一个异步消息。

/frameworks/base/core/java/android/os/MessageQueue.java

330                final long now = SystemClock.uptimeMillis();
331                Message prevMsg = null;
332                Message msg = mMessages;
333                if (msg != null && msg.target == null) {
334                    // Stalled by a barrier. Find the next asynchronous message in the queue.
335                    do {
336                        prevMsg = msg;
337                        msg = msg.next;
338                    } while (msg != null && !msg.isAsynchronous());
339                }

当取出的可处理消息为null时,意味着链表中暂时没有消息可以被处理,所以将nextPollTimeoutMillis置为-1,让next下一次轮询的时候直接通过epoll_wait将线程挂起休息。

反之则需要有进一步的处理,分两种情况讨论:

  1. 当下时间 < 该消息预定的处理时间,此时不应处理消息,需要等待时机成熟。于是将nextPollTimeoutMillis设置为当下时间和预定处理时间之间的差值,保证超时后能够再次轮询此消息,并进行相应处理。
  2. 当下时间 ≥ 该消息预定的处理时间,此时消息已经成熟,应该被处理。此时将mBlocked置为false,表明该线程处于Runnable状态,并且马上就要执行消息的处理方法。接着重构链表,将此消息从链表中删除。最后返回此消息到Looper的loop方法进行消息的实际处理。

/frameworks/base/core/java/android/os/MessageQueue.java

340                if (msg != null) {
341                    if (now < msg.when) {
342                        // Next message is not ready. Set a timeout to wake up when it is ready.
343                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
344                    } else {
345                        // Got a message.
346                        mBlocked = false;
347                        if (prevMsg != null) {
348                            prevMsg.next = msg.next;
349                        } else {
350                            mMessages = msg.next;
351                        }
352                        msg.next = null;
353                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
354                        msg.markInUse();
355                        return msg;
356                    }
357                } else {
358                    // No more messages.
359                    nextPollTimeoutMillis = -1;
360                }

2.3.4 IdleHandler有什么用?

在MessageQueue的next方法中,还会对IdleHandler进行处理。IdleHandler,顾名思义,表示线程空闲时才需要去执行的一些操作。如果此时链表头部的消息为空或尚未到达发送时间,则表明线程空闲,因此可以去处理一些杂事(IdleHandler里的工作)。

通过319行可知,pendingIdleHandlerCount最初始被赋值为-1。

/frameworks/base/core/java/android/os/MessageQueue.java

319        int pendingIdleHandlerCount = -1; // -1 only during first iteration

所以第一次运行到371行时,pendingIdleHandlerCount必定小于0。通过373行到384行,将mIdleHandlers(类型为ArrayList)中的元素赋值给mPendingIdleHandlers(类型为数组)。之所以不直接使用mIdleHandlers来进行遍历,是因为遍历处理mIdleHandles时无需持有MessageQueue的monitor lock,于是干脆将锁释放,让其他线程可以在处理mPendingIdleHandlers中的元素时,同时往mIdleHandlers中插入新的元素。

如果不需要对IdleHandler处理,或者mIdleHandlers中没有需要处理的对象,则设置mBlocked为true(377行),在下一轮循环的过程中会通过epoll_wait将本线程挂起。需要注意的一点是,如果此次next()方法能够取出有效消息进行处理,代码是不会执行到371行及以下的位置,它会在355行直接返回。

接下来便是遍历mIdleHandlers中的元素,并执行它们的queueIdle方法的过程。如果queueIdle返回false,表明该IdleHandler只会执行一次,执行完之后就从mIdleHandlers列表中删除。

/frameworks/base/core/java/android/os/MessageQueue.java

371                if (pendingIdleHandlerCount < 0
372                        && (mMessages == null || now < mMessages.when)) {
373                    pendingIdleHandlerCount = mIdleHandlers.size();
374                }
375                if (pendingIdleHandlerCount <= 0) {
376                    // No idle handlers to run. Loop and wait some more.
377                    mBlocked = true;
378                    continue;
379                }
380
381                if (mPendingIdleHandlers == null) {
382                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
383                }
384                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
385            }
386
387            // Run the idle handlers.
388            // We only ever reach this code block during the first iteration.
389            for (int i = 0; i < pendingIdleHandlerCount; i++) {
390                final IdleHandler idler = mPendingIdleHandlers[i];
391                mPendingIdleHandlers[i] = null; // release the reference to the handler
392
393                boolean keep = false;
394                try {
395                    keep = idler.queueIdle();
396                } catch (Throwable t) {
397                    Log.wtf(TAG, "IdleHandler threw exception", t);
398                }
399
400                if (!keep) {
401                    synchronized (this) {
402                        mIdleHandlers.remove(idler);
403                    }
404                }
405            }
406
407            // Reset the idle handler count to 0 so we do not run them again.
408            pendingIdleHandlerCount = 0;
409
410            // While calling an idle handler, a new message could have been delivered
411            // so go back and look again for a pending message without waiting.
412            nextPollTimeoutMillis = 0;
413        }
414    }

3 总结

本文从以下三个方面详细介绍了Android中的消息机制:

  1. 消息队列准备过程
  2. 消息发送过程
  3. 消息处理过程

分析了消息从哪里来,到哪里去的问题。顺着这条主线,也穿插讲述了消息机制中一些不为人熟知的机制:同步屏障、epoll机制、delievery time以及IdleHandler的处理时机等。希望这些分析能够帮助到大家。

原文链接:banshan.tech/Android%E6%…

点击这里复制本文地址 以上内容由权冠洲的博客整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!

支持Ctrl+Enter提交

联系我们| 本站介绍| 留言建议 | 交换友链 | 域名展示
本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除

权冠洲的博客 © All Rights Reserved.  Copyright quanguanzhou.top All Rights Reserved
苏公网安备 32030302000848号   苏ICP备20033101号-1
本网站由 提供CDN/云存储服务

联系我们