Android消息处理机制之Handler

时间:2021-08-10 17:47:48

Android进程内线程之间的通信广泛使用到了Handler,handler也是Android独有的消息处理机制。最常见的莫过于使用handler更新ui了。现在就来分析Handler机制。

Android进程内线程之间的通信 如下图所示:

Android消息处理机制之Handler

Android中当一个app运行之后,至少有一个主线程,也就是通常说的UI线程。如果还有其他线程要更新UI,那么试图更新UI的线程,就要给UI主线程发送消息,告诉UI线程如何更改UI等。

当然也可以向其他线程发送消息。Android 可以处理消息的线程都有且仅有一个消息队列,用来暂存发送给他的消息,处理线程的消息也有且仅有一个处理消息的对象,用来从消息对象中取出消息进行处理。

这里就要搞清楚,当有多个消息处理线程的时候,如何发送给某个具体的消息处理线程。消息处理线程又是如何处理消息的。

Android中与消息机制相关的类主要是 Looper,Handler,Message,MessageQueue.其中Looper充当消息处理的角色,MessageQueue充当消息队列,Message充当消息,Handler可以暂时理解为充当消息的发送者,实际上还Handler还定义了如何处理消息,只是处理消息是由消息处理线程完成的。

Handler机制即可用于异步通信,也可用于同步通信。大多数时候使用的是异步通信。

Looper

源码路径:

1
Android-6/frameworks/base/core/java/android/os/Looper.java

主要成员如下图所示:

Android消息处理机制之Handler

Looper充当消息处理的角色,每个线程只能有一个looper对象,那么是如何做到的呢?

如何创建线程唯一的looper

创建looper对象只能使用其提供的静态方法prepare(),因为其构造方法是私有的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void prepare() {
prepare(true);
}

private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}

private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}

静态变量sThreadLocal的类型是 ThreadLocal,它通过将需要保存的对象和线程id关联在一起的方式实现了线程本地存储的功能。这样放入sThreadLocal对象中的Looper对象就和创建它的线程关联在一起了。

消息处理循环

创建好Looper对象之后,调用他的loop方法即可进入消息处理循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    public static void loop() {
// 拿到与当前线程关联的looper对象
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
..............................................

for (;;) {
Message msg = queue.next(); // might block
...................................
msg.target.dispatchMessage(msg);
..........................................
msg.recycleUnchecked();// 回收Message对象
}
}

loop()方法内部是一个for无线循环,所以肯定调用loop方法也是有讲究的:一般来说对其的调用是放在线程的run方法中的。

1
2
3
4
5
6
class mtThread extends Thread{
public void run(){
Looper.prepare();
Looper.loop();
}
}

loop()方法会循环从MessageQueue队列中取出消息,然后吧消息分发出去。消息的分发是通过Message对象的Target变量完成的。这变量的类型是Handler类型。一个Looper对象可以和多个Handler对象关联.

Message是消息的载体,发送者吧需要传递的消息放在Message对象中,Message创建的时候需要指定他的处理对象。Handler主要用来发送和处理消息。只不过处理消息这个过程发生在消息处理线程中。

消息的载体Messenger

1
Android-6/frameworks/base/core/java/android/os/Message.java

