Y/hub is a collaborative document backend built on Yjs. It implements the standard y-websocket protocol and extends it with attribution, history management, and selective undo/redo capabilities.
All endpoints require an auth cookie, which is checked via the PERM CALLBACK (`AUTH_PERM_CALLBACK`).
It is assumed that all documents can be identified by a unique {org}/{docid}
combination. Furthermore, all "body" content is encoded via lib0/encoding's
`writeAny` (the same format that `encodeAny` in lib0/buffer produces). All binary data in parameters is encoded via base64.
The standard WebSocket backend is compatible with y-websocket and TipTapProvider.
For each Yjs document, there is always a gc'd version, and a non-gc'd version. Optionally, you may fork the document to a branch, which users can use for implementing suggestions. Branched documents have a gc'd version and a non-gc'd version as well.
`ws://{host}/ws/{org}/{docid}`

parameters: `{ gc?: boolean, branch?: string, customAttributions?: string }`

- `gc=true` (default): standard garbage-collected document
- `gc=false`: full document history, which can be used to reconstruct the editing history
- `branch="main"` (default): the default branch name if not specified otherwise
- `branch=string`: optionally, define a custom branch. Changes won't be automatically synced with other branches.
- `customAttributions=string`: optional comma-separated `key:value` pairs (e.g. `source:ai,model:gpt4`). All updates sent through this connection will include these custom attributions in the contentmap, stored as `insert:<key>` / `delete:<key>` attribution attributes alongside the standard ones.
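As a rough illustration of how these parameters combine, the sketch below assembles a connection URL with the built-in `URL` class. The helper names `serializeCustomAttributions` and `buildWsUrl`, the host, and the branch name are hypothetical and not part of Y/hub; in practice a y-websocket-compatible provider would usually build this URL for you.

```javascript
// Hypothetical helper: turn { key: value } into the documented `key:value,key:value` form.
// Keys and values containing ':' or ',' are not handled in this sketch.
function serializeCustomAttributions (attrs) {
  return Object.entries(attrs).map(([k, v]) => `${k}:${v}`).join(',')
}

// Hypothetical helper: build the WebSocket URL from the documented parameters.
function buildWsUrl (host, org, docid, { gc, branch, customAttributions } = {}) {
  const url = new URL(`ws://${host}/ws/${encodeURIComponent(org)}/${encodeURIComponent(docid)}`)
  if (gc !== undefined) url.searchParams.set('gc', String(gc))
  if (branch !== undefined) url.searchParams.set('branch', branch)
  if (customAttributions !== undefined) {
    url.searchParams.set('customAttributions', serializeCustomAttributions(customAttributions))
  }
  return url.toString()
}

// Connect to the full-history version of a (made-up) "suggestions" branch.
const wsUrl = buildWsUrl('localhost:8080', 'my-org', 'my-doc', {
  gc: false,
  branch: 'suggestions',
  customAttributions: { source: 'ai', model: 'gpt4' }
})
console.log(wsUrl)
```

`URLSearchParams` percent-encodes the `:` and `,` separators; the server is assumed to decode query parameters in the usual way.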
Retrieve and update the Yjs document via REST API.
Retrieve the current state of the Yjs document.
`GET /ydoc/{org}/{docid}`

parameters: `{ gc?: boolean, branch?: string, awareness?: boolean }`

- `gc=true` (default): retrieve the garbage-collected document
- `gc=false`: retrieve the full document history (non-gc version)
- `branch="main"` (default): the branch to retrieve
- `awareness=true`: also include the room's merged awareness state in the response (default: omitted)
- Returns `{ doc: Uint8Array, awareness?: Uint8Array }`. `doc` is the encoded Yjs document update. `awareness`, when requested and non-empty, contains the bare awareness update bytes (same format as `encodeAwarenessUpdate(...)` and as the `awareness` field accepted by `PATCH /ydoc`), directly consumable by `applyAwarenessUpdate`. Omitted when the room has no awareness state.
Update the Yjs document with new changes. Requires write access.
`PATCH /ydoc/{org}/{docid}`

body: `{ update?: Uint8Array, awareness?: Uint8Array, customAttributions?: Array<{ k: string, v: string }> }`

parameters: `{ branch?: string }`

- `update`: optional Yjs update (encoded via `Y.encodeStateAsUpdate` or similar). Diffed against the current document state, so only new content is applied and attributed. Attributions are automatically assigned to the authenticated user.
- `awareness`: optional awareness update bytes, i.e. the bare output of `encodeAwarenessUpdate(awareness, clientIds)` from `@y/protocols/awareness` (no `messageAwareness` wire-format prefix). Distributed to connected clients through the same Redis channel the WebSocket path uses.
- `customAttributions`: optional array of key-value pairs to attach as custom attributions to the `update`'s changes. Stored as `insert:<key>` / `delete:<key>` attribution attributes alongside the standard ones. Has no effect when only `awareness` is supplied.
- `branch="main"` (default): the branch to update
- At least one of `update` or `awareness` must be present; an empty body returns `400 Bad Request`.
- Changes are distributed to connected WebSocket clients.
- Returns `{ success: true, message: string }` on success.
import * as Y from 'yjs'
import * as encoding from 'lib0/encoding'
import * as decoding from 'lib0/decoding'
// Retrieve the current document
const getResponse = await fetch('/ydoc/my-org/my-doc-id')
const getBuffer = await getResponse.arrayBuffer()
const getDecoder = decoding.createDecoder(new Uint8Array(getBuffer))
const { doc } = decoding.readAny(getDecoder)
// Apply the remote state to a local document
const ydoc = new Y.Doc()
Y.applyUpdate(ydoc, doc)
// Make local changes
ydoc.getText('content').insert(0, 'Hello World')
// Encode the update and send it
const update = Y.encodeStateAsUpdate(ydoc)
const encoder = encoding.createEncoder()
encoding.writeAny(encoder, { update })
const body = encoding.toUint8Array(encoder)
const patchResponse = await fetch('/ydoc/my-org/my-doc-id', {
  method: 'PATCH',
  headers: { 'Content-Type': 'application/octet-stream' },
  body
})

Roll back all changes that match the pattern. The changes are distributed via WebSockets.
`POST /rollback/{org}/{docid}`

body: `{ from?: number, to?: number, by?: string, contentIds?: Y.ContentIds, customAttributions?: Array<{ k: string, v: string }>, withCustomAttributions?: Array<{ k: string, v: string }> }`

- `from`/`to`: unix timestamp range filter
- `by=string`: comma-separated list of user-ids that matches the attributions
- `contentIds`: changeset that describes the changes between two versions
- `customAttributions`: optional array of key-value pairs to attach as custom attributions to the rollback changes themselves (the undo operation)
- `withCustomAttributions`: optional array of key-value pairs to filter which changes to undo. Only changes whose attributions match all specified key-value pairs will be rolled back.

- Roll back all changes that happened after timestamp `X`: `POST /rollback/{org}/{docid}?from=X`
  - If your "versions" have timestamps, this call enables you to revert to a specific version of the document.
- Roll back all changes from user-id `U` that happened between timestamps `X` and `Y`: `POST /rollback/{org}/{docid}?by=U&from=X&to=Y`
  - This call enables you to undo all changes within a certain editing interval.
- Roll back all changes of a certain user between two versions: `POST /rollback/{org}/{docid}` with body `{ by: userid, contentIds: Y.createContentIdsFromDocDiff(prevYDoc, nextYDoc) }`
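The "match all" semantics of `withCustomAttributions` can be pictured with a small filter. This is only an illustration of the documented rule: the matching happens inside the server, and the `changes` array, its `id` field, and the helper name `matchesAllCustomAttributions` are made up for the example.

```javascript
// Illustrative only: returns true when every { k, v } pair of the filter
// is present among the change's custom attributions.
function matchesAllCustomAttributions (changeAttributions, filter) {
  return filter.every(({ k, v }) =>
    changeAttributions.some(attr => attr.k === k && attr.v === v)
  )
}

// Hypothetical changes, each carrying the custom attributions recorded for it.
const changes = [
  { id: 'a', customAttributions: [{ k: 'source', v: 'ai' }, { k: 'model', v: 'gpt4' }] },
  { id: 'b', customAttributions: [{ k: 'source', v: 'ai' }] },
  { id: 'c', customAttributions: [] }
]

// Equivalent of withCustomAttributions: [{ k: 'source', ... }, { k: 'model', ... }]
const rollbackFilter = [{ k: 'source', v: 'ai' }, { k: 'model', v: 'gpt4' }]

const selectedForRollback = changes
  .filter(change => matchesAllCustomAttributions(change.customAttributions, rollbackFilter))
  .map(change => change.id)

console.log(selectedForRollback) // only 'a' carries both pairs
```

Change `b` matches one pair but not both, so under this reading it would be left untouched by the rollback.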
Visualize attributed changes using either pure deltas or by retrieving the before and after state of a Yjs doc. Optionally, include relevant attributions.
`GET /changeset/{org}/{docid}`

parameters: `{ from?: number, to?: number, by?: string, ydoc?: boolean, contentIds?: Y.ContentIds, delta?: boolean, attributions?: boolean, withCustomAttributions?: string }`

- `from`/`to`: unix timestamp range filter
- `by=string`: comma-separated list of user-ids that matches the attributions
- `withCustomAttributions=string`: filter by custom attributions using `key:value` pairs, comma-separated (e.g. `source:import,tag:v2`). Only changes matching all specified attributions are included.
- `contentIds`: changeset that describes the changes between two versions (@todo: not implemented)
- `ydoc=true`: include encoded Yjs docs
- `delta=true`: include delta representation
- `attributions=true`: include attributions
- Returns `{ prevDoc?: Y.Doc, nextDoc?: Y.Doc, attributions?: Y.ContentMap, delta?: Delta }`
  - Currently returns only the `ydoc.get()` delta.
- Retrieve activity: `GET /activity/{org}/{docid}?from={now-1day}`
- Optionally, bundle changes that belong to each other: `[1, 2, 70, 71] ⇒ [2, 71]`, because `1,2` and `70,71` belong to each other.
- For each timestamp: `GET /changeset/{org}/{docid}?from=timestamps[I - 1]&to=timestamps[I]&delta=true&attributions=true`
  - This gives you the state of the document at timestamp `from` (`deltaState`) and the (attributed) diff that is needed to get to timestamp `to` (`diff`).
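The bundling and per-interval requests from the steps above can be sketched as follows. The rule used to decide that timestamps "belong to each other" (a maximum gap, `maxGap`) is an assumption made for this example, as are the helper names `bundleTimestamps` and `changesetUrls`; the source does not prescribe a particular bundling rule.

```javascript
// Hypothetical helper: collapse runs of timestamps that lie within `maxGap`
// of their successor, keeping only the last timestamp of each run.
function bundleTimestamps (timestamps, maxGap) {
  const sorted = [...timestamps].sort((a, b) => a - b)
  const bundled = []
  for (let i = 0; i < sorted.length; i++) {
    const next = sorted[i + 1]
    // A run ends when there is no successor or the successor is too far away.
    if (next === undefined || next - sorted[i] > maxGap) bundled.push(sorted[i])
  }
  return bundled
}

// Hypothetical helper: one changeset URL per consecutive pair of timestamps.
function changesetUrls (org, docid, timestamps) {
  const urls = []
  for (let i = 1; i < timestamps.length; i++) {
    urls.push(`/changeset/${org}/${docid}?from=${timestamps[i - 1]}&to=${timestamps[i]}&delta=true&attributions=true`)
  }
  return urls
}

const bundledTimestamps = bundleTimestamps([1, 2, 70, 71], 10)
console.log(bundledTimestamps) // [2, 71]
console.log(changesetUrls('my-org', 'my-doc', bundledTimestamps))
```

Each URL can then be fetched in order to replay the editing trail one interval at a time.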
Retrieve all editing-timestamps for a certain document. Use the activity API and the changeset API to reconstruct an editing trail.
`GET /activity/{org}/{docid}`

parameters: `{ from?: number, to?: number, by?: string, limit?: number, order?: string, group?: boolean, delta?: boolean, withCustomAttributions?: string, customAttributions?: boolean, contentIds?: string }`

- `from`/`to`: unix timestamp range filter
- `by=string`: comma-separated list of user-ids to filter by
- `withCustomAttributions=string`: filter by custom attributions using `key:value` pairs, comma-separated (e.g. `source:import,tag:v2`). Only changes matching all specified attributions are included.
- `contentIds=string`: base64-encoded `Y.ContentIds` binary. When provided, only activity entries whose content intersects the given content set are returned. Encode via `buffer.toBase64(Y.encodeContentIds(contentIds))` (`import * as buffer from 'lib0/buffer'`).
- `limit=number`: maximum number of entries to return
- `order='asc'|'desc'`: `"asc"` (oldest first) or `"desc"` (newest first, default)
- `group=boolean`: bundle consecutive changes from the same user into a single entry (experimental)
- `delta=boolean`: include delta representation for each activity entry
- `customAttributions=true`: include the list of custom attributions associated with each activity entry. When enabled, each entry includes a `customAttributions` field containing deduplicated `{ k, v }` pairs collected from the underlying attribution attributes (e.g. `insert:<key>`). When grouping is enabled, custom attributions from merged entries are combined and deduplicated.
- Returns `Array<{ from: number, to: number, by: string?, delta?: Delta, customAttributions?: Array<{ k: string, v: string }> }>`
  - `customAttributions` is only present when `customAttributions=true`
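Because base64 output can contain `+`, `/`, and `=`, the `contentIds` value needs to be percent-encoded when it is placed in a query string. The sketch below shows one way to do that in Node.js. The helper name `buildActivityUrl` is hypothetical, the byte array is a placeholder rather than real `Y.ContentIds` output, and it is assumed that Node's `Buffer` yields the same standard base64 as `buffer.toBase64` from lib0.

```javascript
// Hypothetical helper: build an activity query. `contentIdsBytes` stands in for
// the Uint8Array returned by Y.encodeContentIds(contentIds).
function buildActivityUrl (org, docid, { contentIdsBytes, limit, order } = {}) {
  const params = new URLSearchParams()
  if (contentIdsBytes !== undefined) {
    // Assumption: same standard base64 alphabet as lib0's buffer.toBase64.
    params.set('contentIds', Buffer.from(contentIdsBytes).toString('base64'))
  }
  if (limit !== undefined) params.set('limit', String(limit))
  if (order !== undefined) params.set('order', order)
  const path = `/activity/${encodeURIComponent(org)}/${encodeURIComponent(docid)}`
  const query = params.toString() // percent-encodes '+', '/' and '='
  return query === '' ? path : `${path}?${query}`
}

// Placeholder bytes chosen so that the base64 form contains '+' and '/'.
const placeholderContentIds = new Uint8Array([251, 255, 190, 0])
const activityUrl = buildActivityUrl('my-org', 'my-doc', {
  contentIdsBytes: placeholderContentIds,
  limit: 50,
  order: 'asc'
})
console.log(activityUrl)
```

Leaving a raw `+` in the query string would typically be decoded as a space on the server side, which is why the value goes through `URLSearchParams` here.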
Webhooks are configured using environment variables.
- `YDOC_UPDATE_CALLBACK=http://localhost:5173/ydoc` body: `encoded ydoc`. Called whenever the Yjs document was updated (after a debounce).
- `YDOC_CHANGE_CALLBACK=http://localhost:5173/ydoc` body: `{ ydoc: v2 encoded ydoc, delta: delta describing all changes }`. Called whenever the Yjs document was updated (after a debounce).
- `AUTH_PERM_CALLBACK=http://localhost:5173/auth/perm`: called to check the authentication of a client.
The YHub class is available when importing @y/hub directly. It exposes methods for reading and writing documents programmatically, bypassing the WebSocket and REST layers. This is useful for server-side scripts, migrations, and data pipelines.
import { createYHub } from '@y/hub'
const yhub = await createYHub(config)

| Field | Type | Required | Description |
|---|---|---|---|
| `redis.url` | `string` | yes | Redis connection URL |
| `redis.prefix` | `string` | yes | Key prefix for all Redis entries (use a unique value per environment) |
| `redis.taskDebounce` | `number` | no | Milliseconds before a worker picks up a compaction task. Default: 120 000 |
| `redis.minMessageLifetime` | `number` | no | Minimum time in ms that update messages are kept in Redis streams before compaction. Default: 60 000 |
| `redis.cacheTtl` | `number` | no | TTL in seconds for cached API responses. Default: 10 |
| `redis.socket` | `object` | no | Custom socket options merged into the Redis client socket config. See node-redis socket options for available options. |
| `postgres` | `string` | yes | PostgreSQL connection string |
| `persistence` | `PersistencePlugin[]` | yes | One or more storage plugins (e.g. `S3PersistenceV1`). At least one is required. |
| `server` | `object \| null` | no | HTTP/WebSocket server config. Set to `null` to run without a server (worker/script mode). |
| `server.port` | `number` | yes* | Port to listen on |
| `server.auth` | `AuthPlugin` | yes* | Auth plugin created with `createAuthPlugin` |
| `server.maxDocSize` | `number` | no | Maximum Ydoc size in bytes, used for WebSocket payload limits. Default: 500 MB |
| `worker` | `object \| null` | no | Background compaction worker config. Set to `null` to disable. |
| `worker.taskConcurrency` | `number` | yes* | Maximum number of compaction tasks to process in parallel |
| `worker.events.docUpdate` | `function` | no | Called after each compaction with the merged DocTable |
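To make the "Required" column concrete, here is a minimal pre-flight check one could run before calling `createYHub`. It is a sketch and not the library's own validation: the function name `findMissingConfigFields` is hypothetical, and it assumes that fields marked `yes*` are required only when their parent object (`server` or `worker`) is set.

```javascript
// Hypothetical pre-flight check mirroring the "Required" column of the table.
function findMissingConfigFields (config) {
  const missing = []
  if (!config.redis || typeof config.redis.url !== 'string') missing.push('redis.url')
  if (!config.redis || typeof config.redis.prefix !== 'string') missing.push('redis.prefix')
  if (typeof config.postgres !== 'string') missing.push('postgres')
  if (!Array.isArray(config.persistence) || config.persistence.length === 0) missing.push('persistence')
  // Assumption: `yes*` fields only matter when the parent object is configured.
  if (config.server != null) {
    if (typeof config.server.port !== 'number') missing.push('server.port')
    if (config.server.auth == null) missing.push('server.auth')
  }
  if (config.worker != null && typeof config.worker.taskConcurrency !== 'number') {
    missing.push('worker.taskConcurrency')
  }
  return missing
}

// A script-mode config (no server, no worker) with a placeholder persistence plugin.
const scriptModeConfig = {
  redis: { url: 'redis://localhost:6379', prefix: 'yhub:dev' },
  postgres: 'postgres://user:pass@localhost/yhub',
  persistence: [{ /* placeholder for a real plugin instance */ }],
  server: null,
  worker: null
}
console.log(findMissingConfigFields(scriptModeConfig)) // []
```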
Example: full server setup
import { createYHub, createAuthPlugin } from '@y/hub'
import { S3PersistenceV1 } from '@y/hub/plugins/s3'
const yhub = await createYHub({
  redis: {
    url: 'redis://localhost:6379',
    prefix: 'yhub:prod',
    // Optional: custom socket options for TLS, etc.
    // socket: { rejectUnauthorized: false, ca: fs.readFileSync('/path/to/ca.pem', 'utf-8') }
  },
  postgres: 'postgres://user:pass@localhost/yhub',
  persistence: [
    new S3PersistenceV1({ bucket: 'my-bucket', /* ... */ })
  ],
  server: {
    port: 8080,
    auth: createAuthPlugin({
      async readAuthInfo (req) { return { userid: req.getHeader('x-user-id') } },
      async getAccessType (authInfo, room) { return 'rw' }
    })
  },
  worker: { taskConcurrency: 10 }
})

Example: script / worker-only mode (no HTTP server)
const yhub = await createYHub({
  redis: { url: 'redis://localhost:6379', prefix: 'yhub:prod' },
  postgres: 'postgres://user:pass@localhost/yhub',
  persistence: [ new S3PersistenceV1({ /* ... */ }) ],
  server: null,
  worker: null
})

Retrieve the current state of a document, merging any in-memory Redis updates with the persisted state.
yhub.getDoc(
  room: { org: string, docid: string, branch: string },
  include: {
    gc?: boolean,
    nongc?: boolean,
    contentmap?: boolean,
    contentids?: boolean,
    references?: boolean,
    awareness?: boolean
  },
  opts?: { gcOnMerge?: boolean } // default: true
): Promise<{
  gcDoc: Uint8Array | null,
  nongcDoc: Uint8Array | null,
  contentmap: Uint8Array | null,
  contentids: Uint8Array | null,
  references: Array<{ assetId, asset }> | null,
  awareness: Uint8Array | null,
  lastClock: string,
  lastPersistedClock: string
}>

Only fields listed in `include` with a truthy value are populated; the rest are `null`. Set `gcOnMerge: false` to keep full history in the returned `gcDoc`.
Example
import * as Y from '@y/y'
const { gcDoc } = await yhub.getDoc(
  { org: 'my-org', docid: 'my-doc', branch: 'main' },
  { gc: true }
)
const ydoc = Y.createDocFromUpdate(gcDoc)

Attribute and persist a Yjs update directly to the database, without distributing it via Redis or WebSocket. Multiple calls for the same room are merged on next retrieval.
Warning: connected clients will not see the changes until they reconnect.
yhub.unsafePersistDoc(
  room: { org: string, docid: string, branch: string },
  update: Uint8Array, // encoded Yjs update (e.g. Y.encodeStateAsUpdate)
  attributions: { by?: string } // optional author user-id
): Promise<void>

Example
import * as Y from '@y/y'
const ydoc = new Y.Doc()
ydoc.getText('content').insert(0, 'Hello from a script')
await yhub.unsafePersistDoc(
  { org: 'my-org', docid: 'my-doc', branch: 'main' },
  Y.encodeStateAsUpdate(ydoc),
  { by: 'import-script' }
)

Run an LLM agent task against a room. The handler receives a freshly hydrated `Y.Doc` (snapshot of the room's current state) and a new `Awareness` instance bound to it. Edits made inside the handler are streamed to all connected clients in real time with attribution derived from the options. The returned promise resolves with the handler's return value only after the agent's awareness has been cleared.
yhub.agentTask(
  room: { org: string, docid: string, branch: string },
  opts: {
    author?: string, // user-id recorded as `insert` / `delete`
    displayedAuthor?: string, // awareness `user.name` (defaults to `author`)
    promptBy?: string, // sugar for customAttributions: [{ k: 'promptBy', v: promptBy }]
    customAttributions?: Array<{ k: string, v: string }>,
    clearAwareness?: number | false // seconds; 0 = immediate (default); false = leave in place
  },
  handler: (ydoc: Y.Doc, awareness: Awareness) => Promise<R> | R
): Promise<R>

Behavior:
- The handler's `ydoc` is a snapshot at task start; concurrent edits from other clients during the task are not merged back in. The handler's own edits are still distributed live.
- `author` flows into the standard `insert` / `delete` content attributions, the same way the authenticated user-id does on the WS and REST paths.
- `customAttributions` entries become `insert:${k}` / `delete:${k}` attributions, matching the `customAttributions` shape accepted by `PATCH /ydoc` and the WebSocket query param. `promptBy` is sugar for one such entry and is merged with any explicit `customAttributions`.
- `displayedAuthor` is pre-seeded into the agent's awareness as `{ user: { name: displayedAuthor } }` so other clients can render the agent (cursor labels, presence panels). It is never recorded in the contentmap. The handler can replace or augment it with `awareness.setLocalState(...)`.
- On success, awareness is cleared after `clearAwareness` seconds (default `0` = immediately; `false` = don't clear). On any error, whether from the handler or from the underlying stream forwarding, awareness is cleared immediately regardless of `clearAwareness`, and the error is re-thrown from the returned promise.
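The way `promptBy` and `customAttributions` combine can be illustrated with a small pure function. This mirrors the behavior described above but is not the library's internal code: the name `resolveCustomAttributionKeys`, the returned shape, and the order in which `promptBy` is merged are all assumptions made for the sketch.

```javascript
// Illustrative only: expand agentTask options into the attribution attribute
// names described above (`insert:<key>` for insertions, `delete:<key>` for deletions).
function resolveCustomAttributionKeys ({ promptBy, customAttributions = [] }) {
  const merged = [...customAttributions]
  // `promptBy` is sugar for one more { k, v } entry (merge order is an assumption).
  if (promptBy !== undefined) merged.push({ k: 'promptBy', v: promptBy })
  return merged.map(({ k, v }) => ({
    onInsert: `insert:${k}`,
    onDelete: `delete:${k}`,
    value: v
  }))
}

const resolvedKeys = resolveCustomAttributionKeys({
  promptBy: 'kevin-user-id',
  customAttributions: [{ k: 'model', v: 'opus-4.7' }]
})
console.log(resolvedKeys)
// Two entries: one for `model`, one for `promptBy`.
```

Note that `author` is deliberately absent here: per the list above it feeds the standard `insert` / `delete` attributions rather than a custom key.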
Example
await yhub.agentTask(
  { org: 'my-org', docid: 'my-doc', branch: 'main' },
  {
    author: 'agent-user-id',
    displayedAuthor: 'Claude',
    promptBy: 'kevin-user-id',
    customAttributions: [{ k: 'model', v: 'opus-4.7' }],
    clearAwareness: 20
  },
  async (ydoc, awareness) => {
    // awareness already advertises { user: { name: 'Claude' } }
    // `delta` must be in scope here; its import is not shown in this snippet.
    ydoc.get().applyDelta(delta.create().insert('Hello from the agent').done())
  }
)