成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

遠(yuǎn)程寫入prometheus存儲(chǔ)

運(yùn)維 系統(tǒng)運(yùn)維
prometheus一般都是采用pull方式獲取數(shù)據(jù),但是有一些情況下,不方便配置exporter,就希望能通過push的方式上傳指標(biāo)數(shù)據(jù)。

[[410873]]

簡(jiǎn)介

prometheus一般都是采用pull方式獲取數(shù)據(jù),但是有一些情況下,不方便配置exporter,就希望能通過push的方式上傳指標(biāo)數(shù)據(jù)。

1、可以采用pushgateway的方式,推送到pushgateway,然后prometheus通過pushgateway拉取數(shù)據(jù)。

2、在新版本中增加了一個(gè)參數(shù):--enable-feature=remote-write-receiver,允許遠(yuǎn)程通過接口/api/v1/write,直接寫數(shù)據(jù)到prometheus里面。

pushgateway在高并發(fā)的情況下還是比較消耗資源的,特別是開啟一致性檢查,高并發(fā)寫入的時(shí)候特別慢。

第二種方式少了一層轉(zhuǎn)發(fā),速度應(yīng)該比較快。

遠(yuǎn)程寫入prometheus存儲(chǔ)

接口

可以通過prometheus的http接口/api/v1/write提交數(shù)據(jù),這個(gè)接口的數(shù)據(jù)格式有有要求:

  • 使用POST方式提交
  • 需要經(jīng)過protobuf編碼,依賴github.com/gogo/protobuf/proto
  • 可以使用snappy進(jìn)行壓縮,依賴github.com/golang/snappy

