Source code for medusa.optimization

import numpy as np
import os
import json
import pickle
import datetime
import matplotlib.pyplot as plt
from sklearn.model_selection import ParameterGrid
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF
from scipy.stats import norm
import warnings


class Grinder:

    def __init__(self, **hyparams):
        # Normalize every value to a list (so scalar values are accepted by
        # ParameterGrid) and build an index dictionary with the same structure
        opt_params = dict()
        for key, value in hyparams.items():
            value = [value] if not isinstance(value, list) else value
            hyparams[key] = value
            opt_params[key] = np.arange(len(value))
        # Make grids over the hyperparameter values and over their indexes
        self.hyparams = hyparams
        self.opt_params = opt_params
        self.hyparams_grid = list(ParameterGrid(hyparams))
        self.opt_grid = list(ParameterGrid(opt_params))
    def get_hyparams(self, idx):
        return self.hyparams_grid[idx]
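
# A minimal usage sketch for Grinder (hypothetical values, not part of the
# module API): every keyword argument becomes one axis of the search grid,
# and hyparams_grid/opt_grid enumerate the cartesian product in lockstep.
#
#   grinder = Grinder(lr=[0.001, 0.01, 0.1], batch_size=[16, 32])
#   len(grinder.hyparams_grid)   # 6 combinations (3 x 2)
#   grinder.get_hyparams(0)      # e.g. {'batch_size': 16, 'lr': 0.001}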


class Optimizer:

    def __init__(self, obj, func, grinder, args=None, previous_hist=None):
        # Supported values
        self.supported_obj = ('minimize', 'maximize')
        self.supported_approach = ('grid', 'random', 'bayesian')
        # Check errors
        if obj not in self.supported_obj:
            raise ValueError("Parameter 'obj' must be one of these values: "
                             + str(self.supported_obj))
        if (args is not None) and (not isinstance(args, dict)):
            raise TypeError("Argument 'args' must be None or type dict")
        if (previous_hist is not None) and \
                (not isinstance(previous_hist, dict)):
            raise TypeError("Argument 'previous_hist' must be None or type "
                            "dict")
        # Initialize
        self.obj = obj
        self.func = func
        self.args = args
        self.grinder = grinder
        # Initialize the history and the optimization directory
        if previous_hist is None:
            # Create a time-stamped directory for this run
            dir_name = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
            self.optim_dir = os.path.join(
                os.getcwd(), 'optimization', 'optim_' + dir_name, '')
            os.makedirs(self.optim_dir)
            # Instantiate useful variables
            self.history = dict()
            self.history["grid_idx"] = list()
            self.history["score"] = list()
            self.history["info"] = list()
            self.history["optim_dir"] = self.optim_dir
        else:
            warnings.warn("Loading a previous history only makes sense if "
                          "arguments 'obj', 'func', 'grinder' and 'args' "
                          "have not changed.")
            self.history = previous_hist
            self.optim_dir = self.history["optim_dir"]
            os.makedirs(self.optim_dir, exist_ok=True)
        # Bayesian optimization variables
        self.bayes_feat = None
        self.bayes_model = None
        self.bayes_n_samp = None
        self.rbf_length_scale = None
        self.bayes_parameters = False
        # Save the grinder so the run can be reproduced or resumed later
        with open(os.path.join(self.optim_dir, 'grinder.pkl'), 'wb') as file:
            pickle.dump(self.grinder, file)
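
    # A hedged sketch of resuming a previous run (the path below is
    # hypothetical): the constructor pickles the grinder to optim_dir, and
    # optimize() pickles the history there when save_history=True, so both
    # can be reloaded and passed back in.
    #
    #   with open('optimization/optim_<timestamp>/grinder.pkl', 'rb') as f:
    #       grinder = pickle.load(f)
    #   with open('optimization/optim_<timestamp>/history.pkl', 'rb') as f:
    #       history = pickle.load(f)
    #   optimizer = Optimizer('maximize', func, grinder,
    #                         previous_hist=history)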

    def __pick_params(self, approach):
        if approach == 'grid':
            # Walk the grid sequentially
            idx = self.history["grid_idx"][-1] + 1 \
                if len(self.history["grid_idx"]) > 0 else 0
        elif approach == 'random':
            # Sample an index that has not been evaluated yet
            idx = self.__pick_new_indexes_randomly(1, repetition=False)[0]
        elif approach == 'bayesian':
            # Pick new random candidate samples
            samp = self.__pick_new_indexes_randomly(
                int(self.bayes_n_samp), repetition=False
            )
            feat = self.__get_bayes_feat(samp)
            mu, std = self.bayes_model.predict(feat, return_std=True)
            # Get the best predicted score so far
            with warnings.catch_warnings():
                # Ignore generated warnings
                warnings.simplefilter("ignore")
                if self.obj == 'maximize':
                    best = np.max(self.bayes_model.predict(self.bayes_feat))
                else:
                    best = np.min(self.bayes_model.predict(self.bayes_feat))
            # Calculate the probability of improvement. For 'minimize', the
            # argmin of this quantity equals the argmax of
            # norm.cdf((best - mu) / std), since norm.cdf(-x) = 1 - norm.cdf(x)
            probs = norm.cdf((np.squeeze(mu) - best) / (std + 1e-9))
            if self.obj == 'maximize':
                idx = samp[np.argmax(probs)]
            else:
                idx = samp[np.argmin(probs)]
        else:
            raise ValueError("Unsupported approach: " + str(approach))
        hyparams = self.grinder.get_hyparams(idx)
        # Return
        return int(idx), hyparams

    def __pick_new_indexes_randomly(self, n, repetition=False):
        indexes = np.arange(len(self.grinder.opt_grid))
        pickable = np.delete(indexes, self.history["grid_idx"]) \
            if repetition is False else indexes
        np.random.shuffle(pickable)
        return pickable[0:n]

    def __get_bayes_feat(self, indexes):
        # Encode each grid point as a row of option indexes
        data = np.empty((len(indexes), len(self.grinder.opt_grid[0])))
        for i, idx in enumerate(indexes):
            data[i, :] = [value for key, value
                          in self.grinder.opt_grid[idx].items()]
        return data
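
    # The acquisition in __pick_params is the probability of improvement:
    # given the GP posterior mean mu and deviation std for each candidate,
    # PI = norm.cdf((mu - best) / std) under 'maximize'. A toy check with
    # made-up numbers (not module code):
    #
    #   mu, std, best = np.array([0.2, 0.8]), np.array([0.1, 0.4]), 0.5
    #   norm.cdf((mu - best) / (std + 1e-9))  # ~[0.00135, 0.77337]
    #   # the second candidate has the higher improvement probability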

    def set_bayes_opt_params(self, n_samp, rbf_length_scale=1):
        self.bayes_n_samp = n_samp
        self.rbf_length_scale = rbf_length_scale
        self.bayes_parameters = True

    def optimize(self, max_iter, approach, save_history=True):
        # Check errors
        if max_iter > len(self.grinder.opt_grid):
            raise ValueError("Parameter 'max_iter' must not exceed the grid "
                             "size: " + str(len(self.grinder.opt_grid)))
        if approach not in self.supported_approach:
            raise ValueError("Parameter 'approach' must be one of these "
                             "values: " + str(self.supported_approach))
        if approach == 'bayesian':
            if len(self.history["score"]) == 0:
                raise Exception("No previous history for bayesian "
                                "optimization. Initialize with a random "
                                "optimization first")
            if not self.bayes_parameters:
                raise Exception("You must set the bayesian optimization "
                                "parameters first. Call function "
                                "set_bayes_opt_params.")
        # Initialize the bayesian model
        if approach == 'bayesian':
            # (Re)build the feature matrix if it is missing or out of date
            if self.bayes_feat is None or \
                    self.bayes_feat.shape[0] != len(self.history["grid_idx"]):
                self.bayes_feat = self.__get_bayes_feat(
                    self.history["grid_idx"])
            # Define the gaussian process model
            rbf_kernel = RBF(length_scale=self.rbf_length_scale)
            self.bayes_model = GaussianProcessRegressor(kernel=rbf_kernel,
                                                        normalize_y=True)
            # Fit the model on the history collected so far
            self.bayes_model.fit(self.bayes_feat, self.history["score"])
        print("\nOPTIMIZATION (" + approach + ")\n")
        # Start optimization
        for i in range(max_iter):
            print("===========================================================")
            print("ITERATION " + str(i+1) + "/" + str(max_iter) + "...\n")
            # Get params
            idx, hyparams = self.__pick_params(approach)
            # Evaluate function
            score, info = self.func(hyparams=hyparams, args=self.args)
            if (info is not None) and (not isinstance(info, dict)):
                raise TypeError("Returned 'info' must be None or type dict")
            # Update history
            self.history["grid_idx"].append(idx)
            self.history["score"].append(score)
            self.history["info"].append(info)
            # Update best model
            if self.obj == 'maximize':
                best_score = np.max(self.history["score"])
                best_idx = self.history["grid_idx"][np.argmax(
                    self.history["score"]
                )]
            else:
                best_score = np.min(self.history["score"])
                best_idx = self.history["grid_idx"][np.argmin(
                    self.history["score"]
                )]
            # Print info
            print()
            if approach == 'bayesian':
                pred = self.bayes_model.predict(
                    self.__get_bayes_feat([idx]))[0]
                print("Bayes estim\t= %.5f" % pred +
                      ", grid_idx = " + str(idx))
            print("Score \t\t= %.5f" % score + ", grid_idx = " + str(idx))
            print("Best score \t= %.5f" % best_score +
                  ", grid_idx = " + str(best_idx))
            print("===========================================================")
            print()
            # Update bayesian model
            if approach == 'bayesian':
                self.bayes_feat = np.concatenate(
                    (self.bayes_feat, self.__get_bayes_feat([idx])), axis=0)
                self.bayes_model.fit(self.bayes_feat, self.history["score"])
            # Save history after every iteration so progress is never lost
            if save_history:
                with open(os.path.join(self.optim_dir, 'history.pkl'),
                          'wb') as file:
                    pickle.dump(self.history, file)
                # Save json history (human-readable)
                with open(os.path.join(self.optim_dir, 'history.json'),
                          'w') as file:
                    file.write(json.dumps(self.history, indent=4))
        # Return
        return best_idx, best_score, self.grinder.get_hyparams(best_idx)

    def get_best_hyparams(self):
        # Get the best result recorded in the history
        if len(self.history["score"]) == 0:
            raise Exception("The optimization has not started")
        if self.obj == 'maximize':
            best_score = np.max(self.history["score"])
            best_idx = self.history["grid_idx"][np.argmax(
                self.history["score"]
            )]
        else:
            best_score = np.min(self.history["score"])
            best_idx = self.history["grid_idx"][np.argmin(
                self.history["score"]
            )]
        return best_score, best_idx, self.grinder.get_hyparams(best_idx)
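

# A hedged end-to-end sketch (toy objective and illustrative names, not part
# of the module API): seed the history with random evaluations, then switch
# to the bayesian approach. The objective must return a (score, info) tuple,
# where info is None or a dict.
if __name__ == '__main__':

    def toy_func(hyparams, args):
        # Concave toy objective with its maximum near x=0.3, y=0.7
        return -(hyparams['x'] - 0.3) ** 2 - (hyparams['y'] - 0.7) ** 2, None

    grinder = Grinder(x=list(np.linspace(0, 1, 10)),
                      y=list(np.linspace(0, 1, 10)))
    optimizer = Optimizer('maximize', toy_func, grinder)
    # Random warm-up so the gaussian process has data to fit
    optimizer.optimize(max_iter=5, approach='random')
    # Bayesian refinement: score 20 random candidates per iteration
    optimizer.set_bayes_opt_params(n_samp=20)
    best_idx, best_score, best_hyparams = optimizer.optimize(
        max_iter=10, approach='bayesian')
    print(best_idx, best_score, best_hyparams)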