成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Golang 優(yōu)雅關(guān)閉 gRPC 實(shí)踐

開發(fā) 后端
本文主要討論了在 Go 語言中實(shí)現(xiàn)gRPC服務(wù)優(yōu)雅關(guān)閉的技術(shù)和方法,從而確保所有連接都得到正確處理,防止數(shù)據(jù)丟失或損壞。

問題

我在上次做技術(shù)支持的時(shí)候,遇到了一個(gè)有趣的錯(cuò)誤。我們的服務(wù)在 Kubernetes 上運(yùn)行,有一個(gè)容器在重啟時(shí)不斷出現(xiàn)以下錯(cuò)誤信息--"Error bind: address already in use"。對于大多數(shù)程序員來說,這是一個(gè)非常熟悉的錯(cuò)誤信息,表明一個(gè)進(jìn)程正試圖綁定到另一個(gè)進(jìn)程正在使用的端口上。

一、背景

我的團(tuán)隊(duì)維護(hù)一個(gè) Go 服務(wù),啟動(dòng)時(shí)會(huì)在各自的 goroutine 中生成大量不同的 gRPC 服務(wù)。

Goroutine[2] - Go 運(yùn)行時(shí)管理的輕量級線程,運(yùn)行時(shí)只需要幾 KB 內(nèi)存,是 Go 并發(fā)性的基礎(chǔ)。

以下是我們服務(wù)架構(gòu)的簡化版本,以及以前啟動(dòng)和停止服務(wù)器時(shí)所執(zhí)行的任務(wù)。

package main

type GrpcServerInterface interface{
  Run(stopChan chan <-struct{})
}

type Server struct {
  ServerA GrpcServerIface
  ServerB GrpcServerIface
}

func NewServer() *Server {
  return &NewServer{
    ServerA: NewServerA,
    ServerB: NewServerB,
  }
}

// Start runs each of the grpc servers
func (s *Server) Start(stopChan <-chan struct{}){
  go ServerA.Run(stopChan)
  go ServerB.Run(stopChan)
  <- stopChan
}

func main() {
  stopChan := make(chan struct{})
  server := NewServer()
  server.Start(stopChan)
 
  // Wait for program to terminate and then signal servers to stop
  ch := make(chan os.Signal, 1)
  signal.Notify(c, os.Interrupt, syscall.SIGTERM)
  <-ch
  close(stopChan)
}
package internal

type ServerA struct {
  stopChan <-chan struct{}
}

// Start runs each of the grpc servers
func (s *ServerA) Run(stopChan <-chan struct{}){
  grpcServer := grpc.NewServer()
  
  var listener net.Listener
  ln, err := net.Listen("tcp", ":8080")
  if err != nil {
   // handle error
  }
  
  for {
   err := grpcServer.Serve(listener)
   if err != nil {
     return 
   }
  }
  
  <- stopChan
  grpcServer.Stop() // Gracefully terminate connections and close listener
}

我首先想到這可能是 Docker 或 Kubernetes 運(yùn)行時(shí)的某種偶發(fā)性錯(cuò)誤。這個(gè)錯(cuò)誤讓我覺得很奇怪,原因如下:1.)查看代碼,我們似乎確實(shí)在主程序退出時(shí)關(guān)閉了所有監(jiān)聽,端口怎么可能在重啟時(shí)仍在使用?2.)錯(cuò)誤信息持續(xù)出現(xiàn)了幾個(gè)小時(shí),以至于需要人工干預(yù)。我原以為在最壞情況下,操作系統(tǒng)會(huì)在嘗試重啟容器之前為我們清理資源。或許是清理速度不夠快?

團(tuán)隊(duì)成員建議我們再深入調(diào)查一下。

二、解決方案

經(jīng)過仔細(xì)研究,發(fā)現(xiàn)我們的代碼實(shí)際上存在一些問題...

1. 通道(Channel)與上下文(Context)

通道用于在程序之間發(fā)送信號,通常以一對一的方式使用,當(dāng)一個(gè)值被發(fā)送到某個(gè)通道時(shí),只能從該通道讀取一次。在我們的代碼中,使用的是一對多模式。我們將在 main 中創(chuàng)建的通道傳遞給多個(gè)不同的 goroutine,每個(gè) goroutine 都在等待 main 關(guān)閉通道,以便知道何時(shí)運(yùn)行清理函數(shù)。

從 Go 1.7 開始,上下文被認(rèn)為是向多個(gè) goroutine 廣播信號的標(biāo)準(zhǔn)方式。雖然這可能不是我們遇到問題的根本原因(我們是在等待通道關(guān)閉,而不是試圖讓每個(gè) goroutine 從通道中讀取相同的值),但考慮到這是最佳實(shí)踐,還是希望采用這種模式。

以下是從通道切換到上下文后更新的代碼。

package internal

type ServerA struct {}

