Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
390 changes: 332 additions & 58 deletions docs/source/methods/time_gridding.rst

Large diffs are not rendered by default.

308 changes: 3 additions & 305 deletions notebooks/demo_stage2.ipynb

Large diffs are not rendered by default.

483 changes: 0 additions & 483 deletions notebooks/demo_stage3.ipynb

This file was deleted.

630 changes: 630 additions & 0 deletions notebooks/demo_step1.ipynb

Large diffs are not rendered by default.

51 changes: 27 additions & 24 deletions oceanarray/generate_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
"""
Generate test_data_raw.nc from the real CNV file for Stage 2 testing.
"""
import yaml
from pathlib import Path

import yaml

from oceanarray.stage1 import MooringProcessor


Expand Down Expand Up @@ -32,33 +34,33 @@ def create_test_stage1_data():

# Create YAML configuration
yaml_data = {
'name': 'test_mooring',
'waterdepth': 1000,
'longitude': -30.0,
'latitude': 60.0,
'deployment_latitude': '60 00.000 N',
'deployment_longitude': '030 00.000 W',
'deployment_time': '2018-08-12T08:00:00', # Before data starts
'recovery_time': '2018-08-26T20:47:24', # After data ends
'seabed_latitude': '60 00.000 N',
'seabed_longitude': '030 00.000 W',
'directory': 'moor/raw/test_deployment/',
'instruments': [
"name": "test_mooring",
"waterdepth": 1000,
"longitude": -30.0,
"latitude": 60.0,
"deployment_latitude": "60 00.000 N",
"deployment_longitude": "030 00.000 W",
"deployment_time": "2018-08-12T08:00:00", # Before data starts
"recovery_time": "2018-08-26T20:47:24", # After data ends
"seabed_latitude": "60 00.000 N",
"seabed_longitude": "030 00.000 W",
"directory": "moor/raw/test_deployment/",
"instruments": [
{
'instrument': 'microcat',
'serial': 7518,
'depth': 100,
'filename': 'test_data.cnv',
'file_type': 'sbe-cnv',
'clock_offset': 300, # 5 minutes offset for testing
'start_time': '2018-08-12T08:00:00',
'end_time': '2018-08-26T20:47:24'
"instrument": "microcat",
"serial": 7518,
"depth": 100,
"filename": "test_data.cnv",
"file_type": "sbe-cnv",
"clock_offset": 300, # 5 minutes offset for testing
"start_time": "2018-08-12T08:00:00",
"end_time": "2018-08-26T20:47:24",
}
]
],
}

config_file = proc_dir / "test_mooring.mooring.yaml"
with open(config_file, 'w') as f:
with open(config_file, "w") as f:
yaml.dump(yaml_data, f)

print(f"Created YAML config at {config_file}")
Expand All @@ -83,8 +85,9 @@ def create_test_stage1_data():

# Cleanup temp directory
import shutil

shutil.rmtree(test_dir)
print(f"Cleaned up temporary directory")
print("Cleaned up temporary directory")

