glommmer/airflow-agent


🚀 Airflow Monitoring Agent

Python FastAPI Streamlit Airflow LangGraph OpenAI Langfuse

Airflow Monitoring Agent is an autonomous multi-agent system that monitors, diagnoses, and remediates Apache Airflow workflows. Built on LangGraph, it orchestrates specialized AI agents that detect failures, analyze root causes from task logs and DAG source code, and execute recovery actions with human-in-the-loop approval.

This system decouples the reasoning engine (FastAPI Backend) from the user interface (Streamlit Frontend), enabling real-time, streaming feedback and interactive troubleshooting.


πŸ—οΈ System Architecture

The system operates as a state machine where a shared AgentState is passed between nodes.

graph TD
    Start((Start)) --> Monitor[🔍 Monitor Agent]
    Monitor -->|Failure Detected| Analyzer[🔬 Analyzer Agent]
    Monitor -->|No Issues| End((End))
    
    Analyzer -->|Report Generated| Interaction[💬 User Interaction Agent]
    
    Interaction -->|Wait for Input| Interaction
    Interaction -->|User Decision: Action| Action[⚡ Action Agent]
    
    Action -->|Action Complete| End
    Action -->|Requires Follow-up| Interaction
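The routing in the diagram can be sketched in plain Python without any framework. This is only an illustration of the state-machine idea, not the project's LangGraph code: the `AgentState` fields, the node functions, and the `run` loop are all assumed names, and the "wait for input" loop is simplified to stopping when no decision is present.

```python
from typing import Callable, Dict, List, Optional, TypedDict


class AgentState(TypedDict, total=False):
    """Shared state handed from node to node (field names are illustrative)."""
    failures: List[str]
    report: Optional[str]
    decision: Optional[str]
    needs_follow_up: bool
    visited: List[str]


def monitor(state: AgentState) -> str:
    state["visited"].append("monitor")
    # Route to analysis only when a failure was detected
    return "analyzer" if state.get("failures") else "end"


def analyzer(state: AgentState) -> str:
    state["visited"].append("analyzer")
    state["report"] = f"{len(state['failures'])} failed task(s) found"
    return "interaction"


def interaction(state: AgentState) -> str:
    state["visited"].append("interaction")
    # The real system would wait for user input here; this sketch stops instead
    return "action" if state.get("decision") else "end"


def action(state: AgentState) -> str:
    state["visited"].append("action")
    if state.get("needs_follow_up"):
        # Hand control back to the user and clear the consumed decision
        state["needs_follow_up"] = False
        state["decision"] = None
        return "interaction"
    return "end"


NODES: Dict[str, Callable[[AgentState], str]] = {
    "monitor": monitor,
    "analyzer": analyzer,
    "interaction": interaction,
    "action": action,
}


def run(state: AgentState, max_steps: int = 20) -> AgentState:
    """Walk the graph from the monitor node until a node routes to 'end'."""
    state.setdefault("visited", [])
    node = "monitor"
    for _ in range(max_steps):
        if node == "end":
            break
        node = NODES[node](state)
    return state
```

Each node mutates the shared state and returns the name of the next node, which is roughly the role conditional edges play in a graph framework.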

Component Roles

| Component | Technology | Description |
|---|---|---|
| Server | FastAPI | Hosts the LangGraph workflow, WebSocket endpoints, and Airflow client services. |
| Client | Streamlit | Provides the chat UI, renders markdown reports, and manages user sessions. |
| Workflow | LangGraph | Defines the conditional routing logic between agents (e.g., should_analyze, should_act). |
| Database | JSON / Memory | Stores session states locally for persistence across UI refreshes. |
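As a rough idea of what JSON-backed session persistence can look like, the sketch below writes one file per session. The function names, the one-file-per-session layout, and the write-then-rename step are assumptions for illustration; the project's actual storage format is not documented here.

```python
import json
from pathlib import Path
from typing import Any, Dict


def save_session(directory: Path, session_id: str, state: Dict[str, Any]) -> Path:
    """Write a session's state to <directory>/<session_id>.json."""
    directory.mkdir(parents=True, exist_ok=True)
    path = directory / f"{session_id}.json"
    tmp = directory / f"{session_id}.json.tmp"
    # Write to a temporary file first so a crash cannot leave a half-written session
    tmp.write_text(json.dumps(state, indent=2), encoding="utf-8")
    tmp.replace(path)
    return path


def load_session(directory: Path, session_id: str) -> Dict[str, Any]:
    """Return the stored state, or an empty dict for an unknown session."""
    path = directory / f"{session_id}.json"
    if not path.exists():
        return {}
    return json.loads(path.read_text(encoding="utf-8"))
```

Reloading the state by session ID is what lets a UI refresh pick up where the conversation left off.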

🌟 Key Features

1. Intelligent Monitoring & Detection

  • Real-time Failure Detection: The AirflowMonitorAgent queries the Airflow REST API to identify failed DAG runs and Task Instances.
  • Context-Aware Monitoring: Supports monitoring specific DAG IDs, tasks, or a wildcard mode (~) to scan the entire Airflow instance for recent errors.
  • Log Retrieval: Automatically fetches execution logs and metadata (try number, start/end dates) for failed tasks.
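A minimal sketch of the kind of request the monitoring step might issue, assuming the Airflow 2.x stable REST API (`/api/v1`) and Basic Auth. The helper name `failed_dag_runs_request` is hypothetical, and the function only builds the request without sending it; other Airflow versions may use a different base path or authentication scheme.

```python
import base64
from urllib.parse import quote, urlencode
from urllib.request import Request


def failed_dag_runs_request(host: str, username: str, password: str,
                            dag_id: str = "~", limit: int = 25) -> Request:
    """Build a GET request listing failed DAG runs; '~' means all DAGs."""
    query = urlencode({"state": "failed", "limit": limit})
    url = f"{host.rstrip('/')}/api/v1/dags/{quote(dag_id, safe='')}/dagRuns?{query}"
    # Basic Auth header: base64 of "username:password"
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return Request(url, headers={"Authorization": f"Basic {token}"})
```

Passing the result to `urllib.request.urlopen` would perform the call; the response is expected to be JSON containing the matching DAG runs.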

2. Autonomous Error Analysis

  • Deep Diagnostics: The ErrorAnalyzerAgent ingests both the Task Logs and the DAG Source Code to understand the context of the failure.
  • Web-Enabled Debugging: Equipped with DuckDuckGo search tools, the agent can research specific error messages, library exceptions, or stack traces online to find relevant solutions.
  • Root Cause Identification: Generates a structured report detailing the root cause, error location, and suggested solutions.
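The structured report described above could be modeled along these lines. The `AnalysisReport` class and its field names are assumptions chosen to mirror the three parts named in the text (root cause, error location, suggested solutions); the project's real schema may differ.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class AnalysisReport:
    """Illustrative container for one failure analysis."""
    dag_id: str
    task_id: str
    root_cause: str
    error_location: str
    suggested_solutions: List[str] = field(default_factory=list)

    def to_markdown(self) -> str:
        """Render the report as markdown for display in a chat UI."""
        lines = [
            f"## Failure report: {self.dag_id}.{self.task_id}",
            f"**Root cause:** {self.root_cause}",
            f"**Error location:** {self.error_location}",
            "**Suggested solutions:**",
        ]
        lines += [f"{i}. {s}" for i, s in enumerate(self.suggested_solutions, 1)]
        return "\n".join(lines)
```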

3. Human-in-the-Loop Workflow

  • Interactive Decision Making: The UserInteractionAgent presents the analysis to the user and requests guidance (e.g., "Retry," "Skip," or "Manual Fix").
  • Natural Language Interface: Users can issue commands or ask questions like "Analyze the failed task in dag_id X" or "Clear the task" directly through the chat interface.

4. Automated Remediation

  • Action Execution: The ActionAgent performs the chosen operation via the Airflow API, such as Clearing Task Instances (triggering a retry) or marking tasks as success/skipped.
  • Validation: Verifies the result of the action and reports back to the user.
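Clearing a task instance through the API might look like the sketch below, assuming the Airflow 2.x `clearTaskInstances` endpoint. The helper name is hypothetical, the payload fields should be checked against the API reference for your Airflow version, and authentication headers are omitted for brevity. The request is only built, not sent, and `dry_run` defaults to `True` so nothing is changed by accident.

```python
import json
from urllib.parse import quote
from urllib.request import Request


def clear_task_request(host: str, dag_id: str, dag_run_id: str,
                       task_id: str, dry_run: bool = True) -> Request:
    """Build a POST request that clears one failed task instance."""
    url = f"{host.rstrip('/')}/api/v1/dags/{quote(dag_id, safe='')}/clearTaskInstances"
    payload = {
        "dry_run": dry_run,          # True: only report what would be cleared
        "dag_run_id": dag_run_id,
        "task_ids": [task_id],
        "only_failed": True,
        "reset_dag_runs": True,      # put the DAG run back into a runnable state
    }
    return Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Running once with `dry_run=True` and inspecting the response before repeating with `dry_run=False` is one way to implement the validation step.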

5. Observability & Architecture

  • LangGraph Orchestration: Uses a state-based graph architecture to manage the flow between monitoring, analysis, interaction, and action.
  • Full Observability: Integrated with Langfuse to trace agent reasoning steps, token usage, and latency.
  • Modern Stack: Built with FastAPI (Backend), Streamlit (Frontend), and WebSockets/SSE for real-time streaming.

📂 Project Structure

airflow-agent/
├── app/                        # Streamlit Frontend
│   ├── components/             # UI Components (Chat interface)
│   ├── utils/                  # Session state management
│   └── main.py                 # Client entry point
├── server/                     # FastAPI Backend
│   ├── agents/                 # Agent logic (Monitor, Analyzer, Interaction, Action)
│   ├── routers/                # API & WebSocket endpoints
│   ├── services/               # Airflow REST API Client
│   ├── tools/                  # Search tools (DuckDuckGo)
│   ├── workflow/               # LangGraph state & graph definition
│   └── main.py                 # Server entry point
├── sessions/                   # Local storage for session data
├── run_server.sh               # Startup script for Backend
├── run_client.sh               # Startup script for Frontend
└── .env.example                # Configuration template

βš™οΈ Installation & Setup

1. Prerequisites

  • Python 3.10+
  • A running Apache Airflow instance with the REST API enabled
  • OpenAI API Key

2. Clone and Install

# Clone the repository
git clone <repository-url>
cd airflow-agent

# Create virtual environment
python -m venv .venv
source .venv/bin/activate  # Windows: .venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt

3. Configuration

Copy the example environment file and configure your credentials.

cp .env.example .env

Required .env Variables:

# --- Airflow Connection ---
AIRFLOW_HOST=http://localhost:8080
AIRFLOW_USERNAME=admin
AIRFLOW_PASSWORD=admin

# --- LLM Provider ---
OPENAI_API_KEY=sk-...
OPENAI_MODEL=gpt-4o

# --- Observability (Optional) ---
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=sk-lf-...
LANGFUSE_HOST=https://cloud.langfuse.com

# --- Server Config ---
API_BASE_URL=http://localhost:8000
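To illustrate how these variables might be consumed, here is a small sketch that validates the required values and applies defaults for the rest. The `Settings` class, `load_settings` function, and the choice of which variables are required are assumptions; the defaults simply reuse the example values above. The sketch reads already-exported environment variables and does not parse the `.env` file itself.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

REQUIRED = ("AIRFLOW_HOST", "AIRFLOW_USERNAME", "AIRFLOW_PASSWORD", "OPENAI_API_KEY")


@dataclass(frozen=True)
class Settings:
    airflow_host: str
    airflow_username: str
    airflow_password: str
    openai_api_key: str
    openai_model: str
    api_base_url: str
    langfuse_enabled: bool


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Read configuration from the environment, failing fast on missing values."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise ValueError(f"Missing required settings: {', '.join(missing)}")
    return Settings(
        airflow_host=env["AIRFLOW_HOST"],
        airflow_username=env["AIRFLOW_USERNAME"],
        airflow_password=env["AIRFLOW_PASSWORD"],
        openai_api_key=env["OPENAI_API_KEY"],
        openai_model=env.get("OPENAI_MODEL", "gpt-4o"),
        api_base_url=env.get("API_BASE_URL", "http://localhost:8000"),
        # Tracing is treated as optional: enabled only when both keys are present
        langfuse_enabled=bool(env.get("LANGFUSE_PUBLIC_KEY") and env.get("LANGFUSE_SECRET_KEY")),
    )
```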

🚀 Usage Guide

This system requires running the backend server and frontend client simultaneously.

1. Start the Backend Server

The server handles the heavy lifting: agent orchestration, API calls, and reasoning.

./run_server.sh
# OR manually:
# python -m uvicorn server.main:app --reload --host 0.0.0.0 --port 8000

2. Start the Frontend Client

The client provides the visual interface to interact with the agents.

./run_client.sh
# OR manually:
# streamlit run app/main.py --server.port 8501

3. How to Interact

  1. Start a Session: Click "+ New Session" in the sidebar.
  2. Monitor: Type "Check failed DAGs" or provide a specific DAG ID.
  3. Analyze: The agent will detect errors and generate a report automatically.
  4. Resolve:
    • Click or type "1" (or "Retry") to clear the task and restart it.
    • Click or type "2" (or "Manual") to skip automated actions.
    • Click or type "3" to see the full technical report.
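The numbered choices in step 4 amount to a small mapping from user input to a decision. The sketch below shows one way to normalize that input; the `parse_decision` name and the returned labels are hypothetical, and free-form questions would presumably be handled by the LLM rather than by a lookup like this.

```python
from typing import Optional

# Accepted inputs mapped to a canonical decision label (labels are illustrative)
_CHOICES = {
    "1": "retry",
    "retry": "retry",
    "2": "manual",
    "manual": "manual",
    "3": "report",
}


def parse_decision(text: str) -> Optional[str]:
    """Return the canonical decision, or None when the input is not a menu choice."""
    return _CHOICES.get(text.strip().lower())
```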

🔧 Troubleshooting

  • Airflow API Connection: Ensure your Airflow instance is running and the credentials in .env have API access permissions. The AirflowClient uses Basic Auth.
  • WebSocket Errors: If the UI hangs, check the browser console for WebSocket connection errors. Ensure API_BASE_URL matches the running server address.
  • LLM Rate Limits: If using OpenAI, ensure your account has sufficient credits. The ErrorAnalyzerAgent can generate large prompts containing logs and code.
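One common way to keep log-heavy prompts within limits is to send only the tail of the log, where the traceback usually sits. The sketch below illustrates that idea; `tail_log` and its default budget are assumptions, and it is not known whether the project truncates logs this way.

```python
TRUNCATION_MARKER = "[... earlier log lines truncated ...]\n"


def tail_log(log_text: str, max_chars: int = 8000) -> str:
    """Keep roughly the last max_chars characters, cut at a line boundary."""
    if len(log_text) <= max_chars:
        return log_text
    tail = log_text[-max_chars:]
    # Drop the first, probably partial, line so the excerpt starts cleanly
    newline = tail.find("\n")
    if newline != -1:
        tail = tail[newline + 1:]
    return TRUNCATION_MARKER + tail
```

A character budget is only a rough proxy for tokens, so the limit would need tuning for the model in use.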
