diff --git a/src/commands/emails/receiving/listen.ts b/src/commands/emails/receiving/listen.ts index a6382a9b..0f4fe018 100644 --- a/src/commands/emails/receiving/listen.ts +++ b/src/commands/emails/receiving/listen.ts @@ -5,18 +5,19 @@ import { getCancelExitCode, setSigintHandler } from '../../../lib/cli-exit'; import type { GlobalOpts } from '../../../lib/client'; import { requireClient } from '../../../lib/client'; import { buildHelpText } from '../../../lib/help-text'; -import { errorMessage, outputError } from '../../../lib/output'; +import { outputError } from '../../../lib/output'; +import { retryPoll } from '../../../lib/retry-poll'; import { safeTerminalText } from '../../../lib/safe-terminal-text'; import { createSpinner } from '../../../lib/spinner'; import { isInteractive } from '../../../lib/tty'; const PAGE_SIZE = 100; +const MAX_CONSECUTIVE_ERRORS = 5; -function timestamp(): string { - return new Date().toLocaleTimeString('en-GB', { hour12: false }); -} +const timestamp = (): string => + new Date().toLocaleTimeString('en-GB', { hour12: false }); -function displayEmail(email: ListReceivingEmail, jsonMode: boolean): void { +const displayEmail = (email: ListReceivingEmail, jsonMode: boolean): void => { if (jsonMode) { console.log(JSON.stringify(email)); } else { @@ -31,7 +32,7 @@ function displayEmail(email: ListReceivingEmail, jsonMode: boolean): void { `${ts} ${from} -> ${to} ${pc.bold(`"${subject}"`)} ${pc.dim(id)}\n`, ); } -} +}; export const listenReceivingCommand = new Command('listen') .description('Poll for new inbound emails and display them as they arrive') @@ -74,7 +75,6 @@ Ctrl+C exits cleanly.`, const resend = await requireClient(globalOpts); const jsonMode = globalOpts.json || !isInteractive(); - // Initial poll — just grab the latest email to establish our starting point const spinner = createSpinner( 'Connecting...', globalOpts.quiet || jsonMode, @@ -83,36 +83,27 @@ Ctrl+C exits cleanly.`, const seenIds = new Set(); let consecutiveErrors = 0; - try { - const { data, error } = await resend.emails.receiving.list({ limit: 1 }); - if (error || !data) { - spinner.fail('Failed to connect'); - outputError( - { - message: error?.message ?? 'Unexpected empty response', - code: 'list_error', - }, - { json: globalOpts.json }, - ); - } - - for (const email of data.data) { - seenIds.add(email.id); - } + const result = await retryPoll(() => + resend.emails.receiving.list({ limit: 1 }), + ); - spinner.stop('Ready'); - } catch (err) { + if (!result.success) { spinner.fail('Failed to connect'); outputError( { - message: errorMessage(err, 'Unknown error'), + message: result.message, code: 'list_error', }, { json: globalOpts.json }, ); } - // Print banner + for (const email of result.data.data) { + seenIds.add(email.id); + } + + spinner.stop('Ready'); + if (!jsonMode) { process.stderr.write('\n'); process.stderr.write(` ${pc.bold('Polling:')} every ${interval}s\n`); @@ -121,35 +112,32 @@ Ctrl+C exits cleanly.`, ); } - // Helper: handle consecutive poll errors and exit if threshold reached - function handlePollError(message: string): void { + const handlePollError = (message: string): void => { consecutiveErrors++; if (!jsonMode) { process.stderr.write( `${pc.dim(`[${timestamp()}]`)} ${pc.yellow('Warning:')} ${message}\n`, ); } - if (consecutiveErrors >= 5) { + if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) { outputError( { - message: 'Exiting after 5 consecutive API failures.', + message: `Exiting after ${MAX_CONSECUTIVE_ERRORS} consecutive API failures.`, code: 'poll_error', }, { json: globalOpts.json }, ); } - } + }; - // Poll loop — paginates until it hits a seen email to guarantee no misses let timeoutHandle: ReturnType; - async function poll(): Promise { + const poll = async (): Promise => { try { const newEmails: ListReceivingEmail[] = []; let cursor: string | undefined; let hasMore = true; - // Paginate through results until we find an email we've already seen while (hasMore) { const params: { limit: number; after?: string } = { limit: PAGE_SIZE, @@ -158,14 +146,16 @@ Ctrl+C exits cleanly.`, params.after = cursor; } - const { data, error } = await resend.emails.receiving.list(params); - if (error || !data) { - handlePollError(error?.message ?? 'Empty response'); + const pollResult = await retryPoll(() => + resend.emails.receiving.list(params), + ); + if (!pollResult.success) { + handlePollError(pollResult.message); return; } let foundSeen = false; - for (const email of data.data) { + for (const email of pollResult.data.data) { if (seenIds.has(email.id)) { foundSeen = true; break; @@ -173,35 +163,31 @@ Ctrl+C exits cleanly.`, newEmails.push(email); } - if (foundSeen || !data.has_more) { + if (foundSeen || !pollResult.data.has_more) { hasMore = false; } else { - // Use the last email as cursor for the next page - cursor = data.data[data.data.length - 1]?.id; + cursor = pollResult.data.data[pollResult.data.data.length - 1]?.id; } } consecutiveErrors = 0; - // Mark all new emails as seen for (const email of newEmails) { seenIds.add(email.id); } - // Display in chronological order (oldest first) - for (const email of newEmails.reverse()) { + for (const email of [...newEmails].reverse()) { displayEmail(email, jsonMode); } } catch (err) { - handlePollError(errorMessage(err, 'Unknown error')); + handlePollError(err instanceof Error ? err.message : 'Unknown error'); } finally { timeoutHandle = setTimeout(poll, interval * 1000); } - } + }; timeoutHandle = setTimeout(poll, interval * 1000); - // Graceful exit const handleSignal = () => { clearTimeout(timeoutHandle); if (!jsonMode) { @@ -213,6 +199,5 @@ Ctrl+C exits cleanly.`, setSigintHandler(handleSignal); process.on('SIGTERM', handleSignal); - // Keep alive await new Promise(() => {}); }); diff --git a/src/lib/retry-poll.ts b/src/lib/retry-poll.ts new file mode 100644 index 00000000..6356d951 --- /dev/null +++ b/src/lib/retry-poll.ts @@ -0,0 +1,68 @@ +type SdkResponse = { + readonly data: T | null; + readonly error: { readonly message: string; readonly name?: string } | null; + readonly headers?: Record | null; +}; + +type RetryPollSuccess = { readonly success: true; readonly data: T }; +type RetryPollFailure = { readonly success: false; readonly message: string }; +export type RetryPollResult = RetryPollSuccess | RetryPollFailure; + +export type RetryPollOpts = { + readonly delayMs?: (ms: number) => Promise; +}; + +const DEFAULT_RETRY_DELAYS = [1, 2, 4]; +const MAX_RETRIES = DEFAULT_RETRY_DELAYS.length; + +const defaultDelay = (ms: number): Promise => + new Promise((resolve) => setTimeout(resolve, ms)); + +const parseRetryDelay = ( + headers?: Record | null, +): number | undefined => { + const value = headers?.['retry-after']; + if (!value) { + return undefined; + } + const seconds = Number(value); + return Number.isFinite(seconds) && seconds >= 0 ? seconds : undefined; +}; + +const pollAttempt = async ( + call: () => Promise>, + n: number, + delay: (ms: number) => Promise, +): Promise> => { + try { + const { data, error, headers } = await call(); + if (error) { + if (n < MAX_RETRIES && error.name === 'rate_limit_exceeded') { + const wait = parseRetryDelay(headers) ?? DEFAULT_RETRY_DELAYS[n]; + await delay(wait * 1000); + return pollAttempt(call, n + 1, delay); + } + return { success: false, message: error.message }; + } + if (data === null) { + return { success: false, message: 'Unexpected empty response' }; + } + return { success: true, data }; + } catch (err) { + if (n < MAX_RETRIES) { + const wait = DEFAULT_RETRY_DELAYS[n]; + await delay(wait * 1000); + return pollAttempt(call, n + 1, delay); + } + return { + success: false, + message: err instanceof Error ? err.message : 'Unknown error', + }; + } +}; + +export const retryPoll = ( + call: () => Promise>, + opts: RetryPollOpts = {}, +): Promise> => + pollAttempt(call, 0, opts.delayMs ?? defaultDelay); diff --git a/tests/commands/emails/receiving/listen.test.ts b/tests/commands/emails/receiving/listen.test.ts index 9b38a048..343a67b7 100644 --- a/tests/commands/emails/receiving/listen.test.ts +++ b/tests/commands/emails/receiving/listen.test.ts @@ -3,8 +3,8 @@ import { beforeEach, describe, expect, + it, type MockInstance, - test, vi, } from 'vitest'; import { @@ -69,7 +69,7 @@ describe('emails receiving listen command', () => { exitSpy = undefined; }); - test('errors with invalid_interval for interval below 2', async () => { + it('errors with invalid_interval for interval below 2', async () => { setNonInteractive(); errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); exitSpy = mockExitThrow(); @@ -85,7 +85,7 @@ describe('emails receiving listen command', () => { expect(output).toContain('invalid_interval'); }); - test('errors with auth_error when no API key', async () => { + it('errors with auth_error when no API key', async () => { setNonInteractive(); delete process.env.RESEND_API_KEY; process.env.XDG_CONFIG_HOME = '/tmp/nonexistent-resend'; @@ -103,7 +103,7 @@ describe('emails receiving listen command', () => { expect(output).toContain('auth_error'); }); - test('errors with list_error when SDK returns an error', async () => { + it('errors with list_error when SDK returns an error', async () => { setNonInteractive(); mockList.mockResolvedValueOnce( mockSdkError('Server error', 'server_error'), @@ -125,7 +125,7 @@ describe('emails receiving listen command', () => { expect(output).toContain('list_error'); }); - test('initial fetch calls SDK with correct limit', async () => { + it('initial fetch calls SDK with correct limit', async () => { vi.useFakeTimers(); setupOutputSpies(); @@ -134,7 +134,6 @@ describe('emails receiving listen command', () => { ); listenReceivingCommand.parseAsync([], { from: 'user' }).catch(() => {}); - // Flush microtasks so the initial SDK call resolves await vi.advanceTimersByTimeAsync(0); expect(mockList).toHaveBeenCalledTimes(1); diff --git a/tests/lib/retry-poll.test.ts b/tests/lib/retry-poll.test.ts new file mode 100644 index 00000000..426c0fe8 --- /dev/null +++ b/tests/lib/retry-poll.test.ts @@ -0,0 +1,175 @@ +import { describe, expect, it } from 'vitest'; +import { retryPoll } from '../../src/lib/retry-poll'; + +const noDelay = (_ms: number) => Promise.resolve(); + +const makeResponse = (data: T) => ({ + data, + error: null, + headers: null, +}); + +const makeError = (message: string, name?: string) => ({ + data: null, + error: { message, name }, + headers: null, +}); + +const makeRateLimited = (retryAfter?: string) => ({ + data: null, + error: { message: 'Rate limited', name: 'rate_limit_exceeded' }, + headers: retryAfter ? { 'retry-after': retryAfter } : null, +}); + +describe('retryPoll', () => { + it('returns data on first successful call', async () => { + const result = await retryPoll( + () => Promise.resolve(makeResponse({ id: '1' })), + { delayMs: noDelay }, + ); + + expect(result).toEqual({ success: true, data: { id: '1' } }); + }); + + it('returns failure for non-retryable SDK error', async () => { + const result = await retryPoll( + () => Promise.resolve(makeError('Bad request', 'validation_error')), + { delayMs: noDelay }, + ); + + expect(result).toEqual({ success: false, message: 'Bad request' }); + }); + + it('retries rate_limit_exceeded and succeeds', async () => { + let attempt = 0; + const result = await retryPoll( + () => { + attempt++; + if (attempt <= 2) { + return Promise.resolve(makeRateLimited()); + } + return Promise.resolve(makeResponse({ id: 'ok' })); + }, + { delayMs: noDelay }, + ); + + expect(attempt).toBe(3); + expect(result).toEqual({ success: true, data: { id: 'ok' } }); + }); + + it('fails after exhausting retries on rate_limit_exceeded', async () => { + let attempt = 0; + const result = await retryPoll( + () => { + attempt++; + return Promise.resolve(makeRateLimited()); + }, + { delayMs: noDelay }, + ); + + expect(attempt).toBe(4); + expect(result).toEqual({ success: false, message: 'Rate limited' }); + }); + + it('retries thrown errors and succeeds', async () => { + let attempt = 0; + const result = await retryPoll( + () => { + attempt++; + if (attempt <= 2) { + return Promise.reject(new Error('Network timeout')); + } + return Promise.resolve(makeResponse({ recovered: true })); + }, + { delayMs: noDelay }, + ); + + expect(attempt).toBe(3); + expect(result).toEqual({ success: true, data: { recovered: true } }); + }); + + it('fails after exhausting retries on thrown errors', async () => { + let attempt = 0; + const result = await retryPoll( + () => { + attempt++; + return Promise.reject(new Error('Connection refused')); + }, + { delayMs: noDelay }, + ); + + expect(attempt).toBe(4); + expect(result).toEqual({ + success: false, + message: 'Connection refused', + }); + }); + + it('uses retry-after header when available', async () => { + const delays: number[] = []; + let attempt = 0; + await retryPoll( + () => { + attempt++; + if (attempt === 1) { + return Promise.resolve(makeRateLimited('3')); + } + return Promise.resolve(makeResponse({ id: 'ok' })); + }, + { + delayMs: (ms) => { + delays.push(ms); + return Promise.resolve(); + }, + }, + ); + + expect(delays).toEqual([3000]); + }); + + it('returns failure for null data response', async () => { + const result = await retryPoll( + () => Promise.resolve({ data: null, error: null, headers: null }), + { delayMs: noDelay }, + ); + + expect(result).toEqual({ + success: false, + message: 'Unexpected empty response', + }); + }); + + it('handles non-Error thrown values', async () => { + let attempt = 0; + const result = await retryPoll( + () => { + attempt++; + return Promise.reject('string error'); + }, + { delayMs: noDelay }, + ); + + expect(attempt).toBe(4); + expect(result).toEqual({ success: false, message: 'Unknown error' }); + }); + + it('recovers from mixed rate-limit and thrown errors', async () => { + let attempt = 0; + const result = await retryPoll( + () => { + attempt++; + if (attempt === 1) { + return Promise.resolve(makeRateLimited()); + } + if (attempt === 2) { + return Promise.reject(new Error('Network blip')); + } + return Promise.resolve(makeResponse({ id: 'recovered' })); + }, + { delayMs: noDelay }, + ); + + expect(attempt).toBe(3); + expect(result).toEqual({ success: true, data: { id: 'recovered' } }); + }); +});