This component focuses on building a modular data engineering pipeline for processing raw flood-monitoring data supplied in a deeply nested JSON format.
- Java SDK 21
- Python 3.19.3
- PySpark
Figure 1
Data Engineering Architecture using PySpark
First, a SparkSession is initialised to ingest the JSON file into a Spark DataFrame. As illustrated in Figure 2 below, the actual data resides in the ‘items’ array, while its sibling fields contain API metadata. To isolate the relevant records, I used the PySpark explode() function to flatten the ‘items’ array, converting each array element into a distinct row in the DataFrame. This effectively normalises the data for further processing.
Figure 2
Spark DataFrame Schema of a Measurements JSON File
However, this initial flattening reveals that the ‘latestReading’ field remains nested as a StructType (as seen in Figure 2). To fully normalise the data, I extracted the field (along with its sub-fields) into a separate Spark DataFrame, establishing a relationship via the ‘measure’ key (later aliased as ‘measurementId’). This separation prepares the data for the gold layer and supports the star schema in Figure 3.
Subsequently, the pipeline executes the required transformation and cleaning processes, including deduplication and null handling. As shown in Figure 1, a custom validation function checks for malformed data, such as list objects appearing in float columns, and separates those rows into a quarantine DataFrame. This adheres to data engineering best practice: it maintains the integrity of the silver-layer DataFrame without discarding potentially valuable data, and it allows further inspection and refinement of the validation logic.
Lastly, Figure 3 below illustrates the database schema that represents the gold layer of the medallion architecture. This layer enables analysts to query high-quality, structured data using simple SQL joins to generate operational insights, such as retrieving the past 10 measurement readings for a station in a specific town.
Figure 3
Flood Monitoring Database Star Schema
The query below retrieves the latest reading for every measurement across all stations, using the temporary views defined in PySpark.
Run the following command to clone this repository.
git clone https://github.com/ikml-bsrn/UK_Environment_Agency_data_engineering_spark.git
Install the dependencies using the command below.
pip install -r requirements.txt
Use the pyspark_development.ipynb Jupyter notebook to follow the step-by-step development process, from raw data ingestion to the final analytical demonstration.
To execute the full data pipeline, run the main script from your terminal:
python main.py
Note: You should see progress bars indicating the processing of files and warnings for any malformed data being quarantined (as seen in the screenshot below).