Python ETL application that extracts aviation weather data from the AviationWeather API, processes METAR temperature observations, stores intermediate data in a local parquet-based data lake, and persists refined datasets into a PostgreSQL data warehouse hosted on Aiven.
The current workflow is configured for the Ezeiza airport weather station:
SAEZ
Build an end-to-end ETL pipeline for hourly temperature data from an aviation weather station.
The application focuses on:
- Extracting station metadata from the AviationWeather API.
- Extracting METAR temperature reports.
- Performing full and incremental data extraction.
- Storing raw and processed data in data lake layers.
- Cleaning and transforming extracted data.
- Computing daily maximum and minimum temperature summaries.
- Persisting refined data into PostgreSQL tables.
- Generating Sphinx documentation for the project modules.
Application_API_ETL/
├── README.md
└── main/
    ├── main.py
    ├── extraction.py
    ├── prosecution.py
    ├── storage.py
    ├── config.ini
    ├── requireiments.txt
    ├── docs/
    │   ├── conf.py
    │   ├── index.rst
    │   ├── extraction.rst
    │   ├── prosecution.rst
    │   ├── storage.rst
    │   └── build/
    └── __pycache__/
The pipeline is orchestrated from main/main.py.
Implemented in main/extraction.py.
The application queries:
https://aviationweather.gov/api/data/stationinfo
https://aviationweather.gov/api/data/metar
The extraction module provides:
- station_data(): extracts station metadata.
- data_full(): extracts METAR temperature data from 00 UTC to the current UTC hour.
- data_incremental(): extracts the most recent hourly METAR data.
The extracted fields include:
- Station metadata: icaoId, lat, lon, elev, country.
- METAR data: reportTime, temp.
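As a rough illustration of the METAR extraction step, the sketch below builds a request URL and keeps only the two fields listed above. The query parameters `ids`, `format`, and `hours`, and the helper names `metar_url` and `select_fields`, are assumptions made for this example; they are not taken from extraction.py.

```python
from urllib.parse import urlencode

METAR_URL = "https://aviationweather.gov/api/data/metar"


def metar_url(station: str, hours: int) -> str:
    """Build a METAR query URL (parameter names are assumed, not confirmed)."""
    params = {"ids": station, "format": "json", "hours": hours}
    return f"{METAR_URL}?{urlencode(params)}"


def select_fields(records: list[dict]) -> list[dict]:
    """Keep only the report time and temperature from each METAR record."""
    return [
        {"reportTime": rec.get("reportTime"), "temp": rec.get("temp")}
        for rec in records
    ]


url = metar_url("SAEZ", hours=1)
```

The resulting URL could then be fetched with `requests.get(url, timeout=30).json()` and the decoded list passed to `select_fields()`.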
Implemented in main/storage.py.
The application stores parquet files in a local data lake structure with three layers:
- landing: raw extracted data.
- trusted: cleaned and typed data.
- refined: final processed data and aggregates.
The generated structure follows this pattern:
main/data_lake/<layer>/aviationweather_api/<endpoint>/<airport>/
Incremental data is additionally partitioned by report date/hour.
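The path pattern above can be sketched with `pathlib`. The helper name `lake_path` and the exact naming of the date and hour partition folders are assumptions for illustration; storage.py may lay them out differently.

```python
from datetime import datetime
from pathlib import Path
from typing import Optional


def lake_path(
    layer: str,
    endpoint: str,
    airport: str,
    report_time: Optional[datetime] = None,
    root: Path = Path("main/data_lake"),
) -> Path:
    """Return the data lake directory for a layer, endpoint, and airport.

    When report_time is given (incremental data), append date and hour
    partition folders. The folder naming here is an assumed convention.
    """
    base = root / layer / "aviationweather_api" / endpoint / airport
    if report_time is None:
        return base
    return base / report_time.strftime("%Y-%m-%d") / report_time.strftime("%H")


full_dir = lake_path("landing", "metar", "SAEZ")
incremental_dir = lake_path("landing", "metar", "SAEZ", datetime(2024, 1, 2, 15))
```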
Implemented in main/prosecution.py.
The processing module performs:
- Duplicate removal.
- Removal of records without report time.
- Missing-value handling.
- Date/time splitting into year, month, day, hour, and minute fields.
- Type conversion for memory optimization.
- Column renaming.
- Concatenation of full and incremental data.
- Maximum temperature calculation.
- Minimum temperature calculation.
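A minimal pandas sketch of several of the steps listed above (duplicate removal, dropping rows without a report time, date splitting, type conversion, renaming, and daily extremes). The function names, the target dtypes, and the renamed column `temperature` are assumptions for this example and may not match prosecution.py.

```python
import pandas as pd


def clean(df: pd.DataFrame) -> pd.DataFrame:
    """Deduplicate, drop rows lacking a report time, and split the timestamp."""
    out = df.drop_duplicates().dropna(subset=["reportTime"]).copy()
    ts = pd.to_datetime(out["reportTime"])
    # Smaller integer types reduce memory use; the chosen widths are assumed.
    out["year"] = ts.dt.year.astype("int16")
    out["month"] = ts.dt.month.astype("int8")
    out["day"] = ts.dt.day.astype("int8")
    out["hour"] = ts.dt.hour.astype("int8")
    out["minute"] = ts.dt.minute.astype("int8")
    out["temp"] = out["temp"].astype("float32")
    return out.rename(columns={"temp": "temperature"})


def daily_extremes(df: pd.DataFrame) -> pd.DataFrame:
    """Compute the maximum and minimum temperature per calendar day."""
    grouped = df.groupby(["year", "month", "day"])["temperature"]
    return grouped.agg(["max", "min"]).reset_index()


raw = pd.DataFrame(
    {
        "reportTime": [
            "2024-01-01 10:00:00",
            "2024-01-01 10:00:00",  # exact duplicate of the first row
            "2024-01-01 11:00:00",
            None,  # record without a report time
        ],
        "temp": [20.0, 20.0, 25.0, 30.0],
    }
)
cleaned = clean(raw)
extremes = daily_extremes(cleaned)
```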
Implemented in main/storage.py.
The application reads PostgreSQL credentials from:
main/config.ini
It creates and writes to the following PostgreSQL tables:
- public.metadata_estaciones
- public.data_temperature
- public.max_temperature
- public.min_temperature
The warehouse writer checks whether a record already exists before inserting it.
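The check-then-insert pattern might look roughly like the sketch below. It uses the standard-library `sqlite3` module with an in-memory database as a stand-in for PostgreSQL so that it runs without a server; the column names `report_time` and `temperature` and the helper `insert_if_missing` are hypothetical.

```python
import sqlite3


def insert_if_missing(conn: sqlite3.Connection, report_time: str, temperature: float) -> bool:
    """Insert a row only if no row with the same report_time exists.

    Returns True when a row was inserted, False when it was already present.
    """
    exists = conn.execute(
        "SELECT 1 FROM data_temperature WHERE report_time = ?",
        (report_time,),
    ).fetchone()
    if exists:
        return False
    conn.execute(
        "INSERT INTO data_temperature (report_time, temperature) VALUES (?, ?)",
        (report_time, temperature),
    )
    conn.commit()
    return True


# In-memory SQLite stands in for the Aiven-hosted PostgreSQL database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE data_temperature (report_time TEXT, temperature REAL)")

first = insert_if_missing(conn, "2024-01-01 10:00:00", 20.0)
second = insert_if_missing(conn, "2024-01-01 10:00:00", 20.0)
row_count = conn.execute("SELECT COUNT(*) FROM data_temperature").fetchone()[0]
```

Running the same insert twice leaves a single row, which is what makes reruns of the pipeline safe under this pattern.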
- Python 3
- requests
- pandas
- SQLAlchemy
- psycopg2
- configparser
- fastparquet
- PostgreSQL
- Aiven
- Sphinx
Clone the repository:
git clone https://github.com/fabriciolopretto/Application_API_ETL.git
cd Application_API_ETL/main
Create and activate a virtual environment:
python -m venv .venv
source .venv/bin/activate
On Windows:
.venv\Scripts\activate
Install dependencies:
pip install -r requireiments.txt
pip install fastparquet
Note: the dependency file is currently named requireiments.txt. The standard spelling would be requirements.txt.
Before running the pipeline, fill in the PostgreSQL credentials in:
main/config.ini
Expected format:
[postgres]
host=
port=
user=
pwd=
dbname=
These credentials are used to connect to the PostgreSQL database hosted on Aiven.
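One way the `[postgres]` section could be turned into a SQLAlchemy connection URL is sketched below. The helper name `postgres_url` and the sample values are placeholders, and whether storage.py builds a URL this way is an assumption.

```python
from configparser import ConfigParser


def postgres_url(config: ConfigParser) -> str:
    """Build a SQLAlchemy-style PostgreSQL URL from the [postgres] section."""
    pg = config["postgres"]
    return (
        f"postgresql+psycopg2://{pg['user']}:{pg['pwd']}"
        f"@{pg['host']}:{pg['port']}/{pg['dbname']}"
    )


# Placeholder values; the real pipeline would call config.read("config.ini").
config = ConfigParser()
config.read_string(
    """
[postgres]
host=localhost
port=5432
user=etl_user
pwd=secret
dbname=defaultdb
"""
)
url = postgres_url(config)
```

A password containing characters such as `@` or `/` would need URL-encoding before being placed in the URL.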
From the main/ directory, run:
python main.py
The script will:
- Extract station metadata for SAEZ.
- Extract full METAR temperature data for the current UTC day.
- Extract the latest incremental METAR temperature report.
- Save raw data into the landing layer.
- Clean and transform the data.
- Save processed data into the trusted layer.
- Concatenate full and incremental records.
- Calculate maximum and minimum temperature summaries.
- Save refined data into the refined layer.
- Persist the refined tables into PostgreSQL.
- Read back public.data_temperature as a test query.
The repository includes Sphinx documentation under:
main/docs/
The built HTML documentation is already present in:
main/docs/build/html/
To rebuild the documentation from the main/docs/ directory:
make html
On Windows:
make.bat html
The current version is hardcoded to use the SAEZ station. To run the pipeline for another airport, update the aeropuerto value in main.py.
The application writes parquet files using fastparquet, but fastparquet is not listed in requireiments.txt, so it should be installed manually or added to the dependency file.
The local data lake directories must exist or be creatable by the script. Incremental storage creates missing partition directories, while full storage assumes the destination path is available.
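If the full-extraction path should also be created on demand, a small guard like the following could be called before writing. The helper name `ensure_dir` is hypothetical, and the example uses a temporary directory rather than the real main/data_lake tree.

```python
import tempfile
from pathlib import Path


def ensure_dir(path: Path) -> Path:
    """Create the directory and any missing parents; do nothing if it exists."""
    path = Path(path)
    path.mkdir(parents=True, exist_ok=True)
    return path


with tempfile.TemporaryDirectory() as tmp:
    target = ensure_dir(Path(tmp) / "landing" / "aviationweather_api" / "metar" / "SAEZ")
    created = target.is_dir()
    ensure_dir(target)  # calling it again on an existing directory is harmless
```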
The project README notes that this version differs from previous development branches because werehouse_save() persists data to a PostgreSQL database hosted on Aiven, creating tables with SQL statements and inserting records only when they do not already exist.