Golang 優(yōu)雅關(guān)閉 gRPC 實(shí)踐
問題
我在上次做技術(shù)支持的時(shí)候,遇到了一個(gè)有趣的錯(cuò)誤。我們的服務(wù)在 Kubernetes 上運(yùn)行,有一個(gè)容器在重啟時(shí)不斷出現(xiàn)以下錯(cuò)誤信息--"Error bind: address already in use"。對于大多數(shù)程序員來說,這是一個(gè)非常熟悉的錯(cuò)誤信息,表明一個(gè)進(jìn)程正試圖綁定到另一個(gè)進(jìn)程正在使用的端口上。
一、背景
我的團(tuán)隊(duì)維護(hù)一個(gè) Go 服務(wù),啟動(dòng)時(shí)會(huì)在各自的 goroutine 中生成大量不同的 gRPC 服務(wù)。
Goroutine[2] - Go 運(yùn)行時(shí)管理的輕量級線程,運(yùn)行時(shí)只需要幾 KB 內(nèi)存,是 Go 并發(fā)性的基礎(chǔ)。
以下是我們服務(wù)架構(gòu)的簡化版本,以及以前啟動(dòng)和停止服務(wù)器時(shí)所執(zhí)行的任務(wù)。
package main
type GrpcServerInterface interface{
Run(stopChan chan <-struct{})
}
type Server struct {
ServerA GrpcServerIface
ServerB GrpcServerIface
}
func NewServer() *Server {
return &NewServer{
ServerA: NewServerA,
ServerB: NewServerB,
}
}
// Start runs each of the grpc servers
func (s *Server) Start(stopChan <-chan struct{}){
go ServerA.Run(stopChan)
go ServerB.Run(stopChan)
<- stopChan
}
func main() {
stopChan := make(chan struct{})
server := NewServer()
server.Start(stopChan)
// Wait for program to terminate and then signal servers to stop
ch := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-ch
close(stopChan)
}
package internal
type ServerA struct {
stopChan <-chan struct{}
}
// Start runs each of the grpc servers
func (s *ServerA) Run(stopChan <-chan struct{}){
grpcServer := grpc.NewServer()
var listener net.Listener
ln, err := net.Listen("tcp", ":8080")
if err != nil {
// handle error
}
for {
err := grpcServer.Serve(listener)
if err != nil {
return
}
}
<- stopChan
grpcServer.Stop() // Gracefully terminate connections and close listener
}
我首先想到這可能是 Docker 或 Kubernetes 運(yùn)行時(shí)的某種偶發(fā)性錯(cuò)誤。這個(gè)錯(cuò)誤讓我覺得很奇怪,原因如下:1.)查看代碼,我們似乎確實(shí)在主程序退出時(shí)關(guān)閉了所有監(jiān)聽,端口怎么可能在重啟時(shí)仍在使用?2.)錯(cuò)誤信息持續(xù)出現(xiàn)了幾個(gè)小時(shí),以至于需要人工干預(yù)。我原以為在最壞情況下,操作系統(tǒng)會(huì)在嘗試重啟容器之前為我們清理資源。或許是清理速度不夠快?
團(tuán)隊(duì)成員建議我們再深入調(diào)查一下。
二、解決方案
經(jīng)過仔細(xì)研究,發(fā)現(xiàn)我們的代碼實(shí)際上存在一些問題...
1. 通道(Channel)與上下文(Context)
通道用于在程序之間發(fā)送信號,通常以一對一的方式使用,當(dāng)一個(gè)值被發(fā)送到某個(gè)通道時(shí),只能從該通道讀取一次。在我們的代碼中,使用的是一對多模式。我們將在 main 中創(chuàng)建的通道傳遞給多個(gè)不同的 goroutine,每個(gè) goroutine 都在等待 main 關(guān)閉通道,以便知道何時(shí)運(yùn)行清理函數(shù)。
從 Go 1.7 開始,上下文被認(rèn)為是向多個(gè) goroutine 廣播信號的標(biāo)準(zhǔn)方式。雖然這可能不是我們遇到問題的根本原因(我們是在等待通道關(guān)閉,而不是試圖讓每個(gè) goroutine 從通道中讀取相同的值),但考慮到這是最佳實(shí)踐,還是希望采用這種模式。
以下是從通道切換到上下文后更新的代碼。
package internal
type ServerA struct {}
func (s *ServerA) Run(ctx context.Context){
grpcServer := grpc.NewServer()
var listener net.Listener
ln, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal("ServerA - Failed to create listener")
}
for {
err := grpcServer.Serve(listener)
if err != nil {
log.Fatal("ServerA - Failed to start server")
}
}
<- ctx.Done()
// Clean up logic
grpcServer.Stop() // Gracefully terminate connections and close listener
}
package main
type GrpcServerInterface interface{
Run(stopChan chan <-struct{})
}
type Server struct {
ServerA GrpcServerIface
ServerB GrpcServerIface
stopServer context.CancelFunc
serverCtx context.Context
}
func NewServer() *Server {
return &NewServer{
ServerA: NewServerA,
ServerB: NewServerB,
}
}
// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
// create new context from parent context
s.serverCtx, stopServer := context.WithCancel(ctx)
go ServerA.Run(s.serverCtx)
go ServerB.Run(s.serverCtx)
}
func (s *Server) Stop() {
s.stopServer() // close server context to signal spawned goroutines to stop
}
func main() {
ctx, cancel := context.withCancel()
server := NewServer()
server.Start(ctx)
// Wait for program to terminate and then signal servers to stop
ch := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-ch
cancel() // close main context on terminate signal
server.Stop() // clean up server resources
}
2. 基于等待組(WaitGroup)的優(yōu)雅停機(jī)
雖然我們通過取消主上下文向 goroutine 發(fā)出了退出信號,但并沒有等待它們完成工作。當(dāng)主程序收到退出信號時(shí),即使我們發(fā)送了取消信號,也不能保證它會(huì)等待生成的 goroutine 完成工作。因此我們必須明確等待每個(gè) goroutine 完成工作,以避免任何泄漏,為此我們使用了 WaitGroup。
WaitGroup[3] 是一種計(jì)數(shù)器,用于阻止函數(shù)(或者說是 goroutine)的執(zhí)行,直到其內(nèi)部計(jì)數(shù)器變?yōu)?0。
package internal
type ServerA struct {}
func (s *ServerA) Run(ctx context.Context, wg *sync.WaitGroup){
wg.Add(1) // Add the current function to the parent's wait group
defer wg.Done() // Send "done" signal upon function exit
grpcServer := grpc.NewServer()
var listener net.Listener
ln, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal("ServerA - Failed to create listener")
}
for {
err := grpcServer.Serve(listener)
if err != nil {
log.Fatal("ServerA - Failed to start server")
}
}
<- ctx.Done()
// Clean up logic
grpcServer.Stop() // Gracefully terminate connections and close listener
fmt.Println("ServerA has stopped")
}
package main
type GrpcServerInterface interface{
Run(stopChan chan <-struct{})
}
type Server struct {
ServerA GrpcServerIface
ServerB GrpcServerIface
wg sync.WaitGroup
stopServer context.CancelFunc
serverCtx context.Context
}
func NewServer() *Server {
return &NewServer{
ServerA: NewServerA,
ServerB: NewServerB,
}
}
// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
s.serverCtx, stopServer := context.WithCancel(ctx)
go ServerA.Run(s.serverCtx, &s.wg)
go ServerB.Run(s.serverCtx, &s.wg)
}
func (s *Server) Stop() {
s.stopServer() // close server context to signal spawned goroutines to stop
s.wg.Wait() // wait for all goroutines to exit before returning
fmt.Println("Main Server has stopped")
}
func main() {
ctx, cancel := context.withCancel()
server := NewServer()
server.Start(ctx)
// Wait for program to terminate and then signal servers to stop
ch := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-ch
cancel() // close main context on terminate signal
server.Stop() // clean up server resources
}
3. 基于通道的啟動(dòng)信號
在測試過程中,又發(fā)現(xiàn)了一個(gè)隱藏錯(cuò)誤。我們未能在接受流量之前等待所有服務(wù)端啟動(dòng),而這在測試中造成了一些誤報(bào),即流量被發(fā)送到服務(wù)端,但沒有實(shí)際工作。為了向主服務(wù)發(fā)送所有附屬服務(wù)都已準(zhǔn)備就緒的信號,我們使用了通道。
package internal
type ServerA struct {
startChan
}
func (s *ServerA) Run(ctx context.Context, wg *sync.WaitGroup){
wg.Add(1) // Add the current function to the parent's wait group
defer wg.Done() // Send "done" signal upon function exit
go func(){
grpcServer := grpc.NewServer()
var listener net.Listener
ln, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal("ServerA - Failed to create listener")
}
for {
err := grpcServer.Serve(listener)
if err != nil {
log.Fatal("ServerA - Failed to start server")
}
}
close(s.startChan) // Signal that we are done starting server to exit function
// Wait in the background for mina program to exit
<- ctx.Done()
// Clean up logic
grpcServer.Stop() // Gracefully terminate connections and close listener
fmt.Println("ServerA has stopped")
}()
<- s.StartChan // Wait for signal before exiting function
fmt.Println("ServerA has started")
}
package main
type GrpcServerInterface interface{
Run(stopChan chan <-struct{})
}
type Server struct {
ServerA GrpcServerIface
ServerB GrpcServerIface
wg sync.WaitGroup
stopServer context.CancelFunc
serverCtx context.Context
startChan chan <-struct{}
}
func NewServer() *Server {
return &NewServer{
ServerA: NewServerA,
ServerB: NewServerB,
startChan: make(chan <-struct{}),
}
}
// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
s.serverCtx, stopServer := context.WithCancel(ctx)
ServerA.Run(s.serverCtx, &s.wg)
ServerB.Run(s.serverCtx, &s.wg)
close(s.startChan)
<- s.startChan // wait for each server to Start before returning
fmt.Println("Main Server has started")
}
func (s *Server) Stop() {
s.stopServer() // close server context to signal spawned goroutines to stop
s.wg.Wait() // wait for all goroutines to exit before returning
fmt.Println("Main Server has stopped")
}
func main() {
ctx, cancel := context.withCancel()
server := NewServer()
server.Start(ctx)
// Wait for program to terminate and then signal servers to stop
ch := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-ch
cancel() // close main context on terminate signal
server.Stop() // clean up server resources
}
三、結(jié)論
不瞞你說,剛開始學(xué)習(xí) Go 時(shí),并發(fā)會(huì)讓你頭疼不已。調(diào)試這個(gè)問題讓我有機(jī)會(huì)看到這些概念的實(shí)際用途,并強(qiáng)化了之前不確定的主題,建議你自己嘗試簡單的示例!
參考資料:
- [1]Go Concurrency — Graceful Shutdown: https:/medium.com/@goldengirlgeeks/go-graceful-shutdown-0c46e67ab9c9
- [2]Goroutine: https://go.dev/tour/concurrency/1
- [3]WaitGroup: https://www.geeksforgeeks.org/using-waitgroup-in-golang/amp