29.6.2 并发选项

一般情况下,Parallel.Invoke、Parallel.For以及Parallel.ForEach这些操作执行时具有如下特性:

❑任务的调用使用默认的TaskScheduler

❑并行度不受任何限制

❑不能从外部取消任务的运行

使用并发选项可以改变上述的默认行为,例如使用自定义的TaskScheduler,或者指定特定的并行度等。接下来,我们将分别介绍这些并发选项。

1.任务计划

默认地,并行任务使用默认的TaskScheduler进行任务计划,我们可以使用ParallelOptions.TaskScheduler属性来指定另一个自定义的someTaskScheduler,如下所示:


ParallelOptions options=new ParallelOptions{TaskScheduler=someTaskScheduler};

Parallel.Invoke(options,

()=>{action1();},

()=>{action2();},

()=>{action3();},

()=>{action4();});


上述示例代码中,并行任务使用someTaskScheduler进行任务计划,而不是默认的TaskScheduler。注意,如果把ParallelOptions.TaskScheduler赋值为null,这意味着使用上下文中的TaskScheduler,考虑如下代码:


Task.Factory.StartNew(()=>

{

ParallelOptions options=new ParallelOptions{TaskScheduler=null};

Parallel.Invoke(options,

()=>{},

()=>{},

()=>{});

},CancellationToken.None,

TaskCreationOptions.None,someCustomTaskScheduler);


在上述代码中,ParallelOptions实例的TaskScheduler属性为null,此实例用以配置Parallel.Invoke中操作的行为,因为TaskScheduler为null,因此它将使用外层代码的someCustomTaskScheduler作为替代。

2.最大并行度

一个并行操作默认会使用所允许的最大的并行度运行,使用ParallelOptions.MaxDegreeOfParallelism属性可以控制并行操作允许的最大并行度。考虑代码清单29-18:

代码清单29-18 最大并行度示例代码


using System;

using System.Collections.Generic;

using System.Threading;

using System.Threading.Tasks;

namespace ProgrammingCSharp4

{

class ParallelSample

{

//所有参与任务计算的线程id集合

private static List<int>threadIDs=new List<int>();

static void Main(string[]args)

{

ParallelOptions options=new ParallelOptions

{

//设置最大并行度为10

MaxDegreeOfParallelism=10

};

Parallel.For(0,100000,options,delegate(int i)

{

//将参与计算的线程id加入到threadIDs集合

AddThreadID(Thread.CurrentThread.ManagedThreadId);

Console.WriteLine(i);

});

//输出所有参与计算的线程id

foreach(int id in threadIDs)

{

Console.WriteLine(“线程:{0}”,id);

}

}

//将线程id加入到threadIDs集合

private static void AddThreadID(int id)

{

//保证对于相同的线程id不重复添加

if(!threadIDs.Contains(id))

{

threadIDs.Add(id);

}

}

}

}


在上面的示例代码中,Parallel.For循环值只允许最多10个工作线程。需要说明的是,MaxDegreeOfParallelism属性仅对当前应用的一个并行操作有效。例如,在下面的示例中,仅外部的Parallel.For受MaxDegreeOfParallelism设置影响,里面的Parallel.For则不受任何影响。


ParallelOptions options=new ParallelOptions

{

MaxDegreeOfParallelism=2

};

Parallel.For(0,1000,options,delegate(int i)

{

Parallel.For(0,1000,delegate(int j)

{

Console.WriteLine("{0}-{1}",i,j);

});

});


3.取消一个任务

使用ParallelOptions.CancellationToken属性,可以取消一个并行执行的任务。方法就是声明一个CancellationTokenSource实例,然后从这个实例获取一个CancellationToken对象,并将此对象作为参数提供给一个并行操作。如果需要取消此并行操作,只需调用CancellationTokenSource实例的Cancel方法。如果在操作内部调用Cancle方法,那么任务将抛出一个OperationCanceledException异常并结束;如果在外部操作,则在操作的内部需要对token.IsCancellationRequested进行检查,如果为true,则取消任务并返回。示例请参见代码清单29-19。

代码清单29-19 取消任务示例代码


using System;

using System.Collections.Generic;

using System.Linq;

using System.Threading;

using System.Threading.Tasks;

namespace ProgrammingCSharp4

{

class ParallelSample

{

public static void Main(string[]args)

{

//声明一个CancellationTokenSource类型的对象

CancellationTokenSource tokenSource=new CancellationTokenSource();

//获取CancellationToken对象

CancellationToken token=tokenSource.Token;

bool result=false;

Task antecedent=Task.Factory.StartNew(()=>

{

//检查任务是否被取消

while(!token.IsCancellationRequested)

{

Console.WriteLine(“前置任务正在运行。”);

}

},token);

Thread.Sleep(100);

Console.WriteLine(“即将取消antecedent任务的运行……”);

tokenSource.Cancel();

Console.WriteLine(“antecedent任务已取消”);

}

}

}


运行结果省略。