diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 482b7bfd..bc8267f2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,3 +22,4 @@ jobs: - run: npm run lint - run: npm run build - run: npm run docker:test + - run: npm run docker:test:deposit-webhook diff --git a/compose.test.deposit-webhook.yml b/compose.test.deposit-webhook.yml new file mode 100644 index 00000000..4c969a2e --- /dev/null +++ b/compose.test.deposit-webhook.yml @@ -0,0 +1,55 @@ +services: + test-deposit-webhook: + image: node:22-alpine + command: sh -c "npm install && npm run test:deposit-webhook:e2e" + working_dir: /usr/src/app + volumes: + - .:/usr/src/app + environment: + NODE_ENV: test + INTEGRATION_TESTS: true + DEPOSIT_WEBHOOK_E2E: true + CONFIG_FILE: xion-test.config.json + depends_on: + db: + condition: service_healthy + redis: + condition: service_healthy + xion: + condition: service_healthy + tty: true + + db: + image: timescale/timescaledb:2.18.1-pg17 + environment: + POSTGRES_DB: test + POSTGRES_USER: test + POSTGRES_PASSWORD: test + healthcheck: + test: ['CMD-SHELL', 'pg_isready -U test'] + interval: 1s + timeout: 3s + retries: 20 + + redis: + image: redis:7-alpine + healthcheck: + test: ['CMD-SHELL', 'redis-cli ping'] + interval: 1s + timeout: 3s + retries: 20 + + xion: + build: + context: . + dockerfile: docker/xiond-local/Dockerfile + healthcheck: + test: + [ + 'CMD-SHELL', + 'curl -fsS http://127.0.0.1:26657/status >/dev/null || exit 1', + ] + interval: 1s + timeout: 3s + retries: 40 + diff --git a/config.json.example b/config.json.example index ae3d9f4b..91f63d6a 100644 --- a/config.json.example +++ b/config.json.example @@ -1,4 +1,5 @@ { + "chainId": "juno-testnet", "home": "~/.juno/indexer", "localRpc": "http://localhost:26657", "remoteRpc": "https://juno-testnet-rpc.polkachu.com", @@ -48,6 +49,7 @@ "inboxSecret": "secret", "notifierSecret": "secret", "websocketsSecret": "secret", + "webhookTimeoutMs": 15000, "accountsJwtSecret": "secret", "rbamWebhookBaseUrl": "", "rbamWebhookSecret": "" diff --git a/docker/xiond-local/Dockerfile b/docker/xiond-local/Dockerfile new file mode 100644 index 00000000..78a2b1e2 --- /dev/null +++ b/docker/xiond-local/Dockerfile @@ -0,0 +1,28 @@ +FROM debian:bookworm-slim + +ARG XIOND_VERSION=28.1.0-rc1 + +RUN apt-get update \ + && apt-get install -y --no-install-recommends ca-certificates curl tar \ + && rm -rf /var/lib/apt/lists/* + +RUN arch="$(dpkg --print-architecture)" \ + && case "$arch" in \ + amd64) xion_arch="amd64" ;; \ + arm64) xion_arch="arm64" ;; \ + *) echo "Unsupported architecture: $arch" >&2; exit 1 ;; \ + esac \ + && curl -fsSL \ + "https://github.com/burnt-labs/xion/releases/download/v${XIOND_VERSION}/xiond_${XIOND_VERSION}_linux_${xion_arch}.tar.gz" \ + -o /tmp/xiond.tar.gz \ + && tar -xzf /tmp/xiond.tar.gz -C /usr/local/bin xiond \ + && chmod +x /usr/local/bin/xiond \ + && rm -f /tmp/xiond.tar.gz + +COPY docker/xiond-local/start.sh /usr/local/bin/start-xiond + +RUN chmod +x /usr/local/bin/start-xiond + +EXPOSE 26657 + +CMD ["start-xiond"] diff --git a/docker/xiond-local/start.sh b/docker/xiond-local/start.sh new file mode 100644 index 00000000..751eb6c4 --- /dev/null +++ b/docker/xiond-local/start.sh @@ -0,0 +1,52 @@ +#!/bin/sh + +set -eu + +XION_HOME="${XION_HOME:-/xion}" +CHAIN_ID="${CHAIN_ID:-localxion-1}" +DENOM="${DENOM:-uxion}" +MONIKER="${MONIKER:-local-validator}" +VALIDATOR_MNEMONIC="${VALIDATOR_MNEMONIC:-decorate bright ozone fork gallery riot bus exhaust worth way bone indoor calm squirrel merry zero scheme cotton until shop any excess stage laundry}" + +mkdir -p "$XION_HOME" + +if [ ! -f "$XION_HOME/config/genesis.json" ]; then + xiond init "$MONIKER" \ + --chain-id "$CHAIN_ID" \ + --default-denom "$DENOM" \ + --home "$XION_HOME" \ + --overwrite >/dev/null + + printf '%s\n' "$VALIDATOR_MNEMONIC" | xiond keys add validator \ + --recover \ + --keyring-backend test \ + --home "$XION_HOME" >/dev/null + + xiond genesis add-genesis-account validator "100000000000${DENOM}" \ + --keyring-backend test \ + --home "$XION_HOME" >/dev/null + + xiond genesis gentx validator "100000000${DENOM}" \ + --chain-id "$CHAIN_ID" \ + --keyring-backend test \ + --home "$XION_HOME" >/dev/null + + xiond genesis collect-gentxs --home "$XION_HOME" >/dev/null + + sed -i "s/^minimum-gas-prices = .*/minimum-gas-prices = \"0${DENOM}\"/" \ + "$XION_HOME/config/app.toml" + sed -i 's/^timeout_commit = .*/timeout_commit = "1s"/' \ + "$XION_HOME/config/config.toml" + sed -i 's/^timeout_propose = .*/timeout_propose = "1s"/' \ + "$XION_HOME/config/config.toml" + sed -i 's/^timeout_prevote = .*/timeout_prevote = "500ms"/' \ + "$XION_HOME/config/config.toml" + sed -i 's/^timeout_precommit = .*/timeout_precommit = "500ms"/' \ + "$XION_HOME/config/config.toml" +fi + +exec xiond start \ + --home "$XION_HOME" \ + --minimum-gas-prices "0${DENOM}" \ + --rpc.laddr tcp://0.0.0.0:26657 \ + --log_level warn diff --git a/docs/api.md b/docs/api.md index 2411e175..1ae7f700 100644 --- a/docs/api.md +++ b/docs/api.md @@ -364,6 +364,179 @@ On error: } ``` +#### GET `/deposit-webhook-registrations` + +List deposit webhook registrations for the authenticated account. + +Response: + +```ts +{ + "registrations": { + "id": number + "description": string | null + "endpointUrl": string + "authHeader": string | null + "authToken": string | null + "watchedWallets": string[] + "allowedNativeDenoms": string[] + "allowedCw20Contracts": string[] + "enabled": boolean + }[] +} +``` + +#### POST `/deposit-webhook-registrations` + +Create a new deposit webhook registration. + +This registers a deposit-detection listener for matching inbound transfers to +the supplied watched wallets and asset filters. It is not a generic +balance-change subscription. + +Request: + +```ts +{ + "description": string | null + "endpointUrl": string + "authHeader": string | null + "authToken": string | null + "watchedWallets": string[] + "allowedNativeDenoms": string[] + "allowedCw20Contracts": string[] + "enabled": boolean | undefined +} +``` + +Response: + +```ts +{ + "registration": { + "id": number + "description": string | null + "endpointUrl": string + "authHeader": string | null + "authToken": string | null + "watchedWallets": string[] + "allowedNativeDenoms": string[] + "allowedCw20Contracts": string[] + "enabled": boolean + } +} +``` + +Registered destinations receive the outbound deposit payload documented in +[webhooks.md](./webhooks.md), including the stable `txHash` field for downstream +on-chain verification. + +or error: + +```ts +{ + "error": string +} +``` + +Example: + +```sh +curl -X POST https://daodaoindexer.burnt.com/deposit-webhook-registrations \ + -H 'Authorization: Bearer ' \ + -H 'Content-Type: application/json' \ + -d '{ + "description": "Production deposit listener", + "endpointUrl": "https://partner.example/deposits", + "authHeader": "Authorization", + "authToken": "secret-token", + "watchedWallets": ["xion1watchedwallet", "xion1secondwallet"], + "allowedNativeDenoms": ["uxion"], + "allowedCw20Contracts": ["xion1stablecoincontract"], + "enabled": true + }' +``` + +#### PATCH `/deposit-webhook-registrations/:id` + +Update an existing deposit webhook registration. + +Request: + +```ts +{ + "description": string | null | undefined + "endpointUrl": string | undefined + "authHeader": string | null | undefined + "authToken": string | null | undefined + "watchedWallets": string[] | undefined + "allowedNativeDenoms": string[] | undefined + "allowedCw20Contracts": string[] | undefined + "enabled": boolean | undefined +} +``` + +Response: + +```ts +{ + "registration": { + "id": number + "description": string | null + "endpointUrl": string + "authHeader": string | null + "authToken": string | null + "watchedWallets": string[] + "allowedNativeDenoms": string[] + "allowedCw20Contracts": string[] + "enabled": boolean + } +} +``` + +or error: + +```ts +{ + "error": string +} +``` + +Example: + +```sh +curl -X PATCH https://daodaoindexer.burnt.com/deposit-webhook-registrations/7 \ + -H 'Authorization: Bearer ' \ + -H 'Content-Type: application/json' \ + -d '{ + "watchedWallets": ["xion1watchedwallet", "xion1thirdwallet"], + "allowedNativeDenoms": [], + "allowedCw20Contracts": ["xion1stablecoincontract"], + "enabled": true + }' +``` + +#### DELETE `/deposit-webhook-registrations/:id` + +Delete a deposit webhook registration. + +Empty response on success. + +On error: + +```ts +{ + "error": string +} +``` + +Example: + +```sh +curl -X DELETE https://daodaoindexer.burnt.com/deposit-webhook-registrations/7 \ + -H 'Authorization: Bearer ' +``` + #### GET `/webhooks` List webhooks. diff --git a/docs/webhooks.md b/docs/webhooks.md index be6f2c4f..dfcc087f 100644 --- a/docs/webhooks.md +++ b/docs/webhooks.md @@ -23,6 +23,11 @@ called with a HTTP request, while Soketi endpoints use the `soketi` config and a JS library to interact with it. If you are not using WebSockets, you can ignore Soketi and use URL endpoints only. +URL webhooks are delivered with at-least-once semantics. BullMQ retries failed +jobs up to 3 times with exponential backoff, and the queue worker applies an +explicit HTTP timeout to each outbound request. Consumers should treat webhook +payloads as retryable and use a deterministic idempotency key when available. + ```ts type Webhook< Event extends DependableEventModel = DependableEventModel, @@ -145,3 +150,126 @@ const makeIndexerCwReceiptPaid: WebhookMaker = (config) => } ``` ```` + +## Deposit webhook + +The Xion deposit webhook integration emits normalized deposit detections as +`Extraction` events and forwards them through the built-in webhook queue. +Registrations are created per account through the authenticated account API, not +through static indexer config. + +This is a deposit-detection webhook, not a generic balance-change feed. It only +fires when the indexer observes a matching inbound native-bank or CW20 transfer +into a watched wallet for an allowed asset. + +Create a registration with `POST /deposit-webhook-registrations`: + +```ts +{ + "description": "Sandbox deposit listener", + "endpointUrl": "https://partner.example/deposits", + "authHeader": "Authorization", + "authToken": "secret-token", + "watchedWallets": ["xion1..."], + "allowedNativeDenoms": ["uxion"], + "allowedCw20Contracts": ["xion1stablecoin..."], + "enabled": true +} +``` + +Example: + +```sh +curl -X POST https://daodaoindexer.burnt.com/deposit-webhook-registrations \ + -H 'Authorization: Bearer ' \ + -H 'Content-Type: application/json' \ + -d '{ + "description": "Sandbox deposit listener", + "endpointUrl": "https://partner.example/deposits", + "authHeader": "Authorization", + "authToken": "secret-token", + "watchedWallets": ["xion1watchedwallet"], + "allowedNativeDenoms": ["uxion"], + "allowedCw20Contracts": ["xion1stablecoincontract"], + "enabled": true + }' +``` + +Each registration owns: + +- the destination webhook URL +- optional auth header and token +- one or more watched wallet addresses +- one or more allowed native denoms and/or CW20 contract addresses + +When a matching deposit is detected, the indexer sends `POST` to the +registration's `endpointUrl` with: + +- `Content-Type: application/json` +- `Idempotency-Key: ` +- the configured auth header, if one was supplied + +Example native-asset payload: + +```json +{ + "idempotencyKey": "xion-mainnet-1:7:ABC123...:xion1watchedwallet:native:uxion:42000000:2:0", + "wallet": "xion1watchedwallet", + "recipient": "xion1watchedwallet", + "sender": "xion1senderwallet", + "amount": "42000000", + "assetType": "native", + "denom": "uxion", + "contractAddress": null, + "blockHeight": "1234567", + "blockTimeUnixMs": "1710000000000", + "txHash": "ABC123..." +} +``` + +The `txHash` field is part of the stable deposit webhook payload contract and is +intended to be used by downstream consumers for on-chain verification and +idempotent ingest. + +Example CW20 payload: + +```json +{ + "idempotencyKey": "xion-mainnet-1:7:DEF456...:xion1watchedwallet:cw20:xion1stablecoincontract:1000000:4", + "wallet": "xion1watchedwallet", + "recipient": "xion1watchedwallet", + "sender": "xion1senderwallet", + "amount": "1000000", + "assetType": "cw20", + "denom": null, + "contractAddress": "xion1stablecoincontract", + "blockHeight": "1234568", + "blockTimeUnixMs": "1710000005000", + "txHash": "DEF456..." +} +``` + +For bank multi-send events, `sender` may be `null` when multiple input wallets +fund the same output and the provenance is ambiguous. + +The deposit webhook extractor uses `chainId` as part of the deterministic +idempotency key. If `chainId` is omitted from config, it falls back to the +connected RPC client chain ID. + +When `authHeader` is `Authorization`, the indexer automatically prefixes the +token with `Bearer ` unless the token already includes it. Deposit webhook +requests also include an `Idempotency-Key` header derived from the normalized +deposit event. Consumers should treat the body plus `Idempotency-Key` and +`txHash` as the canonical deposit-detection payload. + +Delivery is at-least-once. Failed webhook jobs are retried by BullMQ with +exponential backoff, and duplicate delivery is possible. Consumers should treat +the webhook as a retryable signal and make downstream ingestion idempotent. + +Operational guidance: + +- Return a `2xx` response as soon as the request is durably accepted. +- Do on-chain verification asynchronously after acknowledgement. +- Use `Idempotency-Key` and/or `txHash` to make ingestion idempotent. +- Keep the endpoint fast. The queue worker applies an HTTP timeout to outbound + webhook delivery. diff --git a/package.json b/package.json index 071f1124..7f8c27ed 100644 --- a/package.json +++ b/package.json @@ -9,6 +9,8 @@ "format": "eslint . --fix", "test": "NODE_ENV=test node --nolazy --inspect=0.0.0.0:9227 ./node_modules/vitest/vitest.mjs --no-file-parallelism", "docker:test": "docker compose -f compose.test.yml up --exit-code-from test", + "test:deposit-webhook:e2e": "NODE_ENV=test DEPOSIT_WEBHOOK_E2E=true node --nolazy ./node_modules/vitest/vitest.mjs run src/test/e2e/depositWebhook.e2e.test.ts --config vitest.config.mts --no-file-parallelism", + "docker:test:deposit-webhook": "docker compose -f compose.test.deposit-webhook.yml up --exit-code-from test-deposit-webhook --no-attach db --no-attach redis --no-attach xion", "docker:test:dump": "docker compose -f compose.dump-test.yml up --exit-code-from test", "docker:test:computer": "docker compose -f compose.test.yml -f compose.test-computer.yml up --exit-code-from test", "coverage": "npm run test -- --coverage", diff --git a/src/db/connection.ts b/src/db/connection.ts index 69ee472e..868f3600 100644 --- a/src/db/connection.ts +++ b/src/db/connection.ts @@ -10,6 +10,7 @@ import { objectMatchesStructure } from '@/utils' import { Account, AccountCodeIdSet, + AccountDepositWebhookRegistration, AccountKey, AccountKeyCredit, AccountWebhook, @@ -73,6 +74,7 @@ const getModelsForType = (type: DbType): SequelizeOptions['models'] => ? [ Account, AccountCodeIdSet, + AccountDepositWebhookRegistration, AccountKey, AccountKeyCredit, AccountWebhook, diff --git a/src/db/migrations/20260411132020-create-account-deposit-webhook-registration.ts b/src/db/migrations/20260411132020-create-account-deposit-webhook-registration.ts new file mode 100644 index 00000000..664259fc --- /dev/null +++ b/src/db/migrations/20260411132020-create-account-deposit-webhook-registration.ts @@ -0,0 +1,79 @@ +import { QueryInterface, fn } from 'sequelize' +import { DataType } from 'sequelize-typescript' + +module.exports = { + async up(queryInterface: QueryInterface) { + await queryInterface.createTable('AccountDepositWebhookRegistrations', { + id: { + primaryKey: true, + autoIncrement: true, + type: DataType.INTEGER, + }, + accountPublicKey: { + allowNull: false, + type: DataType.STRING, + references: { + model: 'Accounts', + key: 'publicKey', + }, + onUpdate: 'CASCADE', + onDelete: 'CASCADE', + }, + description: { + allowNull: true, + type: DataType.STRING, + }, + endpointUrl: { + allowNull: false, + type: DataType.STRING, + }, + authHeader: { + allowNull: true, + type: DataType.STRING, + }, + authToken: { + allowNull: true, + type: DataType.STRING, + }, + watchedWallets: { + allowNull: false, + type: DataType.ARRAY(DataType.STRING), + defaultValue: [], + }, + allowedNativeDenoms: { + allowNull: false, + type: DataType.ARRAY(DataType.STRING), + defaultValue: [], + }, + allowedCw20Contracts: { + allowNull: false, + type: DataType.ARRAY(DataType.STRING), + defaultValue: [], + }, + enabled: { + allowNull: false, + type: DataType.BOOLEAN, + defaultValue: true, + }, + createdAt: { + allowNull: false, + type: DataType.DATE, + defaultValue: fn('NOW'), + }, + updatedAt: { + allowNull: false, + type: DataType.DATE, + defaultValue: fn('NOW'), + }, + }) + await queryInterface.addIndex('AccountDepositWebhookRegistrations', { + fields: ['accountPublicKey'], + }) + await queryInterface.addIndex('AccountDepositWebhookRegistrations', { + fields: ['enabled'], + }) + }, + async down(queryInterface: QueryInterface) { + await queryInterface.dropTable('AccountDepositWebhookRegistrations') + }, +} diff --git a/src/db/models/Account.ts b/src/db/models/Account.ts index 1d9860e8..fe5b8e3e 100644 --- a/src/db/models/Account.ts +++ b/src/db/models/Account.ts @@ -15,6 +15,7 @@ import { import { ConfigManager } from '@/config' import { AccountCodeIdSet } from './AccountCodeIdSet' +import { AccountDepositWebhookRegistration } from './AccountDepositWebhookRegistration' import { AccountKey } from './AccountKey' import { AccountKeyCredit, @@ -43,6 +44,9 @@ export class Account extends Model { @HasMany(() => AccountWebhook, 'accountPublicKey') declare webhooks: AccountWebhook[] + @HasMany(() => AccountDepositWebhookRegistration, 'accountPublicKey') + declare depositWebhookRegistrations: AccountDepositWebhookRegistration[] + @HasMany(() => AccountCodeIdSet, 'accountPublicKey') declare codeIdSets: AccountCodeIdSet[] diff --git a/src/db/models/AccountDepositWebhookRegistration.test.ts b/src/db/models/AccountDepositWebhookRegistration.test.ts new file mode 100644 index 00000000..b244e781 --- /dev/null +++ b/src/db/models/AccountDepositWebhookRegistration.test.ts @@ -0,0 +1,122 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +import { AccountDepositWebhookRegistration } from './AccountDepositWebhookRegistration' + +describe('AccountDepositWebhookRegistration', () => { + const makeRegistration = ( + overrides: Partial = {} + ) => + ({ + id: 7, + accountPublicKey: 'account', + description: null, + endpointUrl: 'https://partner.example/deposits', + authHeader: null, + authToken: null, + watchedWallets: ['xion1watchedwallet'], + allowedNativeDenoms: ['uxion'], + allowedCw20Contracts: ['xion1stablecoincontract'], + enabled: true, + matchesNativeDeposit: + AccountDepositWebhookRegistration.prototype.matchesNativeDeposit, + matchesCw20Deposit: + AccountDepositWebhookRegistration.prototype.matchesCw20Deposit, + ...overrides, + } as unknown as AccountDepositWebhookRegistration) + + beforeEach(() => { + AccountDepositWebhookRegistration.invalidateActiveRegistrationsCache() + vi.restoreAllMocks() + }) + + it('matches native and cw20 deposits only when enabled and filtered', () => { + const registration = makeRegistration() + + expect( + registration.matchesNativeDeposit('xion1watchedwallet', 'uxion') + ).toBe(true) + expect(registration.matchesNativeDeposit('xion1otherwallet', 'uxion')).toBe( + false + ) + expect( + registration.matchesCw20Deposit( + 'xion1watchedwallet', + 'xion1stablecoincontract' + ) + ).toBe(true) + expect( + registration.matchesCw20Deposit( + 'xion1watchedwallet', + 'xion1othercontract' + ) + ).toBe(false) + + registration.enabled = false + + expect( + registration.matchesNativeDeposit('xion1watchedwallet', 'uxion') + ).toBe(false) + expect( + registration.matchesCw20Deposit( + 'xion1watchedwallet', + 'xion1stablecoincontract' + ) + ).toBe(false) + }) + + it('reuses cached enabled registrations until invalidated', async () => { + const findAllSpy = vi.spyOn(AccountDepositWebhookRegistration, 'findAll') + findAllSpy.mockResolvedValue([makeRegistration()]) + + const first = await AccountDepositWebhookRegistration.getEnabledCached() + const second = await AccountDepositWebhookRegistration.getEnabledCached() + + expect(first).toHaveLength(1) + expect(second).toHaveLength(1) + expect(findAllSpy).toHaveBeenCalledTimes(1) + }) + + it('invalidates the enabled-registration cache after save and destroy', async () => { + const publishSpy = vi + .spyOn( + AccountDepositWebhookRegistration, + 'publishActiveRegistrationsCacheInvalidation' + ) + .mockResolvedValue() + const findAllSpy = vi.spyOn(AccountDepositWebhookRegistration, 'findAll') + const firstRegistration = makeRegistration({ + id: 7, + watchedWallets: ['xion1watchedwallet'], + }) + const secondRegistration = makeRegistration({ + id: 8, + watchedWallets: ['xion1secondwallet'], + endpointUrl: 'https://partner.example/deposits-two', + }) + findAllSpy + .mockResolvedValueOnce([firstRegistration]) + .mockResolvedValueOnce([firstRegistration, secondRegistration]) + .mockResolvedValueOnce([firstRegistration]) + + await AccountDepositWebhookRegistration.getEnabledCached() + expect(findAllSpy).toHaveBeenCalledTimes(1) + + await AccountDepositWebhookRegistration.afterSaveHook() + + const afterCreate = + await AccountDepositWebhookRegistration.getEnabledCached() + expect(findAllSpy).toHaveBeenCalledTimes(2) + expect(afterCreate.map(({ id }) => id)).toEqual([ + firstRegistration.id, + secondRegistration.id, + ]) + + await AccountDepositWebhookRegistration.afterDestroyHook() + + const afterDestroy = + await AccountDepositWebhookRegistration.getEnabledCached() + expect(findAllSpy).toHaveBeenCalledTimes(3) + expect(afterDestroy.map(({ id }) => id)).toEqual([firstRegistration.id]) + expect(publishSpy).toHaveBeenCalledTimes(2) + }) +}) diff --git a/src/db/models/AccountDepositWebhookRegistration.ts b/src/db/models/AccountDepositWebhookRegistration.ts new file mode 100644 index 00000000..d69403ce --- /dev/null +++ b/src/db/models/AccountDepositWebhookRegistration.ts @@ -0,0 +1,257 @@ +import type Redis from 'ioredis' +import { + AfterDestroy, + AfterSave, + AllowNull, + AutoIncrement, + BelongsTo, + Column, + DataType, + Default, + ForeignKey, + Model, + PrimaryKey, + Table, +} from 'sequelize-typescript' + +import { getRedis, getRedisConfig } from '@/config' + +import { Account } from './Account' + +export type AccountDepositWebhookRegistrationApiJson = { + id: number + description: string | null + endpointUrl: string + authHeader: string | null + authToken: string | null + watchedWallets: string[] + allowedNativeDenoms: string[] + allowedCw20Contracts: string[] + enabled: boolean +} + +type ActiveRegistrationsCache = { + cachedAt: number + registrations: AccountDepositWebhookRegistration[] +} + +@Table({ + timestamps: true, +}) +export class AccountDepositWebhookRegistration extends Model { + static readonly activeRegistrationsCacheInvalidationChannel = + 'account-deposit-webhook-registrations:invalidate' + + @PrimaryKey + @AutoIncrement + @Column(DataType.INTEGER) + declare id: number + + @AllowNull(false) + @ForeignKey(() => Account) + @Column(DataType.STRING) + declare accountPublicKey: string + + @BelongsTo(() => Account) + declare account: Account + + @AllowNull + @Column(DataType.STRING) + declare description: string | null + + @AllowNull(false) + @Column(DataType.STRING) + declare endpointUrl: string + + @AllowNull + @Column(DataType.STRING) + declare authHeader: string | null + + @AllowNull + @Column(DataType.STRING) + declare authToken: string | null + + @AllowNull(false) + @Default([]) + @Column(DataType.ARRAY(DataType.STRING)) + declare watchedWallets: string[] + + @AllowNull(false) + @Default([]) + @Column(DataType.ARRAY(DataType.STRING)) + declare allowedNativeDenoms: string[] + + @AllowNull(false) + @Default([]) + @Column(DataType.ARRAY(DataType.STRING)) + declare allowedCw20Contracts: string[] + + @AllowNull(false) + @Default(true) + @Column(DataType.BOOLEAN) + declare enabled: boolean + + get apiJson(): AccountDepositWebhookRegistrationApiJson { + return { + id: this.id, + description: this.description, + endpointUrl: this.endpointUrl, + authHeader: this.authHeader, + authToken: this.authToken, + watchedWallets: this.watchedWallets || [], + allowedNativeDenoms: this.allowedNativeDenoms || [], + allowedCw20Contracts: this.allowedCw20Contracts || [], + enabled: this.enabled, + } + } + + matchesNativeDeposit(wallet: string, denom: string): boolean { + return ( + this.enabled && + (this.watchedWallets || []).includes(wallet) && + (this.allowedNativeDenoms || []).includes(denom) + ) + } + + matchesCw20Deposit(wallet: string, contractAddress: string): boolean { + return ( + this.enabled && + (this.watchedWallets || []).includes(wallet) && + (this.allowedCw20Contracts || []).includes(contractAddress) + ) + } + + private static activeRegistrationsCache?: ActiveRegistrationsCache + private static activeRegistrationsCacheTtlMs = 5_000 + private static activeRegistrationsCacheSubscriber?: Redis + private static activeRegistrationsCacheSubscriberReady?: Promise + + static invalidateActiveRegistrationsCache() { + this.activeRegistrationsCache = undefined + } + + static async ensureActiveRegistrationsCacheSubscription(): Promise { + if (this.activeRegistrationsCacheSubscriberReady) { + await this.activeRegistrationsCacheSubscriberReady + return + } + + if (!getRedisConfig()) { + return + } + + const subscriber = getRedis() + subscriber.on('error', (error) => { + console.error( + 'Error in deposit webhook registration cache invalidation subscriber:', + error + ) + }) + subscriber.on('message', (channel) => { + if (channel === this.activeRegistrationsCacheInvalidationChannel) { + this.invalidateActiveRegistrationsCache() + } + }) + + this.activeRegistrationsCacheSubscriber = subscriber + this.activeRegistrationsCacheSubscriberReady = subscriber + .subscribe(this.activeRegistrationsCacheInvalidationChannel) + .then(() => undefined) + .catch((error) => { + this.activeRegistrationsCacheSubscriber = undefined + this.activeRegistrationsCacheSubscriberReady = undefined + subscriber.disconnect() + console.error( + 'Error subscribing to deposit webhook registration cache invalidation:', + error + ) + }) + + await this.activeRegistrationsCacheSubscriberReady + } + + static async closeActiveRegistrationsCacheSubscription(): Promise { + const subscriber = this.activeRegistrationsCacheSubscriber + this.activeRegistrationsCacheSubscriber = undefined + this.activeRegistrationsCacheSubscriberReady = undefined + + if (!subscriber) { + return + } + + await subscriber.quit().catch(() => { + subscriber.disconnect() + }) + } + + static async publishActiveRegistrationsCacheInvalidation(): Promise { + if (!getRedisConfig()) { + return + } + + const publisher = getRedis() + + try { + await publisher.publish( + this.activeRegistrationsCacheInvalidationChannel, + Date.now().toString() + ) + await publisher.quit() + } catch (error) { + publisher.disconnect() + console.error( + 'Error publishing deposit webhook registration cache invalidation:', + error + ) + } + } + + static async getEnabledCached(): Promise< + AccountDepositWebhookRegistration[] + > { + if ( + this.activeRegistrationsCache && + Date.now() - this.activeRegistrationsCache.cachedAt < + this.activeRegistrationsCacheTtlMs + ) { + return this.activeRegistrationsCache.registrations + } + + const registrations = await this.findAll({ + where: { + enabled: true, + }, + order: [['id', 'ASC']], + }) + + this.activeRegistrationsCache = { + cachedAt: Date.now(), + registrations, + } + + return registrations + } + + static async findEnabledByPk( + id: number + ): Promise { + return await this.findOne({ + where: { + id, + enabled: true, + }, + }) + } + + @AfterSave + static async afterSaveHook() { + this.invalidateActiveRegistrationsCache() + await this.publishActiveRegistrationsCacheInvalidation() + } + + @AfterDestroy + static async afterDestroyHook() { + this.invalidateActiveRegistrationsCache() + await this.publishActiveRegistrationsCacheInvalidation() + } +} diff --git a/src/db/models/index.ts b/src/db/models/index.ts index 4e71da02..a07e0464 100644 --- a/src/db/models/index.ts +++ b/src/db/models/index.ts @@ -1,5 +1,6 @@ export * from './Account' export * from './AccountCodeIdSet' +export * from './AccountDepositWebhookRegistration' export * from './AccountKey' export * from './AccountKeyCredit' export * from './AccountWebhook' diff --git a/src/listener/extractors/xion/depositWebhook.test.ts b/src/listener/extractors/xion/depositWebhook.test.ts new file mode 100644 index 00000000..9530c3aa --- /dev/null +++ b/src/listener/extractors/xion/depositWebhook.test.ts @@ -0,0 +1,654 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +import { ConfigManager } from '@/config' +import { AccountDepositWebhookRegistration, Extraction } from '@/db' +import { ExtractorEnv, ExtractorHandleableData } from '@/types' + +import { + IndexedWasmEventDataSource, + StargateMessageDataSource, +} from '../../sources' +import { + DEPOSIT_WEBHOOK_EXTRACTION_PREFIX, + XionDepositWebhookExtractor, +} from './depositWebhook' + +describe('XionDepositWebhookExtractor', () => { + let extractor: XionDepositWebhookExtractor + + const makeRegistration = ( + overrides: Partial = {} + ) => + ({ + id: 7, + accountPublicKey: 'account', + description: 'Sandbox deposit listener', + endpointUrl: 'https://partner.example/deposits', + authHeader: 'Authorization', + authToken: 'secret-token', + watchedWallets: ['xion1watchedwallet'], + allowedNativeDenoms: ['uxion'], + allowedCw20Contracts: ['xion1stablecoincontract'], + enabled: true, + matchesNativeDeposit: + AccountDepositWebhookRegistration.prototype.matchesNativeDeposit, + matchesCw20Deposit: + AccountDepositWebhookRegistration.prototype.matchesCw20Deposit, + ...overrides, + } as unknown as AccountDepositWebhookRegistration) + + beforeEach(() => { + vi.clearAllMocks() + vi.spyOn( + AccountDepositWebhookRegistration, + 'getEnabledCached' + ).mockResolvedValue([makeRegistration()]) + vi.spyOn(Extraction, 'bulkCreate').mockImplementation( + async (records) => records as any + ) + + const env: ExtractorEnv = { + config: { + ...ConfigManager.load(), + chainId: 'xion-testnet-1', + }, + sendWebhooks: false, + autoCosmWasmClient: {} as any, + txHash: 'test-tx-hash', + block: { + height: '12345', + timeUnixMs: '1700000000000', + timestamp: '2023-11-14T22:13:20.000Z', + }, + } + + extractor = new XionDepositWebhookExtractor(env) + }) + + it('extracts native deposits to watched wallets', async () => { + const data: ExtractorHandleableData[] = [ + StargateMessageDataSource.handleable('bankTransfer', { + typeUrl: '/cosmos.bank.v1beta1.MsgSend', + messageIndex: 2, + value: { + fromAddress: 'xion1senderwallet', + toAddress: 'xion1watchedwallet', + amount: [ + { + denom: 'uxion', + amount: '42', + }, + ], + }, + }), + ] + + const result = (await extractor.extract(data)) as Extraction[] + + expect(result).toHaveLength(1) + expect(result[0].address).toBe('xion1watchedwallet') + expect(result[0].name).toContain(DEPOSIT_WEBHOOK_EXTRACTION_PREFIX) + expect(result[0].data).toEqual({ + registrationId: 7, + idempotencyKey: + 'xion-testnet-1:7:test-tx-hash:xion1watchedwallet:native:uxion:42:2:0', + wallet: 'xion1watchedwallet', + recipient: 'xion1watchedwallet', + sender: 'xion1senderwallet', + amount: '42', + assetType: 'native', + denom: 'uxion', + contractAddress: null, + blockHeight: '12345', + blockTimeUnixMs: '1700000000000', + txHash: 'test-tx-hash', + }) + }) + + it('extracts cw20 deposits for allowed contracts', async () => { + const data: ExtractorHandleableData[] = [ + IndexedWasmEventDataSource.handleable('cw20Transfer', { + address: 'xion1stablecoincontract', + key: 'action', + value: 'transfer', + eventIndex: 4, + attributes: { + action: ['transfer'], + sender: ['xion1senderwallet'], + recipient: ['xion1watchedwallet'], + amount: ['1000000'], + }, + _attributes: [ + { key: '_contract_address', value: 'xion1stablecoincontract' }, + { key: 'action', value: 'transfer' }, + { key: 'sender', value: 'xion1senderwallet' }, + { key: 'recipient', value: 'xion1watchedwallet' }, + { key: 'amount', value: '1000000' }, + ], + }), + ] + + const result = (await extractor.extract(data)) as Extraction[] + + expect(result).toHaveLength(1) + expect(result[0].address).toBe('xion1watchedwallet') + expect(result[0].data).toEqual({ + registrationId: 7, + idempotencyKey: + 'xion-testnet-1:7:test-tx-hash:xion1watchedwallet:cw20:xion1stablecoincontract:1000000:4', + wallet: 'xion1watchedwallet', + recipient: 'xion1watchedwallet', + sender: 'xion1senderwallet', + amount: '1000000', + assetType: 'cw20', + denom: null, + contractAddress: 'xion1stablecoincontract', + blockHeight: '12345', + blockTimeUnixMs: '1700000000000', + txHash: 'test-tx-hash', + }) + }) + + it('falls back to the connected client chain ID when config chainId is empty', async () => { + extractor = new XionDepositWebhookExtractor({ + ...extractor.env, + config: { + ...extractor.env.config, + chainId: '', + }, + autoCosmWasmClient: { + chainId: 'xion-mainnet-1', + } as any, + }) + + const data: ExtractorHandleableData[] = [ + StargateMessageDataSource.handleable('bankTransfer', { + typeUrl: '/cosmos.bank.v1beta1.MsgSend', + messageIndex: 7, + value: { + fromAddress: 'xion1senderwallet', + toAddress: 'xion1watchedwallet', + amount: [ + { + denom: 'uxion', + amount: '42', + }, + ], + }, + }), + ] + + const result = (await extractor.extract(data)) as Extraction[] + + expect(result).toHaveLength(1) + expect(result[0].data).toMatchObject({ + idempotencyKey: + 'xion-mainnet-1:7:test-tx-hash:xion1watchedwallet:native:uxion:42:7:0', + }) + }) + + it('fails when no chain ID can be determined for idempotency keys', async () => { + extractor = new XionDepositWebhookExtractor({ + ...extractor.env, + config: { + ...extractor.env.config, + chainId: '', + }, + autoCosmWasmClient: {} as any, + }) + + const data: ExtractorHandleableData[] = [ + StargateMessageDataSource.handleable('bankTransfer', { + typeUrl: '/cosmos.bank.v1beta1.MsgSend', + messageIndex: 9, + value: { + fromAddress: 'xion1senderwallet', + toAddress: 'xion1watchedwallet', + amount: [{ denom: 'uxion', amount: '42' }], + }, + }), + ] + + await expect(extractor.extract(data)).rejects.toThrow( + 'Could not determine chainId required for deposit webhook idempotency keys.' + ) + }) + + it('uses deterministic unique names for same-block deposits', async () => { + const data: ExtractorHandleableData[] = [ + StargateMessageDataSource.handleable('bankTransfer', { + typeUrl: '/cosmos.bank.v1beta1.MsgSend', + messageIndex: 0, + value: { + fromAddress: 'xion1senderwallet', + toAddress: 'xion1watchedwallet', + amount: [{ denom: 'uxion', amount: '42' }], + }, + }), + StargateMessageDataSource.handleable('bankTransfer', { + typeUrl: '/cosmos.bank.v1beta1.MsgSend', + messageIndex: 1, + value: { + fromAddress: 'xion1senderwallet', + toAddress: 'xion1watchedwallet', + amount: [{ denom: 'uxion', amount: '42' }], + }, + }), + ] + + const result = (await extractor.extract(data)) as Extraction[] + + expect(result).toHaveLength(2) + expect(result[0].name).not.toBe(result[1].name) + }) + + it('ignores deposits that do not match watched wallets or allowed assets', async () => { + const data: ExtractorHandleableData[] = [ + StargateMessageDataSource.handleable('bankTransfer', { + typeUrl: '/cosmos.bank.v1beta1.MsgSend', + messageIndex: 0, + value: { + fromAddress: 'xion1senderwallet', + toAddress: 'xion1otherwallet', + amount: [{ denom: 'uxion', amount: '42' }], + }, + }), + IndexedWasmEventDataSource.handleable('cw20Transfer', { + address: 'xion1othercontract', + key: 'action', + value: 'transfer', + eventIndex: 1, + attributes: { + action: ['transfer'], + recipient: ['xion1watchedwallet'], + amount: ['1000000'], + }, + _attributes: [ + { key: '_contract_address', value: 'xion1othercontract' }, + { key: 'action', value: 'transfer' }, + { key: 'recipient', value: 'xion1watchedwallet' }, + { key: 'amount', value: '1000000' }, + ], + }), + ] + + await expect(extractor.extract(data)).resolves.toEqual([]) + }) + + it('supports snake_case bank message addresses', async () => { + const data: ExtractorHandleableData[] = [ + StargateMessageDataSource.handleable('bankTransfer', { + typeUrl: '/cosmos.bank.v1beta1.MsgSend', + messageIndex: 8, + value: { + from_address: 'xion1senderwallet', + to_address: 'xion1watchedwallet', + amount: [{ denom: 'uxion', amount: '42' }], + }, + }), + ] + + const result = (await extractor.extract(data)) as Extraction[] + + expect(result).toHaveLength(1) + expect(result[0].data).toMatchObject({ + sender: 'xion1senderwallet', + recipient: 'xion1watchedwallet', + idempotencyKey: + 'xion-testnet-1:7:test-tx-hash:xion1watchedwallet:native:uxion:42:8:0', + }) + }) + + it('returns no deposits when no registrations are enabled', async () => { + vi.mocked( + AccountDepositWebhookRegistration.getEnabledCached + ).mockResolvedValue([]) + + const data: ExtractorHandleableData[] = [ + StargateMessageDataSource.handleable('bankTransfer', { + typeUrl: '/cosmos.bank.v1beta1.MsgSend', + messageIndex: 0, + value: { + fromAddress: 'xion1senderwallet', + toAddress: 'xion1watchedwallet', + amount: [{ denom: 'uxion', amount: '42' }], + }, + }), + IndexedWasmEventDataSource.handleable('cw20Transfer', { + address: 'xion1stablecoincontract', + key: 'action', + value: 'transfer', + eventIndex: 1, + attributes: { + action: ['transfer'], + sender: ['xion1senderwallet'], + recipient: ['xion1watchedwallet'], + amount: ['1000000'], + }, + _attributes: [ + { key: '_contract_address', value: 'xion1stablecoincontract' }, + { key: 'action', value: 'transfer' }, + { key: 'sender', value: 'xion1senderwallet' }, + { key: 'recipient', value: 'xion1watchedwallet' }, + { key: 'amount', value: '1000000' }, + ], + }), + ] + + await expect(extractor.extract(data)).resolves.toEqual([]) + }) + + it('ignores malformed bank transfers and unsupported type URLs', async () => { + const data: ExtractorHandleableData[] = [ + StargateMessageDataSource.handleable('bankTransfer', { + typeUrl: '/cosmos.bank.v1beta1.MsgSend', + messageIndex: 10, + value: { + fromAddress: 'xion1senderwallet', + amount: [{ denom: 'uxion', amount: '42' }], + }, + }), + StargateMessageDataSource.handleable('bankTransfer', { + typeUrl: '/cosmos.bank.v1beta1.MsgSend', + messageIndex: 11, + value: { + fromAddress: 'xion1senderwallet', + toAddress: 'xion1watchedwallet', + amount: [{ denom: 'uxion' }], + }, + }), + StargateMessageDataSource.handleable('bankTransfer', { + typeUrl: '/cosmos.bank.v1beta1.MsgSend', + messageIndex: 12, + value: { + fromAddress: 'xion1senderwallet', + toAddress: 'xion1watchedwallet', + amount: '42', + }, + }), + StargateMessageDataSource.handleable('bankTransfer', { + typeUrl: '/cosmos.bank.v1beta1.MsgUnknown', + messageIndex: 13, + value: { + fromAddress: 'xion1senderwallet', + toAddress: 'xion1watchedwallet', + amount: [{ denom: 'uxion', amount: '42' }], + }, + }), + ] + + await expect(extractor.extract(data)).resolves.toEqual([]) + }) + + it('extracts MsgMultiSend deposits and nulls sender for multi-sender inputs', async () => { + const data: ExtractorHandleableData[] = [ + StargateMessageDataSource.handleable('bankTransfer', { + typeUrl: '/cosmos.bank.v1beta1.MsgMultiSend', + messageIndex: 3, + value: { + inputs: [ + { + address: 'xion1senderone', + coins: [{ denom: 'uxion', amount: '30' }], + }, + { + address: 'xion1sendertwo', + coins: [{ denom: 'uxion', amount: '12' }], + }, + ], + outputs: [ + { + address: 'xion1watchedwallet', + coins: [{ denom: 'uxion', amount: '42' }], + }, + ], + }, + }), + ] + + const result = (await extractor.extract(data)) as Extraction[] + + expect(result).toHaveLength(1) + expect(result[0].data).toEqual({ + registrationId: 7, + idempotencyKey: + 'xion-testnet-1:7:test-tx-hash:xion1watchedwallet:native:uxion:42:3:0:0', + wallet: 'xion1watchedwallet', + recipient: 'xion1watchedwallet', + sender: null, + amount: '42', + assetType: 'native', + denom: 'uxion', + contractAddress: null, + blockHeight: '12345', + blockTimeUnixMs: '1700000000000', + txHash: 'test-tx-hash', + }) + }) + + it('extracts MsgMultiSend deposits and preserves sender for single-sender inputs', async () => { + const data: ExtractorHandleableData[] = [ + StargateMessageDataSource.handleable('bankTransfer', { + typeUrl: '/cosmos.bank.v1beta1.MsgMultiSend', + messageIndex: 5, + value: { + inputs: [ + { + address: 'xion1senderone', + coins: [{ denom: 'uxion', amount: '42' }], + }, + ], + outputs: [ + { + address: 'xion1watchedwallet', + coins: [{ denom: 'uxion', amount: '42' }], + }, + ], + }, + }), + ] + + const result = (await extractor.extract(data)) as Extraction[] + + expect(result).toHaveLength(1) + expect(result[0].data).toEqual({ + registrationId: 7, + idempotencyKey: + 'xion-testnet-1:7:test-tx-hash:xion1watchedwallet:native:uxion:42:5:0:0', + wallet: 'xion1watchedwallet', + recipient: 'xion1watchedwallet', + sender: 'xion1senderone', + amount: '42', + assetType: 'native', + denom: 'uxion', + contractAddress: null, + blockHeight: '12345', + blockTimeUnixMs: '1700000000000', + txHash: 'test-tx-hash', + }) + }) + + it('extracts unique deposits for multiple watched outputs from one sender', async () => { + vi.spyOn( + AccountDepositWebhookRegistration, + 'getEnabledCached' + ).mockResolvedValue([ + makeRegistration({ + watchedWallets: ['xion1watchedwallet', 'xion1watchedwallettwo'], + }), + ]) + + const data: ExtractorHandleableData[] = [ + StargateMessageDataSource.handleable('bankTransfer', { + typeUrl: '/cosmos.bank.v1beta1.MsgMultiSend', + messageIndex: 6, + value: { + inputs: [ + { + address: 'xion1senderone', + coins: [{ denom: 'uxion', amount: '100' }], + }, + ], + outputs: [ + { + address: 'xion1watchedwallet', + coins: [{ denom: 'uxion', amount: '50' }], + }, + { + address: 'xion1watchedwallettwo', + coins: [{ denom: 'uxion', amount: '50' }], + }, + ], + }, + }), + ] + + const result = (await extractor.extract(data)) as Extraction[] + + expect(result).toHaveLength(2) + expect(result.map((event) => event.data)).toEqual([ + { + registrationId: 7, + idempotencyKey: + 'xion-testnet-1:7:test-tx-hash:xion1watchedwallet:native:uxion:50:6:0:0', + wallet: 'xion1watchedwallet', + recipient: 'xion1watchedwallet', + sender: 'xion1senderone', + amount: '50', + assetType: 'native', + denom: 'uxion', + contractAddress: null, + blockHeight: '12345', + blockTimeUnixMs: '1700000000000', + txHash: 'test-tx-hash', + }, + { + registrationId: 7, + idempotencyKey: + 'xion-testnet-1:7:test-tx-hash:xion1watchedwallettwo:native:uxion:50:6:1:0', + wallet: 'xion1watchedwallettwo', + recipient: 'xion1watchedwallettwo', + sender: 'xion1senderone', + amount: '50', + assetType: 'native', + denom: 'uxion', + contractAddress: null, + blockHeight: '12345', + blockTimeUnixMs: '1700000000000', + txHash: 'test-tx-hash', + }, + ]) + expect(result[0].name).not.toBe(result[1].name) + }) + + it('extracts cw20 transfer_from deposits using owner as sender fallback', async () => { + const data: ExtractorHandleableData[] = [ + IndexedWasmEventDataSource.handleable('cw20Transfer', { + address: 'xion1stablecoincontract', + key: 'action', + value: 'transfer_from', + eventIndex: 14, + attributes: { + action: ['transfer_from'], + owner: ['xion1ownerwallet'], + recipient: ['xion1watchedwallet'], + amount: ['1000000'], + }, + _attributes: [ + { key: '_contract_address', value: 'xion1stablecoincontract' }, + { key: 'action', value: 'transfer_from' }, + { key: 'owner', value: 'xion1ownerwallet' }, + { key: 'recipient', value: 'xion1watchedwallet' }, + { key: 'amount', value: '1000000' }, + ], + }), + ] + + const result = (await extractor.extract(data)) as Extraction[] + + expect(result).toHaveLength(1) + expect(result[0].data).toMatchObject({ + sender: 'xion1ownerwallet', + assetType: 'cw20', + contractAddress: 'xion1stablecoincontract', + idempotencyKey: + 'xion-testnet-1:7:test-tx-hash:xion1watchedwallet:cw20:xion1stablecoincontract:1000000:14', + }) + }) + + it('extracts cw20 send_from deposits using from as sender fallback', async () => { + const data: ExtractorHandleableData[] = [ + IndexedWasmEventDataSource.handleable('cw20Transfer', { + address: 'xion1stablecoincontract', + key: 'action', + value: 'send_from', + eventIndex: 15, + attributes: { + action: ['send_from'], + from: ['xion1fromwallet'], + recipient: ['xion1watchedwallet'], + amount: ['1000000'], + }, + _attributes: [ + { key: '_contract_address', value: 'xion1stablecoincontract' }, + { key: 'action', value: 'send_from' }, + { key: 'from', value: 'xion1fromwallet' }, + { key: 'recipient', value: 'xion1watchedwallet' }, + { key: 'amount', value: '1000000' }, + ], + }), + ] + + const result = (await extractor.extract(data)) as Extraction[] + + expect(result).toHaveLength(1) + expect(result[0].data).toMatchObject({ + sender: 'xion1fromwallet', + assetType: 'cw20', + contractAddress: 'xion1stablecoincontract', + idempotencyKey: + 'xion-testnet-1:7:test-tx-hash:xion1watchedwallet:cw20:xion1stablecoincontract:1000000:15', + }) + }) + + it('ignores malformed cw20 transfers missing recipient or amount', async () => { + const data: ExtractorHandleableData[] = [ + IndexedWasmEventDataSource.handleable('cw20Transfer', { + address: 'xion1stablecoincontract', + key: 'action', + value: 'transfer', + eventIndex: 16, + attributes: { + action: ['transfer'], + sender: ['xion1senderwallet'], + amount: ['1000000'], + }, + _attributes: [ + { key: '_contract_address', value: 'xion1stablecoincontract' }, + { key: 'action', value: 'transfer' }, + { key: 'sender', value: 'xion1senderwallet' }, + { key: 'amount', value: '1000000' }, + ], + }), + IndexedWasmEventDataSource.handleable('cw20Transfer', { + address: 'xion1stablecoincontract', + key: 'action', + value: 'transfer', + eventIndex: 17, + attributes: { + action: ['transfer'], + sender: ['xion1senderwallet'], + recipient: ['xion1watchedwallet'], + }, + _attributes: [ + { key: '_contract_address', value: 'xion1stablecoincontract' }, + { key: 'action', value: 'transfer' }, + { key: 'sender', value: 'xion1senderwallet' }, + { key: 'recipient', value: 'xion1watchedwallet' }, + ], + }), + ] + + await expect(extractor.extract(data)).resolves.toEqual([]) + }) +}) diff --git a/src/listener/extractors/xion/depositWebhook.ts b/src/listener/extractors/xion/depositWebhook.ts new file mode 100644 index 00000000..2dad2732 --- /dev/null +++ b/src/listener/extractors/xion/depositWebhook.ts @@ -0,0 +1,283 @@ +import { AccountDepositWebhookRegistration } from '@/db' +import { + ExtractorDataSource, + ExtractorHandler, + ExtractorHandlerOutput, +} from '@/types' + +import { + IndexedWasmEventData, + IndexedWasmEventDataSource, + StargateMessageData, + StargateMessageDataSource, +} from '../../sources' +import { Extractor } from '../base' + +export const DEPOSIT_WEBHOOK_EXTRACTION_PREFIX = 'xion/deposit_webhook:' + +export type DepositWebhookExtractionData = { + registrationId: number + idempotencyKey: string + wallet: string + recipient: string + sender: string | null + amount: string + assetType: 'native' | 'cw20' + denom: string | null + contractAddress: string | null + blockHeight: string + blockTimeUnixMs: string + txHash: string +} + +type Coin = { + denom: string + amount: string +} + +export const getDepositWebhookExtractionName = (idempotencyKey: string) => + `${DEPOSIT_WEBHOOK_EXTRACTION_PREFIX}${idempotencyKey}` + +const normalizeAddress = (address: string) => address.trim() + +const isNonEmptyString = (value: unknown): value is string => + typeof value === 'string' && value.length > 0 + +const asString = (value: unknown): string | undefined => + isNonEmptyString(value) ? value : undefined + +const asCoins = (value: unknown): Coin[] => + Array.isArray(value) + ? value.flatMap((coin) => + coin && + typeof coin === 'object' && + isNonEmptyString((coin as any).denom) && + isNonEmptyString((coin as any).amount) + ? [ + { + denom: (coin as any).denom, + amount: (coin as any).amount, + }, + ] + : [] + ) + : [] + +const getBankMessageAddresses = ( + value: Record +): { + fromAddress?: string + toAddress?: string +} => ({ + fromAddress: asString(value.fromAddress) ?? asString(value.from_address), + toAddress: asString(value.toAddress) ?? asString(value.to_address), +}) + +export class XionDepositWebhookExtractor extends Extractor { + static type = 'xion-deposit-webhook' + + static sources: ExtractorDataSource[] = [ + StargateMessageDataSource.source('bankTransfer', { + typeUrl: [ + '/cosmos.bank.v1beta1.MsgSend', + '/cosmos.bank.v1beta1.MsgMultiSend', + ], + }), + IndexedWasmEventDataSource.source('cw20Transfer', { + key: 'action', + // Include send/send_from because watched deposit destinations may be + // contracts as well as externally owned wallets. We still only emit when + // the configured watched recipient matches exactly. + value: ['transfer', 'transfer_from', 'send', 'send_from'], + otherAttributes: ['recipient', 'amount'], + }), + ] + + protected bankTransfer: ExtractorHandler = (data) => + this.extractBankTransfers(data) + + protected cw20Transfer: ExtractorHandler = (data) => + this.extractCw20Transfers(data) + + private get chainId(): string { + const chainId = + this.env.config.chainId || this.env.autoCosmWasmClient.chainId || '' + + if (!chainId) { + throw new Error( + 'Could not determine chainId required for deposit webhook idempotency keys.' + ) + } + + return chainId + } + + private makeExtraction( + registration: AccountDepositWebhookRegistration, + wallet: string, + sender: string | null, + amount: string, + assetType: 'native' | 'cw20', + assetReference: string, + uniqueIndex: string + ): ExtractorHandlerOutput { + const idempotencyKey = [ + this.chainId, + registration.id, + this.env.txHash, + normalizeAddress(wallet), + assetType, + assetReference, + amount, + uniqueIndex, + ].join(':') + + const data: DepositWebhookExtractionData = { + registrationId: registration.id, + idempotencyKey, + wallet, + recipient: wallet, + sender, + amount, + assetType, + denom: assetType === 'native' ? assetReference : null, + contractAddress: assetType === 'cw20' ? assetReference : null, + blockHeight: this.env.block.height, + blockTimeUnixMs: this.env.block.timeUnixMs, + txHash: this.env.txHash, + } + + return { + address: wallet, + name: getDepositWebhookExtractionName(idempotencyKey), + data, + } + } + + private async getRegistrations(): Promise< + AccountDepositWebhookRegistration[] + > { + return await AccountDepositWebhookRegistration.getEnabledCached() + } + + private async extractBankTransfers({ + typeUrl, + value, + messageIndex, + }: StargateMessageData): Promise { + const registrations = await this.getRegistrations() + if (registrations.length === 0) { + return [] + } + + if (typeUrl === '/cosmos.bank.v1beta1.MsgSend') { + const { fromAddress, toAddress } = getBankMessageAddresses(value) + if (!toAddress) { + return [] + } + + return registrations.flatMap((registration) => + asCoins(value.amount) + .filter(({ denom }) => + registration.matchesNativeDeposit(toAddress, denom) + ) + .map(({ denom, amount }, coinIndex) => + this.makeExtraction( + registration, + toAddress, + fromAddress ?? null, + amount, + 'native', + denom, + `${messageIndex}:${coinIndex}` + ) + ) + ) + } + + if (typeUrl === '/cosmos.bank.v1beta1.MsgMultiSend') { + const inputs = Array.isArray(value.inputs) ? value.inputs : [] + const outputs = Array.isArray(value.outputs) ? value.outputs : [] + + const inputAddresses = inputs + .map((input) => asString((input as any)?.address)) + .filter(isNonEmptyString) + const senders = [...new Set(inputAddresses)] + // Multi-send can aggregate multiple input addresses into the same output. + // When provenance is ambiguous, emit `null` instead of choosing one + // arbitrarily so downstream consumers do not over-trust the sender field. + const sender = senders.length === 1 ? senders[0] : null + + return outputs.flatMap((output, outputIndex) => { + const wallet = asString((output as any)?.address) + if (!wallet) { + return [] + } + + return registrations.flatMap((registration) => + asCoins((output as any)?.coins) + .filter(({ denom }) => + registration.matchesNativeDeposit(wallet, denom) + ) + .map(({ denom, amount }, coinIndex) => + this.makeExtraction( + registration, + wallet, + sender, + amount, + 'native', + denom, + `${messageIndex}:${outputIndex}:${coinIndex}` + ) + ) + ) + }) + } + + return [] + } + + private async extractCw20Transfers({ + address, + attributes, + eventIndex, + }: IndexedWasmEventData): Promise { + const registrations = await this.getRegistrations() + if (registrations.length === 0) { + return [] + } + + const contractAddress = normalizeAddress(address) + const wallet = attributes.recipient?.[0] + if (!wallet) { + return [] + } + + const amount = attributes.amount?.[0] + if (!amount) { + return [] + } + + const sender = + attributes.sender?.[0] ?? + attributes.owner?.[0] ?? + attributes.from?.[0] ?? + null + + return registrations + .filter((registration) => + registration.matchesCw20Deposit(wallet, contractAddress) + ) + .map((registration) => + this.makeExtraction( + registration, + wallet, + sender, + amount, + 'cw20', + contractAddress, + `${eventIndex}` + ) + ) + } +} diff --git a/src/listener/extractors/xion/index.ts b/src/listener/extractors/xion/index.ts index 2deed416..1bd01273 100644 --- a/src/listener/extractors/xion/index.ts +++ b/src/listener/extractors/xion/index.ts @@ -1,2 +1,3 @@ export * from './asset' export * from './marketplace' +export { XionDepositWebhookExtractor } from './depositWebhook' diff --git a/src/listener/sources/IndexedWasmEvent.ts b/src/listener/sources/IndexedWasmEvent.ts new file mode 100644 index 00000000..0cae42a3 --- /dev/null +++ b/src/listener/sources/IndexedWasmEvent.ts @@ -0,0 +1,155 @@ +import { + DataSourceData, + ExtractableTxInput, + ExtractorDataSource, + ExtractorHandleableData, +} from '@/types' + +import { DataSource } from './base' + +export type IndexedWasmEventDataSourceConfig = { + /** + * The key or keys to match. + */ + key: string | string[] + /** + * The value or values to match. + */ + value: string | string[] + /** + * Other attributes to ensure are present. + */ + otherAttributes?: string[] +} + +export type IndexedWasmEventData = { + /** + * The address of the contract that emitted the event. + */ + address: string + /** + * The key of the matched attribute. + */ + key: string + /** + * The value of the matched attribute. + */ + value: string + /** + * The position of the wasm event within the transaction. + */ + eventIndex: number + /** + * A map of attribute key to all values. + */ + attributes: Partial> + /** + * The raw event attributes. + */ + _attributes: { + key: string + value: string + }[] +} + +export class IndexedWasmEventDataSource extends DataSource< + IndexedWasmEventDataSourceConfig, + IndexedWasmEventData +> { + static get type(): string { + return 'wasm/indexed-event' + } + + static source( + handler: string, + config: IndexedWasmEventDataSourceConfig + ): ExtractorDataSource { + return { + type: this.type, + handler, + config, + } + } + + static handleable( + handler: string, + data: IndexedWasmEventData + ): ExtractorHandleableData { + return { + source: this.type, + handler, + data, + } + } + + static data( + data: Omit + ): DataSourceData { + return { + source: this.type, + data: { + ...data, + attributes: data._attributes.reduce( + (acc, { key, value }) => ({ + ...acc, + [key]: [...(acc[key] || []), value], + }), + {} as Record + ), + }, + } + } + + private equalsOrContains(a: string | string[], b: string): boolean { + return Array.isArray(a) ? a.includes(b) : a === b + } + + match({ events }: ExtractableTxInput): IndexedWasmEventData[] { + return events.flatMap(({ type, attributes }, eventIndex) => + type === 'wasm' && + attributes.some( + ({ key, value }) => key === '_contract_address' && value.length > 0 + ) && + (!this.config.otherAttributes || + this.config.otherAttributes.every((otherKey) => + attributes.some(({ key }) => key === otherKey) + )) + ? attributes.flatMap(({ key, value }) => + this.equalsOrContains(this.config.key, key) && + this.equalsOrContains(this.config.value, value) + ? { + address: attributes.find( + ({ key }) => key === '_contract_address' + )!.value, + key, + value, + eventIndex, + attributes: attributes.reduce( + (acc, { key, value }) => ({ + ...acc, + [key]: [...(acc[key] || []), value], + }), + {} as Record + ), + _attributes: [...attributes], + } + : [] + ) + : [] + ) + } + + isOurData(data: IndexedWasmEventData): boolean { + return ( + this.equalsOrContains(this.config.key, data.key) && + this.equalsOrContains(this.config.value, data.value) && + (!this.config.otherAttributes || + this.config.otherAttributes.every( + (otherKey) => + otherKey in data.attributes && + Array.isArray(data.attributes[otherKey]) && + data.attributes[otherKey]!.length > 0 + )) + ) + } +} diff --git a/src/listener/sources/StargateMessage.test.ts b/src/listener/sources/StargateMessage.test.ts new file mode 100644 index 00000000..3d500d9b --- /dev/null +++ b/src/listener/sources/StargateMessage.test.ts @@ -0,0 +1,68 @@ +import { describe, expect, it } from 'vitest' + +import { StargateMessageDataSource } from './StargateMessage' + +describe('StargateMessageDataSource', () => { + it('matches decoded stargate messages and preserves message index', () => { + const source = new StargateMessageDataSource({ + typeUrl: '/cosmos.bank.v1beta1.MsgSend', + }) + + const matches = source.match({ + hash: 'tx-hash', + messages: [ + { + typeUrl: '/cosmos.bank.v1beta1.MsgDelegate', + value: { + delegatorAddress: 'xion1delegator', + }, + }, + { + typeUrl: '/cosmos.bank.v1beta1.MsgSend', + value: { + fromAddress: 'xion1sender', + toAddress: 'xion1recipient', + amount: [ + { + denom: 'uxion', + amount: '42', + }, + ], + }, + }, + ] as any, + events: [], + }) + + expect(matches).toEqual([ + { + typeUrl: '/cosmos.bank.v1beta1.MsgSend', + value: { + fromAddress: 'xion1sender', + toAddress: 'xion1recipient', + amount: [ + { + denom: 'uxion', + amount: '42', + }, + ], + }, + messageIndex: 1, + }, + ]) + }) + + it('ignores non-decoded messages', () => { + const source = new StargateMessageDataSource({ + typeUrl: ['/cosmos.bank.v1beta1.MsgSend'], + }) + + expect( + source.match({ + hash: 'tx-hash', + messages: [{ foo: 'bar' }, null] as any, + events: [], + }) + ).toEqual([]) + }) +}) diff --git a/src/listener/sources/StargateMessage.ts b/src/listener/sources/StargateMessage.ts new file mode 100644 index 00000000..627a1ee5 --- /dev/null +++ b/src/listener/sources/StargateMessage.ts @@ -0,0 +1,107 @@ +import { + DataSourceData, + ExtractableTxInput, + ExtractorDataSource, + ExtractorHandleableData, +} from '@/types' + +import { DataSource } from './base' + +export type StargateMessageDataSourceConfig = { + /** + * The type URL or URLs to match. + */ + typeUrl: string | string[] +} + +export type StargateMessageData = { + /** + * The protobuf type URL. + */ + typeUrl: string + /** + * The decoded message value. + */ + value: Record + /** + * The position of the message within the transaction. + */ + messageIndex: number +} + +type DecodedMessage = { + typeUrl: string + value: Record +} + +export class StargateMessageDataSource extends DataSource< + StargateMessageDataSourceConfig, + StargateMessageData +> { + static get type(): string { + return 'stargate/message' + } + + static source( + handler: string, + config: StargateMessageDataSourceConfig + ): ExtractorDataSource { + return { + type: this.type, + handler, + config, + } + } + + static handleable( + handler: string, + data: StargateMessageData + ): ExtractorHandleableData { + return { + source: this.type, + handler, + data, + } + } + + static data(data: StargateMessageData): DataSourceData { + return { + source: this.type, + data, + } + } + + private equalsOrContains(a: string | string[], b: string): boolean { + return Array.isArray(a) ? a.includes(b) : a === b + } + + private isDecodedMessage(message: unknown): message is DecodedMessage { + return ( + !!message && + typeof message === 'object' && + 'typeUrl' in message && + typeof message.typeUrl === 'string' && + 'value' in message && + !!message.value && + typeof message.value === 'object' && + !Array.isArray(message.value) + ) + } + + match({ messages }: ExtractableTxInput): StargateMessageData[] { + return messages.flatMap((message, messageIndex) => + this.isDecodedMessage(message) && + this.equalsOrContains(this.config.typeUrl, message.typeUrl) + ? { + typeUrl: message.typeUrl, + value: message.value, + messageIndex, + } + : [] + ) + } + + isOurData(data: StargateMessageData): boolean { + return this.equalsOrContains(this.config.typeUrl, data.typeUrl) + } +} diff --git a/src/listener/sources/index.ts b/src/listener/sources/index.ts index 5c4e127d..09dae8fd 100644 --- a/src/listener/sources/index.ts +++ b/src/listener/sources/index.ts @@ -1,10 +1,16 @@ +export * from './IndexedWasmEvent' +export * from './StargateMessage' export * from './WasmEvent' export * from './WasmInstantiateOrMigrate' +import { IndexedWasmEventDataSource } from './IndexedWasmEvent' +import { StargateMessageDataSource } from './StargateMessage' import { WasmEventDataSource } from './WasmEvent' import { WasmInstantiateOrMigrateDataSource } from './WasmInstantiateOrMigrate' const _getDataSources = () => [ + IndexedWasmEventDataSource, + StargateMessageDataSource, WasmEventDataSource, WasmInstantiateOrMigrateDataSource, // Add more data sources here. diff --git a/src/queues/queues/extract.test.ts b/src/queues/queues/extract.test.ts index 4e25a907..7c7d9203 100644 --- a/src/queues/queues/extract.test.ts +++ b/src/queues/queues/extract.test.ts @@ -2,7 +2,7 @@ import { Job } from 'bullmq' import { Mock, beforeEach, describe, expect, it, vi } from 'vitest' import { ConfigManager } from '@/config' -import { Extraction } from '@/db' +import { AccountDepositWebhookRegistration, Extraction } from '@/db' import * as listenerModule from '@/listener' import * as search from '@/search' import { AutoCosmWasmClient } from '@/utils' @@ -59,6 +59,10 @@ describe('ExtractQueue', () => { vi.spyOn(AutoCosmWasmClient.prototype, 'getValidClient').mockResolvedValue( {} as any ) + vi.spyOn( + AccountDepositWebhookRegistration, + 'ensureActiveRegistrationsCacheSubscription' + ).mockResolvedValue() vi.spyOn(utils, 'getContractInfo').mockImplementation(vi.fn()) // Create extract queue diff --git a/src/queues/queues/extract.ts b/src/queues/queues/extract.ts index d1ff639a..ea3cbdbb 100644 --- a/src/queues/queues/extract.ts +++ b/src/queues/queues/extract.ts @@ -1,7 +1,7 @@ import { Job, Queue } from 'bullmq' import { Sequelize } from 'sequelize' -import { Block, State } from '@/db' +import { AccountDepositWebhookRegistration, Block, State } from '@/db' import { getExtractorMap } from '@/listener' import { queueMeilisearchIndexUpdates } from '@/search' import { ExtractorEnv, ExtractorHandleableData } from '@/types' @@ -38,6 +38,7 @@ export class ExtractQueue extends BaseQueue { this.options.config.remoteRpc ) await this.autoCosmWasmClient.update() + await AccountDepositWebhookRegistration.ensureActiveRegistrationsCacheSubscription() } async process(job: Job) { diff --git a/src/queues/queues/webhooks.test.ts b/src/queues/queues/webhooks.test.ts new file mode 100644 index 00000000..8937c117 --- /dev/null +++ b/src/queues/queues/webhooks.test.ts @@ -0,0 +1,90 @@ +import { Job } from 'bullmq' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +import { PendingWebhook, WebhookType } from '@/types' + +const { mockedAxios } = vi.hoisted(() => ({ + mockedAxios: vi.fn(), +})) + +vi.mock('axios', () => ({ + __esModule: true, + default: mockedAxios, +})) + +describe('WebhooksQueue', () => { + let WebhooksQueue: typeof import('./webhooks').WebhooksQueue + + beforeEach(async () => { + vi.resetModules() + vi.clearAllMocks() + mockedAxios.mockResolvedValue({ + status: 200, + data: {}, + headers: {}, + } as any) + ;({ WebhooksQueue } = await import('./webhooks')) + }) + + it('applies configured HTTP timeouts to outbound webhooks', async () => { + const queue = new WebhooksQueue({ + config: { + webhookTimeoutMs: 4321, + } as any, + sendWebhooks: true, + }) + + await queue.process({ + data: { + eventType: 'Extraction', + eventId: 1, + endpoint: { + type: WebhookType.Url, + url: 'https://partner.example/deposits', + method: 'POST', + }, + value: { + hello: 'world', + }, + }, + } as Job) + + expect(mockedAxios).toHaveBeenCalledWith( + 'https://partner.example/deposits', + expect.objectContaining({ + method: 'POST', + timeout: 4321, + data: { + hello: 'world', + }, + }) + ) + }) + + it('falls back to the default webhook timeout', async () => { + const queue = new WebhooksQueue({ + config: {} as any, + sendWebhooks: true, + }) + + await queue.process({ + data: { + eventType: 'Extraction', + eventId: 1, + endpoint: { + type: WebhookType.Url, + url: 'https://partner.example/deposits', + method: 'POST', + }, + value: {}, + }, + } as Job) + + expect(mockedAxios).toHaveBeenCalledWith( + 'https://partner.example/deposits', + expect.objectContaining({ + timeout: 15000, + }) + ) + }) +}) diff --git a/src/queues/queues/webhooks.ts b/src/queues/queues/webhooks.ts index 5f1afce3..8c73cc14 100644 --- a/src/queues/queues/webhooks.ts +++ b/src/queues/queues/webhooks.ts @@ -27,6 +27,7 @@ export class WebhooksQueue extends BaseQueue { case WebhookType.Url: { await axios(endpoint.url, { method: endpoint.method, + timeout: this.options.config.webhookTimeoutMs ?? 15000, // https://stackoverflow.com/a/74735197 headers: { 'Accept-Encoding': 'gzip,deflate,compress', diff --git a/src/scripts/workers.ts b/src/scripts/workers.ts index f2fffe92..3b1e1025 100644 --- a/src/scripts/workers.ts +++ b/src/scripts/workers.ts @@ -2,7 +2,7 @@ import * as Sentry from '@sentry/node' import { Command } from 'commander' import { ConfigManager, testRedisConnection } from '@/config' -import { State, loadDb } from '@/db' +import { AccountDepositWebhookRegistration, State, loadDb } from '@/db' import { QueueOptions, queues } from '@/queues' import { WasmCodeService } from '@/services/wasm-codes' import { DbType } from '@/types' @@ -102,8 +102,11 @@ const main = async () => { WasmCodeService.instance.stopUpdater() // Close DB connections. - await dataSequelize.close() - await accountsSequelize.close() + await Promise.all([ + dataSequelize.close(), + accountsSequelize.close(), + AccountDepositWebhookRegistration.closeActiveRegistrationsCacheSubscription(), + ]) // Exit. process.exit(0) diff --git a/src/server/routes/account/createDepositWebhookRegistration.ts b/src/server/routes/account/createDepositWebhookRegistration.ts new file mode 100644 index 00000000..8ac7d411 --- /dev/null +++ b/src/server/routes/account/createDepositWebhookRegistration.ts @@ -0,0 +1,62 @@ +import Router from '@koa/router' +import { DefaultContext } from 'koa' + +import { + AccountDepositWebhookRegistration, + AccountDepositWebhookRegistrationApiJson, +} from '@/db' + +import { validateAndNormalizeDepositWebhookRegistration } from './depositWebhookRegistrationUtils' +import { AccountState } from './types' + +type CreateDepositWebhookRegistrationRequest = Pick< + AccountDepositWebhookRegistration, + | 'description' + | 'endpointUrl' + | 'authHeader' + | 'authToken' + | 'watchedWallets' + | 'allowedNativeDenoms' + | 'allowedCw20Contracts' + | 'enabled' +> + +type CreateDepositWebhookRegistrationResponse = + | { + registration: AccountDepositWebhookRegistrationApiJson + } + | { + error: string + } + +export const createDepositWebhookRegistration: Router.Middleware< + AccountState, + DefaultContext, + CreateDepositWebhookRegistrationResponse +> = async (ctx) => { + const body: CreateDepositWebhookRegistrationRequest = ctx.request.body + + const validation = validateAndNormalizeDepositWebhookRegistration({ + body, + requireAll: true, + }) + if ('error' in validation) { + ctx.status = 400 + ctx.body = validation + return + } + + const registration = + await ctx.state.account.$create( + 'depositWebhookRegistration', + { + ...validation.normalized, + enabled: validation.normalized.enabled ?? true, + } + ) + + ctx.status = 201 + ctx.body = { + registration: registration.apiJson, + } +} diff --git a/src/server/routes/account/deleteDepositWebhookRegistration.ts b/src/server/routes/account/deleteDepositWebhookRegistration.ts new file mode 100644 index 00000000..f289ccfc --- /dev/null +++ b/src/server/routes/account/deleteDepositWebhookRegistration.ts @@ -0,0 +1,36 @@ +import Router from '@koa/router' +import { DefaultContext } from 'koa' + +import { AccountDepositWebhookRegistration } from '@/db' + +import { AccountState } from './types' + +type DeleteDepositWebhookRegistrationResponse = + | undefined + | { + error: string + } + +export const deleteDepositWebhookRegistration: Router.Middleware< + AccountState, + DefaultContext, + DeleteDepositWebhookRegistrationResponse +> = async (ctx) => { + const registration = await AccountDepositWebhookRegistration.findOne({ + where: { + id: ctx.params.id, + accountPublicKey: ctx.state.account.publicKey, + }, + }) + + if (!registration) { + ctx.status = 404 + ctx.body = { + error: 'Deposit webhook registration not found.', + } + return + } + + await registration.destroy() + ctx.status = 204 +} diff --git a/src/server/routes/account/depositWebhookRegistrationUtils.ts b/src/server/routes/account/depositWebhookRegistrationUtils.ts new file mode 100644 index 00000000..b5114827 --- /dev/null +++ b/src/server/routes/account/depositWebhookRegistrationUtils.ts @@ -0,0 +1,166 @@ +import { AccountDepositWebhookRegistration } from '@/db' + +type NormalizedRegistrationInput = { + description?: string | null + endpointUrl?: string + authHeader?: string | null + authToken?: string | null + watchedWallets?: string[] + allowedNativeDenoms?: string[] + allowedCw20Contracts?: string[] + enabled?: boolean +} + +const normalizeString = (value: unknown): string | undefined => + typeof value === 'string' && value.trim().length > 0 + ? value.trim() + : undefined + +const normalizeOptionalString = (value: unknown): string | null | undefined => + value === undefined + ? undefined + : value === null + ? null + : normalizeString(value) || null + +const normalizeStringArray = ( + value: unknown, + trim = true +): string[] | undefined => + Array.isArray(value) + ? value + .map((item) => + typeof item === 'string' ? (trim ? item.trim() : item) : '' + ) + .filter(Boolean) + : undefined + +export const validateAndNormalizeDepositWebhookRegistration = ({ + body, + requireAll = false, +}: { + body: Partial + requireAll?: boolean +}): + | { + error: string + } + | { + normalized: NormalizedRegistrationInput + } => { + const description = normalizeOptionalString(body.description) + if ( + description !== undefined && + description !== null && + description.length > 255 + ) { + return { + error: 'Description too long.', + } + } + + const endpointUrl = normalizeString(body.endpointUrl) + if ((requireAll || 'endpointUrl' in body) && !endpointUrl) { + return { + error: 'Invalid endpoint URL.', + } + } + if (endpointUrl) { + try { + new URL(endpointUrl) + } catch { + return { + error: 'Invalid endpoint URL.', + } + } + } + + const authHeader = normalizeOptionalString(body.authHeader) + if ('authHeader' in body && body.authHeader !== null && authHeader === null) { + return { + error: 'Invalid auth header.', + } + } + + const authToken = normalizeOptionalString(body.authToken) + + const watchedWallets = normalizeStringArray(body.watchedWallets) + if ((requireAll || 'watchedWallets' in body) && !watchedWallets?.length) { + return { + error: 'At least one watched wallet is required.', + } + } + + const allowedNativeDenoms = normalizeStringArray(body.allowedNativeDenoms) + if ('allowedNativeDenoms' in body && allowedNativeDenoms === undefined) { + return { + error: 'Invalid native denoms.', + } + } + + const allowedCw20Contracts = normalizeStringArray(body.allowedCw20Contracts) + if ('allowedCw20Contracts' in body && allowedCw20Contracts === undefined) { + return { + error: 'Invalid CW20 contracts.', + } + } + + const hasNativeDenoms = + allowedNativeDenoms !== undefined + ? allowedNativeDenoms.length > 0 + : undefined + const hasCw20Contracts = + allowedCw20Contracts !== undefined + ? allowedCw20Contracts.length > 0 + : undefined + + if ( + (requireAll && + !( + (allowedNativeDenoms?.length || 0) > 0 || + (allowedCw20Contracts?.length || 0) > 0 + )) || + (('allowedNativeDenoms' in body || 'allowedCw20Contracts' in body) && + hasNativeDenoms === false && + hasCw20Contracts === false) + ) { + return { + error: 'At least one allowed asset filter is required.', + } + } + + if ('enabled' in body && typeof body.enabled !== 'boolean') { + return { + error: 'Invalid enabled flag.', + } + } + + return { + normalized: { + ...(description !== undefined && { + description, + }), + ...(endpointUrl !== undefined && { + endpointUrl, + }), + ...(authHeader !== undefined && { + authHeader, + }), + ...(authToken !== undefined && { + authToken, + }), + ...(watchedWallets !== undefined && { + watchedWallets, + }), + ...(allowedNativeDenoms !== undefined && { + allowedNativeDenoms, + }), + ...(allowedCw20Contracts !== undefined && { + allowedCw20Contracts, + }), + ...('enabled' in body && { + enabled: body.enabled, + }), + }, + } +} diff --git a/src/server/routes/account/index.ts b/src/server/routes/account/index.ts index ac75a506..b0e27372 100644 --- a/src/server/routes/account/index.ts +++ b/src/server/routes/account/index.ts @@ -3,21 +3,25 @@ import { koaBody } from 'koa-body' import { authMiddleware } from './auth' import { createCodeIdSet } from './createCodeIdSet' +import { createDepositWebhookRegistration } from './createDepositWebhookRegistration' import { createKey } from './createKey' import { createWebhook } from './createWebhook' import { deleteCodeIdSet } from './deleteCodeIdSet' +import { deleteDepositWebhookRegistration } from './deleteDepositWebhookRegistration' import { deleteWebhook } from './deleteWebhook' import { fireWebhookEvent } from './fireWebhookEvent' import { getConfig } from './getConfig' import { getNonce } from './getNonce' import { getWebhookEvents } from './getWebhookEvents' import { listCodeIdSets } from './listCodeIdSets' +import { listDepositWebhookRegistrations } from './listDepositWebhookRegistrations' import { listKeys } from './listKeys' import { listWebhooks } from './listWebhooks' import { login } from './login' import { paymentWebhook } from './paymentWebhook' import { resetKey } from './resetKey' import { updateCodeIdSet } from './updateCodeIdSet' +import { updateDepositWebhookRegistration } from './updateDepositWebhookRegistration' import { updateWebhook } from './updateWebhook' export const accountRouter = new Router() @@ -53,6 +57,30 @@ accountRouter.post('/keys/:id/reset', resetKey) // List code ID sets. accountRouter.get('/code-id-sets', listCodeIdSets) +// List deposit webhook registrations. +accountRouter.get( + '/deposit-webhook-registrations', + listDepositWebhookRegistrations +) + +// Create deposit webhook registration. +accountRouter.post( + '/deposit-webhook-registrations', + createDepositWebhookRegistration +) + +// Update deposit webhook registration. +accountRouter.patch( + '/deposit-webhook-registrations/:id', + updateDepositWebhookRegistration +) + +// Delete deposit webhook registration. +accountRouter.delete( + '/deposit-webhook-registrations/:id', + deleteDepositWebhookRegistration +) + // Create new code ID set. accountRouter.post('/code-id-sets', createCodeIdSet) diff --git a/src/server/routes/account/listDepositWebhookRegistrations.ts b/src/server/routes/account/listDepositWebhookRegistrations.ts new file mode 100644 index 00000000..be8c3d92 --- /dev/null +++ b/src/server/routes/account/listDepositWebhookRegistrations.ts @@ -0,0 +1,28 @@ +import Router from '@koa/router' +import { DefaultContext } from 'koa' + +import { AccountDepositWebhookRegistrationApiJson } from '@/db' + +import { AccountState } from './types' + +type ListDepositWebhookRegistrationsResponse = { + registrations: AccountDepositWebhookRegistrationApiJson[] +} + +export const listDepositWebhookRegistrations: Router.Middleware< + AccountState, + DefaultContext, + ListDepositWebhookRegistrationsResponse +> = async (ctx) => { + const registrations = await ctx.state.account.$get( + 'depositWebhookRegistrations', + { + order: [['id', 'ASC']], + } + ) + + ctx.status = 200 + ctx.body = { + registrations: registrations.map((registration) => registration.apiJson), + } +} diff --git a/src/server/routes/account/updateDepositWebhookRegistration.ts b/src/server/routes/account/updateDepositWebhookRegistration.ts new file mode 100644 index 00000000..67cdc112 --- /dev/null +++ b/src/server/routes/account/updateDepositWebhookRegistration.ts @@ -0,0 +1,96 @@ +import Router from '@koa/router' +import { DefaultContext } from 'koa' + +import { + AccountDepositWebhookRegistration, + AccountDepositWebhookRegistrationApiJson, +} from '@/db' + +import { validateAndNormalizeDepositWebhookRegistration } from './depositWebhookRegistrationUtils' +import { AccountState } from './types' + +type UpdateDepositWebhookRegistrationRequest = Pick< + AccountDepositWebhookRegistration, + | 'description' + | 'endpointUrl' + | 'authHeader' + | 'authToken' + | 'watchedWallets' + | 'allowedNativeDenoms' + | 'allowedCw20Contracts' + | 'enabled' +> + +type UpdateDepositWebhookRegistrationResponse = + | { + registration: AccountDepositWebhookRegistrationApiJson + } + | { + error: string + } + +export const updateDepositWebhookRegistration: Router.Middleware< + AccountState, + DefaultContext, + UpdateDepositWebhookRegistrationResponse +> = async (ctx) => { + const registration = await AccountDepositWebhookRegistration.findOne({ + where: { + id: ctx.params.id, + accountPublicKey: ctx.state.account.publicKey, + }, + }) + + if (!registration) { + ctx.status = 404 + ctx.body = { + error: 'Deposit webhook registration not found.', + } + return + } + + const body: UpdateDepositWebhookRegistrationRequest = ctx.request.body + const validation = validateAndNormalizeDepositWebhookRegistration({ + body, + }) + if ('error' in validation) { + ctx.status = 400 + ctx.body = validation + return + } + + // Validate the final registration state if any asset filters are modified. + const nextAllowedNativeDenoms = + validation.normalized.allowedNativeDenoms ?? + registration.allowedNativeDenoms + const nextAllowedCw20Contracts = + validation.normalized.allowedCw20Contracts ?? + registration.allowedCw20Contracts + if ( + nextAllowedNativeDenoms.length === 0 && + nextAllowedCw20Contracts.length === 0 + ) { + ctx.status = 400 + ctx.body = { + error: 'At least one allowed asset filter is required.', + } + return + } + + const nextWatchedWallets = + validation.normalized.watchedWallets ?? registration.watchedWallets + if (nextWatchedWallets.length === 0) { + ctx.status = 400 + ctx.body = { + error: 'At least one watched wallet is required.', + } + return + } + + await registration.update(validation.normalized) + + ctx.status = 200 + ctx.body = { + registration: registration.apiJson, + } +} diff --git a/src/server/test/account/createDepositWebhookRegistration.test.ts b/src/server/test/account/createDepositWebhookRegistration.test.ts new file mode 100644 index 00000000..b32cd333 --- /dev/null +++ b/src/server/test/account/createDepositWebhookRegistration.test.ts @@ -0,0 +1,94 @@ +import request from 'supertest' +import { beforeEach, describe, expect, it } from 'vitest' + +import { Account, AccountDepositWebhookRegistration } from '@/db' +import { getAccountWithAuth } from '@/test/utils' + +import { app } from './app' + +describe('POST /deposit-webhook-registrations', () => { + let account: Account + let token: string + + beforeEach(async () => { + const { account: _account, token: _token } = await getAccountWithAuth() + account = _account + token = _token + }) + + it('returns error if no auth token', async () => { + await request(app.callback()) + .post('/deposit-webhook-registrations') + .send({}) + .expect(401) + .expect({ + error: 'No token.', + }) + }) + + it('validates required fields', async () => { + await request(app.callback()) + .post('/deposit-webhook-registrations') + .set('Authorization', `Bearer ${token}`) + .send({}) + .expect(400) + .expect({ + error: 'Invalid endpoint URL.', + }) + + await request(app.callback()) + .post('/deposit-webhook-registrations') + .set('Authorization', `Bearer ${token}`) + .send({ + endpointUrl: 'https://partner.example/deposits', + }) + .expect(400) + .expect({ + error: 'At least one watched wallet is required.', + }) + + await request(app.callback()) + .post('/deposit-webhook-registrations') + .set('Authorization', `Bearer ${token}`) + .send({ + endpointUrl: 'https://partner.example/deposits', + watchedWallets: ['xion1watchedwallet'], + }) + .expect(400) + .expect({ + error: 'At least one allowed asset filter is required.', + }) + }) + + it('creates a registration', async () => { + const response = await request(app.callback()) + .post('/deposit-webhook-registrations') + .set('Authorization', `Bearer ${token}`) + .send({ + description: 'Sandbox deposit listener', + endpointUrl: 'https://partner.example/deposits', + authHeader: 'Authorization', + authToken: 'secret-token', + watchedWallets: ['xion1watchedwallet'], + allowedNativeDenoms: ['uxion'], + allowedCw20Contracts: ['xion1stablecoincontract'], + }) + .expect(201) + + expect(response.body.registration).toMatchObject({ + description: 'Sandbox deposit listener', + endpointUrl: 'https://partner.example/deposits', + authHeader: 'Authorization', + authToken: 'secret-token', + watchedWallets: ['xion1watchedwallet'], + allowedNativeDenoms: ['uxion'], + allowedCw20Contracts: ['xion1stablecoincontract'], + enabled: true, + }) + + const registrations = await account.$get('depositWebhookRegistrations') + expect(registrations).toHaveLength(1) + expect(registrations[0].apiJson).toEqual(response.body.registration) + expect(await AccountDepositWebhookRegistration.count()).toBe(1) + }) +}) diff --git a/src/server/test/account/deleteDepositWebhookRegistration.test.ts b/src/server/test/account/deleteDepositWebhookRegistration.test.ts new file mode 100644 index 00000000..130d6d6b --- /dev/null +++ b/src/server/test/account/deleteDepositWebhookRegistration.test.ts @@ -0,0 +1,84 @@ +import request from 'supertest' +import { beforeEach, describe, expect, it } from 'vitest' + +import { AccountDepositWebhookRegistration } from '@/db' +import { getAccountWithAuth } from '@/test/utils' + +import { app } from './app' + +describe('DELETE /deposit-webhook-registrations/:id', () => { + let token: string + let registration: AccountDepositWebhookRegistration + + beforeEach(async () => { + const { account, token: _token } = await getAccountWithAuth() + + token = _token + + registration = await account.$create( + 'depositWebhookRegistration', + { + description: 'Sandbox deposit listener', + endpointUrl: 'https://partner.example/deposits', + watchedWallets: ['xion1watchedwallet'], + allowedNativeDenoms: ['uxion'], + allowedCw20Contracts: [], + } + ) + }) + + it('returns error if no auth token', async () => { + await request(app.callback()) + .delete(`/deposit-webhook-registrations/${registration.id}`) + .expect(401) + .expect({ + error: 'No token.', + }) + }) + + it('returns error if registration does not exist', async () => { + await request(app.callback()) + .delete(`/deposit-webhook-registrations/${registration.id + 1}`) + .set('Authorization', `Bearer ${token}`) + .expect(404) + .expect({ + error: 'Deposit webhook registration not found.', + }) + }) + + it('returns error if registration is owned by another account', async () => { + const { account: anotherAccount } = await getAccountWithAuth() + const anotherRegistration = + await anotherAccount.$create( + 'depositWebhookRegistration', + { + description: 'Other', + endpointUrl: 'https://other.example/deposits', + watchedWallets: ['xion1otherwallet'], + allowedNativeDenoms: ['uxion'], + allowedCw20Contracts: [], + } + ) + + await request(app.callback()) + .delete(`/deposit-webhook-registrations/${anotherRegistration.id}`) + .set('Authorization', `Bearer ${token}`) + .expect(404) + .expect({ + error: 'Deposit webhook registration not found.', + }) + }) + + it('deletes registration', async () => { + const initialCount = await AccountDepositWebhookRegistration.count() + + await request(app.callback()) + .delete(`/deposit-webhook-registrations/${registration.id}`) + .set('Authorization', `Bearer ${token}`) + .expect(204) + + expect(await AccountDepositWebhookRegistration.count()).toBe( + initialCount - 1 + ) + }) +}) diff --git a/src/server/test/account/listDepositWebhookRegistrations.test.ts b/src/server/test/account/listDepositWebhookRegistrations.test.ts new file mode 100644 index 00000000..9f0ac1a3 --- /dev/null +++ b/src/server/test/account/listDepositWebhookRegistrations.test.ts @@ -0,0 +1,63 @@ +import request from 'supertest' +import { beforeEach, describe, it } from 'vitest' + +import { Account, AccountDepositWebhookRegistration } from '@/db' +import { getAccountWithAuth } from '@/test/utils' + +import { app } from './app' + +describe('GET /deposit-webhook-registrations', () => { + let account: Account + let token: string + + beforeEach(async () => { + const { account: _account, token: _token } = await getAccountWithAuth() + account = _account + token = _token + }) + + it('returns error if no auth token', async () => { + await request(app.callback()) + .get('/deposit-webhook-registrations') + .expect(401) + .expect({ + error: 'No token.', + }) + }) + + it('lists registrations for the authenticated account only', async () => { + const registration = + await account.$create( + 'depositWebhookRegistration', + { + description: 'Sandbox deposit listener', + endpointUrl: 'https://partner.example/deposits', + authHeader: 'Authorization', + authToken: 'secret-token', + watchedWallets: ['xion1watchedwallet'], + allowedNativeDenoms: ['uxion'], + allowedCw20Contracts: [], + } + ) + + const { account: anotherAccount } = await getAccountWithAuth() + await anotherAccount.$create( + 'depositWebhookRegistration', + { + description: 'Other', + endpointUrl: 'https://other.example/deposits', + watchedWallets: ['xion1otherwallet'], + allowedNativeDenoms: ['uxion'], + allowedCw20Contracts: [], + } + ) + + await request(app.callback()) + .get('/deposit-webhook-registrations') + .set('Authorization', `Bearer ${token}`) + .expect(200) + .expect({ + registrations: [registration.apiJson], + }) + }) +}) diff --git a/src/server/test/account/updateDepositWebhookRegistration.test.ts b/src/server/test/account/updateDepositWebhookRegistration.test.ts new file mode 100644 index 00000000..2a5753fe --- /dev/null +++ b/src/server/test/account/updateDepositWebhookRegistration.test.ts @@ -0,0 +1,102 @@ +import request from 'supertest' +import { beforeEach, describe, expect, it } from 'vitest' + +import { Account, AccountDepositWebhookRegistration } from '@/db' +import { getAccountWithAuth } from '@/test/utils' + +import { app } from './app' + +describe('PATCH /deposit-webhook-registrations/:id', () => { + let account: Account + let token: string + let registration: AccountDepositWebhookRegistration + + beforeEach(async () => { + const { account: _account, token: _token } = await getAccountWithAuth() + account = _account + token = _token + + registration = await account.$create( + 'depositWebhookRegistration', + { + description: 'Sandbox deposit listener', + endpointUrl: 'https://partner.example/deposits', + authHeader: 'Authorization', + authToken: 'secret-token', + watchedWallets: ['xion1watchedwallet'], + allowedNativeDenoms: ['uxion'], + allowedCw20Contracts: [], + } + ) + }) + + it('returns error if no auth token', async () => { + await request(app.callback()) + .patch(`/deposit-webhook-registrations/${registration.id}`) + .expect(401) + .expect({ + error: 'No token.', + }) + }) + + it('returns error if registration not found', async () => { + await request(app.callback()) + .patch(`/deposit-webhook-registrations/${registration.id + 1}`) + .set('Authorization', `Bearer ${token}`) + .expect(404) + .expect({ + error: 'Deposit webhook registration not found.', + }) + }) + + it('updates a registration', async () => { + const response = await request(app.callback()) + .patch(`/deposit-webhook-registrations/${registration.id}`) + .set('Authorization', `Bearer ${token}`) + .send({ + endpointUrl: 'https://partner.example/prod-deposits', + watchedWallets: ['xion1watchedwallet', 'xion1secondwallet'], + allowedNativeDenoms: [], + allowedCw20Contracts: ['xion1stablecoincontract'], + enabled: false, + }) + .expect(200) + + expect(response.body.registration).toMatchObject({ + id: registration.id, + endpointUrl: 'https://partner.example/prod-deposits', + watchedWallets: ['xion1watchedwallet', 'xion1secondwallet'], + allowedNativeDenoms: [], + allowedCw20Contracts: ['xion1stablecoincontract'], + enabled: false, + }) + + await registration.reload() + expect(registration.apiJson).toEqual(response.body.registration) + }) + + it('rejects removing all watched wallets or asset filters', async () => { + await request(app.callback()) + .patch(`/deposit-webhook-registrations/${registration.id}`) + .set('Authorization', `Bearer ${token}`) + .send({ + watchedWallets: [], + }) + .expect(400) + .expect({ + error: 'At least one watched wallet is required.', + }) + + await request(app.callback()) + .patch(`/deposit-webhook-registrations/${registration.id}`) + .set('Authorization', `Bearer ${token}`) + .send({ + allowedNativeDenoms: [], + allowedCw20Contracts: [], + }) + .expect(400) + .expect({ + error: 'At least one allowed asset filter is required.', + }) + }) +}) diff --git a/src/test/e2e/depositWebhook.e2e.test.ts b/src/test/e2e/depositWebhook.e2e.test.ts new file mode 100644 index 00000000..f50e2b88 --- /dev/null +++ b/src/test/e2e/depositWebhook.e2e.test.ts @@ -0,0 +1,261 @@ +import http, { IncomingMessage } from 'http' + +import { DirectSecp256k1HdWallet } from '@cosmjs/proto-signing' +import { SigningStargateClient, coins } from '@cosmjs/stargate' +import { decodeRawProtobufMsg } from '@dao-dao/types/protobuf/utils' +import request from 'supertest' +import { afterEach, describe, expect, it } from 'vitest' + +import { ConfigManager } from '@/config' +import { AccountDepositWebhookRegistration, Extraction, State } from '@/db' +import { getExtractors } from '@/listener' +import { closeAllBullQueues } from '@/queues' +import { QueueOptions } from '@/queues/base' +import { ExtractQueue, WebhooksQueue } from '@/queues/queues' +import { app as accountApp } from '@/server/test/account/app' +import { BlockIterator } from '@/services' +import { getAccountWithAuth } from '@/test/utils' +import { ExtractableTxInput, ExtractorEnv } from '@/types' +import { AutoCosmWasmClient } from '@/utils' + +const VALIDATOR_MNEMONIC = + 'decorate bright ozone fork gallery riot bus exhaust worth way bone indoor calm squirrel merry zero scheme cotton until shop any excess stage laundry' +const DEPOSIT_AMOUNT = '12345' + +type ReceivedWebhook = { + headers: IncomingMessage['headers'] + body: any +} + +const enabled = process.env.DEPOSIT_WEBHOOK_E2E === 'true' + +const waitFor = async ( + fn: () => Promise, + timeoutMs = 45_000, + intervalMs = 250 +): Promise => { + const startedAt = Date.now() + + while (Date.now() - startedAt < timeoutMs) { + const value = await fn() + if (value !== undefined) { + return value + } + + await new Promise((resolve) => setTimeout(resolve, intervalMs)) + } + + throw new Error(`Timed out after ${timeoutMs}ms.`) +} + +describe.runIf(enabled)('deposit webhook validator e2e', () => { + let receiver: http.Server | undefined + + afterEach(async () => { + await closeAllBullQueues() + await AccountDepositWebhookRegistration.closeActiveRegistrationsCacheSubscription() + await new Promise((resolve, reject) => { + if (!receiver) { + resolve() + return + } + + receiver.close((error) => (error ? reject(error) : resolve())) + receiver = undefined + }) + }) + + it('delivers a webhook for a real inbound native transfer', async () => { + const config = ConfigManager.load() + const rpcUrl = config.remoteRpc + + await State.createSingletonIfMissing(config.chainId) + + const receiverPromise = new Promise((resolve) => { + receiver = http.createServer((req, res) => { + let body = '' + + req.on('data', (chunk) => { + body += chunk.toString() + }) + req.on('end', () => { + res.statusCode = 200 + res.end('ok') + + resolve({ + headers: req.headers, + body: JSON.parse(body), + }) + }) + }) + }) + + await new Promise((resolve, reject) => { + receiver!.listen(0, '127.0.0.1', (error?: Error) => + error ? reject(error) : resolve() + ) + }) + + const receiverPort = ( + receiver!.address() as { + port: number + } + ).port + + const { token } = await getAccountWithAuth() + + const watchedWallet = await DirectSecp256k1HdWallet.generate(12, { + prefix: config.bech32Prefix, + }).then(async (wallet) => (await wallet.getAccounts())[0].address) + + await request(accountApp.callback()) + .post('/deposit-webhook-registrations') + .set('Authorization', `Bearer ${token}`) + .send({ + description: 'validator e2e', + endpointUrl: `http://127.0.0.1:${receiverPort}/deposits`, + authHeader: 'Authorization', + authToken: 'secret-token', + watchedWallets: [watchedWallet], + allowedNativeDenoms: ['uxion'], + enabled: true, + }) + .expect(201) + + const workerOptions: QueueOptions = { + config, + sendWebhooks: true, + } + const extractQueue = new ExtractQueue(workerOptions) + await extractQueue.init() + const extractWorker = extractQueue.getWorker() + const webhooksWorker = new WebhooksQueue(workerOptions).getWorker() + + await Promise.all([ + extractWorker.waitUntilReady(), + webhooksWorker.waitUntilReady(), + ]) + + const autoCosmWasmClient = new AutoCosmWasmClient(rpcUrl) + await autoCosmWasmClient.update() + + const wallet = await DirectSecp256k1HdWallet.fromMnemonic( + VALIDATOR_MNEMONIC, + { + prefix: config.bech32Prefix, + } + ) + const [validatorAccount] = await wallet.getAccounts() + const signingClient = await SigningStargateClient.connectWithSigner( + rpcUrl, + wallet + ) + + await waitFor(async () => + (await signingClient.getHeight()) >= 12 ? true : undefined + ) + const startHeight = await signingClient.getHeight() + + const blockIterator = new BlockIterator({ + rpcUrl, + autoCosmWasmClient, + startHeight, + }) + + const iteratorPromise = blockIterator.iterate({ + onTx: async ( + { hash, code, messages: rawMessages, height, events }, + block + ) => { + if (code !== 0) { + return + } + + const messages = rawMessages.flatMap((message) => { + try { + return decodeRawProtobufMsg(message) + } catch { + return message + } + }) + + const input: ExtractableTxInput = { + hash, + messages, + events, + } + + const env: Pick = { + txHash: hash, + block: { + height: BigInt(height).toString(), + timeUnixMs: BigInt(Date.parse(block.time)).toString(), + timestamp: new Date(block.time).toISOString(), + }, + } + + for (const Extractor of getExtractors()) { + const data = Extractor.match(input) + if (data.length === 0) { + continue + } + + await ExtractQueue.addBulk( + data.map((matched) => ({ + name: `${Extractor.type} (${matched.source})`, + data: { + extractor: Extractor.type, + data: matched, + env, + }, + })) + ) + } + }, + }) + + const result = await signingClient.sendTokens( + validatorAccount.address, + watchedWallet, + coins(DEPOSIT_AMOUNT, 'uxion'), + { + amount: [], + gas: '200000', + } + ) + + expect(result.code).toBe(0) + + const webhook = await receiverPromise + + expect(webhook.headers.authorization).toBe('Bearer secret-token') + expect(webhook.headers['idempotency-key']).toContain( + `${config.chainId}:1:${result.transactionHash}:${watchedWallet}:native:uxion:${DEPOSIT_AMOUNT}` + ) + expect(webhook.body).toMatchObject({ + wallet: watchedWallet, + recipient: watchedWallet, + sender: validatorAccount.address, + denom: 'uxion', + amount: DEPOSIT_AMOUNT, + assetType: 'native', + contractAddress: null, + txHash: result.transactionHash, + }) + + await waitFor(async () => { + const extraction = await Extraction.findOne({ + where: { + txHash: result.transactionHash, + }, + }) + + return extraction ?? undefined + }) + + blockIterator.stopFetching() + await iteratorPromise + + await Promise.all([extractWorker.close(), webhooksWorker.close()]) + }, 90_000) +}) diff --git a/src/test/setup.ts b/src/test/setup.ts index 694746f4..07b2b791 100644 --- a/src/test/setup.ts +++ b/src/test/setup.ts @@ -4,7 +4,7 @@ import './mocks' import { afterAll, beforeAll, beforeEach, vi } from 'vitest' import { ConfigManager } from '@/config' -import { closeDb, loadDb, setup } from '@/db' +import { AccountDepositWebhookRegistration, closeDb, loadDb, setup } from '@/db' import { closeAllBullQueues } from '@/queues' import { setUpRouter } from '@/server/routes' import { app as testAccountApp } from '@/server/test/account/app' @@ -82,6 +82,9 @@ afterAll(async () => { closeDb(), // Close bull queues after all tests. closeAllBullQueues(), + // Close any Redis subscriber opened by deposit webhook registration cache + // invalidation. + AccountDepositWebhookRegistration.closeActiveRegistrationsCacheSubscription(), ]) } }) diff --git a/src/types/config.ts b/src/types/config.ts index a8d42892..f06da410 100644 --- a/src/types/config.ts +++ b/src/types/config.ts @@ -21,10 +21,12 @@ export type DB = { uri?: string } & Pick< > export type Config = { + chainId?: string home: string localRpc?: string remoteRpc: string bech32Prefix: string + webhookTimeoutMs?: number db: { data: DB accounts: DB diff --git a/src/webhooks/webhooks/depositWebhook.test.ts b/src/webhooks/webhooks/depositWebhook.test.ts new file mode 100644 index 00000000..14f7f0c9 --- /dev/null +++ b/src/webhooks/webhooks/depositWebhook.test.ts @@ -0,0 +1,287 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +import { ConfigManager } from '@/config' +import { AccountDepositWebhookRegistration, Extraction } from '@/db' +import { DEPOSIT_WEBHOOK_EXTRACTION_PREFIX } from '@/listener/extractors/xion/depositWebhook' + +import { makeDepositDetectedWebhook } from './depositWebhook' + +describe('Deposit webhook', () => { + const makeRegistration = ( + overrides: Partial = {} + ) => + ({ + id: 7, + accountPublicKey: 'account', + description: 'Sandbox deposit listener', + endpointUrl: 'https://partner.example/deposits', + authHeader: 'Authorization', + authToken: 'secret-token', + watchedWallets: ['xion1watchedwallet'], + allowedNativeDenoms: ['uxion'], + allowedCw20Contracts: [], + enabled: true, + ...overrides, + } as unknown as AccountDepositWebhookRegistration) + + const makeEvent = (data: Record) => + ({ + id: 1, + address: 'xion1watchedwallet', + name: `${DEPOSIT_WEBHOOK_EXTRACTION_PREFIX}idempotency-key`, + blockHeight: '12345', + blockTimeUnixMs: '1700000000000', + txHash: 'test-tx-hash', + data, + } as unknown as Extraction) + + const makeWebhook = () => { + const webhook = makeDepositDetectedWebhook(ConfigManager.load(), {} as any) + if (!webhook) { + throw new Error('Expected deposit webhook to be defined.') + } + + return webhook + } + + beforeEach(() => { + vi.clearAllMocks() + vi.spyOn( + AccountDepositWebhookRegistration, + 'findEnabledByPk' + ).mockResolvedValue(makeRegistration()) + }) + + it('builds a bearer-authenticated endpoint and forwards extraction payload', async () => { + const webhook = makeWebhook() + + const event = makeEvent({ + registrationId: 7, + idempotencyKey: 'idempotency-key', + wallet: 'xion1watchedwallet', + recipient: 'xion1watchedwallet', + sender: 'xion1senderwallet', + amount: '1000000', + assetType: 'native', + denom: 'uxion', + contractAddress: null, + blockHeight: '12345', + blockTimeUnixMs: '1700000000000', + txHash: 'test-tx-hash', + }) + + const endpoint = await (webhook.endpoint as any)(event, {}) + const value = await webhook.getValue(event, async () => null, {} as any) + + expect( + AccountDepositWebhookRegistration.findEnabledByPk + ).toHaveBeenCalledWith(7) + expect(endpoint).toEqual({ + type: 'url', + url: 'https://partner.example/deposits', + method: 'POST', + headers: { + Authorization: 'Bearer secret-token', + 'Idempotency-Key': 'idempotency-key', + }, + }) + expect(value).toEqual({ + idempotencyKey: 'idempotency-key', + wallet: 'xion1watchedwallet', + recipient: 'xion1watchedwallet', + sender: 'xion1senderwallet', + amount: '1000000', + assetType: 'native', + denom: 'uxion', + contractAddress: null, + blockHeight: '12345', + blockTimeUnixMs: '1700000000000', + txHash: 'test-tx-hash', + }) + }) + + it('uses custom auth headers without bearer prefixing', async () => { + vi.mocked( + AccountDepositWebhookRegistration.findEnabledByPk + ).mockResolvedValue( + makeRegistration({ + authHeader: 'X-API-Key', + authToken: 'raw-secret', + }) + ) + + const webhook = makeWebhook() + + const event = makeEvent({ + registrationId: 7, + idempotencyKey: 'idempotency-key', + wallet: 'xion1watchedwallet', + recipient: 'xion1watchedwallet', + sender: null, + amount: '1000000', + assetType: 'native', + denom: 'uxion', + contractAddress: null, + blockHeight: '12345', + blockTimeUnixMs: '1700000000000', + txHash: 'test-tx-hash', + }) + + const endpoint = await (webhook.endpoint as any)(event, {}) + + expect(endpoint).toEqual({ + type: 'url', + url: 'https://partner.example/deposits', + method: 'POST', + headers: { + 'X-API-Key': 'raw-secret', + 'Idempotency-Key': 'idempotency-key', + }, + }) + }) + + it('preserves explicit bearer tokens without double prefixing', async () => { + vi.mocked( + AccountDepositWebhookRegistration.findEnabledByPk + ).mockResolvedValue( + makeRegistration({ + authToken: 'Bearer secret-token', + }) + ) + + const webhook = makeWebhook() + + const event = makeEvent({ + registrationId: 7, + idempotencyKey: 'idempotency-key', + wallet: 'xion1watchedwallet', + recipient: 'xion1watchedwallet', + sender: null, + amount: '1000000', + assetType: 'native', + denom: 'uxion', + contractAddress: null, + blockHeight: '12345', + blockTimeUnixMs: '1700000000000', + txHash: 'test-tx-hash', + }) + + const endpoint = await (webhook.endpoint as any)(event, {}) + + expect(endpoint).toEqual({ + type: 'url', + url: 'https://partner.example/deposits', + method: 'POST', + headers: { + Authorization: 'Bearer secret-token', + 'Idempotency-Key': 'idempotency-key', + }, + }) + }) + + it('sends only the idempotency key when no auth token is configured', async () => { + vi.mocked( + AccountDepositWebhookRegistration.findEnabledByPk + ).mockResolvedValue( + makeRegistration({ + authHeader: null, + authToken: null, + }) + ) + + const webhook = makeWebhook() + + const event = makeEvent({ + registrationId: 7, + idempotencyKey: 'idempotency-key', + wallet: 'xion1watchedwallet', + recipient: 'xion1watchedwallet', + sender: null, + amount: '1000000', + assetType: 'native', + denom: 'uxion', + contractAddress: null, + blockHeight: '12345', + blockTimeUnixMs: '1700000000000', + txHash: 'test-tx-hash', + }) + + const endpoint = await (webhook.endpoint as any)(event, {}) + + expect(endpoint).toEqual({ + type: 'url', + url: 'https://partner.example/deposits', + method: 'POST', + headers: { + 'Idempotency-Key': 'idempotency-key', + }, + }) + }) + + it('skips delivery when the registration no longer exists or is disabled', async () => { + vi.mocked( + AccountDepositWebhookRegistration.findEnabledByPk + ).mockResolvedValue(null) + + const webhook = makeWebhook() + + const event = makeEvent({ + registrationId: 7, + idempotencyKey: 'idempotency-key', + wallet: 'xion1watchedwallet', + recipient: 'xion1watchedwallet', + sender: null, + amount: '1000000', + assetType: 'native', + denom: 'uxion', + contractAddress: null, + blockHeight: '12345', + blockTimeUnixMs: '1700000000000', + txHash: 'test-tx-hash', + }) + + await expect((webhook.endpoint as any)(event, {})).resolves.toBeUndefined() + }) + + it('validates payloads before returning webhook body', async () => { + const webhook = makeWebhook() + + const event = { + ...makeEvent({ + idempotencyKey: 'idempotency-key', + wallet: 'xion1watchedwallet', + }), + name: `${DEPOSIT_WEBHOOK_EXTRACTION_PREFIX}bad-payload`, + } as Extraction + + await expect( + webhook.getValue(event, async () => null, {} as any) + ).rejects.toThrow('Invalid deposit webhook extraction payload') + }) + + it('validates payloads before building the webhook endpoint', async () => { + const webhook = makeWebhook() + + const event = { + ...makeEvent({ + registrationId: 7, + idempotencyKey: 'idempotency-key', + wallet: 'xion1watchedwallet', + recipient: 'xion1watchedwallet', + sender: null, + amount: '1000000', + assetType: 'cw20', + denom: null, + contractAddress: null, + blockHeight: '12345', + blockTimeUnixMs: '1700000000000', + txHash: 'test-tx-hash', + }), + name: `${DEPOSIT_WEBHOOK_EXTRACTION_PREFIX}bad-payload`, + } as Extraction + + await expect((webhook.endpoint as any)(event, {})).rejects.toThrow( + 'Invalid deposit webhook extraction payload' + ) + }) +}) diff --git a/src/webhooks/webhooks/depositWebhook.ts b/src/webhooks/webhooks/depositWebhook.ts new file mode 100644 index 00000000..2b0954e2 --- /dev/null +++ b/src/webhooks/webhooks/depositWebhook.ts @@ -0,0 +1,98 @@ +import { AccountDepositWebhookRegistration, Extraction } from '@/db' +import { DEPOSIT_WEBHOOK_EXTRACTION_PREFIX } from '@/listener/extractors/xion/depositWebhook' +import type { DepositWebhookExtractionData } from '@/listener/extractors/xion/depositWebhook' +import { WebhookMaker, WebhookType } from '@/types' + +const DEFAULT_AUTH_HEADER = 'Authorization' + +const getAuthHeaderValue = (authHeader: string, authToken: string) => + authHeader.toLowerCase() === DEFAULT_AUTH_HEADER.toLowerCase() && + !authToken.toLowerCase().startsWith('bearer ') + ? `Bearer ${authToken}` + : authToken + +const isOptionalString = (value: unknown): value is string | null | undefined => + value === null || value === undefined || typeof value === 'string' + +const getDepositWebhookData = ( + event: Extraction +): DepositWebhookExtractionData => { + const data = event.data + + if ( + !data || + typeof data !== 'object' || + typeof (data as DepositWebhookExtractionData).registrationId !== 'number' || + typeof (data as DepositWebhookExtractionData).idempotencyKey !== 'string' || + typeof (data as DepositWebhookExtractionData).wallet !== 'string' || + typeof (data as DepositWebhookExtractionData).recipient !== 'string' || + typeof (data as DepositWebhookExtractionData).amount !== 'string' || + !['native', 'cw20'].includes( + String((data as DepositWebhookExtractionData).assetType) + ) || + typeof (data as DepositWebhookExtractionData).blockHeight !== 'string' || + typeof (data as DepositWebhookExtractionData).blockTimeUnixMs !== + 'string' || + typeof (data as DepositWebhookExtractionData).txHash !== 'string' || + !isOptionalString((data as DepositWebhookExtractionData).sender) || + !isOptionalString((data as DepositWebhookExtractionData).denom) || + !isOptionalString((data as DepositWebhookExtractionData).contractAddress) + ) { + throw new Error( + `Invalid deposit webhook extraction payload for event ${event.id}.` + ) + } + + const depositData = data as DepositWebhookExtractionData + if ( + (depositData.assetType === 'native' && + typeof depositData.denom !== 'string') || + (depositData.assetType === 'cw20' && + typeof depositData.contractAddress !== 'string') + ) { + throw new Error( + `Invalid deposit webhook extraction payload for event ${event.id}.` + ) + } + + return depositData +} + +export const makeDepositDetectedWebhook: WebhookMaker = ( + _config +) => ({ + filter: { + EventType: Extraction, + matches: (event) => + event.name.startsWith(DEPOSIT_WEBHOOK_EXTRACTION_PREFIX), + }, + endpoint: async (event) => { + const deposit = getDepositWebhookData(event) + const registration = + await AccountDepositWebhookRegistration.findEnabledByPk( + deposit.registrationId + ) + if (!registration) { + return + } + + const header = registration.authHeader || DEFAULT_AUTH_HEADER + + return { + type: WebhookType.Url, + url: registration.endpointUrl, + method: 'POST', + headers: { + ...(registration.authToken && { + [header]: getAuthHeaderValue(header, registration.authToken), + }), + 'Idempotency-Key': deposit.idempotencyKey, + }, + } + }, + getValue: async (event) => { + const { registrationId: _registrationId, ...payload } = + getDepositWebhookData(event) + return payload + }, +}) diff --git a/src/webhooks/webhooks/index.ts b/src/webhooks/webhooks/index.ts index e9315b5c..8fb6b152 100644 --- a/src/webhooks/webhooks/index.ts +++ b/src/webhooks/webhooks/index.ts @@ -5,6 +5,7 @@ import { State, WasmStateEvent } from '@/db' import { WasmCodeService } from '@/services/wasm-codes' import { Config, ProcessedWebhook, Webhook, WebhookMaker } from '@/types' +import * as depositWebhook from './depositWebhook' import * as discord from './discord' import * as indexerCwReceipt from './indexerCwReceipt' import * as notify from './notify' @@ -20,6 +21,7 @@ export const getProcessedWebhooks = async ( const processWebhooks = (config: Config) => { const webhookMakers: WebhookMaker[] = [ // Add webhook makers here. + ...Object.values(depositWebhook), ...Object.values(discord), ...Object.values(telegram), ...Object.values(indexerCwReceipt), diff --git a/xion-test.config.json b/xion-test.config.json new file mode 100644 index 00000000..6ba62669 --- /dev/null +++ b/xion-test.config.json @@ -0,0 +1,38 @@ +{ + "home": "/tmp/argus-test-home", + "chainId": "localxion-1", + "localRpc": "http://xion:26657", + "remoteRpc": "http://xion:26657", + "bech32Prefix": "xion", + "webhookTimeoutMs": 5000, + "redis": { + "host": "redis", + "password": "" + }, + "db": { + "data": { + "dialect": "postgres", + "host": "db", + "database": "test", + "username": "test", + "password": "test" + }, + "accounts": { + "dialect": "postgres", + "host": "db", + "database": "test", + "username": "test", + "password": "test" + } + }, + "payment": { + "cwReceiptAddress": "cwReceiptAddress", + "cwReceiptWebhookSecret": "cwReceiptWebhookSecret", + "nativeDenomAccepted": "uxion", + "creditScaleFactor": 1 + }, + "accountsJwtSecret": "accountsJwtSecret", + "codeIds": { + "dao-dao-core": [1] + } +}