Feed Processing System

A robust and scalable system for processing RSS/Atom feeds with webhook delivery capabilities.

Features

  • Queue-based feed processing with configurable size and priority
  • Asynchronous API server with:
    • Thread-safe operations
    • Proper async/await support
    • Graceful shutdown handling
    • Enhanced error reporting
  • Webhook delivery with:
    • Robust retry mechanism with exponential backoff
    • Configurable retry parameters (initial delay, max delay, backoff factor)
    • Rate limiting and batch processing
    • Circuit breaker pattern
    • Comprehensive retry tracking and metrics
  • Advanced Content Analysis:
    • Intelligent content summarization
    • Multi-stage content processing
    • Quality assessment and validation
    • Customizable content filters
  • Advanced Performance Optimization:
    • Dynamic batch sizing and thread management
    • Intelligent resource allocation
    • Real-time performance monitoring
    • Adaptive processing parameters
  • Code Quality and Documentation:
    • Comprehensive module and method documentation
    • PEP 8 compliant code style
    • Full test coverage with pytest
    • Automated code quality checks (black, isort, flake8)
  • Metrics and monitoring:
    • Prometheus integration
    • Performance tracking
    • Custom dashboards
    • Resource utilization monitoring
    • Webhook retry metrics
  • Error handling:
    • Centralized error definitions
    • Automatic retry policies with exponential backoff
    • Error categorization and tracking
    • Detailed error reporting
  • Modular architecture:
    • Dedicated configuration management
    • Pluggable queue implementations
    • Extensible validation system
  • SQLite Database Integration:
    • Persistent storage of feed items
    • Tag-based organization
    • Efficient querying and retrieval
    • Automatic schema management
  • Batch processing support
  • Configurable webhook settings
  • Advanced error handling:
    • Circuit breaker pattern for service protection
    • Error tracking and metrics
    • Configurable error history
    • Sensitive data sanitization
    • Comprehensive error categorization
  • Google Drive Integration
    • Automated folder structure creation
    • Standardized content organization
    • Metadata tracking
  • Webhook Processing
    • Rate-limited API endpoints
    • Data validation
    • Error handling

Current Development Status

High Priority Items (In Progress)

1. Webhook Delivery System

  • Rate limiting implementation (0.2s)
  • Retry mechanism with exponential backoff
  • Delivery queue manager
  • Circuit breaker implementation
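The circuit breaker mentioned above can be sketched as a small state machine: after a threshold of consecutive failures the circuit opens and calls are rejected outright until a reset timeout passes, when a single trial call is allowed through. A minimal sketch (class and parameter names are illustrative assumptions, not the project's API):

```python
import time


class CircuitBreaker:
    """Minimal circuit breaker sketch: opens after failure_threshold
    consecutive failures, rejects calls until reset_timeout seconds
    have passed, then allows one trial call (half-open)."""

    def __init__(self, failure_threshold=5, reset_timeout=60.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0
        return result
```

Failing fast while the circuit is open protects both the caller (no wasted timeouts) and the downstream service (no pile-on while it recovers).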

2. Core Error Handling & Monitoring

  • Basic error handling system
  • Error tracking and logging
  • Critical error alerts
  • Performance monitoring

3. Content Analysis Core Features

  • Entity detection fixes
  • Basic keyword extraction improvements
  • Technology category identification
  • Content validation enhancements

Future Enhancements

Multilingual Topic Analysis

  • Support for multiple languages (English, Spanish, French)
  • Cross-lingual topic alignment
  • Similarity scoring across languages
  • Language-specific NLP processing pipelines

Requirements

  • Python 3.12+
  • pip for package management

Setup and Installation

  1. Create a virtual environment:
python3 -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
  2. Install dependencies:
pip install -r requirements.txt
  3. Configure Google Drive credentials:

    • Create a project in Google Cloud Console
    • Enable Google Drive API
    • Create OAuth 2.0 credentials
    • Save credentials to .env file (see .env.example)
  4. Run tests:

pytest
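Once the credentials are in your .env file, the application can read them from the environment at startup. A minimal sketch of that step (the variable names here are hypothetical; use whichever keys your .env.example defines):

```python
import os


def load_drive_credentials():
    """Read Google Drive OAuth credentials from the environment.

    Variable names are illustrative assumptions; substitute the keys
    actually defined in .env.example.
    """
    creds = {
        "client_id": os.getenv("GOOGLE_CLIENT_ID"),
        "client_secret": os.getenv("GOOGLE_CLIENT_SECRET"),
    }
    missing = [k for k, v in creds.items() if not v]
    if missing:
        raise RuntimeError(f"Missing credentials: {', '.join(missing)}")
    return creds
```

Failing early with a list of missing keys beats a cryptic OAuth error later in the run.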

Project Structure

feed_processor/
├── config/          # Configuration management
│   ├── webhook_config.py
│   └── processor_config.py
├── core/           # Core processing logic
│   ├── processor.py
│   └── database.py
├── queues/         # Queue implementations
│   ├── base.py
│   └── content.py
├── metrics/        # Metrics collection
│   ├── prometheus.py
│   └── performance.py
├── validation/     # Input validation
│   └── validators.py
├── webhook/        # Webhook handling
│   └── manager.py
└── errors.py       # Error definitions

Usage

Quick Start

  1. Start the feed processor:
python -m feed_processor start
  2. Process a specific feed:
python -m feed_processor process --feed-url https://example.com/feed.xml
  3. View current metrics:
python -m feed_processor metrics

Configuration

Core settings are managed through environment variables or config files:

# Required settings
WEBHOOK_URL=https://api.example.com/webhook
WEBHOOK_TOKEN=your_token
MAX_QUEUE_SIZE=1000

# Optional settings
BATCH_SIZE=50
RETRY_COUNT=3
RATE_LIMIT=100
DB_PATH=feeds.db

For detailed configuration options, see config/ directory.
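As a sketch of how these variables might be consumed, the settings above map naturally onto a dataclass with required and defaulted fields. The real definitions live in config/processor_config.py; the field names and defaults below simply mirror the example values shown above.

```python
import os
from dataclasses import dataclass


@dataclass
class ProcessorSettings:
    """Illustrative settings holder; see config/processor_config.py
    for the project's actual configuration classes."""
    webhook_url: str
    webhook_token: str
    max_queue_size: int = 1000
    batch_size: int = 50
    retry_count: int = 3
    rate_limit: int = 100
    db_path: str = "feeds.db"

    @classmethod
    def from_env(cls):
        return cls(
            # Required settings: fail loudly if absent.
            webhook_url=os.environ["WEBHOOK_URL"],
            webhook_token=os.environ["WEBHOOK_TOKEN"],
            # Optional settings fall back to the documented defaults.
            max_queue_size=int(os.getenv("MAX_QUEUE_SIZE", 1000)),
            batch_size=int(os.getenv("BATCH_SIZE", 50)),
            retry_count=int(os.getenv("RETRY_COUNT", 3)),
            rate_limit=int(os.getenv("RATE_LIMIT", 100)),
            db_path=os.getenv("DB_PATH", "feeds.db"),
        )
```

Using `os.environ[...]` for required keys and `os.getenv(..., default)` for optional ones makes the required/optional split from the listing above explicit in code.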

Development

Testing

Run the test suite:

pytest

For integration tests:

pytest tests/integration
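Tests in the suite follow the usual pytest conventions: plain functions named `test_*` containing bare assertions. A sketch in that style (the validator here is a toy stand-in defined inline, not the project's actual API from feed_processor/validation/validators.py):

```python
def validate_feed_item(item):
    """Toy validator used only to illustrate the test style; the real
    validators live in feed_processor/validation/validators.py."""
    return bool(item.get("title")) and bool(item.get("url"))


def test_validate_feed_item_rejects_missing_title():
    assert not validate_feed_item({"url": "https://example.com/feed.xml"})


def test_validate_feed_item_accepts_complete_item():
    assert validate_feed_item(
        {"title": "Example", "url": "https://example.com/feed.xml"}
    )
```

pytest discovers these functions automatically, so no test-runner boilerplate is needed.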

Metrics

The system exposes metrics via Prometheus at /metrics. Available metrics include:

  • Queue size and processing rates
  • Webhook delivery statistics
  • Error rates and types
  • Processing latency
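Metrics in these categories are typically defined with the `prometheus_client` library, which also renders the text format served at /metrics. A sketch (the metric names below are illustrative assumptions; the project's actual definitions live under feed_processor/metrics/):

```python
from prometheus_client import Counter, Gauge, generate_latest

# Illustrative metric definitions matching the categories listed above.
QUEUE_SIZE = Gauge("feed_queue_size", "Items currently queued")
ITEMS_PROCESSED = Counter(
    "feed_items_processed_total", "Feed items processed", ["status"]
)

# Update the metrics as the processor works.
QUEUE_SIZE.set(3)
ITEMS_PROCESSED.labels(status="success").inc()

# generate_latest() renders the Prometheus text exposition format
# that a /metrics endpoint would serve.
exposition = generate_latest()
```

A scrape of the endpoint then reports both the gauge value and the labeled counter, which Prometheus turns into rates and error ratios.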

Configuration

Configuration is managed through dedicated modules in the config/ directory:

  • webhook_config.py: Webhook delivery settings
  • processor_config.py: Core processor settings

Pipelines

Inoreader to Airtable Pipeline

The system includes a dedicated pipeline for fetching content from Inoreader and storing it in Airtable. This pipeline:

  • Fetches content from Inoreader using their API
  • Processes and validates the content
  • Stores the processed content in Airtable
  • Includes comprehensive metrics and monitoring

To run the pipeline:

  1. Copy the example environment file and fill in your credentials:
cp .env.example .env
# Edit .env with your Inoreader and Airtable credentials
  2. Run the pipeline:
python run_pipeline.py

The pipeline will:

  • Start a Prometheus metrics server (default port: 9090)
  • Continuously fetch new content from Inoreader
  • Process content in configurable batch sizes
  • Store processed content in Airtable
  • Provide real-time metrics and monitoring
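The fetch/process/store loop described above can be sketched as below. `fetch_batch` and `store_batch` stand in for the real Inoreader and Airtable clients, and the function names are assumptions, not the pipeline's actual interfaces.

```python
import time


def run_pipeline(fetch_batch, store_batch, batch_size=50,
                 fetch_interval=60.0, max_cycles=None):
    """Sketch of the continuous fetch/process/store loop.

    fetch_batch(n) and store_batch(items) are placeholders for the
    real Inoreader and Airtable clients. max_cycles is only here so
    the sketch can terminate; the real pipeline runs indefinitely.
    """
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        # Fetch up to batch_size new items, then persist them.
        items = fetch_batch(batch_size)
        if items:
            store_batch(items)
        cycles += 1
        # Wait before the next fetch, as governed by FETCH_INTERVAL.
        if max_cycles is None or cycles < max_cycles:
            time.sleep(fetch_interval)
```

`batch_size` and `fetch_interval` correspond directly to the BATCH_SIZE and FETCH_INTERVAL environment variables documented below.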

Configuration options (via environment variables):

  • BATCH_SIZE: Number of items to process in each batch (default: 50)
  • FETCH_INTERVAL: Time in seconds between fetch operations (default: 60.0)
  • METRICS_PORT: Port for Prometheus metrics server (default: 9090)
  • DB_PATH: Path to SQLite database file (default: feeds.db)

Monitor the pipeline using:

  • Prometheus metrics at http://localhost:9090
  • Structured logs in JSON format
  • Airtable dashboard for stored content

Development Guidelines

Test-Driven Development

  • All new features must have corresponding test cases
  • Tests should be written before implementation
  • Current test coverage: ~85%

Code Quality Standards

  • Follow PEP 8 style guide
  • Documentation required for all new features
  • Code review required for all PRs
  • Regular dependency updates

Documentation

  • Core Documentation
  • Performance Optimization
  • Monitoring

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Run tests and ensure they pass
  5. Submit a pull request

License

This project is licensed under the MIT License - see the LICENSE file for details.
