From 81e365e2776947e64b9cd332ffebf49b2b2d6549 Mon Sep 17 00:00:00 2001 From: "karni.rathore" Date: Mon, 30 Mar 2026 10:34:51 +0200 Subject: [PATCH] fix(git): serialize concurrent pushes and refresh refs to avoid stale state --- git/bindings.go | 10 ++++++++++ task/handler.go | 7 +++++++ task/pool.go | 11 +++++++++++ 3 files changed, 28 insertions(+) diff --git a/git/bindings.go b/git/bindings.go index 12e16ee..7ab38ef 100644 --- a/git/bindings.go +++ b/git/bindings.go @@ -58,6 +58,16 @@ func Ensure(ctx context.Context, dir, uri string, refs ...string) error { return nil } +// refresh fetches the latest state of a ref from origin (shallow, depth 1). +// This should be called before updating a ref that may have been modified +// by other concurrent tasks, to avoid stale local state. +func Refresh(ctx context.Context, dir, ref string) error { + if err := FetchShallow(ctx, dir, ref); err != nil { + return fmt.Errorf("git fetch refresh: %w", err) + } + return nil +} + // switch to ref. This should be called after Ensure // to get a writeable branch. func Switch(ctx context.Context, dir, ref string) error { diff --git a/task/handler.go b/task/handler.go index 42c37e0..9ae3d11 100644 --- a/task/handler.go +++ b/task/handler.go @@ -22,6 +22,13 @@ func KoboldHandler(ctx context.Context, cache string, g model.TaskGroup, runner msg string ) + // Refresh the source ref to ensure we have the latest remote state. + // This is critical when multiple tasks target the same repo+ref, + // to avoid stale local state that causes push rejections. + if err := git.Refresh(ctx, cache, g.RepoUri.Ref); err != nil { + return nil, fmt.Errorf("git refresh: %#q => %#q: %w", g.RepoUri.Repo, g.RepoUri.Ref, err) + } + if err := git.Switch(ctx, cache, g.RepoUri.Ref); err != nil { return nil, fmt.Errorf("git switch: %#q => %#q: %w", g.RepoUri.Repo, g.RepoUri.Ref, err) } diff --git a/task/pool.go b/task/pool.go index 48849d7..1916dcc 100644 --- a/task/pool.go +++ b/task/pool.go @@ -42,6 +42,10 @@ type Pool struct { cancel context.CancelFunc size int cache *git.RepoCache + // pushLocks serializes the handler (fetch→commit→push) per repo:ref pair, + // preventing concurrent goroutines from racing on the same remote branch. + pushLocks sync.Map + } func NewPool(ctx context.Context, size int, queries *model.Queries) *Pool { @@ -147,6 +151,13 @@ func (p *Pool) Dispatch() error { warns []string ) + // Serialize the full handler execution per repo:ref to prevent + // concurrent goroutines from racing on git push to the same branch. + lockKey := g.RepoUri.Repo + ":" + g.RepoUri.Ref + mu, _ := p.pushLocks.LoadOrStore(lockKey, &sync.Mutex{}) + mu.(*sync.Mutex).Lock() + defer mu.(*sync.Mutex).Unlock() + if path, err := p.cache.Get(p.ctx, ns, g.RepoUri.Repo); err == nil && p.handler != nil { warns, err = p.handler(p.ctx, path, g, p.hookRunner) if err != nil {