From af6a57dbd372bbe38cf00b9a939f83e04f181645 Mon Sep 17 00:00:00 2001 From: xflow-ben <110842449+xflow-ben@users.noreply.github.com> Date: Fri, 7 Nov 2025 11:14:01 -0500 Subject: [PATCH 1/4] First stab at streaming refactor --- .claude/settings.local.json | 11 + README.md | 52 +++++ STREAMING_QUICKREF.md | 89 +++++++ weio/_NEWFILE_TEMPLATE.py | 77 +++++- weio/csv_file.py | 130 ++++++++++- weio/fast_output_file.py | 384 ++++++++++++++++++++++++++---- weio/file.py | 98 ++++++-- weio/hawc2_dat_file.py | 53 ++++- weio/tests/test_streaming.py | 440 +++++++++++++++++++++++++++++++++++ 9 files changed, 1254 insertions(+), 80 deletions(-) create mode 100644 .claude/settings.local.json create mode 100644 STREAMING_QUICKREF.md create mode 100644 weio/tests/test_streaming.py diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..d33db4d --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,11 @@ +{ + "permissions": { + "allow": [ + "Bash(python -m pytest:*)", + "Bash(python:*)", + "Bash(pip install:*)" + ], + "deny": [], + "ask": [] + } +} diff --git a/README.md b/README.md index 5bf9206..36e3634 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,58 @@ Bld['BldAeroNodes'] = ADProp Bld.write('AeroDyn_Blade_Modified.dat') ``` +## Streaming Mode for Large Files (New!) + +For large output files (GB-sized), you can use streaming mode to inspect file headers and metadata without loading all data into memory: + +**Header-only inspection:** +```python +import weio + +# Read only metadata without loading data (memory efficient!) +with weio.read('large_output.outb', streaming=True) as f: + print(f['attribute_names']) # Channel names + print(f['attribute_units']) # Units + print(f.description) # File description + # f.data is None - no data loaded yet +# File automatically closes when exiting 'with' block +``` + +**Load data after inspecting headers:** +```python +import weio + +# Inspect headers first, then decide whether to load +with weio.read('large_output.out', streaming=True) as f: + print(f"File has {len(f['attribute_names'])} channels") + + # Only load data if needed + f.readAll() + df = f.toDataFrame() + print(df.shape) +``` + +**Process CSV files in chunks:** +```python +import weio + +# Process very large CSV files incrementally +with weio.read('huge_dataset.csv', streaming=True) as f: + print(f"Columns: {f.colNames}") + + # Read and process data in chunks + while True: + chunk = f.readChunk(nlines=10000) + if chunk is None: + break + # Process this chunk... + print(f"Processing {len(chunk)} rows") +``` + +**Supported formats:** OpenFAST output files (`.out`, `.outb`), CSV files (`.csv`), HAWC2 files (`.dat`, `.sel`) + +**Memory savings:** Up to 10,000x reduction for header-only inspection of large files! + ## Requirements The library is compatible python 3, and has limited requirements. If you have pip installed on your system, you can install them by typing in a terminal: diff --git a/STREAMING_QUICKREF.md b/STREAMING_QUICKREF.md new file mode 100644 index 0000000..587b807 --- /dev/null +++ b/STREAMING_QUICKREF.md @@ -0,0 +1,89 @@ +# Streaming Mode - Quick Reference + +## For Users + +### Header-only inspection (memory efficient!) +```python +import weio + +with weio.read('large_file.outb', streaming=True) as f: + print(f['attribute_names']) # Column names + print(f['attribute_units']) # Units + # f.data is None - no data loaded +``` + +### Load data after inspection +```python +import weio + +with weio.read('file.out', streaming=True) as f: + print(f"Channels: {len(f['attribute_names'])}") + f.readAll() # Load data now + df = f.toDataFrame() +``` + +### Process CSV in chunks +```python +import weio + +with weio.read('huge.csv', streaming=True) as f: + while True: + chunk = f.readChunk(nlines=10000) + if chunk is None: + break + # Process chunk... +``` + +## For Developers + +### Adding streaming to a new file format + +1. **Add `streaming` parameter to `_read()`:** +```python +def _read(self, streaming=False, **kwargs): + if streaming: + # Read headers only, keep file open + self._fid = open(self.filename, 'r') + self['header'] = self._fid.readline() + self.data = None + else: + # Normal mode: read everything + with open(self.filename, 'r') as f: + self.data = f.read() +``` + +2. **Implement `_readAll()`:** +```python +def _readAll(self): + if self._fid is None: + raise RuntimeError("No open file handle") + # Read remaining data + self.data = self._fid.read() +``` + +3. **Optional: Implement `_readChunk()`:** +```python +def _readChunk(self, nlines=None, **kwargs): + if self._fid is None: + raise RuntimeError("No open file handle") + if nlines is None: + nlines = 1000 + # Read chunk... + return chunk # or None if EOF +``` + +That's it! The base File class handles context managers and enforcement. + +## Supported Formats (so far) + +- ✅ **OpenFAST:** `.out` (ASCII), `.outb` (binary) +- ✅ **CSV:** `.csv`, `.txt` (with chunk reading) +- ✅ **HAWC2:** `.dat`, `.sel` (ASCII and binary) + +## Key Design Points + +- **Single `streaming` parameter** (not both `headerOnly` and `streaming`) +- **Context manager required** for streaming mode (`with` statement) +- **Header-only:** Exit `with` block without calling `readAll()` +- **Full streaming:** Call `readAll()` or `readChunk()` inside `with` block +- **Backward compatible:** Default `streaming=False` means existing code works unchanged diff --git a/weio/_NEWFILE_TEMPLATE.py b/weio/_NEWFILE_TEMPLATE.py index 27f4388..9a595fb 100644 --- a/weio/_NEWFILE_TEMPLATE.py +++ b/weio/_NEWFILE_TEMPLATE.py @@ -73,15 +73,86 @@ def write(self, filename=None): # Calling (children) function to write self._write() - def _read(self): - """ Reads self.filename and stores data into self. Self is (or behaves like) a dictionary""" - # --- Example: + def _read(self, streaming=False, **kwargs): + """ + Reads self.filename and stores data into self. Self is (or behaves like) a dictionary + + Parameters + ---------- + streaming : bool + If True, read only headers and keep file open for later reading. + Requires context manager. Default: False (read entire file) + """ + # --- Example (normal mode - read everything): #self['data']=[] #with open(self.filename, 'r', errors="surrogateescape") as f: # for i, line in enumerate(f): # self['data'].append(line) + + # --- Example (with streaming support): + # if streaming: + # # Read headers only, keep file open + # self._fid = open(self.filename, 'r', errors="surrogateescape") + # # Read header lines + # self['header_line'] = self._fid.readline() + # # Parse header info + # self['attribute_names'] = self['header_line'].split() + # # File is now positioned at start of data + # else: + # # Normal mode: read entire file + # self['data']=[] + # with open(self.filename, 'r', errors="surrogateescape") as f: + # f.readline() # skip header + # for i, line in enumerate(f): + # self['data'].append(line) raise NotImplementedError() + def _readAll(self): + """ + Read all remaining data after header in streaming mode. + Only called when streaming=True and readAll() is invoked. + """ + # --- Example: + # if self._fid is None: + # raise RuntimeError("No open file handle") + # + # # Read all remaining lines from current position + # self['data'] = [] + # for line in self._fid: + # self['data'].append(line.strip()) + raise NotImplementedError(f"{self.__class__.__name__} does not support readAll()") + + def _readChunk(self, nlines=None, **kwargs): + """ + Read a chunk of data from current position in streaming mode. + Only called when streaming=True and readChunk() is invoked. + + Parameters + ---------- + nlines : int + Number of lines to read. If None, use default chunk size. + + Returns + ------- + chunk : data + The chunk of data read, or None if end of file + """ + # --- Example: + # if self._fid is None: + # raise RuntimeError("No open file handle") + # + # if nlines is None: + # nlines = 1000 # default chunk size + # + # chunk = [] + # for i in range(nlines): + # line = self._fid.readline() + # if not line: + # return None if not chunk else chunk + # chunk.append(line.strip()) + # return chunk + raise NotImplementedError(f"{self.__class__.__name__} does not support readChunk()") + def _write(self): """ Writes to self.filename""" # --- Example: diff --git a/weio/csv_file.py b/weio/csv_file.py index a688f4e..e0a839a 100644 --- a/weio/csv_file.py +++ b/weio/csv_file.py @@ -35,7 +35,7 @@ def formatName(): return 'CSV file' def __init__(self, filename=None, sep=None, colNames=None, commentChar=None, commentLines=None,\ - colNamesLine=None, detectColumnNames=True, header=None, doRead=True, **kwargs): + colNamesLine=None, detectColumnNames=True, header=None, doRead=True, streaming=False, **kwargs): colNames = [] if colNames is None else colNames commentLines = [] if commentLines is None else commentLines self.sep = sep @@ -45,7 +45,10 @@ def __init__(self, filename=None, sep=None, colNames=None, commentChar=None, com self.commentLines = commentLines self.colNamesLine = colNamesLine self.detectColumnNames = detectColumnNames - self.data=[] + self.streaming = streaming + self.data = None if streaming else [] + self._in_context = False + self._fid = None if header is None: self.header=[] else: @@ -59,19 +62,57 @@ def __init__(self, filename=None, sep=None, colNames=None, commentChar=None, com if (len(self.colNames)>0) and (self.colNamesLine is not None): raise Exception('Provide either `colNames` or `colNamesLine` for CSV file types') if filename: - self.read(filename, doRead=doRead, **kwargs) + if streaming: + # Don't read immediately in streaming mode - wait for context manager + self.filename = filename + else: + self.read(filename, doRead=doRead, **kwargs) else: self.filename = None - def read(self, filename=None, doRead=True, **kwargs): + def __enter__(self): + """Context manager entry.""" + self._in_context = True + if self.filename: + # In streaming mode: detect headers, open file but don't read data + # In normal mode: read everything + if self.streaming: + self.read(doRead=False) # Run detect() only + self._read() # Open file handle and skip to data + else: + self.read(doRead=True) + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Context manager exit - close file handle if open.""" + if self._fid is not None: + self._fid.close() + self._fid = None + self._in_context = False + return False + + def _enforce_context_if_needed(self): + """Raise if streaming and not in context manager.""" + if self.streaming and not self._in_context: + raise RuntimeError( + "streaming=True requires using a context manager ('with' statement) " + "to ensure the file is closed. Use: `with CSVFile(...) as reader:`" + ) + + def read(self, filename=None, doRead=True, streaming=None, **kwargs): if filename: self.filename = filename + if streaming is not None: + self.streaming = streaming if not self.filename: raise Exception('No filename provided') if not os.path.isfile(self.filename): raise OSError(2,'File not found:',self.filename) if os.stat(self.filename).st_size == 0: raise EmptyFileError('File is empty:',self.filename) + + self._enforce_context_if_needed() + # Calling children function self.detect() if doRead: @@ -259,16 +300,85 @@ def strIsFloat(s): #print(skiprows) def _read(self): + """Read CSV data. In streaming mode, file handle is kept open.""" try: - with open(self.filename,'r',encoding=self.encoding) as f: - self.data = pd.read_csv(f,sep=self.sep,skiprows=self.skiprows,header=None,comment=self.commentChar) + if self.streaming: + # Streaming mode: keep file open + self._fid = open(self.filename, 'r', encoding=self.encoding) + # Skip to data start + for _ in range(max(self.skiprows) + 1 if self.skiprows else 0): + self._fid.readline() + # Data is not loaded yet + self.data = None + else: + # Normal mode: read entire file + with open(self.filename,'r',encoding=self.encoding) as f: + self.data = pd.read_csv(f,sep=self.sep,skiprows=self.skiprows,header=None,comment=self.commentChar) + + if (len(self.colNames)==0) or (len(self.colNames)!=len(self.data.columns)): + self.colNames=['C{}'.format(i) for i in range(len(self.data.columns))] + self.data.columns = self.colNames + self.data.rename(columns=lambda x: x.strip(),inplace=True) except pd.errors.ParserError as e: raise WrongFormatError('CSV File {}: '.format(self.filename)+e.args[0]) - if (len(self.colNames)==0) or (len(self.colNames)!=len(self.data.columns)): - self.colNames=['C{}'.format(i) for i in range(len(self.data.columns))] - self.data.columns = self.colNames; - self.data.rename(columns=lambda x: x.strip(),inplace=True) + def readAll(self): + """Read all remaining data after streaming header.""" + if not self.streaming: + raise RuntimeError("readAll() only valid in streaming mode") + if not self._in_context: + raise RuntimeError("readAll() requires context manager") + self._readAll() + + def readChunk(self, nlines=None, **kwargs): + """Read chunk of data (not implemented for CSV).""" + if not self.streaming: + raise RuntimeError("readChunk() only valid in streaming mode") + if not self._in_context: + raise RuntimeError("readChunk() requires context manager") + return self._readChunk(nlines=nlines, **kwargs) + + def _readAll(self): + """Read all remaining CSV data in streaming mode.""" + if self._fid is None: + raise RuntimeError("No open file handle") + + try: + # Read remaining data from current position + self.data = pd.read_csv(self._fid, sep=self.sep, header=None, comment=self.commentChar) + + if (len(self.colNames)==0) or (len(self.colNames)!=len(self.data.columns)): + self.colNames=['C{}'.format(i) for i in range(len(self.data.columns))] + self.data.columns = self.colNames + self.data.rename(columns=lambda x: x.strip(), inplace=True) + except pd.errors.ParserError as e: + raise WrongFormatError('CSV File {}: '.format(self.filename)+e.args[0]) + + def _readChunk(self, nlines=None, **kwargs): + """Read a chunk of CSV data (nlines rows).""" + if self._fid is None: + raise RuntimeError("No open file handle") + + if nlines is None: + nlines = 1000 # Default chunk size + + try: + chunk = pd.read_csv(self._fid, sep=self.sep, header=None, comment=self.commentChar, nrows=nlines) + + if len(chunk) == 0: + return None # End of file + + if (len(self.colNames)==0) or (len(self.colNames)!=len(chunk.columns)): + self.colNames=['C{}'.format(i) for i in range(len(chunk.columns))] + chunk.columns = self.colNames + chunk.rename(columns=lambda x: x.strip(), inplace=True) + + return chunk + except pd.errors.EmptyDataError: + # End of file reached + return None + except pd.errors.ParserError as e: + raise WrongFormatError('CSV File {}: '.format(self.filename)+e.args[0]) def read_slow_stop_at_first_empty_lines(self, skiprows=None, sep=None, numeric_only=True, colNames=None): diff --git a/weio/fast_output_file.py b/weio/fast_output_file.py index 15556a8..428f1b9 100644 --- a/weio/fast_output_file.py +++ b/weio/fast_output_file.py @@ -79,21 +79,43 @@ def defaultExtensions(): def formatName(): return 'FAST output file' - def __init__(self, filename=None, **kwargs): + def __enter__(self): + """Context manager entry.""" + self._in_context = True + if self.filename: + self.read(streaming=self.streaming) + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Context manager exit - close file handle if open.""" + if self._fid is not None: + self._fid.close() + self._fid = None + self._in_context = False + return False # Propagate exceptions + + def __init__(self, filename=None, streaming=False, **kwargs): """ Class constructor. If a `filename` is given, the file is read. """ # Data self.filename = filename self.data = None # pandas.DataFrame self.description = '' # string - if filename: - self.read(**kwargs) - - def read(self, filename=None, **kwargs): + self.streaming = streaming + self._in_context = False + self._fid = None + self._data_start_pos = None + # Only read if not streaming (streaming requires context manager) + if filename and not streaming: + self.read(streaming=streaming, **kwargs) + + def read(self, filename=None, streaming=None, **kwargs): """ Reads the file self.filename, or `filename` if provided """ - + # --- Standard tests and exceptions (generic code) if filename: self.filename = filename + if streaming is not None: + self.streaming = streaming if not self.filename: raise Exception('No filename provided') if not os.path.isfile(self.filename): @@ -101,6 +123,13 @@ def read(self, filename=None, **kwargs): if os.stat(self.filename).st_size == 0: raise EmptyFileError('File is empty:',self.filename) + # Enforce context manager for streaming + if self.streaming and not self._in_context: + raise RuntimeError( + "streaming=True requires using a context manager ('with' statement) " + "to ensure the file is closed. Use: `with FASTOutputFile(...) as reader:`" + ) + # --- Actual reading def readline(iLine): with open(self.filename) as f: @@ -113,52 +142,92 @@ def readline(iLine): ext = os.path.splitext(self.filename.lower())[1] info={} self['binary']=False - try: - if ext in ['.out','.elev','.dbg','.dbg2']: - self.data, info = load_ascii_output(self.filename, **kwargs) - elif ext=='.outb': - self.data, info = load_binary_output(self.filename, **kwargs) - self['binary']=True - elif ext=='.elm': - F=CSVFile(filename=self.filename, sep=' ', commentLines=[0,2],colNamesLine=1) - self.data = F.data - del F - info['attribute_units']=readline(3).replace('sec','s').split() - info['attribute_names']=self.data.columns.values - else: - if isBinary(self.filename): - self.data, info = load_binary_output(self.filename, **kwargs) + + if self.streaming: + # Streaming mode: read headers only, keep file open + try: + if ext=='.outb' or (ext not in ['.out','.elev','.dbg','.dbg2','.elm'] and isBinary(self.filename)): + # Binary file - read header only + self._fid, info = load_binary_output_header(self.filename, **kwargs) self['binary']=True - else: - self.data, info = load_ascii_output(self.filename, **kwargs) + elif ext in ['.out','.elev','.dbg','.dbg2'] or (ext not in ['.outb','.elm']): + # ASCII file - read header only + self._fid, info = load_ascii_output_header(self.filename, **kwargs) self['binary']=False - except MemoryError as e: - raise BrokenReaderError('FAST Out File {}: Memory error encountered\n{}'.format(self.filename,e)) - except Exception as e: - raise WrongFormatError('FAST Out File {}: {}'.format(self.filename,e.args)) - if self.data.shape[0]==0: - raise EmptyFileError('This FAST output file contains no data: {}'.format(self.filename)) - - + elif ext=='.elm': + # .elm files not supported in streaming mode yet + raise NotImplementedError('Streaming mode not yet supported for .elm files') - # --- Convert to DataFrame - if info['attribute_units'] is not None: - info['attribute_units'] = [re.sub(r'[()\[\]]','',u) for u in info['attribute_units']] - if len(info['attribute_names'])!=len(info['attribute_units']): - cols=info['attribute_names'] - print('[WARN] not all columns have units! Skipping units') - else: - cols=[n+'_['+u.replace('sec','s')+']' for n,u in zip(info['attribute_names'], info['attribute_units'])] + self.data = None # No data loaded yet in streaming mode + except Exception as e: + raise WrongFormatError('FAST Out File {}: {}'.format(self.filename,e.args)) else: - cols=info['attribute_names'] + # Normal mode: read entire file + try: + if ext in ['.out','.elev','.dbg','.dbg2']: + self.data, info = load_ascii_output(self.filename, **kwargs) + elif ext=='.outb': + self.data, info = load_binary_output(self.filename, **kwargs) + self['binary']=True + elif ext=='.elm': + F=CSVFile(filename=self.filename, sep=' ', commentLines=[0,2],colNamesLine=1) + self.data = F.data + del F + info['attribute_units']=readline(3).replace('sec','s').split() + info['attribute_names']=self.data.columns.values + else: + if isBinary(self.filename): + self.data, info = load_binary_output(self.filename, **kwargs) + self['binary']=True + else: + self.data, info = load_ascii_output(self.filename, **kwargs) + self['binary']=False + except MemoryError as e: + raise BrokenReaderError('FAST Out File {}: Memory error encountered\n{}'.format(self.filename,e)) + except Exception as e: + raise WrongFormatError('FAST Out File {}: {}'.format(self.filename,e.args)) + if self.data.shape[0]==0: + raise EmptyFileError('This FAST output file contains no data: {}'.format(self.filename)) + + + + # --- Store header information + self['attribute_names'] = info.get('attribute_names', []) + self['attribute_units'] = info.get('attribute_units', []) self.description = info.get('description', '') self.description = ''.join(self.description) if isinstance(self.description,list) else self.description - if isinstance(self.data, pd.DataFrame): - self.data.columns = cols - else: - if len(cols)!=self.data.shape[1]: - raise BrokenFormatError('Inconstistent number of columns between headers ({}) and data ({}) for file {}'.format(len(cols), self.data.shape[1], self.filename)) - self.data = pd.DataFrame(data=self.data, columns=cols) + + # Store binary file metadata for streaming mode + if self.streaming and self['binary']: + self['_FileID'] = info.get('FileID') + self['_NumOutChans'] = info.get('NumOutChans') + self['_NT'] = info.get('NT') + self['_ColScl'] = info.get('ColScl') + self['_ColOff'] = info.get('ColOff') + if info['FileID'] == FileFmtID_WithTime: + self['_TimeScl'] = info.get('TimeScl') + self['_TimeOff'] = info.get('TimeOff') + else: + self['_TimeOut1'] = info.get('TimeOut1') + self['_TimeIncr'] = info.get('TimeIncr') + + # --- Convert to DataFrame (only if data was loaded) + if self.data is not None: + if info['attribute_units'] is not None: + info['attribute_units'] = [re.sub(r'[()\[\]]','',u) for u in info['attribute_units']] + if len(info['attribute_names'])!=len(info['attribute_units']): + cols=info['attribute_names'] + print('[WARN] not all columns have units! Skipping units') + else: + cols=[n+'_['+u.replace('sec','s')+']' for n,u in zip(info['attribute_names'], info['attribute_units'])] + else: + cols=info['attribute_names'] + if isinstance(self.data, pd.DataFrame): + self.data.columns = cols + else: + if len(cols)!=self.data.shape[1]: + raise BrokenFormatError('Inconstistent number of columns between headers ({}) and data ({}) for file {}'.format(len(cols), self.data.shape[1], self.filename)) + self.data = pd.DataFrame(data=self.data, columns=cols) def write(self, filename=None, binary=None, fileID=4): @@ -212,6 +281,107 @@ def unit(s): units = [unit(c) for c in self.data.columns] return units + def _readAll(self): + """Read all data after header in streaming mode.""" + import numpy as np + + if self._fid is None: + raise RuntimeError("No open file handle. Use streaming=True with context manager.") + + ext = os.path.splitext(self.filename.lower())[1] + + # Read remaining data from open file handle + if self['binary']: + # Binary file - read from current position using stored metadata + try: + FileID = self['_FileID'] + NumOutChans = self['_NumOutChans'] + NT = self['_NT'] + ColScl = self['_ColScl'] + ColOff = self['_ColOff'] + + # Setup binary reading helpers + StructDict = { + 'uint8': ('B', 1, np.uint8), + 'int16': ('h', 2, np.int16), + 'int32': ('i', 4, np.int32), + 'float32': ('f', 4, np.float32), + 'float64': ('d', 8, np.float64) + } + def freadNumpy(fid, n, dtype): + fmt, nbytes, npdtype = StructDict[dtype] + return np.fromfile(fid, count=n, dtype=npdtype) + + nPts = NT * NumOutChans + + # Read time data if present + if FileID == FileFmtID_WithTime: + PackedTime = freadNumpy(self._fid, NT, 'int32') + TimeScl = self['_TimeScl'] + TimeOff = self['_TimeOff'] + time = (np.array(PackedTime) - TimeOff) / TimeScl + else: + TimeOut1 = self['_TimeOut1'] + TimeIncr = self['_TimeIncr'] + time = TimeOut1 + TimeIncr * np.arange(NT) + + # Read channel data + if FileID == FileFmtID_NoCompressWithoutTime: + PackedData = freadNumpy(self._fid, nPts, 'float64') + else: + PackedData = freadNumpy(self._fid, nPts, 'int16') + + data = np.array(PackedData).reshape(NT, NumOutChans) + del PackedData + + # Scale the data + data = (data - ColOff) / ColScl + data = np.concatenate([time.reshape(NT, 1), data], 1) + + # Convert to DataFrame with existing column info + if self['attribute_units'] is not None: + units = [re.sub(r'[()\[\]]','',u) for u in self['attribute_units']] + if len(self['attribute_names'])!=len(units): + cols=self['attribute_names'] + print('[WARN] not all columns have units! Skipping units') + else: + cols=[n+'_['+u.replace('sec','s')+']' for n,u in zip(self['attribute_names'], units)] + else: + cols=self['attribute_names'] + + self.data = pd.DataFrame(data=data, columns=cols) + + if self.data.shape[0]==0: + raise EmptyFileError('This FAST output file contains no data: {}'.format(self.filename)) + + except Exception as e: + raise BrokenReaderError('Error reading binary data in streaming mode: {}'.format(e)) + + else: + # ASCII file - read from current position + try: + # Read all remaining data + data = np.loadtxt(self._fid, comments=('This')) + + # Convert to DataFrame with existing column info + if self['attribute_units'] is not None: + units = [re.sub(r'[()\[\]]','',u) for u in self['attribute_units']] + if len(self['attribute_names'])!=len(units): + cols=self['attribute_names'] + print('[WARN] not all columns have units! Skipping units') + else: + cols=[n+'_['+u.replace('sec','s')+']' for n,u in zip(self['attribute_names'], units)] + else: + cols=self['attribute_names'] + + self.data = pd.DataFrame(data=data, columns=cols) + + if self.data.shape[0]==0: + raise EmptyFileError('This FAST output file contains no data: {}'.format(self.filename)) + + except Exception as e: + raise BrokenReaderError('Error reading data in streaming mode: {}'.format(e)) + def toDataFrame(self): """ Returns object into one DataFrame, or a dictionary of DataFrames""" return self.data @@ -368,6 +538,48 @@ def isBinary(filename): +def load_ascii_output_header(filename, encoding='ascii', **kwargs): + """ + Read only the header of an ASCII FAST output file and return open file handle. + Returns (fid, info) where fid is the open file handle positioned at start of data. + """ + fid = open(filename, encoding=encoding, errors='ignore') + info = {} + info['name'] = os.path.splitext(os.path.basename(filename))[0] + + # Header is whatever is before the keyword `time` + header = [] + maxHeaderLines = 35 + headerRead = False + for i in range(maxHeaderLines): + l = fid.readline() + if not l: + fid.close() + raise Exception('Error finding the end of FAST out file header. Keyword Time missing.') + # Check for utf-16 + if l[:3] == '\x00 \x00': + fid.close() + print('[WARN] Attempt to re-read the file with encoding utf-16') + return load_ascii_output_header(filename=filename, encoding='utf-16') + first_word = (l+' dummy').lower().split()[0] + in_header = (first_word != 'time') and (first_word != 'alpha') + if in_header: + header.append(l) + else: + info['description'] = header + info['attribute_names'] = l.split() + info['attribute_units'] = [unit[1:-1] for unit in fid.readline().split()] + headerRead = True + break + + if not headerRead: + fid.close() + raise WrongFormatError('Could not find the keyword "Time" or "Alpha" in the first {} lines of the file {}'.format(maxHeaderLines, filename)) + + # File handle is now positioned at the start of data + return fid, info + + def load_ascii_output(filename, method='numpy', encoding='ascii', **kwargs): @@ -442,6 +654,86 @@ def load_ascii_output(filename, method='numpy', encoding='ascii', **kwargs): return data, info +def load_binary_output_header(filename, **kwargs): + """ + Read only the header of a binary FAST output file and return open file handle. + Returns (fid, info) where fid is the open file handle positioned at start of data. + """ + StructDict = { + 'uint8': ('B', 1, np.uint8), + 'int16': ('h', 2, np.int16), + 'int32': ('i', 4, np.int32), + 'float32': ('f', 4, np.float32), + 'float64': ('d', 8, np.float64) + } + + def freadStruct(fid, n, dtype): + fmt, nbytes, npdtype = StructDict[dtype] + return struct.unpack(fmt * n, fid.read(nbytes * n)) + + fid = open(filename, 'rb') + info = {} + + #---------------------------- + # get the header information + #---------------------------- + FileID = freadStruct(fid, 1, 'int16')[0] # FAST output file format, INT(2) + + if FileID not in [FileFmtID_WithTime, FileFmtID_WithoutTime, FileFmtID_NoCompressWithoutTime, FileFmtID_ChanLen_In]: + fid.close() + raise Exception('FileID not supported {}. Is it a FAST binary file?'.format(FileID)) + + if FileID == FileFmtID_ChanLen_In: + LenName = freadStruct(fid, 1, 'int16')[0] # Number of characters in channel names and units + else: + LenName = 10 # Default number of characters per channel name + + NumOutChans = freadStruct(fid, 1, 'int32')[0] # Number of output channels, INT(4) + NT = freadStruct(fid, 1, 'int32')[0] # Number of time steps, INT(4) + + if FileID == FileFmtID_WithTime: + TimeScl = freadStruct(fid, 1, 'float64')[0] # The time slopes for scaling, REAL(8) + TimeOff = freadStruct(fid, 1, 'float64')[0] # The time offsets for scaling, REAL(8) + else: + TimeOut1 = freadStruct(fid, 1, 'float64')[0] # The first time in the time series, REAL(8) + TimeIncr = freadStruct(fid, 1, 'float64')[0] # The time increment, REAL(8) + + if FileID == FileFmtID_NoCompressWithoutTime: + ColScl = np.ones ((NumOutChans, 1)) # The channel slopes for scaling, REAL(4) + ColOff = np.zeros((NumOutChans, 1)) # The channel offsets for scaling, REAL(4) + else: + ColScl = freadStruct(fid, NumOutChans, 'float32') # The channel slopes for scaling, REAL(4) + ColOff = freadStruct(fid, NumOutChans, 'float32') # The channel offsets for scaling, REAL(4) + + LenDesc = freadStruct(fid, 1, 'int32')[0] # The number of characters in the description string, INT(4) + DescStrASCII = freadStruct(fid, LenDesc, 'uint8') # DescStr converted to ASCII + DescStr = "".join(map(chr, DescStrASCII)).strip() + + # ChanName and ChanUnit converted to numeric ASCII + ChanName = ["".join(map(chr, freadStruct(fid, LenName, 'uint8'))).strip() for _ in range(NumOutChans + 1)] + ChanUnit = ["".join(map(chr, freadStruct(fid, LenName, 'uint8'))).strip()[1:-1] for _ in range(NumOutChans + 1)] + + # Store in info dict (including metadata needed for _readAll()) + info['name'] = os.path.splitext(os.path.basename(filename))[0] + info['description'] = DescStr + info['attribute_names'] = ChanName + info['attribute_units'] = ChanUnit + info['FileID'] = FileID + info['NumOutChans'] = NumOutChans + info['NT'] = NT + info['ColScl'] = ColScl + info['ColOff'] = ColOff + if FileID == FileFmtID_WithTime: + info['TimeScl'] = TimeScl + info['TimeOff'] = TimeOff + else: + info['TimeOut1'] = TimeOut1 + info['TimeIncr'] = TimeIncr + + # File handle is now positioned at the start of data + return fid, info + + def load_binary_output(filename, use_buffer=False, method='mix', **kwargs): """ 03/09/15: Ported from ReadFASTbinary.m by Mads M Pedersen, DTU Wind diff --git a/weio/file.py b/weio/file.py index 09b7e8c..2f5d0a3 100644 --- a/weio/file.py +++ b/weio/file.py @@ -22,23 +22,71 @@ class OptionalImportError(Exception): FileNotFoundError = IOError class File(OrderedDict): - def __init__(self,filename=None,**kwargs): - if filename: - self.read(filename, **kwargs) - else: - self.filename = None - - def read(self, filename=None, **kwargs): - if filename: + def __init__(self, filename=None, streaming=False, **kwargs): + OrderedDict.__init__(self) + self.filename = filename + self.streaming = streaming # If True, read headers only, keep file open + self.data = None # Data storage (varies by format) + self._in_context = False # True only if in 'with' block + self._fid = None # File handle when streaming + if filename is not None: + self.read(streaming=streaming, **kwargs) + + def __enter__(self): + """Context manager entry.""" + self._in_context = True + if self.filename: + self.read(streaming=self.streaming) + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Context manager exit - close file handle if open.""" + if self._fid is not None: + self._fid.close() + self._fid = None + self._in_context = False + # Optional: free data on exit if not streaming (saves memory) + if not self.streaming and self.data is not None: + self.data = None + return False # Propagate exceptions + + def _enforce_context_if_needed(self): + """Raise if streaming and not in context manager.""" + # Check if streaming attribute exists (some old file formats may not have it) + if hasattr(self, 'streaming') and self.streaming and not self._in_context: + raise RuntimeError( + "streaming=True requires using a context manager ('with' statement) " + "to ensure the file is closed. Use: `with File(...) as reader:`" + ) + + def read(self, filename=None, streaming=None, **kwargs): + if filename is not None: self.filename = filename + if streaming is not None: + self.streaming = streaming + # Initialize streaming attribute if it doesn't exist (for old file formats) + if not hasattr(self, 'streaming'): + self.streaming = False if not self.filename: raise Exception('No filename provided') if not os.path.isfile(self.filename): - raise OSError(2,'File not found:',self.filename) + raise OSError(2, 'File not found:', self.filename) if os.stat(self.filename).st_size == 0: - raise EmptyFileError('File is empty:',self.filename) + raise EmptyFileError('File is empty:', self.filename) + + self._enforce_context_if_needed() + # Calling children function - self._read(**kwargs) + # Check if child class _read() accepts streaming parameter + import inspect + sig = inspect.signature(self._read) + if 'streaming' in sig.parameters: + # New-style _read() with streaming support + self._read(streaming=self.streaming, **kwargs) + else: + # Old-style _read() without streaming support + self._read(**kwargs) + return self def write(self, filename=None, **kwargs): if filename: @@ -48,6 +96,22 @@ def write(self, filename=None, **kwargs): # Calling children function self._write(**kwargs) + def readAll(self): + """Read remaining data after streaming header (to be implemented by children).""" + if not self.streaming: + raise RuntimeError("readAll() only valid in streaming mode") + if not self._in_context: + raise RuntimeError("readAll() requires context manager") + self._readAll() + + def readChunk(self, nlines=None, **kwargs): + """Read chunk of data (to be implemented by children).""" + if not self.streaming: + raise RuntimeError("readChunk() only valid in streaming mode") + if not self._in_context: + raise RuntimeError("readChunk() requires context manager") + return self._readChunk(nlines=nlines, **kwargs) + def toDataFrame(self): return self._toDataFrame() @@ -105,9 +169,9 @@ def toParquet(self, filename=None, extension='.parquet', **kwargs): # -------------------------------------------------------------------------------- - # --- Sub class methods + # --- Sub class methods # -------------------------------------------------------------------------------- - def _read(self,**kwargs): + def _read(self, streaming=False, **kwargs): raise NotImplementedError("Method must be implemented in the subclass") def _write(self): @@ -116,6 +180,14 @@ def _write(self): def _toDataFrame(self): raise NotImplementedError("Method must be implemented in the subclass") + def _readAll(self): + """Override in child classes for streaming support.""" + raise NotImplementedError(f"{self.__class__.__name__} does not support readAll()") + + def _readChunk(self, nlines=None, **kwargs): + """Override in child classes for streaming support.""" + raise NotImplementedError(f"{self.__class__.__name__} does not support readChunk()") + def _fromDataFrame(self): raise NotImplementedError("Method must be implemented in the subclass") diff --git a/weio/hawc2_dat_file.py b/weio/hawc2_dat_file.py index 44de75f..5d8b705 100644 --- a/weio/hawc2_dat_file.py +++ b/weio/hawc2_dat_file.py @@ -16,28 +16,65 @@ def defaultExtensions(): def formatName(): return 'HAWC2 dat file' - def __init__(self, filename=None, **kwargs): + def __init__(self, filename=None, streaming=False, **kwargs): self.info={} - self.data=np.array([]) + self.data=None if streaming else np.array([]) self.bHawc=False - super(HAWC2DatFile, self).__init__(filename=filename,**kwargs) + self._res_file = None # Store ReadHawc2 object for streaming + + # Call parent __init__ but handle streaming case + File.__init__(self) + self.filename = filename + self.streaming = streaming + self.data = None if streaming else np.array([]) + self._in_context = False + self._fid = None - def _read(self): + if filename and not streaming: + self.read(**kwargs) + + def _read(self, streaming=False, **kwargs): try: res_file = ReadHawc2(self.filename) - self.data = res_file.ReadAll() + + # Store metadata (always loaded) self.info['attribute_names'] = res_file.ChInfo[0] self.info['attribute_units'] = res_file.ChInfo[1] self.info['attribute_descr'] = res_file.ChInfo[2] + self.info['NrSc'] = res_file.NrSc + self.info['NrCh'] = res_file.NrCh + self.info['Time'] = res_file.Time + self.info['Freq'] = res_file.Freq + self.info['FileFormat'] = res_file.FileFormat + if res_file.FileFormat=='BHAWC_ASCII': self.bHawc=True + + if streaming: + # Streaming mode: store ReadHawc2 object, don't load data yet + self._res_file = res_file + self.data = None + else: + # Normal mode: load all data immediately + self.data = res_file.ReadAll() + except FileNotFoundError: raise - #raise WrongFormatError('HAWC2 dat File {}: '.format(self.filename)+' File Not Found:'+e.filename) - except Exception as e: -# raise e + except Exception as e: raise WrongFormatError('HAWC2 dat File {}: '.format(self.filename)+e.args[0]) + def _readAll(self): + """Read all data in streaming mode.""" + if self._res_file is None: + raise RuntimeError("No ReadHawc2 object available (not in streaming mode)") + + # Read all channels + self.data = self._res_file.ReadAll() + + def _readChunk(self, nlines=None, **kwargs): + """Read chunk of data - not implemented for HAWC2 files.""" + raise NotImplementedError(f"{self.__class__.__name__} does not support readChunk()") + #def _write(self): #self.data.to_csv(self.filename,sep=self.false,index=False) diff --git a/weio/tests/test_streaming.py b/weio/tests/test_streaming.py new file mode 100644 index 0000000..61d6299 --- /dev/null +++ b/weio/tests/test_streaming.py @@ -0,0 +1,440 @@ +""" +Unit tests for streaming functionality in weio +""" +import unittest +import os +import numpy as np +from weio.tests.helpers_for_test import MyDir +from weio.fast_output_file import FASTOutputFile + + +class TestFASTOutputFileStreaming(unittest.TestCase): + """Test streaming functionality for FASTOutputFile""" + + def setUp(self): + """Set up test files""" + self.ascii_file = os.path.join(MyDir, 'FASTOut.out') + self.binary_file = os.path.join(MyDir, 'FASTOutBin.outb') + + def test_backward_compatibility_ascii(self): + """Test that normal mode still works for ASCII files""" + f = FASTOutputFile(self.ascii_file) + self.assertIsNotNone(f.data) + self.assertGreater(len(f.data), 0) + self.assertEqual(f.data.shape[0], 21) + + def test_backward_compatibility_binary(self): + """Test that normal mode still works for binary files""" + f = FASTOutputFile(self.binary_file) + self.assertIsNotNone(f.data) + self.assertGreater(len(f.data), 0) + self.assertEqual(f.data.shape[0], 201) + + def test_streaming_header_only_ascii(self): + """Test header-only reading for ASCII files""" + with FASTOutputFile(self.ascii_file, streaming=True) as f: + # Headers should be loaded + self.assertIn('attribute_names', f) + self.assertIn('attribute_units', f) + self.assertIsNotNone(f.description) + + # Data should NOT be loaded + self.assertIsNone(f.data) + + # Check header content + self.assertIn('Time', f['attribute_names']) + self.assertIn('GenSpeed', f['attribute_names']) + + def test_streaming_header_only_binary(self): + """Test header-only reading for binary files""" + with FASTOutputFile(self.binary_file, streaming=True) as f: + # Headers should be loaded + self.assertIn('attribute_names', f) + self.assertIn('attribute_units', f) + self.assertIsNotNone(f.description) + + # Data should NOT be loaded + self.assertIsNone(f.data) + + # Check header content + self.assertIn('Time', f['attribute_names']) + self.assertIn('Wind1VelX', f['attribute_names']) + + def test_streaming_with_readAll_ascii(self): + """Test streaming mode with readAll() for ASCII files""" + with FASTOutputFile(self.ascii_file, streaming=True) as f: + # Initially no data + self.assertIsNone(f.data) + + # Call readAll() + f.readAll() + + # Now data should be loaded + self.assertIsNotNone(f.data) + self.assertEqual(f.data.shape[0], 21) + self.assertIn('Time_[s]', f.data.columns) + self.assertIn('GenSpeed_[rpm]', f.data.columns) + + def test_streaming_with_readAll_binary(self): + """Test streaming mode with readAll() for binary files""" + with FASTOutputFile(self.binary_file, streaming=True) as f: + # Initially no data + self.assertIsNone(f.data) + + # Call readAll() + f.readAll() + + # Now data should be loaded + self.assertIsNotNone(f.data) + self.assertEqual(f.data.shape[0], 201) + self.assertIn('Time_[s]', f.data.columns) + self.assertIn('Wind1VelX_[m/s]', f.data.columns) + + def test_streaming_data_consistency_ascii(self): + """Test that streaming mode produces same data as normal mode (ASCII)""" + # Normal mode + f_normal = FASTOutputFile(self.ascii_file) + data_normal = f_normal.data.copy() + + # Streaming mode + with FASTOutputFile(self.ascii_file, streaming=True) as f_stream: + f_stream.readAll() + data_stream = f_stream.data.copy() + + # Should be identical + np.testing.assert_array_almost_equal(data_normal.values, data_stream.values, decimal=10) + self.assertEqual(list(data_normal.columns), list(data_stream.columns)) + + def test_streaming_data_consistency_binary(self): + """Test that streaming mode produces same data as normal mode (binary)""" + # Normal mode + f_normal = FASTOutputFile(self.binary_file) + data_normal = f_normal.data.copy() + + # Streaming mode + with FASTOutputFile(self.binary_file, streaming=True) as f_stream: + f_stream.readAll() + data_stream = f_stream.data.copy() + + # Should be identical + np.testing.assert_array_almost_equal(data_normal.values, data_stream.values, decimal=5) + self.assertEqual(list(data_normal.columns), list(data_stream.columns)) + + def test_streaming_without_context_manager_fails(self): + """Test that streaming without context manager raises error""" + with self.assertRaises(RuntimeError) as cm: + f = FASTOutputFile(self.ascii_file, streaming=True) + f.read() + + self.assertIn('context manager', str(cm.exception)) + + def test_readAll_without_streaming_fails(self): + """Test that readAll() requires streaming mode""" + f = FASTOutputFile(self.ascii_file) + + with self.assertRaises(RuntimeError) as cm: + f.readAll() + + self.assertIn('streaming mode', str(cm.exception)) + + def test_read_streaming_without_context_fails(self): + """Test that calling read() with streaming=True fails without context manager""" + # Creating object with streaming=True doesn't read immediately + f = FASTOutputFile() + f.filename = self.ascii_file + f.streaming = True + + # Trying to read without context manager should fail + with self.assertRaises(RuntimeError) as cm: + f.read() + + self.assertIn('context manager', str(cm.exception)) + + def test_file_handle_cleanup(self): + """Test that file handles are properly closed""" + # After context manager exits, file should be closed + with FASTOutputFile(self.ascii_file, streaming=True) as f: + f.readAll() + self.assertIsNotNone(f._fid) + + # After exit, file handle should be None + self.assertIsNone(f._fid) + + def test_streaming_metadata_available(self): + """Test that all metadata is available in header-only mode""" + with FASTOutputFile(self.binary_file, streaming=True) as f: + # Binary-specific metadata should be available + self.assertIn('_FileID', f) + self.assertIn('_NumOutChans', f) + self.assertIn('_NT', f) + + # Should be able to determine file properties without reading data + num_channels = len(f['attribute_names']) + self.assertGreater(num_channels, 0) + + +class TestBaseFileStreaming(unittest.TestCase): + """Test base File class streaming functionality""" + + def test_context_manager_support(self): + """Test that File class has context manager methods""" + from weio.file import File + + # Check that methods exist + self.assertTrue(hasattr(File, '__enter__')) + self.assertTrue(hasattr(File, '__exit__')) + + def test_readAll_method_exists(self): + """Test that readAll() method exists""" + from weio.file import File + + self.assertTrue(hasattr(File, 'readAll')) + self.assertTrue(hasattr(File, '_readAll')) + + def test_readChunk_method_exists(self): + """Test that readChunk() method exists""" + from weio.file import File + + self.assertTrue(hasattr(File, 'readChunk')) + self.assertTrue(hasattr(File, '_readChunk')) + + +class TestCSVFileStreaming(unittest.TestCase): + """Test streaming functionality for CSVFile""" + + def setUp(self): + """Set up test files""" + from weio.csv_file import CSVFile + self.CSVFile = CSVFile + self.csv_comma = os.path.join(MyDir, 'CSVComma.csv') + self.csv_header = os.path.join(MyDir, 'CSVColInHeader.csv') + self.csv_tab = os.path.join(MyDir, 'CSVTab.csv') + + def test_backward_compatibility_csv(self): + """Test that normal mode still works for CSV files""" + f = self.CSVFile(self.csv_comma) + self.assertIsNotNone(f.data) + self.assertGreater(len(f.data), 0) + self.assertEqual(len(f.data.columns), 2) + self.assertIn('ColA', f.data.columns) + self.assertIn('ColB', f.data.columns) + + def test_streaming_header_only_csv(self): + """Test header-only reading for CSV files""" + with self.CSVFile(self.csv_comma, streaming=True) as f: + # Metadata should be loaded + self.assertIsNotNone(f.colNames) + self.assertGreater(len(f.colNames), 0) + self.assertIsNotNone(f.sep) + + # Data should NOT be loaded + self.assertIsNone(f.data) + + # Check column names + self.assertIn('ColA', f.colNames) + self.assertIn('ColB', f.colNames) + + def test_streaming_with_readAll_csv(self): + """Test streaming mode with readAll() for CSV files""" + with self.CSVFile(self.csv_comma, streaming=True) as f: + # Initially no data + self.assertIsNone(f.data) + + # Call readAll() + f.readAll() + + # Now data should be loaded + self.assertIsNotNone(f.data) + self.assertEqual(len(f.data.columns), 2) + self.assertIn('ColA', f.data.columns) + self.assertIn('ColB', f.data.columns) + self.assertEqual(len(f.data), 4) + + def test_streaming_data_consistency_csv(self): + """Test that streaming mode produces same data as normal mode (CSV)""" + # Normal mode + f_normal = self.CSVFile(self.csv_comma) + data_normal = f_normal.data.copy() + + # Streaming mode + with self.CSVFile(self.csv_comma, streaming=True) as f_stream: + f_stream.readAll() + data_stream = f_stream.data.copy() + + # Should be identical + np.testing.assert_array_equal(data_normal.values, data_stream.values) + self.assertEqual(list(data_normal.columns), list(data_stream.columns)) + + def test_streaming_without_context_manager_fails_csv(self): + """Test that streaming without context manager raises error""" + f = self.CSVFile() + f.filename = self.csv_comma + f.streaming = True + + # Trying to read without context manager should fail + with self.assertRaises(RuntimeError) as cm: + f.read() + + self.assertIn('context manager', str(cm.exception)) + + def test_readAll_without_streaming_fails_csv(self): + """Test that readAll() requires streaming mode""" + f = self.CSVFile(self.csv_comma) + + with self.assertRaises(RuntimeError) as cm: + f.readAll() + + self.assertIn('streaming mode', str(cm.exception)) + + def test_file_handle_cleanup_csv(self): + """Test that file handles are properly closed""" + # After context manager exits, file should be closed + with self.CSVFile(self.csv_comma, streaming=True) as f: + f.readAll() + self.assertIsNotNone(f._fid) + + # After exit, file handle should be None + self.assertIsNone(f._fid) + + def test_streaming_with_header_comments(self): + """Test streaming with files containing comment headers""" + with self.CSVFile(self.csv_header, streaming=True) as f: + # Headers should be loaded + self.assertIsNotNone(f.header) + self.assertGreater(len(f.header), 0) + self.assertIsNotNone(f.colNames) + + # Data should NOT be loaded + self.assertIsNone(f.data) + + # Check that headers were parsed + self.assertEqual(f.commentChar, '!') + + def test_streaming_readChunk_csv(self): + """Test readChunk() functionality for CSV files""" + with self.CSVFile(self.csv_comma, streaming=True) as f: + # Read first chunk (3 lines) + chunk1 = f.readChunk(nlines=3) + self.assertIsNotNone(chunk1) + self.assertEqual(len(chunk1), 3) + self.assertIn('ColA', chunk1.columns) + + # Read second chunk (remaining 1 line) + chunk2 = f.readChunk(nlines=3) + # Should get the remaining line + if chunk2 is not None: + self.assertGreaterEqual(len(chunk2), 1) + self.assertIn('ColA', chunk2.columns) + + # Try to read beyond EOF - should return None + chunk3 = f.readChunk(nlines=3) + self.assertIsNone(chunk3) + + +class TestHAWC2DatFileStreaming(unittest.TestCase): + """Test streaming functionality for HAWC2DatFile""" + + def setUp(self): + """Set up test files""" + from weio.hawc2_dat_file import HAWC2DatFile + self.HAWC2DatFile = HAWC2DatFile + self.hawc2_ascii = os.path.join(MyDir, 'HAWC2_out_ascii.sel') + self.hawc2_binary = os.path.join(MyDir, 'HAWC2_out_bin.sel') + + def test_backward_compatibility_hawc2_ascii(self): + """Test that normal mode still works for HAWC2 ASCII files""" + f = self.HAWC2DatFile(self.hawc2_ascii) + self.assertIsNotNone(f.data) + self.assertGreater(len(f.data), 0) + self.assertIn('attribute_names', f.info) + self.assertIn('attribute_units', f.info) + + def test_backward_compatibility_hawc2_binary(self): + """Test that normal mode still works for HAWC2 binary files""" + f = self.HAWC2DatFile(self.hawc2_binary) + self.assertIsNotNone(f.data) + self.assertGreater(len(f.data), 0) + self.assertIn('attribute_names', f.info) + + def test_streaming_header_only_hawc2(self): + """Test header-only reading for HAWC2 files""" + with self.HAWC2DatFile(self.hawc2_ascii, streaming=True) as f: + # Metadata should be loaded + self.assertIn('attribute_names', f.info) + self.assertIn('attribute_units', f.info) + self.assertIn('attribute_descr', f.info) + self.assertIn('NrSc', f.info) + self.assertIn('NrCh', f.info) + + # Data should NOT be loaded + self.assertIsNone(f.data) + + # Check metadata content + self.assertGreater(len(f.info['attribute_names']), 0) + self.assertGreater(f.info['NrSc'], 0) + self.assertGreater(f.info['NrCh'], 0) + + def test_streaming_with_readAll_hawc2_ascii(self): + """Test streaming mode with readAll() for HAWC2 ASCII files""" + with self.HAWC2DatFile(self.hawc2_ascii, streaming=True) as f: + # Initially no data + self.assertIsNone(f.data) + + # Call readAll() + f.readAll() + + # Now data should be loaded + self.assertIsNotNone(f.data) + self.assertEqual(f.data.shape[1], f.info['NrCh']) + + def test_streaming_with_readAll_hawc2_binary(self): + """Test streaming mode with readAll() for HAWC2 binary files""" + with self.HAWC2DatFile(self.hawc2_binary, streaming=True) as f: + # Initially no data + self.assertIsNone(f.data) + + # Call readAll() + f.readAll() + + # Now data should be loaded + self.assertIsNotNone(f.data) + self.assertEqual(f.data.shape[1], f.info['NrCh']) + + def test_streaming_data_consistency_hawc2(self): + """Test that streaming mode produces same data as normal mode (HAWC2)""" + # Normal mode + f_normal = self.HAWC2DatFile(self.hawc2_ascii) + data_normal = f_normal.data.copy() + + # Streaming mode + with self.HAWC2DatFile(self.hawc2_ascii, streaming=True) as f_stream: + f_stream.readAll() + data_stream = f_stream.data.copy() + + # Should be identical + np.testing.assert_array_almost_equal(data_normal, data_stream, decimal=10) + + def test_streaming_without_context_manager_fails_hawc2(self): + """Test that streaming without context manager raises error""" + f = self.HAWC2DatFile() + f.filename = self.hawc2_ascii + f.streaming = True + + # Trying to read without context manager should fail + with self.assertRaises(RuntimeError) as cm: + f.read() + + self.assertIn('context manager', str(cm.exception)) + + def test_readAll_without_streaming_fails_hawc2(self): + """Test that readAll() requires streaming mode""" + f = self.HAWC2DatFile(self.hawc2_ascii) + + with self.assertRaises(RuntimeError) as cm: + f.readAll() + + self.assertIn('streaming mode', str(cm.exception)) + + +if __name__ == '__main__': + unittest.main() From 1bca249d4bc4c2a50fd859967671455653f29bf5 Mon Sep 17 00:00:00 2001 From: xflow-ben <110842449+xflow-ben@users.noreply.github.com> Date: Sat, 8 Nov 2025 12:07:19 -0500 Subject: [PATCH 2/4] Remove .claude/settings.local.json from version control, some of the other requested edits --- .claude/settings.local.json | 11 ----------- 1 file changed, 11 deletions(-) delete mode 100644 .claude/settings.local.json diff --git a/.claude/settings.local.json b/.claude/settings.local.json deleted file mode 100644 index d33db4d..0000000 --- a/.claude/settings.local.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "permissions": { - "allow": [ - "Bash(python -m pytest:*)", - "Bash(python:*)", - "Bash(pip install:*)" - ], - "deny": [], - "ask": [] - } -} From 36a644eee0f58bf4937541c2d344cfe2fb633798 Mon Sep 17 00:00:00 2001 From: xflow-ben <110842449+xflow-ben@users.noreply.github.com> Date: Sat, 8 Nov 2025 13:15:04 -0500 Subject: [PATCH 3/4] Address author feedback: improve inheritance, remove duplication, refactor binary reading - Remove nlines parameter from base class methods (use **kwargs for flexibility) - Fix File.__init__() to not read when streaming=True - CSVFile and FASTOutputFile now properly call parent __init__() - Remove duplicate context manager methods (use inheritance) - Refactor FASTOutputFile binary reading to eliminate ~160 lines of duplication - Update _NEWFILE_TEMPLATE.py with correct **kwargs pattern - All 89 tests pass with 100% backward compatibility --- .gitignore | 1 + weio/_NEWFILE_TEMPLATE.py | 10 +- weio/csv_file.py | 30 +--- weio/fast_output_file.py | 327 ++++++++++++++++++-------------------- weio/file.py | 9 +- weio/hawc2_dat_file.py | 2 +- 6 files changed, 174 insertions(+), 205 deletions(-) diff --git a/.gitignore b/.gitignore index f823e3f..3192547 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ dist *.pdf *.png _* +.claude \ No newline at end of file diff --git a/weio/_NEWFILE_TEMPLATE.py b/weio/_NEWFILE_TEMPLATE.py index 9a595fb..e63e82d 100644 --- a/weio/_NEWFILE_TEMPLATE.py +++ b/weio/_NEWFILE_TEMPLATE.py @@ -122,15 +122,16 @@ def _readAll(self): # self['data'].append(line.strip()) raise NotImplementedError(f"{self.__class__.__name__} does not support readAll()") - def _readChunk(self, nlines=None, **kwargs): + def _readChunk(self, **kwargs): """ Read a chunk of data from current position in streaming mode. Only called when streaming=True and readChunk() is invoked. Parameters ---------- - nlines : int - Number of lines to read. If None, use default chunk size. + **kwargs : dict + Format-specific parameters (e.g., nlines=1000, nrows=100, nbytes=1024) + Different file formats can implement different chunking strategies. Returns ------- @@ -141,8 +142,7 @@ def _readChunk(self, nlines=None, **kwargs): # if self._fid is None: # raise RuntimeError("No open file handle") # - # if nlines is None: - # nlines = 1000 # default chunk size + # nlines = kwargs.get('nlines', 1000) # default chunk size # # chunk = [] # for i in range(nlines): diff --git a/weio/csv_file.py b/weio/csv_file.py index e0a839a..eb3896a 100644 --- a/weio/csv_file.py +++ b/weio/csv_file.py @@ -36,6 +36,7 @@ def formatName(): def __init__(self, filename=None, sep=None, colNames=None, commentChar=None, commentLines=None,\ colNamesLine=None, detectColumnNames=True, header=None, doRead=True, streaming=False, **kwargs): + # Initialize CSV-specific attributes colNames = [] if colNames is None else colNames commentLines = [] if commentLines is None else commentLines self.sep = sep @@ -45,10 +46,6 @@ def __init__(self, filename=None, sep=None, colNames=None, commentChar=None, com self.commentLines = commentLines self.colNamesLine = colNamesLine self.detectColumnNames = detectColumnNames - self.streaming = streaming - self.data = None if streaming else [] - self._in_context = False - self._fid = None if header is None: self.header=[] else: @@ -61,17 +58,20 @@ def __init__(self, filename=None, sep=None, colNames=None, commentChar=None, com raise Exception('Provide either `commentChar` or `commentLines` for CSV file types') if (len(self.colNames)>0) and (self.colNamesLine is not None): raise Exception('Provide either `colNames` or `colNamesLine` for CSV file types') + + # Call parent __init__ - handles streaming, filename, etc. + File.__init__(self, filename=None, streaming=streaming) + + # Handle filename after parent init if filename: if streaming: # Don't read immediately in streaming mode - wait for context manager self.filename = filename else: self.read(filename, doRead=doRead, **kwargs) - else: - self.filename = None def __enter__(self): - """Context manager entry.""" + """Context manager entry - CSV needs special handling for detect().""" self._in_context = True if self.filename: # In streaming mode: detect headers, open file but don't read data @@ -83,21 +83,7 @@ def __enter__(self): self.read(doRead=True) return self - def __exit__(self, exc_type, exc_value, traceback): - """Context manager exit - close file handle if open.""" - if self._fid is not None: - self._fid.close() - self._fid = None - self._in_context = False - return False - - def _enforce_context_if_needed(self): - """Raise if streaming and not in context manager.""" - if self.streaming and not self._in_context: - raise RuntimeError( - "streaming=True requires using a context manager ('with' statement) " - "to ensure the file is closed. Use: `with CSVFile(...) as reader:`" - ) + # Inherit __exit__ and _enforce_context_if_needed from parent File class def read(self, filename=None, doRead=True, streaming=None, **kwargs): if filename: diff --git a/weio/fast_output_file.py b/weio/fast_output_file.py index 428f1b9..a8c9596 100644 --- a/weio/fast_output_file.py +++ b/weio/fast_output_file.py @@ -79,34 +79,17 @@ def defaultExtensions(): def formatName(): return 'FAST output file' - def __enter__(self): - """Context manager entry.""" - self._in_context = True - if self.filename: - self.read(streaming=self.streaming) - return self - - def __exit__(self, exc_type, exc_value, traceback): - """Context manager exit - close file handle if open.""" - if self._fid is not None: - self._fid.close() - self._fid = None - self._in_context = False - return False # Propagate exceptions + # Inherit __enter__, __exit__, and _enforce_context_if_needed from parent File class def __init__(self, filename=None, streaming=False, **kwargs): """ Class constructor. If a `filename` is given, the file is read. """ - # Data - self.filename = filename + # FASTOutputFile-specific attributes self.data = None # pandas.DataFrame self.description = '' # string - self.streaming = streaming - self._in_context = False - self._fid = None self._data_start_pos = None - # Only read if not streaming (streaming requires context manager) - if filename and not streaming: - self.read(streaming=streaming, **kwargs) + + # Call parent __init__ - handles streaming, filename, _fid, _in_context + File.__init__(self, filename=filename, streaming=streaming, **kwargs) def read(self, filename=None, streaming=None, **kwargs): """ Reads the file self.filename, or `filename` if provided """ @@ -292,51 +275,25 @@ def _readAll(self): # Read remaining data from open file handle if self['binary']: - # Binary file - read from current position using stored metadata + # Binary file - use shared data reading function try: - FileID = self['_FileID'] - NumOutChans = self['_NumOutChans'] - NT = self['_NT'] - ColScl = self['_ColScl'] - ColOff = self['_ColOff'] - - # Setup binary reading helpers - StructDict = { - 'uint8': ('B', 1, np.uint8), - 'int16': ('h', 2, np.int16), - 'int32': ('i', 4, np.int32), - 'float32': ('f', 4, np.float32), - 'float64': ('d', 8, np.float64) + # Build info dict from stored metadata + info = { + 'FileID': self['_FileID'], + 'NumOutChans': self['_NumOutChans'], + 'NT': self['_NT'], + 'ColScl': self['_ColScl'], + 'ColOff': self['_ColOff'] } - def freadNumpy(fid, n, dtype): - fmt, nbytes, npdtype = StructDict[dtype] - return np.fromfile(fid, count=n, dtype=npdtype) - - nPts = NT * NumOutChans - - # Read time data if present - if FileID == FileFmtID_WithTime: - PackedTime = freadNumpy(self._fid, NT, 'int32') - TimeScl = self['_TimeScl'] - TimeOff = self['_TimeOff'] - time = (np.array(PackedTime) - TimeOff) / TimeScl - else: - TimeOut1 = self['_TimeOut1'] - TimeIncr = self['_TimeIncr'] - time = TimeOut1 + TimeIncr * np.arange(NT) - - # Read channel data - if FileID == FileFmtID_NoCompressWithoutTime: - PackedData = freadNumpy(self._fid, nPts, 'float64') + if self['_FileID'] == FileFmtID_WithTime: + info['TimeScl'] = self['_TimeScl'] + info['TimeOff'] = self['_TimeOff'] else: - PackedData = freadNumpy(self._fid, nPts, 'int16') + info['TimeOut1'] = self['_TimeOut1'] + info['TimeIncr'] = self['_TimeIncr'] - data = np.array(PackedData).reshape(NT, NumOutChans) - del PackedData - - # Scale the data - data = (data - ColOff) / ColScl - data = np.concatenate([time.reshape(NT, 1), data], 1) + # Read data using shared function + data = load_binary_output_data(self._fid, info, use_buffer=False, method='numpy') # Convert to DataFrame with existing column info if self['attribute_units'] is not None: @@ -734,22 +691,45 @@ def freadStruct(fid, n, dtype): return fid, info -def load_binary_output(filename, use_buffer=False, method='mix', **kwargs): +def load_binary_output_data(fid, info, use_buffer=False, method='mix'): """ - 03/09/15: Ported from ReadFASTbinary.m by Mads M Pedersen, DTU Wind - 24/10/18: Low memory/buffered version by E. Branlard, NREL - 18/01/19: New file format for extended channels, by E. Branlard, NREL - 20/11/23: Improved performances using np.fromfile, by E. Branlard, NREL + Read binary data portion using header metadata. + + Parameters + ---------- + fid : file handle + Open binary file positioned at start of data + info : dict + Header metadata from load_binary_output_header() + Must contain: FileID, NumOutChans, NT, ColScl, ColOff, TimeScl/TimeOff or TimeOut1/TimeIncr + use_buffer : bool + Whether to use buffered reading + method : str + Reading method: 'numpy', 'struct', 'mix', or 'optim' + + Returns + ------- + data : np.ndarray + Data array with time in first column, shape (NT, NumOutChans+1) """ + import numpy as np + import struct + + # Extract metadata + FileID = info['FileID'] + NumOutChans = info['NumOutChans'] + NT = info['NT'] + ColScl = info['ColScl'] + ColOff = info['ColOff'] + + # Setup reading functions StructDict = { - 'uint8': ('B', 1, np.uint8), - 'int16': ('h', 2, np.int16), - 'int32': ('i', 4, np.int32), + 'uint8': ('B', 1, np.uint8), + 'int16': ('h', 2, np.int16), + 'int32': ('i', 4, np.int32), 'float32': ('f', 4, np.float32), 'float64': ('d', 8, np.float64) } - def getFileSizeMB(filename): - return os.path.getsize(filename)/(1024.0**2) def freadStruct(fid, n, dtype): fmt, nbytes, npdtype = StructDict[dtype] @@ -761,7 +741,7 @@ def freadStructArray(fid, n, dtype): def freadNumpy(fid, n, dtype): fmt, nbytes, npdtype = StructDict[dtype] - return np.fromfile(fid, count=n, dtype=npdtype) # Improved performances + return np.fromfile(fid, count=n, dtype=npdtype) if method=='numpy': fread = freadNumpy @@ -773,8 +753,6 @@ def freadNumpy(fid, n, dtype): fread = freadStruct freadLarge = freadNumpy elif method=='optim': - # Decide on method on the fly - #MB = getFileSizeMB(filename) use_buffer = False fread = freadStruct freadLarge = freadNumpy @@ -782,116 +760,71 @@ def freadNumpy(fid, n, dtype): raise NotImplementedError def freadRowOrderTableBuffered(fid, n, type_in, nCols, nOff=0, type_out='float64'): - """ + """ Reads of row-ordered table from a binary file. - Read `n` data of type `type_in`, assumed to be a row ordered table of `nCols` columns. - Memory usage is optimized by allocating the data only once. - Buffered reading is done for improved performances (in particular for 32bit python) - - `nOff` allows for additional column space at the begining of the storage table. - Typically, `nOff=1`, provides a column at the beginning to store the time vector. - - @author E.Branlard, NREL - + `nOff` allows for additional column space at the beginning of the array + for instance for time """ - fmt, nbytes = StructDict[type_in][:2] - nLines = int(n/nCols) - GoodBufferSize = 4096*40 - nLinesPerBuffer = int(GoodBufferSize/nCols) - BufferSize = nCols * nLinesPerBuffer - nBuffer = int(n/BufferSize) - # Allocation of data - data = np.zeros((nLines,nCols+nOff), dtype = type_out) - # Reading + ## Allocation of the output + if type_out=='float64': + data = np.zeros((int(n/nCols), nCols+nOff), dtype=np.float64) + else: + raise NotImplementedError('Unsupported type {}'.format(type_out)) + ## Reading data from file try: nIntRead = 0 - nLinesRead = 0 - while nIntRead Date: Thu, 13 Nov 2025 07:12:40 -0500 Subject: [PATCH 4/4] Implement requested edits --- weio/csv_file.py | 16 ---- weio/fast_output_file.py | 154 +++++++++++++++++------------------ weio/hawc2_dat_file.py | 12 +-- weio/tests/test_streaming.py | 9 +- 4 files changed, 80 insertions(+), 111 deletions(-) diff --git a/weio/csv_file.py b/weio/csv_file.py index eb3896a..a3fdf17 100644 --- a/weio/csv_file.py +++ b/weio/csv_file.py @@ -308,22 +308,6 @@ def _read(self): except pd.errors.ParserError as e: raise WrongFormatError('CSV File {}: '.format(self.filename)+e.args[0]) - def readAll(self): - """Read all remaining data after streaming header.""" - if not self.streaming: - raise RuntimeError("readAll() only valid in streaming mode") - if not self._in_context: - raise RuntimeError("readAll() requires context manager") - self._readAll() - - def readChunk(self, nlines=None, **kwargs): - """Read chunk of data (not implemented for CSV).""" - if not self.streaming: - raise RuntimeError("readChunk() only valid in streaming mode") - if not self._in_context: - raise RuntimeError("readChunk() requires context manager") - return self._readChunk(nlines=nlines, **kwargs) - def _readAll(self): """Read all remaining CSV data in streaming mode.""" if self._fid is None: diff --git a/weio/fast_output_file.py b/weio/fast_output_file.py index a8c9596..4ed5d5e 100644 --- a/weio/fast_output_file.py +++ b/weio/fast_output_file.py @@ -182,35 +182,11 @@ def readline(iLine): # Store binary file metadata for streaming mode if self.streaming and self['binary']: - self['_FileID'] = info.get('FileID') - self['_NumOutChans'] = info.get('NumOutChans') - self['_NT'] = info.get('NT') - self['_ColScl'] = info.get('ColScl') - self['_ColOff'] = info.get('ColOff') - if info['FileID'] == FileFmtID_WithTime: - self['_TimeScl'] = info.get('TimeScl') - self['_TimeOff'] = info.get('TimeOff') - else: - self['_TimeOut1'] = info.get('TimeOut1') - self['_TimeIncr'] = info.get('TimeIncr') + self['info_binary'] = info # --- Convert to DataFrame (only if data was loaded) if self.data is not None: - if info['attribute_units'] is not None: - info['attribute_units'] = [re.sub(r'[()\[\]]','',u) for u in info['attribute_units']] - if len(info['attribute_names'])!=len(info['attribute_units']): - cols=info['attribute_names'] - print('[WARN] not all columns have units! Skipping units') - else: - cols=[n+'_['+u.replace('sec','s')+']' for n,u in zip(info['attribute_names'], info['attribute_units'])] - else: - cols=info['attribute_names'] - if isinstance(self.data, pd.DataFrame): - self.data.columns = cols - else: - if len(cols)!=self.data.shape[1]: - raise BrokenFormatError('Inconstistent number of columns between headers ({}) and data ({}) for file {}'.format(len(cols), self.data.shape[1], self.filename)) - self.data = pd.DataFrame(data=self.data, columns=cols) + self.data = fast_output_data_2_dataframe(self.data, info['attribute_names'], info['attribute_units'], self.filename) def write(self, filename=None, binary=None, fileID=4): @@ -277,36 +253,11 @@ def _readAll(self): if self['binary']: # Binary file - use shared data reading function try: - # Build info dict from stored metadata - info = { - 'FileID': self['_FileID'], - 'NumOutChans': self['_NumOutChans'], - 'NT': self['_NT'], - 'ColScl': self['_ColScl'], - 'ColOff': self['_ColOff'] - } - if self['_FileID'] == FileFmtID_WithTime: - info['TimeScl'] = self['_TimeScl'] - info['TimeOff'] = self['_TimeOff'] - else: - info['TimeOut1'] = self['_TimeOut1'] - info['TimeIncr'] = self['_TimeIncr'] - - # Read data using shared function - data = load_binary_output_data(self._fid, info, use_buffer=False, method='numpy') + # Read data using shared function with stored info + data = load_binary_output_data(self._fid, self['info_binary'], use_buffer=False, method='numpy') # Convert to DataFrame with existing column info - if self['attribute_units'] is not None: - units = [re.sub(r'[()\[\]]','',u) for u in self['attribute_units']] - if len(self['attribute_names'])!=len(units): - cols=self['attribute_names'] - print('[WARN] not all columns have units! Skipping units') - else: - cols=[n+'_['+u.replace('sec','s')+']' for n,u in zip(self['attribute_names'], units)] - else: - cols=self['attribute_names'] - - self.data = pd.DataFrame(data=data, columns=cols) + self.data = fast_output_data_2_dataframe(data, self['attribute_names'], self['attribute_units'], self.filename) if self.data.shape[0]==0: raise EmptyFileError('This FAST output file contains no data: {}'.format(self.filename)) @@ -321,17 +272,7 @@ def _readAll(self): data = np.loadtxt(self._fid, comments=('This')) # Convert to DataFrame with existing column info - if self['attribute_units'] is not None: - units = [re.sub(r'[()\[\]]','',u) for u in self['attribute_units']] - if len(self['attribute_names'])!=len(units): - cols=self['attribute_names'] - print('[WARN] not all columns have units! Skipping units') - else: - cols=[n+'_['+u.replace('sec','s')+']' for n,u in zip(self['attribute_names'], units)] - else: - cols=self['attribute_names'] - - self.data = pd.DataFrame(data=data, columns=cols) + self.data = fast_output_data_2_dataframe(data, self['attribute_names'], self['attribute_units'], self.filename) if self.data.shape[0]==0: raise EmptyFileError('This FAST output file contains no data: {}'.format(self.filename)) @@ -477,6 +418,47 @@ def toOUTB(self, filename=None, extension='.outb', fileID=4, noOverWrite=True, * # -------------------------------------------------------------------------------- # --- Helper low level functions # -------------------------------------------------------------------------------- +def fast_output_data_2_dataframe(data, attribute_names, attribute_units, filename): + """ + Convert FAST output data to DataFrame with proper column names. + + Parameters + ---------- + data : np.ndarray or pd.DataFrame + Data array or DataFrame + attribute_names : list + Channel names + attribute_units : list + Channel units + filename : str + Filename for error messages + + Returns + ------- + pd.DataFrame + DataFrame with properly formatted columns + """ + # Build column names with units + if attribute_units is not None: + units = [re.sub(r'[()\[\]]','',u) for u in attribute_units] + if len(attribute_names)!=len(units): + cols=attribute_names + print('[WARN] not all columns have units! Skipping units') + else: + cols=[n+'_['+u.replace('sec','s')+']' for n,u in zip(attribute_names, units)] + else: + cols=attribute_names + + # Convert to DataFrame if needed + if isinstance(data, pd.DataFrame): + data.columns = cols + return data + else: + if len(cols)!=data.shape[1]: + raise BrokenFormatError('Inconstistent number of columns between headers ({}) and data ({}) for file {}'.format(len(cols), data.shape[1], filename)) + return pd.DataFrame(data=data, columns=cols) + + def isBinary(filename): with open(filename, 'r') as f: try: @@ -763,25 +745,35 @@ def freadRowOrderTableBuffered(fid, n, type_in, nCols, nOff=0, type_out='float64 """ Reads of row-ordered table from a binary file. - `nOff` allows for additional column space at the beginning of the array - for instance for time + Read `n` data of type `type_in`, assumed to be a row ordered table of `nCols` columns. + Memory usage is optimized by allocating the data only once. + Buffered reading is done for improved performances (in particular for 32bit python) + + `nOff` allows for additional column space at the begining of the storage table. + Typically, `nOff=1`, provides a column at the beginning to store the time vector. + + @author E.Branlard, NREL + """ - ## Allocation of the output - if type_out=='float64': - data = np.zeros((int(n/nCols), nCols+nOff), dtype=np.float64) - else: - raise NotImplementedError('Unsupported type {}'.format(type_out)) - ## Reading data from file + fmt, nbytes = StructDict[type_in][:2] + nLines = int(n/nCols) + GoodBufferSize = 4096*40 + nLinesPerBuffer = int(GoodBufferSize/nCols) + BufferSize = nCols * nLinesPerBuffer + nBuffer = int(n/BufferSize) + # Allocation of data + data = np.zeros((nLines,nCols+nOff), dtype = type_out) + # Reading try: nIntRead = 0 - iChunk = 0 - nChunkSize = min(n, 1024000) # twice a 8*Mb chunk - while nIntRead < n: - nIntToRead = min(n - nIntRead, nChunkSize) - iRow = int((nIntRead/nCols)) - nRows = int(nIntToRead/nCols) - Buffer = freadLarge(fid, nIntToRead, type_in) - data[iRow:iRow+nRows, nOff:] = np.array(Buffer).reshape(-1, nCols) + nLinesRead = 0 + while nIntRead