SSE打扮你的AI應用,讓它美美噠!
前言
老粉絲都知道,我們有一個文檔問答的AI產品,然后有一個前端要求就是模仿ChatGPT展示后端返回的數據信息(打字效果)。剛開始呢,由于問答比較簡單,只是一些簡單的文本類型,并且后端返回的結果也有限,加上工期比較緊(反正就是各種原因),我們選擇了最原始的前后端數據交互方法。
前端發送問題,后端接入模型分析數據,然后將最后的結果一股腦的返回給前端。就這樣歲月靜好的度過了一段時間,但是由于需求的變更。后端返回的信息又臭又長,然后還是沿用之前的數據獲取和展示方式,就顯得捉襟見肘了。
所以,此時我們就從我們知識百寶箱中搜索,然后一眼就相中SSE。之前在寫一個類ChatGPT應用,前后端數據交互有哪幾種文章中,我們就對其有過簡單的介紹。
今天我們就來聊聊,如何實現基于SSE的前后端項目。(我們講主要邏輯,有些細節例如樣式等就不那么考究了)
效果展示
最終,我們就會得到一個類似下面的程序。
圖片
好了,天不早了,干點正事哇。
我們能所學到的知識點
- SSE是個啥?
- 用Node實現一個SSE服務
- SSE前端部分(React版本)
- 實現一個打字組件
1. SSE是個啥?
[服務器發送事件]((https://developer.mozilla.org/zh-CN/docs/Web/API/Server-sent_events "服務器發送事件"))(Server-Sent Events,SSE)提供了一種標準方法,通過 HTTP 將服務器數據推送到客戶端。與 WebSockets 不同,SSE 專門設計用于服務器到客戶端的單向通信,使其非常適用于實時信息的更新或者那些在不向服務器發送數據的情況下實時更新客戶端的情況。
服務器發送事件 (SSE) 允許服務器在任何時候向瀏覽器推送數據:
- 瀏覽器仍然會發出初始請求以建立連接。
- 服務器返回一個事件流響應并保持連接打開。
- 服務器可以使用這個連接在任何時候發送文本消息。
- 傳入的數據在瀏覽器中觸發一個 JavaScript 事件。事件處理程序函數可以解析數據并更新 DOM。
?
本質上,SSE 是一個無盡的數據流。可以將其視為下載一個無限大的文件,以小塊形式攔截和讀取。(類比我們之前講過的大文件分片上傳和分片下載)
SSE 首次實現于 2006 年,所有主要瀏覽器都支持這個標準。它可能不如 WebSockets[1] 知名,但SSE更簡單,使用標準 HTTP,支持單向通信,并提供自動重新連接功能。
SSE組件
我們可以將服務器發送事件視為單個 HTTP 請求,其中后端不會立即發送整個主體,而是保持連接打開,并通過每次發送事件時發送單個行來逐步傳輸答復。
圖片
SSE是一個由兩個組件組成的標準:
- 瀏覽器中的 EventSource 接口[2],允許客戶端訂閱事件:它提供了一種通過抽象較低級別的連接和消息處理來訂閱事件流的便捷方法。
- 事件流協議:描述服務器發送的事件必須遵循的標準純文本格式,以便 EventSource 客戶端理解和傳播它們
EventSource
作為核心的組件,EventSource的兼容性良好。
圖片
工作原理
服務端部分
服務器需要設置 HTTP 頭部 Content-Type: text/event-stream 并保持連接不斷開,以持續發送事件。典型的服務器發送事件的格式如下:
data: 這是一個事件消息
data: 這是另一個事件消息
可以包含多個字段:
id: 1234
event: customEvent
data: {"message": "這是一個自定義事件"}
retry: 10000
- id:事件 ID,客戶端會自動保存這個 ID,并在重連時發送 Last-Event-ID 頭部。
- event:事件類型,客戶端可以根據類型進行不同處理。
- data:事件數據。
- retry:建議客戶端重新連接的時間間隔(毫秒)。
客戶端部分
客戶端使用 JavaScript 創建一個 EventSource 對象并監聽事件:
const eventSource = new EventSource('server-url');
eventSource.onmessage = function(event) {
console.log('收到事件數據:', event.data);
};
eventSource.onerror = function(event) {
console.log('事件源連接錯誤:', event);
};
eventSource.addEventListener('customEvent', function(event) {
console.log('收到自定義事件:', event.data);
});
更高級用法
在單個頻道上發送不同的數據
服務器發送的消息可以有一個相關的事件:在 data: 行上方傳遞,以識別特定類型的信息:
event: React
data: React is great!
event: Rust
data: { "Rust": "我很喜歡", }
event: AI
data: { "name": "OpenAI" }
這些不會觸發客戶端的 message 事件處理程序。我們必須為每種類型的事件添加處理程序。例如:
// react 消息處理程序
source.addEventListener('React', e => {
document.getElementById('React')
.textContent = e.data;
});
// Rust 消息處理程序
source.addEventListener('Rust', e => {
const r = JSON.parse(e.data);
document.getElementById('Rust')
.textContent = `${r.Rust}`;
});
// AI 消息處理程序
source.addEventListener('AI', e => {
const ai = JSON.parse(e.data);
document.getElementById(`ai`)
.textContent = `${ai.name}`;
});
使用數據標識符
可選地,服務器也可以在 data: 行之后發送一個 id::
event: React
data: React is great!
id: 42
如果連接斷開,瀏覽器會在 Last-Event-ID HTTP 頭中發送最后的 id,以便服務器可以重新發送任何丟失的消息。
最新的 ID 也可以在客戶端的事件對象的 .lastEventId 屬性中獲取:
// news 消息處理程序
source.addEventListener('React', e => {
console.log(`last ID: ${e.lastEventId}`);
document.getElementById('React')
.textContent = e.data;
});
指定重試延遲
雖然重新連接是自動的,但我們的服務器可能知道在特定時間段內不會有新數據,因此無需保持活動的通信通道。服務器可以發送一個包含毫秒值的 retry: 響應,無論是單獨發送還是作為最終消息的一部分。例如:
retry: 60000
data: 你很好,這段時間我們還是別聯系了!
收到后,瀏覽器會斷開 SSE 連接,并在延遲期過后嘗試重新連接。
其他事件處理程序
除了 message 和命名事件,我們還可以在客戶端 JavaScript 中創建 open 和 error 處理程序。
open 事件在服務器連接建立時觸發。可以用于運行額外的配置代碼或初始化 DOM 元素:
const source = new EventSource('/sse1');
source.addEventListener('open', e => {
console.log('SSE connection established.');
});
error 事件在服務器連接失敗或終止時觸發。我們可以檢查事件對象的 .eventPhase 屬性以查看發生了什么:
source.addEventListener('error', e => {
if (e.eventPhase === EventSource.CLOSED) {
console.log('SSE connection closed');
} else {
console.log('error', e);
}
});
無需重新連接:它會自動進行。
終止 SSE 通信
瀏覽器可以使用 EventSource 對象的 .close() 方法終止 SSE 通信。例如:
const source = new EventSource('/sse1');
// 一小時后關閉
setTimeout(() => source.close(), 3600000);
服務器可以通過以下方式終止連接:
- 觸發 res.end() 或發送一個 retry: 延遲,然后
- 當相同的瀏覽器嘗試重新連接時返回 HTTP 狀態 204。
只有瀏覽器可以通過創建一個新的 EventSource 對象重新建立連接。
優點
優點 | 描述 |
簡單性 | 比 WebSocket 更簡單的 API 設計 |
自動管理重連 | 內置的重連機制使開發更簡便 |
瀏覽器支持 | 現代瀏覽器普遍支持 EventSource |
缺點
缺點 | 描述 |
單向通信 | 無法從客戶端向服務器發送數據 |
基于 HTTP | 相比 WebSocket,SSE 在處理高頻率數據傳輸時性能可能較低 |
受限于同源策略 | 跨域通信需要額外配置 CORS(跨域資源共享) |
在講代碼前,我們來簡單說一下我們要實現的交互
- 前端輸入信息
- 通過Post接口傳人后端
- 后端處理請求,拼接數據,返回SSE格式數據
- 前端通過EventSource事件接收數據
2. 用Node實現一個SSE服務
如果想了解一個事物的全貌,那就是接近它,了解它,實現它。
那么,我們就來自己用Node實現一個SSE服務。我們使用express[3]來搭建后端服務。
在我們心儀的目錄下,執行如下命令
mkdir SSE &&
cd SSE &&
mkdir Server &&
cd Server &&
npm init
構建一個簡單的Node項目。
然后,更新我們的package.json,這里就偷懶了,我直接把本地的內容復制下來了。
{
"name": "server",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"build": "tsc && node -v",
"dev": "tsc && tsc-watch --onSuccess \"node dist/index.js\""
},
"dependencies": {
"@types/uuid": "^10.0.0",
"body-parser": "^1.20.2",
"cors": "^2.8.5",
"express": "^4.18.2",
"uuid": "^10.0.0"
},
"devDependencies": {
"@types/cors": "^2.8.13",
"@types/express": "^4.17.17",
"tsc-watch": "^6.0.4",
"typescript": "^5.1.6"
},
"author": "Front789",
"license": "ISC"
}
處理主要邏輯
我們將只要的邏輯方式在src/index.ts中。
圖片
讓我們來挑幾個重要的點來解釋一下:
導入依賴和初始化Express
import express from "express";
import cors from "cors";
import bodyParser from "body-parser";
import { v4 as uuidv4 } from "uuid";
const app = express();
app.use(cors());
app.use(bodyParser.json());
const port = 4000;
這是一段實例化Express的代碼。不做過多解釋。我們是用了兩個中間件
- app.use(cors()): 應用 CORS 中間件,使服務器能夠處理跨域請求。
- app.use(bodyParser.json()): 應用 Body Parser 中間件,自動解析請求體中的 JSON 數據,并將其存儲在 req.body 中。
處理SSE鏈接
// SSE連接處理
app.get('/api/events', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders(); // 發送頭部信息到客戶端
const clientId = uuidv4();
const newClient = {
id: clientId,
res,
};
clients.push(newClient);
req.on('close', () => {
console.log(`${clientId} Connection closed`);
clients = clients.filter((client) => client.id !== clientId);
});
});
這部分其實也很簡單,但是呢,我們要特別注意一下res.setHeader()部分。
- Content-Type: text/event-stream: 設置內容類型為 text/event-stream,表明這是一個 SSE 連接。
- Cache-Control: no-cache: 禁用緩存,以確保實時數據傳輸。
- Connection: keep-alive: 保持連接不斷開。
- res.flushHeaders(): 立即將響應頭部發送給客戶端,確保連接保持活躍狀態。
當我們每次接收到/api/events時,沒有立馬向請求方返回數據,而是構建一個newClient,并且將其push到一個全局變量clients中。
當客戶端關閉連接時,從列表中移除相應的客戶端,我們在close中執行對應的移除操作。
處理Post請求
// 處理POST請求
app.post('/api/message', (req, res) => {
const userInput = req.body.message;
// 模擬處理消息并推送給所有客戶端
const responses = generateChatGPTResponse(userInput);
let index = 0;
const intervalId = setInterval(() => {
if (index < responses.length) {
clients.forEach((client) => client.res.write(`data: ${JSON.stringify({ message: responses[index] })}\n\n`));
index++;
} else {
clearInterval(intervalId);
res.end();
}
}, 1000); // 每秒發送一個響應
res.status(200).send();
});
function generateChatGPTResponse(input:string) {
// 模擬AI模型的響應,這里可以替換為實際的模型調用
return [
`你說的是: ${input}`,
"這是AI模型的第一段響應。",
"這是AI模型的第二段響應。",
"這是AI模型的第三段響應。",
];
}
該段代碼代碼也是我們常見的用于處理Post請求的方法。有幾點需要額外注意一下
- 使用 req.body.message 獲取客戶端發送的消息內容,這需要 body-parser 中間件來解析請求體中的 JSON 數據
- 使用 setInterval 定時器每秒推送一條消息給所有 SSE 連接的客戶端
- 在消息推送開始之前,立即向發送 POST 請求的客戶端返回一個 200 狀態碼,表示請求已成功接收。
服務啟動
然后我們就可以使用yarn dev在port為4000的端口中啟動一個SSE服務,此時坐等對應的請求到來即可。
3. SSE前端部分(React版本)
既然,SSE后端服務已經有了,那么我們來在前端接入對應的服務。
我們在SSE目錄下,使用我們的腳手架在生成一個前端服務。
npx f_cli_f create Client
然后選擇自己擅長的技術即可。然后按照對應的提示按照并啟動服務即可。如果對我們的腳手架還不了解,可以翻看之前的文章Rust 賦能前端-開發一款屬于你的前端腳手架
最后,我們在SSE目錄下,就會生成如下的目錄信息。
---SSE
---Client(前端項目)
---Server (后端服務)
前端代碼邏輯
我們在Client/src/pages/新建一個ChatComponent組件。
UI部分
<div className='flex flex-col justify-center items-center w-full h-full'>
<div className='flex flex-col justify-center items-center flex-1 w-full'>
<Typewriter text={'大家好,我是柒八九。一個專注于前端開發技術/Rust及AI應用知識分享的Coder'} delay={100} infinite={false} />
{messages.map((msg, index) => (
// <p key={index}>{msg}</p>
<Typewriter text={msg} delay={100} infinite={false} key={index}/>
))}
</div>
<Form form={form} className='w-9/12'>
<Form.Item className={styles['message-item']} name="message">
<Input.TextArea
autoSize={{ minRows: 1, maxRows: 3 }}
placeholder="輸入內容開始對話(Shift + Enter 換行)"
onPressEnter={handleTextAreaKeyDown}
/>
</Form.Item>
</Form>
</div>
UI部分呢,我們大致分為兩部分
- 展示后端返回信息
- TextArea收集用戶輸入信息
然后,我們在TextArea的PressEnter中執行向后端發送的操作。
注冊EventSource
我們在Effect中注冊EventSource相關事件。
useEffect(() => {
const eventSource = new EventSource('http://localhost:4000/api/events');
eventSource.onmessage = function (event) {
const data = JSON.parse(event.data);
const { message } = data;
setMessages((prevMessages) => [...prevMessages, message]);
};
return () => {
eventSource.close();
};
}, []);
有幾點需要說明
- 我們是在組件初始化的時候,注冊EventSource
- 由于我們在上一節中已經在http://localhost:4000中啟用了SSE服務,所以在EventSource中傳人的是對應的SSE地址
- 在onmessage中我們解析從后端返回的數據,并存入到state-message中。
當數據返回后,對應的state-message發生變化,那也就觸發了React的重新渲染。就可以在UI部分看到后端返回的信息。
handleTextAreaKeyDown
這部分是調用指定的后端接口,將用戶信息傳遞給后端服務,用于做指定信息的處理。
const handleTextAreaKeyDown= async (event) => {
const { keyCode, shiftKey } = event;
if (keyCode == 13 && !shiftKey) {
event.preventDefault();
const message = form.getFieldValue('message');
if (message && message.trim().length > 0) {
if (message.trim().length > 0) {
await fetch('http://localhost:4000/api/message', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ message }),
});
}
form.setFieldValue('message', '');
}
}
};
在PressEnter事件中,我們還判斷event的keyCode和shiftKey來實現在TextArea中換行的操作。也就是只有在單純的觸發Enter才會向后端傳遞數據。
我們之所以選擇用Post來向后端發起情況,因為我們用戶輸入的信息,不單單是文本信息,也可以是PDF/Word/Text等文本資源。
最終,我們就會得到一個和本文開頭的那個效果。
圖片
求豆麻袋,好像有一個東西沒給大家展示,那就是實現打字效果。別著急,我們這就說。
4. 實現一個打字組件
其實呢,針對一個完整的應用,我們不僅僅需要處理純文本信息,我們還需要處理類似Table/Code/Img等富文本的展示。
此時,最好的后端數據返回是啥呢,MarkDown。沒錯,ChatGPT也是這種格式,只不過它在前端顯示的時候,用了針對這類信息的展示處理。
而,我們今天的主要方向是講SSE,而針對其他類型的信息展示不在此篇文章內。如果大家有興趣了解,后面我們也可以針對此處的內容展開聊聊。
話題扯的有點遠了,我們現在進入這節的主題,寫一個純前端的打字效果。
其實呢,針對現成的打字效果有很多。例如
- typed-js[4]
- react-typed[5]
但是呢,本著知識探索的精神,我們今天來實現一個屬于自己的打字效果。
在ChatComponent目錄下,新建一個Typewriter文件夾。
然后新建三個文件
- index.tsx:只要邏輯
- Cursor.tsx:處理光標邏輯
- index.module.scss:存放樣式信息。
下面我們就來看看它們各自的實現邏輯。
index.tsx
import React, { useState, useEffect } from 'react';
import style from './index.module.scss';
import Cursor from './Cursor';
interface TypewriterProps {
text: string | React.ReactNode;
delay: number;
infinite?: boolean;
}
const Typewriter: React.FC<TypewriterProps> = ({ text, delay, infinite }) => {
const [currentText, setCurrentText] = useState<string>('');
const [currentIndex, setCurrentIndex] = useState<number>(0);
useEffect(() => {
let timeout: number;
if (currentIndex < text.length) {
timeout = setTimeout(() => {
setCurrentText((prevText) => prevText + text[currentIndex]);
setCurrentIndex((prevIndex) => prevIndex + 1);
}, delay);
} else if (infinite) {
setCurrentIndex(0);
setCurrentText('');
}
return () => clearTimeout(timeout);
}, [currentIndex, delay, infinite, text]);
return (
<span className={style['text-writer-wrapper']}>
<span dangerouslySetInnerHTML={{ __html: currentText }}></span>
{currentIndex < text.length && <Cursor />}
</span>
);
};
export default Typewriter;
其實呢,上面的邏輯很簡單。
Typewriter接收三個參數
- text:要顯示的文本,可以是字符串或 React 節點。
- delay:每個字符之間的延遲時間(以毫秒為單位)。
- infinite:是否無限循環顯示文本,默認為 false。
使用 useEffect 鉤子在每次 currentIndex 改變時運行:
- 如果 currentIndex 小于 text 的長度:
設置一個 setTimeout 以延遲添加下一個字符到 currentText。
遞增 currentIndex。
- 否則如果 infinite 為 true:
重置 currentIndex 和 currentText 以開始新的循環。
返回一個清除定時器的函數,以避免內存泄漏。
然后,我們使用dangerouslySetInnerHTML來更新文本信息。
Cursor.tsx
這個組件就更簡單了,就是繪制了一個svg,用于在文本輸入過程中顯示光標。
import style from './index.module.scss';
export default function CursorSVG() {
return (
<svg viewBox="8 4 8 16" xmlns="http://www.w3.org/2000/svg" className={style['cursor']}>
<rect x="10" y="6" width="2" height="100%" fill="black" />
</svg>
);
}
index.module.scss
.text-writer-wrapper{
@keyframes flicker {
0% {
opacity: 0;
}
50% {
opacity: 1;
}
100% {
opacity: 0;
}
}
.cursor {
display: inline-block;
width: 1ch;
animation: flicker 0.5s infinite;
}
}
這段代碼主要用于創建打字機效果中的光標閃爍效果:
- @keyframes flicker 動畫定義了光標的閃爍效果,通過改變透明度來實現閃爍。
- .cursor 類應用了閃爍動畫,并設置了寬度,使其顯示為一個閃爍的光標。
最終效果是在 .text-writer-wrapper 中顯示的光標會每 0.5 秒閃爍一次,模擬文本編輯器中的光標效果。