iraphaelfernandes/iot-traffic-processor

IoT Traffic Processor

A cloud-native solution for processing, summarizing, and consolidating IoT traffic data from geographically distributed devices using AWS services.

Overview

This project implements a serverless architecture to process IoT traffic data collected from multiple enterprise branches. It aggregates per-day traffic statistics for each source-destination IP pair so that anomalies and bottlenecks in network traffic can be identified and used to guide infrastructure investment decisions.

Key Features

  • Scalable Processing: Serverless Lambda functions handle variable workloads without infrastructure management
  • Resilient Pipeline: SQS queues ensure reliable message delivery and processing even during worker unavailability
  • Efficient Storage: Minimal cloud storage footprint with automatic cleanup of processed files
  • Near Real-time Processing: Uploaded IoT traffic files are picked up within seconds, and consolidated results are typically available within 1-2 minutes
  • Consolidated Analytics: Aggregated traffic statistics with statistical measures (average, standard deviation)

Architecture

The solution consists of four main components:

1. Upload Client

  • Uploads IoT traffic CSV files from branch locations to S3
  • Sends notification messages to SQS queue to trigger processing
  • Runs locally or on branch infrastructure

2. Summarize Worker (Lambda)

  • Triggered by SQS messages when new files are uploaded
  • Reads CSV files from S3
  • Summarizes traffic data per source-destination IP pair per day:
    • Calculates total flow duration
    • Counts total forward packets
  • Sends summarized data to consolidation queue
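
The per-pair summarization described above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the repository's SummarizeWorker: the class `TrafficSummarySketch`, the `Flow` and `Summary` records, and the `src->dst` key format are hypothetical, and the sketch assumes every row of a file belongs to the same day.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of per-pair summarization; names are illustrative only. */
final class TrafficSummarySketch {

    /** One parsed input row (Src IP, Dst IP, Flow Duration, Tot Fwd Pkt). */
    record Flow(String srcIp, String dstIp, long flowDuration, long totFwdPkt) {}

    /** Totals for one source-destination pair within a single day's file. */
    record Summary(long totalDuration, long totalPackets) {}

    /** Groups flows by "src->dst" and sums flow duration and forward packets. */
    static Map<String, Summary> summarize(List<Flow> flows) {
        Map<String, Summary> totals = new LinkedHashMap<>();
        for (Flow f : flows) {
            String key = f.srcIp() + "->" + f.dstIp(); // assumed key format
            totals.merge(
                    key,
                    new Summary(f.flowDuration(), f.totFwdPkt()),
                    (a, b) -> new Summary(
                            a.totalDuration() + b.totalDuration(),
                            a.totalPackets() + b.totalPackets()));
        }
        return totals;
    }
}
```

Calling `TrafficSummarySketch.summarize(flows)` returns one `Summary` per IP pair; in the pipeline, each such entry would correspond to one message sent to the consolidation queue.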

3. Consolidator Worker (Lambda)

  • Processes summarized traffic from SQS queue
  • Updates DynamoDB with consolidated statistics:
    • Running totals and counts
    • Average flow duration
    • Average packet counts
    • Standard deviation calculations
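
The running average and standard deviation listed above can be maintained incrementally, without keeping every past value. The sketch below uses Welford's online algorithm, which is an assumption chosen for illustration; the source does not say how ConsolidatorWorker computes these values. The class name `RunningStatsSketch` is hypothetical, and the result is the population standard deviation.

```java
/** Hypothetical sketch: incremental mean and standard deviation (Welford's algorithm). */
final class RunningStatsSketch {
    private long count;   // number of values seen so far
    private double mean;  // running average
    private double m2;    // sum of squared deviations from the running mean

    /** Folds one new value (e.g. a day's flow duration) into the statistics. */
    void add(double value) {
        count++;
        double delta = value - mean;
        mean += delta / count;
        m2 += delta * (value - mean);
    }

    long count() {
        return count;
    }

    double average() {
        return count == 0 ? 0.0 : mean;
    }

    /** Population standard deviation; 0.0 when no values have been added. */
    double populationStdDev() {
        return count == 0 ? 0.0 : Math.sqrt(m2 / count);
    }
}
```

To persist this state between invocations, the three fields (`count`, `mean`, `m2`) would have to be stored with each DynamoDB item; that mapping is left out here because the table's attribute layout is not described in this document.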

4. Export Client

  • Retrieves consolidated traffic data from DynamoDB
  • Exports results to CSV file for analysis
  • Runs locally to generate final reports

AWS Services Used

  • S3: Raw IoT traffic file storage
  • SQS: Message queues for decoupled processing
    • new-file-queue: Triggers summarization
    • consolidate-queue: Triggers consolidation
  • Lambda: Serverless compute for workers
    • SummarizeWorker (Java 17)
    • ConsolidatorWorker (Java 17)
  • DynamoDB: Consolidated traffic statistics storage
  • CloudWatch: Logging and monitoring

Prerequisites

  • AWS Account with appropriate IAM permissions
  • AWS CLI configured with credentials
  • Java 17 or later
  • Maven 3.6+
  • Bash shell

AWS CLI Configuration

Before running any scripts, you must configure the AWS CLI with your AWS credentials. This is required for the solution to access your AWS account.

Configure AWS CLI

Run the following command to configure your AWS credentials:

aws configure

You will be prompted to enter:

  1. AWS Access Key ID: Your AWS access key (starts with AKIA...)
  2. AWS Secret Access Key: Your AWS secret key
  3. Default region: The AWS region where you want to deploy (e.g., us-east-1, eu-west-1)
  4. Default output format: Leave as json (or press Enter for default)

Example:

AWS Access Key ID [None]: AKIAIOSFODNN7EXAMPLE
AWS Secret Access Key [None]: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
Default region name [None]: us-east-1
Default output format [None]: json

Verify Configuration

To verify your AWS CLI is properly configured, run:

aws sts get-caller-identity

This should return your user ID, AWS account ID, and ARN. If this command fails, your credentials are not properly configured.

Important Security Notes

  • Never commit credentials to version control
  • Keep your access keys secure - treat them like passwords
  • Consider using IAM roles for production deployments instead of long-term access keys
  • Rotate your access keys regularly

Installation & Setup

1. Clone the Repository

# Download and extract the project archive, then enter its directory
cd iot-traffic-processor-main

2. Build the Project

mvn clean package

This creates a fat JAR (target/iot-traffic-processor.jar) containing all dependencies for Lambda deployment.

3. Initial Setup (One-Time Only)

Run the full deployment script to set up all AWS infrastructure and deploy Lambda functions:

./full.sh

This script performs a complete one-time setup:

  • Creates a new S3 bucket for raw IoT traffic files
  • Creates SQS queues (new-file-queue and consolidate-queue)
  • Creates DynamoDB table (IoTTrafficConsolidated)
  • Builds the Java application
  • Deploys SummarizeWorker Lambda function
  • Deploys ConsolidatorWorker Lambda function
  • Configures event source mappings to connect SQS queues to Lambda functions

Run this script only once at the beginning to initialize your infrastructure.

4. Alternative: Manual Setup (If Not Using full.sh)

If you prefer to set up infrastructure manually:

./create-infra.sh

Then deploy Lambda functions individually:

./deploy-summarize.sh
./deploy-consolidator.sh

Usage

Uploading IoT Traffic Files

Run the upload script once for each IoT traffic file you want to process:

./upload.sh path/to/your/iot-traffic-file.csv

What this script does:

  1. Uploads the CSV file to S3 bucket
  2. Sends a notification message to the SQS queue
  3. Automatically triggers the SummarizeWorker Lambda function
  4. Initiates the complete processing pipeline

Example:

./upload.sh data-20221205.csv

You can upload multiple files sequentially. Each file will be processed independently through the pipeline.

Monitoring Processing

Watch Lambda logs in real-time:

# Monitor SummarizeWorker
aws logs tail /aws/lambda/SummarizeWorker --follow

# Monitor ConsolidatorWorker
aws logs tail /aws/lambda/ConsolidatorWorker --follow

Or view in AWS Console:

  • CloudWatch → Log Groups → /aws/lambda/SummarizeWorker
  • CloudWatch → Log Groups → /aws/lambda/ConsolidatorWorker

Exporting Results

After processing completes (typically 1-2 minutes), export consolidated traffic data:

mvn exec:java -Dexec.mainClass=com.iotproject.ExportClient

This generates a CSV file with the consolidated traffic statistics:

  • SrcIP: Source IP address
  • DstIP: Destination IP address
  • Date: Traffic date
  • TotalDuration: Total flow duration for the day
  • TotalPackets: Total forward packets for the day
  • Count: Number of traffic records aggregated
  • AvgDuration: Average flow duration
  • AvgPackets: Average forward packets
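
As an illustration of how the two average columns relate to the totals, the sketch below formats one export row. It assumes each average is simply the total divided by `Count`, that the date is an ISO-style string, and that values are written with two decimal places; none of this is confirmed by the source, and `ExportRowSketch` is a hypothetical name rather than part of ExportClient.

```java
import java.util.Locale;

/** Hypothetical sketch: builds one CSV row with the columns listed above. */
final class ExportRowSketch {

    static final String HEADER =
            "SrcIP,DstIP,Date,TotalDuration,TotalPackets,Count,AvgDuration,AvgPackets";

    /** Derives the averages as total / count (assumed) and formats the row. */
    static String toCsvRow(String srcIp, String dstIp, String date,
                           long totalDuration, long totalPackets, long count) {
        double avgDuration = count == 0 ? 0.0 : (double) totalDuration / count;
        double avgPackets = count == 0 ? 0.0 : (double) totalPackets / count;
        // Locale.ROOT keeps '.' as the decimal separator regardless of system locale.
        return String.format(Locale.ROOT, "%s,%s,%s,%d,%d,%d,%.2f,%.2f",
                srcIp, dstIp, date, totalDuration, totalPackets, count,
                avgDuration, avgPackets);
    }
}
```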

Data Flow

Branch IoT Device
    ↓
Upload Client (./upload.sh)
    ↓
S3 Bucket (raw/)
    ↓
SQS new-file-queue
    ↓
SummarizeWorker Lambda
    ↓
SQS consolidate-queue
    ↓
ConsolidatorWorker Lambda
    ↓
DynamoDB (IoTTrafficConsolidated)
    ↓
Export Client (mvn exec:java)
    ↓
CSV Report

Input Data Format

IoT traffic CSV files should contain the following columns:

  • Src IP: Source IP address
  • Dst IP: Destination IP address
  • Flow Duration: Duration of the traffic flow
  • Tot Fwd Pkt: Total forward packets

Example:

Src IP,Dst IP,Flow Duration,Tot Fwd Pkt
192.168.1.10,10.0.0.5,1500,45
192.168.1.11,10.0.0.6,2300,67
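
A file in this format can be checked and parsed by looking columns up by header name. The following is a minimal sketch using plain string splitting; it assumes fields contain no quoted commas (the project itself lists Apache Commons CSV as a dependency, which handles quoting), and the class `TrafficCsvSketch` and record `Row` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: validates the header and parses rows by column name. */
final class TrafficCsvSketch {

    static final List<String> REQUIRED =
            List.of("Src IP", "Dst IP", "Flow Duration", "Tot Fwd Pkt");

    record Row(String srcIp, String dstIp, long flowDuration, long totFwdPkt) {}

    /** Parses CSV lines (header first); throws if a required column is missing. */
    static List<Row> parse(List<String> lines) {
        if (lines.isEmpty()) {
            throw new IllegalArgumentException("empty file");
        }
        // Map each header name to its column position.
        String[] header = lines.get(0).split(",", -1);
        Map<String, Integer> index = new HashMap<>();
        for (int i = 0; i < header.length; i++) {
            index.put(header[i].trim(), i);
        }
        for (String name : REQUIRED) {
            if (!index.containsKey(name)) {
                throw new IllegalArgumentException("missing column: " + name);
            }
        }
        List<Row> rows = new ArrayList<>();
        for (String line : lines.subList(1, lines.size())) {
            if (line.isBlank()) {
                continue; // skip empty trailing lines
            }
            String[] cells = line.split(",", -1);
            rows.add(new Row(
                    cells[index.get("Src IP")].trim(),
                    cells[index.get("Dst IP")].trim(),
                    Long.parseLong(cells[index.get("Flow Duration")].trim()),
                    Long.parseLong(cells[index.get("Tot Fwd Pkt")].trim())));
        }
        return rows;
    }
}
```

Looking columns up by name rather than position means extra columns in the file are ignored, while a missing required column fails fast with a clear message.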

Project Structure

iot-traffic-processor-main/
├── src/main/java/com/iotproject/
│   ├── UploadClient.java          # Branch upload application
│   ├── SummarizeWorker.java       # Lambda: summarizes traffic
│   ├── ConsolidatorWorker.java    # Lambda: consolidates statistics
│   ├── ExportClient.java          # Export consolidated data
│   └── App.java                   # Utility class
├── pom.xml                         # Maven configuration
├── create-infra.sh                # Infrastructure setup
├── full.sh                        # Complete deployment
├── deploy-summarize.sh            # Deploy SummarizeWorker
├── deploy-consolidator.sh         # Deploy ConsolidatorWorker
├── upload.sh                      # Upload IoT traffic files
├── bucket-name.txt                # Saved S3 bucket name
├── region.txt                     # Saved AWS region
└── README.md                      # This file

Configuration

Environment Variables

The following environment variables are automatically set by deployment scripts:

  • IOT_BUCKET: S3 bucket name for raw traffic files
  • NEW_FILE_QUEUE_URL: SQS queue URL for new file notifications
  • CONSOLIDATE_QUEUE_URL: SQS queue URL for consolidation tasks
  • AWS_REGION: AWS region for all resources

Lambda Configuration

  • Runtime: Java 17
  • Memory: 512 MB (SummarizeWorker), 512 MB (ConsolidatorWorker)
  • Timeout: 60 seconds
  • Batch Size: 1 (SummarizeWorker), 10 (ConsolidatorWorker)

Resilience & Reliability

The solution ensures processing reliability through:

  1. SQS Message Queues: Decouples components and provides message persistence
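  2. Visibility Timeout: A message whose Lambda invocation fails becomes visible in the queue again after the 600-second visibility timeout and is redelivered
  3. Batch Item Failures: Only the messages that failed within a batch are returned to the queue and retried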
  4. Event Source Mapping: Lambda automatically polls SQS for new messages
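
To make the batch item failures mechanism concrete: when partial batch responses are enabled on an SQS event source mapping, the function returns a JSON object whose `batchItemFailures` array lists the IDs of the messages that should be retried. The sketch below builds that response in plain Java. The class `PartialBatchSketch`, the `Message` record, and the `handler` callback are hypothetical stand-ins, and the source does not show how the workers actually report failures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/** Hypothetical sketch: collect failed message IDs and build a partial batch response. */
final class PartialBatchSketch {

    /** Minimal stand-in for an SQS message (ID plus body). */
    record Message(String messageId, String body) {}

    /** Runs the handler on every message and returns the IDs that threw. */
    static List<String> processBatch(List<Message> batch, Consumer<String> handler) {
        List<String> failedIds = new ArrayList<>();
        for (Message m : batch) {
            try {
                handler.accept(m.body());
            } catch (RuntimeException e) {
                failedIds.add(m.messageId()); // only this message will be retried
            }
        }
        return failedIds;
    }

    /** Formats the response; IDs are assumed to need no JSON escaping. */
    static String toResponseJson(List<String> failedIds) {
        return failedIds.stream()
                .map(id -> "{\"itemIdentifier\":\"" + id + "\"}")
                .collect(Collectors.joining(",", "{\"batchItemFailures\":[", "]}"));
    }
}
```

An empty `batchItemFailures` array tells Lambda that the whole batch succeeded, so successfully processed messages are deleted even when others in the same batch fail.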

Performance Considerations

  • Processing Speed: Files are processed within seconds of upload
  • Scalability: Lambda automatically scales to handle concurrent uploads
  • Storage Optimization: Raw files can be deleted after summarization to minimize costs
  • DynamoDB: Uses on-demand billing for variable workloads

Troubleshooting

Lambda Function Not Triggering

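  1. Verify event source mapping exists:

    aws lambda list-event-source-mappings --function-name SummarizeWorker

  2. Check whether the SQS queue has messages waiting (unlike receive-message, this does not temporarily hide messages from the Lambda poller):

    aws sqs get-queue-attributes --queue-url <queue-url> --attribute-names ApproximateNumberOfMessages

  3. Verify IAM role has necessary permissions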

Processing Failures

  1. Check CloudWatch logs for error messages
  2. Verify S3 bucket and DynamoDB table exist
  3. Confirm CSV file format matches expected schema
  4. Check Lambda memory and timeout settings

Missing Export Results

  1. Wait at least 1-2 minutes for processing to complete

  2. Verify DynamoDB table contains data:

    aws dynamodb scan --table-name IoTTrafficConsolidated

  3. Check for Lambda execution errors in CloudWatch

Dependencies

  • AWS SDK v2: 2.25.41
  • Apache Commons CSV: 1.10.0
  • SLF4J: 2.0.9
  • JUnit: 4.13.2 (testing)
  • Maven Shade Plugin: 3.5.0 (fat JAR creation)

Testing

Run unit tests:

mvn test

Cleanup

To remove all AWS resources:

# Delete Lambda functions
aws lambda delete-function --function-name SummarizeWorker
aws lambda delete-function --function-name ConsolidatorWorker

# Delete SQS queues
aws sqs delete-queue --queue-url <new-file-queue-url>
aws sqs delete-queue --queue-url <consolidate-queue-url>

# Delete DynamoDB table
aws dynamodb delete-table --table-name IoTTrafficConsolidated

# Delete S3 bucket (empty it first)
aws s3 rm s3://<bucket-name> --recursive
aws s3 rb s3://<bucket-name>

License

This project is provided as-is for educational purposes.

Support

For issues or questions:

  1. Check CloudWatch logs for detailed error messages
  2. Verify AWS credentials and permissions
  3. Ensure all prerequisites are installed
  4. Review the troubleshooting section above
