Skip to content

Commit 1b516fa

Browse files
committed
doc(Metrics&Alert): 修改数据库字段,修改内部处理逻辑并通过测试
1 parent 9b57efb commit 1b516fa

File tree

5 files changed

+272
-28
lines changed

5 files changed

+272
-28
lines changed

docs/alerting/database-design.md

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
|--------|------|------|
2323
| id | varchar(64) PK | 告警 issue ID |
2424
| state | enum(Closed, Open) | 问题状态 |
25-
| level | varchar(32) | 告警等级:如 P0/P1/Px/Warning |
25+
| level | varchar(32) | 告警等级:如 P0/P1/Px |
2626
| alert_state | enum(Pending, Restored, AutoRestored, InProcessing) | 处理状态 |
2727
| title | varchar(255) | 告警标题 |
2828
| labels | json | 标签,格式:[{key, value}] |
@@ -117,25 +117,21 @@
117117

118118
---
119119

120-
### 7) service_states(服务异常状态表
120+
### 7) service_states(服务状态表
121121

122122
追踪服务在某一版本上的健康状态与处置进度。
123123

124124
| 字段名 | 类型 | 说明 |
125125
|--------|------|------|
126126
| service | varchar(255) PK | 服务名 |
127127
| version | varchar(255) PK | 版本号 |
128-
<!-- | level | varchar(32) | 影响等级:如 P0/P1/Px/Warning | -->
129-
| detail | text | 异常详情(可为 JSON 文本)(可空) |
130-
| report_at | TIMESTAMP(6) | 首次报告时间 |
128+
| report_at | TIMESTAMP(6) | 同步alert_issue_ids中,alert_issue中alert_state=InProcessing状态的alert_since的最早时间 |
131129
| resolved_at | TIMESTAMP(6) | 解决时间(可空) |
132-
| health_state | enum(Normal,Processing,Error) | 处置阶段 |
133-
| correlation_id | varchar(255) | 关联 ID(用于跨系统联动/串联事件)(可空) |
130+
| health_state | enum(Normal,Warning,Error) | 处置阶段 |
131+
| alert_issue_ids | [] alert_issue_id | 关联alert_issues表的id |
134132

135133
**索引建议:**
136134
- PRIMARY KEY: `(service, version)`
137-
- INDEX: `(health_state, report_at)`
138-
- INDEX: `(correlation_id)`
139135

