Fast publish initial design #32

Open
piotrpio wants to merge 12 commits into main from jetstreamext/fastpublish
Conversation

@piotrpio
Collaborator

No description provided.

@piotrpio piotrpio force-pushed the jetstreamext/fastpublish branch from 88118d3 to 3374d12 Compare December 3, 2025 13:50
@piotrpio piotrpio force-pushed the jetstreamext/fastpublish branch 3 times, most recently from b15a9cf to c47ebae Compare March 26, 2026 14:44
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
…adata

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
@piotrpio piotrpio force-pushed the jetstreamext/fastpublish branch 2 times, most recently from e144e89 to a282c25 Compare March 26, 2026 15:18
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
@piotrpio piotrpio force-pushed the jetstreamext/fastpublish branch from a282c25 to 0693565 Compare March 26, 2026 15:26
@piotrpio
Collaborator Author

@claude review this PR

@synadia-claude-reviewer
synadia-claude-reviewer bot commented Mar 26, 2026

Claude finished @piotrpio's task in 3m 0s


PR Review: Fast publish initial design

  • Gather context and read changed files
  • Review fastpublish.go — core implementation
  • Review fastpublish_options.go — options pattern
  • Review errors.go — new sentinel errors
  • Review fastpublish_test.go — test coverage
  • Review publishbatch.go — modifications
  • Summarize findings

Overall this is a solid design for a fast-ingest batch publisher. The flow control mechanism (stall channel, ping/retry, server-driven frequency updates) is well thought out. A few issues found below, ranging from a timer leak to test bugs.

Bugs

1. time.After timer leak in AddMsg — fastpublish.go:324

case <-time.After(fp.opts.ackTimeout):

time.After allocates a timer that cannot be GC'd until it fires. For a high-throughput publisher creating many batches, this leaks a timer per batch. Should use time.NewTimer with defer timer.Stop():

firstAckTimer := time.NewTimer(fp.opts.ackTimeout)
defer firstAckTimer.Stop()
// ...
case <-firstAckTimer.C:
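
The suggested pattern can be sketched in isolation as follows. This is a minimal, self-contained illustration and not the code from the PR: `waitFirstAck`, `errAckTimeout`, and the `uint64` ack channel are hypothetical names chosen for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errAckTimeout is a hypothetical error used only in this sketch.
var errAckTimeout = errors.New("timed out waiting for first ack")

// waitFirstAck waits for a value on ackCh or gives up after timeout.
// The timer is stopped when the function returns, so it is released
// promptly when the ack arrives before the deadline.
func waitFirstAck(ackCh <-chan uint64, timeout time.Duration) (uint64, error) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case seq := <-ackCh:
		return seq, nil
	case <-timer.C:
		return 0, errAckTimeout
	}
}

func main() {
	// Ack already queued: returns before the timer fires.
	ackCh := make(chan uint64, 1)
	ackCh <- 1
	seq, err := waitFirstAck(ackCh, time.Second)
	fmt.Println(seq, err)

	// No ack ever arrives: the timer fires and the call times out.
	_, err = waitFirstAck(make(chan uint64), 10*time.Millisecond)
	fmt.Println(err)
}
```

Note that since Go 1.23 an unreferenced timer becomes eligible for garbage collection even if Stop was never called, so the leak described above mainly affects older toolchains. The explicit Stop remains harmless and keeps the behaviour the same across Go versions.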


2. Test assertion mismatch — fastpublish_test.go:72-74

if fastAck.AckSequence != 0 {
    t.Fatalf("Expected fastAck.AckSequence to be 1, got %d", fastAck.AckSequence)
}

The condition checks != 0 but the error message says "Expected... to be 1". Either:

  • the condition is correct and the message should say "to be 0", or
  • the condition should be != 1 (second message, first was acked)

Given that the first message is acked synchronously before the second Add returns, AckSequence of the second message should be 1 (the first msg's stream sequence), making the intended check != 1.


3. Wrong error description for ErrFastBatchNotEnabled — errors.go:69

ErrFastBatchNotEnabled jetstream.JetStreamError = &jsError{apiErr: &jetstream.APIError{..., Description: "batch publish not enabled on stream", ...}}

This has the same description as ErrBatchPublishNotEnabled (line 31). Should read "fast batch publish not enabled on stream" to distinguish the two.


Design / Behavioural Concerns

4. Error handler called while holding fp.mu — fastpublish_options.go:55-57 / fastpublish.go:534

The comment correctly warns about this, but the implication is significant: any call back into the FastPublisher from within the error handler will deadlock (e.g., checking IsClosed(), logging the ack sequence, etc.). Consider documenting this more prominently in the FastPublisher interface doc, or switching to a queued/async error delivery.
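
One possible shape for queued error delivery is sketched below. It is an illustration under assumptions, not the PR's design: the `publisher` type, `reportLocked`, `fail`, `shutdown`, and the queue size of 16 are all hypothetical. Errors are queued while the mutex is held, and a separate goroutine invokes the handler without the lock, so the handler can safely call back into the publisher.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errFlow is a hypothetical error used only in this sketch.
var errFlow = errors.New("flow error")

// publisher is a hypothetical stand-in for FastPublisher.
type publisher struct {
	mu     sync.Mutex
	closed bool
	errCh  chan error    // queued errors awaiting delivery
	done   chan struct{} // closed when the delivery goroutine exits
}

func newPublisher(handler func(error)) *publisher {
	p := &publisher{errCh: make(chan error, 16), done: make(chan struct{})}
	go func() {
		defer close(p.done)
		for err := range p.errCh {
			handler(err) // runs without p.mu held
		}
	}()
	return p
}

// IsClosed takes the lock, like a typical accessor would.
func (p *publisher) IsClosed() bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.closed
}

// reportLocked must be called with p.mu held. It only queues the error
// and never blocks; the error is dropped if the queue is full.
func (p *publisher) reportLocked(err error) bool {
	select {
	case p.errCh <- err:
		return true
	default:
		return false
	}
}

// fail simulates an ack handler that hits an error while holding the lock.
func (p *publisher) fail(err error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.closed = true
	p.reportLocked(err)
}

// shutdown stops the delivery goroutine and waits for pending handlers.
func (p *publisher) shutdown() {
	close(p.errCh)
	<-p.done
}

func main() {
	var p *publisher
	p = newPublisher(func(err error) {
		// Calling back into the publisher is safe here.
		fmt.Println("handler:", err, "closed:", p.IsClosed())
	})
	p.fail(errFlow)
	p.shutdown()
}
```

The trade-off in this sketch is that errors are delivered slightly later and can be dropped when the queue overflows, which may or may not be acceptable for a flow-control error path.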


5. Empty-batch Close returns a non-sentinel error — fastpublish.go:516-518

if fp.sequence == 0 {
    fp.mu.Unlock()
    return nil, errors.New("no messages in batch")
}

Every other error in this package is a sentinel (exported var) that callers can check with errors.Is. This one-off errors.New can't be matched. Consider adding ErrEmptyBatch to errors.go for consistency.
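
The difference for callers can be shown with a small sketch. It assumes a plain `errors.New` sentinel for brevity, whereas the package under review wraps its sentinels in its own error type; `closeBatch`, `closeBatchAdHoc`, and `isEmptyBatch` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrEmptyBatch mirrors the sentinel name suggested in the review.
// It is declared as a plain error here to keep the sketch small.
var ErrEmptyBatch = errors.New("no messages in batch")

// closeBatch is a hypothetical stand-in for Close that returns the sentinel,
// wrapped with extra context.
func closeBatch(sequence int) error {
	if sequence == 0 {
		return fmt.Errorf("close: %w", ErrEmptyBatch)
	}
	return nil
}

// closeBatchAdHoc returns a fresh error value on every call, so callers
// have nothing stable to compare against.
func closeBatchAdHoc(sequence int) error {
	if sequence == 0 {
		return errors.New("no messages in batch")
	}
	return nil
}

// isEmptyBatch reports whether err is, or wraps, ErrEmptyBatch.
func isEmptyBatch(err error) bool {
	return errors.Is(err, ErrEmptyBatch)
}

func main() {
	fmt.Println(isEmptyBatch(closeBatch(0)))      // true: sentinel is matched through the wrap
	fmt.Println(isEmptyBatch(closeBatchAdHoc(0))) // false: same text, different error value
	fmt.Println(isEmptyBatch(closeBatch(3)))      // false: no error at all
}
```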


6. firstAckCh not cleared on timeout path — fastpublish.go:325-333

When the first-message ack times out, fp.closed = true and fp.ackSub is cleaned up, but fp.firstAckCh is left non-nil. If an in-flight ack arrives and the ackMsgHandler fires before the unsubscribe takes effect, it will send into the buffered channel and close it — not a panic, but a minor inconsistency. Setting fp.firstAckCh = nil (and fp.initialErrCh = nil) in the timeout branch would match the cleanup done in the error branch (lines 320-323).


7. Duplicate subscribe logic in commit() — fastpublish.go:444-451

commit() contains a fallback subscribe block (for the Close/EOB path when called without prior Add). It could share the logic from AddMsg, but more importantly this path is never exercised by tests — Close requires fp.sequence > 0, meaning at least one AddMsg was called and the subscription already exists. The code is defensive but currently dead. A comment explaining when this path is reached would help.


Minor / Nits

8. Redundant double-set of ExpectedLastSubjSeqHeader — fastpublish.go:395-401

if o.lastSubjectSeq != nil {
    msg.Header.Set(jetstream.ExpectedLastSubjSeqHeader, ...)
}
if o.lastSubject != "" {
    msg.Header.Set(jetstream.ExpectedLastSubjSeqSubjHeader, ...)
    msg.Header.Set(jetstream.ExpectedLastSubjSeqHeader, ...) // sets same header again
}

When both lastSubjectSeq and lastSubject are set, ExpectedLastSubjSeqHeader is written twice (same value, so harmless). This pre-existing pattern was copied from publishbatch.go. It's worth cleaning up in both places to avoid confusion.


9. Duplicate test error check — fastpublish_test.go:256-259

ack, err := batch.Close(context.Background())
if err != nil {
    t.Fatalf("Unexpected error discarding batch: %v", err)
}

if err != nil {   // ← dead code, always false here
    t.Fatalf("Unexpected error committing batch: %v", err)
}

The second if err != nil block at lines 257-259 is dead code: err is already confirmed nil by the block above.


10. Close passes the EOB commit message without data but with batchSubject — fastpublish.go:524

Close sends nats.NewMsg(fp.batchSubject) as the EOB commit. This means the commit message has no data and reuses the last batch subject. This is intentional based on the protocol design, but adding a brief comment about why a no-data EOB is sent to the stream subject (rather than a special subject) would help future readers.

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
@piotrpio piotrpio force-pushed the jetstreamext/fastpublish branch from 89487ff to 29ea326 Compare March 26, 2026 15:44
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
@piotrpio piotrpio force-pushed the jetstreamext/fastpublish branch from 30e4fcc to 278dd07 Compare March 26, 2026 15:52
@piotrpio
Collaborator Author

@claude re-review this PR

@synadia-claude-reviewer
synadia-claude-reviewer bot commented Mar 26, 2026

Claude finished @piotrpio's task in 2m 47s


PR Re-Review: Fast publish initial design

  • Gather context
  • Read fastpublish.go
  • Read fastpublish_options.go
  • Read errors.go
  • Read fastpublish_test.go
  • Read publishbatch.go
  • Compare against previous review findings
  • Summarize findings

Good progress — 7 of the 10 previous findings are resolved. Two design concerns remain, and I found two new issues.

Previous findings: status

| # | Issue | Status |
| --- | --- | --- |
| 1 | time.After timer leak in AddMsg | ✅ Fixed — time.NewTimer + defer Stop() |
| 2 | Test assertion mismatch (AckSequence) | ✅ Fixed — message now says "to be 0" |
| 3 | Wrong description for ErrFastBatchNotEnabled | ✅ Fixed — "fast batch publish not enabled…" |
| 4 | Error handler called while holding fp.mu | ⚠️ Documented, design concern persists |
| 5 | Non-sentinel errors.New in Close | ✅ Fixed — ErrEmptyBatch sentinel added |
| 6 | firstAckCh/initialErrCh not cleared on timeout | ✅ Fixed |
| 7 | Dead subscribe fallback in commit() without comment | ⚠️ Still no explanatory comment |
| 8 | Redundant double-set of ExpectedLastSubjSeqHeader | ✅ Fixed — else if used in both files |
| 9 | Duplicate dead if err != nil in test | ✅ Fixed |
| 10 | Close EOB comment suggestion | — Minor, noted |

New Bugs

1. Test error message wrong in fastpublish_test.go:165

if ack.BatchSize != 201 {
    t.Fatalf("Expected BatchAck.BatchSize to be 21, got %d", ack.BatchSize)
}

Condition checks != 201 but the error message says "to be 21" — should be "to be 201".


2. Uppercase error string in ackMsgHandler — fastpublish.go:596

fp.errHandler(fmt.Errorf("Error processing batch at sequence %d: %w", flowErr.Sequence, flowErr.Error))

Go convention (and all other error messages in this file) is lowercase. Should be "error processing batch at sequence %d: %w".
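
The reason for the lowercase convention is that error strings are usually embedded in larger messages as they are wrapped. A small sketch of how the corrected message composes, using hypothetical names (`errSeqMismatch`, `wrapFlowError`) that are not part of the PR:

```go
package main

import (
	"errors"
	"fmt"
)

// errSeqMismatch is a hypothetical cause used only in this sketch.
var errSeqMismatch = errors.New("stream sequence mismatch")

// wrapFlowError builds the message in the form suggested above:
// lowercase, no trailing punctuation, cause attached with %w.
func wrapFlowError(seq uint64, cause error) error {
	return fmt.Errorf("error processing batch at sequence %d: %w", seq, cause)
}

func main() {
	err := wrapFlowError(42, errSeqMismatch)

	// A caller adds its own context; the lowercase text reads naturally mid-sentence.
	fmt.Println(fmt.Errorf("fast publish failed: %w", err))

	// The original cause is still reachable through the %w chain.
	fmt.Println(errors.Is(err, errSeqMismatch))
}
```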


Remaining Design Concerns

4. Error handler called while holding fp.mu (unchanged)

The WithFastPublisherErrorHandler doc now correctly warns about this. The risk is deadlock if the handler calls back into the FastPublisher (e.g., IsClosed(), reading ackSequence). The warning is sufficient for now, but worth noting: all errHandler calls in ackMsgHandler currently happen under fp.mu, so the doc warning applies to all of them, not just a subset.

7. Dead subscribe fallback in commit() — fastpublish.go:449-456

if fp.ackSub == nil {
    ackSub, err := fp.js.Conn().Subscribe(...)
    // ...
}

Close() requires fp.sequence > 0, so at least one AddMsg succeeded, which means fp.ackSub is already set. This path is never reached. A one-line comment explaining why it is kept (or removing it) would prevent future confusion.


Minor Nit

fp.flow = firstAck.Messages in AddMsg:312 is redundant

ackMsgHandler already updates fp.flow (and calls buildReplyPrefix()) before sending to firstAckCh. The assignment on line 312 re-sets the same value. Harmless, but slightly misleading since it lacks the corresponding buildReplyPrefix() call. Consider removing or adding a comment.

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
@piotrpio
Collaborator Author

@claude re-review this PR

  1. Error handler under lock is documented and intentional
  2. Dead subscribe fallback and redundant fp.flow assignment are ok

@synadia-claude-reviewer
synadia-claude-reviewer bot commented Mar 26, 2026

Claude finished @piotrpio's task in 2m 26s


PR Re-Review: Fast publish initial design

  • Gather context
  • Read fastpublish.go
  • Read fastpublish_test.go
  • Check prior open bugs
  • Review for new issues
  • Summarize findings

All previously open bugs are resolved, and the user-dismissed design concerns (#4 error handler under lock, #7 dead subscribe fallback, redundant fp.flow assignment) are noted as intentional.

Previous findings: final status

| # | Issue | Status |
| --- | --- | --- |
| 1 | time.After timer leak in AddMsg | ✅ Fixed |
| 2 | Test assertion mismatch (AckSequence) | ✅ Fixed |
| 3 | Wrong description for ErrFastBatchNotEnabled | ✅ Fixed |
| 4 | Error handler called while holding fp.mu | ✅ Documented and intentional |
| 5 | Non-sentinel errors.New in Close | ✅ Fixed — ErrEmptyBatch added |
| 6 | firstAckCh/initialErrCh not cleared on timeout | ✅ Fixed |
| 7 | Dead subscribe fallback in commit() | ✅ Accepted as-is |
| 8 | Redundant double-set of ExpectedLastSubjSeqHeader | ✅ Fixed |
| 9 | Duplicate dead if err != nil in test | ✅ Fixed |
| New 1 | Test message "to be 21" vs condition != 201 | ✅ Fixed |
| New 2 | Uppercase error string in ackMsgHandler | ✅ Fixed |

New Nits

1. Remaining fast-batch error descriptions missing "fast" prefix — errors.go:85-91

The same issue fixed for ErrFastBatchNotEnabled (review round 1, #3) still exists for three sibling errors:

// errors.go:85
ErrFastBatchInvalidPattern: Description: "batch publish invalid pattern used"
// errors.go:88
ErrFastBatchInvalidID: Description: "batch publish ID is invalid (exceeds 64 characters)"
// errors.go:91
ErrFastBatchUnknownID: Description: "batch publish ID is unknown"

All three should read "fast batch publish ..." to be consistent with ErrFastBatchNotEnabled and to distinguish them from the regular batch-publish sentinels with similar descriptions.


2. commit() holds fp.mu during PublishMsg — fastpublish.go:472

AddMsg (non-first-message path) releases the lock before calling PublishMsg (line 365→367), so the ack handler can run concurrently during the publish I/O. commit() does the opposite — it publishes while still holding the lock (line 472), delaying any in-flight ack handler until the publish completes. Not a correctness issue, but it's an inconsistency worth aligning with the AddMsg pattern.
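
The AddMsg-style ordering can be sketched as follows. The `batch` type, `onAck`, and the `publish` callback are hypothetical. To keep the interleaving deterministic, the sketch invokes the ack handler synchronously from inside the publish callback; in the real code the handler runs on another goroutine and would only be delayed, not deadlocked, by a held lock.

```go
package main

import (
	"fmt"
	"sync"
)

// batch is a hypothetical stand-in for the fast publisher state.
type batch struct {
	mu       sync.Mutex
	sequence uint64
	acked    uint64
}

// onAck simulates an ack handler that needs the lock to record progress.
func (b *batch) onAck(seq uint64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.acked = seq
}

// commit updates shared state under the lock, copies what the publish
// needs, and releases the lock before doing the I/O.
func (b *batch) commit(publish func(seq uint64) error) error {
	b.mu.Lock()
	b.sequence++
	seq := b.sequence
	b.mu.Unlock()

	// The lock is not held here, so ack handling can proceed during the publish.
	return publish(seq)
}

func main() {
	b := &batch{}
	err := b.commit(func(seq uint64) error {
		// Stand-in for network I/O during which an ack arrives.
		b.onAck(seq)
		return nil
	})
	fmt.Println(err, b.acked)
}
```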


Overall the implementation is solid. The flow-control mechanism, ping/retry logic, and cleanup paths are well handled. Ready to merge once the description nits are addressed.

@piotrpio piotrpio marked this pull request as ready for review March 26, 2026 16:24