Skip to content

Commit bc54562

Browse files
guyofeckclaude
andcommitted
feat(entities): Implement watch() method for live query subscriptions
This adds a watch() method to EntityHandler that allows subscribing to filtered subsets of entities with real-time change notifications. Features: - watch(options, callback) method with filter, sort, fields, limit options - WatchEvent with changeType ('added', 'modified', 'removed') - subscribeQuery/unsubscribeQuery socket events for server communication - Proper cleanup on unsubscribe The watch() method differs from subscribe() in that: - It supports filter, sort, fields, and limit options - It returns changeType indicating how the change affects the filtered results - It uses dedicated subscribe_query socket events Example usage: ```typescript const unsubscribe = base44.entities.Task.watch( { filter: { status: 'active' }, sort: '-created_date', limit: 10 }, (event) => { if (event.changeType === 'added') { console.log('New matching task:', event.data); } } ); ``` Fixes #83 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 38ea7cf commit bc54562

4 files changed

Lines changed: 477 additions & 0 deletions

File tree

src/modules/entities.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import {
55
RealtimeCallback,
66
RealtimeEvent,
77
RealtimeEventType,
8+
WatchCallback,
9+
WatchChangeType,
10+
WatchEvent,
11+
WatchOptions,
812
} from "./entities.types";
913
import { RoomsSocket } from "../utils/socket-utils.js";
1014

@@ -69,6 +73,26 @@ function parseRealtimeMessage(dataStr: string): RealtimeEvent | null {
6973
}
7074
}
7175

76+
/**
77+
* Parses the watch (live query) message data and extracts event information.
78+
* @internal
79+
*/
80+
function parseWatchMessage(dataStr: string): WatchEvent | null {
81+
try {
82+
const parsed = JSON.parse(dataStr);
83+
return {
84+
changeType: parsed.change_type as WatchChangeType,
85+
eventType: parsed.type as RealtimeEventType,
86+
data: parsed.data,
87+
id: parsed.id || parsed.data?.id,
88+
timestamp: parsed.timestamp || new Date().toISOString(),
89+
};
90+
} catch (error) {
91+
console.warn("[Base44 SDK] Failed to parse watch message:", error);
92+
return null;
93+
}
94+
}
95+
7296
/**
7397
* Creates a handler for a specific entity.
7498
*
@@ -186,5 +210,38 @@ function createEntityHandler(
186210

187211
return unsubscribe;
188212
},
213+
214+
// Watch for changes to a filtered subset (live query)
215+
watch(options: WatchOptions, callback: WatchCallback): () => void {
216+
const socket = getSocket();
217+
218+
// Use subscribeQuery to send subscription options to the server
219+
const unsubscribe = socket.subscribeQuery(
220+
appId,
221+
entityName,
222+
{
223+
filter: options.filter,
224+
sort: options.sort,
225+
fields: options.fields,
226+
limit: options.limit,
227+
},
228+
{
229+
update_model: (msg) => {
230+
const event = parseWatchMessage(msg.data);
231+
if (!event) {
232+
return;
233+
}
234+
235+
try {
236+
callback(event);
237+
} catch (error) {
238+
console.error("[Base44 SDK] Watch callback error:", error);
239+
}
240+
},
241+
}
242+
);
243+
244+
return unsubscribe;
245+
},
189246
};
190247
}

src/modules/entities.types.ts

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,14 @@
33
*/
44
export type RealtimeEventType = "create" | "update" | "delete";
55

6+
/**
7+
* Change types for live query (watch) subscriptions.
8+
* - "added": Entity now matches the filter but didn't before (or was created matching)
9+
* - "modified": Entity matched before and still matches
10+
* - "removed": Entity matched before but no longer matches (or was deleted)
11+
*/
12+
export type WatchChangeType = "added" | "modified" | "removed";
13+
614
/**
715
* Payload received when a realtime event occurs.
816
*/
@@ -17,6 +25,41 @@ export interface RealtimeEvent {
1725
timestamp: string;
1826
}
1927

