成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

微服務架構下的熔斷框架:Hystrix-Go

開發 架構
伴隨著微服務架構被宣傳得如火如茶,一些概念也被推到了我們的面前。一提到微服務,就離不開這幾個字:高內聚低耦合;微服務的架構設計最終目的也就是實現這幾個字。

[[421890]]

本文轉載自微信公眾號「Golang夢工廠」,作者AsongGo 。轉載本文請聯系Golang夢工廠公眾號。

背景

伴隨著微服務架構被宣傳得如火如茶,一些概念也被推到了我們的面前。一提到微服務,就離不開這幾個字:高內聚低耦合;微服務的架構設計最終目的也就是實現這幾個字。在微服務架構中,微服務就是完成一個單一的業務功能,每個微服務可以獨立演進,一個應用可能會有多個微服務組成,微服務之間的數據交可以通過遠程調用來完成,這樣在一個微服務架構下就會形成這樣的依賴關系:

微服務A調用微服務C、D,微服務B又依賴微服務B、E,微服務D依賴于服務F,這只是一個簡單的小例子,實際業務中服務之間的依賴關系比這還復雜,這樣在調用鏈路上如果某個微服務的調用響應時間過長或者不可用,那么對上游服務(按調用關系命名)的調用就會占用越來越多的系統資源,進而引起系統崩潰,這就是微服務的雪蹦效應。

為了解決微服務的雪蹦效應,提出來使用熔斷機制為微服務鏈路提供保護機制。熔斷機制大家應該都不陌生,電路的中保險絲就是一種熔斷機制,在微服務中的熔斷機制是什么樣的呢?

當鏈路中的某個微服務不可用或者響應的時間太長時,會進行服務的降級,進而熔斷該節點微服務的調用,快速返回錯誤的響應信息,當檢測到該節點微服務調用響應正常后,恢復調用鏈路。

本文我們就介紹一個開源熔斷框架:hystrix-go。

熔斷框架(hystrix-go)

Hystrix是一個延遲和容錯庫,旨在隔離對遠程系統、服務和第三方服務的訪問點,停止級聯故障并在故障不可避免的復雜分布式系統中實現彈性。hystrix-go 旨在允許 Go 程序員輕松構建具有與基于 Java 的 Hystrix 庫類似的執行語義的應用程序。所以本文就從使用開始到源碼分析一下hystrix-go。

快速安裝

  1. go get -u github.com/afex/hystrix-go/hystrix 

快速使用

hystrix-go真的是開箱即用,使用還是比較簡單的,主要分為兩個步驟:

  • 配置熔斷規則,否則將使用默認配置。可以調用的方法
  1. func Configure(cmds map[string]CommandConfig)  
  2. func ConfigureCommand(name string, config CommandConfig) 

Configure方法內部也是調用的ConfigureCommand方法,就是傳參數不一樣,根據自己的代碼風格選擇。

  • 定義依賴于外部系統的應用程序邏輯 - runFunc 和服務中斷期間執行的邏輯代碼 - fallbackFunc,可以調用的方法:
  1. func Go(name string, run runFunc, fallback fallbackFunc) // 內部調用Goc方法 
  2. func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC)  
  3. func Do(name string, run runFunc, fallback fallbackFunc) // 內部調用的是Doc方法 
  4. func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) // 內部調用Goc方法,處理了異步過程 

Go和Do的區別在于異步還是同步,Do方法在調用Doc方法內處理了異步過程,他們最終都是調用的Goc方法。后面我們進行分析。

