Read this document carefully – it describes the architecture, components, and usage of the pipeline.
The Crigglestone Data Pipeline is designed to ingest, transform, and load data into a PostgreSQL data warehouse hosted on AWS. It demonstrates key data engineering practices including ETL automation, star schema modelling, cloud infrastructure, monitoring, and best practices in Python development.
By the end of the pipeline setup, you will have:
- Ingested raw operational data from a PostgreSQL source into an immutable S3 data lake.
- Transformed this data into dimension and fact tables aligned with a star schema.
- Loaded transformed data into a PostgreSQL data warehouse for analytics.
- Exported table previews to S3 for verification and lightweight reporting.
- Demonstrated monitoring and logging via AWS CloudWatch.
The solution showcases knowledge of Python, SQL, AWS services, and data engineering workflows.
The pipeline consists of three AWS Lambda functions:
- Ingest Lambda: extracts updated data from a source PostgreSQL database and stores it as CSV files in an ingestion S3 bucket.
- Transform Lambda: reads CSVs from the ingest bucket, transforms them into dimension and fact tables, and writes them as Parquet files into a processed S3 bucket.
- Load Lambda: loads Parquet files into a PostgreSQL data warehouse and exports table previews to another S3 bucket.
This pipeline enables a robust ETL workflow, supporting analytics on business operations data.
- AWS Lambda
- Amazon S3
- AWS Secrets Manager
- Amazon RDS (PostgreSQL)
Python libraries: awswrangler, pg8000, boto3, pandas, pandasql.
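These libraries might be captured in a `requirements.txt` for packaging; the sketch below lists them unpinned, since this document does not specify versions:

```text
awswrangler
pg8000
boto3
pandas
pandasql
```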
- nc-crigglestone-ingest-bucket: stores ingested CSV files.
- nc-crigglestone-processed-bucket: stores transformed Parquet files.
- nc-crigglestone-lambda-bucket: stores update-tracking JSON and exported CSV previews.
- Project: credentials for the source database.
- warehouse-db-credentials: credentials for the warehouse database.
- Source Database: PostgreSQL with operational tables.
- Warehouse Database: PostgreSQL schema for dimensional and fact tables.
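The Secrets Manager lookup used by the Lambdas could be sketched as below. The secret names (`Project`, `warehouse-db-credentials`) come from this document; the JSON field names inside the secret are assumptions and should be adjusted to the actual secret layout.

```python
import json


def parse_db_secret(secret_string: str) -> dict:
    """Parse a Secrets Manager SecretString into connection parameters.

    Field names (user, password, host, port, database) are assumed,
    not confirmed by this document.
    """
    raw = json.loads(secret_string)
    return {
        "user": raw["user"],
        "password": raw["password"],
        "host": raw["host"],
        "port": int(raw["port"]),
        "database": raw["database"],
    }


def get_db_credentials(secret_name: str) -> dict:
    """Fetch a secret such as 'Project' or 'warehouse-db-credentials'."""
    # boto3 is imported here so the pure parsing helper above stays
    # usable (and testable) without AWS dependencies installed.
    import boto3

    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_name)
    return parse_db_secret(response["SecretString"])
```

The returned dict maps directly onto the keyword arguments a pg8000 connection would expect.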
Purpose: Extracts updated rows from the source PostgreSQL database into CSV files in the ingest bucket.
Key Features:
- Defines 11 source tables (address, counterparty, currency, etc.).
- Uses Secrets Manager to retrieve credentials.
- Tracks incremental updates with update_tracking.json.
- Stores data in S3 under structured paths: s3://nc-crigglestone-ingest-bucket/{table}/{timestamp}.csv
Execution: Triggered manually or via EventBridge schedule.
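A minimal sketch of the incremental-tracking logic, assuming each source table has a `last_updated` column and that timestamps in update_tracking.json are ISO-formatted strings (both assumptions, not confirmed by this document):

```python
from datetime import datetime

TRACKING_KEY = "update_tracking.json"  # held in nc-crigglestone-lambda-bucket


def get_last_updated(tracking: dict, table: str) -> str:
    """Return the last-seen timestamp for a table, or a sentinel that
    predates all rows so the first run ingests everything."""
    return tracking.get(table, "1970-01-01 00:00:00")


def build_ingest_query(table: str, last_updated: str) -> str:
    """Query text for incremental extraction. Real code should use a
    parameterised query rather than string interpolation."""
    return (
        f"SELECT * FROM {table} "
        f"WHERE last_updated > '{last_updated}' "
        f"ORDER BY last_updated;"
    )


def build_ingest_key(table: str, run_time: datetime) -> str:
    """S3 key of the form {table}/{timestamp}.csv; the exact timestamp
    format is illustrative."""
    return f"{table}/{run_time.strftime('%Y-%m-%dT%H-%M-%S')}.csv"
```

In the Lambda itself, the query result would typically be read into a DataFrame and written to the ingest bucket as CSV, after which update_tracking.json is rewritten with the newest timestamp seen per table.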
Purpose: Transforms raw CSVs into star schema tables and stores them as Parquet in the processed bucket.
Key Features:
- Creates dimension tables (dim_location, dim_counterparty, dim_currency, dim_design, dim_payment_type, dim_staff, dim_transaction, dim_date).
- Creates fact tables (fact_payment, fact_purchase_order, fact_sales_order).
- Deduplicates records and performs joins between source tables.
- Splits timestamps into date and time components.
Execution: Triggered by S3 event on new CSV ingestion or manually.
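Two of the transform steps above, deduplication and timestamp splitting, can be sketched with pandas. The `last_updated` column name and the "keep the most recent row" policy are assumptions for illustration:

```python
import pandas as pd


def dedupe(df: pd.DataFrame, key: str) -> pd.DataFrame:
    """Keep the most recent row per key, assuming a 'last_updated'
    column with ISO-formatted timestamps (sortable as strings)."""
    return (
        df.sort_values("last_updated")
          .drop_duplicates(subset=key, keep="last")
          .reset_index(drop=True)
    )


def split_timestamp(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """Split a timestamp column into separate date and time columns,
    as star-schema date/time modelling requires."""
    out = df.copy()
    ts = pd.to_datetime(out[col])
    out[f"{col}_date"] = ts.dt.date
    out[f"{col}_time"] = ts.dt.time
    return out.drop(columns=[col])
```

The same pattern would be applied per dimension before the joins that assemble the fact tables.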
Purpose: Loads transformed Parquet data into the warehouse and exports CSV previews to S3.
Key Features:
- Retrieves warehouse credentials from Secrets Manager.
- Loads Parquet files into PostgreSQL (public schema).
- Logs table previews (first 10 rows) in CloudWatch.
- Exports full tables to: s3://nc-crigglestone-lambda-bucket/extracts/{table}.csv
Execution: Triggered by S3 events on processed bucket or manually.
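The preview and export steps can be sketched as follows; the helper names are illustrative, not taken from the codebase. In the Lambda, awswrangler would likely handle the I/O around these helpers (e.g. reading Parquet from the processed bucket and writing rows to the warehouse):

```python
import pandas as pd

EXTRACT_BUCKET = "nc-crigglestone-lambda-bucket"


def preview(df: pd.DataFrame, n: int = 10) -> str:
    """Render the first n rows as text, suitable for CloudWatch logging."""
    return df.head(n).to_string()


def build_extract_key(table: str) -> str:
    """S3 key for the full-table CSV export, extracts/{table}.csv."""
    return f"extracts/{table}.csv"
```

Logging `preview(df)` keeps CloudWatch output bounded while still giving enough rows to eyeball each loaded table.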
- Create S3 Buckets:
  - nc-crigglestone-ingest-bucket
  - nc-crigglestone-processed-bucket
  - nc-crigglestone-lambda-bucket
- Configure Secrets Manager:
  - Project for the source DB.
  - warehouse-db-credentials for the warehouse DB.
- Set Up Databases:
  - Source PostgreSQL with operational tables.
  - Warehouse PostgreSQL with dimension/fact schema.
- Deploy Lambda Functions:
  - Package code + dependencies.
  - Deploy via AWS CLI/console/CI-CD.
  - Configure triggers:
    - Ingest: scheduled (EventBridge).
    - Transform: S3 event on ingest bucket.
    - Load: S3 event on processed bucket.
- Test the Pipeline:
  - Trigger Ingest → verify CSV in ingest bucket.
  - Trigger Transform → verify Parquet in processed bucket.
  - Trigger Load → verify warehouse tables and CSV exports.
- Running: Start with Ingest Lambda; others trigger automatically if events are configured.
- Monitoring: Use CloudWatch Logs for execution details and errors.
- Verification: Check exported CSV previews in the extracts folder.
- Schema Consistency: Source DB must match expected table list.
- Performance: Use batching or Glue for very large datasets.
- Security: Restrict IAM permissions to least-privilege.
- Error Handling: Implement retries and failover where possible.
- Audit: no known vulnerabilities found.
- Bandit: no high-risk code found.
- TDD: all Lambdas were developed test-first.
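The retry behaviour suggested under Error Handling could be sketched as a small decorator; the attempt count and linear backoff below are illustrative defaults, not values taken from this project:

```python
import time
from functools import wraps


def with_retries(max_attempts: int = 3, delay_seconds: float = 1.0):
    """Retry a function on any exception, sleeping a little longer
    after each failed attempt, and re-raising after the last one."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise
                    time.sleep(delay_seconds * attempt)
        return wrapper
    return decorator
```

Wrapping the database and S3 calls in each Lambda with such a decorator smooths over transient network or throttling errors without masking persistent failures.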