Conversation
- Implement tests for slash command completions and input handling. - Validate mouse interactions for scrolling and selecting history. - Test command execution for /ask, /plan, and /run commands. - Ensure proper state management during command execution and history navigation. - Verify rendering behavior for output lines with folded details. - Check initial state for suggestions on empty input.
- 在 Command 结构中添加 Metadata 字段,用于存储命令的扩展注解。 - 实现命令调度钩子,支持将特定命令自动重定向到 agentline。 - 增加 agentline 命令的初始化参数支持,允许直接执行命令。 - 更新相关测试用例,确保新功能的正确性。 - 添加示例文档,展示如何使用新功能。
- Introduced `handleRunStreamWS` to manage WebSocket connections for streaming command execution. - Updated `CommandMeta` to include `SupportsStream` field for metadata. - Enhanced command collection to identify commands that support streaming. - Implemented client-side support for streaming commands in the web UI, including new buttons and handling for stream events. - Added tests for streaming command execution and metadata inclusion in command responses. - Created `RunCallback` utility for handling typed responses and streams. - Refactored response handling to support typed output and streaming.
There was a problem hiding this comment.
Code Review
This pull request introduces a comprehensive structured streaming and interactive command framework, adding InvocationStream, ResponseHandler, and ResponseStreamHandler to support both request-response and streaming execution models. Key features include a generic RunCallback for typed data consumption, WebSocket-based stream support in the Web UI, and extensive documentation updates. The review feedback identifies a potential race condition in the initialization of the internal response stream and an error-handling oversight where JSON serialization failures could be suppressed during data transmission.
command.go
Outdated
| func (inv *Invocation) ensureResponseStream(buffer int) chan any { | ||
| if inv.responseStream != nil { | ||
| return inv.responseStream | ||
| } | ||
| if buffer <= 0 { | ||
| buffer = defaultStreamResponseBuffer | ||
| } | ||
| inv.responseStream = make(chan any, buffer) | ||
| return inv.responseStream | ||
| } |
There was a problem hiding this comment.
此函数不是并发安全的。如果多个 goroutine 同时调用 ResponseStream()(它会调用此函数),当 inv.responseStream 为 nil 时,可能会导致竞争条件,即创建多个通道并泄漏其中一些。
考虑到流式处理通常意味着并发,建议使用互斥锁或 sync.Once 来确保 responseStream 的初始化是线程安全的。
例如,使用互斥锁(需要您在 Invocation 结构中添加一个 streamMu sync.Mutex 字段):
// In Invocation struct (conceptual):
// streamMu sync.Mutex
func (inv *Invocation) ensureResponseStream(buffer int) chan any {
inv.streamMu.Lock()
defer inv.streamMu.Unlock()
if inv.responseStream != nil {
return inv.responseStream
}
if buffer <= 0 {
buffer = defaultStreamResponseBuffer
}
inv.responseStream = make(chan any, buffer)
return inv.responseStream
}注意:sync.Mutex 不能被复制。Invocation 结构体通过 with 方法进行值复制,这会导致互斥锁被复制。您可能需要修改 with 方法或 Invocation 的使用方式(例如,使用指针)来解决此问题。
handler.go
Outdated
| _, err = fmt.Fprintf(s.inv.Stdout, "%v", data) | ||
| return err |
There was a problem hiding this comment.
当 json.Marshal(data) 失败时,其返回的 err 会被 fmt.Fprintf 的结果覆盖。如果 fmt.Fprintf 调用成功,err 会变为 nil,导致原始的 JSON 序列化错误被静默忽略。这可能会隐藏潜在的问题。
建议保留原始错误,例如通过 errors.Join 将 fmt.Fprintf 的错误与原始错误合并。
| _, err = fmt.Fprintf(s.inv.Stdout, "%v", data) | |
| return err | |
| _, werr := fmt.Fprintf(s.inv.Stdout, "%v", data) | |
| return errors.Join(err, werr) |
No description provided.