7.4.2 循环监听消息
循环监听消息由MessageQueue的next方法实现,其代码如下:
final Message next(){
int pendingIdleHandlerCount=-1;//IdleHandler的数量
int nextPollTimeoutMillis=0;//空闲等待时间
for(;){
if(nextPollTimeoutMillis!=0){
Binder.flushPendingCommands();
}
//调用Native方法,传入NativeMessageQueue的地址
nativePollOnce(mPtr, nextPollTimeoutMillis);
synchronized(this){
if(mQuiting){
return null;
}
final long now=SystemClock.uptimeMillis();
Message prevMsg=null;
Message msg=mMessages;
if(msg!=null&&msg.target==null){
do{
prevMsg=msg;
msg=msg.next;
}while(msg!=null&&!msg.isAsynchronous());
}
if(msg!=null){
if(now<msg.when){
//未到消息处理时间,设置等待时间
nextPollTimeoutMillis=(int)Math.min(msg.when-now,
Integer.MAX_VALUE);
}else{
mBlocked=false;//消息需要立即处理
if(prevMsg!=null){
prevMsg.next=msg.next;
}else{
mMessages=msg.next;
}
msg.next=null;
msg.markInUse();//设置消息已使用标记
return msg;
}
}else{//无消息
nextPollTimeoutMillis=-1;
}
//消息队列中无消息或者当前消息未到处理时间时获取IdleHandler数量
if(pendingIdleHandlerCount<0
&&(mMessages==null||now<mMessages.when)){
pendingIdleHandlerCount=mIdleHandlers.size();
}
//未注册IdleHandler时,设置当前线程blocked状态为true
if(pendingIdleHandlerCount<=0){
mBlocked=true;
continue;
}
if(mPendingIdleHandlers==null){
mPendingIdleHandlers=new IdleHandler[
Math.max(pendingIdleHandlerCount,4)];
}
mPendingIdleHandlers=mIdleHandlers.toArray(
mPendingIdleHandlers);
}
for(int i=0;i<pendingIdleHandlerCount;i++){
……//空闲消息处理,下一节分析
}
pendingIdleHandlerCount=0;
nextPollTimeoutMillis=0;
}
}
next方法所做的工作分为三部分:
1)调用nativePollOnce。
2)从消息队列中获取一个消息。
3)当消息队列中无消息或者当前消息未到处理时间时,获取空闲处理器IdleHandler数量,此时将进入空闲消息处理阶段。
从消息队列中获取消息返回的过程很简单,消息返回后进入消息处理阶段,这部分内容连同空闲消息处理都放在消息处理一节分析。下面只分析消息监控的核心实现:nativePollOnce。
Jave层的nativePollOnce方法是个Native方法,其JNI层实现方法位于frameworks/base/core/jni/android_os_MessageQueue.cpp中,代码如下:
static void android_os_MessageQueue_nativePollOnce(JNIEnv*env,
jobject obj, jint ptr, jint timeoutMillis){
//根据Java层MessageQueue中存储的mPtr找到JNI层的NativeMessageQueue
NativeMessageQueue*nativeMessageQueue=
reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, timeoutMillis);
}
参数obj代表Java层的MessageQueue对象,参数ptr代表MessageQueue对象的mPtr成员变量,mPtr保存的是JNI层NativeMessageQueue的地址,因此这里直接将该地址转换为NativeMessageQueue*。接下来调用其pollOnce方法,代码如下:
void NativeMessageQueue:pollOnce(JNIEnv*env, int timeoutMillis){
mInCallback=true;
mLooper->pollOnce(timeoutMillis);//将请求转发给JNI层的Looper对象
mInCallback=false;
if(mExceptionObj){//这里并不关心pollOnce的返回值,只检查异常
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj=NULL;
}
}
NativeMessageQueue的pollOnce方法将请求转发给JNI层的Looper对象,其代码如下:
inline int pollOnce(int timeoutMillis){
return pollOnce(timeoutMillis, NULL, NULL, NULL);
}
int Looper:pollOnce(int timeoutMillis, intoutFd, intoutEvents, void**outData){
int result=0;
for(;){
/*处理Response, mResponses定义为Vector<Response>mResponses。
*mResponses由Looper:pushResponse方法填充。
*创建Looper对象时在初始化列表中将mResponseIndex赋值为0。
*while循环中只是返回了Response对应请求的标识信息ident,
而Response的处理由下文的pollInner方法完成/
while(mResponseIndex<mResponses.size()){
const Response&response=mResponses.itemAt(mResponseIndex++);
int ident=response.request.ident;
if(ident>=0){
int fd=response.request.fd;
int events=response.events;
void*data=response.request.data;
if(outFd!=NULL)*outFd=fd;
if(outEvents!=NULL)*outEvents=events;
if(outData!=NULL)*outData=data;
return ident;
}
}
if(result!=0){//result由pollInner赋值,默认为0
if(outFd!=NULL)*outFd=0;
if(outEvents!=NULL)*outEvents=0;
if(outData!=NULL)*outData=NULL;
return result;
}
result=pollInner(timeoutMillis);
}
}
接下来分析pollInner方法,代码如下:
int Looper:pollInner(int timeoutMillis){
……
int result=ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex=0;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];//epoll事件
/*这里使用Linux的epoll机制,调用epoll_wait监听mEpollFd上的事件,如果
没有事件发生,将在epoll_wait中等待,wait的时间由timeoutMillis指定/
int eventCount=epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS,
timeoutMillis);
mLock.lock();
……//出错或者超时情况下,直接跳转到下面的Done标签处
//处理所有事件
for(int i=0;i<eventCount;i++){
int fd=eventItems[i].data.fd;//取出事件中的文件描述符信息
uint32_t epollEvents=eventItems[i].events;
if(fd==mWakeReadPipeFd){//管道读端对应的文件描述符
if(epollEvents&EPOLLIN){//对应管道上的读事件,调用awoken
awoken();
}else{//忽略其他类型的epoll events,并打印错误
}
}else{//监听的其他文件描述符的请求,以Request表示
ssize_t requestIndex=mRequests.indexOfKey(fd);
if(requestIndex>=0){
int events=0;
if(epollEvents&EPOLLIN)events|=ALOOPER_EVENT_INPUT;
if(epollEvents&EPOLLOUT)events|=ALOOPER_EVENT_OUTPUT;
if(epollEvents&EPOLLERR)events|=ALOOPER_EVENT_ERROR;
if(epollEvents&EPOLLHUP)events|=ALOOPER_EVENT_HANGUP;
//这里并不立即处理,而是先存入mResponses数组中
pushResponse(events, mRequests.valueAt(requestIndex));
}else{//忽略其他类型的epoll events
}
}
}
Done:;//具体处理Message和Request
mNextMessageUptime=LLONG_MAX;
while(mMessageEnvelopes.size()!=0){//处理Native层的消息
nsecs_t now=systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope&messageEnvelope=mMessageEnvelopes.itemAt(0);
if(messageEnvelope.uptime<=now){
{//获取消息处理器
sp<MessageHandler>handler=messageEnvelope.handler;
Message message=messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage=true;
mLock.unlock();
handler->handleMessage(message);//处理消息
}//释放消息处理器
mLock.lock();
mSendingMessage=false;
result=ALOOPER_POLL_CALLBACK;
}else{
//消息队列中最后一条消息的执行时间决定下次wakeup的时间
mNextMessageUptime=messageEnvelope.uptime;
break;
}
}
mLock.unlock();
//触发Reponse的callback回调函数处理Request
for(size_t i=0;i<mResponses.size();i++){
Response&response=mResponses.editItemAt(i);
if(response.request.ident==ALOOPER_POLL_CALLBACK){
int fd=response.request.fd;
int events=response.events;
void*data=response.request.data;
int callbackResult=response.request.callback->
handleEvent(fd, events, data);//处理消息
if(callbackResult==0){
removeFd(fd);
}
response.request.callback.clear();
result=ALOOPER_POLL_CALLBACK;
}
}
return result;
}
Looper:pollInner首先会调用epoll_wait监控mEpollFd上的事件。一旦有事件发生,会根据事件来源做出不同处理。事件来源分为两部分:mWakeReadPipeFd和其他Fd。
对于mWakeReadPipeFd上的EPOLLIN事件,直接调用awoken函数。对于其他事件,则只打印出错误信息,并不予以处理。
对于其他Fd上的事件,则以Request的形式封装事件,并调用pushResponse函数将其暂时存入mResponses数组中。
上述流程执行完毕后,进入Done标签处,开始处理Message和Request。这里的Message只包含Native层的Message,并不包含Java层的Message。对于Native层的Message,会遍历mMessageEnvelopes中存储的每一个messageEnvelope,进而调用messageEnvelope的handleMessage方法处理消息。
对于Request,则是遍历mResponses,取出请求后,调用其handleEvent方法处理Request。
接下来分析awoken方法,代码如下:
void Looper:awoken(){
char buffer[16];
ssize_t nRead;
do{//从管道读端读取管道数据,即管道写端写入的字符串"W"
nRead=read(mWakeReadPipeFd, buffer, sizeof(buffer));
}while((nRead==-1&&errno==EINTR)||nRead==sizeof(buffer));
}
可见,pollInner中同时处理Message和Request。那么Request又是在哪里添加的呢?Looper提供了addFd方法用于添加监控请求,代码如下:
int Looper:addFd(int fd, int ident, int events,
ALooper_callbackFunc callback, void*data){
return addFd(fd, ident, events, callback?
new SimpleLooperCallback(callback):NULL, data);
}
int Looper:addFd(int fd, int ident, int events,
const sp<LooperCallback>&callback, void*data){
if(!callback.get()){//未指定callback
if(!mAllowNonCallbacks){//需要指定callback,否则无法处理请求
return-1;
}
if(ident<0){//ident不能小于0
return-1;
}
}else{
ident=ALOOPER_POLL_CALLBACK;
}
int epollEvents=0;
if(events&ALOOPER_EVENT_INPUT)epollEvents|=EPOLLIN;
if(events&ALOOPER_EVENT_OUTPUT)epollEvents|=EPOLLOUT;
{//acquire lock
AutoMutex_l(mLock);
//封装Request请求
Request request;
request.fd=fd;//请求来源
request.ident=ident;//标记一个请求
request.callback=callback;//请求回调处理函数
request.data=data;//回调处理函数的参数
//epoll事件
struct epoll_event eventItem;
memset(&eventItem,0,sizeof(epoll_event));
eventItem.events=epollEvents;
eventItem.data.fd=fd;
//以fd作为请求的key,检索mRequests中是否已经存在该请求
ssize_t requestIndex=mRequests.indexOfKey(fd);
if(requestIndex<0){
//添加新的fd监控
int epollResult=epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd,&eventItem);
if(epollResult<0){
return-1;
}
mRequests.add(fd, request);//向请求数组中添加新的请求
}else{
int epollResult=epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd,&eventItem);
if(epollResult<0){
return-1;
}
mRequests.replaceValueAt(requestIndex, request);//更新旧的请求
}
}//释放锁
return 1;
}
Looper提供了addFd方法用于监控指定Fd上的Request,这部分内容主要应用在Input、Sensor和NativeActivity中。