The Android thread quartet (MessageQueue, Message, Looper, Handler): Looper

Date: 2021-11-09 17:29:39

Thread messaging in Android is driven by four components:

Message carrier: Message

Message queue: MessageQueue

Message loop: Looper

Message handling: Handler


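Everything is built around the Looper, which can be pictured as an endless ring: while it runs, it keeps taking Messages out of the MessageQueue and hands each one to a Handler for processing.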
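The ring described above can be sketched in standard C++. This is only an illustration, not the framework's code: SimpleMessage, SimpleQueue and runLoop are hypothetical names, a std::function stands in for the Handler, and quit() here lets already queued messages drain, which is an assumption of the sketch.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical stand-in for Message: a code plus the target that consumes it.
struct SimpleMessage {
    int what;
    std::function<void(int)> target;  // plays the role of the Handler
};

// Hypothetical stand-in for MessageQueue: a blocking FIFO.
class SimpleQueue {
public:
    void enqueue(SimpleMessage msg) {
        assert(msg.target && "every message needs a target");
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mMessages.push(std::move(msg));
        }
        mCond.notify_one();
    }

    void quit() {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mQuit = true;
        }
        mCond.notify_one();
    }

    // Blocks until a message is available. Returns false once quit() has
    // been called and the queue is empty.
    bool next(SimpleMessage& out) {
        std::unique_lock<std::mutex> lock(mMutex);
        mCond.wait(lock, [this] { return mQuit || !mMessages.empty(); });
        if (mMessages.empty()) return false;
        out = std::move(mMessages.front());
        mMessages.pop();
        return true;
    }

private:
    std::mutex mMutex;
    std::condition_variable mCond;
    std::queue<SimpleMessage> mMessages;
    bool mQuit = false;
};

// The loop itself: take the next message and hand it to its target.
// Returns the number of messages dispatched before the queue quit.
inline int runLoop(SimpleQueue& queue) {
    int dispatched = 0;
    SimpleMessage msg{};
    while (queue.next(msg)) {
        msg.target(msg.what);
        ++dispatched;
    }
    return dispatched;
}
```

In a real program runLoop would run on its own thread while other threads call enqueue; the blocking next() is what keeps the loop from spinning when the queue is empty.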


1. Looper

There are two Loopers in the OS:

The first Looper:

File locations:

frameworks_gaia/gaiacore/os/Looper.cpp

frameworks_gaia/base/include/gaiainternal/Looper.h

This is the Looper that actually runs the Message loop.

/**
* Class used to run a message loop for a thread. Threads by default do
* not have a message loop associated with them; to create one, call
* {@link #prepare} in the thread that is to run the loop, and then
* {@link #loop} to have it process messages until the loop is stopped.
*
* <p>Most interaction with a message loop is through the
* {@link Handler} class.
*
* <p>This is a typical example of the implementation of a Looper thread,
* using the separation of {@link #prepare} and {@link #loop} to create an
* initial Handler to communicate with the Looper.
*
* <pre>
* class LooperThread extends Thread {
* public Handler mHandler;
*
* public void run() {
* Looper.prepare();
*
* mHandler = new Handler() {
* public void handleMessage(Message msg) {
* // process incoming messages here
* }
* };
*
* Looper.loop();
* }
* }</pre>
*/

The second Looper:

File locations:

common/frameworks/base/libs/utils/Looper.cpp

common/frameworks/base/include/utils/Looper.h

This Looper exists to monitor file descriptors through epoll:

/**
* A polling loop that supports monitoring file descriptor events, optionally
* using callbacks. The implementation uses epoll() internally.
*
* A looper can be associated with a thread although there is no requirement that it must be.
*/





Let's start with the source code of the first Looper.

1.1. Member variables

    // private members
static android::Mutex sClassLock;
mutable android::Mutex mInstLock;

// sThreadLocal.get() will return NULL unless you've called prepare().
static android::sp<ThreadLocal<Looper> > sThreadLocal;

android::sp<MessageQueue> mQueue;
android::wp<gaia::GThread> mThread;
volatile bool mRun;

android::sp<Printer> mLogging;
static android::sp<Looper> sMainLooper; // guarded by Looper.class

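  • sThreadLocal is not a thread; it stores one variable that is unique to the current thread. See the previous article: http://blog.csdn.net/passerbysrs/article/details/17966527

Its purpose is to avoid passing the object around as a parameter: any code running on the thread can fetch the thread's own value through the ThreadLocal. Keeping it in Looper means every other part of the thread can easily get hold of its Looper.

  • mQueue is the MessageQueue that belongs to this Looper; it is covered in detail later.
  • mThread holds the thread this Looper is associated with.
  • sMainLooper holds the application's main Looper, the one that lives in the main thread; it is set by prepareMainLooper().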
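The role sThreadLocal plays can be sketched with the C++ thread_local keyword. This is a minimal illustration under assumptions, not the gaia ThreadLocal class: FakeLooper, threadSlot, prepare and myLooper are hypothetical names that only mirror the shape of the listing.

```cpp
#include <cassert>
#include <memory>

// Hypothetical per-thread object; stands in for Looper.
struct FakeLooper {
    int id;
};

// One slot per thread, like sThreadLocal: empty until prepare() is called
// on that thread.
inline std::shared_ptr<FakeLooper>& threadSlot() {
    thread_local std::shared_ptr<FakeLooper> slot;
    return slot;
}

// Creates the calling thread's looper. Returns false if the thread already
// has one, because only one looper per thread is allowed.
inline bool prepare() {
    std::shared_ptr<FakeLooper>& slot = threadSlot();
    if (slot) return false;
    slot = std::make_shared<FakeLooper>();
    assert(slot != nullptr);
    return true;
}

// Any code on the thread can fetch the looper without it being passed in.
inline std::shared_ptr<FakeLooper> myLooper() {
    return threadSlot();
}
```

Because the slot is thread_local, a second thread calling myLooper() before its own prepare() gets an empty pointer, even if the first thread already has a looper.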

1.2. Interface

Constructor:

Looper::Looper(bool quitAllowed) {
mQueue = new MessageQueue(quitAllowed); // create the MessageQueue; quitAllowed decides whether the queue may be quit
mRun = true;
mThread = GThread::current(); // the thread this Looper is created on
}

Other interfaces:

     /** Initialize the current thread as a looper.
* This gives you a chance to create handlers that then reference
* this looper, before actually starting the loop. Be sure to call
* {@link #loop()} after calling this method, and end it by calling
* {@link #quit()}.
*/
static void prepare();

/**
* Initialize the current thread as a looper, marking it as an
* application's main looper. The main looper for your application
* is created by the Android environment, so you should never need
* to call this function yourself. See also: {@link #prepare()}
*/
static void prepareMainLooper();

/** Returns the application's main looper, which lives in the main thread of the application.
*/
// synchronized
static android::sp<Looper> getMainLooper();

/**
* Run the message queue in this thread. Be sure to call
* {@link #quit()} to end the loop.
*/
static void loop();

/**
* Return the Looper object associated with the current thread. Returns
* null if the calling thread is not associated with a Looper.
*/
static android::sp<Looper> myLooper();

/**
* Control logging of messages as they are processed by this Looper. If
* enabled, a log message will be written to <var>printer</var>
* at the beginning and ending of each message dispatch, identifying the
* target Handler and message contents.
*
* @param printer A Printer object that will receive log messages, or
* null to disable message logging.
*/
void setMessageLogging(const android::sp<Printer>& printer);

/**
* Return the {@link MessageQueue} object associated with the current
* thread. This must be called from a thread running a Looper, or a
* NullPointerException will be thrown.
*/
static android::sp<MessageQueue> myQueue();
void quit();

int postSyncBarrier();

void removeSyncBarrier(int token);

/**
* Return the Thread associated with this Looper.
*/
android::sp<gaia::GThread> getThread();

android::sp<MessageQueue> getQueue();

void dump(android::sp<Printer>& pw, const android::sp<String>& prefix);

android::sp<String> toString();

1.1.1. prepare

sp<ThreadLocal<Looper> > Looper::sThreadLocal =  new ThreadLocal<Looper>();
void Looper::prepare(bool quitAllowed) {
if (sThreadLocal->get() != NULL) { // use the ThreadLocal to check whether this thread already has a Looper
GLOGW("Only one Looper may be created per thread myLooper=>%p", sThreadLocal->get().get());
return;
}
sThreadLocal->set(new Looper(quitAllowed)); // store the new Looper in the ThreadLocal
if (sThreadLocal->get() == NULL) {
GLOGE("Cannot prepare Looper");
return;
}
}

1.1.2. prepareMainLooper

void Looper::prepareMainLooper() {
prepare(false);
AutoMutex _l(sClassLock);
if (sMainLooper != NULL) {
GLOGE("The main Looper has already been prepared.");
}
sMainLooper = myLooper(); // set sMainLooper to this thread's Looper
}
sp<Looper> Looper::myLooper() {
    return sThreadLocal->get();
}

1.1.3. loop

void Looper::loop() {
sp<Looper> me = myLooper(); // fetch this thread's Looper
if (me == NULL) {
GLOGW("Looper cannot loop without preparation");
return;
}
sp<MessageQueue> queue = me->mQueue; // fetch its MessageQueue

// Make sure the identity of this thread is that of the local process,
// and keep track of what that identity token actually is.
IPCThreadState::self()->clearCallingIdentity();
int64_t ident = IPCThreadState::self()->clearCallingIdentity();
int count = 0;
bool looperLog = false;

for (;;) {
sp<Message> msg = queue->next(); // might block; takes the next Message out of the MessageQueue
if (msg == NULL) {
// No target is a magic identifier for the quit message.
break;
}

// This must be in a local variable, in case a UI event sets the logger
sp<Printer> logging = me->mLogging;
if (logging != NULL) {
logging->println(String::format(">>>>> Dispatching to %s %p: %d",
msg->target->toString()->string(),
msg->callback.get(),
msg->what));
}

if (count++ > 100) {
count = 0;
char value[30];
property_get("LooperDebug", value, NULL);
int32_t tid = atoi(value);
if (tid == static_cast<int32_t>(syscall(__NR_gettid)))
looperLog = true;
}

if (looperLog) {
LOGD("Dispatching to %s %p: %d",
msg->target->toString()->string(),
msg->callback.get(),
msg->what);
}
msg->target->dispatchMessage(msg); // dispatch the Message through its target
if (looperLog) {
LOGD("Finished %s %p",
msg->target->toString()->string(),
msg->callback.get());
}

if (logging != NULL) {

logging->println(String::format("<<<<< Finished to %s %p",
msg->target->toString()->string(),
msg->callback.get()));
}

// Make sure that during the course of dispatching the
// identity of the thread wasn't corrupted.
int64_t newIdent = IPCThreadState::self()->clearCallingIdentity();
if (ident != newIdent) {
sp<StringBuilder> sb = new StringBuilder(128);
sp<String> name = ProcessName::getProcessName();
if (name != NULL) sb->append(name + String(": "));
sb->append("Thread identity changed from 0x");
sb->append(LongLong::toHexString(ident));
sb->append(" to 0x");
sb->append(LongLong::toHexString(newIdent));
sb->append(" while dispatching to ");
sb->append(String::format(" %s %p what = %d",
msg->target->getClassName(),
msg->callback.get(),
msg->what));
GLOGE("%s", sb->toString()->string());
}

msg->recycle(); // recycle the msg
}
// Looper is willing to quit.
// This usually means that thread attached by this looper will be destroyed.
// So we should clear the TLS for this thread to release this looper.
sThreadLocal->clear(); // clear the ThreadLocal
}

1.1.4. quit

void Looper::quit() {
mQueue->quit(); // quit the queue; after this the MessageQueue no longer accepts new messages
}

1.1.5. postSyncBarrier and removeSyncBarrier

     /**
* Posts a synchronization barrier to the Looper's message queue.
*
* Message processing occurs as usual until the message queue encounters the
* synchronization barrier that has been posted. When the barrier is encountered,
* later synchronous messages in the queue are stalled (prevented from being executed)
* until the barrier is released by calling {@link #removeSyncBarrier} and specifying
* the token that identifies the synchronization barrier.
*
* This method is used to immediately postpone execution of all subsequently posted
* synchronous messages until a condition is met that releases the barrier.
* Asynchronous messages (see {@link Message#isAsynchronous} are exempt from the barrier
* and continue to be processed as usual.
*
* This call must be always matched by a call to {@link #removeSyncBarrier} with
* the same token to ensure that the message queue resumes normal operation.
* Otherwise the application will probably hang!
*
* @return A token that uniquely identifies the barrier. This token must be
* passed to {@link #removeSyncBarrier} to release the barrier.
*
* @hide
*/
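/* Inserts a barrier into the MessageQueue. When the Looper, while taking
* messages, runs into the barrier, the synchronous messages queued behind it
* are not executed. Use this to postpone all subsequent synchronous messages
* immediately. Setting the barrier makes the MessageQueue hand back a token,
* which is later used to remove the barrier.
*/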
 int Looper::postSyncBarrier() {
return mQueue->enqueueSyncBarrier(uptimeMillis());
}

    /**
     * Removes a synchronization barrier.
     *
     * @param token The synchronization barrier token that was returned by
     * {@link #postSyncBarrier}.
     *
     * @throws IllegalStateException if the barrier was not found.
     *
     * @hide
     */
    void Looper::removeSyncBarrier(int token) {
        mQueue->removeSyncBarrier(token); // remove the barrier from the MessageQueue; token identifies the barrier
    }
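The barrier behaviour described in the comments above can be sketched as follows. This is a simplified model under assumptions, not the MessageQueue implementation: BarrierQueue and Entry are hypothetical names, and the sketch appends the barrier at the tail and ignores delivery times, whereas the listing enqueues it at uptimeMillis().

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical queue entry. A barrier is an entry with no payload.
struct Entry {
    int what;        // message code (unused for barriers)
    bool isAsync;    // asynchronous messages are exempt from barriers
    bool isBarrier;
    int token;       // identifies a barrier; 0 for ordinary messages
};

class BarrierQueue {
public:
    void enqueue(int what, bool isAsync) {
        mEntries.push_back({what, isAsync, false, 0});
    }

    // Appends a barrier and returns the token needed to remove it.
    int postSyncBarrier() {
        int token = ++mNextToken;
        mEntries.push_back({0, false, true, token});
        return token;
    }

    // Returns false if no barrier with this token exists.
    bool removeSyncBarrier(int token) {
        for (std::size_t i = 0; i < mEntries.size(); ++i) {
            if (mEntries[i].isBarrier && mEntries[i].token == token) {
                mEntries.erase(mEntries.begin() + static_cast<std::ptrdiff_t>(i));
                return true;
            }
        }
        return false;
    }

    // Fetches the next runnable message into outWhat. Returns false if the
    // queue is empty or everything left is stalled behind a barrier.
    bool next(int& outWhat) {
        bool blocked = false;
        for (std::size_t i = 0; i < mEntries.size(); ++i) {
            const Entry& e = mEntries[i];
            if (e.isBarrier) {
                blocked = true;
                continue;
            }
            assert(e.token == 0);                 // ordinary messages carry no token
            if (blocked && !e.isAsync) continue;  // stalled synchronous message
            outWhat = e.what;
            mEntries.erase(mEntries.begin() + static_cast<std::ptrdiff_t>(i));
            return true;
        }
        return false;
    }

private:
    std::deque<Entry> mEntries;
    int mNextToken = 0;
};
```

With the barrier in place only asynchronous messages get through; once removeSyncBarrier is called with the matching token, the stalled synchronous messages become runnable again. Forgetting that call leaves them stalled forever, which is the hang the original comment warns about.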

To see how this loop gets started, go back to the startup code in ActivityThread:

int ActivityThread::main(int argc, char* argv[]) {
GLOGENTRY();
bool isWrapper = false;

// SamplingProfilerIntegration.start();
// Process::setArgV0("<pre-initialized>");
Looper::prepareMainLooper();
sp<ActivityThread> thread = new ActivityThread();
for (int i = 0; i < argc; i++) {
GLOGI("ActivityThread::main argv[%d] = %s",i, argv[i]);
if (strcmp(argv[i] , "--wrapper") == 0) {
GLOGI("ActivityThread::main isWrapper =true");
isWrapper =true;
thread->setWrapper(true);
}
if (strstr(argv[i],"--nice_name=") != NULL) {
String nice_name(argv[i]+strlen("--nice_name="));
GLOGI("ActivityThread::main nice_name = %s", nice_name.string());
thread->setProcessName(new String(nice_name));
}
}

thread->attach(false);

if (sMainThreadHandler == NULL) {
sMainThreadHandler = thread->getHandler();
}

/* enqueue message before loop for test*/
/*
GLOG("argc=%d",argc);
if (argc==2&&strcmp(argv[1],"--zygote")!=0) {
// sp<Intent> intent=new Intent(String8(argv[1]));
// = Intent.CREATOR.createFromParcel(data);
sp<Intent> intent=new Intent;
intent->setComponent(new ComponentName(String8(argv[1]),String8()));
sp<IBinder> b = new BBinder;
int ident = (int)b.get();
sp<ActivityInfo> info = new ActivityInfo();
// = ActivityInfo.CREATOR.createFromParcel(data);
sp<Bundle> state=new Bundle();// = data.readBundle();
Vector<sp<ResultInfo> > ri;
// = data.createTypedArrayList(ResultInfo.CREATOR);
Vector<sp<Intent> > pi; // = data.createTypedArrayList(Intent.CREATOR);
bool notResumed = false; // = data.readInt() != 0;
bool isForward = false; // = data.readInt() != 0;
thread->mAppThread->scheduleLaunchActivity(intent, b, ident, info,
state,ri,pi,notResumed,isForward);
// thread->mAppThread->scheduleExit();
}
*/
Looper::loop();
thread->detach();

// String name = (thread.mInitialApplication != NULL)
// ? thread.mInitialApplication.getPackageName() : "<unknown>";
// Log.i(TAG, "Main thread of " + name + " is now exiting");
return 0;
}

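1.1.6. Open questions about Looper

The first Looper leaves two questions open: a) how the MessageQueue takes messages out; b) what the thread information in mThread is used for.

Next comes the second Looper; let's call it the event Looper.


2. The event Looper

The event Looper differs from the message Looper above in what it handles. In Android it lives in the JNI layer and is responsible for waking up and waiting on behalf of the MessageQueue.

2.1. Private members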

     struct Request { // request information
        int fd;
        int ident;
        sp<LooperCallback> callback;
        void* data;
    };

    struct Response { // response information
        int events;
        Request request;
    };

    struct MessageEnvelope { // a message in the JNI layer
        MessageEnvelope() : uptime(0) { }

        MessageEnvelope(nsecs_t uptime, const sp<MessageHandler> handler,
                const Message& message) : uptime(uptime), handler(handler), message(message) {
        }

        nsecs_t uptime; // time at which the message is due
        sp<MessageHandler> handler; // the handler that processes it
        Message message; // the message itself
    };


const bool mAllowNonCallbacks; // immutable

int mWakeReadPipeFd; // immutable; read end of the wake pipe
int mWakeWritePipeFd; // immutable; write end of the wake pipe
Mutex mLock; // lock

Vector<MessageEnvelope> mMessageEnvelopes; // guarded by mLock; JNI-layer messages, sorted by time
bool mSendingMessage; // guarded by mLock; whether a message is currently being sent

int mEpollFd; // immutable; the epoll file descriptor

// Locked list of file descriptor monitoring requests.
KeyedVector<int, Request> mRequests; // guarded by mLock; the set of requests

// This state is only used privately by pollOnce and does not require a lock since
// it runs on a single thread.
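Vector<Response> mResponses; // the set of responses
size_t mResponseIndex; // index of the next response to examine
nsecs_t mNextMessageUptime; // set to LLONG_MAX when none; time at which the next JNI-layer message is due

These members are listed for reference only; they are used later when the mechanism is explained.

2.2. Interface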
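The comment on mMessageEnvelopes says the messages are kept sorted by time. One way to maintain that order is sketched below; Envelope and insertSorted are hypothetical names, and the use of std::upper_bound is a choice of this sketch rather than something taken from the Looper source.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, reduced version of MessageEnvelope.
struct Envelope {
    std::int64_t uptime;  // time at which the message is due
    int what;
};

inline bool earlier(const Envelope& a, const Envelope& b) {
    return a.uptime < b.uptime;
}

// Inserts an envelope so the vector stays in ascending uptime order.
// Envelopes with equal uptime keep their arrival order. Returns the index
// at which the envelope was inserted; index 0 means the earliest due time
// changed, so a waiting poll would have to recompute its timeout.
inline std::size_t insertSorted(std::vector<Envelope>& envelopes, Envelope e) {
    std::vector<Envelope>::iterator pos =
        std::upper_bound(envelopes.begin(), envelopes.end(), e, earlier);
    std::size_t index = static_cast<std::size_t>(pos - envelopes.begin());
    envelopes.insert(pos, e);
    assert(std::is_sorted(envelopes.begin(), envelopes.end(), earlier));
    return index;
}
```

Keeping the vector ordered is what lets the poll loop look only at element 0 to decide whether a message is due or how long it still has to wait.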

    /**
* Creates a looper.
*
* If allowNonCallbaks is true, the looper will allow file descriptors to be
* registered without associated callbacks. This assumes that the caller of
* pollOnce() is prepared to handle callback-less events itself.
*/
Looper(bool allowNonCallbacks);

/**
* Returns whether this looper instance allows the registration of file descriptors
* using identifiers instead of callbacks.
*/
bool getAllowNonCallbacks() const;

/**
* Waits for events to be available, with optional timeout in milliseconds.
* Invokes callbacks for all file descriptors on which an event occurred.
*
* If the timeout is zero, returns immediately without blocking.
* If the timeout is negative, waits indefinitely until an event appears.
*
* Returns ALOOPER_POLL_WAKE if the poll was awoken using wake() before
* the timeout expired and no callbacks were invoked and no other file
* descriptors were ready.
*
* Returns ALOOPER_POLL_CALLBACK if one or more callbacks were invoked.
*
* Returns ALOOPER_POLL_TIMEOUT if there was no data before the given
* timeout expired.
*
* Returns ALOOPER_POLL_ERROR if an error occurred.
*
* Returns a value >= 0 containing an identifier if its file descriptor has data
* and it has no callback function (requiring the caller here to handle it).
* In this (and only this) case outFd, outEvents and outData will contain the poll
* events and data associated with the fd, otherwise they will be set to NULL.
*
* This method does not return until it has finished invoking the appropriate callbacks
* for all file descriptors that were signalled.
*/
int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, NULL, NULL, NULL);
}

/**
* Like pollOnce(), but performs all pending callbacks until all
* data has been consumed or a file descriptor is available with no callback.
* This function will never return ALOOPER_POLL_CALLBACK.
*/
int pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData);
inline int pollAll(int timeoutMillis) {
return pollAll(timeoutMillis, NULL, NULL, NULL);
}

/**
* Wakes the poll asynchronously.
*
* This method can be called on any thread.
* This method returns immediately.
*/
void wake();

/**
* Adds a new file descriptor to be polled by the looper.
* If the same file descriptor was previously added, it is replaced.
*
* "fd" is the file descriptor to be added.
* "ident" is an identifier for this event, which is returned from pollOnce().
* The identifier must be >= 0, or ALOOPER_POLL_CALLBACK if providing a non-NULL callback.
* "events" are the poll events to wake up on. Typically this is ALOOPER_EVENT_INPUT.
* "callback" is the function to call when there is an event on the file descriptor.
* "data" is a private data pointer to supply to the callback.
*
* There are two main uses of this function:
*
* (1) If "callback" is non-NULL, then this function will be called when there is
* data on the file descriptor. It should execute any events it has pending,
* appropriately reading from the file descriptor. The 'ident' is ignored in this case.
*
* (2) If "callback" is NULL, the 'ident' will be returned by ALooper_pollOnce
* when its file descriptor has data available, requiring the caller to take
* care of processing it.
*
* Returns 1 if the file descriptor was added, 0 if the arguments were invalid.
*
* This method can be called on any thread.
* This method may block briefly if it needs to wake the poll.
*
* The callback may either be specified as a bare function pointer or as a smart
* pointer callback object. The smart pointer should be preferred because it is
* easier to avoid races when the callback is removed from a different thread.
* See removeFd() for details.
*/
int addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data);
int addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data);

/**
* Removes a previously added file descriptor from the looper.
*
* When this method returns, it is safe to close the file descriptor since the looper
* will no longer have a reference to it. However, it is possible for the callback to
* already be running or for it to run one last time if the file descriptor was already
* signalled. Calling code is responsible for ensuring that this case is safely handled.
* For example, if the callback takes care of removing itself during its own execution either
* by returning 0 or by calling this method, then it can be guaranteed to not be invoked
* again at any later time unless registered anew.
*
* A simple way to avoid this problem is to use the version of addFd() that takes
* a sp<LooperCallback> instead of a bare function pointer. The LooperCallback will
* be released at the appropriate time by the Looper.
*
* Returns 1 if the file descriptor was removed, 0 if none was previously registered.
*
* This method can be called on any thread.
* This method may block briefly if it needs to wake the poll.
*/
int removeFd(int fd);

/**
* Enqueues a message to be processed by the specified handler.
*
* The handler must not be null.
* This method can be called on any thread.
*/
void sendMessage(const sp<MessageHandler>& handler, const Message& message);

/**
* Enqueues a message to be processed by the specified handler after all pending messages
* after the specified delay.
*
* The time delay is specified in uptime nanoseconds.
* The handler must not be null.
* This method can be called on any thread.
*/
void sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
const Message& message);

/**
* Enqueues a message to be processed by the specified handler after all pending messages
* at the specified time.
*
* The time is specified in uptime nanoseconds.
* The handler must not be null.
* This method can be called on any thread.
*/
void sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message);

/**
* Removes all messages for the specified handler from the queue.
*
* The handler must not be null.
* This method can be called on any thread.
*/
void removeMessages(const sp<MessageHandler>& handler);

/**
* Removes all messages of a particular type for the specified handler from the queue.
*
* The handler must not be null.
* This method can be called on any thread.
*/
void removeMessages(const sp<MessageHandler>& handler, int what);

/**
* Prepares a looper associated with the calling thread, and returns it.
* If the thread already has a looper, it is returned. Otherwise, a new
* one is created, associated with the thread, and returned.
*
* The opts may be ALOOPER_PREPARE_ALLOW_NON_CALLBACKS or 0.
*/
static sp<Looper> prepare(int opts);

/**
* Sets the given looper to be associated with the calling thread.
* If another looper is already associated with the thread, it is replaced.
*
* If "looper" is NULL, removes the currently associated looper.
*/
static void setForThread(const sp<Looper>& looper);

/**
* Returns the looper associated with the calling thread, or NULL if
* there is not one.
*/
static sp<Looper> getForThread();

The following sections go through these functions one by one:

2.1.1. Constructor

For background on epoll, see: http://blog.csdn.net/passerbysrs/article/details/17922195

Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { // mNextMessageUptime starts at a very large value, meaning no message is pending
int wakeFds[2];
int result = pipe(wakeFds); // create the pipe
LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);

mWakeReadPipeFd = wakeFds[0]; // read end of the pipe
mWakeWritePipeFd = wakeFds[1]; // write end of the pipe

result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK); // make it non-blocking
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d",
errno);

result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK); // make it non-blocking
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d",
errno);

// Allocate the epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT); // create the epoll descriptor
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);

struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN; // the event type to watch for: readable
eventItem.data.fd = mWakeReadPipeFd; // the file descriptor the event belongs to
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); // register the epoll_event on mEpollFd, tied to mWakeReadPipeFd
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d",
errno);
}
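The wake mechanism set up by this constructor (a non-blocking pipe whose read end is registered with epoll) can be tried out in isolation on Linux. The Waker class below is a hypothetical, stripped-down sketch: it assumes that waking is done by writing a single byte to the write end, and it keeps only the wait and drain steps.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <sys/epoll.h>
#include <unistd.h>

// Hypothetical minimal waker: a pipe whose read end is watched by epoll.
class Waker {
public:
    Waker() {
        int fds[2];
        int result = pipe(fds);
        assert(result == 0);
        mReadFd = fds[0];
        mWriteFd = fds[1];
        fcntl(mReadFd, F_SETFL, O_NONBLOCK);
        fcntl(mWriteFd, F_SETFL, O_NONBLOCK);

        mEpollFd = epoll_create(8);  // the size argument is only a hint
        assert(mEpollFd >= 0);

        epoll_event item{};          // zero the unused members
        item.events = EPOLLIN;
        item.data.fd = mReadFd;
        result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mReadFd, &item);
        assert(result == 0);
    }

    ~Waker() {
        close(mEpollFd);
        close(mReadFd);
        close(mWriteFd);
    }

    Waker(const Waker&) = delete;
    Waker& operator=(const Waker&) = delete;

    // Writing one byte makes the read end readable, which ends epoll_wait.
    void wake() {
        ssize_t n;
        do {
            n = write(mWriteFd, "W", 1);
        } while (n == -1 && errno == EINTR);
    }

    // Waits up to timeoutMillis. Returns true if woken, false on timeout
    // or error.
    bool waitForWake(int timeoutMillis) {
        epoll_event events[4];
        int count = epoll_wait(mEpollFd, events, 4, timeoutMillis);
        if (count <= 0) return false;
        drain();
        return true;
    }

private:
    // Consume whatever was written; the bytes themselves carry no meaning.
    void drain() {
        char buffer[16];
        ssize_t n;
        do {
            n = read(mReadFd, buffer, sizeof(buffer));
        } while ((n == -1 && errno == EINTR)
                 || n == static_cast<ssize_t>(sizeof(buffer)));
    }

    int mReadFd;
    int mWriteFd;
    int mEpollFd;
};
```

Several wake() calls before one wait collapse into a single wakeup, because drain() empties the pipe in one go. The non-blocking read end is what allows drain() to stop as soon as the pipe is empty.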

2.1.2. Adding an fd: addFd

int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
  if (!callback.get()) {
if (! mAllowNonCallbacks) {
ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
return -1;
}
/* ident uniquely identifies the events. If pollOnce finds responses still pending,
* they were left unhandled because their request has no callback, that is, their
* ident is not ALOOPER_POLL_CALLBACK; pollOnce then returns that ident.
  */
if (ident < 0) {
 ALOGE("Invalid attempt to set NULL callback with ident < 0.");
return -1;
}
} else {
ident = ALOOPER_POLL_CALLBACK; // non-NULL callback: force ident to ALOOPER_POLL_CALLBACK
}

int epollEvents = 0;
if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;

{ // acquire lock
AutoMutex _l(mLock);

Request request; // build the request
request.fd = fd;
request.ident = ident;
request.callback = callback;
request.data = data;

struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = epollEvents;
eventItem.data.fd = fd;

ssize_t requestIndex = mRequests.indexOfKey(fd); // check whether a request for this fd already exists
if (requestIndex < 0) {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem); // fd not seen before: register it
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
mRequests.add(fd, request);
} else {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem); // fd already registered: only modify its event mask
if (epollResult < 0) {
ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
mRequests.replaceValueAt(requestIndex, request); // replace the stored request
}
} // release lock
return 1;
}
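The add-or-modify decision in addFd can be isolated in a few lines. FdRegistry below is a hypothetical reduction for Linux: a std::map stands in for KeyedVector, only the ident is stored per fd, and locking and callbacks are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <sys/epoll.h>
#include <unistd.h>

// Hypothetical reduction of the request table kept by the event Looper.
class FdRegistry {
public:
    FdRegistry() : mEpollFd(epoll_create(8)) {
        assert(mEpollFd >= 0);
    }
    ~FdRegistry() { close(mEpollFd); }

    FdRegistry(const FdRegistry&) = delete;
    FdRegistry& operator=(const FdRegistry&) = delete;

    // Registers fd, or updates it if it is already known.
    // Returns 1 on success and -1 if epoll rejects the descriptor.
    int addFd(int fd, int ident, std::uint32_t events) {
        epoll_event item{};
        item.events = events;
        item.data.fd = fd;

        // A known fd must be modified; adding it twice would fail with EEXIST.
        bool known = mIdents.find(fd) != mIdents.end();
        int op = known ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
        if (epoll_ctl(mEpollFd, op, fd, &item) < 0) return -1;

        mIdents[fd] = ident;  // insert or replace the stored request
        return 1;
    }

    // Returns the ident stored for fd, or -1 if fd is not registered.
    int identOf(int fd) const {
        std::map<int, int>::const_iterator it = mIdents.find(fd);
        return it == mIdents.end() ? -1 : it->second;
    }

    std::size_t size() const { return mIdents.size(); }

private:
    int mEpollFd;
    std::map<int, int> mIdents;  // fd -> ident
};
```

Registering the same descriptor twice leaves one entry whose ident is the most recent one, matching the "if the same file descriptor was previously added, it is replaced" rule in the interface comment.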


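2.1.3. pollOnce


As I understand it, what pollOnce does for the MessageQueue is mainly to keep the CPU from spinning.

While the MessageQueue is being polled, there is not always a message ready to process. So when should it poll for a Message again? There are two cases:

a). The MessageQueue has messages, but none can be processed yet: they are delayed messages that must not be taken out before their time arrives. The wait until then is spent inside pollOnce. The key point is that all messages in the MessageQueue are ordered by time, so waiting for the message at the head never holds up a message that is due sooner.

b). The MessageQueue is empty. There is no point in busily polling it; it is better to wait until a message arrives and fetch it then. That wait is also done by pollOnce.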

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
checkLooperDebug();
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) { // on the very first call mResponses is empty, so this loop body is skipped
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}

if (result != 0) {
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}

result = pollInner(timeoutMillis); // call the core routine, pollInner
}
}


int Looper::pollInner(int timeoutMillis) {

// Adjust the timeout based on when the next message is due.
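/* How to read the timeout calculation below: the Looper deals with two times. One is
* timeoutMillis, supplied by the MessageQueue, which says how long the Looper may wait.
* The other is mNextMessageUptime, the time at which its own next message is due. Both
* waits share one pipeline, so to keep either from overshooting, the smaller of the two
* is used. This code computes that minimum and stores it in timeoutMillis.
*/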
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
}

// Poll.
int result = ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;

struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// timeoutMillis comes from the MessageQueue; epoll_wait blocks until events arrive in eventItems or the timeout expires
// Acquire lock.
mLock.lock(); // lock

// Check for poll error.
if (eventCount < 0) { // error
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d", errno);
result = ALOOPER_POLL_ERROR;
goto Done;
}

// Check for poll timeout.
if (eventCount == 0) { // timed out
result = ALOOPER_POLL_TIMEOUT; // report the timeout
goto Done;
}

// Handle all events.
for (int i = 0; i < eventCount; i++) { // events have arrived
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) { // the fd is mWakeReadPipeFd and the event is EPOLLIN, meaning the descriptor is readable
awoken(); // awoken() does very little: it drains the read end of the pipe; the bytes themselves carry no meaning
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd); // any other fd: wrap its Request and events in a Response and keep it
if (requestIndex >= 0) { // found the request registered for this fd
int events = 0;
if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
} else { // an event arrived for an fd that is not registered: log it and ignore it
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;

// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
/* The next part handles the Looper's own messages, which belong to the JNI layer.
* mMessageEnvelopes holds the received messages; each one that is due is handled here.
*/
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
handler->handleMessage(message); // dispatch the message
} // release handler

mLock.lock();
mSendingMessage = false;
result = ALOOPER_POLL_CALLBACK; // a callback was invoked
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}

// Release lock.
mLock.unlock(); // unlock
/* Handle the responses next: those that carry a callback are processed through it.
*
*/
// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == ALOOPER_POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd);
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = ALOOPER_POLL_CALLBACK; // a callback was invoked
}
}
/*Returns a value >= 0 containing an identifier if its file descriptor has data
* and it has no callback function*/
return result;
}
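The timeout adjustment at the top of pollInner is easier to check as a pure function. effectiveTimeout below is a hypothetical helper that follows the same condition; it assumes the caller passes -1 for messageTimeoutMillis when no internal message is queued, which stands in for the mNextMessageUptime != LLONG_MAX test.

```cpp
#include <cassert>

// timeoutMillis:        limit given by the caller
//                       (0 = do not block, negative = wait indefinitely).
// messageTimeoutMillis: milliseconds until the next internal message is
//                       due, or -1 when none is queued (assumption of this
//                       sketch).
// Returns the timeout that should be handed to epoll_wait.
inline int effectiveTimeout(int timeoutMillis, int messageTimeoutMillis) {
    assert(messageTimeoutMillis >= -1);
    if (timeoutMillis != 0
            && messageTimeoutMillis >= 0
            && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
        // The internal message is due before the caller's limit expires.
        return messageTimeoutMillis;
    }
    return timeoutMillis;
}
```

For instance, a caller willing to wait indefinitely (-1) with an internal message due in 50 ms ends up waiting 50 ms, while a caller that asked not to block at all (0) never blocks regardless of queued messages.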


void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
if (looperDebug)
ALOGD("%p ~ awoken", this);
#endif

char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}






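To summarize what pollOnce does:

a). First check the responses; if one without a callback (ident >= 0) is pending, return its ident.

b). Enter pollInner and compute how long to wait from timeoutMillis and mNextMessageUptime.

c). Call epoll_wait and wait for an event.

d). If epoll_wait fails or times out, jump to Done and handle the JNI-layer messages.

e). If epoll_wait returns a value greater than 0, events have fired. If the fd is mWakeReadPipeFd, drain its contents.

f). If it is any other registered fd, wrap it in a response and store it.

g). Handle the Looper's internal messages.

h). Handle the responses that carry a callback.

i). Return to the for loop in pollOnce. If mResponses is now non-empty, look at each ident: if ident >= 0, fill in outFd, outEvents and outData and return the ident; otherwise return result.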

        while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}
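The scan in step i) can be reproduced on plain data to see which responses reach the caller. Response here is a hypothetical, reduced struct, nextUnhandled is a name invented for this sketch, and kPollCallback is assumed to mirror ALOOPER_POLL_CALLBACK with the value -2 used by the NDK.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Assumed to mirror ALOOPER_POLL_CALLBACK: marks requests that have a
// callback and were therefore already handled inside pollInner.
const int kPollCallback = -2;

// Hypothetical reduction of Response: only what the scan needs.
struct Response {
    int ident;
    int fd;
    int events;
};

// Advances index past responses that were handled by a callback and copies
// the first one with ident >= 0 into out. Returns false when none is left.
inline bool nextUnhandled(const std::vector<Response>& responses,
                          std::size_t& index, Response& out) {
    assert(index <= responses.size());
    while (index < responses.size()) {
        const Response& response = responses[index++];
        if (response.ident >= 0) {
            out = response;
            return true;
        }
    }
    return false;
}
```

Because the index survives between calls, just as mResponseIndex does, repeated calls hand out the callback-less responses one at a time before a new poll is needed.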