diff --git a/.env.template b/.env.template index 3969875..994bf45 100644 --- a/.env.template +++ b/.env.template @@ -21,6 +21,16 @@ S3_YHUB_BUCKET='yhub' # For development only: S3 BUCKET for storing yjs documents when running tests. Never store in the main bucket! S3_YHUB_TEST_BUCKET='yhub-testing' +## Azure Blob Storage (use with BlobPersistence from @y/hub/plugins/blob) +# AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=core.windows.net +# AZURE_STORAGE_CONTAINER=yhub + +## Google Cloud Storage (use with BlobPersistence from @y/hub/plugins/blob) +# GCS_BUCKET=yhub +# GCS_PROJECT_ID=my-project +# GCS_KEY_FILENAME=/path/to/service-account.json +# Or set GOOGLE_APPLICATION_CREDENTIALS env var + ## PostgreSQL connection string format: postgresql://[user[:password]@][netloc][:port][/dbname][?param1=value1&...] POSTGRES=postgres://yhub:yhub@localhost:5433/yhub # POSTGRES_TESTING=postgres://yhub:yhub@localhost:5433/yhub-testing diff --git a/STORAGE-ARCHITECTURE.md b/STORAGE-ARCHITECTURE.md index db8f315..b7ef072 100644 --- a/STORAGE-ARCHITECTURE.md +++ b/STORAGE-ARCHITECTURE.md @@ -77,7 +77,7 @@ CREATE TABLE yhub_ydoc_v1 ( This simplified table layout provides several advantages: -1. **Persistence Plugin Integration**: Each column stores schema-encoded assets that can be intercepted by persistence plugins (e.g., S3) before storage. When a plugin handles an asset, a `asset:retrievable:v1` reference is stored instead. +1. **Persistence Plugin Integration**: Each column stores schema-encoded assets that can be intercepted by persistence plugins (e.g., S3 via `S3PersistenceV1`, or any cloud storage via `BlobPersistence`) before storage. When a plugin handles an asset, a `asset:retrievable:v1` reference is stored instead. 2. **Partial Non-GC Document Retrieval**: By storing non-garbage-collected documents (`nongcDoc`) at regular intervals with timestamps, we can query for recent non-GC states without loading years of history. This enables efficient retrieval of document versions with full edit history for recent changes only. @@ -294,14 +294,48 @@ interface PersistencePlugin { } ``` -### Built-in: S3 Persistence +### Built-in: S3 Persistence (`S3PersistenceV1`) -The `S3PersistenceV1` plugin offloads assets to S3: +The `S3PersistenceV1` plugin offloads assets to S3-compatible storage (MinIO, AWS S3) using the `minio` SDK: - **Storage Path**: Uses asset ID string as S3 object key -- **Branch Filter**: Only stores assets from `main` branch by default +- **Branch Filter**: Only stores assets from `main` branch +- **Retries**: Handles transient network errors (connection resets, 503, 429) with one automatic retry - **Returns**: `{ type: 'asset:retrievable:v1', plugin: 'S3Persistence:v1' }` +Import: `import { S3PersistenceV1 } from '@y/hub/plugins/s3'` + +### Generic: Blob Persistence (`BlobPersistence`) + +The `BlobPersistence` plugin is a generic alternative to `S3PersistenceV1` for cloud storage backends that are not S3-compatible (Azure Blob Storage, Google Cloud Storage, etc.). Instead of bundling a specific SDK, the caller provides a simple adapter with four operations: + +| Method | Signature | Description | +|--------|-----------|-------------| +| `put` | `(path: string, data: Buffer) => Promise` | Store a blob. Caller handles retries. | +| `get` | `(path: string) => Promise` | Retrieve a blob. Return `null` if not found. | +| `del` | `(path: string) => Promise` | Delete a blob. Must not throw if missing. | +| `init` | `() => Promise` | *(Optional)* One-time setup (e.g. create container/bucket). | + +- **Storage Path**: Same asset ID string format as S3PersistenceV1 +- **Branch Filter**: Only stores assets from `main` branch (same as S3) +- **Encoding**: Same `lib0/buffer.encodeAny` / `decodeAny` as S3 +- **Deletion**: Delayed 10 seconds to prevent stale reads (same as S3) +- **Returns**: `{ type: 'asset:retrievable:v1', plugin: '' }` + +Import: `import { BlobPersistence } from '@y/hub/plugins/blob'` + +```javascript +// Azure Blob Storage example +const plugin = new BlobPersistence('AzureBlob:v1', { + init: () => container.createIfNotExists(), + put: (path, data) => container.getBlockBlobClient(path).upload(data, data.length), + get: async (path) => { /* return Buffer or null on 404 */ }, + del: (path) => container.getBlockBlobClient(path).deleteIfExists() +}) + +createYHub({ persistence: [plugin], ... }) +``` + ### Plugin Chain Multiple plugins can be chained: diff --git a/package.json b/package.json index da341f6..4e3bc0f 100644 --- a/package.json +++ b/package.json @@ -40,7 +40,11 @@ }, "./plugins/s3": { "default": "./src/plugins/s3.js", - "types": "./dist/src/storage/s3.d.ts" + "types": "./dist/src/plugins/s3.d.ts" + }, + "./plugins/blob": { + "default": "./src/plugins/blob.js", + "types": "./dist/src/plugins/blob.d.ts" } }, "repository": { diff --git a/src/plugins/blob.js b/src/plugins/blob.js new file mode 100644 index 0000000..40628d7 --- /dev/null +++ b/src/plugins/blob.js @@ -0,0 +1,180 @@ +/** + * Generic blob storage persistence plugin for yhub. + * + * Drop-in alternative to {@link S3PersistenceV1} that works with any cloud + * storage backend (Azure Blob Storage, Google Cloud Storage, etc.). Instead of + * being tied to a specific SDK, the caller provides a simple {@link BlobAdapter} + * with four operations (`put`, `get`, `del`, and optionally `init`). + * + * Behavior is identical to S3PersistenceV1: + * - Only `main`-branch assets are offloaded to blob storage; other branches + * remain in PostgreSQL. + * - Assets are encoded with `lib0/buffer.encodeAny` and decoded on retrieval. + * - Object keys use the canonical `t.assetIdToString()` format + * (`id:ydoc:v1/{org}/{docid}/{branch}/{gc}/{t}`). + * - Deletion is delayed 10 seconds to prevent stale reads from concurrent + * clients that still reference the old object. + * + * @example Azure Blob Storage + * import { BlobPersistence } from '@y/hub/plugins/blob' + * import { BlobServiceClient } from '@azure/storage-blob' // your dependency + * + * const client = BlobServiceClient.fromConnectionString(process.env.AZURE_STORAGE_CONNECTION_STRING) + * const container = client.getContainerClient('yhub') + * + * const plugin = new BlobPersistence('AzureBlob:v1', { + * init: () => container.createIfNotExists(), + * put: (path, data) => container.getBlockBlobClient(path).upload(data, data.length), + * get: async (path) => { + * try { + * const resp = await container.getBlockBlobClient(path).download() + * const chunks = [] + * for await (const chunk of resp.readableStreamBody) chunks.push(chunk) + * return Buffer.concat(chunks) + * } catch (e) { + * if (e.statusCode === 404) return null + * throw e + * } + * }, + * del: (path) => container.getBlockBlobClient(path).deleteIfExists() + * }) + * + * createYHub({ persistence: [plugin], ... }) + * + * @example Google Cloud Storage + * import { BlobPersistence } from '@y/hub/plugins/blob' + * import { Storage } from '@google-cloud/storage' // your dependency + * + * const bucket = new Storage({ projectId: process.env.GCS_PROJECT_ID }) + * .bucket(process.env.GCS_BUCKET) + * + * const plugin = new BlobPersistence('GCS:v1', { + * init: async () => { const [exists] = await bucket.exists(); if (!exists) await bucket.create() }, + * put: (path, data) => bucket.file(path).save(data), + * get: async (path) => { + * try { const [data] = await bucket.file(path).download(); return data } + * catch (e) { if (e.code === 404) return null; throw e } + * }, + * del: (path) => bucket.file(path).delete({ ignoreNotFound: true }) + * }) + * + * createYHub({ persistence: [plugin], ... }) + * + * @module + */ + +import * as t from '../types.js' +import * as buffer from 'lib0/buffer' +import { logger } from '../logger.js' + +const log = logger.child({ module: 'blob' }) + +/** + * Adapter interface that the caller must implement for their cloud storage + * backend. yhub calls these methods — all cloud-specific concerns (SDK setup, + * authentication, retries, transient-error handling) live in the adapter. + * + * @typedef {{ + * put: (path: string, data: Buffer) => Promise, + * get: (path: string) => Promise, + * del: (path: string) => Promise, + * init?: () => Promise + * }} BlobAdapter + * + * @property {function(string, Buffer): Promise} put + * Store a blob at `path`. The caller is responsible for retries on transient + * errors (cloud-specific error codes differ across providers). + * @property {function(string): Promise} get + * Retrieve a blob by `path`. Must return `null` when the object does not + * exist (the caller maps provider-specific 404s to `null`). + * @property {function(string): Promise} del + * Delete a blob by `path`. Must not throw if the object is already missing. + * @property {function(): Promise} [init] + * Optional one-time setup (e.g. create a container or bucket). Called once + * during yhub startup. + */ + +/** + * Generic blob persistence plugin. Pass any {@link BlobAdapter} to offload + * main-branch assets to a cloud object store without adding cloud SDK + * dependencies to yhub itself. + * + * @implements {t.PersistencePlugin} + */ +export class BlobPersistence { + /** + * @param {string} pluginId + * Unique identifier for this plugin instance, used to tag stored assets so + * they can be retrieved by the correct plugin later. Use a descriptive, + * versioned string (e.g. `'AzureBlob:v1'`, `'GCS:v1'`). + * @param {BlobAdapter} adapter + * Caller-provided adapter implementing the four blob operations. + */ + constructor (pluginId, adapter) { + this._pluginId = pluginId + this._adapter = adapter + } + + get pluginid () { + return this._pluginId + } + + async init () { + await this._adapter.init?.() + } + + /** + * Encode and store `asset` in blob storage. Only main-branch assets are + * offloaded; non-main branches return `null` (kept in PostgreSQL). + * + * @param {t.AssetId} assetId + * @param {t.Asset} asset + * @return {Promise} + */ + async store (assetId, asset) { + if (assetId.branch === 'main') { + const path = t.assetIdToString(assetId) + const data = Buffer.from(buffer.encodeAny(asset)) + await this._adapter.put(path, data) + return { type: 'asset:retrievable:v1', plugin: this._pluginId } + } + return null + } + + /** + * Retrieve and decode an asset previously stored by this plugin. Returns + * `null` if `assetInfo` doesn't belong to this plugin or the blob is missing. + * + * @param {t.AssetId} assetId + * @param {t.Asset} assetInfo + * @return {Promise} + */ + async retrieve (assetId, assetInfo) { + if (assetInfo.type === 'asset:retrievable:v1' && assetInfo.plugin === this._pluginId) { + const path = t.assetIdToString(assetId) + const data = await this._adapter.get(path) + return data && t.$asset.expect(buffer.decodeAny(data)) + } + return null + } + + /** + * Schedule deletion of a previously stored blob. Deletion is delayed by 10 + * seconds to avoid races with clients that may still be reading stale + * references. Returns `false` if `assetInfo` doesn't belong to this plugin. + * + * @param {t.AssetId} assetId + * @param {t.Asset} assetInfo + * @return {Promise} + */ + async delete (assetId, assetInfo) { + if (assetInfo.type !== 'asset:retrievable:v1' || assetInfo.plugin !== this._pluginId) { + return false + } + const path = t.assetIdToString(assetId) + setTimeout(() => { + this._adapter.del(path).catch(err => log.error({ err, path }, 'error deleting object')) + }, 10_000) + return true + } +}