F#與ASP.NET:使用F#實(shí)現(xiàn)基于事件的異步模式
在之前的文章F#與ASP.NET:基于事件的異步模式與異步Action中,我們的簡(jiǎn)單討論了.NET中兩種異步模型以及它們?cè)诋惓L幚砩系膮^(qū)別,并且簡(jiǎn)單觀察了ASP.NET MVC 2中異步Action的編寫(xiě)方式。從中我們得知,ASP.NET MVC 2的異步Action并非使用了傳統(tǒng)基于Begin/End的異步編程模型,而是另一種基于事件的異步模式。
此外,ASP.NET MVC 2對(duì)于這種異步模式提供了必要的支持,使此方面的程序設(shè)計(jì)變得相對(duì)簡(jiǎn)單一些。但是,簡(jiǎn)單的原因主要還是在于已經(jīng)由其他組件提供了良好的,基于事件的異步模式。那么現(xiàn)在我們就來(lái)看看一般我們應(yīng)該如何來(lái)實(shí)現(xiàn)這樣的功能,以及F#是如何美化我們的生活的吧。
異步數(shù)據(jù)傳輸
我們?yōu)槭裁匆惒?,主要目的之一還是提高I/O操作的伸縮性。I/O操作主要是I/O設(shè)備的事情,這些設(shè)備在好好工作的時(shí)候,是不需要系統(tǒng)花心思進(jìn)行照看的,它們只要能夠在完成指定工作后通知系統(tǒng)就可以了。這也是異步I/O高效的原理,因?yàn)樗鼘⒊绦驈牡却齀/O完成的苦悶中解脫出來(lái),這對(duì)用戶或是系統(tǒng)來(lái)說(shuō)都是一件絕好的事情。
那么說(shuō)到I/O操作,最典型的場(chǎng)景之一便是數(shù)據(jù)傳輸了。比如有兩個(gè)數(shù)據(jù)流streamIn和streamOut,我們需要異步地從streamIn中讀取數(shù)據(jù),并異步地寫(xiě)入到streamOut中。在這個(gè)過(guò)程中,我們使用一個(gè)相對(duì)較小的byte數(shù)組作為緩存空間,這樣程序在進(jìn)行數(shù)據(jù)傳輸時(shí)便不會(huì)占用太多內(nèi)存。
那么,如果現(xiàn)在需要您編寫(xiě)一個(gè)組件完成這樣的數(shù)據(jù)傳輸工作,并使用標(biāo)準(zhǔn)的基于事件的異步模式釋放出來(lái),您會(huì)怎么做?基于事件的異步模式,要求在任務(wù)完成時(shí)使用事件進(jìn)行提示。同時(shí)在出錯(cuò)的時(shí)候?qū)惓?duì)象保存在事件的參數(shù)中?,F(xiàn)在我已經(jīng)幫您寫(xiě)好了這樣的事件參數(shù):
- public class CompletedEventArgs : EventArgs
- {
- public CompletedEventArgs(Exception ex)
- {
- this.Error = ex;
- }
- public Exception Error { get; private set; }
- }
那么接下來(lái)的工作就交給您了,以下代碼僅供參考:
- public class AsyncTransfer
- {
- private Stream m_streamIn;
- private Stream m_streamOut;
- public AsyncTransfer(Stream streamIn, Stream streamOut)
- {
- this.m_streamIn = streamIn;
- this.m_streamOut = streamOut;
- }
- public void StartAsync()
- {
- byte[] buffer = new byte[1024];
- this.m_streamIn.BeginRead(
- buffer, 0, buffer.Length,
- this.EndReadInputStreamCallback, buffer);
- }
- private void EndReadInputStreamCallback(IAsyncResult ar)
- {
- var buffer = (byte[])ar.AsyncState;
- int lengthRead;
- try
- {
- lengthRead = this.m_streamIn.EndRead(ar);
- }
- catch (Exception ex)
- {
- this.OnCompleted(ex);
- return;
- }
- if (lengthRead <= 0)
- {
- this.OnCompleted(null);
- }
- else
- {
- try
- {
- this.m_streamOut.BeginWrite(
- buffer, 0, lengthRead,
- this.EndWriteOutputStreamCallback, buffer);
- }
- catch (Exception ex)
- {
- this.OnCompleted(ex);
- }
- }
- }
- private void EndWriteOutputStreamCallback(IAsyncResult ar)
- {
- try
- {
- this.m_streamOut.EndWrite(ar);
- var buffer = (byte[])ar.AsyncState;
- this.m_streamIn.BeginRead(
- buffer, 0, buffer.Length,
- this.EndReadInputStreamCallback, buffer);
- }
- catch (Exception ex)
- {
- this.OnCompleted(ex);
- }
- }
- private void OnCompleted(Exception ex)
- {
- var handler = this.Completed;
- if (handler != null)
- {
- handler(this, new CompletedEventArgs(ex));
- }
- }
- public event EventHandler<CompletedEventArgs> Completed;
- }
是不是很復(fù)雜的樣子?編寫(xiě)異步程序,基本則意味著要將原本同步的調(diào)用拆成兩段:發(fā)起及回調(diào),這樣便讓上下文狀態(tài)的保存便的困難起來(lái)。幸運(yùn)的是,C#這門(mén)語(yǔ)言提供了方便好用的匿名函數(shù)語(yǔ)法,這對(duì)于編寫(xiě)一個(gè)回調(diào)函數(shù)來(lái)說(shuō)已經(jīng)非常容易了。但是,如果需要真正寫(xiě)一個(gè)穩(wěn)定、安全的異步程序,需要做的事情還有很多。
例如,一次異步操作結(jié)束之后會(huì)執(zhí)行一個(gè)回調(diào)函數(shù),那么如果在這個(gè)回調(diào)函數(shù)中拋出了一個(gè)異常那該怎么辦?如果不正確處理這個(gè)異常,輕則造成資源泄露,重則造成進(jìn)程退出。因此在每個(gè)回調(diào)函數(shù)中,您會(huì)發(fā)現(xiàn)try...catch塊是必不可少的——甚至還需要兩段。
更復(fù)雜的可能還是在于邏輯控制上。這樣一個(gè)數(shù)據(jù)傳輸操作很顯然需要循環(huán)——讀一段,寫(xiě)一段。但是由于需要編寫(xiě)成二段式的異步調(diào)用,因此程序的邏輯會(huì)被拆得七零八落,我們沒(méi)法使用一個(gè)while塊包圍整段邏輯,編寫(xiě)一個(gè)異步程序本來(lái)就是那么復(fù)雜。 #p#
編寫(xiě)簡(jiǎn)單的代理
現(xiàn)在我們已經(jīng)有了一個(gè)異步傳輸數(shù)據(jù)的組件,就用它來(lái)做一些有趣的事情吧。例如,我們可以在ASP.NET應(yīng)用程序中建立一個(gè)簡(jiǎn)單的代理,即給定一個(gè)URL,在服務(wù)器端發(fā)起這樣一個(gè)請(qǐng)求,并將這個(gè)URL的數(shù)據(jù)傳輸?shù)娇蛻舳藖?lái)。簡(jiǎn)單起見(jiàn),除了進(jìn)行數(shù)據(jù)傳輸之外,我們只需要簡(jiǎn)單地輸出Content Type頭信息即可。以下代碼僅供參考:
- public class AsyncWebTransfer
- {
- private WebRequest m_request;
- private WebResponse m_response;
- private HttpContextBase m_context;
- private string m_url;
- public AsyncWebTransfer(HttpContextBase context, string url)
- {
- this.m_context = context;
- this.m_url = url;
- }
- public void StartAsync()
- {
- this.m_request = WebRequest.Create(this.m_url);
- this.m_request.BeginGetResponse(this.EndGetResponseCallback, null);
- }
- private void EndGetResponseCallback(IAsyncResult ar)
- {
- try
- {
- thisthis.m_response = this.m_request.EndGetResponse(ar);
- thisthis.m_context.Response.ContentType = this.m_response.ContentType;
- var streamIn = this.m_response.GetResponseStream();
- var streamOut = this.m_context.Response.OutputStream;
- var transfer = new AsyncTransfer(streamIn, streamOut);
- transfer.Completed += (sender, args) => this.OnCompleted(args.Error);
- transfer.StartAsync();
- }
- catch(Exception ex)
- {
- this.OnCompleted(ex);
- }
- }
- private void OnCompleted(Exception ex)
- {
- if (this.m_response != null)
- {
- this.m_response.Close();
- this.m_response = null;
- }
- var handler = this.Completed;
- if (handler != null)
- {
- handler(this, new CompletedEventArgs(ex));
- }
- }
- public event EventHandler<CompletedEventArgs> Completed;
- }
如果說(shuō)之前的AsyncTransfer類是基于“Begin/End異步編程模型”實(shí)現(xiàn)的基于事件的異步模式,那么AsyncWebTransfer便是基于“基于事件的異步模式”實(shí)現(xiàn)的基于事件的異步模式了。嗯,似乎有點(diǎn)繞口,不過(guò)我相信這段代碼對(duì)您來(lái)說(shuō)還是不難理解的。
使用F#完成異步工作
事實(shí)上我已經(jīng)很久沒(méi)有寫(xiě)過(guò)這樣的代碼了,咎其原因還是被F#給寵壞了。嘗試了C# 2.0之后我便拋棄了Java語(yǔ)言,熟悉了C# 3.0之后我用C# 2.0就快寫(xiě)不了程序了,而使用了F#進(jìn)行異步編程之后,我就再也沒(méi)有使用C#寫(xiě)過(guò)異步操作了。那么我們就來(lái)看看F#是如何進(jìn)行異步數(shù)據(jù)傳輸?shù)陌桑?/p>
- let rec transferAsync (streamIn: Stream) (streamOut: Stream) buffer =
- async {
- let! lengthRead = streamIn.AsyncRead(buffer, 0, buffer.Length)
- if lengthRead > 0 then
- do! streamOut.AsyncWrite(buffer, 0, lengthRead)
- do! transferAsync streamIn streamOut buffer
- }
上面的代碼利用了尾遞歸進(jìn)行不斷地?cái)?shù)據(jù)傳輸,我們也可以使用傳統(tǒng)的while循環(huán)來(lái)實(shí)現(xiàn)這個(gè)功能:
- let transferImperativelyAsync (streamIn: Stream) (streamOut: Stream) buffer =
- async {
- let hasData = ref true
- while (hasData.Value) do
- let! lengthRead = streamIn.AsyncRead(buffer, 0, buffer.Length)
- if lengthRead > 0 then
- do! streamOut.AsyncWrite(buffer, 0, lengthRead)
- else
- hasData := false
- }
有了transferAsync函數(shù),編寫(xiě)一個(gè)資源請(qǐng)求的代理也是幾分鐘的事情:
- let webTransferAsync (context: HttpContextBase) (url: string) =
- async {
- let request = WebRequest.Create(url)
- use! response = request.GetResponseAsync()
- context.Response.ContentType <- response.ContentType
- let streamIn = response.GetResponseStream()
- let streamOut = context.Response.OutputStream
- let buffer = Array.zeroCreate 1024
- do! transferAsync streamIn streamOut buffer
- }
沒(méi)錯(cuò),就是這么簡(jiǎn)單,這就是F#中編寫(xiě)異步任務(wù)方式。在執(zhí)行這兩個(gè)函數(shù)時(shí)(當(dāng)然確切地說(shuō),是執(zhí)行這兩個(gè)函數(shù)所生成的異步工作流),便會(huì)在出現(xiàn)“感嘆號(hào)”的操作之處自動(dòng)分成二段式的異步調(diào)用,但是在程序的寫(xiě)法上和同步代碼可謂毫無(wú)二致。 #p#
使用F#實(shí)現(xiàn)基于事件的異步模式
當(dāng)然,光有上面的代碼還不夠,因?yàn)檫@樣的代碼無(wú)法交給C#代碼來(lái)使用,我們還需要將它們封裝成基于事件的異步模式。不過(guò)這也非常簡(jiǎn)單,使用一個(gè)通用的抽象基類即可:
- [<AbstractClass>]
- type AsyncWorker(asyncWork: Async) =
- let completed = new Event()
- [<CLIEvent>]
- member e.Completed = completed.Publish
- member e.StartAsync() =
- Async.StartWithContinuations
- (asyncWork,
- (fun _ -> completed.Trigger(new CompletedEventArgs(null))),
- (fun ex -> completed.Trigger(new CompletedEventArgs(ex))),
- (fun ex -> ex |> ignore))
在使用F#進(jìn)行面向?qū)ο箝_(kāi)發(fā)時(shí),由于不需要C#的架子代碼,它實(shí)現(xiàn)相同的結(jié)構(gòu)一般都會(huì)顯得緊湊不少(不過(guò)在我看來(lái),C#在進(jìn)行一般的命令式編程時(shí)還是比F#來(lái)的方便一些)。在StartAsync方法中,我們使用Async.StartWithContinuations發(fā)起一個(gè)異步工作流,而這個(gè)異步工作流便是從構(gòu)造函數(shù)中傳入的具體任務(wù)。StartWithContinuations方法的后三個(gè)參數(shù)分別是成功時(shí)的回調(diào),失敗后的回調(diào),以及任務(wù)取消后的回調(diào)。您可能會(huì)說(shuō),難道F#中不需要異常處理,不需要資源釋放嗎?當(dāng)然需要。只不過(guò):
1) 異常處理已經(jīng)由StartWithContinuations統(tǒng)一完成了,我們只要按照“同步式”代碼的寫(xiě)法編寫(xiě)邏輯,也就是說(shuō),從語(yǔ)義上說(shuō)您可以看作存在一個(gè)巨大的try...catch圍繞著整段代碼。
2) 而對(duì)于資源釋放來(lái)說(shuō),您可以發(fā)現(xiàn)在webTransferAsync方法中有一個(gè)use!指令,這便是告訴F#的異步框架,在整個(gè)異步工作流結(jié)束之后需要調(diào)用這個(gè)資源的Dispose方法——沒(méi)錯(cuò),您可以把它看作是一種能在異步環(huán)境下工作的C# using關(guān)鍵字。有了AsyncWorker類之后,AsyncTransfer和WebAsyncTransfer類也可輕易實(shí)現(xiàn)了:
- type AsyncTransfer(streamIn: Stream, streamOut: Stream) =
- inherit AsyncWorker(
- Transfer.transferAsync streamIn streamOut (Array.zeroCreate 1024))
- type AsyncWebTransfer(context: HttpContextBase, url: string) =
- inherit AsyncWorker(Transfer.webTransferAsync context url)最后,只要在ASP.NET MVC中使用即可:
- public void LoadFsAsync(string url)
- {
- AsyncManager.OutstandingOperations.Increment();
- var transfer = new FSharpAsync.AsyncWebTransfer(HttpContext, url);
- transfer.Completed += (sender, args) =>
- AsyncManager.OutstandingOperations.Decrement();
- transfer.StartAsync();
- }
- public ActionResult LoadFsCompleted()
- {
- return new EmptyResult();
- }
事實(shí)上,在ImageController中我還提供了一個(gè)LoadAsync及對(duì)應(yīng)的LoadCompleted方法,它們使用的是利用C#實(shí)現(xiàn)的AsyncWebTransfer類。猜猜看這樣的代碼長(zhǎng)成什么樣?其實(shí)只是將上面的AsyncWebTransfer的命名空間改成CSharpAsync而已——F#與其它.NET代碼是真正做到無(wú)縫集成的。
總結(jié)
這便是F#的偉大之處。時(shí)常有朋友會(huì)問(wèn)我為什么對(duì)F#有那么大的興趣,我想,如果借助F#可以用十分之一的時(shí)間,十分之一的代碼行數(shù),寫(xiě)出執(zhí)行效果相同,但可維護(hù)性高出好幾倍的程序來(lái)。
【編輯推薦】