10.3 消息传递
下面看一下消息如何从一个actor传到另一个actor。每个actor都有自己的消息队列——它从InputChannel[Any]
接收输入,通过OutputChannel[Any]
发送输出。
想象一下,每个actor都在一个使用电话应答服务。actor离开或是不能接电话时,电话来了。错过的电话可能是朋友邀请actor去参加party,也可能是actor发给自己的提示消息。这些电话会顺序的存储在语音信箱里,方便的时候一次一个地取出。类似的,actor可以给其他actor留言。发送消息时,actor不会阻塞。不过,如果是调用receive()
方法,actor就会阻塞在那里。另一方面,忙碌的actor不会被消息打断。用下面的例子理解一下这些概念:
ConcurrentProgramming/MessagePassing.scala
import scala.actors.Actor._
var startTime : Long = 0
val caller = self
val engrossedActor = actor {
println("Number of messages received so far? " + mailboxSize)
caller ! "send"
Thread.sleep(3000)
println("Number of messages received while I was busy? " + mailboxSize)
receive {
case msg =>
val receivedTime = System.currentTimeMillis() - startTime
println("Received message " + msg + " after " + receivedTime + " ms")
}
caller ! "received"
}
receive { case _ => }
println("Sending Message ")
startTime = System.currentTimeMillis()
engrossedActor ! "hello buddy"
val endTime = System.currentTimeMillis() - startTime
printf("Took less than %dms to send message\n", endTime)
receive {
case _ =>
}
上面代码的输出如下:
Number of messages received so far? 0
Sending Message
Took less than 1ms to send message
Number of messages received while I was busy? 1
Received message hello buddy after 3002 ms
从输出可以看出,发送不阻塞,接收不中断。在actor调用receive()
方法接收之前,消息会一直等在那里。
异步地发送和接收消息是一项好的实践——可以最大限度的利用并发。不过,如果对同步的发送消息和接收响应有兴趣,可以用!?()
方法。在接收发消息的目标actor给出响应之前,它会一直阻塞在那里。这会引起潜在的死锁。一个已经失败的actor会导致其他actor的失败,然后就轮到应用失败了。所以,即便要用这个方法,至少要用有超时参数的变体,像这样:
ConcurrentProgramming/AskFortune.scala
import scala.actors._
import Actor._
val fortuneTeller = actor {
for (i <-1 to 4){
Thread.sleep(1000)
receive {
case _ => sender ! "your day will rock! " +i
//case _ => reply("your day will rock! " + i) // same as above
}
}
}
println( fortuneTeller !? (2000, "what's ahead"))
println( fortuneTeller !? (500, "what's ahead"))
val aPrinter = actor {
receive { case msg => println("Ah, fortune message for you-" + msg) }
}
fortuneTeller.send("What's up", aPrinter)
fortuneTeller ! "How's my future?"
Thread.sleep(3000)
receive { case msg : String => println("Received " + msg)}
println("Let's get that lost message")
receive { case !(channel, msg) => println("Received belated message "
+ msg) }
在超时之前,如果actor发送回消息,!?()
方法就会返回结果。否则,它会返回None,所以,这个方法的返回类型是Option[Any]
⑦。在上面的代码里,sender
所引用的是最近一个发送消息的actor。此外,也可以用reply()
方法隐式地把消息发给最近的发送者。如果想的话,还可以修改sender
。假定要给一个actor发送消息,但是,想让它把结果回给另外的actor(比如上面例子里的aPrinter
),就可以用send()
方法。在这种情况下,应答会发给这里赋值的委托,而非真正的调用者。你也许会好奇,如果由于超时造成跳出了!?()
,那么没有接收到的消息会如何处理。这个消息最终还是会由actor接收到,稍后,它会把这个消息发给自己,以便处理这条消息。用一个特殊的case
类⑧:
就可以取回这条消息。这个case
类表示actor发给自己的消息。因此,在继续处理其他消息时,如果需要处理丢失的消息,就可以用这个case
类得到它,正如上面代码的最后一行所示。
⑦参见5.4节“
Option
类型”,了解Option
类型的细节。⑧对于
case
类的讨论,参见9.7节“使用case
类进行模式匹配”。
上面代码的输出如下:
Some(your day will rock! 1)
None
Ah, fortune message for you-your day will rock! 3
Received your day will rock! 4
Let's get that lost message
Received belated message your day will rock! 2
现在,我们已经对actor如何交互有了一个基本的理解,稍后,会稍微深入地挖掘一下。