Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,17 @@ JWT_SECRET=replace-with-a-secure-jwt-secret
# Provider balance cache TTL (seconds)
PROVIDER_BALANCE_CACHE_TTL=60

# ---------------------------------------------------------------------------
# Accounting Integrations (QuickBooks & Xero)
# ---------------------------------------------------------------------------
QUICKBOOKS_CLIENT_ID=your_quickbooks_client_id
QUICKBOOKS_CLIENT_SECRET=your_quickbooks_client_secret
QUICKBOOKS_REDIRECT_URI=http://localhost:3000/api/accounting/quickbooks/callback

XERO_CLIENT_ID=your_xero_client_id
XERO_CLIENT_SECRET=your_xero_client_secret
XERO_REDIRECT_URI=http://localhost:3000/api/accounting/xero/callback

# ---------------------------------------------------------------------------
# Centralized Logging — Loki Transport (feature/centralized-logging)
# ---------------------------------------------------------------------------
Expand Down
30 changes: 30 additions & 0 deletions src/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,30 @@ export const env = cleanEnv(process.env, {
desc: "API key for third-party AML/sanction screening provider (e.g. Elliptic, Chainalysis)",
example: "ell_live_xxxxxxxxxxxx",
}),
QUICKBOOKS_CLIENT_ID: str({
default: "",
desc: "QuickBooks Online OAuth 2.0 Client ID",
}),
QUICKBOOKS_CLIENT_SECRET: str({
default: "",
desc: "QuickBooks Online OAuth 2.0 Client Secret",
}),
QUICKBOOKS_REDIRECT_URI: str({
default: "http://localhost:3000/api/accounting/quickbooks/callback",
desc: "QuickBooks Online OAuth 2.0 Redirect URI",
}),
XERO_CLIENT_ID: str({
default: "",
desc: "Xero OAuth 2.0 Client ID",
}),
XERO_CLIENT_SECRET: str({
default: "",
desc: "Xero OAuth 2.0 Client Secret",
}),
XERO_REDIRECT_URI: str({
default: "http://localhost:3000/api/accounting/xero/callback",
desc: "Xero OAuth 2.0 Redirect URI",
}),
});

// Re-export specific values for convenience
Expand All @@ -132,4 +156,10 @@ export const {
INDEX_REINDEX_MIN_SIZE_MB,
INDEX_REINDEX_MAX_SCAN_COUNT,
INDEX_REINDEX_MAX_ACTIVE_CONNECTIONS,
QUICKBOOKS_CLIENT_ID,
QUICKBOOKS_CLIENT_SECRET,
QUICKBOOKS_REDIRECT_URI,
XERO_CLIENT_ID,
XERO_CLIENT_SECRET,
XERO_REDIRECT_URI,
} = env;
11 changes: 9 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ import exchangeRateBufferRoutes from "./routes/exchangeRateBuffers";
import adminAssetRoutes from "./routes/admin/assets";
import settingsRoutes from "./routes/settings";
import merchantWebhooksRouter from "./routes/merchantWebhooks";
import accountingRoutes from "./routes/accounting";




Expand Down Expand Up @@ -406,6 +408,7 @@ app.use("/api/exchange-rate-buffers", exchangeRateBufferRoutes);
app.use("/api/admin/assets", adminAssetRoutes);
app.use("/api/settings", settingsRoutes);
app.use("/api/merchant/webhooks", merchantWebhooksRouter);
app.use("/api/accounting", accountingRoutes);