关键数据成员:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final class Message implements Parcelable {
// 标识消息的类型,消息分发的时候需要使用
public int what;
// 此消息可以携带的一个int型数据
public int arg1;
// 此消息可以携带的第二个int型数据
public int arg2;
// 此消息可以携带的一个对象
public Object obj;

public Messenger replyTo;

/*package*/ static final int FLAG_IN_USE = 1 << 0;

/** If set message is asynchronous */
/*package*/ static final int FLAG_ASYNCHRONOUS = 1 << 1;

/** Flags to clear in the copyFrom method */
/*package*/ static final int FLAGS_TO_CLEAR_ON_COPY_FROM = FLAG_IN_USE;

/*package*/ int flags;

/*package*/ long when;

/*package*/ Bundle data;

/*package*/ Handler target; // 此消息绑定的Handler,也就是该Handler发送和处理该消息

/*package*/ Runnable callback; // 处理消息的回调方法,执行是在消息处理线程

其中target表示哪个Handler处理该消息。要知道在哪里设置了该字段的值。

如何创建一个Message

创建Message对象,不建议直接new,而是调用Message提供的一个静态方法obtain():

推荐:

1
2
3
4
5
6
public static Message obtain(Handler h) {
Message m = obtain();
m.target = h;

return m;
}

Message设计时,实现了Recyle机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static Message obtain() {
synchronized (sPoolSync) {
if (sPool != null) {
Message m = sPool;
sPool = m.next;
m.next = null;
m.flags = 0; // clear in-use flag
sPoolSize--;
return m;
}
}
return new Message();
}

其他obtain方法:

1
2
3
4
5
public static Message obtain(Handler h, int what);
public static Message obtain(Handler h, Runnable callback);//同事指定了处理该消息的逻辑
public static Message obtain(Handler h, int what, Object obj);
public static Message obtain(Handler h, int what, int arg1, int arg2);
public static Message obtain(Handler h, int what, int arg1, int arg2, Object obj);

Recycle机制

Android源码中大量使用了一个设计机制:当一个对象不再使用时把它储藏起来,不让虚拟机回收,需要的时候再从仓库里拿出来重新使用,这就避免了对象被回收后再重分配的过程。对于在应用的生命周期内(或者在循环中)需要频繁创建的对象来说这个机制特别实用,可以显著减少对象创建的次数,从而减少 GC 的运行时间。运用得当便可改善应用的性能

如何实现?

首先,我们需要一个仓库用于存放暂时不用的对象;需要新对象的时候我们不能使用 new 来分配一个新对象,所以还需要一个方法 obtain 来从仓库里获取对象;最后,便是 recycle 方法了,用于回收不再使用的对象。

Message类中与Recycle机制相关的成员,要注意除了next外,其他都为static类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

// 同步
private static final Object sPoolSync = new Object();

// 仓库中的某个Message
private static Message sPool;

// 指向仓库中的下一个可用Message对象,当 next 为 null 时表示仓库为空
Message next;

// 仓库中的 Message 对象 数量
private static int sPoolSize = 0;

// 回收仓库中的Message对象数量最大为50个
private static final int MAX_POOL_SIZE = 50;

private static boolean gCheckRecycle = true;

创建对象的时候,不建议直接new,而是调用obtain()来尝试从仓库中获取。

第一次调用Obtain时,仓库为空,那么就调用构造方法创建Message对象。当Message不在使用的时候,调用recycle()方法可以把该对象放在仓库中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public void recycle() {
if (isInUse()) {
if (gCheckRecycle) {
throw new IllegalStateException("This message cannot be recycled because it "
+ "is still in use.");
}
return;
}
recycleUnchecked();
}

void recycleUnchecked() {
// Mark the message as in use while it remains in the recycled object pool.
// Clear out all other details.
flags = FLAG_IN_USE;
what = 0;
arg1 = 0;
arg2 = 0;
obj = null;
replyTo = null;
sendingUid = -1;
when = 0;
target = null;
callback = null;
data = null;

synchronized (sPoolSync) {
if (sPoolSize < MAX_POOL_SIZE) {
// 初次sPool为null
next = sPool;
sPool = this;
sPoolSize++;
}
}

放在仓库中,实际上就是为该对象增加了一个引用而已,这样虚拟机就不会回收它了。等下次使用在使用obtain方法尝试获取Message对象的时候,就可以省去分配对象的过程了,直接从仓库中获取。

可能会有疑问,平时在使用过程中,并没有手动调用过recycle()方法啊,其实Looper在处理消息时,会调用Message的recyle()方法!!!

handler

在创建Message对象时,需要关联一个Handler,那么现在看看这个Handler是什么。

源码位置:

1
Android-6/frameworks/base/core/java/android/os/Handler.java

Handler充当了消息发送者和处理者的角色(定义了如何处理消息,但是是由Looper调用的,执行发生在消息处理线程),与Handler关联的Message可以有多个,那么Handler就要提供一个分发的功能,因为不同的消息,处理肯定也不一样。在一个线程中,可以只使用一个Handler对象来处理所有消息,也可以有多个。

如何创建Handler

Handler的创建必须关联一个Looper,因为Handler负责发送消息,那么自然要指定谁来接收这个消息了。如果创建Handler没有关联Looper,那么默认关联当前线程中的Looper对象。

Handler也要负责定义消息的实际处理逻辑,所以创建Handler时,可以传递一个callback,负责这个Handler所有消息的处理。但这不是必须的,因为Message类中也有callback变量,而且可以使用下面的方法创建Message并指定消息处理逻辑:

1
public static Message obtain(Handler h, Runnable callback);

创建Handler的构造方法如下,构造Handler时会将与之关联的Looper中的消息队列也通过一个引用保存在Handler对象中,方便直接操作。

普通的消息的Handler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

// 关联当前线程的Looper
// 也指定了处理该Handler中所有Message的回调方法
public Handler(Callback callback) {
this(callback, false);
}

// 指定关联的Looper
public Handler(Looper looper) {
this(looper, null, false);
}

// 指定关联的Looper
// 也指定了处理该Handler中所有Message的回调方法
public Handler(Looper looper, Callback callback) {
this(looper, callback, false);
}

异步消息的Handler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 下面三个构造方法中有一个相同的参数:boolean类型的 async
// 为true时,表明异步消息
public Handler(boolean async) {
this(null, async);
}
public Handler(Callback callback, boolean async);
public Handler(Looper looper, Callback callback, boolean async){

mLooper = looper;
// 与这个Handler关联的Looper中的消息队列
mQueue = looper.mQueue;
mCallback = callback;
//在将该消息放入消息队列的enqueueMessage方法中,该变量为true时,会设置该消息为异步消息
mAsynchronous = async;
}

普通消息和异步消息的区别在于是否调用Message的setAsynchronous将其设置为异步消息了。默认创建的消息的均为普通消息,需要显示调用setAsynchronous将其变为异步消息。

1
2
3
4
5
6
7
public void setAsynchronous(boolean async) {
if (async) {
flags |= FLAG_ASYNCHRONOUS;
} else {
flags &= ~FLAG_ASYNCHRONOUS;
}
}

根据创建Handler方式的不同,在Handler发送消息到消息队列的时候设置是否为异步消息。异步消息和普通消息的处理之后当消息队列中存在一个target为null的message的时候,才会不同。当消息队列中没有这样的message的时候,处理是一样的。

Handler如何发送Message

有两大类发送消息的方法。

  1. send类,该类方法发送的消息一般用于发送传统的带有消息id的消息,也可以携带一些其他信息。消息的处理由Handler设定callback方法处理。当然Message也是可以指定消息处理的callback的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 发送一个Message,希望马上处理
public final boolean sendMessage(Message msg)

// 创建一个Message,初始化what,然后发送,希望马上处理
public final boolean sendEmptyMessage(int what)

// 创建一个Message,初始化what,然后在uptimeMillis毫秒时 发送,希望在指定的时间处理
public final boolean sendEmptyMessageAtTime(int what, long uptimeMillis)

// 延时delayMillis毫秒,在发送Message,希望延时一段时间处理
public final boolean sendMessageDelayed(Message msg, long delayMillis)

// 在uptimeMillis毫秒时,发送Message,希望在指定的时间处理
public boolean sendMessageAtTime(Message msg, long uptimeMillis)

上述send方法最终都会调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final boolean sendMessageDelayed(Message msg, long delayMillis)
{

if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}

public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;//在这里设置了Message的target字段
// 如果是异步Handler的时候,mAsynchronous为true
// 此时就会把它发送的消息设置为异步消息,然后放入消息队列中
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}

所谓的send消息,只是把消息插入到了消息队列中,同时指定消息处理的时间。MesssageQueue中的消息是按照时间排序的,后面在细说。

如果指定的时间为0,则表示需要立即处理,MesssageQueue会把这条消息放到队列的头部:

1
2
3
4
5
6
7
8
9
10
11
// 发送一个Message,放在消息队列前面
public final boolean sendMessageAtFrontOfQueue(Message msg){
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, 0);
}

