10.7 reactreactWithin方法

我们见识到了如何通过在actor间传递不变对象以避免竞争问题。现在还有一个问题需要解决。在每个actor里,调用receive()的时候实际上会要求有一个单独的线程。这个线程会一直持有,直到这个actor结束。也就是说,即便是在等待消息到达,程序也会持有这些线程,每个actor一个;这绝对是一种资源浪费。

Scala不得不持有这些线程的原因在于,控制流的执行过程中有一些具体状态。如果在调用序列里没有需要保持和返回的状态,Scala几乎就可以从线程池里获取任意线程执行消息处理——这恰恰就是使用react()所做的事情。

react()不同于其表亲receive(),它并不返回任何结果。实际上,它并不从调用中返回。调用完receive()后,就会执行紧跟着这个调用的代码(就像任何典型的函数调用一样)。不过,调用react()则不同,放在调用后的任何代码都是不可达的。或许,这会有些让人糊涂,如果稍微换个角度来看的话,就容易理解了。把调用react()想象成调用它的线程在调用之后会立即释放(背后的实现相当复杂,为了做到这一点,Scala内部让react()方法抛出异常,由调用的线程处理它)。接收到一个消息后,如果可以匹配到react()方法里一个case语句,就会从线程池分配一个线程,执行这个case体。这个线程会一直运行,直到有另一个react()调用,或是case语句中没有代码可执行了。此时,线程返回,处理其他消息,或是去做虚拟机分配的其他任务。

如果处理了react()的当前消息后,还要处理更多的消息,就要在消息处理的末尾调用其他方法。Scala会把这个调用执行交给线程池里的任意线程。看一个这种行为的例子:

ConcurrentProgramming/React.scala

  1. import scala.actors.Actor._
  2. def info(msg: String) = println(msg + " received by " + Thread.
  3. currentThread())
  4. def receiveMessage(id : Int) {
  5. for(i <-1 to 2){
  6. receiveWithin(20000) {
  7. case msg : String => info("receive: " + id + msg) }
  8. }
  9. }
  10. def reactMessage(id : Int) {
  11. react {
  12. case msg : String => info("react: " + id + msg)
  13. reactMessage(id)
  14. }
  15. }
  16. val actors = Array(
  17. actor { info("react: 1 actor created"); reactMessage(1) },
  18. actor { info("react: 2 actor created"); reactMessage(2) },
  19. actor { info("receive: 3 actor created"); receiveMessage(3) },
  20. actor { info("receive: 4 actor created"); receiveMessage(4) }
  21. )
  22. Thread.sleep(1000)
  23. for(i <- 0 to 3) { actors(i) ! " hello"; Thread.sleep(2000) }
  24. Thread.sleep(2000)
  25. for(i <- 0 to 3) { actors(i) ! " hello"; Thread.sleep(2000) }

这里,receiveMessage()使用receiveWithin()方法处理接到的消息。在这个例子里,我们处于活跃的循环里,会获得更多的消息。另一方面,reactMessage()使用react()方法,它没有处于一个whilefor循环里——相反,它会在末尾递归地调用自身。

然后,创建了四个actor,两个使用react(),两个使用receiveWithin()。最后,用相当慢的节奏,给这四个actor发了一系列消息。每个actor都报出在线程执行过程中收到的消息。

上面的代码输出如下:

  1. react: 2 actor created received by Thread[Thread-4,5,main]
  2. receive: 3 actor created received by Thread[Thread-6,5,main]
  3. react: 1 actor created received by Thread[Thread-3,5,main]
  4. receive: 4 actor created received by Thread[Thread-5,5,main]
  5. react: 1 hello received by Thread[Thread-3,5,main]
  6. react: 2 hello received by Thread[Thread-3,5,main]
  7. receive: 3 hello received by Thread[Thread-6,5,main]
  8. receive: 4 hello received by Thread[Thread-5,5,main]
  9. react: 1 hello received by Thread[Thread-4,5,main]
  10. react: 2 hello received by Thread[Thread-3,5,main]
  11. receive: 3 hello received by Thread[Thread-6,5,main]
  12. receive: 4 hello received by Thread[Thread-5,5,main]

使用receiveWithin()方法的actor具有线程关联性(thread affinity);它们会持续的使用分配给它们的同一个线程。从上面的输出中可以看到:receive:3总是由Thread-6处理,receive:4总是由Thread-5处理。