return True
else:
Expand Down
158 changes: 97 additions & 61 deletions oceanarray/stage2.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
- Trimming data to deployment/recovery time windows
- Writing updated NetCDF files with '_use' suffix
"""
import os
import yaml

from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
import xarray as xr
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any, Union
import yaml
from ctd_tools.writers import NetCdfWriter


Expand All @@ -28,19 +29,19 @@ def __init__(self, base_dir: str):

def _setup_logging(self, mooring_name: str, output_path: Path) -> None:
"""Set up logging for the processing run."""
log_time = datetime.now().strftime('%Y%m%dT%H')
log_time = datetime.now().strftime("%Y%m%dT%H")
self.log_file = output_path / f"{mooring_name}_{log_time}_stage2.mooring.log"

def _log_print(self, *args, **kwargs) -> None:
"""Print to both console and log file."""
print(*args, **kwargs)
if self.log_file:
with open(self.log_file, 'a') as f:
with open(self.log_file, "a") as f:
print(*args, **kwargs, file=f)

def _load_mooring_config(self, config_path: Path) -> Dict[str, Any]:
"""Load mooring configuration from YAML file."""
with open(config_path, 'r') as f:
with open(config_path, "r") as f:
return yaml.safe_load(f)

def _read_yaml_time(self, data: Dict[str, Any], key: str) -> np.datetime64:
Expand All @@ -53,7 +54,9 @@ def _read_yaml_time(self, data: Dict[str, Any], key: str) -> np.datetime64:
except Exception:
return np.datetime64("NaT", "ns")

def _apply_clock_offset(self, dataset: xr.Dataset, clock_offset: float) -> xr.Dataset:
def _apply_clock_offset(
self, dataset: xr.Dataset, clock_offset: float
) -> xr.Dataset:
"""Apply clock offset correction to dataset time coordinate."""
if clock_offset == 0:
return dataset
Expand All @@ -64,17 +67,20 @@ def _apply_clock_offset(self, dataset: xr.Dataset, clock_offset: float) -> xr.Da
result = dataset.copy()

# Add clock offset as a variable
result['clock_offset'] = clock_offset
result['clock_offset'].attrs['units'] = 's'
result["clock_offset"] = clock_offset
result["clock_offset"].attrs["units"] = "s"

# Apply the correction to time coordinate
result['time'] = result['time'] + np.timedelta64(int(clock_offset), 's')
result["time"] = result["time"] + np.timedelta64(int(clock_offset), "s")

return result

def _trim_to_deployment_window(self, dataset: xr.Dataset,
deploy_time: np.datetime64,
recover_time: np.datetime64) -> xr.Dataset:
def _trim_to_deployment_window(
self,
dataset: xr.Dataset,
deploy_time: np.datetime64,
recover_time: np.datetime64,
) -> xr.Dataset:
"""Trim dataset to deployment time window."""
original_size = len(dataset.time)

Expand All @@ -96,26 +102,27 @@ def _trim_to_deployment_window(self, dataset: xr.Dataset,

return dataset

def _add_missing_metadata(self, dataset: xr.Dataset,
instrument_config: Dict[str, Any]) -> xr.Dataset:
def _add_missing_metadata(
self, dataset: xr.Dataset, instrument_config: Dict[str, Any]
) -> xr.Dataset:
"""Add any missing metadata variables to dataset."""
# Add instrument depth if missing
if 'InstrDepth' not in dataset.variables and 'depth' in instrument_config:
dataset['InstrDepth'] = instrument_config['depth']
if "InstrDepth" not in dataset.variables and "depth" in instrument_config:
dataset["InstrDepth"] = instrument_config["depth"]

# Add instrument type if missing
if 'instrument' not in dataset.variables and 'instrument' in instrument_config:
dataset['instrument'] = instrument_config['instrument']
if "instrument" not in dataset.variables and "instrument" in instrument_config:
dataset["instrument"] = instrument_config["instrument"]

# Add serial number if missing
if 'serial_number' not in dataset.variables and 'serial' in instrument_config:
dataset['serial_number'] = instrument_config['serial']
if "serial_number" not in dataset.variables and "serial" in instrument_config:
dataset["serial_number"] = instrument_config["serial"]

return dataset

def _clean_unnecessary_variables(self, dataset: xr.Dataset) -> xr.Dataset:
"""Remove variables that are not needed in the final product."""
vars_to_remove = ['timeS'] # Add other variables as needed
vars_to_remove = ["timeS"] # Add other variables as needed

for var in vars_to_remove:
if var in dataset.variables:
Expand All @@ -127,29 +134,45 @@ def _clean_unnecessary_variables(self, dataset: xr.Dataset) -> xr.Dataset:
def _get_netcdf_writer_params(self) -> Dict[str, Any]:
"""Get standard parameters for NetCDF writer."""
return {
'optimize': True,
'drop_derived': False,
'uint8_vars': [
"correlation_magnitude", "echo_intensity", "status", "percent_good",
"bt_correlation", "bt_amplitude", "bt_percent_good",
"optimize": True,
"drop_derived": False,
"uint8_vars": [
"correlation_magnitude",
"echo_intensity",
"status",
"percent_good",
"bt_correlation",
"bt_amplitude",
"bt_percent_good",
],
'float32_vars': [
"eastward_velocity", "northward_velocity", "upward_velocity",
"temperature", "salinity", "pressure", "pressure_std", "depth", "bt_velocity",
"float32_vars": [
"eastward_velocity",
"northward_velocity",
"upward_velocity",
"temperature",
"salinity",
"pressure",
"pressure_std",
"depth",
"bt_velocity",
],
'chunk_time': 3600,
'complevel': 5,
'quantize': 3,
"chunk_time": 3600,
"complevel": 5,
"quantize": 3,
}

def _process_instrument(self, instrument_config: Dict[str, Any],
mooring_config: Dict[str, Any],
proc_dir: Path, mooring_name: str,
deploy_time: np.datetime64,
recover_time: np.datetime64) -> bool:
def _process_instrument(
self,
instrument_config: Dict[str, Any],
mooring_config: Dict[str, Any],
proc_dir: Path,
mooring_name: str,
deploy_time: np.datetime64,
recover_time: np.datetime64,
) -> bool:
"""Process a single instrument's Stage 1 output."""
serial = instrument_config.get('serial', 'unknown')
instrument_type = instrument_config.get('instrument', 'unknown')
serial = instrument_config.get("serial", "unknown")
instrument_type = instrument_config.get("instrument", "unknown")