140136
## 数据关系(ER)
141137

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
# healthcheck — Pending 告警扫描与分发任务
2+
3+
本包提供一个定时任务:
4+
- 周期性扫描 Pending 状态的告警
5+
- 将告警投递到消息队列(供下游处理器消费)
6+
- 成功投递后,原子地把缓存中的状态更新:
7+
- `alert:issue:{id}``alertState`:Pending → InProcessing
8+
- `service_state:{service}:{version}``health_state`:由告警等级推导(P0→Error;P1/P2→Warning)
9+
10+
此任务默认只更新缓存,不直接更新数据库,以降低耦合与避免与业务处理竞争。数据库状态可由下游处理器在处理开始时回写,或由后续补偿任务兜底。
11+
12+
——
13+
14+
## 1. 触发与频率
15+
16+
- 间隔:默认每 10s 扫描一次(可配置)
17+
- 批量:每次最多处理 200 条 Pending(可配置)
18+
- 并发:串行或小并发(<= 4),避免重复投递
19+
20+
环境变量建议:
21+
```
22+
HC_SCAN_INTERVAL=10s
23+
HC_SCAN_BATCH=200
24+
HC_WORKERS=1
25+
```
26+
27+
——
28+
29+
## 2. 数据来源与过滤
30+
31+
优先以数据库为准,结合缓存加速:
32+
33+
- 数据库查询(推荐)
34+
```sql
35+
SELECT id, level, title, labels, alert_since
36+
FROM alert_issues
37+
WHERE alert_state = 'Pending'
38+
ORDER BY alert_since ASC
39+
LIMIT $1;
40+
```
41+
42+
当告警切换为 InProcessing 时,需要更新对应 `service_states.report_at` 为该 service/version 关联的 `alert_issue_ids` 中,所有 alert_issues 里 alert_state=InProcessing 的 `alert_since` 最早时间(min)。可通过下游处理器或本任务的补充逻辑回填:
43+
44+
```sql
45+
UPDATE service_states ss
46+
SET report_at = sub.min_since
47+
FROM (
48+
SELECT si.service, si.version, MIN(ai.alert_since) AS min_since
49+
FROM service_states si
50+
JOIN alert_issues ai ON ai.id = ANY(si.alert_issue_ids)
51+
WHERE ai.alert_state = 'InProcessing'
52+
GROUP BY si.service, si.version
53+
) AS sub
54+
WHERE ss.service = sub.service AND ss.version = sub.version;
55+
```
56+
57+
- 或仅用缓存(可选):
58+
- 维护集合 `alert:index:alert_state:Pending`(若未维护,可临时 SCAN `alert:issue:*` 并过滤 JSON 中的 `alertState`,但不推荐在大规模下使用 SCAN)。
59+
60+
——
61+
62+
## 3. 消息队列
63+
64+
抽象接口:
65+
```go
66+
type AlertQueue interface {
67+
PublishAlert(ctx context.Context, msg AlertMessage) error
68+
}
69+
70+
type AlertMessage struct {
71+
ID string `json:"id"`
72+
Service string `json:"service"`
73+
Version string `json:"version,omitempty"`
74+
Level string `json:"level"`
75+
Title string `json:"title"`
76+
AlertSince time.Time `json:"alert_since"`
77+
Labels map[string]string `json:"labels"`
78+
}
79+
```
80+
81+
实现可选:Kafka、NATS、SQS、Redis Stream(示例):
82+
```go
83+
// Redis Stream 样例
84+
func (q *RedisStreamQueue) PublishAlert(ctx context.Context, m AlertMessage) error {
85+
b, _ := json.Marshal(m)
86+
return q.r.XAdd(ctx, &redis.XAddArgs{Stream: q.stream, Values: map[string]any{"data": b}}).Err()
87+
}
88+
```
89+
90+
环境变量建议:
91+
```
92+
ALERT_QUEUE_KIND=redis_stream|kafka|nats
93+
ALERT_QUEUE_DSN=redis://localhost:6379/0
94+
ALERT_QUEUE_TOPIC=alerts.pending
95+
```
96+
97+
——
98+
99+
## 4. 缓存键与原子更新
100+
101+
现有(或建议)键:
102+
- 告警:`alert:issue:{id}` → JSON,字段包含 `alertState`
103+
- 指数(可选):`alert:index:alert_state:{Pending|InProcessing|...}`
104+
- 服务态:`service_state:{service}:{version}` → JSON,字段包含 `health_state`
105+
- 指数:`service_state:index:health:{Error|Warning|...}`
106+
107+
为避免并发写冲突,建议使用 Lua CAS(Compare-And-Set)脚本原子修改值与索引:
108+
109+
```lua
110+
-- KEYS[1] = alert key, ARGV[1] = expected, ARGV[2] = next, KEYS[2] = idx:old, KEYS[3] = idx:new, ARGV[3] = id
111+
local v = redis.call('GET', KEYS[1])
112+
if not v then return 0 end
113+
local obj = cjson.decode(v)
114+
if obj.alertState ~= ARGV[1] then return -1 end
115+
obj.alertState = ARGV[2]
116+
redis.call('SET', KEYS[1], cjson.encode(obj), 'KEEPTTL')
117+
if KEYS[2] ~= '' then redis.call('SREM', KEYS[2], ARGV[3]) end
118+
if KEYS[3] ~= '' then redis.call('SADD', KEYS[3], ARGV[3]) end
119+
return 1
120+
```
121+
122+
服务态类似(示例将态切换到推导的新态):
123+
```lua
124+
-- KEYS[1] = service_state key, ARGV[1] = expected(optional), ARGV[2] = next, KEYS[2] = idx:old(optional), KEYS[3] = idx:new, ARGV[3] = member
125+
local v = redis.call('GET', KEYS[1])
126+
if not v then return 0 end
127+
local obj = cjson.decode(v)
128+
if ARGV[1] ~= '' and obj.health_state ~= ARGV[1] then return -1 end
129+
obj.health_state = ARGV[2]
130+
redis.call('SET', KEYS[1], cjson.encode(obj), 'KEEPTTL')
131+
if KEYS[2] ~= '' then redis.call('SREM', KEYS[2], ARGV[3]) end
132+
if KEYS[3] ~= '' then redis.call('SADD', KEYS[3], ARGV[3]) end
133+
return 1
134+
```
135+
136+
——
137+
138+
## 5. 任务流程(伪代码)
139+
140+
```go
141+
func runOnce(ctx context.Context, db *Database, rdb *redis.Client, q AlertQueue, batch int) error {
142+
rows := queryPendingFromDB(ctx, db, batch) // id, level, title, labels(JSON), alert_since
143+
for _, it := range rows {
144+
svc := it.Labels["service"]
145+
ver := it.Labels["service_version"]
146+
// 1) 投递消息
147+
if err := q.PublishAlert(ctx, AlertMessage{ID: it.ID, Service: svc, Version: ver, Level: it.Level, Title: it.Title, AlertSince: it.AlertSince, Labels: it.Labels}); err != nil {
148+
// 投递失败:跳过状态切换,计数并继续
149+
continue
150+
}
151+
// 2) 缓存状态原子切换(告警)
152+
alertKey := "alert:issue:" + it.ID
153+
rdb.Eval(ctx, alertCAS, []string{alertKey, "alert:index:alert_state:Pending", "alert:index:alert_state:InProcessing"}, "Pending", "InProcessing", it.ID)
154+
// 3) 缓存状态原子切换(服务态:按告警等级推导)
155+
if svc != "" { // version 可空
156+
target := deriveHealth(it.Level) // P0->Error; P1/P2->Warning; else Warning
157+
svcKey := "service_state:" + svc + ":" + ver
158+
-- 可按需指定旧态索引,否则留空
159+
localOld := ''
160+
newIdx := "service_state:index:health:" + target
161+
member := svcKey
162+
rdb.Eval(ctx, svcCAS, []string{svcKey, localOld, newIdx}, '', target, member)
163+
}
164+
}
165+
return nil
166+
}
167+
168+
func StartScheduler(ctx context.Context, deps Deps) {
169+
t := time.NewTicker(deps.Interval)
170+
defer t.Stop()
171+
for {
172+
select {
173+
case <-ctx.Done(): return
174+
case <-t.C:
175+
_ = runOnce(ctx, deps.DB, deps.Redis, deps.Queue, deps.Batch)
176+
}
177+
}
178+
}
179+
```
180+
181+
——
182+
183+
## 6. 可观测与重试
184+
185+
- 指标:扫描次数、选出数量、成功投递数量、CAS 成功/失败数量、用时分位
186+
- 日志:每批开始/结束、首尾 ID、错误明细
187+
- 重试:
188+
- 消息投递失败:不更改缓存状态,等待下次扫描重试
189+
- CAS 返回 -1(状态被他处更改):记录并跳过
190+
191+
——
192+
193+
## 7. 本地验证
194+
195+
1) 准备 Redis 与 DB(见 receiver/README.md)
196+
197+
2) 造数据:插入一条 `alert_issues.alert_state='Pending'` 且缓存中存在 `alert:issue:{id}` 的 JSON。
198+
199+
3) 启动任务:观察日志/指标。
200+
201+
4) 验证缓存:
202+
```bash
203+
redis-cli --raw GET alert:issue:<id> | jq
204+
redis-cli --raw SMEMBERS alert:index:alert_state:InProcessing | head -n 20
205+
redis-cli --raw GET service_state:<service>:<version> | jq
206+
redis-cli --raw SMEMBERS service_state:index:health:Processing | head -n 20
207+
```
208+
209+
5) 验证消息队列:在订阅端查看 `alerts.pending` 是否收到消息。
210+
211+
——
212+
213+
## 8. 配置汇总
214+
215+
```
216+
# 扫描任务
217+
HC_SCAN_INTERVAL=10s
218+
HC_SCAN_BATCH=200
219+
HC_WORKERS=1
220+
221+
# 队列
222+
ALERT_QUEUE_KIND=redis_stream|kafka|nats
223+
ALERT_QUEUE_DSN=redis://localhost:6379/0
224+
ALERT_QUEUE_TOPIC=alerts.pending
225+
```
226+
227+
——
228+
229+

