
AWS Implementation Guide — Detailed Plan

Complete replication of the NYC Taxi pipeline on Amazon Web Services with step-by-step instructions.


1. Service Mapping (GCP → AWS)

| GCP Component | AWS Equivalent | Notes |
| --- | --- | --- |
| Cloud Storage (GCS) | S3 | Raw parquet, dashboard JSON |
| BigQuery RawBronze | Glue Catalog + S3 (Parquet) | Or Athena external tables |
| BigQuery CleanSilver | Glue Catalog + S3 (Parquet) | |
| BigQuery PreMlGold | Glue Catalog + S3 (Parquet) | |
| BigQuery PostMlGold | Glue Catalog + S3 (Parquet) | |
| Dataproc (Spark) | EMR or Glue Spark | EMR for full control, Glue for serverless |
| Cloud Composer | MWAA or Step Functions | MWAA for Airflow, Step Functions for simple DAGs |
| Cloud Functions | Lambda | TLC ingestion, export |
| Cloud Run | ECS Fargate / Lambda | ML stages (04a, 04b) |

2. Architecture Overview

┌───────────────────────────────────────────────────────────────────────┐
│                         AWS NYC TAXI PIPELINE                         │
└───────────────────────────────────────────────────────────────────────┘

  [TLC URLs]      [S3 Raw]        [Glue/Athena]      [S3 Dashboard]
       │              │                  │                    │
       ▼              ▼                  ▼                    ▼
  Lambda/ECS    s3://nyc-taxi-    Bronze → Silver    s3://nyc-taxi-
  (Stage 00)    raw-XXX/          → Gold (EMR/       dashboard-XXX/
                    taxi_type/         Glue Spark)        data/*.json
                    year/file.parquet       │                    │
                                            ▼                    ▼
                                    Glue Catalog          Static Website
                                    + S3 Parquet          + CloudFront

3. Prerequisites

  • AWS account with billing enabled
  • AWS CLI v2 installed and configured (aws configure)
  • IAM user/role with: AmazonS3FullAccess, AWSGlueConsoleFullAccess, AmazonEMRFullAccessPolicy_v2, AWSLambda_FullAccess, IAMFullAccess (or equivalent scoped policies)

4. Phase 1: Foundation (Day 1)

4.1 Set Variables

export AWS_REGION="us-east-1"
export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
export PROJECT_PREFIX="nyc-taxi"
export RAW_BUCKET="${PROJECT_PREFIX}-raw-${AWS_ACCOUNT_ID}"
export DASHBOARD_BUCKET="${PROJECT_PREFIX}-dashboard-${AWS_ACCOUNT_ID}"
export AIRFLOW_BUCKET="${PROJECT_PREFIX}-airflow-${AWS_ACCOUNT_ID}"

4.2 Create S3 Buckets

aws s3 mb s3://${RAW_BUCKET} --region $AWS_REGION
aws s3 mb s3://${DASHBOARD_BUCKET} --region $AWS_REGION
aws s3 mb s3://${AIRFLOW_BUCKET} --region $AWS_REGION

# Optional: Enable versioning for raw data
aws s3api put-bucket-versioning --bucket $RAW_BUCKET --versioning-configuration Status=Enabled

4.3 Create Glue Database

aws glue create-database \
  --database-input "{\"Name\":\"nyc_taxi\",\"Description\":\"NYC TLC taxi data - Bronze, Silver, Gold\"}"

4.4 IAM Roles for EMR

Create a service role and instance profile for EMR, via the IAM console or the CLI:

# EMR needs: EMR_DefaultRole, EMR_EC2_DefaultRole
# Create custom role for S3/Glue access if needed
aws iam create-role --role-name NYCTaxi-EMR-ServiceRole --assume-role-policy-document '{
  "Version": "2012-10-17",
  "Statement": [{"Effect": "Allow", "Principal": {"Service": "elasticmapreduce.amazonaws.com"}, "Action": "sts:AssumeRole"}]
}'
# Attach policies: AmazonElasticMapReduceRole, AmazonS3FullAccess, AWSGlueConsoleFullAccess
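
Alternatively, aws emr create-default-roles provisions EMR_DefaultRole, EMR_EC2_DefaultRole, and the matching instance profile in one shot. To script the policy attachments for the custom role above, a minimal boto3 sketch (the three managed-policy ARNs correspond to the policies named in the comment):

import boto3

iam = boto3.client("iam")
# Attach the managed policies listed above to the custom EMR service role
for arn in [
    "arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole",
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
    "arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess",
]:
    iam.attach_role_policy(RoleName="NYCTaxi-EMR-ServiceRole", PolicyArn=arn)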

5. Phase 2: TLC Ingestion (Stage 00)

5.1 Lambda Function

Create lambda/ingest_tlc.py:

import boto3
import requests
from datetime import datetime
from dateutil.relativedelta import relativedelta

BUCKET = "nyc-taxi-raw-ACCOUNT_ID"  # Replace
TAXI_TYPES = ["yellow", "green", "fhv", "fhvhv"]
TLC_BASE = "https://d37ci6vzurychx.cloudfront.net/trip-data"

def handler(event, context):
    s3 = boto3.client("s3")
    taxi_type = event.get("taxi_type", "yellow")
    year_month = event.get("year_month")  # "2024-01"
    if not year_month:
        # Default: two months back (TLC publishes trip data with roughly a two-month lag)
        target = datetime.now().replace(day=1) - relativedelta(months=2)
        year_month = target.strftime("%Y-%m")
    y, m = year_month.split("-")
    url = f"{TLC_BASE}/{taxi_type}_tripdata_{y}-{m}.parquet"
    key = f"{taxi_type}/{y}/{taxi_type}_tripdata_{y}-{m}.parquet"
    resp = requests.get(url, timeout=120)
    resp.raise_for_status()
    s3.put_object(Bucket=BUCKET, Key=key, Body=resp.content, ContentType="application/octet-stream")
    return {"status": "ok", "key": key}

5.2 Deploy Lambda

# Package (requests is not in the Lambda runtime: vendor it into the zip or attach a layer)
pip install requests -t lambda/
(cd lambda && zip -r ../ingest.zip .)  # zip from inside the dir so the handler resolves as ingest_tlc.handler
aws lambda create-function \
  --function-name nyc-taxi-ingest \
  --runtime python3.11 \
  --handler ingest_tlc.handler \
  --zip-file fileb://ingest.zip \
  --role arn:aws:iam::${AWS_ACCOUNT_ID}:role/lambda-execution-role \
  --timeout 120 \
  --memory-size 512
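
Once deployed, test with a single month. A minimal invocation sketch (the expected key follows from the handler's key template):

import json
import boto3

lam = boto3.client("lambda")
resp = lam.invoke(
    FunctionName="nyc-taxi-ingest",
    Payload=json.dumps({"taxi_type": "yellow", "year_month": "2024-01"}),
)
print(json.load(resp["Payload"]))  # {"status": "ok", "key": "yellow/2024/yellow_tripdata_2024-01.parquet"}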

5.3 Step Functions (Orchestrate per Month)

Create a state machine that loops over taxi types and year-months, invoking the Lambda for each.
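
A minimal sketch of that state machine via boto3: a Map state fans out over (taxi_type, year_month) pairs supplied in the execution input and invokes the ingest Lambda once per pair. The state machine name and execution role ARN are placeholders; the role must be allowed to invoke the Lambda.

import json
import boto3

sfn = boto3.client("stepfunctions")

definition = {
    "StartAt": "IngestEachMonth",
    "States": {
        "IngestEachMonth": {
            "Type": "Map",
            "ItemsPath": "$.batches",  # e.g. [{"taxi_type": "yellow", "year_month": "2024-01"}, ...]
            "MaxConcurrency": 4,
            "Iterator": {
                "StartAt": "InvokeIngest",
                "States": {
                    "InvokeIngest": {
                        "Type": "Task",
                        "Resource": "arn:aws:states:::lambda:invoke",
                        "Parameters": {"FunctionName": "nyc-taxi-ingest", "Payload.$": "$"},
                        "End": True,
                    }
                },
            },
            "End": True,
        }
    },
}

sfn.create_state_machine(
    name="nyc-taxi-ingest-backfill",                                      # placeholder
    definition=json.dumps(definition),
    roleArn="arn:aws:iam::ACCOUNT_ID:role/stepfunctions-execution-role",  # placeholder
)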


6. Phase 3: Spark Pipeline (Stages 01–03)

6.1 Adapt GCP Pipeline to AWS

| GCP Code | AWS Adaptation |
| --- | --- |
| spark.read.parquet("gs://bucket/path") | spark.read.parquet("s3://bucket/path") |
| df.write.format("bigquery").option("table", ...) | df.write.format("parquet").mode("overwrite").save("s3://bucket/glue_db/table/") + create Glue table |
| google.cloud.bigquery | Use Glue Catalog or Athena for metadata; store data in S3 Parquet |
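
For example, a minimal sketch of the adapted read/write path on EMR (bucket and path names are placeholders; registering the result in the Glue Catalog follows in 6.2):

from pyspark.sql import SparkSession

# On EMR, s3:// paths are served by EMRFS; no GCS or BigQuery connector needed
spark = SparkSession.builder.appName("bronze").getOrCreate()
df = spark.read.parquet("s3://nyc-taxi-raw-XXX/yellow/2024/")
df.write.mode("overwrite").parquet("s3://nyc-taxi-raw-XXX/nyc_taxi/bronze/yellow/year=2024/month=01/")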

6.2 Create Glue Tables (Bronze)

Run Glue Crawler or create tables manually:

# Run in Glue Studio or as a standalone script
import os
import boto3

RAW_BUCKET = os.environ["RAW_BUCKET"]  # e.g. nyc-taxi-raw-123456789

glue = boto3.client("glue")
glue.create_table(
    DatabaseName="nyc_taxi",
    TableInput={
        "Name": "raw_bronze_yellow_2024_01",
        "TableType": "EXTERNAL_TABLE",  # so Athena treats it as an external table
        "StorageDescriptor": {
            "Columns": [
                {"Name": "vendor_id", "Type": "bigint"},
                {"Name": "tpep_pickup_datetime", "Type": "timestamp"},
                # ... full schema
            ],
            "Location": f"s3://{RAW_BUCKET}/nyc_taxi/bronze/yellow/year=2024/month=01/",
            "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "SerdeInfo": {"SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"},
        },
        "PartitionKeys": [],
    },
)
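
Alternatively, let a crawler infer the schema instead of hand-writing columns. A minimal sketch, assuming an IAM role Glue can assume with read access to the bucket (role, crawler, and path names are placeholders):

import boto3

glue = boto3.client("glue")
glue.create_crawler(
    Name="nyc-taxi-bronze-crawler",
    Role="arn:aws:iam::ACCOUNT_ID:role/glue-crawler-role",  # placeholder
    DatabaseName="nyc_taxi",
    Targets={"S3Targets": [{"Path": "s3://nyc-taxi-raw-XXX/nyc_taxi/bronze/"}]},
)
glue.start_crawler(Name="nyc-taxi-bronze-crawler")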

6.3 EMR Cluster + Spark Job

# Create cluster
CLUSTER_ID=$(aws emr create-cluster \
  --name "nyc-taxi-pipeline" \
  --release-label emr-6.15.0 \
  --applications Name=Spark Name=Hadoop Name=Hive \
  --ec2-attributes KeyName=your-key,SubnetId=subnet-xxx \
  --instance-groups \
    InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m5.xlarge \
    InstanceGroupType=CORE,InstanceCount=2,InstanceType=m5.xlarge \
  --use-default-roles \
  --log-uri s3://${RAW_BUCKET}/emr-logs/ \
  --query 'ClusterId' --output text)

# Wait for cluster ready
aws emr wait cluster-running --cluster-id $CLUSTER_ID

# Upload the PySpark script to S3, adapted first: replace GCS/BigQuery calls with S3/Glue
aws s3 cp pipeline/01_gcs_to_bronze.py s3://${RAW_BUCKET}/scripts/01_s3_to_bronze.py

aws emr add-steps --cluster-id $CLUSTER_ID --steps '[
  {
    "Name": "GCS-to-Bronze",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
      "Jar": "command-runner.jar",
      "Args": ["spark-submit", "--deploy-mode", "cluster",
        "s3://'${RAW_BUCKET}'/scripts/01_s3_to_bronze.py"]
    }
  }
]'

6.4 Alternative: Glue Spark Jobs (Serverless)

Create a Glue job with the adapted Spark script. There is no cluster to manage; you pay per DPU-hour. A sketch:
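
A minimal boto3 sketch, assuming a Glue service role already exists (role name and worker sizing are placeholders):

import boto3

glue = boto3.client("glue")
glue.create_job(
    Name="nyc-taxi-bronze",
    Role="arn:aws:iam::ACCOUNT_ID:role/glue-job-role",  # placeholder
    Command={
        "Name": "glueetl",  # Spark job type
        "ScriptLocation": "s3://nyc-taxi-raw-XXX/scripts/01_s3_to_bronze.py",
        "PythonVersion": "3",
    },
    GlueVersion="4.0",
    WorkerType="G.1X",
    NumberOfWorkers=4,
)
glue.start_job_run(JobName="nyc-taxi-bronze")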


7. Phase 4: ML Stages (04a, 04b)

7.1 Option A: Lambda (if < 15 min, < 10 GB)

Use Lambda with a container image for XGBoost/TensorFlow. Increase memory to 10 GB and the timeout to 15 min (both Lambda maximums).
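
A minimal sketch, assuming the image is already pushed to ECR (image URI, role, and function name are placeholders):

import boto3

lam = boto3.client("lambda")
lam.create_function(
    FunctionName="nyc-taxi-ml-04a",                                                       # placeholder
    PackageType="Image",
    Code={"ImageUri": "ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/nyc-taxi-ml:latest"},   # placeholder
    Role="arn:aws:iam::ACCOUNT_ID:role/lambda-execution-role",                            # placeholder
    MemorySize=10240,  # 10 GB, the Lambda maximum
    Timeout=900,       # 15 minutes, the Lambda maximum
)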

7.2 Option B: ECS Fargate (Recommended)

  • Build Docker image with Python, pandas, xgboost, tensorflow, boto3
  • Push to ECR
  • Run as a Fargate task triggered by Step Functions or EventBridge (see the sketch below)
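
A minimal run_task sketch, assuming the cluster, task definition, and networking already exist (every name here is a placeholder):

import boto3

ecs = boto3.client("ecs")
ecs.run_task(
    cluster="nyc-taxi",                # placeholder cluster
    launchType="FARGATE",
    taskDefinition="nyc-taxi-ml-04a",  # placeholder task definition
    networkConfiguration={"awsvpcConfiguration": {
        "subnets": ["subnet-xxx"],
        "assignPublicIp": "ENABLED",
    }},
    overrides={"containerOverrides": [{
        "name": "ml",  # container name from the task definition
        "environment": [{"name": "STAGE", "value": "04a"}],
    }]},
)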

7.3 Option C: SageMaker Processing

Use a SageMaker Processing job for batch ML. A good fit for 04b (anomaly detection).
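
A minimal create_processing_job sketch, assuming the same container image and a SageMaker execution role (names are placeholders):

import boto3

sm = boto3.client("sagemaker")
sm.create_processing_job(
    ProcessingJobName="nyc-taxi-04b-anomaly",
    RoleArn="arn:aws:iam::ACCOUNT_ID:role/sagemaker-execution-role",                   # placeholder
    AppSpecification={
        "ImageUri": "ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/nyc-taxi-ml:latest",   # placeholder
        "ContainerEntrypoint": ["python", "04b_anomaly_detection.py"],
    },
    ProcessingResources={"ClusterConfig": {
        "InstanceCount": 1,
        "InstanceType": "ml.m5.xlarge",
        "VolumeSizeInGB": 30,
    }},
)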


8. Phase 5: Export (Stage 05)

8.1 Lambda or ECS Task

# Query Athena for aggregated data, then write JSON to the dashboard bucket
import json, time, boto3

athena = boto3.client("athena")
s3 = boto3.client("s3")

query = "SELECT pickup_date, pickup_hour, SUM(trips) AS trips FROM nyc_taxi.pre_ml_yellow_hourly GROUP BY 1, 2"
exec_id = athena.start_query_execution(
    QueryString=query,
    ResultConfiguration={"OutputLocation": "s3://bucket/results/"},
)["QueryExecutionId"]

# Poll until the query leaves the QUEUED/RUNNING states
while athena.get_query_execution(QueryExecutionId=exec_id)["QueryExecution"]["Status"]["State"] in ("QUEUED", "RUNNING"):
    time.sleep(2)

# Fetch results (first row is the header) and write JSON to the dashboard bucket
rows = athena.get_query_results(QueryExecutionId=exec_id)["ResultSet"]["Rows"]
records = [[col.get("VarCharValue") for col in row["Data"]] for row in rows[1:]]
s3.put_object(Bucket="nyc-taxi-dashboard-XXX", Key="data/time_series/yellow_hourly.json", Body=json.dumps(records), ContentType="application/json")

8.2 S3 Dashboard Structure

s3://nyc-taxi-dashboard-XXX/
├── index.html
├── data/
│   ├── time_series/
│   │   ├── yellow_daily.json
│   │   └── ...
│   ├── anomalies/
│   └── metadata/
│       └── dashboard_metadata.json

9. Phase 6: Orchestration

9.1 MWAA (Managed Airflow)

# Create environment (takes ~30 min)
aws mwaa create-environment \
  --name nyc-taxi-airflow \
  --airflow-version 2.7.2 \
  --source-bucket-arn arn:aws:s3:::${AIRFLOW_BUCKET} \
  --execution-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/MWAA-role \
  --network-configuration "SubnetIds=subnet-xxx,subnet-yyy,SecurityGroupIds=sg-xxx" \
  --dag-s3-path dags \
  --requirements-s3-path requirements.txt

9.2 DAG Example

# dags/nyc_taxi_pipeline.py
from airflow import DAG
from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
from datetime import datetime, timedelta

default_args = {"owner": "airflow", "retries": 2, "retry_delay": timedelta(minutes=5)}

with DAG(
    "nyc_taxi_pipeline",
    default_args=default_args,
    schedule="0 2 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    ingest = LambdaInvokeFunctionOperator(
        task_id="ingest_tlc",
        function_name="nyc-taxi-ingest",
        payload='{"taxi_type":"yellow","year_month":"2024-01"}',
    )
    bronze = EmrAddStepsOperator(
        task_id="bronze",
        job_flow_id="{{ var.value.emr_cluster_id }}",
        aws_conn_id="aws_default",
        steps=[{
            "Name": "S3-to-Bronze",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": ["spark-submit", "--deploy-mode", "cluster",
                         "s3://nyc-taxi-raw-XXX/scripts/01_s3_to_bronze.py"],
            },
        }],
    )
    bronze_sensor = EmrStepSensor(
        task_id="bronze_sensor",
        job_flow_id="{{ var.value.emr_cluster_id }}",
        step_id="{{ task_instance.xcom_pull(task_ids='bronze', key='return_value')[0] }}",
    )
    ingest >> bronze >> bronze_sensor  # silver, gold, ML, and export tasks chain on the same way

9.3 Alternative: Step Functions

For a simpler linear flow, use Step Functions with Lambda + EMR RunJobFlow.


10. Phase 7: Static Dashboard Hosting

10.1 Enable S3 Static Website

aws s3 website s3://${DASHBOARD_BUCKET} --index-document index.html --error-document index.html

10.2 CORS Configuration

aws s3api put-bucket-cors --bucket $DASHBOARD_BUCKET --cors-configuration '{
  "CORSRules": [{
    "AllowedHeaders": ["*"],
    "AllowedMethods": ["GET", "HEAD"],
    "AllowedOrigins": ["*"],
    "ExposeHeaders": []
  }]
}'

10.3 CloudFront (Optional)

Create a distribution with the S3 bucket as origin for faster global access and HTTPS.


11. Environment Variables Summary

| Variable | Example | Description |
| --- | --- | --- |
| AWS_REGION | us-east-1 | Region for all resources |
| RAW_BUCKET | nyc-taxi-raw-123456789 | S3 raw parquet |
| DASHBOARD_BUCKET | nyc-taxi-dashboard-123456789 | Dashboard JSON + static site |
| GLUE_DATABASE | nyc_taxi | Glue database name |
| EMR_CLUSTER_ID | j-XXXXXXXX | Active EMR cluster (if long-running) |

12. Cost Estimation

Actual costs depend on data volume, cluster size, and run frequency. Ground your estimate with a test-phase run before production:

  1. Pick a test batch: run the full pipeline on two specific months of data (e.g. 2024-01 and 2024-02).
  2. Run ingest → Spark (Bronze/Silver/Gold) → ML → export end-to-end.
  3. Monitor billing in AWS Cost Explorer for S3, EMR, Glue, Athena, Lambda, MWAA.
  4. Scale the estimates to the full data volume and schedule.

Cost tips: Use EMR spot instances, Glue serverless Spark, S3 Intelligent-Tiering.


13. Implementation Checklist

  • Phase 1: S3 buckets, Glue database, IAM roles
  • Phase 2: Lambda ingest, test with one month
  • Phase 3: Adapt pipeline scripts (S3/Glue), run EMR job
  • Phase 4: Deploy 04a/04b (ECS or SageMaker)
  • Phase 5: Export Lambda/task to S3
  • Phase 6: MWAA or Step Functions orchestration
  • Phase 7: Static website, CORS, CloudFront

14. Pipeline Code Adaptation Reference

| GCP (pipeline/) | AWS Equivalent |
| --- | --- |
| google.cloud.storage | boto3.client("s3") |
| gs://bucket/path | s3://bucket/path |
| google.cloud.bigquery | Athena start_query_execution or Glue Catalog |
| df.write.format("bigquery") | df.write.parquet("s3://...") + Glue create_table |
| DataprocSparkSession | EMR Spark (standard SparkSession) |
| pipeline_utils.config | Env vars: RAW_BUCKET, DASHBOARD_BUCKET, GLUE_DATABASE |

15. Troubleshooting

| Issue | Solution |
| --- | --- |
| Lambda timeout on ingest | Increase timeout to 120s; parquet files can be large |
| EMR cluster fails to start | Check subnet, security group, IAM instance profile |
| Glue table not found | Run crawler or create table manually; ensure S3 path exists |
| Athena query slow | Partition tables by year/month; use columnar format (Parquet) |