2 changes: 1 addition & 1 deletion cda-etl/Dockerfile
@@ -7,4 +7,4 @@ RUN pip install --no-cache-dir -r requirements.txt

COPY src/ /app/src/

-ENTRYPOINT ["python", "src/cda_etl/main.py"]
+CMD ["python", "src/cda_etl/main.py"]
182 changes: 182 additions & 0 deletions cda-etl/README.md
@@ -0,0 +1,182 @@
# CDA ETL

This project downloads CWMS data from a source CDA REST API, stages the retrieved JSON on the local filesystem, and then uploads the staged records to a destination CDA REST API.

The workflow is intentionally split into two phases:

1. Stage data from the source API onto disk.
2. Publish the staged files to the destination API.

When `SOURCE_CDA_URL` is configured, the stage phase always re-downloads source data and overwrites staged files for projects, locations, and timeseries.

If `SOURCE_CDA_URL` is not configured, the pipeline skips the download phase and publishes whatever is already staged on disk.
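
The branching between the two phases can be sketched roughly as follows. This is a minimal illustration, not the project's actual code: `run_pipeline`, `stage`, and `publish` are hypothetical names, and the phases are passed in as callables so the sketch stays self-contained.

```python
from typing import Callable, List, Optional


def run_pipeline(
    stage: Callable[[str], None],
    publish: Callable[[], None],
    source_url: Optional[str] = None,
) -> List[str]:
    """Run the stage phase only when a source URL is configured, then publish.

    Returns the names of the phases that ran, in order.
    """
    phases_run = []
    if source_url:
        # Source configured: re-download and overwrite the staged files.
        stage(source_url)
        phases_run.append("stage")
    # Publishing always runs, using whatever is staged on disk.
    publish()
    phases_run.append("publish")
    return phases_run
```

Calling `run_pipeline(stage, publish, None)` in this sketch skips the download and goes straight to publishing, which mirrors the behavior described for an unset `SOURCE_CDA_URL`.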

## What It Does

The ETL process currently handles three CWMS resource types:

- Locations
- Projects
- Timeseries data

The data is organized by office, project, and resource type, then written to a filesystem staging area before being posted to the destination CDA API.

## Configuration Overview

The main runtime configuration is stored in a YAML file, defaulting to `regi.yml` in the working directory.

The application reads the YAML path from the `REGI_CONFIG_PATH` environment variable. If the variable is not set, it looks for `regi.yml` in the directory the process was started from.
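
As a rough sketch of that lookup (the function name `resolve_config_path` is an assumption made for illustration; the real implementation may differ):

```python
import os
from pathlib import Path
from typing import Mapping


def resolve_config_path(environ: Mapping[str, str] = os.environ) -> Path:
    """Return the configured YAML path, or regi.yml relative to the working directory."""
    raw = environ.get("REGI_CONFIG_PATH", "").strip()
    # A relative Path is resolved against the current working directory when opened.
    return Path(raw) if raw else Path("regi.yml")
```

Passing the environment as a parameter keeps the lookup easy to exercise in tests without modifying the real process environment.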

### Example Structure

```yaml
version: 1
settings:
  startTime: "2026-01-01"
  endTime: "now"
  maxThreads: 10
  logLevel: INFO
  path: "./data"
offices:
  - id: SWT
    enabled: true
    projects:
      - id: EUFA
        enabled: true
        locations:
          - id: SWT.EUFA-Dam
            enabled: true
        timeseries:
          - id: SWT.EUFA.Elev.Inst.1Hour.0.Ccp-Rev
            enabled: true
```

### YAML Fields

- `version`: Config version. Must be `1`.
- `settings.startTime`: Default start time used for timeseries downloads when a timeseries does not define its own download window.
- `settings.endTime`: Default end time used for timeseries downloads when a timeseries does not define its own download window.
- `settings.maxThreads`: Maximum number of worker threads used for staging and publishing.
- `settings.logLevel`: Logging level for the application.
- `settings.path`: Filesystem root used for staged JSON files.
- `offices`: List of office definitions.
- `projects`: Projects under each office.
- `locations`: Locations under each project.
- `timeseries`: Timeseries under each project.

### Enabled Flags

The `enabled` field is optional everywhere. If it is omitted, the item is treated as enabled.
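
A minimal sketch of that default, assuming each office, project, location, or timeseries entry has been parsed from YAML into a plain dictionary (`enabled_items` is a hypothetical helper, not a function from this project):

```python
from typing import Any, Dict, Iterable, List


def enabled_items(items: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep entries whose `enabled` flag is true or missing."""
    return [item for item in items if item.get("enabled", True)]
```

Whether disabling a parent (for example an office) also disables its children is not shown here; the sketch only covers the per-item default.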

### Filesystem Staging

Staged data is written under the directory configured by `settings.path`.

For timeseries data, the stored file name does not include the time window. During staging with `SOURCE_CDA_URL` configured, each run overwrites the staged file with a fresh source download.

## Runtime Parameters

### Required for Destination Upload

- `DEST_CDA_URL`: Destination CDA REST API root.

Environment variable values are trimmed. Empty or whitespace-only values are treated as unset.
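
The trimming rule could look something like the following. This is an illustrative sketch only; `read_env` is a hypothetical name and the environment mapping is passed in to keep the example self-contained.

```python
import os
from typing import Mapping, Optional


def read_env(name: str, environ: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return the trimmed value of an environment variable, or None if unset or blank."""
    value = environ.get(name)
    if value is None:
        return None
    value = value.strip()
    # Empty and whitespace-only values count as unset.
    return value or None
```

With this shape, `read_env("SOURCE_CDA_URL")` returning `None` covers both the omitted and the empty-value cases with a single check.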

### Optional Source Download

- `SOURCE_CDA_URL`: Source CDA REST API root. If set, source data is always re-downloaded and staged files are overwritten each run. If omitted (or set to an empty value), the download phase is skipped.
- `SOURCE_CDA_API_KEY`: API key for the source CDA REST API.
- `DEST_CDA_API_KEY`: API key for the destination CDA REST API.

### Other Runtime Settings

- `REGI_CONFIG_PATH`: Path to the YAML config file. Defaults to `regi.yml`.
- `LOG_LEVEL`: Console log level for the application process. Defaults to `INFO`.

## Docker Usage

### docker run

Mount the YAML file into the container and point `REGI_CONFIG_PATH` at it.

```powershell
docker run --rm `
  -v ${PWD}\data\regi\regi.yml:/app/regi.yml `
  -e REGI_CONFIG_PATH=/app/regi.yml `
  -e SOURCE_CDA_URL=https://source.example/cwms-data `
  -e SOURCE_CDA_API_KEY=your-source-key `
  -e DEST_CDA_URL=https://dest.example/cwms-data `
  -e DEST_CDA_API_KEY=your-dest-key `
  cwms-data-api/etl
```

If you do not want to download from the source API, omit `SOURCE_CDA_URL` and the pipeline will publish only staged files.

### docker-compose

The included `docker-compose.yml` mounts `regi.yml` into the container and sets `REGI_CONFIG_PATH=/app/regi.yml`.

You still need to supply the API endpoint environment variables (`DEST_CDA_URL` and, if you want to download from the source, `SOURCE_CDA_URL`) when running Compose.

## Gradle Commands

The Gradle build file provides Docker-based convenience tasks.

### Build the image

```bash
./gradlew dockerBuild
```

Optional Gradle properties:

- `-PetlImageName=<image-name>`: Override the Docker image name. Default: `cwms-data-api/etl`
- `-PdockerPull=true`: Add `--pull` to the Docker build

Example:

```bash
./gradlew dockerBuild -PetlImageName=cwms-data-api/etl:dev -PdockerPull=true
```

### Run the ETL container

```bash
./gradlew runEtl
```

Optional Gradle property:

- `-PetlEnvFile=<path>`: Override the environment file passed to `docker run`. Default: `etl.env`

Example:

```bash
./gradlew runEtl -PetlEnvFile=etl.env.example
```

### Run the unit tests in Docker

```bash
./gradlew runEtlUnitTests
```

This uses Docker, mounts the local `src` and `tests` directories, and runs `pytest` inside the container.

### Run the full verification task

```bash
./gradlew check
```

`check` depends on `runEtlUnitTests` in this project.

## Local Development

For local Python execution, set `DEST_CDA_URL` (and `SOURCE_CDA_URL` if you want to download from the source API), then run:

```bash
python src/cda_etl/main.py
```

The process loads the YAML config, stages files under `settings.path` when `SOURCE_CDA_URL` is set, and publishes the staged files to the destination CDA API.
126 changes: 85 additions & 41 deletions cda-etl/build.gradle
@@ -22,56 +22,100 @@ plugins {
id 'base'
}

def isWindows = {
System.getProperty('os.name').toLowerCase().contains('win')
final def imageName = providers.gradleProperty('etlImageName').orElse('cwms-data-api/etl')
final def dockerPull = providers.gradleProperty('dockerPull')
.map { it.toBoolean() }
.orElse(false)

def dockerVersion

def getDockerVersion = {
if (dockerVersion != null) {
return dockerVersion
}

try {
def stdout = new ByteArrayOutputStream()
def stderr = new ByteArrayOutputStream()

exec {
commandLine 'docker', 'info', '--format', '{{.ServerVersion}}'
standardOutput = stdout
errorOutput = stderr
}

dockerVersion = stdout.toString().trim()
} catch (ignored) {
dockerVersion = ''
}

return dockerVersion
}

final def pythonCmd = isWindows() ? 'python' : 'python3'
final def mainScript = 'src/cda_etl/main.py'
final def envFile = 'etl.env'
final def reqFile = 'requirements.txt'

tasks.register('installRequirements', Exec) {
commandLine pythonCmd, '-m', 'pip', 'install', '-r', reqFile
workingDir projectDir
inputs.file(new File(projectDir, reqFile))
outputs.file(new File(buildDir, 'pip-install.marker'))
doLast {
mkdir buildDir
new File(buildDir, 'pip-install.marker').text = "installed at ${new Date()}\n"
def isDockerAvailable = { taskName ->
def version = getDockerVersion()
if (!version) {
logger.lifecycle("Skipping ${taskName} because Docker is not installed or the Docker daemon is unavailable.")
return false
}

logger.info("Docker is available: ${version}")
return true
}

tasks.register('runEtl', Exec) {
dependsOn 'installRequirements'
group 'application'
executable pythonCmd
args mainScript
workingDir projectDir

// Load environment variables from etl.env
tasks.register('dockerBuild', Exec) {
group = 'docker'
description = 'Builds the CDA ETL Docker image.'

onlyIf {
isDockerAvailable(name)
}

doFirst {
def envFileObj = new File(projectDir, envFile)
if (envFileObj.exists()) {
envFileObj.eachLine { line ->
if (line.trim() && !line.startsWith('#')) {
def parts = line.split('=', 2)
if (parts.length == 2) {
environment parts[0].trim(), parts[1].trim()
}
}
}
} else {
logger.warn("Environment file ${envFile} not found.")
def args = ['docker', 'build']

if (dockerPull.get()) {
args += '--pull'
}

args += ['-t', imageName.get(), '.']

commandLine args
}

inputs.file('Dockerfile')
inputs.file('requirements.txt')
inputs.dir('src')
}

tasks.register('runEtlUnitTests', Exec) {
dependsOn 'installRequirements'
group 'verification'
executable pythonCmd
args '-m', 'pytest'
workingDir projectDir
environment 'PYTHONPATH', 'src/cda_etl'
dependsOn 'dockerBuild'

group = 'verification'
description = 'Runs ETL unit tests in Docker with local source and tests mounted for faster iteration.'

onlyIf {
isDockerAvailable(name)
}

doFirst {
def args = [
'docker', 'run', '--rm',
'-e', 'PYTHONPATH=/app/src/cda_etl',
'-v', "${projectDir}/src:/app/src"
]

def testsDir = file('tests')
args += ['-v', "${testsDir.absolutePath}:/app/tests"]
args += [imageName.get(), 'python', '-m', 'pytest']

commandLine args
}

inputs.dir('src')
inputs.dir('tests').optional()
}

tasks.named('check') {
dependsOn 'runEtlUnitTests'
}
35 changes: 35 additions & 0 deletions cda-etl/data/regi/SWT/Projects/EUFA.json
@@ -0,0 +1,35 @@
{
  "location": {
    "office-id": "SWT",
    "name": "EUFA",
    "latitude": 35.3069444,
    "longitude": -95.3625,
    "active": true,
    "public-name": "Eufaula Lake",
    "long-name": "Eufaula Lake near Brooken, OK",
    "description": "Eufaula Lake near Brooken, OK",
    "timezone-name": "US/Central",
    "location-type": "RESERVOIR",
    "location-kind": "PROJECT",
    "nation": "US",
    "state-initial": "OK",
    "county-name": "Haskell",
    "nearest-city": "Hoyt, OK",
    "horizontal-datum": "WGS84",
    "published-longitude": 95.3625,
    "published-latitude": 35.306944444444,
    "vertical-datum": "NGVD29",
    "elevation": 151.79,
    "bounding-office-id": "SWT",
    "elevation-units": "m"
  },
  "federal-cost": 0,
  "non-federal-cost": 0,
  "cost-unit": "$",
  "federal-o-and-m-cost": 0,
  "non-federal-o-and-m-cost": 0,
  "project-owner": "USACE",
  "sedimentation-desc": "Lake inflow carries large amount of sediment from Canadian, North Canadian, and Deep Fork Rivers. During high-flow, bank caving and erosion becomes a problem. Avg annual sedimentation rate is 9,417 ac-ft/yr.",
  "downstream-urban-desc": "Eufala, OK is located on the Eufala Reservoir. Whitefield is located downstream from the dam on the mainstem of the Canadian River.",
  "bank-full-capacity-desc": "Bankfull Capacity at Whitefield, OK is 40,000 cfs (stage 13.01 ft)."
}