The RAG (Retrieval-Augmented Generation) system is a production-ready implementation that combines document retrieval with language model generation using LMStudio models and LangGraph orchestration. The system features a modular architecture with comprehensive error handling, runtime configuration management, and full observability.
The system uses Qdrant as the vector database for efficient similarity search, LMStudio (with unsloth/gemma-3-4b-it-GGUF model) for local language model inference, and LangGraph for workflow orchestration with conditional routing. This design ensures privacy (local models), performance (optimized vector search), maintainability (graph-based workflows), and production reliability (comprehensive monitoring and error handling).
The system follows a layered architecture with the following components:
```mermaid
graph TB
    subgraph "User Interface Layer"
        CLI[Rich-based CLI Interface]
        API[REST API Interface]
        BATCH[Batch Processing]
    end
    subgraph "Orchestration Layer"
        LG[LangGraph Orchestrator]
        WM[Workflow Metrics]
        WL[Workflow Logger]
        RC[Runtime Config Manager]
    end
    subgraph "Processing Layer"
        EMB[Embedder]
        RET[Retriever]
        GEN[Generator]
        DOC[Document Processor]
        PT[Prompt Templates]
    end
    subgraph "Storage Layer"
        QD[Qdrant Vector DB]
        FS[File System]
        CM[Config Management]
    end
    subgraph "External Services"
        LMS[LMStudio Server<br/>gemma-3-4b-it]
    end
    subgraph "Error Handling & Monitoring"
        EH[Error Handler]
        RF[Response Formatter]
        LOG[Structured Logging]
    end
    CLI --> LG
    API --> LG
    BATCH --> LG
    LG --> EMB
    LG --> RET
    LG --> GEN
    LG --> DOC
    LG --> WM
    LG --> WL
    EMB --> QD
    RET --> QD
    DOC --> FS
    GEN --> LMS
    GEN --> PT
    RC --> CM
    EH --> LOG
    RF --> CLI
    EH --> CLI
```
Purpose: Handles document ingestion, parsing, and chunking with metadata extraction.

Key Methods:

```python
process_document(file_path: str) -> List[DocumentChunk]
process_batch(file_paths: List[str]) -> List[DocumentChunk]
extract_text(file_path: str) -> str
chunk_text(text: str, config: ChunkingConfig) -> List[str]
```

Supported Formats: PDF, TXT, Markdown

Chunking Strategy: Configurable via ChunkingConfig with recursive text splitting, overlap control, and metadata preservation
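To make the overlap behavior concrete, here is a minimal sketch of fixed-size chunking with overlap. It is an illustration only: the actual processor splits recursively on the configured separators, and this simplified `chunk_text` signature differs from the one above.

```python
# Illustrative fixed-size chunking with overlap (the real
# DocumentProcessor performs recursive, separator-aware splitting).
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    chunks, start = [], 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        start += chunk_size - overlap  # slide window, keeping the overlap
    return chunks

# A 2500-character text with the default config yields 4 chunks.
chunks = chunk_text("a" * 2500, chunk_size=1000, overlap=200)
```

Each chunk shares its first 200 characters with the tail of the previous chunk, which keeps sentences that straddle a boundary retrievable from both sides.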
Purpose: Converts text into vector embeddings using sentence-transformers.

Key Methods:

```python
embed_text(text: str) -> List[float]
embed_batch(texts: List[str]) -> List[List[float]]
get_embedding_dimension() -> int
get_model_info() -> Dict[str, Any]
```
Model Configuration:
- Default: second-state/All-MiniLM-L6-v2-Embedding-GGUF (384 dimensions)
- Device auto-detection (CUDA/CPU)
- Configurable batch size and normalization
- EmbeddingConfig dataclass for flexible configuration
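When `normalize_embeddings` is enabled, each vector is scaled to unit length so that cosine similarity reduces to a dot product. A minimal sketch of that post-processing step (the vectors themselves come from the model; these helper names are illustrative, not the Embedder's API):

```python
import math

# L2-normalize a single embedding vector.
def normalize(vec: list[float]) -> list[float]:
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec] if norm > 0 else vec

# Apply normalization across a batch, as normalize_embeddings=True would.
def embed_batch_normalized(vectors: list[list[float]]) -> list[list[float]]:
    return [normalize(v) for v in vectors]
```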
Purpose: Stores and indexes document embeddings with duplicate detection.

Key Methods:

```python
add_documents(documents: List[DocumentChunk]) -> bool
search_similar(query_embedding: List[float], top_k: int) -> List[DocumentChunk]
find_similar_to_document(document_id: str, top_k: int) -> List[DocumentChunk]
delete_collection(collection_name: str) -> bool
get_collection_info() -> Dict[str, Any]
```
Configuration:
- Collection name: configurable per knowledge base
- Vector dimension: 384 (matches All-MiniLM-L6-v2-Embedding-GGUF)
- Distance metric: Cosine similarity
- Index type: HNSW for performance
- Duplicate detection: 0.95 similarity threshold
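The 0.95-threshold duplicate check can be sketched as a cosine-similarity comparison against existing embeddings before insertion. This is a conceptual illustration in plain Python (the helper names are hypothetical; in practice the comparison runs inside Qdrant):

```python
import math

# Cosine similarity between two embedding vectors.
def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb) if na and nb else 0.0

# Flag a new embedding as a duplicate if any stored vector is
# at least `threshold` similar to it.
def is_duplicate(new_vec: list[float],
                 existing_vecs: list[list[float]],
                 threshold: float = 0.95) -> bool:
    return any(cosine_similarity(new_vec, v) >= threshold for v in existing_vecs)
```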
Purpose: Orchestrates query embedding and similarity search with validation.

Key Methods:

```python
retrieve(query: str, top_k: int = 5) -> List[DocumentChunk]
retrieve_batch(queries: List[str], top_k: int = 5) -> List[List[DocumentChunk]]
test_connection() -> bool
get_retrieval_stats() -> Dict[str, Any]
```
Features:
- Query preprocessing and normalization
- Similarity threshold filtering
- Metadata preservation (source, chunk position)
- Batch retrieval for multiple queries
- Connection testing and validation
Purpose: Interfaces with the local LMStudio server, with fallback endpoints.

Key Methods:

```python
generate(prompt: str, **kwargs) -> str
generate_async(prompt: str, **kwargs) -> str
check_connection() -> bool
get_available_models() -> List[str]
```
Configuration:
- Server endpoint (default: http://localhost:1234)
- Model: unsloth/gemma-3-4b-it-GGUF
- Parameters: temperature (0.7), max_tokens (1000), top_p (0.9)
- Multiple endpoint fallback (v1/completions, v1/chat/completions)
- Retry logic with exponential backoff
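The retry-with-exponential-backoff pattern above can be sketched as follows. This is a simplified stand-in, not the client's actual implementation; the delays are shortened for illustration, and `flaky` simulates a server that succeeds on the third attempt:

```python
import time

# Retry a callable up to `attempts` times, doubling the delay
# after each failed attempt (exponential backoff).
def with_retries(call, attempts: int = 3, base_delay: float = 0.01):
    for attempt in range(attempts):
        try:
            return call()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error
            time.sleep(base_delay * (2 ** attempt))

# Simulated endpoint that fails twice, then succeeds.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("server busy")
    return "ok"
```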
Purpose: Combines retrieved context with queries using advanced prompt templates.

Key Methods:

```python
generate_response(query: str, context: List[DocumentChunk]) -> Response
generate_no_context_response(query: str) -> Response
format_prompt(query: str, context: List[DocumentChunk]) -> str
calculate_confidence_score(response: str, context: List[DocumentChunk]) -> float
```
Prompt Templates:
- PromptTemplate: Basic template with configurable sections
- AdvancedPromptTemplate: Enhanced with source grouping and citation
- Factory function for template creation
- No-context fallback templates
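A minimal sketch of how retrieved chunks might be folded into a prompt, including the no-context fallback. The wording and structure here are illustrative only; the real PromptTemplate classes add source grouping and richer citation handling:

```python
# Fold retrieved chunks into a prompt; fall back to a no-context
# template when retrieval returned nothing.
def format_prompt(query: str, contexts: list[dict]) -> str:
    if not contexts:
        return f"Answer from general knowledge:\n\nQuestion: {query}"
    blocks = [f"[{c['source']}] {c['content']}" for c in contexts]
    return (
        "Answer using only the context below. Cite sources in brackets.\n\n"
        + "\n\n".join(blocks)
        + f"\n\nQuestion: {query}"
    )
```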
Purpose: Manages the RAG workflow using graph-based orchestration with observability.

Workflow Nodes:
- Query Processing: Validate and preprocess user input
- Retrieval: Search for relevant documents with embedding generation
- Context Evaluation: Assess relevance of retrieved documents
- Generation: Generate response using LMStudio with context
- No Context Generation: Handle queries with no relevant context
- Search Expansion: Retry with relaxed parameters (max 2 attempts)
- Response Formatting: Structure final output with metadata and confidence
Key Features:
- Async and sync processing modes
- Thread-based conversation tracking
- State persistence via memory checkpointer
- Comprehensive workflow metrics collection
- Error propagation and recovery
Workflow Graph:
```mermaid
graph TD
    A[Start] --> B[Process Query]
    B --> C[Retrieve Documents]
    C --> D{Documents Found?}
    D -->|Yes| E[Evaluate Relevance]
    D -->|No| F[No Context Response]
    E --> G{Relevant Context?}
    G -->|Yes| H[Generate Response]
    G -->|No| I{Expansion Attempts < 2?}
    I -->|Yes| J[Expand Search]
    I -->|No| F
    J --> C
    H --> K[Format Output]
    F --> K
    K --> L[End]
```
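The conditional routing in this graph can be traced with a toy walk-through. This is not LangGraph code: the node functions are stand-ins that only record the path taken, and the outcome flags (`docs_found`, `relevant`) are fixed up front rather than computed:

```python
# Toy trace of the routing logic: retrieval loops through search
# expansion at most `max_expansions` times before falling back.
def run_workflow(docs_found: bool, relevant: bool, max_expansions: int = 2):
    path, expansions = ["process_query"], 0
    while True:
        path.append("retrieve")
        if not docs_found:
            path.append("no_context_response")
            break
        path.append("evaluate_relevance")
        if relevant:
            path.append("generate")
            break
        if expansions < max_expansions:
            expansions += 1
            path.append("expand_search")
            continue  # back to retrieval with relaxed parameters
        path.append("no_context_response")
        break
    path.append("format_output")
    return path
```

Tracing the three outcomes: relevant context generates directly, no documents falls back immediately, and irrelevant context expands the search exactly twice before falling back.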
Purpose: Manages dynamic configuration updates with validation and persistence.

Key Methods:

```python
update_parameter(parameter: str, value: Any) -> bool
batch_update(updates: Dict[str, Any]) -> bool
get_change_history() -> List[ConfigChange]
reset_to_defaults() -> bool
```
Features:
- Runtime-updatable vs restart-required parameter classification
- Change history tracking with timestamps
- Callback system for configuration changes
- Atomic batch updates with rollback capability
- Configuration persistence to file
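Atomic batch updates with rollback can be sketched as snapshot-and-restore: if any parameter in the batch is rejected, all previously applied values are reverted. The class below is an illustration under that assumption, not the RuntimeConfigManager itself:

```python
# Sketch of atomic batch updates: reject the whole batch and
# restore a snapshot if any parameter is not runtime-updatable.
class RuntimeConfigSketch:
    RUNTIME_UPDATABLE = {"default_top_k", "temperature", "similarity_threshold"}

    def __init__(self):
        self.values = {"default_top_k": 5, "temperature": 0.7,
                       "similarity_threshold": 0.7}

    def batch_update(self, updates: dict) -> bool:
        snapshot = dict(self.values)
        for key, value in updates.items():
            if key not in self.RUNTIME_UPDATABLE:
                self.values = snapshot  # rollback everything applied so far
                return False
            self.values[key] = value
        return True
```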
```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional, TypedDict
import uuid


@dataclass
class DocumentChunk:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    content: str = ""
    embedding: Optional[List[float]] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    source: str = ""
    chunk_index: int = 0

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for storage"""

    def to_json(self) -> str:
        """Convert to JSON string"""

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'DocumentChunk':
        """Create from dictionary"""

    @classmethod
    def from_json(cls, json_str: str) -> 'DocumentChunk':
        """Create from JSON string"""


@dataclass
class Query:
    text: str
    embedding: Optional[List[float]] = None
    filters: Optional[Dict[str, Any]] = None
    top_k: int = 5
    similarity_threshold: float = 0.7
    timestamp: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary"""

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Query':
        """Create from dictionary"""


@dataclass
class Response:
    answer: str
    source_documents: List[DocumentChunk]
    confidence_score: float
    processing_time: float
    metadata: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary"""

    def to_json(self) -> str:
        """Convert to JSON string"""

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Response':
        """Create from dictionary"""

    @classmethod
    def from_json(cls, json_str: str) -> 'Response':
        """Create from JSON string"""


@dataclass
class ProcessingStats:
    documents_processed: int = 0
    chunks_created: int = 0
    embeddings_generated: int = 0
    queries_processed: int = 0
    average_processing_time: float = 0.0
    total_processing_time: float = 0.0
    errors_encountered: int = 0
    last_updated: datetime = field(default_factory=datetime.now)

    def update_query_stats(self, processing_time: float) -> None:
        """Update statistics for query processing"""

    def update_document_stats(self, chunks_count: int, embeddings_count: int) -> None:
        """Update statistics for document processing"""

    def record_error(self) -> None:
        """Record an error occurrence"""

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary"""


@dataclass
class RAGConfig:
    # Qdrant settings
    qdrant_host: str = "localhost"
    qdrant_port: int = 6333
    collection_name: str = "documents"
    # LMStudio settings
    lmstudio_endpoint: str = "http://localhost:1234"
    model_name: str = "unsloth/gemma-3-4b-it-GGUF"
    temperature: float = 0.7
    max_tokens: int = 1000
    top_p: float = 0.9
    # Embedding settings
    embedding_model: str = "second-state/All-MiniLM-L6-v2-Embedding-GGUF"
    embedding_dimension: int = 384
    # Retrieval settings
    default_top_k: int = 5
    similarity_threshold: float = 0.7
    chunk_size: int = 1000
    chunk_overlap: int = 200
    # Logging settings
    log_level: str = "INFO"
    log_file: Optional[str] = None
    # Performance settings
    batch_size: int = 32
    max_concurrent_requests: int = 10
    device: Optional[str] = None  # Auto-detect if None

    def validate(self) -> None:
        """Validate configuration parameters"""

    @classmethod
    def from_file(cls, file_path: str) -> 'RAGConfig':
        """Load configuration from file"""

    def to_file(self, file_path: str) -> None:
        """Save configuration to file"""


@dataclass
class EmbeddingConfig:
    model_name: str = "second-state/All-MiniLM-L6-v2-Embedding-GGUF"
    batch_size: int = 32
    max_seq_length: Optional[int] = None
    device: Optional[str] = None
    normalize_embeddings: bool = True
    show_progress_bar: bool = False


@dataclass
class ChunkingConfig:
    chunk_size: int = 1000
    chunk_overlap: int = 200
    separators: List[str] = field(default_factory=lambda: ["\n\n", "\n", " ", ""])
    keep_separator: bool = True
    add_start_index: bool = True


class WorkflowState(TypedDict):
    query: Optional[Query]
    retrieved_documents: List[DocumentChunk]
    generated_response: Optional[str]
    final_response: Optional[Response]
    error: Optional[str]
    metadata: Dict[str, Any]
    processing_start_time: Optional[float]


@dataclass
class ConfigChange:
    parameter: str
    old_value: Any
    new_value: Any
    timestamp: datetime = field(default_factory=datetime.now)
    source: str = "runtime"  # runtime, file, api, etc.

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary"""
```

A property is a characteristic or behavior that should hold true across all valid executions of a system; essentially, a formal statement about what the system should do. Properties serve as the bridge between human-readable specifications and machine-verifiable correctness guarantees.
Based on the prework analysis, I'll convert the testable acceptance criteria into universally quantified properties:
Property 1: Document Ingestion Round Trip
For any valid document in supported formats (PDF, TXT, Markdown), ingesting it into the system should result in retrievable embeddings in Qdrant with preserved content and metadata.
Validates: Requirements 1.1, 1.2, 1.4

Property 2: Embedding Consistency
For any text content, the Embedder should produce consistent numerical vectors of the expected dimension across multiple calls.
Validates: Requirements 1.2

Property 3: Duplicate Document Handling
For any document that is ingested multiple times, the Document_Store should handle duplicates appropriately without corrupting the collection.
Validates: Requirements 1.5

Property 4: Query Retrieval Completeness
For any user query, the Retriever should convert it to embeddings, search Qdrant, and return results with complete metadata (source, chunk position).
Validates: Requirements 2.1, 2.5

Property 5: Similarity Ranking Correctness
For any query with known similar documents in the collection, the Retriever should return documents ordered by relevance with the most similar documents first.
Validates: Requirements 2.2

Property 6: Top-K Parameter Compliance
For any valid top-k value, the Retriever should return exactly that number of documents (or fewer if insufficient documents exist).
Validates: Requirements 2.3

Property 7: Threshold Filtering
For any query with no documents above the similarity threshold, the Retriever should return an empty result set.
Validates: Requirements 2.4

Property 8: Generation Parameter Application
For any valid model parameters (temperature, max_tokens, top_p), the LMStudio_Client should apply them correctly when generating responses.
Validates: Requirements 3.3

Property 9: Context Integration
For any user query and retrieved document context, the Generator should produce responses that incorporate both the query and the provided context.
Validates: Requirements 4.1, 4.2

Property 10: Source Citation
For any generated response with source documents, the response should include references or citations to the source documents used.
Validates: Requirements 4.4

Property 11: Response Completeness
For any processed query, the RAG_System should return a structured response containing generated text, source documents, and metadata.
Validates: Requirements 4.5, 7.3

Property 12: Workflow Execution Order
For any query processed through LangGraph_Orchestrator, the workflow steps should execute in the correct sequence: query processing → retrieval → generation → response formatting.
Validates: Requirements 5.2

Property 13: Conditional Workflow Handling
For any query that results in no retrieved documents, the LangGraph_Orchestrator should execute the appropriate conditional logic and generate a "no relevant information" response.
Validates: Requirements 5.3

Property 14: Configuration Validation
For any configuration file with invalid settings, the RAG_System should detect the invalid values and provide clear error messages.
Validates: Requirements 6.2

Property 15: Runtime Parameter Updates
For any valid runtime parameter change (retrieval count, generation settings), the system should apply the new parameters to subsequent operations.
Validates: Requirements 6.3

Property 16: Batch Processing Consistency
For any set of queries processed in batch mode, each query should receive the same quality of processing as if processed individually.
Validates: Requirements 7.4

Property 17: Error Message Clarity
For any error condition during query processing, the RAG_System should provide clear error messages that help users understand and resolve the issue.
Validates: Requirements 7.5

Property 18: Incremental Document Addition
For any existing document collection, adding new documents should not require reprocessing of existing documents.
Validates: Requirements 8.3

Property 19: Batch Processing Efficiency
For any large set of documents, the Embedder should process them in batches rather than individually to optimize memory usage.
Validates: Requirements 8.4
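A universally quantified property like Property 6 can be exercised over many randomized inputs. The check below is a stdlib sketch against a toy retriever (the real suite uses Hypothesis, and `toy_retrieve` is a stand-in, not the Retriever):

```python
import random

# Toy retriever: rank (doc_id, score) pairs by score, return top_k.
def toy_retrieve(scored_docs, top_k):
    ranked = sorted(scored_docs, key=lambda d: d[1], reverse=True)
    return ranked[:top_k]

# Property 6: the result size is exactly min(top_k, available docs),
# and results arrive in descending score order (Property 5).
random.seed(0)
for _ in range(100):
    n_docs = random.randint(0, 20)
    docs = [(f"doc{i}", random.random()) for i in range(n_docs)]
    top_k = random.randint(1, 10)
    result = toy_retrieve(docs, top_k)
    assert len(result) == min(top_k, n_docs)
    scores = [s for _, s in result]
    assert scores == sorted(scores, reverse=True)
```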
The system implements comprehensive error handling with structured logging, user-friendly messages, and actionable suggestions:
- Categories: Configuration, Connection, Authentication, Validation, Processing, Resource, System, User Input
- Severity Levels: Low, Medium, High, Critical
- Structured Context: ErrorContext dataclass with operation, component, user action, system state, and timestamp
- RAGSystemError: Base exception class with category, severity, and suggestions
- ErrorSuggestion: Actionable suggestions with priority and technical level
- ErrorFormatter: Rich console formatting for user-friendly error display
- Structured Logging: Using structlog for machine-readable error logs
- Unsupported file formats: Clear error messages with supported format list
- Corrupted files: Graceful handling with specific error details and recovery suggestions
- Large file handling: Memory management with progress indicators and chunking
- Permission errors: Clear file access error messages with permission fix suggestions
- Qdrant connection failures: Retry logic with exponential backoff (3 attempts)
- Collection creation errors: Validation and recovery procedures with detailed diagnostics
- Index corruption: Detection and rebuilding capabilities with backup recommendations
- Storage space issues: Monitoring and alerts with cleanup suggestions
- Duplicate detection: 0.95 similarity threshold with conflict resolution
- Server unavailability: Connection testing with multiple endpoint fallback
- Model loading failures: Model validation with alternative model suggestions
- Generation timeouts: Configurable timeout (30s default) with retry logic
- Invalid parameters: Parameter validation with range suggestions and examples
- Rate limiting: Backoff strategy with queue management
- Node execution failures: Error propagation with state preservation and recovery options
- State management errors: State validation with rollback capabilities and checkpoint recovery
- Resource exhaustion: Memory and processing limits with graceful degradation
- Configuration errors: Comprehensive validation with specific parameter fix suggestions
- Thread management: Conversation tracking with cleanup and recovery
- Connection retry: Exponential backoff for Qdrant and LMStudio connections
- Search expansion: Automatic retry with relaxed similarity thresholds (max 2 attempts)
- Fallback responses: No-context response generation when retrieval fails
- Graceful degradation: Reduced functionality when components are unavailable
- Configuration validation: Step-by-step configuration fix suggestions
- Service health checks: Automated diagnostics with specific fix instructions
- Error context: Detailed error information with troubleshooting steps
- Recovery commands: CLI commands for common recovery scenarios
- WorkflowMetrics: Comprehensive metrics collection for performance monitoring
- WorkflowLogger: Structured logging with configurable output formats
- ObservabilityMixin: Metrics collection integration for all components
- Performance tracking: Response times, success rates, error frequencies
- Error history: Persistent error log with categorization and trends
- Error aggregation: Common error pattern detection and reporting
- Alert thresholds: Configurable error rate alerts for proactive monitoring
- Diagnostic exports: Error context export for debugging and support
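The kind of aggregation WorkflowMetrics performs (response times, error rates) can be sketched as follows. The class and field names here are illustrative stand-ins, not the actual WorkflowMetrics interface:

```python
# Minimal metrics aggregator: records per-query duration and
# outcome, then summarizes average latency and error rate.
class MetricsSketch:
    def __init__(self):
        self.durations, self.errors, self.total = [], 0, 0

    def record(self, duration: float, ok: bool) -> None:
        self.total += 1
        self.durations.append(duration)
        if not ok:
            self.errors += 1

    def summary(self) -> dict:
        avg = sum(self.durations) / len(self.durations) if self.durations else 0.0
        rate = self.errors / self.total if self.total else 0.0
        return {"avg_response_time": avg, "error_rate": rate, "count": self.total}
```

An alert threshold check then reduces to comparing `summary()["error_rate"]` against a configured limit.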
The testing strategy employs a comprehensive approach combining unit tests, integration tests, and property-based tests with full observability:
- Component isolation: Test individual components (Embedder, Retriever, Generator, RuntimeConfigManager) in isolation
- Interface testing: Test all public methods and error conditions for each component
- Mock dependencies: Use mocks for LMStudio and Qdrant during unit testing with realistic response simulation
- Configuration validation: Test various configuration combinations and edge cases
- Error condition testing: Comprehensive error scenario testing with recovery validation
- End-to-end workflows: Test complete RAG pipeline from document ingestion to response generation
- LangGraph workflow testing: Test all workflow nodes and conditional routing paths
- External service integration: Test actual integration with Qdrant and LMStudio servers
- Concurrent access: Test system behavior with multiple simultaneous queries and document processing
- Data persistence: Verify data integrity across system restarts and configuration changes
- Performance benchmarks: Validate response times and memory usage under various load conditions
- Testing framework: Use Hypothesis for Python property-based testing
- Test iterations: Minimum 100 iterations per property test for statistical confidence
- Smart data generation: Realistic generators for documents, queries, configurations, and embeddings
- Shrinking optimization: Leverage Hypothesis shrinking to find minimal failing examples
- Test categorization: Each property test tagged with format: Feature: rag-system, Property {number}: {property_text}
- Format variety: Generate PDF, TXT, and Markdown documents with varying structures
- Content diversity: Create documents with different lengths (100-10000 words), languages, and technical content
- Metadata variation: Generate realistic metadata including timestamps, authors, and categories
- Edge cases: Empty documents, very large documents, special characters, and encoding issues
- Complexity levels: Simple keyword queries to complex multi-part questions
- Similarity patterns: Queries with known similar documents for retrieval validation
- Language variety: Multi-language queries for internationalization testing
- Edge cases: Empty queries, very long queries, special characters, and malformed input
- Valid combinations: Generate realistic configuration combinations for different deployment scenarios
- Invalid scenarios: Create invalid configurations to test validation and error handling
- Boundary testing: Test parameter limits and edge values
- Runtime updates: Generate configuration change sequences for runtime update testing
- Realistic vectors: Generate embeddings that match expected dimensionality and distribution
- Similarity testing: Create embedding pairs with known similarity relationships
- Performance testing: Generate large embedding sets for batch processing validation
- Error simulation: Create malformed embeddings for error handling testing
- Docker containers: Consistent test environments with Qdrant and LMStudio services
- Test isolation: Separate containers for parallel test execution
- Service orchestration: Docker Compose for complete testing stack setup
- Environment cleanup: Automated container cleanup and resource management
- Automated setup: Scripted test collection creation and population
- Data fixtures: Reusable test datasets for consistent testing
- Cleanup procedures: Automated teardown of test collections and temporary files
- Backup and restore: Test data versioning and restoration capabilities
- Automated testing: Full test suite execution on code changes
- Parallel execution: Concurrent test running for faster feedback
- Coverage reporting: Comprehensive code coverage tracking with minimum thresholds
- Performance regression: Automated performance benchmark comparison
- Test result reporting: Detailed test reports with failure analysis and trends
- Workflow metrics: Validate WorkflowMetrics collection and accuracy
- Performance tracking: Test response time measurement and aggregation
- Error rate monitoring: Validate error frequency tracking and alerting
- Resource usage: Test memory and CPU usage monitoring
- Structured logging: Validate structlog output format and content
- Log level filtering: Test log level configuration and filtering
- Error context: Validate ErrorContext information capture and formatting
- Log aggregation: Test log collection and analysis capabilities
- Concurrent queries: Test system behavior under high query load
- Document ingestion: Test batch document processing performance
- Memory usage: Validate memory consumption under sustained load
- Error recovery: Test system recovery under failure conditions
- Service failures: Test behavior when Qdrant or LMStudio becomes unavailable
- Network issues: Test timeout and retry behavior under network problems
- Resource constraints: Test behavior under memory and disk space limitations
- Configuration corruption: Test recovery from invalid configuration states
- Minimum threshold: 85% code coverage across all modules
- Critical path coverage: 100% coverage for error handling and recovery paths
- Integration coverage: Full coverage of component interaction points
- Configuration coverage: Complete coverage of all configuration parameters and validation
- Feature completeness: All documented features must have corresponding tests
- Error scenarios: All identified error conditions must be tested
- Edge cases: Boundary conditions and edge cases must be covered
- Performance requirements: All performance requirements must be validated