舉一個例子:我們在Gin框架上加一個接口級的熔斷中間件

  1. // 代碼已上傳github: 文末查看地址 
  2. var CircuitBreakerName = "api_%s_circuit_breaker" 
  3. func CircuitBreakerWrapper(ctx *gin.Context){ 
  4.  name := fmt.Sprintf(CircuitBreakerName,ctx.Request.URL) 
  5.  hystrix.Do(name, func() error { 
  6.   ctx.Next() 
  7.   code := ctx.Writer.Status() 
  8.   if code != http.StatusOK{ 
  9.    return errors.New(fmt.Sprintf("status code %d", code)) 
  10.   } 
  11.   return nil 
  12.  
  13.  }, func(err error) error { 
  14.   if err != nil{ 
  15.    // 監控上報(未實現) 
  16.    _, _ = io.WriteString(f, fmt.Sprintf("circuitBreaker and err is %s\n",err.Error())) //寫入文件(字符串) 
  17.    fmt.Printf("circuitBreaker and err is %s\n",err.Error()) 
  18.    // 返回熔斷錯誤 
  19.    ctx.JSON(http.StatusServiceUnavailable,gin.H{ 
  20.     "msg": err.Error(), 
  21.    }) 
  22.   } 
  23.   return nil 
  24.  }) 
  25.  
  26. func init()  { 
  27.  hystrix.ConfigureCommand(CircuitBreakerName,hystrix.CommandConfig{ 
  28.   Timeout:                int(3*time.Second), // 執行command的超時時間為3s 
  29.   MaxConcurrentRequests:  10, // command的最大并發量 
  30.   RequestVolumeThreshold: 100, // 統計窗口10s內的請求數量,達到這個請求數量后才去判斷是否要開啟熔斷 
  31.   SleepWindow:            int(2 * time.Second), // 當熔斷器被打開后,SleepWindow的時間就是控制過多久后去嘗試服務是否可用了 
  32.   ErrorPercentThreshold:  20, // 錯誤百分比,請求數量大于等于RequestVolumeThreshold并且錯誤率到達這個百分比后就會啟動熔斷 
  33.  }) 
  34.  if checkFileIsExist(filename) { //如果文件存在 
  35.   f, errfile = os.OpenFile(filename, os.O_APPEND, 0666) //打開文件 
  36.  } else { 
  37.   f, errfile = os.Create(filename) //創建文件 
  38.  } 
  39.  
  40.  
  41. func main()  { 
  42.  defer f.Close() 
  43.  hystrixStreamHandler := hystrix.NewStreamHandler() 
  44.  hystrixStreamHandler.Start() 
  45.  go http.ListenAndServe(net.JoinHostPort("""81"), hystrixStreamHandler) 
  46.  r := gin.Default() 
  47.  r.GET("/api/ping/baidu", func(c *gin.Context) { 
  48.   _, err := http.Get("https://www.baidu.com"
  49.   if err != nil { 
  50.    c.JSON(http.StatusInternalServerError, gin.H{"msg": err.Error()}) 
  51.    return 
  52.   } 
  53.   c.JSON(http.StatusOK, gin.H{"msg""success"}) 
  54.  }, CircuitBreakerWrapper) 
  55.  r.Run()  // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080"
  56.  
  57. func checkFileIsExist(filename string) bool { 
  58.  if _, err := os.Stat(filename); os.IsNotExist(err) { 
  59.   return false 
  60.  } 
  61.  return true 

指令:wrk -t100 -c100 -d1s http://127.0.0.1:8080/api/ping/baidu

運行結果:

  1. circuitBreaker and err is status code 500 
  2. circuitBreaker and err is status code 500 
  3. .....  
  4. circuitBreaker and err is hystrix: max concurrency 
  5. circuitBreaker and err is hystrix: max concurrency 
  6. ..... 
  7. circuitBreaker and err is hystrix: circuit open 
  8. circuitBreaker and err is hystrix: circuit open 
  9. ..... 

對錯誤進行分析:

  • circuitBreaker and err is status code 500:因為我們關閉了網絡,所以請求是沒有響應的
  • circuitBreaker and err is hystrix: max concurrency:我們設置的最大并發量MaxConcurrentRequests是10,我們的壓測工具使用的是100并發,所有會觸發這個熔斷
  • circuitBreaker and err is hystrix: circuit open:我們設置熔斷開啟的請求數量RequestVolumeThreshold是100,所以當10s內的請求數量大于100時就會觸發熔斷。

簡單對上面的例子做一個解析:

  • 添加接口級的熔斷中間件
  • 初始化熔斷相關配置
  • 開啟dashboard 可視化hystrix的上報信息,瀏覽器打開http://localhost:81,可以看到如下結果:

hystrix-go流程分析

本來想對源碼進行分析,代碼量有點大,所以就針對流程來分析,順便看一些核心代碼。

配置熔斷規則

既然是熔斷,就要有熔斷規則,我們可以調用兩個方法配置熔斷規則,不會最終調用的都是ConfigureCommand,這里沒有特別的邏輯,如果我們沒有配置,系統將使用默認熔斷規則:

  1. var ( 
  2.  // DefaultTimeout is how long to wait for command to complete, in milliseconds 
  3.  DefaultTimeout = 1000 
  4.  // DefaultMaxConcurrent is how many commands of the same type can run at the same time 
  5.  DefaultMaxConcurrent = 10 
  6.  // DefaultVolumeThreshold is the minimum number of requests needed before a circuit can be tripped due to health 
  7.  DefaultVolumeThreshold = 20 
  8.  // DefaultSleepWindow is how long, in milliseconds, to wait after a circuit opens before testing for recovery 
  9.  DefaultSleepWindow = 5000 
  10.  // DefaultErrorPercentThreshold causes circuits to open once the rolling measure of errors exceeds this percent of requests 
  11.  DefaultErrorPercentThreshold = 50 
  12.  // DefaultLogger is the default logger that will be used in the Hystrix package. By default prints nothing. 
  13.  DefaultLogger = NoopLogger{} 

配置規則如下:

  • Timeout:定義執行command的超時時間,時間單位是ms,默認時間是1000ms;
  • MaxConcurrnetRequests:定義command的最大并發量,默認值是10并發量;
  • SleepWindow:熔斷器被打開后使用,在熔斷器被打開后,根據SleepWindow設置的時間控制多久后嘗試服務是否可用,默認時間為5000ms;
  • RequestVolumeThreshold:判斷熔斷開關的條件之一,統計10s(代碼中寫死了)內請求數量,達到這個請求數量后再根據錯誤率判斷是否要開啟熔斷;
  • ErrorPercentThreshold:判斷熔斷開關的條件之一,統計錯誤百分比,請求數量大于等于RequestVolumeThreshold并且錯誤率到達這個百分比后就會啟動熔斷 默認值是50;

這些規則根據command的name進行區分存放到一個map中。

執行command

執行command主要可以調用四個方法,分別是:

  1. func Go(name string, run runFunc, fallback fallbackFunc) 
  2. func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC)  
  3. func Do(name string, run runFunc, fallback fallbackFunc) 
  4. func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) 

Do內部調用的Doc方法,Go內部調用的是Goc方法,在Doc方法內部最終調用的還是Goc方法,只是在Doc方法內做了同步邏輯:

  1. func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error { 
  2.   ..... 省略部分封裝代碼 
  3.   var errChan chan error 
  4.  if fallback == nil { 
  5.   errChan = GoC(ctx, name, r, nil) 
  6.  } else { 
  7.   errChan = GoC(ctx, name, r, f) 
  8.  } 
  9.  
  10.  select { 
  11.  case <-done: 
  12.   return nil 
  13.  case err := <-errChan: 
  14.   return err 
  15.  } 

因為他們最終都是調用的Goc方法,所以我們執行分析Goc方法的內部邏輯;代碼有點長,我們分邏輯來分析:

創建command對象

  1. cmd := &command{ 
  2.  run:      run, 
  3.  fallback: fallback, 
  4.  start:    time.Now(), 
  5.  errChan:  make(chan error, 1), 
  6.  finished: make(chan bool, 1), 
  7. // 獲取熔斷器 
  8. circuit, _, err := GetCircuit(name
  9. if err != nil { 
  10.  cmd.errChan <- err 
  11.  return cmd.errChan 

介紹一下command的數據結構:

  1. type command struct { 
  2.  sync.Mutex 
  3.  
  4.  ticket      *struct{} 
  5.  start       time.Time 
  6.  errChan     chan error 
  7.  finished    chan bool 
  8.  circuit     *CircuitBreaker 
  9.  run         runFuncC 
  10.  fallback    fallbackFuncC 
  11.  runDuration time.Duration 
  12.  events      []string 

字段介紹:

  • ticket:用來做最大并發量控制,這個就是一個令牌
  • start:記錄command執行的開始時間
  • errChan:記錄command執行錯誤
  • finished:標志command執行結束,用來做協程同步
  • circuit:存儲熔斷器相關信息
  • run:應用程序
  • fallback:應用程序執行失敗后要執行的函數
  • runDuration:記錄command執行消耗時間
  • events:events主要是存儲事件類型信息,比如執行成功的success,或者失敗的timeout、context_canceled等

上段代碼重點是GetCircuit方法,這一步的目的就是獲取熔斷器,使用動態加載的方式,如果沒有就創建一個熔斷器,熔斷器結構如下:

  1. type CircuitBreaker struct { 
  2.  Name                   string 
  3.  open                   bool 
  4.  forceOpen              bool 
  5.  mutex                  *sync.RWMutex 
  6.  openedOrLastTestedTime int64 
  7.  
  8.  executorPool *executorPool 
  9.  metrics      *metricExchange 

解釋一下這幾個字段:

  • name:熔斷器的名字,其實就是創建的command名字
  • open:判斷熔斷器是否打開的標志
  • forceopen:手動觸發熔斷器的開關,單元測試使用
  • mutex:使用讀寫鎖保證并發安全
  • openedOrLastTestedTime:記錄上一次打開熔斷器的時間,因為要根據這個時間和SleepWindow時間來做恢復嘗試
  • executorPool:用來做流量控制,因為我們有一個最大并發量控制,就是根據這個來做的流量控制,每次請求都要獲取令牌

metrics:用來上報執行狀態的事件,通過它把執行狀態信息存儲到實際熔斷器執行各個維度狀態 (成功次數,失敗次數,超時……) 的數據集合中。

后面會單獨分析executorPool、metrics的實現邏輯。

定義令牌相關的方法和變量

因為我們有一個條件是最大并發控制,采用的是令牌的方式進行流量控制,每一個請求都要獲取一個令牌,使用完畢要把令牌還回去,先看一下這段代碼:

  1. ticketCond := sync.NewCond(cmd) 
  2. ticketChecked := false 
  3. // When the caller extracts error from returned errChan, it's assumed that 
  4. // the ticket's been returned to executorPool. Therefore, returnTicket() can 
  5. // not run after cmd.errorWithFallback(). 
  6. returnTicket := func() { 
  7.  cmd.Lock() 
  8.  // Avoid releasing before a ticket is acquired. 
  9.  for !ticketChecked { 
  10.   ticketCond.Wait() 
  11.  } 
  12.  cmd.circuit.executorPool.Return(cmd.ticket) 
  13.  cmd.Unlock() 

使用sync.NewCond創建一個條件變量,用來協調通知你可以歸還令牌了。

然后定義一個返回令牌的方法,調用Return方法歸還令牌。

定義上報執行事件的方法

前面我們也提到了,我們的熔斷器會上報執行狀態的事件,通過它把執行狀態信息存儲到實際熔斷器執行各個維度狀態 (成功次數,失敗次數,超時……) 的數據集合中。所以要定義一個上報的方法:

  1. reportAllEvent := func() { 
  2.  err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration) 
  3.  if err != nil { 
  4.   log.Printf(err.Error()) 
  5.  } 

開啟協程一:執行應用程序邏輯 - runFunc

協程一的主要目的就是執行應用程序邏輯:

  1. go func() { 
  2.   defer func() { cmd.finished <- true }() // 標志協程一的command執行結束,同步到協程二 
  3.  
  4.   // 當最近執行的并發數量超過閾值并且錯誤率很高時,就會打開熔斷器。  
  5.    // 如果熔斷器打開,直接拒絕拒絕請求并返回令牌,當感覺健康狀態恢復時,熔斷器將允許新的流量。 
  6.   if !cmd.circuit.AllowRequest() { 
  7.    cmd.Lock() 
  8.    // It's safe for another goroutine to go ahead releasing a nil ticket. 
  9.    ticketChecked = true 
  10.    ticketCond.Signal() // 通知釋放ticket信號 
  11.    cmd.Unlock() 
  12.       // 使用sync.Onece保證只執行一次。 
  13.    returnOnce.Do(func() { 
  14.         // 返還令牌 
  15.     returnTicket() 
  16.         // 執行fallback邏輯 
  17.     cmd.errorWithFallback(ctx, ErrCircuitOpen) 
  18.         // 上報狀態事件 
  19.     reportAllEvent() 
  20.    }) 
  21.    return 
  22.   } 
  23.    // 控制并發 
  24.   cmd.Lock() 
  25.   select { 
  26.     // 獲取到令牌 
  27.   case cmd.ticket = <-circuit.executorPool.Tickets: 
  28.       // 發送釋放令牌信號 
  29.    ticketChecked = true 
  30.    ticketCond.Signal() 
  31.    cmd.Unlock() 
  32.   default
  33.       // 沒有令牌可用了, 也就是達到最大并發數量則直接處理fallback邏輯 
  34.    ticketChecked = true 
  35.    ticketCond.Signal() 
  36.    cmd.Unlock() 
  37.    returnOnce.Do(func() { 
  38.     returnTicket() 
  39.     cmd.errorWithFallback(ctx, ErrMaxConcurrency) 
  40.     reportAllEvent() 
  41.    }) 
  42.    return 
  43.   } 
  44.   // 執行應用程序邏輯 
  45.   runStart := time.Now() 
  46.   runErr := run(ctx) 
  47.   returnOnce.Do(func() { 
  48.    defer reportAllEvent() // 狀態事件上報 
  49.       // 統計應用程序執行時長 
  50.    cmd.runDuration = time.Since(runStart) 
  51.       // 返還令牌 
  52.    returnTicket() 
  53.       // 如果應用程序執行失敗執行fallback函數 
  54.    if runErr != nil { 
  55.     cmd.errorWithFallback(ctx, runErr) 
  56.     return 
  57.    } 
  58.    cmd.reportEvent("success"
  59.   }) 
  60.  }() 

總結一下這個協程:

  • 判斷熔斷器是否打開,如果打開了熔斷器直接進行熔斷,不在進行后面的請求
  • 運行應用程序邏輯

開啟協程二:同步協程一并監聽錯誤

先看代碼:

  1. go func() { 
  2.     //  使用定時器來做超時控制,這個超時時間就是我們配置的,默認1000ms 
  3.   timer := time.NewTimer(getSettings(name).Timeout) 
  4.   defer timer.Stop() 
  5.  
  6.   select { 
  7.       // 同步協程一 
  8.   case <-cmd.finished: 
  9.    // returnOnce has been executed in another goroutine 
  10.        
  11.     // 是否收到context取消信號 
  12.   case <-ctx.Done(): 
  13.    returnOnce.Do(func() { 
  14.     returnTicket() 
  15.     cmd.errorWithFallback(ctx, ctx.Err()) 
  16.     reportAllEvent() 
  17.    }) 
  18.    return 
  19.     // command執行超時了 
  20.   case <-timer.C: 
  21.    returnOnce.Do(func() { 
  22.     returnTicket() 
  23.     cmd.errorWithFallback(ctx, ErrTimeout) 
  24.     reportAllEvent() 
  25.    }) 
  26.    return 
  27.   } 
  28.  }() 

這個協程的邏輯比較清晰明了,目的就是監聽業務執行被取消以及超時。

畫圖總結command執行流程

上面我們都是通過代碼來進行分析的,看起來還是有點亂,最后畫個圖總結一下:

上面我們分析了整個具體流程,接下來我們針對一些核心點就行分析

上報狀態事件

hystrix-go為每一個Command設置了一個默認統計控制器,用來保存熔斷器的所有狀態,包括調用次數、失敗次數、被拒絕次數等,存儲指標結構如下:

  1. type DefaultMetricCollector struct { 
  2.  mutex *sync.RWMutex 
  3.  
  4.  numRequests *rolling.Number 
  5.  errors      *rolling.Number 
  6.  
  7.  successes               *rolling.Number 
  8.  failures                *rolling.Number 
  9.  rejects                 *rolling.Number 
  10.  shortCircuits           *rolling.Number 
  11.  timeouts                *rolling.Number 
  12.  contextCanceled         *rolling.Number 
  13.  contextDeadlineExceeded *rolling.Number 
  14.  
  15.  fallbackSuccesses *rolling.Number 
  16.  fallbackFailures  *rolling.Number 
  17.  totalDuration     *rolling.Timing 
  18.  runDuration       *rolling.Timing 

使用rolling.Number結構保存狀態指標,使用rolling.Timing保存時間指標。

最終監控上報都依靠metricExchange來實現,數據結構如下:

  1. type metricExchange struct { 
  2.  Name    string 
  3.  Updates chan *commandExecution 
  4.  Mutex   *sync.RWMutex 
  5.  
  6.  metricCollectors []metricCollector.MetricCollector 

上報command的信息結構:

  1. type commandExecution struct { 
  2.  Types            []string      `json:"types"` // 區分事件類型,比如success、failure.... 
  3.  Start            time.Time     `json:"start_time"` // command開始時間 
  4.  RunDuration      time.Duration `json:"run_duration"` // command結束時間 
  5.  ConcurrencyInUse float64       `json:"concurrency_inuse"` // command 線程池使用率 

說了這么多,大家還是有點懵,其實用一個類圖就能表明他們之間的關系:

我們可以看到類mertricExchange提供了一個Monitor方法,這個方法主要邏輯就是監聽狀態事件,然后寫入指標,所以整個上報流程就是這個樣子:

流量控制

hystrix-go對流量控制采用的是令牌算法,能得到令牌的就可以執行后繼的工作,執行完后要返還令牌。結構體executorPool就是hystrix-go 流量控制的具體實現。字段Max就是每秒最大的并發值。

  1. type executorPool struct { 
  2.  Name    string 
  3.  Metrics *poolMetrics // 上報執行數量指標 
  4.  Max     int // 最大并發數量 
  5.  Tickets chan *struct{} // 代表令牌 

這里還有一個上報指標,這個又單獨實現一套方法用來統計執行數量,比如執行的總數量、最大并發數等,我們依賴畫一個類圖來表示:

上報執行數量邏輯與上報狀態事件的邏輯是一樣的,使用channel進行數據通信的,上報與返還令牌都在Return方法中:

  1. func (p *executorPool) Return(ticket *struct{}) { 
  2.  if ticket == nil { 
  3.   return 
  4.  } 
  5.  
  6.  p.Metrics.Updates <- poolMetricsUpdate{ 
  7.   activeCount: p.ActiveCount(), 
  8.  } 
  9.  p.Tickets <- ticket 

主要邏輯兩步:

  • 上報當前可用的令牌數
  • 返回令牌

熔斷器

我們最后來分析熔斷器中一個比較重要的方法:AllowRequest,我們在執行Command是會根據這個方法來判斷是否可以執行command,接下來我們就來看一下這個判斷的主要邏輯:

  1. func (circuit *CircuitBreaker) AllowRequest() bool { 
  2.  return !circuit.IsOpen() || circuit.allowSingleTest() 

內部就是調用IsOpen()、allowSingleTest這兩個方法:

  • IsOpen()
  1. func (circuit *CircuitBreaker) IsOpen() bool { 
  2.  circuit.mutex.RLock() 
  3.  o := circuit.forceOpen || circuit.open 
  4.  circuit.mutex.RUnlock() 
  5.  // 熔斷已經開啟 
  6.  if o { 
  7.   return true 
  8.  } 
  9.  // 判斷10s內的并發數是否超過設置的最大并發數,沒有超過時,不需要開啟熔斷器 
  10.  if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold { 
  11.   return false 
  12.  } 
  13.  // 此時10s內的并發數已經超過設置的最大并發數了,如果此時系統錯誤率超過了預設值,那就開啟熔斷器 
  14.  if !circuit.metrics.IsHealthy(time.Now()) { 
  15.   //  
  16.   circuit.setOpen() 
  17.   return true 
  18.  } 
  19.  
  20.  return false 
  • allowSingleTest()

先解釋一下為什么要有這個方法,還記得我們之前設置了一個熔斷規則中的SleepWindow嗎,如果在開啟熔斷的情況下,在SleepWindow時間后進行嘗試,這個方法的目的就是干這個的:

  1. func (circuit *CircuitBreaker) allowSingleTest() bool { 
  2.  circuit.mutex.RLock() 
  3.  defer circuit.mutex.RUnlock() 
  4.   
  5.   // 獲取當前時間戳 
  6.  now := time.Now().UnixNano() 
  7.  openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime) 
  8.   // 當前熔斷器是開啟狀態,當前的時間已經大于 (上次開啟熔斷器的時間 +SleepWindow 的時間) 
  9.  if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() { 
  10.     // 替換openedOrLastTestedTime 
  11.   swapped := atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, openedOrLastTestedTime, now) 
  12.   if swapped { 
  13.    log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name
  14.   } 
  15.   return swapped 
  16.  } 

這里只看到了熔斷器被開啟的設置了,但是沒有關閉熔斷器的邏輯,因為關閉熔斷器的邏輯是在上報狀態指標的方法ReportEvent內實現,我們最后再看一下ReportEvent的實現:

  1. func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error { 
  2.  if len(eventTypes) == 0 { 
  3.   return fmt.Errorf("no event types sent for metrics"
  4.  } 
  5.   
  6.  circuit.mutex.RLock() 
  7.  o := circuit.open 
  8.  circuit.mutex.RUnlock() 
  9.   // 上報的狀態事件是success 并且當前熔斷器是開啟狀態,則說明下游服務正常了,可以關閉熔斷器了 
  10.  if eventTypes[0] == "success" && o { 
  11.   circuit.setClose() 
  12.  } 
  13.  
  14.  var concurrencyInUse float64 
  15.  if circuit.executorPool.Max > 0 { 
  16.   concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max
  17.  } 
  18.  
  19.  select { 
  20.     // 上報狀態指標,與上文的monitor呼應 
  21.  case circuit.metrics.Updates <- &commandExecution{ 
  22.   Types:            eventTypes, 
  23.   Start:            start, 
  24.   RunDuration:      runDuration, 
  25.   ConcurrencyInUse: concurrencyInUse, 
  26.  }: 
  27.  default
  28.   return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)} 
  29.  } 
  30.  
  31.  return nil 

可視化hystrix的上報信息

通過上面的分析我們知道hystrix-go上報了狀態事件、執行數量事件,那么這些指標我們可以怎么查看呢?

設計者早就想到了這個問題,所以他們做了一個dashborad,可以查看hystrix的上報信息,使用方法只需在服務啟動時添加如下代碼:

  1. hystrixStreamHandler := hystrix.NewStreamHandler() 
  2. hystrixStreamHandler.Start() 
  3. go http.ListenAndServe(net.JoinHostPort("""81"), hystrixStreamHandler) 

然后打開瀏覽器:http://127.0.0.1:81/hystrix-dashboard,進行觀測吧。

總結

故事終于接近尾聲了,一個熔斷機制的實現確實不簡單,要考慮的因素也是方方面面,尤其在微服務架構下,熔斷機制是必不可少的,不僅要在框架層面實現熔斷機制,還要根據具體業務場景使用熔斷機制,這些都是值得我們深思熟慮的。本文介紹的熔斷框架實現的還是比較完美的,這種優秀的設計思路值得我們學習。

 

文中代碼已上傳github:https://github.com/asong2020/Golang_Dream/tree/master/code_demo/hystrix_demo,歡迎star。

 

責任編輯:武曉燕 來源: Golang夢工廠
相關推薦

2020-09-26 10:56:33

服務器熔斷服務隔離

2022-01-17 10:55:50

微服務API網關

2020-07-28 08:32:57

微服務API網關熔斷

2025-03-13 00:55:00

微服務架構系統

2017-07-03 09:50:07

Spring Clou微服務架構

2018-12-06 14:56:46

微服務隔離熔斷

2024-06-05 06:43:20

2017-07-04 17:35:46

微服務架構Spring Clou

2020-11-27 10:50:06

微服務架構框架

2021-03-05 11:09:46

Go框架微服務

2023-12-13 07:19:01

微服務架構Golang

2025-01-20 00:10:00

Go語言Kratos

2024-06-27 10:50:01

2024-04-09 07:27:06

微服務架構YAML

2024-12-23 00:22:55

2017-07-17 15:50:17

微服務Docker架構

2021-06-22 18:00:09

微服務架構系統

2025-01-13 00:00:07

Go語言微服務

2022-05-13 09:05:49

Hystrix熔斷器

2024-12-30 00:38:23

Go語言微服務
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 精品视频在线观看 | 国内久久 | 亚洲国产一区在线 | 日韩久久久久久 | 男人视频网站 | 国产h在线 | 视频在线h | 精品一二三区视频 | 久久久黑人 | 在线观看精品视频网站 | 中国av在线免费观看 | 狠狠躁夜夜躁人人爽天天高潮 | 免费的色网站 | 搞黄网站在线观看 | 91.色 | 美国a级毛片免费视频 | 亚洲激情综合网 | 午夜影院免费体验区 | 亚洲国产精品精华素 | 日本涩涩网 | 成人h视频在线 | 国产精彩视频一区 | 日本视频中文字幕 | 精品一区二区在线观看 | 欧美一级在线免费 | 91伊人| 久久亚洲精品国产精品紫薇 | 国产精品99一区二区 | 欧美日韩国产精品一区 | 亚洲小视频 | 夜色www国产精品资源站 | 亚洲一区在线播放 | 久久33 | 欧美精品在线播放 | 久久男人 | 国产免费观看一区 | 亚洲一区 | 亚洲在线一区二区 | 国产精品一区在线观看 | 国产精品久久久久久久久久久久 | 波多野结衣精品在线 |