7.4.2 循环监听消息

循环监听消息由MessageQueue的next方法实现,其代码如下:


final Message next(){

int pendingIdleHandlerCount=-1;//IdleHandler的数量

int nextPollTimeoutMillis=0;//空闲等待时间

for(;){

if(nextPollTimeoutMillis!=0){

Binder.flushPendingCommands();

}

//调用Native方法,传入NativeMessageQueue的地址

nativePollOnce(mPtr, nextPollTimeoutMillis);

synchronized(this){

if(mQuiting){

return null;

}

final long now=SystemClock.uptimeMillis();

Message prevMsg=null;

Message msg=mMessages;

if(msg!=null&&msg.target==null){

do{

prevMsg=msg;

msg=msg.next;

}while(msg!=null&&!msg.isAsynchronous());

}

if(msg!=null){

if(now<msg.when){

//未到消息处理时间,设置等待时间

nextPollTimeoutMillis=(int)Math.min(msg.when-now,

Integer.MAX_VALUE);

}else{

mBlocked=false;//消息需要立即处理

if(prevMsg!=null){

prevMsg.next=msg.next;

}else{

mMessages=msg.next;

}

msg.next=null;

msg.markInUse();//设置消息已使用标记

return msg;

}

}else{//无消息

nextPollTimeoutMillis=-1;

}

//消息队列中无消息或者当前消息未到处理时间时获取IdleHandler数量

if(pendingIdleHandlerCount<0

&&(mMessages==null||now<mMessages.when)){

pendingIdleHandlerCount=mIdleHandlers.size();

}

//未注册IdleHandler时,设置当前线程blocked状态为true

if(pendingIdleHandlerCount<=0){

mBlocked=true;

continue;

}

if(mPendingIdleHandlers==null){

mPendingIdleHandlers=new IdleHandler[

Math.max(pendingIdleHandlerCount,4)];

}

mPendingIdleHandlers=mIdleHandlers.toArray(

mPendingIdleHandlers);

}

for(int i=0;i<pendingIdleHandlerCount;i++){

……//空闲消息处理,下一节分析

}

pendingIdleHandlerCount=0;

nextPollTimeoutMillis=0;

}

}


next方法所做的工作分为三部分:

1)调用nativePollOnce。

2)从消息队列中获取一个消息。

3)当消息队列中无消息或者当前消息未到处理时间时,获取空闲处理器IdleHandler数量,此时将进入空闲消息处理阶段。

从消息队列中获取消息返回的过程很简单,消息返回后进入消息处理阶段,这部分内容连同空闲消息处理都放在消息处理一节分析。下面只分析消息监控的核心实现:nativePollOnce。

Jave层的nativePollOnce方法是个Native方法,其JNI层实现方法位于frameworks/base/core/jni/android_os_MessageQueue.cpp中,代码如下:


static void android_os_MessageQueue_nativePollOnce(JNIEnv*env,

jobject obj, jint ptr, jint timeoutMillis){

//根据Java层MessageQueue中存储的mPtr找到JNI层的NativeMessageQueue

NativeMessageQueue*nativeMessageQueue=

reinterpret_cast<NativeMessageQueue*>(ptr);

nativeMessageQueue->pollOnce(env, timeoutMillis);

}


参数obj代表Java层的MessageQueue对象,参数ptr代表MessageQueue对象的mPtr成员变量,mPtr保存的是JNI层NativeMessageQueue的地址,因此这里直接将该地址转换为NativeMessageQueue*。接下来调用其pollOnce方法,代码如下:


void NativeMessageQueue:pollOnce(JNIEnv*env, int timeoutMillis){

mInCallback=true;

mLooper->pollOnce(timeoutMillis);//将请求转发给JNI层的Looper对象

mInCallback=false;

if(mExceptionObj){//这里并不关心pollOnce的返回值,只检查异常

env->Throw(mExceptionObj);

env->DeleteLocalRef(mExceptionObj);

mExceptionObj=NULL;

}

}


