The advanced tier exposes lower-level components for custom pipelines and integrations:
import {
DocumentPipelineImpl,
stageDocumentsForEnqueue,
QueryRuntime,
createWorkspaceRuntimeFactory,
chunkingByTokenSize,
} from "@gsrag/core/advanced";
// Custom document processing
// Builds the pipeline by hand from its five constructor arguments. None of
// `providers`, `storages`, `config`, `pipelineStatus` or `llmCache` is defined
// in this snippet; the caller is expected to have created them beforehand.
const pipeline = new DocumentPipelineImpl(
providers,
storages,
config,
pipelineStatus,
llmCache,
);
// Custom query execution
// The query runtime is constructed from the same `providers`, `storages` and
// `config` values and then asked a question with an explicit query parameter.
// NOTE(review): `QueryParam` is not in the import list above — confirm which
// entry point exports it and add the import.
const runtime = new QueryRuntime(providers, storages, config);
const result = await runtime.query("What is...", new QueryParam({ mode: "hybrid" }));
// Direct chunking
// Positional call: the trailing comment on each literal argument labels the
// parameter it is meant to fill. `tokenizer` is not defined in this snippet
// and must be supplied by the caller.
const chunks = chunkingByTokenSize(
tokenizer,
"document text",
"\n\n", // splitByCharacter
false, // splitByCharacterOnly
100, // chunkOverlapTokenSize
1200, // chunkTokenSize
);
// Workspace runtime factory
// `createWorkspaceRuntimeFactory` is called with a storage factory and yields
// a function that is invoked next; `storageFactory` is not defined in this
// snippet.
const factory = createWorkspaceRuntimeFactory(storageFactory);
const workspaceRuntime = factory(providers, storages);

import type { DocumentReader } from "@gsrag/contracts";
/**
 * Minimal custom reader used to illustrate the `DocumentReader` contract.
 *
 * NOTE(review): `ReaderInput` and `ReaderOutput` are referenced here, but the
 * snippet's import only brings in `DocumentReader` — presumably they are
 * exported from the same package; confirm and extend the import.
 */
class MyCustomReader implements DocumentReader {
  /**
   * Produces the reader output for one input.
   *
   * @param input - The item to read; this placeholder implementation does not
   *   inspect it.
   * @returns A fixed result: the text "extracted text" together with
   *   `source: "custom"` metadata.
   */
  async read(input: ReaderInput): Promise<ReaderOutput> {
    const output: ReaderOutput = {
      content: "extracted text",
      metadata: { source: "custom" },
    };
    return output;
  }
}
// Register the custom reader with a GsRag instance.
// `providers` and `storages` are not defined in this snippet and must be
// created by the caller beforehand.
// NOTE(review): `GsRag` is not imported in this snippet — confirm which
// package exports it and add the import.
const gsrag = new GsRag({
  providers,
  storages,
  // Instantiate the reader class defined above; the list takes reader objects.
  readers: [new MyCustomReader()], // replaces default readers
});

Implement the CompletionProvider or EmbeddingProvider interface:
import type { CompletionProvider, EmbeddingProvider, CompletionRequest } from "@gsrag/contracts";
/**
 * Skeleton completion provider showing the shape of the `CompletionProvider`
 * contract.
 */
class MyCompletionProvider implements CompletionProvider {
  /** Provider descriptor: a name plus the single capability it declares. */
  metadata = {
    name: "my-model",
    capabilities: ["completion" as const],
  };

  /**
   * Handles one completion request.
   *
   * @param request - Carries at least `messages`, `stream` and `signal`, which
   *   are unpacked below.
   * @returns The fixed string "response text" in this placeholder; the
   *   signature also permits an `AsyncIterable<string>`.
   */
  async complete(request: CompletionRequest): Promise<string | AsyncIterable<string>> {
    // Unpacked only to show which fields are available; the placeholder body
    // does not use them.
    const { messages, stream, signal } = request;
    // Call your LLM API here
    const responseText = "response text";
    return responseText;
  }
}
/**
 * Skeleton embedding provider showing the shape of the `EmbeddingProvider`
 * contract.
 *
 * NOTE(review): `EmbeddingRequest` is used below but is missing from this
 * snippet's import line (which lists `CompletionProvider`,
 * `EmbeddingProvider` and `CompletionRequest`) — presumably it is exported
 * from "@gsrag/contracts" as well; confirm and add it.
 */