func (s *ServerA) Run(ctx context.Context){
  grpcServer := grpc.NewServer()
  var listener net.Listener
  ln, err := net.Listen("tcp", ":8080")
  if err != nil {
   log.Fatal("ServerA - Failed to create listener")
  }
  
  for {
   err := grpcServer.Serve(listener)
   if err != nil {
     log.Fatal("ServerA - Failed to start server") 
   }
  }
  
  <- ctx.Done()
  // Clean up logic 
  grpcServer.Stop() // Gracefully terminate connections and close listener
}
package main

type GrpcServerInterface interface{
 Run(stopChan chan <-struct{})
}

type Server struct {
 ServerA GrpcServerIface
 ServerB GrpcServerIface
 stopServer context.CancelFunc
 serverCtx context.Context
}

func NewServer() *Server {
 return &NewServer{
    ServerA: NewServerA,
    ServerB: NewServerB,
  }
}

// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
  // create new context from parent context
  s.serverCtx, stopServer := context.WithCancel(ctx) 
  go ServerA.Run(s.serverCtx)
  go ServerB.Run(s.serverCtx)
}

func (s *Server) Stop() {
  s.stopServer() // close server context to signal spawned goroutines to stop
}

func main() {
 ctx, cancel := context.withCancel()
 server := NewServer()
 server.Start(ctx)
 // Wait for program to terminate and then signal servers to stop
 ch := make(chan os.Signal, 1)
 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 
 <-ch
 cancel() // close main context on terminate signal
 server.Stop() // clean up server resources
}

2. 基于等待組(WaitGroup)的優(yōu)雅停機(jī)

雖然我們通過取消主上下文向 goroutine 發(fā)出了退出信號,但并沒有等待它們完成工作。當(dāng)主程序收到退出信號時(shí),即使我們發(fā)送了取消信號,也不能保證它會(huì)等待生成的 goroutine 完成工作。因此我們必須明確等待每個(gè) goroutine 完成工作,以避免任何泄漏,為此我們使用了 WaitGroup。

WaitGroup[3] 是一種計(jì)數(shù)器,用于阻止函數(shù)(或者說是 goroutine)的執(zhí)行,直到其內(nèi)部計(jì)數(shù)器變?yōu)?0。

package internal

type ServerA struct {}

func (s *ServerA) Run(ctx context.Context, wg *sync.WaitGroup){
  wg.Add(1) // Add the current function to the parent's wait group
  defer wg.Done() // Send "done" signal upon function exit
  
  grpcServer := grpc.NewServer()
  var listener net.Listener
  ln, err := net.Listen("tcp", ":8080")
  if err != nil {
   log.Fatal("ServerA - Failed to create listener")
  }
  
  for {
   err := grpcServer.Serve(listener)
   if err != nil {
     log.Fatal("ServerA - Failed to start server") 
   }
  }
  
  <- ctx.Done()
  // Clean up logic 
  grpcServer.Stop() // Gracefully terminate connections and close listener
  fmt.Println("ServerA has stopped")
}
package main

type GrpcServerInterface interface{
 Run(stopChan chan <-struct{})
}

type Server struct {
 ServerA GrpcServerIface
 ServerB GrpcServerIface
 wg sync.WaitGroup
 stopServer context.CancelFunc
 serverCtx context.Context
}

func NewServer() *Server {
 return &NewServer{
    ServerA: NewServerA,
    ServerB: NewServerB,
  }
}

// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
  s.serverCtx, stopServer := context.WithCancel(ctx)
  go ServerA.Run(s.serverCtx, &s.wg)
  go ServerB.Run(s.serverCtx, &s.wg)
}

func (s *Server) Stop() {
  s.stopServer() // close server context to signal spawned goroutines to stop
  s.wg.Wait()  // wait for all goroutines to exit before returning
  fmt.Println("Main Server has stopped")
}

func main() {
 ctx, cancel := context.withCancel()
 server := NewServer()
 server.Start(ctx)
 // Wait for program to terminate and then signal servers to stop
 ch := make(chan os.Signal, 1)
 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 
 <-ch
 cancel() // close main context on terminate signal
 server.Stop() // clean up server resources
}

3. 基于通道的啟動(dòng)信號

在測試過程中,又發(fā)現(xiàn)了一個(gè)隱藏錯(cuò)誤。我們未能在接受流量之前等待所有服務(wù)端啟動(dòng),而這在測試中造成了一些誤報(bào),即流量被發(fā)送到服務(wù)端,但沒有實(shí)際工作。為了向主服務(wù)發(fā)送所有附屬服務(wù)都已準(zhǔn)備就緒的信號,我們使用了通道。

package internal

type ServerA struct {
  startChan  
}

func (s *ServerA) Run(ctx context.Context, wg *sync.WaitGroup){
  wg.Add(1) // Add the current function to the parent's wait group
  defer wg.Done() // Send "done" signal upon function exit
   
  go func(){
    grpcServer := grpc.NewServer()
    
    var listener net.Listener
    ln, err := net.Listen("tcp", ":8080")
    if err != nil {
     log.Fatal("ServerA - Failed to create listener")
    }
    
    for {
     err := grpcServer.Serve(listener)
     if err != nil {
       log.Fatal("ServerA - Failed to start server") 
     }
    }
    close(s.startChan) // Signal that we are done starting server to exit function
    // Wait in the background for mina program to exit
    <- ctx.Done()
    // Clean up logic 
    grpcServer.Stop() // Gracefully terminate connections and close listener
    fmt.Println("ServerA has stopped")
  }()
  <- s.StartChan // Wait for signal before exiting function
  fmt.Println("ServerA has started")
}
package main

