diff --git a/SPEC.md b/SPEC.md new file mode 100644 index 0000000..3b50f0d --- /dev/null +++ b/SPEC.md @@ -0,0 +1,492 @@ +# Durable Store Spec + +Status: proposal + +This spec adds durable, schema-validated state to YieldStar. The goal is to make long-lived agents, actors, mailboxes, shared project state, human approval workflows, and external event ingestion possible without adding separate concepts for each one. + +The core API should stay small: + +```ts +const store = yield* step.store(StoreDef, { id }) +yield* store.update("append:message-1", draft => { + draft.messages.push(message) +}) +const message = yield* store.onChange(s => + s.messages.find(m => !m.processed) +) +``` + +## Design Principles + +- A store definition creates a store type, not an instance. +- Store identity is `definition.name + id`. +- If no store id is provided inside a workflow, the id defaults to `event.executionId`. +- Reads inside workflows are yielded and durable. +- Replaying a workflow must never observe a different value for an already completed read. +- Workflow updates are idempotent by step key. +- Public APIs should not expose path lists for writes. +- `onChange` should infer watched paths by running the selector against a tracking proxy. +- Store schemas use Standard Schema, not Zod-specific APIs. + +## Prior Art + +Selector-driven tracking has enough precedent to be worth pursuing, but the durable runtime needs stricter semantics than UI state libraries. + +MobX reactions run user functions in a reactive context and record the observables read during that execution. Solid memos similarly re-execute when tracked dependencies change. React Tracked and proxy-compare-style systems use JavaScript `Proxy` objects to track which object paths are read. Immer can generate write patches with array paths from mutating update functions. + +References: + +- [MobX reactions](https://mobx.js.org/reactions.html) +- [Solid createMemo](https://docs.solidjs.com/reference/basic-reactivity/create-memo) +- [React Tracked state usage tracking](https://react-tracked.js.org/docs/introduction/) +- [Immer patches](https://immerjs.github.io/immer/patches/) +- [Standard Schema](https://standardschema.dev/) + +The useful shape for YieldStar is: + +- Run `onChange` selectors against a read-tracking proxy. +- Run `update` functions against a draft that can produce write patches. +- Wake waiters when write patches intersect selector-read paths. +- Replay the workflow and re-run the selector rather than serializing selector functions. + +This keeps the magical part local and inspectable: selectors are ordinary functions, but the runtime derives their dependencies from actual reads. + +## Store Definition API + +```ts +import type { StandardSchemaV1 } from "@standard-schema/spec" + +type StoreDefinition = { + name: string + schema: Schema +} + +type StoreState = + StandardSchemaV1.InferOutput + +function defineStore( + name: string, + schema: Schema +): StoreDefinition +``` + +Example: + +```ts +const ConversationStore = defineStore( + "conversation", + v.object({ + messages: v.array( + v.object({ + id: v.string(), + role: v.picklist(["user", "assistant", "tool"]), + content: v.string(), + processed: v.optional(v.boolean(), false), + }) + ), + status: v.picklist(["idle", "working"]), + }) +) +``` + +The schema is validated: + +- when a store is created from `initial` +- after every workflow update +- after every external update +- before a snapshot is returned if the persisted format version or schema version requires validation + +## Store Identity + +```ts +type StoreId = string + +type StoreKey = { + storeName: string + storeId: StoreId +} +``` + +Inside a workflow: + +```ts +const storeId = params.id ?? event.executionId +``` + +So: + +```ts +yield* step.store(ConversationStore) +``` + +means execution-local store. + +And: + +```ts +yield* step.store(ConversationStore, { id: event.params.conversationId }) +``` + +means shared/correlation-bound store. + +There is no public `scope`, `mailbox`, `actor`, or `writePolicy`. + +## Workflow API + +```ts +interface StepRunner { + store( + store: StoreDefinition, + params?: { + id?: string + initial?: + | StoreState + | (() => StoreState | Promise>) + } + ): AsyncGenerator>> +} +``` + +Semantics: + +- If the store exists, return a handle. +- If it does not exist, create it using `initial`. +- If it does not exist and `initial` is omitted, fail. +- If no `id` is provided, use `event.executionId`. +- The operation is durable and idempotent like other YieldStar steps. + +## Workflow Store Handle + +```ts +type StoreVersion = number +type StorePath = readonly (string | number | symbol)[] + +type StoreSnapshot = { + state: T + version: StoreVersion +} + +type StoreUpdateResult = { + state: T + previousVersion: StoreVersion + version: StoreVersion +} + +type StoreSelector = (state: Readonly) => R + +interface WorkflowStore { + readonly definition: StoreDefinition + readonly id: string + readonly key: StoreKey + + get(key?: string): AsyncGenerator> + + select( + key: string, + selector: StoreSelector + ): AsyncGenerator + + update( + key: string, + updater: (draft: Draft) => void | T | Promise + ): AsyncGenerator> + + onChange( + selector: StoreSelector + ): AsyncGenerator> + + onChange( + key: string, + selector: StoreSelector + ): AsyncGenerator> +} +``` + +There is intentionally no `WorkflowStore.version`. Versions belong to snapshots and update results, because a long-lived handle would otherwise invite stale reads. + +There is intentionally no `timeoutMs`. Timeouts can be added later as an explicit cancellation/deadline primitive, but the first store API should not mix "wait for data" with "fail after time" until the workflow cancellation story is clear. + +## Default Read Policy + +The default read policy is: + +```text +read latest committed on first execution of the step; +return the recorded snapshot on replay +``` + +In other words: + +- `get` and `select` read the latest committed store state when that step first runs. +- The snapshot and version are persisted under the step key. +- Replays return the persisted snapshot, not whatever the external store contains later. +- A later read with a different step key may observe newer committed state. +- A read after a completed `store.update` sees that update if no newer write wins the race before the read executes. + +This is deliberately not a workflow-wide snapshot transaction. Workflows are long-running processes, so they should be allowed to observe new committed state at new durable read steps while keeping each completed read replay-stable. + +## `get` + +```ts +const snapshot = yield* store.get("load-current") +``` + +Returns: + +```ts +snapshot.state +snapshot.version +``` + +If the key is omitted, YieldStar may use the call-site hash, following the same caveats as other step helpers. Explicit keys are required when a workflow can reach the same call site more than once before yielding. + +## `select` + +```ts +const unprocessed = yield* store.select("unprocessed", s => + s.messages.filter(m => !m.processed) +) +``` + +`select` is a durable read followed by a pure selector. The selected value is persisted as the step result. The selector is not serialized. + +## `update` + +```ts +yield* store.update(`start:${message.id}`, draft => { + draft.status = "working" +}) +``` + +`update` is a durable step. The key is the workflow step key for that update. + +If replayed, the workflow step cache returns the recorded `StoreUpdateResult` and the runtime does not call the store updater again. + +Public `paths` are not accepted. The runtime is responsible for deriving changed paths from the update itself, for example by producing patch paths from the draft operation. + +Update semantics: + +- Updates are atomic per store. +- Each successful update increments the store version by one. +- The updated state is validated against the store schema before commit. +- The update result records `previousVersion` and `version`. +- The runtime records changed paths internally for waiter wakeups. + +## `onChange` + +`onChange` is the durable wait primitive. + +```ts +const message = yield* store.onChange(s => + s.messages.find(m => !m.processed) +) +``` + +The selector-only form uses the call-site hash as the durable step key. The keyed form is required when the same workflow can reach the same `onChange` call site more than once before yielding: + +```ts +const message = yield* store.onChange(`next-message:${turn}`, s => + s.messages.find(m => !m.processed) +) +``` + +Semantics: + +1. Read the current committed store snapshot. +2. Run the selector against a read-tracking proxy. +3. Record the paths read by the selector. +4. If the selector returns a truthy/non-null value, persist and return it. +5. Otherwise persist a waiter: + - workflow id + - execution id + - original event + - store name + - store id + - step key + - current store version + - tracked read paths +6. Yield control to the runtime. +7. When the store changes at an intersecting path, enqueue the same workflow execution. +8. On replay, run the selector again. + +The selector is not serialized. The runtime wakes coarsely from stored read paths, then the workflow replay evaluates the actual condition. + +### Path Tracking + +`onChange` tracks paths by observing property reads: + +```ts +yield* store.onChange("wait-status", s => s.status === "idle") +``` + +tracks `["status"]`. + +```ts +yield* store.onChange("next-message", s => + s.messages.find(m => !m.processed) +) +``` + +tracks at least `["messages"]`. Implementations may also record deeper paths such as `["messages", 0, "processed"]`, but they must retain ancestor collection paths so appending to `messages` wakes the waiter. + +A write path matches a read path when either path is a prefix of the other. This lets: + +- a waiter on `["messages"]` wake for `["messages", 3]` +- a waiter on `["status"]` wake for `["status"]` +- a waiter on `["profile", "name"]` wake if `["profile"]` is replaced + +### Selector Requirements + +Selectors should be synchronous and pure: + +- no network +- no timers +- no mutation +- no reads from external mutable state +- no async work + +The runtime may reject async selectors. The selector result must be serializable, because successful `onChange` results are persisted. + +## External Runtime API + +External writes are required for event ingestion and human/app interaction. + +```ts +interface Runtime { + store( + store: StoreDefinition, + id: string + ): RuntimeStore> +} + +interface RuntimeStore { + get(): Promise> + + update( + updater: (draft: Draft) => void | T | Promise + ): Promise> +} +``` + +Example: + +```ts +await runtime + .store(ConversationStore, "conversation:123") + .update(draft => { + draft.messages.push({ + id: crypto.randomUUID(), + role: "user", + content: "hello", + processed: false, + }) + }) +``` + +The update wakes matching `onChange` waiters using internally generated changed paths. + +## Store Lifetime + +Lifecycle is implicit: + +```text +No id provided: + store id = executionId + lifecycle may be tied to workflow execution retention + +id provided: + store id = params.id + lifecycle is independent and app-managed +``` + +## Example Agent + +```ts +const AgentStore = defineStore( + "agent", + v.object({ + messages: v.array(Message), + status: v.picklist(["idle", "working"]), + memory: v.record(v.string(), v.unknown()), + }) +) + +export const agent = workflow(async function* (step, event) { + const store = yield* step.store(AgentStore, { + id: event.params.agentId, + initial: { + messages: [], + status: "idle", + memory: {}, + }, + }) + + let turn = 0 + + while (true) { + const msg = yield* store.onChange(`next-message:${turn++}`, s => + s.messages.find(m => !m.processed) + ) + + yield* store.update(`start:${msg.id}`, draft => { + draft.status = "working" + }) + + const response = yield* step.run(`llm:${msg.id}`, () => + callModel(msg.content) + ) + + yield* store.update(`finish:${msg.id}`, draft => { + const message = draft.messages.find(m => m.id === msg.id)! + message.processed = true + message.response = response.text + draft.status = "idle" + }) + } +}) +``` + +## Runtime Work + +This is a first-class runtime feature, not only an SDK helper. + +Required core changes: + +- Add store definitions, snapshots, update results, and waiter response types to `@yieldstar/core`. +- Add a `StoreClient` abstraction beside `HeapClient` and `SchedulerClient`. +- Pass the store client into workflow generators. +- Make `stepRunner` execution-aware so `step.store` can use `event.executionId`. +- Add a workflow suspension response for store waiters. +- Teach `WorkflowRunner` to register store waiters and wake executions. + +Required runtime changes: + +- Memory store state and waiters. +- SQLite store and waiter tables. +- Atomic per-store update transactions. +- Waiter wakeup by path intersection. +- Event re-enqueue with the original workflow event. + +Required SDK/API changes: + +- Local runtime access to `runtime.store(...)`. +- HTTP endpoints for external store `get` and `update` if store access is exposed remotely. +- Test utilities that can trigger workflows and mutate stores externally. + +Required tests: + +- store creation with initial state +- execution-local default store id +- shared explicit store id +- durable `get` replay stability +- durable `select` replay stability +- workflow update idempotency +- schema validation on create and update +- `onChange` returns immediately when selector matches +- `onChange` waits when selector does not match +- external update wakes a waiting workflow +- workflow update wakes another waiting workflow +- array append wakes selector tracking the array +- unrelated paths do not wake waiters +- replacing an ancestor path wakes descendant waiters +- concurrent updates serialize per store diff --git a/examples/http-server/worker.ts b/examples/http-server/worker.ts index ca8a1d5..f873fb4 100644 --- a/examples/http-server/worker.ts +++ b/examples/http-server/worker.ts @@ -4,6 +4,7 @@ import { createWorkflowWorker } from "@yieldstar/bun-worker-invoker"; import { SqliteSchedulerClient, SqliteHeapClient, + SqliteStoreClient, SqliteTaskQueueClient, SqliteTimersClient, } from "@yieldstar/bun-sqlite-runtime"; @@ -16,10 +17,15 @@ const schedulerClient = new SqliteSchedulerClient({ taskQueueClient: new SqliteTaskQueueClient(runtimeDb), timersClient: new SqliteTimersClient(runtimeDb), }); +const storeClient = new SqliteStoreClient({ + db: runtimeDb, + schedulerClient, +}); const workflowRunner = new WorkflowRunner({ heapClient, schedulerClient, + storeClient, router: workflowRouter, logger, }); diff --git a/examples/local-execution/worker.ts b/examples/local-execution/worker.ts index bcc20f9..de85120 100644 --- a/examples/local-execution/worker.ts +++ b/examples/local-execution/worker.ts @@ -4,6 +4,7 @@ import { createWorkflowWorker } from "@yieldstar/bun-worker-invoker"; import { SqliteSchedulerClient, SqliteHeapClient, + SqliteStoreClient, SqliteTaskQueueClient, SqliteTimersClient, } from "@yieldstar/bun-sqlite-runtime"; @@ -16,11 +17,16 @@ const schedulerClient = new SqliteSchedulerClient({ taskQueueClient: new SqliteTaskQueueClient(runtimeDb), timersClient: new SqliteTimersClient(runtimeDb), }); +const storeClient = new SqliteStoreClient({ + db: runtimeDb, + schedulerClient, +}); const workflowRunner = new WorkflowRunner({ router: workflowRouter, heapClient, schedulerClient, + storeClient, logger, }); diff --git a/packages/bun-sqlite-runtime/src/index.ts b/packages/bun-sqlite-runtime/src/index.ts index 1600159..2405e05 100644 --- a/packages/bun-sqlite-runtime/src/index.ts +++ b/packages/bun-sqlite-runtime/src/index.ts @@ -1,4 +1,5 @@ export { SqliteHeapClient } from "./sqlite-heap"; +export { SqliteStoreClient } from "./sqlite-store"; export { SqliteSchedulerClient } from "./sqlite-scheduler"; export { SqliteEventLoop } from "./sqlite-event-loop"; export { SqliteTaskQueueClient } from "./sqlite-task-queue"; diff --git a/packages/bun-sqlite-runtime/src/sqlite-store.test.ts b/packages/bun-sqlite-runtime/src/sqlite-store.test.ts new file mode 100644 index 0000000..ea4d24e --- /dev/null +++ b/packages/bun-sqlite-runtime/src/sqlite-store.test.ts @@ -0,0 +1,69 @@ +import { Database } from "bun:sqlite"; +import { expect, test } from "bun:test"; +import { + defineStore, + type SchedulerClient, + type StandardSchemaV1, + type WorkflowEvent, +} from "@yieldstar/core"; +import { SqliteStoreClient } from "./sqlite-store"; + +type State = { + messages: { id: string }[]; +}; + +const Store = defineStore("sqlite-test", schema()); + +test("sqlite store updates wake matching waiters", async () => { + const db = new Database(":memory:"); + const events: WorkflowEvent[] = []; + const schedulerClient: SchedulerClient = { + async requestWakeUp(event) { + events.push(event); + }, + }; + const client = new SqliteStoreClient({ db, schedulerClient }); + const event = { + workflowId: "workflow", + executionId: "execution", + params: undefined, + context: new Map(), + }; + + await client.getOrCreateStore({ + definition: Store, + id: "one", + initial: { messages: [] }, + }); + await client.registerWaiter({ + workflowId: event.workflowId, + executionId: event.executionId, + stepKey: "next-message", + event, + storeName: Store.name, + storeId: "one", + sinceVersion: 0, + readPaths: [["messages"]], + }); + await client.updateStore({ + definition: Store, + id: "one", + updater(draft) { + draft.messages.push({ id: "msg-1" }); + }, + }); + + expect(events).toEqual([event]); +}); + +function schema(): StandardSchemaV1 { + return { + "~standard": { + version: 1, + vendor: "yieldstar-test", + validate(value) { + return { value: value as T }; + }, + }, + }; +} diff --git a/packages/bun-sqlite-runtime/src/sqlite-store.ts b/packages/bun-sqlite-runtime/src/sqlite-store.ts new file mode 100644 index 0000000..0efb98e --- /dev/null +++ b/packages/bun-sqlite-runtime/src/sqlite-store.ts @@ -0,0 +1,327 @@ +import type { Database } from "bun:sqlite"; +import type { SchedulerClient, WorkflowEvent } from "@yieldstar/core"; +import { + cloneStoreState, + diffStorePaths, + StoreClient, + storePathsIntersect, + validateStoreState, + type Draft, + type StandardSchemaV1, + type StoreDefinition, + type StorePath, + type StoreSnapshot, + type StoreState, + type StoreUpdateResult, + type StoreWaiter, +} from "@yieldstar/core"; + +class StoreRow { + state!: string; + version!: number; +} + +class WaiterRow { + workflow_id!: string; + execution_id!: string; + step_key!: string; + event!: string; + store_name!: string; + store_id!: string; + since_version!: number; + read_paths!: string; +} + +export class SqliteStoreClient extends StoreClient { + private db: Database; + private schedulerClient: SchedulerClient; + + constructor(params: { db: Database; schedulerClient: SchedulerClient }) { + super(); + this.db = params.db; + this.schedulerClient = params.schedulerClient; + this.setupDb(); + } + + async getOrCreateStore(params: { + definition: StoreDefinition; + id: string; + initial?: + | StoreState + | (() => StoreState | Promise>); + }): Promise>> { + const existing = this.getStoreRow(params.definition.name, params.id); + + if (existing) { + return { + state: JSON.parse(existing.state), + version: existing.version, + }; + } + + if (!params.initial) { + throw new Error( + `Store "${params.definition.name}:${params.id}" does not exist` + ); + } + + const initialValue = params.initial; + const initial = + typeof initialValue === "function" + ? await ( + initialValue as () => + | StoreState + | Promise> + )() + : initialValue; + const state = await validateStoreState(params.definition, initial); + + this.db.run("BEGIN IMMEDIATE"); + try { + const existingTx = this.getStoreRow(params.definition.name, params.id); + if (existingTx) { + this.db.run("COMMIT"); + return { + state: JSON.parse(existingTx.state), + version: existingTx.version, + }; + } + + this.db + .query( + `INSERT INTO stores (store_name, store_id, version, state) + VALUES ($storeName, $storeId, 0, $state)` + ) + .run({ + $storeName: params.definition.name, + $storeId: params.id, + $state: JSON.stringify(state), + }); + this.db.run("COMMIT"); + } catch (err) { + this.db.run("ROLLBACK"); + throw err; + } + + return { + state: cloneStoreState(state), + version: 0, + }; + } + + async getStore(params: { + definition: StoreDefinition; + id: string; + }): Promise>> { + const row = this.getStoreRow(params.definition.name, params.id); + + if (!row) { + throw new Error( + `Store "${params.definition.name}:${params.id}" does not exist` + ); + } + + return { + state: JSON.parse(row.state), + version: row.version, + }; + } + + async updateStore(params: { + definition: StoreDefinition; + id: string; + updater: ( + draft: Draft> + ) => void | StoreState | Promise>; + }): Promise>> { + let result: StoreUpdateResult>; + let eventsToWake: WorkflowEvent[] = []; + + this.db.run("BEGIN IMMEDIATE"); + + try { + const row = this.getStoreRow(params.definition.name, params.id); + if (!row) { + throw new Error( + `Store "${params.definition.name}:${params.id}" does not exist` + ); + } + + const previousState = JSON.parse(row.state) as StoreState; + const draft = cloneStoreState(previousState) as Draft>; + const updated = await params.updater(draft); + const nextState = await validateStoreState( + params.definition, + updated === undefined ? draft : updated + ); + const changedPaths = diffStorePaths(previousState, nextState); + + result = { + state: cloneStoreState(nextState), + previousVersion: row.version, + version: row.version + 1, + }; + + this.db + .query( + `UPDATE stores + SET version = $version, state = $state + WHERE store_name = $storeName AND store_id = $storeId` + ) + .run({ + $storeName: params.definition.name, + $storeId: params.id, + $version: result.version, + $state: JSON.stringify(nextState), + }); + + eventsToWake = this.deleteMatchingWaiters({ + storeName: params.definition.name, + storeId: params.id, + version: result.version, + changedPaths, + }); + + this.db.run("COMMIT"); + } catch (err) { + this.db.run("ROLLBACK"); + throw err; + } + + for (const event of eventsToWake) { + await this.schedulerClient.requestWakeUp(event); + } + + return result; + } + + async registerWaiter(waiter: StoreWaiter): Promise { + this.db + .query( + `INSERT INTO store_waiters + (workflow_id, execution_id, step_key, event, store_name, store_id, since_version, read_paths) + VALUES + ($workflowId, $executionId, $stepKey, $event, $storeName, $storeId, $sinceVersion, $readPaths) + ON CONFLICT(store_name, store_id, execution_id, step_key) + DO UPDATE SET + event = excluded.event, + since_version = excluded.since_version, + read_paths = excluded.read_paths` + ) + .run({ + $workflowId: waiter.workflowId, + $executionId: waiter.executionId, + $stepKey: waiter.stepKey, + $event: JSON.stringify(serializeEvent(waiter.event)), + $storeName: waiter.storeName, + $storeId: waiter.storeId, + $sinceVersion: waiter.sinceVersion, + $readPaths: JSON.stringify(waiter.readPaths), + }); + } + + private setupDb() { + this.db.run(` + CREATE TABLE IF NOT EXISTS stores ( + store_name TEXT NOT NULL, + store_id TEXT NOT NULL, + version INTEGER NOT NULL, + state TEXT NOT NULL, + PRIMARY KEY (store_name, store_id) + ); + + CREATE TABLE IF NOT EXISTS store_waiters ( + workflow_id TEXT NOT NULL, + execution_id TEXT NOT NULL, + step_key TEXT NOT NULL, + event TEXT NOT NULL, + store_name TEXT NOT NULL, + store_id TEXT NOT NULL, + since_version INTEGER NOT NULL, + read_paths TEXT NOT NULL, + PRIMARY KEY (store_name, store_id, execution_id, step_key) + ); + `); + } + + private getStoreRow(storeName: string, storeId: string) { + return this.db + .query( + `SELECT state, version FROM stores + WHERE store_name = $storeName AND store_id = $storeId` + ) + .as(StoreRow) + .get({ + $storeName: storeName, + $storeId: storeId, + }); + } + + private deleteMatchingWaiters(params: { + storeName: string; + storeId: string; + version: number; + changedPaths: StorePath[]; + }): WorkflowEvent[] { + const events: WorkflowEvent[] = []; + const rows = this.db + .query( + `SELECT * FROM store_waiters + WHERE store_name = $storeName AND store_id = $storeId` + ) + .as(WaiterRow) + .all({ + $storeName: params.storeName, + $storeId: params.storeId, + }); + + for (const row of rows) { + if (row.since_version >= params.version) continue; + + const readPaths = JSON.parse(row.read_paths) as StorePath[]; + if (!storePathsIntersect(readPaths, params.changedPaths)) continue; + + this.db + .query( + `DELETE FROM store_waiters + WHERE store_name = $storeName + AND store_id = $storeId + AND execution_id = $executionId + AND step_key = $stepKey` + ) + .run({ + $storeName: params.storeName, + $storeId: params.storeId, + $executionId: row.execution_id, + $stepKey: row.step_key, + }); + + events.push(deserializeEvent(JSON.parse(row.event))); + } + + return events; + } +} + +function serializeEvent(event: WorkflowEvent) { + return { + workflowId: event.workflowId, + executionId: event.executionId, + params: event.params, + context: + event.context instanceof Map + ? [...event.context.entries()] + : event.context + ? Object.entries(event.context as any) + : [], + }; +} + +function deserializeEvent(event: any): WorkflowEvent { + return { + workflowId: event.workflowId, + executionId: event.executionId, + params: event.params, + context: new Map(event.context ?? []), + }; +} diff --git a/packages/core/src/base/step.ts b/packages/core/src/base/step.ts index 2c9785a..a38bc52 100644 --- a/packages/core/src/base/step.ts +++ b/packages/core/src/base/step.ts @@ -52,6 +52,14 @@ export class WorkflowDelay extends StepResponse { } } +export class StepStoreWait extends StepResponse { + static type = "store-wait"; + readonly type = "store-wait"; + constructor() { + super(); + } +} + export class WorkflowRestart extends StepResponse { static type = "workflow-restart"; readonly type = "workflow-restart"; diff --git a/packages/core/src/base/store.ts b/packages/core/src/base/store.ts new file mode 100644 index 0000000..28c5c0c --- /dev/null +++ b/packages/core/src/base/store.ts @@ -0,0 +1,270 @@ +import type { WorkflowEvent } from "./event"; + +export type StandardSchemaV1 = { + readonly "~standard": { + readonly version: 1; + readonly vendor: string; + readonly validate: ( + value: unknown + ) => + | StandardSchemaV1.Result + | Promise>; + readonly types?: { + readonly input: Input; + readonly output: Output; + }; + }; +}; + +export namespace StandardSchemaV1 { + export type Issue = { + readonly message: string; + readonly path?: readonly PropertyKey[]; + }; + + export type Result = + | { readonly value: Output; readonly issues?: undefined } + | { readonly issues: readonly Issue[] }; + + export type InferOutput = + Schema extends StandardSchemaV1 ? Output : never; +} + +export type Draft = T extends object + ? { -readonly [K in keyof T]: Draft } + : T; + +export type StoreVersion = number; +export type StorePath = readonly (string | number)[]; + +export type StoreDefinition = + { + name: string; + schema: Schema; + }; + +export type StoreState = + StandardSchemaV1.InferOutput; + +export type StoreKey = { + storeName: string; + storeId: string; +}; + +export type StoreSnapshot = { + state: T; + version: StoreVersion; +}; + +export type StoreUpdateResult = { + state: T; + previousVersion: StoreVersion; + version: StoreVersion; +}; + +export type StoreSelector = (state: Readonly) => R; + +export type StoreWaiter = { + workflowId: string; + executionId: string; + stepKey: string; + event: WorkflowEvent; + storeName: string; + storeId: string; + sinceVersion: StoreVersion; + readPaths: StorePath[]; +}; + +export type RuntimeStore = { + get(): Promise>; + update( + updater: (draft: Draft) => void | T | Promise + ): Promise>; +}; + +export function defineStore( + name: string, + schema: Schema +): StoreDefinition { + return { name, schema }; +} + +export abstract class StoreClient { + store( + definition: StoreDefinition, + id: string + ): RuntimeStore> { + return { + get: () => + this.getStore({ + definition, + id, + }), + update: (updater) => + this.updateStore({ + definition, + id, + updater, + }), + }; + } + + abstract getOrCreateStore(params: { + definition: StoreDefinition; + id: string; + initial?: + | StoreState + | (() => StoreState | Promise>); + }): Promise>>; + + abstract getStore(params: { + definition: StoreDefinition; + id: string; + }): Promise>>; + + /** + * Updates a store's state atomically. + * NOTE: Updaters should ideally be synchronous. Although the signature allows async updaters, + * holding open transactions (e.g. SQLite BEGIN IMMEDIATE) across long-running async steps + * will block other clients from writing. Avoid network, timers, or other async tasks in updaters. + */ + abstract updateStore(params: { + definition: StoreDefinition; + id: string; + updater: ( + draft: Draft> + ) => void | StoreState | Promise>; + }): Promise>>; + + abstract registerWaiter(waiter: StoreWaiter): Promise; +} + +export async function validateStoreState( + definition: StoreDefinition, + state: unknown +): Promise> { + const result = await definition.schema["~standard"].validate(state); + + if ("issues" in result && result.issues) { + const messages = result.issues.map((issue) => issue.message).join("; "); + throw new Error(`Invalid store state for "${definition.name}": ${messages}`); + } + + return result.value as StoreState; +} + +export function cloneStoreState(state: T): T { + if (typeof structuredClone === "function") { + try { + return structuredClone(state); + } catch { + // Fallback if structuredClone fails (e.g. for Proxy objects in JavaScriptCore) + } + } + + return JSON.parse(JSON.stringify(state)); +} + +export function diffStorePaths(previous: unknown, next: unknown): StorePath[] { + const paths: StorePath[] = []; + + function visit(left: unknown, right: unknown, path: StorePath) { + if (Object.is(left, right)) return; + + if (!isDiffableObject(left) || !isDiffableObject(right)) { + paths.push(path); + return; + } + + const keys = new Set([...Object.keys(left), ...Object.keys(right)]); + + for (const key of keys) { + visit( + (left as Record)[key], + (right as Record)[key], + [...path, arrayKeyToPathSegment(key)] + ); + } + } + + visit(previous, next, []); + return collapseStorePaths(paths); +} + +export function trackStoreSelector( + state: T, + selector: StoreSelector +): { result: R; readPaths: StorePath[] } { + const paths: StorePath[] = []; + const proxies = new WeakMap(); + + function track(value: unknown, path: StorePath): unknown { + if (!isDiffableObject(value)) return value; + + const cached = proxies.get(value); + if (cached) return cached; + + const proxy = new Proxy(value, { + get(target, property, receiver) { + if (typeof property === "symbol") { + return Reflect.get(target, property, receiver); + } + + const nextPath = [...path, arrayKeyToPathSegment(property)]; + if (nextPath.length > 0) { + paths.push(nextPath); + } + + const child = Reflect.get(target, property, receiver); + return track(child, nextPath); + }, + set() { + throw new Error("Store selectors must not mutate state"); + }, + deleteProperty() { + throw new Error("Store selectors must not mutate state"); + }, + }); + + proxies.set(value, proxy); + return proxy; + } + + const result = selector(track(state, []) as Readonly); + + return { + result, + readPaths: collapseStorePaths(paths), + }; +} + +export function storePathsIntersect( + readPaths: StorePath[], + writePaths: StorePath[] +): boolean { + return readPaths.some((readPath) => + writePaths.some( + (writePath) => + isPathPrefix(readPath, writePath) || isPathPrefix(writePath, readPath) + ) + ); +} + +function collapseStorePaths(paths: StorePath[]): StorePath[] { + const unique = new Map(paths.map((path) => [path.join("\u0000"), path])); + return [...unique.values()]; +} + +function isPathPrefix(prefix: StorePath, path: StorePath): boolean { + if (prefix.length > path.length) return false; + return prefix.every((segment, index) => segment === path[index]); +} + +function isDiffableObject(value: unknown): value is Record { + return typeof value === "object" && value !== null; +} + +function arrayKeyToPathSegment(key: string): string | number { + if (/^(0|[1-9]\d*)$/.test(key)) return Number(key); + return key; +} diff --git a/packages/core/src/base/workflow.ts b/packages/core/src/base/workflow.ts index 0a081d0..f6c62ed 100644 --- a/packages/core/src/base/workflow.ts +++ b/packages/core/src/base/workflow.ts @@ -1,6 +1,7 @@ import type { Logger } from "pino"; import type { EventParams, WorkflowEvent, EventContext } from "./event"; import { HeapClient } from "./heap"; +import { StoreClient } from "./store"; import { StepResponse, WorkflowResult } from "./step"; export type WorkflowGeneratorParams< @@ -9,6 +10,7 @@ export type WorkflowGeneratorParams< > = { event: WorkflowEvent; heapClient: HeapClient; + storeClient: StoreClient; logger: Logger; }; diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index f216656..d851cf0 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -2,6 +2,29 @@ export type { HeapClient, HeapRecord } from "./base/heap"; export type { WorkflowInvoker } from "./base/invoker"; export type { EventProcessor } from "./base/runtime"; export type { SchedulerClient } from "./base/scheduler"; +export type { + Draft, + RuntimeStore, + StandardSchemaV1, + StoreDefinition, + StoreKey, + StorePath, + StoreSelector, + StoreSnapshot, + StoreState, + StoreUpdateResult, + StoreVersion, + StoreWaiter, +} from "./base/store"; +export { + StoreClient, + cloneStoreState, + defineStore, + diffStorePaths, + storePathsIntersect, + trackStoreSelector, + validateStoreState, +} from "./base/store"; export type { EventParams, EventContext, @@ -29,6 +52,7 @@ export { StepKey, StepResponse, StepResult, + StepStoreWait, WorkflowDelay, WorkflowRestart, WorkflowResult, diff --git a/packages/core/src/lib/workflow-runner.ts b/packages/core/src/lib/workflow-runner.ts index 63d913c..402b1c4 100644 --- a/packages/core/src/lib/workflow-runner.ts +++ b/packages/core/src/lib/workflow-runner.ts @@ -3,11 +3,12 @@ import type { WorkflowGenerator, HeapClient, SchedulerClient, + StoreClient, EventProcessor, WorkflowEvent, EventParams, } from ".."; -import { StepDelay, WorkflowDelay, WorkflowResult } from ".."; +import { StepDelay, StepStoreWait, WorkflowDelay, WorkflowResult } from ".."; import { EventContext } from "../base/event"; export class WorkflowRunner< @@ -15,16 +16,19 @@ export class WorkflowRunner< > { private heapClient: HeapClient; private schedulerClient: SchedulerClient; + private storeClient: StoreClient; private router: Router; constructor(params: { heapClient: HeapClient; schedulerClient: SchedulerClient; + storeClient: StoreClient; router: Router; logger: Logger; }) { this.heapClient = params.heapClient; this.schedulerClient = params.schedulerClient; + this.storeClient = params.storeClient; this.router = params.router; } @@ -49,6 +53,9 @@ export class WorkflowRunner< case "workflow-delay": this.schedulerClient.requestWakeUp(event, response.resumeIn); break; + + case "store-wait": + break; } } catch (err) { // todo: distinguish between a workflow error and a system error @@ -64,12 +71,13 @@ export class WorkflowRunner< workflow: WorkflowGenerator; event: WorkflowEvent; logger: Logger; - }): Promise | WorkflowDelay> { + }): Promise | WorkflowDelay | StepStoreWait> { const { workflow, event, logger } = params; const workflowIterator = workflow({ event, heapClient: this.heapClient, + storeClient: this.storeClient, logger, }); @@ -85,6 +93,10 @@ export class WorkflowRunner< return new WorkflowDelay(stageResponse.resumeIn - Date.now()); } + if (stageResponse instanceof StepStoreWait) { + return stageResponse; + } + throw new Error( "Workflow runner critical error. This is likely because your worklow and worker are referencing different versions of @yieldstar/core." ); diff --git a/packages/core/src/utils/map.ts b/packages/core/src/utils/map.ts index d55b514..4711475 100644 --- a/packages/core/src/utils/map.ts +++ b/packages/core/src/utils/map.ts @@ -3,8 +3,14 @@ import { errorWithOriginalStack } from "./error"; export class ReadOnlyMap implements ReadonlyMap { private sourceMap: Map; - constructor(sourceMap: Map) { - this.sourceMap = sourceMap; + constructor(sourceMap: Map | Record | undefined) { + if (sourceMap instanceof Map) { + this.sourceMap = sourceMap; + } else if (typeof sourceMap === "object" && sourceMap !== null) { + this.sourceMap = new Map(Object.entries(sourceMap)) as Map; + } else { + this.sourceMap = new Map(); + } } get(key: K): V | undefined { diff --git a/packages/test-runtime/src/index.ts b/packages/test-runtime/src/index.ts index 2d8c4f7..31de962 100644 --- a/packages/test-runtime/src/index.ts +++ b/packages/test-runtime/src/index.ts @@ -1,4 +1,5 @@ export { MemoryHeapClient } from "./memory-heap"; +export { MemoryStoreClient } from "./memory-store"; export { MemorySchedulerClient } from "./memory-scheduler"; export { MemoryEventLoop } from "./memory-event-loop"; export { MemoryTaskQueue } from "./memory-task-queue"; diff --git a/packages/test-runtime/src/memory-store.ts b/packages/test-runtime/src/memory-store.ts new file mode 100644 index 0000000..c6af91b --- /dev/null +++ b/packages/test-runtime/src/memory-store.ts @@ -0,0 +1,180 @@ +import type { SchedulerClient } from "@yieldstar/core"; +import { + cloneStoreState, + diffStorePaths, + StoreClient, + storePathsIntersect, + validateStoreState, + type Draft, + type StandardSchemaV1, + type StoreDefinition, + type StoreSnapshot, + type StoreState, + type StoreUpdateResult, + type StoreWaiter, +} from "@yieldstar/core"; + +type StoreRecord = { + state: unknown; + version: number; +}; + +export class MemoryStoreClient extends StoreClient { + private stores = new Map(); + private waiters = new Map(); + private schedulerClient: SchedulerClient; + + constructor(params: { schedulerClient: SchedulerClient }) { + super(); + this.schedulerClient = params.schedulerClient; + } + + async getOrCreateStore(params: { + definition: StoreDefinition; + id: string; + initial?: + | StoreState + | (() => StoreState | Promise>); + }): Promise>> { + const key = this.storeKey(params.definition.name, params.id); + const existing = this.stores.get(key); + + if (existing) { + return { + state: cloneStoreState(existing.state) as StoreState, + version: existing.version, + }; + } + + if (!params.initial) { + throw new Error( + `Store "${params.definition.name}:${params.id}" does not exist` + ); + } + + const initialValue = params.initial; + const initial = + typeof initialValue === "function" + ? await ( + initialValue as () => + | StoreState + | Promise> + )() + : initialValue; + const state = await validateStoreState(params.definition, initial); + + this.stores.set(key, { + state: cloneStoreState(state), + version: 0, + }); + + return { + state: cloneStoreState(state), + version: 0, + }; + } + + async getStore(params: { + definition: StoreDefinition; + id: string; + }): Promise>> { + const record = this.stores.get( + this.storeKey(params.definition.name, params.id) + ); + + if (!record) { + throw new Error( + `Store "${params.definition.name}:${params.id}" does not exist` + ); + } + + return { + state: cloneStoreState(record.state) as StoreState, + version: record.version, + }; + } + + async updateStore(params: { + definition: StoreDefinition; + id: string; + updater: ( + draft: Draft> + ) => void | StoreState | Promise>; + }): Promise>> { + const key = this.storeKey(params.definition.name, params.id); + + const record = this.stores.get(key); + if (!record) { + throw new Error( + `Store "${params.definition.name}:${params.id}" does not exist` + ); + } + + const previousState = cloneStoreState(record.state) as StoreState; + const draft = cloneStoreState(previousState) as Draft>; + const updated = await params.updater(draft); + const nextState = await validateStoreState( + params.definition, + updated === undefined ? draft : updated + ); + const changedPaths = diffStorePaths(previousState, nextState); + const result = { + state: cloneStoreState(nextState), + previousVersion: record.version, + version: record.version + 1, + }; + + this.stores.set(key, { + state: cloneStoreState(nextState), + version: result.version, + }); + + await this.wakeWaiters({ + storeName: params.definition.name, + storeId: params.id, + version: result.version, + changedPaths, + }); + + return result; + } + + async registerWaiter(waiter: StoreWaiter): Promise { + this.waiters.set( + this.waiterKey(waiter.storeName, waiter.storeId, waiter.executionId, waiter.stepKey), + cloneStoreState(waiter) + ); + } + + private async wakeWaiters(params: { + storeName: string; + storeId: string; + version: number; + changedPaths: readonly (readonly (string | number)[])[]; + }) { + for (const [key, waiter] of [...this.waiters.entries()]) { + if (waiter.storeName !== params.storeName) continue; + if (waiter.storeId !== params.storeId) continue; + if (waiter.sinceVersion >= params.version) continue; + if (!storePathsIntersect(waiter.readPaths, [...params.changedPaths])) { + continue; + } + + this.waiters.delete(key); + await this.schedulerClient.requestWakeUp(waiter.event); + } + } + + private storeKey(storeName: string, storeId: string) { + return `${storeName}:${storeId}`; + } + + private waiterKey( + storeName: string, + storeId: string, + executionId: string, + stepKey: string + ) { + return `${storeName}:${storeId}:${executionId}:${stepKey}`; + } +} diff --git a/packages/test-utils/src/workflow-runner.ts b/packages/test-utils/src/workflow-runner.ts index 40b6522..b7eb9ad 100644 --- a/packages/test-utils/src/workflow-runner.ts +++ b/packages/test-utils/src/workflow-runner.ts @@ -12,6 +12,7 @@ import { MemoryEventLoop, MemorySchedulerClient, MemoryHeapClient, + MemoryStoreClient, } from "@yieldstar/test-runtime"; import { createLocalSdk } from "yieldstar"; @@ -20,10 +21,13 @@ export function createTestSdkFactory(params?: { logger?: Logger }) { return (workflowRouter: W) => { const memoryEventLoop = new MemoryEventLoop(logger); + const schedulerClient = new MemorySchedulerClient(memoryEventLoop); + const storeClient = new MemoryStoreClient({ schedulerClient }); const workflowRunner = new WorkflowRunner({ heapClient: new MemoryHeapClient(), - schedulerClient: new MemorySchedulerClient(memoryEventLoop), + schedulerClient, + storeClient, router: workflowRouter, logger, }); @@ -34,6 +38,19 @@ export function createTestSdkFactory(params?: { logger?: Logger }) { }); return { + async trigger< + K extends keyof W & string, + Params extends EventParamsOf = EventParamsOf + >( + event: TriggerEvent + ): Promise { + const executionEvent = { + executionId: event.executionId ?? crypto.randomUUID(), + workflowId: event.workflowId, + params: event.params, + }; + await invoker.execute(executionEvent); + }, async triggerAndWait< K extends keyof W & string, Params extends EventParamsOf = EventParamsOf @@ -50,6 +67,7 @@ export function createTestSdkFactory(params?: { logger?: Logger }) { return result; }, + store: storeClient.store.bind(storeClient), }; }; } diff --git a/packages/yieldstar/src/exports/workflow.ts b/packages/yieldstar/src/exports/workflow.ts index b7eedbe..cb46341 100644 --- a/packages/yieldstar/src/exports/workflow.ts +++ b/packages/yieldstar/src/exports/workflow.ts @@ -11,7 +11,7 @@ import { deserializeStepResponse, serializeStepResponse, } from "../internal/serialise"; -import { stepRunner } from "../internal/step-runner"; +import { createStepRunner } from "../internal/step-runner"; import { StepResponse, StepKey, @@ -20,6 +20,7 @@ import { StepDelay, StepCacheCheck, StepInvalid, + StepStoreWait, WorkflowResult, } from "@yieldstar/core"; @@ -45,7 +46,8 @@ export function workflow< * yielding control to a workflow executor to do async work * @yields {StepResponse} */ - return async function* workflowGenerator({ event, heapClient, logger }) { + return async function* workflowGenerator({ event, heapClient, storeClient, logger }) { + const stepRunner = createStepRunner({ event, storeClient }); const workflowIterator = workflowFn(stepRunner, event, logger); const { executionId } = event; @@ -170,6 +172,8 @@ export function workflow< stepResponse instanceof StepError && // 1-indexed vs 0-indexed stepResponse.maxAttempts > stepAttempt + 1; + const needsWake = + !cached?.meta.done && stepResponse instanceof StepStoreWait; /** * If this step attempt is not already cached, cache it @@ -179,7 +183,7 @@ export function workflow< executionId, stepKey, stepAttempt, - stepDone: !needsRetry, + stepDone: !needsRetry && !needsWake, stepResponseJson: serializeStepResponse(stepResponse), }); } diff --git a/packages/yieldstar/src/index.ts b/packages/yieldstar/src/index.ts index 3418028..bc17924 100644 --- a/packages/yieldstar/src/index.ts +++ b/packages/yieldstar/src/index.ts @@ -4,3 +4,18 @@ export { createWorkflowRouter } from "./exports/router"; export { RetryableError } from "./exports/errors"; export { createWorkflow, workflow } from "./exports/workflow"; export type { WorkflowFn } from "./exports/workflow"; +export { defineStore } from "./internal/step-runner"; +export type { WorkflowStore } from "./internal/step-runner"; +export type { + Draft, + RuntimeStore, + StandardSchemaV1, + StoreDefinition, + StoreKey, + StorePath, + StoreSelector, + StoreSnapshot, + StoreState, + StoreUpdateResult, + StoreVersion, +} from "@yieldstar/core"; diff --git a/packages/yieldstar/src/internal/serialise.ts b/packages/yieldstar/src/internal/serialise.ts index bb323c2..0518286 100644 --- a/packages/yieldstar/src/internal/serialise.ts +++ b/packages/yieldstar/src/internal/serialise.ts @@ -3,6 +3,7 @@ import { StepError, StepResult, StepResponse, + StepStoreWait, WorkflowResult, } from "@yieldstar/core"; import { isErrorLike, serializeError, deserializeError } from "serialize-error"; @@ -36,6 +37,8 @@ export function deserializeStepResponse(jsonString: string): StepResponse { return new StepDelay(stepResponse.resumeIn); case "step-result": return new StepResult(stepResponse.result); + case "store-wait": + return new StepStoreWait(); case "workflow-result": return new WorkflowResult(stepResponse.result); default: diff --git a/packages/yieldstar/src/internal/step-runner.ts b/packages/yieldstar/src/internal/step-runner.ts index a2d2062..96ef559 100644 --- a/packages/yieldstar/src/internal/step-runner.ts +++ b/packages/yieldstar/src/internal/step-runner.ts @@ -1,13 +1,27 @@ import { + cloneStoreState, + defineStore, + type Draft, + type StandardSchemaV1, StepResponse, StepKey, StepError, StepResult, StepDelay, StepCacheCheck, + StepStoreWait, + type StoreClient, + type StoreDefinition, + type StoreKey, + type StoreSelector, + type StoreSnapshot, + type StoreState, + type StoreUpdateResult, + trackStoreSelector, } from "@yieldstar/core"; import { RetryableError } from "../exports/errors"; import { getCallSiteHash } from "./utils"; +import type { WorkflowEvent } from "@yieldstar/core"; /** * @description A library of step generators, each of which: @@ -17,9 +31,171 @@ import { getCallSiteHash } from "./utils"; * @throws any error caught when running the user-defined function, * once retries have been exhausted, or if there are no retry semantics */ -export const stepRunner = { run, delay, poll }; +export { defineStore }; -export type StepRunner = typeof stepRunner; +export type WorkflowStore = { + readonly definition: StoreDefinition; + readonly id: string; + readonly key: StoreKey; + get(key?: string): AsyncGenerator>; + select( + key: string, + selector: StoreSelector + ): AsyncGenerator; + update( + key: string, + updater: (draft: Draft) => void | T | Promise + ): AsyncGenerator>; + onChange( + selector: StoreSelector + ): AsyncGenerator>; + onChange( + key: string, + selector: StoreSelector + ): AsyncGenerator>; +}; + +export type StepRunner = ReturnType; + +export function createStepRunner(params: { + event: WorkflowEvent; + storeClient: StoreClient; +}) { + const { event, storeClient } = params; + + function store( + definition: StoreDefinition, + params?: { + id?: string; + initial?: + | StoreState + | (() => StoreState | Promise>); + } + ): AsyncGenerator>> { + const id = params?.id ?? event.executionId; + const key = `store:${definition.name}:${id}`; + + return storeStep({ + key, + definition, + id, + event, + storeClient, + initial: params?.initial, + }); + } + + return { run, delay, poll, store }; +} + +async function* storeStep(params: { + key: string; + definition: StoreDefinition; + id: string; + event: WorkflowEvent; + storeClient: StoreClient; + initial?: + | StoreState + | (() => StoreState | Promise>); +}): AsyncGenerator< + StepResponse, + WorkflowStore>, + StepResult | StepError +> { + const { key, definition, id, event, storeClient, initial } = params; + + yield new StepKey(key); + + const cached = yield new StepCacheCheck(); + + if (cached) { + yield cached; + if (cached instanceof StepError) { + throw cached.err; + } + return createWorkflowStore({ + definition, + id, + event, + storeClient, + }); + } + + try { + await storeClient.getOrCreateStore({ + definition, + id, + initial, + }); + yield new StepResult({ storeName: definition.name, storeId: id }); + return createWorkflowStore({ + definition, + id, + event, + storeClient, + }); + } catch (err: unknown) { + yield new StepError(err); + throw err; + } +} + +function createWorkflowStore(params: { + definition: StoreDefinition; + id: string; + event: WorkflowEvent; + storeClient: StoreClient; +}): WorkflowStore { + const { definition, id, event, storeClient } = params; + const key = { storeName: definition.name, storeId: id }; + + return { + definition, + id, + key, + get(stepKey?: string) { + return durableStep>(stepKey ?? getCallSiteHash(this.get), async () => + (await storeClient.getStore({ definition, id })) as StoreSnapshot + ); + }, + select(stepKey: string, selector: StoreSelector) { + return durableStep(stepKey, async () => { + const snapshot = await storeClient.getStore({ definition, id }); + return selector(snapshot.state as T); + }); + }, + update(stepKey, updater) { + return durableStep>(stepKey, async () => + (await storeClient.updateStore({ + definition, + id, + updater: updater as any, + })) as StoreUpdateResult + ); + }, + onChange( + arg1: + | string + | StoreSelector, + arg2?: StoreSelector + ) { + const stepKey = + typeof arg1 === "string" ? arg1 : getCallSiteHash(this.onChange); + const selector = ( + typeof arg1 === "string" ? arg2 : arg1 + ) as StoreSelector; + + return onChangeStep({ + definition, + id, + event, + storeClient, + stepKey, + selector, + }); + }, + }; +} function run( fn: () => T | Promise, @@ -46,35 +222,7 @@ async function* run( if (!key) key = getCallSiteHash(run); - yield new StepKey(key); - - const cached = yield new StepCacheCheck(); - - if (cached) { - yield cached; - if (cached instanceof StepError) { - // unreachable – consumer calls throw() on the generator first - throw cached.err; - } - return cached.result; - } - - try { - const result = await fn(); - yield new StepResult(result); - return result; - } catch (err: unknown) { - if (err instanceof RetryableError) { - yield new StepError(err, { - maxAttempts: err.maxAttempts, - retryInterval: err.retryInterval, - }); - } else { - yield new StepError(err); - } - // unreachable – consumer calls throw() on the generator first - throw err; - } + return yield* durableStep(key, fn); } function delay(retryInterval: number): any; @@ -144,3 +292,88 @@ async function* poll( yield* run(key, task, ); } + +async function* durableStep( + key: string, + fn: () => T | Promise +): AsyncGenerator { + yield new StepKey(key); + + const cached = yield new StepCacheCheck(); + + if (cached) { + yield cached; + if (cached instanceof StepError) { + // unreachable – consumer calls throw() on the generator first + throw cached.err; + } + return cached.result; + } + + try { + const result = await fn(); + yield new StepResult(result); + return result; + } catch (err: unknown) { + if (err instanceof RetryableError) { + yield new StepError(err, { + maxAttempts: err.maxAttempts, + retryInterval: err.retryInterval, + }); + } else { + yield new StepError(err); + } + // unreachable – consumer calls throw() on the generator first + throw err; + } +} + +async function* onChangeStep(params: { + definition: StoreDefinition; + id: string; + event: WorkflowEvent; + storeClient: StoreClient; + stepKey: string; + selector: StoreSelector; +}): AsyncGenerator, StepResult | StepStoreWait> { + const { definition, id, event, storeClient, stepKey, selector } = params; + + yield new StepKey(stepKey); + + const cached = yield new StepCacheCheck(); + + if (cached) { + yield cached; + if (!(cached instanceof StepResult)) { + throw new Error("Store wait step cache must resolve to a step result"); + } + return cached.result; + } + + const snapshot = await storeClient.getStore({ definition, id }); + const { result, readPaths } = trackStoreSelector( + cloneStoreState(snapshot.state as T), + selector + ); + + if (result !== undefined && result !== null && result !== false) { + const clonedResult = cloneStoreState(result); + yield new StepResult(clonedResult); + return clonedResult as NonNullable; + } + + await storeClient.registerWaiter({ + workflowId: event.workflowId, + executionId: event.executionId, + stepKey, + event, + storeName: definition.name, + storeId: id, + sinceVersion: snapshot.version, + readPaths, + }); + + yield new StepStoreWait(); + + throw new Error("Store wait yielded control unexpectedly"); +} diff --git a/test/cache-keys.test.ts b/test/cache-keys.test.ts index 8164b5b..a72c5f0 100644 --- a/test/cache-keys.test.ts +++ b/test/cache-keys.test.ts @@ -71,7 +71,7 @@ test("step.delay without cache keys", async () => { // expect both delays to be invoked expect(duration).toBeGreaterThanOrEqual(20); - expect(duration).toBeLessThan(25); + expect(duration).toBeLessThan(50); }); test("step.delay with cache keys", async () => { @@ -95,5 +95,5 @@ test("step.delay with cache keys", async () => { // expect second delay to trigger a new timer expect(duration).toBeGreaterThanOrEqual(20); - expect(duration).toBeLessThan(25); + expect(duration).toBeLessThan(50); }); diff --git a/test/store.test.ts b/test/store.test.ts new file mode 100644 index 0000000..d8b4971 --- /dev/null +++ b/test/store.test.ts @@ -0,0 +1,388 @@ +import { expect, test } from "bun:test"; +import type { StandardSchemaV1 } from "yieldstar"; +import { defineStore, workflow } from "yieldstar"; +import { createTestSdkFactory } from "@yieldstar/test-utils"; +import { sleep } from "bun"; + +type Message = { + id: string; + content: string; + processed: boolean; +}; + +type ConversationState = { + messages: Message[]; + status: "idle" | "working"; +}; + +const ConversationStore = defineStore( + "conversation", + schema() +); + +const createSdk = createTestSdkFactory(); + +test("workflow stores can be created, read, and updated", async () => { + const testWorkflow = workflow(async function* (step) { + const store = yield* step.store(ConversationStore, { + initial: { messages: [], status: "idle" }, + }); + + const initial = yield* store.get("initial"); + + const update = yield* store.update("append", (draft) => { + draft.messages.push({ + id: "msg-1", + content: "hello", + processed: false, + }); + }); + + const current = yield* store.get("current"); + + return { + initial, + update, + current, + }; + }); + + const sdk = createSdk({ workflow: testWorkflow }); + const result = await sdk.triggerAndWait({ workflowId: "workflow" }); + + expect(result.initial).toEqual({ + state: { messages: [], status: "idle" }, + version: 0, + }); + expect(result.update.previousVersion).toBe(0); + expect(result.update.version).toBe(1); + expect(result.current).toEqual({ + state: { + messages: [{ id: "msg-1", content: "hello", processed: false }], + status: "idle", + }, + version: 1, + }); +}); + +test("workflow store updates are idempotent across replay", async () => { + const testWorkflow = workflow(async function* (step) { + const store = yield* step.store(ConversationStore, { + id: "replay", + initial: { messages: [], status: "idle" }, + }); + + yield* store.update("append-once", (draft) => { + draft.messages.push({ + id: "msg-1", + content: "hello", + processed: false, + }); + }); + + yield* step.delay("replay-boundary", 1); + + const snapshot = yield* store.get("after-replay"); + return snapshot.state.messages; + }); + + const sdk = createSdk({ workflow: testWorkflow }); + const result = await sdk.triggerAndWait({ workflowId: "workflow" }); + + expect(result).toEqual([ + { id: "msg-1", content: "hello", processed: false }, + ]); +}); + +test("external store updates wake onChange waiters", async () => { + const testWorkflow = workflow(async function* (step) { + const store = yield* step.store(ConversationStore, { + id: "external-wake", + initial: { messages: [], status: "idle" }, + }); + + return yield* store.onChange("next-message", (state) => + state.messages.find((message) => !message.processed) + ); + }); + + const sdk = createSdk({ workflow: testWorkflow }); + const resultPromise = sdk.triggerAndWait({ workflowId: "workflow" }); + + await sleep(5); + + await sdk.store(ConversationStore, "external-wake").update((draft) => { + draft.messages.push({ + id: "msg-1", + content: "hello", + processed: false, + }); + }); + + await expect(resultPromise).resolves.toEqual({ + id: "msg-1", + content: "hello", + processed: false, + }); +}); + +test("execution-local default store id is unique per execution", async () => { + let firstExecutionId: string | undefined; + let secondExecutionId: string | undefined; + + const testWorkflow = workflow(async function* (step, event) { + const store = yield* step.store(ConversationStore, { + initial: { messages: [], status: "idle" }, + }); + + if (event.executionId === "run-1") { + firstExecutionId = store.id; + yield* store.update("write", (draft) => { + draft.status = "working"; + }); + } else { + secondExecutionId = store.id; + } + + const snapshot = yield* store.get("final"); + return snapshot.state.status; + }); + + const sdk = createSdk({ workflow: testWorkflow }); + + const result1 = await sdk.triggerAndWait({ + workflowId: "workflow", + executionId: "run-1", + }); + const result2 = await sdk.triggerAndWait({ + workflowId: "workflow", + executionId: "run-2", + }); + + expect(firstExecutionId).toBe("run-1"); + expect(secondExecutionId).toBe("run-2"); + expect(result1).toBe("working"); + expect(result2).toBe("idle"); +}); + +test("durable get and select replay stability", async () => { + let selectCallCount = 0; + const testWorkflow = workflow(async function* (step) { + const store = yield* step.store(ConversationStore, { + id: "replay-stability", + initial: { messages: [], status: "idle" }, + }); + + const valGet = yield* store.get("read-get"); + const valSelect = yield* store.select("read-select", (s) => { + selectCallCount++; + return s.status; + }); + + yield* step.delay("suspend", 1); + + return { valGet, valSelect }; + }); + + const sdk = createSdk({ workflow: testWorkflow }); + const resultPromise = sdk.triggerAndWait({ workflowId: "workflow" }); + + await sleep(5); + await sdk.store(ConversationStore, "replay-stability").update((draft) => { + draft.status = "working"; + }); + + const res = await resultPromise; + + expect(res.valGet.state.status).toBe("idle"); + expect(res.valSelect).toBe("idle"); + expect(selectCallCount).toBe(1); +}); + +test("schema validation on create and update", async () => { + const strictStateSchema = strictSchema<{ count: number }>((val: any) => { + if (typeof val !== "object" || val === null || typeof val.count !== "number") { + return "Count must be a number"; + } + return undefined; + }); + + const StrictStore = defineStore("strict-store", strictStateSchema); + + const testWorkflowInvalidCreate = workflow(async function* (step) { + yield* step.store(StrictStore, { + id: "strict-1", + initial: { count: "not-a-number" } as any, + }); + }); + + const sdk1 = createSdk({ workflow: testWorkflowInvalidCreate }); + const result1 = await sdk1.triggerAndWait({ workflowId: "workflow" }); + expect(result1).toBeInstanceOf(Error); + expect((result1 as Error).message).toMatch(/Invalid store state/); + + const testWorkflowInvalidUpdate = workflow(async function* (step) { + const store = yield* step.store(StrictStore, { + id: "strict-2", + initial: { count: 0 }, + }); + + yield* store.update("bad-update", (draft) => { + draft.count = "invalid" as any; + }); + }); + + const sdk2 = createSdk({ workflow: testWorkflowInvalidUpdate }); + const result2 = await sdk2.triggerAndWait({ workflowId: "workflow" }); + expect(result2).toBeInstanceOf(Error); + expect((result2 as Error).message).toMatch(/Invalid store state/); +}); + +test("onChange returns immediately when selector matches", async () => { + let ranStep = false; + const testWorkflow = workflow(async function* (step) { + const store = yield* step.store(ConversationStore, { + id: "immediate-match", + initial: { messages: [], status: "working" }, + }); + + const status = yield* store.onChange("wait-working", (s) => + s.status === "working" ? s.status : false + ); + + ranStep = true; + return status; + }); + + const sdk = createSdk({ workflow: testWorkflow }); + const result = await sdk.triggerAndWait({ workflowId: "workflow" }); + + expect(ranStep).toBe(true); + expect(result).toBe("working"); +}); + +test("workflow update wakes another waiting workflow", async () => { + const waiterWorkflow = workflow(async function* (step) { + const store = yield* step.store(ConversationStore, { + id: "shared-wake", + initial: { messages: [], status: "idle" }, + }); + + return yield* store.onChange("wait-status", (s) => + s.status === "working" ? s.status : false + ); + }); + + const updaterWorkflow = workflow(async function* (step) { + const store = yield* step.store(ConversationStore, { + id: "shared-wake", + initial: { messages: [], status: "idle" }, + }); + + yield* store.update("set-status", (draft) => { + draft.status = "working"; + }); + }); + + const router = { + waiter: waiterWorkflow, + updater: updaterWorkflow, + }; + + const sdk = createSdk(router); + const waiterPromise = sdk.triggerAndWait({ workflowId: "waiter" }); + + await sleep(5); + await sdk.trigger({ workflowId: "updater" }); + + await expect(waiterPromise).resolves.toBe("working"); +}); + +test("unrelated paths do not wake waiters", async () => { + let wakeCount = 0; + + const testWorkflow = workflow(async function* (step) { + const store = yield* step.store(ConversationStore, { + id: "unrelated-paths", + initial: { messages: [], status: "idle" }, + }); + + wakeCount++; + + return yield* store.onChange("wait-messages", (state) => + state.messages.length > 0 ? state.messages : false + ); + }); + + const sdk = createSdk({ workflow: testWorkflow }); + const resultPromise = sdk.triggerAndWait({ workflowId: "workflow" }); + + await sleep(5); + + await sdk.store(ConversationStore, "unrelated-paths").update((draft) => { + draft.status = "working"; + }); + + await sleep(5); + expect(wakeCount).toBe(1); + + await sdk.store(ConversationStore, "unrelated-paths").update((draft) => { + draft.messages.push({ id: "msg-1", content: "hello", processed: false }); + }); + + const res = await resultPromise; + expect(wakeCount).toBe(2); + expect(res).toEqual([{ id: "msg-1", content: "hello", processed: false }]); +}); + +test("replacing an ancestor path wakes descendant waiters", async () => { + const testWorkflow = workflow(async function* (step) { + const store = yield* step.store(ConversationStore, { + id: "ancestor-wake", + initial: { messages: [{ id: "msg-1", content: "hello", processed: false }], status: "idle" }, + }); + + return yield* store.onChange("wait-descendant", (state) => + state.messages[0]?.processed ? "done" : false + ); + }); + + const sdk = createSdk({ workflow: testWorkflow }); + const resultPromise = sdk.triggerAndWait({ workflowId: "workflow" }); + + await sleep(5); + + await sdk.store(ConversationStore, "ancestor-wake").update((draft) => { + draft.messages = [{ id: "msg-1", content: "hello", processed: true }]; + }); + + await expect(resultPromise).resolves.toBe("done"); +}); + +function schema(): StandardSchemaV1 { + return { + "~standard": { + version: 1, + vendor: "yieldstar-test", + validate(value) { + return { value: value as T }; + }, + }, + }; +} + +function strictSchema(validator: (v: unknown) => string | undefined): StandardSchemaV1 { + return { + "~standard": { + version: 1, + vendor: "yieldstar-test", + validate(value) { + const error = validator(value); + if (error) { + return { issues: [{ message: error }] }; + } + return { value: value as T }; + }, + }, + }; +}