该方法用于将本消息放在消息队列的最开始的位置,创建的消息一般是希望马上尽快处理,非常紧急。

  1. post类的发送方法中的一个必要参数是一个Runnable类的对象,然后在post方法内部调用getPostMessage(Runnable r),得到绑定该Runnable对象的Message对象。最后在发送。说白了,post类型的方法发送的message,会绑定一个用于消息处理的Runnable对象。

和send一样,post也有下列若干方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
sendMessage
{
return sendMessageDelayed(getPostMessage(r), 0);
}
public final boolean postAtTime(Runnable r, long uptimeMillis)
{

return sendMessageAtTime(getPostMessage(r), uptimeMillis);
}

public final boolean postAtTime(Runnable r, Object token, long uptimeMillis)
{

return sendMessageAtTime(getPostMessage(r, token), uptimeMillis);
}
public final boolean postDelayed(Runnable r, long delayMillis)
{

return sendMessageDelayed(getPostMessage(r), delayMillis);
}
public final boolean postAtFrontOfQueue(Runnable r)
{

return sendMessageAtFrontOfQueue(getPostMessage(r));
}

getPostMessage方法定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static Message getPostMessage(Runnable r) {
Message m = Message.obtain();
// Message类中的callback为Runnable类型
m.callback = r;
return m;
}

