Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,9 @@ jobs:
working-directory: apps/web
run: |
echo "Deploying PartyKit for PR #${{ github.event.pull_request.number }}"
npx partykit deploy --domain tableslayer.com --preview pr-${{ github.event.pull_request.number }}
npx partykit deploy --domain tableslayer.com --preview pr-${{ github.event.pull_request.number }} \
--var BASE_URL=https://pr-${{ github.event.pull_request.number }}-web.fly.dev \
--var INTERNAL_API_TOKEN=${{ secrets.INTERNAL_API_TOKEN }}
env:
CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }}
CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_WORKERS_KEY }}
Expand Down Expand Up @@ -330,6 +332,7 @@ jobs:
GOOGLE_CLIENT_ID=${{ secrets.GOOGLE_CLIENT_ID }}
PUBLIC_PARTYKIT_HOST=pr-${{ github.event.pull_request.number }}.tableslayer.com
PARTYKIT_TOKEN=${{ secrets.PARTYKIT_TOKEN }}
INTERNAL_API_TOKEN=${{ secrets.INTERNAL_API_TOKEN }}
EOF
- name: Deploy Web
uses: superfly/fly-pr-review-apps@1.6.0
Expand Down
6 changes: 5 additions & 1 deletion .github/workflows/fly-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,16 @@ jobs:
GOOGLE_CLIENT_ID=${{ secrets.GOOGLE_CLIENT_ID }}
PUBLIC_PARTYKIT_HOST=partykit.tableslayer.com
PARTYKIT_TOKEN=${{ secrets.PARTYKIT_TOKEN }}
INTERNAL_API_TOKEN=${{ secrets.INTERNAL_API_TOKEN }}
EOF
- uses: ./.github/shared
- name: Deploy PartyKit servers
if: github.ref == 'refs/heads/main'
working-directory: apps/web
run: npx partykit deploy --name tableslayer --domain partykit.tableslayer.com
run: |
npx partykit deploy --name tableslayer --domain partykit.tableslayer.com \
--var BASE_URL=https://tableslayer.com \
--var INTERNAL_API_TOKEN=${{ secrets.INTERNAL_API_TOKEN }}
env:
CLOUDFLARE_ACCOUNT_ID: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }}
CLOUDFLARE_API_TOKEN: ${{ secrets.CLOUDFLARE_WORKERS_KEY }}
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,6 @@ yarn-error.log*

# embedded dbs
*.db*

# local realtime debugging probes (see spec/realtime-sync-v2-progress.md)
scratch-*.mjs
12 changes: 10 additions & 2 deletions apps/web/.env-example
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,21 @@
# - (optional) A Google Cloud account (for Google OAuth login)

# The URL of your deployed app
# For local development, use 5174
# For local development, use http://localhost:5174
# For production use your domain or IP address
BASE_URL=https://localhost:5174
# Also used by the PartyKit server to reach the app for realtime persistence
BASE_URL=http://localhost:5174
# Production, development, or preview. Defines if real emails are sent, etc.
# Set to "production" for self-hosting
ENV_NAME=production

# A shared secret between the app and the PartyKit server (any long random string)
# Required outside local development: the PartyKit server uses it to authenticate
# when saving realtime changes back to the database.
# Set the SAME value as a PartyKit var when deploying:
# npx partykit deploy --var BASE_URL=<your app url> --var INTERNAL_API_TOKEN=<this value>
INTERNAL_API_TOKEN=

# Signup: https://turso.tech/
# Create a database and get keys.
# The api token is used to create and destory databases in CI
Expand Down
28 changes: 28 additions & 0 deletions apps/web/partykit/appApi.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import type * as Party from 'partykit/server';

// Server-to-server bridge from PartyKit rooms to the app's /api/internal endpoints.
// Deployments must set BASE_URL (the app's URL, same value the app itself uses)
// and INTERNAL_API_TOKEN as PartyKit vars; dev falls back to the local SvelteKit
// server and the shared dev token. APP_API_URL remains as an explicit override.

