A production-ready marketing data pipeline that processes Google Ads campaign data, converts spending to EUR, and delivers daily insights by country, brand, and channel. Built with Apache Airflow, PostgreSQL, and comprehensive monitoring.
The pipeline runs in 2 main stages:
- Ingest: Load CSV files containing raw costs, exchange rates, and account information into PostgreSQL
- Transform: Deduplicate data, convert currencies, extract countries, and aggregate spending metrics
All stages are orchestrated by Apache Airflow with built-in error handling, incremental processing, and monitoring.
```
campaignflow/
├── airflow/dags/          # Pipeline workflows
├── infra/                 # Database connection modules
├── metadata/              # SQL table definitions
├── monitor/               # Basic monitoring setup
├── data/                  # Input CSV files
└── docker-compose.yaml    # Infrastructure setup
```
Key Components:
- `csv_to_postgres.py` - Ingests CSV files into PostgreSQL
- `marketing_aggregation.py` - Transforms and aggregates data for insights
- `connections_postgres.py` - Database operations with bulk insert/upsert
- `daily_spend_summary.sql` - Final output table for stakeholder analysis
- Basic Grafana setup demonstrating that the final table can deliver the required business insights
Ingestion DAG: `csv_to_postgres_dag`
Schedule: Manual trigger with parameters
What it does:
- Reads CSV files from the `/data` directory
- Loads data into PostgreSQL with proper schema creation
- Handles different data types with appropriate upsert strategies (sketched after this list):
- Raw Costs: Conditional upsert (only update if extraction timestamp is newer)
- Account Info: Simple upsert (latest data wins)
- Exchange Rates: Simple upsert by currency/date combination
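
The upsert statements themselves live in `connections_postgres.py`; as a rough sketch of the two strategies, assuming a psycopg2 connection and illustrative column names (`extracted_at`, `valid_from`, and so on), they could look like this:

```python
# Illustrative upsert statements; table and column names are assumptions,
# not the exact schema used by connections_postgres.py.
CONDITIONAL_UPSERT = """
    INSERT INTO raw_costs (date, customer_id, campaign_id, cost, currency, extracted_at)
    VALUES (%(date)s, %(customer_id)s, %(campaign_id)s, %(cost)s, %(currency)s, %(extracted_at)s)
    ON CONFLICT (date, customer_id, campaign_id) DO UPDATE
        SET cost = EXCLUDED.cost,
            currency = EXCLUDED.currency,
            extracted_at = EXCLUDED.extracted_at
        -- only overwrite when the incoming extraction is newer
        WHERE raw_costs.extracted_at < EXCLUDED.extracted_at;
"""

SIMPLE_UPSERT = """
    INSERT INTO exchange_rates (currency, valid_from, rate)
    VALUES (%(currency)s, %(valid_from)s, %(rate)s)
    ON CONFLICT (currency, valid_from) DO UPDATE
        SET rate = EXCLUDED.rate;  -- latest data wins
"""

def upsert_rows(conn, sql, rows):
    """Apply an upsert statement to a batch of dict rows (conn: a psycopg2 connection)."""
    with conn.cursor() as cur:
        cur.executemany(sql, rows)
    conn.commit()
```

The conditional `WHERE` clause is what keeps stale extractions from overwriting newer data, while the simple variant always takes the latest row.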
Key features:
- Automatic file detection or manual file specification
- Chunked processing for large datasets (10k rows per batch)
- Pre-deduplication within chunks for efficiency (see the chunked-load sketch after this list)
- Comprehensive error handling and logging
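
A minimal sketch of the chunked load, assuming pandas and an upsert callable like the hypothetical helper above (the production logic lives in `csv_to_postgres.py`):

```python
import pandas as pd

CHUNK_SIZE = 10_000  # 10k rows per batch, as noted above

def load_csv_in_chunks(path, upsert, key_columns):
    """Stream a large CSV in chunks, pre-deduplicating each chunk before upserting."""
    for chunk in pd.read_csv(path, chunksize=CHUNK_SIZE):
        # Pre-deduplicate within the chunk so each key is written only once per batch
        chunk = chunk.drop_duplicates(subset=key_columns, keep="last")
        upsert(chunk.to_dict(orient="records"))  # e.g. a psycopg2-backed upsert helper
```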
Usage:

```bash
# Auto-detect latest file
airflow dags trigger csv_to_postgres_dag --conf '{"data_type": "raw_costs"}'

# Specify exact file
airflow dags trigger csv_to_postgres_dag --conf '{"data_type": "account_info", "source_file": "/path/to/specific/file.csv"}'
```

Aggregation DAG: `marketing_aggregation_dag`
Schedule: Manual trigger with optional date parameter
What it does:
- Deduplicates raw costs (latest extraction wins per date/customer/campaign)
- Converts spending to EUR using exchange rates with date ranges
- Extracts countries from account domains and campaign names using `pycountry` (see the sketch after this list)
- Aggregates spending by report date, country, brand, and channel
- Outputs to the `daily_spend_summary` table
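
A rough sketch of the country extraction, assuming a simple domain-suffix mapping and a word-boundary match against pycountry names (the real logic in `marketing_aggregation.py` may differ):

```python
import re

import pycountry

# Example domain-suffix mapping; the real mapping is pipeline configuration.
DOMAIN_COUNTRIES = {"de": "germany", "fr": "france", "it": "italy"}

def account_country(domain):
    """Map an account domain (e.g. 'flixbus.de') to a country name, if known."""
    tld = domain.rsplit(".", 1)[-1].lower()
    return DOMAIN_COUNTRIES.get(tld)

def campaign_countries(campaign_name):
    """Return country names mentioned in a campaign name, using pycountry."""
    normalized = re.sub(r"[_\-]+", " ", campaign_name.lower())
    return [
        c.name.lower()
        for c in pycountry.countries
        if re.search(rf"\b{re.escape(c.name.lower())}\b", normalized)
    ]
```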
Key features:
- SQL-level aggregation for performance
- Incremental processing (only processes new data since last successful run)
- Smart country extraction from multiple sources
- Proper currency conversion with fallback rates
- Pipeline run tracking for monitoring (the incremental pattern is sketched after this list)
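
The incremental pattern could be expressed roughly as follows, assuming a hypothetical `pipeline_runs` tracking table and an `extracted_at` column (names are illustrative, not the exact schema):

```python
# Sketch of incremental processing driven by a run-tracking table; the
# table and column names here are assumptions for illustration.
LAST_SUCCESS_SQL = """
    SELECT COALESCE(MAX(finished_at), TIMESTAMP '1970-01-01')
    FROM pipeline_runs
    WHERE dag_id = %(dag_id)s AND status = 'success';
"""

NEW_COSTS_SQL = """
    SELECT * FROM raw_costs
    WHERE extracted_at > %(watermark)s;  -- only rows added since the last successful run
"""

def fetch_new_rows(conn, dag_id):
    """Return raw cost rows ingested since the last successful run of the DAG."""
    with conn.cursor() as cur:
        cur.execute(LAST_SUCCESS_SQL, {"dag_id": dag_id})
        watermark = cur.fetchone()[0]
        cur.execute(NEW_COSTS_SQL, {"watermark": watermark})
        return cur.fetchall()
```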
Processing Logic (a SQL sketch follows this list):
- Deduplication: Uses window functions to keep latest extraction per campaign/date
- Currency Conversion: Joins with exchange rates based on date ranges
- Country Extraction:
- Account country from domain mapping (e.g., 'de' → 'germany')
- Campaign countries from campaign names using pycountry library
- Aggregation: Groups by date, country, brand, channel with spend totals
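
Put together, the transformation is essentially one SQL statement. A simplified sketch with assumed column names (the authoritative query lives in `marketing_aggregation.py` and `metadata/daily_spend_summary.sql`); it omits campaign-country extraction for brevity:

```python
# Simplified sketch of the core transformation; column names are assumed and
# campaign-country extraction is omitted for brevity.
DAILY_SPEND_SQL = """
    WITH dedup AS (
        SELECT *,
               ROW_NUMBER() OVER (
                   PARTITION BY date, customer_id, campaign_id
                   ORDER BY extracted_at DESC           -- latest extraction wins
               ) AS rn
        FROM raw_costs
    )
    INSERT INTO daily_spend_summary
        (report_date, account_country, brand, channel_type, spend_eur, campaign_count)
    SELECT d.date,
           a.account_country,
           a.brand,
           a.channel_type,
           SUM(d.cost * COALESCE(r.rate, 1.0))  AS spend_eur,      -- fallback rate when no match
           COUNT(DISTINCT d.campaign_id)        AS campaign_count
    FROM dedup d
    JOIN account_info a
      ON a.customer_id = d.customer_id
    LEFT JOIN exchange_rates r
      ON r.currency = d.currency
     AND d.date BETWEEN r.valid_from AND r.valid_to    -- date-range join
    WHERE d.rn = 1
    GROUP BY d.date, a.account_country, a.brand, a.channel_type;
"""
```

Keeping the dedup, join, and GROUP BY in a single statement is what the "SQL-level aggregation for performance" feature above refers to.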
Usage:

```bash
# Process all new data since last run
airflow dags trigger marketing_aggregation_dag

# Process specific date
airflow dags trigger marketing_aggregation_dag --conf '{"process_date": "2025-01-29"}'
```

Orchestration: Apache Airflow with LocalExecutor
Database: PostgreSQL (dual setup for Airflow metadata and marketing data)
Data Format: CSV input, PostgreSQL tables output
Monitoring: Grafana + Prometheus (for both pipeline and infrastructure monitoring; check the dashboards in the Grafana UI)
Infrastructure: Docker Compose
Data Processing: Pandas + SQL for transformations
- CSV Files → PostgreSQL staging tables (raw_costs, exchange_rates, account_info)
- Raw Data → Deduplication and enrichment via SQL
- Enriched Data → Currency conversion and country extraction
- Processed Data → Final aggregation into daily_spend_summary
- Insights → Grafana dashboards for visualization
- `raw_costs`: Campaign spending data with deduplication by (date, customer_id, campaign_id)
- `exchange_rates`: Currency conversion rates with validity periods
- `account_info`: Account metadata including brand, channel, and domain information

`daily_spend_summary`: Aggregated daily spending by (date, country, brand, channel), with the following columns (a DDL sketch follows this list):
- `report_date`: Date of the spending
- `account_country`: Country derived from the account domain
- `campaign_countries`: Countries extracted from campaign names
- `brand`: FlixBus, FlixTrain, etc.
- `channel_type`: Marketing channel (brand, nonbrand, mixed)
- `spend_eur`: Total spending converted to EUR
- `campaign_count`: Number of unique campaigns
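
The authoritative definition lives in `metadata/daily_spend_summary.sql`; purely as an illustration of the table's shape, with column types that are assumptions:

```python
# Illustrative DDL only; the authoritative definition is metadata/daily_spend_summary.sql
# and the column types below are assumptions.
CREATE_DAILY_SPEND_SUMMARY = """
    CREATE TABLE IF NOT EXISTS daily_spend_summary (
        report_date        DATE           NOT NULL,
        account_country    TEXT,                        -- derived from the account domain
        campaign_countries TEXT,                        -- countries found in campaign names
        brand              TEXT           NOT NULL,     -- FlixBus, FlixTrain, ...
        channel_type       TEXT           NOT NULL,     -- brand / nonbrand / mixed
        spend_eur          NUMERIC(18, 2) NOT NULL,     -- spend converted to EUR
        campaign_count     INTEGER        NOT NULL      -- distinct campaigns in the group
    );
"""
```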
- Clone the repository:

  ```bash
  git clone <repo-url>
  cd campaignflow
  ```

- Setup environment: Create a `.env` file in the main directory:

  ```
  # Airflow Configuration
  AIRFLOW_UID=50000
  _AIRFLOW_WWW_USER_USERNAME=airflow
  _AIRFLOW_WWW_USER_PASSWORD=airflow123

  # PostgreSQL Configuration (Marketing Database)
  POSTGRES_HOST=marketing-postgres
  POSTGRES_PORT=5432
  POSTGRES_DB=marketing_data
  POSTGRES_USER=marketing
  POSTGRES_PASSWORD=marketing123

  # Processing Configuration
  PROCESSING_CHUNK_SIZE=1000
  ```

- Start infrastructure:

  ```bash
  ./setup-docker.sh
  docker-compose up -d --build
  ```

- Access Airflow:
  - URL: http://localhost:8080
  - Username: airflow
  - Password: airflow123

- Load your data:
  - Place CSV files in the `data/` directory
  - Trigger the ingestion DAG for each data type
  - Run the aggregation DAG to process the data

- View results:
  - Grafana: http://localhost:3000 (admin/campaignflow)
  - PostgreSQL: localhost:5433 (marketing/marketing123)
The pipeline outputs aggregated data to the `daily_spend_summary` table, enabling stakeholders to analyze:
- Daily spending trends by country
- Brand budget allocation (FlixBus vs FlixTrain)
- Channel performance (brand vs nonbrand)
- Campaign efficiency metrics
Grafana Dashboard provides visualization of these insights, demonstrating how the final output table meets business requirements for marketing spend analysis by country, brand, and channel.
Note: This section addresses the design question from the case study - how to build a reliable and scalable pipeline for marketing data in a production environment.
The pipeline must prioritize data reliability and operational simplicity to support marketing teams making daily spending decisions. Given the scale of marketing investments, even small data quality issues can lead to suboptimal decisions and significant cost impact.
The pipeline follows a four-stage architecture designed to balance reliability, scalability, and operational simplicity:
Stage 1: Data Ingestion & Quality Gate → Stage 2: Raw Data Preservation → Stage 3: Business Logic Processing → Stage 4: Analytics Data Warehouse
Stage 1: Data Ingestion & Quality Gate
Objective: Validate and monitor incoming data before processing
Implementation (a validation sketch follows this list):
- Automated monitoring for new CSV files or API data streams
- Schema validation ensuring required columns and data types are present
- Data quality assessment including completeness checks, value range validation, and business rule verification
- Generation of quality metrics (row counts, field population rates, anomaly indicators) for downstream monitoring
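
A minimal sketch of such a quality gate, assuming pandas and an illustrative set of rules rather than the pipeline's exact checks:

```python
import pandas as pd

# Illustrative rules; the pipeline's actual checks may differ.
REQUIRED_COLUMNS = {"date", "customer_id", "campaign_id", "cost", "currency"}

def quality_report(path):
    """Validate an incoming CSV and return metrics for downstream monitoring."""
    df = pd.read_csv(path)
    missing_cols = REQUIRED_COLUMNS - set(df.columns)
    present = REQUIRED_COLUMNS & set(df.columns)
    metrics = {
        "row_count": len(df),
        "missing_columns": sorted(missing_cols),
        # field population rate for each required column that is present
        "population_rates": {col: float(df[col].notna().mean()) for col in present},
        # simple business rule: spend should never be negative
        "negative_cost_rows": int((df["cost"] < 0).sum()) if "cost" in present else None,
    }
    metrics["passed"] = not missing_cols and metrics["negative_cost_rows"] == 0
    return metrics
```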
Scheduling Strategy:
- Initial implementation: File polling every 6-12 hours aligned with Google Ads delivery schedules
- Evolution path: Event-driven processing as data source reliability increases
- Recovery capability: Manual triggers for missed batches or reprocessing scenarios
Stage 2: Raw Data Preservation
Objective: Maintain immutable copies of source data for compliance and future reprocessing
Dual Storage Approach:
- Production Staging: Raw data loaded into database staging tables without transformation for immediate processing use
- Archive Storage: Original files preserved in object storage with comprehensive metadata (see the sketch below)
- Retention Policy: Minimum 2-3 years retention to support historical analysis and regulatory requirements
Business Value: Enables reprocessing when business rules change without requiring fresh data extracts from source systems.
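
As an example, the archive step can be as small as copying each file to object storage with its metadata attached; a sketch assuming S3 via boto3 (any object store with per-object metadata would work):

```python
import hashlib
from datetime import datetime, timezone
from pathlib import Path

import boto3

def archive_raw_file(path, bucket, data_type):
    """Keep an immutable copy of a source file in object storage, with metadata."""
    body = Path(path).read_bytes()
    now = datetime.now(timezone.utc)
    key = f"raw/{data_type}/{now:%Y/%m/%d}/{Path(path).name}"
    boto3.client("s3").put_object(
        Bucket=bucket,
        Key=key,
        Body=body,
        Metadata={
            "data_type": data_type,
            "sha256": hashlib.sha256(body).hexdigest(),
            "archived_at": now.isoformat(),
        },
    )
    return key
```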
Stage 3: Business Logic Processing
Objective: Apply marketing-specific transformations to create analysis-ready datasets
Core Processing Steps:
- Intelligent Deduplication: Apply latest-extraction-wins logic for campaign cost data using SQL window functions
- Currency Standardization: Convert all spend data to EUR using date-appropriate exchange rates with fallback handling
- Geographic Enrichment: Extract country information from account domains and campaign naming conventions
- Multi-dimensional Aggregation: Group data by report date, country, brand, and marketing channel
Data Quality Controls:
- Zero-tolerance policy for unexplained data loss, with comprehensive audit logging (a reconciliation sketch follows this list)
- Real-time validation of transformation logic with automatic rollback capabilities
- Alerting integration for processing anomalies or quality threshold breaches
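
One way to back the zero-data-loss policy is a reconciliation check at the end of each run; a sketch with assumed table and column names:

```python
import logging

logger = logging.getLogger("campaignflow.audit")

# Every date that has raw cost rows should also appear in the summary table;
# table and column names are assumptions for illustration.
AUDIT_SQL = """
    SELECT COUNT(DISTINCT rc.date) AS missing_dates
    FROM raw_costs rc
    LEFT JOIN daily_spend_summary s ON s.report_date = rc.date
    WHERE s.report_date IS NULL;
"""

def audit_run(conn):
    """Fail the run loudly if any ingested report date is missing from the output."""
    with conn.cursor() as cur:
        cur.execute(AUDIT_SQL)
        missing_dates = cur.fetchone()[0]
    if missing_dates:
        logger.error("Unexplained data loss: %s report dates missing from daily_spend_summary", missing_dates)
        raise ValueError(f"Audit failed: {missing_dates} dates in raw_costs are absent from the summary")
    logger.info("Audit passed: every raw_costs date is represented in daily_spend_summary")
```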
Stage 4: Analytics Data Warehouse
Objective: Provide optimized, scalable access to processed marketing data
Storage Strategy:
- Cloud data warehouse (BigQuery, Snowflake, or Redshift) optimized for analytical workloads
- Columnar storage with appropriate partitioning and clustering for query performance
- Historical data retention supporting multi-year trend analysis
Access Patterns:
- Automated daily reporting for standard marketing metrics
- Self-service dashboard tools for ad-hoc analysis
- API endpoints for integration with marketing applications
- Direct database access for advanced analytical users
- Circuit Breaker Pattern: Prevent cascading failures between pipeline stages
- Exponential Backoff Retry: Automatic recovery from transient failures (see the Airflow sketch after these lists)
- Dead Letter Queues: Isolation and analysis of consistently failing data
- Comprehensive Alerting: Multi-channel notifications with appropriate escalation paths
- Pipeline Health Dashboards: Real-time visibility into processing status and performance
- Data Quality Metrics: Automated tracking of completeness, accuracy, and timeliness
- Business Anomaly Detection: ML-powered identification of unusual spending patterns
- End-to-End Lineage: Complete traceability from source data to final insights
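
The retry behaviour, for example, maps directly onto standard Airflow task arguments; an illustrative `default_args` block (values would be tuned per task):

```python
from datetime import timedelta

# Illustrative values; the exact settings would be tuned per task.
default_args = {
    "retries": 3,                              # automatic recovery from transient failures
    "retry_delay": timedelta(minutes=5),       # initial back-off between attempts
    "retry_exponential_backoff": True,         # double the delay after each failure
    "max_retry_delay": timedelta(minutes=30),  # cap on the back-off
    "email_on_failure": True,                  # plug into the alerting channel
}
```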
- Horizontal Partitioning: Processing workloads distributed by geography or time periods
- Database Optimization: Strategic indexing and query optimization for performance
- Cloud Auto-scaling: Dynamic resource allocation based on data volume and processing requirements
- Incremental Processing: Only process changed or new data to minimize computational overhead
- Role-Based Access Control: Granular permissions aligned with business responsibilities
- Data Anonymization: Automated masking of sensitive information in non-production environments
- Audit Trail Maintenance: Complete logging of all data access and modification activities
- Compliance Framework: Adherence to GDPR and other relevant data protection regulations
Design principles:
- Reliability over complexity
- Raw data preservation
- Zero tolerance for unexplained data loss
- Database-centric processing
- Incremental implementation
