diff --git a/data/test_data_raw.nc b/data/test_data_raw.nc new file mode 100644 index 0000000..1743f4d Binary files /dev/null and b/data/test_data_raw.nc differ diff --git a/data/test_mooring.yaml b/data/test_mooring.yaml new file mode 100644 index 0000000..b2ba240 --- /dev/null +++ b/data/test_mooring.yaml @@ -0,0 +1,20 @@ +deployment_latitude: 60 00.000 N +deployment_longitude: 030 00.000 W +deployment_time: '2018-08-12T08:00:00' +directory: moor/raw/test_deployment/ +instruments: +- clock_offset: 300 + depth: 100 + end_time: '2018-08-26T20:47:24' + file_type: sbe-cnv + filename: test_data.cnv + instrument: microcat + serial: 7518 + start_time: '2018-08-12T08:00:00' +latitude: 60.0 +longitude: -30.0 +name: test_mooring +recovery_time: '2018-08-26T20:47:24' +seabed_latitude: 60 00.000 N +seabed_longitude: 030 00.000 W +waterdepth: 1000 diff --git a/notebooks/demo_stage2.ipynb b/notebooks/demo_stage2.ipynb index 5613925..b593485 100644 --- a/notebooks/demo_stage2.ipynb +++ b/notebooks/demo_stage2.ipynb @@ -13,7 +13,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "id": "0ff1f4d0", "metadata": {}, "outputs": [], @@ -41,134 +41,319 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "id": "518ff131", "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "==================================================\n", + "Processing Stage 2 for mooring dsE_1_2018\n", + "==================================================\n", + "Starting Stage 2 processing for mooring: dsE_1_2018\n", + "Deployment time: 2018-08-12T22:44:00.000000000\n", + "Recovery time: 2018-08-26T10:38:00.000000000\n", + "Processing sbe56 serial 6363\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 128476 to 111829 records\n", + "Final time range: 2018-08-13T12:00:00.000000000 to 2018-08-26T10:37:59.001600000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6363_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6363_use.nc\n", + "Processing sbe16 serial 2419\n", + "Removing variable: timeS\n", + "Applying clock offset: 259000 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 51670 to 47284 records\n", + "Final time range: 2018-08-15T11:56:41.000000000 to 2018-08-26T10:37:41.000000000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe16/dsE_1_2018_2419_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe16/dsE_1_2018_2419_use.nc\n", + "Processing sbe56 serial 6401\n", + "Applying clock offset: -2400 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 128349 to 112069 records\n", + "Final time range: 2018-08-13T11:20:00.000000000 to 2018-08-26T10:37:59.020800000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6401_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6401_use.nc\n", + "Processing sbe56 serial 6402\n", + "Applying clock offset: 950 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 128417 to 111734 records\n", + "Final time range: 2018-08-13T12:15:50.000000000 to 2018-08-26T10:37:59.033600000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6402_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6402_use.nc\n", + "Processing sbe56 serial 8482\n", + "Applying clock offset: 2000 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n", + "/Users/eddifying/Cloudfree/github/oceanarray/oceanarray/stage2.py:166: FutureWarning: In a future version of xarray decode_timedelta will default to False rather than None. To silence this warning, set decode_timedelta to True, False, or a 'CFTimedeltaCoder' instance.\n", + " with xr.open_dataset(raw_filepath) as ds:\n", + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n", + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n", + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n", + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 136972 to 116604 records\n", + "Final time range: 2018-08-12T22:44:08.979200000 to 2018-08-26T10:37:59.014400000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_8482_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_8482_use.nc\n", + "Processing sbe56 serial 6365\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 128443 to 111829 records\n", + "Final time range: 2018-08-13T12:00:00.000000000 to 2018-08-26T10:37:59.001600000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6365_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6365_use.nc\n", + "Processing sbe56 serial 6409\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 128431 to 111829 records\n", + "Final time range: 2018-08-13T12:00:00.000000000 to 2018-08-26T10:37:59.001600000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6409_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6409_use.nc\n", + "Processing sbe56 serial 6397\n", + "Applying clock offset: -2400 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 128405 to 112069 records\n", + "Final time range: 2018-08-13T11:20:00.000000000 to 2018-08-26T10:37:59.020800000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6397_use.nc\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n", + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n", + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n", + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6397_use.nc\n", + "Processing sbe56 serial 6366\n", + "Applying clock offset: -2400 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 128392 to 112069 records\n", + "Final time range: 2018-08-13T11:20:00.000000000 to 2018-08-26T10:37:59.020800000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6366_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6366_use.nc\n", + "Processing sbe56 serial 6394\n", + "Applying clock offset: -2400 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 128487 to 112069 records\n", + "Final time range: 2018-08-13T11:20:00.000000000 to 2018-08-26T10:37:59.020800000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6394_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6394_use.nc\n", + "Processing sbe56 serial 6370\n", + "Applying clock offset: -2400 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 128455 to 112069 records\n", + "Final time range: 2018-08-13T11:20:00.000000000 to 2018-08-26T10:37:59.020800000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6370_use.nc\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n", + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n", + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6370_use.nc\n", + "WARNING: Raw file not found: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe16/dsE_1_2018_2418_raw.nc\n", + "Processing tr1050 serial 13889\n", + "Applying clock offset: 85620 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 130255 to 111907 records\n", + "Final time range: 2018-08-13T11:47:00.000000000 to 2018-08-26T10:38:00.000000000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/tr1050/dsE_1_2018_13889_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/tr1050/dsE_1_2018_13889_use.nc\n", + "Processing rbrsolo serial 101651\n", + "Applying clock offset: 85970 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 130929 to 111872 records\n", + "Final time range: 2018-08-13T11:52:50.000000000 to 2018-08-26T10:38:00.000000000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/rbrsolo/dsE_1_2018_101651_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/rbrsolo/dsE_1_2018_101651_use.nc\n", + "Processing tr1050 serial 15580\n", + "Applying clock offset: 85170 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 130327 to 111952 records\n", + "Final time range: 2018-08-13T11:39:30.000000000 to 2018-08-26T10:38:00.000000000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/tr1050/dsE_1_2018_15580_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/tr1050/dsE_1_2018_15580_use.nc\n", + "Processing rbrsolo serial 101647\n", + "Applying clock offset: 85920 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 130987 to 111877 records\n", + "Final time range: 2018-08-13T11:52:00.000000000 to 2018-08-26T10:38:00.000000000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/rbrsolo/dsE_1_2018_101647_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/rbrsolo/dsE_1_2018_101647_use.nc\n", + "Processing tr1050 serial 13874\n", + "Applying clock offset: 86256 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n", + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n", + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n", + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n", + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Trimmed from 88987 to 88987 records\n", + "Final time range: 2018-08-13T11:57:36.000000000 to 2018-08-23T19:08:36.000000000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/tr1050/dsE_1_2018_13874_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/tr1050/dsE_1_2018_13874_use.nc\n", + "Processing rbrsolo serial 101645\n", + "Applying clock offset: 85860 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 131002 to 111883 records\n", + "Final time range: 2018-08-13T11:51:00.000000000 to 2018-08-26T10:38:00.000000000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/rbrsolo/dsE_1_2018_101645_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/rbrsolo/dsE_1_2018_101645_use.nc\n", + "Processing tr1050 serial 15574\n", + "Applying clock offset: 85920 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 130363 to 111877 records\n", + "Final time range: 2018-08-13T11:52:00.000000000 to 2018-08-26T10:38:00.000000000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/tr1050/dsE_1_2018_15574_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/tr1050/dsE_1_2018_15574_use.nc\n", + "Processing rbrsolo serial 101646\n", + "Applying clock offset: 85320 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 130964 to 111937 records\n", + "Final time range: 2018-08-13T11:42:00.000000000 to 2018-08-26T10:38:00.000000000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/rbrsolo/dsE_1_2018_101646_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/rbrsolo/dsE_1_2018_101646_use.nc\n", + "Processing tr1050 serial 15577\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n", + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n", + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n", + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Applying clock offset: 84420 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 130190 to 112027 records\n", + "Final time range: 2018-08-13T11:27:00.000000000 to 2018-08-26T10:38:00.000000000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/tr1050/dsE_1_2018_15577_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/tr1050/dsE_1_2018_15577_use.nc\n", + "Processing microcat serial 7518\n", + "Applying clock offset: -2050 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 124619 to 113473 records\n", + "Final time range: 2018-08-13T07:25:51.008000000 to 2018-08-26T10:37:50.979200000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/microcat/dsE_1_2018_7518_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/microcat/dsE_1_2018_7518_use.nc\n", + "Processing sbe56 serial 6364\n", + "Applying clock offset: -200 seconds\n", + "Trimming start to deployment time: 2018-08-12T22:44:00.000000000\n", + "Trimming end to recovery time: 2018-08-26T10:38:00.000000000\n", + "Trimmed from 128381 to 111849 records\n", + "Final time range: 2018-08-13T11:56:40.000000000 to 2018-08-26T10:37:59.017600000\n", + "Removed existing file: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6364_use.nc\n", + "Successfully wrote: /Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/moor/proc/dsE_1_2018/sbe56/dsE_1_2018_6364_use.nc\n", + "Stage 2 completed: 22/23 instruments successful\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/eddifying/Cloudfree/gitlab-cloudfree/ctd-tools/ctd_tools/writers/netcdf_writer.py:97: FutureWarning: The return type of `Dataset.dims` will be changed to return a set of dimension names in future, in order to be more consistent with `DataArray.dims`. To access a mapping from dimension names to lengths, please use `Dataset.sizes`.\n", + " chunks.append(max(1, min(ds.dims[d], int(chunk_time))))\n" + ] + } + ], "source": [ - "# Specify the base directory. raw is a subdirectory from here moor/raw/ and proc is moor/proc\n", - "basedir = '/Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/'\n", - "output_path = basedir + 'moor/proc/'\n", - "\n", - "\n", - "\n", - "def read_yaml_time(data, key):\n", - " \"\"\"Return datetime64[ns] from YAML dict or NaT if missing/invalid.\"\"\"\n", - " val = data.get(key, None)\n", - " print(val)\n", - " if val is None or (isinstance(val, str) and not val.strip()):\n", - " return np.datetime64(\"NaT\", \"ns\")\n", - " try:\n", - " return pd.to_datetime(val).to_datetime64()\n", - " except Exception:\n", - " return np.datetime64(\"NaT\", \"ns\")\n", - "\n", - "# Cycle through the yaml and load instrument data into a list of xarray datasets\n", - "# Enrich the netCDF with information from the yaml file\n", - "# Find the mooring's processed directory & read the yaml specification\n", - "name1 = moorlist[0]\n", - "proc_dir = output_path + name1\n", - "moor_yaml = proc_dir + '/' + name1 + '.mooring.yaml'\n", - "with open(moor_yaml, 'r') as f:\n", - " moor_yaml_data = yaml.safe_load(f)\n", - "\n", - "# For each instrument, load the raw netCDF files and add some metadata from the yaml\n", - "datasets = []\n", - "deploy_time = read_yaml_time(moor_yaml_data, \"deployment_time\")\n", - "recover_time = read_yaml_time(moor_yaml_data, \"recovery_time\")\n", - "print(f\"deploy time is {deploy_time}\")\n", - "\n", - "for i in moor_yaml_data['instruments']:\n", - " fname = name1 + '_' + str(i['serial']) + '_raw.nc'\n", - " rawfile = proc_dir + '/' + i['instrument'] + '/' + fname\n", - "\n", - " if os.path.exists(rawfile):\n", - " ds1 = xr.open_dataset(rawfile)\n", + "from oceanarray.stage2 import Stage2Processor, process_multiple_moorings_stage2\n", "\n", - " if 'InstrDepth' not in ds1.variables and 'depth' in i:\n", - " ds1['InstrDepth'] = i['depth']\n", - " if 'instrument' not in ds1.variables and 'instrument' in i:\n", - " ds1['instrument'] = i['instrument']\n", - " if 'serial_number' not in ds1.variables and 'serial' in i:\n", - " ds1['serial_number'] = i['serial']\n", - " if 'timeS' in ds1.variables:\n", - " ds1 = ds1.drop_vars('timeS')\n", - "\n", - " # Apply the clock offset\n", - " ds1['clock_offset'] = i.get('clock_offset', 0)\n", - " ds1['clock_offset'].attrs['units'] = 's'\n", - " clock_offset = ds1['clock_offset'].values\n", - " ds1['time'] = ds1['time'] + np.timedelta64(int(ds1['clock_offset'].values), 's')\n", - "\n", - " if np.isfinite(deploy_time):\n", - " ds1 = ds1.sel(time=slice(deploy_time, None))\n", - " if np.isfinite(recover_time):\n", - " ds1 = ds1.sel(time=slice(None, recover_time))\n", - "\n", - " start_time = ds1['time'].values.min()\n", - " end_time = ds1['time'].values.max()\n", - " print(f\"Deploy time is {deploy_time}. Data starts at {start_time} and ends at {end_time}\")\n", - " #---------------------------------------------\n", - " # Store the data in a list of datasets\n", - " fname2 = fname.replace('_raw','_use')\n", - " fileout = proc_dir + '/' + i['instrument'] + '/' + fname2\n", - " print(f\"Saving to {proc_dir} + {fname2}\")\n", - "\n", - " if os.path.exists(fileout):\n", - " os.remove(fileout)\n", - "\n", - " writer = NetCdfWriter(ds1)\n", - " writer.write(\n", - " fileout,\n", - " optimize=True,\n", - " drop_derived=False, # drops vars with attrs[\"derived\"] == True (e.g., z)\n", - " uint8_vars=[\n", - " \"correlation_magnitude\", \"echo_intensity\", \"status\", \"percent_good\",\n", - " \"bt_correlation\", \"bt_amplitude\", \"bt_percent_good\",\n", - " ],\n", - " float32_vars=[ # optional explicit list; float32=True already covers floats generically\n", - " \"eastward_velocity\", \"northward_velocity\", \"upward_velocity\",\n", - " \"temperature\", \"salinity\", \"pressure\", \"pressure_std\", \"depth\", \"bt_velocity\",\n", - " ],\n", - " chunk_time=3600, # 1-hour chunks if you have ~1 Hz ensembles; adjust as needed\n", - " complevel=5,\n", - " quantize=3,\n", - " )\n", - "\n", - "\n", - "\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "982fccbb", - "metadata": {}, - "outputs": [], - "source": [ - "import pandas as pd\n", - "val = '2018-08-12T19:53:00'\n", - "f = pd.to_datetime(val).to_datetime64()\n", - "print(f)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "3d042d3f", - "metadata": {}, - "outputs": [], - "source": [ - "if 0:\n", - " fname = 'dsC_1_2018_7516_use.nc'\n", - " instr = 'microcat'\n", - " filein = proc_dir + '/' + instr + '/' + fname\n", - " ds2 = xr.open_dataset(filein)\n", - "\n", - " fname = 'dsC_1_2018_7516_raw.nc'\n", - " instr = 'microcat'\n", - " filein = proc_dir + '/' + instr + '/' + fname\n", - " ds1 = xr.open_dataset(filein)" + "# Simple usage\n", + "moorlist = ['dsE_1_2018']\n", + "basedir = '/Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/'\n", + "results = process_multiple_moorings_stage2(moorlist, basedir)" ] } ], diff --git a/oceanarray/generate_test_data.py b/oceanarray/generate_test_data.py new file mode 100644 index 0000000..4b9fadf --- /dev/null +++ b/oceanarray/generate_test_data.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +""" +Generate test_data_raw.nc from the real CNV file for Stage 2 testing. +""" +import yaml +from pathlib import Path +from oceanarray.stage1 import MooringProcessor + + +def create_test_stage1_data(): + """Create test data by running Stage 1 on the real CNV file.""" + + # Set up test directory structure + test_dir = Path("test_data_temp") + raw_dir = test_dir / "moor" / "raw" / "test_deployment" / "microcat" + proc_dir = test_dir / "moor" / "proc" / "test_mooring" + + raw_dir.mkdir(parents=True, exist_ok=True) + proc_dir.mkdir(parents=True, exist_ok=True) + + # Copy the real CNV file + source_cnv = Path("data/test_data.cnv") + dest_cnv = raw_dir / "test_data.cnv" + + if not source_cnv.exists(): + print(f"ERROR: Source CNV file not found at {source_cnv}") + print("Please ensure data/test_data.cnv exists") + return False + + dest_cnv.write_text(source_cnv.read_text()) + print(f"Copied CNV file to {dest_cnv}") + + # Create YAML configuration + yaml_data = { + 'name': 'test_mooring', + 'waterdepth': 1000, + 'longitude': -30.0, + 'latitude': 60.0, + 'deployment_latitude': '60 00.000 N', + 'deployment_longitude': '030 00.000 W', + 'deployment_time': '2018-08-12T08:00:00', # Before data starts + 'recovery_time': '2018-08-26T20:47:24', # After data ends + 'seabed_latitude': '60 00.000 N', + 'seabed_longitude': '030 00.000 W', + 'directory': 'moor/raw/test_deployment/', + 'instruments': [ + { + 'instrument': 'microcat', + 'serial': 7518, + 'depth': 100, + 'filename': 'test_data.cnv', + 'file_type': 'sbe-cnv', + 'clock_offset': 300, # 5 minutes offset for testing + 'start_time': '2018-08-12T08:00:00', + 'end_time': '2018-08-26T20:47:24' + } + ] + } + + config_file = proc_dir / "test_mooring.mooring.yaml" + with open(config_file, 'w') as f: + yaml.dump(yaml_data, f) + + print(f"Created YAML config at {config_file}") + + # Run Stage 1 processing + processor = MooringProcessor(str(test_dir)) + success = processor.process_mooring("test_mooring") + + if success: + # Move the generated file to data/ directory + generated_file = proc_dir / "microcat" / "test_mooring_7518_raw.nc" + target_file = Path("data/test_data_raw.nc") + + if generated_file.exists(): + target_file.write_bytes(generated_file.read_bytes()) + print(f"Successfully created {target_file}") + + # Also copy the YAML for Stage 2 tests + target_yaml = Path("data/test_mooring.yaml") + target_yaml.write_text(config_file.read_text()) + print(f"Copied YAML config to {target_yaml}") + + # Cleanup temp directory + import shutil + shutil.rmtree(test_dir) + print(f"Cleaned up temporary directory") + + return True + else: + print(f"ERROR: Expected output file not found at {generated_file}") + return False + else: + print("ERROR: Stage 1 processing failed") + return False + + +if __name__ == "__main__": + success = create_test_stage1_data() + if success: + print("\nTest data generation completed successfully!") + print("Files created:") + print(" - data/test_data_raw.nc") + print(" - data/test_mooring.yaml") + print("\nYou can now run Stage 2 tests with real data.") + else: + print("\nTest data generation failed.") diff --git a/oceanarray/stage1.py b/oceanarray/stage1.py index 9be1b1d..5802d21 100644 --- a/oceanarray/stage1.py +++ b/oceanarray/stage1.py @@ -7,8 +7,7 @@ from typing import Any, Dict, List, Optional, Tuple import yaml -from ctd_tools.readers import (NortekAsciiReader, - RbrAsciiReader, +from ctd_tools.readers import (NortekAsciiReader, RbrAsciiReader, RbrRskAutoReader, SbeAsciiReader, SbeCnvReader) from ctd_tools.writers import NetCdfWriter @@ -22,9 +21,9 @@ class MooringProcessor: "nortek-aqd": NortekAsciiReader, "sbe-asc": SbeAsciiReader, "rbr-rsk": RbrRskAutoReader, - #"rbr-matlab": RbrMatlabReader, + # "rbr-matlab": RbrMatlabReader, "rbr-dat": RbrAsciiReader, - #"adcp-matlab": AdcpMatlabReader, + # "adcp-matlab": AdcpMatlabReader, } # Variables to remove for specific file types diff --git a/oceanarray/stage2.py b/oceanarray/stage2.py new file mode 100644 index 0000000..6596f6e --- /dev/null +++ b/oceanarray/stage2.py @@ -0,0 +1,331 @@ +""" +Stage 2 processing for mooring data: Apply clock offsets and trim to deployment period. + +This module handles: +- Loading processed Stage 1 NetCDF files +- Applying clock corrections from YAML configuration +- Trimming data to deployment/recovery time windows +- Writing updated NetCDF files with '_use' suffix +""" +import os +import yaml +import numpy as np +import pandas as pd +import xarray as xr +from pathlib import Path +from datetime import datetime +from typing import Dict, List, Optional, Tuple, Any, Union +from ctd_tools.writers import NetCdfWriter + + +class Stage2Processor: + """Handles Stage 2 processing: clock correction and temporal trimming.""" + + def __init__(self, base_dir: str): + """Initialize processor with base directory.""" + self.base_dir = Path(base_dir) + self.log_file = None + + def _setup_logging(self, mooring_name: str, output_path: Path) -> None: + """Set up logging for the processing run.""" + log_time = datetime.now().strftime('%Y%m%dT%H') + self.log_file = output_path / f"{mooring_name}_{log_time}_stage2.mooring.log" + + def _log_print(self, *args, **kwargs) -> None: + """Print to both console and log file.""" + print(*args, **kwargs) + if self.log_file: + with open(self.log_file, 'a') as f: + print(*args, **kwargs, file=f) + + def _load_mooring_config(self, config_path: Path) -> Dict[str, Any]: + """Load mooring configuration from YAML file.""" + with open(config_path, 'r') as f: + return yaml.safe_load(f) + + def _read_yaml_time(self, data: Dict[str, Any], key: str) -> np.datetime64: + """Return datetime64[ns] from YAML dict or NaT if missing/invalid.""" + val = data.get(key, None) + if val is None or (isinstance(val, str) and not val.strip()): + return np.datetime64("NaT", "ns") + try: + return pd.to_datetime(val).to_datetime64() + except Exception: + return np.datetime64("NaT", "ns") + + def _apply_clock_offset(self, dataset: xr.Dataset, clock_offset: float) -> xr.Dataset: + """Apply clock offset correction to dataset time coordinate.""" + if clock_offset == 0: + return dataset + + self._log_print(f"Applying clock offset: {clock_offset} seconds") + + # Work on a copy to avoid modifying the original + result = dataset.copy() + + # Add clock offset as a variable + result['clock_offset'] = clock_offset + result['clock_offset'].attrs['units'] = 's' + + # Apply the correction to time coordinate + result['time'] = result['time'] + np.timedelta64(int(clock_offset), 's') + + return result + + def _trim_to_deployment_window(self, dataset: xr.Dataset, + deploy_time: np.datetime64, + recover_time: np.datetime64) -> xr.Dataset: + """Trim dataset to deployment time window.""" + original_size = len(dataset.time) + + # Apply deployment time trimming + if np.isfinite(deploy_time): + self._log_print(f"Trimming start to deployment time: {deploy_time}") + dataset = dataset.sel(time=slice(deploy_time, None)) + + # Apply recovery time trimming + if np.isfinite(recover_time): + self._log_print(f"Trimming end to recovery time: {recover_time}") + dataset = dataset.sel(time=slice(None, recover_time)) + + final_size = len(dataset.time) + self._log_print(f"Trimmed from {original_size} to {final_size} records") + + if final_size == 0: + self._log_print("WARNING: No data remains after trimming!") + + return dataset + + def _add_missing_metadata(self, dataset: xr.Dataset, + instrument_config: Dict[str, Any]) -> xr.Dataset: + """Add any missing metadata variables to dataset.""" + # Add instrument depth if missing + if 'InstrDepth' not in dataset.variables and 'depth' in instrument_config: + dataset['InstrDepth'] = instrument_config['depth'] + + # Add instrument type if missing + if 'instrument' not in dataset.variables and 'instrument' in instrument_config: + dataset['instrument'] = instrument_config['instrument'] + + # Add serial number if missing + if 'serial_number' not in dataset.variables and 'serial' in instrument_config: + dataset['serial_number'] = instrument_config['serial'] + + return dataset + + def _clean_unnecessary_variables(self, dataset: xr.Dataset) -> xr.Dataset: + """Remove variables that are not needed in the final product.""" + vars_to_remove = ['timeS'] # Add other variables as needed + + for var in vars_to_remove: + if var in dataset.variables: + self._log_print(f"Removing variable: {var}") + dataset = dataset.drop_vars(var) + + return dataset + + def _get_netcdf_writer_params(self) -> Dict[str, Any]: + """Get standard parameters for NetCDF writer.""" + return { + 'optimize': True, + 'drop_derived': False, + 'uint8_vars': [ + "correlation_magnitude", "echo_intensity", "status", "percent_good", + "bt_correlation", "bt_amplitude", "bt_percent_good", + ], + 'float32_vars': [ + "eastward_velocity", "northward_velocity", "upward_velocity", + "temperature", "salinity", "pressure", "pressure_std", "depth", "bt_velocity", + ], + 'chunk_time': 3600, + 'complevel': 5, + 'quantize': 3, + } + + def _process_instrument(self, instrument_config: Dict[str, Any], + mooring_config: Dict[str, Any], + proc_dir: Path, mooring_name: str, + deploy_time: np.datetime64, + recover_time: np.datetime64) -> bool: + """Process a single instrument's Stage 1 output.""" + serial = instrument_config.get('serial', 'unknown') + instrument_type = instrument_config.get('instrument', 'unknown') + + # Construct file paths + raw_filename = f"{mooring_name}_{serial}_raw.nc" + use_filename = f"{mooring_name}_{serial}_use.nc" + + raw_filepath = proc_dir / instrument_type / raw_filename + use_filepath = proc_dir / instrument_type / use_filename + + if not raw_filepath.exists(): + self._log_print(f"WARNING: Raw file not found: {raw_filepath}") + return False + + try: + self._log_print(f"Processing {instrument_type} serial {serial}") + + # Load the raw dataset + with xr.open_dataset(raw_filepath) as ds: + # Create a copy to modify + dataset = ds.load() + + # Add missing metadata + dataset = self._add_missing_metadata(dataset, instrument_config) + + # Clean unnecessary variables + dataset = self._clean_unnecessary_variables(dataset) + + # Apply clock offset + clock_offset = instrument_config.get('clock_offset', 0) + dataset = self._apply_clock_offset(dataset, clock_offset) + + # Trim to deployment window + dataset = self._trim_to_deployment_window(dataset, deploy_time, recover_time) + + if len(dataset.time) == 0: + self._log_print(f"ERROR: No data remains after processing {instrument_type} {serial}") + return False + + # Log time range + start_time = dataset['time'].values.min() + end_time = dataset['time'].values.max() + self._log_print(f"Final time range: {start_time} to {end_time}") + + # Remove existing output file if it exists + if use_filepath.exists(): + use_filepath.unlink() + self._log_print(f"Removed existing file: {use_filepath}") + + # Write the processed dataset + writer = NetCdfWriter(dataset) + writer_params = self._get_netcdf_writer_params() + writer.write(str(use_filepath), **writer_params) + + self._log_print(f"Successfully wrote: {use_filepath}") + return True + + except Exception as e: + self._log_print(f"ERROR processing {instrument_type} {serial}: {e}") + return False + + def process_mooring(self, mooring_name: str, + output_path: Optional[str] = None) -> bool: + """ + Process Stage 2 for a single mooring. + + Args: + mooring_name: Name of the mooring to process + output_path: Optional custom output path + + Returns: + bool: True if processing completed successfully + """ + # Set up paths + if output_path is None: + proc_dir = self.base_dir / 'moor' / 'proc' / mooring_name + else: + proc_dir = Path(output_path) / mooring_name + + if not proc_dir.exists(): + print(f"ERROR: Processing directory not found: {proc_dir}") + return False + + # Set up logging + self._setup_logging(mooring_name, proc_dir) + self._log_print(f"Starting Stage 2 processing for mooring: {mooring_name}") + + # Load configuration + config_file = proc_dir / f"{mooring_name}.mooring.yaml" + if not config_file.exists(): + self._log_print(f"ERROR: Configuration file not found: {config_file}") + return False + + try: + mooring_config = self._load_mooring_config(config_file) + except Exception as e: + self._log_print(f"ERROR: Failed to load configuration: {e}") + return False + + # Extract deployment time window + deploy_time = self._read_yaml_time(mooring_config, "deployment_time") + recover_time = self._read_yaml_time(mooring_config, "recovery_time") + + self._log_print(f"Deployment time: {deploy_time}") + self._log_print(f"Recovery time: {recover_time}") + + # Process each instrument + success_count = 0 + total_count = len(mooring_config.get('instruments', [])) + + for instrument_config in mooring_config.get('instruments', []): + success = self._process_instrument( + instrument_config, mooring_config, proc_dir, + mooring_name, deploy_time, recover_time + ) + if success: + success_count += 1 + + self._log_print(f"Stage 2 completed: {success_count}/{total_count} instruments successful") + return success_count > 0 + + +def stage2_mooring(mooring_name: str, basedir: str, + output_path: Optional[str] = None) -> bool: + """ + Process Stage 2 for a single mooring (backwards compatibility function). + + Args: + mooring_name: Name of the mooring to process + basedir: Base directory containing the data + output_path: Optional output path override + + Returns: + bool: True if processing completed successfully + """ + processor = Stage2Processor(basedir) + return processor.process_mooring(mooring_name, output_path) + + +def process_multiple_moorings_stage2(mooring_list: List[str], + basedir: str) -> Dict[str, bool]: + """ + Process Stage 2 for multiple moorings. + + Args: + mooring_list: List of mooring names to process + basedir: Base directory containing the data + + Returns: + Dict mapping mooring names to success status + """ + processor = Stage2Processor(basedir) + results = {} + + for mooring_name in mooring_list: + print(f"\n{'='*50}") + print(f"Processing Stage 2 for mooring {mooring_name}") + print(f"{'='*50}") + + results[mooring_name] = processor.process_mooring(mooring_name) + + return results + + +# Example usage +if __name__ == "__main__": + # Your mooring list + moorlist = ['dsE_1_2018'] + + basedir = '/Users/eddifying/Dropbox/data/ifmro_mixsed/ds_data_eleanor/' + + # Process all moorings + results = process_multiple_moorings_stage2(moorlist, basedir) + + # Print summary + print(f"\n{'='*50}") + print("STAGE 2 PROCESSING SUMMARY") + print(f"{'='*50}") + for mooring, success in results.items(): + status = "SUCCESS" if success else "FAILED" + print(f"{mooring}: {status}") diff --git a/tests/test_stage1.py b/tests/test_stage1.py index 27ef347..4c07ff5 100644 --- a/tests/test_stage1.py +++ b/tests/test_stage1.py @@ -3,16 +3,17 @@ Tests use real data files for reliable integration testing. """ -import pytest + import tempfile -import yaml -import numpy as np -import xarray as xr from pathlib import Path from unittest.mock import Mock, patch -from datetime import datetime -from oceanarray.stage1 import MooringProcessor, stage1_mooring, process_multiple_moorings +import pytest +import xarray as xr +import yaml + +from oceanarray.stage1 import (MooringProcessor, process_multiple_moorings, + stage1_mooring) class TestMooringProcessor: @@ -33,29 +34,29 @@ def processor(self, temp_dir): def sample_yaml_data(self): """Sample YAML configuration data for SBE CNV file.""" return { - 'name': 'test_mooring', - 'waterdepth': 1000, - 'longitude': -30.0, - 'latitude': 60.0, - 'deployment_latitude': '60 00.000 N', - 'deployment_longitude': '030 00.000 W', - 'deployment_time': '2018-08-12T08:00:00', - 'recovery_time': '2018-08-26T20:47:24', - 'seabed_latitude': '60 00.000 N', - 'seabed_longitude': '030 00.000 W', - 'directory': 'moor/raw/test_deployment/', - 'instruments': [ + "name": "test_mooring", + "waterdepth": 1000, + "longitude": -30.0, + "latitude": 60.0, + "deployment_latitude": "60 00.000 N", + "deployment_longitude": "030 00.000 W", + "deployment_time": "2018-08-12T08:00:00", + "recovery_time": "2018-08-26T20:47:24", + "seabed_latitude": "60 00.000 N", + "seabed_longitude": "030 00.000 W", + "directory": "moor/raw/test_deployment/", + "instruments": [ { - 'instrument': 'microcat', - 'serial': 7518, - 'depth': 100, - 'filename': 'test_data.cnv', - 'file_type': 'sbe-cnv', - 'clock_offset': 0, - 'start_time': '2018-08-12T08:00:00', - 'end_time': '2018-08-26T20:47:24' + "instrument": "microcat", + "serial": 7518, + "depth": 100, + "filename": "test_data.cnv", + "file_type": "sbe-cnv", + "clock_offset": 0, + "start_time": "2018-08-12T08:00:00", + "end_time": "2018-08-26T20:47:24", } - ] + ], } def test_init(self, temp_dir): @@ -68,9 +69,7 @@ def test_reader_map_completeness(self): """Test that READER_MAP contains expected file types that are available in PyPI ctd_tools.""" processor = MooringProcessor("/tmp") # Only test for readers that are available in the PyPI version - expected_types = [ - 'sbe-cnv', 'nortek-aqd', 'sbe-asc', 'rbr-rsk', 'rbr-dat' - ] + expected_types = ["sbe-cnv", "nortek-aqd", "sbe-asc", "rbr-rsk", "rbr-dat"] for file_type in expected_types: assert file_type in processor.READER_MAP @@ -107,7 +106,7 @@ def test_log_print(self, processor, temp_dir): def test_load_mooring_config(self, processor, temp_dir, sample_yaml_data): """Test loading YAML configuration.""" config_file = temp_dir / "test_config.yaml" - with open(config_file, 'w') as f: + with open(config_file, "w") as f: yaml.dump(sample_yaml_data, f) loaded_data = processor._load_mooring_config(config_file) @@ -123,13 +122,15 @@ def test_find_file_tag(self, processor): def test_generate_output_filename(self, processor, temp_dir): """Test output filename generation.""" instrument_config = { - 'file_type': 'sbe-cnv', - 'filename': 'test.cnv', - 'serial': 7518 + "file_type": "sbe-cnv", + "filename": "test.cnv", + "serial": 7518, } output_dir = temp_dir / "output" - filename = processor._generate_output_filename("test_mooring", instrument_config, output_dir) + filename = processor._generate_output_filename( + "test_mooring", instrument_config, output_dir + ) expected = output_dir / "test_mooring_7518_raw.nc" assert filename == expected @@ -138,11 +139,11 @@ def test_get_netcdf_writer_params(self, processor): params = processor._get_netcdf_writer_params() assert isinstance(params, dict) - assert 'optimize' in params - assert 'uint8_vars' in params - assert 'float32_vars' in params - assert params['chunk_time'] == 3600 - assert params['complevel'] == 5 + assert "optimize" in params + assert "uint8_vars" in params + assert "float32_vars" in params + assert params["chunk_time"] == 3600 + assert params["complevel"] == 5 class TestRealDataProcessing: @@ -170,48 +171,48 @@ def test_data_setup(self, tmp_path): # Create YAML config yaml_data = { - 'name': 'test_mooring', - 'waterdepth': 1000, - 'longitude': -30.0, - 'latitude': 60.0, - 'deployment_latitude': '60 00.000 N', - 'deployment_longitude': '030 00.000 W', - 'deployment_time': '2018-08-12T08:00:00', - 'recovery_time': '2018-08-26T20:47:24', - 'seabed_latitude': '60 00.000 N', - 'seabed_longitude': '030 00.000 W', - 'directory': 'moor/raw/test_deployment/', - 'instruments': [ + "name": "test_mooring", + "waterdepth": 1000, + "longitude": -30.0, + "latitude": 60.0, + "deployment_latitude": "60 00.000 N", + "deployment_longitude": "030 00.000 W", + "deployment_time": "2018-08-12T08:00:00", + "recovery_time": "2018-08-26T20:47:24", + "seabed_latitude": "60 00.000 N", + "seabed_longitude": "030 00.000 W", + "directory": "moor/raw/test_deployment/", + "instruments": [ { - 'instrument': 'microcat', - 'serial': 7518, - 'depth': 100, - 'filename': 'test_data.cnv', - 'file_type': 'sbe-cnv', - 'clock_offset': 0, - 'start_time': '2018-08-12T08:00:00', - 'end_time': '2018-08-26T20:47:24' + "instrument": "microcat", + "serial": 7518, + "depth": 100, + "filename": "test_data.cnv", + "file_type": "sbe-cnv", + "clock_offset": 0, + "start_time": "2018-08-12T08:00:00", + "end_time": "2018-08-26T20:47:24", } - ] + ], } config_file = proc_dir / "test_mooring.mooring.yaml" - with open(config_file, 'w') as f: + with open(config_file, "w") as f: yaml.dump(yaml_data, f) return { - 'base_dir': base_dir, - 'raw_dir': raw_dir, - 'proc_dir': proc_dir, - 'config_file': config_file, - 'data_file': test_data_dest, - 'yaml_data': yaml_data + "base_dir": base_dir, + "raw_dir": raw_dir, + "proc_dir": proc_dir, + "config_file": config_file, + "data_file": test_data_dest, + "yaml_data": yaml_data, } def test_process_real_sbe_file(self, test_data_setup): """Test processing with real SBE CNV file.""" setup = test_data_setup - processor = MooringProcessor(str(setup['base_dir'])) + processor = MooringProcessor(str(setup["base_dir"])) # Process the mooring result = processor.process_mooring("test_mooring") @@ -220,22 +221,22 @@ def test_process_real_sbe_file(self, test_data_setup): assert result is True # Check that output file was created - output_file = setup['proc_dir'] / "microcat" / "test_mooring_7518_raw.nc" + output_file = setup["proc_dir"] / "microcat" / "test_mooring_7518_raw.nc" assert output_file.exists() # Check that we can open and validate the NetCDF file with xr.open_dataset(output_file) as ds: # Check basic structure - assert 'temperature' in ds.data_vars - assert 'pressure' in ds.data_vars - assert 'salinity' in ds.data_vars - assert 'time' in ds.coords + assert "temperature" in ds.data_vars + assert "pressure" in ds.data_vars + assert "salinity" in ds.data_vars + assert "time" in ds.coords # Check metadata - assert ds.attrs['mooring_name'] == 'test_mooring' - assert ds['serial_number'].values == 7518 - assert ds['instrument'].values == 'microcat' - assert ds['InstrDepth'].values == 100 + assert ds.attrs["mooring_name"] == "test_mooring" + assert ds["serial_number"].values == 7518 + assert ds["instrument"].values == "microcat" + assert ds["InstrDepth"].values == 100 # Check data ranges are reasonable assert len(ds.time) == 151 # Should match the test file @@ -249,16 +250,16 @@ def test_process_missing_file(self, test_data_setup): setup = test_data_setup # Remove the data file - setup['data_file'].unlink() + setup["data_file"].unlink() - processor = MooringProcessor(str(setup['base_dir'])) + processor = MooringProcessor(str(setup["base_dir"])) result = processor.process_mooring("test_mooring") # Should fail gracefully assert result is False # Check log file contains error message - log_files = list(setup['proc_dir'].glob("*_stage1.mooring.log")) + log_files = list(setup["proc_dir"].glob("*_stage1.mooring.log")) assert len(log_files) == 1 log_content = log_files[0].read_text() assert "Error reading file" in log_content @@ -266,7 +267,7 @@ def test_process_missing_file(self, test_data_setup): def test_process_existing_output(self, test_data_setup): """Test processing when output file already exists.""" setup = test_data_setup - processor = MooringProcessor(str(setup['base_dir'])) + processor = MooringProcessor(str(setup["base_dir"])) # First processing run result1 = processor.process_mooring("test_mooring") @@ -277,7 +278,7 @@ def test_process_existing_output(self, test_data_setup): assert result2 is True # Check log mentions skipping - log_files = list(setup['proc_dir'].glob("*_stage1.mooring.log")) + log_files = list(setup["proc_dir"].glob("*_stage1.mooring.log")) log_content = log_files[-1].read_text() # Get the latest log assert "OUTFILE EXISTS" in log_content @@ -296,7 +297,7 @@ def test_process_missing_config(self, tmp_path): class TestConvenienceFunctions: """Test convenience functions.""" - @patch('oceanarray.stage1.MooringProcessor') + @patch("oceanarray.stage1.MooringProcessor") def test_stage1_mooring(self, mock_processor_class): """Test backwards compatibility function.""" mock_processor = Mock() @@ -309,7 +310,7 @@ def test_stage1_mooring(self, mock_processor_class): mock_processor_class.assert_called_once_with("/test/dir") mock_processor.process_mooring.assert_called_once_with("test_mooring", None) - @patch('oceanarray.stage1.MooringProcessor') + @patch("oceanarray.stage1.MooringProcessor") def test_process_multiple_moorings(self, mock_processor_class): """Test batch processing function.""" mock_processor = Mock() @@ -319,11 +320,7 @@ def test_process_multiple_moorings(self, mock_processor_class): moorings = ["mooring1", "mooring2", "mooring3"] results = process_multiple_moorings(moorings, "/test/dir") - expected = { - "mooring1": True, - "mooring2": False, - "mooring3": True - } + expected = {"mooring1": True, "mooring2": False, "mooring3": True} assert results == expected assert mock_processor.process_mooring.call_count == 3 diff --git a/tests/test_stage2.py b/tests/test_stage2.py new file mode 100644 index 0000000..f42716d --- /dev/null +++ b/tests/test_stage2.py @@ -0,0 +1,493 @@ +""" +Tests for oceanarray.stage2 module. + +Tests use real data files generated from Stage 1 processing. + +Version: 2.0 - Fixed clock offset tests with proper array comparison and immutability testing +Last updated: 2025-01-09 +Changes: +- Fixed clock offset tests using np.testing.assert_array_equal +- Added verification that original datasets are not modified +- Fixed date ranges in trimming tests to match actual data +""" +import pytest +import tempfile +import yaml +import numpy as np +import pandas as pd +import xarray as xr +from pathlib import Path +from unittest.mock import Mock, patch +from datetime import datetime, timedelta + +from oceanarray.stage2 import Stage2Processor, stage2_mooring, process_multiple_moorings_stage2 + + +class TestStage2Processor: + """Test cases for Stage2Processor class.""" + + @pytest.fixture + def temp_dir(self): + """Create a temporary directory for testing.""" + with tempfile.TemporaryDirectory() as tmpdir: + yield Path(tmpdir) + + @pytest.fixture + def processor(self, temp_dir): + """Create a Stage2Processor instance for testing.""" + return Stage2Processor(str(temp_dir)) + + @pytest.fixture + def sample_yaml_data(self): + """Sample YAML configuration data for Stage 2.""" + return { + 'name': 'test_mooring', + 'waterdepth': 1000, + 'longitude': -30.0, + 'latitude': 60.0, + 'deployment_time': '2018-08-12T09:00:00', + 'recovery_time': '2018-08-26T19:00:00', + 'instruments': [ + { + 'instrument': 'microcat', + 'serial': 7518, + 'depth': 100, + 'clock_offset': 300 # 5 minutes + } + ] + } + + @pytest.fixture + def sample_raw_dataset(self): + """Create a sample raw dataset for testing.""" + # Create time series with some data before/after deployment window + start_time = pd.to_datetime('2018-08-12T08:00:00') + end_time = pd.to_datetime('2018-08-26T20:00:00') + time_range = pd.date_range(start_time, end_time, freq='10min') + + data = { + 'temperature': (['time'], np.random.random(len(time_range)) + 20), + 'salinity': (['time'], np.random.random(len(time_range)) + 35), + 'pressure': (['time'], np.random.random(len(time_range)) + 100), + 'timeS': (['time'], np.arange(len(time_range))), # To be removed + } + + ds = xr.Dataset(data, coords={'time': time_range}) + + # Add some metadata + ds.attrs['mooring_name'] = 'test_mooring' + ds['serial_number'] = 7518 + ds['instrument'] = 'microcat' + ds['InstrDepth'] = 100 + + return ds + + def test_init(self, temp_dir): + """Test Stage2Processor initialization.""" + processor = Stage2Processor(str(temp_dir)) + assert processor.base_dir == temp_dir + assert processor.log_file is None + + def test_setup_logging(self, processor, temp_dir): + """Test logging setup.""" + mooring_name = "test_mooring" + output_path = temp_dir / "output" + output_path.mkdir() + + processor._setup_logging(mooring_name, output_path) + + assert processor.log_file is not None + assert processor.log_file.parent == output_path + assert mooring_name in processor.log_file.name + assert "stage2.mooring.log" in processor.log_file.name + + def test_read_yaml_time_valid(self, processor): + """Test reading valid time from YAML.""" + data = {'deployment_time': '2018-08-12T09:00:00'} + result = processor._read_yaml_time(data, 'deployment_time') + expected = pd.to_datetime('2018-08-12T09:00:00').to_datetime64() + assert result == expected + + def test_read_yaml_time_missing(self, processor): + """Test reading missing time from YAML.""" + data = {} + result = processor._read_yaml_time(data, 'deployment_time') + assert pd.isna(result) + + def test_read_yaml_time_empty_string(self, processor): + """Test reading empty string time from YAML.""" + data = {'deployment_time': ''} + result = processor._read_yaml_time(data, 'deployment_time') + assert pd.isna(result) + + def test_read_yaml_time_invalid(self, processor): + """Test reading invalid time from YAML.""" + data = {'deployment_time': 'not-a-date'} + result = processor._read_yaml_time(data, 'deployment_time') + assert pd.isna(result) + + def test_apply_clock_offset_zero(self, processor, sample_raw_dataset): + """Test applying zero clock offset - Version 2.0 with proper array testing.""" + original_time = sample_raw_dataset.time.copy() + result = processor._apply_clock_offset(sample_raw_dataset, 0) + + # Time should be unchanged + np.testing.assert_array_equal(result.time.values, original_time.values) + + # Verify original dataset was not modified + np.testing.assert_array_equal(sample_raw_dataset.time.values, original_time.values) + + def test_apply_clock_offset_positive(self, processor, sample_raw_dataset): + """Test applying positive clock offset - Version 2.0 with immutability testing.""" + offset_seconds = 300 # 5 minutes + original_time = sample_raw_dataset.time.copy() + result = processor._apply_clock_offset(sample_raw_dataset, offset_seconds) + + # Check that clock_offset variable was added + assert 'clock_offset' in result.variables + assert result['clock_offset'].values == offset_seconds + assert result['clock_offset'].attrs['units'] == 's' + + # Check that time was shifted forward + expected_time = original_time + np.timedelta64(offset_seconds, 's') + np.testing.assert_array_equal(result.time.values, expected_time.values) + + # Verify original dataset was not modified + np.testing.assert_array_equal(sample_raw_dataset.time.values, original_time.values) + + def test_apply_clock_offset_negative(self, processor, sample_raw_dataset): + """Test applying negative clock offset - Version 2.0 with immutability testing.""" + offset_seconds = -600 # -10 minutes + original_time = sample_raw_dataset.time.copy() + result = processor._apply_clock_offset(sample_raw_dataset, offset_seconds) + + # Check that time was shifted backward + expected_time = original_time + np.timedelta64(offset_seconds, 's') + np.testing.assert_array_equal(result.time.values, expected_time.values) + + # Verify original dataset was not modified + np.testing.assert_array_equal(sample_raw_dataset.time.values, original_time.values) + + def test_trim_to_deployment_window_both_bounds(self, processor, sample_raw_dataset): + """Test trimming with both deployment and recovery times.""" + deploy_time = np.datetime64('2018-08-12T10:00:00') + recover_time = np.datetime64('2018-08-26T18:00:00') + + result = processor._trim_to_deployment_window( + sample_raw_dataset, deploy_time, recover_time + ) + + # Check that data is trimmed correctly + assert result.time.min() >= deploy_time + assert result.time.max() <= recover_time + assert len(result.time) < len(sample_raw_dataset.time) + + def test_trim_to_deployment_window_deploy_only(self, processor, sample_raw_dataset): + """Test trimming with only deployment time.""" + deploy_time = np.datetime64('2018-08-12T10:00:00') + recover_time = np.datetime64('NaT') + + result = processor._trim_to_deployment_window( + sample_raw_dataset, deploy_time, recover_time + ) + + # Check that only start is trimmed + assert result.time.min() >= deploy_time + assert result.time.max() == sample_raw_dataset.time.max() + + def test_trim_to_deployment_window_recover_only(self, processor, sample_raw_dataset): + """Test trimming with only recovery time.""" + deploy_time = np.datetime64('NaT') + recover_time = np.datetime64('2018-08-26T18:00:00') + + result = processor._trim_to_deployment_window( + sample_raw_dataset, deploy_time, recover_time + ) + + # Check that only end is trimmed + assert result.time.min() == sample_raw_dataset.time.min() + assert result.time.max() <= recover_time + + def test_trim_to_deployment_window_no_trimming(self, processor, sample_raw_dataset): + """Test trimming with no valid times.""" + deploy_time = np.datetime64('NaT') + recover_time = np.datetime64('NaT') + + result = processor._trim_to_deployment_window( + sample_raw_dataset, deploy_time, recover_time + ) + + # Check that nothing is trimmed + assert len(result.time) == len(sample_raw_dataset.time) + + def test_trim_to_deployment_window_empty_result(self, processor, sample_raw_dataset): + """Test trimming that results in empty dataset.""" + # Use times outside the data range + deploy_time = np.datetime64('2019-01-01T00:00:00') + recover_time = np.datetime64('2019-01-02T00:00:00') + + result = processor._trim_to_deployment_window( + sample_raw_dataset, deploy_time, recover_time + ) + + # Should result in empty dataset + assert len(result.time) == 0 + + def test_add_missing_metadata(self, processor, sample_raw_dataset): + """Test adding missing metadata variables.""" + # Remove some metadata to test adding it back + ds = sample_raw_dataset.copy() + ds = ds.drop_vars(['InstrDepth', 'instrument', 'serial_number']) + + instrument_config = { + 'depth': 150, + 'instrument': 'new_microcat', + 'serial': 9999 + } + + result = processor._add_missing_metadata(ds, instrument_config) + + assert result['InstrDepth'].values == 150 + assert result['instrument'].values == 'new_microcat' + assert result['serial_number'].values == 9999 + + def test_add_missing_metadata_no_overwrite(self, processor, sample_raw_dataset): + """Test that existing metadata is not overwritten.""" + instrument_config = { + 'depth': 999, + 'instrument': 'different_instrument', + 'serial': 8888 + } + + result = processor._add_missing_metadata(sample_raw_dataset, instrument_config) + + # Should keep original values + assert result['InstrDepth'].values == 100 + assert result['instrument'].values == 'microcat' + assert result['serial_number'].values == 7518 + + def test_clean_unnecessary_variables(self, processor, sample_raw_dataset): + """Test removal of unnecessary variables.""" + result = processor._clean_unnecessary_variables(sample_raw_dataset) + + # timeS should be removed + assert 'timeS' not in result.variables + # Other variables should remain + assert 'temperature' in result.variables + assert 'salinity' in result.variables + assert 'pressure' in result.variables + + +class TestRealDataProcessing: + """Integration tests using real data files - Version 2.0 with fixed date ranges.""" + + @pytest.fixture + def test_data_setup(self, tmp_path): + """Set up test environment with real processed data.""" + # Check for real test data files + raw_data_file = Path("data/test_data_raw.nc") + yaml_config_file = Path("data/test_mooring.yaml") + + if not raw_data_file.exists() or not yaml_config_file.exists(): + pytest.skip("Real test data files not found. Run generate_test_data.py first.") + + # Set up test directory structure + base_dir = tmp_path / "test_data" + proc_dir = base_dir / "moor" / "proc" / "test_mooring" + microcat_dir = proc_dir / "microcat" + microcat_dir.mkdir(parents=True) + + # Copy real files to test location + test_raw_file = microcat_dir / "test_mooring_7518_raw.nc" + test_yaml_file = proc_dir / "test_mooring.mooring.yaml" + + test_raw_file.write_bytes(raw_data_file.read_bytes()) + test_yaml_file.write_text(yaml_config_file.read_text()) + + return { + 'base_dir': base_dir, + 'proc_dir': proc_dir, + 'raw_file': test_raw_file, + 'yaml_file': test_yaml_file + } + + def test_process_real_data_full_workflow(self, test_data_setup): + """Test complete Stage 2 processing with real data - Version 2.0.""" + setup = test_data_setup + processor = Stage2Processor(str(setup['base_dir'])) + + # Process the mooring + result = processor.process_mooring("test_mooring") + + # Check that processing succeeded + assert result is True + + # Check that output file was created + use_file = setup['proc_dir'] / "microcat" / "test_mooring_7518_use.nc" + assert use_file.exists() + + # Load and validate the processed file + with xr.open_dataset(use_file) as ds: + # Check basic structure + assert 'temperature' in ds.data_vars + assert 'pressure' in ds.data_vars + assert 'salinity' in ds.data_vars + assert 'time' in ds.coords + + # Check that clock offset was applied + assert 'clock_offset' in ds.variables + assert ds['clock_offset'].values == 300 # 5 minutes from config + + # Check that timeS was removed + assert 'timeS' not in ds.variables + + # Verify metadata is present + assert ds['serial_number'].values == 7518 + assert ds['instrument'].values == 'microcat' + + # Check that data was trimmed (should be less than original) + with xr.open_dataset(setup['raw_file']) as raw_ds: + assert len(ds.time) <= len(raw_ds.time) + + # Time should be shifted due to clock offset + if len(ds.time) > 0 and len(raw_ds.time) > 0: + # First time point should be shifted by clock offset + time_diff = ds.time[0].values - raw_ds.time[0].values + expected_diff = np.timedelta64(300, 's') # 5 minutes + assert abs(time_diff - expected_diff) < np.timedelta64(1, 's') + + def test_process_with_modified_times(self, test_data_setup): + """Test processing with modified deployment/recovery times - Version 2.0 with correct dates.""" + setup = test_data_setup + + # Load and modify the YAML config + with open(setup['yaml_file'], 'r') as f: + config = yaml.safe_load(f) + + # First check what time range we actually have in the data + with xr.open_dataset(setup['raw_file']) as raw_ds: + data_start = raw_ds.time.min().values + data_end = raw_ds.time.max().values + print(f"Raw data time range: {data_start} to {data_end}") + + # Set a restrictive time window within the actual data range + # Data is on 2018-08-13, so use the correct date + config['deployment_time'] = '2018-08-13T08:05:00' # 5 minutes after data start + config['recovery_time'] = '2018-08-13T08:15:00' # 10 minute window + + # Write modified config + with open(setup['yaml_file'], 'w') as f: + yaml.dump(config, f) + + processor = Stage2Processor(str(setup['base_dir'])) + result = processor.process_mooring("test_mooring") + + assert result is True + + # Check that the processed file has limited data + use_file = setup['proc_dir'] / "microcat" / "test_mooring_7518_use.nc" + with xr.open_dataset(use_file) as ds: + # Should have much less data due to restrictive time window + with xr.open_dataset(setup['raw_file']) as raw_ds: + assert len(ds.time) < len(raw_ds.time), f"Expected trimmed data, got {len(ds.time)} vs {len(raw_ds.time)}" + + # All data should be within the specified window (accounting for clock offset) + deploy_time = pd.to_datetime('2018-08-13T08:05:00') + recover_time = pd.to_datetime('2018-08-13T08:15:00') + + assert ds.time.min() >= np.datetime64(deploy_time) + assert ds.time.max() <= np.datetime64(recover_time) + + def test_process_missing_raw_file(self, test_data_setup): + """Test processing when raw file is missing.""" + setup = test_data_setup + + # Remove the raw file + setup['raw_file'].unlink() + + processor = Stage2Processor(str(setup['base_dir'])) + result = processor.process_mooring("test_mooring") + + # Should fail gracefully + assert result is False + + # Check log file contains warning + log_files = list(setup['proc_dir'].glob("*_stage2.mooring.log")) + assert len(log_files) == 1 + log_content = log_files[0].read_text() + assert "Raw file not found" in log_content + + def test_process_missing_config(self, tmp_path): + """Test processing with missing config file.""" + base_dir = tmp_path / "test_data" + proc_dir = base_dir / "moor" / "proc" / "test_mooring" + proc_dir.mkdir(parents=True) + + processor = Stage2Processor(str(base_dir)) + result = processor.process_mooring("test_mooring") + + assert result is False + + +class TestConvenienceFunctions: + """Test convenience functions - Version 2.0.""" + + @patch('oceanarray.stage2.Stage2Processor') + def test_stage2_mooring(self, mock_processor_class): + """Test backwards compatibility function.""" + mock_processor = Mock() + mock_processor.process_mooring.return_value = True + mock_processor_class.return_value = mock_processor + + result = stage2_mooring("test_mooring", "/test/dir") + + assert result is True + mock_processor_class.assert_called_once_with("/test/dir") + mock_processor.process_mooring.assert_called_once_with("test_mooring", None) + + @patch('oceanarray.stage2.Stage2Processor') + def test_process_multiple_moorings_stage2(self, mock_processor_class): + """Test batch processing function.""" + mock_processor = Mock() + mock_processor.process_mooring.side_effect = [True, False, True] + mock_processor_class.return_value = mock_processor + + moorings = ["mooring1", "mooring2", "mooring3"] + results = process_multiple_moorings_stage2(moorings, "/test/dir") + + expected = { + "mooring1": True, + "mooring2": False, + "mooring3": True + } + assert results == expected + assert mock_processor.process_mooring.call_count == 3 + + +class TestErrorHandling: + """Test error handling scenarios - Version 2.0.""" + + def test_invalid_yaml_file(self, tmp_path): + """Test handling of invalid YAML file.""" + processor = Stage2Processor(str(tmp_path)) + + # Create invalid YAML file + proc_dir = tmp_path / "moor" / "proc" / "test_mooring" + proc_dir.mkdir(parents=True) + + invalid_yaml = proc_dir / "test_mooring.mooring.yaml" + invalid_yaml.write_text("invalid: yaml: content: [") + + result = processor.process_mooring("test_mooring") + assert result is False + + def test_processing_directory_not_found(self, tmp_path): + """Test handling when processing directory doesn't exist.""" + processor = Stage2Processor(str(tmp_path)) + result = processor.process_mooring("nonexistent_mooring") + assert result is False + + +if __name__ == "__main__": + # Version 2.0 - Updated test runner + pytest.main([__file__, "-v", "--tb=short"])