Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 72 additions & 22 deletions aeon/anomaly_detection/series/distance_based/_left_stampi.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import numpy as np

from aeon.anomaly_detection.series.base import BaseSeriesAnomalyDetector
from aeon.utils.windowing import reverse_windowing


class LeftSTAMPi(BaseSeriesAnomalyDetector):
Expand All @@ -22,7 +21,6 @@ class LeftSTAMPi(BaseSeriesAnomalyDetector):

LeftSTAMPi supports univariate time series only.


Parameters
----------
window_size : int, default=3
Expand All @@ -38,10 +36,16 @@ class LeftSTAMPi(BaseSeriesAnomalyDetector):
k : int, default=1
The number of top distances to return.

Notes
-----
The first ``n_init_train`` points will always receive an anomaly score of 0,
as there are no left neighbors available for comparison in that region.
This applies to both ``fit_predict`` and ``predict`` calls.

Examples
--------
Calculate the anomaly score for the complete time series at once.
Internally,this is applying the incremental approach outlined below.
Internally, this is applying the incremental approach outlined below.

>>> import numpy as np
>>> from aeon.anomaly_detection.series.distance_based import LeftSTAMPi
Expand All @@ -58,7 +62,7 @@ class LeftSTAMPi(BaseSeriesAnomalyDetector):
and Eamonn Keogh: "Matrix Profile I: All Pairs Similarity Joins
for Time Series: A Unifying View That Includes Motifs, Discords
and Shapelets.", In Proceedings of the International Conference
on Data Mining (ICDM), 13171322. doi: 10.1109/ICDM.2016.0179
on Data Mining (ICDM), 1317-1322. doi: 10.1109/ICDM.2016.0179

"""

Expand Down Expand Up @@ -91,6 +95,7 @@ def __init__(
super().__init__(axis=0)

def _check_params(self, X):
"""Validate parameters against the fit data X."""
if self.window_size < 3 or self.window_size > len(X):
raise ValueError(
"The window size must be at least 3 and at most the length of the "
Expand All @@ -112,33 +117,78 @@ def _check_params(self, X):
def _fit(self, X: np.ndarray, y=None) -> "LeftSTAMPi":
if X.ndim > 1:
X = X.squeeze()

self._check_params(X)

self._call_stumpi(X)
# Initialise the matrix profile on only the first n_init_train points.
# Store the full input length so _predict can detect the base-class
# fit_predict path where _fit(X) and _predict(X) both receive full X.
self._n_fit_points = len(X)
self._call_stumpi(X[: self.n_init_train])

return self

def _predict(self, X: np.ndarray) -> np.ndarray:
if X.ndim > 1:
X = X.squeeze()
self._check_params(X)

for x in X:
self.mp_.update(x)

lmp = self.mp_._left_P
lmp[: self.n_init_train] = 0
point_anomaly_scores = reverse_windowing(lmp, self.window_size)

return point_anomaly_scores

def _fit_predict(self, X: np.ndarray, y=None) -> np.ndarray:
if X.ndim > 1:
X = X.squeeze()

self.fit(X[: self.n_init_train])

return self.predict(X[self.n_init_train :])
n_input = len(X)

# Base-class fit_predict path: _fit(full_X) then _predict(full_X).
# Prepend zeros for the init region and score only the remainder.
if hasattr(self, "_n_fit_points") and self._n_fit_points == n_input:
init_scores = np.zeros(self.n_init_train, dtype=np.float64)
n_test = n_input - self.n_init_train
test_scores = self._score_new_points(X[self.n_init_train :], n_test)
# Reset so subsequent standalone predict() calls work correctly
self._n_fit_points = 0
return np.concatenate([init_scores, test_scores])

# Standalone predict() path: X contains only new (unseen) points
return self._score_new_points(X, len(X))

def _score_new_points(self, X: np.ndarray, n_out: int) -> np.ndarray:
"""Incrementally update the matrix profile and return ``n_out`` scores.

Each call to ``mp_.update(point)`` appends one new window to the left
matrix profile. The score for each new point is assigned to the end
of the window that completes at that point (i.e. window ``j`` maps to
output index ``j + window_size - 1``). The first ``window_size - 1``
output positions, which have no completing window yet, receive a score
of 0.

Parameters
----------
X : np.ndarray
New data points to feed into the matrix profile incrementally.
n_out : int
Number of point-level scores to return (equals ``len(X)``).
"""
if n_out == 0:
return np.array([], dtype=np.float64)

# Number of windows already in the profile before adding new points.
# After fitting on n_init_train points: n_init_windows = n_init_train
# - window_size + 1.
n_init_windows = self.n_init_train - self.window_size + 1

# Feed each new point into the existing stumpy STUMPI object.
for point in X:
self.mp_.update(point)

# Extract only the newly added window scores from the left matrix
# profile. Each update call appends exactly one new entry.
new_mp_windows = self.mp_._left_P[n_init_windows:]

# Assign each window score to the last (rightmost) point it covers.
# Window j of the new windows ends at output index j + window_size - 1.
# Indices below window_size - 1 have no completing window and stay 0.
n_total_out = len(new_mp_windows) + self.window_size - 1
point_scores = np.zeros(n_total_out, dtype=np.float64)
for j, score in enumerate(new_mp_windows):
point_scores[j + self.window_size - 1] = score

return point_scores[:n_out]

def _call_stumpi(self, X: np.ndarray):
import stumpy
Expand Down
Loading