// 消息可以携带一个对象
private static Message getPostMessage(Runnable r, Object token) {
Message m = Message.obtain();
m.obj = token;
m.callback = r;
return m;
}

handler如何处理消息

再来看看Looper中的loop方法中是如何处理消息的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    public static void loop() {
// 拿到与当前线程关联的looper对象
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
..............................................

for (;;) {
Message msg = queue.next(); // might block
...................................
msg.target.dispatchMessage(msg);//-----------------------调用Handler中的dispatchMessage对消息进行分发处理
..........................................
msg.recycleUnchecked();// 回收Message对象
}
}

从中可以看出Handler是使用dispatchMessage来进行消息处理的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}

private static void handleCallback(Message message) {
message.callback.run();
}

public interface Callback {
public boolean handleMessage(Message msg);
}

逻辑很简单清晰。如果Message中指定了消息处理callback,那么就调用这个callback处理消息。其中post类型的方法发送的message就符合这种情况。

如果Message中没有指定callback,那么再看创建Handler对象时,有没有绑定一个callback,有的话,就是用这个callback进行处理。

如果Handler对象没有绑定callback,那么就调用handleMessage()方法处理。这个方法通常由其子类实现。我们在使用Handler的时候经常使用的就是定义一个继承Handler的类,并实现其handleMessage()方法。

这里强调下,当消息处理结束后,调用了Message的recycleUnchecked(),对该Message对象进行了回收,放入了仓库中,下次使用obtain方法获取Message对象时,就可以直接使用了,不在需要重新new了。

消息队列MessageQueue

MessageQueue类是整个Android Handler消息处理机制的难点也是精华。

源码位置:

1
Android-6/frameworks/base/core/java/android/os/MessageQueue.java

