# -*- coding: utf-8 -*-
import logging
from functools import partial
import keras
import numpy as np
import pandas as pd
import tensorflow as tf
from keras import backend as K
from keras.layers import Input, Layer
from keras.models import Model
from scipy import integrate, stats
from mlprimitives.adapters.keras import build_layer
from mlprimitives.utils import import_object
LOGGER = logging.getLogger(__name__)


class RandomWeightedAverage(Layer):
    """Compute a random weighted average of two tensors.

    Produces the per-sample interpolations between real and generated
    samples on which the WGAN-GP gradient penalty is evaluated.
    """
def __init__(self, batch_size):
super().__init__()
self.batch_size = batch_size

    def call(self, inputs, **kwargs):
        # Draw one interpolation weight per sample; the trailing singleton
        # dimensions broadcast across the 3-dimensional (batch, time, 1) inputs.
        alpha = tf.random.uniform((self.batch_size, 1, 1))
        return (alpha * inputs[0]) + ((1 - alpha) * inputs[1])

    def compute_output_shape(self, input_shape):
return input_shape[0]


class CycleGAN():
    """CycleGAN-based anomaly detection model.

    An encoder maps input sequences into a latent space and a generator maps
    latent vectors back to sequences; a critic on each space is trained with
    Wasserstein losses and a gradient penalty.
    """
def _build_model(self, hyperparameters, layers, input_shape):
x = Input(shape=input_shape)
model = keras.models.Sequential()
for layer in layers:
built_layer = build_layer(layer, hyperparameters)
model.add(built_layer)
return Model(x, model(x))
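
    # Wasserstein critic loss: with labels of -1 (valid) and +1 (fake),
    # minimizing the mean of y_true * y_pred drives the critic scores of
    # real and generated samples apart.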
def _wasserstein_loss(self, y_true, y_pred):
return K.mean(y_true * y_pred)
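
    # WGAN-GP gradient penalty: penalizes the squared deviation of the
    # critic's gradient norm from 1, evaluated at samples interpolated
    # between real and generated data.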
def _gradient_penalty_loss(self, y_true, y_pred, averaged_samples):
gradients = K.gradients(y_pred, averaged_samples)[0]
gradients_sqr = K.square(gradients)
gradients_sqr_sum = K.sum(gradients_sqr, axis=np.arange(1, len(gradients_sqr.shape)))
gradient_l2_norm = K.sqrt(gradients_sqr_sum)
gradient_penalty = K.square(1 - gradient_l2_norm)
return K.mean(gradient_penalty)
def __init__(self, shape, encoder_input_shape, generator_input_shape, critic_x_input_shape,
critic_z_input_shape, layers_encoder, layers_generator, layers_critic_x,
layers_critic_z, optimizer, learning_rate=0.0005, epochs=2000, latent_dim=20,
batch_size=64, iterations_critic=5, **hyperparameters):
"""Initialize the ARIMA object.
Args:
shape (tuple):
Tuple denoting the shape of an input sample.
encoder_input_shape (tuple):
Shape of encoder input.
generator_input_shape (tuple):
Shape of generator input.
critic_x_input_shape (tuple):
Shape of critic_x input.
critic_z_input_shape (tuple):
Shape of critic_z input.
layers_encoder (list):
List containing layers of encoder.
layers_generator (list):
List containing layers of generator.
layers_critic_x (list):
List containing layers of critic_x.
layers_critic_z (list):
List containing layers of critic_z.
optimizer (str):
String denoting the keras optimizer.
learning_rate (float):
                Optional. Float denoting the learning rate of the optimizer. Default 0.0005.
epochs (int):
Optional. Integer denoting the number of epochs. Default 2000.
latent_dim (int):
Optional. Integer denoting dimension of latent space. Default 20.
batch_size (int):
                Optional. Integer denoting the batch size. Default 64.
iterations_critic (int):
Optional. Integer denoting the number of critic training steps per one
Generator/Encoder training step. Default 5.
hyperparameters (dictionary):
Optional. Dictionary containing any additional inputs.
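
        Example:
            A minimal construction sketch; the ``*_layers`` variables are
            hypothetical placeholders for layer specifications understood by
            ``mlprimitives.adapters.keras.build_layer``, and in practice come
            from the pipeline configuration::

                model = CycleGAN(
                    shape=(100, 1),
                    encoder_input_shape=(100, 1),
                    generator_input_shape=(20, 1),
                    critic_x_input_shape=(100, 1),
                    critic_z_input_shape=(20, 1),
                    layers_encoder=encoder_layers,
                    layers_generator=generator_layers,
                    layers_critic_x=critic_x_layers,
                    layers_critic_z=critic_z_layers,
                    optimizer='keras.optimizers.Adam',
                )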
"""
self.shape = shape
self.latent_dim = latent_dim
self.batch_size = batch_size
self.iterations_critic = iterations_critic
self.epochs = epochs
self.hyperparameters = hyperparameters
self.encoder_input_shape = encoder_input_shape
self.generator_input_shape = generator_input_shape
self.critic_x_input_shape = critic_x_input_shape
self.critic_z_input_shape = critic_z_input_shape
self.layers_encoder, self.layers_generator = layers_encoder, layers_generator
self.layers_critic_x, self.layers_critic_z = layers_critic_x, layers_critic_z
self.optimizer = import_object(optimizer)(learning_rate)
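
    # Assembles three compiled graphs: one per critic (real, fake and
    # gradient-penalty outputs, with the opposite networks frozen) and a
    # joint encoder/generator model with an added MSE reconstruction loss.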
def _build_cyclegan(self, **kwargs):
hyperparameters = self.hyperparameters.copy()
hyperparameters.update(kwargs)
self.encoder = self._build_model(hyperparameters, self.layers_encoder,
self.encoder_input_shape)
self.generator = self._build_model(hyperparameters, self.layers_generator,
self.generator_input_shape)
self.critic_x = self._build_model(hyperparameters, self.layers_critic_x,
self.critic_x_input_shape)
self.critic_z = self._build_model(hyperparameters, self.layers_critic_z,
self.critic_z_input_shape)
self.generator.trainable = False
self.encoder.trainable = False
z = Input(shape=(self.latent_dim, 1))
x = Input(shape=self.shape)
x_ = self.generator(z)
z_ = self.encoder(x)
fake_x = self.critic_x(x_)
valid_x = self.critic_x(x)
interpolated_x = RandomWeightedAverage(self.batch_size)([x, x_])
validity_interpolated_x = self.critic_x(interpolated_x)
partial_gp_loss_x = partial(self._gradient_penalty_loss, averaged_samples=interpolated_x)
partial_gp_loss_x.__name__ = 'gradient_penalty'
self.critic_x_model = Model(inputs=[x, z], outputs=[valid_x, fake_x,
validity_interpolated_x])
self.critic_x_model.compile(loss=[self._wasserstein_loss, self._wasserstein_loss,
partial_gp_loss_x], optimizer=self.optimizer,
loss_weights=[1, 1, 5])
fake_z = self.critic_z(z_)
valid_z = self.critic_z(z)
interpolated_z = RandomWeightedAverage(self.batch_size)([z, z_])
validity_interpolated_z = self.critic_z(interpolated_z)
partial_gp_loss_z = partial(self._gradient_penalty_loss, averaged_samples=interpolated_z)
partial_gp_loss_z.__name__ = 'gradient_penalty'
self.critic_z_model = Model(inputs=[x, z], outputs=[valid_z, fake_z,
validity_interpolated_z])
self.critic_z_model.compile(loss=[self._wasserstein_loss, self._wasserstein_loss,
partial_gp_loss_z], optimizer=self.optimizer,
loss_weights=[1, 1, 10])
self.critic_x.trainable = False
self.critic_z.trainable = False
self.generator.trainable = True
self.encoder.trainable = True
z_gen = Input(shape=(self.latent_dim, 1))
x_gen_ = self.generator(z_gen)
x_gen = Input(shape=self.shape)
z_gen_ = self.encoder(x_gen)
x_gen_rec = self.generator(z_gen_)
fake_gen_x = self.critic_x(x_gen_)
fake_gen_z = self.critic_z(z_gen_)
self.encoder_generator_model = Model([x_gen, z_gen], [fake_gen_x, fake_gen_z, x_gen_rec])
self.encoder_generator_model.compile(loss=[self._wasserstein_loss, self._wasserstein_loss,
'mse'], optimizer=self.optimizer,
loss_weights=[1, 1, 50])
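
    # WGAN-style alternating loop: the critics take ``iterations_critic``
    # updates per encoder/generator update. Targets are -1 (valid) and
    # +1 (fake); ``delta`` is a dummy target for the gradient penalty
    # output, whose loss ignores ``y_true``.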
def _fit(self, X):
fake = np.ones((self.batch_size, 1))
valid = -np.ones((self.batch_size, 1))
delta = np.ones((self.batch_size, 1)) * 10
for epoch in range(self.epochs):
for _ in range(self.iterations_critic):
idx = np.random.randint(0, X.shape[0], self.batch_size)
x = X[idx]
z = np.random.normal(size=(self.batch_size, self.latent_dim, 1))
cx_loss = self.critic_x_model.train_on_batch([x, z], [valid, fake, delta])
cz_loss = self.critic_z_model.train_on_batch([x, z], [valid, fake, delta])
g_loss = self.encoder_generator_model.train_on_batch([x, z], [valid, valid, x])
            if epoch % 100 == 0:
                LOGGER.info('Epoch: %s, [Dx loss: %s] [Dz loss: %s] [G loss: %s]',
                            epoch, cx_loss, cz_loss, g_loss)

    def fit(self, X, **kwargs):
"""Fit the CycleGAN.
Args:
X (ndarray):
N-dimensional array containing the input training sequences for the model.
"""
self._build_cyclegan(**kwargs)
X = X.reshape((-1, self.shape[0], 1))
self._fit(X)

    def predict(self, X):
"""Predict values using the initialized object.
Args:
X (ndarray):
N-dimensional array containing the input sequences for the model.
Returns:
            tuple:
ndarray:
N-dimensional array containing the reconstructions for each input sequence.
ndarray:
N-dimensional array containing the critic scores for each input sequence.
"""
X = X.reshape((-1, self.shape[0], 1))
z_ = self.encoder.predict(X)
y_hat = self.generator.predict(z_)
critic = self.critic_x.predict(X)
return y_hat, critic


def score_anomalies(y, y_hat, critic, score_window=10, smooth_window=200):
"""Compute an array of anomaly scores.
Anomaly scores are calculated using a combination of reconstruction error and critic score.
Args:
y (ndarray):
Ground truth.
y_hat (ndarray):
Predicted values. Each timestamp has multiple predictions.
critic (ndarray):
Critic score. Each timestamp has multiple critic scores.
score_window (int):
Optional. Size of the window over which the scores are calculated.
If not given, 10 is used.
smooth_window (int):
Optional. Size of window over which smoothing is applied.
If not given, 200 is used.
Returns:
ndarray:
Array of anomaly scores.
"""
true = [item[0] for item in y.reshape((y.shape[0], -1))]
for item in y[-1][1:]:
true.extend(item)
critic_extended = list()
for c in critic:
critic_extended = critic_extended + np.repeat(c, y_hat.shape[1]).tolist()
predictions = []
critic_kde_max = []
pred_length = y_hat.shape[1]
num_errors = y_hat.shape[1] + (y_hat.shape[0] - 1)
y_hat = np.asarray(y_hat)
critic_extended = np.asarray(critic_extended).reshape((-1, y_hat.shape[1]))
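
    # For each time step, gather every overlapping prediction and critic
    # score; keep the median prediction and the KDE mode of the critic
    # scores (falling back to the median when the KDE is degenerate).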
for i in range(num_errors):
intermediate = []
critic_intermediate = []
for j in range(max(0, i - num_errors + pred_length), min(i + 1, pred_length)):
intermediate.append(y_hat[i - j, j])
critic_intermediate.append(critic_extended[i - j, j])
if intermediate:
predictions.append(np.median(np.asarray(intermediate)))
if len(critic_intermediate) > 1:
discr_intermediate = np.asarray(critic_intermediate)
try:
critic_kde_max.append(discr_intermediate[np.argmax(
stats.gaussian_kde(discr_intermediate)(critic_intermediate))])
except np.linalg.LinAlgError:
critic_kde_max.append(np.median(discr_intermediate))
else:
critic_kde_max.append(np.median(np.asarray(critic_intermediate)))
predictions = np.asarray(predictions)
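
    # Reconstruction error: absolute difference between the areas under the
    # true and predicted curves over a centered rolling window, smoothed and
    # converted to z-scores clipped at zero.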
score_window_min = score_window // 2
pd_true = pd.Series(np.asarray(true).flatten())
pd_pred = pd.Series(np.asarray(predictions).flatten())
    score_measure_true = pd_true.rolling(score_window, center=True, min_periods=score_window_min)\
        .apply(integrate.trapezoid)
    score_measure_pred = pd_pred.rolling(score_window, center=True, min_periods=score_window_min)\
        .apply(integrate.trapezoid)
scores = abs(score_measure_true - score_measure_pred)
scores_smoothed = pd.Series(scores).rolling(smooth_window, center=True,
min_periods=smooth_window // 2,
win_type='triang').mean().values
z_score_scores = stats.zscore(scores_smoothed)
z_score_scores_clip = np.clip(z_score_scores, a_min=0, a_max=None) + 1
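
    # Critic error: z-scores of the critic KDE modes, centered on a robust
    # mean computed over the interquartile range, then smoothed.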
critic_kde_max = np.asarray(critic_kde_max)
l_quantile = np.quantile(critic_kde_max, 0.25)
u_quantile = np.quantile(critic_kde_max, 0.75)
in_range = np.logical_and(critic_kde_max >= l_quantile, critic_kde_max <= u_quantile)
critic_mean = np.mean(critic_kde_max[in_range])
critic_std = np.std(critic_kde_max)
z_score_critic = np.absolute((np.asarray(critic_kde_max) - critic_mean) / critic_std) + 1
z_score_critic = pd.Series(z_score_critic).rolling(
100, center=True, min_periods=50).mean().values
return np.multiply(z_score_scores_clip, z_score_critic)
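

# A minimal end-to-end sketch of how the pieces above fit together. The
# ``model`` construction is omitted (see the Example in ``CycleGAN.__init__``);
# ``X_train`` and ``X_test`` are assumed to be 2-dimensional arrays of
# equal-length sequences.
#
#     model.fit(X_train)
#     y_hat, critic = model.predict(X_test)
#     y = X_test.reshape((-1, model.shape[0], 1))
#     scores = score_anomalies(y, y_hat, critic)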