慢聊Golang的Websocket使用和實現代碼分析
本期將會繼續上次話題,上篇主要是理論還是停留在文字層面,今天帶來的是websocket實操,分享它使用和底層實現!
相信很多使用Golang的小伙伴都知道Gorilla這個工具包,長久以來gorilla/websocket 都是比官方包更好的websocket包。
題外話 gorilla:大猩猩(不過這個猩猩還挺可愛的)
圖片
gorilla/websocket 框架開源地址為: https://github.com/gorilla/websocket
今天小許就用【gorilla/websocket】框架來展開本期文章內容,文章會涉及到核心代碼的走讀,會涉及到不少代碼,需要小伙伴們保持耐心往下看,然后結合之前分享的websocket基礎,徹底學個明白!
圖片
簡單使用
安裝Gorilla Websocket Go軟件包,您只需要使用即可go get
go get github.com/gorilla/websocket
在正式使用之前我們先簡單了解下兩個數據結構 Upgrader 和 Conn
Upgrader
Upgrader指定用于將 HTTP 連接升級到 WebSocket 連接
type Upgrader struct {
HandshakeTimeout time.Duration
ReadBufferSize, WriteBufferSize int
WriteBufferPool BufferPool
Subprotocols []string
Error func(w http.ResponseWriter, r *http.Request, status int, reason error)
CheckOrigin func(r *http.Request) bool
EnableCompression bool
}
- ? HandshakeTimeout:握手完成的持續時間
- ? ReadBufferSize和WriteBufferSize:以字節為單位指定I/O緩沖區大小。如果緩沖區大小為零,則使用HTTP服務器分配的緩沖區
- ? CheckOrigin :函數應仔細驗證請求來源 防止跨站點請求偽造
這里一般會設置下CheckOrigin來解決跨域問題
Conn
Conn類型表示WebSocket連接,這個結構體的組成包括兩部分,寫入字段(Write fields)和 讀取字段(Read fields)
type Conn struct {
conn net.Conn
isServer bool
...
// Write fields
writeBuf []byte
writePool BufferPool
writeBufSize int
writer io.WriteCloser
isWriting bool
...
// Read fields
readRemaining int64
readFinal bool
readLength int64
messageReader *messageReader
...
}
isServer :字段來區分我們是否用Conn作為客戶端還是服務端,也就是說說gorilla/websocket中同時編寫客戶端程序和服務器程序,但是一般是Web應用程序使用單獨的前端作為客戶端程序。
部分字段說明如下圖:
圖片
服務端示例
出于說明的目的,我們將在Go中同時編寫客戶端程序和服務端程序(其實小許是前端小趴菜?? ??)。
當然我們在開發程序的時候基本都是單獨的前端,通常使用(Javascript,vue等)實現websocket客戶端,這里為了讓大家有比較直觀的感受,用【gorilla/websocket】分別寫了服務端和客戶端示例。
var upGrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func main() {
http.HandleFunc("/ws", wsUpGrader)
err := http.ListenAndServe("localhost:8080", nil)
if err != nil {
log.Println("server start err", err)
}
}
func wsUpGrader(w http.ResponseWriter, r *http.Request) {
//轉換為升級為websocket
conn, err := upGrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
//釋放連接
defer conn.Close()
for {
//接收消息
messageType, message, err := conn.ReadMessage()
if err != nil {
log.Println(err)
return
}
log.Println("server receive messageType", messageType, "message", string(message))
//發送消息
err = conn.WriteMessage(messageType, []byte("pong"))
if err != nil {
log.Println(err)
return
}
}
}
我們知道websocket協議是基于http協議進行upgrade升級的, 這里使用 net/http提供原始的http連接。
http.HandleFunc接受兩個參數:第一個參數是字符串表示的 url 路徑,第二個參數是該 url 實際的處理對象
http.ListenAndServe 監聽在某個端口,啟動服務,準備接受客戶端的請求
HandleFunc的作用:通過類型轉換讓我們可以將普通的函數作為HTTP處理器使用
圖片
服務端代碼流程:
? Gorilla在使用websocket之前是先將初始化的upGrader結構體變量調用Upgrade方法進行請求協議升級
? 升級后返回 *Conn(此時isServer = true),后續使用它來處理websocket連接
? 服務端消息讀寫分別用 ReadMessage()、WriteMessage()
客戶端示例
import (
"fmt"
"github.com/gorilla/websocket"
"log"
"time"
)
func main() {
//服務器地址 websocket 統一使用 ws://
url := "ws://localhost:8080/ws"
//使用默認撥號器,向服務器發送連接請求
ws, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
log.Fatal(err)
}
//關閉連接
defer conn.Close()
//發送消息
go func() {
for {
err := ws.WriteMessage(websocket.BinaryMessage, []byte("ping"))
if err != nil {
log.Fatal(err)
}
//休眠兩秒
time.Sleep(time.Second * 2)
}
}()
//接收消息
for {
_, data, err := ws.ReadMessage()
if err != nil {
log.Fatal(err)
}
fmt.Println("client receive message: ", string(data))
}
}
客戶端的實現看起來也是簡單,先使用默認撥號器,向服務器地址發送連接請求,撥號成功時也返回一個*Conn,開啟一個協程每隔兩秒向服務端發送消息,同樣都是使用ReadMessage和W riteMessage讀寫消息。
示例代碼運行結果如下:
圖片
源碼走讀
看完上面基本的客戶端和服務端案例之后,我們對整個消息發送和接收的使用已經熟悉了,實際開發中要做的就是如何結合業務去定義消息類型和發送場景了,我們接著走讀下底層的實現邏輯!
代碼走讀我們分了四部分,主要了解協議是如何升級、已經消息如何讀寫、解析數據幀【 ?? ??核心】!
Upgrade 協議升級
Upgrade顧名思義【升級】,在進行協議升級之前是需要對協議進行校驗的,之前我們知道待升級的http請求是有固定請求頭的,這里列舉幾個:
圖片
?? Upgrade進行校驗的目的是看該請求是否符合協議升級的規定
Upgrade的部分校驗代碼如下,return處進行了省略
func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error) {
if !tokenListContainsValue(r.Header, "Connection", "upgrade") {
return ...
}
if !tokenListContainsValue(r.Header, "Upgrade", "websocket") {
return ...
}
//必須是get請求方法
if r.Method != http.MethodGet {
return ...
}
if !tokenListContainsValue(r.Header, "Sec-Websocket-Version", "13") {
return ...
}
if _, ok := responseHeader["Sec-Websocket-Extensions"]; ok {
return ...
}
...
c := newConn(netConn, true, u.ReadBufferSize, u.WriteBufferSize, u.WriteBufferPool, br, writeBuf)
...
}
tokenListContainsValue的目的是校驗請求的Header中是否有upgrade需要的特定參數,比如我們上圖列舉的一些。
newConn就是初始化部分Conn結構體的,方法中的第二個參數為true代表這是服務端
圖片
computeAcceptKey 計算接受密鑰:
這個函數重點說下,在上一期中在websocket【連接確認】這一章節中知道,websocket協議升級時,需要滿足如下條件:
??只有當請求頭參數Sec-WebSocket-Key字段的值經過固定算法加密后的數據和響應頭里的Sec-WebSocket-Accept的值保持一致,該連接才會被認可建立。
圖片
var keyGUID = []byte("258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
func computeAcceptKey(challengeKey string) string {
h := sha1.New()
h.Write([]byte(challengeKey))
h.Write(keyGUID)
return base64.StdEncoding.EncodeToString(h.Sum(nil))
}
上面 computeAcceptKey 函數的實現,驗證了之前說的關于 Sec-WebSocket-Accept的生成
服務端需將Sec-WebSocket-Key和固定的 GUID 字符串( 258EAFA5-E914-47DA-95CA-C5AB0DC85B11) 拼接后使用 SHA-1 進行哈希,并采用 base64 編碼后返回
ReadMessage 讀消息
ReadMessage方法內部使用NextReader獲取讀取器并從該讀取器讀取到緩沖區,如果是一條消息由多個數據幀,則會拼接成完整的消息,返回給業務層。
func (c *Conn) ReadMessage() (messageType int, p []byte, err error) {
var r io.Reader
messageType, r, err = c.NextReader()
if err != nil {
return messageType, nil, err
}
//ReadAll從r讀取,直到出現錯誤或EOF,并返回讀取的數據
p, err = io.ReadAll(r)
return messageType, p, err
}
該方法,返回三個參數,分別是消息類型、內容、error
messageType是int型,值可能是 BinaryMessage(二進制消息) 或 TextMessage(文本消息)
NextReader: 該方法得到一個消息類型 messageType,io.Reader,err
func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
...
for c.readErr == nil {
//解析數據幀方法advanceFrame
// frameType : 幀類型
frameType, err := c.advanceFrame()
if err != nil {
c.readErr = hideTempErr(err)
break
}
//數據類型是 文本或二進制類型
if frameType == TextMessage || frameType == BinaryMessage {
c.messageReader = &messageReader{c}
c.reader = c.messageReader
if c.readDecompress {
c.reader = c.newDecompressionReader(c.reader)
}
return frameType, c.reader, nil
}
}
...
}
c.advanceFrame() 是核心代碼,主要是實現解析這條消息,這里在最后章節會講。
這里有個 c.messageReader (當前的低級讀取器),賦值給c.reader,為什么要這樣呢?
c.messageReader 是更低級讀取器,而 c.reader 的作用是當前讀取器返回到應用程序。簡單就是messageReader 是實現了 c.reader 接口的結構體, 從而也實現了 io.Reader接口
圖片
圖上加一個 bufio.Read方法:Read讀取數據寫入p。本方法返回寫入p的字節數。本方法一次調用最多會調用下層Reader接口一次Read方法,因此返回值n可能小于len(p)。讀取到達結尾時,返回值n將為0而err將為io.EOF
messageReader的 Read方法: 我們看下Read的具體實現,Read方法主要是讀取數據幀內容,直到出現并返回io.EOF或者其他錯誤為止,而實際調用它的正是 io.ReadAll。
func (r *messageReader) Read(b []byte) (int, error) {
...
for c.readErr == nil {
//當前幀中剩余的字節
if c.readRemaining > 0 {
if int64(len(b)) > c.readRemaining {
b = b[:c.readRemaining]
}
//讀取到切片b中
n, err := c.br.Read(b)
c.readErr = hideTempErr(err)
//當Conn是服務端
if c.isServer {
c.readMaskPos = maskBytes(c.readMaskKey, c.readMaskPos, b[:n])
}
//readRemaining字節數轉int64
rem := c.readRemaining
rem -= int64(n)
//跟蹤連接上剩余的字節數
if err := c.setReadRemaining(rem); err != nil {
return 0, err
}
if c.readRemaining > 0 && c.readErr == io.EOF {
c.readErr = errUnexpectedEOF
}
//返回讀后字節數
return n, c.readErr
}
//標記是否最后一個數據幀
if c.readFinal {
// messageRader 置為nil
c.messageReader = nil
return 0, io.EOF
}
//獲取數據幀類型
frameType, err := c.advanceFrame()
switch {
case err != nil:
c.readErr = hideTempErr(err)
case frameType == TextMessage || frameType == BinaryMessage:
c.readErr = errors.New("websocket: internal error, unexpected text or binary in Reader")
}
}
err := c.readErr
if err == io.EOF && c.messageReader == r {
err = errUnexpectedEOF
}
return 0, err
}
io.ReadAll : ReadAll從r讀取,這里是實現如果一條消息由多個數據幀,會一直讀直到最后一幀的關鍵。
func ReadAll(r Reader) ([]byte, error) {
b := make([]byte, 0, 512)
for {
if len(b) == cap(b) {
// 給[]byte添加更多容量
b = append(b, 0)[:len(b)]
}
n, err := r.Read(b[len(b):cap(b)])
b = b[:len(b)+n]
if err != nil {
if err == EOF {
err = nil
}
return b, err
}
}
}
可以看出在for 循環中一直讀取,直至讀取到最后一幀,直到返回io.EOF或網絡原因錯誤為止,否則一直進行阻塞讀,這些 error 可以從上面講到的messageReader的 Read方法可以看出來。
總結下,整個流程如下:
圖片
整個讀消息的流程就結束了,我們繼續看如何寫消息!
WriteMessage 寫消息
既然讀消息是對數據幀進行解析,那么寫消息就自然會聯想到將數據按照數據幀的規范組裝寫入到一個writebuf中,然后寫入到網絡中。
圖片
我們繼續看WriteMessage是如何實現的
func (c *Conn) WriteMessage(messageType int, data []byte) error {
...
//w 是一個io.WriteCloser
w, err := c.NextWriter(messageType)
if err != nil {
return err
}
//將data寫入writeBuf中
if _, err = w.Write(data); err != nil {
return err
}
return w.Close()
}
WriteMessage方法接收一個消息類型和數據,主要邏輯是先調用Conn的NextWriter方法得到一個io.WriteCloser,然后寫消息到這個Conn的writeBuf,寫完消息后close它。
NextWriter實現如下:
func (c *Conn) NextWriter(messageType int) (io.WriteCloser, error) {
var mw messageWriter
if err := c.beginMessage(&mw, messageType); err != nil {
return nil, err
}
c.writer = &mw
...
return c.writer, nil
}
注意看這里有個messageWriter賦值給了Conn的writer,也就是說messageWriter實現了io.WriterCloser接口。
這里的實現跟讀消息中的NextReader方法中的messageReader很像,也是通過實現io.Reader接口,然后賦值給了Conn的Reader,這里可以做個小聯動,找到讀寫消息實際的實現者 messageReader、messageWriter。
messageWriter的Write實現:
前置知識:如果沒有設置Conn中writeBufferSize, 默認情況下會設置為 4096個字節,另外加上14字節的數據幀頭部大小【這些在newConn中初始化的時候有代碼說明】
func (w *messageWriter) Write(p []byte) (int, error) {
...
//如果字節長度大于初始化的writeBuf空間大小
if len(p) > 2*len(w.c.writeBuf) && w.c.isServer {
//寫入方法
err := w.flushFrame(false, p)
...
}
//字節長度不大于初始化的writeBuf空間大小
nn := len(p)
for len(p) > 0 {
//內部也是調用的flushFrame
n, err := w.ncopy(len(p))
...
}
return nn, nil
}
messageWriter中的Write方法主要的目的是將數據寫入到writeBuf中,它主要存儲結構化的數據幀內容,所謂結構化就是按照數據幀的格式,用Go實現寫入的。
總結下,整個流程如下:
圖片
而flushFrame方法將緩沖數據和額外數據作為幀寫入網絡,這個final參數表示這是消息中的最后一幀。
至于flushFrame內部是如何實現寫入網絡中的,你可以看看 net.Conn 是怎么Write的,因為最終就是調這個寫入網絡的,這里就不再深究了,有興趣的同學可以自己挖一挖!
advanceFrame 解析數據幀
解析數據幀放在最后,前面的代碼走讀主要是為了方便大家能把整體流程搞清楚,而數據幀的解析,是更加需要對websocket基礎有了解,特別是數據幀的組成,因為解析就是按照協定用Go代碼實現的一種方式而已!
強烈推薦大家看完# 為什么有了http,還需要websocket,懂了!]
圖片
根據上圖【來自網絡】回顧下數據幀各部分代表的意思:
FIN : 1個bit位,用來標記當前數據幀是不是最后一個數據幀
RSV1, RSV2, RSV3 :這三個各占用一個bit位用做擴展用途,沒有這個需求的話設置為0
Opcode : 該值定義的是數據幀的數據類型 1 表示文本 2 表示二進制
MASK:表示數據有沒有使用掩碼
Payload length :數據的長度,Payload data的長度,占7bits,7+16bits,7+64bits
Masking-key :數據掩碼 (設置為0,則該部分可以省略,如果設置為1,則用來解碼客戶端發送給服務端的數據幀)
Payload data : 幀真正要發送的數據,可以是任意長度
advanceFrame 解析方法
實現代碼會比較長,如果直接貼代碼,會看不下去,該方法返回數據類型和error, 這里我們只會截取其中一部分
func (c *Conn) advanceFrame() (int, error) {
...
//讀取前兩個字節
p, err := c.read(2)
if err != nil {
return noFrame, err
}
//數據幀類型
frameType := int(p[0] & 0xf)
// FIN 標記位
final := p[0]&finalBit != 0
//三個擴展用
rsv1 := p[0]&rsv1Bit != 0
rsv2 := p[0]&rsv2Bit != 0
rsv3 := p[0]&rsv3Bit != 0
//mask :是否使用掩碼
mask := p[1]&maskBit != 0
...
switch c.readRemaining {
case 126:
p, err := c.read(2)
if err != nil {
return noFrame, err
}
if err := c.setReadRemaining(int64(binary.BigEndian.Uint16(p))); err != nil {
return noFrame, err
}
case 127:
p, err := c.read(8)
if err != nil {
return noFrame, err
}
if err := c.setReadRemaining(int64(binary.BigEndian.Uint64(p))); err != nil {
return noFrame, err
}
}
..
}
整個流程分為了 7 個部分:
1. 跳過前一幀的剩余部分,畢竟這是之前幀的數據
2. 讀取并解析幀頭的前兩個字節(從上面圖中可以看出只讀取到 Payload len)
3. 根據讀取和解析幀長度(根據 Payload length的值來獲取Payload data的長度)
4. 處理數據幀的mask掩碼
5. 如果是文本和二進制消息,強制執行讀取限制并返回 (結束)
6. 讀取控制幀有效載荷 即 play data,設置setReadRemaining以安全地更新此值并防止溢出
7. 過程控制幀有效載荷,如果是ping/pong/close消息類型,返回 -1 (noFrame) (結束)
advanceFrame方法的主要目的就是解析數據幀,獲取數據幀的消息類型,而對于數據幀的解析都是按照上圖幀格式來的!
heartbeat 心跳
WebSocket 為了確??蛻舳?、服務端之間的 TCP 通道連接沒有斷開,使用心跳機制來判斷連接狀態。如果超時時間內沒有收到應答則認為連接斷開,關閉連接,釋放資源。流程如下
? 發送方 -> 接收方:ping
? 接收方 -> 發送方:pong
ping、pong 消息:它們對應的是 WebSocket 的兩個控制幀,opcode分別是0x9、0xA,對應的消息類型分別是PingMessage, PongMessage,前提是應用程序需要先讀取連接中的消息才能處理從對等方發送的 close、ping 和 pong 消息。
?? 當然關于源碼的部分我只是拿了其中一部分比如:控制類消息、并發、緩沖等,大家要知道有這些功能,有興趣的可以去看看
總結
本期主要和大家一起了解 gorilla/websocket 框架的使用和部分底層實現原理代碼走讀,通篇讀下來想必大家對websocket用程序語言實現有了更深刻的認識吧!
不過流行的開源 Go 語言 Web 工具包 Gorilla 宣布已正式歸檔,目前已進入只讀模式?!八l出的信號是,這些庫在未來將不會有任何發展。
也就是說 gorilla/websocket 這個被廣泛使用的 websocket 庫也會停止更新了,真是個令人悲傷的消息!
正如作者所說的那樣:“沒有一個項目需要永遠存在。這可能不會讓每個人都開心,但生活就是這樣。”
好了,通過兩期對websocket的講解,相信大家心里已經對它有了比較深刻的印象,還是那句話知道的越多,不知道的也越多,一起前行讓自己知道的更多一點!