Source code for mlprimitives.candidates.dsp

import numpy as np


[docs]def next_power_of_2(x): """Finds the next power of 2 value Args: x: Input value Returns: power_of_2: Next power of 2 value """ power_of_2 = 1 if x == 0 else 2 ** np.ceil(np.log2(x)) return power_of_2
[docs]class SpectralMask: """Anomalies detection in satellite telemetry data using a spectral mask""" def __init__(self, method='std_dev', gain=1, window_length=128, beta=4): """Inits SpectralMask Class with default values for method, gain, window_length, and beta Args: method: Method used to calculate the Spectral Mask: 'std_dev' uses the standard deviation of each frequency component and 'min_max' uses the minimum and maximum values gain: Multiplication factor used in the comparison between the spectral mask defined using the training data and the Fourier transform of the telemetry data vector window_length: Length of the sliding window in number of samples beta: Beta value for Kaiser window design """ self.gain = gain self.beta = beta self.window_length = window_length self.window = None self.method = method self.window_length = next_power_of_2(window_length)
[docs] def window_design(self, window_length, beta): """Kaiser window design Args: window_length: Length of the window in number of samples beta: Beta value for Kaiser window design Returns: window: Window designed using the beta and length provided as inputs """ self.window = np.kaiser(window_length, beta) return self.window
[docs] def fit(self, X): """Defines a spectral mask based on training data Args: X: Training data """ training_signal = X self.window_design(self.window_length, self.beta) if self.method == 'std_dev': self.fit_freq_std_dev(training_signal) elif self.method == 'min_max': self.fit_freq_min_max(training_signal) else: raise ValueError('Unknown method: {}'.format(self.method))
[docs] def fit_freq_min_max(self, training_signal): """Defines a spectral mask based on training data using min and max values of each frequency component Args: training_signal: Training data """ window_length = len(self.window) window_weight = sum(self.window) max_mask = np.zeros(int(window_length / 2) + 1) min_mask = np.zeros(int(window_length / 2) + 1) for i in range(0, len(training_signal) - window_length - 1): rfft = np.fft.rfft(training_signal[i:i + window_length] * self.window) temp = np.abs(rfft) / window_weight max_mask = np.maximum(max_mask, temp) min_mask = np.minimum(min_mask, temp) self.mask_top = self.gain * max_mask self.mask_bottom = min_mask / self.gain
[docs] def fit_freq_std_dev(self, training_signal): """Defines a spectral mask based on training data using the standard deviation values of each frequency component Args: training_signal: Training data """ window_length = len(self.window) window_weight = sum(self.window) num_of_windows = len(training_signal) - window_length - 1 mean = np.zeros(int(window_length / 2) + 1) pow = np.zeros(int(window_length / 2) + 1) temp = np.zeros(int(window_length / 2) + 1) rfft = np.fft.rfft(training_signal[0:0 + window_length] * self.window) max = np.abs(rfft) / window_weight min = max for i in range(0, num_of_windows): rfft = np.fft.rfft(training_signal[i:i + window_length] * self.window) temp = np.abs(rfft) / window_weight max = np.maximum(temp, max) min = np.minimum(temp, min) mean = mean + temp pow = pow + np.power(temp, 2) mean = mean / num_of_windows pow = pow / num_of_windows std_dev = np.sqrt(pow - np.power(mean, 2)) self.mask_top = mean + self.gain * std_dev self.mask_bottom = np.maximum(mean - self.gain * std_dev, np.zeros(int(window_length / 2) + 1))
[docs] def produce(self, X): """Detects anomalies in telemetry data based on its power spectral density Args: X: Telemetry data Returns: anomalies: Data vector consisting of the anomalies detected in the telemetry data """ signal = X window_length = len(self.window) anomalies = np.zeros(len(signal)) window_weight = sum(self.window) for i in range(0, len(signal) - window_length - 1): rfft = np.fft.rfft(signal[i:i + window_length] * self.window) sig_freq = np.abs(rfft) / window_weight anomalies[i] = 0 for m in range(0, int(window_length / 2) - 1): if ((sig_freq[m] > self.mask_top[m]) or (sig_freq[m] < self.mask_bottom[m])): anomalies[i] = 1 break return anomalies