10.3 消息传递

下面看一下消息如何从一个actor传到另一个actor。每个actor都有自己的消息队列——它从InputChannel[Any]接收输入,通过OutputChannel[Any]发送输出。

想象一下,每个actor都在一个使用电话应答服务。actor离开或是不能接电话时,电话来了。错过的电话可能是朋友邀请actor去参加party,也可能是actor发给自己的提示消息。这些电话会顺序的存储在语音信箱里,方便的时候一次一个地取出。类似的,actor可以给其他actor留言。发送消息时,actor不会阻塞。不过,如果是调用receive()方法,actor就会阻塞在那里。另一方面,忙碌的actor不会被消息打断。用下面的例子理解一下这些概念:

ConcurrentProgramming/MessagePassing.scala

  1. import scala.actors.Actor._
  2. var startTime : Long = 0
  3. val caller = self
  4. val engrossedActor = actor {
  5. println("Number of messages received so far? " + mailboxSize)
  6. caller ! "send"
  7. Thread.sleep(3000)
  8. println("Number of messages received while I was busy? " + mailboxSize)
  9. receive {
  10. case msg =>
  11. val receivedTime = System.currentTimeMillis() - startTime
  12. println("Received message " + msg + " after " + receivedTime + " ms")
  13. }
  14. caller ! "received"
  15. }
  16. receive { case _ => }
  17. println("Sending Message ")
  18. startTime = System.currentTimeMillis()
  19. engrossedActor ! "hello buddy"
  20. val endTime = System.currentTimeMillis() - startTime
  21. printf("Took less than %dms to send message\n", endTime)
  22. receive {
  23. case _ =>
  24. }

上面代码的输出如下:

  1. Number of messages received so far? 0
  2. Sending Message
  3. Took less than 1ms to send message
  4. Number of messages received while I was busy? 1
  5. Received message hello buddy after 3002 ms

从输出可以看出,发送不阻塞,接收不中断。在actor调用receive()方法接收之前,消息会一直等在那里。

异步地发送和接收消息是一项好的实践——可以最大限度的利用并发。不过,如果对同步的发送消息和接收响应有兴趣,可以用!?()方法。在接收发消息的目标actor给出响应之前,它会一直阻塞在那里。这会引起潜在的死锁。一个已经失败的actor会导致其他actor的失败,然后就轮到应用失败了。所以,即便要用这个方法,至少要用有超时参数的变体,像这样:

ConcurrentProgramming/AskFortune.scala

  1. import scala.actors._
  2. import Actor._
  3. val fortuneTeller = actor {
  4. for (i <-1 to 4){
  5. Thread.sleep(1000)
  6. receive {
  7. case _ => sender ! "your day will rock! " +i
  8. //case _ => reply("your day will rock! " + i) // same as above
  9. }
  10. }
  11. }
  12. println( fortuneTeller !? (2000, "what's ahead"))
  13. println( fortuneTeller !? (500, "what's ahead"))
  14. val aPrinter = actor {
  15. receive { case msg => println("Ah, fortune message for you-" + msg) }
  16. }
  17. fortuneTeller.send("What's up", aPrinter)
  18. fortuneTeller ! "How's my future?"
  19. Thread.sleep(3000)
  20. receive { case msg : String => println("Received " + msg)}
  21. println("Let's get that lost message")
  22. receive { case !(channel, msg) => println("Received belated message "
  23. + msg) }

在超时之前,如果actor发送回消息,!?()方法就会返回结果。否则,它会返回None,所以,这个方法的返回类型是Option[Any]⑦。在上面的代码里,sender所引用的是最近一个发送消息的actor。此外,也可以用reply()方法隐式地把消息发给最近的发送者。如果想的话,还可以修改sender。假定要给一个actor发送消息,但是,想让它把结果回给另外的actor(比如上面例子里的aPrinter),就可以用send()方法。在这种情况下,应答会发给这里赋值的委托,而非真正的调用者。你也许会好奇,如果由于超时造成跳出了!?(),那么没有接收到的消息会如何处理。这个消息最终还是会由actor接收到,稍后,它会把这个消息发给自己,以便处理这条消息。用一个特殊的case类⑧:a就可以取回这条消息。这个case类表示actor发给自己的消息。因此,在继续处理其他消息时,如果需要处理丢失的消息,就可以用这个case类得到它,正如上面代码的最后一行所示。

⑦参见5.4节“Option类型”,了解Option类型的细节。

⑧对于case类的讨论,参见9.7节“使用case类进行模式匹配”。

上面代码的输出如下:

  1. Some(your day will rock! 1)
  2. None
  3. Ah, fortune message for you-your day will rock! 3
  4. Received your day will rock! 4
  5. Let's get that lost message
  6. Received belated message your day will rock! 2

现在,我们已经对actor如何交互有了一个基本的理解,稍后,会稍微深入地挖掘一下。