MessageQueue类定义节选:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public final class MessageQueue {

private final boolean mQuitAllowed;

private long mPtr; // used by native code

Message mMessages;
private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();
private SparseArray<FileDescriptorRecord> mFileDescriptorRecords;
private IdleHandler[] mPendingIdleHandlers;
private boolean mQuitting;

// Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout.
private boolean mBlocked;

// The next barrier token.
// Barriers are indicated by messages with a null target whose arg1 field carries the token.
private int mNextBarrierToken;

private native static long nativeInit();
private native static void nativeDestroy(long ptr);
private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
private native static void nativeWake(long ptr);
private native static boolean nativeIsPolling(long ptr);
private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
..............

其中Message mMessages记录的就是一条消息链表。另外还有几个native函数,这就说明MessageQueue会通过JNI技术调用到底层代码。mMessages域记录着消息队列中所有Java层的实质消息。一定要注意了,mMessages记录的只是Java层的消息,不包括native层的。而mptr记录的则是native层的message.

MessageQueue的示意图如下:

Android消息处理机制之Handler

Android系统在Native层也实现了一个用于native进程中的线程通信的looper消息处理机制。java层的Looepr和Native的Looper并没有什么直接的关系。MessageQueue虽然使用了Native的looper类,但也仅仅使用了它的等待/唤醒机制。其余的如消息队列的实现都是在java层。

关于MessageQueue的不得不知的内幕

再次看一下Looper中的loop方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    public static void loop() {
// 拿到与当前线程关联的looper对象
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
..............................................
// 无限死循环
for (;;) {
Message msg = queue.next(); // might block
...................................
msg.target.dispatchMessage(msg);//-----------------------调用Handler中的dispatchMessage对消息进行分发处理
..........................................
msg.recycleUnchecked();// 回收Message对象
}
}

本质上是一个无限for循环。纵然loop是运行在单独一个线程中的,一个死循环对性能的消耗也是很可观的。这是Android系统不允许的。正常的逻辑应该是这样的,消息队列中没有消息要处理的时候,线程应该被挂起。等有消息被传入消息队列的时候,在唤醒线程去处理消息。for循环中的三个方法有两个方法我们已经知道是做什么的了,其中没有涉及挂起线程的代码,那么只能是消息队列中的next方法在作怪了!

对于Looper而言,它主要关心的是从消息队列里取消息,而后分派消息。然而对消息队列而言,在取消息时还要考虑更多技术细节。它关心的细节有:

1)如果消息队列里目前没有合适的消息可以摘取,那么不能让它所属的线程“傻转”,而应该使之阻塞;

2)队列里的消息应该按其“到时”的顺序进行排列,最先到时的消息会放在队头,也就是mMessages域所指向的消息,其后的消息依次排开;

3)阻塞的时间最好能精确一点儿,所以如果暂时没有合适的消息节点可摘时,要考虑链表首个消息节点将在什么时候到时,所以这个消息节点距离当前时刻的时间差,就是我们要阻塞的时长。

4)有时候外界希望队列能在即将进入阻塞状态之前做一些动作,这些动作可以称为idle动作,我们需要兼顾处理这些idle动作。一个典型的例子是外界希望队列在进入阻塞之前做一次垃圾收集。

MessageQueue的创建

MessageQueue是和Looper绑定的,一个Looper只能有一个MessageQueue.在Looper的构造方法中会创建MessageQueue对象,然后关联到Looper中。

其构造方法

1
2
3
4
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}

jni方法nativeInit如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}

nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}

Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {

//创建一个eventfd,这是一个计数器相关的fd,计数器不为零是有可读事件发生,read清零计数器,write递增计数器;返回的fd可以进行如下操作:read、write、select(poll、epoll)、close
mWakeEventFd = eventfd(0, EFD_NONBLOCK);
LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd. errno=%d", errno);

AutoMutex _l(mLock);
rebuildEpollLocked();
}

void Looper::rebuildEpollLocked() {
............................................
// 创建epoll
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);

struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
// 监听前面创建的eventfd
eventItem.data.fd = mWakeEventFd;
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance. errno=%d",
errno);

for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);

