聊聊SpringAI流式輸出的底層實現?
在 Spring AI 中,流式輸出(Streaming Output)是一種逐步返回 AI 模型生成結果的技術,允許服務器將響應內容分批次實時傳輸給客戶端,而不是等待全部內容生成完畢后再一次性返回。
這種機制能顯著提升用戶體驗,尤其適用于大模型響應較慢的場景(如生成長文本或復雜推理結果)。
技術實現
在 Spring AI 中流式輸出的實現有以下兩種方式:
- 通過 ChatModel 實現流式輸出。
- 通過 ChatClient 實現流式輸出。
ChatModel 流式輸出
Spring AI 中的流式輸出實現非常簡單,使用 ChatModel 中的 stream 即可實現:
@RequestMapping(value = "/streamChat", produces = "text/event-stream")
public Flux<String> streamChat(@RequestParam(value = "msg") String msg) {
return chatModel.stream(msg);
}
ChatClient 流式輸出
ChatClient 流式輸出實現也很簡單,也是調用 stream().content() 返回 Flux 對象即可:
@RequestMapping("/stream")
public Flux<String> stream(String question) {
return chatClient.prompt(question)
.stream()
.content();
}
底層實現
那么問題來了流式輸出的底層實現究竟是啥呢?
根據以往的經驗我們知道,流式輸出的實現技術基本有兩種:
- Spring MVC(Servlet)+ SSE 實現流式輸出。
- Spring WebFlux Reactor 模型實現流式輸出。
SSE 介紹
SSE(Server-Sent Events)是一種允許服務器向瀏覽器或其他客戶端推送實時更新的技術。它是一種單向通信機制,服務器可以主動向客戶端發送數據,而客戶端無需頻繁輪詢服務器請求數據。SSE 是基于 HTTP 協議的,使用標準的 text/event-stream
MIME 類型來傳輸數據。
SSE 主要特點
- 單向通信:SSE 僅支持服務器到客戶端的單向通信,客戶端不能向服務器發送消息。如果需要雙向通信,可以結合 WebSocket 或其他技術。
- 基于 HTTP:SSE 使用標準的 HTTP 協議,不需要額外的協議支持,因此兼容性較好。
- 自動重連:客戶端在連接中斷后會自動嘗試重新連接。
- 數據格式:SSE 數據以特定的格式發送,每條消息以 data: 開頭,以兩個換行符 \n\n 結尾。
- 事件類型:可以為每條消息指定事件類型,客戶端可以通過監聽特定事件類型來處理不同的消息。
Spring MVC(Spring Web)底層是基于 Servlet 實現的,它是使用 SseEmitter 技術實現 SSE 協議實現流式輸出的。
SseEmitter 基本用法
這里提供一個 SseEmitter 的簡單使用案例,實現流式輸出,讓大家更好的理解這個技術點:
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
@RestController
public class SseDemoController {
@GetMapping(value = "/sse-demo", produces = "text/event-stream")
public SseEmitter streamData() {
// 設置超時時間(單位:毫秒)
SseEmitter emitter = new SseEmitter(30_000L); // 30秒超時
// 異步任務模擬流式輸出
new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
String message = "第 " + i + " 條消息";
emitter.send(message);
Thread.sleep(1000); // 每秒發送一次
}
emitter.complete(); // 完成推送
} catch (IOException | InterruptedException e) {
emitter.completeWithError(e); // 異常處理
}
}).start();
return emitter;
}
}
Spring WebFlux 介紹
Spring WebFlux 是 Spring Framework 5 引入的響應式 Web 框架,旨在解決高并發場景下傳統同步阻塞模型(如 Spring MVC)的性能瓶頸。其核心目標是通過非阻塞異步編程模型提升系統吞吐量,適用于 I/O 密集型任務(如微服務通信、實時數據流處理)。
Spring WebFlux 與 Spring MVC 不同,它基于 Reactive Streams 規范實現的,支持背壓機制(Backpressure),防止數據生產者壓垮消費者。
“
背壓機制:通過訂閱者主動控制數據流速,避免內存溢出。例如,消費者可動態調整請求量,生產者根據反饋調整數據生成速度.
Spring AI 流式輸出
說完了前置知識,咱們回到主題:Spring AI 是如何實現流式輸出的?
要搞清楚這個問題,我們需要看流式輸出對象 Flux 的實現源碼:
查看 Flux 源碼我們發現它是屬于 reactor.core.publisher 包下的抽象類:
并且看類注釋和類所在的 jar 包我們就明白了:
Spring AI 中的流式輸出是通過 Reactor Streams 模型實現的,和 Spring WebFlux 的底層實現是一樣的技術。
Reactor 介紹
Reactor 是一種事件驅動的高性能網絡編程模型,主要用于處理高并發的網絡 I/O 請求。其核心思想是通過一個或多個線程監聽事件,并將事件分發給相應的處理程序,從而實現高效的并發處理。
Reactor 模型的主要特征如下:
- 事件驅動:所有 I/O 操作都由事件觸發并處理。
- 非阻塞:操作不會因為 I/O 而掛起,避免了線程等待的開銷。
- 高效資源利用:通過少量線程處理大量并發連接,提升性能。
- 組件分離:將事件監聽(Reactor)、事件分發(Dispatcher)和事件處理(Handler)解耦,使代碼結構更清晰。
Reactor 實現方式有三種:
- 單線程 Reactor 模型:所有操作在一個線程完成,適用于低并發場景。
- 多線程 Reactor 模型:主線程處理連接,子線程池處理 I/O 和業務。
- 主從 Reactor 模型:主線程池處理連接,子線程池處理 I/O(進一步優化資源分配)。
生產級別使用的 Reactor 基本都是主從 Reactor 模型,它的執行流程如下:
小結
Spring AI 中的流式輸出有兩種實現,而通過查看這兩種流式輸出的實現源碼可知,Spring AI 中的流式輸出是通過 Reactor Streams 技術實現的,和 Spring WebFlux 的底層實現技術一樣。