diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a51720..3c3231a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added support for running an ensemble. Default output is the ensemble median, but --full_ensemble can be specified [!17](https://github.com/dmidk/sunflow/pull/17), @KristianHMoller - Added a check for the number of ensemble members, as the code currently supports only one [!13](https://github.com/dmidk/sunflow/pull/13), @KristianHMoller - Subsetting to bounding box is now also done in the `s3` and `files` code paths [!11](https://github.com/dmidk/sunflow/pull/11), @JoachimKoenigslieb - Subsetting to bounding box now correctly handles both ascending and descending lat/lon coordinates [!11](https://github.com/dmidk/sunflow/pull/11), @JoachimKoenigslieb diff --git a/sunflow/config.py b/sunflow/config.py index 6cb2e7f..bfa8a89 100644 --- a/sunflow/config.py +++ b/sunflow/config.py @@ -45,6 +45,8 @@ class NowcastConfig: nowcast_directory: str ens_members: int + alpha: float + beta: float past_steps: int future_steps: int input_data_availability_delay_minutes: int @@ -61,6 +63,8 @@ def from_env(cls) -> Self: - NOWCAST_DIRECTORY (default: .) - ENS_MEMBERS (default: 1) + - ALPHA (default: 0.0 for ENS_MEMBERS=1, 9.29 for ENS_MEMBERS>1) + - BETA (default: 0.0 for ENS_MEMBERS=1, 0.17 for ENS_MEMBERS>1) - PAST_STEPS (default: 4) - FUTURE_STEPS (default: 24) - INPUT_DATA_AVAILABILITY_DELAY_MINUTES (default: 24) @@ -69,9 +73,21 @@ def from_env(cls) -> Self: - SATELLITE_DATA_DIRECTORY (default: .) - MAX_CLEARSKY_FALLBACK_DAYS (default: 3) """ + + ens_members = int(os.getenv("ENS_MEMBERS", "1")) + # Reference for default noise values: + # A. Carpentieri, D. Folini, D. Nerini, S. Pulkkinen, M. Wild, A. Meyer, + # "Intraday probabilistic forecasts of surface solar radiation with cloud + # scale-dependent autoregressive advection," + # Applied Energy, Volume 351, 2023 + default_alpha = 0.0 if ens_members == 1 else 9.29 + default_beta = 0.0 if ens_members == 1 else 0.17 + return cls( nowcast_directory=os.getenv("NOWCAST_DIRECTORY", "."), - ens_members=int(os.getenv("ENS_MEMBERS", "1")), + ens_members=ens_members, + alpha=float(os.getenv("ALPHA", str(default_alpha))), + beta=float(os.getenv("BETA", str(default_beta))), past_steps=int(os.getenv("PAST_STEPS", "4")), future_steps=int(os.getenv("FUTURE_STEPS", "24")), input_data_availability_delay_minutes=int( diff --git a/sunflow/data_io.py b/sunflow/data_io.py index 70973d3..4c65aaa 100644 --- a/sunflow/data_io.py +++ b/sunflow/data_io.py @@ -443,6 +443,7 @@ def save_forecast( dataset_name: str, nowcast_config: NowcastConfig, model_version: str, + output_mode: str, run_mode: str = "files", s3_config: S3Config | None = None, ) -> str: @@ -453,16 +454,18 @@ def save_forecast( numeric values (float64, minutes since the forecast reference time). Args: - forecast: Forecast array, shape [time, lat, lon] or - [ensemble, time, lat, lon]. + forecast: Forecast array, shape [ensemble, time, lat, lon]. time_step: Forecast reference time (start of the forecast window). n_steps: Number of forecast time steps to write. latitudes: 1-D array of latitude values (degrees). longitudes: 1-D array of longitude values (degrees). dataset_name: Name of the source dataset (options: KNMI, DWD). - nowcast_config: NowcastConfig object supplying output directory, - ensemble size, and input data frequency. + nowcast_config: NowcastConfig object supplying output directory + and input data frequency. model_version: Model version string written as a global attribute. + output_mode: Output aggregation mode label written to global + NetCDF attrs (expected: 'deterministic', 'median', + or 'full_ensemble'). run_mode: One of 'files' (local) or 's3'. Defaults to 'files'. s3_config: S3Config object; required when run_mode is 's3'. @@ -470,12 +473,12 @@ def save_forecast( Filename (basename only) of the written NetCDF file. """ input_data_frequency_minutes = nowcast_config.input_data_frequency_minutes - ens_members = nowcast_config.ens_members filename = f"SolarNowcast_{time_step.strftime('%Y%m%d%H%M')}.nc" - # Add ensemble dimension if needed (forecast should be [ensemble, time, lat, lon]) - if forecast.ndim == 3: - forecast = forecast[np.newaxis, :, :, :] # Now [1, time, lat, lon] + if forecast.ndim != 4: + raise ValueError("forecast must have shape (ensemble, time, lat, lon).") + + ens_members = forecast.shape[0] # Build time coordinate (CF-convention: minutes since forecast reference time) @@ -525,6 +528,7 @@ def save_forecast( f"Simple Probabilistic Advection solar forecast " f"using {dataset_name} data" ), + "output_mode": output_mode, "history": ( f"Created " f"{datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC" diff --git a/sunflow/forecast.py b/sunflow/forecast.py index 3abe325..439b394 100644 --- a/sunflow/forecast.py +++ b/sunflow/forecast.py @@ -61,16 +61,19 @@ def preprocess_data( ) -def simple_advection_forecast( - ratio_data: np.ndarray, motion_field: np.ndarray, n_steps: int, ens_members: int +def probabilistic_advection_forecast( + ratio_data: np.ndarray, + motion_field: np.ndarray, + n_steps: int, + ens_members: int, + alpha: float, + beta: float, ) -> np.ndarray: - """Run a deterministic advection forecast on solar irradiance ratios. + """Run a probabilistic advection forecast on solar irradiance ratios. - Uses ProbabilisticAdvection with noise parameters alpha=0 and beta=0, - which disables Gaussian noise on the motion field norm and - von Mises noise on the direction, yielding a purely deterministic - advection result. The ensemble dimension added by the model is removed - before returning. + Uses ProbabilisticAdvection with configurable noise parameters: + alpha controls Gaussian noise on motion field norm and beta controls + von Mises noise on motion field direction. Args: ratio_data: Input array of shape (time, lat, lon) containing @@ -78,25 +81,24 @@ def simple_advection_forecast( motion_field: Optical flow field of shape (2, lat, lon) as produced by dense_lucaskanade. n_steps: Number of forecast timesteps to produce. + ens_members: Number of ensemble members. + alpha: Gaussian noise strength on motion field norm. + beta: von Mises noise strength on motion field angle. Returns: Forecast array of shape (n_steps, lat, lon). """ - # Initialize ProbabilisticAdvection with NO noise (alpha=0, beta=0) + # Initialize ProbabilisticAdvection with configured noise settings. pa = ProbabilisticAdvection( - alpha=0.0, # No Gaussian noise on motion field norm - beta=0.0, # No von Mises noise on motion field angle + alpha=alpha, + beta=beta, return_motion_field=False, ens_members=ens_members, ) # Run probabilistic advection using the correct method name forecast = pa.maps_forecast(n_steps, ratio_data, motion_field) - # Remove ensemble dimension if present (squeeze to get shape: [time, lat, lon]) - if forecast.ndim == 4: # [ensemble, time, lat, lon] - forecast = forecast[0] # Take first (and only) ensemble member - return forecast @@ -115,7 +117,7 @@ def multiply_clearsky( Args: ratio_forecast: Forecast array of shape (n_steps, lat, lon) - containing SDS/SDS_CS ratios. + or (ensemble, n_steps, lat, lon) containing SDS/SDS_CS ratios. clearsky_data: xarray Dataset with a 'time' dimension containing the clearsky variable for each forecast step. previous_day_time_steps: List of datetimes (one per forecast step) @@ -124,26 +126,94 @@ def multiply_clearsky( NetCDF variable name in the datasets. Returns: - Solar irradiance forecast array of shape (n_steps, lat, lon) - in W m⁻². + Solar irradiance forecast array with the same shape as + ratio_forecast, in W m⁻². Raises: RuntimeError: If clearsky data is missing for any forecast timestep. """ - solar_forecast = np.zeros_like(ratio_forecast) + clearsky_steps: list[np.ndarray] = [] - for i, time_step in enumerate(previous_day_time_steps): + for time_step in previous_day_time_steps: try: sds_cs = clearsky_data.sel(time=time_step.replace(tzinfo=None))[ nc_variable_names["sds_cs"] ].values - - # Multiply ratio by clearsky - solar_forecast[i] = ratio_forecast[i] * sds_cs + clearsky_steps.append(sds_cs) except KeyError: raise RuntimeError( f"No clearsky data for {time_step.strftime('%Y-%m-%dT%H:%M:%SZ')}, " "cannot compute solar forecast for this step." ) + clearsky_stack = np.stack(clearsky_steps, axis=0) + + if ratio_forecast.ndim == 3: + if ratio_forecast.shape[0] != clearsky_stack.shape[0]: + raise ValueError( + "ratio_forecast time dimension does not match clearsky timesteps " + f"({ratio_forecast.shape[0]} != {clearsky_stack.shape[0]})." + ) + solar_forecast = ratio_forecast * clearsky_stack + return solar_forecast[ + np.newaxis, :, :, : + ] # Add ensemble dimension for consistency + + if ratio_forecast.ndim == 4: + if ratio_forecast.shape[1] != clearsky_stack.shape[0]: + raise ValueError( + "ratio_forecast time dimension does not match clearsky timesteps " + f"({ratio_forecast.shape[1]} != {clearsky_stack.shape[0]})." + ) + return ratio_forecast * clearsky_stack[np.newaxis, :, :, :] + + raise ValueError( + "ratio_forecast must have shape (time, lat, lon) or " + "(ensemble, time, lat, lon)." + ) + + +def prepend_t0( + clearsky_data: xr.Dataset, + ratio_data: np.ndarray, + solar_forecast: np.ndarray, + config: dict, + clearsky_t0_time: datetime, +) -> np.ndarray: + """Prepend analysis timestep (t=0) to an ensemble solar forecast. + + Computes the t=0 solar field as the latest observed ratio + (ratio_data[-1]) multiplied by clearsky irradiance at clearsky_t0_time, + then prepends that field to all ensemble members in solar_forecast. + + Args: + clearsky_data: Dataset containing clearsky irradiance values on + a time axis. + ratio_data: Ratio history array with shape (time, lat, lon). + solar_forecast: Forecast array with shape + (ensemble, forecast_time, lat, lon). + config: Runtime configuration dict containing + config["nc_variable_names"]["sds_cs"]. + clearsky_t0_time: Timestamp for the analysis clearsky field, + typically one day before the nowcast time. + + Returns: + Array of shape (ensemble, forecast_time + 1, lat, lon) + with the analysis field inserted at index 0 along the time axis. + """ + # Prepend timestep 0: current observation (ratio_data[-1]) × clearsky at t=0 + sds_cs_t0 = clearsky_data.sel(time=clearsky_t0_time.replace(tzinfo=None))[ + config["nc_variable_names"]["sds_cs"] + ].values + solar_t0 = ratio_data[-1] * sds_cs_t0 + + # Broadcast the same t=0 clearsky-based analysis field to all ensembles. + solar_t0_ens = np.broadcast_to( + solar_t0, + (solar_forecast.shape[0],) + solar_t0.shape, + ) + solar_forecast = np.concatenate( + [solar_t0_ens[:, np.newaxis, :, :], solar_forecast], + axis=1, + ) return solar_forecast diff --git a/sunflow/main.py b/sunflow/main.py index 6034bab..1043ca0 100644 --- a/sunflow/main.py +++ b/sunflow/main.py @@ -22,7 +22,12 @@ save_forecast, ) from .downloaders import download_past_data -from .forecast import multiply_clearsky, preprocess_data, simple_advection_forecast +from .forecast import ( + multiply_clearsky, + prepend_t0, + preprocess_data, + probabilistic_advection_forecast, +) from .geospatial import check_solar_elevation, get_bbox from .time_handler import generate_time_steps, round_time from .validation import ( @@ -31,7 +36,6 @@ validate_clearsky_shapes, validate_config, validate_data_shape, - validate_nowcast_config, validate_run_mode, verify_environment_variables, ) @@ -133,6 +137,14 @@ def parse_datetime_with_timezone(datetime_str: str) -> datetime: help="End of time span in ISO8601 format (inclusive). Use with --start-time.", default=None, ) + parser.add_argument( + "--full_ensemble", + action="store_true", + help=( + "Save full ensemble output. By default, the pixel-wise median across " + "ensemble members is saved." + ), + ) args = parser.parse_args() @@ -172,6 +184,7 @@ def run_nowcast( bbox_choice: str, nowcast_config: NowcastConfig, s3_config: S3Config, + full_ensemble: bool = False, custom_time: bool = True, ) -> RunResult: """Run a single nowcast for the given (already-rounded) time step. @@ -185,6 +198,8 @@ def run_nowcast( bbox_choice: Bounding box identifier. nowcast_config: NowcastConfig object. s3_config: S3Config object. + full_ensemble: If True, save all ensemble members. If False, + save pixel-wise median over ensemble members. custom_time: If True, skip the retry wait loop on missing data. Returns: @@ -274,12 +289,14 @@ def run_nowcast( # Compute motion field motion_field = dense_lucaskanade(ratio_data) - # Simple forecast (ratio forecast) - ratio_forecast = simple_advection_forecast( + # Probabilistic advection forecast (ratio forecast) + ratio_forecast = probabilistic_advection_forecast( ratio_data, motion_field, nowcast_config.future_steps, ens_members=nowcast_config.ens_members, + alpha=nowcast_config.alpha, + beta=nowcast_config.beta, ) # Generate previous day time steps for clearsky lookup @@ -331,16 +348,33 @@ def run_nowcast( config["nc_variable_names"], ) - # Prepend timestep 0: current observation (ratio_data[-1]) × clearsky at t=0 - sds_cs_t0 = clearsky_data.sel(time=clearsky_t0_time.replace(tzinfo=None))[ - config["nc_variable_names"]["sds_cs"] - ].values - solar_t0 = ratio_data[-1] * sds_cs_t0 - solar_forecast = np.concatenate([solar_t0[np.newaxis, :, :], solar_forecast], axis=0) + solar_forecast = prepend_t0( + clearsky_data, ratio_data, solar_forecast, config, clearsky_t0_time + ) + + if full_ensemble: + output_forecast = solar_forecast + output_mode = "full_ensemble" + logger.info("Saving full ensemble forecast") + else: + if solar_forecast.shape[0] == 1: + output_forecast = solar_forecast + output_mode = "deterministic" + logger.info( + "Saving deterministic forecast " + "(single ensemble member, kept as singleton ensemble dimension)" + ) + else: + output_forecast = np.median(solar_forecast, axis=0, keepdims=True) + output_mode = "median" + logger.info( + "Saving pixel-wise median forecast across ensemble members " + "(with singleton ensemble dimension)" + ) # Save forecast (now contains actual solar irradiance, not ratios) filename = save_forecast( - solar_forecast, + output_forecast, time_step, nowcast_config.future_steps + 1, # +1 for the t=0 analysis step latitudes, @@ -348,6 +382,7 @@ def run_nowcast( dataset_name, nowcast_config, model_version, + output_mode, run_mode, s3_config, ) @@ -410,10 +445,30 @@ def cli() -> None: logger.info(f"Running in {run_mode} mode") logger.info(f"Using {dataset_name} dataset") logger.info(f"Using {bbox_choice} bbox: {bbox}") + logger.info(f"Number of ensemble members: {nowcast_config.ens_members}") + if args.full_ensemble: + logger.info("Output mode: full ensemble") + elif nowcast_config.ens_members == 1: + logger.info("Output mode: deterministic") + else: + logger.info("Output mode: median") + logger.info( + f"Using probabilistic advection noise parameters alpha={nowcast_config.alpha}, " + f"beta={nowcast_config.beta}" + ) + + if nowcast_config.ens_members == 1 and ( + nowcast_config.alpha != 0.0 or nowcast_config.beta != 0.0 + ): + logger.warning( + "Running with a single ensemble member, but non-zero probabilistic advection" + "noise parameters alpha and/or beta. This is generally not recommended as it" + "simply adds noise to the nowcast without providing any ensemble spread. " + "Consider setting alpha=0.0 and beta=0.0 for a single-member run." + ) validate_run_mode(run_mode, dataset_name) validate_config(config, dataset_name) - validate_nowcast_config(nowcast_config) verify_environment_variables(run_mode, dataset_name) # Determine the time steps to run @@ -464,6 +519,7 @@ def cli() -> None: bbox_choice, nowcast_config, s3_config, + full_ensemble=args.full_ensemble, custom_time=custom_time, ) results.append(result) diff --git a/sunflow/validation.py b/sunflow/validation.py index 21a1cae..09c9e2c 100644 --- a/sunflow/validation.py +++ b/sunflow/validation.py @@ -9,8 +9,6 @@ import xarray as xr from loguru import logger -from .config import NowcastConfig - class MissingClearskyDataError(RuntimeError): pass @@ -46,27 +44,6 @@ def validate_config(config: dict[str, Any], dataset_name: str) -> None: sys.exit(1) -def validate_nowcast_config(nowcast_config: NowcastConfig) -> None: - """Validate that the options selected for the nowcast config are valid. - - Checks the nowcast config created from imported environment variables. - Exits immediately for invalid choices. - - Args: - nowcast_config: Instance of the NowcastConfig class - loaded from environment variables in config.py. - - Raises: - SystemExit: If any invalid choice is detected. - """ - if nowcast_config.ens_members != 1: - logger.error( - f"Invalid nowcast configuration: Currently, only ens_members=1 is supported. " - f"Current value: {nowcast_config.ens_members}. Exiting.\n" - ) - sys.exit(1) - - def validate_run_mode(run_mode: str, dataset_name: str) -> None: """Validate that the run mode is compatible with the dataset.