int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d",
request.fd, errno);
}
}
}

可以看到在构造Looper对象时,其内部创建事件对象,用来实现等待/通知(wait/notify) 机制。linux内核会为这个对象维护一个64位的计数器(uint64_t)。并且使用第一个参数(initval)初始化这个计数器。调用eventfd()这个函数就会返回一个新的文件描述符(event object)

创建这个事件对象后,可以对其做如下操作。

write 将缓冲区写入的8字节整形值加到内核计数器上。

read 读取8字节值, 并把计数器重设为0.如果调用read的时候计数器为0, 要是eventfd是阻塞的, read就一直阻塞在这里,否则就得到 一个EAGAIN错误。

除此之外Native的Looper还创建了一个epoll来监听事件对象的“读操作”。也就是说,是利用epoll机制来完成阻塞动作的。每当我们向消息队列发送事件时,最终会间接向这个事件对象写入uint64_t 类型的1.于是epoll立即就感知到了风吹草动,如果有读操作而阻塞的线程,那么就会被唤醒了。

如何将Message放入消息队列中

Handler的enqueueMessage方法将一个Message放入消息队列的时候,实际调用的就是MessageQueue的enqueueMessage方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

boolean enqueueMessage(Message msg, long when) {
.............................
msg.when = when;
// p 指向消息队列头
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// 消息队列为null,或者消息需要插到消息队列的头部
// 这是如果线程阻塞了,就需要被唤醒
// mBlocked的值由next()方法来设置
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// 此时,新消息会插入到链表的内部,一般情况下,这不需要尝试唤醒线程
// 但当消息是异步消息而且消息队列中存在target为null的消息的时候,就要尝试唤醒线程
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
// 如果已经有一条异步消息在消息队列了,而且还在本条消息之前处理,那么就不需要唤醒了
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}

//是否唤醒
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}

enqueueMessage()方法主要是把消息放入消息队列中,消息队列的组织也很简单,利用Message类中的next”指针”形成一个从头指向尾的单向链表。 因为消息链表是按时间进行排序的,所以主要是在比对Message携带的when信息。

enqueueMessage()方法会近尽量避免唤醒线程,因为这不是他的主要职责。除非插入的消息是立即要处理的消息,比如Handler使用sendMessageAtFrontOfQueue()方法发送的消息,因为when为0,所以enqueueMessage()方法在处理的时候会尝试唤醒线程来处理。

还有一种情况就是消息队列中存在target为null的消息,而且此时放入消息队列中的消息是一个异步消息,那么也要尝试唤醒线程。

target为null的Message是一个”同步分割栏”,它就像一个卡子,卡在消息链表中的某个位置,当消息循环不断从消息链表中摘取消息并进行处理时,一旦遇到这种“同步分割栏”,那么即使在分割栏之后还有若干已经到时的普通Message,也不会摘取这些消息了。请注意,此时只是不会摘取“普通Message”了,如果队列中还设置有“异步Message”,那么还是会摘取已到时的“异步Message”的。

如何向消息队列中放入”同步分割栏”呢?肯定不能使用Handler提供的发送Message的方法了。

方法就是MessageQueue中的postSyncBarrier():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public int postSyncBarrier() {
return postSyncBarrier(SystemClock.uptimeMillis());
}
private int postSyncBarrier(long when) {
// Enqueue a new sync barrier token.
// We don't need to wake the queue because the purpose of a barrier is to stall it.
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;

Message prev = null;
Message p = mMessages;
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;
}
}