type GrpcServerInterface interface{
 Run(stopChan chan <-struct{})
}

type Server struct {
 ServerA GrpcServerIface
 ServerB GrpcServerIface
 wg sync.WaitGroup
 stopServer context.CancelFunc
 serverCtx context.Context
 startChan chan <-struct{}
}

func NewServer() *Server {
 return &NewServer{
    ServerA: NewServerA,
    ServerB: NewServerB,
    startChan: make(chan <-struct{}),
  }
}

// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
  s.serverCtx, stopServer := context.WithCancel(ctx)
  ServerA.Run(s.serverCtx, &s.wg)
  ServerB.Run(s.serverCtx, &s.wg)
  close(s.startChan)
  <- s.startChan // wait for each server to Start before returning
  fmt.Println("Main Server has started")
}

func (s *Server) Stop() {
  s.stopServer() // close server context to signal spawned goroutines to stop
  s.wg.Wait()  // wait for all goroutines to exit before returning
  fmt.Println("Main Server has stopped")
}

func main() {
 ctx, cancel := context.withCancel()
 server := NewServer()
 server.Start(ctx)
 // Wait for program to terminate and then signal servers to stop
 ch := make(chan os.Signal, 1)
 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 
 <-ch
 cancel() // close main context on terminate signal
 server.Stop() // clean up server resources
}

三、結(jié)論

不瞞你說,剛開始學(xué)習(xí) Go 時(shí),并發(fā)會(huì)讓你頭疼不已。調(diào)試這個(gè)問題讓我有機(jī)會(huì)看到這些概念的實(shí)際用途,并強(qiáng)化了之前不確定的主題,建議你自己嘗試簡單的示例!

參考資料:

  • [1]Go Concurrency — Graceful Shutdown: https:/medium.com/@goldengirlgeeks/go-graceful-shutdown-0c46e67ab9c9
  • [2]Goroutine: https://go.dev/tour/concurrency/1
  • [3]WaitGroup: https://www.geeksforgeeks.org/using-waitgroup-in-golang/amp
責(zé)任編輯:趙寧寧 來源: DeepNoMind
相關(guān)推薦

2021-09-13 05:02:49

GogRPC語言

2021-09-26 10:20:06

開發(fā)Golang代碼

2021-01-19 10:35:49

JVM場景函數(shù)

2021-04-20 08:00:31

Redisson關(guān)閉訂單支付系統(tǒng)

2021-09-01 23:29:37

Golang語言gRPC

2021-06-04 10:52:51

kubernetes場景容器

2023-12-05 07:26:21

Golang項(xiàng)目結(jié)構(gòu)

2024-04-02 09:55:36

GolangColly開發(fā)者

2017-12-19 10:03:44

JavaLinux代碼

2024-11-13 16:37:00

Java線程池

2022-04-11 08:17:07

JVMJava進(jìn)程

2024-04-28 18:24:05

2020-11-23 14:16:42

Golang

2022-02-20 23:15:46

gRPCGolang語言

2021-03-28 09:17:18

JVM場景鉤子函數(shù)

2022-04-29 11:52:02

API代碼HTTP

2024-10-21 15:39:24

2022-01-27 08:27:23

Dubbo上下線設(shè)計(jì)

2024-05-28 00:00:30

Golang數(shù)據(jù)庫

2024-01-07 12:47:35

Golang流水線設(shè)計(jì)模式
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 欧美亚洲高清 | 视频1区2区| 在线成人精品视频 | 91精品国产91久久久久青草 | 亚洲免费在线播放 | 国产色婷婷精品综合在线播放 | 天天综合操| 国产美女永久免费无遮挡 | 狠狠干网站 | 亚州精品天堂中文字幕 | 亚洲国产成人久久久 | 一区二区三区久久 | 日日日干干干 | 国产免费观看视频 | 毛片视频免费观看 | 日韩一级| 在线视频一区二区三区 | 亚洲欧美中文日韩在线v日本 | 羞羞视频在线观看网站 | 亚洲色图在线观看 | 成人一区二区在线 | 亚洲国产精品一区 | 欧美成人一区二免费视频软件 | 91社区在线观看 | 天天插天天射天天干 | 日韩欧美国产精品一区二区三区 | 日韩区| 天天综合干 | 正在播放一区二区 | 91佛爷在线观看 | 6080亚洲精品一区二区 | 99精品国自产在线观看 | 欧美久久视频 | 亚洲精品在线看 | 国产精品一区二区不卡 | 国际精品鲁一鲁一区二区小说 | 久久亚洲一区二区 | 精品久久久久久久久久久久久久 | 成人高清视频在线观看 | 国产成人精品久久 | 亚洲精品免费在线观看 |