@@ -191,7 +191,8 @@ def __init__(self,
191191 interface_type : str = None ,
192192 max_threads : int = 100 ,
193193 storage_options : dict = None ,
194- active_storage_url : str = None ) -> None :
194+ active_storage_url : str = None ,
195+ option_disable_chunk_cache : bool = False ) -> None :
195196 """
196197 Instantiate with a NetCDF4 dataset URI and the variable of interest within that file.
197198 (We need the variable, because we need variable specific metadata from within that
@@ -200,6 +201,7 @@ def __init__(self,
200201
201202 :param storage_options: s3fs.S3FileSystem options
202203 :param active_storage_url: Reductionist server URL
204+ :param option_disable_chunk_cache: flag to disable chunk caching
203205 """
204206 self .ds = None
205207 input_variable = False
@@ -257,6 +259,9 @@ def __init__(self,
257259 self .storage_options = storage_options
258260 self .active_storage_url = active_storage_url
259261
262+ # turn off chunk caching
263+ self .option_disable_chunk_cache = option_disable_chunk_cache
264+
260265 # basic check on file
261266 if not input_variable :
262267 if not os .path .isfile (self .uri ) and not self .interface_type :
@@ -515,21 +520,22 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype,
515520 # Create a shared session object.
516521 if self .interface_type == "s3" and self ._version == 2 :
517522 if self .storage_options is not None :
518- key , secret = None , None
519523 if self .storage_options .get ("anon" , None ) is True :
520524 print ("Reductionist session for Anon S3 bucket." )
521525 session = reductionist .get_session (
522526 None , None , S3_ACTIVE_STORAGE_CACERT )
523- if "key" in self .storage_options :
524- key = self .storage_options ["key" ]
525- if "secret" in self .storage_options :
526- secret = self .storage_options ["secret" ]
527- if key and secret :
528- session = reductionist .get_session (
529- key , secret , S3_ACTIVE_STORAGE_CACERT )
530527 else :
531- session = reductionist .get_session (
532- S3_ACCESS_KEY , S3_SECRET_KEY , S3_ACTIVE_STORAGE_CACERT )
528+ key , secret = None , None
529+ if "key" in self .storage_options :
530+ key = self .storage_options ["key" ]
531+ if "secret" in self .storage_options :
532+ secret = self .storage_options ["secret" ]
533+ if key and secret :
534+ session = reductionist .get_session (
535+ key , secret , S3_ACTIVE_STORAGE_CACERT )
536+ else :
537+ session = reductionist .get_session (
538+ S3_ACCESS_KEY , S3_SECRET_KEY , S3_ACTIVE_STORAGE_CACERT )
533539 else :
534540 session = reductionist .get_session (S3_ACCESS_KEY ,
535541 S3_SECRET_KEY ,
@@ -661,6 +667,9 @@ def _process_chunk(self,
661667 # Axes over which to apply a reduction
662668 axis = self ._axis
663669
670+ # turn off chunk caching
671+ chunk_caching = self .option_disable_chunk_cache
672+
664673 if self .interface_type == 's3' and self ._version == 1 :
665674 tmp , count = reduce_opens3_chunk (ds ._fh ,
666675 offset ,
@@ -703,7 +712,8 @@ def _process_chunk(self,
703712 ds ._order ,
704713 chunk_selection ,
705714 axis ,
706- operation = self ._method )
715+ operation = self ._method ,
716+ option_disable_chunk_cache = chunk_caching ,)
707717 else :
708718 if self .storage_options .get ("anon" , None ) is True :
709719 bucket = os .path .dirname (parsed_url .path )
@@ -723,7 +733,8 @@ def _process_chunk(self,
723733 ds ._order ,
724734 chunk_selection ,
725735 axis ,
726- operation = self ._method )
736+ operation = self ._method ,
737+ option_disable_chunk_cache = chunk_caching ,)
727738 elif self .interface_type == "https" and self ._version == 2 :
728739 tmp , count = reductionist .reduce_chunk (session ,
729740 self .active_storage_url ,
@@ -739,7 +750,8 @@ def _process_chunk(self,
739750 chunk_selection ,
740751 axis ,
741752 operation = self ._method ,
742- interface_type = "https" )
753+ interface_type = "https" ,
754+ option_disable_chunk_cache = chunk_caching ,)
743755
744756 elif self .interface_type == 'ActivePosix' and self .version == 2 :
745757 # This is where the DDN Fuse and Infinia wrappers go