export const appRequest = async <T>(room: Party.Room, path: string, body: unknown): Promise<T> => {
// 5174 is the web app's pinned dev port (apps/web/vite.config.ts)
const configured = (room.env.APP_API_URL ?? room.env.BASE_URL) as string | undefined;
// partykit dev reads the app's .env; tolerate a https://localhost BASE_URL there
const base = (configured ?? 'http://localhost:5174').replace(/^https:\/\/(localhost|127\.)/, 'http://$1');
const token = (room.env.INTERNAL_API_TOKEN as string | undefined) ?? 'dev-internal-token';

const response = await fetch(`${base}${path}`, {
method: 'POST',
headers: {
'content-type': 'application/json',
'x-internal-token': token
},
body: JSON.stringify(body)
});

if (!response.ok) {
throw new Error(`${path} failed: ${response.status} ${await response.text()}`);
}
return (await response.json()) as T;
};
216 changes: 205 additions & 11 deletions apps/web/partykit/gameSession.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,218 @@
import type * as Party from 'partykit/server';
import { onConnect } from 'y-partykit';
import { onConnect, unstable_getYDoc, type YPartyKitOptions } from 'y-partykit';
import type * as Y from 'yjs';
import { classifySceneEvents, getScenesMap, hydrateGameSessionDoc, isDocHydrated } from '../src/lib/realtime/docSchema';
import type { ScenePart } from '../src/lib/realtime/types';
import {
hydrationDataFromWire,
sceneWireFromDoc,
type PersistSessionWire,
type SessionSnapshotWire
} from '../src/lib/realtime/wire';
import { appRequest } from './appApi';

const ALL_PARTS: ScenePart[] = ['settings', 'markers', 'lights', 'annotations', 'fogMask'];
const HYDRATION_ORIGIN = 'server-hydration';
const PERSIST_RETRY_MS = 15000;

/**
* Authoritative home of a game session's live document. The room hydrates its Y doc
* from the database on first use and is the ONLY writer of scene data back to the
* database (debounced, dirty-scenes-only). Clients never save scene state.
*
* Change attribution: client edits arrive as applied updates (transaction.local ===
* false) while this server's own hydration runs in a local transaction — so the
* dirty tracker keys off `remote` and hydration never echoes back into a DB write.
*/
export default class GameSessionServer implements Party.Server {
constructor(public room: Party.Room) {}
#dirty = new Map<string, Set<ScenePart>>();
#deletedSceneIds = new Set<string>();
#observed = false;
#persisting = false;
#retryTimer: ReturnType<typeof setTimeout> | null = null;
#options: YPartyKitOptions;

constructor(public room: Party.Room) {
this.#options = {
persist: { mode: 'snapshot' },
gc: false, // y-partykit forces this with persist; setting it keeps the options hash stable
callback: {
handler: () => {
this.#stats.flushes++;
return this.#persistDirty();
},
debounceWait: 2000,
debounceMaxWait: 10000
}
};
}

// Diagnostics surfaced via the `debug` room command
#stats = { flushes: 0, persistAttempts: 0, persistOk: 0, persistFail: 0, lastError: '' };

#markDirty(sceneId: string, parts: Iterable<ScenePart>) {
let set = this.#dirty.get(sceneId);
if (!set) {
set = new Set();
this.#dirty.set(sceneId, set);
}
for (const part of parts) set.add(part);
}

async #getDoc() {
const doc = await unstable_getYDoc(this.room, this.#options);

if (!this.#observed) {
this.#observed = true;
getScenesMap(doc).observeDeep((events, transaction) => {
for (const change of classifySceneEvents(events as Y.YEvent<Y.Map<unknown>>[], transaction)) {
if (!change.remote) continue;
if (change.part === 'scenes') {
for (const sceneId of change.keys) {
if (getScenesMap(doc).has(sceneId)) {
this.#markDirty(sceneId, ALL_PARTS);
} else {
this.#dirty.delete(sceneId);
this.#deletedSceneIds.add(sceneId);
}
}
} else {
this.#markDirty(change.sceneId, [change.part]);
}
}
});
}

if (!isDocHydrated(doc)) {
const snapshot = await appRequest<SessionSnapshotWire>(this.room, '/api/internal/sessionSnapshot', {
gameSessionId: this.room.id
});
// Re-check: another connection may have hydrated while we awaited
if (!isDocHydrated(doc)) {
hydrateGameSessionDoc(doc, hydrationDataFromWire(snapshot), HYDRATION_ORIGIN);
}
}

return doc;
}

