徹底搞懂 RSocket 協議
我們知道,常見的 RESTful Web 服務采用的都是基于 HTTP 協議實現的請求 - 響應式交互方式。這種交互方案很簡單,但是因為只支持單一的交互方式,也就無法應對所有日常開發需求場景,例如服務端主動向客戶端推送數據。
請求 - 響應模式
那么,能不能在網絡協議層上具備更加豐富的交互方式呢?答案是肯定的,這就是我們今天要討論的 RSocket 協議。RSocket 協議提供了多種客戶端和服務端之間的交互方式,它并不是對 HTTP 協議的補充,而是一個基于響應式編程技術的全新的、高性能網絡通訊協議。
在引入 RSocket 協議之前,你必須要了解為什么需要這樣一個協議,讓我們從傳統的請求 - 響應模式所存在的問題開始說起。
RSocket 協議解決了什么問題?
請求 - 響應模式的問題
請求 - 響應模式的一個問題是,高并發場景下,性能和響應性上存在瓶頸。因為整個處理過程是同步阻塞的,如果某個請求的響應時間過長,會導致其他請求無法及時響應。這一點我之前在之前的響應式編程一課中有詳細講解。
更重要的是,對很多應用場景來說,HTTP 協議提供的請求 - 響應模式是不合適的。典型的例子是消息推送,如果客戶端需要獲取最新的推送消息,就必須使用輪詢,客戶端不停的發送請求到服務器來檢查更新,無疑這會造成了大量資源浪費。
客戶端輪詢模式
你可能會說,我們可以使用服務器發送事件(Server-Sent Events,SSE)技術實現從服務端向客戶端推送消息。不過,SSE 也是一個構建在 HTTP 協議上的處理機制,一般只用來傳送文本,提供的功能非常有限。
服務器發送事件
幸運的是,業界認識到了異步、多向交互通信的必要性。在 2015 年,RSocket 協議就在這樣的背景下誕生了。
RSocket 協議的解決方案
RSocket 是一種語言無關的二進制網絡協議,用來解決現有網絡傳輸協議存在的單一請求 - 響應模式以及性能問題。那么,它是怎么解決這個問題的呢?
RSocket 以異步消息的方式提供 4 種交互模式,除了請求 - 響應(request/response)模式之外,還包括請求 - 響應流(request/stream)、即發即棄(fire-and-forget)和通道(channel)這三種新的交互模式。
RSocket 協議的四種交互模式
我們來看這四種交互模式的特點:
請求 - 響應模式:這是最典型也最常見的模式。發送方在發送消息給接收方之后,等待與之對應的響應消息
請求 - 響應流模式:發送方的每個請求消息,都對應接收方的一個消息流作為響應
即發即忘模式:發送方的請求消息沒有與之對應的響應
通道模式:在發送方和接收方之間建立一個雙向傳輸的通道
我們可以從請求與響應的數量對應關系來對上述四種交互模式做一個總結。
可以看到,當我們選擇具體的交互模式時,請求 - 響應、請求 - 響應流和即發即忘這三種交互模式的請求數量都是 1,并能夠獲取不同數量的響應結果。我們就可以根據這一特性來重構現有的請求處理過程。而通道模式則比較特殊,它的請求數量和響應數量都是 N,決定了我們可以選擇它來應對雙向的數據流處理場景。
RSocket 協議專門設計用來和響應式編程技術風格的應用程序配合使用,在使用 RSocket 協議時,響應式編程體系中的數據流機制仍然有效。
為了更好的理解 RSocket 協議,我們對比它和 HTTP 協議。在交互模式上,與 HTTP 的請求 - 響應這種單向交互模式不同,RSocket 倡導的是對等通信。這種對等通信不是傳統單向交互模式的改進,而是在客戶端和服務端之間可以自由的相互發送和處理請求。
RSocket 協議的交互方式
另一方面,從性能上講,我們知道 HTTP 協議為了兼容各種應用方式,本身有一定的復雜性和冗余性,性能一般。而 RSocket 采用的是自定義二進制協議,本身的定位就是高性能通訊協議,性能上比 HTTP 高出一個數量級。
如何正確使用 RSocket 協議?
到這里,我們就明白了 RSocket 協議是要解決什么問題,以及是如何解決的了。接下來,我們具體看看如何正確使用 RSocket 協議。
RSocket 接口
我們先來看一下 RSocket 協議中最核心的接口,即 RSocket 接口的定義,如下所示。
public interface RSocket extends Availability, Closeable {
//推送元信息,數據可以自定義
Mono<Void> metadataPush(Payload payload);
//請求-響應模式,發送一 個請求并接收一個響應
Mono<Payload> requestResponse(Payload payload);
//即發-即忘模式,請求-響應的優化,在不需要響應時非常有用
Mono<Void> fireAndForget(Payload payload);
//請求-響應流模式,類似于返回集合的請求/響應,集合將以流的方式返回,而不是等到查詢完成
Flux<Payload> requestStream(Payload payload);
//通道模式,允許任意交互模型的雙向消息流
Flux<Payload> requestChannel(Publisher<Payload> payloads);
}
顯然,RSocket 接口通過四個方法分別實現了它所提供的四種交互模式,這里的 Payload 代表的就是一種消息對象,由兩部分組成,即元信息 metadata 和數據 data,類似于常見的消息通信中的消息頭和消息體的概念。
同時,你注意到嗎,這里出現了兩個新的數據結構 Mono 和 Flux。在響應式編程中,Mono 代表只包含 0 個或 1 個元素的數據流,而對應的 Flux 則是一個包含 0 到 n 個元素的數據流。
所以 fireAndForget() 方法返回的是一個 Mono 流,符合即發 - 即棄模式的語義。而 requestStream() 作為請求 - 響應流模式的實現,與 requestResponse() 的區別在于它的返回值是一個 Flux 流,而不是一個 Mono 對象。而且,RSocket 提供的請求 - 響應模式也比 HTTP 更具優勢,因為它是異步且多路復用的。
另一方面,與其他方法不同,requestChannel() 方法返回的并不是一個 Payload 消息對象,而是一個代表響應式流的 Publisher 對象,意味著這種模式下的輸入輸出都是響應式流,也就是說可以實現客戶端和服務端之間的雙向交互,這也和通道模式的定義一致。
RSocket 與框架集成
RSocket 接口過于底層,開發人員需要考慮服務器端和客戶端的具體構建方式以及手工實現遠程調用的細節。所以,我通常不建議直接使用 RSocket 接口進行應用程序的開發,而是傾向于借助特定的開發框架。如果你使用的是 Spring Boot 框架,就可以構建如下所示一個簡單 Controller:
@Controller
public class HelloController {
@MessageMapping("hello")
public Mono<String> hello(String name) {
return Mono.just("Hello: " + name);
}
}
這里引入了一個新的注解@MessageMapping,類似 Spring MVC 中的@RequestMapping 注解,@MessageMapping 是,Spring 中提供,用來指定 RSocket 協議中消息處理的目的地。然后,我們輸入了一個 String 類型的參數并返回一個 Mono 對象,符合請求 - 響應交互模式的定義。
為了訪問這個 RSocket 端點,我們需要構建一個 RSocketRequester 請求對象。基于該對象,我們就可以通過它的 route() 方法路由到前面通過@MessageMapping 注解構建的"hello"端點,如下所示:
Mono<String> response = requester.route("hello")
.data("Geektime")
.retrieveMono(String.class);
我們再來看一個請求 - 響應流的示例,如下所示:
@MessageMapping("stream")
Flux<Message> stream(Request request) {
return Flux
.interval(Duration.ofSeconds(1))
.map(index -> new Message(request.getParam, index));
}
這里我們根據輸入的 Request 對象,返回一個 Flux 流,每一秒發送一個新的 Message 對象。
如果你想在其他框架中使用 RSocket 協議,也有很多選擇。Dubbo 在 3.0.0-SNAPSHOT 版本里基于 RSocket 對響應式編程提供了支持,開發人員可以非常方便的使用 RSocket 的 API。而隨著 Spring 框架的持續升級,5.2 版本中也把 RSocket 作為缺省的網絡通信協議。
目前,RSocket 協議的應用已經越來越廣泛。相信隨著這項技術的不斷成熟,日常開發過程中也會出現更多的應用場景和解決方案。
總結
今天我們系統討論了 RSocket 這款新的高性能網絡通信協議。與 HTTP 協議相比,RSocket 提供了四種不同的交互模式來實現多樣化的網絡通信。同時,RSocket 也無縫集成了響應式編程技術,我們可以通過 RSocket 協議來實現異步、非阻塞式的網絡通信。