Skip to content
Open
7 changes: 7 additions & 0 deletions .changeset/require-pg-sync-source-urls.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@electric-ax/agents-runtime': patch
'@electric-ax/agents-server': patch
'@electric-ax/agents': patch
---

Require an explicit Electric shape endpoint URL for pg-sync observations. Source identity is now derived from the shape options alone (not per-request metadata) so re-registrations reuse the same bridge and stream, and registration validates the endpoint by fetching the shape log up front, failing with Electric's error instead of retrying silently.
5 changes: 4 additions & 1 deletion packages/agents-runtime/src/observation-sources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ export function canonicalPgSyncOptions(
}

export function sourceRefForPgSync(options: PgSyncOptions): string {
return hashString(JSON.stringify(canonicalPgSyncOptions(options)))
// metadata is per-request context (principal, wakeId, ...) and must not
// change the identity of the observed shape.
const { metadata: _metadata, ...identity } = canonicalPgSyncOptions(options)
return hashString(JSON.stringify(identity))
}

export interface EntityObservationSource extends ObservationSource {
Expand Down
61 changes: 36 additions & 25 deletions packages/agents-runtime/src/process-wake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,32 @@ function toError(err: unknown): Error {
return err instanceof Error ? err : new Error(String(err))
}

// Rebind an observation source to the sourceRef/streamUrl the server assigned
// at registration time (entities, pgSync), keeping its manifest entry in sync.
function withRegisteredManifestEntry(
source: ObservationSource,
sourceType: `entities` | `pgSync`,
registered: { sourceRef: string; streamUrl: string }
): ObservationSource {
const originalEntry = source.toManifestEntry() as Record<string, unknown>
return {
...source,
sourceRef: registered.sourceRef,
streamUrl: registered.streamUrl,
toManifestEntry() {
return {
...originalEntry,
key: `source:${sourceType}:${registered.sourceRef}`,
sourceRef: registered.sourceRef,
config: {
...((originalEntry.config as Record<string, unknown>) ?? {}),
streamUrl: registered.streamUrl,
},
} as unknown as ReturnType<ObservationSource[`toManifestEntry`]>
},
}
}

async function resolveHeadersProvider(
provider: ProcessWakeConfig[`claimHeaders`]
): Promise<Record<string, string> | undefined> {
Expand Down Expand Up @@ -1714,26 +1740,11 @@ export async function processWake(
const ensured = await serverClient.ensureEntitiesMembershipStream(
(source as EntitiesObservationSource).tags
)
const originalEntry = source.toManifestEntry() as Record<
string,
unknown
>
observedSource = {
...source,
sourceRef: ensured.sourceRef,
streamUrl: ensured.streamUrl,
toManifestEntry() {
return {
...originalEntry,
key: `source:entities:${ensured.sourceRef}`,
sourceRef: ensured.sourceRef,
config: {
...((originalEntry.config as Record<string, unknown>) ?? {}),
streamUrl: ensured.streamUrl,
},
} as unknown as ReturnType<ObservationSource[`toManifestEntry`]>
},
}
observedSource = withRegisteredManifestEntry(
source,
`entities`,
ensured
)
}

let registeredPgSync: { streamUrl: string; sourceRef: string } | undefined
Expand All @@ -1748,11 +1759,11 @@ export async function processWake(
wakeId,
}
)
observedSource = {
...source,
sourceRef: registeredPgSync.sourceRef,
streamUrl: registeredPgSync.streamUrl,
}
observedSource = withRegisteredManifestEntry(
source,
`pgSync`,
registeredPgSync
)
}

