Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion extensions/relay/adapters/discord/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ export class DiscordRuntime {
},
recent,
);
if (authorityOutcomeAllowsDelivery(outcome) && "status" in outcome.binding) return outcome.binding;
if (authorityOutcomeAllowsDelivery(outcome) && "status" in outcome.binding) {
route.actions.clearLocalStatus?.("relay-binding-authority");
return outcome.binding;
}
if (outcome.kind === "state-unavailable") route.actions.setLocalStatus?.("relay-binding-authority", bindingAuthorityDiagnostic(outcome) ?? "Relay state is unavailable; protected messenger delivery was suppressed.");
if (outcome.kind === "revoked" || outcome.kind === "moved" || outcome.kind === "state-unavailable") this.recentBindingBySessionKey.delete(route.sessionKey);
return undefined;
Expand Down
5 changes: 4 additions & 1 deletion extensions/relay/adapters/slack/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,10 @@ export class SlackRuntime {
},
recent,
);
if (authorityOutcomeAllowsDelivery(outcome) && "status" in outcome.binding) return outcome.binding;
if (authorityOutcomeAllowsDelivery(outcome) && "status" in outcome.binding) {
route.actions.clearLocalStatus?.("relay-binding-authority");
return outcome.binding;
}
if (outcome.kind === "state-unavailable") route.actions.setLocalStatus?.("relay-binding-authority", bindingAuthorityDiagnostic(outcome) ?? "Relay state is unavailable; protected messenger delivery was suppressed.");
if (outcome.kind === "revoked" || outcome.kind === "moved" || outcome.kind === "state-unavailable") {
this.ownedBindingSessionKeys.delete(route.sessionKey);
Expand Down
12 changes: 10 additions & 2 deletions extensions/relay/adapters/telegram/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,15 +329,23 @@ export class InProcessTunnelRuntime implements TunnelRuntime {
{ sessionKey: route.sessionKey, chatId: baseBinding.chatId, userId: baseBinding.userId, includePaused: true, allowVolatileFallback: true },
baseBinding,
);
if (baseOutcome.kind === "state-unavailable") route.actions.setLocalStatus?.("relay-binding-authority", bindingAuthorityDiagnostic(baseOutcome) ?? "Relay state is unavailable; protected messenger delivery was suppressed.");
if (baseOutcome.kind === "state-unavailable") {
route.actions.setLocalStatus?.("relay-binding-authority", bindingAuthorityDiagnostic(baseOutcome) ?? "Relay state is unavailable; protected messenger delivery was suppressed.");
return undefined;
}
route.actions.clearLocalStatus?.("relay-binding-authority");
return authorityOutcomeAllowsDelivery(baseOutcome) ? { ...baseOutcome.binding, chatId: binding.chatId, userId: binding.userId } : undefined;
}
const outcome = resolveTelegramBindingAuthority(
snapshot,
{ sessionKey: route.sessionKey, chatId: binding.chatId, userId: binding.userId, includePaused: true, allowVolatileFallback: true },
binding,
);
if (outcome.kind === "state-unavailable") route.actions.setLocalStatus?.("relay-binding-authority", bindingAuthorityDiagnostic(outcome) ?? "Relay state is unavailable; protected messenger delivery was suppressed.");
if (outcome.kind === "state-unavailable") {
route.actions.setLocalStatus?.("relay-binding-authority", bindingAuthorityDiagnostic(outcome) ?? "Relay state is unavailable; protected messenger delivery was suppressed.");
return undefined;
}
route.actions.clearLocalStatus?.("relay-binding-authority");
return authorityOutcomeAllowsDelivery(outcome) ? outcome.binding : undefined;
}

