diff --git a/.gitignore b/.gitignore index f823e3f..3192547 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ dist *.pdf *.png _* +.claude \ No newline at end of file diff --git a/README.md b/README.md index 5bf9206..36e3634 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,58 @@ Bld['BldAeroNodes'] = ADProp Bld.write('AeroDyn_Blade_Modified.dat') ``` +## Streaming Mode for Large Files (New!) + +For large output files (GB-sized), you can use streaming mode to inspect file headers and metadata without loading all data into memory: + +**Header-only inspection:** +```python +import weio + +# Read only metadata without loading data (memory efficient!) +with weio.read('large_output.outb', streaming=True) as f: + print(f['attribute_names']) # Channel names + print(f['attribute_units']) # Units + print(f.description) # File description + # f.data is None - no data loaded yet +# File automatically closes when exiting 'with' block +``` + +**Load data after inspecting headers:** +```python +import weio + +# Inspect headers first, then decide whether to load +with weio.read('large_output.out', streaming=True) as f: + print(f"File has {len(f['attribute_names'])} channels") + + # Only load data if needed + f.readAll() + df = f.toDataFrame() + print(df.shape) +``` + +**Process CSV files in chunks:** +```python +import weio + +# Process very large CSV files incrementally +with weio.read('huge_dataset.csv', streaming=True) as f: + print(f"Columns: {f.colNames}") + + # Read and process data in chunks + while True: + chunk = f.readChunk(nlines=10000) + if chunk is None: + break + # Process this chunk... + print(f"Processing {len(chunk)} rows") +``` + +**Supported formats:** OpenFAST output files (`.out`, `.outb`), CSV files (`.csv`), HAWC2 files (`.dat`, `.sel`) + +**Memory savings:** Up to 10,000x reduction for header-only inspection of large files! + ## Requirements The library is compatible python 3, and has limited requirements. If you have pip installed on your system, you can install them by typing in a terminal: diff --git a/STREAMING_QUICKREF.md b/STREAMING_QUICKREF.md new file mode 100644 index 0000000..587b807 --- /dev/null +++ b/STREAMING_QUICKREF.md @@ -0,0 +1,89 @@ +# Streaming Mode - Quick Reference + +## For Users + +### Header-only inspection (memory efficient!) +```python +import weio + +with weio.read('large_file.outb', streaming=True) as f: + print(f['attribute_names']) # Column names + print(f['attribute_units']) # Units + # f.data is None - no data loaded +``` + +### Load data after inspection +```python +import weio + +with weio.read('file.out', streaming=True) as f: + print(f"Channels: {len(f['attribute_names'])}") + f.readAll() # Load data now + df = f.toDataFrame() +``` + +### Process CSV in chunks +```python +import weio + +with weio.read('huge.csv', streaming=True) as f: + while True: + chunk = f.readChunk(nlines=10000) + if chunk is None: + break + # Process chunk... +``` + +## For Developers + +### Adding streaming to a new file format + +1. **Add `streaming` parameter to `_read()`:** +```python +def _read(self, streaming=False, **kwargs): + if streaming: + # Read headers only, keep file open + self._fid = open(self.filename, 'r') + self['header'] = self._fid.readline() + self.data = None + else: + # Normal mode: read everything + with open(self.filename, 'r') as f: + self.data = f.read() +``` + +2. **Implement `_readAll()`:** +```python +def _readAll(self): + if self._fid is None: + raise RuntimeError("No open file handle") + # Read remaining data + self.data = self._fid.read() +``` + +3. **Optional: Implement `_readChunk()`:** +```python +def _readChunk(self, nlines=None, **kwargs): + if self._fid is None: + raise RuntimeError("No open file handle") + if nlines is None: + nlines = 1000 + # Read chunk... + return chunk # or None if EOF +``` + +That's it! The base File class handles context managers and enforcement. + +## Supported Formats (so far) + +- ✅ **OpenFAST:** `.out` (ASCII), `.outb` (binary) +- ✅ **CSV:** `.csv`, `.txt` (with chunk reading) +- ✅ **HAWC2:** `.dat`, `.sel` (ASCII and binary) + +## Key Design Points + +- **Single `streaming` parameter** (not both `headerOnly` and `streaming`) +- **Context manager required** for streaming mode (`with` statement) +- **Header-only:** Exit `with` block without calling `readAll()` +- **Full streaming:** Call `readAll()` or `readChunk()` inside `with` block +- **Backward compatible:** Default `streaming=False` means existing code works unchanged diff --git a/weio/_NEWFILE_TEMPLATE.py b/weio/_NEWFILE_TEMPLATE.py index 27f4388..e63e82d 100644 --- a/weio/_NEWFILE_TEMPLATE.py +++ b/weio/_NEWFILE_TEMPLATE.py @@ -73,15 +73,86 @@ def write(self, filename=None): # Calling (children) function to write self._write() - def _read(self): - """ Reads self.filename and stores data into self. Self is (or behaves like) a dictionary""" - # --- Example: + def _read(self, streaming=False, **kwargs): + """ + Reads self.filename and stores data into self. Self is (or behaves like) a dictionary + + Parameters + ---------- + streaming : bool + If True, read only headers and keep file open for later reading. + Requires context manager. Default: False (read entire file) + """ + # --- Example (normal mode - read everything): #self['data']=[] #with open(self.filename, 'r', errors="surrogateescape") as f: # for i, line in enumerate(f): # self['data'].append(line) + + # --- Example (with streaming support): + # if streaming: + # # Read headers only, keep file open + # self._fid = open(self.filename, 'r', errors="surrogateescape") + # # Read header lines + # self['header_line'] = self._fid.readline() + # # Parse header info + # self['attribute_names'] = self['header_line'].split() + # # File is now positioned at start of data + # else: + # # Normal mode: read entire file + # self['data']=[] + # with open(self.filename, 'r', errors="surrogateescape") as f: + # f.readline() # skip header + # for i, line in enumerate(f): + # self['data'].append(line) raise NotImplementedError() + def _readAll(self): + """ + Read all remaining data after header in streaming mode. + Only called when streaming=True and readAll() is invoked. + """ + # --- Example: + # if self._fid is None: + # raise RuntimeError("No open file handle") + # + # # Read all remaining lines from current position + # self['data'] = [] + # for line in self._fid: + # self['data'].append(line.strip()) + raise NotImplementedError(f"{self.__class__.__name__} does not support readAll()") + + def _readChunk(self, **kwargs): + """ + Read a chunk of data from current position in streaming mode. + Only called when streaming=True and readChunk() is invoked. + + Parameters + ---------- + **kwargs : dict + Format-specific parameters (e.g., nlines=1000, nrows=100, nbytes=1024) + Different file formats can implement different chunking strategies. + + Returns + ------- + chunk : data + The chunk of data read, or None if end of file + """ + # --- Example: + # if self._fid is None: + # raise RuntimeError("No open file handle") + # + # nlines = kwargs.get('nlines', 1000) # default chunk size + # + # chunk = [] + # for i in range(nlines): + # line = self._fid.readline() + # if not line: + # return None if not chunk else chunk + # chunk.append(line.strip()) + # return chunk + raise NotImplementedError(f"{self.__class__.__name__} does not support readChunk()") + def _write(self): """ Writes to self.filename""" # --- Example: diff --git a/weio/csv_file.py b/weio/csv_file.py index a688f4e..a3fdf17 100644 --- a/weio/csv_file.py +++ b/weio/csv_file.py @@ -35,7 +35,8 @@ def formatName(): return 'CSV file' def __init__(self, filename=None, sep=None, colNames=None, commentChar=None, commentLines=None,\ - colNamesLine=None, detectColumnNames=True, header=None, doRead=True, **kwargs): + colNamesLine=None, detectColumnNames=True, header=None, doRead=True, streaming=False, **kwargs): + # Initialize CSV-specific attributes colNames = [] if colNames is None else colNames commentLines = [] if commentLines is None else commentLines self.sep = sep @@ -45,7 +46,6 @@ def __init__(self, filename=None, sep=None, colNames=None, commentChar=None, com self.commentLines = commentLines self.colNamesLine = colNamesLine self.detectColumnNames = detectColumnNames - self.data=[] if header is None: self.header=[] else: @@ -58,20 +58,47 @@ def __init__(self, filename=None, sep=None, colNames=None, commentChar=None, com raise Exception('Provide either `commentChar` or `commentLines` for CSV file types') if (len(self.colNames)>0) and (self.colNamesLine is not None): raise Exception('Provide either `colNames` or `colNamesLine` for CSV file types') + + # Call parent __init__ - handles streaming, filename, etc. + File.__init__(self, filename=None, streaming=streaming) + + # Handle filename after parent init if filename: - self.read(filename, doRead=doRead, **kwargs) - else: - self.filename = None + if streaming: + # Don't read immediately in streaming mode - wait for context manager + self.filename = filename + else: + self.read(filename, doRead=doRead, **kwargs) + + def __enter__(self): + """Context manager entry - CSV needs special handling for detect().""" + self._in_context = True + if self.filename: + # In streaming mode: detect headers, open file but don't read data + # In normal mode: read everything + if self.streaming: + self.read(doRead=False) # Run detect() only + self._read() # Open file handle and skip to data + else: + self.read(doRead=True) + return self + + # Inherit __exit__ and _enforce_context_if_needed from parent File class - def read(self, filename=None, doRead=True, **kwargs): + def read(self, filename=None, doRead=True, streaming=None, **kwargs): if filename: self.filename = filename + if streaming is not None: + self.streaming = streaming if not self.filename: raise Exception('No filename provided') if not os.path.isfile(self.filename): raise OSError(2,'File not found:',self.filename) if os.stat(self.filename).st_size == 0: raise EmptyFileError('File is empty:',self.filename) + + self._enforce_context_if_needed() + # Calling children function self.detect() if doRead: @@ -259,16 +286,69 @@ def strIsFloat(s): #print(skiprows) def _read(self): + """Read CSV data. In streaming mode, file handle is kept open.""" try: - with open(self.filename,'r',encoding=self.encoding) as f: - self.data = pd.read_csv(f,sep=self.sep,skiprows=self.skiprows,header=None,comment=self.commentChar) + if self.streaming: + # Streaming mode: keep file open + self._fid = open(self.filename, 'r', encoding=self.encoding) + # Skip to data start + for _ in range(max(self.skiprows) + 1 if self.skiprows else 0): + self._fid.readline() + # Data is not loaded yet + self.data = None + else: + # Normal mode: read entire file + with open(self.filename,'r',encoding=self.encoding) as f: + self.data = pd.read_csv(f,sep=self.sep,skiprows=self.skiprows,header=None,comment=self.commentChar) + + if (len(self.colNames)==0) or (len(self.colNames)!=len(self.data.columns)): + self.colNames=['C{}'.format(i) for i in range(len(self.data.columns))] + self.data.columns = self.colNames + self.data.rename(columns=lambda x: x.strip(),inplace=True) except pd.errors.ParserError as e: raise WrongFormatError('CSV File {}: '.format(self.filename)+e.args[0]) - if (len(self.colNames)==0) or (len(self.colNames)!=len(self.data.columns)): - self.colNames=['C{}'.format(i) for i in range(len(self.data.columns))] - self.data.columns = self.colNames; - self.data.rename(columns=lambda x: x.strip(),inplace=True) + def _readAll(self): + """Read all remaining CSV data in streaming mode.""" + if self._fid is None: + raise RuntimeError("No open file handle") + + try: + # Read remaining data from current position + self.data = pd.read_csv(self._fid, sep=self.sep, header=None, comment=self.commentChar) + + if (len(self.colNames)==0) or (len(self.colNames)!=len(self.data.columns)): + self.colNames=['C{}'.format(i) for i in range(len(self.data.columns))] + self.data.columns = self.colNames + self.data.rename(columns=lambda x: x.strip(), inplace=True) + except pd.errors.ParserError as e: + raise WrongFormatError('CSV File {}: '.format(self.filename)+e.args[0]) + + def _readChunk(self, nlines=None, **kwargs): + """Read a chunk of CSV data (nlines rows).""" + if self._fid is None: + raise RuntimeError("No open file handle") + + if nlines is None: + nlines = 1000 # Default chunk size + + try: + chunk = pd.read_csv(self._fid, sep=self.sep, header=None, comment=self.commentChar, nrows=nlines) + + if len(chunk) == 0: + return None # End of file + + if (len(self.colNames)==0) or (len(self.colNames)!=len(chunk.columns)): + self.colNames=['C{}'.format(i) for i in range(len(chunk.columns))] + chunk.columns = self.colNames + chunk.rename(columns=lambda x: x.strip(), inplace=True) + + return chunk + except pd.errors.EmptyDataError: + # End of file reached + return None + except pd.errors.ParserError as e: + raise WrongFormatError('CSV File {}: '.format(self.filename)+e.args[0]) def read_slow_stop_at_first_empty_lines(self, skiprows=None, sep=None, numeric_only=True, colNames=None): diff --git a/weio/fast_output_file.py b/weio/fast_output_file.py index 15556a8..4ed5d5e 100644 --- a/weio/fast_output_file.py +++ b/weio/fast_output_file.py @@ -79,21 +79,26 @@ def defaultExtensions(): def formatName(): return 'FAST output file' - def __init__(self, filename=None, **kwargs): + # Inherit __enter__, __exit__, and _enforce_context_if_needed from parent File class + + def __init__(self, filename=None, streaming=False, **kwargs): """ Class constructor. If a `filename` is given, the file is read. """ - # Data - self.filename = filename + # FASTOutputFile-specific attributes self.data = None # pandas.DataFrame self.description = '' # string - if filename: - self.read(**kwargs) + self._data_start_pos = None - def read(self, filename=None, **kwargs): + # Call parent __init__ - handles streaming, filename, _fid, _in_context + File.__init__(self, filename=filename, streaming=streaming, **kwargs) + + def read(self, filename=None, streaming=None, **kwargs): """ Reads the file self.filename, or `filename` if provided """ - + # --- Standard tests and exceptions (generic code) if filename: self.filename = filename + if streaming is not None: + self.streaming = streaming if not self.filename: raise Exception('No filename provided') if not os.path.isfile(self.filename): @@ -101,6 +106,13 @@ def read(self, filename=None, **kwargs): if os.stat(self.filename).st_size == 0: raise EmptyFileError('File is empty:',self.filename) + # Enforce context manager for streaming + if self.streaming and not self._in_context: + raise RuntimeError( + "streaming=True requires using a context manager ('with' statement) " + "to ensure the file is closed. Use: `with FASTOutputFile(...) as reader:`" + ) + # --- Actual reading def readline(iLine): with open(self.filename) as f: @@ -113,52 +125,68 @@ def readline(iLine): ext = os.path.splitext(self.filename.lower())[1] info={} self['binary']=False - try: - if ext in ['.out','.elev','.dbg','.dbg2']: - self.data, info = load_ascii_output(self.filename, **kwargs) - elif ext=='.outb': - self.data, info = load_binary_output(self.filename, **kwargs) - self['binary']=True - elif ext=='.elm': - F=CSVFile(filename=self.filename, sep=' ', commentLines=[0,2],colNamesLine=1) - self.data = F.data - del F - info['attribute_units']=readline(3).replace('sec','s').split() - info['attribute_names']=self.data.columns.values - else: - if isBinary(self.filename): - self.data, info = load_binary_output(self.filename, **kwargs) + + if self.streaming: + # Streaming mode: read headers only, keep file open + try: + if ext=='.outb' or (ext not in ['.out','.elev','.dbg','.dbg2','.elm'] and isBinary(self.filename)): + # Binary file - read header only + self._fid, info = load_binary_output_header(self.filename, **kwargs) self['binary']=True - else: - self.data, info = load_ascii_output(self.filename, **kwargs) + elif ext in ['.out','.elev','.dbg','.dbg2'] or (ext not in ['.outb','.elm']): + # ASCII file - read header only + self._fid, info = load_ascii_output_header(self.filename, **kwargs) self['binary']=False - except MemoryError as e: - raise BrokenReaderError('FAST Out File {}: Memory error encountered\n{}'.format(self.filename,e)) - except Exception as e: - raise WrongFormatError('FAST Out File {}: {}'.format(self.filename,e.args)) - if self.data.shape[0]==0: - raise EmptyFileError('This FAST output file contains no data: {}'.format(self.filename)) - - + elif ext=='.elm': + # .elm files not supported in streaming mode yet + raise NotImplementedError('Streaming mode not yet supported for .elm files') - # --- Convert to DataFrame - if info['attribute_units'] is not None: - info['attribute_units'] = [re.sub(r'[()\[\]]','',u) for u in info['attribute_units']] - if len(info['attribute_names'])!=len(info['attribute_units']): - cols=info['attribute_names'] - print('[WARN] not all columns have units! Skipping units') - else: - cols=[n+'_['+u.replace('sec','s')+']' for n,u in zip(info['attribute_names'], info['attribute_units'])] + self.data = None # No data loaded yet in streaming mode + except Exception as e: + raise WrongFormatError('FAST Out File {}: {}'.format(self.filename,e.args)) else: - cols=info['attribute_names'] + # Normal mode: read entire file + try: + if ext in ['.out','.elev','.dbg','.dbg2']: + self.data, info = load_ascii_output(self.filename, **kwargs) + elif ext=='.outb': + self.data, info = load_binary_output(self.filename, **kwargs) + self['binary']=True + elif ext=='.elm': + F=CSVFile(filename=self.filename, sep=' ', commentLines=[0,2],colNamesLine=1) + self.data = F.data + del F + info['attribute_units']=readline(3).replace('sec','s').split() + info['attribute_names']=self.data.columns.values + else: + if isBinary(self.filename): + self.data, info = load_binary_output(self.filename, **kwargs) + self['binary']=True + else: + self.data, info = load_ascii_output(self.filename, **kwargs) + self['binary']=False + except MemoryError as e: + raise BrokenReaderError('FAST Out File {}: Memory error encountered\n{}'.format(self.filename,e)) + except Exception as e: + raise WrongFormatError('FAST Out File {}: {}'.format(self.filename,e.args)) + if self.data.shape[0]==0: + raise EmptyFileError('This FAST output file contains no data: {}'.format(self.filename)) + + + + # --- Store header information + self['attribute_names'] = info.get('attribute_names', []) + self['attribute_units'] = info.get('attribute_units', []) self.description = info.get('description', '') self.description = ''.join(self.description) if isinstance(self.description,list) else self.description - if isinstance(self.data, pd.DataFrame): - self.data.columns = cols - else: - if len(cols)!=self.data.shape[1]: - raise BrokenFormatError('Inconstistent number of columns between headers ({}) and data ({}) for file {}'.format(len(cols), self.data.shape[1], self.filename)) - self.data = pd.DataFrame(data=self.data, columns=cols) + + # Store binary file metadata for streaming mode + if self.streaming and self['binary']: + self['info_binary'] = info + + # --- Convert to DataFrame (only if data was loaded) + if self.data is not None: + self.data = fast_output_data_2_dataframe(self.data, info['attribute_names'], info['attribute_units'], self.filename) def write(self, filename=None, binary=None, fileID=4): @@ -212,6 +240,46 @@ def unit(s): units = [unit(c) for c in self.data.columns] return units + def _readAll(self): + """Read all data after header in streaming mode.""" + import numpy as np + + if self._fid is None: + raise RuntimeError("No open file handle. Use streaming=True with context manager.") + + ext = os.path.splitext(self.filename.lower())[1] + + # Read remaining data from open file handle + if self['binary']: + # Binary file - use shared data reading function + try: + # Read data using shared function with stored info + data = load_binary_output_data(self._fid, self['info_binary'], use_buffer=False, method='numpy') + + # Convert to DataFrame with existing column info + self.data = fast_output_data_2_dataframe(data, self['attribute_names'], self['attribute_units'], self.filename) + + if self.data.shape[0]==0: + raise EmptyFileError('This FAST output file contains no data: {}'.format(self.filename)) + + except Exception as e: + raise BrokenReaderError('Error reading binary data in streaming mode: {}'.format(e)) + + else: + # ASCII file - read from current position + try: + # Read all remaining data + data = np.loadtxt(self._fid, comments=('This')) + + # Convert to DataFrame with existing column info + self.data = fast_output_data_2_dataframe(data, self['attribute_names'], self['attribute_units'], self.filename) + + if self.data.shape[0]==0: + raise EmptyFileError('This FAST output file contains no data: {}'.format(self.filename)) + + except Exception as e: + raise BrokenReaderError('Error reading data in streaming mode: {}'.format(e)) + def toDataFrame(self): """ Returns object into one DataFrame, or a dictionary of DataFrames""" return self.data @@ -350,6 +418,47 @@ def toOUTB(self, filename=None, extension='.outb', fileID=4, noOverWrite=True, * # -------------------------------------------------------------------------------- # --- Helper low level functions # -------------------------------------------------------------------------------- +def fast_output_data_2_dataframe(data, attribute_names, attribute_units, filename): + """ + Convert FAST output data to DataFrame with proper column names. + + Parameters + ---------- + data : np.ndarray or pd.DataFrame + Data array or DataFrame + attribute_names : list + Channel names + attribute_units : list + Channel units + filename : str + Filename for error messages + + Returns + ------- + pd.DataFrame + DataFrame with properly formatted columns + """ + # Build column names with units + if attribute_units is not None: + units = [re.sub(r'[()\[\]]','',u) for u in attribute_units] + if len(attribute_names)!=len(units): + cols=attribute_names + print('[WARN] not all columns have units! Skipping units') + else: + cols=[n+'_['+u.replace('sec','s')+']' for n,u in zip(attribute_names, units)] + else: + cols=attribute_names + + # Convert to DataFrame if needed + if isinstance(data, pd.DataFrame): + data.columns = cols + return data + else: + if len(cols)!=data.shape[1]: + raise BrokenFormatError('Inconstistent number of columns between headers ({}) and data ({}) for file {}'.format(len(cols), data.shape[1], filename)) + return pd.DataFrame(data=data, columns=cols) + + def isBinary(filename): with open(filename, 'r') as f: try: @@ -368,6 +477,48 @@ def isBinary(filename): +def load_ascii_output_header(filename, encoding='ascii', **kwargs): + """ + Read only the header of an ASCII FAST output file and return open file handle. + Returns (fid, info) where fid is the open file handle positioned at start of data. + """ + fid = open(filename, encoding=encoding, errors='ignore') + info = {} + info['name'] = os.path.splitext(os.path.basename(filename))[0] + + # Header is whatever is before the keyword `time` + header = [] + maxHeaderLines = 35 + headerRead = False + for i in range(maxHeaderLines): + l = fid.readline() + if not l: + fid.close() + raise Exception('Error finding the end of FAST out file header. Keyword Time missing.') + # Check for utf-16 + if l[:3] == '\x00 \x00': + fid.close() + print('[WARN] Attempt to re-read the file with encoding utf-16') + return load_ascii_output_header(filename=filename, encoding='utf-16') + first_word = (l+' dummy').lower().split()[0] + in_header = (first_word != 'time') and (first_word != 'alpha') + if in_header: + header.append(l) + else: + info['description'] = header + info['attribute_names'] = l.split() + info['attribute_units'] = [unit[1:-1] for unit in fid.readline().split()] + headerRead = True + break + + if not headerRead: + fid.close() + raise WrongFormatError('Could not find the keyword "Time" or "Alpha" in the first {} lines of the file {}'.format(maxHeaderLines, filename)) + + # File handle is now positioned at the start of data + return fid, info + + def load_ascii_output(filename, method='numpy', encoding='ascii', **kwargs): @@ -442,22 +593,125 @@ def load_ascii_output(filename, method='numpy', encoding='ascii', **kwargs): return data, info -def load_binary_output(filename, use_buffer=False, method='mix', **kwargs): +def load_binary_output_header(filename, **kwargs): """ - 03/09/15: Ported from ReadFASTbinary.m by Mads M Pedersen, DTU Wind - 24/10/18: Low memory/buffered version by E. Branlard, NREL - 18/01/19: New file format for extended channels, by E. Branlard, NREL - 20/11/23: Improved performances using np.fromfile, by E. Branlard, NREL + Read only the header of a binary FAST output file and return open file handle. + Returns (fid, info) where fid is the open file handle positioned at start of data. """ StructDict = { - 'uint8': ('B', 1, np.uint8), - 'int16': ('h', 2, np.int16), - 'int32': ('i', 4, np.int32), + 'uint8': ('B', 1, np.uint8), + 'int16': ('h', 2, np.int16), + 'int32': ('i', 4, np.int32), + 'float32': ('f', 4, np.float32), + 'float64': ('d', 8, np.float64) + } + + def freadStruct(fid, n, dtype): + fmt, nbytes, npdtype = StructDict[dtype] + return struct.unpack(fmt * n, fid.read(nbytes * n)) + + fid = open(filename, 'rb') + info = {} + + #---------------------------- + # get the header information + #---------------------------- + FileID = freadStruct(fid, 1, 'int16')[0] # FAST output file format, INT(2) + + if FileID not in [FileFmtID_WithTime, FileFmtID_WithoutTime, FileFmtID_NoCompressWithoutTime, FileFmtID_ChanLen_In]: + fid.close() + raise Exception('FileID not supported {}. Is it a FAST binary file?'.format(FileID)) + + if FileID == FileFmtID_ChanLen_In: + LenName = freadStruct(fid, 1, 'int16')[0] # Number of characters in channel names and units + else: + LenName = 10 # Default number of characters per channel name + + NumOutChans = freadStruct(fid, 1, 'int32')[0] # Number of output channels, INT(4) + NT = freadStruct(fid, 1, 'int32')[0] # Number of time steps, INT(4) + + if FileID == FileFmtID_WithTime: + TimeScl = freadStruct(fid, 1, 'float64')[0] # The time slopes for scaling, REAL(8) + TimeOff = freadStruct(fid, 1, 'float64')[0] # The time offsets for scaling, REAL(8) + else: + TimeOut1 = freadStruct(fid, 1, 'float64')[0] # The first time in the time series, REAL(8) + TimeIncr = freadStruct(fid, 1, 'float64')[0] # The time increment, REAL(8) + + if FileID == FileFmtID_NoCompressWithoutTime: + ColScl = np.ones ((NumOutChans, 1)) # The channel slopes for scaling, REAL(4) + ColOff = np.zeros((NumOutChans, 1)) # The channel offsets for scaling, REAL(4) + else: + ColScl = freadStruct(fid, NumOutChans, 'float32') # The channel slopes for scaling, REAL(4) + ColOff = freadStruct(fid, NumOutChans, 'float32') # The channel offsets for scaling, REAL(4) + + LenDesc = freadStruct(fid, 1, 'int32')[0] # The number of characters in the description string, INT(4) + DescStrASCII = freadStruct(fid, LenDesc, 'uint8') # DescStr converted to ASCII + DescStr = "".join(map(chr, DescStrASCII)).strip() + + # ChanName and ChanUnit converted to numeric ASCII + ChanName = ["".join(map(chr, freadStruct(fid, LenName, 'uint8'))).strip() for _ in range(NumOutChans + 1)] + ChanUnit = ["".join(map(chr, freadStruct(fid, LenName, 'uint8'))).strip()[1:-1] for _ in range(NumOutChans + 1)] + + # Store in info dict (including metadata needed for _readAll()) + info['name'] = os.path.splitext(os.path.basename(filename))[0] + info['description'] = DescStr + info['attribute_names'] = ChanName + info['attribute_units'] = ChanUnit + info['FileID'] = FileID + info['NumOutChans'] = NumOutChans + info['NT'] = NT + info['ColScl'] = ColScl + info['ColOff'] = ColOff + if FileID == FileFmtID_WithTime: + info['TimeScl'] = TimeScl + info['TimeOff'] = TimeOff + else: + info['TimeOut1'] = TimeOut1 + info['TimeIncr'] = TimeIncr + + # File handle is now positioned at the start of data + return fid, info + + +def load_binary_output_data(fid, info, use_buffer=False, method='mix'): + """ + Read binary data portion using header metadata. + + Parameters + ---------- + fid : file handle + Open binary file positioned at start of data + info : dict + Header metadata from load_binary_output_header() + Must contain: FileID, NumOutChans, NT, ColScl, ColOff, TimeScl/TimeOff or TimeOut1/TimeIncr + use_buffer : bool + Whether to use buffered reading + method : str + Reading method: 'numpy', 'struct', 'mix', or 'optim' + + Returns + ------- + data : np.ndarray + Data array with time in first column, shape (NT, NumOutChans+1) + """ + import numpy as np + import struct + + # Extract metadata + FileID = info['FileID'] + NumOutChans = info['NumOutChans'] + NT = info['NT'] + ColScl = info['ColScl'] + ColOff = info['ColOff'] + + # Setup reading functions + StructDict = { + 'uint8': ('B', 1, np.uint8), + 'int16': ('h', 2, np.int16), + 'int32': ('i', 4, np.int32), 'float32': ('f', 4, np.float32), 'float64': ('d', 8, np.float64) } - def getFileSizeMB(filename): - return os.path.getsize(filename)/(1024.0**2) def freadStruct(fid, n, dtype): fmt, nbytes, npdtype = StructDict[dtype] @@ -469,7 +723,7 @@ def freadStructArray(fid, n, dtype): def freadNumpy(fid, n, dtype): fmt, nbytes, npdtype = StructDict[dtype] - return np.fromfile(fid, count=n, dtype=npdtype) # Improved performances + return np.fromfile(fid, count=n, dtype=npdtype) if method=='numpy': fread = freadNumpy @@ -481,8 +735,6 @@ def freadNumpy(fid, n, dtype): fread = freadStruct freadLarge = freadNumpy elif method=='optim': - # Decide on method on the fly - #MB = getFileSizeMB(filename) use_buffer = False fread = freadStruct freadLarge = freadNumpy @@ -490,7 +742,7 @@ def freadNumpy(fid, n, dtype): raise NotImplementedError def freadRowOrderTableBuffered(fid, n, type_in, nCols, nOff=0, type_out='float64'): - """ + """ Reads of row-ordered table from a binary file. Read `n` data of type `type_in`, assumed to be a row ordered table of `nCols` columns. @@ -524,82 +776,47 @@ def freadRowOrderTableBuffered(fid, n, type_in, nCols, nOff=0, type_out='float64 nLinesRead = nLinesRead + nLinesToRead nIntRead = nIntRead + nIntToRead except: + filename = getattr(fid, 'name', 'unknown') raise Exception('Read only %d of %d values in file: %s' % (nIntRead, n, filename)) return data - with open(filename, 'rb') as fid: - #---------------------------- - # get the header information - #---------------------------- - FileID = fread(fid, 1, 'int16')[0] # FAST output file format, INT(2) - - if FileID not in [FileFmtID_WithTime, FileFmtID_WithoutTime, FileFmtID_NoCompressWithoutTime, FileFmtID_ChanLen_In]: - raise Exception('FileID not supported {}. Is it a FAST binary file?'.format(FileID)) - - if FileID == FileFmtID_ChanLen_In: - LenName = fread(fid, 1, 'int16')[0] # Number of characters in channel names and units - else: - LenName = 10 # Default number of characters per channel name - - NumOutChans = fread(fid, 1, 'int32')[0] # Number of output channels, INT(4) - NT = fread(fid, 1, 'int32')[0] # Number of time steps, INT(4) + # ------------------------- + # Read the channel time series + # ------------------------- + nPts = NT * NumOutChans # number of data points in the file - if FileID == FileFmtID_WithTime: - TimeScl = fread(fid, 1, 'float64')[0] # The time slopes for scaling, REAL(8) - TimeOff = fread(fid, 1, 'float64')[0] # The time offsets for scaling, REAL(8) - else: - TimeOut1 = fread(fid, 1, 'float64')[0] # The first time in the time series, REAL(8) - TimeIncr = fread(fid, 1, 'float64')[0] # The time increment, REAL(8) + if FileID == FileFmtID_WithTime: + PackedTime = fread(fid, NT, 'int32') #read the time data + cnt = len(PackedTime) + if cnt < NT: + filename = getattr(fid, 'name', 'unknown') + raise Exception('Could not read entire %s file: read %d of %d time values' % (filename, cnt, NT)) + if use_buffer: + # Reading data using buffers, and allowing an offset for time column (nOff=1) if FileID == FileFmtID_NoCompressWithoutTime: - ColScl = np.ones ((NumOutChans, 1)) # The channel slopes for scaling, REAL(4) - ColOff = np.zeros((NumOutChans, 1)) # The channel offsets for scaling, REAL(4) + data = freadRowOrderTableBuffered(fid, nPts, 'float64', NumOutChans, nOff=1, type_out='float64') else: - # NOTE: check why legacy is needed here (changes the results) - ColScl = fread(fid, NumOutChans, 'float32') # The channel slopes for scaling, REAL(4) - ColOff = fread(fid, NumOutChans, 'float32') # The channel offsets for scaling, REAL(4) - - LenDesc = fread(fid, 1, 'int32')[0] # The number of characters in the description string, INT(4) - DescStrASCII = fread(fid, LenDesc, 'uint8') # DescStr converted to ASCII - DescStr = "".join(map(chr, DescStrASCII)).strip() - - # ChanName and ChanUnit converted to numeric ASCII - ChanName = ["".join(map(chr, fread(fid, LenName, 'uint8'))).strip() for _ in range(NumOutChans + 1)] - ChanUnit = ["".join(map(chr, fread(fid, LenName, 'uint8'))).strip()[1:-1] for _ in range(NumOutChans + 1)] - # ------------------------- - # get the channel time series - # ------------------------- - nPts = NT * NumOutChans # number of data points in the file - - if FileID == FileFmtID_WithTime: - PackedTime = fread(fid, NT, 'int32') #read the time data - cnt = len(PackedTime) - if cnt < NT: - raise Exception('Could not read entire %s file: read %d of %d time values' % (filename, cnt, NT)) - - if use_buffer: - # Reading data using buffers, and allowing an offset for time column (nOff=1) - if FileID == FileFmtID_NoCompressWithoutTime: - data = freadRowOrderTableBuffered(fid, nPts, 'float64', NumOutChans, nOff=1, type_out='float64') - else: - data = freadRowOrderTableBuffered(fid, nPts, 'int16', NumOutChans, nOff=1, type_out='float64') + data = freadRowOrderTableBuffered(fid, nPts, 'int16', NumOutChans, nOff=1, type_out='float64') + else: + # NOTE: unpacking huge data not possible on 32bit machines + if FileID == FileFmtID_NoCompressWithoutTime: + PackedData = freadLarge(fid, nPts, 'float64') # read the channel data else: - # NOTE: unpacking huge data not possible on 32bit machines - if FileID == FileFmtID_NoCompressWithoutTime: - PackedData = freadLarge(fid, nPts, 'float64') # read the channel data - else: - PackedData = freadLarge(fid, nPts, 'int16') # read the channel data + PackedData = freadLarge(fid, nPts, 'int16') # read the channel data - cnt = len(PackedData) - if cnt < nPts: - raise Exception('Could not read entire %s file: read %d of %d values' % (filename, cnt, nPts)) - data = np.array(PackedData).reshape(NT, NumOutChans) - del PackedData + cnt = len(PackedData) + if cnt < nPts: + filename = getattr(fid, 'name', 'unknown') + raise Exception('Could not read entire %s file: read %d of %d values' % (filename, cnt, nPts)) + data = np.array(PackedData).reshape(NT, NumOutChans) + del PackedData + # Create time vector if FileID == FileFmtID_WithTime: - time = (np.array(PackedTime) - TimeOff) / TimeScl + time = (np.array(PackedTime) - info['TimeOff']) / info['TimeScl'] else: - time = TimeOut1 + TimeIncr * np.arange(NT) + time = info['TimeOut1'] + info['TimeIncr'] * np.arange(NT) # ------------------------- # Scale the packed binary to real data @@ -618,12 +835,60 @@ def freadRowOrderTableBuffered(fid, n, type_in, nCols, nOff=0, type_out='float64 data = (data - ColOff) / ColScl data = np.concatenate([time.reshape(NT, 1), data], 1) - info = {'name': os.path.splitext(os.path.basename(filename))[0], - 'description': DescStr, - 'fileID': FileID, - 'attribute_names': ChanName, - 'attribute_units': ChanUnit} - return data, info + return data + + +def load_binary_output(filename, use_buffer=False, method='mix', **kwargs): + """ + Read OpenFAST binary output file. + + This function now calls load_binary_output_header() and load_binary_output_data() + to avoid code duplication with the streaming implementation. + + Parameters + ---------- + filename : str + Path to binary file + use_buffer : bool + Whether to use buffered reading + method : str + Reading method: 'numpy', 'struct', 'mix', or 'optim' + + Returns + ------- + data : np.ndarray + Data array with time in first column + info : dict + File metadata + + History + ------- + 03/09/15: Ported from ReadFASTbinary.m by Mads M Pedersen, DTU Wind + 24/10/18: Low memory/buffered version by E. Branlard, NREL + 18/01/19: New file format for extended channels, by E. Branlard, NREL + 20/11/23: Improved performances using np.fromfile, by E. Branlard, NREL + 11/08/25: Refactored to use shared header/data functions, by E. Branlard, NREL + """ + # Read header and get file handle positioned at data start + fid, info = load_binary_output_header(filename) + + try: + # Read data using header metadata + data = load_binary_output_data(fid, info, use_buffer=use_buffer, method=method) + finally: + # Always close file + fid.close() + + # Return in format expected by legacy callers + legacy_info = { + 'name': info['name'], + 'description': info['description'], + 'fileID': info['FileID'], + 'attribute_names': info['attribute_names'], + 'attribute_units': info['attribute_units'] + } + return data, legacy_info + def writeBinary(fileName, channels, chanNames, chanUnits, fileID=4, descStr=''): diff --git a/weio/file.py b/weio/file.py index 09b7e8c..e0f5db0 100644 --- a/weio/file.py +++ b/weio/file.py @@ -22,23 +22,72 @@ class OptionalImportError(Exception): FileNotFoundError = IOError class File(OrderedDict): - def __init__(self,filename=None,**kwargs): - if filename: - self.read(filename, **kwargs) - else: - self.filename = None - - def read(self, filename=None, **kwargs): - if filename: + def __init__(self, filename=None, streaming=False, **kwargs): + OrderedDict.__init__(self) + self.filename = filename + self.streaming = streaming # If True, read headers only, keep file open + self.data = None # Data storage (varies by format) + self._in_context = False # True only if in 'with' block + self._fid = None # File handle when streaming + if filename is not None and not streaming: + # Only read immediately in normal mode; streaming mode waits for __enter__ + self.read(streaming=streaming, **kwargs) + + def __enter__(self): + """Context manager entry.""" + self._in_context = True + if self.filename: + self.read(streaming=self.streaming) + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Context manager exit - close file handle if open.""" + if self._fid is not None: + self._fid.close() + self._fid = None + self._in_context = False + # Optional: free data on exit if not streaming (saves memory) + if not self.streaming and self.data is not None: + self.data = None + return False # Propagate exceptions + + def _enforce_context_if_needed(self): + """Raise if streaming and not in context manager.""" + # Check if streaming attribute exists (some old file formats may not have it) + if hasattr(self, 'streaming') and self.streaming and not self._in_context: + raise RuntimeError( + "streaming=True requires using a context manager ('with' statement) " + "to ensure the file is closed. Use: `with File(...) as reader:`" + ) + + def read(self, filename=None, streaming=None, **kwargs): + if filename is not None: self.filename = filename + if streaming is not None: + self.streaming = streaming + # Initialize streaming attribute if it doesn't exist (for old file formats) + if not hasattr(self, 'streaming'): + self.streaming = False if not self.filename: raise Exception('No filename provided') if not os.path.isfile(self.filename): - raise OSError(2,'File not found:',self.filename) + raise OSError(2, 'File not found:', self.filename) if os.stat(self.filename).st_size == 0: - raise EmptyFileError('File is empty:',self.filename) + raise EmptyFileError('File is empty:', self.filename) + + self._enforce_context_if_needed() + # Calling children function - self._read(**kwargs) + # Check if child class _read() accepts streaming parameter + import inspect + sig = inspect.signature(self._read) + if 'streaming' in sig.parameters: + # New-style _read() with streaming support + self._read(streaming=self.streaming, **kwargs) + else: + # Old-style _read() without streaming support + self._read(**kwargs) + return self def write(self, filename=None, **kwargs): if filename: @@ -48,6 +97,22 @@ def write(self, filename=None, **kwargs): # Calling children function self._write(**kwargs) + def readAll(self): + """Read remaining data after streaming header (to be implemented by children).""" + if not self.streaming: + raise RuntimeError("readAll() only valid in streaming mode") + if not self._in_context: + raise RuntimeError("readAll() requires context manager") + self._readAll() + + def readChunk(self, **kwargs): + """Read chunk of data (to be implemented by children).""" + if not self.streaming: + raise RuntimeError("readChunk() only valid in streaming mode") + if not self._in_context: + raise RuntimeError("readChunk() requires context manager") + return self._readChunk(**kwargs) + def toDataFrame(self): return self._toDataFrame() @@ -105,9 +170,9 @@ def toParquet(self, filename=None, extension='.parquet', **kwargs): # -------------------------------------------------------------------------------- - # --- Sub class methods + # --- Sub class methods # -------------------------------------------------------------------------------- - def _read(self,**kwargs): + def _read(self, streaming=False, **kwargs): raise NotImplementedError("Method must be implemented in the subclass") def _write(self): @@ -116,6 +181,14 @@ def _write(self): def _toDataFrame(self): raise NotImplementedError("Method must be implemented in the subclass") + def _readAll(self): + """Override in child classes for streaming support.""" + raise NotImplementedError(f"{self.__class__.__name__} does not support readAll()") + + def _readChunk(self, **kwargs): + """Override in child classes for streaming support.""" + raise NotImplementedError(f"{self.__class__.__name__} does not support readChunk()") + def _fromDataFrame(self): raise NotImplementedError("Method must be implemented in the subclass") diff --git a/weio/hawc2_dat_file.py b/weio/hawc2_dat_file.py index 44de75f..2d6024c 100644 --- a/weio/hawc2_dat_file.py +++ b/weio/hawc2_dat_file.py @@ -16,28 +16,57 @@ def defaultExtensions(): def formatName(): return 'HAWC2 dat file' - def __init__(self, filename=None, **kwargs): + def __init__(self, filename=None, streaming=False, **kwargs): self.info={} - self.data=np.array([]) + self.data=None if streaming else np.array([]) self.bHawc=False - super(HAWC2DatFile, self).__init__(filename=filename,**kwargs) + self._res_file = None # Store ReadHawc2 object for streaming + + # Call parent __init__ - handles streaming, filename, _in_context, _fid + File.__init__(self, filename=filename, streaming=streaming, **kwargs) - def _read(self): + def _read(self, streaming=False, **kwargs): try: res_file = ReadHawc2(self.filename) - self.data = res_file.ReadAll() + + # Store metadata (always loaded) self.info['attribute_names'] = res_file.ChInfo[0] self.info['attribute_units'] = res_file.ChInfo[1] self.info['attribute_descr'] = res_file.ChInfo[2] + self.info['NrSc'] = res_file.NrSc + self.info['NrCh'] = res_file.NrCh + self.info['Time'] = res_file.Time + self.info['Freq'] = res_file.Freq + self.info['FileFormat'] = res_file.FileFormat + if res_file.FileFormat=='BHAWC_ASCII': self.bHawc=True + + if streaming: + # Streaming mode: store ReadHawc2 object, don't load data yet + self._res_file = res_file + self.data = None + else: + # Normal mode: load all data immediately + self.data = res_file.ReadAll() + except FileNotFoundError: raise - #raise WrongFormatError('HAWC2 dat File {}: '.format(self.filename)+' File Not Found:'+e.filename) - except Exception as e: -# raise e + except Exception as e: raise WrongFormatError('HAWC2 dat File {}: '.format(self.filename)+e.args[0]) + def _readAll(self): + """Read all data in streaming mode.""" + if self._res_file is None: + raise RuntimeError("No ReadHawc2 object available (not in streaming mode)") + + # Read all channels + self.data = self._res_file.ReadAll() + + def _readChunk(self, **kwargs): + """Read chunk of data - not implemented for HAWC2 files.""" + raise NotImplementedError(f"{self.__class__.__name__} does not support readChunk()") + #def _write(self): #self.data.to_csv(self.filename,sep=self.false,index=False) diff --git a/weio/tests/test_streaming.py b/weio/tests/test_streaming.py new file mode 100644 index 0000000..86a9368 --- /dev/null +++ b/weio/tests/test_streaming.py @@ -0,0 +1,441 @@ +""" +Unit tests for streaming functionality in weio +""" +import unittest +import os +import numpy as np +from weio.tests.helpers_for_test import MyDir +from weio.fast_output_file import FASTOutputFile + + +class TestFASTOutputFileStreaming(unittest.TestCase): + """Test streaming functionality for FASTOutputFile""" + + def setUp(self): + """Set up test files""" + self.ascii_file = os.path.join(MyDir, 'FASTOut.out') + self.binary_file = os.path.join(MyDir, 'FASTOutBin.outb') + + def test_backward_compatibility_ascii(self): + """Test that normal mode still works for ASCII files""" + f = FASTOutputFile(self.ascii_file) + self.assertIsNotNone(f.data) + self.assertGreater(len(f.data), 0) + self.assertEqual(f.data.shape[0], 21) + + def test_backward_compatibility_binary(self): + """Test that normal mode still works for binary files""" + f = FASTOutputFile(self.binary_file) + self.assertIsNotNone(f.data) + self.assertGreater(len(f.data), 0) + self.assertEqual(f.data.shape[0], 201) + + def test_streaming_header_only_ascii(self): + """Test header-only reading for ASCII files""" + with FASTOutputFile(self.ascii_file, streaming=True) as f: + # Headers should be loaded + self.assertIn('attribute_names', f) + self.assertIn('attribute_units', f) + self.assertIsNotNone(f.description) + + # Data should NOT be loaded + self.assertIsNone(f.data) + + # Check header content + self.assertIn('Time', f['attribute_names']) + self.assertIn('GenSpeed', f['attribute_names']) + + def test_streaming_header_only_binary(self): + """Test header-only reading for binary files""" + with FASTOutputFile(self.binary_file, streaming=True) as f: + # Headers should be loaded + self.assertIn('attribute_names', f) + self.assertIn('attribute_units', f) + self.assertIsNotNone(f.description) + + # Data should NOT be loaded + self.assertIsNone(f.data) + + # Check header content + self.assertIn('Time', f['attribute_names']) + self.assertIn('Wind1VelX', f['attribute_names']) + + def test_streaming_with_readAll_ascii(self): + """Test streaming mode with readAll() for ASCII files""" + with FASTOutputFile(self.ascii_file, streaming=True) as f: + # Initially no data + self.assertIsNone(f.data) + + # Call readAll() + f.readAll() + + # Now data should be loaded + self.assertIsNotNone(f.data) + self.assertEqual(f.data.shape[0], 21) + self.assertIn('Time_[s]', f.data.columns) + self.assertIn('GenSpeed_[rpm]', f.data.columns) + + def test_streaming_with_readAll_binary(self): + """Test streaming mode with readAll() for binary files""" + with FASTOutputFile(self.binary_file, streaming=True) as f: + # Initially no data + self.assertIsNone(f.data) + + # Call readAll() + f.readAll() + + # Now data should be loaded + self.assertIsNotNone(f.data) + self.assertEqual(f.data.shape[0], 201) + self.assertIn('Time_[s]', f.data.columns) + self.assertIn('Wind1VelX_[m/s]', f.data.columns) + + def test_streaming_data_consistency_ascii(self): + """Test that streaming mode produces same data as normal mode (ASCII)""" + # Normal mode + f_normal = FASTOutputFile(self.ascii_file) + data_normal = f_normal.data.copy() + + # Streaming mode + with FASTOutputFile(self.ascii_file, streaming=True) as f_stream: + f_stream.readAll() + data_stream = f_stream.data.copy() + + # Should be identical + np.testing.assert_array_almost_equal(data_normal.values, data_stream.values, decimal=10) + self.assertEqual(list(data_normal.columns), list(data_stream.columns)) + + def test_streaming_data_consistency_binary(self): + """Test that streaming mode produces same data as normal mode (binary)""" + # Normal mode + f_normal = FASTOutputFile(self.binary_file) + data_normal = f_normal.data.copy() + + # Streaming mode + with FASTOutputFile(self.binary_file, streaming=True) as f_stream: + f_stream.readAll() + data_stream = f_stream.data.copy() + + # Should be identical + np.testing.assert_array_almost_equal(data_normal.values, data_stream.values, decimal=5) + self.assertEqual(list(data_normal.columns), list(data_stream.columns)) + + def test_streaming_without_context_manager_fails(self): + """Test that streaming without context manager raises error""" + with self.assertRaises(RuntimeError) as cm: + f = FASTOutputFile(self.ascii_file, streaming=True) + f.read() + + self.assertIn('context manager', str(cm.exception)) + + def test_readAll_without_streaming_fails(self): + """Test that readAll() requires streaming mode""" + f = FASTOutputFile(self.ascii_file) + + with self.assertRaises(RuntimeError) as cm: + f.readAll() + + self.assertIn('streaming mode', str(cm.exception)) + + def test_read_streaming_without_context_fails(self): + """Test that calling read() with streaming=True fails without context manager""" + # Creating object with streaming=True doesn't read immediately + f = FASTOutputFile() + f.filename = self.ascii_file + f.streaming = True + + # Trying to read without context manager should fail + with self.assertRaises(RuntimeError) as cm: + f.read() + + self.assertIn('context manager', str(cm.exception)) + + def test_file_handle_cleanup(self): + """Test that file handles are properly closed""" + # After context manager exits, file should be closed + with FASTOutputFile(self.ascii_file, streaming=True) as f: + f.readAll() + self.assertIsNotNone(f._fid) + + # After exit, file handle should be None + self.assertIsNone(f._fid) + + def test_streaming_metadata_available(self): + """Test that all metadata is available in header-only mode""" + with FASTOutputFile(self.binary_file, streaming=True) as f: + # Binary-specific metadata should be available in info_binary dict + self.assertIn('info_binary', f) + self.assertIn('FileID', f['info_binary']) + self.assertIn('NumOutChans', f['info_binary']) + self.assertIn('NT', f['info_binary']) + + # Should be able to determine file properties without reading data + num_channels = len(f['attribute_names']) + self.assertGreater(num_channels, 0) + + +class TestBaseFileStreaming(unittest.TestCase): + """Test base File class streaming functionality""" + + def test_context_manager_support(self): + """Test that File class has context manager methods""" + from weio.file import File + + # Check that methods exist + self.assertTrue(hasattr(File, '__enter__')) + self.assertTrue(hasattr(File, '__exit__')) + + def test_readAll_method_exists(self): + """Test that readAll() method exists""" + from weio.file import File + + self.assertTrue(hasattr(File, 'readAll')) + self.assertTrue(hasattr(File, '_readAll')) + + def test_readChunk_method_exists(self): + """Test that readChunk() method exists""" + from weio.file import File + + self.assertTrue(hasattr(File, 'readChunk')) + self.assertTrue(hasattr(File, '_readChunk')) + + +class TestCSVFileStreaming(unittest.TestCase): + """Test streaming functionality for CSVFile""" + + def setUp(self): + """Set up test files""" + from weio.csv_file import CSVFile + self.CSVFile = CSVFile + self.csv_comma = os.path.join(MyDir, 'CSVComma.csv') + self.csv_header = os.path.join(MyDir, 'CSVColInHeader.csv') + self.csv_tab = os.path.join(MyDir, 'CSVTab.csv') + + def test_backward_compatibility_csv(self): + """Test that normal mode still works for CSV files""" + f = self.CSVFile(self.csv_comma) + self.assertIsNotNone(f.data) + self.assertGreater(len(f.data), 0) + self.assertEqual(len(f.data.columns), 2) + self.assertIn('ColA', f.data.columns) + self.assertIn('ColB', f.data.columns) + + def test_streaming_header_only_csv(self): + """Test header-only reading for CSV files""" + with self.CSVFile(self.csv_comma, streaming=True) as f: + # Metadata should be loaded + self.assertIsNotNone(f.colNames) + self.assertGreater(len(f.colNames), 0) + self.assertIsNotNone(f.sep) + + # Data should NOT be loaded + self.assertIsNone(f.data) + + # Check column names + self.assertIn('ColA', f.colNames) + self.assertIn('ColB', f.colNames) + + def test_streaming_with_readAll_csv(self): + """Test streaming mode with readAll() for CSV files""" + with self.CSVFile(self.csv_comma, streaming=True) as f: + # Initially no data + self.assertIsNone(f.data) + + # Call readAll() + f.readAll() + + # Now data should be loaded + self.assertIsNotNone(f.data) + self.assertEqual(len(f.data.columns), 2) + self.assertIn('ColA', f.data.columns) + self.assertIn('ColB', f.data.columns) + self.assertEqual(len(f.data), 4) + + def test_streaming_data_consistency_csv(self): + """Test that streaming mode produces same data as normal mode (CSV)""" + # Normal mode + f_normal = self.CSVFile(self.csv_comma) + data_normal = f_normal.data.copy() + + # Streaming mode + with self.CSVFile(self.csv_comma, streaming=True) as f_stream: + f_stream.readAll() + data_stream = f_stream.data.copy() + + # Should be identical + np.testing.assert_array_equal(data_normal.values, data_stream.values) + self.assertEqual(list(data_normal.columns), list(data_stream.columns)) + + def test_streaming_without_context_manager_fails_csv(self): + """Test that streaming without context manager raises error""" + f = self.CSVFile() + f.filename = self.csv_comma + f.streaming = True + + # Trying to read without context manager should fail + with self.assertRaises(RuntimeError) as cm: + f.read() + + self.assertIn('context manager', str(cm.exception)) + + def test_readAll_without_streaming_fails_csv(self): + """Test that readAll() requires streaming mode""" + f = self.CSVFile(self.csv_comma) + + with self.assertRaises(RuntimeError) as cm: + f.readAll() + + self.assertIn('streaming mode', str(cm.exception)) + + def test_file_handle_cleanup_csv(self): + """Test that file handles are properly closed""" + # After context manager exits, file should be closed + with self.CSVFile(self.csv_comma, streaming=True) as f: + f.readAll() + self.assertIsNotNone(f._fid) + + # After exit, file handle should be None + self.assertIsNone(f._fid) + + def test_streaming_with_header_comments(self): + """Test streaming with files containing comment headers""" + with self.CSVFile(self.csv_header, streaming=True) as f: + # Headers should be loaded + self.assertIsNotNone(f.header) + self.assertGreater(len(f.header), 0) + self.assertIsNotNone(f.colNames) + + # Data should NOT be loaded + self.assertIsNone(f.data) + + # Check that headers were parsed + self.assertEqual(f.commentChar, '!') + + def test_streaming_readChunk_csv(self): + """Test readChunk() functionality for CSV files""" + with self.CSVFile(self.csv_comma, streaming=True) as f: + # Read first chunk (3 lines) + chunk1 = f.readChunk(nlines=3) + self.assertIsNotNone(chunk1) + self.assertEqual(len(chunk1), 3) + self.assertIn('ColA', chunk1.columns) + + # Read second chunk (remaining 1 line) + chunk2 = f.readChunk(nlines=3) + # Should get the remaining line + if chunk2 is not None: + self.assertGreaterEqual(len(chunk2), 1) + self.assertIn('ColA', chunk2.columns) + + # Try to read beyond EOF - should return None + chunk3 = f.readChunk(nlines=3) + self.assertIsNone(chunk3) + + +class TestHAWC2DatFileStreaming(unittest.TestCase): + """Test streaming functionality for HAWC2DatFile""" + + def setUp(self): + """Set up test files""" + from weio.hawc2_dat_file import HAWC2DatFile + self.HAWC2DatFile = HAWC2DatFile + self.hawc2_ascii = os.path.join(MyDir, 'HAWC2_out_ascii.sel') + self.hawc2_binary = os.path.join(MyDir, 'HAWC2_out_bin.sel') + + def test_backward_compatibility_hawc2_ascii(self): + """Test that normal mode still works for HAWC2 ASCII files""" + f = self.HAWC2DatFile(self.hawc2_ascii) + self.assertIsNotNone(f.data) + self.assertGreater(len(f.data), 0) + self.assertIn('attribute_names', f.info) + self.assertIn('attribute_units', f.info) + + def test_backward_compatibility_hawc2_binary(self): + """Test that normal mode still works for HAWC2 binary files""" + f = self.HAWC2DatFile(self.hawc2_binary) + self.assertIsNotNone(f.data) + self.assertGreater(len(f.data), 0) + self.assertIn('attribute_names', f.info) + + def test_streaming_header_only_hawc2(self): + """Test header-only reading for HAWC2 files""" + with self.HAWC2DatFile(self.hawc2_ascii, streaming=True) as f: + # Metadata should be loaded + self.assertIn('attribute_names', f.info) + self.assertIn('attribute_units', f.info) + self.assertIn('attribute_descr', f.info) + self.assertIn('NrSc', f.info) + self.assertIn('NrCh', f.info) + + # Data should NOT be loaded + self.assertIsNone(f.data) + + # Check metadata content + self.assertGreater(len(f.info['attribute_names']), 0) + self.assertGreater(f.info['NrSc'], 0) + self.assertGreater(f.info['NrCh'], 0) + + def test_streaming_with_readAll_hawc2_ascii(self): + """Test streaming mode with readAll() for HAWC2 ASCII files""" + with self.HAWC2DatFile(self.hawc2_ascii, streaming=True) as f: + # Initially no data + self.assertIsNone(f.data) + + # Call readAll() + f.readAll() + + # Now data should be loaded + self.assertIsNotNone(f.data) + self.assertEqual(f.data.shape[1], f.info['NrCh']) + + def test_streaming_with_readAll_hawc2_binary(self): + """Test streaming mode with readAll() for HAWC2 binary files""" + with self.HAWC2DatFile(self.hawc2_binary, streaming=True) as f: + # Initially no data + self.assertIsNone(f.data) + + # Call readAll() + f.readAll() + + # Now data should be loaded + self.assertIsNotNone(f.data) + self.assertEqual(f.data.shape[1], f.info['NrCh']) + + def test_streaming_data_consistency_hawc2(self): + """Test that streaming mode produces same data as normal mode (HAWC2)""" + # Normal mode + f_normal = self.HAWC2DatFile(self.hawc2_ascii) + data_normal = f_normal.data.copy() + + # Streaming mode + with self.HAWC2DatFile(self.hawc2_ascii, streaming=True) as f_stream: + f_stream.readAll() + data_stream = f_stream.data.copy() + + # Should be identical + np.testing.assert_array_almost_equal(data_normal, data_stream, decimal=10) + + def test_streaming_without_context_manager_fails_hawc2(self): + """Test that streaming without context manager raises error""" + f = self.HAWC2DatFile() + f.filename = self.hawc2_ascii + f.streaming = True + + # Trying to read without context manager should fail + with self.assertRaises(RuntimeError) as cm: + f.read() + + self.assertIn('context manager', str(cm.exception)) + + def test_readAll_without_streaming_fails_hawc2(self): + """Test that readAll() requires streaming mode""" + f = self.HAWC2DatFile(self.hawc2_ascii) + + with self.assertRaises(RuntimeError) as cm: + f.readAll() + + self.assertIn('streaming mode', str(cm.exception)) + + +if __name__ == '__main__': + unittest.main()