Source code for metaheuristic_designer.operators.BO_operator

"""
Bayesian Optimization operator based on Gaussian Process regression.
"""

from __future__ import annotations
from typing import Callable, Optional
import warnings
import numpy as np
import scipy as sp
from sklearn.exceptions import ConvergenceWarning
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, WhiteKernel

from ..initializer import Initializer
from ..utils import RNGLike
from ..operator import Operator
from ..objective_function import ObjectiveFunc
from ..population import Population
from ..encodings import Encoding

warnings.filterwarnings(action="ignore", category=ConvergenceWarning)


def _acquisition_function(gaussian_model: GaussianProcessRegressor, _X: np.ndarray, x_in: np.ndarray, max_y: float) -> float:
    """Expected Improvement acquisition function.

    Parameters
    ----------
    gaussian_model : GaussianProcessRegressor
        The fitted GP model.
    _X : ndarray
        Training inputs (not used directly).
    x_in : ndarray
        Point where the acquisition function is evaluated.
    max_y : float
        Current best observed value.

    Returns
    -------
    float
        Negative Expected Improvement (for minimization).
    """

    mean_y, std_y = gaussian_model.predict(x_in[None, :], return_std=True)
    std_y = np.maximum(std_y, 1e-10)

    z = (mean_y - max_y) / std_y
    exp_imp = (mean_y - max_y) * sp.stats.norm.cdf(z) + std_y * sp.stats.norm.pdf(z)

    return exp_imp


[docs] class BOOperator(Operator): """Bayesian Optimization operator using a GP surrogate. Fits a Gaussian Process model to the current population, then maximises the Expected Improvement acquisition function to propose a new candidate solution. The new solution is merged back into the population. Parameters ---------- name : str, optional Display name (default ``"Gaussian Regression Surrogate Model"``). encoding : Encoding, optional Encoding applied to the genotype. kernel : sklearn Kernel, optional GP kernel. Defaults to ``RBF(length_scale=1.0) + WhiteKernel(noise_level=1.0)``. random_state : RNGLike, optional Random number generator. batch_size : int, optional Number of random starting points for acquisition optimisation (default 100). max_samples : int, optional Maximum number of training points used (default 100). If the population exceeds this, a random subset is selected. rbf_scale : float, optional Multiplicative factor applied to the RBF kernel (default 1.0). **kwargs Additional keyword arguments stored as schedulable parameters. """ def __init__( self, name: str = "Gaussian Regression Surrogate Model", encoding: Optional[Encoding] = None, kernel: Optional[Callable] = None, random_state: Optional[RNGLike] = None, batch_size: int = 100, max_samples: int = 100, rbf_scale: float = 1.0, **kwargs, ): super().__init__( name=name, encoding=encoding, random_state=random_state, # Forced kwargs batch_size=batch_size, max_samples=max_samples, **kwargs, ) if kernel is None: kernel = rbf_scale * RBF(length_scale=1.0) + WhiteKernel(noise_level=1.0) self.rbf_scale = rbf_scale self.gaussian_model = GaussianProcessRegressor(kernel=kernel, normalize_y=True, copy_X_train=False)
[docs] def evolve(self, population: Population, initializer: Optional[Initializer] = None) -> Population: """Fit GP, optimise acquisition, and merge the proposed point. Parameters ---------- population : Population The current population. initializer : Initializer, optional Used to generate random starting points for acquisition optimisation. Returns ------- Population The population with the new candidate appended. """ # Obtain training data from the population population = population.calculate_fitness() X = population.genotype_matrix y = population.fitness if population.population_size > self.params.max_samples: mask = self.random_state.choice(population.population_size, size=self.params.max_samples, replace=False) X = X[mask] y = y[mask] # Fit the surrogate model self.gaussian_model.fit(X, y) # Initialize optimization data structures objfunc = population.objfunc max_y = np.max(self.gaussian_model.predict(X)) min_ei = float("inf") new_best_point = X[0] if isinstance(objfunc, ObjectiveFunc): bounds = np.asarray((objfunc.lower_bound, objfunc.upper_bound)).T if bounds.ndim == 1: bounds = bounds[None, :] else: bounds = None # Optimize the acquisition function with a batch of initial points chosen at random samples = initializer.generate_population(objfunc, self.params.batch_size).genotype_matrix for x0 in samples: result = sp.optimize.minimize( fun=lambda x_in: -_acquisition_function(self.gaussian_model, X, x_in, max_y), x0=x0, method="L-BFGS-B", bounds=bounds ) if result.fun < min_ei: min_ei = result.fun new_best_point = result.x # Create new population from the optimization result and merge it with the previous one new_sample_population = Population(objfunc, genotype_matrix=new_best_point[None, :], encoding=population.encoding) new_population = Population.join_populations(population, new_sample_population) new_population = new_population.repair_solutions() return new_population