2.3.3 nativePollOnce函数分析

nativePollOnce的实现函数是android_os_MessageQueue_nativePollOnce,代码如下:

[—>android_os_MessageQueue.cpp:android_os_MessageQueue_native Pollonce]


static void android_os_MessageQueue_nativePollOnce(JNIEnv*env, jobject obj,

jint ptr, jint timeoutMillis)

NativeMessageQueue*nativeMessageQueue=

reinterpret_cast<NativeMessageQueue*>(ptr);

//取出NativeMessageQueue对象,并调用它的pollOnce

nativeMessageQueue->pollOnce(timeoutMillis);

}

//分析pollOnce函数

void NativeMessageQueue:pollOnce(int timeoutMillis){

mLooper->pollOnce(timeoutMillis);//重任传递到Looper的pollOnce函数

}


Looper的pollOnce函数如下:

[—>Looper.cpp:pollOnce]


inline int pollOnce(int timeoutMillis){

return pollOnce(timeoutMillis, NULL, NULL, NULL);

}


上面的函数将调用另外一个有4个参数的pollOnce函数,这个函数的原型如下:


int pollOnce(int timeoutMillis, intoutFd, intoutEvents, void**outData)


其中:

timeoutMillis参数为超时等待时间。如果值为-1,则表示无限等待,直到有事件发生为止。如果值为0,则无须等待立即返回。

outFd用来存储发生事件的那个文件描述符[1]

outEvents用来存储在该文件描述符上发生了哪些事件,目前支持可读、可写、错误和中断4个事件。这4个事件其实是从epoll事件转化而来的。后面我们会介绍大名鼎鼎的epoll。

outData用于存储上下文数据,这个上下文数据是由用户在添加监听句柄时传递的,它的作用和pthread_create函数最后一个参数param一样,用来传递用户自定义的数据。另外,pollOnce函数的返回值也具有特殊的意义,具体如下:

当返回值为ALOOPER_POLL_WAKE时,表示这次返回是由wake函数触发的,也就是管道写端的那次写事件触发的。

返回值为ALOOPER_POLL_TIMEOUT表示等待超时。

返回值为ALOOPER_POLL_ERROR表示等待过程中发生错误。

返回值为ALOOPER_POLL_CALLBACK表示某个被监听的句柄因某种原因被触发。这时,outFd参数用于存储发生事件的文件句柄,outEvents用于存储所发生的事件。上面这些知识是和epoll息息相关的。

提示 查看Looper的代码会发现,Looper采用了编译选项(即#if和#else)来控制是否使用epoll作为I/O复用的控制中枢。鉴于现在大多数系统都支持epoll,这里仅讨论使用epoll的情况。

1.epoll基础知识

epoll机制提供了Linux平台上最高效的I/O复用机制,因此有必要介绍一下它的基础知识。

从调用方法上看,epoll的用法和select/poll非常类似,其主要作用就是I/O复用,即在一个地方等待多个文件句柄的I/O事件。

下面通过一个简单的例子来分析epoll的工作流程。

[—>epoll工作流程分析案例]


/*

使用epoll前,需要先通过epoll_create函数创建一个epoll句柄。

下面一行代码中的10表示该epoll句柄初次创建的时候分配能容纳10个fd相关信息的缓存。

对于2.6.8版本以后的内核,该值没有实际作用,这里可以忽略。其实这个值的主要目的是

确定分配一块多大的缓存。现在的内核都支持动态拓展这块缓存,所以该值就没有意义了

*/

int epollHandle=epoll_create(10);

/*

得到epoll句柄后,下一步就是通过epoll_ctl把需要监听的文件句柄加入到epoll句柄中。

除了指定文件句柄本身的fd值外,同时还需要指定在该fd上等待什么事件。epoll支持4类事件,

分别是EPOLLIN(句柄可读)、EPOLLOUT(句柄可写)、EPOLLERR(句柄错误)、EPOLLHUP(句柄中断)。

epoll定义了一个结构体struct epoll_event来表达监听句柄的诉求。

假设现在有一个监听端的socket句柄listener,要把它加入到epoll句柄中

*/

struct epoll_event listenEvent;//先定义一个event

/*

EPOLLIN表示可读事件,EPOLLOUT表示可写事件,另外还有EPOLLERR, EPOLLHUP

系统默认会将EPOLLERR加入到事件集合中

*/

listenEvent.events=EPOLLIN;//指定该句柄的可读事件

//epoll_event中有一个联合体称为data,用来存储上下文数据,本例的上下文数据就是句柄自己

listenEvent.data.fd=listenEvent;

/*

EPOLL_CTL_ADD将监听fd和监听事件加入到epoll句柄的等待队列中;

EPOLL_CTL_DEL将监听fd从epoll句柄中移除;

EPOLL_CTL_MOD修改监听fd的监听事件,例如本来只等待可读事件,现在需要同时等待

可写事件,那么修改listenEvent.events为EPOLLIN|EPOLLOUT后,再传给epoll句柄

*/

epoll_ctl(epollHandle, EPOLL_CTL_ADD, listener,&listenEvent);

/*

当把所有感兴趣的fd都加入到epoll句柄后,就可以开始坐等感兴趣的事情发生了。

为了接收所发生的事情,先定义一个epoll_event数组

*/

struct epoll_event resultEvents[10];

int timeout=-1;

while(1){

/*

调用epoll_wait用于等待事件,其中timeout可以指定一个超时时间,

resultEvents用于接收发生的事件,10为该数组的大小。

epoll_wait函数的返回值有如下含义:

nfds大于0表示所监听的句柄上有事件发生;

nfds等于0表示等待超时;

nfds小于0表示等待过程中发生了错误

*/

int nfds=epoll_wait(epollHandle, resultEvents,10,timeout);

if(nfds==-1)

{

//epoll_wait发生了错误

}

else if(nfds==0)

{

//发生超时,期间没有发生任何事件

}

else

{

//resultEvents用于返回那些发生了事件的信息

for(int i=0;i<nfds;i++)

{

struct epoll_event&event=resultEvents[i];

if(event&EPOLLIN)

{

/*

收到可读事件。到底是哪个文件句柄发生该事件呢?可通过event.data这个联合体取得

之前传递给epoll的上下文数据,该上下文数据可用于判断到底是谁发生了事件

*/

}

……//其他处理

}

}

}


epoll整体使用流程如上面代码所示,基本和select/poll类似,作为Linux平台最高效的I/O复用机制,这里有些内容供读者参考。

epoll的效率为什么会比select高?其中一个原因是调用方法。每次调用select时,需要把感兴趣的事件都复制到内核中,而epoll只在epll_ctl进行加入操作的时候才复制一次。另外,epoll内部用于保存事件的数据结构使用的是红黑树,查找速度很快。而select采用数组保存信息,不但一次能等待的句柄个数有限,并且查找起来速度很慢(等待的事件较多时会这样)。当然,在只等待少量文件句柄时,select和epoll效率相差不是很多,但笔者还是推荐使用epoll。

epoll等待的事件有两种触发条件,一个是水平触发(EPOLLLEVEL),另外一个是边缘触发(EPOLLET, ET为Edge Trigger之意),这两种触发条件的区别非常重要。读者可通过man epoll查阅系统提供的更为详细的epoll机制去了解。

最后,关于pipe,笔者还想提出一个小问题供读者思考讨论:为什么Android中使用pipe作为线程间通信的方式?对于pipe的写端写入的数据,读端都不感兴趣,只是为了简单的唤醒。POSIX不是也有线程间同步函数吗?为什么要用pipe呢?关于这个问题的答案,可参见笔者一篇博文“随笔之如何实现一个线程池”,笔者的博客地址是http://blog.csdn.net/innost。

2.pollOnce函数分析

下面分析带4个参数的pollOnce函数,代码如下:

[—>Looper.cpp:pollOnce]


int Looper:pollOnce(int timeoutMillis, intoutFd, intoutEvents,

void**outData){

int result=0;

for(;){//一个无限循环

//mResponses是一个Vector,这里首先需要处理response

while(mResponseIndex<mResponses.size()){

const Response&response=mResponses.itemAt(mResponseIndex++);

ALooper_callbackFunc callback=response.request.callback;

if(!callback){//首先处理那些没有callback的response

int ident=response.request.ident;//ident是这个response的id

int fd=response.request.fd;

int events=response.events;

void*data=response.request.data;

……

if(outFd!=NULL)*outFd=fd;

if(outEvents!=NULL)*outEvents=events;

if(outData!=NULL)*outData=data;

//实际上,对于没有callback的response, pollOnce只是返回它的

//ident,并没有实际做什么处理。因为没有callback,所以系统也不知道如何处理

return ident;

}

}

if(result!=0){

if(outFd!=NULL)*outFd=0;

if(outEvents!=NULL)*outEvents=NULL;

if(outData!=NULL)*outData=NULL;

return result;

}

//调用pollInner函数。注意,它在for循环内部

result=pollInner(timeoutMillis);

}

}


初看上面的代码,可能会给人有些丈二和尚摸不着头脑的感觉。但是把pollInner函数分析完毕,大家就会明白了。pollInner函数的实现代码非常长,这里把用于调试和统计的代码去掉,结果如下:

[—>Looper.cpp:pollInner]


int Looper:pollInner(int timeoutMillis){

if(timeoutMillis!=0&&mNextMessageUptime!=LLONG_MAX){

nsecs_t now=systemTime(SYSTEM_TIME_MONOTONIC);

……//根据Native Message的信息计算此次需要等待的时间

timeoutMillis=messageTimeoutMillis;

}

int result=ALOOPER_POLL_WAKE;

mResponses.clear();

mResponseIndex=0;

ifdef LOOPER_USES_EPOLL//我们只讨论使用epoll进行I/O复用的方式

struct epoll_event eventItems[EPOLL_MAX_EVENTS];

//调用epoll_wait,等待感兴趣的事件或超时发生

int eventCount=epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS,

timeoutMillis);

else

……//使用别的方式进行I/O复用

endif

//从epoll_wait返回,这时候一定发生了什么事情

mLock.lock();

if(eventCount<0){//返回值小于零,表示发生错误

if(errno==EINTR){

goto Done;

}

//设置result为ALOOPER_POLL_ERROR,并跳转到Done

result=ALOOPER_POLL_ERROR;

goto Done;

}

//eventCount为零,表示发生超时,因此直接跳转到Done

if(eventCount==0){

result=ALOOPER_POLL_TIMEOUT;

goto Done;

}

ifdef LOOPER_USES_EPOLL

//根据epoll的用法,此时的eventCount表示发生事件的个数

for(int i=0;i<eventCount;i++){

int fd=eventItems[i].data.fd;

uint32_t epollEvents=eventItems[i].events;

/*

之前通过pipe函数创建过两个fd,这里根据fd知道是管道读端有可读事件。

读者还记得对nativeWake函数的分析吗?在那里我们向管道写端写了一个“W”字符,这样

就能触发管道读端从epoll_wait函数返回了

*/

if(fd==mWakeReadPipeFd){

if(epollEvents&EPOLLIN){

//awoken函数直接读取并清空管道数据,读者可自行研究该函数

awoken();

}

……

}else{

/*

mRequests和前面的mResponse相对应,它也是一个KeyedVector,其中存储了

fd和对应的Request结构体,该结构体封装了和监控文件句柄相关的一些上下文信息,

例如回调函数等。我们在后面的小节会再次介绍该结构体

*/

ssize_t requestIndex=mRequests.indexOfKey(fd);

if(requestIndex>=0){

int events=0;

//将epoll返回的事件转换成上层LOOPER使用的事件

if(epollEvents&EPOLLIN)events|=ALOOPER_EVENT_INPUT;

if(epollEvents&EPOLLOUT)events|=ALOOPER_EVENT_OUTPUT;

if(epollEvents&EPOLLERR)events|=ALOOPER_EVENT_ERROR;

if(epollEvents&EPOLLHUP)events|=ALOOPER_EVENT_HANGUP;

//每处理一个Request,就相应构造一个Response

pushResponse(events, mRequests.valueAt(requestIndex));

}

……

}

}

Done:;

else

……

endif

//除了处理Request外,还处理Native的Message,注意,Native的Message和Jave层的Message

无任何关系

mNextMessageUptime=LLONG_MAX;

while(mMessageEnvelopes.size()!=0){

nsecs_t now=systemTime(SYSTEM_TIME_MONOTONIC);

const MessageEnvelope&messageEnvelope=mMessageEnvelopes.itemAt(0);

if(messageEnvelope.uptime<=now){

{

sp<MessageHandler>handler=messageEnvelope.handler;

Message message=messageEnvelope.message;

mMessageEnvelopes.removeAt(0);

mSendingMessage=true;

mLock.unlock();

//调用Native的handler处理Native的Message

//从这里也可看出Native Message和Java层的Message没有什么关系

handler->handleMessage(message);

}

mLock.lock();

mSendingMessage=false;

result=ALOOPER_POLL_CALLBACK;

}else{

mNextMessageUptime=messageEnvelope.uptime;

break;

}

}

mLock.unlock();

//处理那些带回调函数的Response

for(size_t i=0;i<mResponses.size();i++){

const Response&response=mResponses.itemAt(i);

ALooper_callbackFunc callback=response.request.callback;

if(callback){//有了回调函数,就能知道如何处理所发生的事情了

int fd=response.request.fd;

int events=response.events;

void*data=response.request.data;

//调用回调函数处理所发生的事件

int callbackResult=callback(fd, events, data);

if(callbackResult==0){

//callback函数的返回值很重要,如果为0,表明不需要再次监视该文件句柄

removeFd(fd);

}

result=ALOOPER_POLL_CALLBACK;

}

}

return result;

}


