7.5.2 AsyncTask的执行
异步任务创建完毕后,便调用其execute方法开始执行。异步任务的执行流程如图7-5所示。
图 7-5 异步任务的执行流程
由图7-5可知,调用异步任务的execute方法,进而会调用executeOnExecutor方法,该方法的作用正如其方法名所表示的:放入执行器中执行。
executeOnExecutor方法接收两个参数,其中参数exec用于指定执行器,params是异步任务的参数。默认实现中,执行器由Serial Executor提供,参数parmms则存入mWorker中,最后连同mFuture一起传入SerialExecutor执行器的execute方法。
SerialExecutor的execute方法接收的是Runnable类型的线程参数,FutureTask实现了Runnable接口,因此可以作为该execute的参数。
1.SerialExecutor的execute方法
接下来分析SerialExecutor的execute方法,代码如下:
public abstract class AsyncTask<Params, Progress, Result>{
……
//Executor是一个接口,定义了execute方法
private static class SerialExecutor implements Executor{
//定义一个队列,存储Runnable类型的成员
final ArrayDeque<Runnable>mTasks=new ArrayDeque<Runnable>();
Runnable mActive;
//实现Executor接口定义的execute方法,接收Runnable类型的参数mFuture
public synchronized void execute(final Runnable r){
//再次将mFuture封装到Runnable中,并存入队列mTasks的队尾
mTasks.offer(new Runnable(){
public void run(){
try{
r.run();//在新线程的run方法中调用mFuture的run方法
}finally{
scheduleNext();
}
}
});
if(mActive==null){//当前处理任务为空时,调度下一个任务
scheduleNext();
}
}
protected synchronized void scheduleNext(){
//从mTasks的队首取出一个任务
if((mActive=mTasks.poll())!=null){
//将取出的任务放入线程池中执行
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
……
调用SerialExecutor的execute方法,首先将传入的mFuture封装到Runnable中,然后存入ArrayDeque类型的mTasks任务队列中。接下来便调用scheduleNext方法从mTasks取出一个任务放入THREAD_POOL_EXECUTOR线程池中等待执行。
线程池是AsyncTask的公开静态成员变量,为所有异步任务共享,其初始化代码如下:
public abstract class AsyncTask<Params, Progress, Result>{
……
//线程池的核心大小,即线程池中核心线程数默认为5
private static final int CORE_POOL_SIZE=5;
//线程池的最大值为128
private static final int MAXIMUM_POOL_SIZE=128;
//当线程池中线程数量大于线程池核心大小的时候,指定空闲线程的存活时间
private static final int KEEP_ALIVE=1;
//线程池所需的线程创建工厂,用于创建线程池中的线程
private static final ThreadFactory sThreadFactory=new ThreadFactory(){
private final AtomicInteger mCount=new AtomicInteger(1);
public Thread newThread(Runnable r){
return new Thread(r,"AsyncTask#"+mCount.getAndIncrement());
}
};
//存放线程池中待执行任务的队列
private static final BlockingQueue<Runnable>sPoolWorkQueue=
new LinkedBlockingQueue<Runnable>(10);
//默认线程池,TimeUnit.SECONDS是KEEP_ALIVE的时间,单位为s(秒)
public static final Executor THREAD_POOL_EXECUTOR=
new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE,
KEEP_ALIVE, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
……
关于线程池的内容请参考《Java并发编程实战》一书,这里不赘述。
2.ThreadPoolExecutor线程池的execute方法
接下来分析ThreadPoolExecutor线程池的execute方法,代码如下:
public class ThreadPoolExecutor extends AbstractExecutorService{
/*调用ctlOf方法将整数RUNNING和0做或操作,操作结果存入一个整数ctl中。
*RUNNING表示线程池的运行状态(runState)为RUNNING状态,
0表示线程池中处于运行状态的线程数(workerCount)为0/
private final AtomicInteger ctl=new AtomicInteger(ctlOf(RUNNING,0));
//COUNT_BITS的值为29
private static final int COUNT_BITS=Integer.SIZE-3;
/*CAPACITY表示线程池中处于运行状态的线程最大数,
*其二进制形式为0001 1111 1111 1111 1111 1111 1111 1111,
*高3位全为0,低29位全为1,表示一个29位二进制数码能表示的最大值,
这个值即workerCount的最大值/
private static final int CAPACITY=(1<<COUNT_BITS)-1;
/*RUNNING的二进制形式为1110 0000 0000 0000 0000 0000 0000 0000,
即用一个32位整数的高3位111存储线程池的runState,下同/
private static final int RUNNING=-1<<COUNT_BITS;
//0左移29位后仍是0,因此SHUTDOWN的值为0,高3位为000
private static final int SHUTDOWN=0<<COUNT_BITS;
//STOP的二进制形式为0010 0000 0000 0000 0000 0000 0000 0000,高3位为001
private static final int STOP=1<<COUNT_BITS;
//TIDYING的值为0100 0000 0000 0000 0000 0000 0000 0000,高3位为010
private static final int TIDYING=2<<COUNT_BITS;
//TERMINATED的值为0110 0000 0000 0000 0000 0000 0000 0000,高3位为011
private static final int TERMINATED=3<<COUNT_BITS;
/*传入的参数c为ctl,用于获取ctl中存储的runState。
*~CAPACITY的二进制形式为1110 0000 0000 0000 0000 0000 0000 0000。
*c&~CAPACITY操作的结果是将c的低29位置为0,高3位值不变。
已知runState存储在高3位,因此runStateOf返回runState/
private static int runStateOf(int c){return c&~CAPACITY;}
/*传入的参数c为ctl,用于获取ctl中存储的workerCount。
*CAPACITY的二进制形式为0001 1111 1111 1111 1111 1111 1111 1111。
*c&CAPACITY操作的结果是将c的高3位置为0,低29位值不变。
已知workerCount存储在低29位,因此workerCountOf返回workerCount/
private static int workerCountOf(int c){return c&CAPACITY;}
/*ctlOf将两个整数rs和wc做或操作,放入一个整数中。
*rs表示runState, wc表示workerCount。已知runState以32位整数的高3位表
示,因此ctlOf的运算结果以高3位表示runState,以低29位表示workerCount/
private static int ctlOf(int rs, int wc){return rs|wc;}
public void execute(Runnable command){//command即封装后的mFuture
if(command==null)
throw new NullPointerException();
int c=ctl.get();//获取存储runState和workerCount的整数,存入c中
/*workerCountOf获取c中存储的workerCount, corePoolSize表示线程池的核
*心线程数,其值为5。如果线程池当前处于运行状态的线程数小于核心线程数,需要
调用addWorker创建新线程,传入的command将成为新线程的第一个执行任务/
if(workerCountOf(c)<corePoolSize){①
if(addWorker(command, true))
return;
c=ctl.get();
}
/*isRunning方法判断c是否小于SHUTDOWN,在线程池的runState中,只有
*RUNNING小于SHUTDOWN。workQueue在创建ThreadPoolExecutor时指定。这个
if分支的意思是:如果当前线程池处于运行状态,并且工作队列中可以插入任务/
if(isRunning(c)&&workQueue.offer(command)){②
int recheck=ctl.get();//防止有并发操作修改ctl,这里重新检查
/*recheck后,如果是线程池的runState发生改变,即不在
运行状态时,需要将上步中提交到工作队列的任务移除/
if(!isRunning(recheck)&&remove(command))
reject(command);//拒绝运行这个任务
/recheck后,如果是线程池的workerCount变为0,则创建新线程/
else if(workerCountOf(recheck)==0)
addWorker(null, false);
}
/*线程池runState不为RUNNING,或工作队列无法添加任务,这两种情况至少满足一种时,尝试
添加新线程,如果添加失败,说明线程池确实处于shut down状态了,此时拒绝执行任务/
else if(!addWorker(command, false))//③
reject(command);
}
ThreadPoolExecutor提供了线程池的默认实现,该线程池提供两个整型数runState和workerCount跟踪线程池的控制状态(ctl status)。其中runState表示线程池的运行状态,用于线程池生命周期的控制;workerCount表示线程池中允许start和没有stop的Worker数量,即线程池中当前工作线程的数量。
runState可以有以下五种状态:
RUNNING:运行状态,可以接受新任务,也可以处理工作队列中已有任务。
SHUTDOWN:关闭状态,不可以接收新任务,但可以处理工作队列中已有任务。
STOP:停止状态,不可以接收新任务,不可以处理工作队列中已有任务,并且要中断正在执行的任务。
TIDYING:整理状态,所有任务都已结束,workerCount为0,准备执行terminated()方法。
TERMINATED:终止状态,terminated()方法执行完毕。
runState以一个32位整型数的高3位表示,workerCount则通过CAPACITY限制为29位二进制数,因此可以调用ctlOf方法将runState和workerCount做“|”运算,这样将runState和workerCount分别存储到整数ctl的高3位和低29位。
线程池的execute方法所作的工作可以分成三部分,如上述代码中①②③所示。
1)在①中,如果线程池中工作线程的数量小于线程池定义的核心线程数量,需要调用addWorker(command, true)创建新的工作线程。
2)在②中,如果线程池处于运行状态,并且工作队列中可以插入新的任务,此时对ctl进行重新检查。如果检查结果是runState发生改变,则需要从工作队列中移除①中加入的任务;如果检查结果是workerCount发生变化,则需要调用addWorker(null, false)创建新的工作线程。
3)不满足②的情况下,即线程池runState不为RUNNING、工作队列无法添加任务这两种情况至少满足一种时,进入③中。此时需要调用addWorker(command, false),尝试创建工作线程,如果创建失败,说明线程池确实shut down了,此时拒绝执行任务。
上述三部分工作都是围绕addWorker展开的,只是传入的参数不同而已。接下来分析addWorker方法,代码如下:
private boolean addWorker(Runnable firstTask, boolean core){
retry://retry标签标记外层的for循环
for(;){
int c=ctl.get();//获取线程池的控制状态
int rs=runStateOf(c);//从控制状态中获取runState
/*根据逻辑运算规则,该判定条件可以转换为rs>=SHUTDOWN&&
*((rs!=SHUTDOWN||firstTask!=null||workQueue.isEmpty())
表示当线程池处于非RUNNING状态时,只要满足||运算的任一条件便直接返回/
if(rs>=SHUTDOWN&&
!(rs==SHUTDOWN&&
firstTask==null&&
!workQueue.isEmpty()))
return false;
for(;){//内层for循环
int wc=workerCountOf(c);//获取线程池的当前工作线程数
/*工作线程数已经达到CAPACITY的限制;
*如果参数core为true,需要判断工作线程数是否大于corePoolSize;
*如果参数core为false,需要判断工作线程数是否超过maximumPoolSize;
上述三种情况下,都需要直接返回/
if(wc>=CAPACITY||
wc>=(core?corePoolSize:maximumPoolSize))
return false;
//workCount加1,成功则跳出外层for循环,到①处的后续代码继续执行
if(compareAndIncrementWorkerCount(c))
break retry;
//如果workCount加1操作失败
c=ctl.get();//重新获取ctl
if(runStateOf(c)!=rs)//如果runState改变则继续外层for循环
continue retry;
//运行到此处说明workerCount改变,开始内层for循环的下一次循环
}
}
//①workCount成功加1后,就可以创建工作线程处理任务了
//将mFuture封装成Worker
Worker w=new Worker(firstTask);
Thread t=w.thread;
final ReentrantLock mainLock=this.mainLock;//访问工作线程池需要加锁
mainLock.lock();
try{
……
/workers的类型是HashSet<Worker>,将工作线程加入到工作线程池/
workers.add(w);
……
}finally{
mainLock.unlock();
}
t.start();//②启动t,注意这里不是启动w
……
return true;
}
addWorker的核心工作可以分成两部分,在上述代码中以①和②标出。其中①用于创建工作线程Worker,②用于启动工作线程。
Worker是ThreadPoolExecutor的内部类,代码如下:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/Worker运行于thread中/
final Thread thread;
/初始任务/
Runnable firstTask;
/Per-thread task counter/
volatile long completedTasks;
Worker(Runnable firstTask){
//新创建的工作线程可以指定一个初始任务
this.firstTask=firstTask;
/*通过工厂方法创建一个线程,该线程工厂在创建ThreadPoolExecutor时
指定newThread方法将当前Worker对象封装成Thread返回/
this.thread=getThreadFactory().newThread(this);
}
public void run(){//工作线程的运行代码
runWorker(this);
}
……
}
Worker是Runnable的子类,在其内部定义了一个Thread类型的成员变量thread。构造Worker对象时,通过工厂方法创建一个新线程,将Worker封装成Thread存入成员变量thread中。这样便可以通过thread.start方法启动Worker,进而执行Worker的run方法。
在Worker的run方法中调用runWorker方法,其代码如下:
final void runWorker(Worker w){
Runnable task=w.firstTask;//获取工作线程的第一个任务
w.firstTask=null;
boolean completedAbruptly=true;
try{
//如果指定了第一个任务则首先执行第一个任务,否则从工作队列中提取任务
while(task!=null||(task=getTask())!=null){
w.lock();
clearInterruptsForTaskRun();
try{
beforeExecute(w.thread, task);//方法体为空,可以在子类中覆盖
Throwable thrown=null;
try{
//调用任务的run方法执行任务,该任务此时运行于Worker线程中
task.run();
}catch(RuntimeException x){
……
}finally{
afterExecute(task, thrown);//方法体为空
}
}finally{
task=null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly=false;
}finally{
processWorkerExit(w, completedAbruptly);
}
}
可见runWorker方法是在工作线程中执行任务的run方法。而任务封装了FutureTask类型的mFuture对象,执行任务的run方法会调用mFuture的run方法,这部分内容在分析SerialExecutor的execute方法时已经分析过。接下来分析FutureTask的run方法,代码如下:
public void run(){
sync.innerRun();
}
在FutureTask的run方法中会调用其成员变量sync的innerRun方法,代码如下:
public class FutureTask<V>implements RunnableFuture<V>{
/用于FutureTask的同步控制/
private final Sync sync;
//创建FutureTask时,传入一个Callable类型的参数,该参数是WorkerRunnable类型的mWorker
public FutureTask(Callable<V>callable){
if(callable==null)
throw new NullPointerException();
sync=new Sync(callable);
}
private final class Sync extends AbstractQueuedSynchronizer{
……
void innerRun(){
if(!compareAndSetState(READY, RUNNING))
return;
runner=Thread.currentThread();
if(getState()==RUNNING){//recheck after setting thread
V result;
try{
result=callable.call();//调用的是mWorker的call方法
}catch(Throwable ex){
setException(ex);
return;
}
set(result);
}else{
releaseShared(0);//cancel
}
}
可见,sync的innerRun最终调用了其成员变量callable的call方法,而callable在创建mFuture时保存的是mWorker对象,因此这里实际调用了mWorker的call方法。该方法代码如下:
mWorker=new WorkerRunnable<Params, Result>(){
public Result call()throws Exception{
mTaskInvoked.set(true);
Process.setThreadPriority(
Process.THREAD_PRIORITY_BACKGROUND);
return postResult(doInBackground(mParams));
}
};
mWorker的call方法会调用doInBackground方法,并将返回结果作为postResult方法的参数,这样便实现了在后台线程中执行doInBackground。
接下来分析postResult,代码如下:
public abstract class AsyncTask<Params, Progress, Result>{
……
private static final int MESSAGE_POST_RESULT=0x1;
//InternalHandler是Handler的子类
private static final InternalHandler sHandler=new InternalHandler();
……
private Result postResult(Result result){//Result是个泛型参数
@SuppressWarnings("unchecked")
/*通过sHandler获取what标记为MESSAGE_POST_RESULT的消息。
*obtainMessage方法的第一个参数存入消息的what成员变量中,第二个参数
存入消息的obj成员变量中,消息的target存储的是当前对象sHandler/
Message message=sHandler.obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();//将消息发送到sHandler处理
return result;
}
由以上代码可知,doInBackground方法执行完毕后将执行结果传入postResult方法,在该方法中以AsyncTaskResult类型的对象封装处理结果,并将其放入消息的obj成员变量中,然后通过Handler机制处理消息。首先分析AsyncTaskResult,其代码如下:
private static class AsyncTaskResult<Data>{
final AsyncTask mTask;
final Data[]mData;
AsyncTaskResult(AsyncTask task, Data……data){
mTask=task;//本例中即AddCallTask
mData=data;//执行结束和执行过程的数据,本例只分析执行结束
}
}
AsyncTaskResult只是将异步任务及其执行数据做了一个绑定,AsyncTaskResult最终封装到消息的obj成员变量中,发送到消息处理器InternalHandler。
InternalHandler是AsyncTask的内部类,其代码如下:
public abstract class AsyncTask<Params, Progress, Result>{
……
private static final InternalHandler sHandler=new InternalHandler();
……
private static class InternalHandler extends Handler{
@SuppressWarnings({"unchecked","RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg){
//从消息的obj成员变量中取出发送时存入的AsyncTaskResult
AsyncTaskResult result=(AsyncTaskResult)msg.obj;
switch(msg.what){
case MESSAGE_POST_RESULT://对应于执行结束的消息
//result的mTask存储的是异步任务
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS://对应于执行过程的消息
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
可见InternalHandler最终将消息的处理转发给异步任务的finish和onProgressUpdate方法处理。由于异步任务必须在UI线程中创建,因此其成员变量sHandler,以及finish和onProgressUpdate方法便定义于UI线程,当线程池中的工作线程执行doInBackground方法后,将处理结果以消息的形式发送到sHandler,这样便可以利用sHandler更新UI控件。
接下来分析finish方法,代码如下:
public abstract class AsyncTask<Params, Progress, Result>{
……
private void finish(Result result){
if(isCancelled()){
onCancelled(result);//由异步任务的子类定义具体的Cancel操作
}else{
onPostExecute(result);//由异步任务的子类定义具体的后续处理操作
}
mStatus=Status.FINISHED;//更改异步任务的状态,不能重复执行
}
finish方法很简单,只是根据异步任务的执行情况,分别调用不同的处理方法,而这些处理方法都是运行于UI线程的,可以由子类定义其具体行为。