Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 48 additions & 21 deletions extensions/relay/adapters/discord/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { deliverWorkspaceFileToRequester, formatRequesterFileDeliveryResult, par
import { classifySharedRoomEvent, normalizeMachineSelector, parseSharedRoomSessionsArgs, parseSharedRoomToArgs, parseSharedRoomUseArgs, resolveSharedRoomMachineTarget, sharedRoomAddressingFromEvent, sharedRoomMachineIdentity, type SharedRoomAddressing, type SharedRoomMachineIdentity } from "../../core/shared-room.js";
import { buildDelegatedTaskPrompt, delegationCommandFromAction, delegationIngressEventKey, delegationRoomFromMessage, evaluateDelegationIngress, isPeerBotIdentity } from "../../core/agent-delegation-runtime.js";
import { transitionDelegationTask, type DelegationTaskRecord } from "../../core/agent-delegation.js";
import { parseApprovalActionData, parseApprovalTextCommand } from "../../core/approval-gates.js";
import { parseApprovalActionData, parseApprovalTextCommand, type ApprovalDecisionKind, type ApprovalDecisionResult } from "../../core/approval-gates.js";

const DISCORD_CHANNEL = "discord" as const;
const IMAGE_PROMPT_FALLBACK = "Please inspect the attached image.";
Expand Down Expand Up @@ -255,6 +255,7 @@ export class DiscordRuntime {
}
return;
}
if (routedCommand && await this.handleApprovalCommand(routedMessage, routedCommand)) return;
const preferredSessionKey = routedCommand?.name === "to" ? await this.targetSessionKeyForToCommand(routedMessage, routedCommand.args) : await this.sharedRoomPreferredSessionKey(routedMessage);
const binding = await this.findDiscordBinding(routedMessage, { preferredSessionKey });
if (!binding || !isDiscordIdentityAllowed(routedMessage.sender, this.config.discord)) {
Expand Down Expand Up @@ -395,21 +396,56 @@ export class DiscordRuntime {
private async handleApprovalAction(action: ChannelInboundAction): Promise<boolean> {
const parsed = parseApprovalActionData(action.actionData);
if (!parsed) return false;
const binding = await this.findDiscordBinding(action);
const route = binding ? this.routes.get(binding.sessionKey) : undefined;
if (!binding || !route?.actions.resolveApprovalDecision) {
await this.adapter?.answerAction(action.actionId, { text: "Approval request is stale.", alert: true });
if (!this.config.discord || !isDiscordIdentityAllowed(action.sender, this.config.discord)) {
await this.adapter?.answerAction(action.actionId, { text: "This Discord action is not authorized.", alert: true });
return true;
}
const result = await route.actions.resolveApprovalDecision({
approvalId: parsed.approvalId,
decision: parsed.decision,
const result = await this.resolveApprovalDecisionFromDiscord(action, parsed.approvalId, parsed.decision);
await this.adapter?.answerAction(action.actionId, { text: result.message, alert: !result.ok });
return true;
}

private async resolveApprovalDecisionFromDiscord(
message: Pick<ChannelInboundMessage, "conversation" | "sender">,
approvalId: string,
decision: ApprovalDecisionKind,
): Promise<ApprovalDecisionResult> {
const request = await this.store.getApprovalRequest(approvalId);
if (!request || request.requester.channel !== DISCORD_CHANNEL || (request.requester.instanceId ?? "default") !== this.instanceId) {
return { ok: false, status: "stale", message: "Approval request is stale." };
}
const route = this.routes.get(request.sessionKey);
if (!route?.actions.resolveApprovalDecision) {
return { ok: false, status: "stale", message: "Approval target is offline or stale." };
}
const binding = await this.store.getActiveChannelBindingForSession(DISCORD_CHANNEL, request.sessionKey, {
instanceId: this.instanceId,
conversationId: message.conversation.id,
userId: message.sender.userId,
includePaused: true,
});
if (!binding) {
return { ok: false, status: "unauthorized", message: "This Discord action is not authorized." };
}
return route.actions.resolveApprovalDecision({
approvalId,
decision,
channel: DISCORD_CHANNEL,
instanceId: this.instanceId,
conversationId: action.conversation.id,
userId: action.sender.userId,
conversationId: message.conversation.id,
userId: message.sender.userId,
});
await this.adapter?.answerAction(action.actionId, { text: result.message, alert: !result.ok });
}

private async handleApprovalCommand(message: ChannelInboundMessage, command: DiscordCommand): Promise<boolean> {
const approvalCommand = parseApprovalTextCommand(command.name, command.args);
if (!approvalCommand) return false;
if (!this.config.discord || !isDiscordIdentityAllowed(message.sender, this.config.discord)) {
await this.sendText(message, "This Discord identity is not authorized to control this PiRelay machine bot.");
return true;
}
const result = await this.resolveApprovalDecisionFromDiscord(message, approvalCommand.approvalId, approvalCommand.decision);
await this.sendText(message, result.message);
return true;
}

Expand Down Expand Up @@ -716,16 +752,7 @@ export class DiscordRuntime {
}

private async handleCommand(message: ChannelInboundMessage, binding: ChannelPersistedBindingRecord, route: SessionRoute, command: DiscordCommand): Promise<void> {
const approvalCommand = parseApprovalTextCommand(command.name, command.args);
if (approvalCommand) {
if (!route.actions.resolveApprovalDecision) {
await this.sendText(message, "Approval request is stale.");
return;
}
const result = await route.actions.resolveApprovalDecision({ approvalId: approvalCommand.approvalId, decision: approvalCommand.decision, channel: DISCORD_CHANNEL, instanceId: this.instanceId, conversationId: message.conversation.id, userId: message.sender.userId });
await this.sendText(message, result.message);
return;
}
if (await this.handleApprovalCommand(message, command)) return;
switch (command.name) {
case "help":
await this.sendText(message, DISCORD_HELP_TEXT);
Expand Down
74 changes: 52 additions & 22 deletions extensions/relay/adapters/slack/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { SlackChannelAdapter, isSlackIdentityAllowed, slackEnvelopeToChannelEven
import { createSlackLiveOperations, type SlackMessageEventFromHistory } from "./live-client.js";
import { delegationCommandFromAction, delegationIngressEventKey, delegationRoomFromMessage, evaluateDelegationIngress, isPeerBotIdentity } from "../../core/agent-delegation-runtime.js";
import { transitionDelegationTask, type DelegationTaskRecord } from "../../core/agent-delegation.js";
import { parseApprovalActionData, parseApprovalTextCommand } from "../../core/approval-gates.js";
import { parseApprovalActionData, parseApprovalTextCommand, type ApprovalDecisionKind, type ApprovalDecisionResult } from "../../core/approval-gates.js";

const SLACK_CHANNEL = "slack" as const;
const SLACK_HELP_TEXT = buildHelpText({
Expand Down Expand Up @@ -372,22 +372,59 @@ export class SlackRuntime {
private async handleApprovalAction(action: ChannelInboundAction): Promise<boolean> {
const parsed = parseApprovalActionData(action.actionData);
if (!parsed) return false;
const binding = await this.findSlackBinding(action);
const route = binding ? this.routes.get(binding.sessionKey) : undefined;
if (!binding || !route?.actions.resolveApprovalDecision || !this.adapter) {
await this.adapter?.answerAction(action.actionId, { text: "Approval request is stale." });
const slackConfig = this.configForInstance();
if (!this.adapter || !slackConfig || !isSlackIdentityAllowed(action.sender, slackConfig)) {
await this.adapter?.answerAction(action.actionId, { text: "This Slack identity is not authorized to control PiRelay." });
return true;
}
const result = await route.actions.resolveApprovalDecision({
approvalId: parsed.approvalId,
decision: parsed.decision,
const result = await this.resolveApprovalDecisionFromSlack(action, parsed.approvalId, parsed.decision);
await this.adapter.answerAction(action.actionId, { text: result.message });
return true;
}

private async resolveApprovalDecisionFromSlack(
message: Pick<ChannelInboundMessage, "conversation" | "sender" | "metadata">,
approvalId: string,
decision: ApprovalDecisionKind,
): Promise<ApprovalDecisionResult> {
const request = await this.store.getApprovalRequest(approvalId);
if (!request || request.requester.channel !== SLACK_CHANNEL || (request.requester.instanceId ?? "default") !== this.instanceId) {
return { ok: false, status: "stale", message: "Approval request is stale." };
}
const route = this.routes.get(request.sessionKey);
if (!route?.actions.resolveApprovalDecision) {
return { ok: false, status: "stale", message: "Approval target is offline or stale." };
}
const binding = await this.store.getActiveChannelBindingForSession(SLACK_CHANNEL, request.sessionKey, {
instanceId: this.instanceId,
conversationId: message.conversation.id,
userId: message.sender.userId,
includePaused: true,
});
if (!binding) {
return { ok: false, status: "unauthorized", message: "This Slack identity is not authorized to control PiRelay." };
}
return route.actions.resolveApprovalDecision({
approvalId,
decision,
channel: SLACK_CHANNEL,
instanceId: this.instanceId,
conversationId: action.conversation.id,
userId: action.sender.userId,
threadId: typeof action.metadata?.threadTs === "string" ? action.metadata.threadTs : undefined,
conversationId: message.conversation.id,
userId: message.sender.userId,
threadId: typeof message.metadata?.threadTs === "string" ? message.metadata.threadTs : undefined,
});
await this.adapter.answerAction(action.actionId, { text: result.message });
}

private async handleApprovalCommand(message: ChannelInboundMessage, command: SlackCommand): Promise<boolean> {
const approvalCommand = parseApprovalTextCommand(command.name, command.args);
if (!approvalCommand) return false;
const slackConfig = this.configForInstance();
if (!slackConfig || !isSlackIdentityAllowed(message.sender, slackConfig)) {
await this.sendText(message, "This Slack identity is not authorized to control PiRelay.");
return true;
}
const result = await this.resolveApprovalDecisionFromSlack(message, approvalCommand.approvalId, approvalCommand.decision);
await this.sendText(message, result.message);
return true;
}

Expand Down Expand Up @@ -472,6 +509,8 @@ export class SlackRuntime {
}
if (isPeerBotIdentity(message.sender)) return;
if (!slackConfig || !isSlackIdentityAllowed(message.sender, slackConfig)) return;
const approvalCommand = parseSlackCommand(routedMessage.text);
if (approvalCommand && await this.handleApprovalCommand(routedMessage, approvalCommand)) return;
const binding = await this.findSlackBinding(routedMessage) ?? await this.livePreseededBinding(routedMessage);
if (!binding) {
await this.sendText(message, message.conversation.kind === "private"
Expand Down Expand Up @@ -745,16 +784,7 @@ export class SlackRuntime {
}

private async handleSlackCommand(message: ChannelInboundMessage, binding: ChannelPersistedBindingRecord, route: SessionRoute, command: SlackCommand): Promise<void> {
const approvalCommand = parseApprovalTextCommand(command.name, command.args);
if (approvalCommand) {
if (!route.actions.resolveApprovalDecision) {
await this.sendText(message, "Approval request is stale.");
return;
}
const result = await route.actions.resolveApprovalDecision({ approvalId: approvalCommand.approvalId, decision: approvalCommand.decision, channel: SLACK_CHANNEL, instanceId: this.instanceId, conversationId: message.conversation.id, userId: message.sender.userId, threadId: typeof message.metadata?.threadTs === "string" ? message.metadata.threadTs : undefined });
await this.sendText(message, result.message);
return;
}
if (await this.handleApprovalCommand(message, command)) return;
if (binding.paused && !commandAllowsWhilePaused(command.name)) {
await this.sendText(message, "Remote delivery is paused for this Slack binding. Use `relay resume` first.");
return;
Expand Down
64 changes: 39 additions & 25 deletions extensions/relay/adapters/telegram/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ import { formatFullOutput, formatRelayStatusForRoute, formatSessionSelectorError
import { commandIntentFromPipeline, runTelegramIngressPipeline, telegramActionFromPipelineResult } from "./middleware.js";
import { delegationIngressEventKey, delegationRoomFromMessage, evaluateDelegationIngress } from "../../core/agent-delegation-runtime.js";
import { transitionDelegationTask, type DelegationTaskRecord } from "../../core/agent-delegation.js";
import { parseApprovalActionData, parseApprovalTextCommand } from "../../core/approval-gates.js";
import { parseApprovalActionData, parseApprovalTextCommand, type ApprovalDecisionKind, type ApprovalDecisionResult } from "../../core/approval-gates.js";
import {
appendRecentActivity,
displayProgressMode,
Expand Down Expand Up @@ -722,6 +722,12 @@ export class InProcessTunnelRuntime implements TunnelRuntime {
await this.handleAuthorizedCommand(undefined, message, command.command, command.args);
return;
}
const approvalCommand = command ? parseApprovalTextCommand(command.command, command.args) : undefined;
if (approvalCommand) {
const result = await this.resolveApprovalDecisionFromTelegram(message.chat.id, message.user, approvalCommand.approvalId, approvalCommand.decision);
await this.api.sendPlainText(message.chat.id, result.message);
return;
}

const bindingSnapshot = await this.store.loadBindingAuthoritySnapshot();
if (bindingSnapshot.kind === "state-unavailable") {
Expand Down Expand Up @@ -829,24 +835,7 @@ export class InProcessTunnelRuntime implements TunnelRuntime {
private async processCallback(callback: TelegramInboundCallback): Promise<void> {
const approvalAction = parseApprovalActionData(callback.data);
if (approvalAction) {
const binding = await this.activeBindingForMessage(callback.chat.id, callback.user.id);
const route = binding ? this.routes.get(binding.sessionKey) : undefined;
if (!route?.actions.resolveApprovalDecision) {
await this.api.answerCallbackQuery(callback.callbackQueryId, "Approval request is stale.");
return;
}
if (!route.binding || !(await this.isAuthorized(route, callback.user))) {
await this.api.answerCallbackQuery(callback.callbackQueryId, "Unauthorized.");
return;
}
const result = await route.actions.resolveApprovalDecision({
approvalId: approvalAction.approvalId,
decision: approvalAction.decision,
channel: "telegram",
instanceId: "default",
conversationId: String(callback.chat.id),
userId: String(callback.user.id),
});
const result = await this.resolveApprovalDecisionFromTelegram(callback.chat.id, callback.user, approvalAction.approvalId, approvalAction.decision);
await this.api.answerCallbackQuery(callback.callbackQueryId, result.message);
return;
}
Expand Down Expand Up @@ -1707,13 +1696,9 @@ export class InProcessTunnelRuntime implements TunnelRuntime {
args: string,
): Promise<void> {
const approvalCommand = parseApprovalTextCommand(command, args);
if (approvalCommand && route?.actions.resolveApprovalDecision) {
const result = await route.actions.resolveApprovalDecision({ approvalId: approvalCommand.approvalId, decision: approvalCommand.decision, channel: "telegram", instanceId: "default", conversationId: String(message.chat.id), userId: String(message.user.id) });
await this.api.sendPlainText(message.chat.id, result.message);
return;
}
if (approvalCommand) {
await this.api.sendPlainText(message.chat.id, "Approval request is stale.");
const result = await this.resolveApprovalDecisionFromTelegram(message.chat.id, message.user, approvalCommand.approvalId, approvalCommand.decision);
await this.api.sendPlainText(message.chat.id, result.message);
return;
}
if (command === "help") {
Expand Down Expand Up @@ -1936,6 +1921,35 @@ export class InProcessTunnelRuntime implements TunnelRuntime {
}
}

private async resolveApprovalDecisionFromTelegram(
chatId: number,
user: TelegramUserSummary,
approvalId: string,
decision: ApprovalDecisionKind,
): Promise<ApprovalDecisionResult> {
const request = await this.store.getApprovalRequest(approvalId);
if (!request || request.requester.channel !== "telegram") {
return { ok: false, status: "stale", message: "Approval request is stale." };
}
const route = this.routes.get(request.sessionKey);
if (!route?.actions.resolveApprovalDecision) {
return { ok: false, status: "stale", message: "Approval target is offline or stale." };
}
const binding = await this.store.getActiveBindingForSession(request.sessionKey, { chatId, userId: user.id, includePaused: true });
if (binding) route.binding = binding;
if (!route.binding || !(await this.isAuthorized(route, user))) {
return { ok: false, status: "unauthorized", message: "Unauthorized." };
}
return route.actions.resolveApprovalDecision({
approvalId,
decision,
channel: "telegram",
instanceId: "default",
conversationId: String(chatId),
userId: String(user.id),
});
}

private async deliverPlainPrompt(route: SessionRoute, message: TelegramInboundMessage, text: string): Promise<void> {
this.clearAnswerStateForRoute(route);
const delivery = this.promptDeliveryState(route);
Expand Down
Loading