Skip to content

Generation-based 事件生命周期管理 #6

@Arcadia822

Description

@Arcadia822

背景

当前事件数据的生命周期管理较为简单:

  1. Ingest 纯追加latest-ingest.json 每次全量覆盖,历史 ingest 以时间戳文件归档在 normalized/ 目录,但没有清理机制。
  2. RSS 去重靠标题_load_events_from_sources()normalized_lower(title) 去重,但没有跨 run 的持久化去重——同一事件在不同天的 run 中会重复出现。
  3. 无过期概念normalized/ 下的历史 ingest 文件持续增长,没有 TTL 或 generation 机制来淘汰旧数据。

参考实现

OpenTrends (nexmoe/opentrends) 使用 Generation-based 生命周期管理

每个 source 记录维护一个 generation 整数,每次成功刷新时 +1。数据项(sourceItem)以 (sourceId, itemId) 为复合主键,通过 upsert 写入——同一 source 的新 generation 数据更新现有项,旧 generation 中不再出现的项在查询时自然过滤掉。

source 表: sourceId(PK), generation, status, fetchedAt, itemCount, ...
sourceItem 表: (sourceId, itemId)(PK), generation, url, title, contentHash, ...

关键设计:

  • contentHash(FNV-1a)用于轻量级去重
  • 查询时 WHERE generation = (SELECT MAX(generation) FROM source WHERE sourceId = ?) 只返回最新一代数据
  • 旧 generation 数据不主动删除,仅在 generation 差距过大时清理

提案

将当前的"时间戳追加"模式改为"generation + content hash"模式:

1. 源 Generation 追踪

source-health.json(依赖 #4 提案)中增加 generation 字段:

{
  "verge-ai": {
    "status": "ok",
    "generation": 42,
    "last_success_at": "...",
    "item_count": 5
  }
}

2. 事件 ID 和 Content Hash

为每个事件生成稳定 ID:

import hashlib

def event_id(source_id: str, title: str, url: str) -> str:
    """生成稳定的事件 ID,跨 run 不变"""
    raw = f"{source_id}:{normalize(title)}:{url}"
    return hashlib.sha256(raw.encode()).hexdigest()[:12]

def content_hash(title: str, description: str = "") -> str:
    """内容哈希,用于检测事件内容是否变化"""
    raw = f"{normalize(title)}||{normalize(description)}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]

3. Ingest 格式升级

skrya.ingest.v1skrya.ingest.v2

{
  "interface_version": "skrya.ingest.v2",
  "topic_id": "ai-browser",
  "fetched_at": "2026-05-17T09:30:00+08:00",
  "events": [
    {
      "event_id": "a1b2c3d4e5f6",
      "source_id": "verge-ai",
      "title": "OpenAI launches agent browser",
      "description": "...",
      "url": "https://...",
      "content_hash": "f6e5d4c3b2a1",
      "published_at": "2026-05-17T08:00:00Z"
    }
  ],
  "source_generations": {
    "verge-ai": 42,
    "techcrunch-ai": 38
  }
}

4. 跨 Run 去重

generate_digest() 的事件排名阶段:

def _deduplicate_events(self, events, topic_id):
    """跨 run 去重:同一 event_id 在最近 N 次 digest 中出现过则降权或跳过"""
    recent_event_ids = self._load_recent_digest_event_ids(topic_id, runs=3)
    
    deduped = []
    for event in events:
        eid = event["event_id"]
        if eid in recent_event_ids:
            # 已在近期 digest 中出现:大幅降权但不完全排除
            event["_score"] *= 0.1
        deduped.append(event)
    
    return deduped

5. 旧数据清理

def _cleanup_old_ingests(self, topic_id, keep_runs=10):
    """保留最近 N 次 ingest 归档,清理更早的"""
    ingests_dir = self._paths.ingest_normalized_dir(topic_id)
    files = sorted(ingests_dir.glob("*.json"))
    if len(files) > keep_runs:
        for old_file in files[:len(files) - keep_runs]:
            old_file.unlink()

收益

  • 事件去重:同一事件不会在连续多天的 digest 中重复出现
  • 内容变更检测content_hash 可识别标题/描述被更新的事件
  • 存储清理:自动清理旧 ingest 文件,防止磁盘无限增长
  • 数据可追溯:generation 让 agent 可以追溯"这个事件是哪个源的第几代数据"

优先级

— 数据质量改进,对长期运行的 agent 场景价值高。

备注

来源:agent=牛马AI, model=mimo-v2.5-pro, reference=opentrends

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions