diff --git a/.env.example b/.env.example index af7611fc..a15f5552 100644 --- a/.env.example +++ b/.env.example @@ -512,6 +512,17 @@ JWT_SECRET=replace-with-a-secure-jwt-secret # Provider balance cache TTL (seconds) PROVIDER_BALANCE_CACHE_TTL=60 +# --------------------------------------------------------------------------- +# Accounting Integrations (QuickBooks & Xero) +# --------------------------------------------------------------------------- +QUICKBOOKS_CLIENT_ID=your_quickbooks_client_id +QUICKBOOKS_CLIENT_SECRET=your_quickbooks_client_secret +QUICKBOOKS_REDIRECT_URI=http://localhost:3000/api/accounting/quickbooks/callback + +XERO_CLIENT_ID=your_xero_client_id +XERO_CLIENT_SECRET=your_xero_client_secret +XERO_REDIRECT_URI=http://localhost:3000/api/accounting/xero/callback + # --------------------------------------------------------------------------- # Centralized Logging — Loki Transport (feature/centralized-logging) # --------------------------------------------------------------------------- diff --git a/src/config/env.ts b/src/config/env.ts index bb98570f..456820d2 100644 --- a/src/config/env.ts +++ b/src/config/env.ts @@ -110,6 +110,30 @@ export const env = cleanEnv(process.env, { desc: "API key for third-party AML/sanction screening provider (e.g. Elliptic, Chainalysis)", example: "ell_live_xxxxxxxxxxxx", }), + QUICKBOOKS_CLIENT_ID: str({ + default: "", + desc: "QuickBooks Online OAuth 2.0 Client ID", + }), + QUICKBOOKS_CLIENT_SECRET: str({ + default: "", + desc: "QuickBooks Online OAuth 2.0 Client Secret", + }), + QUICKBOOKS_REDIRECT_URI: str({ + default: "http://localhost:3000/api/accounting/quickbooks/callback", + desc: "QuickBooks Online OAuth 2.0 Redirect URI", + }), + XERO_CLIENT_ID: str({ + default: "", + desc: "Xero OAuth 2.0 Client ID", + }), + XERO_CLIENT_SECRET: str({ + default: "", + desc: "Xero OAuth 2.0 Client Secret", + }), + XERO_REDIRECT_URI: str({ + default: "http://localhost:3000/api/accounting/xero/callback", + desc: "Xero OAuth 2.0 Redirect URI", + }), }); // Re-export specific values for convenience @@ -132,4 +156,10 @@ export const { INDEX_REINDEX_MIN_SIZE_MB, INDEX_REINDEX_MAX_SCAN_COUNT, INDEX_REINDEX_MAX_ACTIVE_CONNECTIONS, + QUICKBOOKS_CLIENT_ID, + QUICKBOOKS_CLIENT_SECRET, + QUICKBOOKS_REDIRECT_URI, + XERO_CLIENT_ID, + XERO_CLIENT_SECRET, + XERO_REDIRECT_URI, } = env; diff --git a/src/index.ts b/src/index.ts index d6cb8f6a..e1281e92 100644 --- a/src/index.ts +++ b/src/index.ts @@ -90,6 +90,8 @@ import exchangeRateBufferRoutes from "./routes/exchangeRateBuffers"; import adminAssetRoutes from "./routes/admin/assets"; import settingsRoutes from "./routes/settings"; import merchantWebhooksRouter from "./routes/merchantWebhooks"; +import accountingRoutes from "./routes/accounting"; + @@ -406,6 +408,7 @@ app.use("/api/exchange-rate-buffers", exchangeRateBufferRoutes); app.use("/api/admin/assets", adminAssetRoutes); app.use("/api/settings", settingsRoutes); app.use("/api/merchant/webhooks", merchantWebhooksRouter); +app.use("/api/accounting", accountingRoutes); // Subscriptions management app.use("/api/subscriptions", subscriptionsRoutes); @@ -590,9 +593,13 @@ async function initializeRuntime(): Promise { await layeredCache.init(); console.log("Layered cache (L1/L2) initialized"); - const { startProviderBalanceAlertWorker, scheduleProviderBalanceAlertJob } = - await import("./queue"); + const { + startProviderBalanceAlertWorker, + scheduleProviderBalanceAlertJob, + startAccountingTokenRefreshWorker, + } = await import("./queue"); startProviderBalanceAlertWorker(); + startAccountingTokenRefreshWorker(); await scheduleProviderBalanceAlertJob(); console.log("Provider balance alert queue initialized"); } catch (err) { diff --git a/src/queue/accountingTokenRefreshQueue.ts b/src/queue/accountingTokenRefreshQueue.ts new file mode 100644 index 00000000..c7a22a88 --- /dev/null +++ b/src/queue/accountingTokenRefreshQueue.ts @@ -0,0 +1,55 @@ +import { Queue, JobsOptions } from "bullmq"; +import { queueOptions } from "./config"; + +export const ACCOUNTING_TOKEN_REFRESH_QUEUE_NAME = "accounting-token-refresh"; + +export const accountingTokenRefreshQueue = new Queue( + ACCOUNTING_TOKEN_REFRESH_QUEUE_NAME, + queueOptions +); + +export interface AccountingTokenRefreshJobData { + connectionId: string; + provider: "quickbooks" | "xero"; +} + +/** + * Adds a job to refresh an accounting token. + * + * @param connectionId The ID of the connection to refresh + * @param provider The accounting provider + * @param delayMs Delay in milliseconds (e.g., 10 minutes before expiry) + */ +export async function addAccountingTokenRefreshJob( + connectionId: string, + provider: "quickbooks" | "xero", + delayMs: number +): Promise { + const jobOptions: JobsOptions = { + delay: delayMs, + removeOnComplete: true, + removeOnFail: { + age: 24 * 3600, // keep failed jobs for 24 hours + }, + attempts: 3, + backoff: { + type: "exponential", + delay: 5000, + }, + }; + + await accountingTokenRefreshQueue.add( + `refresh-${connectionId}`, + { connectionId, provider }, + jobOptions + ); +} + +export async function removeAccountingTokenRefreshJob(connectionId: string): Promise { + const jobs = await accountingTokenRefreshQueue.getJobs(["delayed", "waiting"]); + for (const job of jobs) { + if (job.data.connectionId === connectionId) { + await job.remove(); + } + } +} diff --git a/src/queue/accountingTokenRefreshWorker.ts b/src/queue/accountingTokenRefreshWorker.ts new file mode 100644 index 00000000..cea860d5 --- /dev/null +++ b/src/queue/accountingTokenRefreshWorker.ts @@ -0,0 +1,52 @@ +import { Worker, Job } from "bullmq"; +import { queueOptions } from "./config"; +import { ACCOUNTING_TOKEN_REFRESH_QUEUE_NAME, AccountingTokenRefreshJobData } from "./accountingTokenRefreshQueue"; +import { AccountingService } from "../services/accounting"; +import { logger } from "../services/logger"; + +let worker: Worker | null = null; + +export function startAccountingTokenRefreshWorker(): void { + if (worker) return; + + const accountingService = new AccountingService(); + + worker = new Worker( + ACCOUNTING_TOKEN_REFRESH_QUEUE_NAME, + async (job: Job) => { + const { connectionId, provider } = job.data; + + logger.info(`Processing token refresh for ${provider} connection ${connectionId}`); + + try { + if (provider === "quickbooks") { + await accountingService.refreshQuickBooksToken(connectionId); + } else if (provider === "xero") { + await accountingService.refreshXeroToken(connectionId); + } else { + throw new Error(`Unsupported accounting provider: ${provider}`); + } + + logger.info(`Successfully refreshed tokens for ${provider} connection ${connectionId}`); + } catch (error) { + logger.error(`Failed to refresh tokens for ${provider} connection ${connectionId}:`, error); + throw error; // Re-throw to trigger BullMQ retry + } + }, + queueOptions + ); + + worker.on("failed", (job, err) => { + logger.error(`Accounting token refresh job ${job?.id} failed:`, err); + }); + + logger.info("Accounting token refresh worker started"); +} + +export async function closeAccountingTokenRefreshWorker(): Promise { + if (worker) { + await worker.close(); + worker = null; + logger.info("Accounting token refresh worker closed"); + } +} diff --git a/src/queue/index.ts b/src/queue/index.ts index 27e30184..d90715ac 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -11,9 +11,14 @@ import { startProviderBalanceAlertWorker, } from "./providerBalanceAlertWorker"; import { closeAccountMergeWorker } from "./accountMergeWorker"; +import { + startAccountingTokenRefreshWorker, + closeAccountingTokenRefreshWorker, +} from "./accountingTokenRefreshWorker"; export async function shutdownQueue(): Promise { console.log("Shutting down queues..."); + await closeAccountingTokenRefreshWorker().catch(() => undefined); await closeProviderBalanceAlertWorker().catch(() => undefined); await closeProviderBalanceAlertQueue().catch(() => undefined); await closeAccountMergeWorker().catch(() => undefined); @@ -74,5 +79,10 @@ export { closeAccountMergeWorker, } from "./accountMergeWorker"; +export { + startAccountingTokenRefreshWorker, + closeAccountingTokenRefreshWorker, +}; + // Trace-ID propagation utilities export { withTraceId, traceIdFromJob, childLoggerWithTrace, TRACE_ID_KEY } from "./trace"; diff --git a/src/routes/accounting.ts b/src/routes/accounting.ts index 5c98ca3d..354bfac1 100644 --- a/src/routes/accounting.ts +++ b/src/routes/accounting.ts @@ -8,6 +8,7 @@ import { consumeXeroOAuthState, } from "../services/xeroOauthState"; import { z } from "zod"; +import { v4 as uuidv4 } from "uuid"; const router = Router(); const accountingService = new AccountingService(); @@ -18,9 +19,10 @@ const XERO_SUCCESS_REDIRECT_URL = process.env.XERO_SUCCESS_REDIRECT_URL || ""; const XERO_FAILURE_REDIRECT_URL = process.env.XERO_FAILURE_REDIRECT_URL || ""; // Validation schemas -const connectQuickBooksSchema = z.object({ +const connectQuickBooksCallbackSchema = z.object({ code: z.string(), realmId: z.string(), + state: z.string(), }); const connectXeroSchema = z.object({ @@ -167,11 +169,12 @@ router.get( // Get authorization URLs router.get( - "/auth/quickbooks/url", + "/quickbooks/auth", async (req: Request, res: Response, next: NextFunction) => { try { - const authUrl = accountingService.getQuickBooksAuthUrl(); - res.json({ authUrl }); + const state = uuidv4(); + const authUrl = accountingService.getQuickBooksAuthUrl(state); + res.redirect(authUrl); } catch (error) { next(error); } @@ -179,10 +182,10 @@ router.get( ); router.get( - "/auth/xero/url", + "/auth/quickbooks/url", async (req: Request, res: Response, next: NextFunction) => { try { - const authUrl = accountingService.getXeroAuthUrl(); + const authUrl = accountingService.getQuickBooksAuthUrl(); res.json({ authUrl }); } catch (error) { next(error); @@ -190,45 +193,33 @@ router.get( }, ); -// Handle OAuth callbacks -router.post( - "/auth/quickbooks/callback", - validateRequest(connectQuickBooksSchema), +router.get( + "/auth/xero/url", async (req: Request, res: Response, next: NextFunction) => { try { - const { code, realmId } = req.body; - const userId = (req as any).user.id; - - const connection = await accountingService.handleQuickBooksCallback( - code, - realmId, - userId, - ); - - res.status(201).json({ - connection: { - id: connection.id, - provider: connection.provider, - isActive: connection.isActive, - createdAt: connection.createdAt, - }, - }); + const authUrl = accountingService.getXeroAuthUrl(); + res.json({ authUrl }); } catch (error) { next(error); } }, ); -router.post( - "/auth/xero/callback", - validateRequest(connectXeroSchema), +// Handle QuickBooks OAuth callback +router.get( + "/quickbooks/callback", async (req: Request, res: Response, next: NextFunction) => { try { - const { code } = req.body; + const { code, realmId, state } = req.query as { + code: string; + realmId: string; + state: string; + }; const userId = (req as any).user.id; - const connection = await accountingService.handleXeroCallback( + const connection = await accountingService.handleQuickBooksCallback( code, + realmId, userId, ); diff --git a/src/services/accounting.ts b/src/services/accounting.ts index 5c240179..3d43c7b6 100644 --- a/src/services/accounting.ts +++ b/src/services/accounting.ts @@ -1,1190 +1,2475 @@ -import { pool } from "../config/database"; -import { redisClient } from "../config/redis"; -import axios from "axios"; -import { v4 as uuidv4 } from "uuid"; - -export enum AccountingProvider { - QUICKBOOKS = "quickbooks", - XERO = "xero", -} - -export interface AccountingConnection { - id: string; - userId: string; - provider: AccountingProvider; - realmId?: string; // QuickBooks company ID - tenantId?: string; // Xero tenant ID (active organization) - tenantName?: string; // Xero organization name (for display / selection) - accessToken: string; - refreshToken: string; - expiresAt: Date; - isActive: boolean; - createdAt: Date; - updatedAt: Date; -} - -export interface CategoryMapping { - id: string; - connectionId: string; - mobileMoneyCategory: string; - accountingCategoryId: string; - accountingCategoryName: string; - createdAt: Date; -} - -export interface SyncLog { - id: string; - connectionId: string; - syncType: "daily_pnl" | "fee_revenue"; - status: "pending" | "in_progress" | "completed" | "failed"; - recordsProcessed: number; - recordsSucceeded: number; - recordsFailed: number; - errorMessage?: string; - syncedAt: Date; -} - -export interface PnLData { - date: string; - revenue: number; - fees: number; - netProfit: number; - transactions: number; -} - -export interface QuickBooksTokenResponse { - access_token: string; - refresh_token: string; - expires_in: number; - token_type: string; - x_refresh_token_expires_in: number; -} - -export interface XeroTokenResponse { - access_token: string; - refresh_token: string; - expires_in: number; - token_type: string; - scope: string; -} - -/** - * A single Xero "connection" as returned by GET https://api.xero.com/connections. - * Each entry represents one organization (tenant) the user authorized. - */ -export interface XeroTenant { - id?: string; // connection id - tenantId: string; - tenantName?: string; - tenantType?: string; -} - -export class AccountingService { - private readonly quickbooksClientId: string; - private readonly quickbooksClientSecret: string; - private readonly quickbooksRedirectUri: string; - private readonly xeroClientId: string; - private readonly xeroClientSecret: string; - private readonly xeroRedirectUri: string; - - constructor() { - this.quickbooksClientId = process.env.QUICKBOOKS_CLIENT_ID || ""; - this.quickbooksClientSecret = process.env.QUICKBOOKS_CLIENT_SECRET || ""; - this.quickbooksRedirectUri = process.env.QUICKBOOKS_REDIRECT_URI || ""; - this.xeroClientId = process.env.XERO_CLIENT_ID || ""; - this.xeroClientSecret = process.env.XERO_CLIENT_SECRET || ""; - this.xeroRedirectUri = process.env.XERO_REDIRECT_URI || ""; - } - - // OAuth2 Authorization URLs - getQuickBooksAuthUrl(): string { - const scopes = [ - "com.intuit.quickbooks.accounting", - "com.intuit.quickbooks.payment", - ].join(" "); - - const params = new URLSearchParams({ - client_id: this.quickbooksClientId, - redirect_uri: this.quickbooksRedirectUri, - response_type: "code", - scope: scopes, - state: uuidv4(), - }); - - return `https://appcenter.intuit.com/connect/oauth2?${params.toString()}`; - } - - /** - * Build the Xero OAuth 2.0 authorization URL. - * - * `offline_access` is required for Xero to return a refresh token, and the - * `state` value is used for CSRF protection + to re-associate the headerless - * browser callback with the user who started the flow. When no state is - * supplied a random one is generated so existing callers keep working. - */ - getXeroAuthUrl(state?: string): string { - const scopes = [ - "offline_access", - "openid", - "profile", - "email", - "accounting.transactions", - "accounting.reports.read", - "accounting.settings", - ].join(" "); - - const params = new URLSearchParams({ - client_id: this.xeroClientId, - redirect_uri: this.xeroRedirectUri, - response_type: "code", - scope: scopes, - state: state || uuidv4(), - }); - - return `https://login.xero.com/identity/connect/authorize?${params.toString()}`; - } - - // Handle OAuth2 callbacks - async handleQuickBooksCallback( - code: string, - realmId: string, - userId: string, - ): Promise { - try { - const tokenResponse = await this.exchangeQuickBooksCode(code); - - const connection: AccountingConnection = { - id: uuidv4(), - userId, - provider: AccountingProvider.QUICKBOOKS, - realmId, - accessToken: tokenResponse.access_token, - refreshToken: tokenResponse.refresh_token, - expiresAt: new Date(Date.now() + tokenResponse.expires_in * 1000), - isActive: true, - createdAt: new Date(), - updatedAt: new Date(), - }; - - await this.saveConnection(connection); - return connection; - } catch (error) { - throw new Error(`QuickBooks OAuth failed: ${error}`); - } - } - - async handleXeroCallback( - code: string, - userId: string, - selectedTenantId?: string, - ): Promise { - try { - const tokenResponse = await this.exchangeXeroCode(code); - - // Resolve the tenant (organization) this token grants access to. A single - // Xero login can be connected to multiple organizations, so we must fetch - // the live list of authorized tenants and pick the right one. - const tenants = await this.getXeroTenants(tokenResponse.access_token); - - if (!tenants || tenants.length === 0) { - throw new Error( - "No Xero organizations are connected to this login. " + - "Please authorize at least one organization and try again.", - ); - } - - const activeTenant = this.resolveActiveXeroTenant( - tenants, - selectedTenantId, - ); - - const connection: AccountingConnection = { - id: uuidv4(), - userId, - provider: AccountingProvider.XERO, - tenantId: activeTenant.tenantId, - tenantName: activeTenant.tenantName, - accessToken: tokenResponse.access_token, - refreshToken: tokenResponse.refresh_token, - expiresAt: new Date(Date.now() + tokenResponse.expires_in * 1000), - isActive: true, - createdAt: new Date(), - updatedAt: new Date(), - }; - - await this.saveConnection(connection); - return connection; - } catch (error) { - throw new Error(`Xero OAuth failed: ${error}`); - } - } - - /** - * Expose the list of Xero organizations (tenants) associated with an - * authorization code. Useful when the front-end wants to let the user pick - * which organization to connect before finalizing the connection. - */ - async listXeroTenantsFromCode(code: string): Promise { - const tokenResponse = await this.exchangeXeroCode(code); - return this.getXeroTenants(tokenResponse.access_token); - } - - /** - * Pick the active tenant from the list returned by Xero. - * - * - When the caller explicitly selects a tenant we honor it (multi-tenant - * selection), but only if it is actually present in the authorized list. - * - Otherwise we default to the first authorized organization. - */ - private resolveActiveXeroTenant( - tenants: XeroTenant[], - selectedTenantId?: string, - ): XeroTenant { - if (selectedTenantId) { - const match = tenants.find((t) => t.tenantId === selectedTenantId); - if (!match) { - throw new Error( - `Selected Xero tenant "${selectedTenantId}" is not among the authorized organizations.`, - ); - } - return match; - } - return tenants[0]; - } - - // Exchange authorization code for tokens - private async exchangeQuickBooksCode( - code: string, - ): Promise { - const response = await axios.post( - "https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer", - new URLSearchParams({ - grant_type: "authorization_code", - code, - redirect_uri: this.quickbooksRedirectUri, - }), - { - headers: { - "Content-Type": "application/x-www-form-urlencoded", - Authorization: `Basic ${Buffer.from( - `${this.quickbooksClientId}:${this.quickbooksClientSecret}`, - ).toString("base64")}`, - }, - }, - ); - - return response.data; - } - - private async exchangeXeroCode(code: string): Promise { - const response = await axios.post( - "https://identity.xero.com/connect/token", - new URLSearchParams({ - grant_type: "authorization_code", - code, - redirect_uri: this.xeroRedirectUri, - }), - { - headers: { - "Content-Type": "application/x-www-form-urlencoded", - Authorization: `Basic ${Buffer.from( - `${this.xeroClientId}:${this.xeroClientSecret}`, - ).toString("base64")}`, - }, - }, - ); - - return response.data; - } - - // Get Xero tenants (organizations) authorized for this access token. - private async getXeroTenants(accessToken: string): Promise { - const response = await axios.get("https://api.xero.com/connections", { - headers: { - Authorization: `Bearer ${accessToken}`, - "Content-Type": "application/json", - }, - }); - - const raw = Array.isArray(response.data) ? response.data : []; - - // Normalize the Xero connections payload into our XeroTenant shape. The - // Xero API returns objects like: - // { id, tenantId, tenantType, tenantName, ... } - return raw - .map((entry: any) => ({ - id: entry.id, - tenantId: entry.tenantId, - tenantName: entry.tenantName, - tenantType: entry.tenantType, - })) - .filter((t: XeroTenant) => Boolean(t.tenantId)); - } - - // Refresh access tokens - async refreshQuickBooksToken(connectionId: string): Promise { - const connection = await this.getConnection(connectionId); - if (!connection || connection.provider !== AccountingProvider.QUICKBOOKS) { - throw new Error("QuickBooks connection not found"); - } - - try { - const response = await axios.post( - "https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer", - new URLSearchParams({ - grant_type: "refresh_token", - refresh_token: connection.refreshToken, - }), - { - headers: { - "Content-Type": "application/x-www-form-urlencoded", - Authorization: `Basic ${Buffer.from( - `${this.quickbooksClientId}:${this.quickbooksClientSecret}`, - ).toString("base64")}`, - }, - }, - ); - - await this.updateConnectionTokens(connectionId, { - accessToken: response.data.access_token, - refreshToken: response.data.refresh_token, - expiresAt: new Date(Date.now() + response.data.expires_in * 1000), - }); - } catch (error) { - throw new Error(`QuickBooks token refresh failed: ${error}`); - } - } - - async refreshXeroToken(connectionId: string): Promise { - const connection = await this.getConnection(connectionId); - if (!connection || connection.provider !== AccountingProvider.XERO) { - throw new Error("Xero connection not found"); - } - - try { - const response = await axios.post( - "https://identity.xero.com/connect/token", - new URLSearchParams({ - grant_type: "refresh_token", - refresh_token: connection.refreshToken, - }), - { - headers: { - "Content-Type": "application/x-www-form-urlencoded", - Authorization: `Basic ${Buffer.from( - `${this.xeroClientId}:${this.xeroClientSecret}`, - ).toString("base64")}`, - }, - }, - ); - - await this.updateConnectionTokens(connectionId, { - accessToken: response.data.access_token, - refreshToken: response.data.refresh_token, - expiresAt: new Date(Date.now() + response.data.expires_in * 1000), - }); - } catch (error) { - throw new Error(`Xero token refresh failed: ${error}`); - } - } - - // Category mapping - async createCategoryMapping( - connectionId: string, - mobileMoneyCategory: string, - accountingCategoryId: string, - accountingCategoryName: string, - ): Promise { - const mapping: CategoryMapping = { - id: uuidv4(), - connectionId, - mobileMoneyCategory, - accountingCategoryId, - accountingCategoryName, - createdAt: new Date(), - }; - - await pool.query( - `INSERT INTO category_mappings (id, connection_id, mobile_money_category, accounting_category_id, accounting_category_name, created_at) - VALUES ($1, $2, $3, $4, $5, $6)`, - [ - mapping.id, - mapping.connectionId, - mapping.mobileMoneyCategory, - mapping.accountingCategoryId, - mapping.accountingCategoryName, - mapping.createdAt, - ], - ); - - return mapping; - } - - async getCategoryMappings(connectionId: string): Promise { - const result = await pool.query( - "SELECT * FROM category_mappings WHERE connection_id = $1 ORDER BY mobile_money_category", - [connectionId], - ); - - return result.rows; - } - - async getAccountingCategories( - connectionId: string, - ): Promise> { - const connection = await this.getConnection(connectionId); - if (!connection) { - throw new Error("Connection not found"); - } - - if (connection.provider === AccountingProvider.QUICKBOOKS) { - return this.getQuickBooksCategories(connection); - } else if (connection.provider === AccountingProvider.XERO) { - return this.getXeroCategories(connection); - } - - throw new Error("Unsupported provider"); - } - - private async getQuickBooksCategories( - connection: AccountingConnection, - ): Promise> { - await this.ensureValidToken(connection.id); - - const connectionData = await this.getConnection(connection.id); - const response = await axios.get( - `https://quickbooks.api.intuit.com/v3/company/${connectionData!.realmId}/query?query=SELECT * FROM Account WHERE Active=true`, - { - headers: { - Authorization: `Bearer ${connectionData!.accessToken}`, - Accept: "application/json", - }, - }, - ); - - return response.data.QueryResponse.Account.map((account: any) => ({ - id: account.Id, - name: account.Name, - })); - } - - private async getXeroCategories( - connection: AccountingConnection, - ): Promise> { - await this.ensureValidToken(connection.id); - - const connectionData = await this.getConnection(connection.id); - const response = await axios.get( - "https://api.xero.com/api.xro/2.0/Accounts", - { - headers: { - Authorization: `Bearer ${connectionData!.accessToken}`, - "Xero-tenant-id": connectionData!.tenantId, - Accept: "application/json", - }, - }, - ); - - return response.data.Accounts.map((account: any) => ({ - id: account.AccountID, - name: account.Name, - })); - } - - // Sync functions - async syncDailyPnL(connectionId: string, date: string): Promise { - const syncLog: SyncLog = { - id: uuidv4(), - connectionId, - syncType: "daily_pnl", - status: "in_progress", - recordsProcessed: 0, - recordsSucceeded: 0, - recordsFailed: 0, - syncedAt: new Date(), - }; - - await this.createSyncLog(syncLog); - - try { - const connection = await this.getConnection(connectionId); - if (!connection) { - throw new Error("Connection not found"); - } - - await this.ensureValidToken(connectionId); - - // Get PnL data for the date - const pnlData = await this.getPnLData(date); - - if (connection.provider === AccountingProvider.QUICKBOOKS) { - await this.syncPnLToQuickBooks(connection, pnlData, syncLog); - } else if (connection.provider === AccountingProvider.XERO) { - await this.syncPnLToXero(connection, pnlData, syncLog); - } - - syncLog.status = "completed"; - await this.updateSyncLog(syncLog); - } catch (error) { - syncLog.status = "failed"; - syncLog.errorMessage = - error instanceof Error ? error.message : "Unknown error"; - await this.updateSyncLog(syncLog); - } - - return syncLog; - } - - async syncFeeRevenue(connectionId: string, date: string): Promise { - const syncLog: SyncLog = { - id: uuidv4(), - connectionId, - syncType: "fee_revenue", - status: "in_progress", - recordsProcessed: 0, - recordsSucceeded: 0, - recordsFailed: 0, - syncedAt: new Date(), - }; - - await this.createSyncLog(syncLog); - - try { - const connection = await this.getConnection(connectionId); - if (!connection) { - throw new Error("Connection not found"); - } - - await this.ensureValidToken(connectionId); - - // Get fee revenue data for the date - const feeData = await this.getFeeRevenueData(date); - - if (connection.provider === AccountingProvider.QUICKBOOKS) { - await this.syncFeeRevenueToQuickBooks(connection, feeData, syncLog); - } else if (connection.provider === AccountingProvider.XERO) { - await this.syncFeeRevenueToXero(connection, feeData, syncLog); - } - - syncLog.status = "completed"; - await this.updateSyncLog(syncLog); - } catch (error) { - syncLog.status = "failed"; - syncLog.errorMessage = - error instanceof Error ? error.message : "Unknown error"; - await this.updateSyncLog(syncLog); - } - - return syncLog; - } - - // Database operations - private async saveConnection( - connection: AccountingConnection, - ): Promise { - await pool.query( - `INSERT INTO accounting_connections - (id, user_id, provider, realm_id, tenant_id, tenant_name, access_token, refresh_token, expires_at, is_active, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) - ON CONFLICT (id) DO UPDATE SET - tenant_id = EXCLUDED.tenant_id, - tenant_name = EXCLUDED.tenant_name, - access_token = EXCLUDED.access_token, - refresh_token = EXCLUDED.refresh_token, - expires_at = EXCLUDED.expires_at, - is_active = EXCLUDED.is_active, - updated_at = EXCLUDED.updated_at`, - [ - connection.id, - connection.userId, - connection.provider, - connection.realmId, - connection.tenantId, - connection.tenantName, - connection.accessToken, - connection.refreshToken, - connection.expiresAt, - connection.isActive, - connection.createdAt, - connection.updatedAt, - ], - ); - } - - private async updateConnectionTokens( - connectionId: string, - tokens: { accessToken: string; refreshToken: string; expiresAt: Date }, - ): Promise { - await pool.query( - "UPDATE accounting_connections SET access_token = $1, refresh_token = $2, expires_at = $3, updated_at = $4 WHERE id = $5", - [ - tokens.accessToken, - tokens.refreshToken, - tokens.expiresAt, - new Date(), - connectionId, - ], - ); - } - - async getConnection( - connectionId: string, - ): Promise { - const result = await pool.query( - "SELECT * FROM accounting_connections WHERE id = $1", - [connectionId], - ); - - if (result.rows.length === 0) { - return null; - } - - return result.rows[0]; - } - - async getUserConnections(userId: string): Promise { - const result = await pool.query( - "SELECT * FROM accounting_connections WHERE user_id = $1 AND is_active = true ORDER BY created_at DESC", - [userId], - ); - - return result.rows; - } - - private async createSyncLog(syncLog: SyncLog): Promise { - await pool.query( - `INSERT INTO sync_logs - (id, connection_id, sync_type, status, records_processed, records_succeeded, records_failed, error_message, synced_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`, - [ - syncLog.id, - syncLog.connectionId, - syncLog.syncType, - syncLog.status, - syncLog.recordsProcessed, - syncLog.recordsSucceeded, - syncLog.recordsFailed, - syncLog.errorMessage, - syncLog.syncedAt, - ], - ); - } - - private async updateSyncLog(syncLog: SyncLog): Promise { - await pool.query( - `UPDATE sync_logs SET - status = $1, records_processed = $2, records_succeeded = $3, records_failed = $4, error_message = $5 - WHERE id = $6`, - [ - syncLog.status, - syncLog.recordsProcessed, - syncLog.recordsSucceeded, - syncLog.recordsFailed, - syncLog.errorMessage, - syncLog.id, - ], - ); - } - - async getSyncLogs( - connectionId: string, - limit: number = 50, - ): Promise { - const result = await pool.query( - "SELECT * FROM sync_logs WHERE connection_id = $1 ORDER BY synced_at DESC LIMIT $2", - [connectionId, limit], - ); - - return result.rows; - } - - // Helper functions - private async ensureValidToken(connectionId: string): Promise { - const connection = await this.getConnection(connectionId); - if (!connection) { - throw new Error("Connection not found"); - } - - if (new Date() >= connection.expiresAt) { - if (connection.provider === AccountingProvider.QUICKBOOKS) { - await this.refreshQuickBooksToken(connectionId); - } else if (connection.provider === AccountingProvider.XERO) { - await this.refreshXeroToken(connectionId); - } - } - } - - private async getPnLData(date: string): Promise { - // Calculate PnL data from transactions - const query = ` - SELECT - COUNT(*) as transactions, - COALESCE(SUM(CASE WHEN type = 'credit' THEN amount ELSE 0 END), 0) as revenue, - COALESCE(SUM(fee), 0) as fees - FROM transactions - WHERE DATE(created_at) = $1 - AND status = 'completed' - `; - - const result = await pool.query(query, [date]); - const { transactions, revenue, fees } = result.rows[0]; - - return { - date, - revenue: parseFloat(revenue), - fees: parseFloat(fees), - netProfit: parseFloat(revenue) - parseFloat(fees), - transactions: parseInt(transactions), - }; - } - - private async getFeeRevenueData( - date: string, - ): Promise<{ category: string; amount: number }[]> { - // Get fee revenue broken down by category - const query = ` - SELECT - fee_category, - SUM(fee) as amount - FROM transactions - WHERE DATE(created_at) = $1 - AND status = 'completed' - AND fee > 0 - GROUP BY fee_category - ORDER BY amount DESC - `; - - const result = await pool.query(query, [date]); - return result.rows.map((row) => ({ - category: row.fee_category || "General Fees", - amount: parseFloat(row.amount), - })); - } - - // Provider-specific sync implementations - private async syncPnLToQuickBooks( - connection: AccountingConnection, - pnlData: PnLData, - syncLog: SyncLog, - ): Promise { - const connectionData = await this.getConnection(connection.id); - const mappings = await this.getCategoryMappings(connection.id); - - // Create journal entry for PnL - const journalEntry = { - TxnDate: pnlData.date, - Line: [ - { - Description: `Daily P&L - ${pnlData.date}`, - Amount: pnlData.revenue, - DetailType: "JournalEntryLineDetail", - JournalEntryLineDetail: { - PostingType: "Credit", - AccountRef: this.getMappedCategory(mappings, "revenue") || { - value: "1", - }, // Default to Sales - }, - }, - { - Description: `Daily Fees - ${pnlData.date}`, - Amount: pnlData.fees, - DetailType: "JournalEntryLineDetail", - JournalEntryLineDetail: { - PostingType: "Debit", - AccountRef: this.getMappedCategory(mappings, "fees") || { - value: "4", - }, // Default to Expense - }, - }, - ], - }; - - try { - await axios.post( - `https://quickbooks.api.intuit.com/v3/company/${connectionData!.realmId}/journalentry`, - journalEntry, - { - headers: { - Authorization: `Bearer ${connectionData!.accessToken}`, - "Content-Type": "application/json", - }, - }, - ); - - syncLog.recordsProcessed = 1; - syncLog.recordsSucceeded = 1; - } catch (error) { - syncLog.recordsProcessed = 1; - syncLog.recordsFailed = 1; - throw error; - } - } - - private async syncPnLToXero( - connection: AccountingConnection, - pnlData: PnLData, - syncLog: SyncLog, - ): Promise { - const connectionData = await this.getConnection(connection.id); - const mappings = await this.getCategoryMappings(connection.id); - - // Create manual journal entry for PnL - const journalEntry = { - Date: pnlData.date, - JournalLines: [ - { - Description: `Daily P&L - ${pnlData.date}`, - CreditAmount: pnlData.revenue, - AccountID: this.getMappedCategory(mappings, "revenue") || "1", // Default to Sales - }, - { - Description: `Daily Fees - ${pnlData.date}`, - DebitAmount: pnlData.fees, - AccountID: this.getMappedCategory(mappings, "fees") || "4", // Default to Expense - }, - ], - }; - - try { - await axios.put( - "https://api.xero.com/api.xro/2.0/ManualJournals", - journalEntry, - { - headers: { - Authorization: `Bearer ${connectionData!.accessToken}`, - "Xero-tenant-id": connectionData!.tenantId, - "Content-Type": "application/json", - }, - }, - ); - - syncLog.recordsProcessed = 1; - syncLog.recordsSucceeded = 1; - } catch (error) { - syncLog.recordsProcessed = 1; - syncLog.recordsFailed = 1; - throw error; - } - } - - private async syncFeeRevenueToQuickBooks( - connection: AccountingConnection, - feeData: Array<{ category: string; amount: number }>, - syncLog: SyncLog, - ): Promise { - const connectionData = await this.getConnection(connection.id); - const mappings = await this.getCategoryMappings(connection.id); - - const lines = feeData.map((fee) => ({ - Description: `Fee Revenue - ${fee.category}`, - Amount: fee.amount, - DetailType: "JournalEntryLineDetail", - JournalEntryLineDetail: { - PostingType: "Credit", - AccountRef: this.getMappedCategory(mappings, fee.category) || { - value: "1", - }, - }, - })); - - const journalEntry = { - TxnDate: new Date().toISOString().split("T")[0], - Line: lines, - }; - - try { - await axios.post( - `https://quickbooks.api.intuit.com/v3/company/${connectionData!.realmId}/journalentry`, - journalEntry, - { - headers: { - Authorization: `Bearer ${connectionData!.accessToken}`, - "Content-Type": "application/json", - }, - }, - ); - - syncLog.recordsProcessed = feeData.length; - syncLog.recordsSucceeded = feeData.length; - } catch (error) { - syncLog.recordsProcessed = feeData.length; - syncLog.recordsFailed = feeData.length; - throw error; - } - } - - private async syncFeeRevenueToXero( - connection: AccountingConnection, - feeData: Array<{ category: string; amount: number }>, - syncLog: SyncLog, - ): Promise { - const connectionData = await this.getConnection(connection.id); - const mappings = await this.getCategoryMappings(connection.id); - - const journalLines = feeData.map((fee) => ({ - Description: `Fee Revenue - ${fee.category}`, - CreditAmount: fee.amount, - AccountID: this.getMappedCategory(mappings, fee.category) || "1", - })); - - const journalEntry = { - Date: new Date().toISOString().split("T")[0], - JournalLines: journalLines, - }; - - try { - await axios.put( - "https://api.xero.com/api.xro/2.0/ManualJournals", - journalEntry, - { - headers: { - Authorization: `Bearer ${connectionData!.accessToken}`, - "Xero-tenant-id": connectionData!.tenantId, - "Content-Type": "application/json", - }, - }, - ); - - syncLog.recordsProcessed = feeData.length; - syncLog.recordsSucceeded = feeData.length; - } catch (error) { - syncLog.recordsProcessed = feeData.length; - syncLog.recordsFailed = feeData.length; - throw error; - } - } - - private async syncWithdrawalToXeroBill( - connection: AccountingConnection, - transaction: { - id: string; - userId: string; - type: string; - amount: number; - fee: number; - currency: string; - referenceNumber: string; - provider: string; - createdAt: Date; - }, - mappings: CategoryMapping[] - ): Promise { - const connectionData = await this.getConnection(connection.id); - const txnDate = transaction.createdAt.toISOString().split("T")[0]; - const withdrawalAccountId = this.getMappedCategory(mappings, "withdrawal"); - const feeAccountId = this.getMappedCategory(mappings, "fees"); - - const withdrawalLine: any = { - Description: `Withdrawal payout - ref:${transaction.referenceNumber} via ${transaction.provider}`, - Quantity: 1, - UnitAmount: transaction.amount, - TaxType: "NONE", - }; - - if (withdrawalAccountId) { - withdrawalLine.AccountID = withdrawalAccountId; - } else { - withdrawalLine.AccountCode = "500"; - } - - const lineItems: any[] = [withdrawalLine]; - - if (transaction.fee > 0) { - const feeLine: any = { - Description: `Fee - Withdrawal payout ref:${transaction.referenceNumber}`, - Quantity: 1, - UnitAmount: transaction.fee, - TaxType: "NONE", - }; - - if (feeAccountId) { - feeLine.AccountID = feeAccountId; - } else { - feeLine.AccountCode = "500"; - } - - lineItems.push(feeLine); - } - - const bill = { - Type: "ACCPAY", - Contact: { - Name: "Mobile Money Payouts", - }, - Date: txnDate, - DueDate: txnDate, - Reference: transaction.referenceNumber, - Status: "AUTHORISED", - LineItems: lineItems, - }; - - await axios.post( - "https://api.xero.com/api.xro/2.0/Bills", - { Bills: [bill] }, - { - headers: { - Authorization: `Bearer ${connectionData!.accessToken}`, - "Xero-tenant-id": connectionData!.tenantId, - "Content-Type": "application/json", - }, - } - ); - } - - private async syncXeroTransactionToManualJournal( - connection: AccountingConnection, - transaction: { - id: string; - userId: string; - type: string; - amount: number; - fee: number; - currency: string; - referenceNumber: string; - provider: string; - createdAt: Date; - } - ): Promise { - const freshConnection = await this.getConnection(connection.id); - const txnDate = transaction.createdAt.toISOString().split("T")[0]; - const description = `${transaction.type} - ref:${transaction.referenceNumber} via ${transaction.provider}`; - - const journalLines: object[] = [ - { - Description: description, - CreditAmount: transaction.amount, - AccountID: "revenue-account-id", - }, - ]; - - if (transaction.fee > 0) { - journalLines.push({ - Description: `Fee - ${description}`, - DebitAmount: transaction.fee, - AccountID: "expense-account-id", - }); - } - - await axios.put( - "https://api.xero.com/api.xro/2.0/ManualJournals", - { Date: txnDate, Narration: transaction.id, JournalLines: journalLines }, - { - headers: { - Authorization: `Bearer ${freshConnection!.accessToken}`, - "Xero-tenant-id": freshConnection!.tenantId, - "Content-Type": "application/json", - }, - } - ); - } - - private getMappedCategory(mappings: CategoryMapping[], mobileMoneyCategory: string): string | null { - const mapping = mappings.find(m => m.mobileMoneyCategory === mobileMoneyCategory); - return mapping ? mapping.accountingCategoryId : null; - } - - /** - * Sync a single completed transaction to all active accounting connections for the user. - * Called automatically when a transaction.completed event fires. - */ - async syncTransaction(transaction: { - id: string; - userId: string; - type: string; - amount: number; - fee: number; - currency: string; - referenceNumber: string; - provider: string; - createdAt: Date; - }): Promise { - const connections = await this.getUserConnections(transaction.userId); - if (connections.length === 0) return; - - for (const connection of connections) { - await this.ensureValidToken(connection.id); - const fresh = await this.getConnection(connection.id); - if (!fresh) continue; - - const txnDate = transaction.createdAt.toISOString().split("T")[0]; - const description = `${transaction.type} - ref:${transaction.referenceNumber} via ${transaction.provider}`; - - try { - if (connection.provider === AccountingProvider.QUICKBOOKS) { - await axios.post( - `https://quickbooks.api.intuit.com/v3/company/${fresh.realmId}/journalentry`, - { - TxnDate: txnDate, - PrivateNote: transaction.id, - Line: [ - { - Description: description, - Amount: transaction.amount, - DetailType: "JournalEntryLineDetail", - JournalEntryLineDetail: { - PostingType: "Credit", - AccountRef: { value: "1" }, // Sales / Revenue - }, - }, - ...(transaction.fee > 0 - ? [ - { - Description: `Fee - ${description}`, - Amount: transaction.fee, - DetailType: "JournalEntryLineDetail", - JournalEntryLineDetail: { - PostingType: "Debit", - AccountRef: { value: "4" }, // Expense - }, - }, - ] - : []), - ], - }, - { - headers: { - Authorization: `Bearer ${fresh.accessToken}`, - "Content-Type": "application/json", - }, - }, - ); - } else if (connection.provider === AccountingProvider.XERO) { - if (transaction.type === "withdraw") { - const mappings = await this.getCategoryMappings(connection.id); - const withdrawalAccountId = this.getMappedCategory(mappings, "withdrawal"); - - if (withdrawalAccountId) { - await this.syncWithdrawalToXeroBill(fresh, transaction, mappings); - } else { - await this.syncXeroTransactionToManualJournal(fresh, transaction); - } - } else { - await this.syncXeroTransactionToManualJournal(fresh, transaction); - } - } - - await pool.query( - `INSERT INTO accounting_sync_queue - (transaction_id, connection_id, status, synced_at) - VALUES ($1, $2, 'synced', NOW()) - ON CONFLICT (transaction_id, connection_id) DO UPDATE - SET status = 'synced', synced_at = NOW()`, - [transaction.id, connection.id], - ); - } catch (err) { - await pool.query( - `INSERT INTO accounting_sync_queue - (transaction_id, connection_id, status, error_message, synced_at) - VALUES ($1, $2, 'failed', $3, NOW()) - ON CONFLICT (transaction_id, connection_id) DO UPDATE - SET status = 'failed', error_message = $3, synced_at = NOW()`, - [ - transaction.id, - connection.id, - err instanceof Error ? err.message : String(err), - ], - ); - } - } - } -} +import { pool } from "../config/database"; +import { redisClient } from "../config/redis"; +import axios from "axios"; +import { v4 as uuidv4 } from "uuid"; +import { encryptField, decryptField } from "../utils/encryption"; +import { + addAccountingTokenRefreshJob, + removeAccountingTokenRefreshJob, +} from "../queue/accountingTokenRefreshQueue"; +import { logger } from "./logger"; + +export enum AccountingProvider { + QUICKBOOKS = "quickbooks", + XERO = "xero", +} + +export interface AccountingConnection { + id: string; + userId: string; + provider: AccountingProvider; + realmId?: string; // QuickBooks company ID + tenantId?: string; // Xero tenant ID (active organization) + tenantName?: string; // Xero organization name (for display / selection) + accessToken: string; + refreshToken: string; + expiresAt: Date; + isActive: boolean; + createdAt: Date; + updatedAt: Date; +} + +export interface CategoryMapping { + id: string; + connectionId: string; + mobileMoneyCategory: string; + accountingCategoryId: string; + accountingCategoryName: string; + createdAt: Date; +} + +export interface SyncLog { + id: string; + connectionId: string; + syncType: "daily_pnl" | "fee_revenue"; + status: "pending" | "in_progress" | "completed" | "failed"; + recordsProcessed: number; + recordsSucceeded: number; + recordsFailed: number; + errorMessage?: string; + syncedAt: Date; +} + +export interface PnLData { + date: string; + revenue: number; + fees: number; + netProfit: number; + transactions: number; +} + +export interface QuickBooksTokenResponse { + access_token: string; + refresh_token: string; + expires_in: number; + token_type: string; + x_refresh_token_expires_in: number; +} + +export interface XeroTokenResponse { + access_token: string; + refresh_token: string; + expires_in: number; + token_type: string; + scope: string; +} + +/** + * A single Xero "connection" as returned by GET https://api.xero.com/connections. + * Each entry represents one organization (tenant) the user authorized. + */ +export interface XeroTenant { + id?: string; // connection id + tenantId: string; + tenantName?: string; + tenantType?: string; +} + +export class AccountingService { + private readonly quickbooksClientId: string; + private readonly quickbooksClientSecret: string; + private readonly quickbooksRedirectUri: string; + private readonly xeroClientId: string; + private readonly xeroClientSecret: string; + private readonly xeroRedirectUri: string; + + constructor() { + this.quickbooksClientId = process.env.QUICKBOOKS_CLIENT_ID || ""; + this.quickbooksClientSecret = process.env.QUICKBOOKS_CLIENT_SECRET || ""; + this.quickbooksRedirectUri = process.env.QUICKBOOKS_REDIRECT_URI || ""; + this.xeroClientId = process.env.XERO_CLIENT_ID || ""; + this.xeroClientSecret = process.env.XERO_CLIENT_SECRET || ""; + this.xeroRedirectUri = process.env.XERO_REDIRECT_URI || ""; + } + + // OAuth2 Authorization URLs + getQuickBooksAuthUrl(state: string = uuidv4()): string { + const scopes = [ + "com.intuit.quickbooks.accounting", + "com.intuit.quickbooks.payment", + ].join(" "); + + const params = new URLSearchParams({ + client_id: this.quickbooksClientId, + redirect_uri: this.quickbooksRedirectUri, + response_type: "code", + scope: scopes, + state, + }); + + return `https://appcenter.intuit.com/connect/oauth2?${params.toString()}`; + } + + /** + * Build the Xero OAuth 2.0 authorization URL. + * + * `offline_access` is required for Xero to return a refresh token, and the + * `state` value is used for CSRF protection + to re-associate the headerless + * browser callback with the user who started the flow. When no state is + * supplied a random one is generated so existing callers keep working. + */ + getXeroAuthUrl(state?: string): string { + const scopes = [ + "offline_access", + "openid", + "profile", + "email", + "accounting.transactions", + "accounting.reports.read", + "accounting.settings", + ].join(" "); + + const params = new URLSearchParams({ + client_id: this.xeroClientId, + redirect_uri: this.xeroRedirectUri, + response_type: "code", + scope: scopes, + state: state || uuidv4(), + }); + + return `https://login.xero.com/identity/connect/authorize?${params.toString()}`; + } + + // Handle OAuth2 callbacks + async handleQuickBooksCallback( + code: string, + realmId: string, + userId: string, + ): Promise { + try { + const tokenResponse = await this.exchangeQuickBooksCode(code); + + const connection: AccountingConnection = { + id: uuidv4(), + userId, + provider: AccountingProvider.QUICKBOOKS, + realmId, + accessToken: tokenResponse.access_token, + refreshToken: tokenResponse.refresh_token, + expiresAt: new Date(Date.now() + tokenResponse.expires_in * 1000), + isActive: true, + createdAt: new Date(), + updatedAt: new Date(), + }; + + await this.saveConnection(connection); + await this.scheduleTokenRefresh(connection); + + return connection; + } catch (error) { + logger.error(`QuickBooks OAuth callback failed: ${error}`); + throw new Error(`QuickBooks OAuth failed: ${error}`); + } + } + + async handleXeroCallback( + code: string, + userId: string, + selectedTenantId?: string, + ): Promise { + try { + const tokenResponse = await this.exchangeXeroCode(code); + + // Resolve the tenant (organization) this token grants access to. A single + // Xero login can be connected to multiple organizations, so we must fetch + // the live list of authorized tenants and pick the right one. + const tenants = await this.getXeroTenants(tokenResponse.access_token); + + if (!tenants || tenants.length === 0) { + throw new Error( + "No Xero organizations are connected to this login. " + + "Please authorize at least one organization and try again.", + ); + } + + const activeTenant = this.resolveActiveXeroTenant( + tenants, + selectedTenantId, + ); + + const connection: AccountingConnection = { + id: uuidv4(), + userId, + provider: AccountingProvider.XERO, + tenantId: activeTenant.tenantId, + tenantName: activeTenant.tenantName, + accessToken: tokenResponse.access_token, + refreshToken: tokenResponse.refresh_token, + expiresAt: new Date(Date.now() + tokenResponse.expires_in * 1000), + isActive: true, + createdAt: new Date(), + updatedAt: new Date(), + }; + + await this.saveConnection(connection); + await this.scheduleTokenRefresh(connection); + + return connection; + } catch (error) { + logger.error(`Xero OAuth callback failed: ${error}`); + throw new Error(`Xero OAuth failed: ${error}`); + } + } + + /** + * Expose the list of Xero organizations (tenants) associated with an + * authorization code. Useful when the front-end wants to let the user pick + * which organization to connect before finalizing the connection. + */ + async listXeroTenantsFromCode(code: string): Promise { + const tokenResponse = await this.exchangeXeroCode(code); + return this.getXeroTenants(tokenResponse.access_token); + } + + /** + * Pick the active tenant from the list returned by Xero. + * + * - When the caller explicitly selects a tenant we honor it (multi-tenant + * selection), but only if it is actually present in the authorized list. + * - Otherwise we default to the first authorized organization. + */ + private resolveActiveXeroTenant( + tenants: XeroTenant[], + selectedTenantId?: string, + ): XeroTenant { + if (selectedTenantId) { + const match = tenants.find((t) => t.tenantId === selectedTenantId); + if (!match) { + throw new Error( + `Selected Xero tenant "${selectedTenantId}" is not among the authorized organizations.`, + ); + } + return match; + } + return tenants[0]; + } + + private async scheduleTokenRefresh( + connection: AccountingConnection, + ): Promise { + // Refresh 10 minutes before expiry + const refreshBufferMs = 10 * 60 * 1000; + const delayMs = + connection.expiresAt.getTime() - Date.now() - refreshBufferMs; + + // If token is already expired or expires very soon, refresh immediately (small delay for safety) + const finalDelayMs = Math.max(5000, delayMs); + + await removeAccountingTokenRefreshJob(connection.id); + await addAccountingTokenRefreshJob( + connection.id, + connection.provider, + finalDelayMs, + ); + + logger.info( + `Scheduled token refresh for connection ${connection.id} in ${finalDelayMs}ms`, + ); + } + + // Exchange authorization code for tokens + private async exchangeQuickBooksCode( + code: string, + ): Promise { + const response = await axios.post( + "https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer", + new URLSearchParams({ + grant_type: "authorization_code", + code, + redirect_uri: this.quickbooksRedirectUri, + }), + { + headers: { + "Content-Type": "application/x-www-form-urlencoded", + Authorization: `Basic ${Buffer.from( + `${this.quickbooksClientId}:${this.quickbooksClientSecret}`, + ).toString("base64")}`, + }, + }, + ); + + return response.data; + } + + private async exchangeXeroCode(code: string): Promise { + const response = await axios.post( + "https://identity.xero.com/connect/token", + new URLSearchParams({ + grant_type: "authorization_code", + code, + redirect_uri: this.xeroRedirectUri, + }), + { + headers: { + "Content-Type": "application/x-www-form-urlencoded", + Authorization: `Basic ${Buffer.from( + `${this.xeroClientId}:${this.xeroClientSecret}`, + ).toString("base64")}`, + }, + }, + ); + + return response.data; + } + + // Get Xero tenants (organizations) authorized for this access token. + private async getXeroTenants(accessToken: string): Promise { + const response = await axios.get("https://api.xero.com/connections", { + headers: { + Authorization: `Bearer ${accessToken}`, + "Content-Type": "application/json", + }, + }); + + const raw = Array.isArray(response.data) ? response.data : []; + + // Normalize the Xero connections payload into our XeroTenant shape. The + // Xero API returns objects like: + // { id, tenantId, tenantType, tenantName, ... } + return raw + .map((entry: any) => ({ + id: entry.id, + tenantId: entry.tenantId, + tenantName: entry.tenantName, + tenantType: entry.tenantType, + })) + .filter((t: XeroTenant) => Boolean(t.tenantId)); + } + + // Refresh access tokens + async refreshQuickBooksToken(connectionId: string): Promise { + const connection = await this.getConnection(connectionId); + if (!connection || connection.provider !== AccountingProvider.QUICKBOOKS) { + throw new Error("QuickBooks connection not found"); + } + + try { + const response = await axios.post( + "https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer", + new URLSearchParams({ + grant_type: "refresh_token", + refresh_token: connection.refreshToken, + }), + { + headers: { + "Content-Type": "application/x-www-form-urlencoded", + Authorization: `Basic ${Buffer.from( + `${this.quickbooksClientId}:${this.quickbooksClientSecret}`, + ).toString("base64")}`, + }, + }, + ); + + const updatedConnection: AccountingConnection = { + ...connection, + accessToken: response.data.access_token, + refreshToken: response.data.refresh_token, + expiresAt: new Date(Date.now() + response.data.expires_in * 1000), + updatedAt: new Date(), + }; + + await this.updateConnectionTokens(connectionId, { + accessToken: updatedConnection.accessToken, + refreshToken: updatedConnection.refreshToken, + expiresAt: updatedConnection.expiresAt, + }); + + await this.scheduleTokenRefresh(updatedConnection); + logger.info( + `Successfully refreshed QuickBooks token for connection ${connectionId}`, + ); + } catch (error) { + logger.error( + `QuickBooks token refresh failed for ${connectionId}: ${error}`, + ); + throw new Error(`QuickBooks token refresh failed: ${error}`); + } + } + + async refreshXeroToken(connectionId: string): Promise { + const connection = await this.getConnection(connectionId); + if (!connection || connection.provider !== AccountingProvider.XERO) { + throw new Error("Xero connection not found"); + } + + try { + const response = await axios.post( + "https://identity.xero.com/connect/token", + new URLSearchParams({ + grant_type: "refresh_token", + refresh_token: connection.refreshToken, + }), + { + headers: { + "Content-Type": "application/x-www-form-urlencoded", + Authorization: `Basic ${Buffer.from( + `${this.xeroClientId}:${this.xeroClientSecret}`, + ).toString("base64")}`, + }, + }, + ); + + const updatedConnection: AccountingConnection = { + ...connection, + accessToken: response.data.access_token, + refreshToken: response.data.refresh_token, + expiresAt: new Date(Date.now() + response.data.expires_in * 1000), + updatedAt: new Date(), + }; + + await this.updateConnectionTokens(connectionId, { + accessToken: updatedConnection.accessToken, + refreshToken: updatedConnection.refreshToken, + expiresAt: updatedConnection.expiresAt, + }); + + await this.scheduleTokenRefresh(updatedConnection); + logger.info( + `Successfully refreshed Xero token for connection ${connectionId}`, + ); + } catch (error) { + logger.error(`Xero token refresh failed for ${connectionId}: ${error}`); + throw new Error(`Xero token refresh failed: ${error}`); + } + } + + // Category mapping + async createCategoryMapping( + connectionId: string, + mobileMoneyCategory: string, + accountingCategoryId: string, + accountingCategoryName: string, + ): Promise { + const mapping: CategoryMapping = { + id: uuidv4(), + connectionId, + mobileMoneyCategory, + accountingCategoryId, + accountingCategoryName, + createdAt: new Date(), + }; + + await pool.query( + `INSERT INTO category_mappings (id, connection_id, mobile_money_category, accounting_category_id, accounting_category_name, created_at) + VALUES ($1, $2, $3, $4, $5, $6)`, + [ + mapping.id, + mapping.connectionId, + mapping.mobileMoneyCategory, + mapping.accountingCategoryId, + mapping.accountingCategoryName, + mapping.createdAt, + ], + ); + + return mapping; + } + + async getCategoryMappings(connectionId: string): Promise { + const result = await pool.query( + "SELECT * FROM category_mappings WHERE connection_id = $1 ORDER BY mobile_money_category", + [connectionId], + ); + + return result.rows; + } + + async getAccountingCategories( + connectionId: string, + ): Promise> { + const connection = await this.getConnection(connectionId); + if (!connection) { + throw new Error("Connection not found"); + } + + if (connection.provider === AccountingProvider.QUICKBOOKS) { + return this.getQuickBooksCategories(connection); + } else if (connection.provider === AccountingProvider.XERO) { + return this.getXeroCategories(connection); + } + + throw new Error("Unsupported provider"); + } + + private async getQuickBooksCategories( + connection: AccountingConnection, + ): Promise> { + await this.ensureValidToken(connection.id); + + const connectionData = await this.getConnection(connection.id); + const response = await axios.get( + `https://quickbooks.api.intuit.com/v3/company/${connectionData!.realmId}/query?query=SELECT * FROM Account WHERE Active=true`, + { + headers: { + Authorization: `Bearer ${connectionData!.accessToken}`, + Accept: "application/json", + }, + }, + ); + + return response.data.QueryResponse.Account.map((account: any) => ({ + id: account.Id, + name: account.Name, + })); + } + + private async getXeroCategories( + connection: AccountingConnection, + ): Promise> { + await this.ensureValidToken(connection.id); + + const connectionData = await this.getConnection(connection.id); + const response = await axios.get( + "https://api.xero.com/api.xro/2.0/Accounts", + { + headers: { + Authorization: `Bearer ${connectionData!.accessToken}`, + "Xero-tenant-id": connectionData!.tenantId, + Accept: "application/json", + }, + }, + ); + + return response.data.Accounts.map((account: any) => ({ + id: account.AccountID, + name: account.Name, + })); + } + + // Sync functions + async syncDailyPnL(connectionId: string, date: string): Promise { + const syncLog: SyncLog = { + id: uuidv4(), + connectionId, + syncType: "daily_pnl", + status: "in_progress", + recordsProcessed: 0, + recordsSucceeded: 0, + recordsFailed: 0, + syncedAt: new Date(), + }; + + await this.createSyncLog(syncLog); + + try { + const connection = await this.getConnection(connectionId); + if (!connection) { + throw new Error("Connection not found"); + } + + await this.ensureValidToken(connectionId); + + // Get PnL data for the date + const pnlData = await this.getPnLData(date); + + if (connection.provider === AccountingProvider.QUICKBOOKS) { + await this.syncPnLToQuickBooks(connection, pnlData, syncLog); + } else if (connection.provider === AccountingProvider.XERO) { + await this.syncPnLToXero(connection, pnlData, syncLog); + } + + syncLog.status = "completed"; + await this.updateSyncLog(syncLog); + } catch (error) { + syncLog.status = "failed"; + syncLog.errorMessage = + error instanceof Error ? error.message : "Unknown error"; + await this.updateSyncLog(syncLog); + } + + return syncLog; + } + + async syncFeeRevenue(connectionId: string, date: string): Promise { + const syncLog: SyncLog = { + id: uuidv4(), + connectionId, + syncType: "fee_revenue", + status: "in_progress", + recordsProcessed: 0, + recordsSucceeded: 0, + recordsFailed: 0, + syncedAt: new Date(), + }; + + await this.createSyncLog(syncLog); + + try { + const connection = await this.getConnection(connectionId); + if (!connection) { + throw new Error("Connection not found"); + } + + await this.ensureValidToken(connectionId); + + // Get fee revenue data for the date + const feeData = await this.getFeeRevenueData(date); + + if (connection.provider === AccountingProvider.QUICKBOOKS) { + await this.syncFeeRevenueToQuickBooks(connection, feeData, syncLog); + } else if (connection.provider === AccountingProvider.XERO) { + await this.syncFeeRevenueToXero(connection, feeData, syncLog); + } + + syncLog.status = "completed"; + await this.updateSyncLog(syncLog); + } catch (error) { + syncLog.status = "failed"; + syncLog.errorMessage = + error instanceof Error ? error.message : "Unknown error"; + await this.updateSyncLog(syncLog); + } + + return syncLog; + } + + // Database operations + private async saveConnection( + connection: AccountingConnection, + ): Promise { + const encryptedAccessToken = encryptField(connection.accessToken); + const encryptedRefreshToken = encryptField(connection.refreshToken); + + await pool.query( + `INSERT INTO accounting_connections + (id, user_id, provider, realm_id, tenant_id, tenant_name, access_token, refresh_token, expires_at, is_active, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + ON CONFLICT (id) DO UPDATE SET + tenant_id = EXCLUDED.tenant_id, + tenant_name = EXCLUDED.tenant_name, + access_token = EXCLUDED.access_token, + refresh_token = EXCLUDED.refresh_token, + expires_at = EXCLUDED.expires_at, + is_active = EXCLUDED.is_active, + updated_at = EXCLUDED.updated_at`, + [ + connection.id, + connection.userId, + connection.provider, + connection.realmId, + connection.tenantId, + connection.tenantName, + encryptedAccessToken, + encryptedRefreshToken, + connection.expiresAt, + connection.isActive, + connection.createdAt, + connection.updatedAt, + ], + ); + } + + private async updateConnectionTokens( + connectionId: string, + tokens: { accessToken: string; refreshToken: string; expiresAt: Date }, + ): Promise { + const encryptedAccessToken = encryptField(tokens.accessToken); + const encryptedRefreshToken = encryptField(tokens.refreshToken); + + await pool.query( + "UPDATE accounting_connections SET access_token = $1, refresh_token = $2, expires_at = $3, updated_at = $4 WHERE id = $5", + [ + encryptedAccessToken, + encryptedRefreshToken, + tokens.expiresAt, + new Date(), + connectionId, + ], + ); + } + + async getConnection( + connectionId: string, + ): Promise { + const result = await pool.query( + "SELECT * FROM accounting_connections WHERE id = $1", + [connectionId], + ); + + if (result.rows.length === 0) { + return null; + } + + const row = result.rows[0]; + return { + ...row, + accessToken: decryptField(row.access_token) || row.access_token, + refreshToken: decryptField(row.refresh_token) || row.refresh_token, + expiresAt: new Date(row.expires_at), + isActive: row.is_active, + createdAt: new Date(row.created_at), + updatedAt: new Date(row.updated_at), + }; + } + + async getUserConnections(userId: string): Promise { + const result = await pool.query( + "SELECT * FROM accounting_connections WHERE user_id = $1 AND is_active = true ORDER BY created_at DESC", + [userId], + ); + + return result.rows.map((row) => ({ + ...row, + accessToken: decryptField(row.access_token) || row.access_token, + refreshToken: decryptField(row.refresh_token) || row.refresh_token, + expiresAt: new Date(row.expires_at), + isActive: row.is_active, + createdAt: new Date(row.created_at), + updatedAt: new Date(row.updated_at), + })); + } + + private async createSyncLog(syncLog: SyncLog): Promise { + await pool.query( + `INSERT INTO sync_logs + (id, connection_id, sync_type, status, records_processed, records_succeeded, records_failed, error_message, synced_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`, + [ + syncLog.id, + syncLog.connectionId, + syncLog.syncType, + syncLog.status, + syncLog.recordsProcessed, + syncLog.recordsSucceeded, + syncLog.recordsFailed, + syncLog.errorMessage, + syncLog.syncedAt, + ], + ); + } + + private async updateSyncLog(syncLog: SyncLog): Promise { + await pool.query( + `UPDATE sync_logs SET + status = $1, records_processed = $2, records_succeeded = $3, records_failed = $4, error_message = $5 + WHERE id = $6`, + [ + syncLog.status, + syncLog.recordsProcessed, + syncLog.recordsSucceeded, + syncLog.recordsFailed, + syncLog.errorMessage, + syncLog.id, + ], + ); + } + + async getSyncLogs( + connectionId: string, + limit: number = 50, + ): Promise { + const result = await pool.query( + "SELECT * FROM sync_logs WHERE connection_id = $1 ORDER BY synced_at DESC LIMIT $2", + [connectionId, limit], + ); + + return result.rows; + } + + // Helper functions + private async ensureValidToken(connectionId: string): Promise { + const connection = await this.getConnection(connectionId); + if (!connection) { + throw new Error("Connection not found"); + } + + if (new Date() >= connection.expiresAt) { + if (connection.provider === AccountingProvider.QUICKBOOKS) { + await this.refreshQuickBooksToken(connectionId); + } else if (connection.provider === AccountingProvider.XERO) { + await this.refreshXeroToken(connectionId); + } + } + } + + private async getPnLData(date: string): Promise { + // Calculate PnL data from transactions + const query = ` + SELECT + COUNT(*) as transactions, + COALESCE(SUM(CASE WHEN type = 'credit' THEN amount ELSE 0 END), 0) as revenue, + COALESCE(SUM(fee), 0) as fees + FROM transactions + WHERE DATE(created_at) = $1 + AND status = 'completed' + `; + + const result = await pool.query(query, [date]); + const { transactions, revenue, fees } = result.rows[0]; + + return { + date, + revenue: parseFloat(revenue), + fees: parseFloat(fees), + netProfit: parseFloat(revenue) - parseFloat(fees), + transactions: parseInt(transactions), + }; + } + + private async getFeeRevenueData( + date: string, + ): Promise<{ category: string; amount: number }[]> { + // Get fee revenue broken down by category + const query = ` + SELECT + fee_category, + SUM(fee) as amount + FROM transactions + WHERE DATE(created_at) = $1 + AND status = 'completed' + AND fee > 0 + GROUP BY fee_category + ORDER BY amount DESC + `; + + const result = await pool.query(query, [date]); + return result.rows.map((row) => ({ + category: row.fee_category || "General Fees", + amount: parseFloat(row.amount), + })); + } + + // Provider-specific sync implementations + private async syncPnLToQuickBooks( + connection: AccountingConnection, + pnlData: PnLData, + syncLog: SyncLog, + ): Promise { + const connectionData = await this.getConnection(connection.id); + const mappings = await this.getCategoryMappings(connection.id); + + // Create journal entry for PnL + const journalEntry = { + TxnDate: pnlData.date, + Line: [ + { + Description: `Daily P&L - ${pnlData.date}`, + Amount: pnlData.revenue, + DetailType: "JournalEntryLineDetail", + JournalEntryLineDetail: { + PostingType: "Credit", + AccountRef: this.getMappedCategory(mappings, "revenue") || { + value: "1", + }, // Default to Sales + }, + }, + { + Description: `Daily Fees - ${pnlData.date}`, + Amount: pnlData.fees, + DetailType: "JournalEntryLineDetail", + JournalEntryLineDetail: { + PostingType: "Debit", + AccountRef: this.getMappedCategory(mappings, "fees") || { + value: "4", + }, // Default to Expense + }, + }, + ], + }; + + try { + await axios.post( + `https://quickbooks.api.intuit.com/v3/company/${connectionData!.realmId}/journalentry`, + journalEntry, + { + headers: { + Authorization: `Bearer ${connectionData!.accessToken}`, + "Content-Type": "application/json", + }, + }, + ); + + syncLog.recordsProcessed = 1; + syncLog.recordsSucceeded = 1; + } catch (error) { + syncLog.recordsProcessed = 1; + syncLog.recordsFailed = 1; + throw error; + } + } + + private async syncPnLToXero( + connection: AccountingConnection, + pnlData: PnLData, + syncLog: SyncLog, + ): Promise { + const connectionData = await this.getConnection(connection.id); + const mappings = await this.getCategoryMappings(connection.id); + + // Create manual journal entry for PnL + const journalEntry = { + Date: pnlData.date, + JournalLines: [ + { + Description: `Daily P&L - ${pnlData.date}`, + CreditAmount: pnlData.revenue, + AccountID: this.getMappedCategory(mappings, "revenue") || "1", // Default to Sales + }, + { + Description: `Daily Fees - ${pnlData.date}`, + DebitAmount: pnlData.fees, + AccountID: this.getMappedCategory(mappings, "fees") || "4", // Default to Expense + }, + ], + }; + + try { + await axios.put( + "https://api.xero.com/api.xro/2.0/ManualJournals", + journalEntry, + { + headers: { + Authorization: `Bearer ${connectionData!.accessToken}`, + "Xero-tenant-id": connectionData!.tenantId, + "Content-Type": "application/json", + }, + }, + ); + + syncLog.recordsProcessed = 1; + syncLog.recordsSucceeded = 1; + } catch (error) { + syncLog.recordsProcessed = 1; + syncLog.recordsFailed = 1; + throw error; + } + } + + private async syncFeeRevenueToQuickBooks( + connection: AccountingConnection, + feeData: Array<{ category: string; amount: number }>, + syncLog: SyncLog, + ): Promise { + const connectionData = await this.getConnection(connection.id); + const mappings = await this.getCategoryMappings(connection.id); + + const lines = feeData.map((fee) => ({ + Description: `Fee Revenue - ${fee.category}`, + Amount: fee.amount, + DetailType: "JournalEntryLineDetail", + JournalEntryLineDetail: { + PostingType: "Credit", + AccountRef: this.getMappedCategory(mappings, fee.category) || { + value: "1", + }, + }, + })); + + const journalEntry = { + TxnDate: new Date().toISOString().split("T")[0], + Line: lines, + }; + + try { + await axios.post( + `https://quickbooks.api.intuit.com/v3/company/${connectionData!.realmId}/journalentry`, + journalEntry, + { + headers: { + Authorization: `Bearer ${connectionData!.accessToken}`, + "Content-Type": "application/json", + }, + }, + ); + + syncLog.recordsProcessed = feeData.length; + syncLog.recordsSucceeded = feeData.length; + } catch (error) { + syncLog.recordsProcessed = feeData.length; + syncLog.recordsFailed = feeData.length; + throw error; + } + } + + private async syncFeeRevenueToXero( + connection: AccountingConnection, + feeData: Array<{ category: string; amount: number }>, + syncLog: SyncLog, + ): Promise { + const connectionData = await this.getConnection(connection.id); + const mappings = await this.getCategoryMappings(connection.id); + + const journalLines = feeData.map((fee) => ({ + Description: `Fee Revenue - ${fee.category}`, + CreditAmount: fee.amount, + AccountID: this.getMappedCategory(mappings, fee.category) || "1", + })); + + const journalEntry = { + Date: new Date().toISOString().split("T")[0], + JournalLines: journalLines, + }; + + try { + await axios.put( + "https://api.xero.com/api.xro/2.0/ManualJournals", + journalEntry, + { + headers: { + Authorization: `Bearer ${connectionData!.accessToken}`, + "Xero-tenant-id": connectionData!.tenantId, + "Content-Type": "application/json", + }, + }, + ); + + syncLog.recordsProcessed = feeData.length; + syncLog.recordsSucceeded = feeData.length; + } catch (error) { + syncLog.recordsProcessed = feeData.length; + syncLog.recordsFailed = feeData.length; + throw error; + } + } + + private async syncWithdrawalToXeroBill( + connection: AccountingConnection, + transaction: { + id: string; + userId: string; + type: string; + amount: number; + fee: number; + currency: string; + referenceNumber: string; + provider: string; + createdAt: Date; + }, + mappings: CategoryMapping[], + ): Promise { + const connectionData = await this.getConnection(connection.id); + const txnDate = transaction.createdAt.toISOString().split("T")[0]; + const withdrawalAccountId = this.getMappedCategory(mappings, "withdrawal"); + const feeAccountId = this.getMappedCategory(mappings, "fees"); + + const withdrawalLine: any = { + Description: `Withdrawal payout - ref:${transaction.referenceNumber} via ${transaction.provider}`, + Quantity: 1, + UnitAmount: transaction.amount, + TaxType: "NONE", + }; + + if (withdrawalAccountId) { + withdrawalLine.AccountID = withdrawalAccountId; + } else { + withdrawalLine.AccountCode = "500"; + } + + const lineItems: any[] = [withdrawalLine]; + + if (transaction.fee > 0) { + const feeLine: any = { + Description: `Fee - Withdrawal payout ref:${transaction.referenceNumber}`, + Quantity: 1, + UnitAmount: transaction.fee, + TaxType: "NONE", + }; + + if (feeAccountId) { + feeLine.AccountID = feeAccountId; + } else { + feeLine.AccountCode = "500"; + } + + lineItems.push(feeLine); + } + + const bill = { + Type: "ACCPAY", + Contact: { + Name: "Mobile Money Payouts", + }, + Date: txnDate, + DueDate: txnDate, + Reference: transaction.referenceNumber, + Status: "AUTHORISED", + LineItems: lineItems, + }; + + await axios.post( + "https://api.xero.com/api.xro/2.0/Bills", + { Bills: [bill] }, + { + headers: { + Authorization: `Bearer ${connectionData!.accessToken}`, + "Xero-tenant-id": connectionData!.tenantId, + "Content-Type": "application/json", + }, + }, + ); + } + + private async syncXeroTransactionToManualJournal( + connection: AccountingConnection, + transaction: { + id: string; + userId: string; + type: string; + amount: number; + fee: number; + currency: string; + referenceNumber: string; + provider: string; + createdAt: Date; + }, + ): Promise { + const freshConnection = await this.getConnection(connection.id); + const txnDate = transaction.createdAt.toISOString().split("T")[0]; + const description = `${transaction.type} - ref:${transaction.referenceNumber} via ${transaction.provider}`; + + const journalLines: object[] = [ + { + Description: description, + CreditAmount: transaction.amount, + AccountID: "revenue-account-id", + }, + ]; + + if (transaction.fee > 0) { + journalLines.push({ + Description: `Fee - ${description}`, + DebitAmount: transaction.fee, + AccountID: "expense-account-id", + }); + } + + await axios.put( + "https://api.xero.com/api.xro/2.0/ManualJournals", + { Date: txnDate, Narration: transaction.id, JournalLines: journalLines }, + { + headers: { + Authorization: `Bearer ${freshConnection!.accessToken}`, + "Xero-tenant-id": freshConnection!.tenantId, + "Content-Type": "application/json", + }, + }, + ); + } + + private getMappedCategory( + mappings: CategoryMapping[], + mobileMoneyCategory: string, + ): string | null { + const mapping = mappings.find( + (m) => m.mobileMoneyCategory === mobileMoneyCategory, + ); + return mapping ? mapping.accountingCategoryId : null; + } + + /** + * Sync a single completed transaction to all active accounting connections for the user. + * Called automatically when a transaction.completed event fires. + */ + async syncTransaction(transaction: { + id: string; + userId: string; + type: string; + amount: number; + fee: number; + currency: string; + referenceNumber: string; + provider: string; + createdAt: Date; + }): Promise { + const connections = await this.getUserConnections(transaction.userId); + if (connections.length === 0) return; + + for (const connection of connections) { + await this.ensureValidToken(connection.id); + const fresh = await this.getConnection(connection.id); + if (!fresh) continue; + + const txnDate = transaction.createdAt.toISOString().split("T")[0]; + const description = `${transaction.type} - ref:${transaction.referenceNumber} via ${transaction.provider}`; + + try { + if (connection.provider === AccountingProvider.QUICKBOOKS) { + await axios.post( + `https://quickbooks.api.intuit.com/v3/company/${fresh.realmId}/journalentry`, + { + TxnDate: txnDate, + PrivateNote: transaction.id, + Line: [ + { + Description: description, + Amount: transaction.amount, + DetailType: "JournalEntryLineDetail", + JournalEntryLineDetail: { + PostingType: "Credit", + AccountRef: { value: "1" }, // Sales / Revenue + }, + }, + ...(transaction.fee > 0 + ? [ + { + Description: `Fee - ${description}`, + Amount: transaction.fee, + DetailType: "JournalEntryLineDetail", + JournalEntryLineDetail: { + PostingType: "Debit", + AccountRef: { value: "4" }, // Expense + }, + }, + ] + : []), + ], + }, + { + headers: { + Authorization: `Bearer ${fresh.accessToken}`, + "Content-Type": "application/json", + }, + }, + ); + } else if (connection.provider === AccountingProvider.XERO) { + if (transaction.type === "withdraw") { + const mappings = await this.getCategoryMappings(connection.id); + const withdrawalAccountId = this.getMappedCategory( + mappings, + "withdrawal", + ); + + if (withdrawalAccountId) { + await this.syncWithdrawalToXeroBill(fresh, transaction, mappings); + } else { + await this.syncXeroTransactionToManualJournal(fresh, transaction); + } + } else { + await this.syncXeroTransactionToManualJournal(fresh, transaction); + } + } + + await pool.query( + `INSERT INTO accounting_sync_queue + (transaction_id, connection_id, status, synced_at) + VALUES ($1, $2, 'synced', NOW()) + ON CONFLICT (transaction_id, connection_id) DO UPDATE + SET status = 'synced', synced_at = NOW()`, + [transaction.id, connection.id], + ); + } catch (err) { + await pool.query( + `INSERT INTO accounting_sync_queue + (transaction_id, connection_id, status, error_message, synced_at) + VALUES ($1, $2, 'failed', $3, NOW()) + ON CONFLICT (transaction_id, connection_id) DO UPDATE + SET status = 'failed', error_message = $3, synced_at = NOW()`, + [ + transaction.id, + connection.id, + err instanceof Error ? err.message : String(err), + ], + ); + } + } + } +} + +export interface AccountingConnection { + id: string; + userId: string; + provider: AccountingProvider; + realmId?: string; // QuickBooks company ID + tenantId?: string; // Xero tenant ID (active organization) + tenantName?: string; // Xero organization name (for display / selection) + accessToken: string; + refreshToken: string; + expiresAt: Date; + isActive: boolean; + createdAt: Date; + updatedAt: Date; +} + +export interface CategoryMapping { + id: string; + connectionId: string; + mobileMoneyCategory: string; + accountingCategoryId: string; + accountingCategoryName: string; + createdAt: Date; +} + +export interface SyncLog { + id: string; + connectionId: string; + syncType: "daily_pnl" | "fee_revenue"; + status: "pending" | "in_progress" | "completed" | "failed"; + recordsProcessed: number; + recordsSucceeded: number; + recordsFailed: number; + errorMessage?: string; + syncedAt: Date; +} + +export interface PnLData { + date: string; + revenue: number; + fees: number; + netProfit: number; + transactions: number; +} + +export interface QuickBooksTokenResponse { + access_token: string; + refresh_token: string; + expires_in: number; + token_type: string; + x_refresh_token_expires_in: number; +} + +export interface XeroTokenResponse { + access_token: string; + refresh_token: string; + expires_in: number; + token_type: string; + scope: string; +} + +/** + * A single Xero "connection" as returned by GET https://api.xero.com/connections. + * Each entry represents one organization (tenant) the user authorized. + */ +export interface XeroTenant { + id?: string; // connection id + tenantId: string; + tenantName?: string; + tenantType?: string; +} + +export class AccountingService { + private readonly quickbooksClientId: string; + private readonly quickbooksClientSecret: string; + private readonly quickbooksRedirectUri: string; + private readonly xeroClientId: string; + private readonly xeroClientSecret: string; + private readonly xeroRedirectUri: string; + + constructor() { + this.quickbooksClientId = process.env.QUICKBOOKS_CLIENT_ID || ""; + this.quickbooksClientSecret = process.env.QUICKBOOKS_CLIENT_SECRET || ""; + this.quickbooksRedirectUri = process.env.QUICKBOOKS_REDIRECT_URI || ""; + this.xeroClientId = process.env.XERO_CLIENT_ID || ""; + this.xeroClientSecret = process.env.XERO_CLIENT_SECRET || ""; + this.xeroRedirectUri = process.env.XERO_REDIRECT_URI || ""; + } + + // OAuth2 Authorization URLs + getQuickBooksAuthUrl(): string { + const scopes = [ + "com.intuit.quickbooks.accounting", + "com.intuit.quickbooks.payment", + ].join(" "); + + const params = new URLSearchParams({ + client_id: this.quickbooksClientId, + redirect_uri: this.quickbooksRedirectUri, + response_type: "code", + scope: scopes, + state: uuidv4(), + }); + + return `https://appcenter.intuit.com/connect/oauth2?${params.toString()}`; + } + + /** + * Build the Xero OAuth 2.0 authorization URL. + * + * `offline_access` is required for Xero to return a refresh token, and the + * `state` value is used for CSRF protection + to re-associate the headerless + * browser callback with the user who started the flow. When no state is + * supplied a random one is generated so existing callers keep working. + */ + getXeroAuthUrl(state?: string): string { + const scopes = [ + "offline_access", + "openid", + "profile", + "email", + "accounting.transactions", + "accounting.reports.read", + "accounting.settings", + ].join(" "); + + const params = new URLSearchParams({ + client_id: this.xeroClientId, + redirect_uri: this.xeroRedirectUri, + response_type: "code", + scope: scopes, + state: state || uuidv4(), + }); + + return `https://login.xero.com/identity/connect/authorize?${params.toString()}`; + } + + // Handle OAuth2 callbacks + async handleQuickBooksCallback( + code: string, + realmId: string, + userId: string, + ): Promise { + try { + const tokenResponse = await this.exchangeQuickBooksCode(code); + + const connection: AccountingConnection = { + id: uuidv4(), + userId, + provider: AccountingProvider.QUICKBOOKS, + realmId, + accessToken: tokenResponse.access_token, + refreshToken: tokenResponse.refresh_token, + expiresAt: new Date(Date.now() + tokenResponse.expires_in * 1000), + isActive: true, + createdAt: new Date(), + updatedAt: new Date(), + }; + + await this.saveConnection(connection); + return connection; + } catch (error) { + throw new Error(`QuickBooks OAuth failed: ${error}`); + } + } + + async handleXeroCallback( + code: string, + userId: string, + selectedTenantId?: string, + ): Promise { + try { + const tokenResponse = await this.exchangeXeroCode(code); + + // Resolve the tenant (organization) this token grants access to. A single + // Xero login can be connected to multiple organizations, so we must fetch + // the live list of authorized tenants and pick the right one. + const tenants = await this.getXeroTenants(tokenResponse.access_token); + + if (!tenants || tenants.length === 0) { + throw new Error( + "No Xero organizations are connected to this login. " + + "Please authorize at least one organization and try again.", + ); + } + + const activeTenant = this.resolveActiveXeroTenant( + tenants, + selectedTenantId, + ); + + const connection: AccountingConnection = { + id: uuidv4(), + userId, + provider: AccountingProvider.XERO, + tenantId: activeTenant.tenantId, + tenantName: activeTenant.tenantName, + accessToken: tokenResponse.access_token, + refreshToken: tokenResponse.refresh_token, + expiresAt: new Date(Date.now() + tokenResponse.expires_in * 1000), + isActive: true, + createdAt: new Date(), + updatedAt: new Date(), + }; + + await this.saveConnection(connection); + return connection; + } catch (error) { + throw new Error(`Xero OAuth failed: ${error}`); + } + } + + /** + * Expose the list of Xero organizations (tenants) associated with an + * authorization code. Useful when the front-end wants to let the user pick + * which organization to connect before finalizing the connection. + */ + async listXeroTenantsFromCode(code: string): Promise { + const tokenResponse = await this.exchangeXeroCode(code); + return this.getXeroTenants(tokenResponse.access_token); + } + + /** + * Pick the active tenant from the list returned by Xero. + * + * - When the caller explicitly selects a tenant we honor it (multi-tenant + * selection), but only if it is actually present in the authorized list. + * - Otherwise we default to the first authorized organization. + */ + private resolveActiveXeroTenant( + tenants: XeroTenant[], + selectedTenantId?: string, + ): XeroTenant { + if (selectedTenantId) { + const match = tenants.find((t) => t.tenantId === selectedTenantId); + if (!match) { + throw new Error( + `Selected Xero tenant "${selectedTenantId}" is not among the authorized organizations.`, + ); + } + return match; + } + return tenants[0]; + } + + // Exchange authorization code for tokens + private async exchangeQuickBooksCode( + code: string, + ): Promise { + const response = await axios.post( + "https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer", + new URLSearchParams({ + grant_type: "authorization_code", + code, + redirect_uri: this.quickbooksRedirectUri, + }), + { + headers: { + "Content-Type": "application/x-www-form-urlencoded", + Authorization: `Basic ${Buffer.from( + `${this.quickbooksClientId}:${this.quickbooksClientSecret}`, + ).toString("base64")}`, + }, + }, + ); + + return response.data; + } + + private async exchangeXeroCode(code: string): Promise { + const response = await axios.post( + "https://identity.xero.com/connect/token", + new URLSearchParams({ + grant_type: "authorization_code", + code, + redirect_uri: this.xeroRedirectUri, + }), + { + headers: { + "Content-Type": "application/x-www-form-urlencoded", + Authorization: `Basic ${Buffer.from( + `${this.xeroClientId}:${this.xeroClientSecret}`, + ).toString("base64")}`, + }, + }, + ); + + return response.data; + } + + // Get Xero tenants (organizations) authorized for this access token. + private async getXeroTenants(accessToken: string): Promise { + const response = await axios.get("https://api.xero.com/connections", { + headers: { + Authorization: `Bearer ${accessToken}`, + "Content-Type": "application/json", + }, + }); + + const raw = Array.isArray(response.data) ? response.data : []; + + // Normalize the Xero connections payload into our XeroTenant shape. The + // Xero API returns objects like: + // { id, tenantId, tenantType, tenantName, ... } + return raw + .map((entry: any) => ({ + id: entry.id, + tenantId: entry.tenantId, + tenantName: entry.tenantName, + tenantType: entry.tenantType, + })) + .filter((t: XeroTenant) => Boolean(t.tenantId)); + } + + // Refresh access tokens + async refreshQuickBooksToken(connectionId: string): Promise { + const connection = await this.getConnection(connectionId); + if (!connection || connection.provider !== AccountingProvider.QUICKBOOKS) { + throw new Error("QuickBooks connection not found"); + } + + try { + const response = await axios.post( + "https://oauth.platform.intuit.com/oauth2/v1/tokens/bearer", + new URLSearchParams({ + grant_type: "refresh_token", + refresh_token: connection.refreshToken, + }), + { + headers: { + "Content-Type": "application/x-www-form-urlencoded", + Authorization: `Basic ${Buffer.from( + `${this.quickbooksClientId}:${this.quickbooksClientSecret}`, + ).toString("base64")}`, + }, + }, + ); + + await this.updateConnectionTokens(connectionId, { + accessToken: response.data.access_token, + refreshToken: response.data.refresh_token, + expiresAt: new Date(Date.now() + response.data.expires_in * 1000), + }); + } catch (error) { + throw new Error(`QuickBooks token refresh failed: ${error}`); + } + } + + async refreshXeroToken(connectionId: string): Promise { + const connection = await this.getConnection(connectionId); + if (!connection || connection.provider !== AccountingProvider.XERO) { + throw new Error("Xero connection not found"); + } + + try { + const response = await axios.post( + "https://identity.xero.com/connect/token", + new URLSearchParams({ + grant_type: "refresh_token", + refresh_token: connection.refreshToken, + }), + { + headers: { + "Content-Type": "application/x-www-form-urlencoded", + Authorization: `Basic ${Buffer.from( + `${this.xeroClientId}:${this.xeroClientSecret}`, + ).toString("base64")}`, + }, + }, + ); + + await this.updateConnectionTokens(connectionId, { + accessToken: response.data.access_token, + refreshToken: response.data.refresh_token, + expiresAt: new Date(Date.now() + response.data.expires_in * 1000), + }); + } catch (error) { + throw new Error(`Xero token refresh failed: ${error}`); + } + } + + // Category mapping + async createCategoryMapping( + connectionId: string, + mobileMoneyCategory: string, + accountingCategoryId: string, + accountingCategoryName: string, + ): Promise { + const mapping: CategoryMapping = { + id: uuidv4(), + connectionId, + mobileMoneyCategory, + accountingCategoryId, + accountingCategoryName, + createdAt: new Date(), + }; + + await pool.query( + `INSERT INTO category_mappings (id, connection_id, mobile_money_category, accounting_category_id, accounting_category_name, created_at) + VALUES ($1, $2, $3, $4, $5, $6)`, + [ + mapping.id, + mapping.connectionId, + mapping.mobileMoneyCategory, + mapping.accountingCategoryId, + mapping.accountingCategoryName, + mapping.createdAt, + ], + ); + + return mapping; + } + + async getCategoryMappings(connectionId: string): Promise { + const result = await pool.query( + "SELECT * FROM category_mappings WHERE connection_id = $1 ORDER BY mobile_money_category", + [connectionId], + ); + + return result.rows; + } + + async getAccountingCategories( + connectionId: string, + ): Promise> { + const connection = await this.getConnection(connectionId); + if (!connection) { + throw new Error("Connection not found"); + } + + if (connection.provider === AccountingProvider.QUICKBOOKS) { + return this.getQuickBooksCategories(connection); + } else if (connection.provider === AccountingProvider.XERO) { + return this.getXeroCategories(connection); + } + + throw new Error("Unsupported provider"); + } + + private async getQuickBooksCategories( + connection: AccountingConnection, + ): Promise> { + await this.ensureValidToken(connection.id); + + const connectionData = await this.getConnection(connection.id); + const response = await axios.get( + `https://quickbooks.api.intuit.com/v3/company/${connectionData!.realmId}/query?query=SELECT * FROM Account WHERE Active=true`, + { + headers: { + Authorization: `Bearer ${connectionData!.accessToken}`, + Accept: "application/json", + }, + }, + ); + + return response.data.QueryResponse.Account.map((account: any) => ({ + id: account.Id, + name: account.Name, + })); + } + + private async getXeroCategories( + connection: AccountingConnection, + ): Promise> { + await this.ensureValidToken(connection.id); + + const connectionData = await this.getConnection(connection.id); + const response = await axios.get( + "https://api.xero.com/api.xro/2.0/Accounts", + { + headers: { + Authorization: `Bearer ${connectionData!.accessToken}`, + "Xero-tenant-id": connectionData!.tenantId, + Accept: "application/json", + }, + }, + ); + + return response.data.Accounts.map((account: any) => ({ + id: account.AccountID, + name: account.Name, + })); + } + + // Sync functions + async syncDailyPnL(connectionId: string, date: string): Promise { + const syncLog: SyncLog = { + id: uuidv4(), + connectionId, + syncType: "daily_pnl", + status: "in_progress", + recordsProcessed: 0, + recordsSucceeded: 0, + recordsFailed: 0, + syncedAt: new Date(), + }; + + await this.createSyncLog(syncLog); + + try { + const connection = await this.getConnection(connectionId); + if (!connection) { + throw new Error("Connection not found"); + } + + await this.ensureValidToken(connectionId); + + // Get PnL data for the date + const pnlData = await this.getPnLData(date); + + if (connection.provider === AccountingProvider.QUICKBOOKS) { + await this.syncPnLToQuickBooks(connection, pnlData, syncLog); + } else if (connection.provider === AccountingProvider.XERO) { + await this.syncPnLToXero(connection, pnlData, syncLog); + } + + syncLog.status = "completed"; + await this.updateSyncLog(syncLog); + } catch (error) { + syncLog.status = "failed"; + syncLog.errorMessage = + error instanceof Error ? error.message : "Unknown error"; + await this.updateSyncLog(syncLog); + } + + return syncLog; + } + + async syncFeeRevenue(connectionId: string, date: string): Promise { + const syncLog: SyncLog = { + id: uuidv4(), + connectionId, + syncType: "fee_revenue", + status: "in_progress", + recordsProcessed: 0, + recordsSucceeded: 0, + recordsFailed: 0, + syncedAt: new Date(), + }; + + await this.createSyncLog(syncLog); + + try { + const connection = await this.getConnection(connectionId); + if (!connection) { + throw new Error("Connection not found"); + } + + await this.ensureValidToken(connectionId); + + // Get fee revenue data for the date + const feeData = await this.getFeeRevenueData(date); + + if (connection.provider === AccountingProvider.QUICKBOOKS) { + await this.syncFeeRevenueToQuickBooks(connection, feeData, syncLog); + } else if (connection.provider === AccountingProvider.XERO) { + await this.syncFeeRevenueToXero(connection, feeData, syncLog); + } + + syncLog.status = "completed"; + await this.updateSyncLog(syncLog); + } catch (error) { + syncLog.status = "failed"; + syncLog.errorMessage = + error instanceof Error ? error.message : "Unknown error"; + await this.updateSyncLog(syncLog); + } + + return syncLog; + } + + // Database operations + private async saveConnection( + connection: AccountingConnection, + ): Promise { + await pool.query( + `INSERT INTO accounting_connections + (id, user_id, provider, realm_id, tenant_id, tenant_name, access_token, refresh_token, expires_at, is_active, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + ON CONFLICT (id) DO UPDATE SET + tenant_id = EXCLUDED.tenant_id, + tenant_name = EXCLUDED.tenant_name, + access_token = EXCLUDED.access_token, + refresh_token = EXCLUDED.refresh_token, + expires_at = EXCLUDED.expires_at, + is_active = EXCLUDED.is_active, + updated_at = EXCLUDED.updated_at`, + [ + connection.id, + connection.userId, + connection.provider, + connection.realmId, + connection.tenantId, + connection.tenantName, + connection.accessToken, + connection.refreshToken, + connection.expiresAt, + connection.isActive, + connection.createdAt, + connection.updatedAt, + ], + ); + } + + private async updateConnectionTokens( + connectionId: string, + tokens: { accessToken: string; refreshToken: string; expiresAt: Date }, + ): Promise { + await pool.query( + "UPDATE accounting_connections SET access_token = $1, refresh_token = $2, expires_at = $3, updated_at = $4 WHERE id = $5", + [ + tokens.accessToken, + tokens.refreshToken, + tokens.expiresAt, + new Date(), + connectionId, + ], + ); + } + + async getConnection( + connectionId: string, + ): Promise { + const result = await pool.query( + "SELECT * FROM accounting_connections WHERE id = $1", + [connectionId], + ); + + if (result.rows.length === 0) { + return null; + } + + return result.rows[0]; + } + + async getUserConnections(userId: string): Promise { + const result = await pool.query( + "SELECT * FROM accounting_connections WHERE user_id = $1 AND is_active = true ORDER BY created_at DESC", + [userId], + ); + + return result.rows; + } + + private async createSyncLog(syncLog: SyncLog): Promise { + await pool.query( + `INSERT INTO sync_logs + (id, connection_id, sync_type, status, records_processed, records_succeeded, records_failed, error_message, synced_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`, + [ + syncLog.id, + syncLog.connectionId, + syncLog.syncType, + syncLog.status, + syncLog.recordsProcessed, + syncLog.recordsSucceeded, + syncLog.recordsFailed, + syncLog.errorMessage, + syncLog.syncedAt, + ], + ); + } + + private async updateSyncLog(syncLog: SyncLog): Promise { + await pool.query( + `UPDATE sync_logs SET + status = $1, records_processed = $2, records_succeeded = $3, records_failed = $4, error_message = $5 + WHERE id = $6`, + [ + syncLog.status, + syncLog.recordsProcessed, + syncLog.recordsSucceeded, + syncLog.recordsFailed, + syncLog.errorMessage, + syncLog.id, + ], + ); + } + + async getSyncLogs( + connectionId: string, + limit: number = 50, + ): Promise { + const result = await pool.query( + "SELECT * FROM sync_logs WHERE connection_id = $1 ORDER BY synced_at DESC LIMIT $2", + [connectionId, limit], + ); + + return result.rows; + } + + // Helper functions + private async ensureValidToken(connectionId: string): Promise { + const connection = await this.getConnection(connectionId); + if (!connection) { + throw new Error("Connection not found"); + } + + if (new Date() >= connection.expiresAt) { + if (connection.provider === AccountingProvider.QUICKBOOKS) { + await this.refreshQuickBooksToken(connectionId); + } else if (connection.provider === AccountingProvider.XERO) { + await this.refreshXeroToken(connectionId); + } + } + } + + private async getPnLData(date: string): Promise { + // Calculate PnL data from transactions + const query = ` + SELECT + COUNT(*) as transactions, + COALESCE(SUM(CASE WHEN type = 'credit' THEN amount ELSE 0 END), 0) as revenue, + COALESCE(SUM(fee), 0) as fees + FROM transactions + WHERE DATE(created_at) = $1 + AND status = 'completed' + `; + + const result = await pool.query(query, [date]); + const { transactions, revenue, fees } = result.rows[0]; + + return { + date, + revenue: parseFloat(revenue), + fees: parseFloat(fees), + netProfit: parseFloat(revenue) - parseFloat(fees), + transactions: parseInt(transactions), + }; + } + + private async getFeeRevenueData( + date: string, + ): Promise<{ category: string; amount: number }[]> { + // Get fee revenue broken down by category + const query = ` + SELECT + fee_category, + SUM(fee) as amount + FROM transactions + WHERE DATE(created_at) = $1 + AND status = 'completed' + AND fee > 0 + GROUP BY fee_category + ORDER BY amount DESC + `; + + const result = await pool.query(query, [date]); + return result.rows.map((row) => ({ + category: row.fee_category || "General Fees", + amount: parseFloat(row.amount), + })); + } + + // Provider-specific sync implementations + private async syncPnLToQuickBooks( + connection: AccountingConnection, + pnlData: PnLData, + syncLog: SyncLog, + ): Promise { + const connectionData = await this.getConnection(connection.id); + const mappings = await this.getCategoryMappings(connection.id); + + // Create journal entry for PnL + const journalEntry = { + TxnDate: pnlData.date, + Line: [ + { + Description: `Daily P&L - ${pnlData.date}`, + Amount: pnlData.revenue, + DetailType: "JournalEntryLineDetail", + JournalEntryLineDetail: { + PostingType: "Credit", + AccountRef: this.getMappedCategory(mappings, "revenue") || { + value: "1", + }, // Default to Sales + }, + }, + { + Description: `Daily Fees - ${pnlData.date}`, + Amount: pnlData.fees, + DetailType: "JournalEntryLineDetail", + JournalEntryLineDetail: { + PostingType: "Debit", + AccountRef: this.getMappedCategory(mappings, "fees") || { + value: "4", + }, // Default to Expense + }, + }, + ], + }; + + try { + await axios.post( + `https://quickbooks.api.intuit.com/v3/company/${connectionData!.realmId}/journalentry`, + journalEntry, + { + headers: { + Authorization: `Bearer ${connectionData!.accessToken}`, + "Content-Type": "application/json", + }, + }, + ); + + syncLog.recordsProcessed = 1; + syncLog.recordsSucceeded = 1; + } catch (error) { + syncLog.recordsProcessed = 1; + syncLog.recordsFailed = 1; + throw error; + } + } + + private async syncPnLToXero( + connection: AccountingConnection, + pnlData: PnLData, + syncLog: SyncLog, + ): Promise { + const connectionData = await this.getConnection(connection.id); + const mappings = await this.getCategoryMappings(connection.id); + + // Create manual journal entry for PnL + const journalEntry = { + Date: pnlData.date, + JournalLines: [ + { + Description: `Daily P&L - ${pnlData.date}`, + CreditAmount: pnlData.revenue, + AccountID: this.getMappedCategory(mappings, "revenue") || "1", // Default to Sales + }, + { + Description: `Daily Fees - ${pnlData.date}`, + DebitAmount: pnlData.fees, + AccountID: this.getMappedCategory(mappings, "fees") || "4", // Default to Expense + }, + ], + }; + + try { + await axios.put( + "https://api.xero.com/api.xro/2.0/ManualJournals", + journalEntry, + { + headers: { + Authorization: `Bearer ${connectionData!.accessToken}`, + "Xero-tenant-id": connectionData!.tenantId, + "Content-Type": "application/json", + }, + }, + ); + + syncLog.recordsProcessed = 1; + syncLog.recordsSucceeded = 1; + } catch (error) { + syncLog.recordsProcessed = 1; + syncLog.recordsFailed = 1; + throw error; + } + } + + private async syncFeeRevenueToQuickBooks( + connection: AccountingConnection, + feeData: Array<{ category: string; amount: number }>, + syncLog: SyncLog, + ): Promise { + const connectionData = await this.getConnection(connection.id); + const mappings = await this.getCategoryMappings(connection.id); + + const lines = feeData.map((fee) => ({ + Description: `Fee Revenue - ${fee.category}`, + Amount: fee.amount, + DetailType: "JournalEntryLineDetail", + JournalEntryLineDetail: { + PostingType: "Credit", + AccountRef: this.getMappedCategory(mappings, fee.category) || { + value: "1", + }, + }, + })); + + const journalEntry = { + TxnDate: new Date().toISOString().split("T")[0], + Line: lines, + }; + + try { + await axios.post( + `https://quickbooks.api.intuit.com/v3/company/${connectionData!.realmId}/journalentry`, + journalEntry, + { + headers: { + Authorization: `Bearer ${connectionData!.accessToken}`, + "Content-Type": "application/json", + }, + }, + ); + + syncLog.recordsProcessed = feeData.length; + syncLog.recordsSucceeded = feeData.length; + } catch (error) { + syncLog.recordsProcessed = feeData.length; + syncLog.recordsFailed = feeData.length; + throw error; + } + } + + private async syncFeeRevenueToXero( + connection: AccountingConnection, + feeData: Array<{ category: string; amount: number }>, + syncLog: SyncLog, + ): Promise { + const connectionData = await this.getConnection(connection.id); + const mappings = await this.getCategoryMappings(connection.id); + + const journalLines = feeData.map((fee) => ({ + Description: `Fee Revenue - ${fee.category}`, + CreditAmount: fee.amount, + AccountID: this.getMappedCategory(mappings, fee.category) || "1", + })); + + const journalEntry = { + Date: new Date().toISOString().split("T")[0], + JournalLines: journalLines, + }; + + try { + await axios.put( + "https://api.xero.com/api.xro/2.0/ManualJournals", + journalEntry, + { + headers: { + Authorization: `Bearer ${connectionData!.accessToken}`, + "Xero-tenant-id": connectionData!.tenantId, + "Content-Type": "application/json", + }, + }, + ); + + syncLog.recordsProcessed = feeData.length; + syncLog.recordsSucceeded = feeData.length; + } catch (error) { + syncLog.recordsProcessed = feeData.length; + syncLog.recordsFailed = feeData.length; + throw error; + } + } + + private async syncWithdrawalToXeroBill( + connection: AccountingConnection, + transaction: { + id: string; + userId: string; + type: string; + amount: number; + fee: number; + currency: string; + referenceNumber: string; + provider: string; + createdAt: Date; + }, + mappings: CategoryMapping[], + ): Promise { + const connectionData = await this.getConnection(connection.id); + const txnDate = transaction.createdAt.toISOString().split("T")[0]; + const withdrawalAccountId = this.getMappedCategory(mappings, "withdrawal"); + const feeAccountId = this.getMappedCategory(mappings, "fees"); + + const withdrawalLine: any = { + Description: `Withdrawal payout - ref:${transaction.referenceNumber} via ${transaction.provider}`, + Quantity: 1, + UnitAmount: transaction.amount, + TaxType: "NONE", + }; + + if (withdrawalAccountId) { + withdrawalLine.AccountID = withdrawalAccountId; + } else { + withdrawalLine.AccountCode = "500"; + } + + const lineItems: any[] = [withdrawalLine]; + + if (transaction.fee > 0) { + const feeLine: any = { + Description: `Fee - Withdrawal payout ref:${transaction.referenceNumber}`, + Quantity: 1, + UnitAmount: transaction.fee, + TaxType: "NONE", + }; + + if (feeAccountId) { + feeLine.AccountID = feeAccountId; + } else { + feeLine.AccountCode = "500"; + } + + lineItems.push(feeLine); + } + + const bill = { + Type: "ACCPAY", + Contact: { + Name: "Mobile Money Payouts", + }, + Date: txnDate, + DueDate: txnDate, + Reference: transaction.referenceNumber, + Status: "AUTHORISED", + LineItems: lineItems, + }; + + await axios.post( + "https://api.xero.com/api.xro/2.0/Bills", + { Bills: [bill] }, + { + headers: { + Authorization: `Bearer ${connectionData!.accessToken}`, + "Xero-tenant-id": connectionData!.tenantId, + "Content-Type": "application/json", + }, + }, + ); + } + + private async syncXeroTransactionToManualJournal( + connection: AccountingConnection, + transaction: { + id: string; + userId: string; + type: string; + amount: number; + fee: number; + currency: string; + referenceNumber: string; + provider: string; + createdAt: Date; + }, + ): Promise { + const freshConnection = await this.getConnection(connection.id); + const txnDate = transaction.createdAt.toISOString().split("T")[0]; + const description = `${transaction.type} - ref:${transaction.referenceNumber} via ${transaction.provider}`; + + const journalLines: object[] = [ + { + Description: description, + CreditAmount: transaction.amount, + AccountID: "revenue-account-id", + }, + ]; + + if (transaction.fee > 0) { + journalLines.push({ + Description: `Fee - ${description}`, + DebitAmount: transaction.fee, + AccountID: "expense-account-id", + }); + } + + await axios.put( + "https://api.xero.com/api.xro/2.0/ManualJournals", + { Date: txnDate, Narration: transaction.id, JournalLines: journalLines }, + { + headers: { + Authorization: `Bearer ${freshConnection!.accessToken}`, + "Xero-tenant-id": freshConnection!.tenantId, + "Content-Type": "application/json", + }, + }, + ); + } + + private getMappedCategory( + mappings: CategoryMapping[], + mobileMoneyCategory: string, + ): string | null { + const mapping = mappings.find( + (m) => m.mobileMoneyCategory === mobileMoneyCategory, + ); + return mapping ? mapping.accountingCategoryId : null; + } + + /** + * Sync a single completed transaction to all active accounting connections for the user. + * Called automatically when a transaction.completed event fires. + */ + async syncTransaction(transaction: { + id: string; + userId: string; + type: string; + amount: number; + fee: number; + currency: string; + referenceNumber: string; + provider: string; + createdAt: Date; + }): Promise { + const connections = await this.getUserConnections(transaction.userId); + if (connections.length === 0) return; + + for (const connection of connections) { + await this.ensureValidToken(connection.id); + const fresh = await this.getConnection(connection.id); + if (!fresh) continue; + + const txnDate = transaction.createdAt.toISOString().split("T")[0]; + const description = `${transaction.type} - ref:${transaction.referenceNumber} via ${transaction.provider}`; + + try { + if (connection.provider === AccountingProvider.QUICKBOOKS) { + await axios.post( + `https://quickbooks.api.intuit.com/v3/company/${fresh.realmId}/journalentry`, + { + TxnDate: txnDate, + PrivateNote: transaction.id, + Line: [ + { + Description: description, + Amount: transaction.amount, + DetailType: "JournalEntryLineDetail", + JournalEntryLineDetail: { + PostingType: "Credit", + AccountRef: { value: "1" }, // Sales / Revenue + }, + }, + ...(transaction.fee > 0 + ? [ + { + Description: `Fee - ${description}`, + Amount: transaction.fee, + DetailType: "JournalEntryLineDetail", + JournalEntryLineDetail: { + PostingType: "Debit", + AccountRef: { value: "4" }, // Expense + }, + }, + ] + : []), + ], + }, + { + headers: { + Authorization: `Bearer ${fresh.accessToken}`, + "Content-Type": "application/json", + }, + }, + ); + } else if (connection.provider === AccountingProvider.XERO) { + if (transaction.type === "withdraw") { + const mappings = await this.getCategoryMappings(connection.id); + const withdrawalAccountId = this.getMappedCategory( + mappings, + "withdrawal", + ); + + if (withdrawalAccountId) { + await this.syncWithdrawalToXeroBill(fresh, transaction, mappings); + } else { + await this.syncXeroTransactionToManualJournal(fresh, transaction); + } + } else { + await this.syncXeroTransactionToManualJournal(fresh, transaction); + } + } + + await pool.query( + `INSERT INTO accounting_sync_queue + (transaction_id, connection_id, status, synced_at) + VALUES ($1, $2, 'synced', NOW()) + ON CONFLICT (transaction_id, connection_id) DO UPDATE + SET status = 'synced', synced_at = NOW()`, + [transaction.id, connection.id], + ); + } catch (err) { + await pool.query( + `INSERT INTO accounting_sync_queue + (transaction_id, connection_id, status, error_message, synced_at) + VALUES ($1, $2, 'failed', $3, NOW()) + ON CONFLICT (transaction_id, connection_id) DO UPDATE + SET status = 'failed', error_message = $3, synced_at = NOW()`, + [ + transaction.id, + connection.id, + err instanceof Error ? err.message : String(err), + ], + ); + } + } + } +} diff --git a/src/services/mobilemoney/mobileMoneyService.ts b/src/services/mobilemoney/mobileMoneyService.ts index a138b6f5..ed5bf940 100644 --- a/src/services/mobilemoney/mobileMoneyService.ts +++ b/src/services/mobilemoney/mobileMoneyService.ts @@ -37,7 +37,11 @@ export interface MobileMoneyProvider { ): Promise<{ success: boolean; data?: unknown; error?: unknown }>; sendBatchPayout?( items: BatchPayoutItem[], - ): Promise<{ success: boolean; results: BatchPayoutResult[]; error?: unknown }>; + ): Promise<{ + success: boolean; + results: BatchPayoutResult[]; + error?: unknown; + }>; getTransactionStatus( referenceId: string, ): Promise<{ status: ProviderTransactionStatus }>; diff --git a/src/stellar/sep02.ts b/src/stellar/sep02.ts index 49c05601..620d8364 100644 --- a/src/stellar/sep02.ts +++ b/src/stellar/sep02.ts @@ -2,6 +2,7 @@ import { Router, Request, Response } from "express"; import { Pool } from "pg"; import { z } from "zod"; import crypto from "crypto"; +import { getNetworkPassphrase } from "../config/stellar"; // ── Validation Schema ──────────────────────────────────────────────────────── @@ -161,13 +162,9 @@ export function createFederationRouter(db: Pool): Router { // ── TOML Helper ────────────────────────────────────────────────────────────── export function buildStellarToml(): string { - const network = process.env.STELLAR_NETWORK || "testnet"; - const isMainnet = network === "mainnet"; - const passphrase = isMainnet - ? "Public Global Stellar Network ; September 2015" - : "Test SDF Network ; September 2015"; + const passphrase = getNetworkPassphrase(); const domain = process.env.STELLAR_FEDERATION_DOMAIN || "mobilemoney.com"; - + return [ `FEDERATION_SERVER="https://${domain}/federation"`, `NETWORK_PASSPHRASE="${passphrase}"`