看完代码了,是否还有点模糊?那么,回顾一下pollInner函数的几个关键点:

1)首先计算一下真正需要等待的时间。

2)调用epoll_wait函数等待。

3)epoll_wait函数返回,这时候可能有3种情况:

发生错误,则跳转到Done处。

超时,也跳转到Done处。

epoll_wait监测到某些文件句柄上有事件发生。

4)假设epoll_wait因为文件句柄有事件而返回,此时需要根据文件句柄来分别处理:

如果是管道读端发生事件,则认为是控制命令,可以直接读取管道中的数据。

如果是其他fd发生事件,则根据Request构造Response,并push到Response数组中。

5)真正开始处理事件是在有Done标志的位置。

首先处理Native的Message。调用Native Handler的handleMessage处理该Message。

处理Response数组中那些带有callback的事件。

上面的处理流程还是比较清晰的,但还是有一个拦路虎,那就是mRequests,下面就来清剿这个拦路虎。

3.添加监控请求

添加监控请求其实就是调用epoll_ctl增加文件句柄。下面以Native的Activity中的一段代码为例来分析mRequests。

[—>android_app_NativeActivity.cpp:loadNativeCode_native]


static jint

loadNativeCode_native(JNIEnv*env, jobject clazz, jstring path,

jstring funcName, jobject messageQueue,

jstring internalDataDir, jstring obbDir,

jstring externalDataDir, int sdkVersion,

jobject jAssetMgr, jbyteArray savedState)

