Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions packages/cli/src/templates/cron.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export const cronIndexTemplate = () => `// This is where you define your Edgepod cron jobs!
// Cron jobs run on a schedule inside your Durable Object using Cloudflare's alarm API.
// They execute even when no requests are incoming — perfect for cleanup, reminders, etc.
//
// Define a cron job using the schedule() helper and export it:
//
// import { createSchedule } from "@edgepod/server";
// import type { CronCtx } from "../types";
//
// export const cleanupExpiredSessions = createSchedule("0 */6 * * *", async (ctx: CronCtx) => {
// await ctx.db.delete(schema.sessions).where(lt(schema.sessions.expiresAt, new Date()));
// });
//

export {};
`;
2 changes: 2 additions & 0 deletions packages/cli/src/templates/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ export const serverTemplate = (opts?: DataLocationOptions) => {
return `import { edgePodFetch, BaseEdgePodEngine } from "@edgepod/server";
import * as schema from "../schema";
import * as functions from "../functions";
import * as cron from "../cron";
import migrations from "./migrations/index";

export class EdgePodEngine extends BaseEdgePodEngine {
protected override schema = schema;
protected override userFunctions = functions;
protected override cronFunctions = cron;
protected override migrations = migrations;
}

Expand Down
4 changes: 3 additions & 1 deletion packages/cli/src/templates/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export const genTypesTemplate = () => `/// <reference types="./.generated/cloudflare-env" />
import * as schema from "./schema";
import type { EdgePodContext } from "@edgepod/server";
import type { EdgePodContext, CronContext } from "@edgepod/server";

// Any env vars added to .env will be automatically deployed via Wrangler
// Please add any custom env vars you need to the CustomEnv type below for type safety in your functions
Expand Down Expand Up @@ -28,4 +28,6 @@ export type Ctx<TVariables extends Record<string, unknown> = Variables> = EdgePo
TVariables,
User
>;

export type CronCtx = CronContext<typeof schema, Env & CustomEnv>;
`;
2 changes: 2 additions & 0 deletions packages/cli/src/utils/files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import fs from "node:fs/promises";
import { webcrypto } from "node:crypto";
import { consola } from "consola";
import { functionsIndexTemplate } from "../templates/functions";
import { cronIndexTemplate } from "../templates/cron";
import { middlewaresTemplate } from "../templates/middlewares";
import { schemaTemplate } from "../templates/schema";
import { wranglerJsonTemplate } from "../templates/wrangler";
Expand All @@ -23,6 +24,7 @@ export default null;
["edgepod/types.ts", genTypesTemplate()],
["edgepod/schema.ts", schemaTemplate()],
["edgepod/middlewares.ts", middlewaresTemplate()],
["edgepod/cron/index.ts", cronIndexTemplate()],
["edgepod/functions/index.ts", functionsIndexTemplate()],
["edgepod/.generated/server.ts", serverTemplate(opts)],
["edgepod/.generated/migrations/index.ts", emptyMigrationsIndex],
Expand Down
1 change: 1 addition & 0 deletions packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"dependencies": {
"@cloudflare/workers-types": "^4.20260423.1",
"@logtape/logtape": "^2.0.5",
"croner": "^10.0.1",
"drizzle-orm": "^0.45.2",
"jose": "^6.2.3",
"neverthrow": "^8.2.0",
Expand Down
1 change: 1 addition & 0 deletions packages/server/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from "./types";
export * from "./server";
export * from "./tools/createMiddleware";
export * from "./tools/createSchedule";
121 changes: 121 additions & 0 deletions packages/server/src/server/cron.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { Cron } from "croner";
import { createTrackedDb } from "../tools/createTrackedDb";
import { createLogger } from "./logger";
import { hashMetaTableNames } from "../tools/hashTableName";
import type { CronContext, CronDefinition, EdgePodSessionMap } from "../types";

export async function scheduleNextAlarm(
cronFunctions: Record<string, CronDefinition<any, any, any>>,
setAlarm: (ms: number) => Promise<void>,
) {
const now = new Date();
let soonestMs: number | null = null;

for (const entry of Object.values(cronFunctions)) {
try {
const cron = new Cron(entry.schedule, { timezone: "UTC" });
const nextRuns = cron.nextRuns(1, now);
const nextRun = nextRuns[0];
if (nextRun) {
const nextMs = nextRun.getTime();
if (soonestMs === null || nextMs < soonestMs) {
soonestMs = nextMs;
}
}
} catch (e) {
console.error(
"[EdgePod] Failed to parse cron schedule:",
e instanceof Error ? e.message : String(e),
);
}
}

if (soonestMs !== null) {
await setAlarm(soonestMs);
}
}

export async function executeCron(
name: string,
def: CronDefinition<any, any, any>,
scheduledTime: Date,
rawDb: any,
env: any,
activeSessions: EdgePodSessionMap,
cascadeGraph: Map<string, Set<string>>,
broadcastInvalidations: (tables: string[]) => void,
) {
const traceId = `cron:${name}:${scheduledTime.getTime()}`;

const tablesRead = new Set<string>();
const tablesWritten = new Set<string>();
const warnings: string[] = [];
const variableStore = new Map();

const dbProxy = createTrackedDb(
rawDb,
`cron:${name}`,
activeSessions,
tablesRead,
tablesWritten,
cascadeGraph,
warnings,
);

const ctx: CronContext<any, any, Record<string, any>> = {
db: dbProxy as any,
unsafeRawDb: rawDb,
env,
log: createLogger(traceId),
subscribeTo: (_tables: string[]) => {},
invalidate: (tables: string[]) => tables.forEach((t) => tablesWritten.add(t)),
set: (key: string, value: any) => variableStore.set(key, value),
get: (key: string) => variableStore.get(key) as any,
cron: def.schedule,
scheduledTime: scheduledTime.toISOString(),
};

await def.handler(ctx);

if (tablesWritten.size > 0) {
const hashedTableNames = hashMetaTableNames(Array.from(tablesWritten));
broadcastInvalidations(hashedTableNames);
}
}

export async function handleCronAlarm(
cronFunctions: Record<string, CronDefinition<any, any, any>>,
setAlarm: (ms: number) => Promise<void>,
rawDb: any,
env: any,
activeSessions: EdgePodSessionMap,
cascadeGraph: Map<string, Set<string>>,
broadcastInvalidations: (tables: string[]) => void,
) {
const now = new Date();
const checkFrom = new Date(now.getTime() - 120000);

for (const [name, entry] of Object.entries(cronFunctions)) {
try {
const cron = new Cron(entry.schedule, { timezone: "UTC" });
const nextRuns = cron.nextRuns(1, checkFrom);
const nextRun = nextRuns[0];
if (nextRun && nextRun.getTime() <= now.getTime()) {
await executeCron(
name,
entry,
nextRun,
rawDb,
env,
activeSessions,
cascadeGraph,
broadcastInvalidations,
);
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.
} catch (e) {
console.error(`[EdgePod] Cron "${name}" failed:`, e instanceof Error ? e.message : String(e));
}
}

await scheduleNextAlarm(cronFunctions, setAlarm);
}
35 changes: 33 additions & 2 deletions packages/server/src/server/do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,16 @@ import { createTrackedDb } from "../tools/createTrackedDb";
import { buildCascadeGraph } from "../tools/buildCascadeGraph";
import { initJwtSigner, getJwtSigner } from "./auth";
import { initLogger, createLogger } from "./logger";
import type { EdgePodSessionMap, EdgePodContext, RpcRequest, RpcMeta, JsonValue } from "../types";
import type {
EdgePodSessionMap,
EdgePodContext,
CronDefinition,
RpcRequest,
RpcMeta,
JsonValue,
} from "../types";
import { hashMetaTableNames, hashTableName } from "../tools/hashTableName";
import { scheduleNextAlarm, handleCronAlarm } from "./cron";

// neverthrow discriminated unions (Result<T, E>) are not serializable across
// Cloudflare Durable Object RPC boundaries because they carry prototype
Expand All @@ -20,6 +28,7 @@ export class BaseEdgePodEngine extends DurableObject {
private activeSessions: EdgePodSessionMap = new Map();
private cascadeGraph: Map<string, Set<string>> = new Map();
protected userFunctions: Record<string, (...args: any[]) => Promise<JsonValue> | JsonValue> = {};
protected cronFunctions: Record<string, CronDefinition<any, any, any>> = {};
protected schema: Record<string, unknown> = {};
protected migrations: {
journal: unknown;
Expand Down Expand Up @@ -53,6 +62,15 @@ export class BaseEdgePodEngine extends DurableObject {
if (this.migrations) {
await migrate(this.rawDb, this.migrations as any);
}

// Schedule the first alarm on cold start. This replaces any existing alarm,
// which is safe — when the DO wakes from hibernation for an RPC request (not
// alarm), the recomputed soonest-next-fire will be ≤ the previously pending
// alarm time, so no cron run is lost. When the DO wakes because an alarm fired,
// the alarm has already been consumed and alarm() will fire regardless.
if (Object.keys(this.cronFunctions).length > 0) {
await scheduleNextAlarm(this.cronFunctions, (ms) => this.ctx.storage.setAlarm(ms));
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.
});
}

Expand Down Expand Up @@ -109,7 +127,20 @@ export class BaseEdgePodEngine extends DurableObject {
}
}

// The RPC Execution Engine
// Cron
override async alarm(): Promise<void> {
await handleCronAlarm(
this.cronFunctions,
(ms) => this.ctx.storage.setAlarm(ms),
this.rawDb,
this.env,
this.activeSessions,
this.cascadeGraph,
(tables) => this.broadcastInvalidations(tables),
);
}

// RPC Execution Engine
// aka this is where we are running user code
async executeRpc(functionName: string, args: any, rpcCtx: RpcRequest): Promise<ExecuteRpcResult> {
const { headers, user, traceId, reactive } = rpcCtx;
Expand Down
12 changes: 12 additions & 0 deletions packages/server/src/tools/createSchedule.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { CronDefinition, CronContext } from "../types";

export function createSchedule<
TSchema extends Record<string, unknown> = Record<string, unknown>,
TEnv = Record<string, string>,
TVariables extends Record<string, unknown> = Record<string, unknown>,
>(
schedule: string,
handler: (ctx: CronContext<TSchema, TEnv, TVariables>) => Promise<void>,
): CronDefinition<TSchema, TEnv, TVariables> {
return { schedule, handler };
}
19 changes: 19 additions & 0 deletions packages/server/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,22 @@ export type EdgePodContext<
set: <Key extends keyof TVariables>(key: Key, value: TVariables[Key]) => void;
get: <Key extends keyof TVariables>(key: Key) => TVariables[Key];
};

// Cron
export type CronContext<
TSchema extends Record<string, unknown>,
TEnv = Record<string, string>,
TVariables extends Record<string, unknown> = Record<string, unknown>,
> = Omit<EdgePodContext<TSchema, TEnv, TVariables>, "headers" | "user" | "signJwt"> & {
cron: string;
scheduledTime: string;
};

export type CronDefinition<
TSchema extends Record<string, unknown> = Record<string, unknown>,
TEnv = Record<string, string>,
TVariables extends Record<string, unknown> = Record<string, unknown>,
> = {
schedule: string;
handler: (ctx: CronContext<TSchema, TEnv, TVariables>) => Promise<void>;
};
2 changes: 2 additions & 0 deletions playground/edgepod/.generated/server.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { edgePodFetch, BaseEdgePodEngine } from "@edgepod/server";
import * as schema from "../schema";
import * as functions from "../functions";
import * as cron from "../cron";
import migrations from "./migrations/index";

export class EdgePodEngine extends BaseEdgePodEngine {
protected override schema = schema;
protected override userFunctions = functions;
protected override cronFunctions = cron;
protected override migrations = migrations;
}

Expand Down
14 changes: 14 additions & 0 deletions playground/edgepod/cron/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// This is where you define your Edgepod cron jobs!
// Cron jobs run on a schedule inside your Durable Object using Cloudflare's alarm API.
// They execute even when no requests are incoming — perfect for cleanup, reminders, etc.
//
// Define a cron job using the schedule() helper and export it:
//
// import { createSchedule } from "@edgepod/server";
// import type { CronCtx } from "../types";
//
// export const cleanupExpiredSessions = createSchedule("0 */6 * * *", async (ctx: CronCtx) => {
// await ctx.db.delete(schema.sessions).where(lt(schema.sessions.expiresAt, new Date()));
// });
//
export {};
4 changes: 3 additions & 1 deletion playground/edgepod/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/// <reference types="./.generated/cloudflare-env" />
import * as schema from "./schema";
import type { EdgePodContext } from "@edgepod/server";
import type { EdgePodContext, CronContext } from "@edgepod/server";

// Any env vars added to .env will be automatically deployed via Wrangler
// Please add any custom env vars you need to the CustomEnv type below for type safety in your functions
Expand Down Expand Up @@ -28,3 +28,5 @@ export type Ctx<TVariables extends Record<string, unknown> = Variables> = EdgePo
TVariables,
User
>;

export type CronCtx = CronContext<typeof schema, Env & CustomEnv>;
9 changes: 9 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.