async #persistDirty() {
if (this.#persisting) return;
if (this.#dirty.size === 0 && this.#deletedSceneIds.size === 0) return;

this.#persisting = true;
this.#stats.persistAttempts++;
const dirty = this.#dirty;
const deleted = this.#deletedSceneIds;
this.#dirty = new Map();
this.#deletedSceneIds = new Set();

try {
const doc = await unstable_getYDoc(this.room, this.#options);
const payload: PersistSessionWire = {
gameSessionId: this.room.id,
scenes: [...dirty]
.map(([sceneId, parts]) => sceneWireFromDoc(doc, sceneId, [...parts]))
.filter((scene): scene is NonNullable<typeof scene> => scene !== null),
deletedSceneIds: [...deleted]
};
await appRequest(this.room, '/api/internal/persistSession', payload);
this.#stats.persistOk++;
} catch (error) {
this.#stats.persistFail++;
this.#stats.lastError = error instanceof Error ? `${error.message}` : String(error);
// Merge back what we drained and retry; the doc itself is safe in room
// storage regardless.
for (const [sceneId, parts] of dirty) this.#markDirty(sceneId, parts);
for (const sceneId of deleted) this.#deletedSceneIds.add(sceneId);
console.error(`persistSession failed for ${this.room.id}; retrying in ${PERSIST_RETRY_MS}ms`, error);
this.#scheduleRetry();
} finally {
this.#persisting = false;
}
}

// PartyKit alarms cannot read room.id (platform limitation), so retries use an
// in-instance timer. If the room is evicted before the retry fires, the doc
// snapshot is still safe and the next connection re-hydrates and re-persists.
#scheduleRetry() {
if (this.#retryTimer) return;
this.#retryTimer = setTimeout(async () => {
this.#retryTimer = null;
try {
await this.#getDoc();
} catch (error) {
console.error(`hydration retry failed for game session ${this.room.id}`, error);
this.#scheduleRetry();
}
await this.#persistDirty();
}, PERSIST_RETRY_MS);
}

async onConnect(conn: Party.Connection) {
return await onConnect(conn, this.room, {
// Use snapshot persistence mode - stores latest document state
persist: { mode: 'snapshot' }
// A failed hydration must not kill the websocket: keep the connection open,
// let clients wait on the ready gate, and retry shortly.
try {
await this.#getDoc();
} catch (error) {
console.error(`hydration failed for game session ${this.room.id}; retrying in ${PERSIST_RETRY_MS}ms`, error);
this.#scheduleRetry();
}
return onConnect(conn, this.room, this.#options);
}

// PartyKit automatically handles persistence with snapshot mode
// No need to load from database - clients initialize Y.js with fresh data from SSR
// No need to save to database - clients handle saving through saveScene()
});
async onClose() {
const remaining = [...this.room.getConnections()].length;
if (remaining === 0) {
await this.#persistDirty();
}
}

async onRequest(req: Party.Request) {
// Handle ping requests for diagnostics
// Ping endpoint for diagnostics
if (req.method === 'POST') {
const body = await req.json();
const body = (await req.json()) as { type?: string; timestamp?: number };

// Diagnostics: expose persister state without needing terminal access
if (body.type === 'debug') {
const token = (this.room.env.INTERNAL_API_TOKEN as string | undefined) ?? 'dev-internal-token';
if (req.headers.get('x-internal-token') !== token) {
return new Response('Unauthorized', { status: 401 });
}
return new Response(
JSON.stringify({
observed: this.#observed,
persisting: this.#persisting,
dirty: [...this.#dirty.entries()].map(([sceneId, parts]) => [sceneId.slice(0, 8), [...parts]]),
deleted: [...this.#deletedSceneIds],
stats: this.#stats
}),
{ headers: { 'Content-Type': 'application/json' } }
);
}

// After the app writes session data to the DB directly (import, admin tools),
// it calls this to rebuild the live doc from the database.
if (body.type === 'resync') {
const token = (this.room.env.INTERNAL_API_TOKEN as string | undefined) ?? 'dev-internal-token';
if (req.headers.get('x-internal-token') !== token) {
return new Response('Unauthorized', { status: 401 });
}
const doc = await unstable_getYDoc(this.room, this.#options);
const snapshot = await appRequest<SessionSnapshotWire>(this.room, '/api/internal/sessionSnapshot', {
gameSessionId: this.room.id
});
doc.transact(() => {
const scenes = getScenesMap(doc);
for (const sceneId of [...scenes.keys()]) scenes.delete(sceneId);
}, HYDRATION_ORIGIN);
hydrateGameSessionDoc(doc, hydrationDataFromWire(snapshot), HYDRATION_ORIGIN);
// The resynced doc now matches the DB; drop any stale dirty state
this.#dirty.clear();
this.#deletedSceneIds.clear();
return new Response(JSON.stringify({ ok: true }), { headers: { 'Content-Type': 'application/json' } });
}

if (body.type === 'ping') {
return new Response(
JSON.stringify({
Expand Down
Loading
Loading