A metadata-driven ingestion framework for Snowflake that leverages native capabilities including Snowpipe, Dynamic Tables, Tasks, and Stored Procedures to automate data ingestion and transformation across Bronze, Silver, and Gold layers.
Snowmeta Pipeline is a comprehensive framework designed to be flexible, scalable, and observable, providing:
- ✅ Metadata-driven configuration
- ✅ Bronze layer ingestion with automatic schema inference
- ✅ Silver layer transformations (SCD Type 1 & Type 2)
- ✅ Data quality checks and CDC handling
- ✅ Environment promotion capabilities
- ✅ Minimal operational overhead

Bronze layer ingestion:
- Automatic Schema Inference: Uses Snowflake's `INFER_SCHEMA` function
- Multiple File Formats: Support for CSV, JSON, Parquet, and more
- Unified Stored Procedures: Single procedure for all tables
- Task Automation: Automated execution with Snowflake Tasks
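
Silver layer, SCD Type 1:
- Latest Record Only: Keeps the most recent record per key (updates overwrite)
- Automatic Deduplication: Duplicate records for the same key are removed
- Low Maintenance: Managed by Dynamic Tables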

Silver layer, SCD Type 2:
- Full Historical Tracking: `VALID_FROM`, `VALID_TO`, and `IS_CURRENT` columns
- Dynamic Column Detection: Automatically adapts to schema changes
- Change Detection: Tracks all attribute changes
- Task Chaining: Sequential execution with dependencies
- Flexible Configuration: Metadata-driven with column exclusions

Data quality and CDC:
- Built-in data quality expectations
- CDC (Change Data Capture) support
- Configurable sequence columns for ordering
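
The SCD Type 1 behavior above (latest record per key, deduplication, ordering by a sequence column) can be illustrated in plain Python. This is a minimal sketch, not the framework's implementation, which runs as Dynamic Tables inside Snowflake. It assumes records arrive as dictionaries and that the sequence column holds values that sort correctly, such as ISO-8601 timestamp strings; the function name `scd1_latest` is hypothetical, while `customer_id` and `dmsTimestamp` mirror the silver configuration example.

```python
from typing import Any, Dict, Iterable, List, Sequence, Tuple

Record = Dict[str, Any]


def scd1_latest(records: Iterable[Record], keys: Sequence[str], sequence_by: str) -> List[Record]:
    """Keep one record per business key: the one with the highest sequence value."""
    latest: Dict[Tuple[Any, ...], Record] = {}
    for record in records:
        key = tuple(record[k] for k in keys)
        stored = latest.get(key)
        # Overwrite only when the incoming record is strictly newer;
        # on a tie the record seen first is kept.
        if stored is None or record[sequence_by] > stored[sequence_by]:
            latest[key] = record
    return list(latest.values())


# Usage with column names from the silver configuration example
rows = [
    {"customer_id": 1, "dmsTimestamp": "2023-01-01T00:00:00", "balance": 100},
    {"customer_id": 1, "dmsTimestamp": "2023-02-01T00:00:00", "balance": 150},
    {"customer_id": 2, "dmsTimestamp": "2023-01-15T00:00:00", "balance": 80},
]
latest_rows = scd1_latest(rows, keys=["customer_id"], sequence_by="dmsTimestamp")
print(latest_rows)  # customer 1 keeps its balance-150 row; customer 2 keeps its only row
```

Ties on the sequence value keep the first record seen here; a production implementation needs an explicit tie-breaking rule.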

Requirements:
```
snowflake-snowpark-python>=1.0.0
```
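
Bronze ingestion example:

```python
from snowflake.snowpark.context import get_active_session

from snowmeta.snowmeta_pipeline import SnowmetaPipeline

# Reuse the active Snowpark session (e.g. in a Snowflake notebook); outside
# Snowflake, create one with Session.builder.configs(...).create() instead.
session = get_active_session()
pipeline = SnowmetaPipeline(session)

# One metadata entry per source table to ingest into the Bronze layer
pipeline_bronze_data = [
    {
        "source_table": "Banks_2022_2023_raw",
        "source_path_dev": "@RAW.ETBANKSFINANCIAL.LANDING/",  # stage holding the raw files
        "reader_format": "CSV",
        "bronze_database_dev": "ANALYTICS",
        "bronze_schema": "FINANCIAL_BRONZE",
        "bronze_table": "Banks_2022_2023",
    }
]

pipeline.invoke_bronze_pipeline(
    pipeline_data=pipeline_bronze_data,
    warehouse_name="COMPUTE_WH",
    use_stored_procedures=True,
)
```

Silver (SCD Type 2) example, reusing the `pipeline` object created above:

```python
pipeline_silver_data = [
    {
        "bronze_database_dev": "ANALYTICS",
        "bronze_schema": "FINANCIAL_BRONZE",
        "bronze_table": "Banks_2022_2023",
        "silver_database_dev": "ANALYTICS",
        "silver_schema": "FINANCIAL_SILVER",
        "silver_table": "Banks_2022_2023",
        "silver_cdc_apply_changes": {
            "keys": ["customer_id"],        # business key
            "sequence_by": "dmsTimestamp",  # orders changes for the same key
            "scd_type": "2",
            "except_column_list": ["Op", "dmsTimestamp", "_rescued_data"],  # excluded columns
        },
    }
]

pipeline.invoke_silver_scd2_pipeline(
    pipeline_silver_data=pipeline_silver_data,
    warehouse_name="COMPUTE_WH",
    bronze_task_name="ANALYTICS.FINANCIAL_BRONZE.INGEST_ALL_BRONZE",  # upstream Bronze task
    execute_tasks=True,
)
```

Documentation:
- SCD Type 2 Pipeline Guide - Comprehensive guide for SCD Type 2 implementation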
- Examples README - Example scripts and usage patterns
- Quick Start Script - Ready-to-use financial data pipeline
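
Metadata keys such as `source_path_dev` and `bronze_database_dev` in the examples above carry an environment suffix. One way environment promotion could work is to resolve the suffixed keys for a target environment before running the pipeline. The sketch below is hypothetical: the `resolve_environment` helper, the environment names, and the `_prod` keys are assumptions for illustration, since this README only shows `_dev` keys.

```python
from typing import Any, Dict

# Hypothetical environment names; the README itself only shows "_dev" keys.
ENVIRONMENTS = ("dev", "test", "prod")


def resolve_environment(entry: Dict[str, Any], env: str) -> Dict[str, Any]:
    """Map `<name>_<env>` keys to `<name>` for one environment; drop other environments."""
    if env not in ENVIRONMENTS:
        raise ValueError(f"unknown environment: {env!r}")
    resolved: Dict[str, Any] = {}
    for key, value in entry.items():
        base, sep, suffix = key.rpartition("_")
        if sep and suffix in ENVIRONMENTS:
            if suffix == env:
                resolved[base] = value  # e.g. bronze_database_dev -> bronze_database
        else:
            resolved[key] = value  # environment-independent key, kept unchanged
    return resolved


entry = {
    "source_table": "Banks_2022_2023_raw",
    "source_path_dev": "@RAW.ETBANKSFINANCIAL.LANDING/",
    "source_path_prod": "@RAW_PROD.ETBANKSFINANCIAL.LANDING/",  # hypothetical
    "reader_format": "CSV",
    "bronze_database_dev": "ANALYTICS",
    "bronze_database_prod": "ANALYTICS_PROD",  # hypothetical
    "bronze_schema": "FINANCIAL_BRONZE",
    "bronze_table": "Banks_2022_2023",
}
print(resolve_environment(entry, "prod"))
```

Keeping one metadata entry with per-environment values means promotion changes only the `env` argument, not the pipeline definition.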

Architecture:
```
Raw Stage Data → Bronze Layer → Silver Layer → Gold Layer
                 (Ingestion)    (SCD 1/2)      (Analytics)
```
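
The first hop in this flow, from raw stage data to the Bronze layer, pairs `INFER_SCHEMA` with `COPY INTO`. Below is a minimal sketch of the SQL such a step could generate from one Bronze metadata entry; it is an illustration, not the statements the framework actually emits. The helper name `bronze_load_sql` and the file format name are hypothetical, and the sketch assumes a named file format already exists (for CSV, one with `PARSE_HEADER = TRUE`, so header names become column names and `MATCH_BY_COLUMN_NAME` can map them).

```python
from typing import Dict, List


def bronze_load_sql(entry: Dict[str, str], file_format: str) -> List[str]:
    """Build CREATE TABLE ... USING TEMPLATE and COPY INTO statements for one entry.

    `file_format` must name an existing Snowflake file format; INFER_SCHEMA
    reads column names and types from the staged files through it.
    """
    table = f"{entry['bronze_database_dev']}.{entry['bronze_schema']}.{entry['bronze_table']}"
    stage = entry["source_path_dev"]
    # 1. Create the table from the inferred schema, keeping the files' column order.
    create_sql = (
        f"CREATE TABLE IF NOT EXISTS {table}\n"
        "  USING TEMPLATE (\n"
        "    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*)) WITHIN GROUP (ORDER BY order_id)\n"
        f"    FROM TABLE(INFER_SCHEMA(LOCATION => '{stage}', FILE_FORMAT => '{file_format}'))\n"
        "  )"
    )
    # 2. Load the staged files, matching file columns to table columns by name.
    copy_sql = (
        f"COPY INTO {table}\n"
        f"  FROM {stage}\n"
        f"  FILE_FORMAT = (FORMAT_NAME = '{file_format}')\n"
        "  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE"
    )
    return [create_sql, copy_sql]


entry = {
    "source_table": "Banks_2022_2023_raw",
    "source_path_dev": "@RAW.ETBANKSFINANCIAL.LANDING/",
    "reader_format": "CSV",
    "bronze_database_dev": "ANALYTICS",
    "bronze_schema": "FINANCIAL_BRONZE",
    "bronze_table": "Banks_2022_2023",
}
# Hypothetical file format name
for statement in bronze_load_sql(entry, file_format="ANALYTICS.FINANCIAL_BRONZE.CSV_WITH_HEADER"):
    print(statement + ";")
```

Creating the table only when it is missing keeps repeated runs from dropping loaded data, and `COPY INTO` skips files it has already loaded according to its load metadata.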

Bronze layer:
- Input: Raw files in cloud storage (S3, Azure Blob, GCS), read through a Snowflake stage
- Processing: Schema inference with `INFER_SCHEMA`, then loading with `COPY INTO`
- Output: Raw tables with minimal transformations

Silver layer:
- Input: Bronze tables
- Processing:
  - SCD Type 1: Dynamic Tables with deduplication
  - SCD Type 2: Stored Procedures, run by Tasks, with historical tracking
- Output: Clean, historized business entities

Gold layer:
- Input: Silver tables
- Processing: Aggregations, joins, business logic
- Output: Analytics-ready datasets
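
The SCD Type 2 processing described for the Silver layer can be sketched in plain Python to show how `VALID_FROM`, `VALID_TO`, and `IS_CURRENT` evolve. This is an illustrative sketch, not the framework's stored procedure: the `apply_scd2` helper is hypothetical, changes are assumed to be dictionaries with a sortable sequence column, and a previous version is closed at the exact sequence value of the change that replaces it (other conventions, such as subtracting one time unit, are equally valid).

```python
from typing import Any, Dict, Iterable, List, Sequence

Row = Dict[str, Any]
SCD2_COLUMNS = {"VALID_FROM", "VALID_TO", "IS_CURRENT"}


def apply_scd2(
    history: List[Row],
    changes: Iterable[Row],
    keys: Sequence[str],
    sequence_by: str,
    except_columns: Sequence[str] = (),
) -> List[Row]:
    """Return a new SCD Type 2 history with `changes` applied in sequence order."""
    result = [dict(row) for row in history]  # copy so the input is not mutated
    # Index the open (current) version of each business key.
    current = {tuple(row[k] for k in keys): row for row in result if row["IS_CURRENT"]}
    # Columns that never count as tracked attributes.
    ignored = set(keys) | {sequence_by} | set(except_columns) | SCD2_COLUMNS

    for change in sorted(changes, key=lambda c: c[sequence_by]):
        key = tuple(change[k] for k in keys)
        attrs = {c: v for c, v in change.items() if c not in ignored}
        open_row = current.get(key)
        if open_row is not None:
            old_attrs = {c: v for c, v in open_row.items() if c not in ignored}
            if old_attrs == attrs:
                continue  # nothing tracked changed: keep the open version
            # Close the previous version at the change's sequence value.
            open_row["VALID_TO"] = change[sequence_by]
            open_row["IS_CURRENT"] = False
        new_row = {**{k: change[k] for k in keys}, **attrs,
                   "VALID_FROM": change[sequence_by], "VALID_TO": None, "IS_CURRENT": True}
        result.append(new_row)
        current[key] = new_row
    return result


changes = [
    {"customer_id": 1, "dmsTimestamp": "2023-01-01", "Op": "I", "segment": "retail"},
    {"customer_id": 1, "dmsTimestamp": "2023-03-01", "Op": "U", "segment": "corporate"},
    {"customer_id": 1, "dmsTimestamp": "2023-04-01", "Op": "U", "segment": "corporate"},
]
history = apply_scd2(
    [],
    changes,
    keys=["customer_id"],
    sequence_by="dmsTimestamp",
    except_columns=["Op", "dmsTimestamp", "_rescued_data"],
)
for row in history:
    print(row)
```

Running it on the three sample changes leaves two versions for customer 1: the `retail` row, closed at `2023-03-01`, and the open `corporate` row. The third change is ignored because no tracked attribute differs, and `Op` never reaches the history because it is in the exclusion list.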

Project structure:
```
snow-meta/
├── snowmeta/                   # Main package
│   ├── __init__.py
│   ├── snowmeta_pipeline.py    # Core pipeline logic
│   ├── controltable_reader.py  # Metadata reader
│   ├── config.py               # Configuration
│   └── ...
├── examples/                   # Example scripts
│   ├── onboarding.ipynb
│   └── pipeline.ipynb
├── tests/                      # Test suite
├── ui/                         # UI components
├── setup.py                    # Package setup
└── README.md
```

Bronze layer API:
- `invoke_bronze_pipeline(pipeline_data, warehouse_name, use_stored_procedures)` - Executes the Bronze ingestion pipeline
- `create_unified_bronze_stored_procedure(pipeline_data)` - Generates the stored procedure SQL for Bronze ingestion
- `generate_bronze_sql_scripts(pipeline_data, warehouse_name)` - Generates standalone SQL scripts

Silver layer API:
- `invoke_silver_pipeline(pipeline_silver_data, warehouse_name)` - Executes the SCD Type 1 pipeline using Dynamic Tables
- `invoke_silver_scd2_pipeline(pipeline_silver_data, warehouse_name, bronze_task_name, execute_tasks)` - Executes the SCD Type 2 pipeline with stored procedures and tasks
- `create_scd2_stored_procedure(silver_config)` - Generates the stored procedure SQL for SCD Type 2
- `create_scd2_task(silver_config, warehouse_name, after_task)` - Generates the task SQL for SCD Type 2
- `generate_scd2_sql_scripts(pipeline_silver_data, warehouse_name, bronze_task_name)` - Generates standalone SQL scripts for SCD Type 2
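
To make the split between `invoke_silver_pipeline` (Dynamic Tables) and the SCD Type 2 task helpers concrete, the sketch below builds the kind of SQL those layers rely on: a Dynamic Table that keeps the latest row per key, and a child task chained with `AFTER` to the Bronze task. It is not the output of `create_scd2_task` or any other method listed above; the helper names, the `_SCD2_TASK`/`_SCD2_PROC` naming convention, and the default `TARGET_LAG` are assumptions.

```python
from typing import Any, Dict, List

# Hypothetical default; pick a lag that matches your freshness requirements.
DEFAULT_TARGET_LAG = "1 hour"


def qualified(cfg: Dict[str, Any], layer: str) -> str:
    """Build DATABASE.SCHEMA.TABLE for the 'bronze' or 'silver' layer of an entry."""
    db = cfg[f"{layer}_database_dev"]
    schema = cfg[f"{layer}_schema"]
    table = cfg[f"{layer}_table"]
    return f"{db}.{schema}.{table}"


def scd1_dynamic_table_sql(cfg: Dict[str, Any], warehouse_name: str,
                           target_lag: str = DEFAULT_TARGET_LAG) -> str:
    """SCD Type 1 as a Dynamic Table: keep the latest row per key."""
    cdc = cfg["silver_cdc_apply_changes"]
    keys = ", ".join(cdc["keys"])
    return (
        f"CREATE OR REPLACE DYNAMIC TABLE {qualified(cfg, 'silver')}\n"
        f"  TARGET_LAG = '{target_lag}'\n"
        f"  WAREHOUSE = {warehouse_name}\n"
        "AS\n"
        "  SELECT *\n"
        f"  FROM {qualified(cfg, 'bronze')}\n"
        f"  QUALIFY ROW_NUMBER() OVER (PARTITION BY {keys} ORDER BY {cdc['sequence_by']} DESC) = 1"
    )


def scd2_task_sql(cfg: Dict[str, Any], warehouse_name: str, after_task: str) -> List[str]:
    """A child task that runs after `after_task` and calls a per-table SCD Type 2 procedure."""
    silver = qualified(cfg, "silver")
    # Hypothetical naming convention for the task and the procedure it calls.
    task_name = f"{silver}_SCD2_TASK"
    proc_name = f"{silver}_SCD2_PROC"
    return [
        f"CREATE OR REPLACE TASK {task_name}\n"
        f"  WAREHOUSE = {warehouse_name}\n"
        f"  AFTER {after_task}\n"
        "AS\n"
        f"  CALL {proc_name}()",
        # Tasks are created suspended; a child task must be resumed to run in the graph.
        f"ALTER TASK {task_name} RESUME",
    ]


cfg = {
    "bronze_database_dev": "ANALYTICS",
    "bronze_schema": "FINANCIAL_BRONZE",
    "bronze_table": "Banks_2022_2023",
    "silver_database_dev": "ANALYTICS",
    "silver_schema": "FINANCIAL_SILVER",
    "silver_table": "Banks_2022_2023",
    "silver_cdc_apply_changes": {"keys": ["customer_id"], "sequence_by": "dmsTimestamp"},
}
print(scd1_dynamic_table_sql(cfg, "COMPUTE_WH"))
for statement in scd2_task_sql(cfg, "COMPUTE_WH", "ANALYTICS.FINANCIAL_BRONZE.INGEST_ALL_BRONZE"):
    print(statement + ";")
```

Newly created tasks start suspended, which is why the sketch emits an `ALTER TASK ... RESUME` statement; the root Bronze task additionally needs a schedule (or a manual `EXECUTE TASK`) for the chain to run.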

Built on top of these Snowflake features:
- Snowpark Python
- Dynamic Tables
- Stored Procedures
- Tasks
- COPY INTO with schema inference

Roadmap:
- Data quality framework expansion
- Enhanced UI dashboard
- Automated testing framework