Skip to content

源健康监控与自动降级 #4

@Arcadia822

Description

@Arcadia822

背景

当前 sources.json 中的源只有 enabled: true/false 两态。当某个 RSS 源持续返回错误、超时或空内容时:

  1. agent 无感知_load_events_from_sources() 静默跳过失败的源,不记录失败原因和次数。
  2. 无法自动恢复:暂时性故障的源被跳过后,下次 run 仍然会尝试(然后再次失败),没有退避机制。
  3. 用户无法诊断:agent 无法告诉用户"你的 X 源已经连续 3 天拉取失败"。

参考实现

OpenTrends (nexmoe/opentrends) 在 source 表中维护了完整的源状态机:

status: ok / stale / error
fetchedAt, lastSuccessAt, expiresAt, staleUntil
itemCount, errorCount, lastError
refreshOwner, refreshLockedUntil  (分布式锁)

前端以绿/黄/红状态点展示每个源的健康状况。服务端根据状态自动决定刷新策略——ok 源按正常 TTL 刷新,error 源有 5 分钟退避期。

提案

1. 源状态追踪文件

在 topic 目录下新增 source-health.json

{
  "interface_version": "skrya.source-health.v1",
  "updated_at": "2026-05-17T10:00:00+08:00",
  "sources": {
    "verge-ai": {
      "status": "ok",
      "last_success_at": "2026-05-17T09:30:00+08:00",
      "last_fetch_at": "2026-05-17T09:30:00+08:00",
      "consecutive_failures": 0,
      "last_error": null,
      "item_count": 5
    },
    "some-broken-source": {
      "status": "degraded",
      "last_success_at": "2026-05-15T08:00:00+08:00",
      "last_fetch_at": "2026-05-17T09:30:00+08:00",
      "consecutive_failures": 12,
      "last_error": "HTTP 403: Forbidden",
      "item_count": 0
    }
  }
}

2. 状态转换规则

ok → degraded: 连续失败 >= 3 次
degraded → error: 连续失败 >= 10 次
degraded → ok: 成功拉取 1 次
error → degraded: 距上次 error >= 24h 且成功拉取 1 次

3. 退避策略

  • ok 源:正常拉取
  • degraded 源:仍然拉取,但在 digest 中标注"该源近期不稳定"
  • error 源:跳过拉取,避免浪费 agent 的执行时间

4. Agent 反馈机制

在 digest 系统提示区增加源状态报告:

## 源状态
✅ 5 个源正常
⚠️ 2 个源降级(知乎热榜: 连续失败5次, The Verge: 连续失败3次)
❌ 1 个源已停用(某源: 连续失败15次, HTTP 403)

> 如果需要替换故障源,建议运行: 替换知乎热榜的数据源

5. 对 _load_events_from_sources() 的改造

def _load_events_from_sources(self, topic_id, sources):
    health = self._load_source_health(topic_id)
    results = []
    
    for source in sources:
        source_health = health.get(source["id"])
        
        # error 状态源直接跳过
        if source_health and source_health["status"] == "error":
            continue
        
        try:
            items = self._fetch_source(source)
            self._record_success(health, source["id"], len(items))
            results.extend(items)
        except Exception as e:
            self._record_failure(health, source["id"], str(e))
    
    self._save_source_health(topic_id, health)
    return results

收益

  • agent 可诊断:能在 digest 中告诉用户哪些源有问题
  • 减少无效请求:长期故障源不再消耗执行时间
  • 自动恢复:暂时性故障源退避后自动恢复拉取

优先级

— 提升 agent 输出质量和运行效率,但不影响核心功能。

备注

来源:agent=牛马AI, model=mimo-v2.5-pro, reference=opentrends

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions