F#中的異步及并行模式:代理的高級使用
本文我們會來探索F#函數式編程語言的異步及并行模式、交互式的代理,以及與代理有關的一些模式,包括隔離的內部狀態。
消息與聯合類型
很多時候我們會使用聯合類型(Union Type)作為消息的類型。例如,我將要展示一個基于代理的DirectX示例,我們要在模擬引擎中使用如下的消息:
- type Message =
- | PleaseTakeOneStep
- | PleaseAddOneBall of Ball 模擬引擎中的代理:
- let simulationEngine =
- Agent.Start(fun inbox ->
- async { while true do
- // Wait for a message
- let! msg = inbox.Receive()
- // Process a message
- match msg with
- | PleaseTakeOneStep -> state.Transform moveBalls
- | PleaseAddOneBall ball -> state.AddObject ball })
在很多情況下使用強類型消息是個不錯的做法。不過,在某些您需要和其他消息機制協作的時候,也無需擔心使用如“obj”和“string”等泛化的消息類型,此時代理只需要在運行時進行類型判斷或轉化即可。
參數化代理及抽象代理
代理只是F#編碼中的一種設計模式。這意味著您可以將F#中各種常用的技巧,如參數化,抽象或是代碼片段重用與代理一起使用。例如,您可以把之前的serveQuoteStream函數參數化,指定每條股票消息傳輸中的間隔時間:
- open System.Net.Sockets
- /// serve up a stream of quotes
- let serveQuoteStream (client: TcpClient, periodMilliseconds: int) = async {
- let stream = client.GetStream()
- while true do
- do! stream.AsyncWrite( "AAPL 439.2"B )
- do! Async.Sleep periodMilliseconds
- }
- 這意味著您的股票服務器中不同的請求可以擁有不同長度的間隔。與此類似,您可以使用函數參數,將整個代理類的功能進行抽象:
- let iteratingAgent job =
- Agent.Start(fun inbox ->
- async { while true do
- let! msg = inbox.Receive()
- do! job msg })
- let foldingAgent job initialState =
- Agent.Start(fun inbox ->
- let rec loop state = async {
- let! msg = inbox.Receive()
- let! state = job state msg
- return! loop state
- }
- loop initialState)您可以這樣使用***個函數:
- let agent1 = iteratingAgent (fun msg -> async { do printfn "got message '%s'" msg }) 及第二個:
- let agent2 =
- foldingAgent (fun state msg ->
- async { if state % 1000 = 0 then printfn "count = '%d'" msg;
- return state + 1 }) 0 從代理返回結果
在以后的文章中,我們會討論一些訪問執行中的代理的部分結果的技巧,例如,我們可以使用每個MailboxProcessor代理的PostAndAsyncReply方法。這樣的技巧在創建網絡通信代理時顯得尤其重要。
然而,這種做法很多時候有些過了,我們可能只是需要將結果匯報給一些如GUI般的監視環境。匯報部分結果的簡單方法之一,便是之前在第二篇文章中討論過的設計模式。下面便是這樣一個例子,它創建了一個代理,對每1000條消息進行采樣,并將得到的事件分發給GUI或其他管理線程(請注意,其中用到了第二篇文章中SynchronizationContext的兩個擴展方法CaptureCurrent和RaiseEvent)。
- // Receive messages and raise an event on each 1000th message
- type SamplingAgent() =
- // The event that is raised
- // Capture the synchronization context to allow us to raise events
- // back on the GUI thread
- let syncContext = SynchronizationContext.CaptureCurrent()
- // The internal mailbox processor agent
- let agent =
- new MailboxProcessor<_>(fun inbox ->
- async { let count = ref 0
- while true do
- let! msg = inbox.Receive()
- incr count
- if !count % 1000 = 0 then
- syncContext.RaiseEvent sample msg })
- /// Post a message to the agent
- member x.Post msg = agent.Post msg
- /// Start the agent
- member x.Start () = agent.Start()
- /// Raised every 1000'th message
- member x.Sample = sample.Publish您可以這樣使用代理:
- let agent = SamplingAgent()
- agent.Sample.Add (fun s -> printfn "sample: %s" s)
- agent.Start()
- for i = 0 to 10000 do
- agent.Post (sprintf "message %d" i) 與預料一致,這會報告agent的消息采樣:
- sample: message 999
- sample: message 1999
- sample: message 2999
- sample: message 3999
- sample: message 4999
- sample: message 5999
- sample: message 6999
- sample: message 7999
- sample: message 8999
- sample: message 9999
#p#
代理及錯誤
我們都無法避免錯誤和異常。良好的錯誤檢測,報告及記錄的措施是基于代理編程的基本要素。我們來看一下如何在F#的內存代理(MailboxProcessor)中檢測和轉發錯誤。
首先,F#異步代理的神奇之處在于異常可以由async { ... }自動捕獲及分發,即使跨過多個異步等待及I/O操作。您也可以在async { ... }中使用try/with,try/finally及use關鍵字來捕獲異常或釋放資源。這意味著我們只需要在代理中處理那些未捕獲的錯誤即可。當MailboxProcessor代理中出現未捕獲的異常時便會觸發Error事件。一個常見的模式是將所有的錯誤轉發給一個監視進程,例如:
- type Agent<'T> = MailboxProcessor<'T>
- let supervisor =
- Agent<System.Exception>.Start(fun inbox ->
- async { while true do
- let! err = inbox.Receive()
- printfn "an error occurred in an agent: %A" err })
- let agent =
- new Agent<int>(fun inbox ->
- async { while true do
- let! msg = inbox.Receive()
- if msg % 1000 = 0 then
- failwith "I don't like that cookie!" })
- agent.Error.Add(fun error -> supervisor.Post error)
- agent.Start() 我們也可以很方便地并行這些配置操作:
- let agent =
- new Agent<int>(fun inbox ->
- async { while true do
- let! msg = inbox.Receive()
- if msg % 1000 = 0 then
- failwith "I don't like that cookie!" })
- |> Agent.reportErrorsTo supervisor
- |> Agent.start 或使用輔助模塊:
- module Agent =
- let reportErrorsTo (supervisor: Agent<exn>) (agent: Agent<_>) =
- agent.Error.Add(fun error -> supervisor.Post error); agent
- let start (agent: Agent<_>) = agent.Start(); agent
下面是一個例子,我們創建了10000個代理,其中某些會報告錯誤:
- let supervisor =
- Agent<int * System.Exception>.Start(fun inbox ->
- async { while true do
- let! (agentId, err) = inbox.Receive()
- printfn "an error '%s' occurred in agent %d" err.Message agentId })
- let agents =
- [ for agentId in 0 .. 10000 ->
- let agent =
- new Agent<string>(fun inbox ->
- async { while true do
- let! msg = inbox.Receive()
- if msg.Contains("agent 99") then
- failwith "I don't like that cookie!" })
- agent.Error.Add(fun error -> supervisor.Post (agentId,error))
- agent.Start()
- (agentId, agent) ]我們發送消息:
- for (agentId, agent) in agents do
- agent.Post (sprintf "message to agent %d" agentId ) 便可看到:
- an error 'I don't like that cookie!' occurred in agent 99
- an error 'I don't like that cookie!' occurred in agent 991
- an error 'I don't like that cookie!' occurred in agent 992
- an error 'I don't like that cookie!' occurred in agent 993
- ...
- an error 'I don't like that cookie!' occurred in agent 999
這一節我們處理了F#內存中的MailboxProcessor代理發生的錯誤。其他一些代理(例如,表示服務器端請求的代理)也可以這樣進行設計與架構,以便進行優雅的錯誤轉發及重試。
總結
隔離的代理是一種常用的編程模式,它不斷運用在各種編程領域中,從設備驅動編程到用戶界面,還包括分布式編程及高度伸縮的通信服務器。每次您編寫了一個對象,線程或是異步工作程序,用于處理一個長時間的通信(如向聲卡發送數據,從網絡讀取數據,或是響應一個輸入的事件流),您其實就是在編寫一種代理。每次您在寫一個ASP.NET網頁處理程序時,其實您也在使用一種形式的代理(每次調用時都重置狀態)。在各種情況下,隔離與通信有關的狀態是很常見的需求。
隔離的代理是一種最終的實現方式──例如,實現可伸縮的編程算法,包括可伸縮的請求服務器及分布式編程算法。與其他各種異步及并發編程模式一樣,它們也不能被濫用。然而,他們是一種優雅、強大且高效的技術,使用非常廣泛。
F#是一個獨特的,隨Visual Studio 2010一同出現的托管語言,完整支持輕量級的異步計算及內存種的代理。在F#中,異步代理可以通過組合的形式編寫,而不用使用回調函數或控制反轉等方式。這里有些權衡的地方──例如:在以后的文章中,我們會觀察如何使用.NET類庫中標準的APM模式來釋放您的代理。然而,優勢也是很明顯的:易于控制,伸縮性強,并且在需要的時候,便可以在組織起CPU和I/O并行操作的同時,保持CPU密集型代碼在.NET中的完整性能。
當然,也有其他一些.NET或基于JVM的語言支持輕量級的交互式代理──早前,有人認為這在.NET是“不可能”的事情,因為線程的代價十分昂貴。而如今,F#在2007年引入了“async { ... }”,這被視為語言設計上的一個突破──它讓程序員可以在一個被業界廣泛認可的編程平臺上構建輕量級、組合式的異步編程及交互式的代理。除了Axum語言原型(它也受了F#的影響)之外,F#還證明了一個異步語言特性是一個完全可行的方法,這也解放了如今業界運行時系統設計領域的一個爭論話題:我們是否要將線程做得輕量?
文章轉自老趙的博客,
原文地址:http://blog.zhaojie.me/2010/03/async-and-parallel-design-patterns-in-fsharp-3-more-agents.html
【編輯推薦】
- 詳解F#異步及并行模式中的輕量級代理
- 詳解F#異步及并行模式中的并行CPU及I/O計算
- TechED 09視頻專訪:F#與函數式編程語言
- F#中DSL原型設計:語法檢查和語義分析
- 大話F#和C#:是否會重蹈C#失敗的覆轍?