Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/mesido/esdl/esdl_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ def __init__(self, *args, **kwargs) -> None:
DBAccessType.READ: [],
DBAccessType.WRITE: [],
}
self._workers_db_profile_reading = kwargs.get(
"workers_db_profile_reading", min(4, os.cpu_count())
)

profile_reader_class = kwargs.get("profile_reader", InfluxDBProfileReader)
input_file_name = kwargs.get("input_timeseries_file", None)
Expand Down Expand Up @@ -189,6 +192,7 @@ def __init__(self, *args, **kwargs) -> None:
file_path=input_file_path,
database_credentials=read_only_dbase_credentials,
use_esdl_ranged_contraint=self._ESDLMixin__use_esdl_ranged_constraint,
workers_db_profile_reading=self._workers_db_profile_reading,
)
else: # read from a file, no database credentials needed
self.__profile_reader: BaseProfileReader = profile_reader_class(
Expand Down Expand Up @@ -819,6 +823,9 @@ def read(self) -> None:
esdl_carriers = self.esdl_carriers
self.hot_cold_pipe_relations()
io = self.io

import time
start_time = time.time()
self.__profile_reader.read_profiles(
energy_system_components=energy_system_components,
io=io,
Expand All @@ -828,6 +835,8 @@ def read(self) -> None:
ensemble_size=ensemble_size,
ensemble=self.__ensemble,
)
print("Execution time: " + time.strftime("%M:%S", time.gmtime(time.time() - start_time)))
exit("kobus arrrrrrrrrrrrrrrrrrrrrr")

def write(self) -> None:
"""
Expand Down
152 changes: 114 additions & 38 deletions src/mesido/esdl/profile_parser.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import time
import datetime
import logging
import sys
import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, Optional, Set, Tuple

Expand Down Expand Up @@ -263,6 +266,7 @@ def __init__(
self,
energy_system: esdl.EnergySystem,
file_path: Optional[Path],
workers_db_profile_reading: Optional[int] = None,
use_esdl_ranged_contraint: bool = False,
database_credentials: Optional[Dict[str, Tuple[str, str]]] = None,
):
Expand All @@ -275,6 +279,9 @@ def __init__(
self._database_credentials = (
database_credentials if database_credentials is not None else {"": ("", "")}
)
self._database_profilemanager = list()
self._lock = threading.Lock()
self._workers_db_profile_parsing = workers_db_profile_reading

def _load_profiles_from_source(
self,
Expand Down Expand Up @@ -319,25 +326,40 @@ def _load_profiles_from_source(
)
unique_profiles.append(profile)

unique_series.append(
self._load_profile_timeseries_from_database(profile=unique_profiles[-1])
)
self._check_profile_time_series(
profile_time_series=unique_series[-1], profile=unique_profiles[-1]
)
if self._reference_datetimes is None:
# TODO: since the previous function ensures it's a date time index, I'm not sure
# how to get rid of this type checking warning
self._reference_datetimes = unique_series[-1].index
else:
if not all(unique_series[-1].index == self._reference_datetimes):
raise RuntimeError(
f"Obtained a profile for asset {profile.field} with a "
f"timeseries index that doesn't match the timeseries of "
f"other assets. Please ensure that the profile that is "
f"specified to be loaded for each asset covers exactly the "
f"same timeseries. "
)
# unique_profiles_attributes.append(
# [
# profile.database,
# profile.field,
# profile.host,
# profile.startDate,
# profile.endDate,
# profile.measurement,
# profile.port,
# ]
# )
# unique_profiles.append(profile)

# # Open parallel processes to load all unique profiles in parallel
Comment thread
KobusVanRooyen marked this conversation as resolved.
with ThreadPoolExecutor(max_workers=self._workers_db_profile_parsing) as executor:
print(f"fffffffffffff {self._workers_db_profile_parsing}")
unique_series = list(
executor.map(self._load_profile_timeseries_from_database, unique_profiles)
)
executor.map(self._check_profile_time_series, unique_series, unique_profiles)

if self._reference_datetimes is None:
# TODO: since the previous function ensures it's a date time index, I'm not sure
# how to get rid of this type checking warning
self._reference_datetimes = unique_series[-1].index
else:
if not all(unique_series[-1].index == self._reference_datetimes):
raise RuntimeError(
f"Obtained a profile for asset {profile.field} with a "
f"timeseries index that doesn't match the timeseries of "
f"other assets. Please ensure that the profile that is "
f"specified to be loaded for each asset covers exactly the "
f"same timeseries. "
)
# Loop through all the required profiles in the energy system and assign the profile data:
# - series: use the unique series data, without reading from the database again
# - other profile info: get it from the specific profile
Expand Down Expand Up @@ -451,7 +473,7 @@ def _load_profile_timeseries_from_database(self, profile: esdl.InfluxDBProfile)
username, password = self._database_credentials.get(influx_host, (None, None))

conn_settings = ConnectionSettings(
host=profile.host,
host=profile_host,
port=profile.port,
username=username,
password=password,
Expand All @@ -460,25 +482,79 @@ def _load_profile_timeseries_from_database(self, profile: esdl.InfluxDBProfile)
verify_ssl=ssl_setting,
)

try:
time_series_data = InfluxDBProfileManager(conn_settings)
except Exception:
container = profile.eContainer()
asset = container.energyasset
get_potential_errors().add_potential_issue(
MesidoAssetIssueType.ASSET_PROFILE_AVAILABILITY,
asset.id,
f"Asset named {asset.name}: Database {profile.database}"
f" is not available in the host.",
)
potential_error_to_error(NetworkErrors.HEAT_NETWORK_ERRORS)

time_series_data.load_influxdb(
profile.measurement,
[profile.field],
profile.startDate,
profile.endDate,
# Check if an InfluxDBProfileManager object for these connection settings is already
# present in the list. If so, re-use that existing object instead of creating a new
# connection, and query the profile directly through it.
time_series_data = next(
Comment thread
KobusVanRooyen marked this conversation as resolved.
filter(lambda x: x.database_settings == conn_settings, self._database_profilemanager),
None,
)
# If connection not found, create it.
if not time_series_data:
# Acquire a lock to ensure that only one thread creates a connection for the same
# database settings, and other threads wait until the connection is created and stored
# in the list. This is to avoid multiple connections being created for the same
# database settings when multiple profiles share the same database settings and are
# loaded in parallel.
t0 = threading.current_thread().name
print(f"thread [{t0}] starting now and {time_series_data} before lock and profile.field {profile.field}", flush=True)
with self._lock:
Comment thread
KobusVanRooyen marked this conversation as resolved.
Comment thread
KobusVanRooyen marked this conversation as resolved.
time_series_data = next(
filter(
lambda x: x.database_settings == conn_settings,
self._database_profilemanager,
),
None,
)

print(
"After LOOKUP:",
"len=", len(self._database_profilemanager),
"eqs=", [
pm.database_settings == conn_settings
for pm in self._database_profilemanager
],
"time_series_data=", time_series_data,
flush=True,
)

if not time_series_data:
try:
time_series_data = InfluxDBProfileManager(conn_settings)
time_series_data.load_influxdb(
profile.measurement,
[profile.field],
profile.startDate,
profile.endDate,
)
# Storing the InfluxDBProfileManager object in the list
self._database_profilemanager.append(time_series_data)
t1 = threading.current_thread().name
print(f"no connection thread [{t1}] appending now for {profile.field}", flush=True)
time.sleep(10)
except Exception:
container = profile.eContainer()
asset = container.energyasset
get_potential_errors().add_potential_issue(
MesidoAssetIssueType.ASSET_PROFILE_AVAILABILITY,
asset.id,
f"Asset named {asset.name}: Database {profile.database}"
f" is not available in the host.",
)
potential_error_to_error(NetworkErrors.HEAT_NETWORK_ERRORS)
else:
time_series_data.load_influxdb(
profile.measurement,
[profile.field],
profile.startDate,
profile.endDate,
)
t2 = threading.current_thread().name
print(f"connection exists thread [{t2}] now adding {profile.field}", flush=True)
time.sleep(60)

t10 = threading.current_thread().name
print(f"thread [{t10}] going on so long and {time_series_data}", flush=True)

if not time_series_data.profile_data_list: # if time_series_data.profile_data_list == []:
container = profile.eContainer()
Expand Down