成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

使用Golang構建一萬+每秒處理請求的高性能系統

開發 后端
下面這篇文章給大家的提醒便是,我們只有在充分理解語言本身的特性,并巧妙加以利用的前提下,才能寫出高性能、高并發的處理程序,才能為企業節省成本,為客戶提供好的服務。

背景

一談到golang,大家的第一感覺就是高并發,高性能。但是語言本身的優勢是不是,就讓程序員覺得編寫高性能的處理系統變得輕而易舉,水到渠成呢。下面這篇文章給大家的提醒便是,我們只有在充分理解語言本身的特性,并巧妙加以利用的前提下,才能寫出高性能、高并發的處理程序,才能為企業節省成本,為客戶提供好的服務。

每分鐘處理百萬請求

?Malwarebytes的首席架構師Marcio Castilho分享了他在公司高速發展過程中,開發高性能數據處理系統的經歷。整個過程向我們詳細展示了如何不斷的優化與提升系統性能的過程,值得我們思考與學習。大佬也不是一下子就給出最優方案的。

首先作者的目標是能夠處理來自數百萬個端點的大量POST請求,然后將接收到的JSON 請求體,寫入Amazon S3,以便map-reduce稍后對這些數據進行操作。這個場景和我們現在的很多互聯網系統的場景是一樣的。傳統的處理方式是,使用隊列等中間件,做緩沖,消峰,然后后端一堆worker來異步處理。因為作者也做了兩年GO開發了,經過討論他們決定使用GO來完成這項工作。

第一版代碼

下面是Marcio給出的本能第一反應的解決方案,和大家的思路是不是一致的。首先他給出了負載(Payload)還有負載集合(PayloadCollection)的定義,然后他寫了一個處理web請求的Handler(payloadHandler)。在payloadHandler里面,由于把負載上傳S3比較耗時,所以針對每個負載,啟動GO的協程來異步上傳。具體的實現,大家可以看下面48-50行貼出的代碼。

type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}

type Payload struct {
// [redacted]
}

func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

bucket := S3Bucket

b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}

// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"

return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}

// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS
}

w.WriteHeader(http.StatusOK)
}

那結果怎么樣呢?Marcio和他的同事們低估了請求的量級,而且上面的實現方法,又無法控制GO協程的生成數量,這個版本部署到生產后,很快就崩潰了。Marcio畢竟是牛逼架構師,他很快根據問題給出了新的解決方案。

第二版代碼

第一個版本的假設是,請求的生命周期都是很短的,不會有長時間的阻塞操作耗費資源。在這個前提下,我們可以根據請求不停的生成GO協程來處理請求。但是事實并非如此,Marcio轉變思路,引入隊列的思想。創建了Buffered Channel,把請求緩沖起來,然后再通過一個同步處理器從Channel里面把請求取出,上傳S3.這是典型的生產者-消費者模型。

處理流程

這個版本的問題是,首先同步處理器的處理能力有限,他的處理能力比不上請求到達的速度。很快Buffered Channel就會滿了,然后后續的客戶請求都會被阻塞。在Marcio他們部署這個有缺陷的版本幾分鐘后,延遲率會以固定的速率增加。

系統部署后的延遲

第三版代碼

Marcio引入了2層Channel,一個Channel用于緩存請求,是一個全局Channel,本文中就是下面的JobQueue,一個Channel用于控制每個請求隊列并發多少個worker.從下面的代碼可以看到,每個Worker都有兩個關鍵屬性,一個是WorkerPool(這個也是一個全局的變量,即所有的worker的這個屬性都指向同一個,worker在創建后,會把自身的JobChannel寫入WorkerPool完成注冊),一個是JobChannel(用于緩存分配需要本worker處理的請求作業)。web處理請求payloadHandler,會把接收到的請求放到JobQueue后,就結束并返回。

var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
Payload Payload
}

// A buffered channel that we can send work requests on.
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel

select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}

case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}

func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}

// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {

// let's create a job with the payload
work := Job{Payload: payload}

// Push the work onto the queue.
JobQueue <- work
}

w.WriteHeader(http.StatusOK)
}

