K-tech-design/Flowstate-Inventory

🚀 Crypto Data Pipeline with Snowflake & Airflow

📌 Overview

This project is an end-to-end data engineering pipeline that fetches cryptocurrency market data (BTC/USDT candlesticks) from the Binance API, processes it with Python, stores it in Snowflake, and orchestrates the workflow with Apache Airflow.

The goal of this project is to demonstrate practical implementation of ETL pipelines, cloud data warehousing, and workflow automation.


🏗️ Architecture

Binance API → Python ETL → Snowflake → Airflow Scheduler

⚙️ Tech Stack

  • Python
  • Snowflake (Cloud Data Warehouse)
  • Apache Airflow (Workflow Orchestration)
  • Pandas
  • Docker

🔄 Pipeline Workflow

1. Data Extraction

  • Fetches historical candlestick (kline) data from the Binance API
  • Uses REST API (/api/v3/klines)
  • Supports configurable symbol, interval, and limit
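The extraction step above can be sketched with the standard library alone. The function names below (`build_klines_url`, `fetch_klines`) are illustrative assumptions, not the repository's actual code; the endpoint and parameters are the public Binance `/api/v3/klines` interface:

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "https://api.binance.com/api/v3/klines"  # public Binance REST endpoint


def build_klines_url(symbol: str = "BTCUSDT", interval: str = "1h", limit: int = 100) -> str:
    """Build the request URL with configurable symbol, interval, and limit."""
    params = urllib.parse.urlencode({"symbol": symbol, "interval": interval, "limit": limit})
    return f"{BASE_URL}?{params}"


def fetch_klines(symbol: str = "BTCUSDT", interval: str = "1h", limit: int = 100) -> list:
    """Fetch raw candlestick rows.

    Each row is a list: [open_time_ms, open, high, low, close, volume, ...],
    with prices and volume returned as strings.
    """
    with urllib.request.urlopen(build_klines_url(symbol, interval, limit)) as resp:
        return json.load(resp)
```

In the repo this logic would live in `src/data_ingestion.py`, which writes the result out to `data/btc_data.csv`.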

2. Data Transformation

  • Converts raw API response into structured format
  • Cleans and formats columns
  • Converts timestamps into proper datetime format
  • Ensures correct data types for analysis
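A minimal sketch of the transformation for a single raw row, using only the standard library (the field layout follows the Binance kline response; the function name is an assumption):

```python
from datetime import datetime, timezone


def transform_kline(raw: list) -> dict:
    """Convert one raw kline row into typed, analysis-ready fields.

    Binance returns millisecond epoch timestamps and prices as strings,
    so we convert to UTC datetimes and floats respectively.
    """
    return {
        "open_time": datetime.fromtimestamp(raw[0] / 1000, tz=timezone.utc),
        "open": float(raw[1]),
        "high": float(raw[2]),
        "low": float(raw[3]),
        "close": float(raw[4]),
        "volume": float(raw[5]),
    }
```

In practice the pipeline applies the same conversions column-wise with Pandas across the whole response.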

3. Data Loading

  • Loads processed data into Snowflake
  • Creates the target table automatically if it does not exist
  • Uses efficient bulk loading (write_pandas)
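The loading step can be sketched as below, assuming the credentials from `.env` are exported as environment variables. The table name and schema are hypothetical; `write_pandas` is the Snowflake connector's bulk-load helper named in the list above:

```python
import os

TABLE_NAME = "BTC_PRICES"  # hypothetical target table


def create_table_sql(table: str) -> str:
    """DDL run before loading so the pipeline is safe on its first run."""
    return (
        f"CREATE TABLE IF NOT EXISTS {table} ("
        "OPEN_TIME TIMESTAMP_NTZ, OPEN FLOAT, HIGH FLOAT, "
        "LOW FLOAT, CLOSE FLOAT, VOLUME FLOAT)"
    )


def load_to_snowflake(df) -> int:
    """Create the table if needed, then bulk-load the DataFrame with write_pandas."""
    # Imports are local so the module can be read without the connector installed.
    import snowflake.connector
    from snowflake.connector.pandas_tools import write_pandas

    conn = snowflake.connector.connect(
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
        database=os.environ["SNOWFLAKE_DATABASE"],
        schema=os.environ["SNOWFLAKE_SCHEMA"],
    )
    try:
        conn.cursor().execute(create_table_sql(TABLE_NAME))
        _, _, nrows, _ = write_pandas(conn, df, TABLE_NAME)
        return nrows
    finally:
        conn.close()
```

`write_pandas` stages the DataFrame and issues a `COPY INTO` under the hood, which is far faster than row-by-row inserts for this kind of batch load.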

4. Orchestration (Airflow)

  • Automates the pipeline execution
  • Schedules periodic data ingestion
  • Manages task dependencies
  • Handles failures and retries
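The scheduling, dependency, and retry behaviour above can be wired together in a DAG file like the sketch below (task names and the `crypto_pipeline` DAG id are assumptions; the Airflow import is guarded so the file can also be inspected outside the Airflow image):

```python
from datetime import datetime, timedelta


def extract(**context):
    """Placeholder for the ingestion step (src/data_ingestion.py)."""


def load(**context):
    """Placeholder for the loading step (src/load_to_snowflake.py)."""


try:
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="crypto_pipeline",
        start_date=datetime(2024, 1, 1),
        schedule="@daily",  # `schedule` requires Airflow 2.4+
        catchup=False,
        default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
    ) as dag:
        extract_task = PythonOperator(task_id="extract", python_callable=extract)
        load_task = PythonOperator(task_id="load", python_callable=load)
        extract_task >> load_task  # load runs only after a successful extract
except ImportError:
    # Airflow is only available inside the scheduler/worker containers.
    pass
```

Dropping a file like this into `airflow/dags/` is all the Docker-based scheduler needs to pick the pipeline up.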

📁 Project Structure

FlowState_Inventory/
│
├── src/
│   ├── data_ingestion.py
│   ├── load_to_snowflake.py
│   ├── utils/
│   │   ├── logger.py
│   │   └── exception.py
│
├── airflow/
│   ├── dags/
│   └── docker-compose.yaml
│
├── data/
│   └── btc_data.csv
│
├── .env
├── requirements.txt
└── README.md

🔐 Environment Variables

Create a .env file in the root directory:

SNOWFLAKE_USER=your_username
SNOWFLAKE_PASSWORD=your_password
SNOWFLAKE_ACCOUNT=your_account
SNOWFLAKE_WAREHOUSE=your_warehouse
SNOWFLAKE_DATABASE=your_database
SNOWFLAKE_SCHEMA=your_schema
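One way to consume these variables defensively is to fail fast when any are missing, as in this sketch (the `snowflake_settings` helper is illustrative, not the repo's actual code):

```python
import os

REQUIRED_VARS = [
    "SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD", "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_WAREHOUSE", "SNOWFLAKE_DATABASE", "SNOWFLAKE_SCHEMA",
]


def snowflake_settings() -> dict:
    """Read Snowflake credentials from the environment, erroring on any gap."""
    missing = [v for v in REQUIRED_VARS if not os.getenv(v)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {missing}")
    return {v: os.environ[v] for v in REQUIRED_VARS}
```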

▶️ How to Run

1. Clone the repository

git clone <your-repo-url>
cd FlowState_Inventory

2. Create virtual environment

python -m venv venv
venv\Scripts\activate        # Windows
source venv/bin/activate     # macOS/Linux

3. Install dependencies

pip install -r requirements.txt

4. Run data ingestion

python src/data_ingestion.py

5. Load data into Snowflake

python src/load_to_snowflake.py

6. Start Airflow (Docker)

cd airflow
docker-compose up

📊 Features

  • Automated crypto data ingestion
  • Secure credential management using .env
  • Scalable cloud data storage
  • Workflow scheduling with Airflow
  • Modular and production-like code structure

🚀 Future Improvements

  • Add support for multiple cryptocurrencies
  • Implement real-time streaming pipeline
  • Add data validation layer
  • Integrate alerting/monitoring system

👨‍💻 Author

Kunal Kumar


⭐ If you like this project

Give it a star ⭐ on GitHub!
