37 changes: 27 additions & 10 deletions operators/distiller-counted-data-reader/operator.json
@@ -2,32 +2,48 @@
"id": "5527c101-304b-47d0-8814-488455847ba6",
"image": "ghcr.io/nersc/interactem/distiller-counted-data-reader:latest",
"label": "Counted Data Reader",
"description": "Reads frames from the distiller pipeline, and sends out frames of data.",
"description": "Reads a file of sparse data and sends out batched frames of data.",
"outputs": [
{
"name": "counted_data",
"label": "Counted Data",
"type": "sparse_frame",
"description": "Sparse frame of counted data"
"description": "Sparse frames of counted data"
}
],
"parameters": [
{
"name": "raw_data_dir",
"label": "Raw data directory",
"name": "counted_data_dir",
"label": "Counted data directory",
"type": "mount",
"default": "~/ncem_raw_data",
"description": "This is where the raw data files (*.data) are located",
"default": "/global/cfs/cdirs/ncemhub/distiller/counted/",
"description": "The path to the sparse H5 data files",
"required": true
},
{
"name": "filename",
"label": "Filename",
"name": "scan_id",
"label": "Scan ID",
"type": "int",
"default": "10000",
"description": "The Distiller Scan ID of the dataset to be read",
"required": true
},
{
"name": "file_suffix",
"label": "File suffix",
"type": "str",
"default": "test_file.h5",
"description": "The file to be processed",
"default": "STANDARD",
"description": "The file suffix type (STANDARD or CENTERED).",
"required": true
},
{
"name": "batch_size_mb",
"label": "Batch size (MB)",
"type": "float",
"default": 1.0,
"description": "The size of each emitted batch in megabytes.",
"required": false
},
{
"name": "cache_last_file",
"label": "Cache last file",
@@ -36,6 +52,7 @@
"description": "Cache the last loaded file to avoid re-reading between triggers.",
"required": false
}

],
"triggers": [
{
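The new `batch_size_mb` parameter caps how much sparse data each emitted message carries. `BatchEmitter`'s internals live in `distiller_streaming.emitter` and are not part of this diff, so the following is only a minimal sketch of the size-based batching idea it suggests, assuming each frame exposes its payload size via an `nbytes` attribute; the function name and signature are illustrative, not the library's API.

```python
# Illustrative sketch only: not the real BatchEmitter implementation.
from typing import Any, Iterable, Iterator


def batch_by_size(frames: Iterable[Any], batch_size_mb: float = 1.0) -> Iterator[list[Any]]:
    """Group frames into batches of roughly batch_size_mb megabytes.

    Assumes each frame exposes its payload size via `.nbytes` (hypothetical).
    """
    limit = batch_size_mb * 1024 * 1024  # convert MB to bytes
    batch: list[Any] = []
    size = 0
    for frame in frames:
        batch.append(frame)
        size += frame.nbytes
        if size >= limit:  # emit once the batch reaches the size cap
            yield batch
            batch, size = [], 0
    if batch:  # flush any remainder
        yield batch
```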
41 changes: 27 additions & 14 deletions operators/distiller-counted-data-reader/run.py
@@ -3,6 +3,7 @@

import stempy.io
from distiller_streaming.emitter import BatchEmitter
from stempy.contrib import FileSuffix, get_scan_path

from interactem.core.logger import get_logger
from interactem.core.models.messages import BytesMessage
@@ -13,11 +14,12 @@
# --- Operator State ---
source_dataset_path: pathlib.Path = pathlib.Path()
active_emitter: BatchEmitter | None = None
scan_id: int | None = None
current_scan_number: int = 1
current_filename: str | None = None
cached_sparse_array: tuple[str, Any] | None = None
current_scan_id: int | None = None
cached_sparse_array: tuple[int, Any] | None = None

data_dir = pathlib.Path(f"{DATA_DIRECTORY}/raw_data_dir")
data_dir = pathlib.Path(f"{DATA_DIRECTORY}/counted_data_dir")


@operator
@@ -29,28 +31,39 @@ def reader(
return None

global source_dataset_path, active_emitter, current_scan_number
global current_filename, cached_sparse_array
global current_scan_id, cached_sparse_array

filename = parameters.get("filename", None)
scan_id = parameters.get("scan_id", None)
file_suffix = parameters.get("file_suffix", "STANDARD")
batch_size_mb = parameters.get("batch_size_mb", 1.0)
cache_last_file = parameters.get("cache_last_file", False)

if not filename:
raise ValueError("Filename parameter 'filename' is not set.")
if not scan_id:
raise ValueError("Parameter 'scan_id' is not set.")

if not cache_last_file:
cached_sparse_array = None

if filename != current_filename:
current_filename = filename
if not file_suffix:
raise ValueError("Parameter 'file_suffix' is not set.")

if file_suffix == "STANDARD":
file_suffix_enum = FileSuffix.STANDARD
elif file_suffix == "CENTERED":
file_suffix_enum = FileSuffix.CENTERED
else:
raise ValueError(f"Invalid file_suffix: {file_suffix}")

if scan_id != current_scan_id:
current_scan_id = scan_id
current_scan_number = 1
active_emitter = None
cached_sparse_array = None
logger.info(f"New filename received: {filename}. Resetting scan number to 1.")
logger.info(f"New scan_id received: {scan_id}. Resetting scan number to 1.")

source_dataset_path = data_dir / filename
source_dataset_path, scan_num, scan_id = get_scan_path(data_dir, scan_id=scan_id, version=1, file_suffix=file_suffix_enum)

# Load Dataset and Create Emitter if Necessary
# Load Dataset and create emitter if necessary
if active_emitter is None:
if not source_dataset_path.exists():
logger.error(f"Source file not found: {source_dataset_path}")
@@ -63,7 +76,7 @@
cached_sparse_array[1]
if cache_last_file
and cached_sparse_array
and cached_sparse_array[0] == filename
and cached_sparse_array[0] == scan_id
else None
)

@@ -72,7 +85,7 @@
)

if cache_last_file:
cached_sparse_array = (filename, loaded_sparse_array)
cached_sparse_array = (scan_id, loaded_sparse_array) # type: ignore

active_emitter = BatchEmitter(
sparse_array=loaded_sparse_array,
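The switch from a literal `filename` to a `scan_id` leans on `stempy.contrib.get_scan_path`, which resolves a Distiller scan ID to the matching sparse HDF5 file on disk and returns the path along with the scan number and scan ID it matched, exactly as the call in `run.py` above uses it. A usage sketch mirroring that call; the directory and scan ID are the operator.json defaults and will differ on other systems:

```python
import pathlib

from stempy.contrib import FileSuffix, get_scan_path

# Default mount from operator.json; adjust for your filesystem.
data_dir = pathlib.Path("/global/cfs/cdirs/ncemhub/distiller/counted/")

# Resolve a Distiller scan ID to the sparse H5 file for that scan.
path, scan_num, scan_id = get_scan_path(
    data_dir,
    scan_id=10000,  # operator.json default; any valid Distiller scan ID
    version=1,
    file_suffix=FileSuffix.STANDARD,  # or FileSuffix.CENTERED
)
print(path, scan_num, scan_id)
```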