10.2 使用Actor的并发
Scala的actor提供了一种基于事件的轻量级线程。只要使用scala.actors.Actor
伴生对象的actor()
方法,就可以创建一个actor。它接受一个函数值/闭包做参数,一创建好就开始运行。用!()
方法给actor发消息,用receive()
方法从actor接收消息。receive()
也可以闭包为参数,通常用模式匹配处理接收到的消息。
我们看个例子,假定我们需要判定一个给定的数是否是完全数④:
④完全数是一个正整数,其因子之和是该数的两倍。比如,第一个已知的完全数是6——其因子1、2、3、6,加起来是12。
ConcurrentProgramming/PerfectNumberFinder.scala
def sumOfFactors(number: Int) = {
(0 /: (1 to number)) { (sum, i) => if (number % i == 0) sum + i else sum }
}
def isPerfect(candidate: Int) = 2 * candidate == sumOfFactors(candidate)
这段代码按顺序计算了给定candidate
数的因子之和。这段代码有个问题。如果数值很大,顺序执行会非常慢。而且,如果在多核处理器上运行这段代码,也利用不到多核的优势。不管在什么时候,都是一个核做了所有艰苦的工作,没有用到其他核。
找几个数字⑤做例子试一下上面的代码,如下:
⑤本书的一个技术评审者试着传了一个非常大的数时,(接近于
scala.Math.MAX_INT
)遇到了问题。类似于Java,Scala超过极限也会导致溢出。因此,请小心检查在Scala代码里的溢出。ConcurrentProgramming/PerfectNumberFinder.scala
println("6 is perfect? " + isPerfect(6))
println("33550336 is perfect? " + isPerfect(33550336))
println("33550337 is perfect? " + isPerfect(33550337))
上面代码的输出如下:
6 is perfect? true
33550336 is perfect? true
33550337 is perfect? false
我的机器是双核的MacBook Pro,运行Mac OS X。根据Activity Monitor显示,两核加起来的利用率从60%到95%。对两个核而言,Activity Monitor显示的最大利用率是200%。这样看来,95%的含义就是,对这种运算密集型操作而言,在任意时刻,都只有一个核有效地利用了起来。或者,可以将其视为双核的能力只用到了一半。
将因子求和的计算划分到多线程上,可以获得更好的吞吐量。即便在只有一个处理器的机器上,应用也能获得更多的执行机会,得到更好的响应。
这样,将从1到candidate
数这个范围内的数划分成多个区间⑥,把每个区间内求和的任务分配给单独的线程。
⑥不过,选择区间的粒度是很需要技巧的。这取决于在哪个点上,并发的增长可以抵消协调配合的负担。
ConcurrentProgramming/FasterPerfectNumberFinder.scala
Line 1 import scala.actors.Actor._
-
- def sumOfFactorsInRange(lower: Int, upper: Int, number: Int) = {
- (0 /: (lower to upper)) { (sum, i) => if (number % i == 0) sum + i
else sum }
5 }
-
- def isPerfectConcurrent(candidate: Int) = {
- val RANGE = 1000000
- val numberOfPartitions = (candidate.toDouble / RANGE).ceil.toInt
10 val caller = self
-
- for (i <- 0 until numberOfPartitions) {
- val lower = i * RANGE + 1;
- val upper = candidate min (i + 1) * RANGE
15
- actor {
- caller ! sumOfFactorsInRange(lower, upper, candidate)
- }
- }
20
- val sum = (0 /: (0 until numberOfPartitions)) { (partialSum, i) =>
- receive {
- case sumInRange : Int => partialSum + sumInRange
- }
25 }
-
- 2 * candidate == sum
- }
-
30 println("6 is perfect? " + isPerfectConcurrent(6))
- println("33550336 is perfect? " + isPerfectConcurrent(33550336))
- println("33550337 is perfect? " + isPerfectConcurrent(33550337))
上面的代码里没有synchronized
或是wait
。在isPerfectConcurrent()
方法里,先对这个范围内的值进行分区。在第16行,对于每个区间而言,因子部分求和的计算委托给了单独的actor。当actor完成分配给它的任务,在第17行,它就会把部分和作为消息发送给调用者(caller)。在这个闭包里,caller
变量绑定到isPerfectConcurrent()
方法里的一个变量上——这个变量持有actor的引用,它是通过调用self()
方法得到的,表示主线程。最后,在第22行,从委托的actor中接收消息,一次一个。用foldLeft()
方法(这里显示为/:()
方法)接收了所有的部分和,以函数式风格计算了这些部分和的总和。
两种方式间在时间上并没有很大的差异。在我的机器上,顺序程序花了大约7秒,而并发程序花了大约5秒。结果很接近,考虑到系统执行其他操作所带来的影响,所以可能难以观察出差异。Activity Monitor显示第二种方式有120%到180%的利用率,这表示同时用到了多个核。为了让它更明显一些,下面对一个范围内的值查找完全数:
ConcurrentProgramming/FindPerfectNumberOverRange.scala
def countPerfectNumbersInRange(start : Int, end : Int,
isPerfectFinder : Int => Boolean) = {
val startTime = System.nanoTime()
val numberOfPerfectNumbers = (0 /: (start to end)) { (count, candidate) =>
if (isPerfectFinder(candidate)) count + 1 else count
}
val endTime = System.nanoTime()
println("Found " + numberOfPerfectNumbers +
" perfect numbers in given range, took " +
(endTime - startTime)/1000000000.0 + " secs")
}
val startNumber = 33550300
val endNumber = 33550400
countPerfectNumbersInRange(startNumber, endNumber, isPerfect)
countPerfectNumbersInRange(startNumber, endNumber, isPerfectConcurrent)
在countPerfectNumbersInRange()
里,统计了给定范围内从start
到end
之间能够发现多少个完全数。实际找出候选数是否是完全数的方法委托给闭包isPerfectFinder
——它是作为参数传进来的。在给定范围内查找完全数数量所花的时间由JDK的System.nanoTime()
方法计算得到。调用countPerfectNumbersInRange()
两次,先使用顺序实现isPerfect()
,然后用并发实现isPerfectConcurrent()
。
上面代码的输出如下:
Found 1 perfect numbers in given range, took 322.681763 secs
Found 1 perfect numbers in given range, took 219.511014 secs
这次确定从33 550 300开始的100个值里完全数的数量,同并发实现相比,顺序计算差不多多花了两分钟时间。