29.6 TaskFactory类和TaskFactory<TResult>类
TaskFactory类是一个工厂类,用于创建Task类的实例。TaskFactory<TResult>是它的泛型版本,用于创建和计划Task<TResult>对象。从工厂模式[1]的角度来讲,TaskFactory是工厂类,而Task是产品类。
TaskFactory包含用来执行如下操作的方法:
❑创建任务并立即启动任务。
❑创建延续任务。
一般来说,创建Task类的实例有三种方式:
❑直接实例化。
❑在Task类中持有一个私有的静态的TaskFactory实例,因此可以通过Task的Factory属性获取此TaskFactory实例,接着就可以通过TaskFactory获取Task的实例。
❑先实例化TaskFactory,再获取Task的实例。
建议大家使用后两种方式,将Task对象的创建责任交给TaskFactory工厂来做,这更符合工厂模式的初衷,如下所示。
Task taskA=Task.Factory.StartNew(()=>……);
接下来介绍一下TaskFactory的构造函数,每种构造函数也对应实例化TaskFactory的一种方式。TaskFactory的StartNew方法是创建和计划任务的首选机制,但是对于创建和计划必须分开的情况,可以使用构造函数。TaskFactory的构造函数如表29-7所示。
虽然TaskFactory类有5个构造函数,但实际上,前4个都是对第5个构造函数的显式调用,以默认构造函数为例,如下所示:
public TaskFactory():
this(CancellationToken.None,
TaskCreationOptions.None,
TaskContinuationOptions.None,
null){}
再来看第2个构造函数,实际上也是对第5个构造函数的显式调用,例如:
public TaskFactory(CancellationToken cancellationToken):
this(cancellationToken,
TaskCreationOptions.None,
TaskContinuationOptions.None,
null){}
其余几个构造函数也类似,这些参数我们称之为任务选项,它们是任务并行库提供的一系列选项,使用它们可以对并行操作进行各种控制,从任务的创建到并发控制都可以配置。接下来将分别介绍这些参数,它们的说明如表29-8所示。
接下来将分别对上述几种配置类进行详细介绍。这里主要介绍TaskCreationOptions,Task-ContinuationOptions以及ParallelOptions,这三个类分别对应着:任务创建选项、后续任务选项以及并发选项。
29.6.1 任务创建选项
当我们创建一个任务,它具有一些默认的行为,或者说有一些默认设置。这些设置如下:
❑任务默认按“后进先出(LIFO)”的模式执行。
❑任务没有一个逻辑上的“父级任务”。
❑任务被假定是运行时间非常短的。
当这些被设定好的默认行为无法满足要求时,就需要通过设置任务创建选项来定制任务的行为。TaskCreationOptions是一个枚举类型,它的4个成员可以控制任务的创建和执行时的行为,表29-9是对这些成员的说明。
1.默认选项
默认选项对应TaskCreationOptions.None选项。当一个任务采用默认选项,表明该任务:
❑没有父级任务;
❑是运行时间较短的任务;
❑较早创建的任务可能较晚执行。
该选项在一些情况下是有用的,比如需要一个TaskCreationOptions类型的参数,却并不需要改变任务的默认创建行为,此时就可以使用TaskCreationOptions.None选项。例如,当创建一个任务时需要指定CancellationToken和TaskScheduler类型的参数时,必须指定一个TaskCreationOptions类型的参数,如代码清单29-9所示。
代码清单29-9 TaskCreationOptions.None使用示例代码
CancellationTokenSource someCancellationTokenSource=
new CancellationTokenSource();
CancellationToken someCancellationToken=
someCancellationTokenSource.Token;
TaskScheduler someTaskScheduler=new CustomTaskScheduler();
Task t1=Task.Factory.StartNew(
()=>
{
Console.WriteLine("from delegate.");
},
someCancellationToken,
TaskCreationOptions.None,
someTaskScheduler);
2.让较早创建的任务较早执行
使用TaskCreationOptions.PreferFairness选项,可以请求默认计划程序(TaskScheduler)尽量公平地安排任务的执行,默认是按照LIFO的原则执行。如果指定此选项,将按照FIFO的原则执行,即较早安排的任务将更可能较早运行,而较晚安排运行的任务将更可能较晚运行。但需要注意的是,此选项可能会在某些情况下失效——因为有些计划程序可能会完全忽略此选项。然而,如果使用的是默认计划程序,则包含了对此选项的处理,例如ThreadPoolTaskScheduler。
为了更好地理解这一点,我们不妨先了解一下CLR的内部工作机制。默认任务计划程序基于.NET 4的线程池(ThreadPool)。ThreadPool提供了一个全局工作队列,并为每个工作线程提供了本地工作队列。全局工作队列的处理顺序为先进先出(FIFO),但本地队列将按后进先出顺序(LIFO)访问,以便保留缓存位置并减少争用。
当一个线程准备好执行更多工作时,它将首先在本地队列中查找。如果工作项正等在那里,则很快就会被执行。当本地队列没有了排队的工作项时,它将依次在全局队列和其他线程的本地队列中查找。如果它在另一个线程的本地队列中找到工作项,将会首先应用试探法来确保能够有效地运行该工作。如果能够有效运行工作,它将按LIFO顺序使队列末尾的工作项离队。这称为“工作窃取(work-stealing)”。工作窃取的目的是提供负载平衡,可帮助确保当其他线程在其队列仍然有工作时,没有线程处于空闲状态。
顶级任务(即不是在另一个任务的上下文中创建的任务)将放入全局队列,而在另一个任务的上下文中创建的嵌套任务或子任务则放入本地队列。然而,如果使用PreferFairness选项创建任务,则此任务将进入全局队列。
实际上,使用PreferFairness选项确保了任务将被加入到全局队列。考虑如下代码,如代码清单29-10所示。
代码清单29-10 使用PreferFairness选项创建任务示例
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ProgrammingCSharp4
{
class ParallelSample
{
public static void Main(string[]args)
{
//顶级任务
Task A=Task.Factory.StartNew(()=>
{
//嵌套任务
Task B=Task.Factory.StartNew(()=>
{
Console.WriteLine("Task B");
});
//嵌套任务,指定了PreferFairness选项
Task C=Task.Factory.StartNew(()=>
{
Console.WriteLine("Task C");
},TaskCreationOptions.PreferFairness);
Console.WriteLine("Task A");
});
Thread.Sleep(1000);
}
}
}
上述代码的运行结果为:
Task A
Task C
Task B
请按任意键继续……
在上述代码中;
❑任务A是顶级任务,因此它进入了ThreadPool的全局共享队列。它很快会被一个工作线程获取并执行;
❑任务B是在任务A的上下文中创建的,因此它是嵌套任务,它进入的是工作线程的本地队列;
❑任务C和任务B一样,也是创建于执行任务A的上下文中,但由于它使用了PreferFairness选项,因此它进入的是全局队列,而不是本地队列。需要注意的是:
❑虽然默认的TaskScheduler支持PreferFairness选项,但并不保证所有的TaskScheduler都支持它,此选项有可能被完全忽略;
❑即使使用的是默认的TaskScheduler,在并行计算的过程也有一些不确定性。虽然使用PreferFairness选项可以保证多个任务在.NET4中入队和离队的顺序一致,但无法确定任务的实际执行顺序。如果任务1在任务2之前离队,但基于各种不确定因素,并不意味着任务1在任务2前先开始执行;
❑虽然全局队列按照先入先出的顺序处理任务,但并不保证在未来更新的.NET版本中也这样处理。ThreadPool的未来版本可能处理全局队列时采用近似的先进先出策略,而不是精确的先进先出策略;
❑无法为Parallel类的For、ForEach以及Invoke方法启动的任务使用PreferFairness选项。
PreferFairness选项的一个典型使用场景是,一个任务的执行依赖于子任务的执行,但该任务却被阻塞,因为子任务中要求进行等待,此任务将继续执行直到子任务执行,如代码清单29-11所示。
代码清单29-11 父级任务等待子任务执行
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ProgrammingCSharp4
{
class ParallelSample
{
public static void Main(string[]args)
{
//顶级任务
Task<int>outer=Task<int>.Factory.StartNew(()=>
{
//支持任务取消
CancellationTokenSource cts=
new CancellationTokenSource();
CancellationToken token=cts.Token;
//嵌套的任务数组
Task<int>[]tasks=new Task<int>[3];
tasks[0]=Task<int>.Factory.StartNew(()=>
{
FunctionA(token);
return 0;
},token);
tasks[1]=Task<int>.Factory.StartNew(()=>
{
FunctionB(token);
return 1;
},token);
tasks[2]=Task<int>.Factory.StartNew(()=>
{
FunctionC(token);
return 2;
},token);
//任务数组中的任何一个任务执行完成就立刻返回已执行完毕任务
//的索引
int winnerIndex=Task.WaitAny(tasks);
//取消任务执行
cts.Cancel();
//返回任务执行的结果
return tasks[winnerIndex].Result;
});
Console.WriteLine(outer.Result);
}
private static void FunctionC(CancellationToken token)
{
int i=0;
//检查任务是否已取消,是则退出循环
while(!token.IsCancellationRequested&&i++<10)
{
Console.WriteLine("FunctionC:{0}",i);
Thread.Sleep(100);
}
}
private static void FunctionB(CancellationToken token)
{
int i=0;
//检查是否任务已取消,是则退出循环
while(!token.IsCancellationRequested&&i++<50)
{
Console.WriteLine("FunctionB:{0}",i);
Thread.Sleep(100);
}
}
private static void FunctionA(CancellationToken token)
{
int i=0;
//检查是否任务已取消,是则退出循环
while(!token.IsCancellationRequested&&i++<30)
{
Console.WriteLine("FunctionA:{0}",i);
Thread.Sleep(100);
}
}
}
}
在上述代码中,outer是一个顶级任务,它创建并启动了3个嵌套任务。嵌套任务开始运行后outer任务就处于等待状态,直到某一个嵌套任务执行完毕,其余的任务也随后被取消,在FunctionA、FunctionB以及FunctionC的方法体中,我们检查了token的IsCancellationRequested属性,如果任务被取消则退出循环。
如果ThreadPool的负载不高,那么上述代码可以运行得很好,tasks[0]、tasks[1]以及tasks[2]这3个任务可以被空闲的线程“窃取”然后执行,如图29-4所示。
图 29-4 ThreadPool空闲情况下运行示意图
然而,如果ThreadPool的负载较高,上述代码就可能出现问题,因为所有的线程都处于繁忙状态,没有空闲线程来执行这些嵌套任务。一般来说,outer任务会一直等到全局队列中所有工作项执行完毕才会继续,如果一个系统非常的繁忙,那么等待全局线程队列中的任务都执行完将需要一段不短的时间。实际上,如果任务进入队列的速度超过任务被处理的速度,那么outer任务将永远不能结束。如图29-5所示。
图 29-5 系统负载较高情况下的执行情况
要应对这种问题,一个更好的方式就是使用PreferFairness选项来创建任务,以使这些任务直接进入全局队列,如代码清单29-12所示。
代码清单29-12 使用PreferFairness选项来创建任务
Task<int>outer=Task<int>.Factory.StartNew(()=>
{
CancellationTokenSource cts=
new CancellationTokenSource();
CancellationToken token=cts.Token;
Task<int>[]tasks=new Task<int>[3];
tasks[0]=Task<int>.Factory.StartNew(()=>
{
FunctionA(token);
return 0;
},token,TaskCreationOptions.PreferFairness,TaskScheduler.Default);
tasks[1]=Task<int>.Factory.StartNew(()=>
{
FunctionB(token);
return 1;
},token,TaskCreationOptions.PreferFairness,TaskScheduler.Default);
tasks[2]=Task<int>.Factory.StartNew(()=>
{
FunctionC(token);
return 2;
},token,TaskCreationOptions.PreferFairness,TaskScheduler.Default);
int winnerIndex=Task.WaitAny(tasks);
cts.Cancel();
return tasks[winnerIndex].Result;
});
在上述代码中,3个子任务都进入了全局队列,此队列处理的顺序大致为先入先出,在执行完毕的工作项和新的工作项之间交替运行,因此任务的处理是可以得到保证的,如图29-6所示。
图 29-6 使用PreferFairness选项将任务加入全局线程队列
事实上,利用.NET4中可定制TaskFactory特性可以将代码清单29-12进一步简化,如代码清单29-13所示。
代码清单29-13 使用TaskFactory简化内部任务的创建
Task<int>outer=Task<int>.Factory.StartNew(()=>
{
CancellationTokenSource cts=
new CancellationTokenSource();
CancellationToken token=cts.Token;
var taskFactory=new TaskFactory(token,
TaskCreationOptions.PreferFairness,
TaskContinuationOptions.None,
TaskScheduler.Default);
Task<int>[]tasks=new Task<int>[3];
tasks[0]=taskFactory.StartNew(()=>
{
FunctionA(token);
return 0;
});
tasks[1]=Task<int>.Factory.StartNew(()=>
{
FunctionB(token);
return 1;
});
tasks[2]=Task<int>.Factory.StartNew(()=>
{
FunctionC(token);
return 2;
});
int winnerIndex=Task.WaitAny(tasks);
cts.Cancel();
return tasks[winnerIndex].Result;
});
3.需要长时间运行的任务
默认情况下,任务被认为仅仅执行一些短时间运行的操作,如果需要执行一些耗时较长的任务,则需要在创建任务时指定LongRunning选项,此选项会指示TaskScheduler提高并发级别以允许更多的任务并行执行。这和Win32线程池中的WT_EXECUTELONGFUNCTION标志比较类似。如果使用了LongRunning选项,TaskScheduler将为任务分配一个一个专用的线程来执行它,而不是使用ThreadPool中的线程。注意,自定义的计划程序有可能忽略此选项。
代码清单29-14 演示了如何LongRunning选项的用法。
代码清单29-14 使用LongRunning选项创建任务
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ProgrammingCSharp4
{
class ParallelSample
{
public static void Main(string[]args)
{
//短时间运行模式
LongRunningTest(false, "第一次运行");
//长时间运行模式
LongRunningTest(true, "第二次运行");
}
static void LongRunningTest(bool longRunningOn,string msg)
{
bool stopLongRunningTasks=false;
TaskCreationOptions options=longRunningOn?
TaskCreationOptions.LongRunning:
TaskCreationOptions.None;
Task t1=Task.Factory.StartNew(()=>
{
while(!stopLongRunningTasks)
{
Console.WriteLine("{0}:{1}",msg,
Thread.CurrentThread.IsThreadPoolThread);
Thread.Sleep(1000);
};
},options);
Task t2=Task.Factory.StartNew(()=>
{
while(!stopLongRunningTasks)
{
Console.WriteLine("{0}:{1}",msg,
Thread.CurrentThread.IsThreadPoolThread);
Thread.Sleep(1000);
};
},options);
Parallel.For(0,10,_=>{Thread.Sleep(10);});
stopLongRunningTasks=true;//kill t1&t2
Task.WaitAll(t1,t2);
}
}
}
上述代码的运行结果为:
第一次运行:True
第一次运行:True
第二次运行:False
第二次运行:False
请按任意键继续……
在上述代码中,如果longRunningOn参数的值为true,那么将使用LongRunning选项创建t1和t2两个任务。接下来,分别使用参数true和false执行LongRunningTest方法2次,从运行的结果可以看出,使用LongRunning选项创建的任务使用独立的线程,而非ThreadPool中的线程,反之则相反。
那么,什么时候应该使用LongRunning选项呢?一般的规则是,任何运行超过数秒的任务都应该使用LongRunning选项创建。现列举其中的一些场景如下:
❑占用CPU超过数秒的任务;
❑需要花大量时间等待I/O操作的任务;
❑大部分时间都在休眠的任务,例如一个“报告”任务偶尔才被激活,并进行状态报告。
需要说明的是,并非使用LongRunning选项就一定可以提高应用程序的性能!当我们考虑是否使用此选项时,建议先在使用和不用之间进行效果对比之后再做决定。
4.为任务指定一个父级
一般情况下,任务是没有父级的,但是可以通过指定AttachedToParent选项,可以将一个任务设置为另一个任务的子任务,从而在两个任务之间建立层级关系,这种任务之间的层级结构技术称为并行结构,如代码清单29-15所示。
代码清单29-15 AttachedToParent选项示例代码
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ProgrammingCSharp4
{
class ParallelSample
{
public static void Main(string[]args)
{
Task t1=Task.Factory.StartNew(()=>
{
Console.WriteLine("{0}:{1}",
"t1",
Thread.CurrentThread.ManagedThreadId);
//t1的子任务
Task t2=Task.Factory.StartNew(()=>
{
Console.WriteLine("{0}:{1}",
"t2",
Thread.CurrentThread.ManagedThreadId);
},TaskCreationOptions.AttachedToParent);
//t1的嵌套任务
Task t3=Task.Factory.StartNew(()=>
{
Console.WriteLine("{0}:{1}",
"t3",
Thread.CurrentThread.ManagedThreadId);
});
});
Thread.Sleep(1000);
}
}
}
上述代码中,t2是t1的子任务,即t1是t2的父级任务。而t1和t3之间没有父子关系,t3是个嵌套任务。在任务之间建立这种层级关系有如下几点特性:
❑父级任务会等待所有子级任务都完成后才结束。
❑除非一个父级任务直接监测子级的异常,子级任务的异常将向上传播到它的父级。
再来看另一段代码,如代码清单29-16所示。
代码清单29-16 异常在父子任务间的传播示例
using System;
using System.Threading.Tasks;
namespace ProgrammingCSharp4
{
class ParallelSample
{
public static void Main(string[]args)
{
Task outer=Task.Factory.StartNew(()=>
{
//outer的子任务
Task inner1=Task.Factory.StartNew(()=>
{
throw new Exception(“inner1的异常”);
},TaskCreationOptions.AttachedToParent);
//outer的子任务
Task inner2=Task.Factory.StartNew(()=>
{
throw new Exception(“inner2的异常”);
},TaskCreationOptions.AttachedToParent);
//嵌套任务
Task inner3=Task.Factory.StartNew(()=>
{
throw new Exception(“inner3的异常”);
});
//outer的子任务
Task inner4=Task.Factory.StartNew(()=>
{
throw new Exception(“inner4的异常”);
},TaskCreationOptions.AttachedToParent);
//调用Wait()方法等待inner4完成
((IAsyncResult)inner4).AsyncWaitHandle.WaitOne();
Console.WriteLine(“inner4的异常:{0}”,
inner4.Exception);
try
{
//等待inner1任务完成
inner1.Wait();
}
catch(Exception e)
{
Console.WriteLine(“inner1抛出异常:{0}”,e);
}
});
try
{
//等待outer任务执行完成
outer.Wait();
}
catch(Exception e)
{
Console.WriteLine(“outer抛出异常:{0}”,e);
}
}
}
}
接下来分析下上述代码中的inner1、inner2、inner3以及inner4这4个任务抛出的异常,如下所示。
❑inner1是一个子任务,它的异常当调用inner1.Wait()时由其父级任务(outer)捕获,并且此异常不再继续传播。
❑inner2也是一个子任务,但它的异常并未被父级任务(outer)监测,因此该异常被传播至它的父级任务,outer.Wait()会抛出一个AggregateException异常,它封装了inner2抛出的异常。
❑inner3是一个嵌套任务,不是子任务,所以它的异常并未被监测,当然也不会被传播至outer任务。对于此类未被监测的异常,会传播到终结线程,并从那里抛出。
❑inner4和inner2一样,异常都被抛给它们的父级,并都包括在从outer.Wait()调用抛出的AggregateException异常中。
继续看另外一个例子,如代码清单29-17所示。
代码清单29-17 另外一个使用AttachedToParent选项的例子
using System.Threading;
using System.Threading.Tasks;
namespace ProgrammingCSharp4
{
class ParallelSample
{
public static void Main(string[]args)
{
Task outer=Task.Factory.StartNew(()=>
{
Task idle=new Task(()=>{},
TaskCreationOptions.AttachedToParent);
Task inner=Task.Factory.StartNew(()=>
{
while(true);
},TaskCreationOptions.AttachedToParent);
Thread.Sleep(1000);
});
outer.Wait();
}
}
}
仔细观察一下上述代码,outer.Wait()将会在什么时间结束?立刻?一秒钟?一分钟?答案是永远不会结束。因为它的2个子级任务永远不结束。
❑inner任务因为含有一个无限循环而永不结束。
❑idle任务则没有inner任务直观,它永不结束的原因是永远不能进入队列。因为idle任务创建为outer的子任务,但并没有进入默认计划程序被安排执行。因此此子任务不会被执行。而outer任务则将一直等待idle执行并结束,因此outer也将一直处于挂起状态。
因为outer和inner以及idle是父子关系,故而outer.Wait()将永远处于等待中,直到使用Ctrl+C组合键终止程序。
一个有益的经验是:既然树是一个按照层次结构组织的数据结构,那么很自然也很有效地方式就是使用并行结构。
另外,需要注意的是,尽量不要在类库程序中使用AttachedToParent选项创建任务,至少不应该在顶级方法中这么做。现举例如下,假设someLibraryMethod是类库中的一个方法:
static int reportingTaskStarted=0;
static int someLibraryMethod(int X,int Y)
{
//第一次调用首先启动报告任务
if((reportingTaskStarted==0)&&
(Interlocked.CompareExchange(ref reportingTaskStarted,1,0)==0))
{
Task.Factory.StartNew(()=>{someLongReportingLoop();},
TaskCreationOptions.AttachedToParent);
}
int result=X+Y;
return result;
}
//模拟一个需要执行的任务
private static void someLongReportingLoop()
{
Thread.Sleep(1000);
}
接下来就是对该方法的调用,如下:
Task<int>sampleTask=Task.Factory.StartNew(()=>
{
return someLibraryMethod(10,20);
});
Console.WriteLine(sampleTask.Result);
从上述代码可以看出,我们需要输出sampleTask任务运行的结果(sampleTask.Result),此处someLibraryMethod方法中创建的任务使用了AttachedToParent选项,因此sampleTask任务和该任务组成了父子关系。这将带来两个问题,如下:
❑因为sampleTask任务将等待子级任务完成才能结束,子级任务也将依赖其调用的someLong-ReportingLoop方法,如果someLongReportingLoop方法进入无限循环,那么sampleTask也将永远不能结束;
❑如果someLongReportingLoop方法中抛出了异常,此异常也将封装在AggregateException异常中并抛给其父级任务sampleTask。
[1]工厂模式:工厂模式是类的创建模式,主要角色包括工厂、产品以及消费者,其中工厂负责产品的创建,创建的产品被消费者使用。这里的工厂是TaskFactory或TaskFactory<TResult>,产品是Task或Task<TResult>,而消费者则是客户端。使用工厂模式,可以让客户端免去产品创建的责任,这实现了责任分离的原则。