// Subscriptions management
app.use("/api/subscriptions", subscriptionsRoutes);
Expand Down Expand Up @@ -590,9 +593,13 @@ async function initializeRuntime(): Promise<void> {
await layeredCache.init();
console.log("Layered cache (L1/L2) initialized");

const { startProviderBalanceAlertWorker, scheduleProviderBalanceAlertJob } =
await import("./queue");
const {
startProviderBalanceAlertWorker,
scheduleProviderBalanceAlertJob,
startAccountingTokenRefreshWorker,
} = await import("./queue");
startProviderBalanceAlertWorker();
startAccountingTokenRefreshWorker();
await scheduleProviderBalanceAlertJob();
console.log("Provider balance alert queue initialized");
} catch (err) {
Expand Down
55 changes: 55 additions & 0 deletions src/queue/accountingTokenRefreshQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { Queue, JobsOptions } from "bullmq";
import { queueOptions } from "./config";

export const ACCOUNTING_TOKEN_REFRESH_QUEUE_NAME = "accounting-token-refresh";

export const accountingTokenRefreshQueue = new Queue(
ACCOUNTING_TOKEN_REFRESH_QUEUE_NAME,
queueOptions
);

export interface AccountingTokenRefreshJobData {
connectionId: string;
provider: "quickbooks" | "xero";
}

/**
* Adds a job to refresh an accounting token.
*
* @param connectionId The ID of the connection to refresh
* @param provider The accounting provider
* @param delayMs Delay in milliseconds (e.g., 10 minutes before expiry)
*/
export async function addAccountingTokenRefreshJob(
connectionId: string,
provider: "quickbooks" | "xero",
delayMs: number
): Promise<void> {
const jobOptions: JobsOptions = {
delay: delayMs,
removeOnComplete: true,
removeOnFail: {
age: 24 * 3600, // keep failed jobs for 24 hours
},
attempts: 3,
backoff: {
type: "exponential",
delay: 5000,
},
};

await accountingTokenRefreshQueue.add(
`refresh-${connectionId}`,
{ connectionId, provider },
jobOptions
);
}

export async function removeAccountingTokenRefreshJob(connectionId: string): Promise<void> {
const jobs = await accountingTokenRefreshQueue.getJobs(["delayed", "waiting"]);
for (const job of jobs) {
if (job.data.connectionId === connectionId) {
await job.remove();
}
}
}
52 changes: 52 additions & 0 deletions src/queue/accountingTokenRefreshWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { Worker, Job } from "bullmq";
import { queueOptions } from "./config";
import { ACCOUNTING_TOKEN_REFRESH_QUEUE_NAME, AccountingTokenRefreshJobData } from "./accountingTokenRefreshQueue";
import { AccountingService } from "../services/accounting";
import { logger } from "../services/logger";

let worker: Worker | null = null;

export function startAccountingTokenRefreshWorker(): void {
if (worker) return;

const accountingService = new AccountingService();

worker = new Worker(
ACCOUNTING_TOKEN_REFRESH_QUEUE_NAME,
async (job: Job<AccountingTokenRefreshJobData>) => {
const { connectionId, provider } = job.data;

logger.info(`Processing token refresh for ${provider} connection ${connectionId}`);

try {
if (provider === "quickbooks") {
await accountingService.refreshQuickBooksToken(connectionId);
} else if (provider === "xero") {
await accountingService.refreshXeroToken(connectionId);
} else {
throw new Error(`Unsupported accounting provider: ${provider}`);
}

logger.info(`Successfully refreshed tokens for ${provider} connection ${connectionId}`);
} catch (error) {
logger.error(`Failed to refresh tokens for ${provider} connection ${connectionId}:`, error);
throw error; // Re-throw to trigger BullMQ retry
}
},
queueOptions
);

worker.on("failed", (job, err) => {
logger.error(`Accounting token refresh job ${job?.id} failed:`, err);
});

logger.info("Accounting token refresh worker started");
}

export async function closeAccountingTokenRefreshWorker(): Promise<void> {
if (worker) {
await worker.close();
worker = null;
logger.info("Accounting token refresh worker closed");
}
}
10 changes: 10 additions & 0 deletions src/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@ import {
startProviderBalanceAlertWorker,
} from "./providerBalanceAlertWorker";
import { closeAccountMergeWorker } from "./accountMergeWorker";
import {
startAccountingTokenRefreshWorker,
closeAccountingTokenRefreshWorker,
} from "./accountingTokenRefreshWorker";

export async function shutdownQueue(): Promise<void> {
console.log("Shutting down queues...");
await closeAccountingTokenRefreshWorker().catch(() => undefined);
await closeProviderBalanceAlertWorker().catch(() => undefined);
await closeProviderBalanceAlertQueue().catch(() => undefined);
await closeAccountMergeWorker().catch(() => undefined);
Expand Down Expand Up @@ -74,5 +79,10 @@ export {
closeAccountMergeWorker,
} from "./accountMergeWorker";

export {
startAccountingTokenRefreshWorker,
closeAccountingTokenRefreshWorker,
};

// Trace-ID propagation utilities
export { withTraceId, traceIdFromJob, childLoggerWithTrace, TRACE_ID_KEY } from "./trace";
55 changes: 23 additions & 32 deletions src/routes/accounting.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
consumeXeroOAuthState,
} from "../services/xeroOauthState";
import { z } from "zod";
import { v4 as uuidv4 } from "uuid";

const router = Router();
const accountingService = new AccountingService();
Expand All @@ -18,9 +19,10 @@ const XERO_SUCCESS_REDIRECT_URL = process.env.XERO_SUCCESS_REDIRECT_URL || "";
const XERO_FAILURE_REDIRECT_URL = process.env.XERO_FAILURE_REDIRECT_URL || "";

// Validation schemas
const connectQuickBooksSchema = z.object({
const connectQuickBooksCallbackSchema = z.object({
code: z.string(),
realmId: z.string(),
state: z.string(),
});

const connectXeroSchema = z.object({
Expand Down Expand Up @@ -167,68 +169,57 @@ router.get(

// Get authorization URLs
router.get(
"/auth/quickbooks/url",
"/quickbooks/auth",
async (req: Request, res: Response, next: NextFunction) => {
try {
const authUrl = accountingService.getQuickBooksAuthUrl();
res.json({ authUrl });
const state = uuidv4();
const authUrl = accountingService.getQuickBooksAuthUrl(state);
res.redirect(authUrl);
} catch (error) {
next(error);
}
},
);

router.get(
"/auth/xero/url",
"/auth/quickbooks/url",
async (req: Request, res: Response, next: NextFunction) => {
try {
const authUrl = accountingService.getXeroAuthUrl();
const authUrl = accountingService.getQuickBooksAuthUrl();
res.json({ authUrl });
} catch (error) {
next(error);
}
},
);

// Handle OAuth callbacks
router.post(
"/auth/quickbooks/callback",
validateRequest(connectQuickBooksSchema),
router.get(
"/auth/xero/url",
async (req: Request, res: Response, next: NextFunction) => {
try {
const { code, realmId } = req.body;
const userId = (req as any).user.id;

const connection = await accountingService.handleQuickBooksCallback(
code,
realmId,
userId,
);

res.status(201).json({
connection: {
id: connection.id,
provider: connection.provider,
isActive: connection.isActive,
createdAt: connection.createdAt,
},
});
const authUrl = accountingService.getXeroAuthUrl();
res.json({ authUrl });
} catch (error) {
next(error);
}
},
);

router.post(
"/auth/xero/callback",
validateRequest(connectXeroSchema),
// Handle QuickBooks OAuth callback
router.get(
"/quickbooks/callback",
async (req: Request, res: Response, next: NextFunction) => {
try {
const { code } = req.body;
const { code, realmId, state } = req.query as {
code: string;
realmId: string;
state: string;
};
const userId = (req as any).user.id;

const connection = await accountingService.handleXeroCallback(
const connection = await accountingService.handleQuickBooksCallback(
code,
realmId,
userId,
);

Expand Down
Loading
Loading