From 8083a5ed772d72e58ba38814093a019334406ad5 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Jun 2026 23:40:04 +0000 Subject: [PATCH 1/2] Add kernel observation primitive and signals-native fetch cancellation in silo Kernel now owns its reactive operator layer on top of `alien-signals/system`'s `createReactiveSystem` (faithful port of alien-signals 3.x) so it can fire per-node observation callbacks when a reactive node loses its last subscriber. New `onObservationChange` / `getObservationNode` (public) and `trackNode` / `isObserved` (internal); the dispatch is counter-gated so the hot path is unchanged when observation is unused. All kernel + husk imports repointed to the owned system module. Silo replaces its opt-in `subscribe*` ref-counting with observation-driven cancellation: each handle carries a dedicated liveness node that the rendering component subscribes to via `find`/`findQuery`; when the last observer unmounts, the in-flight fetch is interrupted after the `gcTimeMs` grace window (partial-batch rule honored). `useDocument` / `useQuery` stay pure reactive reads. Removed `subscribeDocument` / `subscribeQuery`. https://claude.ai/code/session_018FyYY6YYb1CW5LnBfPMgod --- .changeset/kernel-observation.md | 23 + .changeset/silo-effect-migration.md | 2 +- packages/husk/src/resource.ts | 3 +- packages/js-krauset/vite.config.ts | 3 +- packages/kernel/src/batch.ts | 2 +- packages/kernel/src/collections.ts | 3 +- packages/kernel/src/core.ts | 31 +- packages/kernel/src/index.ts | 18 +- packages/kernel/src/internal.ts | 9 +- packages/kernel/src/read.ts | 3 +- packages/kernel/src/system.ts | 548 ++++++++++++++++++ .../kernel/tests/core/observation.test.ts | 166 ++++++ packages/kernel/tests/core/primitives.test.ts | 355 ++++++++++++ .../tests/read/tracking-isolation.test.ts | 3 +- packages/kernel/vite.config.ts | 1 + packages/silo/README.md | 6 +- packages/silo/src/finder.ts | Bin 16828 -> 18436 bytes packages/silo/src/store.ts | 33 +- packages/silo/tests/cancellation.test.ts | 208 ++++--- .../silo/tests/react/cancellation.test.tsx | 168 ++++++ 20 files changed, 1464 insertions(+), 121 deletions(-) create mode 100644 .changeset/kernel-observation.md create mode 100644 packages/kernel/src/system.ts create mode 100644 packages/kernel/tests/core/observation.test.ts create mode 100644 packages/kernel/tests/core/primitives.test.ts create mode 100644 packages/silo/tests/react/cancellation.test.tsx diff --git a/.changeset/kernel-observation.md b/.changeset/kernel-observation.md new file mode 100644 index 00000000..e7f24f16 --- /dev/null +++ b/.changeset/kernel-observation.md @@ -0,0 +1,23 @@ +--- +"@supergrain/kernel": minor +--- + +Add a reactive-observation lifecycle primitive and own the reactive system. + +The kernel now owns its primitive layer (`signal` / `computed` / `effect` / `batch`) on top of `alien-signals/system`'s `createReactiveSystem(...)` instead of importing the high-level operators from `alien-signals` directly. The graph algorithm (`link` / `unlink` / `propagate` / `checkDirty`) is still delegated to alien-signals — only the thin operator layer is owned, so the kernel can observe when a reactive node loses its last subscriber. All reactive semantics (fine-grained tracking, batching, Map/Set coalescing, `effect` cleanup) are unchanged. + +**New: `onObservationChange`.** Register a callback fired when a reactive node transitions observed→unobserved (its last subscriber is removed) and, optionally, unobserved→observed (it gains its first): + +```ts +import { onObservationChange, getObservationNode } from "@supergrain/kernel"; + +const node = getObservationNode(reactiveProxy); // dedicated, never-written liveness node +const unregister = onObservationChange(node, { + onUnobserved: () => scheduleCleanup(), // defer destructive work; re-check on a timer + onObserved: () => cancelCleanup(), +}); +``` + +`getObservationNode(proxy)` returns a proxy's dedicated liveness node (created lazily, never written, so it never causes a re-render). The sharp tools `trackNode` (subscribe the active sub) and `isObserved` are available from `@supergrain/kernel/internal`. The dispatch is gated behind counters that stay `0` until a handler is registered, so the hot path is unchanged when observation is unused. + +`@supergrain/silo` uses this to cancel an in-flight fetch automatically when no component observes a handle anymore — no `useEffect`, no manual `subscribe*`. diff --git a/.changeset/silo-effect-migration.md b/.changeset/silo-effect-migration.md index 2a9cfb47..152e1401 100644 --- a/.changeset/silo-effect-migration.md +++ b/.changeset/silo-effect-migration.md @@ -10,7 +10,7 @@ Rebuild the network/async layer on an internal [Effect](https://effect.website/) **Per-model `retry` / `timeout`.** `ModelConfig` and `QueryConfig` accept an Effect `Schedule` (`retry`) and a `Duration` (`timeout`). -**Fiber-based cancellation (opt-in).** Each chunk's fetch runs on its own interruptible fiber, and the batch window now runs on `Effect.sleep` (the whole engine is on Effect's clock). Fetch cancellation is exposed as a capability: `subscribeDocument(type, id)` / `subscribeQuery(type, params)` ref-count interest and return an unsubscribe function; when the last subscriber for **every** key in an in-flight chunk goes away, the fetch is interrupted — aborting the request via an `AbortSignal` — and its handles reset to idle so renewed interest refetches. `gcTimeMs` (default `0` = next tick) defers the interrupt so a quick re-subscribe cancels it. The React `useDocument` / `useQuery` hooks are **pure reactive reads** and do not auto-wire this — a signals-native "cancel when a handle has no reactive observers" wants an observation-lifecycle primitive in the kernel core that doesn't exist yet. Adapters receive the signal regardless: `find(ids, { signal })` (optional) — thread it into `fetch(url, { signal })` for a real network abort, or ignore it and interruption just discards the result (no stale write). +**Automatic, signals-native cancellation.** Each chunk's fetch runs on its own interruptible fiber, and the batch window now runs on `Effect.sleep` (the whole engine is on Effect's clock). Fetch cancellation rides the reactive graph itself: every handle carries a dedicated reactive liveness node that the rendering component subscribes to when it reads the handle via `find` / `findQuery`. When the **last** component observing a handle unmounts, the kernel's `onObservationChange` primitive fires and — after a `gcTimeMs` grace window — the in-flight fetch is interrupted (aborting the request via an `AbortSignal`) and its handles reset to idle so renewed interest refetches. A batch is only cancelled when the last observer for **every** key in it goes away; `gcTimeMs` (default `0` = next tick) defers the interrupt so a StrictMode remount or fast nav-back re-subscribes first. The React `useDocument` / `useQuery` hooks stay **pure reactive reads** (no `useEffect`, no imperative subscription) and drive this automatically. Adapters receive the signal regardless: `find(ids, { signal })` (optional) — thread it into `fetch(url, { signal })` for a real network abort, or ignore it and interruption just discards the result (no stale write). **The handle fields changed.** `DocumentHandle` / `QueryHandle` are now a `status`-discriminated union over flat fields: diff --git a/packages/husk/src/resource.ts b/packages/husk/src/resource.ts index 49c25edc..983733e9 100644 --- a/packages/husk/src/resource.ts +++ b/packages/husk/src/resource.ts @@ -1,6 +1,5 @@ -import { createReactive } from "@supergrain/kernel"; +import { createReactive, effect } from "@supergrain/kernel"; import { getActiveSub, setActiveSub } from "@supergrain/kernel/internal"; -import { effect } from "alien-signals"; /** * A resource is a reactive function with cleanup logic — one of two diff --git a/packages/js-krauset/vite.config.ts b/packages/js-krauset/vite.config.ts index d04e32da..cf495be8 100644 --- a/packages/js-krauset/vite.config.ts +++ b/packages/js-krauset/vite.config.ts @@ -23,8 +23,9 @@ export default defineConfig({ // and removes the need for a pnpm workspace when you copy this package. resolve: { alias: { - "@supergrain/kernel": resolve(__dirname, "../kernel/src/index.ts"), "@supergrain/kernel/react": resolve(__dirname, "../kernel/src/react/index.ts"), + "@supergrain/kernel/internal": resolve(__dirname, "../kernel/src/internal.ts"), + "@supergrain/kernel": resolve(__dirname, "../kernel/src/index.ts"), }, }, diff --git a/packages/kernel/src/batch.ts b/packages/kernel/src/batch.ts index f0cd02ca..679576ac 100644 --- a/packages/kernel/src/batch.ts +++ b/packages/kernel/src/batch.ts @@ -1,4 +1,4 @@ -import { startBatch, endBatch } from "alien-signals"; +import { startBatch, endBatch } from "./system"; /** * Run a synchronous callback with all signal writes coalesced into a single diff --git a/packages/kernel/src/collections.ts b/packages/kernel/src/collections.ts index a579ad4e..5a63fbdc 100644 --- a/packages/kernel/src/collections.ts +++ b/packages/kernel/src/collections.ts @@ -13,8 +13,6 @@ * `wrap()` dispatch here when the value is a Map or Set. */ -import { getActiveSub, startBatch, endBatch, signal } from "alien-signals"; - import { $OWN_KEYS, $RAW, @@ -35,6 +33,7 @@ import { profileSignalRead, profileSignalSkip, profileSignalWrite } from "./prof // accesses the imported binding at top-level evaluation time (they don't). // --------------------------------------------------------------------------- import { createReactiveProxy } from "./read"; +import { getActiveSub, startBatch, endBatch, signal } from "./system"; function wrap(value: T): T { if (!isWrappable(value)) { diff --git a/packages/kernel/src/core.ts b/packages/kernel/src/core.ts index 034df4d2..c82f3361 100644 --- a/packages/kernel/src/core.ts +++ b/packages/kernel/src/core.ts @@ -1,4 +1,4 @@ -import { signal } from "alien-signals"; +import { createObservationNode, signal, type ReactiveNode } from "./system"; // Phantom brand for compile-time store identification (no runtime property). // Exported as a real symbol so consumers can reference `typeof $BRAND` in type positions. @@ -27,6 +27,7 @@ export const $PROXY = Symbol.for("supergrain:proxy"); export const $TRACK = Symbol.for("supergrain:track"); export const $RAW = Symbol.for("supergrain:raw"); export const $VERSION = Symbol.for("supergrain:version"); +export const $OBSERVE = Symbol.for("supergrain:observe"); export const $OWN_KEYS = Symbol.for("ownKeys"); // Well-known symbol properties attached to reactive proxy targets and proxies. @@ -98,3 +99,31 @@ export function getNode(nodes: DataNodes, property: PropertyKey, value?: unknown nodes[property] = newSignal; return newSignal; } + +/** + * Retrieve the dedicated "liveness" reactive node for a reactive proxy, + * creating it lazily and stashing it on the raw target. Unlike the per-property + * signals (which fire on writes), the liveness node is never written — it exists + * purely so observation primitives (`onObservationChange`/`trackNode`/ + * `isObserved`) can detect when the proxy has no remaining reactive observers. + * + * Returns `undefined` only when the value isn't an object (nothing to observe). + */ +export function getObservationNode(value: object): ReactiveNode { + const raw = unwrap(value) as ReactiveTagged & { [$OBSERVE]?: ReactiveNode }; + let node = raw[$OBSERVE]; + if (!node) { + node = createObservationNode(); + try { + Object.defineProperty(raw, $OBSERVE, { + value: node, + enumerable: false, + configurable: true, + }); + } catch { + // Frozen / non-extensible targets can't carry the node; fall back to the + // freshly created node (observation simply won't be deduped for it). + } + } + return node; +} diff --git a/packages/kernel/src/index.ts b/packages/kernel/src/index.ts index 0c3060aa..861cc290 100644 --- a/packages/kernel/src/index.ts +++ b/packages/kernel/src/index.ts @@ -2,10 +2,10 @@ export { createReactive, unwrap, $BRAND, type Signal, type Branded } from "./store"; export { getNodesIfExist, $TRACK } from "./core"; -// Re-export signal primitives from alien-signals for convenience. -// `startBatch`/`endBatch`/`getActiveSub`/`setActiveSub` are intentionally -// not re-exported — they mutate global counters and leak unsafely on -// exception. Use `batch()` (below) instead. Internal consumers can still +// Re-export signal primitives from the kernel's owned reactive system for +// convenience. `startBatch`/`endBatch`/`getActiveSub`/`setActiveSub` are +// intentionally not re-exported — they mutate global counters and leak unsafely +// on exception. Use `batch()` (below) instead. Internal consumers can still // reach the raw primitives via `@supergrain/kernel/internal`. // // NOTE (alien-signals 3.x): `effect(fn)` now treats `fn`'s return value as a @@ -13,8 +13,16 @@ export { getNodesIfExist, $TRACK } from "./core"; // returns a non-function value (e.g. `effect(() => store.count)`) will throw // "cleanup is not a function" on its next run. Read for subscription with a // statement body or `void`: `effect(() => void store.count)`. -export { effect, signal, computed } from "alien-signals"; +export { effect, signal, computed } from "./system"; export { batch } from "./batch"; + +// Reactive-observation lifecycle primitive. `onObservationChange` fires a +// callback when a reactive node loses its last observer (and, optionally, gains +// its first); `getObservationNode` returns a reactive proxy's dedicated liveness +// node to attach handlers to. Used by `@supergrain/silo` to cancel an in-flight +// fetch when no component observes a handle anymore. +export { onObservationChange, type ReactiveNode } from "./system"; +export { getObservationNode } from "./core"; export { enableProfiling, disableProfiling, diff --git a/packages/kernel/src/internal.ts b/packages/kernel/src/internal.ts index a9c01a81..4c49f4b7 100644 --- a/packages/kernel/src/internal.ts +++ b/packages/kernel/src/internal.ts @@ -9,4 +9,11 @@ export { profileSignalWrite } from "./profiler"; // on exception, and setActiveSub mutates the global active-subscriber slot // that other code assumes is restored. Public users should reach for `batch()` // from `@supergrain/kernel` instead. -export { startBatch, endBatch, getActiveSub, setActiveSub } from "alien-signals"; +export { startBatch, endBatch, getActiveSub, setActiveSub, type ReactiveNode } from "./system"; + +// Observation primitives. `trackNode`/`isObserved` directly read/mutate the +// reactive graph, so they live here (not the package root) alongside the other +// sharp tools. `onObservationChange` and `getObservationNode` are also re- +// exported from the package root for convenience. +export { trackNode, isObserved, onObservationChange } from "./system"; +export { getObservationNode } from "./core"; diff --git a/packages/kernel/src/read.ts b/packages/kernel/src/read.ts index 8757f3f8..61d8194a 100644 --- a/packages/kernel/src/read.ts +++ b/packages/kernel/src/read.ts @@ -1,5 +1,3 @@ -import { getActiveSub, startBatch, endBatch } from "alien-signals"; - import { createReactiveMap, createReactiveSet } from "./collections"; import { $NODE, @@ -14,6 +12,7 @@ import { type ReactiveTagged, } from "./core"; import { profileSignalRead, profileSignalSkip } from "./profiler"; +import { getActiveSub, startBatch, endBatch } from "./system"; import { writeHandler } from "./write"; // Array methods that mutate the array internally do multiple proxy `set` diff --git a/packages/kernel/src/system.ts b/packages/kernel/src/system.ts new file mode 100644 index 00000000..988fc0dd --- /dev/null +++ b/packages/kernel/src/system.ts @@ -0,0 +1,548 @@ +// ============================================================================= +// Owned reactive system +// ============================================================================= +// +// The kernel owns its reactive primitive layer instead of importing +// `signal`/`computed`/`effect` from `alien-signals` directly. The graph +// algorithm (`link`/`unlink`/`propagate`/`checkDirty`/`shallowPropagate`) is +// still delegated to `alien-signals/system` via `createReactiveSystem(...)` — +// we only own the thin operator layer (the ~250 LOC of `update`/`notify`/ +// `unwatched` + the `*Oper` functions) so that the `unwatched` callback can +// fire per-node observation handlers when a reactive node loses its last +// subscriber. +// +// The operator layer is a faithful port of alien-signals 3.x's default system +// (see `node_modules/alien-signals/esm/index.mjs`). Behavior is identical; the +// only additions are: +// - `unwatched` additionally invokes any registered `onUnobserved` handler. +// - `link` additionally invokes any registered `onObserved` handler on the +// unobserved→observed (first-subscriber) transition. +// Both additions are gated behind counters that stay `0` unless a handler is +// registered, so the hot path is unchanged when observation is unused (e.g. +// the js-framework-benchmark, which never touches `@supergrain/silo`). + +import { createReactiveSystem, type Link, type ReactiveNode } from "alien-signals/system"; + +// ─── ReactiveFlags (ported literals — see alien-signals/system) ────────────── +const Mutable = 1; +const Watching = 2; +const RecursedCheck = 4; +const Recursed = 8; +const Dirty = 16; +const Pending = 32; +const HasChildEffect = 64; + +// ─── Node shapes (structural, matching alien-signals' duck-typed nodes) ────── +// +// The link-pointer props are `Link | undefined` (rather than alien's optional +// `Link`) so the operator layer can construct and reset them to `undefined` +// explicitly under `exactOptionalPropertyTypes`. Public signatures use alien's +// `ReactiveNode`; the few boundary conversions are cast. +interface BaseNode { + deps?: Link | undefined; + depsTail?: Link | undefined; + subs?: Link | undefined; + subsTail?: Link | undefined; + flags: number; +} + +interface SignalNode extends BaseNode { + currentValue: T; + pendingValue: T; +} + +interface ComputedNode extends BaseNode { + value: T | undefined; + getter: (previousValue?: T) => T; +} + +interface EffectNode extends BaseNode { + fn: () => void | (() => void); + cleanup: (() => void) | undefined; +} + +// ─── Module state (mirrors alien-signals' module-local cursor) ─────────────── +let cycle = 0; +let runDepth = 0; +let batchDepth = 0; +let notifyIndex = 0; +let queuedLength = 0; +// eslint-disable-next-line unicorn/no-useless-undefined -- init-declarations requires an initializer for this module-level cursor +let activeSub: BaseNode | undefined = undefined; +const queued: Array = []; + +// ─── Observation registry ──────────────────────────────────────────────────── +// +// `onUnobserved` rides the graph's natural last-subscriber-removed event +// (`unwatched`); `onObserved` rides the first-subscriber-added event (the +// `link` wrapper). Both are gated by counters so dispatch is free when nothing +// is registered. + +interface ObservationHandlers { + onObserved?: () => void; + onUnobserved?: () => void; +} + +const observers = new WeakMap(); +let observerCount = 0; +let onObservedCount = 0; + +const system = createReactiveSystem({ + update(node: BaseNode): boolean { + if ("getter" in node) { + return updateComputed(node as ComputedNode); + } + /* c8 ignore start -- dirty-recheck of a signal/effect dep: an alien-signals graph-internal path the kernel's signal/computed/effect usage doesn't deterministically reach */ + if ("currentValue" in node) { + return updateSignal(node as SignalNode); + } + node.flags = Mutable; + return true; + /* c8 ignore stop */ + }, + notify(node: BaseNode): void { + let effect: BaseNode | undefined = node; + let insertIndex = queuedLength; + const firstInsertedIndex = insertIndex; + while (effect !== undefined) { + queued[insertIndex++] = effect; + effect.flags &= ~Watching; + const next: BaseNode | undefined = effect.subs?.sub; + effect = next !== undefined && (next.flags & Watching) !== 0 ? next : undefined; + } + queuedLength = insertIndex; + let lo = firstInsertedIndex; + while (lo < --insertIndex) { + const left = queued[lo]; + queued[lo++] = queued[insertIndex]; + queued[insertIndex] = left; + } + }, + unwatched(node: BaseNode): void { + if ("getter" in node) { + if (node.depsTail !== undefined) { + node.flags = Mutable | Dirty; + disposeAllDepsInReverse(node); + } + } else if ("currentValue" in node) { + // signal: no default disposal behavior + } else if ("fn" in node) { + effectOper.call(node as EffectNode); + } else { + effectScopeOper.call(node); + } + if (observerCount !== 0) { + const handlers = observers.get(node); + if (handlers !== undefined && handlers.onUnobserved !== undefined) { + handlers.onUnobserved(); + } + } + }, +}) as unknown as { + link: (dep: BaseNode, sub: BaseNode, version: number) => void; + unlink: (link: Link, sub?: BaseNode) => Link | undefined; + propagate: (link: Link, innerWrite: boolean) => void; + checkDirty: (link: Link, sub: BaseNode) => boolean; + shallowPropagate: (link: Link) => void; +}; + +const { link: baseLink, unlink, propagate, checkDirty, shallowPropagate } = system; + +// `link` wrapper: identical to `baseLink` when no `onObserved` handler is +// registered (the common case). Otherwise, detect the unobserved→observed +// (first-subscriber) transition and fire the handler. +function link(dep: BaseNode, sub: BaseNode, version: number): void { + if (onObservedCount !== 0) { + const wasObserved = dep.subs !== undefined; + baseLink(dep, sub, version); + if (!wasObserved && dep.subs !== undefined) { + const handlers = observers.get(dep); + if (handlers !== undefined && handlers.onObserved !== undefined) { + handlers.onObserved(); + } + } + return; + } + baseLink(dep, sub, version); +} + +// ─── Public cursor accessors ───────────────────────────────────────────────── + +export function getActiveSub(): ReactiveNode | undefined { + return activeSub as ReactiveNode | undefined; +} + +export function setActiveSub(sub?: ReactiveNode): ReactiveNode | undefined { + const prevSub = activeSub; + activeSub = sub as BaseNode | undefined; + return prevSub as ReactiveNode | undefined; +} + +export function startBatch(): void { + ++batchDepth; +} + +export function endBatch(): void { + if (!--batchDepth) { + flush(); + } +} + +// ─── Primitive constructors ────────────────────────────────────────────────── + +export interface SignalFn { + (): T; + (value: T): void; +} + +export function signal(): SignalFn; +export function signal(initialValue: T): SignalFn; +export function signal(initialValue?: T): SignalFn { + const node: SignalNode = { + currentValue: initialValue, + pendingValue: initialValue, + subs: undefined, + subsTail: undefined, + flags: Mutable, + }; + return signalOper.bind(node) as SignalFn; +} + +export function computed(getter: (previousValue?: T) => T): () => T { + const node: ComputedNode = { + value: undefined, + subs: undefined, + subsTail: undefined, + deps: undefined, + depsTail: undefined, + flags: 0, + getter, + }; + return computedOper.bind(node as unknown as ComputedNode) as () => T; +} + +export function effect(fn: () => void | (() => void)): () => void { + const e: EffectNode = { + fn, + cleanup: undefined, + subs: undefined, + subsTail: undefined, + deps: undefined, + depsTail: undefined, + flags: Watching | RecursedCheck, + }; + const prevSub = setActiveSubNode(e); + if (prevSub !== undefined) { + link(e, prevSub, 0); + prevSub.flags |= HasChildEffect; + } + try { + ++runDepth; + // `fn` returns `void | (() => void)`; a `void` return is `undefined` at + // runtime, so narrowing to the cleanup type matches the actual value. + e.cleanup = e.fn() as (() => void) | undefined; + } finally { + --runDepth; + activeSub = prevSub; + e.flags &= ~RecursedCheck; + } + return effectOper.bind(e); +} + +export function effectScope(fn: () => void): () => void { + const e: BaseNode = { + deps: undefined, + depsTail: undefined, + subs: undefined, + subsTail: undefined, + flags: Mutable, + }; + const prevSub = setActiveSubNode(e); + if (prevSub !== undefined) { + link(e, prevSub, 0); + prevSub.flags |= HasChildEffect; + } + try { + fn(); + } finally { + activeSub = prevSub; + } + return effectScopeOper.bind(e); +} + +// Internal variant of setActiveSub that keeps the BaseNode type (no casts). +function setActiveSubNode(sub: BaseNode | undefined): BaseNode | undefined { + const prevSub = activeSub; + activeSub = sub; + return prevSub; +} + +// ─── Operators ─────────────────────────────────────────────────────────────── + +function updateComputed(c: ComputedNode): boolean { + if (c.flags & HasChildEffect) { + let link = c.depsTail; + while (link !== undefined) { + const prev = link.prevDep; + const dep = link.dep as BaseNode; + if (!("getter" in dep) && !("currentValue" in dep)) { + unlink(link, c); + } + link = prev; + } + } + c.depsTail = undefined; + c.flags = Mutable | RecursedCheck; + const prevSub = setActiveSubNode(c); + try { + ++cycle; + const oldValue = c.value; + return oldValue !== (c.value = c.getter(oldValue)); + } finally { + activeSub = prevSub; + c.flags &= ~RecursedCheck; + purgeDeps(c); + } +} + +function updateSignal(s: SignalNode): boolean { + s.flags = Mutable; + return s.currentValue !== (s.currentValue = s.pendingValue); +} + +function run(e: EffectNode): void { + const { flags } = e; + if (flags & Dirty || (flags & Pending && checkDirty(e.deps!, e))) { + if (flags & HasChildEffect) { + let link = e.depsTail; + while (link !== undefined) { + const prev = link.prevDep; + const dep = link.dep as BaseNode; + if (!("getter" in dep) && !("currentValue" in dep)) { + unlink(link, e); + } + link = prev; + } + } + if (e.cleanup) { + runCleanup(e); + if (!e.flags) { + return; + } + } + e.depsTail = undefined; + e.flags = Watching | RecursedCheck; + const prevSub = setActiveSubNode(e); + try { + ++cycle; + ++runDepth; + e.cleanup = e.fn() as (() => void) | undefined; + } finally { + --runDepth; + activeSub = prevSub; + e.flags &= ~RecursedCheck; + purgeDeps(e); + } + } else if (e.deps !== undefined) { + e.flags = Watching | (flags & HasChildEffect); + } +} + +function flush(): void { + try { + while (notifyIndex < queuedLength) { + const effect = queued[notifyIndex]; + queued[notifyIndex++] = undefined; + run(effect as EffectNode); + } + } finally { + /* c8 ignore start -- error-recovery: re-flags effects still queued when a prior effect threw mid-flush (a vendored scheduler safeguard) */ + while (notifyIndex < queuedLength) { + const effect = queued[notifyIndex]; + queued[notifyIndex++] = undefined; + effect!.flags |= Watching | Recursed; + } + /* c8 ignore stop */ + notifyIndex = 0; + queuedLength = 0; + } +} + +function computedOper(this: ComputedNode): unknown { + const { flags } = this; + if ( + flags & Dirty || + (flags & Pending && (checkDirty(this.deps!, this) || ((this.flags = flags & ~Pending), false))) + ) { + if (updateComputed(this)) { + const { subs } = this; + if (subs !== undefined) { + shallowPropagate(subs); + } + } + } else if (flags === 0) { + this.flags = Mutable | RecursedCheck; + const prevSub = setActiveSubNode(this); + try { + this.value = this.getter(); + } finally { + activeSub = prevSub; + this.flags &= ~RecursedCheck; + } + } + const sub = activeSub; + if (sub !== undefined) { + link(this, sub, cycle); + } + return this.value; +} + +function signalOper(this: SignalNode, ...value: [] | [T]): T | void { + if (value.length > 0) { + // `value.length > 0` guarantees the `[T]` arm, so the element is `T`. + // oxlint-disable-next-line prefer-destructuring -- destructuring widens the `[] | [T]` element to `T | undefined` + const next = value[0] as T; + if (this.pendingValue !== (this.pendingValue = next)) { + this.flags = Mutable | Dirty; + const { subs } = this; + if (subs !== undefined) { + propagate(subs, !!runDepth); + if (!batchDepth) { + flush(); + } + } + } + return; + } + if (this.flags & Dirty && updateSignal(this)) { + const { subs } = this; + if (subs !== undefined) { + shallowPropagate(subs); + } + } + const sub = activeSub; + if (sub !== undefined) { + link(this, sub, cycle); + } + return this.currentValue; +} + +function runCleanup(e: EffectNode): void { + const { cleanup } = e; + e.cleanup = undefined; + const prevSub = activeSub; + activeSub = undefined; + try { + cleanup!(); + } finally { + activeSub = prevSub; + } +} + +function effectOper(this: EffectNode): void { + effectScopeOper.call(this); + if (this.cleanup) { + runCleanup(this); + } +} + +function effectScopeOper(this: BaseNode): void { + this.flags = 0; + disposeAllDepsInReverse(this); + const sub = this.subs; + if (sub !== undefined) { + unlink(sub); + } +} + +function disposeAllDepsInReverse(sub: BaseNode): void { + let link = sub.depsTail; + while (link !== undefined) { + const prev = link.prevDep; + unlink(link, sub); + link = prev; + } +} + +function purgeDeps(sub: BaseNode): void { + const { depsTail } = sub; + let dep = depsTail === undefined ? sub.deps : depsTail.nextDep; + while (dep !== undefined) { + dep = unlink(dep, sub); + } +} + +// ============================================================================= +// Observation primitives +// ============================================================================= + +/** + * Create a dedicated "liveness" reactive node. It is shaped like a signal so + * the default `unwatched` dispatch treats it as a no-op (no disposal), but it + * is never written — only its observed-state matters. Read it (subscribe the + * active sub) via {@link trackNode}; register lifecycle handlers via + * {@link onObservationChange}; inspect via {@link isObserved}. + */ +export function createObservationNode(): ReactiveNode { + const node: SignalNode = { + currentValue: 0, + pendingValue: 0, + subs: undefined, + subsTail: undefined, + flags: Mutable, + }; + return node as unknown as ReactiveNode; +} + +/** + * Subscribe the current active subscriber (if any) to `node`, exactly as a + * normal signal read would. A no-op when there is no active sub (e.g. called + * outside a tracked render / effect). The node is never written, so this never + * causes a re-render — it exists only so observation can detect when the node + * has lost all observers. + */ +export function trackNode(node: ReactiveNode): void { + const sub = activeSub; + if (sub !== undefined) { + link(node as unknown as BaseNode, sub, cycle); + } +} + +/** Whether `node` currently has at least one subscriber. */ +export function isObserved(node: ReactiveNode): boolean { + return (node as unknown as BaseNode).subs !== undefined; +} + +/** + * Register handlers fired when `node` transitions observed→unobserved (its last + * subscriber is removed) and, optionally, unobserved→observed (it gains its + * first subscriber). Returns an unregister function. A later registration on + * the same node replaces the prior handlers. + * + * `onUnobserved` fires synchronously during unlink/propagation — do NOT perform + * irreversible work (canceling a request, etc.) inside it. Defer it (a timer / + * microtask) so a synchronous re-subscribe (a StrictMode remount, a fast + * nav-back) can cancel the pending work first; re-check {@link isObserved} when + * the deferred work runs. + */ +export function onObservationChange(node: ReactiveNode, handlers: ObservationHandlers): () => void { + const key = node as unknown as BaseNode; + const existing = observers.get(key); + if (existing === undefined) { + observerCount++; + } else if (existing.onObserved !== undefined) { + onObservedCount--; + } + observers.set(key, handlers); + if (handlers.onObserved !== undefined) onObservedCount++; + return () => { + const current = observers.get(key); + if (current === handlers) { + observers.delete(key); + observerCount--; + if (handlers.onObserved !== undefined) onObservedCount--; + } + }; +} + +export type { ReactiveNode }; diff --git a/packages/kernel/tests/core/observation.test.ts b/packages/kernel/tests/core/observation.test.ts new file mode 100644 index 00000000..2ae7cbd7 --- /dev/null +++ b/packages/kernel/tests/core/observation.test.ts @@ -0,0 +1,166 @@ +// ============================================================================= +// observation.test.ts +// ============================================================================= +// +// The reactive-observation lifecycle primitive: `onObservationChange` fires +// `onUnobserved` when a node loses its last subscriber and `onObserved` when it +// gains its first. `getObservationNode` returns a reactive proxy's dedicated +// liveness node; `trackNode`/`isObserved` subscribe-to / inspect a node. This is +// the kernel half of `@supergrain/silo`'s signals-native fetch cancellation. +// ============================================================================= +import { describe, it, expect } from "vitest"; + +import { createReactive, effect, getObservationNode, onObservationChange } from "../../src"; +import { getActiveSub, isObserved, setActiveSub, trackNode } from "../../src/internal"; + +// Subscribe `node` to a fresh effect (mirrors a component observing a handle). +// Returns a disposer that unsubscribes — exactly like a component unmounting. +function observe(node: ReturnType): () => void { + return effect(() => { + trackNode(node); + }); +} + +describe("onObservationChange", () => { + it("fires onUnobserved when a node loses its last subscriber", () => { + const proxy = createReactive({ a: 1 }); + const node = getObservationNode(proxy); + + let unobserved = 0; + onObservationChange(node, { onUnobserved: () => unobserved++ }); + + const dispose = observe(node); + expect(isObserved(node)).toBe(true); + expect(unobserved).toBe(0); + + dispose(); + expect(isObserved(node)).toBe(false); + expect(unobserved).toBe(1); + }); + + it("fires onObserved only on the first subscriber, not subsequent ones", () => { + const node = getObservationNode(createReactive({ a: 1 })); + + let observed = 0; + onObservationChange(node, { onObserved: () => observed++ }); + + const d1 = observe(node); + expect(observed).toBe(1); + + const d2 = observe(node); // second observer — no onObserved + expect(observed).toBe(1); + + d1(); + d2(); + expect(observed).toBe(1); + }); + + it("re-fires onObserved after the node returns to unobserved and is observed again", () => { + const node = getObservationNode(createReactive({ a: 1 })); + let observed = 0; + let unobserved = 0; + onObservationChange(node, { + onObserved: () => observed++, + onUnobserved: () => unobserved++, + }); + + observe(node)(); + expect(observed).toBe(1); + expect(unobserved).toBe(1); + + observe(node)(); + expect(observed).toBe(2); + expect(unobserved).toBe(2); + }); + + it("unregister stops handlers from firing", () => { + const node = getObservationNode(createReactive({ a: 1 })); + let unobserved = 0; + const unregister = onObservationChange(node, { onUnobserved: () => unobserved++ }); + + unregister(); + observe(node)(); + expect(unobserved).toBe(0); + }); + + it("a second unregister call is a no-op", () => { + const node = getObservationNode(createReactive({ a: 1 })); + const unregister = onObservationChange(node, { onUnobserved: () => {} }); + unregister(); + expect(() => unregister()).not.toThrow(); + }); + + it("tracks onObserved registrations across replace and unregister", () => { + const node = getObservationNode(createReactive({ a: 1 })); + let first = 0; + let second = 0; + // Register with onObserved, then replace with another that also has + // onObserved (the replace path adjusts the first-observer dispatch count). + onObservationChange(node, { onObserved: () => first++ }); + const unregister = onObservationChange(node, { onObserved: () => second++ }); + + observe(node)(); + expect(first).toBe(0); + expect(second).toBe(1); + + unregister(); // drops the active onObserved registration + observe(node)(); + expect(second).toBe(1); // no longer fires + }); + + it("re-registering replaces the prior handlers", () => { + const node = getObservationNode(createReactive({ a: 1 })); + let first = 0; + let second = 0; + onObservationChange(node, { onUnobserved: () => first++ }); + onObservationChange(node, { onUnobserved: () => second++ }); + + observe(node)(); + expect(first).toBe(0); + expect(second).toBe(1); + }); +}); + +describe("getObservationNode", () => { + it("returns the same node for the same proxy (idempotent)", () => { + const proxy = createReactive({ a: 1 }); + expect(getObservationNode(proxy)).toBe(getObservationNode(proxy)); + }); + + it("returns a node even for a frozen target (cannot stash, falls back)", () => { + const frozen = Object.freeze({ a: 1 }); + const node = getObservationNode(frozen); + expect(node).toBeDefined(); + // Cannot be deduped (no place to stash it), but must not throw. + expect(() => getObservationNode(frozen)).not.toThrow(); + }); + + it("resolves the raw target behind a proxy", () => { + const proxy = createReactive({ a: 1 }); + // Reading through the proxy and the raw value yields the same liveness node. + expect(getObservationNode(proxy)).toBe(getObservationNode(proxy)); + }); +}); + +describe("trackNode / isObserved", () => { + it("trackNode is a no-op when there is no active subscriber", () => { + const node = getObservationNode(createReactive({ a: 1 })); + const prev = setActiveSub(undefined); + try { + trackNode(node); + expect(isObserved(node)).toBe(false); + } finally { + setActiveSub(prev); + } + expect(getActiveSub()).toBe(prev); + }); + + it("isObserved reflects whether the node has subscribers", () => { + const node = getObservationNode(createReactive({ a: 1 })); + expect(isObserved(node)).toBe(false); + const dispose = observe(node); + expect(isObserved(node)).toBe(true); + dispose(); + expect(isObserved(node)).toBe(false); + }); +}); diff --git a/packages/kernel/tests/core/primitives.test.ts b/packages/kernel/tests/core/primitives.test.ts new file mode 100644 index 00000000..7455c4f0 --- /dev/null +++ b/packages/kernel/tests/core/primitives.test.ts @@ -0,0 +1,355 @@ +// ============================================================================= +// primitives.test.ts +// ============================================================================= +// +// Direct coverage for the kernel's owned reactive operator layer (`system.ts`): +// the `signal` / `computed` / `effect` / `effectScope` / `batch` primitives and +// the graph paths the proxy layer relies on — computed chains, diamonds, nested +// (child) effects, effect cleanup, and batched notification. These are a +// faithful port of alien-signals 3.x; the kernel's own React tests exercise them +// in the browser, so these node tests pin the same behavior where v8 coverage is +// collected. +// ============================================================================= +import { describe, it, expect } from "vitest"; + +import { batch, computed, effect, signal } from "../../src"; +import { effectScope } from "../../src/system"; + +describe("signal + effect", () => { + it("re-runs an effect when a read signal changes", () => { + const s = signal(0); + const seen: Array = []; + const dispose = effect(() => { + seen.push(s()); + }); + expect(seen).toEqual([0]); + s(1); + s(2); + expect(seen).toEqual([0, 1, 2]); + dispose(); + s(3); + expect(seen).toEqual([0, 1, 2]); // no run after dispose + }); + + it("does not re-run when the written value is unchanged (Object.is)", () => { + const s = signal(1); + let runs = 0; + effect(() => { + void s(); + runs++; + }); + s(1); // same value + expect(runs).toBe(1); + }); + + it("supports a default-undefined signal", () => { + const s = signal(); + expect(s()).toBeUndefined(); + s(5); + expect(s()).toBe(5); + }); +}); + +describe("computed", () => { + it("computes lazily and caches until a dependency changes", () => { + const s = signal(1); + let computations = 0; + const c = computed(() => { + computations++; + return s() * 2; + }); + + expect(c()).toBe(2); + expect(c()).toBe(2); // cached + expect(computations).toBe(1); + + s(5); + expect(c()).toBe(10); + expect(computations).toBe(2); + }); + + it("propagates through a chain of computeds into an effect", () => { + const s = signal(1); + const doubled = computed(() => s() * 2); + const plusOne = computed(() => doubled() + 1); + let observed = 0; + effect(() => { + observed = plusOne(); + }); + + expect(observed).toBe(3); + s(5); + expect(observed).toBe(11); + }); + + it("recomputes a diamond once per source change", () => { + const s = signal(1); + const a = computed(() => s() + 1); + const b = computed(() => s() + 10); + let sum = 0; + let runs = 0; + effect(() => { + runs++; + sum = a() + b(); + }); + + expect(sum).toBe(13); + expect(runs).toBe(1); + s(2); + expect(sum).toBe(15); + expect(runs).toBe(2); // single re-run despite two computed deps + }); + + it("skips recompute when a dependency settles back to an equal value", () => { + const s = signal(2); + const isEven = computed(() => s() % 2 === 0); + let downstream = 0; + const view = computed(() => { + downstream++; + return isEven() ? "even" : "odd"; + }); + + expect(view()).toBe("even"); + expect(downstream).toBe(1); + s(4); // still even — isEven unchanged, so `view` need not recompute + expect(view()).toBe("even"); + expect(downstream).toBe(1); + }); +}); + +describe("computed with a child effect", () => { + it("tears down a getter-spawned child effect on recompute", () => { + const s = signal(0); + const inner = signal(100); + let innerRuns = 0; + + // A getter that spawns an effect makes the computed a parent (HasChildEffect); + // each recompute must dispose the prior child before creating a new one. + const c = computed(() => { + effect(() => { + void inner(); + innerRuns++; + }); + return s(); + }); + + const dispose = effect(() => { + void c(); + }); + expect(innerRuns).toBe(1); + + s(1); // recompute → old child torn down, fresh child created + expect(innerRuns).toBe(2); + + dispose(); + }); +}); + +describe("computed teardown", () => { + it("disposes a computed's own dependencies when its last reader goes away", () => { + const s = signal(1); + let computations = 0; + const c = computed(() => { + computations++; + return s() * 2; + }); + + const dispose = effect(() => { + void c(); + }); + expect(computations).toBe(1); + + // Disposing the only reader leaves the computed with no subscribers, so it + // releases its dependency on `s` — a later write must not recompute it. + dispose(); + s(2); + expect(computations).toBe(1); + + // Reading it again recomputes lazily against the current source value. + expect(c()).toBe(4); + expect(computations).toBe(2); + }); +}); + +describe("effect cleanup", () => { + it("a cleanup that disposes the effect halts further runs", () => { + const s = signal(0); + let runs = 0; + // eslint-disable-next-line prefer-const -- assigned after effect() returns + let dispose: () => void; + dispose = effect(() => { + runs++; + void s(); + return () => { + if (runs >= 2) dispose(); + }; + }); + + s(1); // re-run #2; its cleanup disposes the effect + s(2); // no further run + expect(runs).toBe(2); + }); + + it("runs the returned cleanup before each re-run and once on dispose", () => { + const s = signal(0); + const cleanups: Array = []; + const dispose = effect(() => { + const v = s(); + return () => cleanups.push(v); + }); + + s(1); // cleanup(0) before re-run + s(2); // cleanup(1) before re-run + expect(cleanups).toEqual([0, 1]); + + dispose(); // cleanup(2) on dispose + expect(cleanups).toEqual([0, 1, 2]); + }); +}); + +describe("nested (child) effects", () => { + it("disposes and recreates a child effect when the parent re-runs", () => { + const outer = signal(0); + const inner = signal(100); + const log: Array = []; + + const dispose = effect(() => { + log.push(`outer:${outer()}`); + effect(() => { + log.push(`inner:${inner()}`); + }); + }); + + expect(log).toEqual(["outer:0", "inner:100"]); + + // Inner signal change re-runs only the child effect. + inner(101); + expect(log).toEqual(["outer:0", "inner:100", "inner:101"]); + + // Parent re-run tears down the old child and makes a fresh one. + outer(1); + expect(log).toEqual(["outer:0", "inner:100", "inner:101", "outer:1", "inner:101"]); + + dispose(); + inner(102); + outer(2); + expect(log).toEqual(["outer:0", "inner:100", "inner:101", "outer:1", "inner:101"]); + }); +}); + +describe("batch", () => { + it("coalesces multiple writes into a single effect run", () => { + const a = signal(0); + const b = signal(0); + let runs = 0; + let sum = 0; + effect(() => { + runs++; + sum = a() + b(); + }); + expect(runs).toBe(1); + + batch(() => { + a(1); + b(2); + }); + + expect(runs).toBe(2); // one re-run for both writes + expect(sum).toBe(3); + }); + + it("nested batches flush only at the outermost boundary", () => { + const s = signal(0); + let runs = 0; + effect(() => { + void s(); + runs++; + }); + + batch(() => { + s(1); + batch(() => { + s(2); + }); + expect(runs).toBe(1); // still inside the outer batch — no flush yet + }); + + expect(runs).toBe(2); + }); +}); + +describe("effectScope", () => { + it("disposes all effects created within the scope at once", () => { + const a = signal(0); + const b = signal(0); + let aRuns = 0; + let bRuns = 0; + + const disposeScope = effectScope(() => { + effect(() => { + void a(); + aRuns++; + }); + effect(() => { + void b(); + bRuns++; + }); + }); + + expect(aRuns).toBe(1); + expect(bRuns).toBe(1); + + a(1); + b(1); + expect(aRuns).toBe(2); + expect(bRuns).toBe(2); + + disposeScope(); + a(2); + b(2); + expect(aRuns).toBe(2); // scope disposed → no more runs + expect(bRuns).toBe(2); + }); + + it("disposing a child effect directly unlinks it from its parent", () => { + const a = signal(0); + let childRuns = 0; + let childDispose: (() => void) | undefined; + + const parentDispose = effect(() => { + void a(); + childDispose = effect(() => { + childRuns++; + }); + }); + expect(childRuns).toBe(1); + + // Dispose the child directly (it has the parent as a subscriber) — this + // exercises the scope-operator's own-subscriber unlink path. + childDispose!(); + parentDispose(); + }); + + it("a scope created inside an effect becomes a child and is torn down with it", () => { + const a = signal(0); + let runs = 0; + + const disposeOuter = effect(() => { + void a(); // re-runs the outer effect, which recreates the nested scope + effectScope(() => { + effect(() => { + void a(); + runs++; + }); + }); + }); + + expect(runs).toBe(1); + a(1); // outer re-runs → old scope (and its effect) disposed, fresh ones made + expect(runs).toBe(2); + + disposeOuter(); + a(2); + expect(runs).toBe(2); + }); +}); diff --git a/packages/kernel/tests/read/tracking-isolation.test.ts b/packages/kernel/tests/read/tracking-isolation.test.ts index 3df76cd7..ec2c3a87 100644 --- a/packages/kernel/tests/read/tracking-isolation.test.ts +++ b/packages/kernel/tests/read/tracking-isolation.test.ts @@ -1,14 +1,15 @@ import { update } from "@supergrain/mill"; -import { effect, getActiveSub, setActiveSub } from "alien-signals"; import { describe, it, expect, beforeEach, afterEach } from "vitest"; import { createReactive, + effect, enableProfiling, disableProfiling, resetProfiler, getProfile, } from "../../src"; +import { getActiveSub, setActiveSub } from "../../src/internal"; describe("Tracking Isolation Analysis", () => { beforeEach(() => { diff --git a/packages/kernel/vite.config.ts b/packages/kernel/vite.config.ts index ea80d873..9b907f0e 100644 --- a/packages/kernel/vite.config.ts +++ b/packages/kernel/vite.config.ts @@ -31,6 +31,7 @@ export default defineConfig({ "@supergrain/kernel", "@supergrain/kernel/internal", "alien-signals", + "alien-signals/system", ], output: [ { diff --git a/packages/silo/README.md b/packages/silo/README.md index 91426284..022a5dfd 100644 --- a/packages/silo/README.md +++ b/packages/silo/README.md @@ -144,13 +144,13 @@ export function UserCard({ id }: { id: string }) { Wrap the component in a `` boundary. That's it. One line to opt in, nothing to configure, no `{ suspense: true }` flag. -### Cancellation (opt-in) +### Cancellation (automatic) `useDocument` / `useQuery` are **pure reactive reads** — no `useEffect`, no imperative subscription. They just return a reactive handle and re-render on the fields you read. -Fetch cancellation is a separate, opt-in capability. `store.subscribeDocument(type, id)` / `store.subscribeQuery(type, params)` register interest and return an unsubscribe function; while the count is > 0 the in-flight fetch is kept, and when it returns to 0 the request is interrupted (aborting the `AbortSignal` you threaded into `fetch`), deferred by `gcTimeMs` (default `0` = next tick) so a quick re-subscribe cancels the interrupt. Because the cache is shared, a batch is only cancelled when the **last** subscriber for every key in it goes away. +Fetch cancellation rides that same reactivity. Every handle carries a dedicated reactive **liveness node**; reading a handle through `find` / `findQuery` subscribes the rendering component to it. When the **last** component observing a handle unmounts, the kernel's observation primitive ([`onObservationChange`](../kernel/README.md)) fires, and after a `gcTimeMs` grace window the in-flight fetch is interrupted — aborting the `AbortSignal` you threaded into `fetch` — and the handle resets to idle so renewed interest refetches. No `subscribe*` calls, no `useEffect`: cancellation is signals-native and automatic. -The hooks deliberately don't auto-wire this: tying it to React's mount/unmount would mean an effect, and the signals-native version — cancel when a handle has no reactive observers — wants an observation-lifecycle primitive in the kernel core that doesn't exist yet. Until then, wire `subscribe*`/unsubscribe to a lifecycle yourself if you want unmount-driven cancellation. +Because the cache is shared, a batch is only cancelled when the last observer for **every** key in it goes away. `gcTimeMs` (default `0` = next tick) defers the interrupt so a StrictMode remount or a fast nav-back re-subscribes before any work is lost. The whole engine — batch window included — runs on Effect's clock (`Effect.sleep`), so timing is fully deterministic in tests. diff --git a/packages/silo/src/finder.ts b/packages/silo/src/finder.ts index 263b3f4f2354e19f482ddad45d521be2f173f8f2..a9ed538fca1714c3ffe4dab5d47f6a1f4fe59d60 100644 GIT binary patch delta 2565 zcmZ`*O^+Kz5S5jHAPdo6vK!bCRW=~(jXhQnCvO~x5)MS#gil2rkldc>u{%5N9(DJ4 zWf5Z}asW<KH0FGRF-92MFZ1BPI*wt09tKNI%pFVFq{JQbO z^NqzBq7k=uhlcAZvqH%`D&cK>D@^#3C+xgaTZ%K<@HIcS$r5C z=9FqRq1`u4k#jxLOvvsb*OF&lAuT>M*m0xr-Ls1`-7Xn1k}NYF2^>>SOeVw%t0s7i zd&H+)&j^*1j8n|C7Tge{geM|qmM5eu7%tUrh76J$vr7{9Avg=^ z5#1Ye37i3VNLGUj*fM4K!jQ0rGG;6(KMStu&(FRk5%I`_@jtCFhah(kVI96gPE;Y0 z!W~+Hr&MU%YM|tYvdWtb_KBtFy$>6V;t#}x-#;(|v~q|XF>jMdUCxkZ1}2cPBFj*u z&OtZs+^QyW$Ppnud?7n&CPrgR@wkwO?jYu=0>1iasW!ICFE*~6!q?^T#_sWN8?T+d z{SMb1mnlSfBBChfN=*mj@s3^b)e5QkNEmmMRJI&lz*FSBym__!XmjVO@MJeT3p#Q$ z2n4F&vI^pu86QWO0vLizs?=w%;A#`$_go8zgFl9@Pm&+e0n4u$>j>4RPy9T4kLh6_ zP1~w*_NdvS{&kuvkhsC_sC350j#%LjV0;z|<6@8&fmjcARVZk*v1} zU~6hc^Z^=$c^exNx)aC}S_EdVw=u9$oBc)AmMXXdEi9#g1973Dm?~jVxsyu>(PR|L zdzjgbJOO#q4HAPsR&0#}HyQXK1Gu`qivYXInL7iboN4R1gMM~Y9;OHQ=nohrQPbOv z0}_%oc{GY>;5Ar_Qx(<-C?8UGcI5rRNx$XhNqb$qgW{@uOa9{n_&vo zSwg>D_!_PIPqN*n%LlUgg05(7%{;036)?)Xi{K;)@Hr0;uXQW<&w-UkG)d9c!`m{;e2KYkOW=Nc9sPX&t*YpWo*CD(cIau9YVKo~E*shyxx1G07d~Cd+#$!j~L!R3e5tsH- zEm@9<^g*&TvMR-8;8;+6{osOvYoa zq$-TN`BLB1Jkhdpc7F5Yc4%3iVFDq;vR}9_7A>YIjQ};^3W-*^Hy`P8b9nkN6K3pI zEgHJ6`z4Fn{#d81p)#uhVR9@O{ zpMp5bU(a1A|2Y4B`Q?jg?I`~&KOXa)wDF89;$G!17cO>I3a+J(`~IK9lz*Mux!qdI q&Lz>@7TU9)`u#qXmvA^BB=O(@U0VHnYa*-H(c0np@!6Liz5FjTRb=J> delta 984 zcmZ{j&uUXa6vhd*Xd?j)MJP?%W1{q4nm=t<(ljk%w-p3Y++})mZtjp|?qz0f(wBK|C{Q{wTN!%(-X2^Zm{@dHyQ%`B~;;E;D+QdAT%% zIMEd@_Oi7a1gzUdsX$^_3F1^};6kHBr6EBH9UQRyU+(Ddujk5IUqT}E@K;?fF+u>?G&M(2eK zV1g`G!Y*$|8iFVl9cag>fc4p+>{MKFQ$a{}C2(bB9FP~O24)Lz8@0-o#u^6#5=xsNg%_9S zqW#O8>ma5xO=$n~Re_zl)8ip{uJ^#&ed20B(fq?_TL{ED0D6Erm12B)>T0w8)&X`= zBW+-YeS{%}YoFURT6%&Ang<>5+QEI^!(-)xih-FgYbzI>CI$O_t(5W2qxI6}7~Yz7 zqsDinszyxFD1Tzfyx$lZzN)XxC49&JRqTeq-#d@?oO8@EgEha3u!IAWv*DSC5MP?a tZ@xk}J&bN$4>RU%asB3m+s*J)Dnk|^z3f3kzw`(ijRtIw9+W=Z_yd5;MOXj; diff --git a/packages/silo/src/store.ts b/packages/silo/src/store.ts index f6f89e92..3ef7922e 100644 --- a/packages/silo/src/store.ts +++ b/packages/silo/src/store.ts @@ -296,20 +296,6 @@ export interface DocumentStore< result: Q[K]["result"], ): void; clearMemory(): void; - /** - * Opt-in fetch cancellation. Register interest in a document and get an - * unsubscribe function back; while the count is > 0 an in-flight fetch for - * `(type, id)` is kept, and when it returns to 0 the fetch is interrupted - * (aborting its `AbortSignal`), deferred by `gcTimeMs`. - * - * The React hooks deliberately do NOT call this — `useDocument` is a pure - * reactive read. Wire it up yourself (e.g. tie subscribe/unsubscribe to a - * component's lifecycle) when you want unmount-driven cancellation, until a - * reactive-observation primitive in the kernel can drive it automatically. - */ - subscribeDocument(type: K, id: string): () => void; - /** Query analogue of {@link DocumentStore.subscribeDocument}. */ - subscribeQuery(type: K, params: Q[K]["params"]): () => void; } const IDLE_HANDLE: DocumentHandle = Object.freeze({ @@ -352,6 +338,11 @@ export function createDocumentStore< bucket.set(id, makeIdleHandle()); handle = bucket.get(id)!; } + // Subscribe the rendering component (the current active subscriber) to + // this handle's liveness node, so unmounting the last observer triggers + // automatic, signals-native fetch cancellation. A no-op outside a tracked + // render. + finder.observe("documents", type, id, handle); // Trigger a fetch only for a never-loaded, idle, non-errored handle // (status "pending" with no fetch in flight). Errored handles don't // auto-retry; loaded handles serve from cache. @@ -397,6 +388,9 @@ export function createDocumentStore< // would break handle identity for subsequent calls. handle = bucket.get(paramsKey)!; } + // See `find`: subscribe the rendering component to the handle's liveness + // node for automatic cancellation. + finder.observe("queries", type, paramsKey, handle); if (!handle.isFetching && handle.value === undefined && handle.error === undefined) { batch(() => applyEvent(handle!, HandleEvent.fetch())); finder.queueQuery(type, paramsKey, params); @@ -444,17 +438,6 @@ export function createDocumentStore< } }); }, - - subscribeDocument(type: K, id: string): () => void { - finder.subscribe("documents", type, id); - return () => finder.unsubscribe("documents", type, id); - }, - - subscribeQuery(type: K, params: Q[K]["params"]): () => void { - const paramsKey = stableStringify(params); - finder.subscribe("queries", type, paramsKey); - return () => finder.unsubscribe("queries", type, paramsKey); - }, }; finder.attach(state, store); diff --git a/packages/silo/tests/cancellation.test.ts b/packages/silo/tests/cancellation.test.ts index a181487e..be19b1ea 100644 --- a/packages/silo/tests/cancellation.test.ts +++ b/packages/silo/tests/cancellation.test.ts @@ -2,17 +2,26 @@ // tests/cancellation.test.ts // ============================================================================= // -// Subscriber-gated cancellation (opt-in). Callers ref-count interest per key -// via store.subscribeDocument / subscribeQuery (the React hooks do NOT — they -// are pure reactive reads). When the last subscriber for every key in an -// in-flight chunk goes away, the chunk's fiber is interrupted — aborting the -// request's AbortSignal — and its handles reset to idle so a later find -// refetches. +// Signals-native, automatic cancellation. Every handle carries a dedicated +// reactive "liveness" node; `store.find`/`store.findQuery` subscribe the current +// active subscriber (the rendering component) to it. When the last observer of a +// handle goes away (its component unmounts), the kernel fires `onUnobserved`; +// after the `gcTimeMs` debounce, if every key in the in-flight chunk is still +// unobserved, the chunk's fiber is interrupted — aborting the request's +// AbortSignal — and its handles reset to idle so a later find refetches. // -// Driven through a real DocumentStore with fake timers (the batch window runs -// on Effect.sleep; the gc deferral on setTimeout — both advance together). +// These node-level tests stand in for React mount/unmount by driving observation +// directly through the kernel: an effect node whose deps are (re-)established by +// running a render thunk under it, exactly like `tracked()`. Disposing the +// effect == unmounting the last component. React-level coverage lives in +// tests/react/cancellation.test.tsx. +// +// Driven through a real DocumentStore with fake timers (the batch window runs on +// Effect.sleep; the gc deferral on setTimeout — both advance together). // ============================================================================= +import { effect } from "@supergrain/kernel"; +import { getActiveSub, type ReactiveNode, setActiveSub } from "@supergrain/kernel/internal"; import { Effect } from "effect"; import { describe, expect, it, vi } from "vitest"; @@ -33,7 +42,6 @@ interface Controllable { readonly signal: AbortSignal | undefined; /** Resolve the most recent in-flight request. */ resolve(): void; - /** Whether the adapter threaded the signal into its request (default true). */ } /** @@ -78,20 +86,44 @@ async function tick(ms = 20): Promise { await vi.advanceTimersByTimeAsync(ms); } -describe("subscriber-gated cancellation", () => { - it("interrupts an in-flight fetch and aborts the wire when the last subscriber leaves", async () => { +/** + * Observe a handle the way a `tracked()` component does: create an effect node, + * capture it on the first run, and (re-)run `read` under it so reactive reads — + * including the `store.find`/`finder.observe` liveness subscription — are linked + * to that node. The returned disposer is the "unmount": disposing unlinks the + * deps, dropping the last observer. + */ +function observe(read: () => void): () => void { + let node: ReactiveNode | undefined; + return effect(() => { + if (node === undefined) { + node = getActiveSub(); + } + const prev = setActiveSub(node); + try { + read(); + } finally { + setActiveSub(prev); + } + }); +} + +describe("observation-driven cancellation", () => { + it("interrupts an in-flight fetch and aborts the wire when the last observer leaves", async () => { const c = controllable(); const store = createDocumentStore({ models: { user: { adapter: c.adapter } } }); - const handle = store.find("user", "1"); - const unsub = store.subscribeDocument("user", "1"); + let handle = makeIdleHandle(); + const unobserve = observe(() => { + handle = store.find("user", "1") as typeof handle; + }); await tick(); // window elapses → chunk fiber forked → request in flight expect(c.calls).toEqual([["1"]]); expect(handle.isFetching).toBe(true); expect(c.signal?.aborted).toBe(false); - unsub(); // last subscriber gone + unobserve(); // last observer gone await tick(); // gc(0) fires next tick → interrupt → AbortController.abort() expect(c.signal?.aborted).toBe(true); @@ -101,74 +133,74 @@ describe("subscriber-gated cancellation", () => { expect(handle.value).toBeUndefined(); }); - it("keeps the fetch while any key in the batch still has a subscriber", async () => { + it("keeps the fetch while any key in the batch still has an observer", async () => { const c = controllable(); const store = createDocumentStore({ models: { user: { adapter: c.adapter } } }); - store.find("user", "1"); - store.find("user", "2"); // same window → one chunk ["1","2"] - const unsub1 = store.subscribeDocument("user", "1"); - const unsub2 = store.subscribeDocument("user", "2"); + const unobs1 = observe(() => void store.find("user", "1")); + const unobs2 = observe(() => void store.find("user", "2")); // same window → one chunk ["1","2"] await tick(); expect(c.calls).toEqual([["1", "2"]]); - unsub2(); + unobs2(); await tick(); - expect(c.signal?.aborted).toBe(false); // "1" still subscribed → chunk kept + expect(c.signal?.aborted).toBe(false); // "1" still observed → chunk kept - unsub1(); + unobs1(); await tick(); expect(c.signal?.aborted).toBe(true); // both abandoned → interrupted }); - it("a re-subscribe before the next tick cancels the pending interrupt", async () => { + it("a re-observe before the next tick cancels the pending interrupt", async () => { const c = controllable(); const store = createDocumentStore({ models: { user: { adapter: c.adapter } } }); - store.find("user", "1"); - const unsub = store.subscribeDocument("user", "1"); + const unobserve = observe(() => void store.find("user", "1")); await tick(); - unsub(); - const unsubAgain = store.subscribeDocument("user", "1"); // synchronous re-subscribe + unobserve(); + const unobserveAgain = observe(() => void store.find("user", "1")); // synchronous re-observe await tick(); - expect(c.signal?.aborted).toBe(false); // interrupt cancelled + expect(c.signal?.aborted).toBe(false); // interrupt cancelled (onObserved cleared the timer) expect(c.calls).toEqual([["1"]]); // no refetch - unsubAgain(); + unobserveAgain(); }); it("refetches when interest returns after a gc interrupt", async () => { const c = controllable(); const store = createDocumentStore({ models: { user: { adapter: c.adapter } } }); - store.find("user", "1"); - const unsub = store.subscribeDocument("user", "1"); + const unobserve = observe(() => void store.find("user", "1")); await tick(); - unsub(); + unobserve(); await tick(); expect(c.signal?.aborted).toBe(true); // Interest returns: the handle is idle again, so find re-triggers a fetch. - const handle = store.find("user", "1"); - const unsubAgain = store.subscribeDocument("user", "1"); + let handle = makeIdleHandle(); + const unobserveAgain = observe(() => { + handle = store.find("user", "1") as typeof handle; + }); await tick(); expect(c.calls).toEqual([["1"], ["1"]]); expect(handle.isFetching).toBe(true); - unsubAgain(); + unobserveAgain(); }); it("discards a late result after interruption even when the adapter ignores the signal", async () => { const c = controllable(false); // adapter never honors the abort signal const store = createDocumentStore({ models: { user: { adapter: c.adapter } } }); - const handle = store.find("user", "1"); - const unsub = store.subscribeDocument("user", "1"); + let handle = makeIdleHandle(); + const unobserve = observe(() => { + handle = store.find("user", "1") as typeof handle; + }); await tick(); - unsub(); + unobserve(); await tick(); // interrupt: fiber gone, even if the request later resolves c.resolve(); // the ignored request finally settles @@ -194,23 +226,25 @@ describe("subscriber-gated cancellation", () => { }; const store = createDocumentStore({ models: { user: { adapter } } }); - const handle = store.find("user", "1"); - const unsub = store.subscribeDocument("user", "1"); + let handle = makeIdleHandle(); + const unobserve = observe(() => { + handle = store.find("user", "1") as typeof handle; + }); await tick(); expect(handle.isFetching).toBe(true); - unsub(); + unobserve(); await tick(); expect(finalized).toBe(true); // the adapter's own finalizer ran expect(handle.isFetching).toBe(false); // reset to idle }); - it("does not interrupt a fetch that was never subscribed", async () => { + it("does not interrupt a fetch that was never observed", async () => { const c = controllable(); const store = createDocumentStore({ models: { user: { adapter: c.adapter } } }); - const handle = store.find("user", "1"); // no subscribeDocument + const handle = store.find("user", "1"); // no observer (no active sub) await tick(); await tick(); @@ -222,20 +256,19 @@ describe("subscriber-gated cancellation", () => { expect(handle.value).toEqual({ id: "1", name: "User1" }); }); - it("keeps the fetch while a second subscriber on the same key remains", async () => { + it("keeps the fetch while a second observer on the same key remains", async () => { const c = controllable(); const store = createDocumentStore({ models: { user: { adapter: c.adapter } } }); - store.find("user", "1"); - const unsubA = store.subscribeDocument("user", "1"); - const unsubB = store.subscribeDocument("user", "1"); // count = 2 + const unobsA = observe(() => void store.find("user", "1")); + const unobsB = observe(() => void store.find("user", "1")); // two observers, one key await tick(); - unsubA(); // count = 1, still wanted + unobsA(); // one observer left await tick(); expect(c.signal?.aborted).toBe(false); - unsubB(); // count = 0 + unobsB(); // last observer gone await tick(); expect(c.signal?.aborted).toBe(true); }); @@ -247,19 +280,17 @@ describe("subscriber-gated cancellation", () => { models: { user: { adapter: cUser.adapter }, post: { adapter: cPost.adapter } }, }); - store.find("user", "1"); - store.find("post", "1"); // two separate chunks in flight - const unsubUser = store.subscribeDocument("user", "1"); - const unsubPost = store.subscribeDocument("post", "1"); + const unobsUser = observe(() => void store.find("user", "1")); + const unobsPost = observe(() => void store.find("post", "1")); // two separate chunks in flight await tick(); - unsubUser(); + unobsUser(); await tick(); expect(cUser.signal?.aborted).toBe(true); // abandoned chunk interrupted expect(cPost.signal?.aborted).toBe(false); // unrelated chunk untouched - unsubPost(); + unobsPost(); await tick(); }); }); @@ -267,7 +298,7 @@ describe("subscriber-gated cancellation", () => { type QTypes = { search: { params: { q: string }; result: { id: string; type: "search" } } }; describe("query cancellation", () => { - it("interrupts an in-flight query fetch when its last subscriber leaves", async () => { + it("interrupts an in-flight query fetch when its last observer leaves", async () => { let signal: AbortSignal | undefined; const adapter = { find: (_paramsList: Array<{ q: string }>, ctx?: { signal: AbortSignal }) => { @@ -282,13 +313,15 @@ describe("query cancellation", () => { queries: { search: { adapter } }, }); - const handle = store.findQuery("search", { q: "x" }); - const unsub = store.subscribeQuery("search", { q: "x" }); + let handle = makeIdleHandle(); + const unobserve = observe(() => { + handle = store.findQuery("search", { q: "x" }) as typeof handle; + }); await tick(); expect(handle.isFetching).toBe(true); expect(signal?.aborted).toBe(false); - unsub(); + unobserve(); await tick(); expect(signal?.aborted).toBe(true); @@ -297,19 +330,7 @@ describe("query cancellation", () => { }); }); -describe("subscriber ref-counting (Finder unit)", () => { - it("unsubscribe is a no-op for an unknown type, and is idempotent", () => { - const finder = new Finder({ models: { user: { adapter: { find: async () => [] } } } }); - - // Unknown type bucket → early return, no throw. - expect(() => finder.unsubscribe("documents", "user", "nope")).not.toThrow(); - - finder.subscribe("documents", "user", "1"); - finder.unsubscribe("documents", "user", "1"); // → 0, schedules gc - // Second unsubscribe with a pending gc timer is a no-op. - expect(() => finder.unsubscribe("documents", "user", "1")).not.toThrow(); - }); - +describe("Finder observation (unit)", () => { it("an interrupt safely skips a handle evicted mid-flight", async () => { const c = controllable(); const handle = makeIdleHandle(); @@ -322,12 +343,12 @@ describe("subscriber ref-counting (Finder unit)", () => { const finder = new Finder({ models: { user: { adapter: c.adapter } } }); finder.attach(state, host); - finder.subscribe("documents", "user", "1"); + const unobserve = observe(() => finder.observe("documents", "user", "1", handle)); finder.queueDocument("user", "1"); await tick(); // window → chunk in flight state.documents.get("user")!.delete("1"); // handle evicted before interrupt - finder.unsubscribe("documents", "user", "1"); + unobserve(); await tick(); // gc → interrupt → resetKeys finds the bucket but no handle expect(c.signal?.aborted).toBe(true); // request still torn down, no throw @@ -345,14 +366,49 @@ describe("subscriber ref-counting (Finder unit)", () => { const finder = new Finder({ models: { user: { adapter: c.adapter } } }); finder.attach(state, host); - finder.subscribe("documents", "user", "1"); + const unobserve = observe(() => finder.observe("documents", "user", "1", handle)); finder.queueDocument("user", "1"); await tick(); state.documents.delete("user"); // whole bucket gone - finder.unsubscribe("documents", "user", "1"); + unobserve(); await tick(); expect(c.signal?.aborted).toBe(true); }); + + it("interrupts a chunk where some keys were queued without ever being observed", async () => { + const c = controllable(); + const handle1 = makeIdleHandle(); + handle1.isFetching = true; + const handle2 = makeIdleHandle(); + handle2.isFetching = true; + const state: InternalState = { + documents: new Map([ + [ + "user", + new Map([ + ["1", handle1], + ["2", handle2], + ]), + ], + ]), + queries: new Map(), + }; + const host = createDocumentStore({ models: { user: { adapter: c.adapter } } }); + const finder = new Finder({ models: { user: { adapter: c.adapter } } }); + finder.attach(state, host); + + // Observe only "1"; "2" rides the same chunk but is never observed. + const unobserve = observe(() => finder.observe("documents", "user", "1", handle1)); + finder.queueDocument("user", "1"); + finder.queueDocument("user", "2"); + await tick(); + expect(c.calls).toEqual([["1", "2"]]); + + unobserve(); + await tick(); // gc → maybeInterrupt: "1" unobserved, "2" has no observation entry + + expect(c.signal?.aborted).toBe(true); + }); }); diff --git a/packages/silo/tests/react/cancellation.test.tsx b/packages/silo/tests/react/cancellation.test.tsx new file mode 100644 index 00000000..fb164670 --- /dev/null +++ b/packages/silo/tests/react/cancellation.test.tsx @@ -0,0 +1,168 @@ +// ============================================================================= +// tests/react/cancellation.test.tsx +// ============================================================================= +// +// React-level coverage for signals-native, automatic fetch cancellation. +// `useDocument`/`useQuery` are pure reactive reads (no effects, no imperative +// subscription) — yet unmounting the last component observing a handle +// interrupts its in-flight fetch (aborting the AbortSignal) after the `gcTimeMs` +// debounce; a surviving observer keeps it alive; a quick remount does not +// cancel. +// ============================================================================= + +import { tracked } from "@supergrain/kernel/react"; +import { cleanup, render } from "@testing-library/react"; +import { afterEach, describe, expect, it } from "vitest"; + +import { type DocumentAdapter, type DocumentStore, type DocumentStoreConfig } from "../../src"; +import { createDocumentStoreContext } from "../../src/react"; + +interface User { + id: string; + name: string; +} + +type TypeToModel = { user: User }; + +const tick = (ms = 30) => new Promise((r) => setTimeout(r, ms)); + +afterEach(async () => { + // Unmount everything, then drain any pending gc timers / in-flight fetch + // interrupts so nothing leaks into the next test file in this browser worker. + cleanup(); + await tick(80); +}); + +interface Controllable { + adapter: DocumentAdapter; + readonly signal: AbortSignal | undefined; + readonly calls: number; +} + +// A Promise adapter whose request stays pending until aborted; records the +// AbortSignal of the most recent call so tests can assert wire-level abort. +function controllable(): Controllable { + let signal: AbortSignal | undefined; + let calls = 0; + const adapter: DocumentAdapter = { + find: (ids, ctx) => { + calls++; + signal = ctx?.signal; + return new Promise>((resolve, reject) => { + if (ctx?.signal) { + ctx.signal.addEventListener("abort", () => reject(new Error("aborted"))); + } + // Otherwise hangs forever (kept pending for the test's duration). + void resolve; + void ids; + }); + }, + }; + return { + adapter, + get signal() { + return signal; + }, + get calls() { + return calls; + }, + }; +} + +function makeConfig(c: Controllable, gcTimeMs = 0): DocumentStoreConfig { + return { models: { user: { adapter: c.adapter } }, gcTimeMs }; +} + +const { Provider, useDocument } = createDocumentStoreContext>(); + +const UserBadge = tracked(function UserBadge({ userId }: { userId: string }) { + const handle = useDocument("user", userId); + return {handle.value !== undefined ? handle.value.name : "loading"}; +}); + +describe("automatic cancellation via observation", () => { + it("interrupts the in-flight fetch when the last observer unmounts", async () => { + const c = controllable(); + const { unmount } = render( + + + , + ); + + await tick(); // batch window elapses → request in flight + expect(c.calls).toBe(1); + expect(c.signal?.aborted).toBe(false); + + unmount(); // last observer gone + await tick(); // gc(0) → interrupt → AbortController.abort() + + expect(c.signal?.aborted).toBe(true); + }); + + it("keeps the fetch alive while another component still observes the handle", async () => { + const c = controllable(); + + function App({ showSecond }: { showSecond: boolean }) { + return ( + + + {showSecond ? : null} + + ); + } + + const { rerender } = render(); + await tick(); + expect(c.signal?.aborted).toBe(false); + + rerender(); // one of two observers unmounts + await tick(); + expect(c.signal?.aborted).toBe(false); // a surviving observer keeps it alive + }); + + it("does not cancel when a component remounts within the gc window", async () => { + const c = controllable(); + const config = makeConfig(c, 50); + + const { unmount } = render( + + + , + ); + await tick(); + expect(c.calls).toBe(1); + + unmount(); + // Remount immediately (a fast nav-back), well within the 50ms gc window. + render( + + + , + ); + await tick(60); + + expect(c.signal?.aborted).toBe(false); // never cancelled + }); + + it("useDocument contains no imperative subscription — a plain read still cancels", async () => { + // Smoke-test that the *only* wiring is the reactive read: a component that + // merely reads the handle (no effects) drives cancellation on unmount. + const c = controllable(); + const ReadOnly = tracked(function ReadOnly() { + const handle = useDocument("user", "42"); + return {handle.status}; + }); + + const { unmount } = render( + + + , + ); + await tick(); + expect(c.signal?.aborted).toBe(false); + + unmount(); + await tick(); + expect(c.signal?.aborted).toBe(true); + }); +}); From 49a5b0e5d02233bbafb38c239031ac7203a04e62 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 3 Jun 2026 01:19:42 +0000 Subject: [PATCH 2/2] Coalesce observation dispatch, drop onObserved, and tidy the cancellation path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses self-review feedback on the observation primitive + silo cancellation: - Kernel: defer `onUnobserved` to a coalesced microtask flush that re-checks each node, so a `tracked()` re-render's transient unlink/re-link never fires (kills the per-render timer thrash and its scheduler race). Drop the `onObserved` hook and its `link` wrapper entirely — the hot path now calls `baseLink` directly, so observation adds zero per-`link` overhead. - Kernel: `getObservationNode` dedupes frozen targets via a WeakMap fallback (previously returned a fresh, un-deduped node — observation would silently break). Cover the `flush` error-recovery path with a real test and drop its c8-ignore (only the genuinely-unreachable dirty-recheck fallback remains). - Silo: collapse the per-key gc timers into one coalesced sweep over a pending set; drop the now-unused `onObserved`/`unregister` plumbing; store the liveness node directly. Replace the NUL-delimited `keyOf` with a JSON-encoded tuple — still collision-safe, but plain text so finder.ts stays diffable. - Changesets: stop editing #82's silo changeset; add a separate additive #83 changeset instead. Update the kernel changeset to match the onObserved-free API and the coalescing semantics. All five gates green (657 tests). silo finder/store 100%; kernel system.ts 100% lines/statements/functions. https://claude.ai/code/session_018FyYY6YYb1CW5LnBfPMgod --- .changeset/kernel-observation.md | 7 +- .changeset/silo-auto-cancellation.md | 11 ++ .changeset/silo-effect-migration.md | 2 +- packages/kernel/src/core.ts | 35 +++-- packages/kernel/src/system.ts | 116 ++++++++-------- .../kernel/tests/core/observation.test.ts | 126 ++++++++++-------- packages/kernel/tests/core/primitives.test.ts | 38 ++++++ packages/silo/src/finder.ts | Bin 18436 -> 18720 bytes .../silo/tests/react/cancellation.test.tsx | 36 +++++ 9 files changed, 244 insertions(+), 127 deletions(-) create mode 100644 .changeset/silo-auto-cancellation.md diff --git a/.changeset/kernel-observation.md b/.changeset/kernel-observation.md index e7f24f16..a4c9c055 100644 --- a/.changeset/kernel-observation.md +++ b/.changeset/kernel-observation.md @@ -6,18 +6,17 @@ Add a reactive-observation lifecycle primitive and own the reactive system. The kernel now owns its primitive layer (`signal` / `computed` / `effect` / `batch`) on top of `alien-signals/system`'s `createReactiveSystem(...)` instead of importing the high-level operators from `alien-signals` directly. The graph algorithm (`link` / `unlink` / `propagate` / `checkDirty`) is still delegated to alien-signals — only the thin operator layer is owned, so the kernel can observe when a reactive node loses its last subscriber. All reactive semantics (fine-grained tracking, batching, Map/Set coalescing, `effect` cleanup) are unchanged. -**New: `onObservationChange`.** Register a callback fired when a reactive node transitions observed→unobserved (its last subscriber is removed) and, optionally, unobserved→observed (it gains its first): +**New: `onObservationChange`.** Register a callback fired when a reactive node loses its last subscriber: ```ts import { onObservationChange, getObservationNode } from "@supergrain/kernel"; const node = getObservationNode(reactiveProxy); // dedicated, never-written liveness node const unregister = onObservationChange(node, { - onUnobserved: () => scheduleCleanup(), // defer destructive work; re-check on a timer - onObserved: () => cancelCleanup(), + onUnobserved: () => scheduleCleanup(), // defer destructive work; re-check isObserved later }); ``` -`getObservationNode(proxy)` returns a proxy's dedicated liveness node (created lazily, never written, so it never causes a re-render). The sharp tools `trackNode` (subscribe the active sub) and `isObserved` are available from `@supergrain/kernel/internal`. The dispatch is gated behind counters that stay `0` until a handler is registered, so the hot path is unchanged when observation is unused. +`onUnobserved` is **not** fired on the synchronous unlink: nodes that lose their last subscriber are coalesced and flushed on a microtask, and each is re-checked, so a node unobserved-then-re-observed within the same turn (a `tracked()` re-render re-establishing its dependencies) fires nothing — no thrash. `getObservationNode(proxy)` returns a proxy's dedicated liveness node (created lazily, never written, so it never causes a re-render; stable even for frozen targets). The sharp tools `trackNode` (subscribe the active sub) and `isObserved` are available from `@supergrain/kernel/internal`. There is no first-subscriber hook, so the hot `link` path is untouched; the `unwatched` bookkeeping is gated behind a counter that stays `0` until a handler is registered. `@supergrain/silo` uses this to cancel an in-flight fetch automatically when no component observes a handle anymore — no `useEffect`, no manual `subscribe*`. diff --git a/.changeset/silo-auto-cancellation.md b/.changeset/silo-auto-cancellation.md new file mode 100644 index 00000000..080555e8 --- /dev/null +++ b/.changeset/silo-auto-cancellation.md @@ -0,0 +1,11 @@ +--- +"@supergrain/silo": minor +--- + +Fetch cancellation is now automatic and signals-native — it rides the reactive graph instead of manual ref-counting. + +Every handle carries a dedicated reactive liveness node; reading a handle through `find` / `findQuery` subscribes the rendering component to it (via the kernel's new `onObservationChange` primitive). When the **last** component observing a handle unmounts, its in-flight fetch is interrupted — aborting the request's `AbortSignal` — after the `gcTimeMs` grace window, and the handle resets to idle so renewed interest refetches. A batch is only cancelled when the last observer for **every** key in it goes away. + +`useDocument` / `useQuery` remain **pure reactive reads** — no `useEffect`, no imperative subscription — and now drive cancellation automatically on unmount. The transient unobserve/re-observe of a `tracked()` re-render never cancels: the kernel coalesces and re-checks observation on a microtask, and `gcTimeMs` (default `0` = next tick) plus an `isObserved` re-check at sweep time absorb a StrictMode remount or fast nav-back. + +**Removed** the opt-in `store.subscribeDocument` / `store.subscribeQuery` capability (added in the same major and never released): cancellation no longer needs a manual subscription. Adapters still receive `find(ids, { signal })` — thread it into `fetch(url, { signal })` for a real network abort, or ignore it and interruption just discards the result. diff --git a/.changeset/silo-effect-migration.md b/.changeset/silo-effect-migration.md index 152e1401..2a9cfb47 100644 --- a/.changeset/silo-effect-migration.md +++ b/.changeset/silo-effect-migration.md @@ -10,7 +10,7 @@ Rebuild the network/async layer on an internal [Effect](https://effect.website/) **Per-model `retry` / `timeout`.** `ModelConfig` and `QueryConfig` accept an Effect `Schedule` (`retry`) and a `Duration` (`timeout`). -**Automatic, signals-native cancellation.** Each chunk's fetch runs on its own interruptible fiber, and the batch window now runs on `Effect.sleep` (the whole engine is on Effect's clock). Fetch cancellation rides the reactive graph itself: every handle carries a dedicated reactive liveness node that the rendering component subscribes to when it reads the handle via `find` / `findQuery`. When the **last** component observing a handle unmounts, the kernel's `onObservationChange` primitive fires and — after a `gcTimeMs` grace window — the in-flight fetch is interrupted (aborting the request via an `AbortSignal`) and its handles reset to idle so renewed interest refetches. A batch is only cancelled when the last observer for **every** key in it goes away; `gcTimeMs` (default `0` = next tick) defers the interrupt so a StrictMode remount or fast nav-back re-subscribes first. The React `useDocument` / `useQuery` hooks stay **pure reactive reads** (no `useEffect`, no imperative subscription) and drive this automatically. Adapters receive the signal regardless: `find(ids, { signal })` (optional) — thread it into `fetch(url, { signal })` for a real network abort, or ignore it and interruption just discards the result (no stale write). +**Fiber-based cancellation (opt-in).** Each chunk's fetch runs on its own interruptible fiber, and the batch window now runs on `Effect.sleep` (the whole engine is on Effect's clock). Fetch cancellation is exposed as a capability: `subscribeDocument(type, id)` / `subscribeQuery(type, params)` ref-count interest and return an unsubscribe function; when the last subscriber for **every** key in an in-flight chunk goes away, the fetch is interrupted — aborting the request via an `AbortSignal` — and its handles reset to idle so renewed interest refetches. `gcTimeMs` (default `0` = next tick) defers the interrupt so a quick re-subscribe cancels it. The React `useDocument` / `useQuery` hooks are **pure reactive reads** and do not auto-wire this — a signals-native "cancel when a handle has no reactive observers" wants an observation-lifecycle primitive in the kernel core that doesn't exist yet. Adapters receive the signal regardless: `find(ids, { signal })` (optional) — thread it into `fetch(url, { signal })` for a real network abort, or ignore it and interruption just discards the result (no stale write). **The handle fields changed.** `DocumentHandle` / `QueryHandle` are now a `status`-discriminated union over flat fields: diff --git a/packages/kernel/src/core.ts b/packages/kernel/src/core.ts index c82f3361..a1561d37 100644 --- a/packages/kernel/src/core.ts +++ b/packages/kernel/src/core.ts @@ -100,6 +100,11 @@ export function getNode(nodes: DataNodes, property: PropertyKey, value?: unknown return newSignal; } +// Fallback liveness-node store for targets that can't carry the `$OBSERVE` +// property (frozen / non-extensible). Keyed by raw target so the node is still +// deduped (observation would silently break if each call returned a new node). +const frozenObservationNodes = new WeakMap(); + /** * Retrieve the dedicated "liveness" reactive node for a reactive proxy, * creating it lazily and stashing it on the raw target. Unlike the per-property @@ -107,23 +112,25 @@ export function getNode(nodes: DataNodes, property: PropertyKey, value?: unknown * purely so observation primitives (`onObservationChange`/`trackNode`/ * `isObserved`) can detect when the proxy has no remaining reactive observers. * - * Returns `undefined` only when the value isn't an object (nothing to observe). + * The node is stable across calls for a given target (incl. frozen targets, via + * a WeakMap fallback), so observation handlers attach to the same node a reader + * subscribes to. */ export function getObservationNode(value: object): ReactiveNode { const raw = unwrap(value) as ReactiveTagged & { [$OBSERVE]?: ReactiveNode }; - let node = raw[$OBSERVE]; - if (!node) { - node = createObservationNode(); - try { - Object.defineProperty(raw, $OBSERVE, { - value: node, - enumerable: false, - configurable: true, - }); - } catch { - // Frozen / non-extensible targets can't carry the node; fall back to the - // freshly created node (observation simply won't be deduped for it). - } + const existing = raw[$OBSERVE] ?? frozenObservationNodes.get(raw); + if (existing) return existing; + + const node = createObservationNode(); + try { + Object.defineProperty(raw, $OBSERVE, { + value: node, + enumerable: false, + configurable: true, + }); + } catch { + // Frozen / non-extensible target: keep the node deduped via the WeakMap. + frozenObservationNodes.set(raw, node); } return node; } diff --git a/packages/kernel/src/system.ts b/packages/kernel/src/system.ts index 988fc0dd..acb16611 100644 --- a/packages/kernel/src/system.ts +++ b/packages/kernel/src/system.ts @@ -13,13 +13,12 @@ // // The operator layer is a faithful port of alien-signals 3.x's default system // (see `node_modules/alien-signals/esm/index.mjs`). Behavior is identical; the -// only additions are: -// - `unwatched` additionally invokes any registered `onUnobserved` handler. -// - `link` additionally invokes any registered `onObserved` handler on the -// unobserved→observed (first-subscriber) transition. -// Both additions are gated behind counters that stay `0` unless a handler is -// registered, so the hot path is unchanged when observation is unused (e.g. -// the js-framework-benchmark, which never touches `@supergrain/silo`). +// only addition is that `unwatched` records last-subscriber-removed nodes for a +// coalesced, microtask-deferred `onUnobserved` dispatch (see below). The hot +// path — `link` and the `*Oper` functions — is untouched: there is no +// first-subscriber hook, and the `unwatched` bookkeeping is gated behind a +// counter that stays `0` until a handler is registered, so code that never uses +// observation (e.g. the js-framework-benchmark) pays nothing. import { createReactiveSystem, type Link, type ReactiveNode } from "alien-signals/system"; @@ -74,18 +73,50 @@ const queued: Array = []; // ─── Observation registry ──────────────────────────────────────────────────── // // `onUnobserved` rides the graph's natural last-subscriber-removed event -// (`unwatched`); `onObserved` rides the first-subscriber-added event (the -// `link` wrapper). Both are gated by counters so dispatch is free when nothing -// is registered. +// (`unwatched`), but is NOT fired synchronously. A reactive node routinely loses +// and regains its last subscriber within a single turn — e.g. the `tracked()` +// React adapter re-establishes a component's dependencies on every render, so a +// dependency is transiently unlinked then re-linked. Firing on the synchronous +// unlink would thrash (a cancel scheduled-then-cancelled on every re-render). +// +// Instead, unobserved nodes are collected and flushed on a single microtask; +// at flush time each node is re-checked, so a node re-observed within the turn +// fires nothing. The hot path (`signalOper`/`computedOper`/`trackNode` → +// `link`) is unchanged — there is no first-subscriber (`onObserved`) hook, so +// no per-`link` work. Both the queueing and the flush are gated by +// `observerCount`, which stays `0` until a handler is registered (e.g. the +// js-framework-benchmark, which never touches observation, pays nothing). interface ObservationHandlers { - onObserved?: () => void; onUnobserved?: () => void; } const observers = new WeakMap(); let observerCount = 0; -let onObservedCount = 0; +const pendingUnobserved = new Set(); +let unobservedFlushScheduled = false; + +function scheduleUnobservedFlush(): void { + if (unobservedFlushScheduled) return; + unobservedFlushScheduled = true; + queueMicrotask(flushUnobserved); +} + +function flushUnobserved(): void { + unobservedFlushScheduled = false; + if (pendingUnobserved.size === 0) return; + const nodes = [...pendingUnobserved]; + pendingUnobserved.clear(); + for (const node of nodes) { + // Skip nodes re-observed within the turn (e.g. a re-render re-linked it). + if (node.subs === undefined) { + const handlers = observers.get(node); + if (handlers !== undefined && handlers.onUnobserved !== undefined) { + handlers.onUnobserved(); + } + } + } +} const system = createReactiveSystem({ update(node: BaseNode): boolean { @@ -131,11 +162,11 @@ const system = createReactiveSystem({ } else { effectScopeOper.call(node); } - if (observerCount !== 0) { - const handlers = observers.get(node); - if (handlers !== undefined && handlers.onUnobserved !== undefined) { - handlers.onUnobserved(); - } + // Defer the observation callback: coalesce + re-check on a microtask so a + // transient unlink/re-link (a re-render) doesn't fire spuriously. + if (observerCount !== 0 && observers.has(node)) { + pendingUnobserved.add(node); + scheduleUnobservedFlush(); } }, }) as unknown as { @@ -146,25 +177,7 @@ const system = createReactiveSystem({ shallowPropagate: (link: Link) => void; }; -const { link: baseLink, unlink, propagate, checkDirty, shallowPropagate } = system; - -// `link` wrapper: identical to `baseLink` when no `onObserved` handler is -// registered (the common case). Otherwise, detect the unobserved→observed -// (first-subscriber) transition and fire the handler. -function link(dep: BaseNode, sub: BaseNode, version: number): void { - if (onObservedCount !== 0) { - const wasObserved = dep.subs !== undefined; - baseLink(dep, sub, version); - if (!wasObserved && dep.subs !== undefined) { - const handlers = observers.get(dep); - if (handlers !== undefined && handlers.onObserved !== undefined) { - handlers.onObserved(); - } - } - return; - } - baseLink(dep, sub, version); -} +const { link, unlink, propagate, checkDirty, shallowPropagate } = system; // ─── Public cursor accessors ───────────────────────────────────────────────── @@ -356,13 +369,13 @@ function flush(): void { run(effect as EffectNode); } } finally { - /* c8 ignore start -- error-recovery: re-flags effects still queued when a prior effect threw mid-flush (a vendored scheduler safeguard) */ + // Recovery: an effect threw mid-flush — re-flag the still-queued effects so + // a later flush re-runs them instead of dropping them. while (notifyIndex < queuedLength) { const effect = queued[notifyIndex]; queued[notifyIndex++] = undefined; effect!.flags |= Watching | Recursed; } - /* c8 ignore stop */ notifyIndex = 0; queuedLength = 0; } @@ -514,33 +527,30 @@ export function isObserved(node: ReactiveNode): boolean { } /** - * Register handlers fired when `node` transitions observed→unobserved (its last - * subscriber is removed) and, optionally, unobserved→observed (it gains its - * first subscriber). Returns an unregister function. A later registration on - * the same node replaces the prior handlers. + * Register a handler fired when `node` transitions observed→unobserved (its last + * subscriber is removed). Returns an unregister function; a later registration + * on the same node replaces the prior handler. * - * `onUnobserved` fires synchronously during unlink/propagation — do NOT perform - * irreversible work (canceling a request, etc.) inside it. Defer it (a timer / - * microtask) so a synchronous re-subscribe (a StrictMode remount, a fast - * nav-back) can cancel the pending work first; re-check {@link isObserved} when - * the deferred work runs. + * `onUnobserved` is **not** fired on the synchronous unlink. Nodes that lose + * their last subscriber are coalesced and flushed on a microtask, and each is + * re-checked — so a node unobserved and re-observed within the same turn (a + * `tracked()` re-render re-establishing its dependencies) fires nothing. Even + * so, treat it as a hint, not a fact: a later macrotask (a StrictMode remount, + * a fast nav-back) can re-observe the node, so defer any irreversible work (a + * grace timer) and re-check {@link isObserved} when it runs. */ export function onObservationChange(node: ReactiveNode, handlers: ObservationHandlers): () => void { const key = node as unknown as BaseNode; - const existing = observers.get(key); - if (existing === undefined) { + if (!observers.has(key)) { observerCount++; - } else if (existing.onObserved !== undefined) { - onObservedCount--; } observers.set(key, handlers); - if (handlers.onObserved !== undefined) onObservedCount++; return () => { const current = observers.get(key); if (current === handlers) { observers.delete(key); + pendingUnobserved.delete(key); observerCount--; - if (handlers.onObserved !== undefined) onObservedCount--; } }; } diff --git a/packages/kernel/tests/core/observation.test.ts b/packages/kernel/tests/core/observation.test.ts index 2ae7cbd7..dcf3803e 100644 --- a/packages/kernel/tests/core/observation.test.ts +++ b/packages/kernel/tests/core/observation.test.ts @@ -2,17 +2,22 @@ // observation.test.ts // ============================================================================= // -// The reactive-observation lifecycle primitive: `onObservationChange` fires -// `onUnobserved` when a node loses its last subscriber and `onObserved` when it -// gains its first. `getObservationNode` returns a reactive proxy's dedicated -// liveness node; `trackNode`/`isObserved` subscribe-to / inspect a node. This is -// the kernel half of `@supergrain/silo`'s signals-native fetch cancellation. +// The reactive-observation lifecycle primitive: `onObservationChange(node, { +// onUnobserved })` fires when a node loses its last subscriber — coalesced and +// re-checked on a microtask, so a node unobserved and re-observed within the +// same turn fires nothing. `getObservationNode` returns a reactive proxy's +// dedicated liveness node; `trackNode`/`isObserved` subscribe-to / inspect a +// node. This is the kernel half of `@supergrain/silo`'s signals-native fetch +// cancellation. // ============================================================================= import { describe, it, expect } from "vitest"; import { createReactive, effect, getObservationNode, onObservationChange } from "../../src"; import { getActiveSub, isObserved, setActiveSub, trackNode } from "../../src/internal"; +// `onUnobserved` is dispatched on a microtask; await one so assertions see it. +const flush = () => Promise.resolve(); + // Subscribe `node` to a fresh effect (mirrors a component observing a handle). // Returns a disposer that unsubscribes — exactly like a component unmounting. function observe(node: ReturnType): () => void { @@ -22,7 +27,7 @@ function observe(node: ReturnType): () => void { } describe("onObservationChange", () => { - it("fires onUnobserved when a node loses its last subscriber", () => { + it("fires onUnobserved after a microtask once a node loses its last subscriber", async () => { const proxy = createReactive({ a: 1 }); const node = getObservationNode(proxy); @@ -31,55 +36,82 @@ describe("onObservationChange", () => { const dispose = observe(node); expect(isObserved(node)).toBe(true); - expect(unobserved).toBe(0); dispose(); expect(isObserved(node)).toBe(false); + expect(unobserved).toBe(0); // deferred, not synchronous + + await flush(); expect(unobserved).toBe(1); }); - it("fires onObserved only on the first subscriber, not subsequent ones", () => { + it("does NOT fire when a node is unobserved then re-observed within the same turn", async () => { const node = getObservationNode(createReactive({ a: 1 })); + let unobserved = 0; + onObservationChange(node, { onUnobserved: () => unobserved++ }); - let observed = 0; - onObservationChange(node, { onObserved: () => observed++ }); - - const d1 = observe(node); - expect(observed).toBe(1); + const dispose = observe(node); + dispose(); // unobserved... + const dispose2 = observe(node); // ...re-observed before the microtask flush - const d2 = observe(node); // second observer — no onObserved - expect(observed).toBe(1); + await flush(); + expect(unobserved).toBe(0); // coalesced away — no thrash - d1(); - d2(); - expect(observed).toBe(1); + dispose2(); + await flush(); + expect(unobserved).toBe(1); }); - it("re-fires onObserved after the node returns to unobserved and is observed again", () => { + it("only fires for the last subscriber when several observe the same node", async () => { const node = getObservationNode(createReactive({ a: 1 })); - let observed = 0; let unobserved = 0; - onObservationChange(node, { - onObserved: () => observed++, - onUnobserved: () => unobserved++, - }); + onObservationChange(node, { onUnobserved: () => unobserved++ }); - observe(node)(); - expect(observed).toBe(1); + const d1 = observe(node); + const d2 = observe(node); + + d1(); + await flush(); + expect(unobserved).toBe(0); // d2 still observes + + d2(); + await flush(); expect(unobserved).toBe(1); + }); - observe(node)(); - expect(observed).toBe(2); - expect(unobserved).toBe(2); + it("coalesces many nodes unobserved in one turn into a single flush", async () => { + let fired = 0; + const disposers = Array.from({ length: 5 }, () => { + const node = getObservationNode(createReactive({ a: 1 })); + onObservationChange(node, { onUnobserved: () => fired++ }); + return observe(node); + }); + + for (const d of disposers) d(); + expect(fired).toBe(0); + await flush(); + expect(fired).toBe(5); }); - it("unregister stops handlers from firing", () => { + it("unregister stops the handler from firing", async () => { const node = getObservationNode(createReactive({ a: 1 })); let unobserved = 0; const unregister = onObservationChange(node, { onUnobserved: () => unobserved++ }); unregister(); observe(node)(); + await flush(); + expect(unobserved).toBe(0); + }); + + it("unregister drops a node already queued for the flush", async () => { + const node = getObservationNode(createReactive({ a: 1 })); + let unobserved = 0; + const unregister = onObservationChange(node, { onUnobserved: () => unobserved++ }); + + observe(node)(); // queues the node for the microtask + unregister(); // ...but we unregister before it flushes + await flush(); expect(unobserved).toBe(0); }); @@ -90,25 +122,7 @@ describe("onObservationChange", () => { expect(() => unregister()).not.toThrow(); }); - it("tracks onObserved registrations across replace and unregister", () => { - const node = getObservationNode(createReactive({ a: 1 })); - let first = 0; - let second = 0; - // Register with onObserved, then replace with another that also has - // onObserved (the replace path adjusts the first-observer dispatch count). - onObservationChange(node, { onObserved: () => first++ }); - const unregister = onObservationChange(node, { onObserved: () => second++ }); - - observe(node)(); - expect(first).toBe(0); - expect(second).toBe(1); - - unregister(); // drops the active onObserved registration - observe(node)(); - expect(second).toBe(1); // no longer fires - }); - - it("re-registering replaces the prior handlers", () => { + it("re-registering replaces the prior handler", async () => { const node = getObservationNode(createReactive({ a: 1 })); let first = 0; let second = 0; @@ -116,6 +130,7 @@ describe("onObservationChange", () => { onObservationChange(node, { onUnobserved: () => second++ }); observe(node)(); + await flush(); expect(first).toBe(0); expect(second).toBe(1); }); @@ -127,18 +142,19 @@ describe("getObservationNode", () => { expect(getObservationNode(proxy)).toBe(getObservationNode(proxy)); }); - it("returns a node even for a frozen target (cannot stash, falls back)", () => { + it("returns a stable node even for a frozen target", () => { const frozen = Object.freeze({ a: 1 }); const node = getObservationNode(frozen); expect(node).toBeDefined(); - // Cannot be deduped (no place to stash it), but must not throw. - expect(() => getObservationNode(frozen)).not.toThrow(); + // Cannot stash on a frozen target, so it falls back to a WeakMap — but must + // still dedupe so observation works. + expect(getObservationNode(frozen)).toBe(node); }); it("resolves the raw target behind a proxy", () => { - const proxy = createReactive({ a: 1 }); - // Reading through the proxy and the raw value yields the same liveness node. - expect(getObservationNode(proxy)).toBe(getObservationNode(proxy)); + const raw = { a: 1 }; + const proxy = createReactive(raw); + expect(getObservationNode(proxy)).toBe(getObservationNode(raw)); }); }); diff --git a/packages/kernel/tests/core/primitives.test.ts b/packages/kernel/tests/core/primitives.test.ts index 7455c4f0..4a434423 100644 --- a/packages/kernel/tests/core/primitives.test.ts +++ b/packages/kernel/tests/core/primitives.test.ts @@ -237,6 +237,44 @@ describe("nested (child) effects", () => { }); }); +describe("flush error recovery", () => { + it("re-queues effects still pending when one throws mid-flush", () => { + const s = signal(0); + let armed = false; + const ran: Array = []; + + // Thrower subscribed first so it's processed before the survivors in the + // same flush; when it throws, the still-queued survivors must be recovered + // (re-flagged) rather than lost. + effect(() => { + void s(); + ran.push("thrower"); + if (armed) throw new Error("boom"); + }); + effect(() => { + void s(); + ran.push("a"); + }); + effect(() => { + void s(); + ran.push("b"); + }); + expect(ran).toEqual(["thrower", "a", "b"]); + + armed = true; + ran.length = 0; + expect(() => s(1)).toThrow("boom"); + + // A later, non-throwing write still flushes everyone — the survivors were + // recovered, not dropped. + armed = false; + ran.length = 0; + expect(() => s(2)).not.toThrow(); + expect(ran).toContain("a"); + expect(ran).toContain("b"); + }); +}); + describe("batch", () => { it("coalesces multiple writes into a single effect run", () => { const a = signal(0); diff --git a/packages/silo/src/finder.ts b/packages/silo/src/finder.ts index a9ed538fca1714c3ffe4dab5d47f6a1f4fe59d60..f2c6a0f57cdd74389bbdab45b93efe4bb80000a1 100644 GIT binary patch delta 2067 zcmZWqTWcgm7$xqC5PaBNL^mPsm+Xo?yK_N&kz^;ZVB8D432q__hJ~r=uAVNYr)sFG zp0P>7Aov5+#z)a7!6%jY?Elceq0ft_s%9n$8<^1DRrOuYIp3*ozFqkF`-R_sU3!#@ z5trN;k}Ba8^?At?>Xy{O(?J@ULZ>X|b+Y9U|M9Wbso^c-BxEDaMK809M6FhJu>8q{ za%C*BJSWvPT#vXmb-zkXCdBCQ@&3c7dlV~{a}y&`FD5hQe7Nl*8yaWIaLO5i%mUDu zON1P=l9=WpSgi7##}?;>q|T7bM9AL5xHIu-hFZmFD72v@r;bl?BnDh}>ZBQp*XV#r z!fJ?U-69R~Lmd>2GZAMrRz;p*qh=URT|Misd*V9 zCRz=BIoQRGW!+quY}ysGLdjb7%ktf~pK}&lP_Pg5t5?grlX*APCdtS2j1BJ_tI=Sc zPH0NUHXR0Dc04cJ6n5;m>Sr%>wj0%UP|Q$^5wko1?NQjK=iC-r9)Z*Qj#!nF;nuBH z!>Q(A9kYs7VtaU*-Wq^&jS(@-6R z>%#Q8mOTH^T*lksFc1#w5J~g25xdifcmCzOCvRML6Hxtg^UgIqRKM-rtTxwb)#mzp z6Sv@@B>c5nbTm&fm$IRIlp)OhZr^QWU01FF?8#m_}b2u^z*`xY_Q- zj_YTpL#cA3fc$LK=t9}x$MEJfoRVGuRkC)g`eW@$wYPqwI$nP<0R&GD5B4GV*p=eAO3InM(3`@CB#dKGo6WRE&s|= z1Yzxe@f7-Wo2-Hm@B%=PG!?n)kij_;k){BS*Fx>u>)Th^46`B9GCF?U&0ba?HCL+9 z#>RvqmIJ(`oO7DYIE79fY%XO`R>*P}V-zb57lJOGzv+SB6BIfXco;kK22-DfhU6e? zS^Ac#<(?HL1cc^fzk2olpVd#RZ?@-V z)r5US{YwsY>B6A1>FkJj57_q$ICaS%3mPsM& z#cG^$rh}c57c#`{Le3reO1}g@ot~}H$s$$%-uk%W8%vd-AG=3TQ3qG#gD#i`!AXEU z;~2gH$29^b-6eEEUE*B9NyZ5fH>M{&(9uVe&2|q$X^85+k**;oQnZ) zK}TvjF<|%@mF_?)%@9UdVGd~pF0W=|gub4tf@KJ%+Mb{b=u>S)Irf(s%c=I^TNfNK=tgC^_}F{cie^dk!6O4c$fO$ztDOYq8NGyFq1ro z6Z`4jQcxpMf_L-|@T zlyaow1m0#M!?FyJQjbi8zRzZX0!=mqu3dJ_N{Acr$1bepI@V^ zsWLTElF(Pie5lej_;q3dD?q@u%u z(M7>#yD9ZV=evq@j)WzoSP(59=0OHIvb^MCfi?BiKvkO}(iwV~GZ%k^QoH!@hj0u}I({7L>;s`g}BUcs~?=q{+_`SQuVcA{#0x zBzc-pJfi}l|H9w7jCv<9F)4}Ikj&Ar@H|j;Mw>=4p*e}AtRPo%;@5_7b&$4MN@f~j zSS8E46SwK&kSy~~)wD%wN*!}^sYc$2v}d}c1Io3O6U;fmD<1g4=Un7+4x}&CAzlBR ztbPaPLry05AiD<0i5~op+nctr)p#jf!WpW{tw8(X*7tALWCkuZnwp=mv;yP*$4|U( zMf3i>j<1%~R4hrotP9EorDh#@R8IKyk zK!r;dhVrKKlX|H7w5=bcG&gOlybrXPf}v-A`D { expect(c.signal?.aborted).toBe(false); // never cancelled }); + it("does not cancel across re-renders of a still-mounted observer", async () => { + // The crux of the coalescing fix: `tracked()` re-renders transiently drop + // and re-establish the liveness subscription. A naive design would schedule + // (and race) a cancel on every re-render; here repeated re-renders of a + // mounted component must never abort its in-flight fetch. + const c = controllable(); + const Badge = tracked(function Badge({ gen }: { gen: number }) { + const handle = useDocument("user", "1"); + return ( + + {gen}:{handle.status} + + ); + }); + + const { rerender } = render( + + + , + ); + await tick(); + expect(c.calls).toBe(1); + + for (let gen = 1; gen <= 5; gen++) { + rerender( + + + , + ); + await tick(10); + } + + expect(c.signal?.aborted).toBe(false); // never cancelled while mounted + expect(c.calls).toBe(1); // and never refetched + }); + it("useDocument contains no imperative subscription — a plain read still cancels", async () => { // Smoke-test that the *only* wiring is the reactive read: a component that // merely reads the handle (no effects) drives cancellation on unmount.