Android进程内线程之间的通信广泛使用到了Handler,handler也是Android独有的消息处理机制。最常见的莫过于使用handler更新ui了。现在就来分析Handler机制。
Android进程内线程之间的通信 如下图所示:
Android中当一个app运行之后,至少有一个主线程,也就是通常说的UI线程。如果还有其他线程要更新UI,那么试图更新UI的线程,就要给UI主线程发送消息,告诉UI线程如何更改UI等。
当然也可以向其他线程发送消息。Android 可以处理消息的线程都有且仅有一个消息队列,用来暂存发送给他的消息,处理线程的消息也有且仅有一个处理消息的对象,用来从消息对象中取出消息进行处理。
这里就要搞清楚,当有多个消息处理线程的时候,如何发送给某个具体的消息处理线程。消息处理线程又是如何处理消息的。
Android中与消息机制相关的类主要是 Looper,Handler,Message,MessageQueue.其中Looper充当消息处理的角色,MessageQueue充当消息队列,Message充当消息,Handler可以暂时理解为充当消息的发送者,实际上还Handler还定义了如何处理消息,只是处理消息是由消息处理线程完成的。
Handler机制即可用于异步通信,也可用于同步通信。大多数时候使用的是异步通信。
Looper
源码路径:
1
|
Android-6/frameworks/base/core/java/android/os/Looper.java
|
主要成员如下图所示:
Looper充当消息处理的角色,每个线程只能有一个looper对象,那么是如何做到的呢?
如何创建线程唯一的looper
创建looper对象只能使用其提供的静态方法prepare(),因为其构造方法是私有的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
public static void prepare() { prepare(true); }
private static void prepare(boolean quitAllowed) { if (sThreadLocal.get() != null) { throw new RuntimeException("Only one Looper may be created per thread"); } sThreadLocal.set(new Looper(quitAllowed)); }
private Looper(boolean quitAllowed) { mQueue = new MessageQueue(quitAllowed); mThread = Thread.currentThread(); }
|
静态变量sThreadLocal的类型是 ThreadLocal,它通过将需要保存的对象和线程id关联在一起的方式实现了线程本地存储的功能。这样放入sThreadLocal对象中的Looper对象就和创建它的线程关联在一起了。
消息处理循环
创建好Looper对象之后,调用他的loop方法即可进入消息处理循环:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
public static void loop() { final Looper me = myLooper(); if (me == null) { throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread."); } final MessageQueue queue = me.mQueue; ..............................................
for (;;) { Message msg = queue.next(); ................................... msg.target.dispatchMessage(msg); .......................................... msg.recycleUnchecked(); } }
|
loop()方法内部是一个for无线循环,所以肯定调用loop方法也是有讲究的:一般来说对其的调用是放在线程的run方法中的。
1 2 3 4 5 6
|
class mtThread extends Thread{ public void run(){ Looper.prepare(); Looper.loop(); } }
|
loop()方法会循环从MessageQueue队列中取出消息,然后吧消息分发出去。消息的分发是通过Message对象的Target变量完成的。这变量的类型是Handler类型。一个Looper对象可以和多个Handler对象关联.
Message是消息的载体,发送者吧需要传递的消息放在Message对象中,Message创建的时候需要指定他的处理对象。Handler主要用来发送和处理消息。只不过处理消息这个过程发生在消息处理线程中。
消息的载体Messenger
1
|
Android-6/frameworks/base/core/java/android/os/Message.java
|
关键数据成员:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
public final class Message implements Parcelable { public int what; public int arg1; public int arg2; public Object obj;
public Messenger replyTo;
static final int FLAG_IN_USE = 1 << 0;
static final int FLAG_ASYNCHRONOUS = 1 << 1;
static final int FLAGS_TO_CLEAR_ON_COPY_FROM = FLAG_IN_USE;
int flags;
long when;
Bundle data;
Handler target;
Runnable callback;
|
其中target表示哪个Handler处理该消息。要知道在哪里设置了该字段的值。
如何创建一个Message
创建Message对象,不建议直接new,而是调用Message提供的一个静态方法obtain():
推荐:
1 2 3 4 5 6
|
public static Message obtain(Handler h) { Message m = obtain(); m.target = h;
return m; }
|
Message设计时,实现了Recyle机制。
1 2 3 4 5 6 7 8 9 10 11 12 13
|
public static Message obtain() { synchronized (sPoolSync) { if (sPool != null) { Message m = sPool; sPool = m.next; m.next = null; m.flags = 0; sPoolSize--; return m; } } return new Message(); }
|
其他obtain方法:
1 2 3 4 5
|
public static Message obtain(Handler h, int what); public static Message obtain(Handler h, Runnable callback); public static Message obtain(Handler h, int what, Object obj); public static Message obtain(Handler h, int what, int arg1, int arg2); public static Message obtain(Handler h, int what, int arg1, int arg2, Object obj);
|
Recycle机制
Android源码中大量使用了一个设计机制:当一个对象不再使用时把它储藏起来,不让虚拟机回收,需要的时候再从仓库里拿出来重新使用,这就避免了对象被回收后再重分配的过程。对于在应用的生命周期内(或者在循环中)需要频繁创建的对象来说这个机制特别实用,可以显著减少对象创建的次数,从而减少 GC 的运行时间。运用得当便可改善应用的性能
如何实现?
首先,我们需要一个仓库用于存放暂时不用的对象;需要新对象的时候我们不能使用 new 来分配一个新对象,所以还需要一个方法 obtain 来从仓库里获取对象;最后,便是 recycle 方法了,用于回收不再使用的对象。
Message类中与Recycle机制相关的成员,要注意除了next外,其他都为static类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
private static final Object sPoolSync = new Object();
private static Message sPool;
Message next;
private static int sPoolSize = 0;
private static final int MAX_POOL_SIZE = 50;
private static boolean gCheckRecycle = true;
|
创建对象的时候,不建议直接new,而是调用obtain()来尝试从仓库中获取。
第一次调用Obtain时,仓库为空,那么就调用构造方法创建Message对象。当Message不在使用的时候,调用recycle()方法可以把该对象放在仓库中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
public void recycle() { if (isInUse()) { if (gCheckRecycle) { throw new IllegalStateException("This message cannot be recycled because it " + "is still in use."); } return; } recycleUnchecked(); }
void recycleUnchecked() { flags = FLAG_IN_USE; what = 0; arg1 = 0; arg2 = 0; obj = null; replyTo = null; sendingUid = -1; when = 0; target = null; callback = null; data = null;
synchronized (sPoolSync) { if (sPoolSize < MAX_POOL_SIZE) { next = sPool; sPool = this; sPoolSize++; } }
|
放在仓库中,实际上就是为该对象增加了一个引用而已,这样虚拟机就不会回收它了。等下次使用在使用obtain方法尝试获取Message对象的时候,就可以省去分配对象的过程了,直接从仓库中获取。
可能会有疑问,平时在使用过程中,并没有手动调用过recycle()方法啊,其实Looper在处理消息时,会调用Message的recyle()方法!!!
handler
在创建Message对象时,需要关联一个Handler,那么现在看看这个Handler是什么。
源码位置:
1
|
Android-6/frameworks/base/core/java/android/os/Handler.java
|
Handler充当了消息发送者和处理者的角色(定义了如何处理消息,但是是由Looper调用的,执行发生在消息处理线程),与Handler关联的Message可以有多个,那么Handler就要提供一个分发的功能,因为不同的消息,处理肯定也不一样。在一个线程中,可以只使用一个Handler对象来处理所有消息,也可以有多个。
如何创建Handler
Handler的创建必须关联一个Looper,因为Handler负责发送消息,那么自然要指定谁来接收这个消息了。如果创建Handler没有关联Looper,那么默认关联当前线程中的Looper对象。
Handler也要负责定义消息的实际处理逻辑,所以创建Handler时,可以传递一个callback,负责这个Handler所有消息的处理。但这不是必须的,因为Message类中也有callback变量,而且可以使用下面的方法创建Message并指定消息处理逻辑:
1
|
public static Message obtain(Handler h, Runnable callback);
|
创建Handler的构造方法如下,构造Handler时会将与之关联的Looper中的消息队列也通过一个引用保存在Handler对象中,方便直接操作。
普通的消息的Handler:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
public Handler(Callback callback) { this(callback, false); }
public Handler(Looper looper) { this(looper, null, false); }
public Handler(Looper looper, Callback callback) { this(looper, callback, false); }
|
异步消息的Handler:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
public Handler(boolean async) { this(null, async); } public Handler(Callback callback, boolean async); public Handler(Looper looper, Callback callback, boolean async){
mLooper = looper; mQueue = looper.mQueue; mCallback = callback; mAsynchronous = async; }
|
普通消息和异步消息的区别在于是否调用Message的setAsynchronous将其设置为异步消息了。默认创建的消息的均为普通消息,需要显示调用setAsynchronous将其变为异步消息。
1 2 3 4 5 6 7
|
public void setAsynchronous(boolean async) { if (async) { flags |= FLAG_ASYNCHRONOUS; } else { flags &= ~FLAG_ASYNCHRONOUS; } }
|
根据创建Handler方式的不同,在Handler发送消息到消息队列的时候设置是否为异步消息。异步消息和普通消息的处理之后当消息队列中存在一个target为null的message的时候,才会不同。当消息队列中没有这样的message的时候,处理是一样的。
Handler如何发送Message
有两大类发送消息的方法。
- send类,该类方法发送的消息一般用于发送传统的带有消息id的消息,也可以携带一些其他信息。消息的处理由Handler设定callback方法处理。当然Message也是可以指定消息处理的callback的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
public final boolean sendMessage(Message msg)
public final boolean sendEmptyMessage(int what)
public final boolean sendEmptyMessageAtTime(int what, long uptimeMillis)
public final boolean sendMessageDelayed(Message msg, long delayMillis)
public boolean sendMessageAtTime(Message msg, long uptimeMillis)
|
上述send方法最终都会调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
public final boolean sendMessageDelayed(Message msg, long delayMillis) { if (delayMillis < 0) { delayMillis = 0; } return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis); }
public boolean sendMessageAtTime(Message msg, long uptimeMillis) { MessageQueue queue = mQueue; if (queue == null) { RuntimeException e = new RuntimeException( this + " sendMessageAtTime() called with no mQueue"); Log.w("Looper", e.getMessage(), e); return false; } return enqueueMessage(queue, msg, uptimeMillis); } private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) { msg.target = this; if (mAsynchronous) { msg.setAsynchronous(true); } return queue.enqueueMessage(msg, uptimeMillis); }
|
所谓的send消息,只是把消息插入到了消息队列中,同时指定消息处理的时间。MesssageQueue中的消息是按照时间排序的,后面在细说。
如果指定的时间为0,则表示需要立即处理,MesssageQueue会把这条消息放到队列的头部:
1 2 3 4 5 6 7 8 9 10 11
|
public final boolean sendMessageAtFrontOfQueue(Message msg){ MessageQueue queue = mQueue; if (queue == null) { RuntimeException e = new RuntimeException( this + " sendMessageAtTime() called with no mQueue"); Log.w("Looper", e.getMessage(), e); return false; } return enqueueMessage(queue, msg, 0); }
|
该方法用于将本消息放在消息队列的最开始的位置,创建的消息一般是希望马上尽快处理,非常紧急。
- post类的发送方法中的一个必要参数是一个Runnable类的对象,然后在post方法内部调用getPostMessage(Runnable r),得到绑定该Runnable对象的Message对象。最后在发送。说白了,post类型的方法发送的message,会绑定一个用于消息处理的Runnable对象。
和send一样,post也有下列若干方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
sendMessage { return sendMessageDelayed(getPostMessage(r), 0); } public final boolean postAtTime(Runnable r, long uptimeMillis) { return sendMessageAtTime(getPostMessage(r), uptimeMillis); }
public final boolean postAtTime(Runnable r, Object token, long uptimeMillis) { return sendMessageAtTime(getPostMessage(r, token), uptimeMillis); } public final boolean postDelayed(Runnable r, long delayMillis) { return sendMessageDelayed(getPostMessage(r), delayMillis); } public final boolean postAtFrontOfQueue(Runnable r) { return sendMessageAtFrontOfQueue(getPostMessage(r)); }
|
getPostMessage方法定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
private static Message getPostMessage(Runnable r) { Message m = Message.obtain(); m.callback = r; return m; }
private static Message getPostMessage(Runnable r, Object token) { Message m = Message.obtain(); m.obj = token; m.callback = r; return m; }
|
handler如何处理消息
再来看看Looper中的loop方法中是如何处理消息的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
public static void loop() { final Looper me = myLooper(); if (me == null) { throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread."); } final MessageQueue queue = me.mQueue; ..............................................
for (;;) { Message msg = queue.next(); ................................... msg.target.dispatchMessage(msg); .......................................... msg.recycleUnchecked(); } }
|
从中可以看出Handler是使用dispatchMessage来进行消息处理的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
public void dispatchMessage(Message msg) { if (msg.callback != null) { handleCallback(msg); } else { if (mCallback != null) { if (mCallback.handleMessage(msg)) { return; } } handleMessage(msg); } }
private static void handleCallback(Message message) { message.callback.run(); }
public interface Callback { public boolean handleMessage(Message msg); }
|
逻辑很简单清晰。如果Message中指定了消息处理callback,那么就调用这个callback处理消息。其中post类型的方法发送的message就符合这种情况。
如果Message中没有指定callback,那么再看创建Handler对象时,有没有绑定一个callback,有的话,就是用这个callback进行处理。
如果Handler对象没有绑定callback,那么就调用handleMessage()方法处理。这个方法通常由其子类实现。我们在使用Handler的时候经常使用的就是定义一个继承Handler的类,并实现其handleMessage()方法。
这里强调下,当消息处理结束后,调用了Message的recycleUnchecked(),对该Message对象进行了回收,放入了仓库中,下次使用obtain方法获取Message对象时,就可以直接使用了,不在需要重新new了。
消息队列MessageQueue
MessageQueue类是整个Android Handler消息处理机制的难点也是精华。
源码位置:
1
|
Android-6/frameworks/base/core/java/android/os/MessageQueue.java
|
MessageQueue类定义节选:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
public final class MessageQueue {
private final boolean mQuitAllowed;
private long mPtr;
Message mMessages; private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>(); private SparseArray<FileDescriptorRecord> mFileDescriptorRecords; private IdleHandler[] mPendingIdleHandlers; private boolean mQuitting;
private boolean mBlocked;
private int mNextBarrierToken;
private native static long nativeInit(); private native static void nativeDestroy(long ptr); private native void nativePollOnce(long ptr, int timeoutMillis); private native static void nativeWake(long ptr); private native static boolean nativeIsPolling(long ptr); private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events); ..............
|
其中Message mMessages记录的就是一条消息链表。另外还有几个native函数,这就说明MessageQueue会通过JNI技术调用到底层代码。mMessages域记录着消息队列中所有Java层的实质消息。一定要注意了,mMessages记录的只是Java层的消息,不包括native层的。而mptr记录的则是native层的message.
MessageQueue的示意图如下:
Android系统在Native层也实现了一个用于native进程中的线程通信的looper消息处理机制。java层的Looepr和Native的Looper并没有什么直接的关系。MessageQueue虽然使用了Native的looper类,但也仅仅使用了它的等待/唤醒机制。其余的如消息队列的实现都是在java层。
关于MessageQueue的不得不知的内幕
再次看一下Looper中的loop方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
public static void loop() { final Looper me = myLooper(); if (me == null) { throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread."); } final MessageQueue queue = me.mQueue; .............................................. for (;;) { Message msg = queue.next(); ................................... msg.target.dispatchMessage(msg); .......................................... msg.recycleUnchecked(); } }
|
本质上是一个无限for循环。纵然loop是运行在单独一个线程中的,一个死循环对性能的消耗也是很可观的。这是Android系统不允许的。正常的逻辑应该是这样的,消息队列中没有消息要处理的时候,线程应该被挂起。等有消息被传入消息队列的时候,在唤醒线程去处理消息。for循环中的三个方法有两个方法我们已经知道是做什么的了,其中没有涉及挂起线程的代码,那么只能是消息队列中的next方法在作怪了!
对于Looper而言,它主要关心的是从消息队列里取消息,而后分派消息。然而对消息队列而言,在取消息时还要考虑更多技术细节。它关心的细节有:
1)如果消息队列里目前没有合适的消息可以摘取,那么不能让它所属的线程“傻转”,而应该使之阻塞;
2)队列里的消息应该按其“到时”的顺序进行排列,最先到时的消息会放在队头,也就是mMessages域所指向的消息,其后的消息依次排开;
3)阻塞的时间最好能精确一点儿,所以如果暂时没有合适的消息节点可摘时,要考虑链表首个消息节点将在什么时候到时,所以这个消息节点距离当前时刻的时间差,就是我们要阻塞的时长。
4)有时候外界希望队列能在即将进入阻塞状态之前做一些动作,这些动作可以称为idle动作,我们需要兼顾处理这些idle动作。一个典型的例子是外界希望队列在进入阻塞之前做一次垃圾收集。
MessageQueue的创建
MessageQueue是和Looper绑定的,一个Looper只能有一个MessageQueue.在Looper的构造方法中会创建MessageQueue对象,然后关联到Looper中。
其构造方法
1 2 3 4
|
MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; mPtr = nativeInit(); }
|
jni方法nativeInit如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
|
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) { NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return 0; }
nativeMessageQueue->incStrong(env); return reinterpret_cast<jlong>(nativeMessageQueue); } NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) { mLooper = Looper::getForThread(); if (mLooper == NULL) { mLooper = new Looper(false); Looper::setForThread(mLooper); } }
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false), mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
mWakeEventFd = eventfd(0, EFD_NONBLOCK); LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd. errno=%d", errno);
AutoMutex _l(mLock); rebuildEpollLocked(); }
void Looper::rebuildEpollLocked() { ............................................ mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); eventItem.events = EPOLLIN; eventItem.data.fd = mWakeEventFd; int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem); LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance. errno=%d", errno);
for (size_t i = 0; i < mRequests.size(); i++) { const Request& request = mRequests.valueAt(i); struct epoll_event eventItem; request.initEventItem(&eventItem);
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem); if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d", request.fd, errno); } } }
|
可以看到在构造Looper对象时,其内部创建事件对象,用来实现等待/通知(wait/notify) 机制。linux内核会为这个对象维护一个64位的计数器(uint64_t)。并且使用第一个参数(initval)初始化这个计数器。调用eventfd()这个函数就会返回一个新的文件描述符(event object)
创建这个事件对象后,可以对其做如下操作。
write 将缓冲区写入的8字节整形值加到内核计数器上。
read 读取8字节值, 并把计数器重设为0.如果调用read的时候计数器为0, 要是eventfd是阻塞的, read就一直阻塞在这里,否则就得到 一个EAGAIN错误。
除此之外Native的Looper还创建了一个epoll来监听事件对象的“读操作”。也就是说,是利用epoll机制来完成阻塞动作的。每当我们向消息队列发送事件时,最终会间接向这个事件对象写入uint64_t 类型的1.于是epoll立即就感知到了风吹草动,如果有读操作而阻塞的线程,那么就会被唤醒了。
如何将Message放入消息队列中
Handler的enqueueMessage方法将一个Message放入消息队列的时候,实际调用的就是MessageQueue的enqueueMessage方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
boolean enqueueMessage(Message msg, long when) { ............................. msg.when = when; Message p = mMessages; boolean needWake; if (p == null || when == 0 || when < p.when) { msg.next = p; mMessages = msg; needWake = mBlocked; } else { needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } if (needWake && p.isAsynchronous()) { needWake = false; } } msg.next = p; prev.next = msg; }
if (needWake) { nativeWake(mPtr); } } return true; }
|
enqueueMessage()方法主要是把消息放入消息队列中,消息队列的组织也很简单,利用Message类中的next”指针”形成一个从头指向尾的单向链表。 因为消息链表是按时间进行排序的,所以主要是在比对Message携带的when信息。
enqueueMessage()方法会近尽量避免唤醒线程,因为这不是他的主要职责。除非插入的消息是立即要处理的消息,比如Handler使用sendMessageAtFrontOfQueue()方法发送的消息,因为when为0,所以enqueueMessage()方法在处理的时候会尝试唤醒线程来处理。
还有一种情况就是消息队列中存在target为null的消息,而且此时放入消息队列中的消息是一个异步消息,那么也要尝试唤醒线程。
target为null的Message是一个”同步分割栏”,它就像一个卡子,卡在消息链表中的某个位置,当消息循环不断从消息链表中摘取消息并进行处理时,一旦遇到这种“同步分割栏”,那么即使在分割栏之后还有若干已经到时的普通Message,也不会摘取这些消息了。请注意,此时只是不会摘取“普通Message”了,如果队列中还设置有“异步Message”,那么还是会摘取已到时的“异步Message”的。
如何向消息队列中放入”同步分割栏”呢?肯定不能使用Handler提供的发送Message的方法了。
方法就是MessageQueue中的postSyncBarrier():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
public int postSyncBarrier() { return postSyncBarrier(SystemClock.uptimeMillis()); } private int postSyncBarrier(long when) { synchronized (this) { final int token = mNextBarrierToken++; final Message msg = Message.obtain(); msg.markInUse(); msg.when = when; msg.arg1 = token;
Message prev = null; Message p = mMessages; if (when != 0) { while (p != null && p.when <= when) { prev = p; p = p.next; } } if (prev != null) { msg.next = p; prev.next = msg; } else { msg.next = p; mMessages = msg; } return token; } }
|
“同步分割栏”这种卡子会一直卡在消息队列中,除非我们调用MessageQueue中的removeSyncBarrier()删除这个卡子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
public void removeSyncBarrier(int token) { synchronized (this) { Message prev = null; Message p = mMessages; while (p != null && (p.target != null || p.arg1 != token)) { prev = p; p = p.next; } if (p == null) { throw new IllegalStateException("The specified message queue synchronization " + " barrier token has not been posted or has already been removed."); } final boolean needWake; if (prev != null) { prev.next = p.next; needWake = false; } else { mMessages = p.next; needWake = mMessages == null || mMessages.target != null; } p.recycleUnchecked();
if (needWake && !mQuitting) { nativeWake(mPtr); } } }
|
删除“同步分割栏”的时候,如果它的前面还有其他Message,则不需要唤醒线程。如果“同步分割栏”就是消息队列的第一个消息,而且如果“同步分割栏”后面还有其他非“同步分割栏”的Message的时候,就尝试唤醒线程。
现在再来看看这个nativeWake()方法:
1
|
Android-6/frameworks/base/core/jni/android_os_MessageQueue.cpp
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->wake(); } void NativeMessageQueue::wake() { mLooper->wake(); } void Looper::wake() { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ wake", this); #endif
uint64_t inc = 1; ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t))); if (nWrite != sizeof(uint64_t)) { if (errno != EAGAIN) { ALOGW("Could not write wake signal, errno=%d", errno); } } }
|
做的事情很简单,就是向一个管道中写入一个uint64_t类型的数据1.
MessageQueue的消息循环
这里指的就是MessageQueue的next()方法了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
|
Message next() { final long ptr = mPtr; if (ptr == 0) { return null; }
int pendingIdleHandlerCount = -1; int nextPollTimeoutMillis = 0; for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); }
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) { final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; if (msg != null && msg.target == null) { do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { if (now < msg.when) { nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (DEBUG) Log.v(TAG, "Returning message: " + msg); msg.markInUse(); return msg; } } else { nextPollTimeoutMillis = -1; }
if (mQuitting) { dispose(); return null; }
if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { mBlocked = true; continue; }
if (mPendingIdleHandlers == null) { mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; } mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); }
for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null;
boolean keep = false; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf(TAG, "IdleHandler threw exception", t); }
if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } }
pendingIdleHandlerCount = 0;
nextPollTimeoutMillis = 0; } }
|
首先检查消息队列中的第一条消息是否是”同步分割栏”,如果是,寻找队列中第一条异步消息,找到后设为当前处理的消息;如果第一条消息不是”同步分割栏”,把第一条消息设置为当前处理的消息。
如果当前处理的消息不为null,检查该校的处理时间是否已经超时,如果没有,计算等待的时长。如果处理的时间到了,next()方法将返回该消息并退出。
如果当前处理的消息为null,表示队列中没有可以处理的消息,设置等待时间为-1
检查消息队列中的退出标志,如果设置了,那么销毁native层的对象,然后next()方法退出
检查是否安装了处理idle状态的回调方法,如果没有安装则回到for循环的最开始处重新执行,也就是执行nativePollOnce()方法挂起线程并等待新的消息到来。
如果安装了idle状态的回调方法,则调用所有的回调方法,同时把nextPollTimeoutMillis设置为0.这表明在安装了idle处理方法的情况下,消息队列的循环处理是不会被阻塞的,这样idle处理函数将会不停的被调用直到处理方法返回false。
实际上next这个方法里的for循环并不是起循环摘取消息节点的作用,而是为了连贯“当前时间点”和“处理下一条消息的时间点”。简单地说,当“定时机制”触发“摘取一条消息”的动作时,会判断事件队列的首条消息是否真的到时了,如果已经到时了,就直接返回这个msg,而如果尚未到时,则会努力计算一个较精确的等待时间(nextPollTimeoutMillis),计算完后,那个for循环会掉过头再次调用到nativePollOnce(mPtr, nextPollTimeoutMillis),进入阻塞状态,从而等待合适的时长。
当消息队列中没有消息需要马上处理时,会判断用户是否设置了Idle Handler,如果有的话,则会尝试处理mIdleHandlers中所记录的所有Idle Handler,此时会逐个调用这些Idle Handler的queueIdle()成员函数。
如果要彻底搞清楚next方法,那么就必须搞定nativePollOnce()到底做了什么事情。
nativePollOnce()起到了阻塞作用,保证消息循环不会在无消息处理时一直在那里“傻转”。那么,nativePollOnce()函数究竟是如何实现阻塞功能的呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
|
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->pollOnce(env, obj, timeoutMillis); } void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) { ................... mLooper->pollOnce(timeoutMillis); ..................... } inline int pollOnce(int timeoutMillis) { return pollOnce(timeoutMillis, NULL, NULL, NULL); }
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { ................. result = pollInner(timeoutMillis); } }
int Looper::pollInner(int timeoutMillis) { ...................... int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
............. for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeEventFd) { if (epollEvents & EPOLLIN) { awoken(); } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents); } } else { ssize_t requestIndex = mRequests.indexOfKey(fd); ....................... } } .......................... return result; }
void Looper::awoken() { uint64_t counter; TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t))); }
|
pollInner()调用epoll_wait()时传入的timeoutMillis参数,其实来自于前面所说的MessageQueue的next()函数里的nextPollTimeoutMillis,next()函数里在以下3种情况下,会给nextPollTimeoutMillis赋不同的值:
如果消息队列中的下一条消息还要等一段时间才到时的话,那么nextPollTimeoutMillis赋值为Math.min(msg.when - now, Integer.MAX_VALUE),即时间差;
如果消息队列已经是空队列了,那么nextPollTimeoutMillis赋值为-1;
不管前两种情况下是否已给nextPollTimeoutMillis赋过值了,只要队列中有Idle Handler需要处理,那么在处理完所有Idle Handler之后,会强制将nextPollTimeoutMillis赋值为0。这主要是考虑到在处理Idle Handler时,不知道会耗时多少,而在此期间消息队列的“到时情况”有可能已发生改变。
不管epoll_wait()的超时阀值被设置成什么,只要程序从epoll_wait()中返回,说明有事件可以被处理了,否则就一直阻塞在epoll_wait 方法中。
当从epoll_wait中返回了,调用awoken(),而这个方法也是很简单的读了一下数值而已。
Android java层的Handler机制还是很简单的。无非就是向Looper的消息队列中插入Message,而后再由Looper在消息循环里具体处理。因为消息队列本身不具有链表一变动就能马上感知的功能,所以它需要借助linux内核提供的等待/通知机制来监听变动。java层的Handler机制仅仅是利用native层的Looper中提供的阻塞唤醒机制而已。当消息处理线程调用next方法尝试获取message的时候,实际上就是在等待epoll_wait方法返回。epoll_wait方法在监听我们创建的事件对象的读事件。当没有内容可读的时候就会一直阻塞,而当我们使用发送Message的时候,就会向这个时间对象中写入uint64_t类型的数据1,此时有内容可读了epoll_wait就可以返回了。仅此而已。