internal/alerting/service/receiver/README.md

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -164,18 +164,21 @@ func (h *Handler) AlertmanagerWebhook(c *gin.Context) {
164164
// 若唯一约束冲突/网络抖动等,记录后继续
165165
continue
166166
}
167-
// 5) 同步写入 service_states(health_state=Error;detail/resolved_at/correlation_id 留空)
167+
// 5) 同步写入 service_states(health_state 由 level 推导;resolved_at 留空;alert_issue_ids 追加新 issue id)
168+
// 规则:P0→Error;P1/P2→Warning;其他→Warning(可按需调整)
168169
// service 从 labels.service 取;version 可从 labels.service_version 取(可空)
169-
if err := h.dao.UpsertServiceState(c, a.Labels["service"], a.Labels["service_version"], row.AlertSince, "Error"); err != nil {
170+
derived := func(l string) string { if l == "P0" { return "Error" }; if l == "P1" || l == "P2" { return "Warning" }; return "Warning" }(row.Level)
171+
// report_at 此处暂不写,由 healthcheck 定时任务在 alert_issues 进入 InProcessing 后回填为最早的 alert_since
172+
if err := h.dao.UpsertServiceState(c, a.Labels["service"], a.Labels["service_version"], nil, derived, row.ID); err != nil {
170173
// 仅记录错误,不阻断主流程
171174
}
172175
// 6) 写通到 Redis(不阻塞主流程,失败仅记录日志)
173176
// alert_issues
174177
if err := h.cache.WriteIssue(c, row, a); err != nil {
175178
// 仅记录错误,避免影响 Alertmanager 重试逻辑
176179
}
177-
// service_states
178-
_ = h.cache.WriteServiceState(c, a.Labels["service"], a.Labels["service_version"], row.AlertSince, "Error")
180+
// service_states(使用同样的推导态)
181+
_ = h.cache.WriteServiceState(c, a.Labels["service"], a.Labels["service_version"], "", derived)
179182
MarkSeen(key) // 记忆幂等键
180183
created++
181184
}
@@ -264,7 +267,7 @@ func NormalizeLevel(sev string) string {
264267
目标:将 Alertmanager 的单条 AMAlert → AlertIssueRow。
265268
• id:uuid.NewString()
266269
• state:Open(首次创建强制)
267-
• alertState:InProcessing(首次创建强制)
270+
• alertState:Pending(首次创建强制)
268271
• level:NormalizeLevel(alert.Labels["severity"])
269272
• title:优先 annotations.summary,否则拼:{idc} {service} {alertname} ...
270273
• label:把 labels 展平成 [{key,value}](额外加上一些关键来源信息:am_fingerprint、generatorURL、groupKey)
@@ -509,8 +512,8 @@ curl -X POST http://localhost:8080/v1/integrations/alertmanager/webhook \
509512
• service=serviceA
510513
• version=(若 labels 中有 service_version 则为其值,否则为空字符串)
511514
• report_at=与 alert_since 一致(若已存在则保留更早的 report_at)
512-
• health_state=Error
513-
detail/resolved_at/correlation_id 为空
515+
• health_state=Warning(因本示例 level=P1)
516+
alert_issue_ids 包含刚插入的 alert_issues.id
514517

515518
Redis 中应看到:
516519
• key: alert:issue:<id> 值为 JSON 且 TTL≈3 天

internal/alerting/service/receiver/dao.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type AlertIssueDAO interface {
1414

1515
// ServiceStateWriter optionally allows writing to service_states table.
1616
type ServiceStateWriter interface {
17-
UpsertServiceState(ctx context.Context, service, version string, reportAt time.Time, healthState string) error
17+
UpsertServiceState(ctx context.Context, service, version string, reportAt *time.Time, healthState string, issueID string) error
1818
}
1919

2020
type NoopDAO struct{}
@@ -23,7 +23,7 @@ func NewNoopDAO() *NoopDAO { return &NoopDAO{} }
2323

2424
func (d *NoopDAO) InsertAlertIssue(ctx context.Context, r *AlertIssueRow) error { return nil }
2525

26-
func (d *NoopDAO) UpsertServiceState(ctx context.Context, service, version string, reportAt time.Time, healthState string) error {
26+
func (d *NoopDAO) UpsertServiceState(ctx context.Context, service, version string, reportAt *time.Time, healthState string, issueID string) error {
2727
return nil
2828
}
2929

@@ -44,17 +44,26 @@ func (d *PgDAO) InsertAlertIssue(ctx context.Context, r *AlertIssueRow) error {
4444
return nil
4545
}
4646

47-
// UpsertServiceState inserts or updates service_states with health_state and earliest report_at.
48-
// detail, resolved_at, correlation_id remain empty/unchanged.
49-
func (d *PgDAO) UpsertServiceState(ctx context.Context, service, version string, reportAt time.Time, healthState string) error {
47+
// UpsertServiceState inserts or updates service_states with health_state and alert_issue_ids.
48+
// report_at is not updated here except at insert-time if provided (may be NULL).
49+
func (d *PgDAO) UpsertServiceState(ctx context.Context, service, version string, reportAt *time.Time, healthState string, issueID string) error {
5050
const q = `
51-
INSERT INTO service_states (service, version, report_at, health_state)
52-
VALUES ($1, $2, $3, $4)
51+
INSERT INTO service_states (service, version, report_at, health_state, alert_issue_ids)
52+
VALUES ($1, $2, $3, $4, ARRAY[$5]::text[])
5353
ON CONFLICT (service, version) DO UPDATE
5454
SET health_state = EXCLUDED.health_state,
55-
report_at = LEAST(service_states.report_at, EXCLUDED.report_at)
55+
alert_issue_ids = CASE
56+
WHEN NOT ($5 = ANY(service_states.alert_issue_ids)) THEN array_append(service_states.alert_issue_ids, $5)
57+
ELSE service_states.alert_issue_ids
58+
END
5659
`
57-
if _, err := d.DB.ExecContext(ctx, q, service, version, reportAt, healthState); err != nil {
60+
var reportAtVal any
61+
if reportAt != nil {
62+
reportAtVal = *reportAt
63+
} else {
64+
reportAtVal = nil
65+
}
66+
if _, err := d.DB.ExecContext(ctx, q, service, version, reportAtVal, healthState, issueID); err != nil {
5867
return fmt.Errorf("upsert service_state: %w", err)
5968
}
6069
return nil

internal/alerting/service/receiver/handler.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package receiver
33
import (
44
"net/http"
55
"strings"
6+
"time"
67

78
"github.com/fox-gonic/fox"
89
)
@@ -60,13 +61,19 @@ func (h *Handler) AlertmanagerWebhook(c *fox.Context) {
6061
if err := h.dao.InsertAlertIssue(c.Request.Context(), row); err != nil {
6162
continue
6263
}
63-
// Upsert service_states: health_state=Error; detail/resolved_at/correlation_id left empty
64+
6465
if w, ok := h.dao.(ServiceStateWriter); ok {
6566
service := strings.TrimSpace(a.Labels["service"])
6667
version := strings.TrimSpace(a.Labels["service_version"]) // optional
6768
if service != "" {
68-
_ = w.UpsertServiceState(c.Request.Context(), service, version, row.AlertSince, "Error")
69-
_ = h.cache.WriteServiceState(c.Request.Context(), service, version, row.AlertSince, "Error")
69+
derived := "Warning"
70+
if row.Level == "P0" {
71+
derived = "Error"
72+
} else if row.Level == "P1" || row.Level == "P2" {
73+
derived = "Warning"
74+
}
75+
_ = w.UpsertServiceState(c.Request.Context(), service, version, nil, derived, row.ID)
76+
_ = h.cache.WriteServiceState(c.Request.Context(), service, version, time.Time{}, derived)
7077
}
7178
}
7279
// Write-through to cache. Errors are ignored to avoid impacting webhook ack.

0 commit comments

Comments
 (0)