Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions git/bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ func Ensure(ctx context.Context, dir, uri string, refs ...string) error {
return nil
}

// refresh fetches the latest state of a ref from origin (shallow, depth 1).
// This should be called before updating a ref that may have been modified
// by other concurrent tasks, to avoid stale local state.
func Refresh(ctx context.Context, dir, ref string) error {
if err := FetchShallow(ctx, dir, ref); err != nil {
return fmt.Errorf("git fetch refresh: %w", err)
}
return nil
}

// switch to ref. This should be called after Ensure
// to get a writeable branch.
func Switch(ctx context.Context, dir, ref string) error {
Expand Down
7 changes: 7 additions & 0 deletions task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ func KoboldHandler(ctx context.Context, cache string, g model.TaskGroup, runner
msg string
)

// Refresh the source ref to ensure we have the latest remote state.
// This is critical when multiple tasks target the same repo+ref,
// to avoid stale local state that causes push rejections.
if err := git.Refresh(ctx, cache, g.RepoUri.Ref); err != nil {
return nil, fmt.Errorf("git refresh: %#q => %#q: %w", g.RepoUri.Repo, g.RepoUri.Ref, err)
}

if err := git.Switch(ctx, cache, g.RepoUri.Ref); err != nil {
return nil, fmt.Errorf("git switch: %#q => %#q: %w", g.RepoUri.Repo, g.RepoUri.Ref, err)
}
Expand Down
11 changes: 11 additions & 0 deletions task/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type Pool struct {
cancel context.CancelFunc
size int
cache *git.RepoCache
// pushLocks serializes the handler (fetch→commit→push) per repo:ref pair,
// preventing concurrent goroutines from racing on the same remote branch.
pushLocks sync.Map

}

func NewPool(ctx context.Context, size int, queries *model.Queries) *Pool {
Expand Down Expand Up @@ -147,6 +151,13 @@ func (p *Pool) Dispatch() error {
warns []string
)

// Serialize the full handler execution per repo:ref to prevent
// concurrent goroutines from racing on git push to the same branch.
lockKey := g.RepoUri.Repo + ":" + g.RepoUri.Ref
mu, _ := p.pushLocks.LoadOrStore(lockKey, &sync.Mutex{})
mu.(*sync.Mutex).Lock()
defer mu.(*sync.Mutex).Unlock()

if path, err := p.cache.Get(p.ctx, ns, g.RepoUri.Repo); err == nil && p.handler != nil {
warns, err = p.handler(p.ctx, path, g, p.hookRunner)
if err != nil {
Expand Down