Hydros is a scalable multi-site architecture for water treatment plant simulation and edge gateway functionality. It supports multiple plants with centralized templates, structured protocol clients, and dynamic site selection, and it is suited to both development simulation and production data collection from real PLCs.
backend/
├── config/                      # Multi-site configuration management
│   ├── sites/                   # Individual site configurations
│   │   ├── wtp-porto-01/
│   │   │   └── plant.yaml       # Sample plant configuration
│   │   └── wtp-regional-02/
│   │       └── plant.yaml       # Another sample plant configuration
│   ├── templates/               # Centralized templates
│   │   ├── modules.yaml         # Module templates (58 types)
│   │   └── parameters.yaml      # Parameter specifications (49+ types)
│   └── schemas/                 # JSON schema validation
│       ├── site_config_schema.json
│       ├── module_templates_schema.json
│       └── parameter_specifications_schema.json
├── core/                        # Core digital twin components
│   ├── digital_twin.py          # Central digital twin management
│   ├── plant_builder.py         # Factory for plant configurations
│   ├── plant_elements.py        # Core data structures (sensors, actuators)
│   ├── sensor_catalog.py        # Centralized parameter specifications
│   ├── protocol_mapper.py       # Protocol address mapping
│   └── config_validator.py      # Configuration validation system
├── simulation/                  # Physics-based simulation engine
│   ├── simulator.py             # Simulation orchestration
│   ├── components.py            # Simulated plant component wrappers
│   └── process_models.py        # Physical process simulation models
├── gateway/                     # Edge gateway for real PLCs
│   ├── edge_gateway.py          # Production data collection gateway
│   ├── plc_readers.py           # Async PLC communication handlers
│   └── data_mapper.py           # Data transformation utilities
├── protocols/                   # Industrial protocol handlers
│   ├── modbus_handler.py        # Unified async Modbus implementation
│   ├── mqtt_handler.py          # Centralized MQTT client for all components
│   ├── protocol_registry.py     # Pluggable protocol system
│   └── __init__.py              # Protocol package initialization
├── main.py                      # Main unified system entry point
└── README.md                    # System documentation
- Python 3.13+
- Docker & Docker Compose (for containerized deployment)
- Industrial network access (for production mode)
- MQTT broker (Mosquitto recommended)
# Clone the repository
git clone https://github.com/vitorbabo/hydros.git
cd hydros
# Docker compose (Recommended)
docker compose up -d --build

Simulation mode:
- Purpose: Development, testing, demonstrations
- Data Source: Physics-based simulation models
- Protocols: Serves data via Modbus TCP (port 5020)
- Use Case: When no real PLCs are available

Gateway mode:
- Purpose: Production data collection
- Data Source: Real PLCs (Modbus, OPC UA, S7)
- Protocols: Collects from PLCs, serves via standardized protocols
- Use Case: Industrial deployment
- Individual site configurations with centralized templates
- Dynamic site selection: `--site-id wtp-porto-01` or `--site-id wtp-regional-02`
- Scalable to unlimited sites without code changes
- Site-specific protocol client configurations
- Automatic protocol address allocation per site
- Eliminates hardcoded addresses completely
- Supports multiple protocols per site (Modbus, OPC UA, S7)
- Scalable for any plant size (9-20+ modules tested)
- Centralized sensor catalog with 49+ parameter types
- DigitalTwin class manages all plant components and states
- PlantComponent instances for physical equipment (pumps, filters, tanks)
- PlantParameter definitions with protocol addressing
- ComponentRole classification (sensor, actuator, status)
- Real-time parameter synchronization and lifecycle management
- Consistent `site.component.parameter` format throughout the system
- DigitalTwin uses: `wtp-porto-01.raw_intake.level`
- MQTT topics use: `wtp/wtp-porto-01/raw_intake/level/observation`
- No parameter ID conversion needed between components
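
The mapping between the dotted parameter ID and the observation topic is mechanical, which is why no lookup table is needed. The sketch below illustrates it; the function names are hypothetical and not taken from the Hydros code base, and it assumes that site, component, and parameter names contain no `.` or `/` characters.

```python
def parameter_id_to_topic(parameter_id: str, root: str = "wtp") -> str:
    """Turn 'site.component.parameter' into the observation topic layout."""
    parts = parameter_id.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected site.component.parameter, got {parameter_id!r}")
    site_id, asset_id, measurement = parts
    return f"{root}/{site_id}/{asset_id}/{measurement}/observation"


def topic_to_parameter_id(topic: str, root: str = "wtp") -> str:
    """Inverse mapping: observation topic back to the dotted parameter ID."""
    parts = topic.split("/")
    if len(parts) != 5 or parts[0] != root or parts[4] != "observation":
        raise ValueError(f"not an observation topic: {topic!r}")
    return ".".join(parts[1:4])


print(parameter_id_to_topic("wtp-porto-01.raw_intake.level"))
```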
- Unified Handler: Single MQTT client serves all system components
- Message Types: Observations, configurations, status, and future control commands
- Topic Structure:
  - Sensor data: `wtp/{site_id}/{asset_id}/{measurement}/observation`
  - Site configurations: `wtp/{site_id}/configuration/{type}`
  - Global templates: `wtp/global/configuration/{templates|parameters}`
  - System status: `wtp/{site_id}/status/{type}`
- Real-time Publishing: Async message queuing with automatic reconnection
- Comprehensive Statistics: Connection health, message counts, and performance metrics
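
Because every topic has a fixed number of levels, MQTT wildcards select slices of this hierarchy cleanly. The following is a simplified sketch of standard MQTT filter matching (`+` for exactly one level, `#` for the remaining levels); it is an illustration only, not part of Hydros, and it ignores special cases such as topics beginning with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last filter level.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs
    # Without '#', filter and topic must have the same depth.
    return len(filter_levels) == len(topic_levels)


topic = "wtp/wtp-porto-01/raw_intake/level/observation"
print(topic_matches("wtp/+/+/+/observation", topic))
print(topic_matches("wtp/+/status/+", topic))
```

Note that a filter such as `wtp/+/configuration/+` also matches the global topics under `wtp/global/configuration/`, since `global` occupies the site level.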
- Modbus TCP: Full async client/server implementation ✅
- MQTT: Centralized handler for all system components ✅
  - Single connection for observations and configuration publishing
  - Async message queuing with automatic reconnection
  - Protocol registry integration with comprehensive statistics
- OPC UA: Protocol mapper support ready 🚧
- Siemens S7: Protocol mapper support ready 🚧
- Pluggable protocol registry system with shared connection management
- SimulatedPlantComponent wrappers for realistic behavior
- Hydraulic and water quality process models
- SensorType-specific noise and drift simulation
- Configurable process parameters with ComponentRole classification
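
To make the noise and drift idea concrete, here is a minimal sketch of one common way to model them: Gaussian noise plus a linear drift term added to the true process value. The function name, parameters, and numeric values are assumptions chosen for illustration; the actual Hydros process models may differ.

```python
import random


def simulate_reading(true_value, elapsed_s, noise_std, drift_per_hour, rng):
    """Return a sensor reading with linear drift and Gaussian noise applied."""
    drift = drift_per_hour * (elapsed_s / 3600.0)  # slow calibration drift
    noise = rng.gauss(0.0, noise_std)              # sample-to-sample jitter
    return true_value + drift + noise


# Seeded generator so the simulated series is reproducible between runs.
rng = random.Random(42)
readings = [
    simulate_reading(7.2, t, noise_std=0.02, drift_per_hour=0.01, rng=rng)
    for t in range(0, 10, 2)  # one pH-like sample every 2 seconds
]
print([round(r, 3) for r in readings])
```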
The Hydros system is built around intuitive, domain-specific components:
- DigitalTwin: Central orchestrator managing all plant components and real-time data
- PlantComponent: Physical equipment instances (pumps, filters, tanks, sensors)
- PlantParameter: Unified parameter definitions with protocol addressing
- ComponentInfo: Component metadata and operational information
- PlantBuilder: Factory for constructing plant configurations from templates
- SensorCatalog: Centralized library of 49+ parameter specifications
- ProtocolMapper: Dynamic address allocation for Modbus, OPC UA, S7 protocols
- ConfigValidator: JSON schema validation for all configuration files
- MQTTHandler: Unified MQTT client for real-time data and configuration publishing
- ComponentRole: Parameter classification (SENSOR, ACTUATOR, STATUS)
- SensorType: Water treatment measurements (turbidity, pH, flow_rate, etc.)
- ProtocolDataType: Industrial protocol data types (BOOL, REAL, INT, etc.)
- OperationalState: Component states (active, inactive, fault, maintenance)
This architecture provides a clear, maintainable structure that reflects real water treatment plant concepts while enabling advanced digital twin capabilities.
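
As a rough illustration of how these pieces relate, the sketch below models two of the enumerations and a parameter record. The enum members follow the values listed above; the dataclass fields and the `parameter_id` property are assumptions made for this example and do not reproduce the real definitions in `plant_elements.py`.

```python
from dataclasses import dataclass
from enum import Enum


class ComponentRole(Enum):
    SENSOR = "sensor"
    ACTUATOR = "actuator"
    STATUS = "status"


class OperationalState(Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"
    FAULT = "fault"
    MAINTENANCE = "maintenance"


@dataclass(frozen=True)
class PlantParameter:
    """Hypothetical, simplified parameter record."""
    site_id: str
    component_id: str
    name: str
    role: ComponentRole
    unit: str

    @property
    def parameter_id(self) -> str:
        # Same site.component.parameter format used across the system.
        return f"{self.site_id}.{self.component_id}.{self.name}"


level = PlantParameter("wtp-porto-01", "raw_intake", "level", ComponentRole.SENSOR, "m")
print(level.parameter_id, level.role.value)
```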
# backend/config/sites/wtp-porto-01/plant.yaml
site_info:
  site_id: wtp-porto-01
  name: "Porto Municipal WTP"
  design_capacity: 50000  # m3/day
  location:
    region: "Porto"
    coordinates: [41.1579, -8.6291]

# Modules reference centralized templates
modules:
  - raw_intake
  - intake_pump_1
  - coagulation_tank
  - clarifier_1
  - filter_bed_1
  - finished_water_tank

# Site-specific protocol client definitions
protocol_clients:
  - client_id: main_plc
    protocol: modbus_tcp
    connection:
      host: "${PLC_HOST:192.168.1.100}"
      port: "${PLC_PORT:502}"
      unit_id: 1
    modules_assigned:
      - raw_intake
      - intake_pump_1
      - coagulation_tank

# backend/config/templates/modules.yaml
module_templates:
  raw_intake:
    type: "intake"
    description: "Raw water intake with quality monitoring"
    required_sensors:
      - level
      - flow_rate
      - turbidity
      - ph
      - temperature
    optional_sensors:
      - dissolved_oxygen
      - chlorophyll_a

Protocol addresses are automatically generated by ProtocolMapper for each site:
- `mappings/modbus.json` - Modbus TCP mappings (68 parameters)
- `mappings/opcua.json` - OPC UA node mappings
- `mappings/s7.json` - Siemens S7 address mappings
- `edge_gateway_config.yaml` - Gateway configuration with protocol clients
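
The idea behind automatic allocation can be sketched as a sequential walk over a site's parameters, handing each one the next free address. Everything below is an assumption for illustration: the register widths, the base address of 3000, and the output layout are not taken from ProtocolMapper or from the real `mappings/modbus.json` format.

```python
# Assumed register widths per data type (16-bit Modbus registers).
REGISTERS_PER_TYPE = {"BOOL": 1, "INT": 1, "REAL": 2}


def allocate_addresses(parameters, base_address=3000):
    """Assign consecutive register addresses to (parameter_id, data_type) pairs."""
    mapping = {}
    next_address = base_address
    for parameter_id, data_type in parameters:
        width = REGISTERS_PER_TYPE[data_type]
        mapping[parameter_id] = {
            "address": next_address,
            "count": width,
            "data_type": data_type,
        }
        next_address += width  # the next parameter starts after this one
    return mapping


site_parameters = [
    ("wtp-porto-01.raw_intake.level", "REAL"),
    ("wtp-porto-01.raw_intake.flow_rate", "REAL"),
    ("wtp-porto-01.intake_pump_1.running", "BOOL"),
]
for pid, entry in allocate_addresses(site_parameters).items():
    print(pid, entry)
```

Because addresses are derived from the parameter list, adding a module to a site and regenerating the mappings is enough; no address needs to be written by hand.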
- Multi-site support: Porto (68 parameters), Regional (106+ parameters)
- Real-time simulation at 1-2 second intervals with DigitalTwin
- Multiple protocol clients per site (up to 5 tested)
- ~100 concurrent connections per protocol server
- Centralized templates: 58 module types, 49+ parameter specifications
- Configuration validation: JSON schema validation for all configs
- Memory: ~50MB base + ~1MB per 100 parameters
- CPU: <5% on modern hardware for typical plants
- Network: Optimized for industrial network constraints
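
The memory figure above can be turned into a quick estimate per site. This is plain arithmetic on the approximate numbers quoted in the list, not a measurement; the function name is made up for the example.

```python
def estimate_memory_mb(parameter_count, base_mb=50.0, mb_per_100_params=1.0):
    """Rough memory estimate: ~50 MB base plus ~1 MB per 100 parameters."""
    return base_mb + mb_per_100_params * parameter_count / 100.0


# Parameter counts quoted for the two sample sites.
for site, count in (("Porto", 68), ("Regional", 106)):
    print(f"{site}: ~{estimate_memory_mb(count):.2f} MB")
```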
1. Create New Site Configuration
    # Create site directory
    mkdir backend/config/sites/wtp-new-site
    # Copy template and customize
    cp backend/config/sites/wtp-porto-01/plant.yaml backend/config/sites/wtp-new-site/
    vim backend/config/sites/wtp-new-site/plant.yaml
2. Generate Protocol Mappings
    cd backend
    # Generate for specific site using protocol mapper
    python -m core.protocol_mapper wtp-new-site
    # Or use the main system with --init flag
    python main.py --site-id wtp-new-site --init
3. Test in Simulation
    cd backend
    python main.py --mode simulation --site-id wtp-new-site
4. Configure Protocol Clients
    # Protocol clients are configured per site in plant.yaml
    vim backend/config/sites/wtp-new-site/plant.yaml
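
When editing a copied `plant.yaml`, one easy mistake is assigning a module to a protocol client without declaring it under `modules`. The sketch below shows a standalone check for that case on an already-loaded configuration dictionary. It is a hypothetical helper, not the project's ConfigValidator, and it only assumes the key names shown in the sample `plant.yaml`.

```python
def find_undeclared_assignments(plant_config: dict) -> list[str]:
    """List modules assigned to protocol clients but missing from 'modules'."""
    declared = set(plant_config.get("modules", []))
    problems = []
    for client in plant_config.get("protocol_clients", []):
        client_id = client.get("client_id", "<unnamed>")
        for module in client.get("modules_assigned", []):
            if module not in declared:
                problems.append(f"{client_id}: {module!r} is not declared in modules")
    return problems


# Dictionary mirroring part of the sample plant.yaml, with one deliberate typo.
config = {
    "modules": ["raw_intake", "intake_pump_1", "coagulation_tank"],
    "protocol_clients": [
        {
            "client_id": "main_plc",
            "protocol": "modbus_tcp",
            "modules_assigned": ["raw_intake", "intake_pump1"],
        }
    ],
}
print(find_undeclared_assignments(config))
```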
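from pymodbus.client import ModbusTcpClient

# Connect to the Hydros simulation server
client = ModbusTcpClient('localhost', port=5020)
if not client.connect():
    raise ConnectionError("Modbus server not reachable on localhost:5020")

try:
    # Read parameters (addresses are auto-generated per site)
    levels = client.read_holding_registers(3000, count=10)  # Raw intake levels
    flows = client.read_input_registers(3010, count=5)      # Flow measurements
    if not levels.isError() and not flows.isError():
        print(levels.registers, flows.registers)
finally:
    client.close()

The system serves standard Modbus TCP, making it compatible with: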
- Wonderware InTouch
- Siemens WinCC
- Schneider Electric Citect
- GE iFIX
- Any Modbus-compatible SCADA
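
Whichever client is used, Modbus reads return raw 16-bit registers, so floating-point values have to be reassembled on the client side. The sketch below assumes a REAL occupies two consecutive registers as a big-endian IEEE 754 float with the high word first. That layout is a common convention but is an assumption here; the word order Hydros actually uses should be confirmed against the generated `mappings/modbus.json`.

```python
import struct


def registers_to_float(high_word: int, low_word: int) -> float:
    """Combine two 16-bit registers into a 32-bit float (high word first)."""
    raw = struct.pack(">HH", high_word, low_word)
    return struct.unpack(">f", raw)[0]


def float_to_registers(value: float) -> tuple[int, int]:
    """Split a 32-bit float into two 16-bit registers (high word first)."""
    return struct.unpack(">HH", struct.pack(">f", value))


high, low = float_to_registers(12.5)
print(hex(high), hex(low), registers_to_float(high, low))
```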
The unified MQTT handler publishes several message types in a standardized format:
Sensor Observations:
# Subscribe to all sensor observations
mosquitto_sub -h localhost -t "wtp/+/+/+/observation"
# Subscribe to specific asset
mosquitto_sub -h localhost -t "wtp/wtp-porto-01/raw_intake/+/observation"
# Subscribe to specific measurement across all assets
mosquitto_sub -h localhost -t "wtp/+/+/level/observation"

Configuration Updates:
# Subscribe to all configuration updates
mosquitto_sub -h localhost -t "wtp/+/configuration/+"
# Subscribe to global templates and parameters
mosquitto_sub -h localhost -t "wtp/global/configuration/+"
# Subscribe to system status
mosquitto_sub -h localhost -t "wtp/+/status/+"

Example Sensor Observation:
{
  "site_id": "wtp-porto-01",
  "asset_id": "raw_intake",
  "sensor_id": "level-raw_intake",
  "measurement": "level",
  "ts": "2025-08-13T13:43:12.739650Z",
  "value": 1000.0,
  "unit": "m",
  "quality": "good",
  "raw_tag": "33001",
  "source": "modbus_tcp_plc",
  "seq": 46,
  "parameter_type": "sensor",
  "component_type": "sensor"
}

Example Configuration Message:
{
  "site_id": "wtp-porto-01",
  "config_type": "plant",
  "version": "1.0",
  "timestamp": "2025-08-15T13:28:04.217000+00:00",
  "data": {
    "site_info": {
      "site_id": "wtp-porto-01",
      "name": "Porto Municipal WTP",
      "design_capacity": 50000
    },
    "modules": ["raw_intake", "intake_pump_1", "..."],
    "protocol_clients": [...],
    "control_strategies": [...]
  },
  "source": "hydros-backend",
  "seq": 3
}

# View system logs
tail -f backend/hydros.log
# Filter by component
grep "SimulationEngine" backend/hydros.log
# Monitor protocol activity
grep "ModbusHandler\|MQTTHandler" backend/hydros.log
# Watch MQTT publishing and connections
grep "MQTTHandler\|mqtt_publishes\|MQTT.*connected" backend/hydros.log
# Monitor configuration publishing
grep "ConfigurationPublisher" backend/hydros.log

# Test with mock PLC client
cd backend/core
python test_modbus_readback.py
# Performance testing
python benchmark_simulation.py
# MQTT sensor data verification
mosquitto_sub -h localhost -t "wtp/+/+/+/observation" -C 10
# MQTT configuration verification
mosquitto_sub -h localhost -t "wtp/+/configuration/+" -C 5
# MQTT system status verification
mosquitto_sub -h localhost -t "wtp/+/status/+" -C 3

# Start the complete stack (Hydros + MQTT + InfluxDB + Dashboard)
docker compose up -d --build
# View logs
docker compose logs -f hydros-system
# Monitor MQTT data
docker compose logs -f | grep mqtt

# Copy environment template
cp .env.example .env
# Edit configuration
vim .env

Services:
- Hydros System: Unified simulation/gateway on `localhost:5020` (Modbus TCP)
- Mosquitto: MQTT `localhost:1883`, WebSocket `ws://localhost:9001`
- InfluxDB: http://localhost:8086 (org: hydros, bucket: wtp)
- React UI: http://localhost:5173

Environment Variables:
- `HYDROS_MODE`: `simulation`, `normal` (default: `normal`)
- `LOG_LEVEL`: `DEBUG`, `INFO`, `WARNING`, `ERROR` (default: `INFO`)
- `SITE_ID`: Plant site identifier (default: `wtp-porto-01`)
- `PLC_HOST`: PLC IP address for gateway mode (default: `localhost`)
- Site-specific variables: `MAIN_PLC_HOST`, `CHEM_PLC_HOST`, etc.
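
The sample `plant.yaml` refers to such variables with `${NAME:default}` placeholders, for example `${PLC_HOST:192.168.1.100}`. How Hydros resolves them internally is not shown here; the sketch below is one plausible way to expand that syntax, with a hypothetical function name, using the environment value when it is set and the inline default otherwise.

```python
import os
import re

# Matches ${NAME} or ${NAME:default}.
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::([^}]*))?\}")


def expand_placeholders(text: str, env=None) -> str:
    """Replace ${NAME:default} placeholders using env, falling back to the default."""
    env = os.environ if env is None else env

    def replace(match):
        name, default = match.group(1), match.group(2)
        if name in env:
            return env[name]
        if default is not None:
            return default
        raise KeyError(f"environment variable {name} is not set and has no default")

    return _PLACEHOLDER.sub(replace, text)


print(expand_placeholders("${PLC_HOST:192.168.1.100}", env={}))
print(expand_placeholders("${PLC_HOST:192.168.1.100}", env={"PLC_HOST": "10.0.0.5"}))
```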
# 1. Add to backend/config/templates/modules.yaml
module_templates:
  new_tank_type:
    type: "storage"
    description: "New tank component"
    required_sensors:
      - level
      - volume
      - temperature
    actuators:
      - inlet_valve
      - outlet_valve

# 2. Add parameters to backend/config/templates/parameters.yaml
parameter_specifications:
  volume:
    measurement_type: "volume"
    unit: "m³"
    data_type: "REAL"
    ranges:
      normal: [0.0, 1000.0]

# 3. Reference in any site configuration
modules:
  - new_tank_type

# 1. Implement BaseProtocolHandler interface
class S7Handler(BaseProtocolHandler):
    def connect(self):
        pass

    def read_parameter(self, param_id):
        pass

    def write_parameter(self, param_id, value):
        pass

# 2. Register with ProtocolRegistry
registry.register_protocol(ProtocolType.S7, S7Handler, [...])

# 3. Add address allocation support

Site configuration not found
# Check site directory structure
ls -la backend/config/sites/
ls -la backend/config/sites/wtp-porto-01/
# Verify templates exist
ls -la backend/config/templates/

Port conflicts
# Check port availability
netstat -ln | grep 5020

PLC connection failures
# Test network connectivity
ping 192.168.1.100
telnet 192.168.1.100 502

Configuration validation failures
# Run configuration validation manually
python -c "from core.config_validator import ConfigValidator; ConfigValidator().validate_all_configurations()"
# Check schema validation
python main.py --init --log-level DEBUG

Missing dependencies
# Install requirements
pip install -r requirements.txt

cd backend
# Run in debug mode
python main.py --mode simulation --log-level DEBUG
# Check specific component (path is relative to backend/ after the cd above)
grep "DigitalTwin" hydros.log
# Monitor MQTT publishing
mosquitto_sub -h localhost -t "wtp/+/+/+/observation"

- Core Architecture - Detailed technical documentation
- Configuration Guide - Plant and protocol configuration
- Operation Modes - Simulation, Gateway, and Hybrid modes
- MQTT Integration - Real-time data streaming guide