Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 64 additions & 13 deletions app/ai/openai.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,11 @@ export async function callOpenAiChatStream(
mode: AiMode,
params: AiRequestBase,
onDelta: (text: string) => void | Promise<void>,
opts?: { isCancelled?: () => boolean; getAbortSignal?: () => Promise<boolean> },
opts?: {
isCancelled?: () => boolean;
getAbortSignal?: () => Promise<boolean>;
abortSignal?: AbortSignal;
},
): Promise<AiResponseBase> {
if (!client) {
return {
Expand All @@ -125,38 +129,73 @@ export async function callOpenAiChatStream(
mode === "token_info"
? "You are a blockchain and token analyst. Answer clearly and briefly.\n\n"
: "";
let onAbort: (() => void) | null = null;

try {
if (opts?.abortSignal?.aborted) {
return {
ok: false,
provider: "openai",
mode,
error: "Generation aborted by newer message.",
};
}

const stream = client.responses.stream({
model: "gpt-5.2",
...(params.instructions ? { instructions: params.instructions } : {}),
input: `${prefix}${trimmed}`,
});
const abortStream = (): void => {
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(stream as any)?.abort?.();
} catch {
/* ignore */
}
};
let cancelLogged = false;
const hardAbort = (reason: string): void => {
if (!cancelLogged) {
cancelLogged = true;
console.log("[STREAM] aborted immediately:", reason);
}
abortStream();
};
onAbort = (): void => {
abortStream();
};
if (opts?.abortSignal) {
opts.abortSignal.addEventListener("abort", onAbort, { once: true });
}

stream.on("response.output_text.delta", async (event: { snapshot?: string }) => {
if (opts?.abortSignal?.aborted) {
hardAbort("abortSignal");
return;
}
if (opts?.isCancelled && opts.isCancelled()) {
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(stream as any)?.abort?.();
} catch {
/* ignore */
}
hardAbort("isCancelled");
return;
}
if (opts?.getAbortSignal && (await opts.getAbortSignal())) {
try {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(stream as any)?.abort?.();
} catch {
/* ignore */
}
console.log("[STREAM] stale generation");
hardAbort("getAbortSignal");
return;
}
const text = event?.snapshot ?? "";
if (text.length > 0) void Promise.resolve(onDelta(text));
});

const response = await stream.finalResponse();
if (opts?.abortSignal?.aborted) {
return {
ok: false,
provider: "openai",
mode,
error: "Generation aborted by newer message.",
};
}
const r = response as any;
let output_text = r.output_text;
if (output_text == null || String(output_text).trim() === "") {
Expand Down Expand Up @@ -189,6 +228,14 @@ export async function callOpenAiChatStream(
usage: r.usage ?? undefined,
};
} catch (e: any) {
if (opts?.abortSignal?.aborted) {
return {
ok: false,
provider: "openai",
mode,
error: "Generation aborted by newer message.",
};
}
const message =
(e && typeof e === "object" && "message" in e ? (e as Error).message : null) ??
(e != null ? String(e) : "Failed to call OpenAI. Check OPENAI env and network.");
Expand All @@ -198,5 +245,9 @@ export async function callOpenAiChatStream(
mode,
error: message,
};
} finally {
if (opts?.abortSignal && onAbort) {
opts.abortSignal.removeEventListener("abort", onAbort);
}
}
}
6 changes: 5 additions & 1 deletion app/ai/transmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,11 @@ export async function transmit(request: AiRequest): Promise<AiResponse> {
export async function transmitStream(
request: AiRequest,
onDelta: (text: string) => void | Promise<void>,
opts?: { isCancelled?: () => boolean; getAbortSignal?: () => Promise<boolean> },
opts?: {
isCancelled?: () => boolean;
getAbortSignal?: () => Promise<boolean>;
abortSignal?: AbortSignal;
},
): Promise<AiResponse> {
const mode: AiMode = request.mode ?? "chat";
const thread = request.threadContext;
Expand Down
31 changes: 31 additions & 0 deletions app/api/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,29 @@ function normalizeBase(base: string): string {
return base.replace(/\/$/, "");
}

/**
 * Returns true when `value` parses as an absolute URL whose hostname is
 * private/local (as judged by `isPrivateOrLocalHost`); false when the
 * string is not a parseable URL.
 */
function isLocalhostUrl(value: string): boolean {
  try {
    const { hostname } = new URL(value);
    return isPrivateOrLocalHost(hostname);
  } catch {
    // new URL() throws on relative or malformed input — not a localhost URL.
    return false;
  }
}

/**
 * Heuristic: are we running inside a Telegram Mini App web view?
 * First looks for the `tgWebApp` marker in the URL hash, then for the
 * injected `window.Telegram.WebApp` object. Always false outside a browser.
 */
function isLikelyTelegramMiniApp(): boolean {
  if (typeof window === "undefined") return false;

  let hashHasMarker = false;
  try {
    hashHasMarker = (window.location.hash ?? "").includes("tgWebApp");
  } catch {
    // Reading location can throw in exotic embeddings — treat as "no marker".
  }
  if (hashHasMarker) return true;

  try {
    const tg = (window as unknown as { Telegram?: { WebApp?: unknown } }).Telegram;
    return Boolean(tg?.WebApp);
  } catch {
    return false;
  }
}

function isPrivateOrLocalHost(hostname: string): boolean {
if (hostname === "localhost" || hostname === "127.0.0.1") return true;
if (hostname.startsWith("10.")) return true;
Expand Down Expand Up @@ -92,6 +115,14 @@ function getNodeBaseUrl(): string {
export function getApiBaseUrl(): string {
const envBase = process.env.EXPO_PUBLIC_API_BASE_URL?.trim();
if (envBase) {
if (
typeof window !== "undefined" &&
isLikelyTelegramMiniApp() &&
isLocalhostUrl(envBase)
) {
const browserBase = getBrowserBaseUrl();
if (browserBase) return browserBase;
}
return normalizeBase(envBase);
}

Expand Down
2 changes: 2 additions & 0 deletions app/api/telegram.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ async function handler(
res?: { status: (n: number) => void; setHeader: (k: string, v: string) => void; end: (s?: string) => void }
): Promise<Response | void> {
const method = (request as { method?: string }).method ?? request.method;
console.log('[api/telegram]', method, new Date().toISOString());
if (method === 'GET') {
const body = { ok: true, endpoint: 'telegram', use: 'POST with initData' };
if (res) {
Expand All @@ -41,6 +42,7 @@ async function handler(
try {
const { handlePost } = await import('../telegram/post.js');
const response = await handlePost(request);
console.log('[api/telegram] POST status', response.status);
if (res) {
res.status(response.status);
response.headers.forEach((v, k) => res.setHeader(k, v));
Expand Down
Loading