# Construct file paths
raw_filename = f"{mooring_name}_{serial}_raw.nc"
Expand Down Expand Up @@ -177,19 +200,23 @@ def _process_instrument(self, instrument_config: Dict[str, Any],
dataset = self._clean_unnecessary_variables(dataset)

# Apply clock offset
clock_offset = instrument_config.get('clock_offset', 0)
clock_offset = instrument_config.get("clock_offset", 0)
dataset = self._apply_clock_offset(dataset, clock_offset)

# Trim to deployment window
dataset = self._trim_to_deployment_window(dataset, deploy_time, recover_time)
dataset = self._trim_to_deployment_window(
dataset, deploy_time, recover_time
)

if len(dataset.time) == 0:
self._log_print(f"ERROR: No data remains after processing {instrument_type} {serial}")
self._log_print(
f"ERROR: No data remains after processing {instrument_type} {serial}"
)
return False

# Log time range
start_time = dataset['time'].values.min()
end_time = dataset['time'].values.max()
start_time = dataset["time"].values.min()
end_time = dataset["time"].values.max()
self._log_print(f"Final time range: {start_time} to {end_time}")

# Remove existing output file if it exists
Expand All @@ -209,8 +236,9 @@ def _process_instrument(self, instrument_config: Dict[str, Any],
self._log_print(f"ERROR processing {instrument_type} {serial}: {e}")
return False

def process_mooring(self, mooring_name: str,
output_path: Optional[str] = None) -> bool:
def process_mooring(
self, mooring_name: str, output_path: Optional[str] = None
) -> bool:
"""
Process Stage 2 for a single mooring.

Expand All @@ -223,7 +251,7 @@ def process_mooring(self, mooring_name: str,
"""
# Set up paths
if output_path is None:
proc_dir = self.base_dir / 'moor' / 'proc' / mooring_name
proc_dir = self.base_dir / "moor" / "proc" / mooring_name
else:
proc_dir = Path(output_path) / mooring_name

Expand Down Expand Up @@ -256,22 +284,29 @@ def process_mooring(self, mooring_name: str,

# Process each instrument
success_count = 0
total_count = len(mooring_config.get('instruments', []))
total_count = len(mooring_config.get("instruments", []))

for instrument_config in mooring_config.get('instruments', []):
for instrument_config in mooring_config.get("instruments", []):
success = self._process_instrument(
instrument_config, mooring_config, proc_dir,
mooring_name, deploy_time, recover_time
instrument_config,
mooring_config,
proc_dir,
mooring_name,
deploy_time,
recover_time,
)
if success:
success_count += 1

self._log_print(f"Stage 2 completed: {success_count}/{total_count} instruments successful")
self._log_print(
f"Stage 2 completed: {success_count}/{total_count} instruments successful"
)
return success_count > 0


def stage2_mooring(mooring_name: str, basedir: str,
output_path: Optional[str] = None) -> bool:
def stage2_mooring(
mooring_name: str, basedir: str, output_path: Optional[str] = None
) -> bool:
"""
Process Stage 2 for a single mooring (backwards compatibility function).

Expand All @@ -287,8 +322,9 @@ def stage2_mooring(mooring_name: str, basedir: str,
return processor.process_mooring(mooring_name, output_path)


def process_multiple_moorings_stage2(mooring_list: List[str],
basedir: str) -> Dict[str, bool]:
def process_multiple_moorings_stage2(
mooring_list: List[str], basedir: str
) -> Dict[str, bool]:
"""
Process Stage 2 for multiple moorings.

Expand All @@ -315,9 +351,9 @@ def process_multiple_moorings_stage2(mooring_list: List[str],
# Example usage
if __name__ == "__main__":
# Your mooring list
moorlist = ['dsE_1_2018']
moorlist = ["dsE_1_2018"]

basedir = '/Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/'
basedir = "/Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/"

# Process all moorings
results = process_multiple_moorings_stage2(moorlist, basedir)
Expand Down
Loading
Loading