This document provides a comprehensive overview of the AI Automation Framework architecture, including system design, module dependencies, data flow, and key design decisions.
- System Overview
- Architecture Diagram
- Core Modules
- Module Dependencies
- Data Flow
- Key Design Decisions
- Extension Points
- Performance Considerations
The AI Automation Framework is a modular, extensible framework for building AI-powered automation solutions. It follows a layered architecture with clear separation of concerns:
┌─────────────────────────────────────────────────────────────┐
│ Application Layer │
│ (Examples, Demos, Real-world Applications) │
└─────────────────────────────────────────────────────────────┘
│
┌─────────────────────────────────────────────────────────────┐
│ Agent Layer │
│ (BaseAgent, ToolAgent, MultiAgent) │
└─────────────────────────────────────────────────────────────┘
│
┌──────────────┬─────────────┬──────────────┬────────────────┐
│ Workflow │ Tools │ RAG │ Integrations │
│ Engine │ Collection │ System │ Layer │
└──────────────┴─────────────┴──────────────┴────────────────┘
│
┌─────────────────────────────────────────────────────────────┐
│ LLM Layer │
│ (OpenAI, Anthropic, Ollama clients) │
└─────────────────────────────────────────────────────────────┘
│
┌─────────────────────────────────────────────────────────────┐
│ Core Infrastructure │
│ (Config, Logger, DI, Events, Metrics, Cache, etc.) │
└─────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────┐
│ USER APPLICATIONS │
│ ┌────────────┐ ┌────────────┐ ┌──────────────────────────┐ │
│ │ Examples │ │ Demos │ │ Production Apps │ │
│ └────────────┘ └────────────┘ └──────────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
│
┌─────────────────────┼─────────────────────┐
│ │ │
┌────────▼─────────┐ ┌────────▼─────────┐ ┌───────▼────────┐
│ │ │ │ │ │
│ Agent System │ │ Workflow Engine │ │ RAG System │
│ │ │ │ │ │
│ ┌────────────┐ │ │ ┌────────────┐ │ │ ┌───────────┐ │
│ │ BaseAgent │ │ │ │ Chain │ │ │ │ Embeddings│ │
│ ├────────────┤ │ │ ├────────────┤ │ │ ├───────────┤ │
│ │ ToolAgent │ │ │ │ Pipeline │ │ │ │VectorStore│ │
│ ├────────────┤ │ │ └────────────┘ │ │ ├───────────┤ │
│ │MultiAgent │ │ │ │ │ │ Retriever │ │
│ └────────────┘ │ │ │ │ └───────────┘ │
│ │ │ │ │ │
└──────┬───────────┘ └──────┬───────────┘ └────────┬────────┘
│ │ │
└─────────────────────┼───────────────────────┘
│
┌────────▼────────┐
│ │
│ LLM Clients │
│ │
│ ┌───────────┐ │
│ │ OpenAI │ │
│ ├───────────┤ │
│ │ Anthropic │ │
│ ├───────────┤ │
│ │ Ollama │ │
│ └───────────┘ │
│ │
└────────┬────────┘
│
┌───────────────────┼───────────────────┐
│ │ │
┌────────▼────────┐ ┌───────▼────────┐ ┌──────▼──────┐
│ │ │ │ │ │
│ Tools Layer │ │ Integrations │ │ Plugins │
│ │ │ │ │ │
│ • File Ops │ │ • Zapier │ │ • Plugin │
│ • Web Search │ │ • n8n │ │ System │
│ • Calculations │ │ • Airflow │ │ • Loading │
│ • Document │ │ • Temporal │ │ • Deps │
│ Loaders │ │ • Prefect │ │ │
│ • Advanced │ │ • Celery │ │ │
│ Automation │ │ • Cloud │ │ │
│ │ │ Services │ │ │
└─────────────────┘ └────────────────┘ └─────────────┘
│
┌───────────────────┴───────────────────┐
│ │
┌────────▼──────────────────────────────────────▼────────┐
│ CORE INFRASTRUCTURE │
│ │
│ ┌──────────┐ ┌─────────┐ ┌────────┐ ┌─────────────┐ │
│ │ Config │ │ Logger │ │ Base │ │ DI │ │
│ └──────────┘ └─────────┘ └────────┘ └─────────────┘ │
│ ┌──────────┐ ┌─────────┐ ┌────────┐ ┌─────────────┐ │
│ │ Events │ │ Metrics │ │ Cache │ │ Health │ │
│ └──────────┘ └─────────┘ └────────┘ └─────────────┘ │
│ ┌──────────┐ ┌─────────┐ ┌────────┐ ┌─────────────┐ │
│ │Circuit │ │ Task │ │Sanitiz-│ │ Validation │ │
│ │Breaker │ │ Queue │ │ ation │ │ │ │
│ └──────────┘ └─────────┘ └────────┘ └─────────────┘ │
│ ┌──────────┐ ┌─────────┐ ┌────────┐ │
│ │Middleware│ │ Utils │ │ Async │ │
│ └──────────┘ └─────────┘ └────────┘ │
└────────────────────────────────────────────────────────┘
The foundation layer providing essential services:
- base.py: Base classes for all components
  - BaseComponent: Abstract base with lifecycle management
  - Message: Standardized message format
  - Response: Standardized response format
- config.py: Configuration management with environment variable support
- logger.py: Enhanced logging with rich formatting and structured output
- di.py: Dependency Injection container for managing component dependencies
- events.py: Event bus for event-driven architecture
- metrics.py: Metrics collection and monitoring (Prometheus-compatible)
- cache.py: Response caching for cost optimization
- circuit_breaker.py: Circuit breaker pattern for fault tolerance
- health.py: Health check system for monitoring service status
- plugins.py: Plugin system for extensibility
- middleware.py: Middleware pipeline for request/response processing
- task_queue.py: Task queue for background job processing
- sanitization.py: Input sanitization for security
- validation.py: Input validation utilities
- utils.py: Common utilities and helper functions
- async_utils.py: Async/await utilities
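The lifecycle pattern behind base.py can be sketched in plain Python. This is an illustration of the idea, not the framework's actual BaseComponent: the `initialize()`/`cleanup()` method names and context-manager support are taken from the design rationale later in this document, and the `Cache` subclass here is a made-up example.

```python
from abc import ABC, abstractmethod

class BaseComponent(ABC):
    """Sketch of an abstract base with lifecycle management."""

    def __init__(self, name: str):
        self.name = name
        self.initialized = False

    @abstractmethod
    def initialize(self) -> None: ...

    @abstractmethod
    def cleanup(self) -> None: ...

    # Context-manager support ties cleanup to scope exit.
    def __enter__(self):
        self.initialize()
        self.initialized = True
        return self

    def __exit__(self, exc_type, exc, tb):
        self.cleanup()
        self.initialized = False
        return False

class Cache(BaseComponent):
    """Hypothetical component used only to demonstrate the lifecycle."""

    def initialize(self) -> None:
        self.store = {}

    def cleanup(self) -> None:
        self.store.clear()

with Cache("response-cache") as cache:
    cache.store["key"] = "value"  # component is initialized inside the block
```

Subclasses only implement `initialize()` and `cleanup()`; the base class guarantees they run in pairs.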
Unified interface for different LLM providers:
- base_client.py: BaseLLMClient abstract interface
  - Synchronous chat
  - Async chat
  - Streaming support
  - Simple chat interface
- openai_client.py: OpenAI API implementation
  - GPT-4, GPT-3.5 support
  - Function calling
  - Token usage tracking
- anthropic_client.py: Anthropic Claude implementation
  - Claude 3 models support
  - Streaming responses
  - Usage tracking
- ollama_client.py: Local LLM support via Ollama
  - Run models locally
  - Privacy-first option
  - No API costs
- streaming.py: Streaming utilities and helpers
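What "unified interface" buys you can be shown with a stub client. This sketch assumes the `chat()`/`achat()`/`stream_chat()` method names that appear in the Extension Points section; `EchoClient` is a fake provider used only to demonstrate that callers are written against the interface, not a specific vendor.

```python
import asyncio

class EchoClient:
    """Stub provider implementing the assumed unified interface."""

    def chat(self, messages, **kwargs):
        # Echo the last user message back as the assistant reply.
        return {"role": "assistant", "content": messages[-1]["content"]}

    async def achat(self, messages, **kwargs):
        return self.chat(messages, **kwargs)

    def stream_chat(self, messages, **kwargs):
        # Yield the reply token by token, like a streaming response.
        for token in self.chat(messages)["content"].split():
            yield token

client = EchoClient()
reply = client.chat([{"role": "user", "content": "hello world"}])
tokens = list(client.stream_chat([{"role": "user", "content": "hello world"}]))
async_reply = asyncio.run(client.achat([{"role": "user", "content": "hi"}]))
```

Swapping in an OpenAI, Anthropic, or Ollama client would leave the calling code unchanged.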
Retrieval-Augmented Generation implementation:
- embeddings.py: Text embedding generation
  - Multiple embedding models support
  - Batch processing
  - Caching
- vector_store.py: Vector storage and similarity search
  - ChromaDB integration
  - FAISS support
  - Efficient retrieval
- retriever.py: Document retrieval and context management
  - Context window management
  - Relevance scoring
  - Multi-document support
Intelligent agent implementations:
- base_agent.py: BaseAgent with memory and conversation management
  - Message history
  - System prompts
  - Context management
- tool_agent.py: ToolAgent with function calling capabilities
  - Tool registration
  - Schema validation
  - Tool execution
- multi_agent.py: Multi-agent coordination and collaboration
  - Agent communication
  - Task delegation
  - Parallel execution
Orchestration and automation:
- chain.py: Sequential processing chains
  - Step-by-step execution
  - State passing
  - Error handling
- pipeline.py: Parallel and complex workflows
  - DAG support
  - Conditional branching
  - Parallel execution
Pre-built tools and utilities:
- common_tools.py: File operations, calculations, web search
- document_loaders.py: PDF, Word, Markdown, text loaders
- advanced_automation.py: Email, database, web scraping
- scheduler_and_testing.py: Task scheduling, API testing
- data_processing.py: Excel/CSV processing
- devops_cloud.py: Cloud integration (AWS, GCP, Azure)
- performance_monitoring.py: Performance metrics and monitoring
- audio_processing.py: Speech-to-text, text-to-speech
- video_processing.py: Video processing and analysis
- websocket_server.py: WebSocket server implementation
- graphql_api.py: GraphQL API support
- media_messaging.py: Media and messaging tools
- ai_dev_assistant.py: AI-assisted development tools
External workflow and cloud integrations:
- zapier_integration.py: Zapier workflow automation
- n8n_integration.py: n8n workflow automation
- airflow_integration.py: Apache Airflow DAGs
- temporal_integration.py: Temporal.io workflows
- prefect_integration.py: Prefect workflows
- celery_integration.py: Celery distributed tasks
- cloud_services.py: AWS, GCP, Azure integrations
- workflow_automation_unified.py: Unified workflow interface
┌─────────────────────┐
│ Applications │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ Agents │
└──────────┬──────────┘
│
┌────┴────┐
▼ ▼
┌──────────┐ ┌──────────┐
│Workflows │ │ RAG │
└────┬─────┘ └────┬─────┘
│ │
└─────┬──────┘
▼
┌─────────────┐
│ LLM Clients│
└──────┬──────┘
│
┌──────┴───────┐
▼ ▼
┌───────┐ ┌────────┐
│ Tools │ │Integr. │
└───┬───┘ └───┬────┘
│ │
└──────┬───────┘
▼
┌─────────────┐
│ Core │
└─────────────┘
- Core has no dependencies (foundation layer)
- LLM Clients depend on Core
- RAG, Tools, Integrations depend on Core
- Workflows depend on Core and LLM Clients
- Agents depend on Core, LLM Clients, and optionally RAG/Tools
- Applications depend on Agents and other layers as needed
User Input
│
▼
┌──────────────┐
│ Application │
└──────┬───────┘
│
▼
┌──────────────┐ ┌──────────┐
│ Agent │─────▶│ Memory │
└──────┬───────┘ └──────────┘
│
▼
┌──────────────┐ ┌──────────┐
│ LLM Client │─────▶│ Cache │
└──────┬───────┘ └──────────┘
│
▼
┌──────────────┐
│ Provider │ (OpenAI/Anthropic/Ollama)
│ API │
└──────┬───────┘
│
▼
Response
│
▼
┌──────────────┐ ┌──────────┐
│Usage Tracker │─────▶│ Metrics │
└──────────────┘ └──────────┘
│
▼
User Output
User Query
│
▼
┌──────────────┐
│ Embeddings │
└──────┬───────┘
│
▼
┌──────────────┐
│Vector Search │
└──────┬───────┘
│
▼
┌──────────────┐
│ Retriever │ (Get relevant documents)
└──────┬───────┘
│
▼
┌──────────────┐
│ Context │ + User Query
│ Construction │
└──────┬───────┘
│
▼
┌──────────────┐
│ LLM Client │
└──────┬───────┘
│
▼
Enhanced Answer
User Task
│
▼
┌──────────────┐
│ Tool Agent │
└──────┬───────┘
│
▼
┌──────────────┐
│ LLM Analyzes │ (Determine which tools to use)
│ Task │
└──────┬───────┘
│
▼
┌──────────────┐
│ Tool Schema │
│ Validation │
└──────┬───────┘
│
▼
┌──────────────┐
│Tool Execution│ (Execute selected tools)
└──────┬───────┘
│
▼
┌──────────────┐
│Result Format │
└──────┬───────┘
│
▼
┌──────────────┐
│LLM Generates │ (Create final response)
│Final Response│
└──────┬───────┘
│
▼
Task Result
Complex Task
│
▼
┌──────────────┐
│ Manager │
│ Agent │
└──────┬───────┘
│
├─────────────┬─────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Agent 1 │ │ Agent 2 │ │ Agent 3 │
│(Research)│ │(Analysis)│ │(Writing) │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
└──────┬──────┴──────┬──────┘
│ │
▼ ▼
┌────────┐ ┌────────┐
│ Event │ │Message │
│ Bus │ │Passing │
└────┬───┘ └────┬───┘
│ │
└──────┬──────┘
▼
┌──────────────┐
│ Manager │
│ Aggregates │
└──────┬───────┘
│
▼
Final Result
Decision: Use a layered, modular architecture with clear separation of concerns.
Rationale:
- Easy to understand and maintain
- Components can be developed and tested independently
- Enables code reuse across different use cases
- Facilitates team collaboration
Decision: All major components inherit from BaseComponent.
Rationale:
- Consistent lifecycle management (initialize, cleanup)
- Standardized logging across components
- Context manager support
- Shared configuration handling
Decision: Use standardized Message and Response objects.
Rationale:
- Provider-agnostic interface
- Type safety with Pydantic
- Easy to extend with metadata
- Consistent error handling
Decision: Implement DI container for component management.
Rationale:
- Loose coupling between components
- Easy to swap implementations
- Testability (mock injection)
- Configuration flexibility
Decision: Provide event bus for component communication.
Rationale:
- Decoupled component interaction
- Extensibility through event listeners
- Audit trail and logging
- Async processing support
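The event-bus pattern behind this decision can be sketched as a small pub/sub class. The decorator-style `on()` API is modeled on the `event_bus.on(...)` usage shown under Extension Points; the implementation here is a plain-Python approximation, not the framework's events.py.

```python
from collections import defaultdict

class EventBus:
    """Minimal pub/sub sketch: handlers subscribe by event name."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event_name):
        def decorator(fn):
            self._handlers[event_name].append(fn)
            return fn
        return decorator

    def emit(self, event_name, payload):
        for fn in self._handlers[event_name]:
            fn(payload)

bus = EventBus()
audit_log = []

@bus.on("agent.task_started")
def record(event):
    # A listener added without touching the emitting component.
    audit_log.append(event["task"])

bus.emit("agent.task_started", {"task": "summarize"})
```

Emitters never reference their listeners directly, which is exactly the decoupling the rationale above describes.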
Decision: Support middleware for request/response processing.
Rationale:
- Cross-cutting concerns (logging, auth, rate limiting)
- Composable processing chain
- Easy to add new behaviors
- Framework-level vs application-level logic separation
Decision: Implement circuit breaker for external API calls.
Rationale:
- Fault tolerance for unreliable services
- Prevent cascade failures
- Automatic recovery
- Configurable thresholds
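The circuit-breaker behavior can be sketched compactly: after a configurable number of consecutive failures the circuit "opens" and further calls fail fast until a reset timeout elapses. The thresholds and field names here are illustrative, not those of circuit_breaker.py.

```python
import time

class CircuitBreaker:
    """Sketch: open after N consecutive failures, fail fast until timeout."""

    def __init__(self, failure_threshold=3, reset_timeout=30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open: failing fast")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0  # any success closes the circuit again
        return result

breaker = CircuitBreaker(failure_threshold=2, reset_timeout=60.0)

def flaky():
    raise IOError("service down")

for _ in range(2):
    try:
        breaker.call(flaky)
    except IOError:
        pass
# The circuit is now open: the next call raises immediately
# without invoking flaky() at all.
```

Failing fast protects callers from piling up requests against a service that is already down, which is how cascade failures are prevented.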
Decision: Implement response caching at the LLM client level.
Rationale:
- Cost optimization (reduce API calls)
- Performance improvement (faster responses)
- Configurable TTL
- Easy to disable for dynamic content
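The cost-saving mechanism can be sketched as a TTL cache keyed on the prompt: a repeated prompt within the TTL is served from memory instead of triggering a second API call. This is an illustration of the pattern; the real cache.py (size limits, LRU eviction) is richer.

```python
import time

class TTLCache:
    """Sketch of a response cache with per-entry time-to-live."""

    def __init__(self, ttl_seconds=300.0):
        self.ttl = ttl_seconds
        self._store = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if time.monotonic() - stored_at > self.ttl:
            del self._store[key]  # expired
            return None
        return value

    def set(self, key, value):
        self._store[key] = (value, time.monotonic())

api_calls = 0
cache = TTLCache(ttl_seconds=300)

def cached_chat(prompt):
    global api_calls
    hit = cache.get(prompt)
    if hit is not None:
        return hit
    api_calls += 1  # stand-in for a paid provider API call
    response = f"answer to: {prompt}"
    cache.set(prompt, response)
    return response

cached_chat("What is RAG?")
cached_chat("What is RAG?")  # second call served from cache
```

Setting the TTL to zero (or bypassing the cache lookup) recovers uncached behavior for dynamic content.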
Decision: Extensible plugin architecture.
Rationale:
- Third-party extensions
- Feature toggling
- Dependency management
- Version compatibility
Decision: Built-in usage and cost tracking.
Rationale:
- Cost monitoring and optimization
- Usage analytics
- Budget management
- Performance metrics
Extend BaseLLMClient to add new LLM providers:
```python
from ai_automation_framework.llm.base_client import BaseLLMClient

class MyCustomClient(BaseLLMClient):
    def chat(self, messages, **kwargs):
        # Implementation
        pass

    async def achat(self, messages, **kwargs):
        # Async implementation
        pass

    def stream_chat(self, messages, **kwargs):
        # Streaming implementation
        pass
```

Extend BaseAgent for specialized agent behavior:

```python
from ai_automation_framework.agents import BaseAgent

class MySpecializedAgent(BaseAgent):
    def run(self, task, **kwargs):
        # Custom agent logic
        pass
```

Register custom tools with agents:

```python
def my_tool(param: str) -> dict:
    """Tool implementation"""
    return {"result": "success"}

# Tool schema for LLM
tool_schema = {
    "type": "function",
    "function": {
        "name": "my_tool",
        "description": "Description",
        "parameters": {
            "type": "object",
            "properties": {
                "param": {"type": "string"}
            }
        }
    }
}

agent.register_tool("my_tool", my_tool, tool_schema)
```

Add custom middleware to the pipeline:

```python
from ai_automation_framework.core.middleware import Middleware

class MyMiddleware(Middleware):
    async def process(self, request, next_middleware):
        # Pre-processing
        response = await next_middleware(request)
        # Post-processing
        return response
```

Create custom plugins:

```python
from ai_automation_framework.core.plugins import Plugin

class MyPlugin(Plugin):
    def initialize(self):
        # Plugin initialization
        pass

    def execute(self, *args, **kwargs):
        # Plugin logic
        pass
```

Subscribe to framework events:

```python
from ai_automation_framework.core.events import event_bus

@event_bus.on("agent.task_started")
def on_task_started(event):
    # Handle task start event
    pass
```

- All I/O-bound operations support async
- Use `achat()` for non-blocking LLM calls
- Parallel processing with `asyncio.gather()`
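The gain from `asyncio.gather()` is that independent LLM calls overlap instead of running back-to-back. This sketch uses a stub `achat()` in place of a real client; with a real provider, three sequential calls at ~1 s each become roughly one call's worth of wall-clock time.

```python
import asyncio

async def achat(prompt: str) -> str:
    """Stub async LLM call with simulated network latency."""
    await asyncio.sleep(0.01)
    return f"reply:{prompt}"

async def main():
    prompts = ["a", "b", "c"]
    # All three requests are in flight concurrently; gather preserves order.
    return await asyncio.gather(*(achat(p) for p in prompts))

replies = asyncio.run(main())
```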
- Response caching reduces API calls
- Configurable TTL and cache size
- LRU eviction policy
- Batch embedding generation
- Batch document processing
- Reduce API round-trips
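Reducing round-trips comes down to chunking: send `batch_size` texts per request instead of one. `embed_batch()` below is a stub standing in for a real embeddings endpoint; the batch size is illustrative.

```python
def chunked(items, batch_size):
    """Yield successive fixed-size slices of a list."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

api_calls = 0

def embed_batch(texts):
    """Stub embeddings call: one round-trip per batch."""
    global api_calls
    api_calls += 1
    return [[float(len(t))] for t in texts]  # fake 1-d embeddings

texts = [f"doc {i}" for i in range(10)]
embeddings = []
for batch in chunked(texts, batch_size=4):
    embeddings.extend(embed_batch(batch))
# 10 texts with batch_size=4 -> 3 round-trips instead of 10
```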
- HTTP connection reuse
- Persistent connections to APIs
- Reduced latency
- Fail fast on unreliable services
- Prevent resource exhaustion
- Automatic recovery
- Background job processing
- Prevent blocking operations
- Scalable task execution
- Track performance metrics
- Identify bottlenecks
- Prometheus-compatible exports
- Real-time monitoring dashboards
- Validate all user inputs
- Sanitize file paths
- Prevent injection attacks
- Environment variable storage
- Never hardcode keys
- Secure key rotation
- Prevent API abuse
- Middleware-based rate limiting
- Configurable thresholds
- Don't expose sensitive information
- Log security events
- Graceful degradation
Developer Machine
├── Local LLM (Ollama)
├── SQLite Vector Store
└── File-based Cache
┌─────────────────────────────────────────┐
│ Load Balancer │
└─────────────┬───────────────────────────┘
│
┌─────────┴─────────┐
▼ ▼
┌─────────┐ ┌─────────┐
│ App │ │ App │
│Instance1│ │Instance2│
└────┬────┘ └────┬────┘
│ │
└─────────┬─────────┘
│
┌─────────┴──────────┐
▼ ▼
┌──────────┐ ┌──────────┐
│ Redis │ │PostgreSQL│
│ Cache │ │ Metadata │
└──────────┘ └──────────┘
│
▼
┌──────────┐
│ ChromaDB │
│ Vector │
│ Store │
└──────────┘
- Advanced RAG Techniques
  - HyDE (Hypothetical Document Embeddings)
  - Multi-query retrieval
  - Re-ranking
- Agent Memory Persistence
  - Database-backed memory
  - Long-term memory
  - Memory compression
- Distributed Execution
  - Multi-node agent execution
  - Load balancing
  - Fault tolerance
- Enhanced Monitoring
  - Real-time dashboards
  - Alerting
  - Performance profiling
- More LLM Providers
  - Google Gemini
  - Cohere
  - Together AI
- Advanced Workflow Features
  - Visual workflow builder
  - Workflow templates
  - Workflow marketplace
Version: 0.5.0 Last Updated: 2025-12-21 Maintainers: AI Automation Framework Team