Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 112 additions & 30 deletions cmd/talk.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/signal"
"strings"
"syscall"
"time"

"github.com/bgdnvk/clanker/internal/clankercloud"
"github.com/bgdnvk/clanker/internal/claudecode"
Expand All @@ -16,6 +17,16 @@ import (
"github.com/spf13/viper"
)

// Bridge crash recovery (clanker-cli #21). The Python bridge can die
// mid-session — bad Python deps, an unhandled exception inside a tool
// call, OOM. Before #21 the REPL just kept printing "bridge process
// exited" on every subsequent prompt. Now we restart it transparently,
// capped so a permanently-broken bridge doesn't burn CPU in a loop.
const (
maxBridgeRestarts = 3
bridgeRestartWin = time.Minute
)

var talkCmd = &cobra.Command{
Use: "talk",
Short: "Interactive conversation with an AI agent",
Expand Down Expand Up @@ -53,16 +64,17 @@ func runHermesTalk(parentCtx context.Context, debug bool) error {
return fmt.Errorf("hermes agent not found: %w\nRun 'make setup-hermes' to install", err)
}

runner := hermes.NewRunner(hermesPath, debug)
runner.SetEnv(buildHermesEnv())

ctx, cancel := context.WithCancel(parentCtx)
defer cancel()

runner := hermes.NewRunner(hermesPath, debug)
runner.SetEnv(buildHermesEnv())
if err := runner.Start(ctx); err != nil {
return fmt.Errorf("failed to start hermes agent: %w", err)
}
defer runner.Stop()
// Use a pointer-pointer so the deferred Stop() picks up the
// most-recent runner after a restart cycle.
defer func() { runner.Stop() }()

// Handle signals: Ctrl+C interrupts the current response but does not
// kill the session. A second Ctrl+C exits.
Expand All @@ -79,6 +91,7 @@ func runHermesTalk(parentCtx context.Context, debug bool) error {
fmt.Println("Type 'exit' or 'quit' to end the session.")
fmt.Println()

restartTimes := make([]time.Time, 0, maxBridgeRestarts)
scanner := bufio.NewScanner(os.Stdin)
for {
fmt.Print("you> ")
Expand Down Expand Up @@ -108,34 +121,38 @@ func runHermesTalk(parentCtx context.Context, debug bool) error {
}
}

events, err := runner.Prompt(ctx, input)
if err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
continue
}

fmt.Print("hermes> ")
hadDelta := false
for event := range events {
switch {
case event.Error != nil:
fmt.Fprintf(os.Stderr, "\nError: %v\n", event.Error)
case event.MessageDelta != nil:
fmt.Print(event.MessageDelta.Text)
hadDelta = true
case event.ToolCall != nil:
if debug {
fmt.Fprintf(os.Stderr, "\n[tool: %s]\n", event.ToolCall.Name)
}
case event.Thought != nil:
if debug {
fmt.Fprintf(os.Stderr, "\n[thinking: %s]\n", event.Thought.Text)
}
case event.Final != nil:
if !hadDelta && event.Final.Text != "" {
fmt.Print(event.Final.Text)
// Inner retry loop: if the bridge dies mid-prompt, restart
// it once and re-issue the same prompt so the user doesn't
// have to retype.
for attempt := 0; attempt < 2; attempt++ {
bridgeExit, err := streamHermesPrompt(ctx, runner, input, debug)
if err != nil {
if hermes.IsBridgeExitError(err) {
bridgeExit = true
} else {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
break
}
}
if !bridgeExit {
break
}
if attempt == 1 {
fmt.Fprintln(os.Stderr, "\nHermes bridge died again after restart — giving up on this turn.")
break
}
if !canRestartBridge(&restartTimes) {
fmt.Fprintf(os.Stderr, "\nHermes bridge crashed %d times in the last minute — refusing to restart again. Type 'exit' and rerun 'clanker talk'.\n", maxBridgeRestarts)
return fmt.Errorf("hermes bridge crashed too many times")
}
fmt.Fprintln(os.Stderr, "\nHermes bridge died — restarting...")
runner.Stop()
runner = hermes.NewRunner(hermesPath, debug)
runner.SetEnv(buildHermesEnv())
if err := runner.Start(ctx); err != nil {
fmt.Fprintf(os.Stderr, "Failed to restart hermes bridge: %v\n", err)
return fmt.Errorf("hermes bridge restart failed: %w", err)
}
}
fmt.Println()
fmt.Println()
Expand All @@ -144,6 +161,71 @@ func runHermesTalk(parentCtx context.Context, debug bool) error {
return nil
}

// streamHermesPrompt runs one prompt turn. Returns (bridgeExited, err).
// bridgeExited is true when the events channel closed because the
// bridge died (so the caller knows to restart). err is set for any
// non-bridge-death problem; bridge-death errors flow through the
// boolean instead so the caller can act on them uniformly.
func streamHermesPrompt(ctx context.Context, runner *hermes.Runner, input string, debug bool) (bool, error) {
events, err := runner.Prompt(ctx, input)
if err != nil {
if hermes.IsBridgeExitError(err) {
return true, nil
}
return false, err
}

fmt.Print("hermes> ")
hadDelta := false
bridgeExit := false
for event := range events {
switch {
case event.Error != nil:
if hermes.IsBridgeExitError(event.Error) {
bridgeExit = true
continue
}
fmt.Fprintf(os.Stderr, "\nError: %v\n", event.Error)
case event.MessageDelta != nil:
fmt.Print(event.MessageDelta.Text)
hadDelta = true
case event.ToolCall != nil:
if debug {
fmt.Fprintf(os.Stderr, "\n[tool: %s]\n", event.ToolCall.Name)
}
case event.Thought != nil:
if debug {
fmt.Fprintf(os.Stderr, "\n[thinking: %s]\n", event.Thought.Text)
}
case event.Final != nil:
if !hadDelta && event.Final.Text != "" {
fmt.Print(event.Final.Text)
}
}
}
return bridgeExit, nil
}

// canRestartBridge implements a sliding-window rate limit. Returns
// true (and appends a fresh timestamp) when a restart is permitted,
// false when the configured ceiling has been hit inside the window.
func canRestartBridge(history *[]time.Time) bool {
now := time.Now()
cutoff := now.Add(-bridgeRestartWin)
kept := (*history)[:0]
for _, t := range *history {
if t.After(cutoff) {
kept = append(kept, t)
}
}
*history = kept
if len(*history) >= maxBridgeRestarts {
return false
}
*history = append(*history, now)
return true
}

func handleClankerCloudTalk(ctx context.Context, question string, debug bool) (bool, error) {
client := clankercloud.NewClient()
result, err := client.AskAgent(ctx, question, "")
Expand Down
40 changes: 40 additions & 0 deletions internal/hermes/ringbuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package hermes

import "sync"

// ringBuffer is a small fixed-capacity byte buffer that keeps the most
// recent N bytes written to it. Used to capture the tail of bridge
// stderr so the restart path (clanker-cli #21) can include "what the
// bridge said before it died" in its error message — typically a
// Python traceback ending in something useful like ModuleNotFoundError.
type ringBuffer struct {
mu sync.Mutex
buf []byte
cap int
}

func newRingBuffer(capacity int) *ringBuffer {
if capacity <= 0 {
capacity = 1
}
return &ringBuffer{cap: capacity}
}

// Write implements io.Writer. Keeps only the trailing `cap` bytes —
// older content is discarded as new content arrives.
func (r *ringBuffer) Write(p []byte) (int, error) {
r.mu.Lock()
defer r.mu.Unlock()
r.buf = append(r.buf, p...)
if len(r.buf) > r.cap {
r.buf = r.buf[len(r.buf)-r.cap:]
}
return len(p), nil
}

// String returns the current trailing contents as a string.
func (r *ringBuffer) String() string {
r.mu.Lock()
defer r.mu.Unlock()
return string(r.buf)
}
Loading
Loading