Skip to content

implemented async EEG pipeline manager#93

Open
AgastyaRai wants to merge 2 commits intomainfrom
feature/neurolm-classifiers
Open

implemented async EEG pipeline manager#93
AgastyaRai wants to merge 2 commits intomainfrom
feature/neurolm-classifiers

Conversation

@AgastyaRai
Copy link
Copy Markdown
Collaborator

Async Pipeline Manager Pull Request

What?

Added an async pipeline manager in moss/manager.py, so pipeline execution is now driven by function pointers instead of a bunch of conditionals.

The manager now takes pipeline configs in the form:

{
  "nodes": [
    {"type": "bandpass filter", "config": {"method": "FIR", "Filter": 100}},
    {"type": "ml", "config": {"model": "LDA"}}
  ]
}

and runs the nodes in sequence.

It currently supports:

  • bandpass filter
  • ml

It also returns both required outputs:

{
  "processed_eeg": [...],
  "classifier_output": {
    "status": "ok",
    "task": "activity",
    "overall_label": "...",
    "confidence": 0.0,
    "segments": [...],
    "class_probabilities": {...},
    "n_segments": 0
  }
}

Also fixed an important issue where FUNCTION_MAP had originally been defined before the functions it referenced.


How?

Added a central FUNCTION_MAP so node names map directly to wrapper functions, instead of being handled through inline conditional execution logic.

Made the full pipeline async by turning the wrappers and run_pipeline(...) into async functions, and wrapping the underlying blocking preprocessing / encoding / classifier calls with await asyncio.to_thread(...).

The bandpass filter node now actually uses the config to control preprocessing, dispatching to the existing FIR or IIR bandpass helpers before running the existing from_array(...) resampling / segmentation path.

The ml node now uses the real downstream ML path through NeuroLMEncoder.encode(...), load_classifier(...), predict(...), and predict_majority(...).

run_pipeline(...) now tracks and returns both outputs explicitly:

  • preprocessing nodes update processed_eeg
  • ml nodes update classifier_output

Also added validation / error messages for invalid pipeline structure, unknown node types, invalid config types, invalid EEG input shape/type, and ML nodes receiving non-segment input.


Testing

Ran an end-to-end pipeline using real CSV-derived EEG input with:

pipeline = {
  "nodes": [
    {"type": "bandpass filter", "config": {"method": "FIR", "Filter": 100, "src_fs": 256}},
    {"type": "ml", "config": {"task": "activity"}}
  ]
}

Verified:

  • output includes both processed_eeg and classifier_output
  • classifier returns status ok
  • FIR and IIR bandpass paths both execute successfully
  • preprocessing-only pipeline returns classifier_output = None
  • ml-only pipeline works when given preprocessed segment input
  • alias node type "bandpass_filter" routes correctly

Also ran a short repeated timing pass after the final wiring update.

@AgastyaRai AgastyaRai requested a review from HeisSteve as a code owner April 11, 2026 10:34
@github-actions
Copy link
Copy Markdown

❌ PR approval conditions NOT satisfied. At least 1 team lead OR 2 team members must approve before merging.
[Last checked: 2026-04-11T10:34:28.668Z]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant