Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions cmd/bw/commit_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,45 @@ package main
import (
"errors"
"fmt"
"math/rand"
"time"

"github.com/jallum/beadwork/internal/issue"
"github.com/jallum/beadwork/internal/treefs"
)

// commitMaxRetries bounds how many times mutating commands retry a commit
// after losing the CAS race against a concurrent writer.
const commitMaxRetries = 3
// after losing the CAS race against a concurrent writer. Set generously
// because in real-world use (multiple agents, background sync) the race
// can be sustained over many attempts; 3 turned out to be too few.
const commitMaxRetries = 12

// commitBackoffBase is the base delay between retry attempts. The actual
// delay is roughly base * 2^(attempt-1) with jitter, capped at
// commitBackoffMax. Backoff desynchronizes colliding writers so the
// retry storm dissipates.
const commitBackoffBase = 5 * time.Millisecond

// commitBackoffMax caps the per-attempt delay so a stuck retry loop
// finishes in bounded time.
const commitBackoffMax = 250 * time.Millisecond

// commitWithRetry runs apply, then store.Commit(intent), retrying up to
// maxRetries times when the commit fails with treefs.ErrRefMoved. Between
// attempts the store cache is cleared and the underlying TreeFS is
// refreshed so apply observes the latest ref state.
// attempts the store fully reopens its TreeFS (fresh go-git handle, no
// cached state) so apply observes the latest ref state, then waits a
// jittered backoff before re-running.
//
// apply must re-derive all state from the store on every invocation —
// values captured from a previous attempt are stale after Refresh and
// values captured from a previous attempt are stale after reopen and
// must not be reused.
func commitWithRetry(store *issue.Store, maxRetries int, apply func() (intent string, err error)) error {
var commitErr error
for attempt := 0; attempt < maxRetries; attempt++ {
if attempt > 0 {
store.ClearCache()
if err := store.Refresh(); err != nil {
return fmt.Errorf("refresh after conflict: %w", err)
sleepBackoff(attempt)
if err := store.ReopenFS(); err != nil {
return fmt.Errorf("reopen after conflict: %w", err)
}
}
intent, aerr := apply()
Expand All @@ -43,3 +58,16 @@ func commitWithRetry(store *issue.Store, maxRetries int, apply func() (intent st
}
return fmt.Errorf("commit failed after %d attempts: %w", maxRetries, commitErr)
}

// sleepBackoff sleeps for an exponentially-growing, jittered duration.
// attempt is 1-based.
func sleepBackoff(attempt int) {
d := commitBackoffBase << (attempt - 1)
if d > commitBackoffMax || d <= 0 {
d = commitBackoffMax
}
// Full jitter: pick a duration in [0, d). Avoids thundering-herd
// retries when several writers collide at the same instant.
jittered := time.Duration(rand.Int63n(int64(d) + 1))
time.Sleep(jittered)
}
127 changes: 127 additions & 0 deletions cmd/bw/commit_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,35 @@ package main
import (
"errors"
"fmt"
"os/exec"
"strings"
"testing"

"github.com/jallum/beadwork/internal/issue"
"github.com/jallum/beadwork/internal/testutil"
"github.com/jallum/beadwork/internal/treefs"
)

func run(t *testing.T, dir, name string, args ...string) {
t.Helper()
cmd := exec.Command(name, args...)
cmd.Dir = dir
if out, err := cmd.CombinedOutput(); err != nil {
t.Fatalf("%s %v: %s: %v", name, args, out, err)
}
}

func runOut(t *testing.T, dir, name string, args ...string) string {
t.Helper()
cmd := exec.Command(name, args...)
cmd.Dir = dir
out, err := cmd.CombinedOutput()
if err != nil {
t.Fatalf("%s %v: %s: %v", name, args, out, err)
}
return string(out)
}

// TestCommitWithRetryRecoversFromRefMove proves the retry helper refreshes
// and replays when another writer moves the ref between mutation and commit.
func TestCommitWithRetryRecoversFromRefMove(t *testing.T) {
Expand Down Expand Up @@ -96,6 +118,111 @@ func TestCommitWithRetryGivesUpAfterMax(t *testing.T) {
}
}

// TestCommitWithRetryRecoversFromRawGitRefMove proves the helper picks up
// state when the ref is moved by a raw git process (not go-git) — defending
// against any in-process caching the existing TreeFS might be holding.
// This is the closest analogue to the user's reported failure mode: a
// concurrent `bw` invocation in a different process advances the ref via
// the on-disk loose ref file.
func TestCommitWithRetryRecoversFromRawGitRefMove(t *testing.T) {
env := testutil.NewEnv(t)
defer env.Cleanup()

iss, _ := env.Store.Create("To start", issue.CreateOpts{})
if err := env.Repo.Commit("create " + iss.ID); err != nil {
t.Fatalf("seed commit: %v", err)
}

// Build a fully-independent commit object that we can advance the ref
// to via raw git plumbing (no go-git involvement). We pre-create a
// dangling commit referencing the current ref's tree so the ref move
// is valid history.
cur := strings.TrimSpace(runOut(t, env.Dir, "git", "rev-parse", "refs/heads/beadwork"))
tree := strings.TrimSpace(runOut(t, env.Dir, "git", "rev-parse", "refs/heads/beadwork^{tree}"))
newHash := strings.TrimSpace(runOut(t, env.Dir,
"sh", "-c",
"GIT_AUTHOR_NAME=racer GIT_AUTHOR_EMAIL=r@r GIT_COMMITTER_NAME=racer GIT_COMMITTER_EMAIL=r@r "+
"git commit-tree "+tree+" -p "+cur+" -m racer"))
if newHash == "" {
t.Fatal("failed to build racer commit")
}

calls := 0
err := commitWithRetry(env.Store, 3, func() (string, error) {
calls++
if _, serr := env.Store.Start(iss.ID, "alice"); serr != nil {
return "", serr
}
if calls == 1 {
// Advance the ref via raw git on the filesystem. The
// in-process TreeFS's go-git Repository handle is now
// looking at a stale view of refs/heads/beadwork.
run(t, env.Dir, "git", "update-ref", "refs/heads/beadwork", newHash)
}
return "start " + iss.ID, nil
})
if err != nil {
t.Fatalf("commitWithRetry: %v", err)
}
if calls < 2 {
t.Errorf("apply called %d times; expected at least 2 (one retry)", calls)
}
got, _ := env.Store.Get(iss.ID)
if got.Status != "in_progress" {
t.Errorf("final status = %q, want in_progress", got.Status)
}
}

// TestCommitWithRetrySurvivesSustainedContention proves the helper can
// recover when the ref is being moved on every retry until eventually
// the racer stops. This is the closest analogue to the user-reported
// failure: "commit failed after 3 attempts" because the race kept
// winning. With more retries (and ideally backoff), we should succeed.
func TestCommitWithRetrySurvivesSustainedContention(t *testing.T) {
env := testutil.NewEnv(t)
defer env.Cleanup()

iss, _ := env.Store.Create("To start", issue.CreateOpts{})
if err := env.Repo.Commit("create " + iss.ID); err != nil {
t.Fatalf("seed commit: %v", err)
}

racer, err := treefs.Open(env.Dir, "refs/heads/beadwork")
if err != nil {
t.Fatalf("open racer: %v", err)
}

// Move the ref on the first 5 attempts; let the 6th succeed.
calls := 0
err = commitWithRetry(env.Store, 10, func() (string, error) {
calls++
if _, serr := env.Store.Start(iss.ID, "alice"); serr != nil {
return "", serr
}
if calls <= 5 {
racer.WriteFile(fmt.Sprintf("r%d.txt", calls), []byte("x"))
if cerr := racer.Commit(fmt.Sprintf("racer %d", calls)); cerr != nil {
return "", cerr
}
}
return "start " + iss.ID, nil
})
if err != nil {
t.Fatalf("commitWithRetry: %v", err)
}
if calls < 6 {
t.Errorf("apply called %d times; expected at least 6", calls)
}
}

// TestCommitMaxRetriesIsGenerous documents the chosen retry budget.
// Three was too few in practice; bump it.
func TestCommitMaxRetriesIsGenerous(t *testing.T) {
if commitMaxRetries < 10 {
t.Errorf("commitMaxRetries = %d, want >= 10 (got user-reported failure at 3)", commitMaxRetries)
}
}

// TestCommitWithRetryPropagatesNonRetryableErrors proves we don't loop on
// errors unrelated to CAS.
func TestCommitWithRetryPropagatesNonRetryableErrors(t *testing.T) {
Expand Down
24 changes: 24 additions & 0 deletions internal/issue/issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,30 @@ func (s *Store) Refresh() error {
return s.FS.Refresh()
}

// Reopener is implemented by Committers that can fully reopen their
// underlying TreeFS (fresh git.PlainOpen, fresh storer). Used between
// retry attempts to discard any cached state.
type Reopener interface {
Reopen() (*treefs.TreeFS, error)
}

// ReopenFS fully replaces the underlying TreeFS via the Committer's
// Reopener, then clears caches. Stronger than Refresh: discards any
// cached go-git state (packed-refs cache, etc.). Falls back to Refresh
// if the Committer does not implement Reopener.
func (s *Store) ReopenFS() error {
s.ClearCache()
if r, ok := s.Committer.(Reopener); ok {
tfs, err := r.Reopen()
if err != nil {
return err
}
s.FS = tfs
return nil
}
return s.FS.Refresh()
}

// Now returns the current time in UTC. If the BW_CLOCK environment variable
// is set to an RFC3339 value, that fixed time is used instead of the real
// clock. This enables deterministic timestamps for testing and migration.
Expand Down
19 changes: 19 additions & 0 deletions internal/repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,25 @@ func (r *Repo) TreeFS() *treefs.TreeFS {
return r.tfs
}

// Reopen replaces the in-memory go-git repository handle and TreeFS with
// freshly-opened copies. Use this between retry attempts in concurrent
// scenarios so any cached state (packed-refs cache, etc.) is discarded
// and the next operation reads directly from disk. The new TreeFS is
// returned so callers (e.g. issue.Store) can swap their own pointer.
func (r *Repo) Reopen() (*treefs.TreeFS, error) {
repoDir := filepath.Dir(r.GitDir)
goRepo, err := openGitRepo(repoDir)
if err != nil {
return nil, fmt.Errorf("reopen repo: %w", err)
}
tfs, err := treefs.OpenFromRepo(goRepo, refLocal)
if err != nil {
return nil, fmt.Errorf("reopen treefs: %w", err)
}
r.tfs = tfs
return tfs, nil
}

// UserName reads user.name from the git config (local, then global, then
// system) using go-git's ConfigScoped. Returns "unknown" if unset.
func (r *Repo) UserName() string {
Expand Down
Loading