Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion activepieces/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,55 @@ ORCH8_ACTIVEPIECES_URL=http://127.0.0.1:50052/execute ./orch8-server

The handler prefix `ap://` is parsed as `ap://<piece>.<action>`. `auth` and `props` are passed through to the piece as `context.auth` and `context.propsValue`.

## Polling triggers (`POST /poll`)

The engine's `activepieces_poll` trigger loop calls `POST /poll` on a schedule
to run a piece trigger's poll headlessly:

```json
{
"piece": "stripe",
"trigger": "new_failed_payment",
"auth": { "access_token": "sk_live_..." },
"props": { },
"state": { "lastPoll": 1717171717 },
"slug": "stripe-failed-payments"
}
```

Response: `{ "ok": true, "items": [ ... ], "state": { ... } }` — `items` are
the new events since the cursor (the engine creates one durable instance per
item), `state` is the trigger's `context.store` contents after the run. The
engine persists `state` verbatim and sends it back on the next poll, which is
exactly the dedupe contract AP's `pollingHelper` expects (`lastPoll` epoch /
seen-id cursors live in the store). `state` is `null` on the first poll.

Only `TriggerStrategy.POLLING` triggers are supported — webhook-strategy
triggers are rejected with a permanent error (`onEnable`/`onDisable`
platform registration doesn't exist headlessly). Register a poll trigger on
the orch8 side with:

```bash
curl -X POST http://localhost:8080/api/v1/triggers -H 'content-type: application/json' -d '{
"slug": "stripe-failed-payments",
"sequence_name": "payment-recovery",
"tenant_id": "t1",
"trigger_type": "activepieces_poll",
"config": {
"piece": "stripe",
"trigger": "new_failed_payment",
"auth": "credentials://stripe-prod",
"interval_secs": 60
}
}'
```

`config.auth` / `config.props` may contain `credentials://` references —
the engine resolves them tenant-scoped on every poll. Schedule is either
`interval_secs` (default 60) or `cron` (UTC), not both. Poll failures are
recorded on the registration (`GET /triggers/{slug}` returns `poll_state`
with `last_error` and `consecutive_failures`) and retried on the next tick.

## Error semantics

| HTTP status | Error type | Retryable? |
Expand All @@ -87,7 +136,7 @@ Classification heuristic inside the sidecar:

**Actions work.** Most community piece actions execute cleanly through the stub `ActionContext`.

**Triggers don't (yet).** Polling and webhook triggers need scaffolding for `onEnable`/`onDisable` lifecycle hooks — use orch8's native webhook / NATS / file-watch triggers for inbound events.
**Polling triggers work.** `POST /poll` runs `TriggerStrategy.POLLING` triggers with a store seeded from the engine-persisted cursor (see above). Webhook-strategy triggers are not supported — they need platform-side `onEnable`/`onDisable` registration and an inbound URL; use orch8's native webhook triggers for those events.

**OAuth2 refresh is the caller's responsibility.** The adapter does not refresh tokens — pass a fresh `access_token` in `auth`. A forthcoming companion package will integrate with orch8's plugin/credentials registry.

Expand Down
65 changes: 65 additions & 0 deletions activepieces/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,68 @@ export function buildActionContext(input: ContextInput): Record<string, unknown>
generateResumeUrl: (_params: unknown) => "",
};
}

export interface TriggerContextInput {
auth: unknown;
propsValue: Record<string, unknown>;
/**
* Persisted store contents from the previous poll (the dedupe cursor —
* `lastPoll` epoch, seen item ids, ...). `null`/`undefined` on the first
* poll. The orch8 engine persists whatever {@link buildTriggerContext}'s
* `dumpStore` returns and sends it back on the next poll.
*/
state?: Record<string, unknown> | null;
/** Trigger registration slug — used as the step/flow identifier. */
slug: string;
serverPublicUrl?: string;
serverApiUrl?: string;
}

