Add Python driver prototype for WRF workflow#1
Conversation
There was a problem hiding this comment.
Pull request overview
This pull request introduces a Python-based prototype driver for the WRF (Weather Research and Forecasting) workflow as an alternative to the existing C# pipeline. The new implementation uses JSON configuration files to manage GFS data downloads, namelist updates, physics parameter sweeps, and visualization rendering without database dependencies.
Changes:
- Added
wrfsharp_pypackage with modular components for configuration loading, GFS data downloading, namelist manipulation, physics parameter expansion, and workflow orchestration - Introduced JSON-based configuration system with sample config demonstrating GFS download settings, file paths, regional bounds, and physics parameter sweeps
- Implemented CLI driver with
--prepand--computestages for data preparation and WRF execution phases
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 17 comments.
Show a summary per file
| File | Description |
|---|---|
| wrfsharp_py/init.py | Package initialization with basic docstring |
| wrfsharp_py/config.py | Configuration loader parsing JSON into dataclasses for paths, GFS settings, physics parameters, and runtime options |
| wrfsharp_py/download.py | GFS file discovery and download from NOMADS with cycle selection logic |
| wrfsharp_py/driver.py | Main CLI orchestrator coordinating prep and compute stages with external tool execution |
| wrfsharp_py/namelist.py | WRF/WPS namelist date and physics parameter update via regex substitution |
| wrfsharp_py/physics.py | Physics parameter expansion converting comma-separated values into PhysicsRun combinations |
| wrfsharp_py/process.py | Subprocess wrapper for executing external commands |
| wrfsharp_py/README.md | Documentation explaining prototype purpose, quick start, and implementation notes |
| configs/sample.json | Example configuration demonstrating GFS URLs, paths, and physics parameter sweep for mp_physics |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def download_files(base_url: str, cycle_dir: str, files: Iterable[str], target_dir: Path) -> List[Path]: | ||
| target_dir.mkdir(parents=True, exist_ok=True) | ||
| downloaded: List[Path] = [] | ||
| for filename in files: | ||
| url = f"{base_url.rstrip('/')}/{cycle_dir}{filename}" | ||
| destination = target_dir / filename | ||
| if destination.exists(): | ||
| downloaded.append(destination) | ||
| continue | ||
| with urllib.request.urlopen(url) as response: | ||
| destination.write_bytes(response.read()) | ||
| downloaded.append(destination) | ||
| return downloaded |
There was a problem hiding this comment.
Network operations in download_files and _read_url lack error handling for common failures like connection timeouts, HTTP errors, or network interruptions. This could cause the entire workflow to crash when downloading large GFS datasets. Consider adding retry logic and appropriate error handling for urllib requests.
| return [int(part) for part in parts] | ||
| if isinstance(value, Iterable): | ||
| return [int(part) for part in value] | ||
| raise ValueError(f"Unsupported physics parameter value: {value}") |
There was a problem hiding this comment.
The _expand_value function accepts any Iterable but the isinstance(value, str) check comes before isinstance(value, Iterable). Since strings are also iterable, this ordering is correct. However, the function doesn't handle the case where an Iterable might contain non-integer values, which would cause int() to raise a ValueError. Add error handling or validation for the iterable conversion.
| return [int(part) for part in parts] | |
| if isinstance(value, Iterable): | |
| return [int(part) for part in value] | |
| raise ValueError(f"Unsupported physics parameter value: {value}") | |
| try: | |
| return [int(part) for part in parts] | |
| except ValueError as exc: | |
| raise ValueError( | |
| f"Unsupported physics parameter string value: {value!r}" | |
| ) from exc | |
| if isinstance(value, Iterable): | |
| result: List[int] = [] | |
| for index, part in enumerate(value): | |
| try: | |
| result.append(int(part)) | |
| except ValueError as exc: | |
| raise ValueError( | |
| f"Unsupported physics parameter iterable element at index {index}: " | |
| f"{part!r} in value {value!r}" | |
| ) from exc | |
| return result | |
| raise ValueError(f"Unsupported physics parameter value: {value!r}") |
| region = RegionConfig( | ||
| upper_left_latlon=_require(region_payload, "upper_left_latlon"), | ||
| lower_right_latlon=_require(region_payload, "lower_right_latlon"), |
There was a problem hiding this comment.
The region configuration loads upper_left_latlon and lower_right_latlon as strings but never validates or uses them. If these are intended for future use in configuring WRF domain bounds, they should either be used in the namelist updates or documented as reserved for future implementation.
| for image_dir in sorted(config.paths.output_dir.glob("*.png")): | ||
| output_mp4 = config.paths.output_dir / f"{physics.name}.mp4" | ||
| run_command( | ||
| [ | ||
| str(config.paths.ffmpeg_path), | ||
| "-y", | ||
| "-framerate", | ||
| "10", | ||
| "-pattern_type", | ||
| "glob", | ||
| "-i", | ||
| str(image_dir / "*.png"), | ||
| str(output_mp4), | ||
| ], | ||
| cwd=config.paths.output_dir, | ||
| ) |
There was a problem hiding this comment.
The line reads 'for image_dir in sorted(config.paths.output_dir.glob(".png"))' but then uses 'image_dir / ".png"' on line 75. This would only work if image_dir is a directory, but the glob "*.png" returns PNG file paths, not directories. The variable should be renamed to reflect what it actually contains (PNG files), or the logic should be corrected to iterate over directories.
| for image_dir in sorted(config.paths.output_dir.glob("*.png")): | |
| output_mp4 = config.paths.output_dir / f"{physics.name}.mp4" | |
| run_command( | |
| [ | |
| str(config.paths.ffmpeg_path), | |
| "-y", | |
| "-framerate", | |
| "10", | |
| "-pattern_type", | |
| "glob", | |
| "-i", | |
| str(image_dir / "*.png"), | |
| str(output_mp4), | |
| ], | |
| cwd=config.paths.output_dir, | |
| ) | |
| output_mp4 = config.paths.output_dir / f"{physics.name}.mp4" | |
| run_command( | |
| [ | |
| str(config.paths.ffmpeg_path), | |
| "-y", | |
| "-framerate", | |
| "10", | |
| "-pattern_type", | |
| "glob", | |
| "-i", | |
| "*.png", | |
| str(output_mp4), | |
| ], | |
| cwd=config.paths.output_dir, | |
| ) |
| completed = subprocess.run(list(command), cwd=str(cwd) if cwd else None, check=False) | ||
| if completed.returncode != 0: | ||
| raise RuntimeError(f"Command failed: {' '.join(command)}") |
There was a problem hiding this comment.
The error message construction using ' '.join(command) will fail because command is an Iterable[str] that may have already been consumed by list(command) on line 9. This will result in an empty string in the error message. Store the command list in a variable before passing it to subprocess.run, or use list(command) in the error message as well.
| completed = subprocess.run(list(command), cwd=str(cwd) if cwd else None, check=False) | |
| if completed.returncode != 0: | |
| raise RuntimeError(f"Command failed: {' '.join(command)}") | |
| command_list = list(command) | |
| completed = subprocess.run(command_list, cwd=str(cwd) if cwd else None, check=False) | |
| if completed.returncode != 0: | |
| raise RuntimeError(f"Command failed: {' '.join(command_list)}") |
| for image_dir in sorted(config.paths.output_dir.glob("*.png")): | ||
| output_mp4 = config.paths.output_dir / f"{physics.name}.mp4" | ||
| run_command( | ||
| [ | ||
| str(config.paths.ffmpeg_path), | ||
| "-y", | ||
| "-framerate", | ||
| "10", | ||
| "-pattern_type", | ||
| "glob", | ||
| "-i", | ||
| str(image_dir / "*.png"), | ||
| str(output_mp4), | ||
| ], | ||
| cwd=config.paths.output_dir, | ||
| ) |
There was a problem hiding this comment.
The glob pattern '.png' is used to iterate over image_dir, but the variable name suggests it should be iterating over directories containing PNG files. The code logic appears incorrect: sorted(config.paths.output_dir.glob(".png")) would return Path objects representing PNG files in output_dir, not directories. Then 'image_dir / ".png"' on line 75 would fail. This should likely be iterating over directories with glob("/") or a similar pattern, or the logic needs to be restructured.
| for image_dir in sorted(config.paths.output_dir.glob("*.png")): | |
| output_mp4 = config.paths.output_dir / f"{physics.name}.mp4" | |
| run_command( | |
| [ | |
| str(config.paths.ffmpeg_path), | |
| "-y", | |
| "-framerate", | |
| "10", | |
| "-pattern_type", | |
| "glob", | |
| "-i", | |
| str(image_dir / "*.png"), | |
| str(output_mp4), | |
| ], | |
| cwd=config.paths.output_dir, | |
| ) | |
| output_mp4 = config.paths.output_dir / f"{physics.name}.mp4" | |
| run_command( | |
| [ | |
| str(config.paths.ffmpeg_path), | |
| "-y", | |
| "-framerate", | |
| "10", | |
| "-pattern_type", | |
| "glob", | |
| "-i", | |
| str(config.paths.output_dir / "*.png"), | |
| str(output_mp4), | |
| ], | |
| cwd=config.paths.output_dir, | |
| ) |
| run_command([str(config.paths.geogrid_exe)], cwd=config.paths.wps_dir) | ||
| run_command([str(config.paths.link_grib_script)], cwd=config.paths.wps_dir) | ||
| run_command([str(config.paths.ungrib_exe)], cwd=config.paths.wps_dir) | ||
| run_command([str(config.paths.metgrid_exe)], cwd=config.paths.wps_dir) | ||
|
|
||
|
|
||
| def compute_stage(config_path: Path) -> None: | ||
| config = load_config(config_path) | ||
| physics_runs = expand_all([(entry.name, entry.parameters) for entry in config.physics]) | ||
| runs = physics_runs[: config.run.max_runs] | ||
|
|
||
| for physics in runs: | ||
| update_physics(config.paths.wrf_namelist, physics.parameters) | ||
| run_command([str(config.paths.mpirun_path), str(config.paths.real_exe)], cwd=config.paths.wrf_dir) | ||
| run_command([str(config.paths.mpirun_path), str(config.paths.wrf_exe)], cwd=config.paths.wrf_dir) |
There was a problem hiding this comment.
The subprocess calls lack stdout/stderr capture, making debugging difficult when commands fail. Commands like geogrid.exe, ungrib.exe, real.exe, and wrf.exe typically produce important diagnostic output. Consider capturing or logging output, especially for WRF executables which may fail for various physics or configuration reasons.
| downloaded.append(destination) | ||
| continue | ||
| with urllib.request.urlopen(url) as response: | ||
| destination.write_bytes(response.read()) |
There was a problem hiding this comment.
The code lacks validation that downloaded files are complete and uncorrupted. GFS GRIB files can be partially downloaded or corrupted, which would cause WRF preprocessing to fail later. Consider adding file size validation or checksum verification after download.
| destination.write_bytes(response.read()) | |
| data = response.read() | |
| content_length = response.getheader("Content-Length") | |
| if content_length is not None: | |
| try: | |
| expected_size = int(content_length) | |
| except ValueError: | |
| expected_size = None | |
| else: | |
| if len(data) != expected_size: | |
| raise RuntimeError( | |
| f"Incomplete download for {url}: expected {expected_size} bytes, got {len(data)}" | |
| ) | |
| destination.write_bytes(data) |
| args = parser.parse_args() | ||
|
|
||
| if not args.prep and not args.compute: | ||
| raise SystemExit("Specify --prep and/or --compute") |
There was a problem hiding this comment.
The error message states 'Specify --prep and/or --compute' but uses SystemExit instead of raising an error or calling parser.error(). This bypasses argparse's standard error formatting. Use parser.error() for consistent CLI error reporting.
| raise SystemExit("Specify --prep and/or --compute") | |
| parser.error("Specify --prep and/or --compute") |
| # Placeholder: use file timestamps until wgrib2 integration. | ||
| times = [datetime.fromtimestamp(path.stat().st_mtime) for path in grib_files] | ||
| return min(times), max(times) |
There was a problem hiding this comment.
Using file modification timestamps (st_mtime) as a placeholder for GRIB dates is problematic because file timestamps reflect when files were downloaded or modified, not the actual forecast dates contained in the GRIB data. This could lead to incorrect namelist date configuration. The comment acknowledges this is a placeholder, but relying on this for actual WRF runs would cause workflow failures. Consider either implementing proper wgrib2 parsing before merging, or adding validation to prevent running with placeholder dates.
Motivation
ffmpegrendering flow while making it easier to extend toward web-friendly outputs (e.g., NetCDF/GeoTIFF tile generation).Description
wrfsharp_pypackage with modules:config.py(JSON config loading into dataclasses),download.py(list/download GFS files),namelist.py(update namelist dates and physics),physics.py(expand physics parameter sweeps),process.py(run external commands), anddriver.py(CLI with--prepand--computestages).configs/sample.jsonthat mirrors previous physics settings and runtime/path options, and treatgfs.prefer_latest/require_complete_file_countto pick cycles.PhysicsRuncombinations to limit torun.max_runs.wrfsharp_py/README.md, and use file timestamps as a placeholder for GRIB date extraction untilwgrib2parsing is added.Testing
Codex Task