Expand Down
62 changes: 52 additions & 10 deletions extensions/relay/broker/process.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const [
commandSurfacesModule,
requesterFileDeliveryModule,
bindingAuthorityModule,
telegramRouteBindingModule,
] = await Promise.all([
jiti.import('../core/guided-answer.ts'),
jiti.import('../adapters/telegram/actions.ts'),
Expand All @@ -30,6 +31,7 @@ const [
jiti.import('../commands/surfaces.ts'),
jiti.import('../core/requester-file-delivery.ts'),
jiti.import('../core/binding-authority.ts'),
jiti.import('./telegram-route-binding.ts'),
]);

function requiredFunction(module, modulePath, exportName) {
Expand Down Expand Up @@ -109,6 +111,7 @@ const bindingAuthorityStateFromData = requiredFunction(bindingAuthorityModule, '
const resolveTelegramBindingAuthority = requiredFunction(bindingAuthorityModule, './binding-authority.ts', 'resolveTelegramBindingAuthority');
const stateUnavailableBindingAuthority = requiredFunction(bindingAuthorityModule, './binding-authority.ts', 'stateUnavailableBindingAuthority');
const telegramDestinationKey = requiredFunction(bindingAuthorityModule, './binding-authority.ts', 'telegramDestinationKey');
const routeWithPersistedTelegramBinding = requiredFunction(telegramRouteBindingModule, './telegram-route-binding.ts', 'routeWithPersistedTelegramBinding');

const socketPath = process.env.TELEGRAM_TUNNEL_BROKER_SOCKET_PATH;
const pidPath = process.env.TELEGRAM_TUNNEL_BROKER_PID_PATH;
Expand Down Expand Up @@ -950,9 +953,24 @@ async function flushProgress(sessionKey, chatId, userId, key) {
await sendPlainText(chatId, `${sourcePrefixForRoute(route)}${text}`);
}

async function consumePendingPairing(nonce, expectedChannel = 'telegram') {
async function pairingHashForCode(nonce) {
const { createHash } = await import('node:crypto');
const nonceHash = createHash('sha256').update(nonce).digest('hex');
return createHash('sha256').update(nonce).digest('hex');
}

async function inspectPendingPairing(nonce, expectedChannel = 'telegram') {
const nonceHash = await pairingHashForCode(nonce);
const state = await loadState();
const pairing = state.pendingPairings[nonceHash];
if (!pairing) return { status: 'missing' };
if (expectedChannel && pairing.channel && pairing.channel !== expectedChannel) return { status: 'wrong-channel', pairing };
if (pairing.consumedAt) return { status: 'consumed', pairing };
if (Date.parse(pairing.expiresAt) <= Date.now()) return { status: 'expired', pairing };
return { status: 'active', pairing };
}

async function consumePendingPairing(nonce, expectedChannel = 'telegram') {
const nonceHash = await pairingHashForCode(nonce);
let found;
await updateState((state) => {
const pairing = state.pendingPairings[nonceHash];
Expand Down Expand Up @@ -1005,22 +1023,41 @@ async function handlePairStart(message, nonce) {
await sendPlainText(message.chat.id, 'Pairing only works from a private Telegram chat with the bot.');
return;
}
const pairing = await consumePendingPairing(nonce, 'telegram');
if (!pairing) {
await sendPlainText(message.chat.id, 'This pairing link is invalid or expired. Run /relay connect telegram again in Pi.');
const inspected = await inspectPendingPairing(nonce, 'telegram');
if (inspected.status !== 'active') {
const text = inspected.status === 'wrong-channel'
? 'This pairing link belongs to a different messenger. Re-run /relay connect telegram in Pi.'
: 'This pairing link is invalid or expired. Run /relay connect telegram again in Pi.';
await sendPlainText(message.chat.id, text);
return;
}
const pairing = inspected.pairing;
const route = routes.get(pairing.sessionKey);
if (!route) {
await sendPlainText(message.chat.id, `The target Pi session (${pairing.sessionLabel}) is not online anymore. Re-run /relay connect telegram locally.`);
await sendPlainText(message.chat.id, `The target Pi session (${pairing.sessionLabel}) is not online right now. Keep Pi running and retry this pairing link before it expires, or run /relay connect telegram locally again.`);
return;
}
const allowedByList = (config.allowUserIds || []).length > 0 && (config.allowUserIds || []).includes(message.user.id);
const approved = allowedByList || (await requestClient(route, 'confirmPairing', { identity: message.user }));
let approved = allowedByList;
if (!approved) {
try {
approved = Boolean(await requestClient(route, 'confirmPairing', { identity: message.user }));
} catch (error) {
console.warn(`Telegram pairing approval failed for ${route.sessionLabel}: ${redact(error instanceof Error ? error.message : String(error))}`);
await sendPlainText(message.chat.id, 'Pi did not respond to the pairing approval request. Keep Pi running and retry this pairing link before it expires, or run /relay connect telegram locally again.');
return;
}
}
if (!approved) {
await consumePendingPairing(nonce, 'telegram');
await sendPlainText(message.chat.id, 'Pairing was declined locally. Ask the Pi user to retry the connection flow.');
return;
}
const consumed = await consumePendingPairing(nonce, 'telegram');
if (!consumed) {
await sendPlainText(message.chat.id, 'This pairing link was already used or expired. Run /relay connect telegram again in Pi.');
return;
}

const binding = {
sessionKey: route.sessionKey,
Expand All @@ -1039,8 +1076,12 @@ async function handlePairStart(message, nonce) {
route.binding = binding;
await upsertBinding(binding);
activeSessionByChatId.set(String(message.chat.id), route.sessionKey);
await requestClient(route, 'persistBinding', { binding, revoked: false });
await requestClient(route, 'appendAudit', { message: `Telegram relay paired with ${getUserLabel(message.user)}.` });
await requestClient(route, 'persistBinding', { binding, revoked: false }).catch((error) => {
console.warn(`Telegram pairing persisted in broker state but client persist failed for ${route.sessionLabel}: ${redact(error instanceof Error ? error.message : String(error))}`);
});
await requestClient(route, 'appendAudit', { message: `Telegram relay paired with ${getUserLabel(message.user)}.` }).catch((error) => {
console.warn(`Telegram pairing audit failed for ${route.sessionLabel}: ${redact(error instanceof Error ? error.message : String(error))}`);
});
await sendPlainText(message.chat.id, `Connected to Pi session ${route.sessionLabel}. Send text prompts directly, or use /help for tunnel commands.`);
}

Expand Down Expand Up @@ -2107,7 +2148,8 @@ async function handleClientRequest(socket, message) {
return;
}
case 'registerRoute': {
const route = await stripRevokedBindingFromRoute(message.route);
const state = await loadState();
const route = await stripRevokedBindingFromRoute(routeWithPersistedTelegramBinding(message.route, state));
if (!clients.has(socket)) clients.set(socket, { clientId: message.clientId, routes: new Set() });
const client = clients.get(socket);
client.clientId = message.clientId;
Expand Down
26 changes: 26 additions & 0 deletions extensions/relay/broker/telegram-route-binding.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import type { TelegramBindingMetadata } from "../core/types.js";

export interface BrokerTelegramRouteBindingState {
bindings?: Record<string, (TelegramBindingMetadata & { status?: string; revokedAt?: string }) | undefined>;
}

export interface BrokerTelegramRouteLike {
sessionKey: string;
binding?: TelegramBindingMetadata;
}

export function persistedTelegramBindingForRoute(route: Pick<BrokerTelegramRouteLike, "sessionKey">, state: BrokerTelegramRouteBindingState): TelegramBindingMetadata | undefined {
const binding = state.bindings?.[route.sessionKey];
if (!binding || binding.status === "revoked" || binding.revokedAt) return undefined;
return binding;
}

export type BrokerTelegramRouteWithOptionalBinding<T extends Pick<BrokerTelegramRouteLike, "sessionKey">> = T & { binding?: TelegramBindingMetadata };

export function routeWithPersistedTelegramBinding<T extends BrokerTelegramRouteLike & { binding: TelegramBindingMetadata }>(route: T, state: BrokerTelegramRouteBindingState): T;
export function routeWithPersistedTelegramBinding<T extends Pick<BrokerTelegramRouteLike, "sessionKey">>(route: T, state: BrokerTelegramRouteBindingState): BrokerTelegramRouteWithOptionalBinding<T>;
export function routeWithPersistedTelegramBinding<T extends Pick<BrokerTelegramRouteLike, "sessionKey">>(route: T, state: BrokerTelegramRouteBindingState): BrokerTelegramRouteWithOptionalBinding<T> {
if ("binding" in route && route.binding) return route as BrokerTelegramRouteWithOptionalBinding<T>;
const binding = persistedTelegramBindingForRoute(route, state);
return binding ? { ...route, binding } : route;
}
41 changes: 38 additions & 3 deletions extensions/relay/broker/tunnel-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ export class BrokerTunnelRuntime implements TunnelRuntime {
private socket?: Socket;
private buffer = "";
private connecting?: Promise<void>;
private reconnectTimer?: ReturnType<typeof setTimeout>;
private reconnectDelayMs = 250;
private started = false;
private setupCache?: SetupCache;

Expand All @@ -83,6 +85,8 @@ export class BrokerTunnelRuntime implements TunnelRuntime {

async stop(): Promise<void> {
this.started = false;
if (this.reconnectTimer) clearTimeout(this.reconnectTimer);
this.reconnectTimer = undefined;
this.rejectPending(new Error("Broker runtime stopped."));
this.socket?.destroy();
this.socket = undefined;
Expand Down Expand Up @@ -164,6 +168,9 @@ export class BrokerTunnelRuntime implements TunnelRuntime {
}

private attachSocket(socket: Socket): void {
if (this.reconnectTimer) clearTimeout(this.reconnectTimer);
this.reconnectTimer = undefined;
this.reconnectDelayMs = 250;
this.socket = socket;
this.buffer = "";
socket.setEncoding("utf8");
Expand All @@ -182,6 +189,7 @@ export class BrokerTunnelRuntime implements TunnelRuntime {
socket.on("close", () => {
if (this.socket === socket) this.socket = undefined;
this.rejectPending(new Error("Broker connection closed."));
this.scheduleReconnect();
});
socket.on("error", () => {
// close handler covers recovery
Expand Down Expand Up @@ -432,14 +440,41 @@ export class BrokerTunnelRuntime implements TunnelRuntime {
const deadline = Date.now() + 10_000;
while (Date.now() < deadline) {
try {
await this.connectSocket();
this.socket?.destroy();
this.socket = undefined;
await this.probeSocket();
return;
} catch {
await new Promise((resolvePromise) => setTimeout(resolvePromise, 150));
}
}
throw new Error("PiRelay broker did not start in time.");
}

private async probeSocket(): Promise<void> {
await new Promise<void>((resolvePromise, rejectPromise) => {
const socket = createConnection(this.socketPath);
const onError = (error: Error) => {
socket.destroy();
rejectPromise(error);
};
socket.once("error", onError);
socket.once("connect", () => {
socket.off("error", onError);
socket.end();
resolvePromise();
});
});
}

private scheduleReconnect(): void {
if (!this.started || this.reconnectTimer || this.connecting) return;
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = undefined;
if (!this.started) return;
void this.ensureConnected().catch(() => {
this.reconnectDelayMs = Math.min(this.reconnectDelayMs * 2, 5_000);
this.scheduleReconnect();
});
}, this.reconnectDelayMs);
this.reconnectTimer.unref?.();
}
}
1 change: 1 addition & 0 deletions extensions/relay/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ export interface SessionRouteActions {
appendAudit(message: string): void;
notifyLocal?(message: string, level?: "info" | "warning" | "error"): void;
setLocalStatus?(key: string, value: string): void;
clearLocalStatus?(key: string): void;
refreshLocalStatus?(): void;
persistBinding(binding: TelegramBindingMetadata | null, revoked?: boolean): void;
promptLocalConfirmation(identity: RelayPairingIdentity): Promise<PairingApprovalDecision | boolean>;
Expand Down
11 changes: 11 additions & 0 deletions extensions/relay/runtime/extension-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,10 @@ export default function telegramTunnelExtension(pi: ExtensionAPI): void {
safeSetStatus(statusKeyForChannel(channel, instanceId), formatRelayStatusLine({ channel, ...state }), ctx);
}

function clearStatus(key: string, ctx = latestContext): void {
safeSetStatus(key, "", ctx);
}

function discordStatusConfigured(config: DiscordRelayConfig | undefined): boolean {
return Boolean(config?.enabled && config.botToken);
}
Expand Down Expand Up @@ -731,6 +735,11 @@ export default function telegramTunnelExtension(pi: ExtensionAPI): void {
if (!live) return;
safeSetStatus(key, value, live);
},
clearLocalStatus: (key) => {
const live = liveContextForRoute(route);
if (!live) return;
clearStatus(key, live);
},
refreshLocalStatus: () => {
const live = liveContextForRoute(route);
if (!live) return;
Expand Down Expand Up @@ -874,6 +883,8 @@ export default function telegramTunnelExtension(pi: ExtensionAPI): void {
if (rejected) {
const message = rejected.reason instanceof Error ? rejected.reason.message : String(rejected.reason);
safeSetStatus("relay-lifecycle", `relay lifecycle warning: ${redactSecrets(message)}`, ctx);
} else {
clearStatus("relay-lifecycle", ctx);
}
}

Expand Down
44 changes: 43 additions & 1 deletion tests/broker-namespace.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,32 @@ describe("broker namespace isolation", () => {
expect(socketPath(betaRuntime)).toBe(join(stateDir, "broker-beta-5c86c8f7d9db9ca4.sock"));
});

it("resyncs registered routes when the broker socket is recreated", async () => {
const stateDir = await mkdtemp(join(shortSocketTmpdir(), "pirelay-broker-reconnect-"));
tempDirs.push(stateDir);
const runtime = new BrokerTunnelRuntime(config(stateDir));
const firstMessages: Array<Record<string, unknown>> = [];
const firstServer = await listenJsonBroker(socketPath(runtime), firstMessages);

await runtime.registerRoute(route("docs-session"));
expect(firstMessages.map((message) => message.action)).toContain("registerRoute");

for (const socket of sockets.splice(0)) socket.destroy();
(runtime as unknown as { socket?: Socket }).socket?.destroy();
(runtime as unknown as { socket?: Socket }).socket = undefined;
await closeServer(firstServer);
await rm(socketPath(runtime), { force: true });

const secondMessages: Array<Record<string, unknown>> = [];
await listenJsonBroker(socketPath(runtime), secondMessages);
await (runtime as unknown as { ensureConnected(): Promise<void> }).ensureConnected();

await waitForCondition(() => secondMessages.some((message) => message.action === "registerRoute"));
expect(secondMessages.some((message) => (message.route as { sessionKey?: string } | undefined)?.sessionKey === "docs-session")).toBe(true);

await runtime.stop();
});

it("does not share route registration or delivery across namespace sockets", async () => {
const stateDir = await mkdtemp(join(shortSocketTmpdir(), "pirelay-broker-namespace-"));
tempDirs.push(stateDir);
Expand Down Expand Up @@ -62,7 +88,7 @@ function socketPath(runtime: BrokerTunnelRuntime): string {
return (runtime as unknown as { socketPath: string }).socketPath;
}

async function listenJsonBroker(path: string, messages: Array<Record<string, unknown>>): Promise<void> {
async function listenJsonBroker(path: string, messages: Array<Record<string, unknown>>): Promise<Server> {
const server = createServer((socket) => {
sockets.push(socket);
socket.setEncoding("utf8");
Expand Down Expand Up @@ -90,6 +116,22 @@ async function listenJsonBroker(path: string, messages: Array<Record<string, unk
resolve();
});
});
return server;
}

function closeServer(server: Server): Promise<void> {
const index = servers.indexOf(server);
if (index >= 0) servers.splice(index, 1);
return new Promise((resolve, reject) => server.close((error) => error ? reject(error) : resolve()));
}

async function waitForCondition(predicate: () => boolean, timeoutMs = 2_000): Promise<void> {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
if (predicate()) return;
await new Promise((resolve) => setTimeout(resolve, 25));
}
throw new Error("Timed out waiting for condition.");
}

function config(stateDir: string): TelegramTunnelConfig {
Expand Down
Loading