NativeMessageQueue的pollOnce方法将请求转发给JNI层的Looper对象,其代码如下:


inline int pollOnce(int timeoutMillis){

return pollOnce(timeoutMillis, NULL, NULL, NULL);

}

int Looper:pollOnce(int timeoutMillis, intoutFd, intoutEvents, void**outData){

int result=0;

for(;){

/*处理Response, mResponses定义为Vector<Response>mResponses。

*mResponses由Looper:pushResponse方法填充。

*创建Looper对象时在初始化列表中将mResponseIndex赋值为0。

*while循环中只是返回了Response对应请求的标识信息ident,

而Response的处理由下文的pollInner方法完成/

while(mResponseIndex<mResponses.size()){

const Response&response=mResponses.itemAt(mResponseIndex++);

int ident=response.request.ident;

if(ident>=0){

int fd=response.request.fd;

int events=response.events;

void*data=response.request.data;

if(outFd!=NULL)*outFd=fd;

if(outEvents!=NULL)*outEvents=events;

if(outData!=NULL)*outData=data;

return ident;

}

}

if(result!=0){//result由pollInner赋值,默认为0

if(outFd!=NULL)*outFd=0;

if(outEvents!=NULL)*outEvents=0;

if(outData!=NULL)*outData=NULL;

return result;

}

result=pollInner(timeoutMillis);

}

}


接下来分析pollInner方法,代码如下:


int Looper:pollInner(int timeoutMillis){

……

int result=ALOOPER_POLL_WAKE;

mResponses.clear();

mResponseIndex=0;

struct epoll_event eventItems[EPOLL_MAX_EVENTS];//epoll事件

/*这里使用Linux的epoll机制,调用epoll_wait监听mEpollFd上的事件,如果

没有事件发生,将在epoll_wait中等待,wait的时间由timeoutMillis指定/

int eventCount=epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS,

timeoutMillis);

mLock.lock();

……//出错或者超时情况下,直接跳转到下面的Done标签处

//处理所有事件

for(int i=0;i<eventCount;i++){

int fd=eventItems[i].data.fd;//取出事件中的文件描述符信息

uint32_t epollEvents=eventItems[i].events;

if(fd==mWakeReadPipeFd){//管道读端对应的文件描述符

if(epollEvents&EPOLLIN){//对应管道上的读事件,调用awoken

awoken();

}else{//忽略其他类型的epoll events,并打印错误

}

}else{//监听的其他文件描述符的请求,以Request表示

ssize_t requestIndex=mRequests.indexOfKey(fd);

if(requestIndex>=0){

int events=0;

if(epollEvents&EPOLLIN)events|=ALOOPER_EVENT_INPUT;

if(epollEvents&EPOLLOUT)events|=ALOOPER_EVENT_OUTPUT;

if(epollEvents&EPOLLERR)events|=ALOOPER_EVENT_ERROR;

if(epollEvents&EPOLLHUP)events|=ALOOPER_EVENT_HANGUP;

//这里并不立即处理,而是先存入mResponses数组中

pushResponse(events, mRequests.valueAt(requestIndex));

}else{//忽略其他类型的epoll events

}

}

}

Done:;//具体处理Message和Request

mNextMessageUptime=LLONG_MAX;

while(mMessageEnvelopes.size()!=0){//处理Native层的消息

nsecs_t now=systemTime(SYSTEM_TIME_MONOTONIC);

const MessageEnvelope&messageEnvelope=mMessageEnvelopes.itemAt(0);

if(messageEnvelope.uptime<=now){

{//获取消息处理器

sp<MessageHandler>handler=messageEnvelope.handler;

Message message=messageEnvelope.message;

mMessageEnvelopes.removeAt(0);

mSendingMessage=true;

mLock.unlock();

handler->handleMessage(message);//处理消息

}//释放消息处理器

mLock.lock();

mSendingMessage=false;

result=ALOOPER_POLL_CALLBACK;

}else{

//消息队列中最后一条消息的执行时间决定下次wakeup的时间

mNextMessageUptime=messageEnvelope.uptime;

break;

}

}

mLock.unlock();

//触发Reponse的callback回调函数处理Request

for(size_t i=0;i<mResponses.size();i++){

Response&response=mResponses.editItemAt(i);

if(response.request.ident==ALOOPER_POLL_CALLBACK){

int fd=response.request.fd;

int events=response.events;

void*data=response.request.data;

int callbackResult=response.request.callback->

handleEvent(fd, events, data);//处理消息

if(callbackResult==0){

removeFd(fd);

}

response.request.callback.clear();

result=ALOOPER_POLL_CALLBACK;

}

}

return result;

}


Looper:pollInner首先会调用epoll_wait监控mEpollFd上的事件。一旦有事件发生,会根据事件来源做出不同处理。事件来源分为两部分:mWakeReadPipeFd和其他Fd。

对于mWakeReadPipeFd上的EPOLLIN事件,直接调用awoken函数。对于其他事件,则只打印出错误信息,并不予以处理。

对于其他Fd上的事件,则以Request的形式封装事件,并调用pushResponse函数将其暂时存入mResponses数组中。

上述流程执行完毕后,进入Done标签处,开始处理Message和Request。这里的Message只包含Native层的Message,并不包含Java层的Message。对于Native层的Message,会遍历mMessageEnvelopes中存储的每一个messageEnvelope,进而调用messageEnvelope的handleMessage方法处理消息。

对于Request,则是遍历mResponses,取出请求后,调用其handleEvent方法处理Request。

接下来分析awoken方法,代码如下:


void Looper:awoken(){

char buffer[16];

ssize_t nRead;

do{//从管道读端读取管道数据,即管道写端写入的字符串"W"

nRead=read(mWakeReadPipeFd, buffer, sizeof(buffer));

}while((nRead==-1&&errno==EINTR)||nRead==sizeof(buffer));

}


可见,pollInner中同时处理Message和Request。那么Request又是在哪里添加的呢?Looper提供了addFd方法用于添加监控请求,代码如下:


int Looper:addFd(int fd, int ident, int events,

ALooper_callbackFunc callback, void*data){

return addFd(fd, ident, events, callback?

new SimpleLooperCallback(callback):NULL, data);

}

int Looper:addFd(int fd, int ident, int events,

const sp<LooperCallback>&callback, void*data){

if(!callback.get()){//未指定callback

if(!mAllowNonCallbacks){//需要指定callback,否则无法处理请求

return-1;

}

if(ident<0){//ident不能小于0

return-1;

}

}else{

ident=ALOOPER_POLL_CALLBACK;

}

int epollEvents=0;

if(events&ALOOPER_EVENT_INPUT)epollEvents|=EPOLLIN;

if(events&ALOOPER_EVENT_OUTPUT)epollEvents|=EPOLLOUT;

{//acquire lock

AutoMutex_l(mLock);

//封装Request请求

Request request;

request.fd=fd;//请求来源

request.ident=ident;//标记一个请求

request.callback=callback;//请求回调处理函数

request.data=data;//回调处理函数的参数

//epoll事件

struct epoll_event eventItem;

memset(&eventItem,0,sizeof(epoll_event));

eventItem.events=epollEvents;

eventItem.data.fd=fd;

//以fd作为请求的key,检索mRequests中是否已经存在该请求

ssize_t requestIndex=mRequests.indexOfKey(fd);

if(requestIndex<0){

//添加新的fd监控

int epollResult=epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd,&eventItem);

if(epollResult<0){

return-1;

}

mRequests.add(fd, request);//向请求数组中添加新的请求

}else{

int epollResult=epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd,&eventItem);

if(epollResult<0){

return-1;

}

mRequests.replaceValueAt(requestIndex, request);//更新旧的请求

}

}//释放锁

return 1;

}


Looper提供了addFd方法用于监控指定Fd上的Request,这部分内容主要应用在Input、Sensor和NativeActivity中。