Source code for ninolearn.learn.models.encoderDecoder

import numpy as np

import keras.backend as K
from keras.models import Model, save_model, load_model
from keras.layers import Dense, Input, Dropout
from keras.layers import GaussianNoise
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping
from keras import regularizers

from os.path import join, exists
from os import mkdir, listdir, getcwd
from shutil import rmtree


from ninolearn.learn.skillMeasures import rmse
from ninolearn.exceptions import MissingArgumentError
from ninolearn.utils import small_print_header, print_header


import warnings

class EncoderDecoder(object):
    """
    The Encoder-Decoder is a neural network that has the same architecture
    as an autoencoder. Hence, label and feature vector have the same
    dimension. In the ninolearn package the model is called Encoder-Decoder
    because it is used for prediction purposes and therefore label and
    feature vector might be separated by some time lag or might not even be
    the same variable.
    """

    def set_parameters(self, neurons=(128, 16), dropout=0.2, noise=0.2,
                       noise_out=0.2, l1_hidden=0.0001, l2_hidden=0.0001,
                       l1_out=0.0001, l2_out=0.0001, batch_size=50, lr=0.0001,
                       n_segments=5, n_members_segment=1, patience=40,
                       epochs=500, verbose=0):
        """
        Set the parameters of the Encoder-Decoder neural network.

        Note: if a parameter is given as a list, ninolearn assumes that the
        method .fit_RandomizedSearch() will be used.

        :type neurons: tuple (list of two tuples for .fit_RandomizedSearch())
        :param neurons: The architecture of the Encoder-Decoder. The layer\
        with the lowest number of neurons is assumed to be the bottleneck\
        layer, for which the activation function is linear. The output layer\
        has a linear activation as well. All other layers use the ReLU\
        activation.

        :type dropout: float
        :param dropout: Dropout rate. Dropout layers are installed behind\
        each hidden layer in the encoder and the decoder.

        :type noise: float
        :param noise: Standard deviation of the Gaussian noise for the input\
        layer.

        :type noise_out: float
        :param noise_out: Standard deviation of the Gaussian noise for the\
        output layer.

        :type l1_hidden, l2_hidden: float
        :param l1_hidden, l2_hidden: Coefficients for the L1 and the L2\
        penalty term on the hidden layer weights.

        :type l1_out, l2_out: float
        :param l1_out, l2_out: Coefficients for the L1 and the L2 penalty\
        term on the output layer weights.

        :type batch_size: int
        :param batch_size: Batch size during the training of a member of the\
        Encoder-Decoder ensemble.

        :type lr: float
        :param lr: The learning rate.

        :type n_segments: int
        :param n_segments: The number of segments that are used for the\
        cross-validation scheme and the training of the ensemble members.

        :type n_members_segment: int
        :param n_members_segment: The number of members that are trained for\
        one segment.

        :type patience: int
        :param patience: The number of epochs to wait before early stopping\
        ends the training.

        :type epochs: int
        :param epochs: The maximum number of epochs.

        :type verbose: int
        :param verbose: Print some progress to screen. Either 0 (silent),\
        1 or 2.
        """
        # hyperparameters
        self.hyperparameters = {'neurons': neurons, 'dropout': dropout,
                                'noise': noise, 'noise_out': noise_out,
                                'l1_hidden': l1_hidden, 'l2_hidden': l2_hidden,
                                'l1_out': l1_out, 'l2_out': l2_out,
                                'lr': lr, 'batch_size': batch_size}

        self.n_hidden_layers = len(neurons)
        self.bottelneck_layer = np.argmin(neurons)

        # hyperparameters for randomized search
        self.hyperparameters_search = {}

        for key in self.hyperparameters.keys():
            if type(self.hyperparameters[key]) is list:
                if len(self.hyperparameters[key]) > 0:
                    self.hyperparameters_search[key] = self.hyperparameters[key].copy()

        # training settings
        self.patience = patience
        self.epochs = epochs
        self.verbose = verbose

        # training/validation split
        self.n_segments = n_segments
        self.n_members_segment = n_members_segment

        # derived parameters
        self.n_members = self.n_segments * self.n_members_segment
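
    # Illustrative note (not part of the original module): every hyperparameter
    # accepted by set_parameters() can either be fixed, e.g.
    #     model.set_parameters(neurons=(128, 16), dropout=0.2, lr=0.0001)
    # or passed as a [low, high] list to mark it for .fit_RandomizedSearch(), e.g.
    #     model.set_parameters(neurons=[(32, 8), (256, 64)], dropout=[0.0, 0.5])
    # Only the parameters given as lists end up in self.hyperparameters_search.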

    def build_model(self, n_features, n_labels):
        """
        The method builds a new member of the ensemble and returns it.

        :type n_features: int
        :param n_features: The number of features.

        :type n_labels: int
        :param n_labels: The number of labels.
        """
        # initialize optimizer and early stopping
        self.optimizer = Adam(lr=self.hyperparameters['lr'], beta_1=0.9,
                              beta_2=0.999, epsilon=None, decay=0.,
                              amsgrad=False)

        self.es = EarlyStopping(monitor='val_mean_squared_error',
                                min_delta=0.0, patience=self.patience,
                                verbose=1, mode='min',
                                restore_best_weights=True)

        inputs = Input(shape=(n_features,))
        h = GaussianNoise(self.hyperparameters['noise'])(inputs)

        # the encoder part
        for i in range(self.bottelneck_layer):
            h = Dense(self.hyperparameters['neurons'][i], activation='relu',
                      kernel_regularizer=regularizers.l1_l2(
                          self.hyperparameters['l1_hidden'],
                          self.hyperparameters['l2_hidden']))(h)

            h = Dropout(self.hyperparameters['dropout'])(h)

        latent = Dense(self.hyperparameters['neurons'][self.bottelneck_layer],
                       activation='linear',
                       kernel_regularizer=regularizers.l1_l2(
                           self.hyperparameters['l1_hidden'],
                           self.hyperparameters['l2_hidden']))(h)

        encoder = Model(inputs=inputs, outputs=latent, name='encoder')

        # the decoder part
        latent_inputs = Input(shape=(self.hyperparameters['neurons'][self.bottelneck_layer],))
        h = GaussianNoise(0.0)(latent_inputs)

        for i in range(self.bottelneck_layer + 1, self.n_hidden_layers - 1):
            h = Dense(self.hyperparameters['neurons'][i], activation='relu',
                      kernel_regularizer=regularizers.l1_l2(
                          self.hyperparameters['l1_hidden'],
                          self.hyperparameters['l2_hidden']))(h)

            h = Dropout(self.hyperparameters['dropout'])(h)

        decoded = Dense(n_labels, activation='linear',
                        kernel_regularizer=regularizers.l1_l2(
                            self.hyperparameters['l1_out'],
                            self.hyperparameters['l2_out']))(h)

        decoder = Model(inputs=latent_inputs, outputs=decoded, name='decoder')

        # encoder-decoder model
        encoder_decoder = Model(inputs, decoder(encoder(inputs)),
                                name='encoder_decoder')

        return encoder_decoder, encoder, decoder
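
    # Illustrative note (not part of the original module): with the default
    # neurons=(128, 16) the bottleneck index is 1, so build_model() assembles
    #     encoder: Input(n_features) -> GaussianNoise -> Dense(128, relu)
    #              -> Dropout -> Dense(16, linear)
    #     decoder: Input(16) -> Dense(n_labels, linear)
    # and the returned encoder_decoder chains the two sub-models.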

    def fit(self, trainX, trainy, valX=None, valy=None):
        """
        Fit the model. If n_segments is 1, a validation data set needs to be
        supplied.

        :type trainX: np.ndarray
        :param trainX: The training feature set. 2-D array with dimensions\
        (timesteps, features).

        :type trainy: np.ndarray
        :param trainy: The training label set. 2-D array with dimensions\
        (timesteps, labels).

        :type valX: np.ndarray
        :param valX: The validation feature set. 2-D array with dimensions\
        (timesteps, features).

        :type valy: np.ndarray
        :param valy: The validation label set. 2-D array with dimensions\
        (timesteps, labels).
        """
        # clear memory
        K.clear_session()

        # allocate lists for the ensemble
        self.ensemble = []
        self.history = []
        self.val_loss = []

        self.segment_len = trainX.shape[0] // self.n_segments

        if self.n_segments == 1 and (valX is not None or valy is not None):
            warnings.warn("Validation and test data set are the same if n_segments is 1!")

        i = 0
        while i < self.n_members_segment:
            j = 0
            while j < self.n_segments:
                n_ens_sel = len(self.ensemble)
                small_print_header(f"Train member Nr {n_ens_sel+1}/{self.n_members}")

                # build model
                member, member_encoder, member_decoder = self.build_model(trainX.shape[1], trainy.shape[1])

                # compile model
                member.compile(loss='mse', optimizer=self.optimizer, metrics=['mse'])

                # validate on the spare segment
                if self.n_segments != 1:
                    if valX is not None or valy is not None:
                        warnings.warn("Validation data set will be one of the segments. The provided validation data set is not used!")

                    start_ind = j * self.segment_len
                    end_ind = (j + 1) * self.segment_len

                    trainXens = np.delete(trainX, np.s_[start_ind:end_ind], axis=0)
                    trainyens = np.delete(trainy, np.s_[start_ind:end_ind], axis=0)
                    valXens = trainX[start_ind:end_ind]
                    valyens = trainy[start_ind:end_ind]

                # validate on the provided validation data set
                elif self.n_segments == 1:
                    if valX is None or valy is None:
                        raise MissingArgumentError("When n_segments is 1, a validation data set must be provided.")
                    trainXens = trainX
                    trainyens = trainy
                    valXens = valX
                    valyens = valy

                history = member.fit(trainXens, trainyens,
                                     epochs=self.epochs,
                                     batch_size=self.hyperparameters['batch_size'],
                                     verbose=self.verbose,
                                     shuffle=True,
                                     callbacks=[self.es],
                                     validation_data=(valXens, valyens))

                self.history.append(history)
                self.val_loss.append(member.evaluate(valXens, valyens)[1])
                print(f"Loss: {self.val_loss[-1]}")
                self.ensemble.append(member)
                j += 1
            i += 1

        self.mean_val_loss = np.mean(self.val_loss)
        print(f"Mean loss: {self.mean_val_loss}")
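
    # Illustrative note (not part of the original module): with n_segments=5 and
    # n_members_segment=1, fit() trains 5 members; member j is validated on the
    # j-th contiguous fifth of the training period and trained on the remaining
    # four fifths, so self.mean_val_loss is a segment-wise cross-validation score.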

    def fit_RandomizedSearch(self, trainX, trainy, n_iter=10, **kwargs):
        """
        Hyperparameter optimization using random search.

        :type trainX: np.ndarray
        :param trainX: The training feature set. 2-D array with dimensions\
        (timesteps, features).

        :type trainy: np.ndarray
        :param trainy: The training label set. 2-D array with dimensions\
        (timesteps, labels).

        :param kwargs: Keyword arguments are passed to the .fit() method.
        """
        # check if hyperparameters were provided in lists for randomized search
        if len(self.hyperparameters_search) == 0:
            raise Exception("No variable indicated for hyperparameter search!")

        # iterate with randomized hyperparameters
        best_loss = np.inf
        for i in range(n_iter):
            print_header(f"Search iteration Nr {i+1}/{n_iter}")

            # random selection of hyperparameters
            for key in self.hyperparameters_search.keys():
                low = self.hyperparameters_search[key][0]
                high = self.hyperparameters_search[key][1]

                if type(low) is float and type(high) is float:
                    self.hyperparameters[key] = np.random.uniform(low, high)

                if type(low) is int and type(high) is int:
                    self.hyperparameters[key] = np.random.randint(low, high + 1)

                if type(low) is tuple and type(high) is tuple:
                    hyp_list = []
                    for k in range(len(low)):
                        hyp_list.append(np.random.randint(low[k], high[k] + 1))
                    self.hyperparameters[key] = tuple(hyp_list)

            self.fit(trainX, trainy, **kwargs)

            # check if the validation score improved
            if self.mean_val_loss < best_loss:
                best_loss = self.mean_val_loss
                self.best_hyperparameters = self.hyperparameters.copy()

                small_print_header("New best hyperparameters")
                print(f"Mean loss: {best_loss}")
                print(self.best_hyperparameters)

        # refit the model with the optimized hyperparameters
        # AND to recover the ensemble weights for the best hyperparameters
        print_header("Refit the model with best hyperparameters")

        self.hyperparameters = self.best_hyperparameters.copy()
        print(self.hyperparameters)
        self.fit(trainX, trainy, **kwargs)

        print(f"best loss search: {best_loss}")
        print(f"loss refitting : {self.mean_val_loss}")

    def predict(self, X):
        """
        Ensemble prediction.

        :type X: np.ndarray
        :param X: Feature set for which the prediction should be made.
        """
        pred_ens = np.zeros((X.shape[0], X.shape[1], self.n_members))
        for i in range(self.n_members):
            pred_ens[:, :, i] = self.ensemble[i].predict(X)
        return np.mean(pred_ens, axis=2), pred_ens

    def evaluate(self, X, ytrue):
        """
        Evaluate the model based on the RMSE.

        :type X: np.ndarray
        :param X: The feature array.

        :type ytrue: np.ndarray
        :param ytrue: The true label array.
        """
        ypred, dummy = self.predict(X)
        return rmse(ytrue, ypred)

    def save(self, location='', dir_name='ed_ensemble'):
        """
        Save the ensemble.

        :type location: str
        :param location: Base directory in which all Encoder-Decoder\
        ensembles are stored.

        :type dir_name: str
        :param dir_name: The name of the directory inside the base directory\
        in which to save this ensemble.
        """
        path = join(location, dir_name)
        if not exists(path):
            mkdir(path)
        else:
            rmtree(path)
            mkdir(path)

        for i in range(self.n_members):
            path_h5 = join(path, f"member{i}.h5")
            save_model(self.ensemble[i], path_h5, include_optimizer=False)

    def load(self, location=None, dir_name='ensemble'):
        """
        Load a saved ensemble.

        :type location: str
        :param location: Base directory in which all Encoder-Decoder\
        ensembles are stored. If None, the current working directory is used.

        :type dir_name: str
        :param dir_name: The name of the directory inside the base directory\
        in which to find the ensemble.
        """
        if location is None:
            location = getcwd()
        path = join(location, dir_name)
        files = listdir(path)
        self.n_members = len(files)
        self.ensemble = []

        for file in files:
            file_path = join(path, file)
            self.ensemble.append(load_model(file_path))

        output_neurons = self.ensemble[0].get_output_shape_at(0)[1]
        if output_neurons == 2:
            self.std = True
        else:
            self.std = False
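

# The block below is a minimal usage sketch and is not part of the original
# ninolearn module. It uses purely synthetic data so that it can run
# stand-alone; in practice trainX/trainy would come from the ninolearn data
# pipeline, and the hyperparameter values shown here are arbitrary examples.
if __name__ == "__main__":
    # synthetic feature/label matrices with identical dimensionality,
    # as required by the Encoder-Decoder architecture
    rng = np.random.RandomState(0)
    trainX = rng.normal(size=(500, 20))
    trainy = rng.normal(size=(500, 20))
    testX = rng.normal(size=(100, 20))
    testy = rng.normal(size=(100, 20))

    model = EncoderDecoder()

    # fixed hyperparameters; pass [low, high] lists instead to enable
    # fit_RandomizedSearch()
    model.set_parameters(neurons=(32, 8), dropout=0.2, noise=0.1,
                         noise_out=0.1, batch_size=50, lr=0.001,
                         n_segments=5, n_members_segment=1,
                         patience=10, epochs=50, verbose=0)

    # train one member per segment (5 members in total)
    model.fit(trainX, trainy)

    # ensemble-mean prediction and the full member-wise predictions
    mean_prediction, ensemble_predictions = model.predict(testX)
    print("RMSE on synthetic test data:", model.evaluate(testX, testy))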