/**
* Stub `TriggerContext` for headless polling-trigger execution.
*
* Same limitations as {@link buildActionContext}, with one key difference:
* the `store` is seeded from the caller-provided `state` blob and its final
* contents are returned via `dumpStore()`. AP polling triggers (and the
* framework's `pollingHelper`) keep their dedupe cursor in `context.store`,
* so persisting the store across polls is exactly what gives us
* exactly-once-ish item delivery without the AP platform's database.
*
* Webhook-only context members (`payload`, `webhookUrl`) are present but
* empty — polling triggers don't read them, and webhook triggers fail with
* a clear error at the route level before this context is ever built.
*/
export function buildTriggerContext(input: TriggerContextInput): {
ctx: Record<string, unknown>;
dumpStore: () => Record<string, unknown>;
} {
const base = buildActionContext({
auth: input.auth,
propsValue: input.propsValue,
instanceId: input.slug,
blockId: input.slug,
serverPublicUrl: input.serverPublicUrl,
serverApiUrl: input.serverApiUrl,
});

const store = new Map<string, unknown>(Object.entries(input.state ?? {}));

const ctx: Record<string, unknown> = {
...base,
store: {
get: async (key: string) => store.get(key) ?? null,
put: async (key: string, value: unknown) => {
store.set(key, value);
return value;
},
delete: async (key: string) => {
store.delete(key);
},
},
// Webhook-trigger members — benign empties for polling triggers.
payload: {},
webhookUrl: "",
};

return { ctx, dumpStore: () => Object.fromEntries(store) };
}
4 changes: 2 additions & 2 deletions activepieces/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import { createServer, ServerOptions } from "./server";
import { createDefaultLoader } from "./registry";

export { createServer } from "./server";
export { createDefaultLoader, findAction } from "./registry";
export { buildActionContext } from "./context";
export { createDefaultLoader, findAction, findTrigger } from "./registry";
export { buildActionContext, buildTriggerContext } from "./context";
export { classifyError, PieceExecutionError } from "./errors";

