Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 34 additions & 49 deletions src/commands/emails/receiving/listen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@ import { getCancelExitCode, setSigintHandler } from '../../../lib/cli-exit';
import type { GlobalOpts } from '../../../lib/client';
import { requireClient } from '../../../lib/client';
import { buildHelpText } from '../../../lib/help-text';
import { errorMessage, outputError } from '../../../lib/output';
import { outputError } from '../../../lib/output';
import { retryPoll } from '../../../lib/retry-poll';
import { safeTerminalText } from '../../../lib/safe-terminal-text';
import { createSpinner } from '../../../lib/spinner';
import { isInteractive } from '../../../lib/tty';

const PAGE_SIZE = 100;
const MAX_CONSECUTIVE_ERRORS = 5;

function timestamp(): string {
return new Date().toLocaleTimeString('en-GB', { hour12: false });
}
const timestamp = (): string =>
new Date().toLocaleTimeString('en-GB', { hour12: false });

function displayEmail(email: ListReceivingEmail, jsonMode: boolean): void {
const displayEmail = (email: ListReceivingEmail, jsonMode: boolean): void => {
if (jsonMode) {
console.log(JSON.stringify(email));
} else {
Expand All @@ -31,7 +32,7 @@ function displayEmail(email: ListReceivingEmail, jsonMode: boolean): void {
`${ts} ${from} -> ${to} ${pc.bold(`"${subject}"`)} ${pc.dim(id)}\n`,
);
}
}
};

export const listenReceivingCommand = new Command('listen')
.description('Poll for new inbound emails and display them as they arrive')
Expand Down Expand Up @@ -74,7 +75,6 @@ Ctrl+C exits cleanly.`,
const resend = await requireClient(globalOpts);
const jsonMode = globalOpts.json || !isInteractive();

// Initial poll — just grab the latest email to establish our starting point
const spinner = createSpinner(
'Connecting...',
globalOpts.quiet || jsonMode,
Expand All @@ -83,36 +83,27 @@ Ctrl+C exits cleanly.`,
const seenIds = new Set<string>();
let consecutiveErrors = 0;