“同步分割栏”这种卡子会一直卡在消息队列中,除非我们调用MessageQueue中的removeSyncBarrier()删除这个卡子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void removeSyncBarrier(int token) {
// Remove a sync barrier token from the queue.
// If the queue is no longer stalled by a barrier then wake it.
synchronized (this) {
Message prev = null;
Message p = mMessages;
while (p != null && (p.target != null || p.arg1 != token)) {
prev = p;
p = p.next;
}
if (p == null) {
throw new IllegalStateException("The specified message queue synchronization "
+ " barrier token has not been posted or has already been removed.");
}
final boolean needWake;
if (prev != null) {
prev.next = p.next;
needWake = false;
} else {
mMessages = p.next;
needWake = mMessages == null || mMessages.target != null;
}
p.recycleUnchecked();

// If the loop is quitting then it is already awake.
// We can assume mPtr != 0 when mQuitting is false.
if (needWake && !mQuitting) {
nativeWake(mPtr);
}
}
}

删除“同步分割栏”的时候,如果它的前面还有其他Message,则不需要唤醒线程。如果“同步分割栏”就是消息队列的第一个消息,而且如果“同步分割栏”后面还有其他非“同步分割栏”的Message的时候,就尝试唤醒线程。

现在再来看看这个nativeWake()方法:

1
Android-6/frameworks/base/core/jni/android_os_MessageQueue.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
mLooper->wake();
}
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif

uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}

做的事情很简单,就是向一个管道中写入一个uint64_t类型的数据1.

MessageQueue的消息循环

这里指的就是MessageQueue的next()方法了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}

int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}

//调用jni方法使其阻塞nextPollTimeoutMillis毫秒,当nextPollTimeoutMillis为-1时,要一直阻塞
nativePollOnce(ptr, nextPollTimeoutMillis);

// 使用this对象同步,只要next方法还没退出,在调用本对象的任何方法都将导致调用线程挂起。
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages; // 得到消息队列的头部
if (msg != null && msg.target == null) {
//此时说明消息队列的头部是一个“同步分割栏”
// 则查找消息队列中的异步消息
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
// 找到了一个异步消息
if (msg != null) {
if (now < msg.when) {
//当前还没倒Message希望处理的时刻,计算需要等待的时长
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 当前已经到了Message希望处理的时刻
// 那么设置阻塞唤醒标志为false
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
//将找到的消息冲消息队列中取出返回,此时该Message从消息队列中也被移除了
return msg;
}
} else {
// No more messages.
// 表明消息队列中没有要处理的消息
nextPollTimeoutMillis = -1;
}

// Process the quit message now that all pending messages have been handled.
// 如果退出标志设置了,则销毁native对象,然后返回
if (mQuitting) {
dispose();
return null;
}

// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
// 如果是第一次进入,idle会检查是否安装了idle handler,
// 实际上就是获取idle handler的数量
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;// 设置阻塞唤醒标志为true
continue;// 没有安装 idle handler 则继续for循环
}

// idle handler 方法数组mPendingIdleHandlers中
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}

// Run the idle handlers.
// We only ever reach this code block during the first iteration.
// 处理所有的idle handler
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler

boolean keep = false;
try {
// 如果idle handler 返回false,表示不在需要继续处理
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}

if (!keep) {
//返回false,那么移除该idle handler
// 否则一直被循环调用
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}

// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;

// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
// 如果有 idle handler ,重置nextPollTimeoutMillis为0,让for循环继续,而不是阻塞线程
nextPollTimeoutMillis = 0;
}
}
  1. 首先检查消息队列中的第一条消息是否是”同步分割栏”,如果是,寻找队列中第一条异步消息,找到后设为当前处理的消息;如果第一条消息不是”同步分割栏”,把第一条消息设置为当前处理的消息。

  2. 如果当前处理的消息不为null,检查该校的处理时间是否已经超时,如果没有,计算等待的时长。如果处理的时间到了,next()方法将返回该消息并退出。

  3. 如果当前处理的消息为null,表示队列中没有可以处理的消息,设置等待时间为-1

  4. 检查消息队列中的退出标志,如果设置了,那么销毁native层的对象,然后next()方法退出

  5. 检查是否安装了处理idle状态的回调方法,如果没有安装则回到for循环的最开始处重新执行,也就是执行nativePollOnce()方法挂起线程并等待新的消息到来。

  6. 如果安装了idle状态的回调方法,则调用所有的回调方法,同时把nextPollTimeoutMillis设置为0.这表明在安装了idle处理方法的情况下,消息队列的循环处理是不会被阻塞的,这样idle处理函数将会不停的被调用直到处理方法返回false。

