
Refine streaming parser handling and cache cleanup #24

Merged
christopherkarani merged 2 commits into main from codex/fix-framework-audit-20260219
Feb 25, 2026

Conversation

@christopherkarani
Owner

  • Summary
    • Harden SSE parsing in Anthropic, OpenAI, and HuggingFace providers to better handle done markers, emit completion chunks, and avoid infinite waits
    • Close out Responses tool call accumulators via a shared finalization path and recover malformed JSON arguments with JsonRepair
    • Tie MLX cache evictions to asynchronous cleanup so in-flight model metadata stays consistent
    • Improve JsonRepair context detection plus parity test expectations for the SSE parser
  • Testing
    • Not run (not requested)

@chatgpt-codex-connector

You have reached your Codex usage limits for code reviews. You can see your limits in the Codex usage dashboard.

@christopherkarani
Owner Author

@claude review

@claude

claude bot commented Feb 19, 2026

Claude finished @christopherkarani's task in 3m 53s


Code Review

  • Gather context and read changed files
  • Review Anthropic SSE streaming changes
  • Review OpenAI streaming changes
  • Review HuggingFace streaming changes
  • Review MLX cache changes
  • Review JsonRepair changes
  • Review SSE parser and test changes
  • Post comprehensive review

Summary

Overall this is a well-structured hardening PR. The core ideas — flushing the SSE parser at EOF, emitting a synthetic completion chunk to prevent consumer hangs, and sharing the tool-call finalization path — are all sound. A few issues are worth addressing before merging.


Issues

🔴 HFInferenceClient — Unbounded error body collection

File: Sources/Conduit/Providers/HuggingFace/HFInferenceClient.swift:531–537

var errorData = Data()
for try await byte in bytes {
    errorData.append(byte)
}
try handleHTTPError(statusCode: httpResponse.statusCode, data: errorData, response: httpResponse)

Unlike the Anthropic (10_000 byte cap) and OpenAI (10_000 byte cap) error paths, the HuggingFace streaming error path collects bytes with no size limit. A slow or malicious server can drive unbounded heap allocation here. The fix mirrors what the other two providers already do.
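A minimal sketch of the capped variant, assuming the same 10_000-byte limit the other providers use (the constant name is illustrative):

var errorData = Data()
let maxErrorBodyBytes = 10_000  // illustrative; mirror the Anthropic/OpenAI cap
for try await byte in bytes {
    errorData.append(byte)
    if errorData.count >= maxErrorBodyBytes { break }  // stop collecting once the cap is reached
}
try handleHTTPError(statusCode: httpResponse.statusCode, data: errorData, response: httpResponse)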



🟡 OpenAI Chat Completions — Tool calls silently dropped on bare [DONE]

File: Sources/Conduit/Providers/OpenAI/OpenAIProvider+Streaming.swift:269–273

func processEventData(_ jsonStr: String) -> Bool {
    if jsonStr == "[DONE]" {
        continuation.finish()   // ← calls finish() and …
        return true             // … causes early return from performStreamingGeneration
    }
    // …
}

When [DONE] arrives and processEventData returns true, performStreamingGeneration executes an early return (lines 480 / 487). This skips the orphaned tool-call flush at lines 491–503. So if a server sends [DONE] without a prior finish_reason chunk while toolCallAccumulators is non-empty, those tool calls are silently lost.

The OpenAI spec guarantees a finish_reason chunk before [DONE], so this is only triggered by non-conformant servers, but the Responses path (which does route through finalizeToolCalls() inside .completed) already handles it correctly.

Suggested fix: move finalizeAccumulatedToolCalls() + the tool-call yield inside processEventData when [DONE] is seen, or check toolCallAccumulators before the early return.
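One possible shape of the second option, checking the accumulators before finishing (the yield for the recovered tool calls is left as a comment because the exact chunk constructor is not visible in the diff):

func processEventData(_ jsonStr: String) -> Bool {
    if jsonStr == "[DONE]" {
        // Flush orphaned tool calls so a bare [DONE] with no prior
        // finish_reason chunk cannot drop them.
        if !toolCallAccumulators.isEmpty {
            finalizeAccumulatedToolCalls()
            // Yield the recovered tool calls here before finishing;
            // the exact chunk shape depends on GenerationChunk's API.
        }
        continuation.finish()
        return true
    }
    // …
}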



🟡 MLXModelCache — currentModelId cleared on eviction of active model

File: Sources/Conduit/Providers/MLX/MLXModelCache.swift:314–320

private func handleEviction(of modelId: String) {
    cachedModelIds.remove(modelId)
    modelSizes.removeValue(forKey: modelId)
    if currentModelId == modelId {
        currentModelId = nil   // ← clears the "active" pointer
    }
}

The handleEviction task is dispatched asynchronously. Between the NSCache eviction and the task executing, there's a window where currentModelId still points to an evicted model. Any caller who reads currentModelId in that window and tries to use the model pointer will get a cache miss from get(), which is handled gracefully — but statistics (cacheStats()) will report a model as "active" when it's already gone.

More importantly, MLXModelLoader.loadModel now (correctly, per the +1 change) calls setCurrentModel on a cache hit, which re-anchors the pointer. This significantly reduces the window. Still, consider whether currentModelId should be cleared eagerly in get() / contains() on a miss, consistent with how cachedModelIds and modelSizes are cleaned up there.
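A sketch of the eager-clearing idea; the surrounding method shape is assumed rather than copied from the file:

func get(_ modelId: String) -> CachedModel? {
    guard let model = cache.object(forKey: modelId as NSString) else {
        // Miss: clean up bookkeeping eagerly, matching how cachedModelIds and
        // modelSizes are handled, so cacheStats() never reports an evicted
        // model as active.
        cachedModelIds.remove(modelId)
        modelSizes.removeValue(forKey: modelId)
        if currentModelId == modelId {
            currentModelId = nil
        }
        return nil
    }
    return model
}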


🟡 Anthropic — Incomplete tool calls omitted from the synthetic completion chunk

File: Sources/Conduit/Providers/Anthropic/AnthropicProvider+Streaming.swift:429–431

if !didEmitCompletionChunk {
    continuation.yield(GenerationChunk.completion(finishReason: .stop))
}

completedToolCalls is not threaded into this fallback chunk. In the normal path, tool calls arrive in messageDelta. If the connection drops after some contentBlockStop events have added items to completedToolCalls but before messageDelta fires, those calls are completed but not surfaced. This path should include completedToolCalls (and clear any leftover activeToolCalls).
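A hedged sketch of what the fallback could look like; the initializer parameters are assumptions about GenerationChunk's API, not the actual signature:

if !didEmitCompletionChunk {
    // Surface tool calls completed via contentBlockStop even when
    // messageDelta never arrived (e.g. the connection dropped).
    continuation.yield(GenerationChunk.completion(
        finishReason: .stop,
        toolCalls: completedToolCalls  // parameter name assumed
    ))
    activeToolCalls.removeAll()
}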


Minor / Observations

Redundant continuation.finish() in OpenAI Chat Completions path (processEventData line 271 + performStreamingGeneration line 505): Multiple calls to .finish() are idempotent on AsyncThrowingStream.Continuation, so this is safe. Noting it only for clarity.

seenFields: Set<String> allocation per event (ServerSentEventParser): For very high-frequency SSE streams, replacing Set<String> with four Bool flags (seenData, seenEvent, seenId, seenRetry) eliminates repeated heap allocations. Not a correctness issue, just a performance note.
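A sketch of the flag-based variant (names illustrative):

// Per-event field-presence flags; cheaper than a fresh Set<String> per event.
private var seenData = false
private var seenEvent = false
private var seenId = false
private var seenRetry = false

private mutating func resetFieldFlags() {
    seenData = false
    seenEvent = false
    seenId = false
    seenRetry = false
}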

JsonRepair.removeIncompleteKeyValuePairs + removeTrailingCommasBeforeClosingBrackets — two O(n) passes over potentially large JSON. Bounded by the 100 KB cap already in place, so not a practical concern.


What's Good

  • sseParser.finish() call added to all three providers — correct fix for partial events at EOF; the Anthropic and OpenAI implementations now properly handle the sseParser.finish() loop after the byte stream exhausts.
  • Synthetic completion chunk guards downstream consumers against hanging indefinitely when the stream ends abnormally.
  • Shared finalizeAccumulatedToolCalls() in the OpenAI Chat Completions path eliminates the previous copy-paste finalization logic and makes JsonRepair recovery consistent.
  • JsonRepair.findContext is a clean fix for the array-vs-object ambiguity that previously caused valid array elements to be incorrectly stripped as incomplete keys. The new test arrayContextWithBraceCharactersInStrings pins the regression.
  • MLXModelLoader cache-hit path now calls setCurrentModel — this was the missing line (+1) that kept currentModelId stale on repeated cache hits.
  • SSE parity test EOF flushes empty data-only — important edge case pinned in the test suite.
  • Responses API tool-call accumulation by call ID (vs. index) is more robust against non-sequential event ordering.

@claude

claude bot commented Feb 19, 2026

Code Review: Refine streaming parser handling and cache cleanup

Overall this is a solid, targeted set of fixes. The refactoring reduces meaningful duplication, and the correctness fixes (SSE parser, JsonRepair, cache eviction) are well-scoped. A few items worth discussing before merge:


Issues

1. Tool call finalization condition is semantically wider than intended

OpenAIProvider+Streaming.swift

// Before
let isToolCallsComplete = finishReason == .toolCalls || finishReason == .toolCall

// After
let shouldFinalizeToolCalls = finishReason != nil && !toolCallAccumulators.isEmpty

The new condition finalizes accumulators on any non-nil finish reason, including .stop, .length, .contentFilter, etc. If a server ever sends finishReason: .stop while accumulators are populated (e.g. a partial/abandoned tool call mid-stream), the code will silently emit those partial tool calls as if they were complete. The original condition was narrow on purpose. Consider keeping it narrow or at least logging a warning when finalizing on a non-tool finish reason:

if finishReason == .toolCalls || finishReason == .toolCall || finishReason == .stop {
    // only finalize on expected terminal reasons
}

2. seenFields reset in ServerSentEventParser

The diff for ServerSentEventParser.swift only shows the finish() change. For seenFields to work correctly it must be:

  • Declared as a mutating property on the struct
  • Populated in ingestLine when a data: field is parsed (even with empty value)
  • Cleared when an event is dispatched (alongside currentData, currentEventId, etc.)

If seenFields is not reset between events, all events after the first data: field will incorrectly appear to have an explicit data field. Please confirm the reset is in place (it's not visible in the diff as presented).
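A sketch of where that bookkeeping would live, with the helper name valueAfterColon(_:) invented for illustration:

mutating func ingestLine(_ line: String) {
    if line.hasPrefix("data:") {
        seenFields.insert("data")             // record presence even for an empty value
        currentData += valueAfterColon(line)  // valueAfterColon(_:) is illustrative
    }
    // … other fields …
}

private mutating func dispatchEvent() {
    // Reset alongside the other per-event state so a later event does not
    // inherit an earlier event's "data was present" flag.
    defer {
        currentData = ""
        currentEventId = nil
        seenFields.removeAll(keepingCapacity: true)
    }
    // … build and emit the event …
}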

3. Post-loop continuation.finish() called unconditionally in Responses path

In streamResponsesAPI, the post-loop path is:

for event in sseParser.finish() {
    if processResponsesEventData(event.data) {
        continuation.finish()
        return
    }
}

if !toolAccumulatorsByID.isEmpty {
    // yield a completion chunk
}

continuation.finish()   // <-- always called here

If processResponsesEventData handles a .completed event inside sseParser.finish(), it returns true, and the caller calls continuation.finish() and returns — correct. But if the loop exits normally, continuation.finish() is called at the end. The question is: is it possible for a .completed event to be processed during the main loop (setting internal state that marks the stream as done), and then the post-loop continuation.finish() to be called a second time? AsyncThrowingStream.Continuation.finish() is documented as idempotent so this won't crash, but it's worth a brief comment explaining the intent.

4. Silently swallowing decode errors in HFInferenceClient

if let chunk = try? decoder.decode(HFChatCompletionResponse.self, from: data) {
    continuation.yield(chunk)
}

This was present before the PR, but the refactor is a good opportunity to add logger.debug(...) on the else path. Silent drops make production debugging very difficult.
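Roughly, assuming a logger instance is available in scope:

do {
    let chunk = try decoder.decode(HFChatCompletionResponse.self, from: data)
    continuation.yield(chunk)
} catch {
    // Keep the stream alive, but leave a trace for production debugging.
    logger.debug("Dropping undecodable HF stream chunk: \(error)")
}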


Suggestions (non-blocking)

MLXModelCache.swift — document accepted race window

The eviction callback fires on NSCache's internal queue, and handleEviction(of:) runs asynchronously on the actor. There's a brief window where cachedModelIds still contains a model ID whose underlying CachedModel has already been evicted by NSCache. A short // NSCache eviction is async; a brief window exists where cachedModelIds may contain an already-evicted entry. comment would help future readers not be alarmed.

OpenAIProvider+Streaming.swift — buildReasoningDetails() called after post-loop finalizeAccumulatedToolCalls()

In the post-loop block:

if !toolCallAccumulators.isEmpty {
    finalizeAccumulatedToolCalls()
    let completedReasoningDetails = buildReasoningDetails()
    ...
}

completedToolCalls at this point may already contain tool calls from an earlier in-loop finalization. Verify that the yielded completedToolCalls slice is intentionally cumulative and not double-reporting any entries.


Test Coverage Gaps

The changes that lack test coverage:

  • Anthropic/OpenAI/HuggingFace streaming paths: no test for the "stream ends without [DONE]" case (the main motivation for calling parser.finish())
  • MLX cache eviction: no test that handleEviction(of:) correctly updates cachedModelIds, modelSizes, and currentModelId
  • Tool call finalization with non-tool finish reason (the semantic change in condition 1 above)

The JsonRepairTests addition and ServerSentEventParserParityTests update are both good and directly cover the bugs being fixed.


Minor

  • The processSSEEventData / processSSEEvent local function names are slightly inconsistent across providers (_Data suffix on Anthropic/OpenAI, no suffix on HuggingFace). Not a bug, just worth noting for searchability.
  • finalizeAccumulatedToolCalls correctly uses keepingCapacity: true — good micro-optimisation for cases where tool calls are repeatedly streamed.

@christopherkarani christopherkarani merged commit 18d6258 into main Feb 25, 2026
1 of 4 checks passed
@claude

claude bot commented Feb 25, 2026

PR Review: Refine streaming parser handling and cache cleanup

Thanks for this set of fixes. The overall direction is solid: draining buffered SSE events after the byte stream closes, deduplicating completion chunks, and wiring up NSCache evictions to actor state cleanup are all valuable improvements. A few issues need attention before merge.


Critical: Duplicate switch cases in JsonRepair.swift (won't compile)

File: Sources/Conduit/Utilities/JsonRepair.swift

The diff inserts new case statements for '[', ']', and an expanded '}' into the switch, but the original case '[': and its body are left as context lines rather than being deleted. The resulting file contains a duplicate case '[:

case "[":          // NEW (added in hunk)
    bracketStack.append("[")
case "]":
    if bracketStack.last == "[" {
        bracketStack.removeLast()
    }
    if bracketStack.last == "{" { bracketStack.removeLast() }  // old body, still present
case "[":          // OLD context line still present — DUPLICATE
    bracketStack.append("[")

Swift rejects duplicate switch cases at compile time, so this branch will not build. The old single-line case bodies need to be marked as deleted lines in the patch.

There is also a logic error in the resulting case ']': after popping '[' from the stack, the code immediately tries to also pop '{'. This would erroneously collapse a brace context on every ']' encountered.

Fix: mark the original 'if bracketStack.last == "{"' body and the old 'case "[":' as deleted lines.
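The intended post-fix shape of the switch, reconstructed from the review (the surrounding code and variable names are assumed from the snippet above):

switch char {
case "{":
    bracketStack.append("{")
case "}":
    if bracketStack.last == "{" { bracketStack.removeLast() }
case "[":
    bracketStack.append("[")
case "]":
    // Pop only a matching '[': a ']' must never collapse an enclosing brace context.
    if bracketStack.last == "[" { bracketStack.removeLast() }
default:
    break
}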


Critical: seenFields in ServerSentEventParser.finish() — verify the property exists

File: Sources/Conduit/Utilities/ServerSentEventParser.swift

finish() now references seenFields.contains("data"):

let hasExplicitEmptyDataField = currentData.isEmpty && seenFields.contains("data")

The diff does not show where seenFields is declared or populated. If it is new, its declaration and the insertion logic inside ingestLine must also be part of this PR — without it the build fails. If it already exists in the file, a brief note confirming that would help reviewers.

The behavior change itself (dispatching an event with empty-string data when 'data:' was present but blank) aligns with the SSE spec, so the intent is correct — but consumers that currently assume data payloads are always non-empty should be audited.


Concern: shouldFinalizeToolCalls is broader than intended

File: Sources/Conduit/Providers/OpenAI/OpenAIProvider+Streaming.swift

Old guard:
let isToolCallsComplete = finishReason == .toolCalls || finishReason == .toolCall

New guard:
let shouldFinalizeToolCalls = finishReason != nil && !toolCallAccumulators.isEmpty

This now finalizes accumulated (possibly partial) tool calls on any terminal finish reason including .stop, .length, .contentFilter, and error variants. A model that starts streaming a tool call and then halts mid-stream due to a content-policy hit would cause this path to attempt JSON repair and yield a synthesized tool-call chunk. The consumer would receive a fabricated tool invocation for a generation the model explicitly did not complete.

Suggested approach: retain the original .toolCalls / .toolCall guard for the in-stream path, and keep the new end-of-stream fallback (the block after the loop) only for the case where the stream ended without any finish reason at all.
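Sketched below; the accumulator and helper names come from the diff, but the overall control flow is assumed:

// In-stream: finalize only on the tool-call finish reasons, as before.
if (finishReason == .toolCalls || finishReason == .toolCall),
   !toolCallAccumulators.isEmpty {
    finalizeAccumulatedToolCalls()
}

// End of stream: lenient fallback only when no finish reason ever arrived.
if finishReason == nil, !toolCallAccumulators.isEmpty {
    finalizeAccumulatedToolCalls()
}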


Minor: Redundant continuation.finish() in the Responses completed path

File: Sources/Conduit/Providers/OpenAI/OpenAIProvider+Streaming.swift

When processResponsesEventData returns true for a .completed event, the call site calls continuation.finish() explicitly. Since continuation.finish() already sits unconditionally at the bottom of the outer function, removing the explicit call here and letting control fall through would simplify the flow without changing behaviour.


What looks good

  • MLXModelCache eviction wiring: Clear improvement over the previous no-op comment. Storing modelId on CachedModel and dispatching async cleanup via Task { await self?.handleEviction(of:) } is the correct pattern for bridging an NSCache callback into an actor. The [weak self] capture in init is correct: evictions after deallocation safely no-op.

  • SSE finish() drain: Calling sseParser.finish() after the byte loop is the right fix for providers that truncate the stream without a [DONE] marker.

  • Completion chunk deduplication: didEmitCompletionChunk in the Anthropic provider prevents the double-completion that previously could occur when a provider-sent completion event was followed by the hard-coded GenerationChunk.completion(finishReason: .stop).

  • finalizeAccumulatedToolCalls extraction: The JSON-repair fallback was duplicated in-line; extracting it is a clear readability win.

  • JsonRepair guard + test: The guard !chars.isEmpty, idx >= 0 early-return and the new test for brace characters inside array string literals are solid defensive additions (once the duplicate-case issue is resolved).


Summary

  • Duplicate case "[" / dead code in JsonRepair.findContext: build-breaking
  • seenFields reference not visible in diff: likely build-breaking
  • shouldFinalizeToolCalls fires on non-tool finish reasons: correctness concern
  • Redundant continuation.finish() in responses loop: minor

Happy to help clarify any of the above.
