From b0368805a96638929425ff39dd18a5af4fc78e78 Mon Sep 17 00:00:00 2001 From: Oleksii Date: Wed, 10 Jun 2026 01:20:48 -0300 Subject: [PATCH] =?UTF-8?q?feat(triggers):=20Activepieces=20polling=20trig?= =?UTF-8?q?gers=20=E2=80=94=20poll=20loop,=20sidecar=20protocol,=20CRUD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds polling-based triggers for ActivePieces pieces: a tenant-scoped trigger registration (trigger_type: activepieces_poll) whose config names a piece, a piece trigger, auth/props (credentials:// resolved per poll), and a schedule (interval_secs or cron). The engine's trigger sync loop spawns a poll listener per registration that asks the Node sidecar's new POST /poll endpoint to run the piece trigger headlessly; each returned item creates one durable instance, and the trigger's store contents are persisted as an opaque dedupe cursor (trigger_poll_state table) and sent back on the next poll. Failed polls record last_error and bump consecutive_failures (surfaced on GET /triggers/{slug}) without ever stopping the loop. - orch8-types: TriggerType::ActivepiecesPoll + TriggerPollState - orch8-storage: trigger_poll_state table (migration 044 + sqlite schema), get/upsert poll-state on AdminStore (sqlite, postgres, encrypting) - orch8-engine: ap_poll module (config parsing, poll loop, failure bookkeeping), trigger sync wiring, cron::next_fire_from_expr, metrics - orch8-api: config validation on create, poll_state on GET, OpenAPI - sidecar: POST /poll op, findTrigger, buildTriggerContext with seeded store; webhook-strategy triggers rejected as permanent - tests: engine poll loop vs dep-free TCP mock sidecar, API CRUD/tenant isolation, storage round-trips, sidecar stub-piece polling suite Co-Authored-By: Claude Fable 5 --- activepieces/README.md | 51 +- activepieces/src/context.ts | 65 ++ activepieces/src/index.ts | 4 +- activepieces/src/registry.ts | 44 +- activepieces/src/server.ts | 156 +++- activepieces/tests/trigger_poll.test.ts | 350 ++++++++ dashboard/src/api.ts | 7 +- dashboard/src/pages/Triggers.tsx | 5 + migrations/044_trigger_poll_state.sql | 23 + migrations/down/044_trigger_poll_state.sql | 9 + orch8-api/src/openapi.rs | 1 + orch8-api/src/triggers.rs | 41 +- orch8-api/tests/triggers_activepieces.rs | 331 +++++++ orch8-engine/src/ap_poll.rs | 981 +++++++++++++++++++++ orch8-engine/src/cron.rs | 9 + orch8-engine/src/lib.rs | 1 + orch8-engine/src/metrics.rs | 4 + orch8-engine/src/triggers.rs | 8 + orch8-storage/src/encrypting.rs | 12 + orch8-storage/src/lib.rs | 16 +- orch8-storage/src/postgres/mod.rs | 14 + orch8-storage/src/postgres/triggers.rs | 71 +- orch8-storage/src/sqlite/mod.rs | 14 + orch8-storage/src/sqlite/schema.rs | 12 + orch8-storage/src/sqlite/triggers.rs | 71 +- orch8-storage/tests/storage_integration.rs | 124 +++ orch8-types/src/trigger.rs | 82 ++ 27 files changed, 2493 insertions(+), 13 deletions(-) create mode 100644 activepieces/tests/trigger_poll.test.ts create mode 100644 migrations/044_trigger_poll_state.sql create mode 100644 migrations/down/044_trigger_poll_state.sql create mode 100644 orch8-api/tests/triggers_activepieces.rs create mode 100644 orch8-engine/src/ap_poll.rs diff --git a/activepieces/README.md b/activepieces/README.md index 61a2443e..70ea613e 100644 --- a/activepieces/README.md +++ b/activepieces/README.md @@ -65,6 +65,55 @@ ORCH8_ACTIVEPIECES_URL=http://127.0.0.1:50052/execute ./orch8-server The handler prefix `ap://` is parsed as `ap://.`. `auth` and `props` are passed through to the piece as `context.auth` and `context.propsValue`. +## Polling triggers (`POST /poll`) + +The engine's `activepieces_poll` trigger loop calls `POST /poll` on a schedule +to run a piece trigger's poll headlessly: + +```json +{ + "piece": "stripe", + "trigger": "new_failed_payment", + "auth": { "access_token": "sk_live_..." }, + "props": { }, + "state": { "lastPoll": 1717171717 }, + "slug": "stripe-failed-payments" +} +``` + +Response: `{ "ok": true, "items": [ ... ], "state": { ... } }` — `items` are +the new events since the cursor (the engine creates one durable instance per +item), `state` is the trigger's `context.store` contents after the run. The +engine persists `state` verbatim and sends it back on the next poll, which is +exactly the dedupe contract AP's `pollingHelper` expects (`lastPoll` epoch / +seen-id cursors live in the store). `state` is `null` on the first poll. + +Only `TriggerStrategy.POLLING` triggers are supported — webhook-strategy +triggers are rejected with a permanent error (`onEnable`/`onDisable` +platform registration doesn't exist headlessly). Register a poll trigger on +the orch8 side with: + +```bash +curl -X POST http://localhost:8080/api/v1/triggers -H 'content-type: application/json' -d '{ + "slug": "stripe-failed-payments", + "sequence_name": "payment-recovery", + "tenant_id": "t1", + "trigger_type": "activepieces_poll", + "config": { + "piece": "stripe", + "trigger": "new_failed_payment", + "auth": "credentials://stripe-prod", + "interval_secs": 60 + } +}' +``` + +`config.auth` / `config.props` may contain `credentials://` references — +the engine resolves them tenant-scoped on every poll. Schedule is either +`interval_secs` (default 60) or `cron` (UTC), not both. Poll failures are +recorded on the registration (`GET /triggers/{slug}` returns `poll_state` +with `last_error` and `consecutive_failures`) and retried on the next tick. + ## Error semantics | HTTP status | Error type | Retryable? | @@ -87,7 +136,7 @@ Classification heuristic inside the sidecar: **Actions work.** Most community piece actions execute cleanly through the stub `ActionContext`. -**Triggers don't (yet).** Polling and webhook triggers need scaffolding for `onEnable`/`onDisable` lifecycle hooks — use orch8's native webhook / NATS / file-watch triggers for inbound events. +**Polling triggers work.** `POST /poll` runs `TriggerStrategy.POLLING` triggers with a store seeded from the engine-persisted cursor (see above). Webhook-strategy triggers are not supported — they need platform-side `onEnable`/`onDisable` registration and an inbound URL; use orch8's native webhook triggers for those events. **OAuth2 refresh is the caller's responsibility.** The adapter does not refresh tokens — pass a fresh `access_token` in `auth`. A forthcoming companion package will integrate with orch8's plugin/credentials registry. diff --git a/activepieces/src/context.ts b/activepieces/src/context.ts index 82f29db8..6201c872 100644 --- a/activepieces/src/context.ts +++ b/activepieces/src/context.ts @@ -105,3 +105,68 @@ export function buildActionContext(input: ContextInput): Record generateResumeUrl: (_params: unknown) => "", }; } + +export interface TriggerContextInput { + auth: unknown; + propsValue: Record; + /** + * Persisted store contents from the previous poll (the dedupe cursor — + * `lastPoll` epoch, seen item ids, ...). `null`/`undefined` on the first + * poll. The orch8 engine persists whatever {@link buildTriggerContext}'s + * `dumpStore` returns and sends it back on the next poll. + */ + state?: Record | null; + /** Trigger registration slug — used as the step/flow identifier. */ + slug: string; + serverPublicUrl?: string; + serverApiUrl?: string; +} + +/** + * Stub `TriggerContext` for headless polling-trigger execution. + * + * Same limitations as {@link buildActionContext}, with one key difference: + * the `store` is seeded from the caller-provided `state` blob and its final + * contents are returned via `dumpStore()`. AP polling triggers (and the + * framework's `pollingHelper`) keep their dedupe cursor in `context.store`, + * so persisting the store across polls is exactly what gives us + * exactly-once-ish item delivery without the AP platform's database. + * + * Webhook-only context members (`payload`, `webhookUrl`) are present but + * empty — polling triggers don't read them, and webhook triggers fail with + * a clear error at the route level before this context is ever built. + */ +export function buildTriggerContext(input: TriggerContextInput): { + ctx: Record; + dumpStore: () => Record; +} { + const base = buildActionContext({ + auth: input.auth, + propsValue: input.propsValue, + instanceId: input.slug, + blockId: input.slug, + serverPublicUrl: input.serverPublicUrl, + serverApiUrl: input.serverApiUrl, + }); + + const store = new Map(Object.entries(input.state ?? {})); + + const ctx: Record = { + ...base, + store: { + get: async (key: string) => store.get(key) ?? null, + put: async (key: string, value: unknown) => { + store.set(key, value); + return value; + }, + delete: async (key: string) => { + store.delete(key); + }, + }, + // Webhook-trigger members — benign empties for polling triggers. + payload: {}, + webhookUrl: "", + }; + + return { ctx, dumpStore: () => Object.fromEntries(store) }; +} diff --git a/activepieces/src/index.ts b/activepieces/src/index.ts index f6a29cf8..c6c9fbcb 100644 --- a/activepieces/src/index.ts +++ b/activepieces/src/index.ts @@ -18,8 +18,8 @@ import { createServer, ServerOptions } from "./server"; import { createDefaultLoader } from "./registry"; export { createServer } from "./server"; -export { createDefaultLoader, findAction } from "./registry"; -export { buildActionContext } from "./context"; +export { createDefaultLoader, findAction, findTrigger } from "./registry"; +export { buildActionContext, buildTriggerContext } from "./context"; export { classifyError, PieceExecutionError } from "./errors"; function parseEnvInt(name: string, fallback: number): number { diff --git a/activepieces/src/registry.ts b/activepieces/src/registry.ts index f861e9db..0b2eeea3 100644 --- a/activepieces/src/registry.ts +++ b/activepieces/src/registry.ts @@ -18,10 +18,26 @@ export interface PieceAction { run: (ctx: unknown) => Promise; } +/** + * Minimal structural view of a piece trigger. Polling triggers built with + * the AP framework's `createTrigger`/`pollingHelper` expose `type` (the + * `TriggerStrategy` — `"POLLING"` for the ones we can run headlessly) and a + * `run(ctx)` that returns the new items since the cursor stored in + * `ctx.store`. Lifecycle hooks (`onEnable`/`onDisable`) are intentionally + * not modelled — the orch8 engine owns the poll schedule, so there is no + * platform-side registration to perform. + */ +export interface PieceTrigger { + name: string; + displayName?: string; + type?: string; + run: (ctx: unknown) => Promise; +} + export interface Piece { displayName?: string; actions: () => Record | PieceAction[]; - triggers?: () => Record | unknown[]; + triggers?: () => Record | PieceTrigger[]; } export interface PieceLoader { @@ -149,3 +165,29 @@ export function findAction(piece: Piece, actionName: string): PieceAction { `action '${actionName}' not found on piece`, ); } + +/** + * Look up a trigger on a loaded piece. Mirrors {@link findAction}: handles + * both the record shape (keyed by trigger name) and the array shape (objects + * with a `.name` field). + */ +export function findTrigger(piece: Piece, triggerName: string): PieceTrigger { + const src = piece.triggers; + const raw = typeof src === "function" ? src() : src; + + if (Array.isArray(raw)) { + const found = raw.find((t) => t && t.name === triggerName); + if (found) return found; + } else if (raw && typeof raw === "object") { + const rec = raw as Record; + if (rec[triggerName]) return rec[triggerName]; + for (const t of Object.values(rec)) { + if (t && t.name === triggerName) return t; + } + } + + throw new PieceExecutionError( + "permanent", + `trigger '${triggerName}' not found on piece`, + ); +} diff --git a/activepieces/src/server.ts b/activepieces/src/server.ts index 2b6435c0..e662425d 100644 --- a/activepieces/src/server.ts +++ b/activepieces/src/server.ts @@ -2,9 +2,9 @@ // Copyright (c) 2026 Orch8, Inc. import * as http from "node:http"; -import { buildActionContext } from "./context"; +import { buildActionContext, buildTriggerContext } from "./context"; import { classifyError, PieceExecutionError } from "./errors"; -import { createDefaultLoader, findAction, Piece, PieceLoader } from "./registry"; +import { createDefaultLoader, findAction, findTrigger, Piece, PieceLoader } from "./registry"; /** * HTTP shape exchanged with the Rust `ap://` handler. @@ -41,6 +41,39 @@ export interface ExecuteRequest { block_id?: string; } +/** + * Polling-trigger protocol (POST /poll), used by the orch8 engine's + * `activepieces_poll` trigger loop. + * + * Request body: + * { + * "piece": "stripe", + * "trigger": "new_failed_payment", + * "auth": { ... } | "api-key", + * "props": { ... trigger-specific ... }, + * "state": { ... store blob from the previous poll ... } | null, + * "slug": "trigger registration slug" + * } + * + * Response body: + * Success (HTTP 200): { "ok": true, "items": [ ... ], "state": { ... } } + * - `items`: new events since the cursor (one orch8 instance each) + * - `state`: the trigger's store contents after the poll — the engine + * persists this verbatim and sends it back on the next poll. + * Failure: same envelope and status mapping as /execute. + * + * Only `TriggerStrategy.POLLING` triggers are supported; webhook-strategy + * triggers are rejected with a permanent error. + */ +export interface PollRequest { + piece: string; + trigger: string; + auth?: unknown; + props?: Record; + state?: Record | null; + slug?: string; +} + export interface ServerOptions { port: number; host?: string; @@ -91,6 +124,11 @@ async function handleRequest( return; } + if (req.method === "POST" && url === "/poll") { + await handlePoll(req, res, loader, timeoutMs, log); + return; + } + if (req.method !== "POST" || url !== "/execute") { writeJson(res, 404, { ok: false, @@ -166,6 +204,120 @@ async function handleRequest( } } +/** + * POST /poll — run a piece's polling trigger headlessly. + * + * Loads the piece, finds the trigger, seeds a trigger context's store with + * the caller-provided `state` blob, invokes `trigger.run(ctx)`, and returns + * the produced items plus the store's final contents as the next cursor. + */ +async function handlePoll( + req: http.IncomingMessage, + res: http.ServerResponse, + loader: PieceLoader, + timeoutMs: number, + log: NonNullable, +): Promise { + let body: PollRequest; + try { + body = await readJson(req); + } catch (err) { + writeJson(res, 422, { + ok: false, + error: { type: "permanent", message: `invalid request body: ${(err as Error).message}` }, + }); + return; + } + + if (!body || typeof body.piece !== "string" || typeof body.trigger !== "string") { + writeJson(res, 422, { + ok: false, + error: { type: "permanent", message: "request must include string 'piece' and 'trigger' fields" }, + }); + return; + } + if (body.state != null && (typeof body.state !== "object" || Array.isArray(body.state))) { + writeJson(res, 422, { + ok: false, + error: { type: "permanent", message: "'state' must be a JSON object or null" }, + }); + return; + } + + let piece: Piece; + try { + piece = await loader.load(body.piece); + } catch (err) { + respondWithError(res, err, log, { piece: body.piece }); + return; + } + + let trigger; + try { + trigger = findTrigger(piece, body.trigger); + } catch (err) { + respondWithError(res, err, log, { piece: body.piece, trigger: body.trigger }); + return; + } + + // Webhook-strategy triggers need platform-side registration (onEnable) + // and an inbound URL — neither exists headlessly. Fail fast and clearly. + if (typeof trigger.type === "string" && trigger.type.toUpperCase().includes("WEBHOOK")) { + respondWithError( + res, + new PieceExecutionError( + "permanent", + `trigger '${body.trigger}' uses the webhook strategy; only polling triggers are supported`, + ), + log, + { piece: body.piece, trigger: body.trigger }, + ); + return; + } + if (typeof trigger.run !== "function") { + respondWithError( + res, + new PieceExecutionError("permanent", `trigger '${body.trigger}' has no run() function`), + log, + { piece: body.piece, trigger: body.trigger }, + ); + return; + } + + const { ctx, dumpStore } = buildTriggerContext({ + auth: body.auth, + propsValue: body.props ?? {}, + state: body.state ?? null, + slug: body.slug ?? "", + }); + + const started = Date.now(); + try { + const result = await withTimeout(trigger.run(ctx), timeoutMs, body.piece, body.trigger); + const items = Array.isArray(result) ? result : result == null ? [] : [result]; + log("info", "piece trigger poll completed", { + piece: body.piece, + trigger: body.trigger, + slug: body.slug, + items: items.length, + duration_ms: Date.now() - started, + }); + writeJson(res, 200, { ok: true, items, state: dumpStore() }); + } catch (err) { + const classified = classifyError(err); + const status = classified.type === "retryable" ? 502 : 500; + log(classified.type === "retryable" ? "warn" : "error", "piece trigger poll failed", { + piece: body.piece, + trigger: body.trigger, + slug: body.slug, + duration_ms: Date.now() - started, + error_type: classified.type, + error_message: classified.message, + }); + writeJson(res, status, { ok: false, error: classified }); + } +} + function respondWithError( res: http.ServerResponse, err: unknown, diff --git a/activepieces/tests/trigger_poll.test.ts b/activepieces/tests/trigger_poll.test.ts new file mode 100644 index 00000000..06d91992 --- /dev/null +++ b/activepieces/tests/trigger_poll.test.ts @@ -0,0 +1,350 @@ +// SPDX-License-Identifier: MIT +// Copyright (c) 2026 Orch8, Inc. +// +// Tests for the polling-trigger op (POST /poll): stub pieces with polling +// triggers, cursor/state round-trips through the seeded store, item shapes, +// error classification, and webhook-strategy rejection. + +import { test } from "node:test"; +import * as assert from "node:assert/strict"; +import { AddressInfo } from "node:net"; +import { createServer } from "../src/server.ts"; +import { createDefaultLoader, Piece, PieceTrigger } from "../src/registry.ts"; +import { buildTriggerContext } from "../src/context.ts"; +import { PieceExecutionError } from "../src/errors.ts"; + +// --------------------------------------------------------------------------- +// Helpers (same pattern as server_routes.test.ts) +// --------------------------------------------------------------------------- + +function pieceWithTriggers(triggers: Record): Piece { + return { + displayName: "fake-with-triggers", + actions: () => ({}), + triggers: () => triggers, + }; +} + +/** + * Stub polling trigger mimicking the AP `pollingHelper` contract: keeps a + * numeric cursor in `ctx.store` under "cursor" and returns the items that + * appeared since. `source` is the fake upstream event log. + */ +function pollingTrigger(name: string, source: Array>): PieceTrigger { + return { + name, + type: "POLLING", + run: async (rawCtx: unknown) => { + const ctx = rawCtx as { + store: { + get: (k: string) => Promise; + put: (k: string, v: unknown) => Promise; + }; + auth: unknown; + propsValue: Record; + }; + const cursor = ((await ctx.store.get("cursor")) as number | null) ?? 0; + const fresh = source.slice(cursor); + await ctx.store.put("cursor", source.length); + return fresh; + }, + }; +} + +async function withServer( + registerPiece: (loader: ReturnType) => void, + fn: (baseUrl: string) => Promise, +): Promise { + const loader = createDefaultLoader(); + registerPiece(loader); + const server = createServer({ + port: 0, + host: "127.0.0.1", + loader, + requestTimeoutMs: 5_000, + log: () => {}, + }); + await new Promise((resolve) => server.listen(0, "127.0.0.1", resolve)); + const addr = server.address() as AddressInfo; + const baseUrl = `http://127.0.0.1:${addr.port}`; + try { + return await fn(baseUrl); + } finally { + await new Promise((resolve) => server.close(() => resolve())); + } +} + +async function post(url: string, body: unknown): Promise<{ status: number; body: any }> { + const res = await fetch(url, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(body), + }); + const text = await res.text(); + return { status: res.status, body: text ? JSON.parse(text) : null }; +} + +// --------------------------------------------------------------------------- +// buildTriggerContext +// --------------------------------------------------------------------------- + +test("buildTriggerContext seeds the store from state and dumps mutations", async () => { + const { ctx, dumpStore } = buildTriggerContext({ + auth: { token: "t" }, + propsValue: { a: 1 }, + state: { cursor: 41, seen: ["x"] }, + slug: "my-trigger", + }); + const store = (ctx as any).store; + assert.equal(await store.get("cursor"), 41); + assert.deepEqual(await store.get("seen"), ["x"]); + assert.equal(await store.get("missing"), null); + + await store.put("cursor", 42); + await store.delete("seen"); + assert.deepEqual(dumpStore(), { cursor: 42 }); +}); + +test("buildTriggerContext with null state starts empty", async () => { + const { ctx, dumpStore } = buildTriggerContext({ + auth: null, + propsValue: {}, + state: null, + slug: "s", + }); + assert.equal(await (ctx as any).store.get("anything"), null); + assert.deepEqual(dumpStore(), {}); + // Webhook members exist but are empty. + assert.deepEqual((ctx as any).payload, {}); + assert.equal((ctx as any).webhookUrl, ""); +}); + +// --------------------------------------------------------------------------- +// POST /poll happy paths +// --------------------------------------------------------------------------- + +test("POST /poll returns items and new state from a stub polling trigger", async () => { + const source = [{ id: "evt_1" }, { id: "evt_2" }]; + await withServer( + (loader) => + loader.cache("faketrigger", pieceWithTriggers({ new_event: pollingTrigger("new_event", source) })), + async (base) => { + const { status, body } = await post(`${base}/poll`, { + piece: "faketrigger", + trigger: "new_event", + auth: { key: "k" }, + props: {}, + state: null, + slug: "reg-1", + }); + assert.equal(status, 200); + assert.equal(body.ok, true); + assert.deepEqual(body.items, [{ id: "evt_1" }, { id: "evt_2" }]); + assert.deepEqual(body.state, { cursor: 2 }); + }, + ); +}); + +test("POST /poll round-trips the cursor: second poll only sees new items", async () => { + const source = [{ id: "evt_1" }, { id: "evt_2" }]; + await withServer( + (loader) => + loader.cache("faketrigger", pieceWithTriggers({ new_event: pollingTrigger("new_event", source) })), + async (base) => { + const first = await post(`${base}/poll`, { + piece: "faketrigger", + trigger: "new_event", + state: null, + }); + assert.equal(first.body.items.length, 2); + + // Upstream produces one more event; replay the returned state. + source.push({ id: "evt_3" }); + const second = await post(`${base}/poll`, { + piece: "faketrigger", + trigger: "new_event", + state: first.body.state, + }); + assert.equal(second.status, 200); + assert.deepEqual(second.body.items, [{ id: "evt_3" }]); + assert.deepEqual(second.body.state, { cursor: 3 }); + + // Replaying the same state again yields no items (dedupe). + const third = await post(`${base}/poll`, { + piece: "faketrigger", + trigger: "new_event", + state: second.body.state, + }); + assert.deepEqual(third.body.items, []); + }, + ); +}); + +test("POST /poll passes auth and props through to the trigger context", async () => { + let seen: { auth: unknown; props: unknown } | null = null; + const trig: PieceTrigger = { + name: "spy", + type: "POLLING", + run: async (rawCtx: unknown) => { + const ctx = rawCtx as { auth: unknown; propsValue: unknown }; + seen = { auth: ctx.auth, props: ctx.propsValue }; + return []; + }, + }; + await withServer( + (loader) => loader.cache("spy-piece", pieceWithTriggers({ spy: trig })), + async (base) => { + const { status } = await post(`${base}/poll`, { + piece: "spy-piece", + trigger: "spy", + auth: { access_token: "xoxb" }, + props: { channel: "C1" }, + }); + assert.equal(status, 200); + assert.deepEqual(seen, { auth: { access_token: "xoxb" }, props: { channel: "C1" } }); + }, + ); +}); + +test("POST /poll wraps a single non-array result into one item", async () => { + const trig: PieceTrigger = { + name: "single", + type: "POLLING", + run: async () => ({ id: "only" }), + }; + await withServer( + (loader) => loader.cache("single-piece", pieceWithTriggers({ single: trig })), + async (base) => { + const { body } = await post(`${base}/poll`, { piece: "single-piece", trigger: "single" }); + assert.deepEqual(body.items, [{ id: "only" }]); + }, + ); +}); + +test("POST /poll finds array-shaped triggers by name", async () => { + const piece: Piece = { + actions: () => ({}), + triggers: () => [pollingTrigger("arr_trigger", [{ n: 1 }])], + }; + await withServer( + (loader) => loader.cache("arr-piece", piece), + async (base) => { + const { status, body } = await post(`${base}/poll`, { + piece: "arr-piece", + trigger: "arr_trigger", + }); + assert.equal(status, 200); + assert.deepEqual(body.items, [{ n: 1 }]); + }, + ); +}); + +// --------------------------------------------------------------------------- +// POST /poll failures +// --------------------------------------------------------------------------- + +test("POST /poll 422 on missing piece/trigger fields", async () => { + await withServer( + () => {}, + async (base) => { + const r1 = await post(`${base}/poll`, { piece: "x" }); + assert.equal(r1.status, 422); + assert.equal(r1.body.error.type, "permanent"); + + const r2 = await post(`${base}/poll`, { trigger: "t" }); + assert.equal(r2.status, 422); + + const r3 = await post(`${base}/poll`, { piece: "x", trigger: "t", state: [1, 2] }); + assert.equal(r3.status, 422); + assert.match(r3.body.error.message, /state/); + }, + ); +}); + +test("POST /poll 422 on unknown piece or trigger", async () => { + await withServer( + (loader) => loader.cache("known", pieceWithTriggers({})), + async (base) => { + const missingPiece = await post(`${base}/poll`, { piece: "ghost", trigger: "t" }); + assert.equal(missingPiece.status, 422); + assert.match(missingPiece.body.error.message, /not installed|invalid piece/); + + const missingTrigger = await post(`${base}/poll`, { piece: "known", trigger: "ghost" }); + assert.equal(missingTrigger.status, 422); + assert.match(missingTrigger.body.error.message, /trigger 'ghost' not found/); + }, + ); +}); + +test("POST /poll rejects webhook-strategy triggers with a permanent error", async () => { + const trig: PieceTrigger = { + name: "hooked", + type: "WEBHOOK", + run: async () => [], + }; + await withServer( + (loader) => loader.cache("hook-piece", pieceWithTriggers({ hooked: trig })), + async (base) => { + const { status, body } = await post(`${base}/poll`, { piece: "hook-piece", trigger: "hooked" }); + assert.equal(status, 422); + assert.equal(body.error.type, "permanent"); + assert.match(body.error.message, /webhook strategy/); + }, + ); +}); + +test("POST /poll classifies trigger run errors like /execute does", async () => { + const retryable: PieceTrigger = { + name: "flaky", + type: "POLLING", + run: async () => { + throw new PieceExecutionError("retryable", "upstream 503"); + }, + }; + const permanent: PieceTrigger = { + name: "broken", + type: "POLLING", + run: async () => { + throw new TypeError("piece bug"); + }, + }; + await withServer( + (loader) => loader.cache("errs", pieceWithTriggers({ flaky: retryable, broken: permanent })), + async (base) => { + const r = await post(`${base}/poll`, { piece: "errs", trigger: "flaky" }); + assert.equal(r.status, 502); + assert.equal(r.body.error.type, "retryable"); + assert.match(r.body.error.message, /upstream 503/); + + const p = await post(`${base}/poll`, { piece: "errs", trigger: "broken" }); + assert.equal(p.status, 500); + assert.equal(p.body.error.type, "permanent"); + }, + ); +}); + +test("POST /poll trigger state survives a run that throws (engine keeps old cursor)", async () => { + // The sidecar returns no state on failure — the engine keeps its persisted + // cursor. This test just pins the failure envelope shape: no items/state. + const trig: PieceTrigger = { + name: "boom", + type: "POLLING", + run: async () => { + throw new PieceExecutionError("retryable", "kaput"); + }, + }; + await withServer( + (loader) => loader.cache("boom-piece", pieceWithTriggers({ boom: trig })), + async (base) => { + const { status, body } = await post(`${base}/poll`, { + piece: "boom-piece", + trigger: "boom", + state: { cursor: 7 }, + }); + assert.equal(status, 502); + assert.equal(body.ok, false); + assert.equal(body.items, undefined); + assert.equal(body.state, undefined); + }, + ); +}); diff --git a/dashboard/src/api.ts b/dashboard/src/api.ts index 7e0ef8e1..c4c34236 100644 --- a/dashboard/src/api.ts +++ b/dashboard/src/api.ts @@ -426,7 +426,12 @@ export function deleteCronSchedule(id: string, signal?: AbortSignal): Promise serde_json::Value { + json!({ + "slug": slug, + "sequence_name": "payment-recovery", + "tenant_id": tenant, + "trigger_type": "activepieces_poll", + "config": { + "piece": "stripe", + "trigger": "new_failed_payment", + "auth": "credentials://stripe-prod", + "props": {"account": "acct_1"}, + "interval_secs": 30 + } + }) +} + +#[tokio::test] +async fn create_and_get_activepieces_poll_trigger_round_trip() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + + let resp = client + .post(format!("{}/triggers", srv.v1_url())) + .header("X-Tenant-Id", "t1") + .json(&poll_trigger_body("stripe-failed", "t1")) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + let created: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(created["trigger_type"], "activepieces_poll"); + assert_eq!(created["config"]["piece"], "stripe"); + + let resp = client + .get(format!("{}/triggers/stripe-failed", srv.v1_url())) + .header("X-Tenant-Id", "t1") + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let got: serde_json::Value = resp.json().await.unwrap(); + assert_eq!(got["slug"], "stripe-failed"); + assert_eq!(got["trigger_type"], "activepieces_poll"); + assert_eq!(got["config"]["trigger"], "new_failed_payment"); + assert_eq!(got["config"]["interval_secs"], 30); + // Poll-state field is present (null before the first poll). + assert!( + got.as_object().unwrap().contains_key("poll_state"), + "activepieces_poll GET must carry poll_state: {got}" + ); + assert!(got["poll_state"].is_null()); + + // Non-poll triggers do NOT carry the field. + let resp = client + .post(format!("{}/triggers", srv.v1_url())) + .header("X-Tenant-Id", "t1") + .json(&json!({ + "slug": "plain-webhook", + "sequence_name": "seq", + "tenant_id": "t1" + })) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + let got: serde_json::Value = client + .get(format!("{}/triggers/plain-webhook", srv.v1_url())) + .header("X-Tenant-Id", "t1") + .send() + .await + .unwrap() + .json() + .await + .unwrap(); + assert!(!got.as_object().unwrap().contains_key("poll_state")); +} + +#[tokio::test] +async fn create_rejects_invalid_poll_configs() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + + let cases = [ + // missing piece + json!({"trigger": "t"}), + // missing trigger + json!({"piece": "stripe"}), + // bad piece charset + json!({"piece": "Not_Valid", "trigger": "t"}), + // zero interval + json!({"piece": "stripe", "trigger": "t", "interval_secs": 0}), + // unparseable cron + json!({"piece": "stripe", "trigger": "t", "cron": "banana"}), + // both schedules + json!({"piece": "stripe", "trigger": "t", "cron": "* * * * *", "interval_secs": 5}), + // not an object + json!(null), + ]; + + for (i, config) in cases.iter().enumerate() { + let resp = client + .post(format!("{}/triggers", srv.v1_url())) + .header("X-Tenant-Id", "t1") + .json(&json!({ + "slug": format!("bad-{i}"), + "sequence_name": "seq", + "tenant_id": "t1", + "trigger_type": "activepieces_poll", + "config": config + })) + .send() + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::BAD_REQUEST, + "config case {i} must be rejected: {config}" + ); + let body: serde_json::Value = resp.json().await.unwrap(); + assert!( + body["error"] + .as_str() + .unwrap_or_default() + .contains("activepieces_poll"), + "error should mention the trigger kind: {body}" + ); + } + + // The same configs are fine for non-poll trigger types (config is opaque there). + let resp = client + .post(format!("{}/triggers", srv.v1_url())) + .header("X-Tenant-Id", "t1") + .json(&json!({ + "slug": "webhook-any-config", + "sequence_name": "seq", + "tenant_id": "t1", + "trigger_type": "webhook", + "config": {"piece": "Not_Valid"} + })) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); +} + +#[tokio::test] +async fn get_surfaces_poll_state_written_by_the_engine() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + + let resp = client + .post(format!("{}/triggers", srv.v1_url())) + .header("X-Tenant-Id", "t1") + .json(&poll_trigger_body("with-state", "t1")) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + + // Simulate the engine's poll loop persisting a cursor + failure info. + let now = chrono::Utc::now(); + srv.storage + .upsert_trigger_poll_state(&TriggerPollState { + slug: "with-state".into(), + state: json!({"lastPoll": 1_717_171_717}), + last_poll_at: Some(now), + last_error: Some("stripe 503".into()), + consecutive_failures: 3, + updated_at: now, + }) + .await + .unwrap(); + + let got: serde_json::Value = client + .get(format!("{}/triggers/with-state", srv.v1_url())) + .header("X-Tenant-Id", "t1") + .send() + .await + .unwrap() + .json() + .await + .unwrap(); + assert_eq!( + got["poll_state"]["state"], + json!({"lastPoll": 1_717_171_717}) + ); + assert_eq!(got["poll_state"]["last_error"], "stripe 503"); + assert_eq!(got["poll_state"]["consecutive_failures"], 3); + assert!(got["poll_state"]["last_poll_at"].is_string()); +} + +#[tokio::test] +async fn tenant_isolation_for_poll_triggers() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + + // Tenant header mismatching body tenant_id → forbidden. + let resp = client + .post(format!("{}/triggers", srv.v1_url())) + .header("X-Tenant-Id", "tenant-b") + .json(&poll_trigger_body("a-poll", "tenant-a")) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::FORBIDDEN); + + // Create as tenant-a. + let resp = client + .post(format!("{}/triggers", srv.v1_url())) + .header("X-Tenant-Id", "tenant-a") + .json(&poll_trigger_body("a-poll", "tenant-a")) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + + // Cross-tenant GET is a 404 (existence must not leak). + let resp = client + .get(format!("{}/triggers/a-poll", srv.v1_url())) + .header("X-Tenant-Id", "tenant-b") + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + + // Cross-tenant DELETE likewise. + let resp = client + .delete(format!("{}/triggers/a-poll", srv.v1_url())) + .header("X-Tenant-Id", "tenant-b") + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + + // Owner still sees it. + let resp = client + .get(format!("{}/triggers/a-poll", srv.v1_url())) + .header("X-Tenant-Id", "tenant-a") + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); +} + +#[tokio::test] +async fn delete_removes_trigger_and_poll_state() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + + let resp = client + .post(format!("{}/triggers", srv.v1_url())) + .header("X-Tenant-Id", "t1") + .json(&poll_trigger_body("doomed", "t1")) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + + let now = chrono::Utc::now(); + srv.storage + .upsert_trigger_poll_state(&TriggerPollState { + slug: "doomed".into(), + state: json!({"cursor": 9}), + last_poll_at: Some(now), + last_error: None, + consecutive_failures: 0, + updated_at: now, + }) + .await + .unwrap(); + + let resp = client + .delete(format!("{}/triggers/doomed", srv.v1_url())) + .header("X-Tenant-Id", "t1") + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::NO_CONTENT); + + // Trigger gone… + let resp = client + .get(format!("{}/triggers/doomed", srv.v1_url())) + .header("X-Tenant-Id", "t1") + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + // …and the poll-state row was cleaned up with it. + let state = srv.storage.get_trigger_poll_state("doomed").await.unwrap(); + assert!(state.is_none(), "delete_trigger must remove poll state"); +} + +#[tokio::test] +async fn list_includes_poll_triggers() { + let srv = spawn_test_server().await; + let client = reqwest::Client::new(); + + let resp = client + .post(format!("{}/triggers", srv.v1_url())) + .header("X-Tenant-Id", "t1") + .json(&poll_trigger_body("listed-poll", "t1")) + .send() + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + + let got: serde_json::Value = client + .get(format!("{}/triggers", srv.v1_url())) + .header("X-Tenant-Id", "t1") + .send() + .await + .unwrap() + .json() + .await + .unwrap(); + let arr = got.as_array().unwrap(); + assert!(arr + .iter() + .any(|t| t["slug"] == "listed-poll" && t["trigger_type"] == "activepieces_poll")); +} diff --git a/orch8-engine/src/ap_poll.rs b/orch8-engine/src/ap_poll.rs new file mode 100644 index 00000000..57477793 --- /dev/null +++ b/orch8-engine/src/ap_poll.rs @@ -0,0 +1,981 @@ +//! `ActivePieces` polling triggers. +//! +//! Triggers of type [`TriggerType::ActivepiecesPoll`] periodically ask the +//! `@orch8/activepieces-worker` Node sidecar to run a piece trigger's poll +//! (Stripe "payment failed", Typeform "new submission", ...). Each item the +//! sidecar returns creates one instance of the trigger's sequence, exactly +//! like `POST /triggers/{slug}/fire` does — the item is the instance's +//! initial `context.data`. +//! +//! # Trigger config shape (`TriggerDef::config`) +//! +//! ```json +//! { +//! "piece": "stripe", +//! "trigger": "new_failed_payment", +//! "auth": "credentials://stripe-prod", +//! "props": { "account": "acct_123" }, +//! "interval_secs": 60 +//! } +//! ``` +//! +//! Either `interval_secs` (default 60) or `cron` (standard 5-field or the +//! `cron` crate's 6/7-field shape, evaluated in UTC) selects the poll +//! schedule — not both. `auth` and `props` may contain `credentials://` +//! references; they are resolved tenant-scoped on every poll so rotated +//! credentials take effect without re-registering the trigger. +//! +//! # Sidecar protocol (`POST /poll`) +//! +//! Request: `{ piece, trigger, auth, props, state, slug }` where `state` is +//! the opaque cursor blob returned by the previous successful poll (`null` +//! on the first poll). Response on success: +//! `{ "ok": true, "items": [...], "state": { ... } }` — `items` are the new +//! events since the cursor, `state` is the next cursor. Failures use the +//! same envelope as `/execute`: +//! `{ "ok": false, "error": { "type", "message", "details" } }`. +//! +//! # Failure semantics +//! +//! A failing poll records `last_error` / increments `consecutive_failures` +//! on the trigger's [`TriggerPollState`] row and retries on the next +//! schedule — the listener never exits on poll errors. The cursor is only +//! advanced after every returned item produced an instance, so delivery is +//! at-least-once: a partial failure re-delivers the whole batch. + +use std::env; +use std::sync::Arc; +use std::sync::LazyLock; +use std::time::Duration; + +use serde::Deserialize; +use serde_json::{json, Value}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info, warn}; + +use orch8_storage::StorageBackend; +use orch8_types::trigger::{TriggerDef, TriggerPollState, TriggerType}; + +const DEFAULT_SIDECAR_POLL_URL: &str = "http://127.0.0.1:50052/poll"; + +/// Default poll cadence when the trigger config sets neither +/// `interval_secs` nor `cron`. +pub const DEFAULT_POLL_INTERVAL_SECS: u64 = 60; + +/// Floor for both `interval_secs` and computed cron gaps — protects the +/// sidecar (and the polled `SaaS` API) from sub-second hot loops. +const MIN_POLL_DELAY: Duration = Duration::from_secs(1); + +/// Shared HTTP client for sidecar poll calls. Mirrors the action handler's +/// client (`handlers::activepieces`): pooled connections, generous ceiling +/// so we don't race the sidecar's own per-piece timeout. +static POLL_CLIENT: LazyLock = LazyLock::new(|| { + reqwest::Client::builder() + .connect_timeout(Duration::from_secs(5)) + .timeout(Duration::from_secs(75)) + .build() + .unwrap_or_else(|e| { + warn!(error = %e, "failed to build activepieces poll HTTP client, using default"); + reqwest::Client::new() + }) +}); + +/// Validated view of an `activepieces_poll` trigger's `config` JSON. +#[derive(Debug, Clone)] +pub struct ApPollConfig { + /// Piece name (`stripe`, `typeform`, ...) — same charset rules as the + /// sidecar's loader: lowercase alphanumerics and hyphens. + pub piece: String, + /// Trigger name on the piece (`new_failed_payment`, ...). + pub trigger: String, + /// Piece auth — passed verbatim to the sidecar after `credentials://` + /// resolution. + pub auth: Value, + /// Trigger props — passed verbatim after `credentials://` resolution. + pub props: Value, + /// Fixed poll interval. Mutually exclusive with `cron`. + pub interval_secs: Option, + /// Cron schedule (UTC). Mutually exclusive with `interval_secs`. + pub cron: Option, +} + +/// Parse and validate the `config` blob of an `activepieces_poll` trigger. +/// +/// # Errors +/// Returns a human-readable message when `config` is not an object, when +/// `piece`/`trigger` are missing, malformed, or empty, when `interval_secs` +/// is zero/negative, when `cron` does not parse, or when both schedule +/// fields are set. +pub fn parse_config(config: &Value) -> Result { + let obj = config + .as_object() + .ok_or_else(|| "config must be a JSON object".to_string())?; + + let piece = obj + .get("piece") + .and_then(Value::as_str) + .ok_or_else(|| "config.piece (string) is required".to_string())?; + if piece.is_empty() + || !piece + .chars() + .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-') + || piece.starts_with('-') + { + return Err(format!( + "config.piece '{piece}' is invalid: expected lowercase alphanumerics and hyphens" + )); + } + + let trigger = obj + .get("trigger") + .and_then(Value::as_str) + .ok_or_else(|| "config.trigger (string) is required".to_string())?; + if trigger.is_empty() { + return Err("config.trigger must be non-empty".to_string()); + } + + let interval_secs = match obj.get("interval_secs") { + None | Some(Value::Null) => None, + Some(v) => { + let n = v + .as_u64() + .filter(|n| *n >= 1) + .ok_or_else(|| "config.interval_secs must be a positive integer".to_string())?; + Some(n) + } + }; + + let cron = match obj.get("cron") { + None | Some(Value::Null) => None, + Some(v) => { + let expr = v + .as_str() + .ok_or_else(|| "config.cron must be a string".to_string())?; + crate::cron::validate_cron_expr(expr) + .map_err(|e| format!("config.cron is invalid: {e}"))?; + Some(expr.to_string()) + } + }; + + if interval_secs.is_some() && cron.is_some() { + return Err("config must set either interval_secs or cron, not both".to_string()); + } + + Ok(ApPollConfig { + piece: piece.to_string(), + trigger: trigger.to_string(), + auth: obj.get("auth").cloned().unwrap_or(Value::Null), + props: obj.get("props").cloned().unwrap_or_else(|| json!({})), + interval_secs, + cron, + }) +} + +/// Derive the sidecar poll URL from the configured execute URL. The +/// `ORCH8_ACTIVEPIECES_URL` env var traditionally points at `/execute`; +/// the poll endpoint lives next to it. +fn derive_poll_url(execute_url: &str) -> String { + execute_url.strip_suffix("/execute").map_or_else( + || format!("{}/poll", execute_url.trim_end_matches('/')), + |base| format!("{base}/poll"), + ) +} + +/// Sidecar poll URL resolved once at first use, mirroring +/// `handlers::activepieces::AP_URL`. +static POLL_URL: LazyLock = LazyLock::new(|| { + env::var("ORCH8_ACTIVEPIECES_URL").map_or_else( + |_| DEFAULT_SIDECAR_POLL_URL.into(), + |url| derive_poll_url(&url), + ) +}); + +/// Run a poll listener for one `activepieces_poll` trigger until cancelled. +/// Sidecar URL is taken from `ORCH8_ACTIVEPIECES_URL` (default +/// `http://127.0.0.1:50052/execute` → poll endpoint `/poll`). +pub async fn run_ap_poll_listener( + storage: Arc, + trigger: TriggerDef, + cancel: CancellationToken, +) { + run_ap_poll_listener_with_url(storage, trigger, POLL_URL.clone(), cancel).await; +} + +/// Like [`run_ap_poll_listener`] but with an explicit sidecar poll URL — +/// the testable entry point. +pub async fn run_ap_poll_listener_with_url( + storage: Arc, + trigger: TriggerDef, + url: String, + cancel: CancellationToken, +) { + debug_assert_eq!(trigger.trigger_type, TriggerType::ActivepiecesPoll); + + let config = match parse_config(&trigger.config) { + Ok(c) => c, + Err(e) => { + // Invalid configs are rejected at the API layer; reaching this + // point means a row was edited out-of-band. Record the problem + // where GET /triggers/{slug} surfaces it, then park until the + // definition changes (the trigger sync loop restarts us). + error!(slug = %trigger.slug, error = %e, "activepieces poll trigger has invalid config"); + record_poll_failure( + storage.as_ref(), + &trigger.slug, + &format!("invalid config: {e}"), + ) + .await; + cancel.cancelled().await; + return; + } + }; + + info!( + slug = %trigger.slug, + piece = %config.piece, + trigger_name = %config.trigger, + url = %url, + "starting activepieces poll listener" + ); + + loop { + match poll_once(storage.as_ref(), &trigger, &config, &url).await { + Ok(0) => {} + Ok(n) => { + info!( + slug = %trigger.slug, + items = n, + "activepieces poll created instances" + ); + } + Err(e) => { + warn!(slug = %trigger.slug, error = %e, "activepieces poll failed"); + crate::metrics::inc(crate::metrics::AP_POLL_ERRORS); + record_poll_failure(storage.as_ref(), &trigger.slug, &e).await; + } + } + + let delay = next_poll_delay(&config); + tokio::select! { + () = cancel.cancelled() => { + info!(slug = %trigger.slug, "activepieces poll listener shutting down"); + return; + } + () = tokio::time::sleep(delay) => {} + } + } +} + +/// Time until the next poll: cron expression (UTC) if configured, otherwise +/// the fixed interval (default [`DEFAULT_POLL_INTERVAL_SECS`]). Clamped to +/// [`MIN_POLL_DELAY`] so a misconfigured schedule can't hot-loop. +fn next_poll_delay(config: &ApPollConfig) -> Duration { + if let Some(expr) = &config.cron { + if let Some(next) = crate::cron::next_fire_from_expr(expr) { + let delta = next - chrono::Utc::now(); + let delay = delta.to_std().unwrap_or(MIN_POLL_DELAY); + return delay.max(MIN_POLL_DELAY); + } + // Validated at registration, so this means "no future fire" — + // fall back to the default interval rather than spinning. + warn!(cron = %expr, "cron expression has no upcoming fire, using default interval"); + } + Duration::from_secs(config.interval_secs.unwrap_or(DEFAULT_POLL_INTERVAL_SECS)) + .max(MIN_POLL_DELAY) +} + +/// Sidecar `/poll` response envelope. +#[derive(Debug, Deserialize)] +struct PollResponse { + ok: bool, + #[serde(default)] + items: Option>, + #[serde(default)] + state: Option, + #[serde(default)] + error: Option, +} + +#[derive(Debug, Deserialize)] +struct PollError { + #[serde(rename = "type")] + #[allow(dead_code)] // parsed for forward-compat with the /execute envelope + kind: Option, + message: String, +} + +/// Execute one poll round-trip: load the cursor, resolve credentials, call +/// the sidecar, create one instance per returned item, persist the new +/// cursor. Returns the number of instances created. +async fn poll_once( + storage: &dyn StorageBackend, + trigger: &TriggerDef, + config: &ApPollConfig, + url: &str, +) -> Result { + let prev = storage + .get_trigger_poll_state(&trigger.slug) + .await + .map_err(|e| format!("failed to load poll state: {e}"))? + .unwrap_or_else(|| TriggerPollState::empty(trigger.slug.clone())); + + // Resolve credentials:// references fresh on every poll so credential + // rotation takes effect without re-registering the trigger. + let mut auth = config.auth.clone(); + let mut props = config.props.clone(); + crate::credentials::resolve_in_value(storage, trigger.tenant_id.as_str(), &mut auth) + .await + .map_err(|e| format!("auth credential resolution failed: {e}"))?; + crate::credentials::resolve_in_value(storage, trigger.tenant_id.as_str(), &mut props) + .await + .map_err(|e| format!("props credential resolution failed: {e}"))?; + + let body = json!({ + "piece": config.piece, + "trigger": config.trigger, + "auth": auth, + "props": props, + "state": prev.state, + "slug": trigger.slug, + }); + + debug!( + slug = %trigger.slug, + piece = %config.piece, + trigger_name = %config.trigger, + url = %url, + "polling activepieces sidecar" + ); + + let response = POLL_CLIENT + .post(url) + .header("content-type", "application/json") + .json(&body) + .send() + .await + .map_err(|e| format!("sidecar unreachable at {url}: {e}"))?; + + let status = response.status().as_u16(); + let text = response + .text() + .await + .map_err(|e| format!("failed to read sidecar response: {e}"))?; + + let parsed: PollResponse = serde_json::from_str(&text) + .map_err(|_| format!("sidecar returned HTTP {status} with non-envelope body"))?; + + if !parsed.ok { + let msg = parsed + .error + .map_or_else(|| format!("sidecar error (HTTP {status})"), |e| e.message); + return Err(msg); + } + + let items = parsed.items.unwrap_or_default(); + let event_meta = json!({ + "source": "activepieces_poll", + "piece": config.piece, + "trigger": config.trigger, + }); + + for item in &items { + crate::triggers::create_trigger_instance( + storage, + trigger, + item.clone(), + event_meta.clone(), + None, + ) + .await + .map_err(|e| format!("failed to create instance for polled item: {e}"))?; + } + crate::metrics::inc_by(crate::metrics::AP_POLL_ITEMS, items.len() as u64); + + let now = chrono::Utc::now(); + storage + .upsert_trigger_poll_state(&TriggerPollState { + slug: trigger.slug.clone(), + // Keep the previous cursor when the sidecar omits `state`. + state: parsed.state.unwrap_or(prev.state), + last_poll_at: Some(now), + last_error: None, + consecutive_failures: 0, + updated_at: now, + }) + .await + .map_err(|e| format!("failed to persist poll state: {e}"))?; + + Ok(items.len()) +} + +/// Record a failed poll on the trigger's state row: keep the cursor, +/// remember the error, bump the consecutive-failure counter. Best-effort — +/// a storage error here is logged, never propagated (the loop must go on). +async fn record_poll_failure(storage: &dyn StorageBackend, slug: &str, message: &str) { + let prev = match storage.get_trigger_poll_state(slug).await { + Ok(state) => state.unwrap_or_else(|| TriggerPollState::empty(slug)), + Err(e) => { + error!(slug, error = %e, "failed to load poll state while recording failure"); + TriggerPollState::empty(slug) + } + }; + let now = chrono::Utc::now(); + let next = TriggerPollState { + slug: prev.slug, + state: prev.state, + last_poll_at: Some(now), + last_error: Some(message.to_string()), + consecutive_failures: prev.consecutive_failures.saturating_add(1), + updated_at: now, + }; + if let Err(e) = storage.upsert_trigger_poll_state(&next).await { + error!(slug, error = %e, "failed to persist poll failure state"); + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicUsize, Ordering}; + + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::net::TcpListener; + + use orch8_storage::sqlite::SqliteStorage; + use orch8_storage::{AdminStore, InstanceStore, SequenceStore}; + use orch8_types::ids::{Namespace, SequenceId, TenantId}; + use orch8_types::instance::InstanceState; + use orch8_types::sequence::{SequenceDefinition, SequenceStatus}; + + use super::*; + + // ── config parsing ─────────────────────────────────────────────────── + + #[test] + fn parse_config_minimal() { + let c = parse_config(&json!({"piece": "stripe", "trigger": "new_payment"})).unwrap(); + assert_eq!(c.piece, "stripe"); + assert_eq!(c.trigger, "new_payment"); + assert!(c.auth.is_null()); + assert_eq!(c.props, json!({})); + assert!(c.interval_secs.is_none()); + assert!(c.cron.is_none()); + } + + #[test] + fn parse_config_full() { + let c = parse_config(&json!({ + "piece": "google-sheets", + "trigger": "new_row", + "auth": "credentials://sheets", + "props": {"sheet": "s1"}, + "interval_secs": 30, + })) + .unwrap(); + assert_eq!(c.piece, "google-sheets"); + assert_eq!(c.interval_secs, Some(30)); + assert_eq!(c.auth, json!("credentials://sheets")); + assert_eq!(c.props, json!({"sheet": "s1"})); + } + + #[test] + fn parse_config_rejects_bad_shapes() { + assert!(parse_config(&Value::Null).is_err()); + assert!(parse_config(&json!({"trigger": "t"})).is_err()); + assert!(parse_config(&json!({"piece": "p"})).is_err()); + assert!(parse_config(&json!({"piece": "", "trigger": "t"})).is_err()); + assert!(parse_config(&json!({"piece": "Bad_Name", "trigger": "t"})).is_err()); + assert!(parse_config(&json!({"piece": "-bad", "trigger": "t"})).is_err()); + assert!(parse_config(&json!({"piece": "p", "trigger": ""})).is_err()); + assert!(parse_config(&json!({"piece": "p", "trigger": "t", "interval_secs": 0})).is_err()); + assert!(parse_config(&json!({"piece": "p", "trigger": "t", "interval_secs": -5})).is_err()); + assert!(parse_config(&json!({"piece": "p", "trigger": "t", "cron": "nonsense"})).is_err()); + assert!(parse_config( + &json!({"piece": "p", "trigger": "t", "cron": "* * * * *", "interval_secs": 5}) + ) + .is_err()); + } + + #[test] + fn parse_config_accepts_cron() { + let c = + parse_config(&json!({"piece": "p", "trigger": "t", "cron": "*/5 * * * *"})).unwrap(); + assert_eq!(c.cron.as_deref(), Some("*/5 * * * *")); + assert!(c.interval_secs.is_none()); + } + + #[test] + fn derive_poll_url_variants() { + assert_eq!( + derive_poll_url("http://127.0.0.1:50052/execute"), + "http://127.0.0.1:50052/poll" + ); + assert_eq!( + derive_poll_url("http://ap.internal:9999"), + "http://ap.internal:9999/poll" + ); + assert_eq!( + derive_poll_url("http://ap.internal:9999/"), + "http://ap.internal:9999/poll" + ); + } + + #[test] + fn next_poll_delay_interval_and_default() { + let mut c = parse_config(&json!({"piece": "p", "trigger": "t"})).unwrap(); + assert_eq!( + next_poll_delay(&c), + Duration::from_secs(DEFAULT_POLL_INTERVAL_SECS) + ); + c.interval_secs = Some(7); + assert_eq!(next_poll_delay(&c), Duration::from_secs(7)); + } + + #[test] + fn next_poll_delay_cron_is_bounded() { + let c = + parse_config(&json!({"piece": "p", "trigger": "t", "cron": "* * * * * * *"})).unwrap(); + let d = next_poll_delay(&c); + assert!(d >= MIN_POLL_DELAY); + assert!(d <= Duration::from_secs(2), "every-second cron, got {d:?}"); + } + + // ── poll round-trips against a mock sidecar ────────────────────────── + + /// Read one HTTP/1.1 request (headers + Content-Length body) from `stream`. + async fn read_request(stream: &mut tokio::net::TcpStream) -> Vec { + let mut buf = Vec::with_capacity(1024); + let mut tmp = [0u8; 1024]; + let mut header_end = None; + loop { + let n = stream.read(&mut tmp).await.unwrap_or(0); + if n == 0 { + break; + } + buf.extend_from_slice(&tmp[..n]); + if header_end.is_none() { + if let Some(pos) = buf.windows(4).position(|w| w == b"\r\n\r\n") { + header_end = Some(pos + 4); + } + } + if let Some(end) = header_end { + let headers = std::str::from_utf8(&buf[..end]).unwrap_or(""); + let cl: usize = headers + .lines() + .find_map(|l| { + let l = l.to_ascii_lowercase(); + l.strip_prefix("content-length:") + .map(|v| v.trim().parse::().unwrap_or(0)) + }) + .unwrap_or(0); + if buf.len() >= end + cl { + break; + } + } + } + buf + } + + /// Spawn a mock sidecar that answers each request with the next response + /// in `responses` (status, JSON body), recording received request bodies. + /// Once responses run out, the last one repeats. + async fn spawn_mock_sidecar( + responses: Vec<(u16, String)>, + ) -> ( + String, + Arc>>, + Arc, + ) { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + let url = format!("http://127.0.0.1:{port}/poll"); + let bodies = Arc::new(tokio::sync::Mutex::new(Vec::::new())); + let hits = Arc::new(AtomicUsize::new(0)); + let bodies_srv = bodies.clone(); + let hits_srv = hits.clone(); + + tokio::spawn(async move { + loop { + let Ok((mut stream, _)) = listener.accept().await else { + break; + }; + let i = hits_srv.fetch_add(1, Ordering::SeqCst); + let (status, body) = responses + .get(i.min(responses.len().saturating_sub(1))) + .cloned() + .unwrap_or((500, "{}".into())); + let req = read_request(&mut stream).await; + if let Some(pos) = req.windows(4).position(|w| w == b"\r\n\r\n") { + if let Ok(v) = serde_json::from_slice::(&req[pos + 4..]) { + bodies_srv.lock().await.push(v); + } + } + let reason = if status == 200 { "OK" } else { "Error" }; + let resp = format!( + "HTTP/1.1 {status} {reason}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}", + body.len(), + ); + let _ = stream.write_all(resp.as_bytes()).await; + let _ = stream.shutdown().await; + } + }); + + (url, bodies, hits) + } + + async fn seed(storage: &SqliteStorage, seq_name: &str) -> SequenceId { + let id = SequenceId::new(); + storage + .create_sequence(&SequenceDefinition { + id, + tenant_id: TenantId::unchecked("t1"), + namespace: Namespace::new("default"), + name: seq_name.into(), + version: 1, + deprecated: false, + status: SequenceStatus::default(), + blocks: vec![], + interceptors: None, + created_at: chrono::Utc::now(), + }) + .await + .unwrap(); + id + } + + async fn list_t1_instances( + storage: &SqliteStorage, + ) -> Vec { + storage + .list_instances( + &orch8_types::filter::InstanceFilter { + tenant_id: Some(TenantId::unchecked("t1")), + ..Default::default() + }, + &orch8_types::filter::Pagination::default(), + ) + .await + .unwrap() + } + + fn mk_poll_trigger(slug: &str, seq_name: &str, config: Value) -> TriggerDef { + let now = chrono::Utc::now(); + TriggerDef { + slug: slug.into(), + sequence_name: seq_name.into(), + version: None, + tenant_id: TenantId::unchecked("t1"), + namespace: "default".into(), + enabled: true, + secret: None, + trigger_type: TriggerType::ActivepiecesPoll, + config, + created_at: now, + updated_at: now, + } + } + + #[tokio::test] + async fn poll_once_creates_one_instance_per_item_and_persists_state() { + let storage = SqliteStorage::in_memory().await.unwrap(); + let seq_id = seed(&storage, "on-payment").await; + let trigger = mk_poll_trigger( + "stripe-poll", + "on-payment", + json!({"piece": "stripe", "trigger": "failed_payment"}), + ); + let config = parse_config(&trigger.config).unwrap(); + + let (url, bodies, _) = spawn_mock_sidecar(vec![( + 200, + json!({ + "ok": true, + "items": [{"id": "pi_1", "amount": 100}, {"id": "pi_2", "amount": 250}], + "state": {"lastPoll": 1234}, + }) + .to_string(), + )]) + .await; + + let n = poll_once(&storage, &trigger, &config, &url).await.unwrap(); + assert_eq!(n, 2); + + // Two instances, each carrying one item as context.data. + let instances = list_t1_instances(&storage).await; + assert_eq!(instances.len(), 2); + let mut ids: Vec = instances + .iter() + .map(|i| i.context.data["id"].as_str().unwrap().to_string()) + .collect(); + ids.sort(); + assert_eq!(ids, vec!["pi_1", "pi_2"]); + for inst in &instances { + assert_eq!(inst.sequence_id, seq_id); + assert_eq!(inst.state, InstanceState::Scheduled); + assert_eq!(inst.metadata["_trigger"], "stripe-poll"); + assert_eq!(inst.metadata["_trigger_type"], "activepieces_poll"); + assert_eq!(inst.metadata["_trigger_event"]["piece"], "stripe"); + } + + // First poll sent a null cursor; new cursor persisted afterwards. + let sent = bodies.lock().await; + assert_eq!(sent[0]["piece"], "stripe"); + assert_eq!(sent[0]["trigger"], "failed_payment"); + assert!(sent[0]["state"].is_null()); + + let state = storage + .get_trigger_poll_state("stripe-poll") + .await + .unwrap() + .expect("state row persisted"); + assert_eq!(state.state, json!({"lastPoll": 1234})); + assert!(state.last_error.is_none()); + assert_eq!(state.consecutive_failures, 0); + assert!(state.last_poll_at.is_some()); + } + + #[tokio::test] + async fn poll_once_sends_persisted_cursor_on_next_poll() { + let storage = SqliteStorage::in_memory().await.unwrap(); + seed(&storage, "seq").await; + let trigger = mk_poll_trigger("p", "seq", json!({"piece": "x", "trigger": "t"})); + let config = parse_config(&trigger.config).unwrap(); + + let (url, bodies, _) = spawn_mock_sidecar(vec![ + ( + 200, + json!({"ok": true, "items": [], "state": {"cursor": "abc"}}).to_string(), + ), + ( + 200, + json!({"ok": true, "items": [], "state": {"cursor": "def"}}).to_string(), + ), + ]) + .await; + + poll_once(&storage, &trigger, &config, &url).await.unwrap(); + poll_once(&storage, &trigger, &config, &url).await.unwrap(); + + let sent = bodies.lock().await; + assert!(sent[0]["state"].is_null(), "first poll has no cursor"); + assert_eq!( + sent[1]["state"], + json!({"cursor": "abc"}), + "second poll must send the cursor persisted by the first" + ); + let state = storage.get_trigger_poll_state("p").await.unwrap().unwrap(); + assert_eq!(state.state, json!({"cursor": "def"})); + } + + #[tokio::test] + async fn poll_error_records_last_error_and_keeps_cursor() { + let storage = SqliteStorage::in_memory().await.unwrap(); + seed(&storage, "seq").await; + let trigger = mk_poll_trigger("p", "seq", json!({"piece": "x", "trigger": "t"})); + let config = parse_config(&trigger.config).unwrap(); + + let (url, _, _) = spawn_mock_sidecar(vec![ + ( + 200, + json!({"ok": true, "items": [], "state": {"cursor": 1}}).to_string(), + ), + ( + 502, + json!({"ok": false, "error": {"type": "retryable", "message": "stripe 503"}}) + .to_string(), + ), + ]) + .await; + + poll_once(&storage, &trigger, &config, &url).await.unwrap(); + let err = poll_once(&storage, &trigger, &config, &url) + .await + .unwrap_err(); + assert!(err.contains("stripe 503"), "got: {err}"); + record_poll_failure(&storage, &trigger.slug, &err).await; + + let state = storage.get_trigger_poll_state("p").await.unwrap().unwrap(); + assert_eq!(state.consecutive_failures, 1); + assert_eq!(state.last_error.as_deref(), Some("stripe 503")); + assert_eq!( + state.state, + json!({"cursor": 1}), + "cursor must survive failures" + ); + + // Another failure bumps the counter. + record_poll_failure(&storage, &trigger.slug, "boom").await; + let state = storage.get_trigger_poll_state("p").await.unwrap().unwrap(); + assert_eq!(state.consecutive_failures, 2); + assert_eq!(state.last_error.as_deref(), Some("boom")); + } + + #[tokio::test] + async fn poll_success_resets_failure_counter() { + let storage = SqliteStorage::in_memory().await.unwrap(); + seed(&storage, "seq").await; + let trigger = mk_poll_trigger("p", "seq", json!({"piece": "x", "trigger": "t"})); + let config = parse_config(&trigger.config).unwrap(); + + record_poll_failure(&storage, "p", "first failure").await; + record_poll_failure(&storage, "p", "second failure").await; + + let (url, _, _) = spawn_mock_sidecar(vec![( + 200, + json!({"ok": true, "items": [], "state": null}).to_string(), + )]) + .await; + poll_once(&storage, &trigger, &config, &url).await.unwrap(); + + let state = storage.get_trigger_poll_state("p").await.unwrap().unwrap(); + assert_eq!(state.consecutive_failures, 0); + assert!(state.last_error.is_none()); + } + + #[tokio::test] + async fn poll_once_resolves_credentials_in_auth() { + use orch8_types::credential::{CredentialDef, CredentialKind}; + + let storage = SqliteStorage::in_memory().await.unwrap(); + seed(&storage, "seq").await; + let now = chrono::Utc::now(); + orch8_storage::AdminStore::create_credential( + &storage, + &CredentialDef { + id: "stripe-key".into(), + tenant_id: "t1".into(), + name: "stripe-key".into(), + kind: CredentialKind::ApiKey, + value: orch8_types::config::SecretString::new( + r#"{"access_token": "sk_test_123"}"#.into(), + ), + expires_at: None, + refresh_url: None, + refresh_token: None, + enabled: true, + description: None, + created_at: now, + updated_at: now, + }, + ) + .await + .unwrap(); + + let trigger = mk_poll_trigger( + "p", + "seq", + json!({"piece": "stripe", "trigger": "t", "auth": "credentials://stripe-key"}), + ); + let config = parse_config(&trigger.config).unwrap(); + + let (url, bodies, _) = spawn_mock_sidecar(vec![( + 200, + json!({"ok": true, "items": [], "state": null}).to_string(), + )]) + .await; + poll_once(&storage, &trigger, &config, &url).await.unwrap(); + + let sent = bodies.lock().await; + assert_eq!( + sent[0]["auth"], + json!({"access_token": "sk_test_123"}), + "credentials:// reference must be resolved before hitting the sidecar" + ); + } + + #[tokio::test] + async fn listener_loop_polls_and_survives_errors_until_cancelled() { + let storage = Arc::new(SqliteStorage::in_memory().await.unwrap()); + seed(&storage, "seq").await; + let trigger = mk_poll_trigger( + "loop-trigger", + "seq", + json!({"piece": "x", "trigger": "t", "interval_secs": 1}), + ); + + // First response errors, second succeeds with one item — the loop + // must keep going after the failure. + let (url, _, hits) = spawn_mock_sidecar(vec![ + ( + 500, + json!({"ok": false, "error": {"message": "kaput"}}).to_string(), + ), + ( + 200, + json!({"ok": true, "items": [{"n": 1}], "state": {"c": 2}}).to_string(), + ), + ]) + .await; + + let cancel = CancellationToken::new(); + let handle = tokio::spawn(run_ap_poll_listener_with_url( + Arc::clone(&storage) as Arc, + trigger, + url.clone(), + cancel.clone(), + )); + + // Wait until at least two polls happened (error + success). + for _ in 0..100 { + if hits.load(Ordering::SeqCst) >= 2 { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + cancel.cancel(); + handle.await.unwrap(); + + assert!( + hits.load(Ordering::SeqCst) >= 2, + "loop must poll repeatedly" + ); + let instances = list_t1_instances(&storage).await; + assert_eq!( + instances.len(), + 1, + "second (successful) poll created the instance" + ); + let state = storage + .get_trigger_poll_state("loop-trigger") + .await + .unwrap() + .unwrap(); + assert_eq!(state.state, json!({"c": 2})); + assert_eq!( + state.consecutive_failures, 0, + "success after failure resets counter" + ); + } + + #[tokio::test] + async fn listener_with_invalid_config_records_error_and_parks() { + let storage = Arc::new(SqliteStorage::in_memory().await.unwrap()); + let trigger = mk_poll_trigger("bad-config", "seq", json!({"piece": "x"})); + + let cancel = CancellationToken::new(); + let handle = tokio::spawn(run_ap_poll_listener_with_url( + Arc::clone(&storage) as Arc, + trigger, + "http://127.0.0.1:9/poll".to_string(), + cancel.clone(), + )); + + // Wait for the failure record to land. + let mut state = None; + for _ in 0..100 { + state = storage.get_trigger_poll_state("bad-config").await.unwrap(); + if state.is_some() { + break; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + let state = state.expect("invalid config must be recorded as poll failure"); + assert!(state + .last_error + .as_deref() + .unwrap() + .contains("invalid config")); + + cancel.cancel(); + handle.await.unwrap(); + } +} diff --git a/orch8-engine/src/cron.rs b/orch8-engine/src/cron.rs index 2a6240e2..02d84b88 100644 --- a/orch8-engine/src/cron.rs +++ b/orch8-engine/src/cron.rs @@ -200,6 +200,15 @@ pub fn validate_cron_expr(expr: &str) -> Result<(), String> { .map_err(|e| e.to_string()) } +/// Next fire time (UTC) for a bare cron expression, accepting the same +/// 5/6/7-field shapes as [`validate_cron_expr`]. Returns `None` if the +/// expression is invalid or never fires again. Used by schedule consumers +/// that don't carry a full [`CronSchedule`] row (e.g. polling triggers). +pub fn next_fire_from_expr(expr: &str) -> Option> { + let normalized = normalize_cron_expr(expr); + Schedule::from_str(&normalized).ok()?.upcoming(Utc).next() +} + #[cfg(test)] mod tests { use super::*; diff --git a/orch8-engine/src/lib.rs b/orch8-engine/src/lib.rs index ee964447..cfd64228 100644 --- a/orch8-engine/src/lib.rs +++ b/orch8-engine/src/lib.rs @@ -1,3 +1,4 @@ +pub mod ap_poll; pub mod circuit_breaker; pub mod credentials; pub mod cron; diff --git a/orch8-engine/src/metrics.rs b/orch8-engine/src/metrics.rs index e81485f6..cf6de9ed 100644 --- a/orch8-engine/src/metrics.rs +++ b/orch8-engine/src/metrics.rs @@ -14,6 +14,10 @@ pub const RECOVERY_STALE: &str = "orch8_recovery_stale_instances_total"; pub const WEBHOOKS_SENT: &str = "orch8_webhooks_sent_total"; pub const WEBHOOKS_FAILED: &str = "orch8_webhooks_failed_total"; pub const CRON_TRIGGERED: &str = "orch8_cron_triggered_total"; +/// Items returned by `ActivePieces` sidecar polls (one instance per item). +pub const AP_POLL_ITEMS: &str = "orch8_ap_poll_items_total"; +/// Failed `ActivePieces` sidecar polls (recorded on the trigger's state row). +pub const AP_POLL_ERRORS: &str = "orch8_ap_poll_errors_total"; pub const CACHE_HITS: &str = "orch8_cache_hits_total"; pub const CACHE_MISSES: &str = "orch8_cache_misses_total"; pub const PRELOAD_REFS_SCANNED: &str = "orch8_preload_refs_scanned_total"; diff --git a/orch8-engine/src/triggers.rs b/orch8-engine/src/triggers.rs index e50eee61..d81428be 100644 --- a/orch8-engine/src/triggers.rs +++ b/orch8-engine/src/triggers.rs @@ -132,6 +132,14 @@ async fn sync_triggers( } }); } + TriggerType::ActivepiecesPoll => { + let storage = Arc::clone(storage); + let trigger = (*trigger).clone(); + let cancel = child_cancel; + tokio::spawn(async move { + crate::ap_poll::run_ap_poll_listener(storage, trigger, cancel).await; + }); + } _ => { warn!( slug, diff --git a/orch8-storage/src/encrypting.rs b/orch8-storage/src/encrypting.rs index bd69fd84..fe2da9dd 100644 --- a/orch8-storage/src/encrypting.rs +++ b/orch8-storage/src/encrypting.rs @@ -1194,6 +1194,18 @@ impl crate::AdminStore for EncryptingStorage { async fn delete_trigger(&self, slug: &str) -> Result<(), StorageError> { self.inner.delete_trigger(slug).await } + async fn get_trigger_poll_state( + &self, + slug: &str, + ) -> Result, StorageError> { + self.inner.get_trigger_poll_state(slug).await + } + async fn upsert_trigger_poll_state( + &self, + state: &orch8_types::trigger::TriggerPollState, + ) -> Result<(), StorageError> { + self.inner.upsert_trigger_poll_state(state).await + } // --- Credentials (with encryption) --- async fn create_credential( diff --git a/orch8-storage/src/lib.rs b/orch8-storage/src/lib.rs index 33218edd..05cc6270 100644 --- a/orch8-storage/src/lib.rs +++ b/orch8-storage/src/lib.rs @@ -29,7 +29,7 @@ use orch8_types::rate_limit::{RateLimit, RateLimitCheck}; use orch8_types::sequence::SequenceDefinition; use orch8_types::session::Session; use orch8_types::signal::Signal; -use orch8_types::trigger::TriggerDef; +use orch8_types::trigger::{TriggerDef, TriggerPollState}; use orch8_types::worker::WorkerTask; /// Represents a single telemetry event for batch ingestion. @@ -1177,8 +1177,22 @@ pub trait AdminStore: Send + Sync + 'static { async fn update_trigger(&self, trigger: &TriggerDef) -> Result<(), StorageError>; + /// Delete a trigger and its associated poll state (if any). async fn delete_trigger(&self, slug: &str) -> Result<(), StorageError>; + // === Trigger poll state (activepieces_poll) === + + /// Fetch the persisted poll cursor/state for a polling trigger. + /// Returns `None` if the trigger has never polled. + async fn get_trigger_poll_state( + &self, + slug: &str, + ) -> Result, StorageError>; + + /// Insert or replace the poll cursor/state for a polling trigger. + async fn upsert_trigger_poll_state(&self, state: &TriggerPollState) + -> Result<(), StorageError>; + // === Credentials === async fn create_credential( diff --git a/orch8-storage/src/postgres/mod.rs b/orch8-storage/src/postgres/mod.rs index 139348e4..1af8829d 100644 --- a/orch8-storage/src/postgres/mod.rs +++ b/orch8-storage/src/postgres/mod.rs @@ -1056,6 +1056,20 @@ impl crate::AdminStore for PostgresStorage { triggers::delete(self, slug).await } + async fn get_trigger_poll_state( + &self, + slug: &str, + ) -> Result, StorageError> { + triggers::get_poll_state(self, slug).await + } + + async fn upsert_trigger_poll_state( + &self, + state: &orch8_types::trigger::TriggerPollState, + ) -> Result<(), StorageError> { + triggers::upsert_poll_state(self, state).await + } + async fn create_credential( &self, credential: &orch8_types::credential::CredentialDef, diff --git a/orch8-storage/src/postgres/triggers.rs b/orch8-storage/src/postgres/triggers.rs index 33fa756d..0f2634d1 100644 --- a/orch8-storage/src/postgres/triggers.rs +++ b/orch8-storage/src/postgres/triggers.rs @@ -1,6 +1,6 @@ use orch8_types::error::StorageError; use orch8_types::ids::TenantId; -use orch8_types::trigger::{TriggerDef, TriggerType}; +use orch8_types::trigger::{TriggerDef, TriggerPollState, TriggerType}; use super::PostgresStorage; @@ -100,6 +100,8 @@ pub(super) async fn update( } pub(super) async fn delete(store: &PostgresStorage, slug: &str) -> Result<(), StorageError> { + // `trigger_poll_state.slug` has ON DELETE CASCADE in Postgres, so the + // trigger delete also removes any poll cursor. sqlx::query("DELETE FROM triggers WHERE slug = $1") .bind(slug) .execute(&store.pool) @@ -107,6 +109,73 @@ pub(super) async fn delete(store: &PostgresStorage, slug: &str) -> Result<(), St Ok(()) } +pub(super) async fn get_poll_state( + store: &PostgresStorage, + slug: &str, +) -> Result, StorageError> { + let row = sqlx::query_as::<_, PollStateRow>( + r"SELECT slug, state, last_poll_at, last_error, consecutive_failures, updated_at + FROM trigger_poll_state WHERE slug = $1", + ) + .bind(slug) + .fetch_optional(&store.pool) + .await?; + row.map(PollStateRow::into_state).transpose() +} + +pub(super) async fn upsert_poll_state( + store: &PostgresStorage, + state: &TriggerPollState, +) -> Result<(), StorageError> { + sqlx::query( + r"INSERT INTO trigger_poll_state (slug, state, last_poll_at, last_error, consecutive_failures, updated_at) + VALUES ($1,$2,$3,$4,$5,NOW()) + ON CONFLICT(slug) DO UPDATE SET + state=EXCLUDED.state, last_poll_at=EXCLUDED.last_poll_at, + last_error=EXCLUDED.last_error, + consecutive_failures=EXCLUDED.consecutive_failures, + updated_at=NOW()", + ) + .bind(&state.slug) + .bind(state.state.to_string()) + .bind(state.last_poll_at) + .bind(&state.last_error) + .bind(state.consecutive_failures) + .execute(&store.pool) + .await?; + Ok(()) +} + +#[derive(sqlx::FromRow)] +struct PollStateRow { + slug: String, + // `state` column is TEXT (matching `triggers.config`) — decode as String + // then parse. + state: String, + last_poll_at: Option>, + last_error: Option, + consecutive_failures: i32, + updated_at: chrono::DateTime, +} + +impl PollStateRow { + fn into_state(self) -> Result { + let state = if self.state.is_empty() { + serde_json::Value::Null + } else { + serde_json::from_str(&self.state).map_err(StorageError::Serialization)? + }; + Ok(TriggerPollState { + slug: self.slug, + state, + last_poll_at: self.last_poll_at, + last_error: self.last_error, + consecutive_failures: self.consecutive_failures, + updated_at: self.updated_at, + }) + } +} + #[derive(sqlx::FromRow)] struct TriggerRow { slug: String, diff --git a/orch8-storage/src/sqlite/mod.rs b/orch8-storage/src/sqlite/mod.rs index 0e7be705..0f107ede 100644 --- a/orch8-storage/src/sqlite/mod.rs +++ b/orch8-storage/src/sqlite/mod.rs @@ -1206,6 +1206,20 @@ impl crate::AdminStore for SqliteStorage { triggers::delete(self, slug).await } + async fn get_trigger_poll_state( + &self, + slug: &str, + ) -> Result, StorageError> { + triggers::get_poll_state(self, slug).await + } + + async fn upsert_trigger_poll_state( + &self, + state: &orch8_types::trigger::TriggerPollState, + ) -> Result<(), StorageError> { + triggers::upsert_poll_state(self, state).await + } + // === Credentials === async fn create_credential( diff --git a/orch8-storage/src/sqlite/schema.rs b/orch8-storage/src/sqlite/schema.rs index 3b692017..c4a04f4d 100644 --- a/orch8-storage/src/sqlite/schema.rs +++ b/orch8-storage/src/sqlite/schema.rs @@ -236,6 +236,18 @@ CREATE TABLE IF NOT EXISTS triggers ( updated_at TEXT NOT NULL ); +-- Runtime state for polling triggers (trigger_type = 'activepieces_poll'): +-- opaque dedupe cursor returned by the sidecar plus failure bookkeeping. +-- Rows are deleted explicitly alongside their trigger (see triggers::delete). +CREATE TABLE IF NOT EXISTS trigger_poll_state ( + slug TEXT PRIMARY KEY, + state TEXT NOT NULL DEFAULT 'null', + last_poll_at TEXT, + last_error TEXT, + consecutive_failures INTEGER NOT NULL DEFAULT 0, + updated_at TEXT NOT NULL +); + CREATE TABLE IF NOT EXISTS credentials ( id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL DEFAULT '', diff --git a/orch8-storage/src/sqlite/triggers.rs b/orch8-storage/src/sqlite/triggers.rs index 7d2f1c2a..69b704db 100644 --- a/orch8-storage/src/sqlite/triggers.rs +++ b/orch8-storage/src/sqlite/triggers.rs @@ -1,8 +1,8 @@ use orch8_types::error::StorageError; use orch8_types::ids::TenantId; -use orch8_types::trigger::{TriggerDef, TriggerType}; +use orch8_types::trigger::{TriggerDef, TriggerPollState, TriggerType}; -use super::helpers::parse_ts; +use super::helpers::{parse_ts, parse_ts_opt}; use super::SqliteStorage; pub(super) async fn create( @@ -105,6 +105,12 @@ pub(super) async fn update( } pub(super) async fn delete(store: &SqliteStorage, slug: &str) -> Result<(), StorageError> { + // Remove poll state first so a polling trigger never leaves an orphaned + // cursor behind (SQLite schema declares no FK cascade for this table). + sqlx::query("DELETE FROM trigger_poll_state WHERE slug = ?1") + .bind(slug) + .execute(&store.pool) + .await?; sqlx::query("DELETE FROM triggers WHERE slug = ?1") .bind(slug) .execute(&store.pool) @@ -112,6 +118,67 @@ pub(super) async fn delete(store: &SqliteStorage, slug: &str) -> Result<(), Stor Ok(()) } +pub(super) async fn get_poll_state( + store: &SqliteStorage, + slug: &str, +) -> Result, StorageError> { + let row: Option = sqlx::query_as( + r"SELECT slug, state, last_poll_at, last_error, consecutive_failures, updated_at + FROM trigger_poll_state WHERE slug = ?1", + ) + .bind(slug) + .fetch_optional(&store.pool) + .await?; + row.map(PollStateRow::into_state).transpose() +} + +pub(super) async fn upsert_poll_state( + store: &SqliteStorage, + state: &TriggerPollState, +) -> Result<(), StorageError> { + sqlx::query( + r"INSERT INTO trigger_poll_state (slug, state, last_poll_at, last_error, consecutive_failures, updated_at) + VALUES (?1,?2,?3,?4,?5,?6) + ON CONFLICT(slug) DO UPDATE SET + state=excluded.state, last_poll_at=excluded.last_poll_at, + last_error=excluded.last_error, + consecutive_failures=excluded.consecutive_failures, + updated_at=excluded.updated_at", + ) + .bind(&state.slug) + .bind(state.state.to_string()) + .bind(state.last_poll_at.map(|t| t.to_rfc3339())) + .bind(&state.last_error) + .bind(state.consecutive_failures) + .bind(chrono::Utc::now().to_rfc3339()) + .execute(&store.pool) + .await?; + Ok(()) +} + +#[derive(sqlx::FromRow)] +struct PollStateRow { + slug: String, + state: String, + last_poll_at: Option, + last_error: Option, + consecutive_failures: i32, + updated_at: String, +} + +impl PollStateRow { + fn into_state(self) -> Result { + Ok(TriggerPollState { + slug: self.slug, + state: serde_json::from_str(&self.state).map_err(StorageError::Serialization)?, + last_poll_at: parse_ts_opt(self.last_poll_at)?, + last_error: self.last_error, + consecutive_failures: self.consecutive_failures, + updated_at: parse_ts(&self.updated_at)?, + }) + } +} + #[derive(sqlx::FromRow)] struct TriggerRow { slug: String, diff --git a/orch8-storage/tests/storage_integration.rs b/orch8-storage/tests/storage_integration.rs index 78945374..202c909b 100644 --- a/orch8-storage/tests/storage_integration.rs +++ b/orch8-storage/tests/storage_integration.rs @@ -3673,3 +3673,127 @@ async fn increment_total_steps_returns_zero_for_missing_instance() { let missing = InstanceId::new(); assert_eq!(s.increment_total_steps(missing).await.unwrap(), 0); } + +// --------------------------------------------------------------------------- +// Trigger poll state (activepieces_poll) +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn trigger_poll_state_upsert_get_round_trip() { + use orch8_types::trigger::TriggerPollState; + + let s = store().await; + + // Unknown slug → None. + assert!(s.get_trigger_poll_state("nope").await.unwrap().is_none()); + + let now = Utc::now(); + let state = TriggerPollState { + slug: "poller".into(), + state: json!({"lastPoll": 123, "ids": ["a", "b"]}), + last_poll_at: Some(now), + last_error: None, + consecutive_failures: 0, + updated_at: now, + }; + s.upsert_trigger_poll_state(&state).await.unwrap(); + + let got = s + .get_trigger_poll_state("poller") + .await + .unwrap() + .expect("state row"); + assert_eq!(got.slug, "poller"); + assert_eq!(got.state, json!({"lastPoll": 123, "ids": ["a", "b"]})); + assert!(got.last_error.is_none()); + assert_eq!(got.consecutive_failures, 0); + assert!(got.last_poll_at.is_some()); + + // Upsert replaces in place (cursor advance + failure bookkeeping). + let updated = TriggerPollState { + slug: "poller".into(), + state: json!({"lastPoll": 456}), + last_poll_at: Some(Utc::now()), + last_error: Some("upstream 503".into()), + consecutive_failures: 2, + updated_at: Utc::now(), + }; + s.upsert_trigger_poll_state(&updated).await.unwrap(); + + let got = s.get_trigger_poll_state("poller").await.unwrap().unwrap(); + assert_eq!(got.state, json!({"lastPoll": 456})); + assert_eq!(got.last_error.as_deref(), Some("upstream 503")); + assert_eq!(got.consecutive_failures, 2); +} + +#[tokio::test] +async fn delete_trigger_removes_poll_state() { + use orch8_types::trigger::{TriggerDef, TriggerPollState, TriggerType}; + + let s = store().await; + let now = Utc::now(); + + s.create_trigger(&TriggerDef { + slug: "poll-trig".into(), + sequence_name: "seq".into(), + version: None, + tenant_id: TenantId::unchecked("t1"), + namespace: "default".into(), + enabled: true, + secret: None, + trigger_type: TriggerType::ActivepiecesPoll, + config: json!({"piece": "stripe", "trigger": "t"}), + created_at: now, + updated_at: now, + }) + .await + .unwrap(); + + s.upsert_trigger_poll_state(&TriggerPollState { + slug: "poll-trig".into(), + state: json!({"cursor": 1}), + last_poll_at: None, + last_error: None, + consecutive_failures: 0, + updated_at: now, + }) + .await + .unwrap(); + + s.delete_trigger("poll-trig").await.unwrap(); + assert!(s.get_trigger("poll-trig").await.unwrap().is_none()); + assert!( + s.get_trigger_poll_state("poll-trig") + .await + .unwrap() + .is_none(), + "poll state must be deleted with its trigger" + ); +} + +#[tokio::test] +async fn trigger_round_trips_activepieces_poll_type() { + use orch8_types::trigger::{TriggerDef, TriggerType}; + + let s = store().await; + let now = Utc::now(); + s.create_trigger(&TriggerDef { + slug: "ap-type".into(), + sequence_name: "seq".into(), + version: None, + tenant_id: TenantId::unchecked("t1"), + namespace: "default".into(), + enabled: true, + secret: None, + trigger_type: TriggerType::ActivepiecesPoll, + config: json!({"piece": "typeform", "trigger": "new_submission", "interval_secs": 15}), + created_at: now, + updated_at: now, + }) + .await + .unwrap(); + + let got = s.get_trigger("ap-type").await.unwrap().unwrap(); + assert_eq!(got.trigger_type, TriggerType::ActivepiecesPoll); + assert_eq!(got.config["piece"], "typeform"); +} diff --git a/orch8-types/src/trigger.rs b/orch8-types/src/trigger.rs index 8fbb9cba..d0ac9f5d 100644 --- a/orch8-types/src/trigger.rs +++ b/orch8-types/src/trigger.rs @@ -20,6 +20,11 @@ pub enum TriggerType { /// Unlike webhooks, event triggers carry no HMAC validation — they're /// intended for trusted server-to-server or in-cluster integration. Event, + /// Polling trigger executed via the `ActivePieces` Node sidecar. The + /// engine periodically asks the sidecar to run a piece trigger's poll + /// (config holds piece, trigger, auth, props, and a schedule) and + /// creates one instance per returned item. + ActivepiecesPoll, } impl fmt::Display for TriggerType { @@ -29,6 +34,7 @@ impl fmt::Display for TriggerType { Self::Nats => f.write_str("nats"), Self::FileWatch => f.write_str("file_watch"), Self::Event => f.write_str("event"), + Self::ActivepiecesPoll => f.write_str("activepieces_poll"), } } } @@ -41,11 +47,53 @@ impl TriggerType { "nats" => Some(Self::Nats), "file_watch" => Some(Self::FileWatch), "event" => Some(Self::Event), + "activepieces_poll" => Some(Self::ActivepiecesPoll), _ => None, } } } +/// Runtime state for a polling trigger (`TriggerType::ActivepiecesPoll`). +/// +/// Stored separately from [`TriggerDef`] so the poll loop can persist its +/// cursor and failure bookkeeping without rewriting the user-authored +/// trigger definition (avoiding lost-update races with API edits). +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct TriggerPollState { + /// Slug of the trigger this state belongs to. + pub slug: String, + /// Opaque cursor blob returned by the sidecar's last successful poll + /// (`ActivePieces` store contents — `lastPoll` epoch, item ids, ...). + /// Sent back verbatim on the next poll for deduplication. + #[serde(default)] + pub state: serde_json::Value, + /// When the last poll attempt (success or failure) completed. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_poll_at: Option>, + /// Error message from the most recent failed poll. Cleared on success. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_error: Option, + /// Number of consecutive failed polls. Reset to 0 on success. + #[serde(default)] + pub consecutive_failures: i32, + pub updated_at: DateTime, +} + +impl TriggerPollState { + /// Fresh state for a trigger that has never polled. + #[must_use] + pub fn empty(slug: impl Into) -> Self { + Self { + slug: slug.into(), + state: serde_json::Value::Null, + last_poll_at: None, + last_error: None, + consecutive_failures: 0, + updated_at: Utc::now(), + } + } +} + /// A persisted trigger definition that maps an event source to a sequence. #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct TriggerDef { @@ -89,6 +137,40 @@ mod tests { assert_eq!(TriggerType::Nats.to_string(), "nats"); assert_eq!(TriggerType::FileWatch.to_string(), "file_watch"); assert_eq!(TriggerType::Event.to_string(), "event"); + assert_eq!( + TriggerType::ActivepiecesPoll.to_string(), + "activepieces_poll" + ); + } + + #[test] + fn activepieces_poll_round_trips_display_and_serde() { + // Display ↔ from_str_loose must agree (storage round-trips through text). + assert_eq!( + TriggerType::from_str_loose("activepieces_poll"), + Some(TriggerType::ActivepiecesPoll) + ); + let t: TriggerType = serde_json::from_str(r#""activepieces_poll""#).unwrap(); + assert_eq!(t, TriggerType::ActivepiecesPoll); + assert_eq!(serde_json::to_string(&t).unwrap(), r#""activepieces_poll""#); + } + + #[test] + fn trigger_poll_state_empty_and_round_trip() { + let s = TriggerPollState::empty("my-poll"); + assert_eq!(s.slug, "my-poll"); + assert!(s.state.is_null()); + assert!(s.last_poll_at.is_none()); + assert!(s.last_error.is_none()); + assert_eq!(s.consecutive_failures, 0); + + let json = serde_json::to_string(&s).unwrap(); + let back: TriggerPollState = serde_json::from_str(&json).unwrap(); + assert_eq!(back.slug, "my-poll"); + assert_eq!(back.consecutive_failures, 0); + // Optional fields are skipped when None. + assert!(!json.contains("last_error")); + assert!(!json.contains("last_poll_at")); } #[test]