28+
/**
29+
* Payload received when a watch (live query) event occurs.
30+
*/
31+
export interface WatchEvent {
32+
/** The type of change relative to the subscription filter */
33+
changeType: WatchChangeType;
34+
/** The CUD event type that triggered this change */
35+
eventType: RealtimeEventType;
36+
/** The entity data after the change */
37+
data: any;
38+
/** The unique identifier of the affected entity */
39+
id: string;
40+
/** ISO 8601 timestamp of when the event occurred */
41+
timestamp: string;
42+
}
43+
44+
/**
45+
* Options for watch (live query) subscriptions.
46+
*/
47+
export interface WatchOptions {
48+
/** MongoDB-style filter query */
49+
filter?: Record<string, any>;
50+
/** Sort field with optional '-' prefix for descending */
51+
sort?: string;
52+
/** Array of field names to include in the response */
53+
fields?: string[];
54+
/** Maximum number of results */
55+
limit?: number;
56+
}
57+
58+
/**
59+
* Callback function invoked when a watch (live query) event occurs.
60+
*/
61+
export type WatchCallback = (event: WatchEvent) => void;
62+
2063
/**
2164
* Callback function invoked when a realtime event occurs.
2265
*/
@@ -312,6 +355,64 @@ export interface EntityHandler {
312355
* ```
313356
*/
314357
subscribe(callback: RealtimeCallback): () => void;
358+
359+
/**
360+
* Watches for changes to a filtered subset of records (live query).
361+
*
362+
* Similar to `subscribe`, but allows you to specify filter, sort, fields,
363+
* and limit options. The callback receives events with a `changeType` that
364+
* indicates how the change affects the filtered result set:
365+
* - `'added'`: A record now matches your filter (created or updated to match)
366+
* - `'modified'`: A record still matches your filter but was updated
367+
* - `'removed'`: A record no longer matches your filter (deleted or updated to not match)
368+
*
369+
* @param options - Options for the watch subscription:
370+
* - `filter`: MongoDB-style filter query to match records
371+
* - `sort`: Sort field with optional '-' prefix for descending order
372+
* - `fields`: Array of field names to include in the response
373+
* - `limit`: Maximum number of records to track
374+
* @param callback - Callback function called when a matching entity changes. The callback receives an event object with:
375+
* - `changeType`: How the change affects your filtered results - `'added'`, `'modified'`, or `'removed'`
376+
* - `eventType`: The underlying CUD operation - `'create'`, `'update'`, or `'delete'`
377+
* - `data`: The entity data after the change
378+
* - `id`: The unique identifier of the affected entity
379+
* - `timestamp`: ISO 8601 timestamp of when the event occurred
380+
* @returns Unsubscribe function to stop receiving updates.
381+
*
382+
* @example
383+
* ```typescript
384+
* // Watch for changes to active high-priority tasks
385+
* const unsubscribe = base44.entities.Task.watch(
386+
* {
387+
* filter: { status: 'active', priority: 'high' },
388+
* sort: '-created_date',
389+
* limit: 10
390+
* },
391+
* (event) => {
392+
* if (event.changeType === 'added') {
393+
* console.log('New high-priority task:', event.data);
394+
* } else if (event.changeType === 'removed') {
395+
* console.log('Task no longer high-priority:', event.id);
396+
* }
397+
* }
398+
* );
399+
*
400+
* // Later, clean up the subscription
401+
* unsubscribe();
402+
* ```
403+
*
404+
* @example
405+
* ```typescript
406+
* // Watch for changes to current user's tasks
407+
* const unsubscribe = base44.entities.Task.watch(
408+
* { filter: { assignee: currentUser.id } },
409+
* (event) => {
410+
* console.log(`My task ${event.id} was ${event.changeType}:`, event.data);
411+
* }
412+
* );
413+
* ```
414+
*/
415+
watch(options: WatchOptions, callback: WatchCallback): () => void;
315416
}
316417

317418
/**

src/utils/socket-utils.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,46 @@ export interface RoomsSocketConfig {
1212
export type TSocketRoom = string;
1313
export type TJsonStr = string;
1414

15+
/**
16+
* Options for watch (live query) subscriptions.
17+
*/
18+
export interface WatchSubscriptionOptions {
19+
filter?: Record<string, any>;
20+
sort?: string;
21+
fields?: string[];
22+
limit?: number;
23+
}
24+
1525
type RoomsSocketEventsMap = {
1626
listen: {
1727
connect: () => Promise<void> | void;
1828
update_model: (msg: {
1929
room: string;
2030
data: TJsonStr;
2131
}) => Promise<void> | void;
32+
subscribed: (msg: {
33+
room: string;
34+
entity_name: string;
35+
options: WatchSubscriptionOptions;
36+
}) => Promise<void> | void;
37+
unsubscribed: (msg: {
38+
room: string;
39+
entity_name: string;
40+
}) => Promise<void> | void;
2241
error: (error: Error) => Promise<void> | void;
2342
};
2443
emit: {
2544
join: (room: string) => void;
2645
leave: (room: string) => void;
46+
subscribe_query: (data: {
47+
app_id: string;
48+
entity_name: string;
49+
options: WatchSubscriptionOptions;
50+
}) => void;
51+
unsubscribe_query: (data: {
52+
app_id: string;
53+
entity_name: string;
54+
}) => void;
2755
};
2856
};
2957

@@ -53,6 +81,14 @@ function initializeSocket(
5381
return handlers.update_model?.(msg);
5482
});
5583

84+
socket.on("subscribed", async (msg) => {
85+
return handlers.subscribed?.(msg);
86+
});
87+
88+
socket.on("unsubscribed", async (msg) => {
89+
return handlers.unsubscribed?.(msg);
90+
});
91+
5692
socket.on("error", async (error) => {
5793
return handlers.error?.(error);
5894
});
@@ -162,9 +198,53 @@ export function RoomsSocket({ config }: { config: RoomsSocketConfig }) {
162198
};
163199
};
164200

201+
/**
202+
* Subscribe to a live query with filter, sort, fields, and limit options.
203+
* This sends subscribe_query to the server and sets up listeners for updates.
204+
*/
205+
const subscribeQuery = (
206+
appId: string,
207+
entityName: string,
208+
options: WatchSubscriptionOptions,
209+
handlers: Partial<{ [k in TEvent]: THandler<k> }>
210+
) => {
211+
// The room name matches the backend format
212+
const room = `entities:${appId}:${entityName}:watch`;
213+
214+
// Add handlers for this room
215+
if (!roomsToListeners[room]) {
216+
roomsToListeners[room] = [];
217+
}
218+
roomsToListeners[room].push(handlers);
219+
220+
// Send subscribe_query event to server
221+
socket.emit("subscribe_query", {
222+
app_id: appId,
223+
entity_name: entityName,
224+
options,
225+
});
226+
227+
// Return unsubscribe function
228+
return () => {
229+
roomsToListeners[room] =
230+
roomsToListeners[room]?.filter((listener) => listener !== handlers) ??
231+
[];
232+
233+
if (roomsToListeners[room].length === 0) {
234+
// Send unsubscribe_query event to server
235+
socket.emit("unsubscribe_query", {
236+
app_id: appId,
237+
entity_name: entityName,
238+
});
239+
delete roomsToListeners[room];
240+
}
241+
};
242+
};
243+
165244
return {
166245
socket,
167246
subscribeToRoom,
247+
subscribeQuery,
168248
updateConfig,
169249
updateModel,
170250
disconnect,

0 commit comments

Comments
 (0)