ApproveAndSend: inbound + side-effect writes happen outside the locking tx #128

@jiashuoz

Follow-up from #109 review.

The problem

`identity.Store.ApproveAndSend` (and its TTL counterpart `ExpireApproveAndSend`) open a row-locking transaction, call a user-supplied send callback inside that tx, then commit. The callback's side effects, i.e. actually delivering the message, happen outside that tx: on a separate DB connection or over an external network call.

This creates a small but real consistency window. Two cases:

Case A: SES (existing). Callback calls `outbound.Sender.Send`. SES accepts the message → returns success. Then the tx `UPDATE messages SET status='sent', provider_message_id=... WHERE id=$1` fails to commit (connection drops, statement_timeout, server killed). Result: SES is sending the message to the recipient, but the DB still has the row at `pending_approval`. User re-approves → SES sends again → duplicate delivery to the external recipient.

Case B: Loopback (new in #109). Callback calls `loopback.DeliverInbound` which writes an inbound row via `store.CreateInboundMessage` on a non-tx connection. The inbound write succeeds. Then the outer tx fails to commit. Result: an inbound row exists in the agent's mailbox, but the outbound row stays `pending_approval`. The agent sees a self-message in their inbox but the dashboard shows the outbound as still awaiting approval. Re-approving creates a second inbound row.

Same shape on the worker side via `ExpireApproveAndSend`.
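
The Case B window can be reproduced with a small in-memory model. This is only a sketch: `fakeDB`, `fakeTx`, and this simplified `approveAndSend` are hypothetical stand-ins, not the real `identity.Store` code, and the commit failure is injected with a flag rather than by a real connection drop.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeDB is a hypothetical in-memory stand-in for the real database.
type fakeDB struct {
	status  map[string]string // outbound message id -> status
	inbound []string          // inbound rows, written outside any tx
}

// fakeTx buffers status updates until Commit, like a real transaction.
type fakeTx struct {
	db         *fakeDB
	pending    map[string]string
	failCommit bool // injected commit failure
}

func (tx *fakeTx) setStatus(id, status string) { tx.pending[id] = status }

func (tx *fakeTx) Commit() error {
	if tx.failCommit {
		return errors.New("commit failed: connection dropped")
	}
	for id, s := range tx.pending {
		tx.db.status[id] = s
	}
	return nil
}

// approveAndSend mirrors the current shape: the callback runs during the
// tx's lifetime but writes through its own, non-transactional path.
func approveAndSend(db *fakeDB, id string, failCommit bool, send func(id string) error) error {
	if db.status[id] != "pending_approval" {
		return fmt.Errorf("message %s is not pending approval", id)
	}
	tx := &fakeTx{db: db, pending: map[string]string{}, failCommit: failCommit}
	if err := send(id); err != nil {
		return err
	}
	tx.setStatus(id, "sent")
	return tx.Commit()
}

func main() {
	db := &fakeDB{status: map[string]string{"m1": "pending_approval"}}
	deliver := func(id string) error {
		db.inbound = append(db.inbound, id) // non-tx write, visible immediately
		return nil
	}
	_ = approveAndSend(db, "m1", true, deliver)  // commit fails, inbound row stays
	_ = approveAndSend(db, "m1", false, deliver) // user re-approves
	fmt.Println(db.status["m1"], len(db.inbound)) // prints: sent 2
}
```

The second inbound row appears because the first delivery survived a commit that never happened.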

Frequency

Rare in practice — pgx pool connections are stable, statement_timeout doesn't typically fire during a small UPDATE — but not zero, especially during deploys, DB failovers, or under load.

Fix sketch

Pass the locking tx into the send callback so the inbound write (or any other DB side effect) shares the tx. Outline:

```go
// Today:
func (s *Store) ApproveAndSend(
	ctx context.Context, messageID, userID string, edits PendingApprovalEdit,
	send func(*Message) (SendResult, error),
) (*Message, error) {
	// ...begin tx, lock row, call send(locked), apply result, commit
}

// Proposed:
func (s *Store) ApproveAndSend(
	ctx context.Context, messageID, userID string, edits PendingApprovalEdit,
	send func(tx pgx.Tx, msg *Message) (SendResult, error),
) (*Message, error) { ... }
```

`loopback.DeliverInbound` would then accept the tx as an additional parameter, replacing the `InboundWriter` interface on the HITL path. The non-HITL fast path (`performSelfSend` in `internal/agent/selfsend.go`) keeps the non-tx variant, since it has no outer tx to join.
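
A minimal sketch of the intended behavior once the callback shares the tx, again using an in-memory model. `memDB`, `memTx`, `deliverInboundTx`, and `errInjected` are hypothetical names for illustration; `memTx` merely stands in for `pgx.Tx`, and the real `loopback.DeliverInbound` signature may end up different.

```go
package main

import (
	"errors"
	"fmt"
)

// errInjected simulates a commit failure such as a dropped connection.
var errInjected = errors.New("connection dropped")

// memDB is a hypothetical in-memory database used only for illustration.
type memDB struct {
	status  map[string]string
	inbound []string
}

// memTx stands in for pgx.Tx: every write is staged and becomes visible
// only if Commit succeeds.
type memTx struct {
	db            *memDB
	stagedStatus  map[string]string
	stagedInbound []string
	commitErr     error // injected failure, nil on the happy path
}

func (tx *memTx) InsertInbound(id string)     { tx.stagedInbound = append(tx.stagedInbound, id) }
func (tx *memTx) SetStatus(id, status string) { tx.stagedStatus[id] = status }

func (tx *memTx) Commit() error {
	if tx.commitErr != nil {
		return tx.commitErr // nothing staged is applied
	}
	tx.db.inbound = append(tx.db.inbound, tx.stagedInbound...)
	for id, s := range tx.stagedStatus {
		tx.db.status[id] = s
	}
	return nil
}

// deliverInboundTx is a hypothetical tx-aware delivery: the inbound row is
// written through the caller's tx instead of a separate connection.
func deliverInboundTx(tx *memTx, id string) error {
	tx.InsertInbound(id)
	return nil
}

// approveAndSend passes its locking tx into the send callback.
func approveAndSend(db *memDB, id string, commitErr error, send func(tx *memTx, id string) error) error {
	if db.status[id] != "pending_approval" {
		return fmt.Errorf("message %s is not pending approval", id)
	}
	tx := &memTx{db: db, stagedStatus: map[string]string{}, commitErr: commitErr}
	if err := send(tx, id); err != nil {
		return err
	}
	tx.SetStatus(id, "sent")
	return tx.Commit()
}

func main() {
	db := &memDB{status: map[string]string{"m1": "pending_approval"}}
	err := approveAndSend(db, "m1", errInjected, deliverInboundTx)
	fmt.Println(err, db.status["m1"], len(db.inbound)) // prints: connection dropped pending_approval 0
	err = approveAndSend(db, "m1", nil, deliverInboundTx)
	fmt.Println(err, db.status["m1"], len(db.inbound)) // prints: <nil> sent 1
}
```

With the inbound write staged in the same tx, a failed commit leaves no orphaned row, so a later re-approval produces exactly one delivery.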

The SES side (Case A) is trickier: SES is an external system, so it can't participate in our tx. The mitigation there is different: persist the `provider_message_id` in its own transaction immediately after SES accepts, then flip the status in a second tx that reads the persisted provider id. If the second tx fails, a sweeper job can complete the transition idempotently. This is a bigger refactor; file it as a separate issue if pursued.
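
The sweeper half of that idea could look roughly like the sketch below. It assumes a row whose `provider_message_id` is set but whose status is still `pending_approval` means "SES accepted, status flip never committed". The `message` struct and `sweep` function are hypothetical, and a real job would run the equivalent `UPDATE` against the database rather than a map.

```go
package main

import "fmt"

// message is a hypothetical, trimmed-down view of a messages row.
type message struct {
	Status            string
	ProviderMessageID string
}

// sweep completes the status flip for rows whose provider id was persisted
// but whose second transaction never committed. It returns the number of
// rows repaired; running it again on the same rows repairs nothing.
func sweep(rows map[string]*message) int {
	repaired := 0
	for _, m := range rows {
		if m.Status == "pending_approval" && m.ProviderMessageID != "" {
			m.Status = "sent"
			repaired++
		}
	}
	return repaired
}

func main() {
	rows := map[string]*message{
		"m1": {Status: "pending_approval", ProviderMessageID: "ses-abc"}, // second tx failed
		"m2": {Status: "pending_approval"},                               // never sent
		"m3": {Status: "sent", ProviderMessageID: "ses-def"},             // already complete
	}
	fmt.Println(sweep(rows), sweep(rows)) // prints: 1 0
}
```

Only the row stuck between the two transactions is touched, and a second pass is a no-op, which is what makes the sweep safe to run repeatedly.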

Scope

  • Case B (loopback): clean win, ~50 lines, can land as a small PR.
  • Case A (SES): documented limitation; defer until we see actual duplicate-send symptoms in production telemetry.

Existing mitigations

  • Row-level locking prevents concurrent workers from acting on the same row.
  • The `messages.id` primary key prevents naive duplicate-insertion races.
  • Operator-visible state on the duplicate path is recoverable (a 2nd inbound row is annoying, not catastrophic).

Acceptance criteria

  • New test that simulates a tx commit failure (mock store, inject error from `Commit`) and asserts the inbound row also rolls back for the loopback case.
  • Existing tests still pass.
  • No behavior change on the happy path.