try {
const { data, error } = await resend.emails.receiving.list({ limit: 1 });
if (error || !data) {
spinner.fail('Failed to connect');
outputError(
{
message: error?.message ?? 'Unexpected empty response',
code: 'list_error',
},
{ json: globalOpts.json },
);
}

for (const email of data.data) {
seenIds.add(email.id);
}
const result = await retryPoll(() =>
resend.emails.receiving.list({ limit: 1 }),
);

spinner.stop('Ready');
} catch (err) {
if (!result.success) {
spinner.fail('Failed to connect');
outputError(
{
message: errorMessage(err, 'Unknown error'),
message: result.message,
code: 'list_error',
},
{ json: globalOpts.json },
);
}

// Print banner
for (const email of result.data.data) {
seenIds.add(email.id);
}

spinner.stop('Ready');

if (!jsonMode) {
process.stderr.write('\n');
process.stderr.write(` ${pc.bold('Polling:')} every ${interval}s\n`);
Expand All @@ -121,35 +112,32 @@ Ctrl+C exits cleanly.`,
);
}

// Helper: handle consecutive poll errors and exit if threshold reached
function handlePollError(message: string): void {
const handlePollError = (message: string): void => {
consecutiveErrors++;
if (!jsonMode) {
process.stderr.write(
`${pc.dim(`[${timestamp()}]`)} ${pc.yellow('Warning:')} ${message}\n`,
);
}
if (consecutiveErrors >= 5) {
if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) {
outputError(
{
message: 'Exiting after 5 consecutive API failures.',
message: `Exiting after ${MAX_CONSECUTIVE_ERRORS} consecutive API failures.`,
code: 'poll_error',
},
{ json: globalOpts.json },
);
}
}
};

// Poll loop — paginates until it hits a seen email to guarantee no misses
let timeoutHandle: ReturnType<typeof setTimeout>;

async function poll(): Promise<void> {
const poll = async (): Promise<void> => {
try {
const newEmails: ListReceivingEmail[] = [];
let cursor: string | undefined;
let hasMore = true;

// Paginate through results until we find an email we've already seen
while (hasMore) {
const params: { limit: number; after?: string } = {
limit: PAGE_SIZE,
Expand All @@ -158,50 +146,48 @@ Ctrl+C exits cleanly.`,
params.after = cursor;
}

const { data, error } = await resend.emails.receiving.list(params);
if (error || !data) {
handlePollError(error?.message ?? 'Empty response');
const pollResult = await retryPoll(() =>
resend.emails.receiving.list(params),
);
if (!pollResult.success) {
handlePollError(pollResult.message);
return;
}

let foundSeen = false;
for (const email of data.data) {
for (const email of pollResult.data.data) {
if (seenIds.has(email.id)) {
foundSeen = true;
break;
}
newEmails.push(email);
}

if (foundSeen || !data.has_more) {
if (foundSeen || !pollResult.data.has_more) {
hasMore = false;
} else {
// Use the last email as cursor for the next page
cursor = data.data[data.data.length - 1]?.id;
cursor = pollResult.data.data[pollResult.data.data.length - 1]?.id;
}
}

consecutiveErrors = 0;

// Mark all new emails as seen
for (const email of newEmails) {
seenIds.add(email.id);
}

// Display in chronological order (oldest first)
for (const email of newEmails.reverse()) {
for (const email of [...newEmails].reverse()) {
displayEmail(email, jsonMode);
}
} catch (err) {
handlePollError(errorMessage(err, 'Unknown error'));
handlePollError(err instanceof Error ? err.message : 'Unknown error');
} finally {
timeoutHandle = setTimeout(poll, interval * 1000);
}
}
};

timeoutHandle = setTimeout(poll, interval * 1000);

// Graceful exit
const handleSignal = () => {
clearTimeout(timeoutHandle);
if (!jsonMode) {
Expand All @@ -213,6 +199,5 @@ Ctrl+C exits cleanly.`,
setSigintHandler(handleSignal);
process.on('SIGTERM', handleSignal);

// Keep alive
await new Promise(() => {});
});
68 changes: 68 additions & 0 deletions src/lib/retry-poll.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
type SdkResponse<T> = {
readonly data: T | null;
readonly error: { readonly message: string; readonly name?: string } | null;
readonly headers?: Record<string, string> | null;
};

type RetryPollSuccess<T> = { readonly success: true; readonly data: T };
type RetryPollFailure = { readonly success: false; readonly message: string };
export type RetryPollResult<T> = RetryPollSuccess<T> | RetryPollFailure;

export type RetryPollOpts = {
readonly delayMs?: (ms: number) => Promise<void>;
};

const DEFAULT_RETRY_DELAYS = [1, 2, 4];
const MAX_RETRIES = DEFAULT_RETRY_DELAYS.length;

const defaultDelay = (ms: number): Promise<void> =>
new Promise((resolve) => setTimeout(resolve, ms));

const parseRetryDelay = (
headers?: Record<string, string> | null,
): number | undefined => {
const value = headers?.['retry-after'];
if (!value) {
return undefined;
}
const seconds = Number(value);
return Number.isFinite(seconds) && seconds >= 0 ? seconds : undefined;
};

const pollAttempt = async <T>(
call: () => Promise<SdkResponse<T>>,
n: number,
delay: (ms: number) => Promise<void>,
): Promise<RetryPollResult<T>> => {
try {
const { data, error, headers } = await call();
if (error) {
if (n < MAX_RETRIES && error.name === 'rate_limit_exceeded') {
const wait = parseRetryDelay(headers) ?? DEFAULT_RETRY_DELAYS[n];
await delay(wait * 1000);
return pollAttempt(call, n + 1, delay);
}
return { success: false, message: error.message };
}
if (data === null) {
return { success: false, message: 'Unexpected empty response' };
}
return { success: true, data };
} catch (err) {
if (n < MAX_RETRIES) {
const wait = DEFAULT_RETRY_DELAYS[n];
await delay(wait * 1000);
return pollAttempt(call, n + 1, delay);
}
return {
success: false,
message: err instanceof Error ? err.message : 'Unknown error',
};
}
};

export const retryPoll = <T>(
call: () => Promise<SdkResponse<T>>,
opts: RetryPollOpts = {},
): Promise<RetryPollResult<T>> =>
pollAttempt(call, 0, opts.delayMs ?? defaultDelay);
11 changes: 5 additions & 6 deletions tests/commands/emails/receiving/listen.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import {
beforeEach,
describe,
expect,
it,
type MockInstance,
test,
vi,
} from 'vitest';
import {
Expand Down Expand Up @@ -69,7 +69,7 @@ describe('emails receiving listen command', () => {
exitSpy = undefined;
});

test('errors with invalid_interval for interval below 2', async () => {
it('errors with invalid_interval for interval below 2', async () => {
setNonInteractive();
errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
exitSpy = mockExitThrow();
Expand All @@ -85,7 +85,7 @@ describe('emails receiving listen command', () => {
expect(output).toContain('invalid_interval');
});

test('errors with auth_error when no API key', async () => {
it('errors with auth_error when no API key', async () => {
setNonInteractive();
delete process.env.RESEND_API_KEY;
process.env.XDG_CONFIG_HOME = '/tmp/nonexistent-resend';
Expand All @@ -103,7 +103,7 @@ describe('emails receiving listen command', () => {
expect(output).toContain('auth_error');
});

test('errors with list_error when SDK returns an error', async () => {
it('errors with list_error when SDK returns an error', async () => {
setNonInteractive();
mockList.mockResolvedValueOnce(
mockSdkError('Server error', 'server_error'),
Expand All @@ -125,7 +125,7 @@ describe('emails receiving listen command', () => {
expect(output).toContain('list_error');
});

test('initial fetch calls SDK with correct limit', async () => {
it('initial fetch calls SDK with correct limit', async () => {
vi.useFakeTimers();
setupOutputSpies();

Expand All @@ -134,7 +134,6 @@ describe('emails receiving listen command', () => {
);

listenReceivingCommand.parseAsync([], { from: 'user' }).catch(() => {});
// Flush microtasks so the initial SDK call resolves
await vi.advanceTimersByTimeAsync(0);

expect(mockList).toHaveBeenCalledTimes(1);
Expand Down
Loading