10.7 react
和reactWithin
方法
我们见识到了如何通过在actor间传递不变对象以避免竞争问题。现在还有一个问题需要解决。在每个actor里,调用receive()
的时候实际上会要求有一个单独的线程。这个线程会一直持有,直到这个actor结束。也就是说,即便是在等待消息到达,程序也会持有这些线程,每个actor一个;这绝对是一种资源浪费。
Scala不得不持有这些线程的原因在于,控制流的执行过程中有一些具体状态。如果在调用序列里没有需要保持和返回的状态,Scala几乎就可以从线程池里获取任意线程执行消息处理——这恰恰就是使用react()
所做的事情。
react()
不同于其表亲receive()
,它并不返回任何结果。实际上,它并不从调用中返回。调用完receive()
后,就会执行紧跟着这个调用的代码(就像任何典型的函数调用一样)。不过,调用react()
则不同,放在调用后的任何代码都是不可达的。或许,这会有些让人糊涂,如果稍微换个角度来看的话,就容易理解了。把调用react()
想象成调用它的线程在调用之后会立即释放(背后的实现相当复杂,为了做到这一点,Scala内部让react()
方法抛出异常,由调用的线程处理它)。接收到一个消息后,如果可以匹配到react()
方法里一个case
语句,就会从线程池分配一个线程,执行这个case
体。这个线程会一直运行,直到有另一个react()
调用,或是case
语句中没有代码可执行了。此时,线程返回,处理其他消息,或是去做虚拟机分配的其他任务。
如果处理了react()
的当前消息后,还要处理更多的消息,就要在消息处理的末尾调用其他方法。Scala会把这个调用执行交给线程池里的任意线程。看一个这种行为的例子:
ConcurrentProgramming/React.scala
import scala.actors.Actor._
def info(msg: String) = println(msg + " received by " + Thread.
currentThread())
def receiveMessage(id : Int) {
for(i <-1 to 2){
receiveWithin(20000) {
case msg : String => info("receive: " + id + msg) }
}
}
def reactMessage(id : Int) {
react {
case msg : String => info("react: " + id + msg)
reactMessage(id)
}
}
val actors = Array(
actor { info("react: 1 actor created"); reactMessage(1) },
actor { info("react: 2 actor created"); reactMessage(2) },
actor { info("receive: 3 actor created"); receiveMessage(3) },
actor { info("receive: 4 actor created"); receiveMessage(4) }
)
Thread.sleep(1000)
for(i <- 0 to 3) { actors(i) ! " hello"; Thread.sleep(2000) }
Thread.sleep(2000)
for(i <- 0 to 3) { actors(i) ! " hello"; Thread.sleep(2000) }
这里,receiveMessage()
使用receiveWithin()
方法处理接到的消息。在这个例子里,我们处于活跃的循环里,会获得更多的消息。另一方面,reactMessage()
使用react()
方法,它没有处于一个while
或for
循环里——相反,它会在末尾递归地调用自身。
然后,创建了四个actor,两个使用react()
,两个使用receiveWithin()
。最后,用相当慢的节奏,给这四个actor发了一系列消息。每个actor都报出在线程执行过程中收到的消息。
上面的代码输出如下:
react: 2 actor created received by Thread[Thread-4,5,main]
receive: 3 actor created received by Thread[Thread-6,5,main]
react: 1 actor created received by Thread[Thread-3,5,main]
receive: 4 actor created received by Thread[Thread-5,5,main]
react: 1 hello received by Thread[Thread-3,5,main]
react: 2 hello received by Thread[Thread-3,5,main]
receive: 3 hello received by Thread[Thread-6,5,main]
receive: 4 hello received by Thread[Thread-5,5,main]
react: 1 hello received by Thread[Thread-4,5,main]
react: 2 hello received by Thread[Thread-3,5,main]
receive: 3 hello received by Thread[Thread-6,5,main]
receive: 4 hello received by Thread[Thread-5,5,main]
使用receiveWithin()
方法的actor具有线程关联性(thread affinity);它们会持续的使用分配给它们的同一个线程。从上面的输出中可以看到:receive:3
总是由Thread-6
处理,receive:4
总是由Thread-5
处理。
另一方面,使用react()
的actor可以自由的交换彼此的线程,可以由任何可用的线程处理。从上面的输出可以看到,使用react: 1
的actor最初由Thread-3
执行。同一个线程碰巧还为这个actor执行了第一个消息的处理。不过,这个actor收到的第二个消息就是由不同的线程Thread-4
处理。后一个线程创建了使用react:2
的actor。不过,这个actor随后的消息由Thread-3
处理。
换句话说,使用react()
的actor不具有线程关联性;它们会放弃自己的线程,用一个新的线程(或许是同一个)进行后续的消息处理。这种做法对资源更为友善,特别是在消息处理相当快的情况下。所以,我们鼓励使用react()
来代替receive()
。因为线程是不确定的,所以运行上述代码的时候,你观察到的输出序列或许不同于我的。不妨多运行几次看看效果。
上面的代码有一个坏味道。调用react()
方法,必须要记住在处理消息的末尾调用适当的方法。否则这个actor就不再处理任何消息。然而,这样写的调用可不怎么优雅,很容易就忘了写。如果react()
里有多个case
语句,就会变得更复杂。我们不得不在每个case
分支调用方法。幸好还有更好的方式进行处理,我们会在10.8节,“loop
和loopWhile
”里看到相关介绍。
类似于receiveWithin()
,如果在超时时段里,没有接到任何消息,reactWithin()
就会超时——在这种情况下,如果处理case TIMEOUT
,可以采取任何想采取的行动,也可以从方法里退出。下面是一个使用reactWithin()
的例子,尝试一下之前使用receiveWithin()
实现累加器的例子,这次用reactWithin()
方法:
ConcurrentProgramming/ReactWithin.scala
import scala.actors._
import Actor._
val caller = self
def accumulate() {
var sum = 0
reactWithin(500) {
case number: Int => sum += number
accumulate()
case TIMEOUT =>
println("Timed out! Will send result now")
caller ! sum
}
println("This will not be called...")
}
val accumulator = actor { accumulate() }
accumulator ! 1
accumulator ! 7
accumulator ! 8
receiveWithin(10000) { case result => println("Total is " + result) }
上面代码的输出如下:
Timed out! Will send result now
Total is 0
这个输出可不是我们想看到的。让我们分析一下,修正这个问题。既然reactWithin()
并不返回任何值,也就不能在accumulate()
方法中reactWithin()
调用以外的部分做任何处理。所以,我们决定在reactWithin()
调用所附着的闭包里,把数加到局部变量sum
上。不幸的是,当在case
语句里调用accumulate()
时,对于新的调用而言,sum
的值是不同的,因为对每个方法调用而言,它都是局部的。因此,每次调用accumulate()
时,sum
的值都是从0
开始。但不用担心,这很容易修正。随手修正这个问题,还会让代码更加函数式,如此一来,就不必修改变量sum
了。
让我们修改例子,解决这个问题:
ConcurrentProgramming/ReactWithin2.scala
import scala.actors._
import Actor._
val caller = self
def accumulate(sum : Int) {
reactWithin(500) {
case number: Int => accumulate(sum + number)
case TIMEOUT =>
println("Timed out! Will send result now")
caller ! sum
}
println("This will not be called...")
}
val accumulator = actor { accumulate(0) }
accumulator ! 1
accumulator ! 7
accumulator ! 8
receiveWithin(10000) { case result => println("Total is " + result) }
这里,把曾经的局部变量sum
变成了函数的参数。现在,就不必修改既有变量了。每次调用accumulate()
,都会得到正确的sum
值。无需修改任何变量,就可以计算出sum
的新值,传给accumulate()
的下一个调用,直至超时。等超时以后,sum
的当前值就发给调用者。
上面代码的输出如下:
Timed out! Will send result now
Total is 16
同使用receiveWithin()
的方案比起来,这个方案更加优雅,等待接收消息时,它并不持有任何线程。
关于react()
和reactWithin()
,最后要记住的一点是,因为这两个方法并不是真的从调用里返回(记住,Scala内部通过让这些方法抛出异常来处理这个问题),放在这些方法后的任何代码都不会执行⑨(比如在accumulate()
方法末尾加上打印语句)。所以,在调用这两个方法之后,不要写任何东西。
⑨如果Scala对此给出一个不可达的错误就好了。