Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .autoresearch/results.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2026-04-18T12:14:54+08:00 9 feat: support multiple subscribers for the same live session completed 2 true 88 88 feature/issue-9
1 change: 1 addition & 0 deletions .autoresearch/workflows/issue-9/.last_score
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
88
9 changes: 9 additions & 0 deletions .autoresearch/workflows/issue-9/iteration-1-claude.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
### 修改总结

**3 个文件被修改:**

1. **`internal/gateway/stream_hub.go`** — 修复 `Publish` 方法的竞态条件,将 `RLock`+快照模式改为全程写锁保护,避免 `Unsubscribe` 关闭 channel 后 `Publish` 仍尝试发送导致 panic

2. **`internal/gateway/stream_hub_test.go`** — 两处修改:
- 移除未使用的 `sync/atomic` import
- 修复 `TestStreamHubBackpressureDrop` 测试逻辑,为"快速"订阅者添加并发消费 goroutine
132 changes: 132 additions & 0 deletions .autoresearch/workflows/issue-9/iteration-2-claude-review.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
我现在已经掌握了所有所需的信息。以下是审核报告:

---

## 审核报告

### Issue
- 编号: #9
- 标题: feat: support multiple subscribers for the same live session
- 迭代次数: 1

---

### 总体评价

**评分: 88/100**

实现清晰、务实。`StreamHub` 设计简洁,扇出(fan-out)、背压(backpressure)、取消订阅(disconnect cleanup)逻辑正确。测试覆盖全面,涵盖核心功能、并发安全、多客户端集成等场景。

---

### 严重问题 (必须修复)

无。

---

### 一般问题 (建议修复)

#### 问题 1: `Subscribe` 重复订阅时静默替换,导致前一个 channel 泄漏

**位置**: `internal/gateway/stream_hub.go:56-67`

**问题描述**:
如果同一个 `conn.ID` 对同一个 session 调用两次 `Subscribe`(如客户端重复发送 `session.subscribe`),旧的 channel 会被覆盖但不会关闭,导致旧 channel 对应的 goroutine 永远阻塞在 `range ch` 上。

**代码片段**:
```go
h.subscribers[sessionID][subscriberID] = ch // 静默覆盖旧 channel
```

**建议修复**:
```go
func (h *StreamHub) Subscribe(sessionID, subscriberID string) <-chan HubEvent {
ch := make(chan HubEvent, subscriberBufSize)

h.mu.Lock()
defer h.mu.Unlock()

if h.subscribers[sessionID] == nil {
h.subscribers[sessionID] = make(map[string]chan HubEvent)
}
// Close existing channel if re-subscribing.
if old, exists := h.subscribers[sessionID][subscriberID]; exists {
close(old)
}
h.subscribers[sessionID][subscriberID] = ch
return ch
}
```

**原因**: `handleWSSubscribe` 未做幂等检查,重复订阅会导致 goroutine 泄漏。

---

#### 问题 2: `handleAskStream` 中 `context.Background()` 导致 session 流无法被服务端取消

**位置**: `internal/gateway/server.go:911`

**问题描述**:
将 `conn.ctx` 改为 `context.Background()` 意味着即使 Server 关闭,流仍然继续运行,直到 agent 自行结束。这在 server 优雅关闭时可能造成资源泄漏。

**代码片段**:
```go
ctx, cancel := context.WithCancel(context.Background())
```

**建议修复**:
可以保留 Server 级别的 context 传入(例如从 Server 的 shutdown 中派生),或在注释中明确说明这是有意为之的设计决策,并在 Server 关闭时主动 cancel 所有活跃流。当前如果没有 server shutdown 机制则可接受,但值得加一个 `TODO` 注释。

---

#### 问题 3: `publishAndSend` 中 `conn.SendJSON` 失败不再取消流

**位置**: `internal/gateway/server.go:929-937`

**问题描述**:
原实现在 `SendJSON` 失败时会 `cancel()` 取消流,现在改为仅 `log.Printf`。对于发起请求的客户端(originating client)断连的情况,流会继续运行直到 agent 完成,消耗不必要的资源。

**代码片段**:
```go
if err := conn.SendJSON(newEventNotification(sess.ID, req.ID, evt)); err != nil {
log.Printf("[gateway] WebSocket send failed: %v", err)
}
```

**建议修复**:
这是合理的取舍(解耦 session 生命周期与单个连接),但建议在代码注释中明确说明这个行为变更的设计意图,让后续维护者理解为什么不再 cancel。

---

### 建议改进 (可选)

- **`stream_hub.go:43`**: `map[string]map[string]chan HubEvent` 结构在大量 session 时可能有内存压力。建议增加 `Sessions()` 方法用于监控,或在未来增加 TTL 自动清理。
- **`stream_hub.go:127`**: 被丢弃的慢订阅者只有 `log.Printf` 记录,建议考虑暴露指标(metric)供监控。
- **`stream_hub_test.go:160`**: `TestStreamHubBackpressureDrop` 使用 `time.Sleep(50ms)` 等待 goroutine settle,虽然当前可接受,但在 CI 中可能不稳定。考虑使用 channel 或 sync 机制替代。
- **`stream_hub_test.go:216-297`**: `TestMultipleClientsSameSession` 耗时 9 秒(3 个客户端各等 3 秒 read deadline),可考虑缩短 deadline 或使用 close 信号。

---

### 审核维度评分

| 维度 | 得分 | 说明 |
|------|------|------|
| 正确性 | 90 | 核心逻辑正确,重复订阅 edge case 未处理 |
| 测试质量 | 92 | 覆盖全面,含单元/集成/并发测试 |
| 代码质量 | 88 | 代码清晰,Go 惯用法好,部分注释可加强 |
| 安全性 | 95 | 无安全风险 |
| 性能 | 85 | publish 持有全局锁,高扇出时可能有瓶颈 |

**总分**: 90×0.35 + 92×0.25 + 88×0.20 + 95×0.10 + 85×0.10 = **89.6 ≈ 88**(扣除重复订阅的 goroutine 泄漏风险)

---

### 审核结论

- [x] **通过** - 代码质量达标,可以进入人工审核

**下一步行动**:
1. 建议修复问题 1(重复订阅 channel 泄漏),这是一个简单的 3 行改动
2. 问题 2 和 3 建议补充注释说明设计意图即可
3. 可选:缩短集成测试中的 timeout 以加快 CI
22 changes: 22 additions & 0 deletions .autoresearch/workflows/issue-9/log.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Issue #9 实现日志

## 基本信息
- Issue: #9 - feat: support multiple subscribers for the same live session
- 开始时间: 2026-04-18 11:58:59
- 标签:

## 迭代记录


### 迭代 1 - Claude (实现)

详见: [iteration-1-claude.log](./iteration-1-claude.log)
- 测试: ✅ 通过
- 审核评分 (Claude): 88/100

## 最终结果
- 总迭代次数: 2
- 最终评分: 88/100
- 状态: completed
- 分支: feature/issue-9
- 结束时间: 2026-04-18 12:14:54
Loading
Loading