Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
492 changes: 492 additions & 0 deletions SPEC.md

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions examples/http-server/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createWorkflowWorker } from "@yieldstar/bun-worker-invoker";
import {
SqliteSchedulerClient,
SqliteHeapClient,
SqliteStoreClient,
SqliteTaskQueueClient,
SqliteTimersClient,
} from "@yieldstar/bun-sqlite-runtime";
Expand All @@ -16,10 +17,15 @@ const schedulerClient = new SqliteSchedulerClient({
taskQueueClient: new SqliteTaskQueueClient(runtimeDb),
timersClient: new SqliteTimersClient(runtimeDb),
});
const storeClient = new SqliteStoreClient({
db: runtimeDb,
schedulerClient,
});

const workflowRunner = new WorkflowRunner({
heapClient,
schedulerClient,
storeClient,
router: workflowRouter,
logger,
});
Expand Down
6 changes: 6 additions & 0 deletions examples/local-execution/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createWorkflowWorker } from "@yieldstar/bun-worker-invoker";
import {
SqliteSchedulerClient,
SqliteHeapClient,
SqliteStoreClient,
SqliteTaskQueueClient,
SqliteTimersClient,
} from "@yieldstar/bun-sqlite-runtime";
Expand All @@ -16,11 +17,16 @@ const schedulerClient = new SqliteSchedulerClient({
taskQueueClient: new SqliteTaskQueueClient(runtimeDb),
timersClient: new SqliteTimersClient(runtimeDb),
});
const storeClient = new SqliteStoreClient({
db: runtimeDb,
schedulerClient,
});

const workflowRunner = new WorkflowRunner({
router: workflowRouter,
heapClient,
schedulerClient,
storeClient,
logger,
});

Expand Down
1 change: 1 addition & 0 deletions packages/bun-sqlite-runtime/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export { SqliteHeapClient } from "./sqlite-heap";
export { SqliteStoreClient } from "./sqlite-store";
export { SqliteSchedulerClient } from "./sqlite-scheduler";
export { SqliteEventLoop } from "./sqlite-event-loop";
export { SqliteTaskQueueClient } from "./sqlite-task-queue";
Expand Down
69 changes: 69 additions & 0 deletions packages/bun-sqlite-runtime/src/sqlite-store.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { Database } from "bun:sqlite";
import { expect, test } from "bun:test";
import {
defineStore,
type SchedulerClient,
type StandardSchemaV1,
type WorkflowEvent,
} from "@yieldstar/core";
import { SqliteStoreClient } from "./sqlite-store";

type State = {
messages: { id: string }[];
};

const Store = defineStore("sqlite-test", schema<State>());

test("sqlite store updates wake matching waiters", async () => {
const db = new Database(":memory:");
const events: WorkflowEvent[] = [];
const schedulerClient: SchedulerClient = {
async requestWakeUp(event) {
events.push(event);
},
};
const client = new SqliteStoreClient({ db, schedulerClient });
const event = {
workflowId: "workflow",
executionId: "execution",
params: undefined,
context: new Map(),
};

await client.getOrCreateStore({
definition: Store,
id: "one",
initial: { messages: [] },
});
await client.registerWaiter({
workflowId: event.workflowId,
executionId: event.executionId,
stepKey: "next-message",
event,
storeName: Store.name,
storeId: "one",
sinceVersion: 0,
readPaths: [["messages"]],
});
await client.updateStore({
definition: Store,
id: "one",
updater(draft) {
draft.messages.push({ id: "msg-1" });
},
});

expect(events).toEqual([event]);
});

function schema<T>(): StandardSchemaV1<unknown, T> {
return {
"~standard": {
version: 1,
vendor: "yieldstar-test",
validate(value) {
return { value: value as T };
},
},
};
}
Loading