feat: ingestion pipeline — ingestSource, atomic swap, examples, integration tests (tasks 4.5-4.8)#5

Merged
johnmcollier merged 2 commits into redhat-ai-dev:main from johnmcollier:feat/epic4-ingestion-pipeline
Jun 19, 2026

@johnmcollier

Contributor

Summary

Implements the full ingestion pipeline in src/server/ingestion/, wiring the primitives from PR #4 into an end-to-end sync flow.

  • ingest.ts: ingestSource orchestrator (clone → discover → parse → bundle → atomic swap → cleanup); exports ingestFromClonedPath for testability; best-effort per-skill error handling; temp dir always cleaned up via try/finally; clone failures propagate to caller
  • ingest.ts: atomicSwap wraps deleteBySource + upsertMany in a single SQLite transaction via a new SkillRepository.transaction<T>() method, so the catalog is never in a partial state during re-sync
  • examples.ts: loadExamplesIfEmpty seeds the catalog with 3 built-in skills (git-conventional-commit, code-review-checklist, explain-code) on first boot when both the sources and skills tables are empty
  • src/server/index.ts: wires await loadExamplesIfEmpty(db) at startup

Design note — examples source identity

The built-in examples source is created as a real source record (slug: "examples", url: "built-in") to satisfy the SQLite FK constraint (source_id REFERENCES sources(id)). sourceId: 0 is not usable — there is no source record at id=0 and auto-increment starts at 1. The sourceSlug: "examples" sentinel is the reliable identifier for downstream code (e.g., filtering built-ins from the Sources API in Epic 6).
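
As an illustration of how downstream code might rely on the slug sentinel, here is a minimal sketch. The `SourceRecord` shape, the `excludeBuiltIns` helper, and the sample URL are assumptions made for this example; they are not taken from the PR.

```ts
// Hypothetical record shape; the real source record may carry more fields.
interface SourceRecord {
  id: number;
  slug: string;
  url: string;
}

// Sentinel slug taken from the design note above.
const EXAMPLES_SLUG = "examples";

// Returns only user-added sources, dropping the built-in examples source.
function excludeBuiltIns(sources: SourceRecord[]): SourceRecord[] {
  return sources.filter((source) => source.slug !== EXAMPLES_SLUG);
}

const allSources: SourceRecord[] = [
  { id: 1, slug: "examples", url: "built-in" },
  { id: 2, slug: "team-skills", url: "https://example.com/team-skills.git" },
];

const visible = excludeBuiltIns(allSources);
console.log(visible.map((source) => source.slug));
```

Filtering on the slug rather than on a numeric id keeps the check independent of whatever id auto-increment happens to assign.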

Test plan

  • npm run build:server — zero TypeScript errors
  • npm test — 46/46 passing (3 test files)
  • Integration tests cover all 3 spec scenarios: valid repo → indexed, malformed → skipped+reported, re-sync → atomic replace

Related

Made with Cursor

- Add transaction<T> method to SkillRepository interface and SqliteSkillRepository
- Implement ingestFromClonedPath (inner pipeline: discover→parse→bundle→atomicSwap)
- Implement ingestSource (full pipeline with clone + cleanup in try/finally)
- Implement atomicSwap using repository transaction for atomic delete+upsert
- Implement loadExamplesIfEmpty seeding 3 built-in skills on first boot
- Wire loadExamplesIfEmpty into buildServer startup sequence
- Add integration tests for all 3 spec scenarios using real local git repos

Co-authored-by: Cursor <cursoragent@cursor.com>
@qodo-code-review

PR Summary by Qodo

feat: ingestion pipeline — ingestSource, atomic swap, examples, integration tests
✨ Enhancement 🧪 Tests 🕐 20-40 Minutes

Description

• Implements ingestSource orchestrator in ingest.ts: clone → discover → parse → bundle → atomic swap → cleanup
• Adds atomicSwap using SkillRepository.transaction() to delete+upsert in one SQLite transaction
• Seeds 3 built-in skills on first boot via loadExamplesIfEmpty, wired into server startup
• Adds integration tests for valid repo, malformed frontmatter, and atomic re-sync scenarios
Diagram

graph TD
    A["buildServer\n(index.ts)"] --> B["loadExamplesIfEmpty\n(examples.ts)"] --> DB[("SQLite DB")]
    A --> C["ingestSource\n(ingest.ts)"] --> D["clone\n(clone.ts)"]
    C --> E["ingestFromClonedPath\n(ingest.ts)"] --> F["discoverSkills\n(discover.ts)"] --> G["parseFrontmatter\n(frontmatter.ts)"] --> H["bundleSkill\n(bundle.ts)"] --> I["atomicSwap\n(ingest.ts)"] --> J["SkillRepository\n.transaction"] --> DB
    subgraph Legend
      direction LR
      _svc["Function/Module"] ~~~ _db[("Database")]
    end
High-Level Assessment

The following are alternative approaches to this PR:

1. Add `replaceAllForSource(sourceId, skills)` to SkillRepository
  • ➕ Keeps transaction semantics encapsulated within the repository layer
  • ➕ Avoids exposing a transaction primitive in the repository abstraction
  • ➕ Harder for callers to misuse transactional boundaries
  • ➖ Less flexible for future atomic multi-step operations
  • ➖ Requires new method(s) and implementation updates in all backends
2. Keep current `transaction` repository primitive (this PR)
  • ➕ Highly flexible for future atomic compositions (beyond replace)
  • ➕ Minimal additional surface area (one generic primitive)
  • ➖ Exposes storage-level transaction semantics via the interface
  • ➖ Future non-SQLite implementations must match transactional contract

Recommendation: The PR’s approach is solid and pragmatic for SQLite/better-sqlite3. The main trade-off is exposing transaction on the repository interface; if multi-backend support is expected, consider a more domain-specific atomic method (e.g., replaceAllForSource).
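
To make alternative 1 concrete, the following is a rough sketch of a domain-specific `replaceAllForSource` method. It uses an in-memory array instead of SQLite, and `SkillRow` and `InMemorySkillRepository` are hypothetical names invented for this example, so treat it as an outline of the contract rather than the PR's implementation.

```ts
// Hypothetical row shape, reduced to the fields this sketch needs.
interface SkillRow {
  sourceId: number;
  slug: string;
  content: string;
}

class InMemorySkillRepository {
  private rows: SkillRow[] = [];

  // Replaces every skill of one source in a single step.
  replaceAllForSource(sourceId: number, skills: SkillRow[]): void {
    // Validate first so a bad batch throws before anything changes.
    for (const skill of skills) {
      if (skill.sourceId !== sourceId) {
        throw new Error(`skill ${skill.slug} belongs to source ${skill.sourceId}`);
      }
    }
    // Build the next state, then swap it in with one assignment.
    this.rows = [...this.rows.filter((row) => row.sourceId !== sourceId), ...skills];
  }

  findBySource(sourceId: number): SkillRow[] {
    return this.rows.filter((row) => row.sourceId === sourceId);
  }
}

const repo = new InMemorySkillRepository();
repo.replaceAllForSource(1, [
  { sourceId: 1, slug: "a", content: "A" },
  { sourceId: 1, slug: "b", content: "B" },
]);
repo.replaceAllForSource(1, [{ sourceId: 1, slug: "c", content: "C" }]);

let rejected = false;
try {
  // A batch for the wrong source is rejected and leaves the catalog untouched.
  repo.replaceAllForSource(1, [{ sourceId: 2, slug: "x", content: "X" }]);
} catch {
  rejected = true;
}

const slugs = repo.findBySource(1).map((skill) => skill.slug);
console.log(slugs, rejected);
```

Callers never see a transaction handle in this shape; a SQLite-backed version would presumably run the delete and the upsert inside one transaction internally.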

Files changed (7) +465 / -4

Enhancement (5) +284 / -0
SqliteSkillRepository.ts: Add transaction<T> wrapper for better-sqlite3 +4/-0

• Implements 'transaction<T>(fn)' by delegating to 'db.transaction(fn)()' so callers can group repository operations atomically.

src/server/db/SqliteSkillRepository.ts

types.ts: Extend SkillRepository with transaction<T> +1/-0

• Adds 'transaction<T>(fn: () => T): T' to the 'SkillRepository' interface to support atomic multi-step writes.

src/server/db/types.ts

index.ts: Seed example skills during server startup +3/-0

• Imports and awaits 'loadExamplesIfEmpty(db)' immediately after DB initialization to ensure first-boot catalog has built-ins.

src/server/index.ts

examples.ts: Add built-in examples seeding logic +148/-0

• Introduces 'loadExamplesIfEmpty', which creates an 'examples' source record and upserts three built-in skills with SHA-256 digests when both sources and skills tables are empty.

src/server/ingestion/examples.ts

ingest.ts: Implement end-to-end ingestion pipeline and atomic swap +128/-0

• Adds 'ingestSource' (clone to temp dir, run inner pipeline, cleanup via try/finally), 'ingestFromClonedPath' (discover/parse/bundle with per-skill failure capture), and 'atomicSwap' (delete+upsert in one transaction). Returns a 'SyncReport' with counts and failure reasons.

src/server/ingestion/ingest.ts

Tests (1) +177 / -0
ingest.test.ts: Add integration tests for ingestion scenarios +177/-0

• Creates integration tests using an in-memory SQLite DB and local git repos to validate: (1) valid repo indexes skills, (2) malformed frontmatter is skipped and reported, and (3) re-sync atomically replaces the source’s skills set.

test/server/ingestion/ingest.test.ts

Documentation (1) +4 / -4
tasks.md: Mark ingestion tasks 4.5–4.8 as complete +4/-4

• Updates the OpenSpec task checklist to mark ingestion pipeline tasks 4.5 through 4.8 as done.

openspec/changes/rhess-enterprise-skills-server/tasks.md

@qodo-code-review

qodo-code-review Bot commented Jun 19, 2026

Code Review by Qodo

🐞 Bugs (0) 📘 Rule violations (0) 📎 Requirement gaps (0) 🎨 UX issues (0) 🔗 Cross-repo conflicts (0) 📜 Skill insights (0)

Action required

1. Temp dir leak on clone ✓ Resolved 🐞 Bug ☼ Reliability
Description
ingestSource() calls clone() before entering the try/finally, so if cloning throws, the temp
directory cleanup is skipped and partially-cloned data can be left under the OS temp directory.
Code

src/server/ingestion/ingest.ts[R121-127]

+  const tmpDir = path.join(os.tmpdir(), `rhess-sync-${sourceId}-${Date.now()}`);
+  await clone(url, tmpDir);
+  try {
+    return await ingestFromClonedPath(sourceId, sourceSlug, tmpDir, repos);
+  } finally {
+    fs.rmSync(tmpDir, { recursive: true, force: true });
+  }
Evidence
The clone call happens before the try/finally, so any exception thrown by clone() bypasses the
rmSync cleanup in the finally block.

src/server/ingestion/ingest.ts[115-127]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`ingestSource()` performs `await clone(url, tmpDir)` before the `try/finally` that removes `tmpDir`. If `clone()` throws, cleanup never runs and temp directories can accumulate.

### Issue Context
The temp directory path is derived from `os.tmpdir()` and `Date.now()`, and cleanup currently only happens for failures after clone completes.

### Fix Focus Areas
- src/server/ingestion/ingest.ts[121-127]

### Suggested fix
Wrap **both** the clone and ingestion steps in a single `try/finally` so cleanup always runs:

```ts
const tmpDir = ...
try {
 await clone(url, tmpDir);
 return await ingestFromClonedPath(...);
} finally {
 fs.rmSync(tmpDir, { recursive: true, force: true });
}
```

This keeps the existing “clone failures propagate” behavior while still cleaning up partial clones.


2. Non-atomic examples seeding ✓ Resolved 🐞 Bug ☼ Reliability
Description
loadExamplesIfEmpty() creates the examples source and then inserts skills without a transaction;
if upsertMany() fails, the DB can be left with sources non-empty and skills empty, and
subsequent startups will skip seeding due to the early-return guard.
Code

src/server/ingestion/examples.ts[R128-147]

+export async function loadExamplesIfEmpty(repos: Repositories): Promise<void> {
+  if (repos.skills.count() !== 0 || repos.sources.findAll().length !== 0) {
+    return;
+  }
+
+  const source = repos.sources.create({ slug: "examples", url: "built-in" });
+
+  repos.skills.upsertMany(
+    EXAMPLE_SKILLS.map((skill) => ({
+      sourceId: source.id,
+      sourceSlug: "examples",
+      slug: skill.slug,
+      name: skill.name,
+      description: skill.description,
+      artifactType: "skill-md" as const,
+      digest: sha256(skill.content),
+      content: skill.content,
+      supportingFiles: [],
+    }))
+  );
Evidence
The function returns early when either table is non-empty, but it inserts into sources before
inserting into skills and does not wrap them in a transaction; the schema’s FK is only from
skills to sources, so a standalone sources row is a valid partial state.

src/server/ingestion/examples.ts[128-147]
src/server/db/schema.ts[12-38]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`loadExamplesIfEmpty()` performs two separate writes (insert source, then upsert skills) without transactional protection. If the second step throws, you can end up with a persistent partial seed (source exists, skills missing) and the function will never retry because it returns early when `sources` is non-empty.

### Issue Context
- The guard condition is `skills.count() !== 0 || sources.findAll().length !== 0`.
- The schema allows a `sources` row to exist without any `skills` rows.

### Fix Focus Areas
- src/server/ingestion/examples.ts[128-147]
- src/server/db/schema.ts[12-38]

### Suggested fix
Make the seed operation atomic and retry-safe:
1. Wrap **both** `repos.sources.create(...)` and `repos.skills.upsertMany(...)` in a single SQLite transaction.
  - Given current abstractions, you can use `repos.skills.transaction(() => { ... })` because both repos share the same underlying `better-sqlite3` connection.
2. Move/repeat the “is empty?” checks *inside* the transaction to avoid races.
3. Optionally: if an `examples` source already exists but has zero skills, proceed to (re)insert the skills (idempotent recovery).

This prevents a permanent half-seeded state and ensures seeding can recover from transient failures.
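
The retry-safe behaviour described above can be sketched without a database. In this example `InMemoryStore` and `seedExamples` are hypothetical stand-ins, and the snapshot-and-restore logic only imitates a SQLite rollback; the real fix would go through the repository's transaction method.

```ts
interface Source { id: number; slug: string; url: string }
interface Skill { sourceId: number; slug: string }

class InMemoryStore {
  sources: Source[] = [];
  skills: Skill[] = [];

  // Runs fn and restores the previous state if fn throws (stand-in for a rollback).
  transactionSync<T>(fn: () => T): T {
    const snapshot = { sources: [...this.sources], skills: [...this.skills] };
    try {
      return fn();
    } catch (err) {
      this.sources = snapshot.sources;
      this.skills = snapshot.skills;
      throw err;
    }
  }
}

// Seeds the examples source and its skills as one unit; returns false if the store is not empty.
function seedExamples(store: InMemoryStore, buildSkills: (sourceId: number) => Skill[]): boolean {
  return store.transactionSync(() => {
    // The emptiness check sits inside the transaction, as the suggested fix recommends.
    if (store.sources.length !== 0 || store.skills.length !== 0) return false;
    const source: Source = { id: store.sources.length + 1, slug: "examples", url: "built-in" };
    store.sources.push(source);
    store.skills.push(...buildSkills(source.id)); // may throw
    return true;
  });
}

const store = new InMemoryStore();

let firstAttemptFailed = false;
try {
  seedExamples(store, () => {
    throw new Error("simulated upsert failure");
  });
} catch {
  firstAttemptFailed = true;
}
const sourcesAfterFailure = store.sources.length;

// The second attempt succeeds because the failed one left no orphaned source row.
const seeded = seedExamples(store, (sourceId) =>
  ["git-conventional-commit", "code-review-checklist", "explain-code"].map((slug) => ({ sourceId, slug })),
);
console.log(firstAttemptFailed, sourcesAfterFailure, seeded, store.skills.length);
```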


Remediation recommended

3. Async transaction API footgun ✓ Resolved 🐞 Bug ⚙ Maintainability
Description
SkillRepository.transaction<T>(fn: () => T) accepts any return type (including Promise), but the
implementation invokes the callback synchronously, so an async callback would commit immediately
and any work after the first await would run outside the transaction.
Code

src/server/db/types.ts[R56-59]

  upsertMany(skills: UpsertSkillInput[]): void;
  deleteBySource(sourceId: number): void;
  count(): number;
+  transaction<T>(fn: () => T): T;
Evidence
The interface allows any generic return type, and the implementation immediately invokes the
better-sqlite3 transaction wrapper and returns the callback result without awaiting; therefore a
Promise-returning callback would not keep awaited work inside the transaction.

src/server/db/types.ts[52-60]
src/server/db/SqliteSkillRepository.ts[135-137]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
`SkillRepository.transaction<T>(fn: () => T): T` permits `fn` to be `async` (returning `Promise<T>`). The current implementation runs `this.db.transaction(fn)()` immediately and returns the callback’s return value, so transactional boundaries do not extend across `await` points.

### Issue Context
Current call sites are synchronous, but the type signature makes it easy for future code to accidentally assume async safety.

### Fix Focus Areas
- src/server/db/types.ts[52-60]
- src/server/db/SqliteSkillRepository.ts[135-137]

### Suggested fix
Tighten the API contract to make async misuse hard/impossible:
- Option A (recommended): rename to `transactionSync` and type it/document it as sync-only.
- Option B: keep the name but enforce sync-only by:
 - adding a runtime check: if the result is a Promise, throw with a clear error message
 - and/or using a type-level guard (e.g., overloads) to discourage `Promise` return types.

Also add a short doc comment: “Callback must be synchronous; do not `await` inside.”
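
A minimal sketch of the runtime check from Option B follows. The standalone `transactionSync` function and the `isPromiseLike` helper are hypothetical and do not talk to a database; in a better-sqlite3-backed repository the check would presumably sit inside the wrapped callback so that the thrown error also rolls the transaction back.

```ts
// True for promises and other thenables.
function isPromiseLike(value: unknown): value is PromiseLike<unknown> {
  return (
    typeof value === "object" &&
    value !== null &&
    typeof (value as { then?: unknown }).then === "function"
  );
}

/** Callback must be synchronous; do not `await` inside. */
function transactionSync<T>(fn: () => T): T {
  const result = fn();
  if (isPromiseLike(result)) {
    // An async callback would finish its work after the transaction has already committed.
    throw new TypeError("transactionSync callback must be synchronous; do not await inside");
  }
  return result;
}

const syncValue = transactionSync(() => 42);

let rejectedAsync = false;
try {
  transactionSync(async () => 42);
} catch (err) {
  rejectedAsync = err instanceof TypeError;
}
console.log(syncValue, rejectedAsync);
```

The check only fires at runtime, so it complements the rename and doc comment rather than replacing them.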


Informational

4. Duplicate SKILL.md reads ✓ Resolved 🐞 Bug ➹ Performance
Description
For single-file skills, ingestFromClonedPath() reads SKILL.md to parse frontmatter and then
bundleSkillMd() reads the same file again to compute digest/content, causing redundant disk I/O
during ingestion.
Code

src/server/ingestion/ingest.ts[R48-55]

+      const content = fs.readFileSync(candidate.skillMdPath, "utf-8");
+      const fmResult = parseFrontmatter(content);
+      if (!fmResult.ok) {
+        failures.push({ path: relativePath, reason: fmResult.reason });
+        continue;
+      }
+      const bundleResult = await bundleSkill(candidate);
+      indexed.push({
Evidence
ingestFromClonedPath() reads candidate.skillMdPath to parse frontmatter, while bundleSkillMd()
separately reads candidate.skillMdPath again when there are no supporting files.

src/server/ingestion/ingest.ts[45-64]
src/server/ingestion/bundle.ts[22-29]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
The ingestion path reads `SKILL.md` twice for single-file skills: once for frontmatter parsing and once in `bundleSkillMd()` for digest/content.

### Issue Context
This is a small but systematic inefficiency that scales with the number of skills.

### Fix Focus Areas
- src/server/ingestion/ingest.ts[45-64]
- src/server/ingestion/bundle.ts[22-29]

### Suggested fix
Avoid the second read for `artifactType: "skill-md"` by reusing the already-read content. One approach:
- Call `bundleSkill(candidate)` first.
- If it returns `artifactType: "skill-md"`, parse frontmatter from `bundleResult.artifact` (which is the SKILL.md content) instead of reading `SKILL.md` separately.
- For `artifactType: "archive"`, keep the current read-for-frontmatter + archive packing behavior (still only one SKILL.md read in that case).
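
The bundle-first ordering can be sketched as below. Everything here is a simplified stand-in: the in-memory `files` map replaces the filesystem, `bundleSkill` and `parseName` are hypothetical reductions of the real bundling and frontmatter parsing, and the `reads` counter exists only to show how many times SKILL.md is read.

```ts
type BundleResult =
  | { artifactType: "skill-md"; artifact: string }
  | { artifactType: "archive"; artifact: Uint8Array };

// In-memory stand-in for the cloned repository.
const files = new Map<string, string>([
  ["skills/demo/SKILL.md", "---\nname: demo\n---\nBody"],
]);

let reads = 0;
function readFile(path: string): string {
  reads += 1;
  const content = files.get(path);
  if (content === undefined) throw new Error(`missing file: ${path}`);
  return content;
}

// Single-file skills return the SKILL.md text itself as the artifact.
function bundleSkill(skillMdPath: string, hasSupportingFiles: boolean): BundleResult {
  if (hasSupportingFiles) {
    // Placeholder bytes; real archive packing is out of scope for this sketch.
    return { artifactType: "archive", artifact: new Uint8Array(0) };
  }
  return { artifactType: "skill-md", artifact: readFile(skillMdPath) };
}

// Extracts the `name:` field from a frontmatter block, if present.
function parseName(content: string): string | undefined {
  const match = /^---\n([\s\S]*?)\n---/.exec(content);
  const nameLine = match?.[1]?.split("\n").find((line) => line.startsWith("name:"));
  return nameLine?.slice("name:".length).trim();
}

function ingestOne(skillMdPath: string, hasSupportingFiles: boolean): string | undefined {
  const bundle = bundleSkill(skillMdPath, hasSupportingFiles);
  // Reuse the bundled text for skill-md; only the archive path reads SKILL.md separately.
  const text = bundle.artifactType === "skill-md" ? bundle.artifact : readFile(skillMdPath);
  return parseName(text);
}

const skillName = ingestOne("skills/demo/SKILL.md", false);
console.log(skillName, reads);
```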

Comment thread src/server/ingestion/ingest.ts
Comment thread src/server/ingestion/examples.ts Outdated
- ingest: move clone() inside try/finally so partial clones are always
  cleaned up on failure (temp dir leak)
- ingest: bundle first and reuse artifact content for frontmatter parsing
  on skill-md type, eliminating duplicate SKILL.md reads
- examples: wrap source create + skill upsert in transactionSync so a
  transient failure cannot leave a permanent orphaned source row
- db: rename transaction<T> → transactionSync<T> with doc comment making
  the sync-only contract explicit and preventing async misuse
- test: remove unused beforeEach import (CI lint failure)

Co-authored-by: Cursor <cursoragent@cursor.com>
@johnmcollier johnmcollier merged commit b97e900 into redhat-ai-dev:main Jun 19, 2026
1 check passed