diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..6ba4335 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,14 @@ +{ + "sqltools.connections": [ + { + "ssh": "Disabled", + "previewLimit": 50, + "server": "localhost", + "port": 5432, + "driver": "PostgreSQL", + "name": "postgres", + "database": "postgres", + "username": "postgres" + } + ] +} \ No newline at end of file diff --git a/README.md b/README.md index a4a46a4..81f6591 100644 --- a/README.md +++ b/README.md @@ -1,208 +1,19 @@ -# AirLife ETL Pipeline - Starter Repository +# AirLife ETL Pipeline -Welcome to the AirLife ETL Pipeline workshop! This repository contains the skeleton code for building a simple Extract, Transform, Load (ETL) pipeline for aircraft and airport data. +## What This Does +This pipeline extracts airport data from a CSV file and live flight data from an API, +cleans the data, and loads it into a PostgreSQL database. -## 🎯 Workshop Goals +## How to Run It +1. Install dependencies: `pip install -r requirements.txt` +2. Set up PostgreSQL database +3. Update database connection in `src/load_data.py` +4. Run: `python main.py` -By the end of this 3-hour workshop, you will have: -- Extracted airport data from a CSV file -- Fetched live flight data from the OpenSky Network API -- Cleaned and transformed the data using Python/pandas -- Loaded the data into a PostgreSQL database -- Verified your pipeline works end-to-end +## What We Built +- **Extract:** Gets airport data from CSV and flight data from OpenSky Network API +- **Transform:** Cleans invalid coordinates and converts units +- **Load:** Puts clean data into PostgreSQL tables -## πŸ“ Repository Structure - -``` -ETL-AirLife/ -β”œβ”€β”€ README.md # This file -β”œβ”€β”€ requirements.txt # Python dependencies -β”œβ”€β”€ main.py # Main pipeline orchestrator -β”œβ”€β”€ database_setup.sql # SQL script to create tables -β”œβ”€β”€ data/ -β”‚ └── airports.csv # Sample airport data (50 airports) -└── src/ - β”œβ”€β”€ extract_data.py # Data extraction functions - β”œβ”€β”€ transform_data.py # Data cleaning and transformation - └── load_data.py # Database loading functions -``` - -## πŸš€ Quick Start - -### 1. Prerequisites - -Make sure you have installed: -- Python 3.7 or higher -- PostgreSQL 12 or higher -- Git - -### 2. Setup - -1. **Fork this repository** to your GitHub account -2. **Clone your fork** locally: - ```bash - git clone https://github.com/YOUR_USERNAME/ETL-AirLife.git - cd ETL-AirLife - ``` - -3. **Install Python dependencies**: - ```bash - pip install -r requirements.txt - ``` - -4. **Create PostgreSQL database**: - ```bash - # Connect to PostgreSQL - psql -U your_username -d postgres - - # Create database - CREATE DATABASE airlife_db; - - # Exit and reconnect to new database - \q - psql -U your_username -d airlife_db - - # Create tables - \i database_setup.sql - ``` - -### 3. Configure Database Connection - -Edit the database configuration in `src/load_data.py`: - -```python -DATABASE_CONFIG = { - 'username': 'your_username', # Replace with your PostgreSQL username - 'password': 'your_password', # Replace with your PostgreSQL password - 'host': 'localhost', - 'port': '5432', - 'database': 'airlife_db' -} -``` - -## πŸ› οΈ Your Tasks - -The repository contains skeleton code with TODO comments. Your job is to implement the missing functionality: - -### Part 1: Data Extraction (`src/extract_data.py`) -- [ ] Implement `extract_airports()` to read CSV data -- [ ] Implement `extract_flights()` to fetch data from OpenSky Network API -- [ ] Handle errors gracefully (network issues, API limits) - -### Part 2: Data Transformation (`src/transform_data.py`) -- [ ] Implement `clean_airports()` to remove invalid data -- [ ] Implement `clean_flights()` to standardize API data -- [ ] Convert units (altitude meters to feet) -- [ ] Handle missing values appropriately - -### Part 3: Data Loading (`src/load_data.py`) -- [ ] Implement `load_to_database()` using pandas to_sql() -- [ ] Implement `verify_data()` to check data was loaded correctly -- [ ] Update database connection configuration - -### Part 4: Integration (`main.py`) -- [ ] Uncomment the function calls once each component works -- [ ] Test the full pipeline end-to-end -- [ ] Add error handling for robustness - -## πŸ§ͺ Testing Your Code - -Each module can be tested independently: - -```bash -# Test extraction -python src/extract_data.py - -# Test transformation -python src/transform_data.py - -# Test loading (after implementing database config) -python src/load_data.py - -# Run full pipeline -python main.py -``` - -## πŸ“Š Sample Data - -The `data/airports.csv` file contains 50 airports including: -- Major European airports (CDG, LHR, FRA, etc.) -- Valid coordinates and IATA codes -- Some invalid data for testing your cleaning logic - -The OpenSky Network API provides real-time flight data over Europe with: -- Aircraft identifiers and callsigns -- Current positions (latitude, longitude, altitude) -- Ground speed and heading information - -## ⚠️ Common Issues - -**API Rate Limits**: The OpenSky Network has rate limits. If you get errors: -- Wait a few seconds between requests -- Test with smaller geographic areas first -- Use the `test_api_connection()` function to debug - -**Database Connection**: If you can't connect to PostgreSQL: -- Check that PostgreSQL service is running -- Verify your username/password -- Make sure the `airlife_db` database exists -- Ensure tables are created with `database_setup.sql` - -**Import Errors**: Make sure you're in the project root directory when running scripts - -## 🎯 Success Criteria - -Your ETL pipeline is working when: -1. βœ… `python main.py` runs without errors -2. βœ… Airport data is loaded into the `airports` table -3. βœ… Flight data (if API accessible) is loaded into the `flights` table -4. βœ… You can run SQL queries on your loaded data -5. βœ… Your code handles errors gracefully - -## πŸ” Example Queries - -Once your data is loaded, try these queries: - -```sql --- Count total airports -SELECT COUNT(*) FROM airports; - --- Show airports by country -SELECT country, COUNT(*) as airport_count -FROM airports -GROUP BY country -ORDER BY airport_count DESC; - --- Show current flights (if any) -SELECT callsign, origin_country, altitude -FROM flights -WHERE altitude > 10000 -LIMIT 5; -``` - -## πŸ“š Resources - -- [OpenSky Network API Documentation](https://opensky-network.org/apidoc/) -- [Pandas Documentation](https://pandas.pydata.org/docs/) -- [SQLAlchemy to_sql() Guide](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html) -- [PostgreSQL Documentation](https://www.postgresql.org/docs/) - -## 🀝 Getting Help - -If you're stuck: -1. Read the TODO comments carefully - they contain hints -2. Test each module individually before running the full pipeline -3. Use the test functions provided (like `test_api_connection()`) -4. Check the error messages - they usually point to the problem -5. Ask your instructor or classmates - -## πŸ† Next Steps - -After completing this workshop, you'll be ready for the larger AirLife project where you'll design your own startup's complete data pipeline with more advanced features like: -- Multiple data sources -- Complex transformations -- Production-ready error handling -- Data quality monitoring -- Automated scheduling - -Good luck building your first ETL pipeline! πŸš€ \ No newline at end of file +## Team Members +BenoΓ―t, Mohammed et Quentin \ No newline at end of file diff --git a/database_setup.sql b/database_setup.sql old mode 100644 new mode 100755 diff --git a/main.py b/main.py index 3d07bd9..99b3aa5 100644 --- a/main.py +++ b/main.py @@ -24,38 +24,39 @@ def main(): print("πŸ“₯ Extracting data from sources...") # TODO: Call the extraction functions - # airports = extract_airports() - # flights = extract_flights() - + airports = extract_airports() + flights = extract_flights() + # Uncomment the lines above once you've implemented the functions - print("⚠️ Extraction functions not yet implemented") - return + # print("⚠️ Extraction functions not yet implemented") + # Step 2: Transform data print("\n=== TRANSFORMATION ===") print("πŸ”„ Cleaning and transforming data...") # TODO: Call the transformation functions - # clean_airports_data = clean_airports(airports) - # clean_flights_data = clean_flights(flights) - # final_airports, final_flights = combine_data(clean_airports_data, clean_flights_data) + clean_airports_data = clean_airports(airports) + clean_flights_data = clean_flights(flights) + final_airports, final_flights = combine_data(clean_airports_data, clean_flights_data) # Step 3: Load data print("\n=== LOADING ===") print("πŸ’Ύ Loading data to database...") # TODO: Call the loading function - # load_to_database(final_airports, final_flights) + load_to_database(final_airports, final_flights) # Step 4: Verify everything worked print("\n=== VERIFICATION ===") print("βœ… Verifying data was loaded correctly...") # TODO: Call the verification function - # verify_data() + verify_data() print("\nπŸŽ‰ ETL Pipeline completed!") print("=" * 50) + return if __name__ == "__main__": main() diff --git a/src/extract_data.py b/src/extract_data.py index cea80ab..d6ab193 100644 --- a/src/extract_data.py +++ b/src/extract_data.py @@ -23,14 +23,15 @@ def extract_airports(): try: # TODO: Read the airports.csv file using pandas # The file is located at: data/airports.csv - + # Hint: Use pd.read_csv() + df = pd.read_csv('data/airports.csv') # For now, return an empty DataFrame - df = pd.DataFrame() + #df = pd.DataFrame() # TODO: Print how many airports were loaded - # Example: print(f"Loaded {len(df)} airports") + print(f"Loaded {len(df)} airports") - print("⚠️ Airport extraction not yet implemented") + # print("⚠️ Airport extraction not yet implemented") return df except Exception as e: @@ -61,22 +62,33 @@ def extract_flights(): print("Making API request... (this may take a few seconds)") # TODO: Make the API request using requests.get() - + # Hint: response = requests.get(url, params=params, timeout=10) + response = requests.get(url, params=params, timeout=10) + # TODO: Check if the response is successful - + # Hint: Check response.status_code == 200 + print("response status : ", response.status_code == 200) + # TODO: Get the JSON data from the response - + # Hint: data = response.json() + data = response.json() + # TODO: Extract the 'states' data from the JSON # The API returns: {"time": 123456789, "states": [[aircraft_data], [aircraft_data], ...]} - + # Hint: states = data['states'] if data['states'] else [] + states = data['states'] if data['states'] else [] + # TODO: Convert to DataFrame + # Hint: df = pd.DataFrame(states) + df = pd.DataFrame(states) # TODO: Print how many flights were found # Example: print(f"Found {len(df)} active flights") - + print(f"Found {len(df)} active flights") + # For now, return empty DataFrame - print("⚠️ Flight extraction not yet implemented") - return pd.DataFrame() + # print("⚠️ Flight extraction not yet implemented") + return df except requests.exceptions.RequestException as e: print(f"❌ Network error fetching flight data: {e}") diff --git a/src/load_data.py b/src/load_data.py index 1bb86ee..b668167 100644 --- a/src/load_data.py +++ b/src/load_data.py @@ -14,8 +14,8 @@ # Database connection configuration # TODO: Update these values with your actual database credentials DATABASE_CONFIG = { - 'username': 'your_username', - 'password': 'your_password', + 'username': 'postgres', + 'password': 'thdvASedvqlpap', 'host': 'localhost', 'port': '5432', 'database': 'airlife_db' @@ -36,17 +36,19 @@ def load_to_database(airports_df, flights_df): print("πŸ’Ύ Loading data to PostgreSQL database...") # TODO: Create connection string using the function above - # connection_string = get_connection_string() + connection_string = get_connection_string() try: # TODO: Create SQLAlchemy engine + engine = create_engine(connection_string) + + # print("⚠️ Database loading not yet implemented") - print("⚠️ Database loading not yet implemented") - return # TODO: Load airports data # Use pandas to_sql method to insert data - # + airports_df.to_sql('airports', engine, if_exists='replace', index=False) + # Parameters explanation: # - 'airports': table name in database # - engine: database connection @@ -55,14 +57,16 @@ def load_to_database(airports_df, flights_df): # TODO: Load flights data (only if not empty) # Check if flights_df is not empty before loading - # flights_df.to_sql('flights', engine, if_exists='replace', index=False) + if not flights_df.empty: + flights_df.to_sql('flights', engine, if_exists='replace', index=False) # TODO: Print loading statistics - # print(f"βœ… Loaded {len(airports_df)} airports to database") - # if not flights_df.empty: - # print(f"βœ… Loaded {len(flights_df)} flights to database") - # else: - # print("ℹ️ No flight data to load") + print(f"βœ… Loaded {len(airports_df)} airports to database") + if not flights_df.empty: + print(f"βœ… Loaded {len(flights_df)} flights to database") + else: + print("ℹ️ No flight data to load") + return except Exception as e: print(f"❌ Error loading data to database: {e}") @@ -82,27 +86,31 @@ def verify_data(): try: # TODO: Create SQLAlchemy engine - # engine = create_engine(connection_string) + engine = create_engine(connection_string) + + # print("⚠️ Data verification not yet implemented") - print("⚠️ Data verification not yet implemented") - return # TODO: Count airports in database - # print(f"πŸ“Š Airports in database: {airports_count.iloc[0]['count']}") + airports_count = pd.read_sql("SELECT COUNT(*) as count FROM airports", engine) + print(f"πŸ“Š Airports in database: {airports_count.iloc[0]['count']}") # TODO: Count flights in database - # print(f"πŸ“Š Flights in database: {flights_count.iloc[0]['count']}") + flights_count = pd.read_sql("SELECT COUNT(*) as count FROM flights", engine) + print(f"πŸ“Š Flights in database: {flights_count.iloc[0]['count']}") # TODO: Show sample airport data - # print("\nπŸ“‹ Sample airports:") - # print(sample_airports.to_string(index=False)) + sample_airports = pd.read_sql("SELECT name, city, country FROM airports LIMIT 3", engine) + print("\nπŸ“‹ Sample airports:") + print(sample_airports.to_string(index=False)) # TODO: Show sample flight data (if any exists) - # sample_flights = pd.read_sql("SELECT callsign, origin_country, altitude FROM flights LIMIT 3", engine) - # if not sample_flights.empty: - # print("\n✈️ Sample flights:") - # print(sample_flights.to_string(index=False)) - + # Hint: Check if flights table has data first + sample_flights = pd.read_sql("SELECT callsign, origin_country, altitude FROM flights LIMIT 3", engine) + if not sample_flights.empty: + print("\n✈️ Sample flights:") + print(sample_flights.to_string(index=False)) + return except Exception as e: print(f"❌ Error verifying data: {e}") diff --git a/src/transform_data.py b/src/transform_data.py index 67d7546..b9d224c 100644 --- a/src/transform_data.py +++ b/src/transform_data.py @@ -31,20 +31,25 @@ def clean_airports(airports_df): df = airports_df.copy() # TODO: Remove rows with missing latitude or longitude - # df = df.dropna(subset=['latitude', 'longitude']) + # Hint: Use .dropna(subset=['latitude', 'longitude']) + df = df.dropna(subset=['latitude', 'longitude']) # TODO: Remove airports with invalid coordinates # Latitude should be between -90 and 90 # Longitude should be between -180 and 180 + df = df[(df['latitude'] >= -90) & (df['latitude'] <= 90)] + df = df[(df['longitude'] >= -180) & (df['longitude'] <= 180)] # TODO: Handle missing IATA codes (replace empty strings or 'N' with None) + df['iata_code'] = df['iata_code'].replace(['', 'N', '\\N'], None) # TODO: Convert altitude to numeric (handle non-numeric values) + df['altitude'] = pd.to_numeric(df['altitude'], errors='coerce') # TODO: Print how many airports remain after cleaning - # print(f"After cleaning: {len(df)} airports remain") + print(f"After cleaning: {len(df)} airports remain") - print("⚠️ Airport cleaning not yet implemented") + # print("⚠️ Airport cleaning not yet implemented") return df def clean_flights(flights_df): @@ -82,24 +87,30 @@ def clean_flights(flights_df): ] # Make a copy to avoid modifying the original - df = flights_df.copy() + df = flights_df.copy().iloc[:,:12] # TODO: Assign column names to the DataFrame + df.columns = expected_columns # TODO: Remove flights with missing coordinates + df = df.dropna(subset=['longitude', 'latitude']) # TODO: Convert altitude from meters to feet (multiply by 3.28084) # This makes it easier to understand for aviation + df['altitude'] = df['altitude'] * 3.28084 # TODO: Remove flights with invalid coordinates # Same coordinate bounds as airports + df = df[(df['latitude'] >= -90) & (df['latitude'] <= 90)] + df = df[(df['longitude'] >= -180) & (df['longitude'] <= 180)] # TODO: Clean callsign (remove extra whitespace) + df['callsign'] = df['callsign'].str.strip() # TODO: Print how many flights remain after cleaning - # print(f"After cleaning: {len(df)} flights remain") + print(f"After cleaning: {len(df)} flights remain") - print("⚠️ Flight cleaning not yet implemented") + # print("⚠️ Flight cleaning not yet implemented") return df def combine_data(airports_df, flights_df):