詳解F#異步及并行模式中的輕量級代理
F#目前還有些待字閨中的意思,不過隨著大家對F#了解的加深。希望更多的程序員能運用好F#。在此之前,我們51CTO曾報道過《詳解F#異步及并行模式中的并行CPU及I/O計算》
在本系列的第3部分中,我們會來探索F#中輕量級的,交互式的代理,以及與代理有關的一些模式,包括“隔離的內部狀態”。(譯注:由于原文較長,因此譯文分為兩段,目前是第一段,講解了F#中異步代理的基本使用方式。)
第1部分描述了F#作為一個并行及異步語言,是如何支持輕量級的響應操作,并給出了CPU異步并行和I/O異步并行兩種模式。
第2部分描述了如何從異步計算或后臺計算單元中獲得結果。
模式4:您的第一個代理
我們來觀察您所創建的第一個異步代理:
- type Agent<'T> = MailboxProcessor<'T>
- let agent =
- Agent.Start(fun inbox ->
- async { while true do
- let! msg = inbox.Receive()
- printfn "got message '%s'" msg } )
這個代理不斷地異步等待消息,并將它們打印出來。在這段代碼中,每個消息都是一個字符串,且agent的類型是:
agent.Post "hello!"這便會打印出:
got message 'hello!'也可以這樣發送多條消息:
for i in 1 .. 10000 do
agent.Post (sprintf "message %d" i)
這樣便可以打印出10000條消息。
您可以認為每個代理對象都包含一個消息隊列(或管道),并在消息到達時進行響應。一個委托一般都使用異步的循環等待來消息并進行處理。如在上面的例子中,代理使用while循環進行處理。
許多讀者可能已經對代理頗為熟悉了。如Erlang,它便是基于代理設計的(在那里被稱為進程)。而不久之前,一個基于.NET平臺的實驗性的孵化型語言,Axum,也注重了基于代理編程的重要性。Axum與F#中的代理設計相互影響,而其他包含輕量級線程的語言也強調了基于代理的組合與設計。
上面的例子一開始創建了一個類型的縮寫:Agent,它代表了F#類庫中基于內存的代理類型“MailboxProcessor”。如果您愿意的話也可以使用這個完整的名字,不過我更喜歡簡單的命名。
您的第一批10萬個代理
代理對象非常輕量,這是因為它基于F#的異步編程模型。例如,您可以在一個.NET進程中創建成百上千,甚至更多個代理。例如,我們來創建10萬個簡單的代理對象:
- let agents =
- [ for i in 0 .. 100000 ->
- Agent.Start(fun inbox ->
- async { while true do
- let! msg = inbox.Receive()
- if i % 10000 = 0 then
- printfn "agent %d got message '%s'" i msg } ) ]
您可以這樣向每個代理對象發送消息:
for agent in agents do
agent.Post "ping!每第1萬個代理對象會在收到消息時打印信息。這個代理集合在處理消息時非常迅速,只要幾秒鐘時間。代理和內存中的消息處理非常快。
很顯然,代理并不與.NET線程直接對應──您不可能在單個應用程序中創建10萬的線程(在32位操作系統中,即便1000個線程也已經太多了)。相反,在代理等待消息時,它實際上只是表現為一個回調函數,一些對象分配,以及代理所引用的閉包等等。在收到消息之后,代理的工作會在一個線程池(默認便是.NET線程池)中分配并執行。
盡管需要10萬個代理的情況并不多見,不過2000多個代理倒是很正常的。接下來我們便會看到這樣一些例子。
高伸縮的Web服務器處理請求
在F#編程中,異步代理的思想其實是一種在多個環境中反復出現的設計模式。在F#中,我們經常使用“代理”這個詞表示一種隨時發生的,特別是通過循環,或是處理消息,或是產生結果的異步計算。
例如,在以后的文章中,我們會來關注如何使用F#構建伸縮性強的TCP或HTTP服務器應用程序,并將它們部署到EC2或是Windows Azure中去。這里我們打算用“股票服務器”作為例子,它接受TCP或HTTP連接,并向客戶端返回一系列的股票信息。每個客戶端會每隔一秒鐘收到一條股票信息。這個服務最終會以單個URL或REST API的形式發布。
在實現時,我們為每個客戶端請求分配一個異步代理(由于只是演示,我們在這里便不斷地寫入相同的AAPL股票信息):
- open System.Net.Sockets
- /// serve up a stream of quotes
- let serveQuoteStream (client: TcpClient) = async {
- let stream = client.GetStream()
- while true do
- do! stream.AsyncWrite( "AAPL 200.38"B )
- do! Async.Sleep 1000.0 // sleep one second}
每個代理會一直運行到客戶端連接斷開。因為代理非常輕量,因此這個股票服務能夠在一臺機器上支持數千個并發連接(如果使用云托管服務則會有更好的伸縮性)。而同一時刻會出現多少個代理對象則取決于客戶端的數量。
上面的例子演示了使用F#進行網絡編程是多么的方便──網絡協議在此變成了基于異步代理的數據流讀寫。在以后的文章中我們會觀察更多使用F#進行伸縮性強的TCP/HTTP編程。
代理與隔離狀態(命令式)
F#代理編程的一個優秀的關鍵之處便是其隔離性。隔離性則意味著資源“歸屬”與某個特定的代理,而不會暴露給其他代理。因此,獨立狀態對并發的訪問及數據競爭是一種良好的保護。
在F#中,異步代理的獨立性直接表現為文法上的作用域。例如,下面的代碼使用一個字典來累計發送至代理對象的不同消息的次數。內部的字典在文法上是異步代理私有的,因此我們無法在代理外部對字典進行讀寫。這意味著字典的可變狀態實際上是被隔離的,代理保證了對它的非并發的安全訪問。
- type Agent<'T> = MailboxProcessor<'T>
- open System.Collections.Generic
- let agent =
- Agent.Start(fun inbox ->
- async { let strings = Dictionary<string,int>()
- while true do
- let! msg = inbox.Receive()
- if strings.ContainsKey msg then
- strings.[msg] <- strings.[msg] + 1
- else
- strings.[msg] <- 0
- printfn "message '%s' now seen '%d' times" msg strings.[msg] } )
狀態隔離是F#的基本特性,它并不是代理所獨有的。例如,下面的代碼對StreamReader和ResizeArray(這是F#中對System.Collections.Generics.List的別稱)的隔離訪問。請注意這段代碼和.NET類庫中的System.IO.File.ReadAllLines方法功能相同:
- let readAllLines (file:string) =
- use reader = new System.IO.StreamReader(file)
- let lines = ResizeArray<_>()
- while not reader.EndOfStream do
- lines.Add (reader.ReadLine())
- lines.ToArray()
在這里,reader和ResizeArray對象都無法在函數外部使用。在代理或其他持續計算的情況里,隔離性是至關重要的──狀態在這里永遠獨立于程序運行中的其他部分。
說到底,隔離性是個動態的屬性,它經常受到文法的約束。例如,考慮這樣一個延遲的,隨需加載的讀取器,它會讀取文件中的所有行:
- let readAllLines (file:string) =
- seq { use reader = new System.IO.StreamReader(file)
- while not reader.EndOfStream do
- yield reader.ReadLine() }
隔離狀態經常包含可變的值,包括F#中的引用單元。例如,下面的代碼在一個引用單元中不斷累計接受到的消息個數:
- let agent =
- Agent.Start(fun inbox ->
- async { let count = ref 0
- while true do
- let! msg = inbox.Receive()
- incr count
- printfn "now seen a total of '%d' messages" !count } )
再次強調,這里可變的狀態被隔離了,確保了對它單線程的安全訪問。
在代理中使用循環和隔離狀態(函數式)
F#程序員了解兩種實現循環的方法:使用“命令式”的while/for以及可變的累加器(ref或mutable),或是“函數式”風格的調用,將累加狀態作為參數傳遞給一個或多個遞歸函數。例如,計算文件行數的程序可以使用命令式的方式來寫:
- let countLines (file:string) =
- use reader = new System.IO.StreamReader(file)
- let count = ref 0
- while not reader.EndOfStream do
- reader.ReadLine() |> ignore
- incr count
- !count或是遞歸式的:
- let countLines (file:string) =
- use reader = new System.IO.StreamReader(file)
- let rec loop n =
- if reader.EndOfStream then n
- else
- reader.ReadLine() |> ignore
- loop (n+1)
- loop 0
在使用時,兩種方式都是可行的:函數式的做法相對更加高級一些,但是大大減少了代碼中顯式的狀態修改次數,且更為通用。兩種寫法對于F#程序員來說,一般都可以理解,他們還可以將“while”循環轉化為等價的“let rec”函數(這是個不錯的面試問題!)。
有趣的是,在編寫異步循環時的規則也一樣:您可以使用命令式的“while”或函數式的“let rec”中的任意一種來定義循環。例如,這里有一個利用遞歸的異步函數統計消息數量的做法:
- let agent =
- Agent.Start(fun inbox ->
- let rec loop n = async {
- let! msg = inbox.Receive()
- printfn "now seen a total of %d messages" (n+1)
- return! loop (n+1)
- }
- loop 0 )
這樣我們便可以獲得這樣的輸出:
now seen a total of 0 messages
now seen a total of 1 messages
....
now seen a total of 10000 messages
再提一次,定義代理對象的兩種常見模式為命令式的:
- let agent =
- Agent.Start(fun inbox ->
- async {
- // isolated imperative state goes here
- ...
- while <condition> do
- // read messages and respond
- ...
- })
及函數式的:
- let agent =
- Agent.Start(fun inbox ->
- let rec loop arg1 arg2 = async {
- // receive and process messages here
- ...
- return! loop newArg1 newArg2
- }
- loop initialArg1 initialArg2)
再次強調,兩種方法在F#都是合理的──使用遞歸的異步函數可能是更高級的方法,但更為函數式且更為通用。
原文標題:F#中的異步及并行模式(3 - 上):代理
鏈接:http://www.cnblogs.com/JeffreyZhao/archive/2010/03/15/async-and-parallel-design-patterns-in-fsharp-3-agents.html
【F#教程回顧】