請求任務都放到JobQueue里面了,如何監聽隊列,并觸發請求呢。這個地方又出現了Dispatcher,我們在另一篇文章中有詳細探討(基于dispatcher模式的事件與數據分發處理器的go語言實現:
https://www.toutiao.com/article/7186518439215841827/)。在系統啟動的時候,我們會通過NewDispatcher生成Dispatcher,并調用它的Run方法。

type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}

func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}

func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}

go d.dispatch()
}

func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool

// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}

Dispatcher與Worker的關系如下圖所示:

第三方案整體流程

1.客戶請求到Handler。

2.Handler把請求作業寫入JobQueue。

3.Dispatcher的dispatcher方法,從全局JobQueue中讀取Job。

4.Dispatcher的dispatcher方法同時也從WorkerPool中讀取JobChannel(屬于某一個Worker,即每一個Worker都有一個JobChannel)。

5.Dispatcher把獲得的Job寫入JobChannel,即分配某個Worker。

6.Worker從自己的JobChannel中獲取作業并執行。執行完成后,空閑后,把自己的JobChannel再次寫入WorkerPool等待分配。

這樣實現后,效果明顯,同時需要的機器數量大幅降低了,從100臺降低到20臺。

第三方案效果

部署機器變化

這里的兩層,一層是全局JobQueue,緩存任務。第二個是每個Worker都有自己的執行隊列,一臺機器可以創建多個Worker。這樣就提升了處理能力。

方案對比

方案思想

實現難度

方案問題

GO協程原生方法

簡單

無法應對大規模請求,無法控制協程數量

GO 單層Channel

簡單

當處理能力達不到請求速率后,隊列滿,系統崩潰

GO兩層Channel

復雜


參考資料:

http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/。

https://github.com/ReGYChang/zero/blob/main/pkg/utils/worker_pool.go。

責任編輯:姜華 來源: 今日頭條
相關推薦

2019-01-02 16:38:37

Golang彈幕

2019-01-02 16:47:46

Golang彈幕

2019-01-02 16:50:30

Golang彈幕

2025-06-03 08:15:00

微服務架構異步任務隊列

2025-03-04 08:00:00

機器學習Rust開發

2023-12-14 08:01:08

事件管理器Go

2023-12-01 07:06:14

Go命令行性能

2011-02-13 09:37:55

ASP.NET

2023-12-26 00:58:53

Web應用Go語言

2024-01-05 07:38:55

2011-12-15 13:28:57

2020-06-05 07:20:41

測試自動化環境

2023-01-11 15:17:01

gRPC.NET 7

2025-05-28 05:10:00

策略Spring性能

2018-03-26 11:39:13

LinuxAnsible計算系統

2017-11-16 09:35:56

高性能高可用架構

2011-10-21 14:20:59

高性能計算HPC虛擬化

2011-10-25 13:13:35

HPC高性能計算Platform

2022-12-09 08:40:56

高性能內存隊列

2019-10-11 10:44:30

Go語言數據庫軟件
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 欧美亚洲一区二区三区 | 亚洲激精日韩激精欧美精品 | 成人av免费 | 国产精品成人一区二区 | 国产1区在线 | 日韩欧美精品一区 | 欧美一级大片免费看 | 在线免费中文字幕 | 久久久久久高潮国产精品视 | 成人av高清在线观看 | 久久久久成人精品 | 日韩在线免费看 | 男人天堂视频在线观看 | 国产精品不卡一区 | 91精品久久久久 | 亚洲网站在线播放 | 99精品视频网 | 久久精品这里精品 | 国产无人区一区二区三区 | 亚洲精品一区二区三区中文字幕 | 国精产品一区二区三区 | 国产精品久久免费观看 | www.天天操 | 99re国产 | 日本精品一区二区三区在线观看 | 欧美日韩国产在线 | caoporn地址 | 男女污网站 | 亚洲一区国产精品 | 中文字幕日韩一区 | 国产成人小视频 | 亚洲夜夜爽 | 色综合久久久久 | 国产精品精品久久久 | 精品亚洲一区二区 | 国产一区二区在线观看视频 | 国产女人叫床高潮大片免费 | 中文字幕av高清 | 中文字幕在线中文 | 六月成人网 | 91社影院在线观看 |