实际上next这个方法里的for循环并不是起循环摘取消息节点的作用,而是为了连贯“当前时间点”和“处理下一条消息的时间点”。简单地说,当“定时机制”触发“摘取一条消息”的动作时,会判断事件队列的首条消息是否真的到时了,如果已经到时了,就直接返回这个msg,而如果尚未到时,则会努力计算一个较精确的等待时间(nextPollTimeoutMillis),计算完后,那个for循环会掉过头再次调用到nativePollOnce(mPtr, nextPollTimeoutMillis),进入阻塞状态,从而等待合适的时长。

当消息队列中没有消息需要马上处理时,会判断用户是否设置了Idle Handler,如果有的话,则会尝试处理mIdleHandlers中所记录的所有Idle Handler,此时会逐个调用这些Idle Handler的queueIdle()成员函数。

如果要彻底搞清楚next方法,那么就必须搞定nativePollOnce()到底做了什么事情。

nativePollOnce()起到了阻塞作用,保证消息循环不会在无消息处理时一直在那里“傻转”。那么,nativePollOnce()函数究竟是如何实现阻塞功能的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis)
{

NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
...................
mLooper->pollOnce(timeoutMillis);
.....................
}
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, NULL, NULL, NULL);
}

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
.................
result = pollInner(timeoutMillis);
}
}



int Looper::pollInner(int timeoutMillis) {
......................
// 阻塞,等待
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

.............
// 处理所有epoll事件
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
// 是我们创建的事件对象fd
if (fd == mWakeEventFd) {
// 有数据可读
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
.......................
}
}
..........................
return result;
}



void Looper::awoken() {
uint64_t counter;
TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
}

pollInner()调用epoll_wait()时传入的timeoutMillis参数,其实来自于前面所说的MessageQueue的next()函数里的nextPollTimeoutMillis,next()函数里在以下3种情况下,会给nextPollTimeoutMillis赋不同的值:

  1. 如果消息队列中的下一条消息还要等一段时间才到时的话,那么nextPollTimeoutMillis赋值为Math.min(msg.when - now, Integer.MAX_VALUE),即时间差;

  2. 如果消息队列已经是空队列了,那么nextPollTimeoutMillis赋值为-1;

  3. 不管前两种情况下是否已给nextPollTimeoutMillis赋过值了,只要队列中有Idle Handler需要处理,那么在处理完所有Idle Handler之后,会强制将nextPollTimeoutMillis赋值为0。这主要是考虑到在处理Idle Handler时,不知道会耗时多少,而在此期间消息队列的“到时情况”有可能已发生改变。

不管epoll_wait()的超时阀值被设置成什么,只要程序从epoll_wait()中返回,说明有事件可以被处理了,否则就一直阻塞在epoll_wait 方法中。

当从epoll_wait中返回了,调用awoken(),而这个方法也是很简单的读了一下数值而已。

Android java层的Handler机制还是很简单的。无非就是向Looper的消息队列中插入Message,而后再由Looper在消息循环里具体处理。因为消息队列本身不具有链表一变动就能马上感知的功能,所以它需要借助linux内核提供的等待/通知机制来监听变动。java层的Handler机制仅仅是利用native层的Looper中提供的阻塞唤醒机制而已。当消息处理线程调用next方法尝试获取message的时候,实际上就是在等待epoll_wait方法返回。epoll_wait方法在监听我们创建的事件对象的读事件。当没有内容可读的时候就会一直阻塞,而当我们使用发送Message的时候,就会向这个时间对象中写入uint64_t类型的数据1,此时有内容可读了epoll_wait就可以返回了。仅此而已。