Prometheus Alerting Rule Management
What Is a Rule
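Prometheus lets users define their own rules. There are two kinds: Recording Rules and Alerting Rules. PromQL lets you query, aggregate, and otherwise compute over the samples Prometheus has collected, in real time. When a PromQL expression is complex and expensive, however, running it directly can make Prometheus time out. What is needed is something like a background batch job that performs these heavy computations ahead of time, so that users only have to query the precomputed results. Recording Rules provide exactly this kind of background computation, which optimizes the performance of complex queries and makes them faster to answer.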
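This article focuses on alerting rules. An alerting rule defines the condition that triggers an alert as a PromQL expression; the Prometheus server evaluates these rules periodically and sends an alert notification once the condition is met.
What Is an Alerting Rule
Alerting is one of Prometheus's core features. The rest of this article walks through how alerts are evaluated and sent, from the perspective of the source code.
How to Define an Alerting Rule
A typical alerting rule looks like this: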
```yaml
groups:
- name: example
  rules:
  - alert: HighErrorRate
    # The metric must stay above 0.5 for 10 minutes before the alert fires.
    expr: job:request_latency_seconds:mean5m{job="myjob"} > 0.5
    for: 10m
    labels:
      severity: page
    annotations:
      summary: High request latency
      description: description info
```
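In a rule file, related rules are defined together under a group, and each group can contain multiple alerting rules (rules). An alerting rule consists mainly of the following parts:
- alert: the name of the alerting rule.
- expr: the PromQL expression that defines the trigger condition; it is evaluated to find the time series that satisfy the condition.
- for: an optional hold duration. The alert is sent only after the condition has held continuously for this long; while waiting, a newly triggered alert is in the pending state.
- labels: custom labels, i.e. a set of extra labels attached to the alert.
- annotations: a set of additional information, such as text describing the alert in detail. Annotations are sent to Alertmanager together with the alert when it fires.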
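The Rule Manager
The rule manager evaluates each configured rule's PromQL expression to determine whether any time series satisfy its trigger condition. When the condition is met, it sends the alert to the alerting service (Alertmanager).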
```go
type Manager struct {
	opts     *ManagerOptions   // external dependencies
	groups   map[string]*Group // current rule groups
	mtx      sync.RWMutex      // read/write lock for the rule manager
	block    chan struct{}
	done     chan struct{}
	restored bool
	logger   log.Logger
}
```
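- opts (type *ManagerOptions): the other modules the Manager uses, such as the storage module and the notify module.
- groups (type map[string]*Group): all rules.Group instances, keyed by the group's name combined with the configuration file that defines it.
- mtx (type sync.RWMutex): must be acquired whenever the groups field is read or written.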
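The sketch below illustrates the locking pattern just described: a map of groups keyed by file plus group name, guarded by a sync.RWMutex so that readers can proceed in parallel while replacing the map takes the write lock. miniManager and groupKey are hypothetical names, and the ";" separator is an assumption made for this sketch (it appears to match Prometheus's GroupKey, but treat it as illustrative).

```go
package main

import (
	"sort"
	"sync"
)

// groupKey builds a map key from the rule file and the group name.
// The ";" separator is an assumption for this sketch.
func groupKey(file, name string) string {
	return file + ";" + name
}

// miniManager is a hypothetical, stripped-down rule manager.
type miniManager struct {
	mtx    sync.RWMutex
	groups map[string][]string // group key -> rule names
}

// replaceGroups swaps in a new set of groups under the write lock.
func (m *miniManager) replaceGroups(groups map[string][]string) {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	m.groups = groups
}

// groupKeys lists the current group keys under the read lock, sorted for stable output.
func (m *miniManager) groupKeys() []string {
	m.mtx.RLock()
	defer m.mtx.RUnlock()
	keys := make([]string, 0, len(m.groups))
	for k := range m.groups {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}
```

Because the key includes the file name, two groups that share a name but live in different files stay distinct.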
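Loading Rule Group Configuration
When Prometheus Server starts (and again on every configuration reload), it calls Manager.Update() to load and parse the rule files. The process is roughly as follows:
- Call Manager.LoadGroups() to load and parse the rule files, producing a set of rules.Group instances.
- Stop the old rules.Group instances and start the new ones; groups whose definition has not changed are kept as they are. Each newly started rules.Group instance gets its own goroutine, which runs all of that group's PromQL queries.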
```go
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	// Load rules from the current files.
	groups, errs := m.LoadGroups(interval, externalLabels, externalURL, files...)
	if errs != nil {
		for _, e := range errs {
			level.Error(m.logger).Log("msg", "loading groups failed", "err", e)
		}
		return errors.New("error loading rules, previous rule set restored")
	}
	m.restored = true

	var wg sync.WaitGroup
	// Iterate over the new rule groups.
	for _, newg := range groups {
		// If there is an old group with the same identifier,
		// check if new group equals with the old group, if yes then skip it.
		// If not equals, stop it and wait for it to finish the current iteration.
		// Then copy it into the new group.
		// Build the group key from the new group's file and name.
		gn := GroupKey(newg.file, newg.name)
		// Look up the old group with the same key and remove it from m.groups.
		oldg, ok := m.groups[gn]
		delete(m.groups, gn)

		if ok && oldg.Equals(newg) {
			groups[gn] = oldg
			continue
		}

		wg.Add(1)
		// Start one goroutine per rules.Group instance.
		go func(newg *Group) {
			if ok {
				oldg.stop()
				// Copy the state of the old group into the new group.
				newg.CopyState(oldg)
			}
			wg.Done()
			// Wait with starting evaluation until the rule manager
			// is told to run. This is necessary to avoid running
			// queries against a bootstrapping storage.
			<-m.block
			// Call Group.run() to start evaluating the PromQL expressions periodically.
			newg.run(m.opts.Context)
		}(newg)
	}

	// Stop remaining old groups.
	// Any group still in m.groups has no counterpart in the new configuration.
	wg.Add(len(m.groups))
	for n, oldg := range m.groups {
		go func(n string, g *Group) {
			g.markStale = true
			g.stop()
			if m := g.metrics; m != nil {
				m.IterationsMissed.DeleteLabelValues(n)
				m.IterationsScheduled.DeleteLabelValues(n)
				m.EvalTotal.DeleteLabelValues(n)
				m.EvalFailures.DeleteLabelValues(n)
				m.GroupInterval.DeleteLabelValues(n)
				m.GroupLastEvalTime.DeleteLabelValues(n)
				m.GroupLastDuration.DeleteLabelValues(n)
				m.GroupRules.DeleteLabelValues(n)
				m.GroupSamples.DeleteLabelValues(n)
			}
			wg.Done()
		}(n, oldg)
	}

	wg.Wait()
	// Replace the manager's rule groups with the new set.
	m.groups = groups

	return nil
}
```
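Stripped of goroutines and metrics, Update reconciles old and new groups by key: unchanged groups are kept, changed groups are replaced (with their state copied over), new keys are started, and keys that no longer exist are stopped. The plan function and the string-valued group definitions below are hypothetical simplifications of that decision logic, not Prometheus code; Prometheus compares real Group values with Equals.

```go
package main

import "sort"

// updatePlan records what Update would do with each group key.
type updatePlan struct {
	Keep    []string // same key, same definition: reuse the running group
	Restart []string // same key, changed definition: stop old, copy state, start new
	Start   []string // new key: start a fresh group
	Stop    []string // key no longer configured: mark stale and stop
}

// plan compares old and new group definitions keyed by file+name.
func plan(old, updated map[string]string) updatePlan {
	var p updatePlan
	remaining := make(map[string]bool, len(old))
	for k := range old {
		remaining[k] = true
	}
	for k, def := range updated {
		oldDef, ok := old[k]
		delete(remaining, k) // mirrors delete(m.groups, gn)
		switch {
		case ok && oldDef == def:
			p.Keep = append(p.Keep, k)
		case ok:
			p.Restart = append(p.Restart, k)
		default:
			p.Start = append(p.Start, k)
		}
	}
	for k := range remaining {
		p.Stop = append(p.Stop, k)
	}
	// Sort in place for deterministic output; map iteration order is random.
	for _, s := range [][]string{p.Keep, p.Restart, p.Start, p.Stop} {
		sort.Strings(s)
	}
	return p
}
```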
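Running the Rule Group Scheduler
How a rule group starts (Group.run): on entering Group.run, the group first waits for an initial delay so that its evaluations fall on consistently slotted timestamps spaced g.interval apart. It then defines the scheduling function iter, which runs once every g.interval; iter calls g.Eval, which schedules the evaluation of the individual rules one level down.
The evaluation interval g.interval is set by evaluation_interval in the global section of prometheus.yml (default 1m), unless the group sets its own interval. The implementation is: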
```go
func (g *Group) run(ctx context.Context) {
	defer close(g.terminated)

	// Wait an initial amount to have consistently slotted intervals.
	evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval)
	select {
	case <-time.After(time.Until(evalTimestamp)): // initial wait
	case <-g.done:
		return
	}

	ctx = promql.NewOriginContext(ctx, map[string]interface{}{
		"ruleGroup": map[string]string{
			"file": g.File(),
			"name": g.Name(),
		},
	})

	// Define the function that schedules one evaluation of the group's rules.
	iter := func() {
		g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()

		start := time.Now()
		// Entry point for evaluating the rules.
		g.Eval(ctx, evalTimestamp)
		timeSinceStart := time.Since(start)

		g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
		g.setEvaluationTime(timeSinceStart)
		g.setLastEvaluation(start)
	}

	// The assumption here is that since the ticker was started after having
	// waited for `evalTimestamp` to pass, the ticks will trigger soon
	// after each `evalTimestamp + N * g.interval` occurrence.
	tick := time.NewTicker(g.interval) // evaluation timer
	defer tick.Stop()

	defer func() {
		if !g.markStale {
			return
		}
		go func(now time.Time) {
			for _, rule := range g.seriesInPreviousEval {
				for _, r := range rule {
					g.staleSeries = append(g.staleSeries, r)
				}
			}
			// That can be garbage collected at this point.
			g.seriesInPreviousEval = nil
			// Wait for 2 intervals to give the opportunity to renamed rules
			// to insert new series in the tsdb. At this point if there is a
			// renamed rule, it should already be started.
			select {
			case <-g.managerDone:
			case <-time.After(2 * g.interval):
				g.cleanupStaleSeries(ctx, now)
			}
		}(time.Now())
	}()

	// Run the first evaluation.
	iter()
	if g.shouldRestore {
		// If we have to restore, we wait for another Eval to finish.
		// The reason behind this is, during first eval (or before it)
		// we might not have enough data scraped, and recording rules would not
		// have updated the latest values, on which some alerts might depend.
		select {
		case <-g.done:
			return
		case <-tick.C:
			missed := (time.Since(evalTimestamp) / g.interval) - 1
			if missed > 0 {
				g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
				g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
			}
			evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
			iter()
		}

		g.RestoreForState(time.Now())
		g.shouldRestore = false
	}

	for {
		select {
		case <-g.done:
			return
		default:
			select {
			case <-g.done:
				return
			case <-tick.C:
				missed := (time.Since(evalTimestamp) / g.interval) - 1
				if missed > 0 {
					g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
					g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
				}
				evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
				// Run the next scheduled evaluation.
				iter()
			}
		}
	}
}
```
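Running Rule Evaluation
Scheduling of the individual rules in a group is implemented in Group.Eval. For each rule in the group, Group.Eval passes the rule's PromQL expression to the query engine (queryEngine) through QueryFunc. If the rule is an AlertingRule, its alerts are sent to the alerting service through NotifyFunc. The result vector is then appended to Prometheus's storage (for a RecordingRule these are the recorded series; for an AlertingRule they are the synthetic ALERTS series describing alert states), and series that were produced in the previous evaluation but are missing now are marked stale.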
```go
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
	var samplesTotal float64
	// Iterate over all rules in the current group.
	for i, rule := range g.rules {
		select {
		case <-g.done:
			return
		default:
		}

		func(i int, rule Rule) {
			sp, ctx := opentracing.StartSpanFromContext(ctx, "rule")
			sp.SetTag("name", rule.Name())
			defer func(t time.Time) {
				sp.Finish()
				// Record how long this rule took to evaluate.
				since := time.Since(t)
				g.metrics.EvalDuration.Observe(since.Seconds())
				rule.SetEvaluationDuration(since)
				// Record when this evaluation started.
				rule.SetEvaluationTimestamp(t)
			}(time.Now())

			// Count rule evaluations.
			g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

			// Evaluate the rule.
			vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
			if err != nil {
				// On error, mark the rule unhealthy and stop processing it.
				rule.SetHealth(HealthBad)
				rule.SetLastError(err)
				// Count failed evaluations.
				g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
				// Canceled queries are intentional termination of queries. This normally
				// happens on shutdown and thus we skip logging of any errors here.
				if _, ok := err.(promql.ErrQueryCanceled); !ok {
					level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
				}
				return
			}
			samplesTotal += float64(len(vector))

			// If this is an alerting rule, send its alerts.
			if ar, ok := rule.(*AlertingRule); ok {
				ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
			}
			var (
				numOutOfOrder = 0
				numDuplicates = 0
			)

			// Get an appender to write the result samples to storage.
			app := g.opts.Appendable.Appender(ctx)
			seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
			defer func() {
				if err := app.Commit(); err != nil {
					rule.SetHealth(HealthBad)
					rule.SetLastError(err)
					g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
					level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)
					return
				}
				g.seriesInPreviousEval[i] = seriesReturned
			}()

			for _, s := range vector {
				if _, err := app.Append(0, s.Metric, s.T, s.V); err != nil {
					rule.SetHealth(HealthBad)
					rule.SetLastError(err)

					// Handle the different errors returned when appending samples.
					switch errors.Cause(err) {
					case storage.ErrOutOfOrderSample:
						numOutOfOrder++
						level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
					case storage.ErrDuplicateSampleForTimestamp:
						numDuplicates++
						level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
					default:
						level.Warn(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
					}
				} else {
					// Remember the series produced by this evaluation.
					seriesReturned[s.Metric.String()] = s.Metric
				}
			}
			if numOutOfOrder > 0 {
				level.Warn(g.logger).Log("msg", "Error on ingesting out-of-order result from rule evaluation", "numDropped", numOutOfOrder)
			}
			if numDuplicates > 0 {
				level.Warn(g.logger).Log("msg", "Error on ingesting results from rule evaluation with different value but same timestamp", "numDropped", numDuplicates)
			}

			for metric, lset := range g.seriesInPreviousEval[i] {
				if _, ok := seriesReturned[metric]; !ok {
					// Series no longer exposed, mark it stale
					// by writing a StaleNaN sample for it.
					_, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
					switch errors.Cause(err) {
					case nil:
					case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
						// Do not count these in logging, as this is expected if series
						// is exposed from a different rule.
					default:
						level.Warn(g.logger).Log("msg", "Adding stale sample failed", "sample", metric, "err", err)
					}
				}
			}
		}(i, rule)
	}
	if g.metrics != nil {
		g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal)
	}
	g.cleanupStaleSeries(ctx, ts)
}
```
Next comes the evaluation of an individual rule. Here we follow only the AlertingRule path. First, the AlertingRule struct:
```go
// An AlertingRule generates alerts from its vector expression.
type AlertingRule struct {
	// The name of the alert.
	name string
	// The vector expression from which to generate alerts.
	vector parser.Expr
	// The duration for which a labelset needs to persist in the expression
	// output vector before an alert transitions from Pending to Firing state.
	holdDuration time.Duration
	// Extra labels to attach to the resulting alert sample vectors.
	labels labels.Labels
	// Non-identifying key/value pairs.
	annotations labels.Labels
	// External labels from the global config.
	externalLabels map[string]string
	// true if old state has been restored. We start persisting samples for ALERT_FOR_STATE
	// only after the restoration.
	restored bool
	// Protects the below.
	mtx sync.Mutex
	// Time in seconds taken to evaluate rule.
	evaluationDuration time.Duration
	// Timestamp of last evaluation of rule.
	evaluationTimestamp time.Time
	// The health of the alerting rule.
	health RuleHealth
	// The last error seen by the alerting rule.
	lastError error
	// A map of alerts which are currently active (Pending or Firing), keyed by
	// the fingerprint of the labelset they correspond to.
	active map[uint64]*Alert

	logger log.Logger
}
```
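The most important field here is active: it holds the alerts produced by evaluating the rule. Whether an alert is actually sent is decided by a series of further checks. The evaluation proceeds as follows: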
```go
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
	res, err := query(ctx, r.vector.String(), ts)
	if err != nil {
		r.SetHealth(HealthBad)
		r.SetLastError(err)
		return nil, err
	}
	// ......
}
```
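This step runs the rule's expr through the QueryFunc passed in when the Manager was created and returns the result: the set of series that satisfy the expression. For example, if the rule is:
```
cpu_usage > 90
```
the query might return:
```
cpu_usage{instance="192.168.0.11"} 91
cpu_usage{instance="192.168.0.12"} 92
```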
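Evaluation then iterates over the query results and computes a hash of each series' label set. If that hash already exists (that is, the same series was returned before), the alert's value and annotations are updated; otherwise a new alert is created and stored in the rule's active alert map. Next, it iterates over the rule's active alerts and updates each alert's state based on the rule's for duration, the time the alert became active or was resolved, the alert's current state, and whether the alert is still present in this query. The rules are: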
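If the alert existed before but is absent from this evaluation:
- If its state is StatePending, or more than 15 minutes (a hard-coded constant) have passed since it was resolved, it is deleted from the active map.
- If its state is not StateInactive, it is set to StateInactive and its resolution time is recorded.
If the alert existed before and is still present in this evaluation:
- If its state is StatePending and at least the configured for duration has passed since it became active, its state changes to StateFiring.
A series that starts matching for the first time (or matches again after its alert became inactive) gets a new alert in StatePending; a firing alert that is still present simply stays firing.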
The step above only updates alert states; it does not send anything. The actual sending happens here:
```go
// Check whether the rule is an alerting rule; if so, send its alerts
// (whether each alert is actually sent is decided inside ar.sendAlerts).
if ar, ok := rule.(*AlertingRule); ok {
	ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}

// .......

func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
	alerts := []*Alert{}
	r.ForEachActiveAlert(func(alert *Alert) {
		if alert.needsSending(ts, resendDelay) {
			alert.LastSentAt = ts
			// Allow for two Eval or Alertmanager send failures.
			delta := resendDelay
			if interval > resendDelay {
				delta = interval
			}
			alert.ValidUntil = ts.Add(4 * delta)
			anew := *alert
			alerts = append(alerts, &anew)
		}
	})
	notifyFunc(ctx, r.vector.String(), alerts...)
}

func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool {
	if a.State == StatePending {
		return false
	}

	// if an alert has been resolved since the last send, resend it
	if a.ResolvedAt.After(a.LastSentAt) {
		return true
	}

	return a.LastSentAt.Add(resendDelay).Before(ts)
}
```
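In summary:
- If the alert is in StatePending, it is not sent.
- If the alert has been resolved since it was last sent, it is sent again so that Alertmanager marks it as resolved.
- Otherwise, it is sent only if more than the configured resend delay (ResendDelay) has passed since it was last sent.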
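That is the Prometheus alerting flow. The main reason to study it is to be able to do custom development around Prometheus rules. For example, we could modify LoadGroups() so that it can dynamically load rules defined in MySQL, allowing alerting rules to be updated on the fly.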
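As a starting point for the MySQL idea, one lower-risk alternative to modifying LoadGroups() (our assumption, not something the text prescribes) is to leave Prometheus's loader untouched: periodically read the rule rows from the database, render them into a standard rule file, and reload only when the content changes. The sketch covers only the rendering and change detection; ruleRow, renderRuleFile, and contentChanged are hypothetical, the database query is omitted, and strings are quoted with strconv.Quote, whose escapes are also accepted in YAML double-quoted strings, which should be enough for ordinary expressions.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// ruleRow is a hypothetical shape for one alerting rule stored in a database table.
type ruleRow struct {
	GroupName string
	Alert     string
	Expr      string
	For       string // e.g. "10m"; empty means no hold duration
	Severity  string
}

// renderRuleFile turns rows into Prometheus rule-file YAML, one group per GroupName.
// Rows keep their input order within a group, so query them in a fixed order.
func renderRuleFile(rows []ruleRow) string {
	byGroup := map[string][]ruleRow{}
	var names []string
	for _, r := range rows {
		if _, ok := byGroup[r.GroupName]; !ok {
			names = append(names, r.GroupName)
		}
		byGroup[r.GroupName] = append(byGroup[r.GroupName], r)
	}
	sort.Strings(names) // stable output, so unchanged rows give an identical file
	var b strings.Builder
	b.WriteString("groups:\n")
	for _, g := range names {
		fmt.Fprintf(&b, "- name: %s\n  rules:\n", strconv.Quote(g))
		for _, r := range byGroup[g] {
			fmt.Fprintf(&b, "  - alert: %s\n    expr: %s\n", strconv.Quote(r.Alert), strconv.Quote(r.Expr))
			if r.For != "" {
				fmt.Fprintf(&b, "    for: %s\n", r.For)
			}
			if r.Severity != "" {
				fmt.Fprintf(&b, "    labels:\n      severity: %s\n", strconv.Quote(r.Severity))
			}
		}
	}
	return b.String()
}

// contentChanged reports whether the rendered file differs from the last one
// applied, and returns the new digest to remember.
func contentChanged(rendered string, lastDigest [32]byte) (bool, [32]byte) {
	d := sha256.Sum256([]byte(rendered))
	return d != lastDigest, d
}
```

After the file is written to a path listed under rule_files, Prometheus can be told to reload it with SIGHUP or, if it was started with --web.enable-lifecycle, with an HTTP POST to /-/reload; the reload ends up calling Manager.Update() as described earlier.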
Reference:
Prometheus in Depth: Principles, Applications, Source Code, and Extensions (《深入淺出Prometheus:原理、應用、源碼與拓展詳解》)