Add durable store runtime#22
Draft
djgrant wants to merge 3 commits into
Draft
Conversation
… unwrap proxies, and add complete test coverage
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds the durable store proposal and implementation for YieldStar.
SPEC.mddescribing the durable store API, selector-trackedonChange, default read policy, and runtime work.step.store(...)with durableget,select,update, and selector-trackedonChange.StepStoreWaitso workflows can suspend on store changes and resume via waiter wakeups.sdk.store(...)updates.Design Decisions
SQLite storage model
The SQLite runtime stores durable state in three tables:
stores: keyed by(store_name, store_id), withversionand serialized JSONstate.store_update_results: keyed by(store_name, store_id, idempotency_key), storing the serializedStoreUpdateResultfor idempotent external and workflow updates.store_waiters: keyed by(store_name, store_id, execution_id, step_key), storing the workflow/event identity,since_version, and trackedread_paths.This keeps the state row, idempotency ledger, and wakeup subscriptions separate. The split makes replay behavior explicit: store data can advance independently, while a repeated idempotency key returns the original update result without applying the mutation again.
Store updates and derived write paths
updateStorereads the current state, clones it into a mutable draft, runs the updater, validates the resulting state with the store schema, increments the version, and writes the new JSON state back tostores.Changed paths are derived by
diffStorePaths(previousState, nextState)rather than supplied by the caller. The diff recursively compares the old and new JSON-shaped state and records leaf paths that changed, normalizing array indexes into numeric path segments. This avoids exposing publicpathsonupdatewhile still giving the runtime enough detail to wake relevant waiters.onChangeimplementationstore.onChange(...)is implemented as a normal step-runner generator until the selector does not match. It yields aStepKey, checks the step cache, reads the current store snapshot, and runs the selector throughtrackStoreSelector.If the selector returns a truthy/non-null value, the step yields a
StepResultand completes like any other durable step. If it does not match, the step registers a waiter containing the original workflow event, the store key, the step key, the current store version, and the selector read paths. It then yieldsStepStoreWait.The workflow wrapper records
StepStoreWaitwithstepDone: false. That is the key detail: on replay, the wait step is treated as incomplete, so the selector is evaluated again instead of returning a cached wait response.Proxy read tracking
trackStoreSelectorwraps the cloned snapshot state in JavaScriptProxyobjects. Every string property read records the path that was accessed, then recursively wraps child objects so nested reads are tracked too. Proxies are cached in aWeakMapso the same object is not wrapped repeatedly.The proxy traps
setanddeletePropertyand throws if a selector mutates state. Selector functions are never serialized; only the paths read during the last non-matching evaluation are stored instore_waiters.Path intersection for wakeups
When a store update commits, the store client loads waiters for that
(store_name, store_id)and compares each waiter'sread_pathswith the update's derived write paths using prefix matching. A waiter wakes when either path is a prefix of the other.That means a selector that reads
messagescan wake when an update writesmessages[0], and a selector that readsprofile.namecan wake when an update replacesprofile. The wakeup is intentionally coarse; the workflow replay still re-runs the selector before returning a value.Runtime wakeup flow
When a waiter matches, the runtime deletes the waiter row and calls
schedulerClient.requestWakeUp(waiter.event). The stored event is the original workflow event serialized intostore_waiters, so the normal task queue path can resume the same execution without a special store-specific worker entrypoint.The resumed workflow replays from the start, hits the same
onChangestep key, sees the prior wait step as incomplete, reads the latest committed store state, and runs the selector again. If the selector now matches, it persists aStepResultand the workflow continues. If it still does not match, it registers a fresh waiter at the newer store version.Memory runtime mirrors SQLite semantics
The memory runtime implements the same contract with maps: committed store records, update-result records, and waiter records. It uses the same core helpers for cloning, validation, path diffing, selector tracking, and path intersection. This gives fast workflow-level tests while keeping behavior aligned with the SQLite durable runtime.
End-to-End Flow
yield* step.store(StoreDef, { id, initial }); the step creates or loads the store throughStoreClientand reconstructs aWorkflowStorehandle on replay.yield* store.onChange("next-message", state => state.messages.find(...)).messagesand persists a store waiter.StepStoreWait, which is cached as an incomplete step and returns control to the worker.store.update(...); the runtime applies the update, validates the result, increments the store version, records any idempotent update result, and derives changed paths from the before/after state.onChangestep, re-reads current store state, re-runs the selector, and either completes with aStepResultor waits again.Notes
BEGIN IMMEDIATEacrossawait params.updater(draft). If anyone does real async work in an updater, it holds the DB lock.Validation
bun testbun run bundle