
🚀 Crypto Data Pipeline with Snowflake & Airflow

📌 Overview

This project is an end-to-end data engineering pipeline that fetches cryptocurrency market data (BTC/USDT candlesticks) from the Binance API, processes it with Python, stores it in Snowflake, and orchestrates the whole workflow with Apache Airflow.

The goal is to demonstrate a practical implementation of ETL pipelines, cloud data warehousing, and workflow automation.


🏗️ Architecture

Binance API → Python ETL → Snowflake → Airflow Scheduler

⚙️ Tech Stack

  • Python
  • Snowflake (Cloud Data Warehouse)
  • Apache Airflow (Workflow Orchestration)
  • Pandas
  • Docker

🔄 Pipeline Workflow

1. Data Extraction

  • Fetches historical candlestick (kline) data from the Binance API
  • Uses the REST endpoint /api/v3/klines
  • Supports configurable symbol, interval, and limit
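The extraction step above can be sketched as follows. This is a minimal illustration, not the project's actual `data_ingestion.py`; the function name and parameter defaults are assumptions, while the endpoint path matches the `/api/v3/klines` route mentioned above:

```python
# Sketch of the extraction step; fetch_klines and its defaults are illustrative.
import requests

BINANCE_URL = "https://api.binance.com/api/v3/klines"

def fetch_klines(symbol="BTCUSDT", interval="1h", limit=500):
    """Fetch raw candlestick (kline) rows from the Binance REST API."""
    params = {"symbol": symbol, "interval": interval, "limit": limit}
    resp = requests.get(BINANCE_URL, params=params, timeout=10)
    resp.raise_for_status()
    # Each row is a list: [open_time, open, high, low, close, volume, ...]
    return resp.json()

if __name__ == "__main__":
    rows = fetch_klines(limit=5)
    print(f"Fetched {len(rows)} klines")
```

`symbol`, `interval`, and `limit` map directly onto the configurable options listed above.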

2. Data Transformation

  • Converts raw API response into structured format
  • Cleans and formats columns
  • Converts timestamps into proper datetime format
  • Ensures correct data types for analysis
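A sketch of the transformation step, assuming the standard 12-field kline layout documented by Binance (open time, OHLCV, close time, etc.); the function name and the subset of columns kept are illustrative:

```python
# Sketch of the transformation step; column selection is illustrative.
import pandas as pd

# Field order per the Binance klines response documentation.
KLINE_COLUMNS = [
    "open_time", "open", "high", "low", "close", "volume",
    "close_time", "quote_volume", "trades",
    "taker_buy_base", "taker_buy_quote", "ignore",
]

def transform_klines(raw):
    """Turn raw kline rows into a typed DataFrame ready for analysis."""
    df = pd.DataFrame(raw, columns=KLINE_COLUMNS)
    # Binance returns epoch milliseconds; convert to proper datetimes.
    df["open_time"] = pd.to_datetime(df["open_time"], unit="ms")
    df["close_time"] = pd.to_datetime(df["close_time"], unit="ms")
    # Prices and volume arrive as strings; cast to float for analysis.
    for col in ["open", "high", "low", "close", "volume"]:
        df[col] = df[col].astype(float)
    return df[["open_time", "open", "high", "low", "close", "volume"]]
```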

3. Data Loading

  • Loads processed data into Snowflake
  • Automatically creates the target table if it does not exist
  • Uses efficient bulk loading (write_pandas)
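The loading step can be sketched with `snowflake-connector-python` and its `write_pandas` bulk loader. The environment variable names match the `.env` section below; the table name `BTC_PRICES`, its schema, and the function name are illustrative assumptions, not the project's actual `load_to_snowflake.py`:

```python
# Sketch of the loading step; table name and schema are illustrative.
import os

DDL = """
CREATE TABLE IF NOT EXISTS BTC_PRICES (
    OPEN_TIME TIMESTAMP_NTZ,
    OPEN FLOAT, HIGH FLOAT, LOW FLOAT, CLOSE FLOAT, VOLUME FLOAT
)
"""

def load_to_snowflake(df, table="BTC_PRICES"):
    """Create the table if needed, then bulk-load the DataFrame."""
    # Imported lazily so this module can be read without the connector installed.
    import snowflake.connector
    from snowflake.connector.pandas_tools import write_pandas

    conn = snowflake.connector.connect(
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
        database=os.environ["SNOWFLAKE_DATABASE"],
        schema=os.environ["SNOWFLAKE_SCHEMA"],
    )
    try:
        conn.cursor().execute(DDL)
        success, _, nrows, _ = write_pandas(conn, df, table)
        return success, nrows
    finally:
        conn.close()
```

`write_pandas` stages the DataFrame and issues a bulk `COPY INTO`, which is far faster than row-by-row inserts.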

4. Orchestration (Airflow)

  • Automates the pipeline execution
  • Schedules periodic data ingestion
  • Manages task dependencies
  • Handles failures and retries
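The orchestration layer might look like the DAG below. This is a hedged sketch only: the DAG id, schedule, task names, and the imported `run_ingestion`/`run_load` entry points are hypothetical stand-ins for whatever the real DAG in `airflow/dags/` wires together.

```python
# Hypothetical airflow/dags/crypto_pipeline_dag.py; names are illustrative.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Assumed entry points exposed by the src/ modules.
from data_ingestion import run_ingestion
from load_to_snowflake import run_load

default_args = {"retries": 2, "retry_delay": timedelta(minutes=5)}

with DAG(
    dag_id="crypto_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule="@hourly",
    catchup=False,
    default_args=default_args,
) as dag:
    ingest = PythonOperator(task_id="ingest_btc_data", python_callable=run_ingestion)
    load = PythonOperator(task_id="load_to_snowflake", python_callable=run_load)
    ingest >> load  # load runs only after ingestion succeeds
```

The `retries`/`retry_delay` defaults cover the failure handling mentioned above, and `ingest >> load` expresses the task dependency.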

📁 Project Structure

FlowState_Inventory/
│
├── src/
│   ├── data_ingestion.py
│   ├── load_to_snowflake.py
│   ├── utils/
│   │   ├── logger.py
│   │   └── exception.py
│
├── airflow/
│   ├── dags/
│   └── docker-compose.yaml
│
├── data/
│   └── btc_data.csv
│
├── .env
├── requirements.txt
└── README.md

🔐 Environment Variables

Create a .env file in the root directory:

SNOWFLAKE_USER=your_username
SNOWFLAKE_PASSWORD=your_password
SNOWFLAKE_ACCOUNT=your_account
SNOWFLAKE_WAREHOUSE=your_warehouse
SNOWFLAKE_DATABASE=your_database
SNOWFLAKE_SCHEMA=your_schema
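As a sketch of how the scripts might fail fast on missing credentials (the helper below is illustrative; the variable names match the `.env` template above, and the real scripts may load the file via python-dotenv):

```python
# Sketch: validate that all Snowflake credentials are set before connecting.
import os

REQUIRED = [
    "SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD", "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_WAREHOUSE", "SNOWFLAKE_DATABASE", "SNOWFLAKE_SCHEMA",
]

def missing_vars(env=os.environ):
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED if not env.get(name)]

if __name__ == "__main__":
    missing = missing_vars()
    if missing:
        raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
```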

▶️ How to Run

1. Clone the repository

git clone <your-repo-url>
cd FlowState_Inventory

2. Create virtual environment

python -m venv venv
venv\Scripts\activate        # Windows
source venv/bin/activate     # macOS/Linux

3. Install dependencies

pip install -r requirements.txt

4. Run data ingestion

python src/data_ingestion.py

5. Load data into Snowflake

python src/load_to_snowflake.py

6. Start Airflow (Docker)

cd airflow
docker-compose up

📊 Features

  • Automated crypto data ingestion
  • Secure credential management using .env
  • Scalable cloud data storage
  • Workflow scheduling with Airflow
  • Modular and production-like code structure

🚀 Future Improvements

  • Add support for multiple cryptocurrencies
  • Implement real-time streaming pipeline
  • Add data validation layer
  • Integrate alerting/monitoring system

👨‍💻 Author

Kunal Kumar


⭐ If you like this project

Give it a star ⭐ on GitHub!