class MyEmbeddingProvider implements EmbeddingProvider {
  /** Provider descriptor: a name plus the single capability it declares. */
  metadata = { name: "my-embedder", capabilities: ["embedding" as const] };

  /** Presumably the length of each returned vector — confirm against the contract. */
  embeddingDimension = 768;

  /**
   * Handles one embedding request.
   *
   * @param request - The embedding request; this placeholder does not inspect it.
   * @returns An array of numeric vectors. The literal below is abbreviated:
   *   `...` marks omitted values and is not valid TypeScript as written.
   */
  async embed(request: EmbeddingRequest): Promise<number[][]> {
    // Call your embedding API here
    return [[0.1, 0.2, ...]];
  }
}

Implement the contract interfaces from @gsrag/contracts or @gsrag/storage:
import type { KvStorage, VectorStorage, GraphStorage } from "@gsrag/storage";
// or from @gsrag/contracts
/**
 * Skeleton key-value storage showing the methods a `KvStorage` implementation
 * declares. Every body is elided with `{ ... }` and must be filled in before
 * the class compiles.
 */
class MyCustomKvStorage implements KvStorage {
  /** Looks up a single record by id; the signature allows `null` (presumably for a missing id — confirm). */
  async getById(id: string): Promise<unknown | null> { ... }

  /** Looks up several records at once, returned as a string-keyed record. */
  async getByIds(ids: string[]): Promise<Record<string, unknown>> { ... }

  /** Writes the given string-keyed records. */
  async upsert(items: Record<string, unknown>): Promise<void> { ... }

  /** Removes the record with the given id. */
  async deleteById(id: string): Promise<void> { ... }

  /** Presumably discards everything held by this storage — confirm against the contract. */
  async drop(): Promise<void> { ... }

  /** Presumably invoked once an indexing pass has finished — confirm against the contract. */
  async indexDoneCallback(): Promise<void> { ... }

  /** Optional member (note the `?`): returns all records as a string-keyed record. */
  getAll?(): Promise<Record<string, unknown>> { ... }
}

import { CacheUtils, TokenTracker } from "@gsrag/core/internal";
// Raw cache operations
// Wraps the `llm_response_cache` storage in `CacheUtils`; `storages` is not
// defined in this snippet and must be supplied by the caller.
const cache = new CacheUtils(storages.llm_response_cache);
// `getOrSet` receives a key and an async producer; presumably the producer
// only runs when the key is not cached yet — confirm.
await cache.getOrSet("cache-key", async () => "computed value");
// Concurrency tracking
// NOTE(review): the class is named `TokenTracker` while this label says
// "concurrency" — confirm which of the two it actually tracks.
const tracker = new TokenTracker();
await tracker.run(async () => { ... });

For custom job processing:
import { createWorkspaceRuntimeFactory } from "@gsrag/core/advanced";
// Creates the factory and immediately invokes it to obtain a runtime.
// `storageFactory`, `providers` and `defaultStorages` are not defined in this
// snippet and must be supplied by the caller.
const runtime = createWorkspaceRuntimeFactory(storageFactory)(providers, defaultStorages);
// Process queued document jobs
// Options are passed as a single object naming the workspace and a
// `maxConcurrency` value.
await runtime.processJobs({
workspace: "my-workspace",
maxConcurrency: 2,
});
// Monitor job progress
const status = await runtime.getJobStatus("job-123");

Direct access to knowledge graph (KG) CRUD operations:
import {
createEntity,
createRelation,
deleteByEntity,
deleteByRelation,
editEntity,
editRelation,
getEntityInfo,
getRelationInfo,
insertCustomKg,
mergeEntities,
} from "@gsrag/core/advanced";
// All functions take (storages, ...args)

You can embed the entire document processing pipeline in your application:
// Constructs the pipeline with an inline configuration object as the third
// argument. `providers`, `storages`, `pipelineStatus` and `llmCache` are not
// defined in this snippet and must be supplied by the caller.
const pipeline = new DocumentPipelineImpl(
providers,
storages,
{
chunkTokenSize: 800,
chunkOverlapTokenSize: 80,
maxConcurrentDocuments: 2,
enableLlmCacheForEntityExtract: true,
},
pipelineStatus,
llmCache,
);
// Runs the pipeline over an array of document descriptors (a single one here)
// and waits for the result.
const result = await pipeline.run([{
docId: "doc-001",
content: "Document text...",
filePath: "doc.txt",
trackId: "batch-1",
}]);
// The trailing comment lists the status values the example expects.
console.log(result.status); // "completed" | "failed" | "cancelled"