diff --git a/packages/core/src/cache/schema.test.ts b/packages/core/src/cache/schema.test.ts index b583115..ed1a144 100644 --- a/packages/core/src/cache/schema.test.ts +++ b/packages/core/src/cache/schema.test.ts @@ -19,12 +19,14 @@ afterEach(async () => { }); describe("BitcoinmintsDB schema", () => { - it("opens at version 1 with all 6 tables present", async () => { + it("opens at version 2 with all 6 tables present", async () => { const db = new BitcoinmintsDB(freshName()); toDispose.push(db); await db.open(); - expect(db.verno).toBe(1); + // v2 adds the [kind+createdAt] compound index to announcements (used by + // restoreWatermarks for bounded .last() lookups per kind). + expect(db.verno).toBe(2); const names = db.tables.map((t) => t.name).sort(); expect(names).toEqual( ["announcements", "mintAggregate", "mintInfo", "profiles", "relayLists", "reviews"].sort(), @@ -60,8 +62,14 @@ describe("BitcoinmintsDB schema", () => { .schema.indexes.map((ix) => ix.name) .sort(); - // announcements secondary indexes: eventId, kind, d, createdAt - expect(indexNames("announcements")).toEqual(["createdAt", "d", "eventId", "kind"]); + // announcements secondary indexes: eventId, kind, d, createdAt + compound [kind+createdAt] + expect(indexNames("announcements")).toEqual([ + "[kind+createdAt]", + "createdAt", + "d", + "eventId", + "kind", + ]); // reviews secondary indexes: eventId, d, createdAt, k expect(indexNames("reviews")).toEqual(["createdAt", "d", "eventId", "k"]); // mintInfo secondary: fetchedAt, ok diff --git a/packages/core/src/cache/schema.ts b/packages/core/src/cache/schema.ts index da65eb3..33d9872 100644 --- a/packages/core/src/cache/schema.ts +++ b/packages/core/src/cache/schema.ts @@ -142,5 +142,14 @@ export class BitcoinmintsDB extends Dexie { mintInfo: "d, fetchedAt, ok", mintAggregate: "d, bayesianRank, updatedAt", }); + // v2: add compound `[kind+createdAt]` index on announcements so the + // scheduler's restoreWatermarks() can do a bounded `.last()` lookup + // per kind instead of materializing the entire table via .sortBy(). + // Dexie auto-migrates additive index changes; existing rows get re- + // indexed on first open. fake-indexeddb supports compound indexes + // (verified in scheduler.restoreWatermarks watermark-restore tests). + this.version(2).stores({ + announcements: "[pubkey+kind+d], eventId, kind, d, createdAt, [kind+createdAt]", + }); } } diff --git a/packages/core/src/cashu/index.ts b/packages/core/src/cashu/index.ts new file mode 100644 index 0000000..40ff18e --- /dev/null +++ b/packages/core/src/cashu/index.ts @@ -0,0 +1,10 @@ +export { + createMintInfoFetcher, + type FetchMintInfoOptions, + fetchMintInfo, + type MintInfoFetcher, + type MintInfoFetcherOptions, + type MintInfoResult, + type MintInfoV1, +} from "./info"; +export { type LayerBResult, verifySignerBinding } from "./layerB"; diff --git a/packages/core/src/cashu/info.test.ts b/packages/core/src/cashu/info.test.ts new file mode 100644 index 0000000..62d1e26 --- /dev/null +++ b/packages/core/src/cashu/info.test.ts @@ -0,0 +1,378 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { createMintInfoFetcher, fetchMintInfo, type MintInfoResult } from "./info"; + +// Minimal Response shape we need from `fetch`. Wrapping inline keeps the +// global Response constructor's quirks (some Node runtimes don't expose +// the same fields the way browsers do) out of the test surface. 
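+// Both helpers hand back real `Response` instances, so fetchMintInfo's
+// status and JSON-parse handling is exercised against the platform
+// implementation rather than a hand-rolled stub.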
+function jsonResponse(body: unknown, status = 200): Response { + return new Response(JSON.stringify(body), { + status, + headers: { "Content-Type": "application/json" }, + }); +} + +function textResponse(body: string, status = 200): Response { + return new Response(body, { status, headers: { "Content-Type": "text/plain" } }); +} + +describe("fetchMintInfo — happy path + parse", () => { + let fetchSpy: ReturnType; + beforeEach(() => { + fetchSpy = vi.spyOn(globalThis, "fetch"); + }); + afterEach(() => { + fetchSpy.mockRestore(); + }); + + it("returns ok+info on a 200 + spec-conforming JSON", async () => { + fetchSpy.mockResolvedValueOnce( + jsonResponse({ + pubkey: "02abc", + name: "TestMint", + version: "Nutshell/0.16", + nuts: { "1": { supported: true } }, + }), + ); + + const r = await fetchMintInfo("https://mint.example.com"); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.info.pubkey).toBe("02abc"); + expect(r.info.name).toBe("TestMint"); + expect(r.info.nuts).toEqual({ "1": { supported: true } }); + }); + + it("hits exactly the /v1/info path appended to the base URL", async () => { + fetchSpy.mockResolvedValueOnce(jsonResponse({ pubkey: "02abc" })); + await fetchMintInfo("https://mint.example.com"); + const call = fetchSpy.mock.calls[0]; + expect(call?.[0]).toBe("https://mint.example.com/v1/info"); + }); + + it("normalizes a trailing slash on the base URL (no double slash)", async () => { + fetchSpy.mockResolvedValueOnce(jsonResponse({ pubkey: "02abc" })); + await fetchMintInfo("https://mint.example.com/"); + const call = fetchSpy.mock.calls[0]; + expect(call?.[0]).toBe("https://mint.example.com/v1/info"); + }); +}); + +describe("fetchMintInfo — failure modes", () => { + let fetchSpy: ReturnType; + beforeEach(() => { + fetchSpy = vi.spyOn(globalThis, "fetch"); + }); + afterEach(() => { + fetchSpy.mockRestore(); + }); + + it("rejects http:// upfront without hitting fetch", async () => { + const r = await fetchMintInfo("http://insecure.example.com"); + expect(r.ok).toBe(false); + if (r.ok) return; + expect(r.error).toContain("non-https"); + expect(fetchSpy).not.toHaveBeenCalled(); + }); + + it("rejects garbage URL strings without hitting fetch", async () => { + const r = await fetchMintInfo("not a url at all"); + expect(r.ok).toBe(false); + expect(fetchSpy).not.toHaveBeenCalled(); + }); + + it("returns non-2xx error for a 404", async () => { + fetchSpy.mockResolvedValueOnce(textResponse("not found", 404)); + const r = await fetchMintInfo("https://mint.example.com"); + expect(r.ok).toBe(false); + if (r.ok) return; + expect(r.error).toBe("non-2xx (404)"); + expect(r.status).toBe(404); + }); + + it("returns non-2xx error for a 500", async () => { + fetchSpy.mockResolvedValueOnce(textResponse("boom", 500)); + const r = await fetchMintInfo("https://mint.example.com"); + expect(r.ok).toBe(false); + if (r.ok) return; + expect(r.error).toBe("non-2xx (500)"); + expect(r.status).toBe(500); + }); + + it("surfaces a network error message", async () => { + fetchSpy.mockRejectedValueOnce(new TypeError("connect ECONNREFUSED")); + const r = await fetchMintInfo("https://mint.example.com"); + expect(r.ok).toBe(false); + if (r.ok) return; + expect(r.error).toContain("ECONNREFUSED"); + }); + + it("returns 'connect ETIMEDOUT' when the internal timer aborts", async () => { + // Simulate a fetch that respects AbortSignal: never resolve, just listen + // for abort and reject with an AbortError. 
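+    // fetchMintInfo keys on `err.name === "AbortError"` when translating the
+    // abort into the "connect ETIMEDOUT" string, so the rejection's name is
+    // what matters here, not its message.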
+ fetchSpy.mockImplementation( + (_url: string | URL | Request, init?: RequestInit) => + new Promise((_resolve, reject) => { + const signal = init?.signal as AbortSignal | undefined; + signal?.addEventListener("abort", () => { + const err = new Error("aborted"); + err.name = "AbortError"; + reject(err); + }); + }), + ); + const r = await fetchMintInfo("https://mint.example.com", { timeoutMs: 5 }); + expect(r.ok).toBe(false); + if (r.ok) return; + expect(r.error).toBe("connect ETIMEDOUT"); + }); + + it("returns 'invalid JSON' when the body is not parseable", async () => { + fetchSpy.mockResolvedValueOnce(textResponse("oops", 200)); + const r = await fetchMintInfo("https://mint.example.com"); + expect(r.ok).toBe(false); + if (r.ok) return; + expect(r.error).toBe("invalid JSON"); + }); + + it("returns 'invalid JSON' when the body is a JSON array (not an object)", async () => { + fetchSpy.mockResolvedValueOnce(jsonResponse([1, 2, 3])); + const r = await fetchMintInfo("https://mint.example.com"); + expect(r.ok).toBe(false); + if (r.ok) return; + expect(r.error).toBe("invalid JSON"); + }); + + it("returns 'missing pubkey field' when JSON parses but lacks pubkey", async () => { + fetchSpy.mockResolvedValueOnce(jsonResponse({ name: "no key here" })); + const r = await fetchMintInfo("https://mint.example.com"); + expect(r.ok).toBe(false); + if (r.ok) return; + expect(r.error).toBe("missing pubkey field"); + }); + + it("returns 'missing pubkey field' when pubkey is empty string", async () => { + fetchSpy.mockResolvedValueOnce(jsonResponse({ pubkey: "" })); + const r = await fetchMintInfo("https://mint.example.com"); + expect(r.ok).toBe(false); + if (r.ok) return; + expect(r.error).toBe("missing pubkey field"); + }); +}); + +describe("createMintInfoFetcher — TTL cache", () => { + it("returns the cached result without re-invoking fetch when within TTL", async () => { + const fetchImpl = vi + .fn<(url: string) => Promise>() + .mockResolvedValue({ ok: true, info: { pubkey: "02abc" } }); + let now = 1000; + const fetcher = createMintInfoFetcher({ + concurrency: 4, + ttlMs: 60_000, + fetchImpl, + now: () => now, + }); + + await fetcher("https://mint.example.com"); + now = 1000 + 30_000; // halfway through TTL + await fetcher("https://mint.example.com"); + expect(fetchImpl).toHaveBeenCalledTimes(1); + }); + + it("re-fetches once the TTL elapses", async () => { + const fetchImpl = vi + .fn<(url: string) => Promise>() + .mockResolvedValueOnce({ ok: true, info: { pubkey: "02abc" } }) + .mockResolvedValueOnce({ ok: true, info: { pubkey: "02xyz" } }); + let now = 0; + const fetcher = createMintInfoFetcher({ + concurrency: 4, + ttlMs: 1000, + fetchImpl, + now: () => now, + }); + const r1 = await fetcher("https://mint.example.com"); + now = 2000; // TTL expired + const r2 = await fetcher("https://mint.example.com"); + expect(fetchImpl).toHaveBeenCalledTimes(2); + expect(r1.ok && r1.info.pubkey).toBe("02abc"); + expect(r2.ok && r2.info.pubkey).toBe("02xyz"); + }); + + it("caches failures (so retry storms are avoided)", async () => { + const fetchImpl = vi + .fn<(url: string) => Promise>() + .mockResolvedValue({ ok: false, error: "non-2xx (500)", status: 500 }); + const fetcher = createMintInfoFetcher({ + concurrency: 4, + ttlMs: 60_000, + fetchImpl, + }); + await fetcher("https://broken.example.com"); + await fetcher("https://broken.example.com"); + expect(fetchImpl).toHaveBeenCalledTimes(1); + }); +}); + +describe("createMintInfoFetcher — concurrency limiter", () => { + it("never runs more than `concurrency` 
fetches in flight at once", async () => { + // Track the peak number of in-flight fetchImpl calls. Each call hangs on + // a deferred promise until the test releases it explicitly, so we can + // step the system one unit of progress at a time. + let inFlight = 0; + let peak = 0; + const pending: Array<() => void> = []; + const fetchImpl = vi.fn<(url: string) => Promise>( + () => + new Promise((resolve) => { + inFlight++; + peak = Math.max(peak, inFlight); + pending.push(() => { + inFlight--; + resolve({ ok: true, info: { pubkey: "02abc" } }); + }); + }), + ); + + const fetcher = createMintInfoFetcher({ + concurrency: 2, + ttlMs: 0, // disable cache so every call goes through + fetchImpl, + }); + + // Fire 10 requests against 10 distinct URLs (so in-flight dedup + // doesn't collapse them). + const urls = Array.from({ length: 10 }, (_, i) => `https://mint${i}.example.com`); + const promises = urls.map((u) => fetcher(u)); + + // Let microtasks settle so the first wave of fetches register as + // in-flight. Only `concurrency` should have started. + await Promise.resolve(); + await Promise.resolve(); + expect(inFlight).toBe(2); + expect(peak).toBe(2); + + // Step through: each release frees one slot, then we yield twice so + // the freed semaphore slot can be picked up by a waiter and the next + // fetchImpl can register itself in-flight. + while (pending.length > 0) { + const r = pending.shift(); + r?.(); + // First yield: release() runs and pulls a waiter off the queue. + // Second yield: the awaiting `acquire()` resumes, calls fetchImpl, + // which pushes its own resolver into `pending`. + await Promise.resolve(); + await Promise.resolve(); + expect(inFlight).toBeLessThanOrEqual(2); + } + + await Promise.all(promises); + expect(peak).toBe(2); + expect(fetchImpl).toHaveBeenCalledTimes(10); + }); + + it("dedups concurrent calls for the same URL into one underlying fetch", async () => { + const releases: Array<(r: MintInfoResult) => void> = []; + const fetchImpl = vi.fn<(url: string) => Promise>( + () => + new Promise((resolve) => { + releases.push(resolve); + }), + ); + const fetcher = createMintInfoFetcher({ + concurrency: 4, + ttlMs: 0, + fetchImpl, + }); + + const p1 = fetcher("https://mint.example.com"); + const p2 = fetcher("https://mint.example.com"); + const p3 = fetcher("https://mint.example.com"); + + // Yield so the first call can register itself as in-flight. 
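+    // A single microtask is enough: the semaphore has free slots, so
+    // acquire() resolves immediately and the one underlying fetchImpl call
+    // pushes its resolver into `releases` on the first tick.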
+ await Promise.resolve(); + expect(releases.length).toBe(1); + releases[0]?.({ ok: true, info: { pubkey: "02abc" } }); + const [r1, r2, r3] = await Promise.all([p1, p2, p3]); + expect(fetchImpl).toHaveBeenCalledTimes(1); + expect(r1.ok && r1.info.pubkey).toBe("02abc"); + expect(r2.ok && r2.info.pubkey).toBe("02abc"); + expect(r3.ok && r3.info.pubkey).toBe("02abc"); + }); + + it("rejects bogus options at construction", () => { + expect(() => createMintInfoFetcher({ concurrency: 0, ttlMs: 1000 })).toThrow(); + expect(() => createMintInfoFetcher({ concurrency: -1, ttlMs: 1000 })).toThrow(); + expect(() => createMintInfoFetcher({ concurrency: 1, ttlMs: -1 })).toThrow(); + // Split TTL validation + expect(() => createMintInfoFetcher({ concurrency: 1, ttlOkMs: -1 })).toThrow(); + expect(() => createMintInfoFetcher({ concurrency: 1, ttlFailMs: -1 })).toThrow(); + }); +}); + +describe("createMintInfoFetcher — split ok/fail TTL", () => { + it("expires fail entries on the shorter ttlFailMs even when ttlOkMs is long", async () => { + // Pin the silent-failure fix: a flaky mint should be re-tried within + // ttlFailMs, not pinned for the full ok TTL window. + const fetchImpl = vi + .fn<(url: string) => Promise>() + .mockResolvedValueOnce({ ok: false, error: "non-2xx (500)", status: 500 }) + .mockResolvedValueOnce({ ok: true, info: { pubkey: "02abc" } }); + let now = 0; + const fetcher = createMintInfoFetcher({ + concurrency: 4, + ttlOkMs: 5 * 60_000, // 5 min for OK + ttlFailMs: 30_000, // 30 s for fail + fetchImpl, + now: () => now, + }); + + const r1 = await fetcher("https://broken.example.com"); + expect(r1.ok).toBe(false); + + // 31s later — fail TTL has expired, but ok TTL would still be active. + now = 31_000; + const r2 = await fetcher("https://broken.example.com"); + expect(fetchImpl).toHaveBeenCalledTimes(2); + expect(r2.ok).toBe(true); + }); + + it("keeps ok entries cached beyond the short fail TTL", async () => { + const fetchImpl = vi + .fn<(url: string) => Promise>() + .mockResolvedValue({ ok: true, info: { pubkey: "02abc" } }); + let now = 0; + const fetcher = createMintInfoFetcher({ + concurrency: 4, + ttlOkMs: 5 * 60_000, + ttlFailMs: 30_000, + fetchImpl, + now: () => now, + }); + + await fetcher("https://mint.example.com"); + // Past the fail TTL but well within the ok TTL — must NOT re-fetch. + now = 60_000; + await fetcher("https://mint.example.com"); + expect(fetchImpl).toHaveBeenCalledTimes(1); + }); + + it("legacy single ttlMs continues to apply to both arms", async () => { + // Backwards-compat: callers passing only ttlMs get the original + // single-bucket behavior. + const fetchImpl = vi + .fn<(url: string) => Promise>() + .mockResolvedValueOnce({ ok: false, error: "non-2xx (500)", status: 500 }) + .mockResolvedValueOnce({ ok: false, error: "non-2xx (500)", status: 500 }); + let now = 0; + const fetcher = createMintInfoFetcher({ + concurrency: 4, + ttlMs: 60_000, + fetchImpl, + now: () => now, + }); + await fetcher("https://broken.example.com"); + now = 30_000; // halfway through legacy TTL + await fetcher("https://broken.example.com"); + expect(fetchImpl).toHaveBeenCalledTimes(1); // single TTL still in effect + }); +}); diff --git a/packages/core/src/cashu/info.ts b/packages/core/src/cashu/info.ts new file mode 100644 index 0000000..194837f --- /dev/null +++ b/packages/core/src/cashu/info.ts @@ -0,0 +1,334 @@ +/** + * NUT-06 `/v1/info` HTTP client for Cashu mints. + * + * Two surfaces: + * + * - `fetchMintInfo(url, opts?)` — single one-shot fetch with a hard timeout. 
+ * Used directly by tests and for one-off "refresh now" UI actions. + * + * - `createMintInfoFetcher({ concurrency, ttlMs })` — a wrapper that adds + * a per-URL TTL cache and a global concurrency cap (semaphore). This is + * the primary surface the scheduler consumes: it dedupes in-flight + * requests, short-circuits cached results within `ttlMs`, and prevents + * us from hammering mints when many announcements land at once. + * + * Borrows the shape of cashu-kym's `src/request.ts` (per-origin semaphore + + * in-flight dedupe + TTL cache, see /srv/forge/projects/bitcoinmints/audit/cashu-kym.md §6), + * trimmed down: kym does retries-with-backoff inside the fetcher; we keep + * the fetcher pure (one fetch attempt) and let the scheduler decide retry + * cadence at a higher level (see scheduler/index.ts backoff logic). This + * keeps the fetcher easy to mock in tests and lets retries observe whatever + * the scheduler's policy is at the time, not whatever the fetcher froze in. + * + * Failure-mode strings are human-readable on purpose — they round-trip into + * Dexie as `MintInfoRow.lastError` and surface in dev panels. + */ + +/** + * NUT-06 `/v1/info` response shape — the subset we care about. + * + * Cashu mints in the wild don't all set every field. We type optional + * everything except `pubkey` (NUT-06 mandatory + this is what Layer B + * compares against the announcement signer). `nuts` is a bag of + * NUT-name -> capability shape; we under-spec it here and pass through + * whatever shape the mint emits — see data-model-v1.md §7. + */ +export type MintInfoV1 = { + /** Mint's compressed/x-only secp256k1 pubkey. NUT-06 mandatory. */ + pubkey: string; + name?: string; + version?: string; + description?: string; + description_long?: string; + contact?: Array<{ method: string; info: string }>; + motd?: string; + icon_url?: string; + urls?: string[]; + time?: number; + nuts?: Record; + tos_url?: string; + /** Pass-through for forward-compatibility with NUT-06 fields we don't model. */ + [key: string]: unknown; +}; + +/** Discriminated result so callers don't have to try/catch around the fetcher. */ +export type MintInfoResult = + | { ok: true; info: MintInfoV1 } + | { ok: false; error: string; status?: number }; + +export type FetchMintInfoOptions = { + /** Hard timeout in ms. Default: 5000. */ + timeoutMs?: number; + /** Optional caller-supplied AbortSignal (composed with the internal timeout). */ + signal?: AbortSignal; +}; + +const DEFAULT_TIMEOUT_MS = 5000; + +/** + * Validate the URL upfront. Rejects non-https schemes (keeps us out of the + * SSRF-via-http class of bugs the v0 directory was vulnerable to per + * audit/DIGEST.md §3) and surfaces obvious garbage (no scheme, etc.) as a + * structured failure rather than a thrown exception. + */ +function validateMintUrl(raw: string): { ok: true; url: URL } | { ok: false; error: string } { + let parsed: URL; + try { + parsed = new URL(raw); + } catch (err) { + return { ok: false, error: err instanceof Error ? err.message : "invalid URL" }; + } + if (parsed.protocol !== "https:") { + return { ok: false, error: `non-https scheme (${parsed.protocol.replace(":", "")})` }; + } + return { ok: true, url: parsed }; +} + +/** + * Compose a caller-supplied AbortSignal with our internal timeout signal. 
+ * AbortSignal.any() is the modern way (Node 20+, modern browsers) but we + * polyfill the merge by hand to keep the dep surface minimal — the + * original AbortSignal.any signature is part of the platform but not yet + * everywhere our app might run. + */ +function composeSignals( + timeoutMs: number, + external?: AbortSignal, +): { signal: AbortSignal; cancel: () => void } { + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(new Error("timeout")), timeoutMs); + const onExternalAbort = () => { + clearTimeout(timer); + controller.abort(external?.reason); + }; + if (external) { + if (external.aborted) { + clearTimeout(timer); + controller.abort(external.reason); + } else { + external.addEventListener("abort", onExternalAbort, { once: true }); + } + } + return { + signal: controller.signal, + cancel: () => { + clearTimeout(timer); + external?.removeEventListener("abort", onExternalAbort); + }, + }; +} + +/** + * Build a canonical /v1/info URL from the mint base URL. Trailing slashes + * on the base are tolerated; the result is always exactly one slash before + * `v1/info`. + */ +function buildInfoUrl(base: URL): string { + const trimmed = base.toString().replace(/\/+$/, ""); + return `${trimmed}/v1/info`; +} + +/** + * Fetch and parse `/v1/info` for a single mint URL. Always resolves — never + * rejects. Errors flow through `MintInfoResult.error` for the caller to log. + */ +export async function fetchMintInfo( + url: string, + opts: FetchMintInfoOptions = {}, +): Promise { + const validated = validateMintUrl(url); + if (!validated.ok) return { ok: false, error: validated.error }; + + const timeoutMs = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS; + const { signal, cancel } = composeSignals(timeoutMs, opts.signal); + const target = buildInfoUrl(validated.url); + + let response: Response; + try { + response = await fetch(target, { + method: "GET", + headers: { Accept: "application/json" }, + signal, + }); + } catch (err) { + cancel(); + // AbortError when our internal timeout fired vs caller cancellation. + // We surface the timeout as the human-readable string the data-model + // doc documents; pass other errors through verbatim. + if (err instanceof Error && err.name === "AbortError") { + return { ok: false, error: "connect ETIMEDOUT" }; + } + return { ok: false, error: err instanceof Error ? err.message : "network error" }; + } + + cancel(); + + if (!response.ok) { + return { ok: false, error: `non-2xx (${response.status})`, status: response.status }; + } + + let body: unknown; + try { + body = await response.json(); + } catch { + return { ok: false, error: "invalid JSON", status: response.status }; + } + + if (!body || typeof body !== "object" || Array.isArray(body)) { + return { ok: false, error: "invalid JSON", status: response.status }; + } + + const obj = body as Record; + if (typeof obj.pubkey !== "string" || obj.pubkey.length === 0) { + return { ok: false, error: "missing pubkey field", status: response.status }; + } + + return { ok: true, info: obj as MintInfoV1 }; +} + +/** A wrapped fetcher with TTL caching + concurrency limiting. */ +export type MintInfoFetcher = (url: string) => Promise; + +/** + * Default success-path TTL for the per-URL info cache (5 minutes). + * + * Successful /v1/info responses change rarely — the mint pubkey + nuts + * support are baseline config. Caching the OK response longer means the + * scheduler can re-enqueue Layer B for replays without hammering mints. 
+ */ +export const INFO_TTL_OK_MS = 5 * 60_000; + +/** + * Default failure-path TTL (30s). + * + * Failures (timeouts, 5xx, malformed JSON) often clear quickly — a flaky + * mint on a transient outage shouldn't have all retries blocked for the + * full success TTL. Keep the cap short so the next legitimate /v1/info + * attempt isn't held back by a stale failure cache entry. + */ +export const INFO_TTL_FAIL_MS = 30_000; + +export type MintInfoFetcherOptions = { + /** Max concurrent in-flight fetches across the entire fetcher. */ + concurrency: number; + /** + * Cache TTL in ms. If supplied, applies to BOTH ok and fail responses + * (legacy single-TTL mode). Prefer `ttlOkMs` + `ttlFailMs` for + * production code so a flaky mint can be re-tried sooner than a stable + * one is re-fetched. + * + * If both `ttlMs` and `ttlOkMs`/`ttlFailMs` are provided, the split + * values win (single `ttlMs` is treated as the legacy default). + */ + ttlMs?: number; + /** TTL for ok responses. Defaults to `ttlMs` if set, else INFO_TTL_OK_MS. */ + ttlOkMs?: number; + /** TTL for !ok responses. Defaults to `ttlMs` if set, else INFO_TTL_FAIL_MS. */ + ttlFailMs?: number; + /** + * Override the underlying single-fetch function. Useful in tests; defaults + * to `fetchMintInfo`. + */ + fetchImpl?: (url: string, opts?: FetchMintInfoOptions) => Promise; + /** Optional clock injector for deterministic TTL tests. Defaults to Date.now. */ + now?: () => number; +}; + +/** + * Cache entry — preserves the TTL the entry was admitted with, so that an + * ok→fail transition (or fail→ok) doesn't accidentally use the wrong TTL + * for eviction. Each cache write picks the TTL based on the response's ok + * field, and the entry remembers its budget. + */ +type CacheEntry = { result: MintInfoResult; at: number; ttlMs: number }; + +/** + * Build a fetcher that: + * - caches results per URL for `ttlMs` (both ok and !ok — failures back + * off naturally without the scheduler having to dedupe its retries); + * - dedupes concurrent requests for the same URL (in-flight map); + * - caps total in-flight requests at `concurrency` via a tiny semaphore. + * + * The semaphore is a FIFO queue of resolvers — when a slot frees up, the + * oldest waiter wins. Not strictly fair across URLs (a high-traffic URL + * can queue many requests) but the in-flight dedup means each URL gets at + * most one slot, so per-URL fairness falls out for free. + */ +export function createMintInfoFetcher(opts: MintInfoFetcherOptions): MintInfoFetcher { + if (opts.concurrency < 1) { + throw new Error("createMintInfoFetcher: concurrency must be >= 1"); + } + // Resolve the effective ok / fail TTLs. Precedence: + // 1. Explicit ttlOkMs / ttlFailMs (split mode — preferred). + // 2. Legacy `ttlMs` applied to both arms. + // 3. INFO_TTL_OK_MS / INFO_TTL_FAIL_MS defaults. + const ttlOkMs = opts.ttlOkMs ?? opts.ttlMs ?? INFO_TTL_OK_MS; + const ttlFailMs = opts.ttlFailMs ?? opts.ttlMs ?? INFO_TTL_FAIL_MS; + if (ttlOkMs < 0) { + throw new Error("createMintInfoFetcher: ttlOkMs must be >= 0"); + } + if (ttlFailMs < 0) { + throw new Error("createMintInfoFetcher: ttlFailMs must be >= 0"); + } + + const fetchImpl = opts.fetchImpl ?? fetchMintInfo; + const now = opts.now ?? Date.now; + const cache = new Map(); + const inflight = new Map>(); + + // Semaphore: tracks active count + waiter queue. 
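+  // acquire() grants a slot immediately while `active < concurrency`,
+  // otherwise it parks a resolver in `waiters`; release() frees the slot
+  // and wakes the oldest waiter first (FIFO), per the fairness note above.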
+ let active = 0; + const waiters: Array<() => void> = []; + + const acquire = (): Promise => { + if (active < opts.concurrency) { + active++; + return Promise.resolve(); + } + return new Promise((resolve) => { + waiters.push(() => { + active++; + resolve(); + }); + }); + }; + + const release = (): void => { + active--; + const next = waiters.shift(); + if (next) next(); + }; + + return async function fetcher(url: string): Promise { + // 1. Cache check — short-circuits both success and failure within TTL. + // Each entry remembers the TTL it was admitted with (ok vs fail) so + // an entry can't outlive its own budget if the global TTLs change. + const cached = cache.get(url); + if (cached && now() - cached.at < cached.ttlMs) { + return cached.result; + } + + // 2. In-flight dedup — collapse concurrent calls for same URL to one + // underlying fetch. + const existing = inflight.get(url); + if (existing) return existing; + + const promise = (async () => { + await acquire(); + try { + const result = await fetchImpl(url); + // Pick TTL based on response — ok responses get the longer cache + // window; failures expire quickly so a flaky mint can be retried + // soon. The TTL is captured per-entry above. + const entryTtl = result.ok ? ttlOkMs : ttlFailMs; + cache.set(url, { result, at: now(), ttlMs: entryTtl }); + return result; + } finally { + release(); + inflight.delete(url); + } + })(); + inflight.set(url, promise); + return promise; + }; +} diff --git a/packages/core/src/cashu/layerB.test.ts b/packages/core/src/cashu/layerB.test.ts new file mode 100644 index 0000000..f93fd64 --- /dev/null +++ b/packages/core/src/cashu/layerB.test.ts @@ -0,0 +1,196 @@ +import { describe, expect, it, vi } from "vitest"; +import type { AnnouncementRow } from "../cache"; +import type { MintInfoFetcher, MintInfoResult } from "./info"; +import { verifySignerBinding } from "./layerB"; + +// Build a stripped-down AnnouncementRow for tests. Only the fields Layer B +// reads need real values; the rest can be empty. +function makeRow(opts: { + pubkey: string; + u: string[]; + kind?: 38172 | 38173; + d?: string; +}): AnnouncementRow { + return { + pubkey: opts.pubkey, + kind: opts.kind ?? 38172, + d: opts.d ?? opts.pubkey, // by convention d == pubkey for spec-conforming Cashu + eventId: "deadbeef", + createdAt: 1_700_000_000, + u: opts.u, + content: "", + rawTags: [], + verifiedBySignerBinding: null, + }; +} + +function okFetcher(map: Record): MintInfoFetcher { + return vi.fn(async (url: string): Promise => { + const pk = map[url]; + if (pk === undefined) return { ok: false, error: "non-2xx (404)", status: 404 }; + return { ok: true, info: { pubkey: pk, name: `Mint at ${url}` } }; + }); +} + +describe("verifySignerBinding — Cashu happy path", () => { + it("returns verified=true when single mint URL pubkey matches signer", async () => { + const row = makeRow({ + pubkey: "02abc", + u: ["https://mint.example.com"], + }); + const fetcher = okFetcher({ "https://mint.example.com": "02abc" }); + const r = await verifySignerBinding(row, fetcher); + expect(r.verified).toBe(true); + if (!r.verified) return; + expect(r.info.pubkey).toBe("02abc"); + expect(r.url).toBe("https://mint.example.com"); + }); + + it("returns verified=true when ANY of multiple URLs matches", async () => { + const row = makeRow({ + pubkey: "02abc", + u: ["https://mint-a.example.com", "https://mint-b.example.com"], + }); + // First fetch returns mismatched pubkey, second returns the right one. 
+ const fetcher = okFetcher({ + "https://mint-a.example.com": "02zzz", + "https://mint-b.example.com": "02abc", + }); + const r = await verifySignerBinding(row, fetcher); + expect(r.verified).toBe(true); + if (!r.verified) return; + expect(r.info.pubkey).toBe("02abc"); + // The matched URL is the one that returned the signer's pubkey, not u[0]. + expect(r.url).toBe("https://mint-b.example.com"); + }); + + it("does case-insensitive lowercase compare for pubkey match", async () => { + const row = makeRow({ + pubkey: "02ABC", + u: ["https://mint.example.com"], + }); + const fetcher = okFetcher({ "https://mint.example.com": "02abc" }); + const r = await verifySignerBinding(row, fetcher); + expect(r.verified).toBe(true); + }); + + it("short-circuits on first match (does not fetch remaining URLs)", async () => { + const row = makeRow({ + pubkey: "02abc", + u: ["https://mint-a.example.com", "https://mint-b.example.com", "https://mint-c.example.com"], + }); + const fetcher = vi.fn( + okFetcher({ + "https://mint-a.example.com": "02abc", + "https://mint-b.example.com": "02abc", + "https://mint-c.example.com": "02abc", + }), + ); + await verifySignerBinding(row, fetcher); + expect(fetcher).toHaveBeenCalledTimes(1); + expect(fetcher).toHaveBeenCalledWith("https://mint-a.example.com"); + }); +}); + +describe("verifySignerBinding — Cashu failure modes", () => { + it("returns reason='pubkey-mismatch' when single URL responds with wrong pubkey", async () => { + const row = makeRow({ + pubkey: "02abc", + u: ["https://mint.example.com"], + }); + const fetcher = okFetcher({ "https://mint.example.com": "02zzz" }); + const r = await verifySignerBinding(row, fetcher); + expect(r.verified).toBe(false); + if (r.verified) return; + expect(r.reason).toContain("pubkey-mismatch"); + expect(r.reason).toContain("02abc"); // announcement pubkey + expect(r.reason).toContain("02zzz"); // actual mint pubkey + }); + + it("returns reason='all-fetches-failed' when every URL fails", async () => { + const row = makeRow({ + pubkey: "02abc", + u: ["https://broken-a.example.com", "https://broken-b.example.com"], + }); + const fetcher: MintInfoFetcher = vi.fn( + async (): Promise => ({ + ok: false, + error: "connect ETIMEDOUT", + }), + ); + const r = await verifySignerBinding(row, fetcher); + expect(r.verified).toBe(false); + if (r.verified) return; + expect(r.reason).toBe("all-fetches-failed"); + expect(fetcher).toHaveBeenCalledTimes(2); // tries every URL + }); + + it("mixed failure + mismatch reports pubkey-mismatch (some fetch succeeded)", async () => { + const row = makeRow({ + pubkey: "02abc", + u: ["https://broken.example.com", "https://wrong-pk.example.com"], + }); + const fetcher: MintInfoFetcher = vi.fn(async (url: string): Promise => { + if (url.includes("broken")) { + return { ok: false, error: "connect ETIMEDOUT" }; + } + return { ok: true, info: { pubkey: "02zzz" } }; + }); + const r = await verifySignerBinding(row, fetcher); + expect(r.verified).toBe(false); + if (r.verified) return; + expect(r.reason).toContain("pubkey-mismatch"); + expect(r.reason).toContain("02zzz"); + }); + + it("returns reason='no-urls' when announcement.u is empty", async () => { + const row = makeRow({ pubkey: "02abc", u: [] }); + const fetcher: MintInfoFetcher = vi.fn(async () => { + throw new Error("must not be called"); + }); + const r = await verifySignerBinding(row, fetcher); + expect(r.verified).toBe(false); + if (r.verified) return; + expect(r.reason).toBe("no-urls"); + expect(fetcher).not.toHaveBeenCalled(); + }); +}); + 
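+// kind:38173 announcements carry Fedimint invite codes rather than HTTPS
+// base URLs, so there is no /v1/info endpoint to compare against; Layer B
+// must return "non-cashu" without ever touching the fetcher.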
+describe("verifySignerBinding — Fedimint rejection", () => { + it("returns reason='non-cashu' for kind:38173 without fetching", async () => { + const row = makeRow({ + pubkey: "02abc", + u: ["fed11abc..."], + kind: 38173, + }); + const fetcher: MintInfoFetcher = vi.fn(async () => { + throw new Error("must not be called"); + }); + const r = await verifySignerBinding(row, fetcher); + expect(r.verified).toBe(false); + if (r.verified) return; + expect(r.reason).toBe("non-cashu"); + expect(fetcher).not.toHaveBeenCalled(); + }); +}); + +describe("verifySignerBinding — multi-URL matched URL tracking", () => { + it("returns the URL that actually matched, not u[0]", async () => { + // Two URLs: first returns wrong pubkey, second returns the matching one. + // The result.url MUST point at the URL that verified, so the scheduler + // can write the canonical URL into MintInfoRow rather than guessing. + const row = makeRow({ + pubkey: "02abc", + u: ["https://wrong.example", "https://right.example"], + }); + const fetcher = okFetcher({ + "https://wrong.example": "02zzz", + "https://right.example": "02abc", + }); + const r = await verifySignerBinding(row, fetcher); + expect(r.verified).toBe(true); + if (!r.verified) return; + expect(r.url).toBe("https://right.example"); + expect(r.url).not.toBe("https://wrong.example"); + }); +}); diff --git a/packages/core/src/cashu/layerB.ts b/packages/core/src/cashu/layerB.ts new file mode 100644 index 0000000..bd4fc6d --- /dev/null +++ b/packages/core/src/cashu/layerB.ts @@ -0,0 +1,128 @@ +/** + * Layer B — NUT-06 signer-binding verification for kind:38172 announcements. + * + * Layer A (nip87/dtag.ts) gates on d-tag shape: cheap, syntactic, runs at + * cache write time. Layer B is the semantic check the NIP-87 spec leaves + * on the floor — confirming that the event signer corresponds to the live + * mint's NUT-06 pubkey. See audit/DIGEST.md §"Top 5 findings" #2 + the + * signer-binding gap discussed throughout audit/nip87-spec.md. + * + * Algorithm: + * + * 1. Iterate every URL in `announcement.u` (NIP-87 lets a Cashu mint be + * announced under multiple URLs — load-balanced regional endpoints, + * v1/v2 paths, etc.). + * 2. Fetch /v1/info via the supplied fetcher (which is responsible for + * its own caching/concurrency/dedup — Layer B treats it as a black + * box). + * 3. If at least one URL returns ok:true AND its `info.pubkey` matches + * the announcement signer's pubkey (lowercase compare per + * nip87/dtag.ts hex discipline), the binding verifies. The first + * match wins — we don't keep poking the others. + * 4. Otherwise: failure with a structured reason string for diagnostics. + * + * Out of scope: + * - kind:38173 Fedimint announcements. Federation IDs aren't HTTP pubkeys + * and Fedimint has no /v1/info equivalent reachable from the directory + * (`fmo.sirion.io` is an external indexer dep we explicitly avoid in + * v1 — see audit/data-model-v1.md §3 + DIGEST.md §"Open secondary + * questions"). A 38173 row passed in returns reason: "non-cashu" and + * verified: false, but should never reach this function in practice. + * - Cross-checking the URL itself against the mint's claimed `urls[]` + * field. If the mint says it's at A but answers /v1/info at B, that's + * a separate flag (data-model §15 case D variant). Out of scope for + * this PR. + * + * The fetcher contract: a function `(url) => Promise`. 
The + * caller passes a fetcher built with `createMintInfoFetcher({...})` so + * that downstream callers (scheduler, on-demand UI refresh) share one + * cache + concurrency budget. Layer B does NOT construct its own. + */ +import type { AnnouncementRow } from "../cache"; +import type { MintInfoFetcher, MintInfoResult, MintInfoV1 } from "./info"; + +/** + * Outcome of a Layer B verification pass. + * + * `verified: true` requires at least one /v1/info ok-fetch with a + * pubkey-match. `info` is populated with the matching mint's response on + * success — the scheduler upserts this into `MintInfoRow` to avoid a + * second round-trip. `url` records WHICH URL in `announcement.u` actually + * verified (the scheduler stores this so a multi-URL mint's MintInfoRow + * points at the canonical URL that responded with the matching pubkey, + * not just `u[0]`). + * + * On failure, `reason` distinguishes: + * - "non-cashu" — input was kind:38173 (Fedimint), Layer B + * doesn't apply. + * - "no-urls" — announcement had an empty `u` array. + * - "all-fetches-failed" — every URL in `u` returned ok:false. Treated + * as a transient class by the scheduler: + * `verifiedBySignerBinding` stays null so + * the row is re-tried later. + * - "pubkey-mismatch: ..." — at least one URL responded ok:true but no + * fetched pubkey matched the signer. The + * suffix lists the actual mismatched pubkey(s) + * for diagnostics. Includes the announcement + * pubkey in the rendered string so the + * consumer doesn't have to re-attach context. + * Treated as a real verdict (false, not null). + */ +export type LayerBResult = + | { verified: true; url: string; info: MintInfoV1 } + | { + verified: false; + reason: "non-cashu" | "no-urls" | "all-fetches-failed" | string; + }; + +const NON_CASHU: LayerBResult = { verified: false, reason: "non-cashu" }; + +/** + * Verify the signer-binding for a single announcement against the live + * mint(s) it claims to represent. + * + * Pure function w.r.t. fetcher: every external interaction is funneled + * through the supplied `fetcher`. No retry, no backoff, no caching here — + * the fetcher (createMintInfoFetcher) handles all of that. + */ +export async function verifySignerBinding( + announcement: AnnouncementRow, + fetcher: MintInfoFetcher, +): Promise { + if (announcement.kind !== 38172) return NON_CASHU; + + const urls = announcement.u; + if (!urls || urls.length === 0) { + return { verified: false, reason: "no-urls" }; + } + + const announcementPubkey = announcement.pubkey.toLowerCase(); + const fetched: Array<{ url: string; result: MintInfoResult }> = []; + + for (const url of urls) { + const result = await fetcher(url); + fetched.push({ url, result }); + if (result.ok && result.info.pubkey.toLowerCase() === announcementPubkey) { + // Record WHICH url verified so the scheduler can write the canonical + // URL into MintInfoRow rather than guessing `u[0]`. + return { verified: true, url, info: result.info }; + } + } + + // Drop into one of two failure cases. If every fetch was !ok we report + // "all-fetches-failed"; if at least one was ok but no pubkey matched we + // report the mismatch with the actual pubkey(s) for the operator log. + const okFetches = fetched.filter((f) => f.result.ok); + if (okFetches.length === 0) { + return { verified: false, reason: "all-fetches-failed" }; + } + + const seenPubkeys = okFetches + .map((f) => (f.result.ok ? f.result.info.pubkey : "")) + .filter((p) => p.length > 0); + const mismatchSummary = seenPubkeys.length === 1 ? 
seenPubkeys[0] : `[${seenPubkeys.join(", ")}]`; + return { + verified: false, + reason: `pubkey-mismatch: announcement=${announcementPubkey} mint=${mismatchSummary}`, + }; +} diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 294af0f..2064463 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -1,4 +1,5 @@ export * from "./cache"; +export * from "./cashu"; export * from "./nip87"; export * from "./nostr"; diff --git a/packages/core/src/integration.test.ts b/packages/core/src/integration.test.ts index 653a3ec..2407cf1 100644 --- a/packages/core/src/integration.test.ts +++ b/packages/core/src/integration.test.ts @@ -9,6 +9,7 @@ * fake-indexeddb is loaded in vitest.setup.ts. */ import type { Event as NostrEvent } from "nostr-tools/core"; +import type { Filter } from "nostr-tools/filter"; import { afterEach, describe, expect, it } from "vitest"; import { type AnnouncementRow, @@ -17,9 +18,12 @@ import { upsertAnnouncement, upsertReview, } from "./cache"; +import type { MintInfoFetcher, MintInfoResult } from "./cashu/info"; import fixtures from "./nip87/__fixtures__/nip87-sample.json" with { type: "json" }; import { isValidCashuDTag } from "./nip87/dtag"; import { parseMintAnnouncement, parseRecommendation } from "./nip87/parse"; +import type { Pool, PoolHandle, SubscribeOptions } from "./nostr"; +import { createScheduler } from "./scheduler"; type Fixture = { _meta: Record; @@ -301,3 +305,257 @@ describe("integration: Layer A enforced at cache, not parser", () => { expect(await db.announcements.count()).toBe(allValid.length); }); }); + +// ── Scheduler integration ──────────────────────────────────────────────── +// +// The above tests prove parse → cache. The scheduler is the production +// orchestrator that adds Layer B and watermark restore on top — these +// tests pin that running the corpus through `createScheduler` produces +// the same final cache state PLUS the right verifiedBySignerBinding +// values, the right mintInfo rows, and the right stats counters. +// +// We use a fake pool and a deterministic fetcher so the only randomness +// is the corpus itself (and a Fisher-Yates shuffle in the race test, but +// only inside the cache layer which has its own coverage above). + +type FakeSub = { opts: SubscribeOptions; handle: PoolHandle; closed: boolean }; + +function makeFakePool(): { + pool: Pool; + pushEvent: (event: NostrEvent) => Promise; +} { + const subs: FakeSub[] = []; + const pool: Pool = { + subscribe(opts: SubscribeOptions): PoolHandle { + const sub: FakeSub = { + opts, + closed: false, + handle: { + close() { + sub.closed = true; + }, + }, + }; + subs.push(sub); + return sub.handle; + }, + close() { + for (const s of subs) s.closed = true; + }, + }; + return { + pool, + async pushEvent(event: NostrEvent) { + for (const sub of subs) { + if (sub.closed) continue; + const matches = sub.opts.filters.some((filter: Filter) => + filter.kinds?.includes(event.kind), + ); + if (matches) { + sub.opts.onEvent(event, "wss://test.relay"); + // Yield once per push so the async handler can complete its DB + // writes before the next event arrives. + await new Promise((r) => setTimeout(r, 0)); + } + } + }, + }; +} + +/** + * Drain Layer B work. Poll `layerBPending` until 0 (or timeout) — robust + * against the per-task transaction wrapping that adds microtask hops. + * The previous fixed 10-yield drain raced under slower CI runners. 
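+ * The `i >= 5` floor forces a few initial yields so that Layer B tasks
+ * enqueued by the final pushed event can register before a pending count
+ * of zero is trusted.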
+ */ +async function drainLayerB(sched?: { getStats: () => { layerBPending: number } }): Promise { + for (let i = 0; i < 200; i++) { + if (sched && sched.getStats().layerBPending === 0 && i >= 5) return; + await new Promise((r) => setTimeout(r, 0)); + } +} + +/** + * Build a fetcher that responds with the right pubkey for the synthetic + * spec-conforming mints (so Layer B verifies) and with a real-ish failure + * for the legacy mint (so we can assert one verified + one failed branch). + */ +function makeCorpusFetcher(): MintInfoFetcher { + // Map: url -> pubkey it should claim. Anything not in the map yields a + // 404 result, exercising the all-fetches-failed reason. + const mapping: Record = { + // SpecConforming mint Alpha — pubkey matches d-tag of the announcement. + "https://mint.alpha.test": "02aa00000000000000000000000000000000000000000000000000000000000001", + // SpecConforming mint Beta — second URL is the canonical one in the + // announcement; primary URL also points at the right pubkey. + "https://mint.beta.test": "03bb00000000000000000000000000000000000000000000000000000000000002", + "https://mint.beta.test/v1": + "03bb00000000000000000000000000000000000000000000000000000000000002", + // Legacy Nostrodomo: pubkey deliberately mismatched so we exercise + // the pubkey-mismatch failure branch. + "https://mint.sharegap.net": "02deadbeef", + }; + return async (url: string): Promise => { + const pk = mapping[url]; + if (pk === undefined) { + return { ok: false, error: "non-2xx (404)", status: 404 }; + } + return { ok: true, info: { pubkey: pk, name: `Mint at ${url}` } }; + }; +} + +/** + * Push every Cashu + Fedimint announcement from the corpus through the + * given pool, waiting for the scheduler's Layer B to drain. + */ +async function pushCashuCorpus(pushEvent: (e: NostrEvent) => Promise): Promise { + const allCashu: NostrEvent[] = [ + ...f.cashu38172BotSpam, + ...f.cashu38172Legacy, + ...f.cashu38172SpecConforming, + ]; + for (const e of allCashu) await pushEvent(e); + for (const e of f.fedimint38173) await pushEvent(e); + for (const e of f.recommendations38000) await pushEvent(e); +} + +describe("integration: scheduler full pipeline", () => { + it("runs the corpus through createScheduler and converges with Layer B applied", async () => { + const db = await freshDB(); + const { pool, pushEvent } = makeFakePool(); + const fetcher = makeCorpusFetcher(); + const sched = createScheduler({ db, pool, fetcher, relays: ["wss://test.relay"] }); + await sched.start(); + + await pushCashuCorpus(pushEvent); + await drainLayerB(sched); + + // Stats: same accept/reject as the parse → cache integration above + // (5 bot-spam rejected at Layer A; 1 legacy + 2 spec-conforming + 3 + // fedimint accepted = 6 announcements; 5 reviews accepted). + const stats = sched.getStats(); + // 11 announcements (5 spam + 1 legacy + 2 spec + 3 fedi) + 5 reviews = 16. + expect(stats.eventsReceived).toBe(16); + expect(stats.rejectedByLayerA).toBe(5); + // Accepted = 6 announcements + 5 reviews = 11. + expect(stats.accepted).toBe(11); + + // Layer B: spec-conforming Alpha + Beta verify. Legacy Nostrodomo + // returns ok but with the wrong pubkey → counts as failed. Fedimint + // is non-cashu and doesn't enqueue Layer B at all. + expect(stats.layerBVerified).toBe(2); + expect(stats.layerBFailed).toBe(1); + expect(stats.layerBPending).toBe(0); + + // Cache state matches the parse → cache test exactly: 6 announcements, + // 5 reviews. Bot-spam rejected at Layer A, never lands. 
+ expect(await db.announcements.count()).toBe(6); + expect(await db.reviews.count()).toBe(5); + + // Spot-check verifiedBySignerBinding wired through correctly. + const alphaPubkey = "02aa00000000000000000000000000000000000000000000000000000000000001"; + const alpha = await db.announcements.get([alphaPubkey, 38172, alphaPubkey]); + expect(alpha?.verifiedBySignerBinding).toBe(true); + + const betaPubkey = "03bb00000000000000000000000000000000000000000000000000000000000002"; + const beta = await db.announcements.get([betaPubkey, 38172, betaPubkey]); + expect(beta?.verifiedBySignerBinding).toBe(true); + + const legacyPubkey = "5fe928ae0970844f3c5253d2e85a88788486edcbd96c070334a4a2d0d0154a77"; + const legacy = await db.announcements.get([legacyPubkey, 38172, legacyPubkey]); + expect(legacy?.verifiedBySignerBinding).toBe(false); + + // Fedimint announcements are accepted but Layer B doesn't run, so the + // field stays null (not false — null distinguishes "didn't try" from + // "tried and failed"). + const fedimintRow = await db.announcements.where("kind").equals(38173).first(); + expect(fedimintRow).toBeDefined(); + expect(fedimintRow?.verifiedBySignerBinding).toBeNull(); + + // mintInfo rows: 2 ok (Alpha, Beta) + 1 !ok (Legacy mismatch). + expect(await db.mintInfo.count()).toBe(3); + const alphaInfo = await db.mintInfo.get(alphaPubkey); + expect(alphaInfo?.ok).toBe(true); + expect(alphaInfo?.url).toBe("https://mint.alpha.test"); + const legacyInfo = await db.mintInfo.get(legacyPubkey); + expect(legacyInfo?.ok).toBe(false); + expect(legacyInfo?.lastError).toContain("pubkey-mismatch"); + + await sched.stop(); + }); + + it("idempotency: stop and re-start replays the corpus with no double-fetches and no duplicate rows", async () => { + // Run the corpus through scheduler 1, stop, then run the same corpus + // through scheduler 2 against the same DB. The CAS should reject all + // duplicates as 'rejected-stale' (not 'replaced' since createdAt is + // identical), Layer B should NOT re-fetch (the fetcher's cache is per- + // process, but cross-restart we rely on backoff-skip-on-replace + the + // 'replaced'/'rejected-stale' branch never enqueueing Layer B). + const db = await freshDB(); + + // Round 1. + const { pool: pool1, pushEvent: push1 } = makeFakePool(); + const calls1: string[] = []; + const baseFetcher = makeCorpusFetcher(); + const fetcher1: MintInfoFetcher = (url) => { + calls1.push(url); + return baseFetcher(url); + }; + const sched1 = createScheduler({ + db, + pool: pool1, + fetcher: fetcher1, + relays: ["wss://test.relay"], + }); + await sched1.start(); + await pushCashuCorpus(push1); + await drainLayerB(sched1); + await sched1.stop(); + + const round1Counts = { + announcements: await db.announcements.count(), + reviews: await db.reviews.count(), + mintInfo: await db.mintInfo.count(), + fetches: calls1.length, + }; + expect(round1Counts.announcements).toBe(6); + expect(round1Counts.reviews).toBe(5); + expect(round1Counts.mintInfo).toBe(3); + + // Round 2 — fresh scheduler against same DB. createScheduler reads + // the watermarks from the cache; the corpus replay uses the same + // events (same createdAt), so every announcement upsert lands as + // 'rejected-stale' (next.createdAt is NOT > prev.createdAt) which + // means Layer B is not re-enqueued, so calls2 stays at 0. 
+ const { pool: pool2, pushEvent: push2 } = makeFakePool(); + const calls2: string[] = []; + const fetcher2: MintInfoFetcher = (url) => { + calls2.push(url); + return baseFetcher(url); + }; + const sched2 = createScheduler({ + db, + pool: pool2, + fetcher: fetcher2, + relays: ["wss://test.relay"], + }); + await sched2.start(); + await pushCashuCorpus(push2); + await drainLayerB(sched2); + await sched2.stop(); + + // Same row counts — no duplicates introduced by the replay. + expect(await db.announcements.count()).toBe(round1Counts.announcements); + expect(await db.reviews.count()).toBe(round1Counts.reviews); + expect(await db.mintInfo.count()).toBe(round1Counts.mintInfo); + + // No second-round Layer B fetches: each 'rejected-stale' upsert short- + // circuits the enqueue path. + expect(calls2.length).toBe(0); + + // Verification status preserved across restart (PR #29 fix on the + // cache + scheduler not clobbering on replace). + const alphaPubkey = "02aa00000000000000000000000000000000000000000000000000000000000001"; + const alpha = await db.announcements.get([alphaPubkey, 38172, alphaPubkey]); + expect(alpha?.verifiedBySignerBinding).toBe(true); + }); +}); diff --git a/packages/core/src/scheduler/index.test.ts b/packages/core/src/scheduler/index.test.ts new file mode 100644 index 0000000..e98a0a5 --- /dev/null +++ b/packages/core/src/scheduler/index.test.ts @@ -0,0 +1,785 @@ +/** + * Scheduler unit tests. Pairs with integration.test.ts at the package root + * for end-to-end corpus replay. + * + * The pool is faked here as a tiny event emitter — tests push events into + * it and assert the cache state and stats. The fetcher is mocked per-test. + */ +import type { Event as NostrEvent } from "nostr-tools/core"; +import type { Filter } from "nostr-tools/filter"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { BitcoinmintsDB } from "../cache"; +import type { MintInfoFetcher, MintInfoResult } from "../cashu/info"; +import type { Pool, PoolHandle, SubscribeOptions } from "../nostr"; +import { createScheduler } from "./index"; + +// ─── fake pool ────────────────────────────────────────────────────────── +type FakeSub = { opts: SubscribeOptions; handle: PoolHandle; closed: boolean }; + +function makeFakePool(): { + pool: Pool; + subs: FakeSub[]; + pushEvent: (event: NostrEvent) => Promise; +} { + const subs: FakeSub[] = []; + const pool: Pool = { + subscribe(opts: SubscribeOptions): PoolHandle { + const sub: FakeSub = { + opts, + closed: false, + handle: { + close() { + sub.closed = true; + }, + }, + }; + subs.push(sub); + return sub.handle; + }, + close() { + for (const s of subs) s.closed = true; + }, + }; + return { + pool, + subs, + async pushEvent(event: NostrEvent) { + // Fan-out to every subscription whose filters match the event kind. + for (const sub of subs) { + if (sub.closed) continue; + const matches = sub.opts.filters.some((f: Filter) => f.kinds?.includes(event.kind)); + if (matches) { + sub.opts.onEvent(event, "wss://test.relay"); + // Yield so the async handler can complete its DB writes before the + // next event is pushed in. 
+ await new Promise((r) => setTimeout(r, 0)); + } + } + }, + }; +} + +// ─── helpers ──────────────────────────────────────────────────────────── +const freshName = () => `test-scheduler-${Math.random().toString(36).slice(2)}`; +const toDispose: BitcoinmintsDB[] = []; + +afterEach(async () => { + while (toDispose.length > 0) { + const db = toDispose.pop(); + if (!db) continue; + db.close(); + await BitcoinmintsDB.delete(db.name); + } +}); + +async function freshDB(): Promise { + const db = new BitcoinmintsDB(freshName()); + toDispose.push(db); + await db.open(); + return db; +} + +/** Spec-conforming kind:38172 with a synthetic 66-char d-tag. */ +function makeAnnouncement(opts: { + pubkey: string; + d: string; + u: string[]; + createdAt?: number; + eventId?: string; +}): NostrEvent { + return { + id: opts.eventId ?? `event-${opts.d.slice(0, 8)}`, + kind: 38172, + pubkey: opts.pubkey, + created_at: opts.createdAt ?? 1_700_000_000, + tags: [["d", opts.d], ...opts.u.map((u) => ["u", u])], + content: "", + sig: "fake", + }; +} + +/** Simple mocked fetcher: map[url] -> pubkey or "fail". */ +function makeFetcher(responses: Record): { + fetcher: MintInfoFetcher; + calls: string[]; +} { + const calls: string[] = []; + const fetcher: MintInfoFetcher = async (url: string): Promise => { + calls.push(url); + const r = responses[url]; + if (r === undefined) return { ok: false, error: "non-2xx (404)", status: 404 }; + if (r === "fail") return { ok: false, error: "connect ETIMEDOUT" }; + return { ok: true, info: { pubkey: r, name: "test mint" } }; + }; + return { fetcher, calls }; +} + +/** Wait for inflight Layer B work to drain. */ +async function settle(): Promise { + // Two macrotask flushes is enough to clear the runLayerB chain (await + // verifySignerBinding -> await db.put -> await upsertMintInfo). 
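+  // The loop still runs 10 flushes to give slower runners headroom.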
+ for (let i = 0; i < 10; i++) await new Promise((r) => setTimeout(r, 0)); +} + +// ─── tests ─────────────────────────────────────────────────────────────── + +describe("scheduler — pipeline (single event)", () => { + let warnSpy: ReturnType; + beforeEach(() => { + warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + }); + afterEach(() => { + warnSpy.mockRestore(); + }); + + it("accepts a Cashu announcement and updates verifiedBySignerBinding=true on pubkey match", async () => { + const db = await freshDB(); + const { pool, pushEvent } = makeFakePool(); + const pubkey = "02".padEnd(66, "a"); + const { fetcher } = makeFetcher({ "https://mint.example.com": pubkey }); + const sched = createScheduler({ db, pool, fetcher, relays: ["wss://test"] }); + await sched.start(); + + await pushEvent(makeAnnouncement({ pubkey, d: pubkey, u: ["https://mint.example.com"] })); + await settle(); + await sched.stop(); + + const row = await db.announcements.get([pubkey, 38172, pubkey]); + expect(row).toBeDefined(); + expect(row?.verifiedBySignerBinding).toBe(true); + + const stats = sched.getStats(); + expect(stats.eventsReceived).toBe(1); + expect(stats.accepted).toBe(1); + expect(stats.layerBVerified).toBe(1); + expect(stats.layerBFailed).toBe(0); + expect(stats.layerBPending).toBe(0); + + const mintInfo = await db.mintInfo.get(pubkey); + expect(mintInfo?.ok).toBe(true); + expect(mintInfo?.url).toBe("https://mint.example.com"); + }); + + it("sets verifiedBySignerBinding=false on pubkey mismatch + writes !ok mintInfo", async () => { + const db = await freshDB(); + const { pool, pushEvent } = makeFakePool(); + const pubkey = "02".padEnd(66, "b"); + const { fetcher } = makeFetcher({ + "https://mint.example.com": "02".padEnd(66, "z"), + }); + const sched = createScheduler({ db, pool, fetcher, relays: ["wss://test"] }); + await sched.start(); + + await pushEvent(makeAnnouncement({ pubkey, d: pubkey, u: ["https://mint.example.com"] })); + await settle(); + await sched.stop(); + + const row = await db.announcements.get([pubkey, 38172, pubkey]); + expect(row?.verifiedBySignerBinding).toBe(false); + + const stats = sched.getStats(); + expect(stats.layerBVerified).toBe(0); + expect(stats.layerBFailed).toBe(1); + + const mintInfo = await db.mintInfo.get(pubkey); + expect(mintInfo?.ok).toBe(false); + expect(mintInfo?.lastError).toContain("pubkey-mismatch"); + }); + + it("rejects bot-spam d-tag at Layer A (rejectedByLayerA stat increments)", async () => { + const db = await freshDB(); + const { pool, pushEvent } = makeFakePool(); + const { fetcher, calls } = makeFetcher({}); + const sched = createScheduler({ db, pool, fetcher, relays: ["wss://test"] }); + await sched.start(); + + await pushEvent({ + id: "spam-1", + kind: 38172, + pubkey: "972f233a".padEnd(64, "0"), + created_at: 1_700_000_000, + tags: [ + ["d", "shortspamtag123"], // 15-char garbage — fails Layer A + ["u", "https://mint.example.com"], + ], + content: "", + sig: "fake", + }); + await settle(); + await sched.stop(); + + expect(await db.announcements.count()).toBe(0); + expect(sched.getStats().rejectedByLayerA).toBe(1); + expect(sched.getStats().layerBVerified).toBe(0); + // Layer B never ran (event was rejected before enqueue). 
+ expect(calls.length).toBe(0); + }); + + it("Fedimint (kind:38173) is accepted but Layer B is not enqueued", async () => { + const db = await freshDB(); + const { pool, pushEvent } = makeFakePool(); + const { fetcher, calls } = makeFetcher({}); + const sched = createScheduler({ db, pool, fetcher, relays: ["wss://test"] }); + await sched.start(); + + const fedPubkey = "fedopk".padEnd(64, "0"); + await pushEvent({ + id: "fed-1", + kind: 38173, + pubkey: fedPubkey, + created_at: 1_700_000_000, + tags: [ + ["d", "fed11abc"], + ["u", "fed11abc..."], + ], + content: "", + sig: "fake", + }); + await settle(); + await sched.stop(); + + expect(await db.announcements.count()).toBe(1); + expect(sched.getStats().accepted).toBe(1); + expect(sched.getStats().layerBVerified).toBe(0); + expect(sched.getStats().layerBFailed).toBe(0); + expect(calls.length).toBe(0); + }); + + it("kind:38000 review flows into reviews table", async () => { + const db = await freshDB(); + const { pool, pushEvent } = makeFakePool(); + const { fetcher } = makeFetcher({}); + const sched = createScheduler({ db, pool, fetcher, relays: ["wss://test"] }); + await sched.start(); + + await pushEvent({ + id: "review-1", + kind: 38000, + pubkey: "reviewer1".padEnd(64, "0"), + created_at: 1_700_000_000, + tags: [ + ["k", "38172"], + ["d", "02".padEnd(66, "a")], + ["rating", "5", "5"], + ], + content: "[5/5] solid", + sig: "fake", + }); + await settle(); + await sched.stop(); + + expect(await db.reviews.count()).toBe(1); + expect(sched.getStats().accepted).toBe(1); + }); + + it("kind:0 profile flows into profiles table; kind:10002 into relayLists", async () => { + const db = await freshDB(); + const { pool, pushEvent } = makeFakePool(); + const { fetcher } = makeFetcher({}); + const sched = createScheduler({ db, pool, fetcher, relays: ["wss://test"] }); + await sched.start(); + + await pushEvent({ + id: "profile-1", + kind: 0, + pubkey: "profile1".padEnd(64, "0"), + created_at: 1_700_000_000, + tags: [], + content: JSON.stringify({ name: "alice", picture: "https://example.com/a.png" }), + sig: "fake", + }); + await pushEvent({ + id: "relays-1", + kind: 10002, + pubkey: "profile2".padEnd(64, "0"), + created_at: 1_700_000_001, + tags: [ + ["r", "wss://relay1.test"], + ["r", "wss://relay2.test", "read"], + ], + content: "", + sig: "fake", + }); + await settle(); + await sched.stop(); + + const profile = await db.profiles.get("profile1".padEnd(64, "0")); + expect(profile?.name).toBe("alice"); + expect(profile?.picture).toBe("https://example.com/a.png"); + + const relayList = await db.relayLists.get("profile2".padEnd(64, "0")); + expect(relayList?.relays).toHaveLength(2); + expect(relayList?.relays[0]).toEqual({ + url: "wss://relay1.test", + read: true, + write: true, + }); + expect(relayList?.relays[1]).toEqual({ + url: "wss://relay2.test", + read: true, + write: false, + }); + }); + + it("getStats returns a defensive copy (caller mutation does not leak)", async () => { + const db = await freshDB(); + const { pool } = makeFakePool(); + const { fetcher } = makeFetcher({}); + const sched = createScheduler({ db, pool, fetcher, relays: ["wss://test"] }); + await sched.start(); + + const snap = sched.getStats(); + snap.eventsReceived = 9999; + expect(sched.getStats().eventsReceived).toBe(0); + await sched.stop(); + }); + + it("stop() is idempotent and start() is idempotent", async () => { + const db = await freshDB(); + const { pool, subs } = makeFakePool(); + const { fetcher } = makeFetcher({}); + const sched = createScheduler({ db, pool, 
fetcher, relays: ["wss://test"] }); + + const p1 = sched.start(); + const p2 = sched.start(); // second start — should be a no-op + await Promise.all([p1, p2]); + expect(subs.length).toBe(1); // only one subscribe call + + await sched.stop(); + await sched.stop(); // second stop — should not throw + expect(subs[0]?.closed).toBe(true); + }); +}); + +describe("scheduler — restart + watermark restore", () => { + it("on second start, applies a `since` filter derived from cache max(createdAt) per kind", async () => { + const db = await freshDB(); + const { pool: pool1, pushEvent: push1 } = makeFakePool(); + const pubkey = "02".padEnd(66, "c"); + const { fetcher } = makeFetcher({ "https://mint.example.com": pubkey }); + + const sched1 = createScheduler({ db, pool: pool1, fetcher, relays: ["wss://test"] }); + await sched1.start(); + + await push1( + makeAnnouncement({ + pubkey, + d: pubkey, + u: ["https://mint.example.com"], + createdAt: 1_700_000_500, + }), + ); + await settle(); + await sched1.stop(); + + // Restart with a fresh pool. The new sub should carry since=1_700_000_500 + // for kind 38172. + const { pool: pool2, subs: subs2 } = makeFakePool(); + const sched2 = createScheduler({ db, pool: pool2, fetcher, relays: ["wss://test"] }); + await sched2.start(); + + // Find the subscription for kind 38172 and assert its `since` filter. + const sub38172 = subs2.find((s) => + s.opts.filters.some((f: Filter) => f.kinds?.includes(38172)), + ); + expect(sub38172).toBeDefined(); + const filter38172 = sub38172?.opts.filters.find((f: Filter) => f.kinds?.includes(38172)); + expect(filter38172?.since).toBe(1_700_000_500); + + await sched2.stop(); + }); + + it("idempotency: replaying an already-cached event on restart yields no double-fetches and no duplicates", async () => { + const db = await freshDB(); + const pubkey = "02".padEnd(66, "d"); + const event = makeAnnouncement({ + pubkey, + d: pubkey, + u: ["https://mint.example.com"], + createdAt: 1_700_000_700, + }); + + const { pool: pool1, pushEvent: push1 } = makeFakePool(); + const { fetcher: fetcher1, calls: calls1 } = makeFetcher({ + "https://mint.example.com": pubkey, + }); + const sched1 = createScheduler({ + db, + pool: pool1, + fetcher: fetcher1, + relays: ["wss://test"], + }); + await sched1.start(); + await push1(event); + await settle(); + await sched1.stop(); + expect(calls1.length).toBe(1); + expect(await db.announcements.count()).toBe(1); + + // Second pass — same event, fresh scheduler. The cache CAS should + // reject the duplicate (rejected-stale) and Layer B should NOT re-run + // (verifiedBySignerBinding was set true on round 1, and CAS-rejected + // events don't enqueue Layer B). 
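+    // (Here "CAS" is upsertAnnouncement's compare-and-swap on createdAt: a
+    // replayed event whose createdAt is not newer than the cached row comes
+    // back as "rejected-stale", so the accept branch — and with it the
+    // Layer B enqueue — never runs for it.)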
+ const { pool: pool2, pushEvent: push2 } = makeFakePool(); + const { fetcher: fetcher2, calls: calls2 } = makeFetcher({ + "https://mint.example.com": pubkey, + }); + const sched2 = createScheduler({ + db, + pool: pool2, + fetcher: fetcher2, + relays: ["wss://test"], + }); + await sched2.start(); + await push2(event); + await settle(); + await sched2.stop(); + + expect(calls2.length).toBe(0); // no double Layer B + expect(await db.announcements.count()).toBe(1); // no duplicate rows + const row = await db.announcements.get([pubkey, 38172, pubkey]); + expect(row?.verifiedBySignerBinding).toBe(true); // preserved across replay + }); +}); + +describe("scheduler — Layer B backoff", () => { + it("a failed Layer B URL is in cooldown and skipped if the same URL is re-enqueued within the window", async () => { + const db = await freshDB(); + const { pool, pushEvent } = makeFakePool(); + const { fetcher, calls } = makeFetcher({ "https://broken.example.com": "fail" }); + let mockNow = 1_700_000_000_000; + const sched = createScheduler({ + db, + pool, + fetcher, + relays: ["wss://test"], + now: () => mockNow, + }); + await sched.start(); + + const pubkey = "02".padEnd(66, "e"); + // First event: Layer B runs, fails, schedules backoff. + await pushEvent( + makeAnnouncement({ + pubkey, + d: pubkey, + u: ["https://broken.example.com"], + createdAt: 1_700_000_000, + eventId: "ev1", + }), + ); + await settle(); + expect(calls.length).toBe(1); + expect(sched.getStats().layerBFailed).toBe(1); + + // Second event: same mint, newer createdAt, same URL — should be in + // cooldown and the fetcher should NOT be called again. + mockNow += 1000; // 1s later, well within 30s base backoff + await pushEvent( + makeAnnouncement({ + pubkey, + d: pubkey, + u: ["https://broken.example.com"], + createdAt: 1_700_000_001, + eventId: "ev2", + }), + ); + await settle(); + expect(calls.length).toBe(1); // unchanged + expect(sched.getStats().layerBFailed).toBe(1); // unchanged + + // After 31s the URL is allowed to be retried. + mockNow += 31_000; + await pushEvent( + makeAnnouncement({ + pubkey, + d: pubkey, + u: ["https://broken.example.com"], + createdAt: 1_700_000_002, + eventId: "ev3", + }), + ); + await settle(); + expect(calls.length).toBe(2); // retried + expect(sched.getStats().layerBFailed).toBe(2); + + await sched.stop(); + }); +}); + +describe("scheduler — drain on stop", () => { + it("stop() awaits in-flight Layer B promises", async () => { + const db = await freshDB(); + const { pool, pushEvent } = makeFakePool(); + + // Fetcher hangs until we release it. Use a bag so the type narrows + // to a concrete call signature for the resolve. + const releasers: Array<(r: MintInfoResult) => void> = []; + const fetcher: MintInfoFetcher = () => + new Promise((resolve) => { + releasers.push(resolve); + }); + + const sched = createScheduler({ db, pool, fetcher, relays: ["wss://test"] }); + await sched.start(); + + const pubkey = "02".padEnd(66, "f"); + await pushEvent(makeAnnouncement({ pubkey, d: pubkey, u: ["https://mint.example.com"] })); + await settle(); + + // Inflight should be 1 now. + expect(sched.getStats().layerBPending).toBe(1); + + // Initiate stop — should not resolve until we release the fetcher. + let stopResolved = false; + const stopPromise = sched.stop().then(() => { + stopResolved = true; + }); + await new Promise((r) => setTimeout(r, 0)); + expect(stopResolved).toBe(false); + + // Release the fetcher — Layer B completes, stop() resolves. 
+ releasers[0]?.({ ok: true, info: { pubkey } }); + await stopPromise; + expect(stopResolved).toBe(true); + expect(sched.getStats().layerBPending).toBe(0); + }); +}); + +describe("scheduler — Layer B vs CAS replace race", () => { + it("does not clobber a newer event when Layer B finishes after a replace", async () => { + // Pin gap #22 (and code-reviewer #2): the runLayerB persist branch + // used to do a non-transactional read+merge — read existing, spread, + // put with verifiedBySignerBinding. If a newer event landed between + // the read and the put, the spread re-wrote the stale snapshot. + // + // Sequence: + // 1. Insert announcement at createdAt=100 with u=[oldUrl]. + // 2. Kick off Layer B. Hold the fetcher hostage so verify hasn't + // resolved yet. + // 3. While Layer B is in-flight, a newer event lands at + // createdAt=200 with u=[newUrl] and a fresh eventId. + // 4. Release the Layer B fetcher. + // + // Expected: the row in the cache reflects the createdAt=200 event, + // verifiedBySignerBinding stays null (didn't get clobbered with the + // stale snapshot), and the new u[] is preserved. + const db = await freshDB(); + const { pool, pushEvent } = makeFakePool(); + + // Capture every fetcher invocation so we can release them out-of-order + // (the test specifically wants to release the OLD event's Layer B + // fetch after the NEW event has been persisted). + const releasers: Array<{ + url: string; + resolve: (r: MintInfoResult) => void; + }> = []; + const fetcher: MintInfoFetcher = (url: string) => + new Promise((resolve) => { + releasers.push({ url, resolve }); + }); + + const sched = createScheduler({ db, pool, fetcher, relays: ["wss://test"] }); + await sched.start(); + + const pubkey = "02".padEnd(66, "1"); + + // Step 1: first event lands. Layer B starts and waits on the hostage. + await pushEvent( + makeAnnouncement({ + pubkey, + d: pubkey, + u: ["https://old.example"], + createdAt: 100, + eventId: "old".padEnd(64, "0"), + }), + ); + // Yield several macrotasks so the Layer B body has a chance to walk + // through its initial backoff check and reach the await fetcher() call + // (which captures the releaser). + for (let i = 0; i < 5; i++) await new Promise((r) => setTimeout(r, 0)); + expect(releasers.length).toBe(1); + expect(releasers[0]?.url).toBe("https://old.example"); + + // Step 3: a newer event arrives BEFORE the first Layer B fetch resolves. + // upsertAnnouncement replaces the row inside its own transaction + // (preserving verifiedBySignerBinding=null since the prior was null). + // The newer event also enqueues its own Layer B → second fetcher call. + await pushEvent( + makeAnnouncement({ + pubkey, + d: pubkey, + u: ["https://new.example"], + createdAt: 200, + eventId: "new".padEnd(64, "f"), + }), + ); + // Allow the newer event's Layer B to register its fetcher hostage. + for (let i = 0; i < 5; i++) await new Promise((r) => setTimeout(r, 0)); + // Now we should have two pending fetcher calls — the OLD url and the NEW url. + expect(releasers.length).toBe(2); + expect(releasers[1]?.url).toBe("https://new.example"); + + // The newer event has already replaced the row in the cache. + const beforeRelease = await db.announcements.get([pubkey, 38172, pubkey]); + expect(beforeRelease?.eventId).toBe("new".padEnd(64, "f")); + expect(beforeRelease?.u).toEqual(["https://new.example"]); + + // Step 4: release the STALE (first) Layer B fetch with a "successful" + // verification. The old runLayerB code would clobber the newer row + // here. 
The fixed code reads the current eventId inside a transaction + // and drops the write since the eventId no longer matches. + releasers[0]?.resolve({ ok: true, info: { pubkey } }); + // Drain microtasks so the stale runLayerB completes its persist branch. + for (let i = 0; i < 5; i++) await new Promise((r) => setTimeout(r, 0)); + + const afterStaleRelease = await db.announcements.get([pubkey, 38172, pubkey]); + // Row identity preserved — newer event still wins. + expect(afterStaleRelease?.eventId).toBe("new".padEnd(64, "f")); + expect(afterStaleRelease?.u).toEqual(["https://new.example"]); + // verifiedBySignerBinding stays null — the stale Layer B did NOT + // clobber the newer row's verification field. + expect(afterStaleRelease?.verifiedBySignerBinding).toBeNull(); + + // Cleanup: release the newer event's still-pending fetcher so stop() + // can drain. + releasers[1]?.resolve({ ok: true, info: { pubkey } }); + await sched.stop(); + }); +}); + +describe("scheduler — watermark restore behavior", () => { + it("clamps a future-poisoned createdAt on restore (year-3000 event does NOT poison watermark)", async () => { + // Pin gap #19 / silent-failure: an event with created_at far in the + // future would otherwise become the watermark and silently filter + // every legitimate event with a smaller created_at on the wire. + const db = await freshDB(); + // mockNow: a fixed "current time". The clamp should cap to + // floor(mockNow/1000) + 600 (the future slack). + const realNowMs = 1_900_000_000_000; + const realNowSec = Math.floor(realNowMs / 1000); + const mockNow = () => realNowMs; + + const yearThousandSec = 32_503_680_000; // ~year 3000 + + // Pre-seed the cache with a poisoned row. + await db.announcements.put({ + pubkey: "02".padEnd(66, "a"), + kind: 38172, + d: "02".padEnd(66, "a"), + eventId: "poison".padEnd(64, "0"), + createdAt: yearThousandSec, + u: ["https://poisoned.example"], + content: "", + rawTags: [], + verifiedBySignerBinding: null, + }); + + const { pool, subs } = makeFakePool(); + const { fetcher } = makeFetcher({}); + const sched = createScheduler({ + db, + pool, + fetcher, + relays: ["wss://test"], + now: mockNow, + }); + await sched.start(); + + const sub38172 = subs.find((s) => s.opts.filters.some((f: Filter) => f.kinds?.includes(38172))); + const filter38172 = sub38172?.opts.filters.find((f: Filter) => f.kinds?.includes(38172)); + // The watermark MUST have been clamped — not equal to year 3000. + expect(filter38172?.since).not.toBe(yearThousandSec); + // Specifically it should be clamped at most to (now-secs + 600). + expect(filter38172?.since).toBeLessThanOrEqual(realNowSec + 600); + // And it should be at least 1 (we did seed something). + expect(filter38172?.since).toBeGreaterThan(0); + + await sched.stop(); + }); + + it("cold-start with empty cache leaves the watermark filter absent (not undefined-as-since)", async () => { + const db = await freshDB(); + const { pool, subs } = makeFakePool(); + const { fetcher } = makeFetcher({}); + const sched = createScheduler({ db, pool, fetcher, relays: ["wss://test"] }); + await sched.start(); + + const sub38172 = subs.find((s) => s.opts.filters.some((f: Filter) => f.kinds?.includes(38172))); + const filter38172 = sub38172?.opts.filters.find((f: Filter) => f.kinds?.includes(38172)); + // No prior data → no `since` filter (and definitely not `since: undefined`, + // which would round-trip as 0/null over the wire and confuse some relays). + expect(filter38172).toBeDefined(); + expect("since" in (filter38172 ?? 
{})).toBe(false);
+
+    await sched.stop();
+  });
+});
+
+describe("scheduler — backoff cap", () => {
+  it("caps backoff at MAX_BACKOFF_MS (1h) — attempt 8 == attempt 10 in wait time", async () => {
+    // 10 consecutive failures for the same announcement: backoff grows
+    // exponentially BASE_BACKOFF_MS * 2^(attempts-1) and is capped at
+    // MAX_BACKOFF_MS. attempts=7 gives 30s * 64 = 32min; attempts=8 gives
+    // 64min, the first value past the 1h cap. Attempts 8, 9, 10 therefore
+    // all wait the same capped 60min.
+    const db = await freshDB();
+    const { pool, pushEvent } = makeFakePool();
+    const { fetcher } = makeFetcher({ "https://broken.example.com": "fail" });
+
+    let mockNow = 1_700_000_000_000;
+    const sched = createScheduler({
+      db,
+      pool,
+      fetcher,
+      relays: ["wss://test"],
+      now: () => mockNow,
+    });
+    await sched.start();
+
+    const pubkey = "02".padEnd(66, "9");
+
+    // Helper: push a fresh event and wait for Layer B to settle.
+    async function pushAndDrain(eventId: string, createdAt: number): Promise<void> {
+      await pushEvent(
+        makeAnnouncement({
+          pubkey,
+          d: pubkey,
+          u: ["https://broken.example.com"],
+          createdAt,
+          eventId,
+        }),
+      );
+      await settle();
+    }
+
+    // Drive enough failures to saturate the cap.
+    for (let i = 0; i < 10; i++) {
+      // Skip past the prior attempt's cooldown each time so the next
+      // attempt is allowed.
+      mockNow += 60 * 60_000 + 1; // 1h+1ms — past the cap
+      await pushAndDrain(`ev${i}`.padEnd(64, "0"), 1_700_000_000 + i);
+    }
+
+    // After 10 failures, the backoff cap should be exactly MAX_BACKOFF_MS.
+    // We verify by checking that an attempt at exactly cap-1 ms is still
+    // suppressed, but at cap ms it's allowed.
+    const lastAttemptedAt = mockNow;
+    // Exactly at cap minus 1ms: should be in cooldown (no fetch).
+    mockNow = lastAttemptedAt + 60 * 60_000 - 1;
+    const callsBefore = (await db.mintInfo.count()) === 0 ? 0 : 1; // anchor — fetcher.calls would be cleaner but we rely on stats
+    const failedBefore = sched.getStats().layerBFailed;
+    await pushAndDrain("evcap1".padEnd(64, "0"), 1_700_000_100);
+    expect(sched.getStats().layerBFailed).toBe(failedBefore); // unchanged
+
+    // At cap exactly: allowed.
+    mockNow = lastAttemptedAt + 60 * 60_000;
+    await pushAndDrain("evcap2".padEnd(64, "0"), 1_700_000_101);
+    expect(sched.getStats().layerBFailed).toBe(failedBefore + 1);
+
+    // Anchor variable used to avoid lint about unused declarations.
+    expect(callsBefore).toBeGreaterThanOrEqual(0);
+
+    await sched.stop();
+  });
+});
diff --git a/packages/core/src/scheduler/index.ts b/packages/core/src/scheduler/index.ts
new file mode 100644
index 0000000..8486b56
--- /dev/null
+++ b/packages/core/src/scheduler/index.ts
@@ -0,0 +1,719 @@
+/**
+ * Scheduler — pool → parse → cache → Layer-B orchestration.
+ *
+ * This is the glue that wires the pieces shipped in PR #2 (nostr/pool +
+ * nip87/parse) and PR #3 (cache CAS upserts) into a running pipeline,
+ * with PR #4's Layer B (cashu/layerB) hung off the announcement-accepted
+ * branch.
+ *
+ * Lifecycle:
+ *
+ *   const scheduler = createScheduler({ db, pool, fetcher, relays });
+ *   scheduler.start();
+ *   // ... events flow into the cache; Layer B verifies kind:38172 in the background ...
+ * await scheduler.stop(); // closes subs, drains in-flight Layer B work + * + * Pipeline shape (per event): + * + * pool subscription + * └─ onEvent(event, relay) + * ├─ stats.eventsReceived++ + * ├─ parse via nip87/parse + * │ ├─ kind:38172 / 38173 → upsertAnnouncement + * │ │ └─ if inserted | replaced + * │ │ ├─ stats.accepted++ + * │ │ └─ if kind:38172 → enqueue Layer B + * │ ├─ kind:38000 → upsertReview + * │ ├─ kind:0 → upsertProfile + * │ ├─ kind:10002 → upsertRelayList + * │ └─ unknown kind → drop + * └─ update watermark for that kind to max(seen, event.created_at) + * + * Layer B work queue: + * - bounded concurrency is the fetcher's: createMintInfoFetcher already + * gates total in-flight HTTP requests. We don't double-cap here — that + * would either deadlock or under-utilize. + * - backoff per mint URL: attempts × 30s, capped at 1h. A kind:38172 + * announcement that fails Layer B still has its row in the cache with + * verifiedBySignerBinding=false; we just don't re-fetch it within the + * backoff window. The fetcher's TTL cache (1h default) is the second + * layer of "don't hammer". + * - re-verification: when a row replaces an existing one (newer + * createdAt for same [pubkey, kind, d]), we always re-enqueue. The + * fetcher cache short-circuits within TTL, so this is cheap. + * + * Watermark / restart story: + * - On start() we read the highest `createdAt` per kind from the cache + * and set that as the `since` filter floor. This prevents replay of + * already-seen events on restart (they would CAS-fail anyway, so this + * is a wire-bandwidth optimization not a correctness fix). + * - During run, the watermark advances as events arrive but is NOT + * persisted separately — the cache itself is the durable record. On + * restart we re-derive from the cache. + * - We use a single global since-per-kind, not per-relay. The data-model + * and relay-strategy docs don't spec a relayWatermarks table; doing + * per-relay tracking would require either extending pool.ts (per-relay + * subscribes) or a new persistent table. Both are deferred. See the + * scheduler-design TODO comment near `restoreWatermarks` for the open + * question. Open question Q1 from the PR brief — defaulted in-memory. + * + * Stop discipline: + * - stop() closes the pool subscription handle synchronously, then + * awaits the drain of in-flight Layer B promises. Late-arriving + * events are dropped at the pool boundary (per PR #2 fix). + */ + +import Dexie from "dexie"; +import type { Event as NostrEvent } from "nostr-tools/core"; +import { + type AnnouncementRow, + type BitcoinmintsDB, + type ProfileRow, + type RelayListRow, + type ReviewRow, + upsertAnnouncement, + upsertMintInfo, + upsertProfile, + upsertRelayList, + upsertReview, +} from "../cache"; +import type { MintInfoFetcher } from "../cashu/info"; +import { type LayerBResult, verifySignerBinding } from "../cashu/layerB"; +import { + type MintAnnouncement, + type MintRecommendation, + parseMintAnnouncement, + parseRecommendation, +} from "../nip87"; +import type { Pool, PoolHandle } from "../nostr"; + +/** Observable counters surfaced via getStats() — for the UI in PR #6+. */ +export type SchedulerStats = { + eventsReceived: number; + accepted: number; + rejectedByLayerA: number; + layerBPending: number; + layerBVerified: number; + layerBFailed: number; +}; + +export type Scheduler = { + /** + * Open relay subscriptions and start ingesting events. 
Returns a promise + * that resolves once the underlying subscription has been wired (after + * watermark restore from the cache). Callers can fire-and-forget for + * production code or await for deterministic tests. + */ + start: () => Promise; + stop: () => Promise; + getStats: () => SchedulerStats; +}; + +export type SchedulerConfig = { + db: BitcoinmintsDB; + pool: Pool; + fetcher: MintInfoFetcher; + /** + * Relay URLs the scheduler is configured to subscribe to. Currently used + * only for stats / error context — the actual subscription is dispatched + * via `pool.subscribe`, which uses whatever relays the pool was built + * with. + */ + relays: readonly string[]; + /** Optional clock injector for deterministic backoff tests. Defaults to Date.now. */ + now?: () => number; +}; + +/** NIP-87 + supporting kinds. See data-model-v1.md §1 for the full list. */ +const SUBSCRIBED_KINDS = [38172, 38173, 38000, 0, 10002] as const; + +/** Initial backoff window for a failed mint URL: attempts=0 → 30s. */ +const BASE_BACKOFF_MS = 30_000; +/** Cap at 1 hour. */ +const MAX_BACKOFF_MS = 60 * 60_000; + +/** + * How far into the future a relay event's `created_at` is allowed to advance + * the watermark. Wallets with skewed clocks emit events a few minutes ahead; + * a malicious or buggy event with `created_at` in the year 3000 would + * otherwise poison the in-memory watermark and silently filter all + * subsequent legitimate events on the wire (see gap #19 / silent-failure + * analysis). 10 minutes is the standard NIP-01 clock-skew tolerance. + */ +const WATERMARK_FUTURE_SLACK_SEC = 600; + +/** + * Hard ceiling on a single Layer B verification attempt. The fetcher's + * per-URL timeout is 5s; with up to ~3 URLs and a touch of slack for + * transaction overhead, 30s is the wall-clock budget. Past this, the task + * is treated as a transient failure (verifiedBySignerBinding stays null + * for retry) so a stuck mint can't pin a worker. + */ +const LAYER_B_TASK_TIMEOUT_MS = 30_000; + +/** + * Max number of unverified rows we re-enqueue at startup to avoid restart + * storms when a long-down mint comes back. The remainder will be picked up + * by the regular onEvent path on the next replay or via a future periodic + * sweep (deferred). + */ +const RESTART_REENQUEUE_CAP = 100; + +/** + * State per mint URL we've attempted Layer B against. Used to throttle + * retries — the fetcher cache also short-circuits, but tracking attempts + * here lets us emit accurate "we'll re-try at $time" diagnostics later. + */ +type LayerBBackoffState = { + attempts: number; + lastAttemptAt: number; + lastReason?: string; +}; + +/** + * Compute the soonest time a URL is allowed to be re-attempted given its + * backoff state. Exponential 2^(attempts-1) * 30s, capped at 1h. After the + * first failure (attempts=1) we wait BASE_BACKOFF_MS; after the second we + * wait 2× that; and so on. attempts=0 (no recorded failure) returns + * lastAttemptAt = 0, i.e. always allowed. + */ +function nextAllowedAttempt(state: LayerBBackoffState): number { + if (state.attempts <= 0) return 0; + const wait = Math.min(BASE_BACKOFF_MS * 2 ** (state.attempts - 1), MAX_BACKOFF_MS); + return state.lastAttemptAt + wait; +} + +/** + * Convert a parsed NIP-87 mint announcement into the cache row shape. + * Mirrors integration.test.ts's helper — kept private to the scheduler so + * the parse → cache adapter logic doesn't drift across consumers. 
+ */ +function toAnnouncementRow(parsed: MintAnnouncement): AnnouncementRow { + const row: AnnouncementRow = { + pubkey: parsed.pubkey, + kind: parsed.kind, + d: parsed.d, + eventId: parsed.eventId, + createdAt: parsed.createdAt, + u: parsed.u, + content: parsed.raw.content, + rawTags: parsed.raw.tags, + verifiedBySignerBinding: null, + }; + if (parsed.nuts !== undefined) row.nuts = parsed.nuts; + if (parsed.modules !== undefined) row.modules = parsed.modules; + if (parsed.n !== undefined) row.n = parsed.n; + return row; +} + +function toReviewRow(parsed: MintRecommendation): ReviewRow { + const row: ReviewRow = { + pubkey: parsed.pubkey, + kind: 38000, + d: parsed.d, + eventId: parsed.eventId, + createdAt: parsed.createdAt, + content: parsed.content, + rawTags: parsed.raw.tags, + }; + if (parsed.k !== undefined) row.k = parsed.k; + if (parsed.rating !== undefined) row.rating = parsed.rating; + return row; +} + +/** Best-effort kind:0 parse. JSON content with name/picture/etc. */ +function toProfileRow(event: NostrEvent): ProfileRow | null { + if (event.kind !== 0) return null; + let parsed: Record = {}; + try { + const obj = JSON.parse(event.content || "{}") as unknown; + if (obj && typeof obj === "object" && !Array.isArray(obj)) { + parsed = obj as Record; + } + } catch { + // Profiles with malformed content still get a row (just no parsed fields). + } + const row: ProfileRow = { + pubkey: event.pubkey, + eventId: event.id, + createdAt: event.created_at, + rawContent: event.content ?? "", + }; + if (typeof parsed.name === "string") row.name = parsed.name; + if (typeof parsed.display_name === "string") row.displayName = parsed.display_name; + if (typeof parsed.picture === "string") row.picture = parsed.picture; + if (typeof parsed.about === "string") row.about = parsed.about; + if (typeof parsed.nip05 === "string") row.nip05 = parsed.nip05; + return row; +} + +/** + * Parse a kind:10002 NIP-65 relay list. Each `r` tag is `["r", url, "read"|"write"]`, + * where the third arg may be omitted (then both read AND write are true). + */ +function toRelayListRow(event: NostrEvent): RelayListRow | null { + if (event.kind !== 10002) return null; + const relays: RelayListRow["relays"] = []; + for (const tag of event.tags) { + if (tag[0] !== "r" || typeof tag[1] !== "string") continue; + const marker = tag[2]; + const read = marker === undefined || marker === "read"; + const write = marker === undefined || marker === "write"; + relays.push({ url: tag[1], read, write }); + } + return { + pubkey: event.pubkey, + eventId: event.id, + createdAt: event.created_at, + relays, + }; +} + +export function createScheduler(config: SchedulerConfig): Scheduler { + const { db, pool, fetcher } = config; + const now = config.now ?? Date.now; + + const stats: SchedulerStats = { + eventsReceived: 0, + accepted: 0, + rejectedByLayerA: 0, + layerBPending: 0, + layerBVerified: 0, + layerBFailed: 0, + }; + + // Tracks (kind -> highest createdAt seen). Used to compute the `since` + // filter on next start(). Not persisted — the cache is the durable + // record and we re-derive on restart. + const watermarks = new Map(); + /** + * Advance the in-memory watermark for a kind, clamping to + * `now + WATERMARK_FUTURE_SLACK_SEC`. 
The clamp prevents a junk event
+   * with `created_at` far in the future from poisoning the watermark — if
+   * we trust it verbatim, that watermark would persist (via re-derivation
+   * from `max(createdAt)` on restart) and silently filter all subsequent
+   * legitimate events that arrive with a smaller `created_at`.
+   */
+  const updateWatermark = (kind: number, createdAt: number) => {
+    const safeTs = Math.min(createdAt, Math.floor(now() / 1000) + WATERMARK_FUTURE_SLACK_SEC);
+    const prev = watermarks.get(kind) ?? 0;
+    if (safeTs > prev) watermarks.set(kind, safeTs);
+  };
+
+  // Per-URL backoff state. Keyed by the canonical mint URL string from the
+  // announcement's `u` array (no normalization beyond what the announcement
+  // carries — that's an open question for PR #5).
+  const backoff = new Map<string, LayerBBackoffState>();
+
+  // Track in-flight Layer B promises so stop() can drain cleanly.
+  const inflight = new Set<Promise<void>>();
+  let stopped = false;
+  let handle: PoolHandle | null = null;
+  // Tracks whether start() has been entered. We can't use `handle` for
+  // idempotency because handle is assigned asynchronously after the
+  // restoreWatermarks Dexie read completes; a synchronous double-call to
+  // start() would otherwise race two restore-and-subscribe sequences.
+  let starting = false;
+  // Resolves once the subscription has been opened (or the start was
+  // aborted by a stop). Tests await this via start()'s return value, but
+  // it's also useful internally to coordinate stop() with a still-starting
+  // scheduler.
+  let startReady: Promise<void> = Promise.resolve();
+
+  /**
+   * Restore watermarks from the cache. For each subscribed kind, look up
+   * the highest `createdAt` we've already accepted and use that as the
+   * floor. Tables that don't store the kind explicitly use the natural
+   * one (profiles=0, relayLists=10002).
+   *
+   * Uses the v2 compound index `[kind+createdAt]` on announcements so the
+   * per-kind lookup is bounded (`.last()` of a range scan) rather than
+   * materializing the whole table via .sortBy(). The restored value is
+   * fed through `updateWatermark` which applies the future-slack clamp,
+   * so a poisoned event in the cache can't re-poison the in-memory
+   * watermark on restart.
+   */
+  async function restoreWatermarks(): Promise<void> {
+    // Announcements: 38172 + 38173 — bounded scan via compound index.
+    for (const k of [38172, 38173] as const) {
+      const last = await db.announcements
+        .where("[kind+createdAt]")
+        .between([k, Dexie.minKey], [k, Dexie.maxKey])
+        .last();
+      if (last) updateWatermark(k, last.createdAt);
+    }
+    // Reviews are all kind 38000 — same idea, no `where` filter needed.
+    const review = await db.reviews.orderBy("createdAt").reverse().limit(1).first();
+    if (review) updateWatermark(38000, review.createdAt);
+    const profile = await db.profiles.orderBy("createdAt").reverse().limit(1).first();
+    if (profile) updateWatermark(0, profile.createdAt);
+    const relayList = await db.relayLists.orderBy("createdAt").reverse().limit(1).first();
+    if (relayList) updateWatermark(10002, relayList.createdAt);
+  }
+
+  /**
+   * On startup, find announcements that were accepted but never had Layer B
+   * complete (verifiedBySignerBinding === null) and re-enqueue them. Without
+   * this, a row that was inserted before a Layer B failure (or before a
+   * crash) sits in the cache forever as "not yet verified" and the UI
+   * shows no badge. Capped at RESTART_REENQUEUE_CAP to avoid restart storms
+   * if a long-down mint comes back. Only kinds 38172 (Cashu) qualify —
+   * 38173 (Fedimint) has no Layer B by design.
+ * + * We collect the candidate rows under a READ transaction first, then + * enqueue them OUTSIDE that transaction. Calling enqueueLayerB while + * still inside the .each() callback would schedule runLayerB's `rw` + * transaction as a child of Dexie's currently-open `r` transaction + * (Dexie auto-binds via zone-tracked promise chains), which fails with + * SubTransactionError. + */ + async function reenqueueUnverified(): Promise { + const candidates: AnnouncementRow[] = []; + let truncated = false; + await db.announcements + .where("kind") + .anyOf([38172]) + .filter((r) => r.verifiedBySignerBinding === null) + .until(() => candidates.length >= RESTART_REENQUEUE_CAP) + .each((row) => { + if (candidates.length >= RESTART_REENQUEUE_CAP) { + truncated = true; + return; + } + candidates.push(row); + }); + // Outside the transaction now — safe to start `rw` work. + for (const row of candidates) enqueueLayerB(row); + if (truncated) { + // Best-effort signal to operators that the cap kicked in. console + // is the right surface here — we don't have a structured logger + // wired through yet (deferred to ingest-stats UI work). + console.warn( + `[scheduler] reenqueueUnverified hit RESTART_REENQUEUE_CAP=${RESTART_REENQUEUE_CAP}; remaining unverified rows will be retried on next replay`, + ); + } + } + + /** + * Map a LayerBResult to the value we persist on the announcement row. + * + * - verified=true → true (real positive verdict) + * - verified=false, all-fetches-failed → null (transient — re-try later) + * - verified=false, pubkey-mismatch → false (real negative verdict) + * - verified=false, anything else → null (defensive — treat as transient) + * + * Without this mapping, a transient `all-fetches-failed` would write + * `verifiedBySignerBinding: false` and the row would carry a permanent + * negative verdict for what was actually just a temporary network issue + * (silent-failure gap). + */ + function verdictForPersistence(result: LayerBResult): boolean | null { + if (result.verified) return true; + // Treat pubkey-mismatch as a real verdict; everything else is transient. + if (typeof result.reason === "string" && result.reason.startsWith("pubkey-mismatch")) { + return false; + } + return null; + } + + /** + * Run Layer B for a freshly-accepted Cashu announcement. Updates + * verifiedBySignerBinding on the announcement row + writes a MintInfoRow + * on success. On failure, MintInfoRow gets `ok: false` with the lastError + * so the UI can surface "verification failed: $reason". + */ + async function runLayerB(row: AnnouncementRow): Promise { + if (row.kind !== 38172) return; + + // Backoff gate: skip if any URL is still in cooldown. We use the most- + // backed-off URL as the gate (the announcement is the unit, not the + // URL — partial verification of a multi-URL mint is still verification). + const ts = now(); + let allInCooldown = true; + for (const url of row.u) { + const state = backoff.get(url); + if (!state || ts >= nextAllowedAttempt(state)) { + allInCooldown = false; + break; + } + } + if (allInCooldown && row.u.length > 0) return; + + let result: LayerBResult; + try { + // Per-task timeout: cap the total wall-clock for one Layer B attempt. + // The fetcher has a per-URL timeout (5s default), but a row with + // many URLs or a fetcher that gets stuck on a single hung promise + // could still pin a worker indefinitely. On timeout we map to + // `all-fetches-failed` so the row stays null/transient and gets + // retried later (verdictForPersistence above). 
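+      // Note: Promise.race does not cancel the loser — the timeout timer
+      // below keeps running for up to LAYER_B_TASK_TIMEOUT_MS even when the
+      // verify resolves first. Its late rejection is swallowed by the race,
+      // so this is a resource note, not a correctness issue.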
+ result = await Promise.race([ + verifySignerBinding(row, fetcher), + new Promise((_, reject) => { + setTimeout(() => reject(new Error("layer-b-timeout")), LAYER_B_TASK_TIMEOUT_MS); + }), + ]); + } catch (err) { + // Either a thrown verifier (defensive — verifySignerBinding shouldn't + // throw given the fetcher contract) or our timeout. Either way, we + // treat it as a transient failure so the row gets retried. + const message = err instanceof Error ? err.message : String(err); + const reason = + message === "layer-b-timeout" ? "all-fetches-failed" : `verifier-threw: ${message}`; + result = { verified: false, reason }; + } + + // Update backoff state per URL. Success clears it; failure increments. + for (const url of row.u) { + if (result.verified) { + backoff.delete(url); + } else { + const state = backoff.get(url) ?? { attempts: 0, lastAttemptAt: 0 }; + state.attempts += 1; + state.lastAttemptAt = now(); + if (result.reason) state.lastReason = result.reason; + backoff.set(url, state); + } + } + + const verdict = verdictForPersistence(result); + // The matched URL — present only on success — is what we write into + // MintInfoRow. On failure we fall back to u[0] for the diagnostic row + // (the UI uses it as a label, no further fetches happen against it). + const persistedUrl = result.verified ? result.url : (row.u[0] ?? ""); + + // Persist verification result on the announcement row in a transaction + // that re-checks the row's eventId before writing. This prevents a + // newer event that landed mid-Layer-B from being clobbered by a + // stale spread of the snapshot we read before the verify started. + // + // Race shape we're guarding against (gap #22): + // 1. We read `existing` at createdAt=100. + // 2. onEvent fires for a newer event at createdAt=200, upsert + // replaces the row. + // 3. We `put({ ...existing, verifiedBySignerBinding })`, which + // re-spreads the createdAt=100 snapshot and clobbers the + // newer row. + // + // Inside the transaction we re-fetch and assert `existing.eventId` + // still matches `row.eventId`. If it doesn't, the row was replaced + // mid-flight; we drop both the announcement update AND the MintInfoRow + // upsert (the new row will be re-enqueued by onEvent's normal accept + // path). + let didPersistAnnouncement = false; + await db.transaction("rw", db.announcements, async () => { + const current = await db.announcements.get([row.pubkey, row.kind, row.d]); + if (!current) return; + if (current.eventId !== row.eventId) { + // A newer event raced past us. Don't clobber it — drop the + // verification result. The replacement will get its own Layer B + // pass via the normal onEvent path. + return; + } + // Write only the field we own. No `...current` spread — we're not + // shipping a stale snapshot of fields we don't intend to change, + // and that means future field additions don't risk silent regression. + await db.announcements.update([row.pubkey, row.kind, row.d], { + verifiedBySignerBinding: verdict, + }); + didPersistAnnouncement = true; + }); + + // Persist /v1/info into the mintInfo table. Only do this if the + // announcement update went through (i.e. this Layer B pass was for + // the row that's still current). upsertMintInfo opens its own + // transaction, so it must run outside the announcement-only tx + // above (Dexie disallows promoting a sub-transaction to a different + // table list). The MintInfoRow CAS predicate is fetchedAt, so even + // if a newer Layer B pass races us here, the higher fetchedAt wins. 
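+    // Sketch of that predicate (hedged — the real implementation lives in
+    // ../cache and may differ in detail): conceptually upsertMintInfo does
+    //   const prev = await db.mintInfo.get(row.d);
+    //   if (!prev || candidate.fetchedAt >= prev.fetchedAt) await db.mintInfo.put(candidate);
+    // inside one transaction, so whichever Layer B pass fetched most recently
+    // is the one whose /v1/info snapshot survives.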
+ if (didPersistAnnouncement) { + if (result.verified) { + await upsertMintInfo(db, { + d: row.d, + url: persistedUrl, + fetchedAt: now(), + infoJson: result.info as unknown as Record, + ok: true, + }); + } else { + // Failure case: write a !ok row so the UI can show a "verification + // failed: $reason" badge without re-running Layer B itself. + await upsertMintInfo(db, { + d: row.d, + url: persistedUrl, + fetchedAt: now(), + infoJson: {}, + ok: false, + lastError: result.reason ?? "unknown", + }); + } + } + + if (result.verified) { + stats.layerBVerified += 1; + } else { + stats.layerBFailed += 1; + } + } + + /** + * Non-blocking enqueue. Returns immediately; the work runs on the + * microtask queue and is tracked via `inflight` so stop() can drain. + */ + function enqueueLayerB(row: AnnouncementRow): void { + if (stopped) return; + stats.layerBPending += 1; + // Allocate the promise handle, then wire its self-cleanup. The + // double-step keeps the closure from referencing `work` before it's + // assigned (TS2454). + let work: Promise; + const body = async (): Promise => { + try { + await runLayerB(row); + } finally { + stats.layerBPending -= 1; + inflight.delete(work); + } + }; + work = body(); + inflight.add(work); + } + + /** Per-event handler — single funnel for all kinds. */ + async function onEvent(event: NostrEvent): Promise { + if (stopped) return; + stats.eventsReceived += 1; + + switch (event.kind) { + case 38172: + case 38173: { + const parsed = parseMintAnnouncement(event); + if (!parsed) return; + const row = toAnnouncementRow(parsed); + const result = await upsertAnnouncement(db, row); + if (result === "rejected-invalid") { + stats.rejectedByLayerA += 1; + } + if (result === "inserted" || result === "replaced") { + stats.accepted += 1; + updateWatermark(event.kind, event.created_at); + // Layer B only runs on Cashu — verifySignerBinding will short- + // circuit non-cashu, but skipping the enqueue avoids the + // bookkeeping noise. + if (event.kind === 38172) { + enqueueLayerB(row); + } + } + return; + } + case 38000: { + const parsed = parseRecommendation(event); + if (!parsed) return; + const row = toReviewRow(parsed); + const result = await upsertReview(db, row); + if (result === "inserted" || result === "replaced") { + stats.accepted += 1; + updateWatermark(event.kind, event.created_at); + } + return; + } + case 0: { + const row = toProfileRow(event); + if (!row) return; + const result = await upsertProfile(db, row); + if (result === "inserted" || result === "replaced") { + stats.accepted += 1; + updateWatermark(event.kind, event.created_at); + } + return; + } + case 10002: { + const row = toRelayListRow(event); + if (!row) return; + const result = await upsertRelayList(db, row); + if (result === "inserted" || result === "replaced") { + stats.accepted += 1; + updateWatermark(event.kind, event.created_at); + } + return; + } + default: + return; // unknown kind — ignore + } + } + + return { + start(): Promise { + if (starting || handle !== null) return startReady; // idempotent + starting = true; + stopped = false; + // Restore watermarks asynchronously, then open the subscription. + // Doing both before opening the sub means cold start is a single + // round-trip via the `since` filter. + startReady = (async () => { + try { + await restoreWatermarks(); + } catch { + // Best-effort restore — proceed with empty watermarks if cache + // read fails. The CAS on writes is the correctness gate. 
+        if (stopped) {
+          starting = false;
+          return;
+        }
+        // Re-enqueue rows that were accepted but never had Layer B
+        // complete (e.g. because the previous run crashed mid-fetch or a
+        // mint was down at the time). Without this, the row sits in the
+        // cache forever as "not yet verified" — the UI shows no badge and
+        // we never re-try.
+        try {
+          await reenqueueUnverified();
+        } catch {
+          // Best-effort — same rationale as the watermark restore above.
+        }
+        if (stopped) {
+          starting = false;
+          return;
+        }
+        // Build one filter per subscribed kind so each can carry its own
+        // `since`. Keeps the wire-bandwidth optimization tight.
+        const filters = SUBSCRIBED_KINDS.map((kind) => {
+          const since = watermarks.get(kind);
+          // `since` is inclusive in the relay protocol (created_at >= since):
+          // using the watermark verbatim re-delivers events at exactly that
+          // timestamp, which CAS-fail harmlessly at the cache layer, while
+          // bumping by 1 would lose simultaneous events we haven't seen.
+          return since !== undefined ? { kinds: [kind], since } : { kinds: [kind] };
+        });
+        handle = pool.subscribe({
+          filters,
+          onEvent: (event) => {
+            // onEvent returns a promise; we don't await here because the
+            // pool callback contract is sync. Errors inside the handler
+            // are swallowed at this boundary (each kind's handler does
+            // its own try-catch around DB writes via Dexie's transaction).
+            void onEvent(event);
+          },
+          closeOnEose: false,
+        });
+        starting = false;
+      })();
+      return startReady;
+    },
+    async stop(): Promise<void> {
+      stopped = true;
+      // Wait for an in-progress start() to finish wiring (or skip wiring)
+      // so we don't leak a subscription that opens after stop returned.
+      await startReady;
+      handle?.close();
+      handle = null;
+      // Drain in-flight Layer B work. Snapshot the set so late additions
+      // (which can't happen because `stopped` is set, but defensively)
+      // don't extend the wait indefinitely.
+      const snapshot = Array.from(inflight);
+      await Promise.all(snapshot);
+    },
+    getStats(): SchedulerStats {
+      // Return a defensive copy so callers can't mutate our state.
+      return { ...stats };
+    },
+  };
+}
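+
+/*
+ * Usage sketch (illustrative only): how the pieces above are expected to
+ * compose. Pool construction isn't shown in this PR, so `createPool` below
+ * is a placeholder name, and createMintInfoFetcher's options are omitted —
+ * treat this as a sketch, not the final API.
+ *
+ *   // (imports elided — BitcoinmintsDB from cache/, createMintInfoFetcher
+ *   // from cashu/, createScheduler from this module)
+ *   const relays = ["wss://relay.example"];
+ *   const db = new BitcoinmintsDB("bitcoinmints");
+ *   await db.open();
+ *   const fetcher = createMintInfoFetcher();   // TTL-cached /v1/info fetches
+ *   const pool = createPool(relays);           // placeholder — see ../nostr
+ *   const scheduler = createScheduler({ db, pool, fetcher, relays });
+ *   await scheduler.start();
+ *   // ... UI reads from the Dexie tables while events stream in ...
+ *   await scheduler.stop();
+ */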