diff --git a/apps/web/package.json b/apps/web/package.json index f3935aa..a9d47ba 100644 --- a/apps/web/package.json +++ b/apps/web/package.json @@ -20,7 +20,7 @@ "standard-site:delete-record": "node --env-file=.env.local --experimental-strip-types scripts/delete-record.ts", "standard-site:list-records": "node --env-file=.env.local --experimental-strip-types scripts/list-records.ts", "standard-site:sync": "node --experimental-strip-types scripts/sync-standard-site.ts", - "test:standard-site": "node --experimental-strip-types --test src/lib/standard-site.test.ts scripts/sync-standard-site.test.ts" + "test:standard-site": "node --experimental-strip-types --test src/lib/standard-site.test.ts scripts/atproto.test.ts scripts/sync-standard-site.test.ts" }, "dependencies": { "@astrojs/compiler-rs": "catalog:", diff --git a/apps/web/scripts/atproto.test.ts b/apps/web/scripts/atproto.test.ts new file mode 100644 index 0000000..eae029b --- /dev/null +++ b/apps/web/scripts/atproto.test.ts @@ -0,0 +1,178 @@ +import assert from 'node:assert/strict'; +import test from 'node:test'; +import { createInMemoryAtprotoRepo, getRkey } from './atproto.ts'; +import { + buildDocumentRecord, + executePlan, + planReconciliation, +} from './sync-standard-site.ts'; + +test('getRkey extracts the last path segment of an at:// uri', () => { + assert.equal( + getRkey('at://did:plc:example/site.standard.document/hello-world'), + 'hello-world', + ); +}); + +test('createInMemoryAtprotoRepo lists seeded records', async () => { + const repo = createInMemoryAtprotoRepo({ + did: 'did:plc:example', + records: [ + { + uri: 'at://did:plc:example/site.standard.document/hello-world', + value: { path: '/posts/hello-world' }, + }, + ], + }); + + const records = await repo.listRecords('site.standard.document'); + assert.deepEqual(records, [ + { + uri: 'at://did:plc:example/site.standard.document/hello-world', + value: { path: '/posts/hello-world' }, + }, + ]); +}); + +test('createInMemoryAtprotoRepo reflects created records', async () => { + const repo = createInMemoryAtprotoRepo({ did: 'did:plc:example' }); + + const result = await repo.createRecord({ + collection: 'site.standard.document', + record: { path: '/posts/new-post' }, + rkey: 'new-post', + }); + + assert.equal( + result.uri, + 'at://did:plc:example/site.standard.document/new-post', + ); + assert.deepEqual(await repo.listRecords('site.standard.document'), [ + { + uri: 'at://did:plc:example/site.standard.document/new-post', + value: { path: '/posts/new-post' }, + }, + ]); +}); + +test('createInMemoryAtprotoRepo removes deleted records', async () => { + const repo = createInMemoryAtprotoRepo({ + did: 'did:plc:example', + records: [ + { + uri: 'at://did:plc:example/site.standard.document/hello-world', + value: { path: '/posts/hello-world' }, + }, + ], + }); + + await repo.deleteRecord({ + collection: 'site.standard.document', + rkey: 'hello-world', + }); + + assert.deepEqual(await repo.listRecords('site.standard.document'), []); +}); + +test('executePlan applies creates and deletes against an in-memory repo', async () => { + const did = 'did:plc:example'; + const post = { + portableContent: 'Hello.', + publishedAt: '2024-01-20', + slug: 'hello-world', + title: 'Hello World', + }; + + const repo = createInMemoryAtprotoRepo({ + did, + records: [ + { + uri: 'at://did:plc:example/site.standard.document/old-post', + value: { + $type: 'site.standard.document', + path: '/posts/old-post', + site: 'at://did:plc:3z5ja7l2rhnmtr2bni5dyfe7/site.standard.publication/3mnqwgvxn372f', + }, + }, + ], + }); + + const plan = planReconciliation({ + did, + existingRecords: await repo.listRecords('site.standard.document'), + posts: [post], + }); + + const documentsBySlug = await executePlan(repo, plan); + + assert.deepEqual(Object.fromEntries(documentsBySlug), { + 'hello-world': 'at://did:plc:example/site.standard.document/hello-world', + }); + + const records = await repo.listRecords('site.standard.document'); + assert.deepEqual(records, [ + { + uri: 'at://did:plc:example/site.standard.document/hello-world', + value: buildDocumentRecord(post), + }, + ]); +}); + +test('executePlan updates owned fields while preserving unknown remote fields', async () => { + const did = 'did:plc:example'; + const slug = 'hello-world'; + const uri = `at://${did}/site.standard.document/${slug}`; + + const stalePost = { + portableContent: 'Old body.', + publishedAt: '2024-01-20', + slug, + title: 'Old Title', + }; + const currentPost = { + portableContent: 'New body.', + publishedAt: '2024-02-15', + slug, + title: 'New Title', + }; + + const repo = createInMemoryAtprotoRepo({ + did, + records: [ + { + uri, + value: { + ...buildDocumentRecord(stalePost), + title: 'Stale Title Override', + unknownFutureField: 'preserve-me', + }, + }, + ], + }); + + const plan = planReconciliation({ + did, + existingRecords: await repo.listRecords('site.standard.document'), + posts: [currentPost], + }); + + assert.equal(plan.updates.length, 1); + assert.equal(plan.creates.length, 0); + + const documentsBySlug = await executePlan(repo, plan); + + assert.deepEqual(Object.fromEntries(documentsBySlug), { + [slug]: uri, + }); + + const records = await repo.listRecords('site.standard.document'); + assert.deepEqual(records, [ + { + uri, + value: { + ...buildDocumentRecord(currentPost), + unknownFutureField: 'preserve-me', + }, + }, + ]); +}); diff --git a/apps/web/scripts/atproto.ts b/apps/web/scripts/atproto.ts new file mode 100644 index 0000000..b85d492 --- /dev/null +++ b/apps/web/scripts/atproto.ts @@ -0,0 +1,313 @@ +import * as z from 'zod'; + +const SessionSchema = z.object({ + accessJwt: z.string(), + did: z.string(), +}); + +type Session = z.infer; + +const ExistingRecordSchema = z.object({ + uri: z.string().startsWith('at://'), + value: z.record(z.string(), z.unknown()), +}); + +export type ExistingRecord = z.infer; + +const ListRecordsResponseSchema = z.object({ + cursor: z.string().optional(), + records: z.array(ExistingRecordSchema), +}); + +const WriteRecordResponseSchema = z.object({ + uri: z.string().startsWith('at://'), +}); + +const DidDocumentSchema = z.object({ + service: z + .array( + z.object({ + serviceEndpoint: z.string().optional(), + type: z.string().optional(), + }), + ) + .optional(), +}); + +const DidResponseSchema = z.object({ + did: z.string().optional(), +}); + +export interface AtprotoRepo { + createRecord(input: { + collection: string; + rkey: string; + record: unknown; + validate?: boolean; + }): Promise<{ uri: string }>; + deleteRecord(input: { collection: string; rkey: string }): Promise; + readonly did: string; + listRecords(collection: string): Promise>; + putRecord(input: { + collection: string; + rkey: string; + record: unknown; + validate?: boolean; + }): Promise<{ uri: string }>; +} + +async function xrpc( + pds: string, + path: string, + schema: z.ZodType, + init: RequestInit = {}, +): Promise { + const response = await fetch(`${pds}/xrpc/${path}`, { + ...init, + headers: { + 'Content-Type': 'application/json', + ...init.headers, + }, + }); + const text = await response.text(); + + if (!response.ok) { + throw new Error(`${response.status} ${response.statusText}: ${text}`); + } + + return schema.parse(JSON.parse(text)); +} + +async function xrpcVoid( + pds: string, + path: string, + init: RequestInit = {}, +): Promise { + const response = await fetch(`${pds}/xrpc/${path}`, { + ...init, + headers: { + 'Content-Type': 'application/json', + ...init.headers, + }, + }); + + if (!response.ok) { + const text = await response.text(); + throw new Error(`${response.status} ${response.statusText}: ${text}`); + } +} + +async function resolveDid(identifier: string): Promise { + if (identifier.startsWith('did:')) { + return identifier; + } + + const response = await fetch( + `https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?handle=${encodeURIComponent(identifier)}`, + ); + const body = DidResponseSchema.parse(await response.json()); + + if (!response.ok) { + throw new Error(`Could not resolve ${identifier}: ${JSON.stringify(body)}`); + } + + if (!body.did) { + throw new Error(`Could not resolve ${identifier}`); + } + + return body.did; +} + +async function resolvePds(did: string): Promise { + const response = await fetch(`https://plc.directory/${did}`); + const body = DidDocumentSchema.parse(await response.json()); + + if (!response.ok) { + throw new Error(`Could not resolve DID document for ${did}`); + } + + const service = body.service?.find( + (item) => item.type === 'AtprotoPersonalDataServer', + ); + if (!service?.serviceEndpoint) { + throw new Error(`${did} has no AT Protocol PDS service`); + } + + return service.serviceEndpoint.replace(/\/$/, ''); +} + +async function createSession( + pds: string, + identifier: string, + password: string, +): Promise { + return xrpc(pds, 'com.atproto.server.createSession', SessionSchema, { + body: JSON.stringify({ identifier, password }), + method: 'POST', + }); +} + +export function getRkey(uri: string): string { + const index = uri.lastIndexOf('/'); + return uri.slice(index + 1); +} + +export async function connectAtprotoRepo(input: { + identifier: string; + password: string; +}): Promise { + const did = await resolveDid(input.identifier); + const pds = await resolvePds(did); + const session = await createSession(pds, input.identifier, input.password); + const auth = { Authorization: `Bearer ${session.accessJwt}` }; + + return { + did, + async listRecords(collection: string): Promise> { + const records: Array = []; + let cursor: string | undefined; + + while (true) { + const query = new URLSearchParams({ + collection, + limit: '100', + repo: did, + }); + if (cursor) { + query.set('cursor', cursor); + } + + const body = await xrpc( + pds, + `com.atproto.repo.listRecords?${query}`, + ListRecordsResponseSchema, + { + headers: auth, + }, + ); + records.push(...body.records); + if (!body.cursor) break; + cursor = body.cursor; + } + + return records; + }, + async createRecord(input: { + collection: string; + rkey: string; + record: unknown; + validate?: boolean; + }): Promise<{ uri: string }> { + return xrpc( + pds, + 'com.atproto.repo.createRecord', + WriteRecordResponseSchema, + { + body: JSON.stringify({ + collection: input.collection, + record: input.record, + repo: did, + rkey: input.rkey, + validate: input.validate ?? false, + }), + headers: auth, + method: 'POST', + }, + ); + }, + async putRecord(input: { + collection: string; + rkey: string; + record: unknown; + validate?: boolean; + }): Promise<{ uri: string }> { + return xrpc( + pds, + 'com.atproto.repo.putRecord', + WriteRecordResponseSchema, + { + body: JSON.stringify({ + collection: input.collection, + record: input.record, + repo: did, + rkey: input.rkey, + validate: input.validate ?? false, + }), + headers: auth, + method: 'POST', + }, + ); + }, + async deleteRecord(input: { + collection: string; + rkey: string; + }): Promise { + await xrpcVoid(pds, 'com.atproto.repo.deleteRecord', { + body: JSON.stringify({ + collection: input.collection, + repo: did, + rkey: input.rkey, + }), + headers: auth, + method: 'POST', + }); + }, + }; +} + +export function createInMemoryAtprotoRepo(input?: { + did?: string; + records?: Array; +}): AtprotoRepo { + const did = input?.did ?? 'did:plc:inmemory'; + const records = new Map(); + + for (const record of input?.records ?? []) { + records.set(record.uri, record); + } + + function buildUri(collection: string, rkey: string): string { + return `at://${did}/${collection}/${rkey}`; + } + + return { + did, + async listRecords(collection: string): Promise> { + const prefix = `at://${did}/${collection}/`; + return [...records.values()].filter((record) => + record.uri.startsWith(prefix), + ); + }, + async createRecord(input: { + collection: string; + rkey: string; + record: unknown; + validate?: boolean; + }): Promise<{ uri: string }> { + const uri = buildUri(input.collection, input.rkey); + if (records.has(uri)) { + throw new Error(`Record already exists at ${uri}`); + } + const stored = ExistingRecordSchema.parse({ uri, value: input.record }); + records.set(stored.uri, stored); + return { uri: stored.uri }; + }, + async putRecord(input: { + collection: string; + rkey: string; + record: unknown; + validate?: boolean; + }): Promise<{ uri: string }> { + const uri = buildUri(input.collection, input.rkey); + const stored = ExistingRecordSchema.parse({ uri, value: input.record }); + records.set(stored.uri, stored); + return { uri: stored.uri }; + }, + async deleteRecord(input: { + collection: string; + rkey: string; + }): Promise { + records.delete(buildUri(input.collection, input.rkey)); + }, + }; +} diff --git a/apps/web/scripts/delete-record.ts b/apps/web/scripts/delete-record.ts index c8387a8..c02bc1b 100644 --- a/apps/web/scripts/delete-record.ts +++ b/apps/web/scripts/delete-record.ts @@ -1,36 +1,6 @@ -const DEFAULT_IDENTIFIER = 'lukebennett.dev'; - -function getRkey(uri: string): string { - const index = uri.lastIndexOf('/'); - return uri.slice(index + 1); -} - -async function resolveDid(identifier: string): Promise { - if (identifier.startsWith('did:')) return identifier; - - const response = await fetch( - `https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?handle=${encodeURIComponent(identifier)}`, - ); - const body = (await response.json()) as { did: string }; - if (!response.ok || !body.did) { - throw new Error(`Could not resolve ${identifier}`); - } - return body.did; -} +import { connectAtprotoRepo, getRkey } from './atproto.ts'; -async function resolvePds(did: string): Promise { - const response = await fetch(`https://plc.directory/${did}`); - const body = (await response.json()) as { - service?: Array<{ type: string; serviceEndpoint: string }>; - }; - const service = body.service?.find( - (item) => item.type === 'AtprotoPersonalDataServer', - ); - if (!service?.serviceEndpoint) { - throw new Error(`${did} has no PDS`); - } - return service.serviceEndpoint; -} +const DEFAULT_IDENTIFIER = 'lukebennett.dev'; async function main() { const raw = process.argv[2]; @@ -50,44 +20,8 @@ async function main() { process.exit(1); } - const did = await resolveDid(identifier); - const pds = await resolvePds(did); - console.error(`Resolved ${identifier} → ${did}, PDS: ${pds}`); - - const sessionRes = await fetch( - `${pds}/xrpc/com.atproto.server.createSession`, - { - body: JSON.stringify({ identifier, password }), - headers: { 'Content-Type': 'application/json' }, - method: 'POST', - }, - ); - if (!sessionRes.ok) { - console.error( - `Auth failed: ${sessionRes.status} ${await sessionRes.text()}`, - ); - process.exit(1); - } - const session = (await sessionRes.json()) as { accessJwt: string }; - - const deleteRes = await fetch(`${pds}/xrpc/com.atproto.repo.deleteRecord`, { - body: JSON.stringify({ - collection: 'site.standard.document', - repo: did, - rkey, - }), - headers: { - Authorization: `Bearer ${session.accessJwt}`, - 'Content-Type': 'application/json', - }, - method: 'POST', - }); - if (!deleteRes.ok) { - console.error( - `Delete failed: ${deleteRes.status} ${await deleteRes.text()}`, - ); - process.exit(1); - } + const repo = await connectAtprotoRepo({ identifier, password }); + await repo.deleteRecord({ collection: 'site.standard.document', rkey }); console.log(`Deleted site.standard.document/${rkey}`); } diff --git a/apps/web/scripts/list-records.ts b/apps/web/scripts/list-records.ts index 8818b73..63337ee 100644 --- a/apps/web/scripts/list-records.ts +++ b/apps/web/scripts/list-records.ts @@ -1,31 +1,6 @@ -const DEFAULT_IDENTIFIER = 'lukebennett.dev'; - -async function resolveDid(identifier: string): Promise { - if (identifier.startsWith('did:')) return identifier; - - const response = await fetch( - `https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?handle=${encodeURIComponent(identifier)}`, - ); - const body = (await response.json()) as { did: string }; - if (!response.ok || !body.did) { - throw new Error(`Could not resolve ${identifier}`); - } - return body.did; -} +import { connectAtprotoRepo, getRkey } from './atproto.ts'; -async function resolvePds(did: string): Promise { - const response = await fetch(`https://plc.directory/${did}`); - const body = (await response.json()) as { - service?: Array<{ type: string; serviceEndpoint: string }>; - }; - const service = body.service?.find( - (item) => item.type === 'AtprotoPersonalDataServer', - ); - if (!service?.serviceEndpoint) { - throw new Error(`${did} has no PDS`); - } - return service.serviceEndpoint; -} +const DEFAULT_IDENTIFIER = 'lukebennett.dev'; async function main() { const identifier = process.env.ATPROTO_IDENTIFIER ?? DEFAULT_IDENTIFIER; @@ -35,54 +10,12 @@ async function main() { process.exit(1); } - const did = await resolveDid(identifier); - const pds = await resolvePds(did); - console.error(`Resolved ${identifier} → ${did}, PDS: ${pds}`); - - const sessionRes = await fetch( - `${pds}/xrpc/com.atproto.server.createSession`, - { - body: JSON.stringify({ identifier, password }), - headers: { 'Content-Type': 'application/json' }, - method: 'POST', - }, - ); - if (!sessionRes.ok) { - console.error( - `Auth failed: ${sessionRes.status} ${await sessionRes.text()}`, - ); - process.exit(1); - } - const session = (await sessionRes.json()) as { accessJwt: string }; - const auth = `Bearer ${session.accessJwt}`; - - const records: Array<{ uri: string; value: Record }> = []; - let cursor: string | undefined; - - while (true) { - const query = new URLSearchParams({ - collection: 'site.standard.document', - limit: '100', - repo: did, - }); - if (cursor) query.set('cursor', cursor); - - const res = await fetch( - `${pds}/xrpc/com.atproto.repo.listRecords?${query}`, - { headers: { Authorization: auth } }, - ); - const body = (await res.json()) as { - cursor?: string; - records: Array<{ uri: string; value: Record }>; - }; - records.push(...body.records); - if (!body.cursor) break; - cursor = body.cursor; - } + const repo = await connectAtprotoRepo({ identifier, password }); + const records = await repo.listRecords('site.standard.document'); for (const record of records) { - const rkey = record.uri.slice(record.uri.lastIndexOf('/') + 1); - const path = (record.value as Record)?.path ?? '(no path)'; + const rkey = getRkey(record.uri); + const path = record.value?.path ?? '(no path)'; const expectedRkey = typeof path === 'string' && path.startsWith('/posts/') ? path.slice('/posts/'.length) diff --git a/apps/web/scripts/sync-standard-site.ts b/apps/web/scripts/sync-standard-site.ts index 751fc19..b168b06 100644 --- a/apps/web/scripts/sync-standard-site.ts +++ b/apps/web/scripts/sync-standard-site.ts @@ -1,7 +1,12 @@ import { mkdir, readdir, readFile, rm, writeFile } from 'node:fs/promises'; import { basename } from 'node:path'; import { pathToFileURL } from 'node:url'; -import * as z from 'zod'; +import { + type AtprotoRepo, + connectAtprotoRepo, + type ExistingRecord, + getRkey, +} from './atproto.ts'; type PublishedPost = { portableContent: string; @@ -27,44 +32,6 @@ type DocumentRecord = { title: string; }; -const SessionSchema = z.object({ - accessJwt: z.string(), - did: z.string(), -}); - -type Session = z.infer; - -const ExistingRecordSchema = z.object({ - uri: z.string().startsWith('at://'), - value: z.record(z.string(), z.unknown()), -}); - -type ExistingRecord = z.infer; - -const ListRecordsResponseSchema = z.object({ - cursor: z.string().optional(), - records: z.array(ExistingRecordSchema), -}); - -const WriteRecordResponseSchema = z.object({ - uri: z.string().startsWith('at://'), -}); - -const DidDocumentSchema = z.object({ - service: z - .array( - z.object({ - serviceEndpoint: z.string().optional(), - type: z.string().optional(), - }), - ) - .optional(), -}); - -const DidResponseSchema = z.object({ - did: z.string().optional(), -}); - const APP_DIR = new URL('..', import.meta.url); const POSTS_DIR = new URL('content/posts/', APP_DIR); const GENERATED_DIR = new URL('src/generated/', APP_DIR); @@ -388,147 +355,6 @@ function parseSimpleFrontmatter( return fields; } -async function xrpc( - pds: string, - path: string, - schema: z.ZodType, - init: RequestInit = {}, -): Promise { - const response = await fetch(`${pds}/xrpc/${path}`, { - ...init, - headers: { - 'Content-Type': 'application/json', - ...init.headers, - }, - }); - const text = await response.text(); - - if (!response.ok) { - throw new Error(`${response.status} ${response.statusText}: ${text}`); - } - - return schema.parse(JSON.parse(text)); -} - -async function xrpcVoid( - pds: string, - path: string, - init: RequestInit = {}, -): Promise { - const response = await fetch(`${pds}/xrpc/${path}`, { - ...init, - headers: { - 'Content-Type': 'application/json', - ...init.headers, - }, - }); - - if (!response.ok) { - const text = await response.text(); - throw new Error(`${response.status} ${response.statusText}: ${text}`); - } -} - -async function resolveDid(identifier: string): Promise { - if (identifier.startsWith('did:')) { - return identifier; - } - - const response = await fetch( - `https://public.api.bsky.app/xrpc/com.atproto.identity.resolveHandle?handle=${encodeURIComponent(identifier)}`, - ); - const body = DidResponseSchema.parse(await response.json()); - - if (!response.ok) { - throw new Error(`Could not resolve ${identifier}: ${JSON.stringify(body)}`); - } - - if (!body.did) { - throw new Error(`Could not resolve ${identifier}`); - } - - return body.did; -} - -async function resolvePds(did: string): Promise { - const response = await fetch(`https://plc.directory/${did}`); - const body = DidDocumentSchema.parse(await response.json()); - - if (!response.ok) { - throw new Error(`Could not resolve DID document for ${did}`); - } - - const service = body.service?.find( - (item) => item.type === 'AtprotoPersonalDataServer', - ); - if (!service?.serviceEndpoint) { - throw new Error(`${did} has no AT Protocol PDS service`); - } - - return service.serviceEndpoint.replace(/\/$/, ''); -} - -async function createSession( - pds: string, - identifier: string, - password: string, -): Promise { - return xrpc(pds, 'com.atproto.server.createSession', SessionSchema, { - body: JSON.stringify({ identifier, password }), - method: 'POST', - }); -} - -async function listRecords( - pds: string, - auth: Record, - repo: string, - collection: string, -): Promise> { - const records: Array = []; - let cursor: string | undefined; - - while (true) { - const query = new URLSearchParams({ - collection, - limit: '100', - repo, - }); - if (cursor) { - query.set('cursor', cursor); - } - - const body = await xrpc( - pds, - `com.atproto.repo.listRecords?${query}`, - ListRecordsResponseSchema, - { - headers: auth, - }, - ); - records.push(...body.records); - if (!body.cursor) break; - cursor = body.cursor; - } - - return records; -} - -function getRkey(uri: string): string { - const index = uri.lastIndexOf('/'); - return uri.slice(index + 1); -} - -async function authenticate( - identifier: string, - password: string, -): Promise<{ auth: Record; did: string; pds: string }> { - const did = await resolveDid(identifier); - const pds = await resolvePds(did); - const session = await createSession(pds, identifier, password); - return { auth: { Authorization: `Bearer ${session.accessJwt}` }, did, pds }; -} - function printPlan(plan: ReconciliationPlan): void { for (const item of plan.creates) { console.log(`create ${item.slug}`); @@ -547,63 +373,34 @@ function printPlan(plan: ReconciliationPlan): void { ); } -async function executePlan( - pds: string, - auth: Record, - repo: string, +export async function executePlan( + repo: AtprotoRepo, plan: ReconciliationPlan, ): Promise> { const documentsBySlug = new Map(); for (const item of plan.creates) { - const result = await xrpc( - pds, - 'com.atproto.repo.createRecord', - WriteRecordResponseSchema, - { - body: JSON.stringify({ - collection: COLLECTION, - record: item.record, - repo, - rkey: item.slug, - validate: false, - }), - headers: auth, - method: 'POST', - }, - ); + const result = await repo.createRecord({ + collection: COLLECTION, + record: item.record, + rkey: item.slug, + }); recordSyncedDocumentUri(documentsBySlug, item.slug, result.uri); } for (const item of plan.updates) { - const result = await xrpc( - pds, - 'com.atproto.repo.putRecord', - WriteRecordResponseSchema, - { - body: JSON.stringify({ - collection: COLLECTION, - record: mergeOwnedDocumentFields(item.remote.value, item.record), - repo, - rkey: getRkey(item.remote.uri), - validate: false, - }), - headers: auth, - method: 'POST', - }, - ); + const result = await repo.putRecord({ + collection: COLLECTION, + record: mergeOwnedDocumentFields(item.remote.value, item.record), + rkey: getRkey(item.remote.uri), + }); recordSyncedDocumentUri(documentsBySlug, item.slug, result.uri); } for (const item of plan.deletes) { - await xrpcVoid(pds, 'com.atproto.repo.deleteRecord', { - body: JSON.stringify({ - collection: COLLECTION, - repo, - rkey: getRkey(item.record.uri), - }), - headers: auth, - method: 'POST', + await repo.deleteRecord({ + collection: COLLECTION, + rkey: getRkey(item.record.uri), }); } @@ -637,10 +434,10 @@ async function main(): Promise { } const identifier = process.env.ATPROTO_IDENTIFIER ?? DEFAULT_IDENTIFIER; - const { auth, did, pds } = await authenticate(identifier, password); + const repo = await connectAtprotoRepo({ identifier, password }); const posts = await loadPublishedPosts(); - const existingRecords = await listRecords(pds, auth, did, COLLECTION); - const plan = planReconciliation({ did, existingRecords, posts }); + const existingRecords = await repo.listRecords(COLLECTION); + const plan = planReconciliation({ did: repo.did, existingRecords, posts }); if (args.has('--report')) { printPlan(plan); @@ -649,7 +446,7 @@ async function main(): Promise { if (args.has('--write')) { await removeManifest(); - const documentsBySlug = await executePlan(pds, auth, did, plan); + const documentsBySlug = await executePlan(repo, plan); await writeManifest(documentsBySlug); printPlan(plan); }