{

……

/*

调用Looper的addFd函数。第一个参数表示监听的fd;第二个参数0表示ident;

第三个参数表示需要监听的事件,这里只监听可读事件;第四个参数为回调函数,当该fd发生指定事件时,looper将回调该函数;第五个参数code为回调函数的参数

*/

code->looper->addFd(code->mainWorkRead,0,

ALOOPER_EVENT_INPUT, mainWorkCallback, code);

……

}

Looper的addFd函数的实现代码如下:

[—>Looper.cpp:addFd]


int Looper:addFd(int fd, int ident, int events,

ALooper_callbackFunc callback, void*data){

if(!callback){

//判断该Looper是否支持不带回调函数的文件句柄添加。一般不支持,因为没有回调函数

//Looper也不知道如何处理该文件句柄上发生的事情

if(!mAllowNonCallbacks){

return-1;

}

……

}

ifdef LOOPER_USES_EPOLL

int epollEvents=0;

//将用户的事件转换成epoll使用的值

if(events&ALOOPER_EVENT_INPUT)epollEvents|=EPOLLIN;

if(events&ALOOPER_EVENT_OUTPUT)epollEvents|=EPOLLOUT;

{

AutoMutex_l(mLock);

Request request;//创建一个Request对象

request.fd=fd;//保存fd

request.ident=ident;//保存id

request.callback=callback;//保存callback

request.data=data;//保存用户自定义数据

struct epoll_event eventItem;

memset(&eventItem,0,sizeof(epoll_event));

eventItem.events=epollEvents;

eventItem.data.fd=fd;

//判断该Request是否已经存在,mRequests以fd作为key值

ssize_t requestIndex=mRequests.indexOfKey(fd);

if(requestIndex<0){

//如果是新的文件句柄,则需要为epoll增加该fd

int epollResult=epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd,&eventItem);

……

//保存request到mRequests键值数组

mRequests.add(fd, request);

}else{

//如果之前加过,那么就修改该监听句柄的一些信息

int epollResult=epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd,&eventItem);

……

mRequests.replaceValueAt(requestIndex, request);

}

}