if (effectiveWake) {
Expand Down
8 changes: 4 additions & 4 deletions packages/agents-runtime/src/setup-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -886,10 +886,9 @@ export function createSetupContext(
source.ensureStream.contentType
)
}
const sourceStreamUrl =
source.sourceType === `pgSync` || !source.streamUrl.startsWith(`/`)
? source.streamUrl
: appendPathToUrl(config.serverBaseUrl, source.streamUrl)
const sourceStreamUrl = source.streamUrl.startsWith(`/`)
? appendPathToUrl(config.serverBaseUrl, source.streamUrl)
: source.streamUrl
sourceDb = await wiring.createSourceDb(
sourceStreamUrl,
source.schema,
Expand All @@ -912,6 +911,7 @@ export function createSetupContext(
return {
sourceType: source.sourceType,
sourceRef: source.sourceRef,
streamUrl: source.streamUrl,
db: sourceDb,
events,
}
Expand Down
1 change: 1 addition & 0 deletions packages/agents-runtime/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ export interface SourceWakeConfig {
export interface ObservationHandle {
sourceType: string
sourceRef: string
streamUrl?: string
db?: EntityStreamDB | ObservationStreamDB
events: Array<ChangeEvent>
}
Expand Down
28 changes: 26 additions & 2 deletions packages/agents-runtime/test/process-wake.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1619,6 +1619,7 @@ describe(`processWake`, () => {

it(`pgSync observe registers pgSync source before source DB preload`, async () => {
const source = pgSync({
url: `http://localhost:30000/v1/shape`,
table: `todos`,
where: `priority = $1`,
params: [`high`],
Expand All @@ -1635,12 +1636,23 @@ describe(`processWake`, () => {
close: mockSourceDbClose,
})

await processWake(makeNotification(), BASE_CONFIG)
const result = await processWake(makeNotification(), BASE_CONFIG)

expect(result?.manifest).toEqual(
expect.arrayContaining([
expect.objectContaining({
key: `source:pgSync:pg-source-1`,
sourceRef: `pg-source-1`,
config: expect.objectContaining({
streamUrl: `/_electric/pg-sync/default/pg-source-1`,
}),
}),
])
)
expect(mockCreateStreamDB).toHaveBeenCalledWith(
expect.objectContaining({
streamOptions: expect.objectContaining({
url: `/_electric/pg-sync/default/pg-source-1`,
url: `http://localhost:3000/_electric/pg-sync/default/pg-source-1`,
contentType: `application/json`,
}),
state: expect.objectContaining({
Expand Down Expand Up @@ -1670,6 +1682,18 @@ describe(`processWake`, () => {
wakeId: `wake-abc`,
},
})
const wakeCall = fetchMock.mock.calls.find(
([url, opts]) =>
String(url).includes(`/_electric/wake`) &&
!String(url).includes(`wake-abc`) &&
(opts as RequestInit | undefined)?.method === `POST`
)
const wakeBody = JSON.parse(wakeCall![1]!.body as string) as Record<
string,
unknown
>
expect(wakeBody.sourceUrl).toBe(`/_electric/pg-sync/default/pg-source-1`)
expect(wakeBody.manifestKey).toBe(`source:pgSync:pg-source-1`)
expect(fetchMock.mock.invocationCallOrder[pgSyncCallIndex]).toBeLessThan(
mockSourceDbPreload.mock.invocationCallOrder[0]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ describe(`runtime-server-client.registerPgSyncSource`, () => {
})

const options = {
url: `http://localhost:30000/v1/shape`,
table: `todos`,
columns: [`id`, `text`],
where: `priority = $1`,
Expand Down Expand Up @@ -59,7 +60,10 @@ describe(`runtime-server-client.registerPgSyncSource`, () => {
})

await expect(
client.registerPgSyncSource({ table: `todos` })
client.registerPgSyncSource({
url: `http://localhost:30000/v1/shape`,
table: `todos`,
})
).rejects.toThrow(/registerPgSyncSource failed \(400\): bad table/)
})
})
4 changes: 4 additions & 0 deletions packages/agents-server/src/entity-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1641,6 +1641,10 @@ export class PostgresRegistry {
.where(this.pgSyncBridgeWhere(sourceRef))
}

async deletePgSyncBridge(sourceRef: string): Promise<void> {
await this.db.delete(pgSyncBridges).where(this.pgSyncBridgeWhere(sourceRef))
}

async upsertEntityBridge(row: {
sourceRef: string
tags: EntityTags
Expand Down
8 changes: 2 additions & 6 deletions packages/agents-server/src/manifest-side-effects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ export function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === `object` && value !== null && !Array.isArray(value)
}

function getPgSyncManifestStreamPath(sourceRef: string): string {
return `/_electric/pg-sync/${sourceRef}`
}

export function extractManifestSourceUrl(
manifest: Record<string, unknown> | undefined
): string | undefined {
Expand Down Expand Up @@ -62,8 +58,8 @@ export function extractManifestSourceUrl(
}

if (manifest.sourceType === `pgSync`) {
return typeof manifest.sourceRef === `string`
? getPgSyncManifestStreamPath(manifest.sourceRef)
return typeof config?.streamUrl === `string`
? config.streamUrl
: undefined
}

Expand Down
Loading
Loading