構建一個即時消息應用(五):實時消息
本文是該系列的第五篇。
對于實時消息,我們將使用 服務器發送事件。這是一個打開的連接,我們可以在其中傳輸數據流。我們會有個端點,用戶會在其中訂閱發送給他的所有消息。
消息戶端
在 HTTP 部分之前,讓我們先編寫一個映射 ,讓所有客戶端都監聽消息。 像這樣全局初始化:
- type MessageClient struct {
- Messages chan Message
- UserID string
- }
- var messageClients sync.Map
已創建的新消息
還記得在 上一篇文章 中,當我們創建這條消息時,我們留下了一個 “TODO” 注釋。在那里,我們將使用這個函數來調度一個 goroutine。
- go messageCreated(message)
把這行代碼插入到我們留注釋的位置。
- func messageCreated(message Message) error {
- if err := db.QueryRow(`
- SELECT user_id FROM participants
- WHERE user_id != $1 and conversation_id = $2
- `, message.UserID, message.ConversationID).
- Scan(&message.ReceiverID); err != nil {
- return err
- }
- go broadcastMessage(message)
- return nil
- }
- func broadcastMessage(message Message) {
- messageClients.Range(func(key, _ interface{}) bool {
- client := key.(*MessageClient)
- if client.UserID == message.ReceiverID {
- client.Messages <- message
- }
- return true
- })
- }
該函數查詢接收者 ID(其他參與者 ID),并將消息發送給所有客戶端。
訂閱消息
讓我們轉到 main()
函數并添加以下路由:
- router.HandleFunc("GET", "/api/messages", guard(subscribeToMessages))
此端點處理 /api/messages
上的 GET 請求。請求應該是一個 EventSource 連接。它用一個事件流響應,其中的數據是 JSON 格式的。
- func subscribeToMessages(w http.ResponseWriter, r *http.Request) {
- if a := r.Header.Get("Accept"); !strings.Contains(a, "text/event-stream") {
- http.Error(w, "This endpoint requires an EventSource connection", http.StatusNotAcceptable)
- return
- }
- f, ok := w.(http.Flusher)
- if !ok {
- respondError(w, errors.New("streaming unsupported"))
- return
- }
- ctx := r.Context()
- authUserID := ctx.Value(keyAuthUserID).(string)
- h := w.Header()
- h.Set("Cache-Control", "no-cache")
- h.Set("Connection", "keep-alive")
- h.Set("Content-Type", "text/event-stream")
- messages := make(chan Message)
- defer close(messages)
- client := &MessageClient{Messages: messages, UserID: authUserID}
- messageClients.Store(client, nil)
- defer messageClients.Delete(client)
- for {
- select {
- case <-ctx.Done():
- return
- case message := <-messages:
- if b, err := json.Marshal(message); err != nil {
- log.Printf("could not marshall message: %v\n", err)
- fmt.Fprintf(w, "event: error\ndata: %v\n\n", err)
- } else {
- fmt.Fprintf(w, "data: %s\n\n", b)
- }
- f.Flush()
- }
- }
- }
首先,它檢查請求頭是否正確,并檢查服務器是否支持流式傳輸。我們創建一個消息通道,用它來構建一個客戶端,并將其存儲在客戶端映射中。每當創建新消息時,它都會進入這個通道,因此我們可以通過 for-select
循環從中讀取。
服務器發送事件使用以下格式發送數據:
- data: some data here\n\n
我們以 JSON 格式發送:
- data: {"foo":"bar"}\n\n
我們使用 fmt.Fprintf()
以這種格式寫入響應寫入器,并在循環的每次迭代中刷新數據。
這個循環會一直運行,直到使用請求上下文關閉連接為止。我們延遲了通道的關閉和客戶端的刪除,因此,當循環結束時,通道將被關閉,客戶端不會收到更多的消息。
注意,服務器發送事件(EventSource)的 JavaScript API 不支持設置自定義請求頭😒,所以我們不能設置 Authorization: Bearer <token>
。這就是為什么 guard()
中間件也會從 URL 查詢字符串中讀取令牌的原因。
實時消息部分到此結束。我想說的是,這就是后端的全部內容。但是為了編寫前端代碼,我將再增加一個登錄端點:一個僅用于開發的登錄。