diff --git a/packages/cli/src/templates/cron.ts b/packages/cli/src/templates/cron.ts new file mode 100644 index 0000000..5e66447 --- /dev/null +++ b/packages/cli/src/templates/cron.ts @@ -0,0 +1,16 @@ +export const cronIndexTemplate = () => `// This is where you define your Edgepod cron jobs! +// Cron jobs run on a schedule inside your Durable Object using Cloudflare's alarm API. +// They execute even when no requests are incoming — perfect for cleanup, reminders, etc. +// +// Define a cron job using the schedule() helper and export it: +// +// import { createSchedule } from "@edgepod/server"; +// import type { CronCtx } from "../types"; +// +// export const cleanupExpiredSessions = createSchedule("0 */6 * * *", async (ctx: CronCtx) => { +// await ctx.db.delete(schema.sessions).where(lt(schema.sessions.expiresAt, new Date())); +// }); +// + +export {}; +`; diff --git a/packages/cli/src/templates/server.ts b/packages/cli/src/templates/server.ts index 8901e9a..246db50 100644 --- a/packages/cli/src/templates/server.ts +++ b/packages/cli/src/templates/server.ts @@ -13,11 +13,13 @@ export const serverTemplate = (opts?: DataLocationOptions) => { return `import { edgePodFetch, BaseEdgePodEngine } from "@edgepod/server"; import * as schema from "../schema"; import * as functions from "../functions"; +import * as cron from "../cron"; import migrations from "./migrations/index"; export class EdgePodEngine extends BaseEdgePodEngine { protected override schema = schema; protected override userFunctions = functions; + protected override cronFunctions = cron; protected override migrations = migrations; } diff --git a/packages/cli/src/templates/types.ts b/packages/cli/src/templates/types.ts index 94318de..fcab57a 100644 --- a/packages/cli/src/templates/types.ts +++ b/packages/cli/src/templates/types.ts @@ -1,6 +1,6 @@ export const genTypesTemplate = () => `/// import * as schema from "./schema"; -import type { EdgePodContext } from "@edgepod/server"; +import type { EdgePodContext, CronContext } from "@edgepod/server"; // Any env vars added to .env will be automatically deployed via Wrangler // Please add any custom env vars you need to the CustomEnv type below for type safety in your functions @@ -28,4 +28,6 @@ export type Ctx = Variables> = EdgePo TVariables, User >; + +export type CronCtx = CronContext; `; diff --git a/packages/cli/src/utils/files.ts b/packages/cli/src/utils/files.ts index 905df06..acff861 100644 --- a/packages/cli/src/utils/files.ts +++ b/packages/cli/src/utils/files.ts @@ -2,6 +2,7 @@ import fs from "node:fs/promises"; import { webcrypto } from "node:crypto"; import { consola } from "consola"; import { functionsIndexTemplate } from "../templates/functions"; +import { cronIndexTemplate } from "../templates/cron"; import { middlewaresTemplate } from "../templates/middlewares"; import { schemaTemplate } from "../templates/schema"; import { wranglerJsonTemplate } from "../templates/wrangler"; @@ -23,6 +24,7 @@ export default null; ["edgepod/types.ts", genTypesTemplate()], ["edgepod/schema.ts", schemaTemplate()], ["edgepod/middlewares.ts", middlewaresTemplate()], + ["edgepod/cron/index.ts", cronIndexTemplate()], ["edgepod/functions/index.ts", functionsIndexTemplate()], ["edgepod/.generated/server.ts", serverTemplate(opts)], ["edgepod/.generated/migrations/index.ts", emptyMigrationsIndex], diff --git a/packages/server/package.json b/packages/server/package.json index 321a26f..8327177 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -46,6 +46,7 @@ "dependencies": { "@cloudflare/workers-types": "^4.20260423.1", "@logtape/logtape": "^2.0.5", + "croner": "^10.0.1", "drizzle-orm": "^0.45.2", "jose": "^6.2.3", "neverthrow": "^8.2.0", diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index a35d565..0142297 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -1,3 +1,4 @@ export * from "./types"; export * from "./server"; export * from "./tools/createMiddleware"; +export * from "./tools/createSchedule"; diff --git a/packages/server/src/server/cron.ts b/packages/server/src/server/cron.ts new file mode 100644 index 0000000..14ff2f4 --- /dev/null +++ b/packages/server/src/server/cron.ts @@ -0,0 +1,121 @@ +import { Cron } from "croner"; +import { createTrackedDb } from "../tools/createTrackedDb"; +import { createLogger } from "./logger"; +import { hashMetaTableNames } from "../tools/hashTableName"; +import type { CronContext, CronDefinition, EdgePodSessionMap } from "../types"; + +export async function scheduleNextAlarm( + cronFunctions: Record>, + setAlarm: (ms: number) => Promise, +) { + const now = new Date(); + let soonestMs: number | null = null; + + for (const entry of Object.values(cronFunctions)) { + try { + const cron = new Cron(entry.schedule, { timezone: "UTC" }); + const nextRuns = cron.nextRuns(1, now); + const nextRun = nextRuns[0]; + if (nextRun) { + const nextMs = nextRun.getTime(); + if (soonestMs === null || nextMs < soonestMs) { + soonestMs = nextMs; + } + } + } catch (e) { + console.error( + "[EdgePod] Failed to parse cron schedule:", + e instanceof Error ? e.message : String(e), + ); + } + } + + if (soonestMs !== null) { + await setAlarm(soonestMs); + } +} + +export async function executeCron( + name: string, + def: CronDefinition, + scheduledTime: Date, + rawDb: any, + env: any, + activeSessions: EdgePodSessionMap, + cascadeGraph: Map>, + broadcastInvalidations: (tables: string[]) => void, +) { + const traceId = `cron:${name}:${scheduledTime.getTime()}`; + + const tablesRead = new Set(); + const tablesWritten = new Set(); + const warnings: string[] = []; + const variableStore = new Map(); + + const dbProxy = createTrackedDb( + rawDb, + `cron:${name}`, + activeSessions, + tablesRead, + tablesWritten, + cascadeGraph, + warnings, + ); + + const ctx: CronContext> = { + db: dbProxy as any, + unsafeRawDb: rawDb, + env, + log: createLogger(traceId), + subscribeTo: (_tables: string[]) => {}, + invalidate: (tables: string[]) => tables.forEach((t) => tablesWritten.add(t)), + set: (key: string, value: any) => variableStore.set(key, value), + get: (key: string) => variableStore.get(key) as any, + cron: def.schedule, + scheduledTime: scheduledTime.toISOString(), + }; + + await def.handler(ctx); + + if (tablesWritten.size > 0) { + const hashedTableNames = hashMetaTableNames(Array.from(tablesWritten)); + broadcastInvalidations(hashedTableNames); + } +} + +export async function handleCronAlarm( + cronFunctions: Record>, + setAlarm: (ms: number) => Promise, + rawDb: any, + env: any, + activeSessions: EdgePodSessionMap, + cascadeGraph: Map>, + broadcastInvalidations: (tables: string[]) => void, +) { + const now = new Date(); + const checkFrom = new Date(now.getTime() - 120000); + + for (const [name, entry] of Object.entries(cronFunctions)) { + try { + const cron = new Cron(entry.schedule, { timezone: "UTC" }); + const nextRuns = cron.nextRuns(1, checkFrom); + const nextRun = nextRuns[0]; + if (nextRun && nextRun.getTime() <= now.getTime()) { + await executeCron( + name, + entry, + nextRun, + rawDb, + env, + activeSessions, + cascadeGraph, + broadcastInvalidations, + ); + } + } catch (e) { + console.error(`[EdgePod] Cron "${name}" failed:`, e instanceof Error ? e.message : String(e)); + } + } + + await scheduleNextAlarm(cronFunctions, setAlarm); +} diff --git a/packages/server/src/server/do.ts b/packages/server/src/server/do.ts index b6baf7a..6153996 100644 --- a/packages/server/src/server/do.ts +++ b/packages/server/src/server/do.ts @@ -5,8 +5,16 @@ import { createTrackedDb } from "../tools/createTrackedDb"; import { buildCascadeGraph } from "../tools/buildCascadeGraph"; import { initJwtSigner, getJwtSigner } from "./auth"; import { initLogger, createLogger } from "./logger"; -import type { EdgePodSessionMap, EdgePodContext, RpcRequest, RpcMeta, JsonValue } from "../types"; +import type { + EdgePodSessionMap, + EdgePodContext, + CronDefinition, + RpcRequest, + RpcMeta, + JsonValue, +} from "../types"; import { hashMetaTableNames, hashTableName } from "../tools/hashTableName"; +import { scheduleNextAlarm, handleCronAlarm } from "./cron"; // neverthrow discriminated unions (Result) are not serializable across // Cloudflare Durable Object RPC boundaries because they carry prototype @@ -20,6 +28,7 @@ export class BaseEdgePodEngine extends DurableObject { private activeSessions: EdgePodSessionMap = new Map(); private cascadeGraph: Map> = new Map(); protected userFunctions: Record Promise | JsonValue> = {}; + protected cronFunctions: Record> = {}; protected schema: Record = {}; protected migrations: { journal: unknown; @@ -53,6 +62,15 @@ export class BaseEdgePodEngine extends DurableObject { if (this.migrations) { await migrate(this.rawDb, this.migrations as any); } + + // Schedule the first alarm on cold start. This replaces any existing alarm, + // which is safe — when the DO wakes from hibernation for an RPC request (not + // alarm), the recomputed soonest-next-fire will be ≤ the previously pending + // alarm time, so no cron run is lost. When the DO wakes because an alarm fired, + // the alarm has already been consumed and alarm() will fire regardless. + if (Object.keys(this.cronFunctions).length > 0) { + await scheduleNextAlarm(this.cronFunctions, (ms) => this.ctx.storage.setAlarm(ms)); + } }); } @@ -109,7 +127,20 @@ export class BaseEdgePodEngine extends DurableObject { } } - // The RPC Execution Engine + // Cron + override async alarm(): Promise { + await handleCronAlarm( + this.cronFunctions, + (ms) => this.ctx.storage.setAlarm(ms), + this.rawDb, + this.env, + this.activeSessions, + this.cascadeGraph, + (tables) => this.broadcastInvalidations(tables), + ); + } + + // RPC Execution Engine // aka this is where we are running user code async executeRpc(functionName: string, args: any, rpcCtx: RpcRequest): Promise { const { headers, user, traceId, reactive } = rpcCtx; diff --git a/packages/server/src/tools/createSchedule.ts b/packages/server/src/tools/createSchedule.ts new file mode 100644 index 0000000..b8f7439 --- /dev/null +++ b/packages/server/src/tools/createSchedule.ts @@ -0,0 +1,12 @@ +import type { CronDefinition, CronContext } from "../types"; + +export function createSchedule< + TSchema extends Record = Record, + TEnv = Record, + TVariables extends Record = Record, +>( + schedule: string, + handler: (ctx: CronContext) => Promise, +): CronDefinition { + return { schedule, handler }; +} diff --git a/packages/server/src/types/index.ts b/packages/server/src/types/index.ts index 53134aa..3c3d5e0 100644 --- a/packages/server/src/types/index.ts +++ b/packages/server/src/types/index.ts @@ -78,3 +78,22 @@ export type EdgePodContext< set: (key: Key, value: TVariables[Key]) => void; get: (key: Key) => TVariables[Key]; }; + +// Cron +export type CronContext< + TSchema extends Record, + TEnv = Record, + TVariables extends Record = Record, +> = Omit, "headers" | "user" | "signJwt"> & { + cron: string; + scheduledTime: string; +}; + +export type CronDefinition< + TSchema extends Record = Record, + TEnv = Record, + TVariables extends Record = Record, +> = { + schedule: string; + handler: (ctx: CronContext) => Promise; +}; diff --git a/playground/edgepod/.generated/server.ts b/playground/edgepod/.generated/server.ts index f688df4..7c0eded 100644 --- a/playground/edgepod/.generated/server.ts +++ b/playground/edgepod/.generated/server.ts @@ -1,11 +1,13 @@ import { edgePodFetch, BaseEdgePodEngine } from "@edgepod/server"; import * as schema from "../schema"; import * as functions from "../functions"; +import * as cron from "../cron"; import migrations from "./migrations/index"; export class EdgePodEngine extends BaseEdgePodEngine { protected override schema = schema; protected override userFunctions = functions; + protected override cronFunctions = cron; protected override migrations = migrations; } diff --git a/playground/edgepod/cron/index.ts b/playground/edgepod/cron/index.ts new file mode 100644 index 0000000..d50cce9 --- /dev/null +++ b/playground/edgepod/cron/index.ts @@ -0,0 +1,14 @@ +// This is where you define your Edgepod cron jobs! +// Cron jobs run on a schedule inside your Durable Object using Cloudflare's alarm API. +// They execute even when no requests are incoming — perfect for cleanup, reminders, etc. +// +// Define a cron job using the schedule() helper and export it: +// +// import { createSchedule } from "@edgepod/server"; +// import type { CronCtx } from "../types"; +// +// export const cleanupExpiredSessions = createSchedule("0 */6 * * *", async (ctx: CronCtx) => { +// await ctx.db.delete(schema.sessions).where(lt(schema.sessions.expiresAt, new Date())); +// }); +// +export {}; diff --git a/playground/edgepod/types.ts b/playground/edgepod/types.ts index e545ec0..5ec437f 100644 --- a/playground/edgepod/types.ts +++ b/playground/edgepod/types.ts @@ -1,6 +1,6 @@ /// import * as schema from "./schema"; -import type { EdgePodContext } from "@edgepod/server"; +import type { EdgePodContext, CronContext } from "@edgepod/server"; // Any env vars added to .env will be automatically deployed via Wrangler // Please add any custom env vars you need to the CustomEnv type below for type safety in your functions @@ -28,3 +28,5 @@ export type Ctx = Variables> = EdgePo TVariables, User >; + +export type CronCtx = CronContext; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9ad1da2..8b63220 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -109,6 +109,9 @@ importers: '@logtape/logtape': specifier: ^2.0.5 version: 2.0.5 + croner: + specifier: ^10.0.1 + version: 10.0.1 drizzle-orm: specifier: ^0.45.2 version: 0.45.2(@cloudflare/workers-types@4.20260423.1)(@electric-sql/pglite@0.4.1)(@types/better-sqlite3@7.6.13)(better-sqlite3@12.9.0)(mysql2@3.15.3)(postgres@3.4.7) @@ -2131,6 +2134,10 @@ packages: resolution: {integrity: sha512-ei8Aos7ja0weRpFzJnEA9UHJ/7XQmqglbRwnf2ATjcB9Wq874VKH9kfjjirM6UhU2/E5fFYadylyhFldcqSidQ==} engines: {node: '>=18'} + croner@10.0.1: + resolution: {integrity: sha512-ixNtAJndqh173VQ4KodSdJEI6nuioBWI0V1ITNKhZZsO0pEMoDxz539T4FTTbSZ/xIOSuDnzxLVRqBVSvPNE2g==} + engines: {node: '>=18.0'} + cross-spawn@7.0.6: resolution: {integrity: sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==} engines: {node: '>= 8'} @@ -5017,6 +5024,8 @@ snapshots: cookie@1.1.1: {} + croner@10.0.1: {} + cross-spawn@7.0.6: dependencies: path-key: 3.1.1