另一方面,使用react()的actor可以自由的交换彼此的线程,可以由任何可用的线程处理。从上面的输出可以看到,使用react: 1的actor最初由Thread-3执行。同一个线程碰巧还为这个actor执行了第一个消息的处理。不过,这个actor收到的第二个消息就是由不同的线程Thread-4处理。后一个线程创建了使用react:2的actor。不过,这个actor随后的消息由Thread-3处理。

换句话说,使用react()的actor不具有线程关联性;它们会放弃自己的线程,用一个新的线程(或许是同一个)进行后续的消息处理。这种做法对资源更为友善,特别是在消息处理相当快的情况下。所以,我们鼓励使用react()来代替receive()。因为线程是不确定的,所以运行上述代码的时候,你观察到的输出序列或许不同于我的。不妨多运行几次看看效果。

上面的代码有一个坏味道。调用react()方法,必须要记住在处理消息的末尾调用适当的方法。否则这个actor就不再处理任何消息。然而,这样写的调用可不怎么优雅,很容易就忘了写。如果react()里有多个case语句,就会变得更复杂。我们不得不在每个case分支调用方法。幸好还有更好的方式进行处理,我们会在10.8节,“looploopWhile”里看到相关介绍。

类似于receiveWithin(),如果在超时时段里,没有接到任何消息,reactWithin()就会超时——在这种情况下,如果处理case TIMEOUT,可以采取任何想采取的行动,也可以从方法里退出。下面是一个使用reactWithin()的例子,尝试一下之前使用receiveWithin()实现累加器的例子,这次用reactWithin()方法:

ConcurrentProgramming/ReactWithin.scala

  1. import scala.actors._
  2. import Actor._
  3. val caller = self
  4. def accumulate() {
  5. var sum = 0
  6. reactWithin(500) {
  7. case number: Int => sum += number
  8. accumulate()
  9. case TIMEOUT =>
  10. println("Timed out! Will send result now")
  11. caller ! sum
  12. }
  13. println("This will not be called...")
  14. }
  15. val accumulator = actor { accumulate() }
  16. accumulator ! 1
  17. accumulator ! 7
  18. accumulator ! 8
  19. receiveWithin(10000) { case result => println("Total is " + result) }

上面代码的输出如下:

  1. Timed out! Will send result now
  2. Total is 0

这个输出可不是我们想看到的。让我们分析一下,修正这个问题。既然reactWithin()并不返回任何值,也就不能在accumulate()方法中reactWithin()调用以外的部分做任何处理。所以,我们决定在reactWithin()调用所附着的闭包里,把数加到局部变量sum上。不幸的是,当在case语句里调用accumulate()时,对于新的调用而言,sum的值是不同的,因为对每个方法调用而言,它都是局部的。因此,每次调用accumulate()时,sum的值都是从0开始。但不用担心,这很容易修正。随手修正这个问题,还会让代码更加函数式,如此一来,就不必修改变量sum了。

让我们修改例子,解决这个问题:

ConcurrentProgramming/ReactWithin2.scala

  1. import scala.actors._
  2. import Actor._
  3. val caller = self
  4. def accumulate(sum : Int) {
  5. reactWithin(500) {
  6. case number: Int => accumulate(sum + number)
  7. case TIMEOUT =>
  8. println("Timed out! Will send result now")
  9. caller ! sum
  10. }
  11. println("This will not be called...")
  12. }
  13. val accumulator = actor { accumulate(0) }
  14. accumulator ! 1
  15. accumulator ! 7
  16. accumulator ! 8
  17. receiveWithin(10000) { case result => println("Total is " + result) }

这里,把曾经的局部变量sum变成了函数的参数。现在,就不必修改既有变量了。每次调用accumulate(),都会得到正确的sum值。无需修改任何变量,就可以计算出sum的新值,传给accumulate()的下一个调用,直至超时。等超时以后,sum的当前值就发给调用者。

上面代码的输出如下:

  1. Timed out! Will send result now
  2. Total is 16

同使用receiveWithin()的方案比起来,这个方案更加优雅,等待接收消息时,它并不持有任何线程。

关于react()reactWithin(),最后要记住的一点是,因为这两个方法并不是真的从调用里返回(记住,Scala内部通过让这些方法抛出异常来处理这个问题),放在这些方法后的任何代码都不会执行⑨(比如在accumulate()方法末尾加上打印语句)。所以,在调用这两个方法之后,不要写任何东西。

⑨如果Scala对此给出一个不可达的错误就好了。