Source code for mlprimitives.custom.timeseries_anomalies

"""
Time Series anomaly detection functions.
Implementation inspired by the paper https://arxiv.org/pdf/1802.04431.pdf
"""

import numpy as np
import pandas as pd
from scipy.optimize import fmin


def regression_errors(y, y_hat, smoothing_window=0.01, smooth=True):
    """Compute an array of absolute errors comparing predictions and expected output.

    If smooth is True, apply EWMA to the resulting array of errors.

    Args:
        y (ndarray):
            Ground truth.
        y_hat (ndarray):
            Predicted values.
        smoothing_window (float):
            Optional. Size of the smoothing window, expressed as a proportion of the total
            length of y. If not given, 0.01 is used.
        smooth (bool):
            Optional. Indicates whether the returned errors should be smoothed with EWMA.
            If not given, `True` is used.

    Returns:
        ndarray:
            Array of errors.
    """
    errors = np.abs(y - y_hat)[:, 0]

    if not smooth:
        return errors

    smoothing_window = int(smoothing_window * len(y))

    return pd.Series(errors).ewm(span=smoothing_window).mean().values

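# Illustrative usage sketch (not part of the original module): with the default
# ``smoothing_window=0.01``, a 1000-step series is smoothed with an EWMA span of
# ``int(0.01 * 1000) = 10``. The synthetic ``y`` and ``y_hat`` below are made up
# for the example.
#
#     y = np.random.randn(1000, 1)
#     y_hat = y + np.random.normal(0, 0.1, size=(1000, 1))
#     errors = regression_errors(y, y_hat)              # smoothed, shape (1000,)
#     raw = regression_errors(y, y_hat, smooth=False)   # unsmoothed absolute errors
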
def deltas(errors, epsilon, mean, std):
    """Compute mean and std deltas.

    delta_mean = mean(errors) - mean(all errors below epsilon)
    delta_std = std(errors) - std(all errors below epsilon)

    Args:
        errors (ndarray):
            Array of errors.
        epsilon (ndarray):
            Threshold value.
        mean (float):
            Mean of errors.
        std (float):
            Standard deviation of errors.

    Returns:
        float, float:
            * delta_mean.
            * delta_std.
    """
    below = errors[errors <= epsilon]
    if not len(below):
        return 0, 0

    return mean - below.mean(), std - below.std()

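# Worked example (illustrative, values made up): for errors [1, 1, 1, 10] and
# epsilon 5, only [1, 1, 1] lie below the threshold, so with mean = 3.25 and
# std ~= 3.897:
#
#     errors = np.array([1., 1., 1., 10.])
#     deltas(errors, 5, errors.mean(), errors.std())
#     # -> (3.25 - 1.0, 3.897 - 0.0) = (2.25, ~3.897)
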
def count_above(errors, epsilon):
    """Count number of errors and continuous sequences above epsilon.

    Continuous sequences are counted by shifting and counting the number
    of positions where there was a change and the original value was true,
    which means that a sequence started at that position.

    Args:
        errors (ndarray):
            Array of errors.
        epsilon (ndarray):
            Threshold value.

    Returns:
        int, int:
            * Number of errors above epsilon.
            * Number of continuous sequences above epsilon.
    """
    above = errors > epsilon
    total_above = len(errors[above])

    above = pd.Series(above)
    shift = above.shift(1)
    change = above != shift

    total_consecutive = sum(above & change)

    return total_above, total_consecutive

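# Worked example (illustrative) of the shift-and-compare trick: for errors
# [1, 5, 6, 2, 7] and epsilon 4, the mask is [F, T, T, F, T]; comparing it with
# its shifted copy flags indexes 1 and 4 as points that are True and changed,
# i.e. two sequence starts.
#
#     count_above(np.array([1, 5, 6, 2, 7]), 4)
#     # -> (3, 2): three errors above the threshold, in two continuous sequences
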
def z_cost(z, errors, mean, std):
    """Compute how bad a z value is.

    The original formula is::

                 (delta_mean/mean) + (delta_std/std)
        ------------------------------------------------------
        number of errors above + (number of sequences above)^2

    which computes the "goodness" of `z`, meaning that the higher the value
    the better the `z`.

    In this case, we return this value inverted (we make it negative), to convert
    it into a cost function, as later on we will use scipy.fmin to minimize it.

    Args:
        z (ndarray):
            Value for which a cost score is calculated.
        errors (ndarray):
            Array of errors.
        mean (float):
            Mean of errors.
        std (float):
            Standard deviation of errors.

    Returns:
        float:
            Cost of z.
    """
    epsilon = mean + z * std

    delta_mean, delta_std = deltas(errors, epsilon, mean, std)
    above, consecutive = count_above(errors, epsilon)

    numerator = -(delta_mean / mean + delta_std / std)
    denominator = above + consecutive ** 2

    if denominator == 0:
        return np.inf

    return numerator / denominator

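# Illustrative check that lower cost means better z (numbers made up for the
# example): on errors with a single clear outlier, a z whose threshold isolates
# the outlier yields a finite negative cost, while a z whose threshold exceeds
# every error yields an infinite cost.
#
#     errors = np.array([1., 1., 1., 1., 10.])
#     mean, std = errors.mean(), errors.std()   # 2.8, 3.6
#     z_cost(1.0, errors, mean, std)    # epsilon = 6.4 -> ~-0.82 (finite, good)
#     z_cost(10.0, errors, mean, std)   # epsilon = 38.8 -> np.inf (nothing above)
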
def _find_threshold(errors, z_range):
    """Find the ideal threshold.

    The ideal threshold is the one that minimizes the z_cost function.
    Scipy.fmin is used to find the minimum, using the values from z_range
    as starting points.

    Args:
        errors (ndarray):
            Array of errors.
        z_range (list):
            List of two values denoting the range out of which the start points
            for the scipy.fmin function are chosen.

    Returns:
        float:
            Calculated threshold value.
    """
    mean = errors.mean()
    std = errors.std()

    min_z, max_z = z_range
    best_z = min_z
    best_cost = np.inf
    for z in range(min_z, max_z):
        best = fmin(z_cost, z, args=(errors, mean, std), full_output=True, disp=False)
        z, cost = best[0:2]
        if cost < best_cost:
            best_z = z[0]
            best_cost = cost    # keep track of the lowest cost seen so far

    return mean + best_z * std


def _find_sequences(errors, epsilon, anomaly_padding):
    """Find sequences of values that are above epsilon.

    This is done following these steps:

        * create a boolean mask that indicates which values are above epsilon.
        * mark a certain range of errors around True values with a True as well.
        * shift this mask by one place, filling the empty gap with a False.
        * compare the shifted mask with the original one to see if there are changes.
        * Consider a sequence start any point which was true and has changed.
        * Consider a sequence end any point which was false and has changed.

    Args:
        errors (ndarray):
            Array of errors.
        epsilon (float):
            Threshold value. All errors above epsilon are considered an anomaly.
        anomaly_padding (int):
            Number of errors before and after a found anomaly that are added to the
            anomalous sequence.

    Returns:
        ndarray, float:
            * Array containing start, end of each found anomalous sequence.
            * Maximum error value that was not considered an anomaly.
    """
    above = pd.Series(errors > epsilon)
    index_above = np.argwhere(above.values)

    for idx in index_above.flatten():
        above[max(0, idx - anomaly_padding):min(idx + anomaly_padding + 1, len(above))] = True

    shift = above.shift(1).fillna(False)
    change = above != shift

    if above.all():
        max_below = 0
    else:
        max_below = max(errors[~above])

    index = above.index
    starts = index[above & change].tolist()
    ends = (index[~above & change] - 1).tolist()

    if len(ends) == len(starts) - 1:
        ends.append(len(above) - 1)

    return np.array([starts, ends]).T, max_below


def _get_max_errors(errors, sequences, max_below):
    """Get the maximum error for each anomalous sequence.

    Also add a row with the max error which was not considered anomalous.

    The result is a table containing a ``max_error`` column with the maximum error
    of each sequence and the columns ``start`` and ``stop`` with the corresponding
    start and stop indexes, sorted in descending order by the maximum error.

    Args:
        errors (ndarray):
            Array of errors.
        sequences (ndarray):
            Array containing start, end of anomalous sequences.
        max_below (float):
            Maximum error value that was not considered an anomaly.

    Returns:
        pandas.DataFrame:
            DataFrame object containing columns ``start``, ``stop`` and ``max_error``.
    """
    max_errors = [{
        'max_error': max_below,
        'start': -1,
        'stop': -1
    }]

    for sequence in sequences:
        start, stop = sequence
        sequence_errors = errors[start: stop + 1]
        max_errors.append({
            'start': start,
            'stop': stop,
            'max_error': max(sequence_errors)
        })

    max_errors = pd.DataFrame(max_errors).sort_values('max_error', ascending=False)
    return max_errors.reset_index(drop=True)

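# Illustrative sketch of the padding behavior (values made up for the example):
# a single error above epsilon at index 5, with ``anomaly_padding=2``, is expanded
# to the sequence [3, 7], and the returned ``max_below`` is the largest error left
# outside of it.
#
#     errors = np.array([1., 1., 1., 1., 1., 9., 1., 1., 1., 2.])
#     sequences, max_below = _find_sequences(errors, 5, anomaly_padding=2)
#     # sequences -> array([[3, 7]]); max_below -> 2.0
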
def _prune_anomalies(max_errors, min_percent):
    """Prune anomalies to mitigate false positives.

    This is done by following these steps:

        * Shift the errors 1 negative step to compare each value with the next one.
        * Drop the last row, which we do not want to compare.
        * Calculate the percentage increase for each row.
        * Find rows which are below ``min_percent``.
        * Find the index of the latest of such rows.
        * Get the values of all the sequences above that index.

    Args:
        max_errors (pandas.DataFrame):
            DataFrame object containing columns ``start``, ``stop`` and ``max_error``.
        min_percent (float):
            Percentage of separation the anomalies need to meet between themselves and the
            highest non-anomalous error in the window sequence.

    Returns:
        ndarray:
            Array containing start, end, max_error of the pruned anomalies.
    """
    next_error = max_errors['max_error'].shift(-1).iloc[:-1]
    max_error = max_errors['max_error'].iloc[:-1]

    increase = (max_error - next_error) / max_error
    too_small = increase < min_percent

    if too_small.all():
        last_index = -1
    else:
        last_index = max_error[~too_small].index[-1]

    return max_errors[['start', 'stop', 'max_error']].iloc[0: last_index + 1].values


def _compute_scores(pruned_anomalies, errors, threshold, window_start):
    """Compute the score of the anomalies.

    Calculate the score of the anomalies proportional to the maximum error in the sequence
    and add window_start timestamp to make the index absolute.

    Args:
        pruned_anomalies (ndarray):
            Array of anomalies containing the start, end and max_error for all anomalies in
            the window.
        errors (ndarray):
            Array of errors.
        threshold (float):
            Threshold value.
        window_start (int):
            Index of the first error value in the window.

    Returns:
        list:
            List of anomalies containing start-index, end-index, score for each anomaly.
    """
    anomalies = list()
    denominator = errors.mean() + errors.std()

    for row in pruned_anomalies:
        max_error = row[2]
        score = (max_error - threshold) / denominator
        anomalies.append([row[0] + window_start, row[1] + window_start, score])

    return anomalies


def _merge_sequences(sequences):
    """Merge consecutive and overlapping sequences.

    We iterate over a list of start, end, score triples and merge together
    overlapping or consecutive sequences. The score of a merged sequence
    is the average of the single scores, weighted by the length of the
    corresponding sequences.

    Args:
        sequences (list):
            List of anomalies, containing start-index, end-index, score for each anomaly.

    Returns:
        ndarray:
            Array containing start-index, end-index, score for each anomaly after merging.
    """
    if len(sequences) == 0:
        return np.array([])

    sorted_sequences = sorted(sequences, key=lambda entry: entry[0])
    new_sequences = [sorted_sequences[0]]
    score = [sorted_sequences[0][2]]
    weights = [sorted_sequences[0][1] - sorted_sequences[0][0]]

    for sequence in sorted_sequences[1:]:
        prev_sequence = new_sequences[-1]

        if sequence[0] <= prev_sequence[1] + 1:
            score.append(sequence[2])
            weights.append(sequence[1] - sequence[0])
            weighted_average = np.average(score, weights=weights)
            new_sequences[-1] = (prev_sequence[0], max(prev_sequence[1], sequence[1]),
                                 weighted_average)
        else:
            score = [sequence[2]]
            weights = [sequence[1] - sequence[0]]
            new_sequences.append(sequence)

    return np.array(new_sequences)

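# Illustrative merge (values made up for the example): [0, 10] and [8, 20] overlap,
# so they collapse into [0, 20] with a length-weighted average score, while [30, 40]
# stays separate.
#
#     _merge_sequences([(0, 10, 0.5), (8, 20, 0.9), (30, 40, 0.8)])
#     # -> array([[ 0. , 20. , ~0.718], [30. , 40. , 0.8]])
#     # merged score: (0.5 * 10 + 0.9 * 12) / (10 + 12) ~= 0.718
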
def _find_window_sequences(window, z_range, anomaly_padding, min_percent, window_start):
    """Find sequences of values that are anomalous.

    We first find the threshold for the window, then find all sequences above that threshold.
    After that, we get the max errors of the sequences and prune the anomalies. Lastly, the
    score of the anomalies is computed.

    Args:
        window (ndarray):
            Array of errors in the window that is analyzed.
        z_range (list):
            List of two values denoting the range out of which the start points for the
            dynamic find_threshold function are chosen.
        anomaly_padding (int):
            Number of errors before and after a found anomaly that are added to the anomalous
            sequence.
        min_percent (float):
            Percentage of separation the anomalies need to meet between themselves and the
            highest non-anomalous error in the window sequence.
        window_start (int):
            Index of the first error value in the window.

    Returns:
        ndarray:
            Array containing the start-index, end-index, score for each anomalous sequence
            that was found in the window.
    """
    threshold = _find_threshold(window, z_range)
    window_sequences, max_below = _find_sequences(window, threshold, anomaly_padding)
    max_errors = _get_max_errors(window, window_sequences, max_below)
    pruned_anomalies = _prune_anomalies(max_errors, min_percent)
    window_sequences = _compute_scores(pruned_anomalies, window, threshold, window_start)

    return window_sequences

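# Minimal sketch of the per-window pipeline (synthetic values, chosen for
# illustration only): a spike in otherwise small errors is thresholded, padded,
# pruned and scored in one call. The exact score depends on the data.
#
#     window = np.concatenate([np.random.uniform(0, 0.1, 100), [5.0],
#                              np.random.uniform(0, 0.1, 100)])
#     _find_window_sequences(window, z_range=(0, 10), anomaly_padding=5,
#                            min_percent=0.1, window_start=0)
#     # -> e.g. [[95, 105, <score>]]: the spike at index 100 plus 5 errors of padding
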
def find_anomalies(errors, index, z_range=(0, 10), window_size=None, window_step_size=None,
                   min_percent=0.1, anomaly_padding=50, lower_threshold=False):
    """Find sequences of error values that are anomalous.

    We first define the window of errors that we want to analyze. We then find the anomalous
    sequences in that window and store the start/stop index pairs that correspond to each
    sequence, along with its score. Optionally, we can flip the error sequence around the mean
    and apply the same procedure, allowing us to find unusually low error sequences. We then
    move the window and repeat the procedure. Lastly, we combine overlapping or consecutive
    sequences.

    Args:
        errors (ndarray):
            Array of errors.
        index (ndarray):
            Array of indices of the errors.
        z_range (list):
            Optional. List of two values denoting the range out of which the start points for
            the scipy.fmin function are chosen. If not given, (0, 10) is used.
        window_size (int):
            Optional. Size of the window for which a threshold is calculated. If not given,
            `None` is used, which finds one threshold for the entire sequence of errors.
        window_step_size (int):
            Optional. Number of steps the window is moved before another threshold is
            calculated for the new window.
        min_percent (float):
            Optional. Percentage of separation the anomalies need to meet between themselves
            and the highest non-anomalous error in the window sequence. If not given, 0.1
            is used.
        anomaly_padding (int):
            Optional. Number of errors before and after a found anomaly that are added to the
            anomalous sequence. If not given, 50 is used.
        lower_threshold (bool):
            Optional. Indicates whether to apply a lower threshold to find unusually low
            errors. If not given, `False` is used.

    Returns:
        ndarray:
            Array containing start-index, end-index, score for each anomalous sequence that
            was found.
    """
    window_size = window_size or len(errors)
    window_step_size = window_step_size or window_size
    window_start = 0
    window_end = 0
    sequences = list()

    while window_end < len(errors):
        window_end = window_start + window_size
        window = errors[window_start:window_end]
        window_sequences = _find_window_sequences(window, z_range, anomaly_padding,
                                                  min_percent, window_start)
        sequences.extend(window_sequences)

        if lower_threshold:
            # Flip errors sequence around mean
            mean = window.mean()
            inverted_window = mean - (window - mean)
            inverted_window_sequences = _find_window_sequences(inverted_window, z_range,
                                                               anomaly_padding, min_percent,
                                                               window_start)
            sequences.extend(inverted_window_sequences)

        window_start = window_start + window_step_size

    sequences = _merge_sequences(sequences)

    anomalies = list()
    for start, stop, score in sequences:
        anomalies.append([index[int(start)], index[int(stop)], score])

    return np.asarray(anomalies)

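# End-to-end usage sketch (synthetic data, made up for the example): inject one
# anomalous stretch into a flat error signal and recover its boundaries.
#
#     errors = np.random.uniform(0, 0.1, 500)
#     errors[200:205] = 5.0
#     index = np.arange(500)   # here, positions double as timestamps
#     find_anomalies(errors, index, anomaly_padding=5)
#     # -> e.g. array([[195., 209., <score>]]): the injected stretch plus padding
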