else

……

endif

return 1;

}


4.处理监控请求

我们发现在pollInner函数中,当某个监控fd上发生事件后,就会把对应的Request取出来调用,相应代码如下:


pushResponse(events, mRequests.itemAt(i));


PushResponse函数实现代码如下:

[—>Looper.cpp:pushResponse]


void Looper:pushResponse(int events, const Request&request){

Response response;

response.events=events;

response.request=request;//保存所发生的事情和对应的request

mResponses.push(response);//保存到mResponse数组

}


根据前面的知识可知,PoolInner并不是单独处理request,而是需要先收集request,等到Native Message消息处理完之后再做处理。这表明,在处理逻辑上,Native Message的优先级高于监控fd的优先级。

下面介绍如何添加Native的Message。

5.Native的sendMessage

Android 2. 2中只有Java层才可以通过sendMessage往MessageQueue中添加消息,从Android 4.0开始,Native层也支持sendMessage了[2]。sendMessage的代码如下:


[—>Looper.cpp:sendMessage]


void Looper:sendMessage(const sp<MessageHandler>&handler,

const Message&message){

//Native的sendMessage函数必须同时传递一个Handler

nsecs_t now=systemTime(SYSTEM_TIME_MONOTONIC);

sendMessageAtTime(now, handler, message);//调用sendMessageAtTime

}

void Looper:sendMessageAtTime(nsecs_t uptime,

const sp<MessageHandler>&handler,

const Message&message){

size_t i=0;

{

AutoMutex_l(mLock);

size_t messageCount=mMessageEnvelopes.size();

//按时间排序,将消息插到正确的位置上

while(i<messageCount&&

uptime>=mMessageEnvelopes.itemAt(i).uptime){

i+=1;

}

MessageEnvelope messageEnvelope(uptime, handler, message);

mMessageEnvelopes.insertAt(messageEnvelope, i,1);

//mSendingMessage和Java层中的那个mBlocked一样,是一个小小的优化措施

if(mSendingMessage){

return;

}

}

//唤醒epoll_wait,让它处理消息

if(i==0){

wake();

}

}


[1]注意,以后文件描述符也会简称为句柄。

[2]我们这里略过了Android 2.2到Android 4.0之间几个版本中的代码变化。