10.6 receivereceiveWithin方法

receive()接收一个函数值/闭包,返回一个处理消息的应答。下面是个从receive()方法接收结果的例子:

ConcurrentProgramming/Receive.scala

  1. import scala.actors.Actor._
  2. val caller = self
  3. val accumulator = actor {
  4. var sum = 0
  5. var continue = true
  6. while (continue) {
  7. sum += receive {
  8. case number : Int => number
  9. case "quit" =>
  10. continue = false
  11. 0
  12. }
  13. }
  14. caller ! sum
  15. }
  16. accumulator ! 1
  17. accumulator ! 7
  18. accumulator ! 8
  19. accumulator ! "quit"
  20. receive { case result => println("Total is " + result) }

accumulator接收数字,对传给它的数字求和。完成之后,它会发回一个消息,带有求和的结果。上面的代码输出如下:

  1. Total is 16

上面的代码告诉我们,即便receive()有着特殊的意义,它也只不过是另一个方法。不过,调用receive()会造成程序阻塞,直到实际接收到应答为止。如果预期的actor应答一直没有发过来就麻烦了。这会让我们一直等下去——一个活性失败(liveness failure)——这会让我们在同事间不得人心的。用receiveWithin()方法修正这一点,它会接收一个timeout参数,如下:

ConcurrentProgramming/ReceiveWithin.scala

  1. import scala.actors._
  2. import scala.actors.Actor._
  3. val caller = self
  4. val accumulator = actor {
  5. var sum = 0
  6. var continue = true
  7. while (continue) {
  8. sum += receiveWithin(1000) {
  9. case number : Int => number
  10. case TIMEOUT =>
  11. println("Timed out! Will return result now")
  12. continue = false
  13. 0
  14. }
  15. }
  16. caller ! sum
  17. }
  18. accumulator ! 1
  19. accumulator ! 7
  20. accumulator ! 8
  21. receiveWithin(10000) { case result => println("Total is " + result) }

在给定的超时期限内,如果什么都没收到,receiveWithin()方法会收到一个TIMEOUT消息。如果不对其进行模式匹配,就会抛出异常。在上面的代码里,接收到TIMEOUT消息当作了完成值累加的信号。输出如下:

  1. Timed out! Will return result now
  2. Total is 16

我们应该倾向于使用receiveWithin()方法而非receive()方法,避免产生活性等待的问题。

呃,关于receive()receiveWithin(),还有最后一件事要讲——它们相当勤奋,不会浪费时间在它们不关心的消息上。因为这些方法把函数值当作偏应用函数,调用代码块之前,会检查它是否处理消息。所以,如果接收到一个非预期消息,就会悄悄地忽略它。当然,如果想把忽略的消息显示出来,可以提供一个case _ => …语句。下面这个例子展示了忽略的无效消息:

ConcurrentProgramming/MessageIgnore.scala

  1. import scala.actors._
  2. import Actor._
  3. val expectStringOrInteger = actor {
  4. for(i <-1 to 4){
  5. receiveWithin(1000) {
  6. case str : String => println("You said " + str)
  7. case num : Int => println("You gave " + num)
  8. case TIMEOUT => println("Timed out!")
  9. }
  10. }
  11. }
  12. expectStringOrInteger ! "only constant is change"
  13. expectStringOrInteger ! 1024
  14. expectStringOrInteger ! 22.22
  15. expectStringOrInteger ! (self, 1024)
  16. receiveWithin(3000) { case _ =>}

在代码的最后,放了一个receiveWithin()的调用。因为主线程退出时,程序就退出了,这个语句会保证程序还活动着,给actor一个应答的机会。从输出中可以看出,actor处理了前两个发送给它的消息,忽略了后两个,因为它们没有匹配上预期的消息模式。程序最终会超时,因为没有再接收到任何可以匹配的消息。

  1. You said only constant is change
  2. You gave 1024
  3. Timed out!
  4. Timed out!