基于Prometheus的自動化巡檢
前言
目前,大部分公司都采用Prometheus + Grafana
這一套來做指標監控,所以在Prometheus
中也有大量的指標數據。為了滿足日常工作中的巡檢,可以基于Prometheus
實現自動巡檢,減輕部分運維壓力。
思路
為了靈活管理巡檢任務,將整個巡檢功能進行了拆分管理,分為:
數據源管理
:可以管理多個Prometheus
數據源,后期也可以增加其他數據源,比如ES
等。巡檢項管理
:目前的巡檢項
就是各種Prometheus規則
,之所以要單獨進行管理,是為了在多數據源、多集群等情況下進行復用。標簽管理
:目前是Prometheus
的label
,也是為了方便復用巡檢項
,巡檢項
和標簽
可以靈活進行組合。任務編排
:編排各種巡檢任務。執行作業
:配置定時的巡檢作業,它由多個編排的任務組成。巡檢報告
:便于查看、導出巡檢結果。巡檢通知
:巡檢結果可以通知到企業微信
群,便于業務方快速知道目前整個系統有沒有問題。
圖片
效果
數據源管理
(1)添加數據源
(2)數據源列表
巡檢項管理
(1)添加巡檢項
(2)巡檢項列表
標簽管理
(1)添加標簽
(2)標簽列表
任務編排
(1)創建任務編排
(2)任務列表
執行作業
(1)創建執行作業
(2)作業列表
巡檢報告
每次巡檢完成都會生成對應的巡檢報告。
點擊詳情可以看到巡檢的具體結果。
點擊導出,即可將報告導出為PDF。
如果配置了巡檢通知,則會將對應的巡檢結果發送到企業微信群。
代碼實現
大部分的代碼都是普通的CRUD
,比如數據源的管理、巡檢項的管理都是基礎的CRUD
,沒有什么好說的。
這里簡單說一下具體巡檢的實現。
(1)當用戶創建了執行作業
且該作業處于開啟
狀態,就會創建一個定時任務。
// CreateCronTask 創建定時任務
func (inspectionExecutionJobService *InspectionExecutionJobService) CreateCronTask(job *AutoInspection.InspectionExecutionJob) error {
cronName := fmt.Sprintf("InspectionExecution_%d", job.ID)
taskName := fmt.Sprintf("InspectionExecution_%d", job.ID)
// 檢查是否已存在相同的定時任務
if _, found := global.GVA_Timer.FindTask(cronName, taskName); found {
// 如果已存在,先清除舊的定時任務
global.GVA_Timer.Clear(cronName)
}
// 創建定時任務
var option []cron.Option
option = append(option, cron.WithSeconds())
// 添加定時任務
if _, err := global.GVA_Timer.AddTaskByFunc(cronName, job.CronExpr, func() {
// 執行巡檢任務
inspectionExecutionJobService.ExecuteInspectionJob(job)
}, taskName, option...); err != nil {
global.GVA_LOG.Error("創建定時任務失敗", zap.Error(err), zap.Uint("jobID", job.ID))
return err
}
// 更新下次執行時間
nextTime := inspectionExecutionJobService.calculateNextRunTime(job.CronExpr)
job.NextRunTime = &nextTime
// 更新數據庫中的記錄
return global.GVA_DB.Model(job).Updates(map[string]interface{}{
"next_run_time": job.NextRunTime,
}).Error
}
Tips:因為是采用的
gin-vue-admin
框架,所以直接使用框架自帶的timer定時器。
(2)當執行時間到了,就會執行ExecuteInspectionJob
巡檢任務。
func (inspectionExecutionJobService *InspectionExecutionJobService) ExecuteInspectionJob(job *AutoInspection.InspectionExecutionJob) {
// 更新作業執行時間
inspectionExecutionJobService.updateJobExecutionTime(job)
// 創建執行記錄
jobExecution := inspectionExecutionJobService.createJobExecution(job)
if jobExecution == nil {
return
}
// 執行所有關聯的巡檢任務并收集結果
allResults := inspectionExecutionJobService.executeAllInspectionTasks(job, jobExecution)
global.GVA_LOG.Info("執行完成", zap.Any("results", allResults))
// 更新執行記錄狀態和結果
inspectionExecutionJobService.updateJobExecutionResult(jobExecution, allResults)
// 發送通知
if *job.IsNotice {
inspectionExecutionJobService.sendInspectionNotification(job, jobExecution, allResults)
}
}
這里主要是executeAllInspectionTasks
來執行巡檢任務。
// executeAllInspectionTasks 執行所有關聯的巡檢任務并收集結果
func (inspectionExecutionJobService *InspectionExecutionJobService) executeAllInspectionTasks(job *AutoInspection.InspectionExecutionJob, jobExecution *AutoInspection.JobExecution) []*result.ProductsResult {
// 創建一個等待組來同步所有巡檢任務
var wg sync.WaitGroup
// 創建一個互斥鎖來保護結果集
var mu sync.Mutex
// 創建一個結果集合
allResults := make([]*result.ProductsResult, 0)
// 執行所有關聯的巡檢任務
for _, jobID := range job.JobIds {
wg.Add(1)
gofunc(id uint) {
defer wg.Done()
// 執行單個巡檢任務并獲取結果
result := inspectionExecutionJobService.executeSingleInspectionTask(id, jobExecution)
if result != nil {
// 將結果添加到總結果集中
mu.Lock()
allResults = append(allResults, result)
mu.Unlock()
}
}(jobID)
}
// 等待所有巡檢任務完成
wg.Wait()
return allResults
}
它會把作業
中的任務拆成單個任務,然后由executeSingleInspectionTask
分別執行并收集執行結果。
// executeSingleInspectionTask 執行單個巡檢任務
func (inspectionExecutionJobService *InspectionExecutionJobService) executeSingleInspectionTask(jobID uint, jobExecution *AutoInspection.JobExecution) *result.ProductsResult {
global.GVA_LOG.Info("執行巡檢任務", zap.Uint("jobID", jobID))
// 獲取巡檢任務信息
inspectionJob, _ := inspectionJobService.GetInspectionJob(fmt.Sprintf("%d", jobID))
// 創建結果通道
resultCh := make(chan *result.ProductsResult)
// 創建一個用于等待結果的WaitGroup
var resultWg sync.WaitGroup
resultWg.Add(1)
// 用于存儲結果的變量
var taskResult *result.ProductsResult
// 啟動一個goroutine來接收結果
gofunc() {
defer resultWg.Done()
result := <-resultCh
global.GVA_LOG.Info("巡檢任務執行完成",
zap.String("jobName", inspectionJob.Name),
zap.Any("result", result))
// 保存結果
taskResult = result
}()
// 執行巡檢任務
inspectionExecutionJobService.ExecuteInspectionTask(&inspectionJob, jobExecution, resultCh)
// 等待結果接收完成
resultWg.Wait()
return taskResult
}
在ExecuteInspectionTask
中是為了方便擴展數據源。
func (inspectionExecutionJobService *InspectionExecutionJobService) ExecuteInspectionTask(inspectionJob *AutoInspection.InspectionJob, jobExecution *AutoInspection.JobExecution, resultCh chan *result.ProductsResult) {
switch inspectionJob.DataSourceType {
case "prometheus":
// 執行Prometheus巡檢任務
inspectionExecutionJobService.ExecutePrometheusInspectionTask(inspectionJob, jobExecution, resultCh)
}
}
由于目前只有Prometheus
數據源,所以將直接執行ExecutePrometheusInspectionTask
。在這個方法中主要是構造Prometheus
規則然后進行巡檢。
// ExecutePrometheusInspectionTask 執行Prometheus巡檢任務
func (inspectionExecutionJobService *InspectionExecutionJobService) ExecutePrometheusInspectionTask(inspectionJob *AutoInspection.InspectionJob, jobExecution *AutoInspection.JobExecution, resultCh chan *result.ProductsResult) {
// 執行Prometheus巡檢任務的邏輯
var inspectionItemsService InspectionItemsService
var inspectionTagService InspectionTagService
var dataSourceService DataSourceService
// 獲取數據源信息
dataSource, _ := dataSourceService.GetDataSource(fmt.Sprintf("%d", inspectionJob.DataSourceId))
// 創建規則列表
prometheusRules := make([]*product.PrometheusRule, 0, len(inspectionJob.ItemLabelMaps))
// 遍歷巡檢項與標簽映射關系
for _, itemLabelMap := range inspectionJob.ItemLabelMaps {
// 獲取巡檢項信息
inspectionItem, _ := inspectionItemsService.GetInspectionItems(fmt.Sprintf("%d", itemLabelMap.ItemId))
// 獲取標簽信息
var inspectionTag AutoInspection.InspectionTag
if itemLabelMap.LabelId != 0 {
inspectionTag, _ = inspectionTagService.GetInspectionTag(fmt.Sprintf("%d", itemLabelMap.LabelId))
}
// 創建Prometheus規則
prometheusRule := &product.PrometheusRule{
Name: inspectionItem.Name,
Rule: inspectionItem.Rule,
LabelFilter: inspectionTag.Label,
Desc: inspectionItem.Description,
AlertInfo: inspectionItem.OutputTemplate,
DataSourceName: dataSource.Name,
}
// 添加到規則列表
prometheusRules = append(prometheusRules, prometheusRule)
}
// 創建規則集合
rules := product.Rules{
Prometheus: prometheusRules,
AliyunSafe: []*product.AliyunSafeRule{}, // 空列表,因為這里只處理Prometheus規則
}
// 創建產品
prod := &product.Product{
Name: inspectionJob.Name,
Rules: rules,
}
// 使用defer和recover捕獲可能的panic
deferfunc() {
if r := recover(); r != nil {
// 記錄panic信息
global.GVA_LOG.Error("執行巡檢任務發生panic",
zap.Any("panic", r),
zap.String("jobName", inspectionJob.Name))
// 創建一個表示失敗的結果并發送到結果通道
pr := &result.ProductsResult{ProductName: inspectionJob.Name}
// 為每個規則創建失敗結果
for _, rule := range prometheusRules {
errorMsg := fmt.Sprintf("巡檢執行失敗: %v", r)
failureResult := result.NewRuleResult(
result.WithInspectionInfo(rule.Name),
result.WithInspectionResult(result.ABNORMAL),
result.WithInspectionErrorInfo(
[]map[string]string{{
"error": errorMsg,
"rule": rule.Rule,
}},
"執行規則 {{rule}} 時發生錯誤: {{error}}",
),
)
pr.Add(failureResult)
}
// 發送結果
resultCh <- pr
}
}()
// 執行巡檢
err = prod.Run(resultCh)
if err != nil {
global.GVA_LOG.Error("執行巡檢任務失敗", zap.Error(err), zap.String("jobName", inspectionJob.Name))
return
}
global.GVA_LOG.Info("巡檢任務已啟動", zap.String("jobName", inspectionJob.Name))
}
在prod.Run
中,會去做真正的指標數據查詢。
func (p *Product) Run(resultCh chan *result.ProductsResult) error {
global.GVA_LOG.Info(fmt.Sprintf("開始巡檢, %s", p.Name))
pr := &result.ProductsResult{ProductName: p.Name}
// prometheus巡檢規則
for _, prometheusRule := range p.Rules.Prometheus {
ruleInspectRes, err := prometheusRule.Run()
if err != nil {
return err
}
pr.Add(ruleInspectRes)
}
resultCh <- pr
returnnil
}
然后調用prometheusRule.Run
獲取結果。
func (r *PrometheusRule) Run() (*result.RuleResult, error) {
ds, err := datasource.GetByName(r.DataSourceName)
if err != nil {
returnnil, err
}
pds, ok := ds.(*datasource.PrometheusDataSource)
if !ok {
returnnil, fmt.Errorf("數據源類型錯誤: %s 不是Prometheus數據源", r.DataSourceName)
}
if pds.Client == nil {
returnnil, fmt.Errorf("數據源為空: %s", r.DataSourceName)
}
res, err := pds.Run(r.Rule, r.LabelFilter)
if err != nil {
returnnil, err
}
ruleRes := r.buildRuleResult(res)
return ruleRes, nil
}
func (r *PrometheusRule) buildRuleResult(resultLabels []map[string]string) *result.RuleResult {
iflen(resultLabels) == 0 {
return result.NewRuleResult(result.WithInspectionInfo(fmt.Sprintf("%s", r.Name)),
result.WithInspectionResult(result.NORMAL))
}
return result.NewRuleResult(result.WithInspectionInfo(fmt.Sprintf("%s", r.Name)),
result.WithInspectionResult(result.ABNORMAL),
result.WithInspectionErrorInfo(resultLabels, r.AlertInfo))
}
具體的查詢是封裝在pds.Run
中的,它會去調用Prometheus
的接口去查詢數據。
func Query(client api.Client, rule string) (model.Value, []string, error) {
// 添加空指針檢查
if client == nil {
returnnil, nil, errors.New("Prometheus client is nil")
}
v1Api := promV1.NewAPI(client)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
value, warnings, err := v1Api.Query(ctx, rule, time.Now(), promV1.WithTimeout(10*time.Second))
global.GVA_LOG.Debug("查詢結果", zap.String("value", value.String()), zap.Any("warnings", warnings))
if err != nil {
returnnil, nil, errors.WithStack(err)
}
return value, warnings, nil
}
(3)如果需要發送到企業微信,就會構建發送結果進行發送。
func (inspectionExecutionJobService *InspectionExecutionJobService) sendInspectionNotification(job *AutoInspection.InspectionExecutionJob, jobExecution *AutoInspection.JobExecution, results []*result.ProductsResult) {
// 獲取通知配置
var notifyService = NotifyService{}
notify, err := notifyService.GetNotify(fmt.Sprintf("%d", job.NoticdId))
if err != nil {
global.GVA_LOG.Error("獲取通知配置失敗", zap.Error(err))
return
}
// 構建通知內容
// 1. 巡檢摘要
taskCount := len(results) // 巡檢任務數量
itemCount := 0 // 巡檢項數量
normalCount := 0 // 正常項數量
abnormalCount := 0 // 異常項數量
abnormalItems := []string{} // 異常項列表
// 統計巡檢項、正常項和異常項的數量
for _, task := range results {
itemCount += len(task.SubRuleResults)
for _, item := range task.SubRuleResults {
if item.InspectionResult == result.NORMAL {
normalCount++
} elseif item.InspectionResult == result.ABNORMAL {
abnormalCount++
// 收集異常項信息
abnormalDetail := fmt.Sprintf("【%s】%s", task.ProductName, item.InspectionInfo)
iflen(item.InspectionErrorInfo) > 0 {
abnormalDetail += "\n" + strings.Join(item.InspectionErrorInfo, "\n")
}
abnormalItems = append(abnormalItems, abnormalDetail)
}
}
}
// 格式化摘要信息
summary := fmt.Sprintf("巡檢任務%d個,巡檢項%d個,正常%d個,異常%d個", taskCount, itemCount, normalCount, abnormalCount)
// 構建企業微信通知內容
var content string
if notify.TemplateType == "markdown" {
// Markdown格式
content = fmt.Sprintf(`{
"msgtype": "markdown",
"markdown": {
"content": "# 自動化巡檢結果通知\n\n> ### 執行作業:%s\n> ### 執行時間:%s\n> ### 執行結果:%s\n\n### **異常項列表:**\n%s"
}
}`,
jobExecution.ExecutionJobName,
jobExecution.EndTime.Format("2006-01-02 15:04:05"),
summary,
formatAbnormalItems(abnormalItems))
} else {
// 文本格式
content = fmt.Sprintf(`{
"msgtype": "text",
"text": {
"content": "巡檢結果通知\n執行作業:%s\n執行時間:%s\n執行結果:%s\n\n異常項列表:\n%s"
}
}`,
jobExecution.ExecutionJobName,
jobExecution.EndTime.Format("2006-01-02 15:04:05"),
summary,
formatAbnormalItemsText(abnormalItems))
}
// 發送通知
ctx := context.Background()
sendParams := sender.SendParams{
NoticeType: notify.Type,
NoticeId: fmt.Sprintf("%d", notify.ID),
NoticeName: notify.Name,
Hook: notify.Address,
Content: content,
}
err = sender.Sender(&ctx, sendParams)
if err != nil {
global.GVA_LOG.Error("發送巡檢通知失敗", zap.Error(err))
return
}
global.GVA_LOG.Info("發送巡檢通知成功",
zap.String("jobName", jobExecution.ExecutionJobName),
zap.String("summary", summary))
}
(4)PDF導出是用wkhtmltopdf
實現,該包依賴服務器上的wkhtmltopdf
命令。
func (jobExecutionService *JobExecutionService) GeneratePDF(jobExecution *AutoInspection.JobExecution) (string, error) {
pdf, err := wkhtmltopdf.NewPDFGenerator()
if err != nil {
global.GVA_LOG.Error("PDF生成器初始化失敗", zap.Error(err))
return"", err
}
// 設置全局選項
pdf.Dpi.Set(300)
pdf.Orientation.Set(wkhtmltopdf.OrientationPortrait)
pdf.PageSize.Set(wkhtmltopdf.PageSizeA4)
pdf.MarginTop.Set(20)
pdf.MarginBottom.Set(20)
pdf.MarginLeft.Set(20)
pdf.MarginRight.Set(20)
// 渲染HTML模板
htmlContent, err := jobExecutionService.renderHTMLTemplate(jobExecution)
if err != nil {
global.GVA_LOG.Error("HTML模板渲染失敗", zap.Error(err))
return"", err
}
// 創建一個頁面并添加到生成器
page := wkhtmltopdf.NewPageReader(bytes.NewBufferString(htmlContent))
pdf.AddPage(page)
// 生成PDF
err = pdf.Create()
if err != nil {
return"", err
}
basePath := "uploads/pdf"
// 創建目錄(如果不存在)
if err = os.MkdirAll(basePath, 0755); err != nil {
global.GVA_LOG.Error("創建PDF保存目錄失敗", zap.Error(err))
return"", err
}
filename := generatePDFFileName(jobExecution)
filePath := filepath.Join(basePath, filename)
// 3. 保存PDF到文件
if err = os.WriteFile(filePath, pdf.Bytes(), 0644); err != nil {
global.GVA_LOG.Error("保存PDF文件失敗", zap.Error(err))
return"", err
}
....
return downloadURL, nil
}
以上就是實現巡檢的主要代碼。
最后
大部分企業雖然都有監控告警,但是自動化巡檢在日常的運維工作中還是必要的,它可以聚合目前系統、集群存在的問題,避免遺漏告警信息。另外,在AI發展迅猛的今天,可以把AI也結合到自動化巡檢中,比如在巡檢中增加一些AI預測,AI故障診斷、AI根因分析等功能。