function parseEnvInt(name: string, fallback: number): number {
Expand Down
44 changes: 43 additions & 1 deletion activepieces/src/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,26 @@ export interface PieceAction {
run: (ctx: unknown) => Promise<unknown>;
}

/**
* Minimal structural view of a piece trigger. Polling triggers built with
* the AP framework's `createTrigger`/`pollingHelper` expose `type` (the
* `TriggerStrategy` — `"POLLING"` for the ones we can run headlessly) and a
* `run(ctx)` that returns the new items since the cursor stored in
* `ctx.store`. Lifecycle hooks (`onEnable`/`onDisable`) are intentionally
* not modelled — the orch8 engine owns the poll schedule, so there is no
* platform-side registration to perform.
*/
export interface PieceTrigger {
name: string;
displayName?: string;
type?: string;
run: (ctx: unknown) => Promise<unknown>;
}

export interface Piece {
displayName?: string;
actions: () => Record<string, PieceAction> | PieceAction[];
triggers?: () => Record<string, unknown> | unknown[];
triggers?: () => Record<string, PieceTrigger> | PieceTrigger[];
}

export interface PieceLoader {
Expand Down Expand Up @@ -149,3 +165,29 @@ export function findAction(piece: Piece, actionName: string): PieceAction {
`action '${actionName}' not found on piece`,
);
}

/**
* Look up a trigger on a loaded piece. Mirrors {@link findAction}: handles
* both the record shape (keyed by trigger name) and the array shape (objects
* with a `.name` field).
*/
export function findTrigger(piece: Piece, triggerName: string): PieceTrigger {
const src = piece.triggers;
const raw = typeof src === "function" ? src() : src;

if (Array.isArray(raw)) {
const found = raw.find((t) => t && t.name === triggerName);
if (found) return found;
} else if (raw && typeof raw === "object") {
const rec = raw as Record<string, PieceTrigger>;
if (rec[triggerName]) return rec[triggerName];
for (const t of Object.values(rec)) {
if (t && t.name === triggerName) return t;
}
}

throw new PieceExecutionError(
"permanent",
`trigger '${triggerName}' not found on piece`,
);
}
156 changes: 154 additions & 2 deletions activepieces/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
// Copyright (c) 2026 Orch8, Inc.

import * as http from "node:http";
import { buildActionContext } from "./context";
import { buildActionContext, buildTriggerContext } from "./context";
import { classifyError, PieceExecutionError } from "./errors";
import { createDefaultLoader, findAction, Piece, PieceLoader } from "./registry";
import { createDefaultLoader, findAction, findTrigger, Piece, PieceLoader } from "./registry";

/**
* HTTP shape exchanged with the Rust `ap://` handler.
Expand Down Expand Up @@ -41,6 +41,39 @@ export interface ExecuteRequest {
block_id?: string;
}

/**
* Polling-trigger protocol (POST /poll), used by the orch8 engine's
* `activepieces_poll` trigger loop.
*
* Request body:
* {
* "piece": "stripe",
* "trigger": "new_failed_payment",
* "auth": { ... } | "api-key",
* "props": { ... trigger-specific ... },
* "state": { ... store blob from the previous poll ... } | null,
* "slug": "trigger registration slug"
* }
*
* Response body:
* Success (HTTP 200): { "ok": true, "items": [ ... ], "state": { ... } }
* - `items`: new events since the cursor (one orch8 instance each)
* - `state`: the trigger's store contents after the poll — the engine
* persists this verbatim and sends it back on the next poll.
* Failure: same envelope and status mapping as /execute.
*
* Only `TriggerStrategy.POLLING` triggers are supported; webhook-strategy
* triggers are rejected with a permanent error.
*/
export interface PollRequest {
piece: string;
trigger: string;
auth?: unknown;
props?: Record<string, unknown>;
state?: Record<string, unknown> | null;
slug?: string;
}

export interface ServerOptions {
port: number;
host?: string;
Expand Down Expand Up @@ -91,6 +124,11 @@ async function handleRequest(
return;
}

if (req.method === "POST" && url === "/poll") {
await handlePoll(req, res, loader, timeoutMs, log);
return;
}

if (req.method !== "POST" || url !== "/execute") {
writeJson(res, 404, {
ok: false,
Expand Down Expand Up @@ -166,6 +204,120 @@ async function handleRequest(
}
}

/**
* POST /poll — run a piece's polling trigger headlessly.
*
* Loads the piece, finds the trigger, seeds a trigger context's store with
* the caller-provided `state` blob, invokes `trigger.run(ctx)`, and returns
* the produced items plus the store's final contents as the next cursor.
*/
async function handlePoll(
req: http.IncomingMessage,
res: http.ServerResponse,
loader: PieceLoader,
timeoutMs: number,
log: NonNullable<ServerOptions["log"]>,
): Promise<void> {
let body: PollRequest;
try {
body = await readJson<PollRequest>(req);
} catch (err) {
writeJson(res, 422, {
ok: false,
error: { type: "permanent", message: `invalid request body: ${(err as Error).message}` },
});
return;
}

if (!body || typeof body.piece !== "string" || typeof body.trigger !== "string") {
writeJson(res, 422, {
ok: false,
error: { type: "permanent", message: "request must include string 'piece' and 'trigger' fields" },
});
return;
}
if (body.state != null && (typeof body.state !== "object" || Array.isArray(body.state))) {
writeJson(res, 422, {
ok: false,
error: { type: "permanent", message: "'state' must be a JSON object or null" },
});
return;
}

let piece: Piece;
try {
piece = await loader.load(body.piece);
} catch (err) {
respondWithError(res, err, log, { piece: body.piece });
return;
}

let trigger;
try {
trigger = findTrigger(piece, body.trigger);
} catch (err) {
respondWithError(res, err, log, { piece: body.piece, trigger: body.trigger });
return;
}

// Webhook-strategy triggers need platform-side registration (onEnable)
// and an inbound URL — neither exists headlessly. Fail fast and clearly.
if (typeof trigger.type === "string" && trigger.type.toUpperCase().includes("WEBHOOK")) {
respondWithError(
res,
new PieceExecutionError(
"permanent",
`trigger '${body.trigger}' uses the webhook strategy; only polling triggers are supported`,
),
log,
{ piece: body.piece, trigger: body.trigger },
);
return;
}
if (typeof trigger.run !== "function") {
respondWithError(
res,
new PieceExecutionError("permanent", `trigger '${body.trigger}' has no run() function`),
log,
{ piece: body.piece, trigger: body.trigger },
);
return;
}

const { ctx, dumpStore } = buildTriggerContext({
auth: body.auth,
propsValue: body.props ?? {},
state: body.state ?? null,
slug: body.slug ?? "",
});

const started = Date.now();
try {
const result = await withTimeout(trigger.run(ctx), timeoutMs, body.piece, body.trigger);
const items = Array.isArray(result) ? result : result == null ? [] : [result];
log("info", "piece trigger poll completed", {
piece: body.piece,
trigger: body.trigger,
slug: body.slug,
items: items.length,
duration_ms: Date.now() - started,
});
writeJson(res, 200, { ok: true, items, state: dumpStore() });
} catch (err) {
const classified = classifyError(err);
const status = classified.type === "retryable" ? 502 : 500;
log(classified.type === "retryable" ? "warn" : "error", "piece trigger poll failed", {
piece: body.piece,
trigger: body.trigger,
slug: body.slug,
duration_ms: Date.now() - started,
error_type: classified.type,
error_message: classified.message,
});
writeJson(res, status, { ok: false, error: classified });
}
}

function respondWithError(
res: http.ServerResponse,
err: unknown,
Expand Down
Loading
Loading