步驟:

  1. 收集指標(biāo)名稱,時(shí)間戳,值和標(biāo)簽
  2. 將數(shù)據(jù)轉(zhuǎn)換成prometheus需要的數(shù)據(jù)格式
  3. 使用proto對(duì)數(shù)據(jù)進(jìn)行編碼,并用snappy進(jìn)行壓縮
  4. 通過httpClient提交數(shù)據(jù)
  1. package prome 
  2.  
  3. import ( 
  4.     "bufio" 
  5.     "bytes" 
  6.     "context" 
  7.     "io" 
  8.     "io/ioutil" 
  9.     "net/http" 
  10.     "net/url" 
  11.     "regexp" 
  12.     "time" 
  13.  
  14.     "github.com/gogo/protobuf/proto" 
  15.     "github.com/golang/snappy" 
  16.     "github.com/opentracing-contrib/go-stdlib/nethttp" 
  17.     opentracing "github.com/opentracing/opentracing-go" 
  18.     "github.com/pkg/errors" 
  19.     "github.com/prometheus/common/model" 
  20.     "github.com/prometheus/prometheus/pkg/labels" 
  21.     "github.com/prometheus/prometheus/prompb" 
  22.  
  23. type RecoverableError struct { 
  24.     error 
  25.  
  26. type HttpClient struct { 
  27.     url     *url.URL 
  28.     Client  *http.Client 
  29.     timeout time.Duration 
  30.  
  31. var MetricNameRE = regexp.MustCompile(`^[a-zA-Z_:][a-zA-Z0-9_:]*$`) 
  32.  
  33. type MetricPoint struct { 
  34.     Metric  string            `json:"metric"` // 指標(biāo)名稱 
  35.     TagsMap map[string]string `json:"tags"`   // 數(shù)據(jù)標(biāo)簽 
  36.     Time    int64             `json:"time"`   // 時(shí)間戳,單位是秒 
  37.     Value   float64           `json:"value"`  // 內(nèi)部字段,最終轉(zhuǎn)換之后的float64數(shù)值 
  38.  
  39. func (c *HttpClient) remoteWritePost(req []byte) error { 
  40.     httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(req)) 
  41.     if err != nil { 
  42.         return err 
  43.     } 
  44.     httpReq.Header.Add("Content-Encoding""snappy"
  45.     httpReq.Header.Set("Content-Type""application/x-protobuf"
  46.     httpReq.Header.Set("User-Agent""opcai"
  47.     httpReq.Header.Set("X-Prometheus-Remote-Write-Version""0.1.0"
  48.     ctx, cancel := context.WithTimeout(context.Background(), c.timeout) 
  49.     defer cancel() 
  50.  
  51.     httpReq = httpReq.WithContext(ctx) 
  52.  
  53.     if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil { 
  54.         var ht *nethttp.Tracer 
  55.         httpReq, ht = nethttp.TraceRequest( 
  56.             parentSpan.Tracer(), 
  57.             httpReq, 
  58.             nethttp.OperationName("Remote Store"), 
  59.             nethttp.ClientTrace(false), 
  60.         ) 
  61.         defer ht.Finish() 
  62.     } 
  63.  
  64.     httpResp, err := c.Client.Do(httpReq) 
  65.     if err != nil { 
  66.         // Errors from Client.Do are from (for example) network errors, so are 
  67.         // recoverable. 
  68.         return RecoverableError{err} 
  69.     } 
  70.     defer func() { 
  71.         io.Copy(ioutil.Discard, httpResp.Body) 
  72.         httpResp.Body.Close() 
  73.     }() 
  74.  
  75.     if httpResp.StatusCode/100 != 2 { 
  76.         scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, 512)) 
  77.         line := "" 
  78.         if scanner.Scan() { 
  79.             line = scanner.Text() 
  80.         } 
  81.         err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) 
  82.     } 
  83.     if httpResp.StatusCode/100 == 5 { 
  84.         return RecoverableError{err} 
  85.     } 
  86.     return err 
  87.  
  88. func buildWriteRequest(samples []*prompb.TimeSeries) ([]byte, error) { 
  89.  
  90.     req := &prompb.WriteRequest{ 
  91.         Timeseries: samples, 
  92.     } 
  93.     data, err := proto.Marshal(req) 
  94.     if err != nil { 
  95.         return nil, err 
  96.     } 
  97.     compressed := snappy.Encode(nil, data) 
  98.     return compressed, nil 
  99.  
  100. type sample struct { 
  101.     labels labels.Labels 
  102.     t      int64 
  103.     v      float64 
  104.  
  105. const ( 
  106.     LABEL_NAME = "__name__" 
  107.  
  108. func convertOne(item *MetricPoint) (*prompb.TimeSeries, error) { 
  109.     pt := prompb.TimeSeries{} 
  110.     pt.Samples = []prompb.Sample{{}} 
  111.     s := sample{} 
  112.     s.t = item.Time 
  113.     s.v = item.Value 
  114.     // name 
  115.     if !MetricNameRE.MatchString(item.Metric) { 
  116.         return &pt, errors.New("invalid metrics name"
  117.     } 
  118.     nameLs := labels.Label{ 
  119.         Name:  LABEL_NAME, 
  120.         Value: item.Metric, 
  121.     } 
  122.     s.labels = append(s.labels, nameLs) 
  123.     for k, v := range item.TagsMap { 
  124.         if model.LabelNameRE.MatchString(k) { 
  125.             ls := labels.Label{ 
  126.                 Name:  k, 
  127.                 Value: v, 
  128.             } 
  129.             s.labels = append(s.labels, ls) 
  130.         } 
  131.     } 
  132.  
  133.     pt.Labels = labelsToLabelsProto(s.labels, pt.Labels) 
  134.     // 時(shí)間賦值問題,使用毫秒時(shí)間戳 
  135.     tsMs := time.Unix(s.t, 0).UnixNano() / 1e6 
  136.     pt.Samples[0].Timestamp = tsMs 
  137.     pt.Samples[0].Value = s.v 
  138.     return &pt, nil 
  139.  
  140. func labelsToLabelsProto(labels labels.Labels, buf []*prompb.Label) []*prompb.Label { 
  141.     result := buf[:0] 
  142.     if cap(buf) < len(labels) { 
  143.         result = make([]*prompb.Label, 0, len(labels)) 
  144.     } 
  145.     for _, l := range labels { 
  146.         result = append(result, &prompb.Label{ 
  147.             Name:  l.Name
  148.             Value: l.Value, 
  149.         }) 
  150.     } 
  151.     return result 
  152.  
  153. func (c *HttpClient) RemoteWrite(items []MetricPoint) (err error) { 
  154.     if len(items) == 0 { 
  155.         return 
  156.     } 
  157.     ts := make([]*prompb.TimeSeries, len(items)) 
  158.     for i := range items { 
  159.         ts[i], err = convertOne(&items[i]) 
  160.         if err != nil { 
  161.             return 
  162.         } 
  163.     } 
  164.     data, err := buildWriteRequest(ts) 
  165.     if err != nil { 
  166.         return 
  167.     } 
  168.     err = c.remoteWritePost(data) 
  169.     return 
  170.  
  171. func NewClient(ur string, timeout time.Duration) (c *HttpClient, err error) { 
  172.     u, err := url.Parse(ur) 
  173.     if err != nil { 
  174.         return 
  175.     } 
  176.     c = &HttpClient{ 
  177.         url:     u, 
  178.         Client:  &http.Client{}, 
  179.         timeout: timeout, 
  180.     } 
  181.     return 

測(cè)試

prometheus啟動(dòng)的時(shí)候記得加參數(shù)--enable-feature=remote-write-receiver

  1. package prome 
  2.  
  3. import ( 
  4.     "testing" 
  5.     "time" 
  6.  
  7. func TestRemoteWrite(t *testing.T) { 
  8.     c, err := NewClient("http://localhost:9090/api/v1/write", 10*time.Second
  9.     if err != nil { 
  10.         t.Fatal(err) 
  11.     } 
  12.     metrics := []MetricPoint{ 
  13.         {Metric: "opcai1"
  14.             TagsMap: map[string]string{"env""testing""op""opcai"}, 
  15.             Time:    time.Now().Add(-1 * time.Minute).Unix(), 
  16.             Value:   1}, 
  17.         {Metric: "opcai2"
  18.             TagsMap: map[string]string{"env""testing""op""opcai"}, 
  19.             Time:    time.Now().Add(-2 * time.Minute).Unix(), 
  20.             Value:   2}, 
  21.         {Metric: "opcai3"
  22.             TagsMap: map[string]string{"env""testing""op""opcai"}, 
  23.             Time:    time.Now().Unix(), 
  24.             Value:   3}, 
  25.         {Metric: "opcai4"
  26.             TagsMap: map[string]string{"env""testing""op""opcai"}, 
  27.             Time:    time.Now().Unix(), 
  28.             Value:   4}, 
  29.     } 
  30.     err = c.RemoteWrite(metrics) 
  31.     if err != nil { 
  32.         t.Fatal(err) 
  33.     } 
  34.     t.Log("end..."

使用go test進(jìn)行測(cè)試

  1. go test -v 

總結(jié)

這個(gè)方法也是在看夜鶯v5的代碼的時(shí)候發(fā)現(xiàn)的,剛好有需要統(tǒng)一收集redis的監(jiān)控指標(biāo),剛好可以用上,之前用pushgateway寫的實(shí)在是慢。

 

責(zé)任編輯:姜華 來源: 今日頭條
相關(guān)推薦

2022-04-27 08:22:43

Prometheus監(jiān)控數(shù)據(jù)庫

2023-08-09 08:18:22

2023-03-02 08:00:26

后端存儲(chǔ)InfluxDB

2025-01-03 08:08:56

2021-03-01 10:20:52

存儲(chǔ)

2021-02-22 10:37:47

存儲(chǔ)Prometheus

2018-02-28 14:04:08

RMIJDBC存儲(chǔ)

2012-02-13 10:02:20

虛擬化vmware ESXilinux

2022-07-06 08:02:51

undo 日志數(shù)據(jù)庫

2020-10-14 08:33:23

Prometheus監(jiān)控體系

2020-08-12 11:11:04

云計(jì)算云存儲(chǔ)存儲(chǔ)

2021-12-22 10:29:23

Prometheus elasticsear運(yùn)維

2021-08-27 07:06:10

應(yīng)用

2021-03-31 08:02:34

Prometheus 監(jiān)控運(yùn)維

2018-04-16 08:44:51

InfluxDB TS時(shí)序數(shù)據(jù)庫存儲(chǔ)

2022-09-27 09:17:40

數(shù)據(jù)監(jiān)控

2020-04-08 17:27:00

西部數(shù)據(jù)

2009-01-11 09:59:30

遠(yuǎn)程服務(wù)摩卡遠(yuǎn)程管理

2021-11-08 09:00:00

PrometheusKubernetes集群

2022-05-18 08:32:05

服務(wù)監(jiān)控Prometheus開源
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 亚洲一区二区中文字幕 | av免费网站在线观看 | 国产精品久久久久无码av | 亚洲国产专区 | 热久久久久 | av毛片 | 欧美精品在线一区二区三区 | 四虎影院在线免费观看 | 久久免费精品视频 | www.国产| 91精品国产综合久久婷婷香蕉 | 欧美日韩一区二区在线播放 | 色眯眯视频在线观看 | 国产一区二区三区 | 日本一区二区在线视频 | 成人精品系列 | 一区二区三区视频在线 | 国产欧美精品一区二区三区 | 久在线观看 | 手机在线观看av | 中文字幕综合 | 久久久久久久久久一区二区 | 国产精品免费在线 | 人人玩人人干 | 午夜在线精品 | 精品一区久久 | 久久国产日本 | 久久精品视频播放 | 成人午夜| 在线免费观看黄色av | av男人的天堂在线 | 日韩av在线免费 | 午夜激情一区 | 亚洲国产激情 | 日本成人中文字幕在线观看 | 夜夜干夜夜操 | 成人 在线 | 91久久| 黄色男女网站 | 国产高清在线精品 | 日韩精品一区二区三区 |