# bitcoin_dag.py
"""
Airflow DAG to automate real-time Bitcoin data ingestion, anomaly detection with Slack alerts,
rolling statistics computation, and S3 uploads using bitcoin_utils.py.
"""
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import sys
sys.path.append('/opt/airflow')
from bitcoin_utils import (
save_price_to_csv,
compute_moving_average,
upload_to_s3
)
# Task-level defaults handed to the DAG through its ``default_args`` parameter.
default_args = dict(
    owner='airflow',
    start_date=datetime(2025, 5, 1),
    # A failed task is retried once, after a two-minute pause.
    retries=1,
    retry_delay=timedelta(minutes=2),
)
def _upload_processed_csv():
    """Call ``upload_to_s3`` for the processed CSV and return its result."""
    return upload_to_s3(
        bucket_name='bitcoin-price-store',
        key_path='processed/bitcoin_processed.csv',
    )


# Hourly pipeline with no backfill of missed runs (catchup is disabled).
with DAG(
    dag_id='bitcoin_data_pipeline',
    description='ETL DAG: Ingest, compute stats, archive, and upload Bitcoin data with Slack alerts',
    default_args=default_args,
    schedule_interval='@hourly',
    catchup=False,
    tags=["bitcoin", "etl", "stats", "s3", "slack", "archival"],
) as dag:
    # Step 1: run save_price_to_csv from bitcoin_utils. Per the original
    # author's notes this callable also covers anomaly detection, the Slack
    # alert, archiving and an S3 upload -- confirm against bitcoin_utils.py.
    fetch_and_save = PythonOperator(
        python_callable=save_price_to_csv,
        task_id='fetch_and_save_bitcoin_price',
    )

    # Step 2: run compute_moving_average from bitcoin_utils (rolling
    # statistics; reportedly refreshes the processed data and uploads it).
    process_data = PythonOperator(
        python_callable=compute_moving_average,
        task_id='compute_moving_average',
    )

    # Step 3: standalone upload of the processed CSV, kept so the upload can
    # be run independently even if step 2 already performs one.
    upload_processed = PythonOperator(
        python_callable=_upload_processed_csv,
        task_id='upload_processed_csv_to_s3',
    )

    # Strictly linear ordering: ingest, then statistics, then upload.
    fetch_and_save >> process_data
    process_data >> upload_processed