From 4013a809d858d2a3d44c33daa8996f14771bd641 Mon Sep 17 00:00:00 2001 From: Ian Langmore Date: Sun, 3 Aug 2025 03:09:38 +0000 Subject: [PATCH 1/8] Added test demonstrating append --- .gitignore | 5 ++++ xarray_beam/_src/zarr.py | 3 +++ xarray_beam/_src/zarr_test.py | 45 +++++++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+) diff --git a/.gitignore b/.gitignore index 45aaaab..4cb101e 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,8 @@ docs/_build docs/_autosummary docs/*.zarr __pycache__ +*.swp + +.envrc +.venv* +.direnv diff --git a/xarray_beam/_src/zarr.py b/xarray_beam/_src/zarr.py index dd97440..14e988c 100644 --- a/xarray_beam/_src/zarr.py +++ b/xarray_beam/_src/zarr.py @@ -461,6 +461,7 @@ def __init__( *, num_threads: Optional[int] = None, needs_setup: bool = True, + append_dim: Optional[str] = None, ): # pyformat: disable """Initialize ChunksToZarr. @@ -495,6 +496,8 @@ def __init__( useful for Datasets with a small number of variables. needs_setup: if False, then the Zarr store is already setup and does not need to be set up as part of this PTransform. + append_dim: If provided, chunks are appended along `append_dim` to an + existing store. """ # pyformat: enable if isinstance(template, xarray.Dataset): diff --git a/xarray_beam/_src/zarr_test.py b/xarray_beam/_src/zarr_test.py index fba4abc..1f83fa8 100644 --- a/xarray_beam/_src/zarr_test.py +++ b/xarray_beam/_src/zarr_test.py @@ -253,6 +253,51 @@ def test_chunks_to_zarr(self): ): inputs2 | xbeam.ChunksToZarr(temp_dir, template) + def test_chunks_to_zarr_append(self): + dataset = xarray.Dataset( + {'foo': (('t', 'x'), np.arange(3 * 5).reshape(3, 5))}, + coords={ + 't': np.arange(100, 103), + 'x': np.arange(5), + }, + ) + chunked = dataset.chunk(t=1) + + # Write the first two chunks. + zarr_chunks = {'t': 1, 'x': 5} + two_chunk_template = xbeam.make_template(dataset.isel(t=slice(2))) + first_two_chunks = [ + (xbeam.Key({'t': 0}), dataset.isel(t=[0])), + (xbeam.Key({'t': 1}), dataset.isel(t=[1])), + ] + path = self.create_tempdir().full_path + first_two_chunks | xbeam.ChunksToZarr( + path, template=two_chunk_template, zarr_chunks=zarr_chunks + ) + two_chunk_result = xarray.open_zarr(path, consolidated=True) + xarray.testing.assert_identical(dataset.isel(t=slice(2)), two_chunk_result) + + # Now append the last chunk. + # First modify the metadata + chunked.isel(t=[2]).to_zarr(path, mode='a', append_dim='t', compute=False) + + # Then get the full template. Opening the dataset is an easy way to get it. + xbeam_opened_result, chunks = xbeam.open_zarr(path) + full_template = xbeam.make_template(xbeam_opened_result) + self.assertEqual(chunks, zarr_chunks) + + last_chunk = [ + (xbeam.Key({'t': 2}), dataset.isel(t=[2])), + ] + last_chunk | xbeam.ChunksToZarr( + path, + template=full_template, + zarr_chunks=chunks, + needs_setup=False, + ) + full_result = xarray.open_zarr(path, consolidated=True) + xarray.testing.assert_identical(dataset, full_result) + def test_multiple_vars_chunks_to_zarr(self): dataset = xarray.Dataset( { From b955e4cc0adf61b9c185d029dadb612de518b39c Mon Sep 17 00:00:00 2001 From: Ian Langmore Date: Sun, 3 Aug 2025 03:15:48 +0000 Subject: [PATCH 2/8] Reverting changes to zarr.py --- xarray_beam/_src/zarr.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/xarray_beam/_src/zarr.py b/xarray_beam/_src/zarr.py index 14e988c..dd97440 100644 --- a/xarray_beam/_src/zarr.py +++ b/xarray_beam/_src/zarr.py @@ -461,7 +461,6 @@ def __init__( *, num_threads: Optional[int] = None, needs_setup: bool = True, - append_dim: Optional[str] = None, ): # pyformat: disable """Initialize ChunksToZarr. @@ -496,8 +495,6 @@ def __init__( useful for Datasets with a small number of variables. needs_setup: if False, then the Zarr store is already setup and does not need to be set up as part of this PTransform. - append_dim: If provided, chunks are appended along `append_dim` to an - existing store. """ # pyformat: enable if isinstance(template, xarray.Dataset): From 1d2cccdc9a8228e217a9458a8095ff74c18f054b Mon Sep 17 00:00:00 2001 From: Ian Langmore Date: Mon, 4 Aug 2025 00:38:35 +0000 Subject: [PATCH 3/8] Added different modes to test --- xarray_beam/_src/zarr_test.py | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/xarray_beam/_src/zarr_test.py b/xarray_beam/_src/zarr_test.py index 1f83fa8..9e3c071 100644 --- a/xarray_beam/_src/zarr_test.py +++ b/xarray_beam/_src/zarr_test.py @@ -253,7 +253,11 @@ def test_chunks_to_zarr(self): ): inputs2 | xbeam.ChunksToZarr(temp_dir, template) - def test_chunks_to_zarr_append(self): + @parameterized.named_parameters( + dict(testcase_name='append', mode='a'), + dict(testcase_name='overwrite', mode='w'), + ) + def test_chunks_to_zarr_append(self, mode): dataset = xarray.Dataset( {'foo': (('t', 'x'), np.arange(3 * 5).reshape(3, 5))}, coords={ @@ -261,7 +265,6 @@ def test_chunks_to_zarr_append(self): 'x': np.arange(5), }, ) - chunked = dataset.chunk(t=1) # Write the first two chunks. zarr_chunks = {'t': 1, 'x': 5} @@ -278,25 +281,39 @@ def test_chunks_to_zarr_append(self): xarray.testing.assert_identical(dataset.isel(t=slice(2)), two_chunk_result) # Now append the last chunk. + # First modify the metadata - chunked.isel(t=[2]).to_zarr(path, mode='a', append_dim='t', compute=False) + if mode == 'a': + # Append the new data (t=[2]) to the existing metadata. + # This results in t/.zarray that has chunk:2, equal to the number of times + # in the first write. + dataset.isel(t=[2]).chunk(zarr_chunks).to_zarr( + path, mode='a', append_dim='t', compute=False + ) + elif mode == 'w': + # Overwrite all metadata. + # This results in t/.zarray that has chunk:3, equal to the number of times + # in the total dataset. + dataset.chunk(zarr_chunks).to_zarr(path, mode='w', compute=False) - # Then get the full template. Opening the dataset is an easy way to get it. + # Second, get full template. Opening the dataset is an easy way to get it. xbeam_opened_result, chunks = xbeam.open_zarr(path) full_template = xbeam.make_template(xbeam_opened_result) - self.assertEqual(chunks, zarr_chunks) + # Third, write the last chunk only. last_chunk = [ (xbeam.Key({'t': 2}), dataset.isel(t=[2])), ] last_chunk | xbeam.ChunksToZarr( path, template=full_template, - zarr_chunks=chunks, + zarr_chunks=zarr_chunks, needs_setup=False, ) - full_result = xarray.open_zarr(path, consolidated=True) - xarray.testing.assert_identical(dataset, full_result) + + final_result, final_chunks = xbeam.open_zarr(path, consolidated=True) + xarray.testing.assert_identical(dataset, final_result) + self.assertEqual(zarr_chunks, final_chunks) def test_multiple_vars_chunks_to_zarr(self): dataset = xarray.Dataset( From 659c96acb8b4ac789469def5af4482a3fc609076 Mon Sep 17 00:00:00 2001 From: Ian Langmore Date: Mon, 4 Aug 2025 01:20:17 +0000 Subject: [PATCH 4/8] Lots of debugging stuff in there --- xarray_beam/_src/zarr.py | 7 ++++++- xarray_beam/_src/zarr_test.py | 15 +++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/xarray_beam/_src/zarr.py b/xarray_beam/_src/zarr.py index dd97440..321a9b6 100644 --- a/xarray_beam/_src/zarr.py +++ b/xarray_beam/_src/zarr.py @@ -321,6 +321,9 @@ def setup_zarr( template: xarray.Dataset, store: WritableStore, zarr_chunks: Optional[Mapping[str, int]] = None, + append_dim: Optional[str] = None, + mode: str = 'w', + debug: bool = False, ) -> None: """Setup a Zarr store. @@ -345,7 +348,9 @@ def setup_zarr( if 'chunks' in var.encoding: del var.encoding['chunks'] logging.info(f'writing Zarr metadata for template:\n{template}') - template2.to_zarr(store, compute=False, consolidated=True, mode='w') + if debug: + return template2 + template2.to_zarr(store, compute=False, consolidated=True, mode=mode, append_dim=append_dim) def validate_zarr_chunk( diff --git a/xarray_beam/_src/zarr_test.py b/xarray_beam/_src/zarr_test.py index 9e3c071..b212cd4 100644 --- a/xarray_beam/_src/zarr_test.py +++ b/xarray_beam/_src/zarr_test.py @@ -287,14 +287,20 @@ def test_chunks_to_zarr_append(self, mode): # Append the new data (t=[2]) to the existing metadata. # This results in t/.zarray that has chunk:2, equal to the number of times # in the first write. - dataset.isel(t=[2]).chunk(zarr_chunks).to_zarr( - path, mode='a', append_dim='t', compute=False - ) + #dataset.isel(t=[2]).chunk(zarr_chunks).to_zarr( + # path, mode='a', append_dim='t', compute=False + #) + xbeam.setup_zarr(xbeam.make_template(dataset.isel(t=[2])), path, zarr_chunks=zarr_chunks, mode='a', append_dim='t') elif mode == 'w': # Overwrite all metadata. # This results in t/.zarray that has chunk:3, equal to the number of times # in the total dataset. - dataset.chunk(zarr_chunks).to_zarr(path, mode='w', compute=False) + #dataset.chunk(zarr_chunks).to_zarr(path, mode='w', compute=False, consolidated=True) + C = dataset.chunk(zarr_chunks) + #xbeam.setup_zarr(xbeam.make_template(dataset.chunk(zarr_chunks)), path, zarr_chunks=zarr_chunks, mode='w') + #xbeam.setup_zarr(dataset.chunk(zarr_chunks), path, zarr_chunks=zarr_chunks, mode='w') + D = xbeam.setup_zarr(xbeam.make_template(dataset), path, zarr_chunks=zarr_chunks, mode='w', debug=True) + D.to_zarr(path, mode='w', compute=False) # Second, get full template. Opening the dataset is an easy way to get it. xbeam_opened_result, chunks = xbeam.open_zarr(path) @@ -314,6 +320,7 @@ def test_chunks_to_zarr_append(self, mode): final_result, final_chunks = xbeam.open_zarr(path, consolidated=True) xarray.testing.assert_identical(dataset, final_result) self.assertEqual(zarr_chunks, final_chunks) + #import pudb; pu.db def test_multiple_vars_chunks_to_zarr(self): dataset = xarray.Dataset( From caacbc787120947fb4c479f69337075221752cbc Mon Sep 17 00:00:00 2001 From: Ian Langmore Date: Tue, 5 Aug 2025 11:59:58 +0000 Subject: [PATCH 5/8] Before removing overwrite options that do not work --- xarray_beam/_src/zarr.py | 20 ++++++---- xarray_beam/_src/zarr_test.py | 75 +++++++++++++++++++++++------------ 2 files changed, 61 insertions(+), 34 deletions(-) diff --git a/xarray_beam/_src/zarr.py b/xarray_beam/_src/zarr.py index 321a9b6..f0a1003 100644 --- a/xarray_beam/_src/zarr.py +++ b/xarray_beam/_src/zarr.py @@ -35,6 +35,7 @@ import dask.array import numpy as np import pandas as pd +import pudb # TODO Remove import xarray from xarray_beam._src import core from xarray_beam._src import rechunk @@ -321,9 +322,6 @@ def setup_zarr( template: xarray.Dataset, store: WritableStore, zarr_chunks: Optional[Mapping[str, int]] = None, - append_dim: Optional[str] = None, - mode: str = 'w', - debug: bool = False, ) -> None: """Setup a Zarr store. @@ -348,9 +346,7 @@ def setup_zarr( if 'chunks' in var.encoding: del var.encoding['chunks'] logging.info(f'writing Zarr metadata for template:\n{template}') - if debug: - return template2 - template2.to_zarr(store, compute=False, consolidated=True, mode=mode, append_dim=append_dim) + template2.to_zarr(store, compute=False, consolidated=True, mode='w') def validate_zarr_chunk( @@ -423,6 +419,7 @@ def write_chunk_to_zarr( chunk: xarray.Dataset, store: WritableStore, template: xarray.Dataset, + debug, # TODO Remove ) -> None: """Write a single Dataset chunk to Zarr. @@ -437,6 +434,8 @@ def write_chunk_to_zarr( without array values. """ # Immutable dicts not considered a Mapping type which method expects. + if debug: + pu.db region = core.offsets_to_slices(key.offsets, chunk.sizes) # pytype: disable=wrong-arg-types already_written = [ k for k in chunk.variables if k in _unchunked_vars(template) @@ -466,6 +465,7 @@ def __init__( *, num_threads: Optional[int] = None, needs_setup: bool = True, + debug=False, # TODO Remove ): # pyformat: disable """Initialize ChunksToZarr. @@ -501,6 +501,9 @@ def __init__( needs_setup: if False, then the Zarr store is already setup and does not need to be set up as part of this PTransform. """ + self.debug = debug + if self.debug: + pu.db # pyformat: enable if isinstance(template, xarray.Dataset): if needs_setup: @@ -544,9 +547,9 @@ def _validate_zarr_chunk(self, key, chunk, template=None): validate_zarr_chunk(key, chunk, template, self.zarr_chunks) return key, chunk - def _write_chunk_to_zarr(self, key, chunk, template=None): + def _write_chunk_to_zarr(self, key, chunk, template=None, debug=None): assert template is not None - return write_chunk_to_zarr(key, chunk, self.store, template) + return write_chunk_to_zarr(key, chunk, self.store, template, debug) def expand(self, pcoll): if isinstance(self.template, xarray.Dataset): @@ -574,6 +577,7 @@ def expand(self, pcoll): self._write_chunk_to_zarr, template=template, num_threads=self.num_threads, + debug=self.debug, ) ) diff --git a/xarray_beam/_src/zarr_test.py b/xarray_beam/_src/zarr_test.py index b212cd4..22f58f1 100644 --- a/xarray_beam/_src/zarr_test.py +++ b/xarray_beam/_src/zarr_test.py @@ -18,6 +18,7 @@ import dask.array as da import numpy as np import pandas as pd +import pudb # TODO REmove import xarray import xarray_beam as xbeam from xarray_beam._src import test_util @@ -258,27 +259,34 @@ def test_chunks_to_zarr(self): dict(testcase_name='overwrite', mode='w'), ) def test_chunks_to_zarr_append(self, mode): - dataset = xarray.Dataset( + zarr_chunks = {'t': 1, 'x': 5} + + # Calling .chunk() on this dataset is very important. + ds_orig = xarray.Dataset( {'foo': (('t', 'x'), np.arange(3 * 5).reshape(3, 5))}, coords={ 't': np.arange(100, 103), 'x': np.arange(5), }, - ) + )#.chunk(zarr_chunks) + + # Create ds_full, which will be different than ds_orig. + ds_first_two = (ds_orig.isel(t=slice(2))).chunk(zarr_chunks) + ds_last = (ds_orig.isel(t=[2]) * 2).chunk(zarr_chunks) + ds_full = xarray.concat((ds_first_two, ds_last), dim='t') # Write the first two chunks. - zarr_chunks = {'t': 1, 'x': 5} - two_chunk_template = xbeam.make_template(dataset.isel(t=slice(2))) + two_chunk_template = xbeam.make_template(ds_first_two) first_two_chunks = [ - (xbeam.Key({'t': 0}), dataset.isel(t=[0])), - (xbeam.Key({'t': 1}), dataset.isel(t=[1])), + (xbeam.Key({'t': 0}), ds_first_two.isel(t=[0])), + (xbeam.Key({'t': 1}), ds_first_two.isel(t=[1])), ] path = self.create_tempdir().full_path first_two_chunks | xbeam.ChunksToZarr( path, template=two_chunk_template, zarr_chunks=zarr_chunks ) two_chunk_result = xarray.open_zarr(path, consolidated=True) - xarray.testing.assert_identical(dataset.isel(t=slice(2)), two_chunk_result) + xarray.testing.assert_identical(ds_first_two, two_chunk_result) # Now append the last chunk. @@ -287,40 +295,55 @@ def test_chunks_to_zarr_append(self, mode): # Append the new data (t=[2]) to the existing metadata. # This results in t/.zarray that has chunk:2, equal to the number of times # in the first write. - #dataset.isel(t=[2]).chunk(zarr_chunks).to_zarr( - # path, mode='a', append_dim='t', compute=False - #) - xbeam.setup_zarr(xbeam.make_template(dataset.isel(t=[2])), path, zarr_chunks=zarr_chunks, mode='a', append_dim='t') + ds_last.chunk(zarr_chunks).to_zarr( + path, mode='a', append_dim='t', compute=False + ) + xbeam_opened_result, chunks = xbeam.open_zarr(path) + full_template = xbeam.make_template(xbeam_opened_result) elif mode == 'w': + pu.db + partial_result, partial_chunks = xbeam.open_zarr(path, consolidated=True) # Overwrite all metadata. # This results in t/.zarray that has chunk:3, equal to the number of times # in the total dataset. - #dataset.chunk(zarr_chunks).to_zarr(path, mode='w', compute=False, consolidated=True) - C = dataset.chunk(zarr_chunks) - #xbeam.setup_zarr(xbeam.make_template(dataset.chunk(zarr_chunks)), path, zarr_chunks=zarr_chunks, mode='w') - #xbeam.setup_zarr(dataset.chunk(zarr_chunks), path, zarr_chunks=zarr_chunks, mode='w') - D = xbeam.setup_zarr(xbeam.make_template(dataset), path, zarr_chunks=zarr_chunks, mode='w', debug=True) - D.to_zarr(path, mode='w', compute=False) - - # Second, get full template. Opening the dataset is an easy way to get it. - xbeam_opened_result, chunks = xbeam.open_zarr(path) - full_template = xbeam.make_template(xbeam_opened_result) - - # Third, write the last chunk only. + full_template = xbeam.replace_template_dims( + xbeam.make_template(partial_result), + t=ds_orig.t, + ).chunk(zarr_chunks) + full_template.to_zarr(path, mode='a', compute=False, consolidated=True) + #xbeam.setup_zarr(full_template, path, zarr_chunks=zarr_chunks) + + # An interesting fact is that, although we have only asked ChunksToZarr to + # write first_two_chunks, this premature_result will contain the entire + # ds_orig! (this happens if ds_orig was chunked) + #premature_result, final_chunks = xbeam.open_zarr(path, consolidated=True) + #xarray.testing.assert_identical(ds_orig, premature_result) + + # Second, write the last chunk only. last_chunk = [ - (xbeam.Key({'t': 2}), dataset.isel(t=[2])), + (xbeam.Key({'t': 2}), ds_last), ] + pu.db + region = {'t': slice(2, 3, 1), 'x': slice(0, 5, 1)} + #ds_last.drop_vars(['t', 'x']).to_zarr(path, region=region, compute=True) + #temp = xarray.open_zarr(path).compute() + last_chunk | xbeam.ChunksToZarr( path, + #template=xbeam.make_template(ds_last), # Does not work + #template=xbeam.make_template(ds_orig), # Works template=full_template, zarr_chunks=zarr_chunks, needs_setup=False, + debug=True, ) + # Verify that the final_result contains ds_full, which has information + # not available (in any sneaky way) before this final ChunksToZarr call. + pu.db final_result, final_chunks = xbeam.open_zarr(path, consolidated=True) - xarray.testing.assert_identical(dataset, final_result) + xarray.testing.assert_identical(ds_full, final_result) self.assertEqual(zarr_chunks, final_chunks) - #import pudb; pu.db def test_multiple_vars_chunks_to_zarr(self): dataset = xarray.Dataset( From b7ffcd66fe43130d637cb711328410e3947cf137 Mon Sep 17 00:00:00 2001 From: Ian Langmore Date: Tue, 5 Aug 2025 12:18:29 +0000 Subject: [PATCH 6/8] Remove debug lines --- xarray_beam/_src/zarr.py | 13 +----- xarray_beam/_src/zarr_test.py | 81 ++++++++++------------------------- 2 files changed, 25 insertions(+), 69 deletions(-) diff --git a/xarray_beam/_src/zarr.py b/xarray_beam/_src/zarr.py index f0a1003..dd97440 100644 --- a/xarray_beam/_src/zarr.py +++ b/xarray_beam/_src/zarr.py @@ -35,7 +35,6 @@ import dask.array import numpy as np import pandas as pd -import pudb # TODO Remove import xarray from xarray_beam._src import core from xarray_beam._src import rechunk @@ -419,7 +418,6 @@ def write_chunk_to_zarr( chunk: xarray.Dataset, store: WritableStore, template: xarray.Dataset, - debug, # TODO Remove ) -> None: """Write a single Dataset chunk to Zarr. @@ -434,8 +432,6 @@ def write_chunk_to_zarr( without array values. """ # Immutable dicts not considered a Mapping type which method expects. - if debug: - pu.db region = core.offsets_to_slices(key.offsets, chunk.sizes) # pytype: disable=wrong-arg-types already_written = [ k for k in chunk.variables if k in _unchunked_vars(template) @@ -465,7 +461,6 @@ def __init__( *, num_threads: Optional[int] = None, needs_setup: bool = True, - debug=False, # TODO Remove ): # pyformat: disable """Initialize ChunksToZarr. @@ -501,9 +496,6 @@ def __init__( needs_setup: if False, then the Zarr store is already setup and does not need to be set up as part of this PTransform. """ - self.debug = debug - if self.debug: - pu.db # pyformat: enable if isinstance(template, xarray.Dataset): if needs_setup: @@ -547,9 +539,9 @@ def _validate_zarr_chunk(self, key, chunk, template=None): validate_zarr_chunk(key, chunk, template, self.zarr_chunks) return key, chunk - def _write_chunk_to_zarr(self, key, chunk, template=None, debug=None): + def _write_chunk_to_zarr(self, key, chunk, template=None): assert template is not None - return write_chunk_to_zarr(key, chunk, self.store, template, debug) + return write_chunk_to_zarr(key, chunk, self.store, template) def expand(self, pcoll): if isinstance(self.template, xarray.Dataset): @@ -577,7 +569,6 @@ def expand(self, pcoll): self._write_chunk_to_zarr, template=template, num_threads=self.num_threads, - debug=self.debug, ) ) diff --git a/xarray_beam/_src/zarr_test.py b/xarray_beam/_src/zarr_test.py index 22f58f1..0697368 100644 --- a/xarray_beam/_src/zarr_test.py +++ b/xarray_beam/_src/zarr_test.py @@ -18,7 +18,6 @@ import dask.array as da import numpy as np import pandas as pd -import pudb # TODO REmove import xarray import xarray_beam as xbeam from xarray_beam._src import test_util @@ -254,95 +253,61 @@ def test_chunks_to_zarr(self): ): inputs2 | xbeam.ChunksToZarr(temp_dir, template) - @parameterized.named_parameters( - dict(testcase_name='append', mode='a'), - dict(testcase_name='overwrite', mode='w'), - ) - def test_chunks_to_zarr_append(self, mode): + def test_chunks_to_zarr_append(self): zarr_chunks = {'t': 1, 'x': 5} - # Calling .chunk() on this dataset is very important. - ds_orig = xarray.Dataset( + # ds is the full dataset we want to write to zarr. + + # Warning: Do not chunk this ds. If you do, it will secretly + # write itself to disk, then simply modifying the template (without + # even calling ChunksToZarr) is enough to make all tests pass! + ds = xarray.Dataset( {'foo': (('t', 'x'), np.arange(3 * 5).reshape(3, 5))}, coords={ 't': np.arange(100, 103), 'x': np.arange(5), }, - )#.chunk(zarr_chunks) - - # Create ds_full, which will be different than ds_orig. - ds_first_two = (ds_orig.isel(t=slice(2))).chunk(zarr_chunks) - ds_last = (ds_orig.isel(t=[2]) * 2).chunk(zarr_chunks) - ds_full = xarray.concat((ds_first_two, ds_last), dim='t') + ) # Write the first two chunks. - two_chunk_template = xbeam.make_template(ds_first_two) + two_chunk_template = xbeam.make_template(ds.isel(t=slice(2))) first_two_chunks = [ - (xbeam.Key({'t': 0}), ds_first_two.isel(t=[0])), - (xbeam.Key({'t': 1}), ds_first_two.isel(t=[1])), + (xbeam.Key({'t': 0}), ds.isel(t=[0])), + (xbeam.Key({'t': 1}), ds.isel(t=[1])), ] path = self.create_tempdir().full_path first_two_chunks | xbeam.ChunksToZarr( path, template=two_chunk_template, zarr_chunks=zarr_chunks ) two_chunk_result = xarray.open_zarr(path, consolidated=True) - xarray.testing.assert_identical(ds_first_two, two_chunk_result) + xarray.testing.assert_identical(ds.isel(t=slice(2)), two_chunk_result) # Now append the last chunk. # First modify the metadata - if mode == 'a': - # Append the new data (t=[2]) to the existing metadata. - # This results in t/.zarray that has chunk:2, equal to the number of times - # in the first write. - ds_last.chunk(zarr_chunks).to_zarr( - path, mode='a', append_dim='t', compute=False - ) - xbeam_opened_result, chunks = xbeam.open_zarr(path) - full_template = xbeam.make_template(xbeam_opened_result) - elif mode == 'w': - pu.db - partial_result, partial_chunks = xbeam.open_zarr(path, consolidated=True) - # Overwrite all metadata. - # This results in t/.zarray that has chunk:3, equal to the number of times - # in the total dataset. - full_template = xbeam.replace_template_dims( - xbeam.make_template(partial_result), - t=ds_orig.t, - ).chunk(zarr_chunks) - full_template.to_zarr(path, mode='a', compute=False, consolidated=True) - #xbeam.setup_zarr(full_template, path, zarr_chunks=zarr_chunks) - - # An interesting fact is that, although we have only asked ChunksToZarr to - # write first_two_chunks, this premature_result will contain the entire - # ds_orig! (this happens if ds_orig was chunked) - #premature_result, final_chunks = xbeam.open_zarr(path, consolidated=True) - #xarray.testing.assert_identical(ds_orig, premature_result) + # Append the new data (t=[2]) to the existing metadata. + # This results in t/.zarray that has chunk:2, equal to the number of times + # in the first write. + xbeam.make_template(ds.isel(t=[2])).chunk(zarr_chunks).to_zarr( + path, mode='a', append_dim='t', compute=False + ) + xbeam_opened_result, chunks = xbeam.open_zarr(path) + self.assertEqual(zarr_chunks, chunks) # Second, write the last chunk only. last_chunk = [ - (xbeam.Key({'t': 2}), ds_last), + (xbeam.Key({'t': 2}), ds.isel(t=[2])), ] - pu.db - region = {'t': slice(2, 3, 1), 'x': slice(0, 5, 1)} - #ds_last.drop_vars(['t', 'x']).to_zarr(path, region=region, compute=True) - #temp = xarray.open_zarr(path).compute() last_chunk | xbeam.ChunksToZarr( path, - #template=xbeam.make_template(ds_last), # Does not work - #template=xbeam.make_template(ds_orig), # Works - template=full_template, + template=xbeam.make_template(xbeam_opened_result), zarr_chunks=zarr_chunks, needs_setup=False, - debug=True, ) - # Verify that the final_result contains ds_full, which has information - # not available (in any sneaky way) before this final ChunksToZarr call. - pu.db final_result, final_chunks = xbeam.open_zarr(path, consolidated=True) - xarray.testing.assert_identical(ds_full, final_result) + xarray.testing.assert_identical(ds, final_result) self.assertEqual(zarr_chunks, final_chunks) def test_multiple_vars_chunks_to_zarr(self): From 3ae0fa07b75b167f77de3596838e498d2f501da5 Mon Sep 17 00:00:00 2001 From: Ian Langmore Date: Tue, 5 Aug 2025 12:21:52 +0000 Subject: [PATCH 7/8] Added a comment --- xarray_beam/_src/zarr_test.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/xarray_beam/_src/zarr_test.py b/xarray_beam/_src/zarr_test.py index 0697368..8543774 100644 --- a/xarray_beam/_src/zarr_test.py +++ b/xarray_beam/_src/zarr_test.py @@ -289,7 +289,12 @@ def test_chunks_to_zarr_append(self): # This results in t/.zarray that has chunk:2, equal to the number of times # in the first write. xbeam.make_template(ds.isel(t=[2])).chunk(zarr_chunks).to_zarr( - path, mode='a', append_dim='t', compute=False + # Caling make_template is not necessary, but let's test it since + # this is the anticipated workflow. + path, + mode='a', + append_dim='t', + compute=False, ) xbeam_opened_result, chunks = xbeam.open_zarr(path) self.assertEqual(zarr_chunks, chunks) From 94fad97aa068cb40dbc26494a10916b7ef6b1be4 Mon Sep 17 00:00:00 2001 From: Ian Langmore Date: Wed, 6 Aug 2025 00:21:30 +0000 Subject: [PATCH 8/8] Updated comment to reflect caching is to blame --- xarray_beam/_src/zarr_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/xarray_beam/_src/zarr_test.py b/xarray_beam/_src/zarr_test.py index 8543774..b7d57c2 100644 --- a/xarray_beam/_src/zarr_test.py +++ b/xarray_beam/_src/zarr_test.py @@ -258,9 +258,9 @@ def test_chunks_to_zarr_append(self): # ds is the full dataset we want to write to zarr. - # Warning: Do not chunk this ds. If you do, it will secretly - # write itself to disk, then simply modifying the template (without - # even calling ChunksToZarr) is enough to make all tests pass! + # Warning: Do not chunk this ds. If you do, caching will make tests + # pass, so long as you simply modify the template (without even calling + # ChunksToZarr). ds = xarray.Dataset( {'foo': (('t', 'x'), np.arange(3 * 5).reshape(3, 5))}, coords={