一.本章要點
- 每個actor都要擴展Actor類並提供act方法
- 要往actor發送消息,可以用actor!message
- 消息發送是異步的:”發完就忘“
- 要接受消息,actor可以調用receive或react,通常是在循環中這樣做
- receive/react的參數是有case語句組成的代碼塊(偏函數)
- 不同actor之間不應該共享狀態。總是使用消息來發送數據
- 不要直接調用actor的方法,通過消息進行通信
- 避免同步消息——將發送消息和等待響應分開
- 不同actor可以通過react而不是receive來共享線程,前提是消息處理器的控制流轉足夠簡單
- 讓actor掛掉是OK的,前提是你有其他actor監控着actor的生死。用鏈接來設置監控關係
二.創建和啓動Actor
actor是擴展自Antor特質的類。該特質有一個抽象方法act。可以重寫這個方法指定avtor的行為。act方法帶有一個消息循環,例:
import scala.actors.Actor
class HiActor extends Actor{
def act(){while(true){
receive{
case "Hi"=>println("Hello")}
}}
}
//調用start方法執行
val actor1=new HiActor
actor1.start()
act方法與Java中Runable接口中的run方法很相似,正如不同線程的run方法那樣,不同actor的act方法也是並行運行的,對響應信息做了優化。
臨時創建actor,利用Actor伴生對象的actor方法創建和啓動actor:
import scala.actors.Actor._
val actor=actor{
while(true){
receive{case "Hi"=>println("Hello")
}
}
}
三.發送消息
actor是一個處理異步消息的對象。
消息可以是任何對象。
使用!操作符發送小心,消息被髮送當前線程繼續執行——”發完就忘“(可以等待一個回覆),一個好的做法是使用樣例類作為消息,讓actor使用模式匹配來處理消息。
四.接受消息
發送到actor的消息被存放在一個”郵箱“中,receive方法從郵箱獲取下一條信息並將它傳遞給它的參數,該參數是一個偏函數。例:
receive {
case Deposit(amount)=>....
case Withdraw(amount)=>...
}
//receive的參數是{case Deposit(amount)=>....
//case Withdraw(amount)=>...
//}
該代碼塊被轉換成一個類型為PartialFunction[Any,T]的對象,其中T是case語句=>操作符右邊的表達式的計算結果的類型。這是個偏函數,因為它對那些能夠匹配其中一個case語句的參數有定義;
注:消息傳遞的過程是異步的。(到達順序不確定,設計時應讓應用程序不要依賴任何特定的消息投遞順序);
receive方法被調用時並沒有消息,則該調用阻塞,直到有消息抵達,如果郵箱中沒有任何消息可以被偏函數處理,則對receive方法的調用也會阻塞,直到一個可以匹配的消息抵達;
郵箱有可能被那些不與任何case語句匹配的消息佔滿。添加一個case _語句來處理任意的消息;
郵箱會串行化信息:actor運行在單個線程中(不用擔心爭用狀況);
actor可以很安全的修改自己的數據(但如果修改了在不同actor之間共享的數據,那麼爭用情況就有可能出現),因此不要在不同的actor中使用共享對象(除非是線程安全的)
五.向其他Actor發送消息
當運算被拆分到不同的actor來並行處理問題的各個部分時,這些處理結果需要被收集到一起。actor可以將結果存入到一個線程安全的數據結構當中(如併發的哈希映射),但actor模型並不鼓勵使用共享數據。因此當actor計算結果後,應該向另一個actor發送消息。
actor是如何知道應該往哪裏發送計算結果:
1.可以有一些全局的actor(當actor數量很多時,這個方案的伸縮性不好);
2.actor可以構造成帶有指向一個或多個actor的引用(當antor持有另一個actor的引用時,它只應該使用這個引用來發送消息,而不是調用方法。這樣做違背了actor精神,同時還可能引發爭用狀況——這正是actor設計出來要避免的問題);
3.actor可以接收帶有指向另一個actor的引用消息。在請求中提供一個actor引用是很常見的做法,例:actor ! Compute(date,continuation)
4.actor可以返回消息給發送方,receive方法會把sender字段設為當前消息的發送方
六.消息通道
除了在應用程序中對actor共享引用的做法,還可以共享消息通道給它們。
優點:1.消息通道是類型安全的——你只能發送或接受某個特定類型的消息;2.不會不小心通過消息通道調用到某個actor的方法。
消息通道可以是一個OutputChannel(帶有!方法),也可以是一個InputChannel(帶有receive或react方法)。Channel類同時擴展OutputChannel和InputChannel特質。
構造一個消息通道(需要提供一個actor):val channel=new Channel[Int][someActor]
如果不提供構造參數,那麼消息通道就會綁定到當前執行這個actor上:
case class Compute(input:Seq[Int],result:OutputChannel[Int])
class Computer extends Actor{
public void act(){
while(true){
receive{
case Compute(input,output)=>{val anwser=...;out!anwser}
}}
}
}
actor {
val channel=new Channel[Int]
val computeActor:Computer=...
val input:Seq[Double]=...
computeActor!Compute(input,channel)
channel.receive{
case x=>...//已知x是一個Int
}
}
注意:這裏調用的是channeldereceive而不是actor自己,如果想要通過actor來接收相應,可以匹配一個!樣例類的實例
七.同步消息和Future
actor可以發送一個消息並等待回覆,用!?操作符即可,例:
val reply=account!?Deposit(1000)
reply match{
case Balance(bal)=>println("Current Balance:"+bal)
}
要讓它工作,接收方必須返回一個消息給發送方:
receive {
case Deposit(amount)=>{
//除了用sender!Balance(balance),也可以寫reply(Balance(balance))
balance+=amount;sender!Balance(balance)
...
}
}
注:同步消息容易引發死鎖,通常而言,最好避免在actor的act方法裏執行阻塞調用。
使用receiveWithin方法指定需要等待多少毫秒的時間(不想對一個回覆永久等待),如果在指定時間沒有收到消息,就會收到一個Actor.TIMEOUT對象,例:
actor {
worker !Task(data,self)
receiveWithin(secends*1000){
case Result(data)=>,,,
case TIMEOUT=>lof(...)
}
}
注:react方法也有一個帶時間的版本,稱做reactWithin。
除了等待對方返回結果之外,也可以選擇接收一個future——這是一個將在結果可用時產出結果的對象,使用!!方法得到:val replyFuture=ammount!!Deposit(1000),isSet方法會檢查結果是否可用,要接受結果,使用函數調用的表示法:val replyFuture=replyFuture(),這個調用將會阻塞,直到消息被髮送。注:如果立即想從future接受返回的結果,那麼你並沒有享受到任何優於同步方法調用的好處,不過,actor可以將future放到一邊,稍後再做處理,或將它交給其他actor。
八.共享線程
上述在Scala中,react方法接受一個偏函數,並將它添加到郵箱,然後退出,例:
react {//偏函數f1
case Withdraw(amount)=>{
react {//偏函數f2
case Confirm()=>{
println("Confirming"+amount)
}
}
}
}
//第一個react的調用將f1與actor的郵箱關聯起來,然後退出。當Withdraw消息抵達時,f1被調用,偏函數f1也調用react,這次調用把f2與actor的郵箱關聯起來,然後退出,當Confirm消息抵達時,f2被調用
注:第二個react可能在一個單獨的函數中,因此,react需要拋出異常,從而得以退出。
關聯的偏函數不會返回一個值——它執行了某些工作,然後執行下一個react,就造成退出了。退出意味着返回到協調的actor的方法中,這樣的一個函數的返回類型為Nothing,該類型用來表示正常退出。
//由於react會退出,因此不能簡單的把它放在while循環中,如:
def act(){
while(true){
react{//偏函數
case Withdraw(amount)=>println("Withdrawing"+amount)
}
}
}
//當act被調用時,對react的調用將f1與郵箱關聯起來,然後退出,當f1被調用時,將會處理這條消息
{case Withdraw(amount)=>println("Withawing "+amount)}
//解決方法一:在消息處理器中再次調用act方法(無窮遞歸替換掉無窮循環,這個無窮遞歸不會佔用很大的棧空間,每次對react的調用都會拋出異常從而清棧)
def act(){
react{//偏函數
case Wirhdraw(amount)=>{
println("Withawing"+amount)
act()
}
}
}
//讓每個消息處理器自己負責保持循環繼續進行下去看上去並不很公平,於是有一些”控制流轉組合子“可以自動產出這些循環
//loop組合子可以製作一個無窮循環
def act(){
loop{
react{
case Withdraw(amount)=>process(amount)}
}}
//如果需要一個循環條件,可以用loopWhile
loopWhile(count>max){
react{
...}
}
//eventloop方法可以製作一個無窮循環套react的簡化版,不過提前是偏函數不會再次調用react
def act(){
eventloop{
case Withdraw(amount)=>println("Withdrawing"+amount)
}
}
九.Actor的生命週期
actor的act方法在actor的start方法被調用時開始執行。actor接下來做的事情是進入某個循環,例:
def act(){
while(...){
receive{
...}
}
}
//actor在如下情形之一會終止執行:
/×
1.act方法返回
2.act方法由於異常被終止
3.actor調用exit方法
×/
注意:exit方法是個受保護的方法。它只能被Actor的子類中調用(其他方法不能調用exit方法來終止一個actor)。例:
//其他方法也不能調用exit()來終止
val actor1=actor {
while(true){
receive{
case "Hi"=>println("Hello")
case "Bye"=>exit()
}
}
}
還有一個exit方法的重載版本可以接受一個參數描述退出原因。不帶參數調用exit相當於exit('normal)。
當actor因一個異常終止時,退出原因就是UncaughtException樣例類的一個實例,該樣例類有如下屬性:
- actor:拋出異常的actor
- message:Some(msg),其中msg是該actor處理的最後一條消息或者None,如果actor在沒來得及處理任何消息之前就掛掉的話;
- sender:Some(channel),其中channel是代表最後一條消息的發送方的輸出消息通道或者None,如果actor在沒來得及處理任何消息之前就掛掉的話;
- thread:actor退出時所在的線程;
- case:相應的異常
十.將多個Actor鏈接在一起
將兩個actor鏈接在一起,則另一個都會在另一個終止執行的時候得到通知(調用link方法即可):
def act(){
link(master)
}
鏈表是雙向的(如監管actor在數個工作actor中分發工作任務,那麼當某個工作actor掛掉的時候,監管actor應該知道,以便相應的工作任務可以重新指派,反過來,如果監控actor掛掉了,工作actor也應該知道,以便可以停止工作)
注:儘管鏈接是雙向的,但link方法並不是對稱的,不能呢將link(worker)替換成worker.link(self),該方法必須由請求鏈接的actor調用。
默認情況下,只要當前actor鏈接到的actor中有一個非’normal原因退出,當前actor就會終止(在這種情況下,退出原因和鏈接到那個actor的退出原因相同)。
actor可以改變這個行為,做法是設置trapExit為true,修改後mactor會接受到一個類型為Exit的消息,該消息包含了那個正在終止的actor和退出的原因。例:
override def act(){
trapExit=true
link(wprker)
while(...){
receive{
...
case Exit(linked,UncaughtException(_,_,_,case))=>...
case Exit(linked,reason)=>...
}
}
}
在操作actor時,允許它們掛掉是正常的。只需要把每個actor鏈接到一個”監管“actor,由它來處理失敗的actor,例把工作重新分配或者重新啓動。
十一.Actor的設計
使用和操作actor注意:
十二.練習