Source code for metaheuristic_designer.objective_function

"""
Base class for the Objective Function module.

This module implements the objective function that will measure the quality of the solutions.
"""

from __future__ import annotations
import logging
from typing import Any, Callable, Optional, TYPE_CHECKING
from abc import ABC, abstractmethod
import numpy as np

from .encodings import ParameterExtendingEncoding
from .constraint_handlers import ConstraintHandler, ClipBoundConstraint, CompositeConstraint, ExtendedConstraintHandler
from .parametrizable_mixin import ParametrizableMixin
from .utils import MatrixLike, VectorLike, ScalarLike

if TYPE_CHECKING:
    from metaheuristic_designer.population import Population

logger = logging.getLogger(__name__)


class ObjectiveFunc(ParametrizableMixin, ABC):
    """Abstract objective function with built-in fitness conversion.

    Subclasses must implement :meth:`objective`, which returns the raw
    objective value. The base class converts it to a *fitness* that is always
    maximized (flipping the sign for minimization) and applies a penalty if a
    :class:`ConstraintHandler` is present.

    Parameters
    ----------
    dimension : int
        Number of decision variables.
    lower_bound : float or array-like, optional
        Lower bound(s) of the feasible region. When both bounds are given, a
        :class:`ClipBoundConstraint` is added automatically (combined with
        ``constraint_handler`` through a :class:`CompositeConstraint` when one
        is also supplied).
    upper_bound : float or array-like, optional
        Upper bound(s) of the feasible region.
    constraint_handler : ConstraintHandler, optional
        Handler that can repair solutions and/or compute penalties. If it is
        omitted and the bounds are not both given, no handler is used: the
        penalty is zero and repairing leaves solutions untouched.
    mode : str, optional
        ``"max"`` or ``"min"``. The fitness is always maximized internally;
        the mode controls the sign conversion.
    name : str, optional
        Human-readable name for this function.
    vectorized : bool, optional
        If ``True``, :meth:`objective` receives all the solutions that need
        evaluation at once and must return an array.
    recalculate : bool, optional
        If ``True``, every individual is re-evaluated even if its fitness has
        already been computed.
    **kwargs
        Additional keyword arguments stored as schedulable parameters.

    Raises
    ------
    ValueError
        If ``mode`` is neither ``"max"`` nor ``"min"``.
    """

    def __init__(
        self,
        dimension: int,
        lower_bound: Optional[ScalarLike | VectorLike] = None,
        upper_bound: Optional[ScalarLike | VectorLike] = None,
        constraint_handler: Optional[ConstraintHandler] = None,
        mode: str = "max",
        name: str = "Some function",
        vectorized: bool = False,
        recalculate: bool = False,
        **kwargs,
    ):
        super().__init__()

        self.dimension = dimension
        self.lower_bound = lower_bound
        self.upper_bound = upper_bound

        # Box bounds are only enforced when both ends are known.
        if lower_bound is not None and upper_bound is not None:
            bound_constraint_handler = ClipBoundConstraint(dimension, lower_bound, upper_bound)
            if constraint_handler is None:
                constraint_handler = bound_constraint_handler
            else:
                constraint_handler = CompositeConstraint([constraint_handler, bound_constraint_handler])

        # May remain ``None``; every use site below guards against that.
        self.constraint_handler = constraint_handler

        self.name = name
        self.counter = 0  # number of objective evaluations performed so far
        self.factor = 1  # sign applied to turn the objective into a maximized fitness
        self.vectorized = vectorized
        self.recalculate = recalculate

        if mode not in ["max", "min"]:
            raise ValueError('Optimization objective (mode) must be "min" or "max".')

        self.mode = mode
        if mode == "min":
            self.factor = -1

        self.store_kwargs(**kwargs)

    def __call__(self, population: Population, adjusted: bool = True, parallel: bool = False, threads: int = 8) -> VectorLike:
        """Shorthand for :meth:`fitness`.

        Parameters
        ----------
        population : Population
            The population whose individuals will be evaluated.
        adjusted : bool, optional
            Kept for backward compatibility; :meth:`fitness` has no matching
            parameter, so the value is ignored.
        parallel : bool, optional
            Forwarded to :meth:`fitness`.
        threads : int, optional
            Forwarded to :meth:`fitness`.

        Returns
        -------
        ndarray
            The value returned by :meth:`fitness`.
        """

        # Forward by keyword: passing ``adjusted`` positionally would land in the
        # ``parallel`` slot of ``fitness`` and trigger a spurious warning.
        return self.fitness(population, parallel=parallel, threads=threads)

    def _constraint_penalty(self, genotypes: MatrixLike) -> VectorLike:
        """Return one penalty per genotype row, or zeros when there is no constraint handler."""

        if self.constraint_handler is None:
            return np.zeros(len(genotypes))
        return self.constraint_handler.penalty(genotypes)

    def fitness(self, population: Population, parallel: bool = False, threads: int = 8) -> VectorLike:
        """Evaluate fitness for the whole population.

        The raw objective values are computed via :meth:`objective`, combined
        with the constraint penalty and the optimization sign, and stored in
        ``population.fitness`` (the raw values go to ``population.objective``).
        Individuals that already have a valid fitness are skipped unless
        :attr:`recalculate` is set.

        Parameters
        ----------
        population : Population
            The population whose individuals will be evaluated.
        parallel : bool, optional
            Reserved for future use, currently ignored (a warning is logged).
        threads : int, optional
            Reserved for future use, currently ignored.

        Returns
        -------
        ndarray
            The fitness array of the whole population (also written in-place).
        """

        if parallel:
            logger.warning("Parallel fitness computing not available at the moment. Ignoring parallel option.")

        logger.debug("Calculating fitness of the population...")

        # Nothing to do: bail out before decoding the population.
        if not self.recalculate and np.all(population.fitness_calculated == 1):
            logger.debug("Fitness was not calculated. Every individual is duplicated.")
            return population.fitness

        fitness = population.fitness
        objective = population.objective
        solutions = population.decode()
        genotypes = population.genotype_matrix

        # Boolean mask selecting the individuals that must be evaluated.
        if self.recalculate:
            fitness_mask = np.ones(population.population_size, dtype=bool)
        else:
            fitness_mask = population.fitness_calculated == 0

        # Penalty is always vectorized. We use the genotype instead of the decoded solutions
        penalty_vector = self._constraint_penalty(genotypes[fitness_mask])

        # NOTE(review): the penalty is subtracted before the sign flip, so in "min" mode
        # (factor == -1) a positive penalty increases the fitness. Confirm this is intended.
        if self.vectorized:
            if isinstance(solutions, np.ndarray):
                solutions = solutions[fitness_mask]
            else:
                solutions = [solution for solution, include_value in zip(solutions, fitness_mask) if include_value]

            objective_values = self.objective(solutions)
            fitness[fitness_mask] = self.factor * (objective_values - penalty_vector)
            objective[fitness_mask] = objective_values
        else:
            # Expand the penalty to have the size `pop_size` so it can be indexed by individual.
            full_penalty = np.zeros(population.population_size)
            full_penalty[fitness_mask] = penalty_vector

            for idx, (solution, do_calculation) in enumerate(zip(solutions, fitness_mask)):
                # The mask is all-True when `recalculate` is set, so it is the only check needed.
                if do_calculation:
                    objective_value = self.objective(solution)
                    fitness[idx] = self.factor * (objective_value - full_penalty[idx])
                    objective[idx] = objective_value

        self.counter += np.count_nonzero(fitness_mask)
        population.fitness_calculated = np.ones_like(fitness_mask)

        # Write the fitness and objective values in-place
        population.fitness = fitness
        population.objective = objective

        logger.debug("Done calculating the fitness.")

        return fitness

    @abstractmethod
    def objective(self, solution: Any) -> VectorLike | ScalarLike:
        """Implementation of the objective function.

        Parameters
        ----------
        solution : Any
            A single decoded solution or, when :attr:`vectorized` is ``True``,
            the collection of decoded solutions that need evaluation.

        Returns
        -------
        objective_value : VectorLike | ScalarLike
            Value of the objective function for the given solution(s).
        """

    def repair_solution(self, solution: MatrixLike) -> MatrixLike:
        """Transform an invalid vector into one that satisfies the restrictions of the problem.

        Parameters
        ----------
        solution : MatrixLike
            A solution that could be violating the restrictions of the problem.

        Returns
        -------
        repaired_solution : MatrixLike
            The result of the constraint handler's ``repair_solution``; the
            input itself, unchanged, when no constraint handler is configured.
        """

        if self.constraint_handler is None:
            return solution
        return self.constraint_handler.repair_solution(solution)

    def add_parameter_constraints(self, parameter_extending_encoding: ParameterExtendingEncoding, param_handlers: dict[str, ConstraintHandler]):
        """Attach extra constraint handlers for extended encodings (e.g., PSO).

        The current constraint handler is wrapped in an
        :class:`ExtendedConstraintHandler` together with ``param_handlers``.

        Parameters
        ----------
        parameter_extending_encoding : ParameterExtendingEncoding
            The encoding that splits the genotype into solution and auxiliary parameters.
        param_handlers : dict
            Mapping from parameter names to :class:`ConstraintHandler` instances.
        """

        if isinstance(self.constraint_handler, ExtendedConstraintHandler):
            assert self.constraint_handler.param_handler_dict.keys() == param_handlers.keys()

        # NOTE(review): when the handler is already an ExtendedConstraintHandler it is wrapped
        # again, producing nested extended handlers. Confirm whether that is intended.
        base_constraint_handler = self.constraint_handler
        self.constraint_handler = ExtendedConstraintHandler(
            solution_handler=base_constraint_handler, param_handler_dict=param_handlers, encoding=parameter_extending_encoding
        )

    def restart(self):
        """Reset the evaluation counter to zero."""

        self.counter = 0

    def get_state(self) -> dict:
        """Return a dictionary with the current configuration.

        Returns
        -------
        dict
            Keys include ``class_name``, ``name``, ``constraint`` (the
            constraint handler state, or ``None`` when there is no handler),
            and all stored parameters.
        """

        constraint_state = None if self.constraint_handler is None else self.constraint_handler.get_state()
        return {
            "class_name": self.__class__.__name__,
            "name": self.name,
            "constraint": constraint_state,
            **self.get_params(),
        }
class NullObjectiveFunc(ObjectiveFunc):
    """Objective function that always returns zero.

    Useful as a placeholder in tests or when the optimization criterion is
    handled entirely by constraints.

    Parameters
    ----------
    **kwargs
        Forwarded to :class:`ObjectiveFunc` (``dimension`` is required there).
        ``name`` defaults to ``"Null objective"`` but may be overridden.
    """

    def __init__(self, **kwargs):
        # Use a default instead of a hard-wired keyword so that callers who pass
        # ``name=...`` do not hit a "multiple values for keyword argument" TypeError.
        kwargs.setdefault("name", "Null objective")
        super().__init__(**kwargs)

    def objective(self, _) -> VectorLike | ScalarLike:
        """Return the constant objective value ``0``, ignoring the solution."""

        return 0
class ObjectiveFromLambda(ObjectiveFunc):
    """Objective function indicated by a function call.

    Parameters
    ----------
    obj_func : Callable
        Objective function as a callable object. It is called with the
        solution as first argument followed by the keyword parameters held in
        ``self.current_kwargs``.
    dimension : int
        The dimension of the vectors accepted by the objective function.
    lower_bound : float or array-like, optional
        Lower limit restriction for the vectors.
    upper_bound : float or array-like, optional
        Upper limit restriction for the vectors.
    constraint_handler : ConstraintHandler, optional
        Handler forwarded to :class:`ObjectiveFunc`.
    mode : str, optional
        Whether to maximize or minimize the function (using the string 'max' or 'min').
    name : str, optional
        The name that will be displayed to represent this function. Defaults
        to ``obj_func.__name__`` or, for callables without that attribute,
        to the name of the callable's type.
    vectorized : bool, optional
        Forwarded to :class:`ObjectiveFunc`.
    recalculate : bool, optional
        Forwarded to :class:`ObjectiveFunc`.
    **kwargs
        Forwarded to :class:`ObjectiveFunc`.
    """

    def __init__(
        self,
        obj_func: Callable,
        dimension: int,
        lower_bound: Optional[ScalarLike | VectorLike] = None,
        upper_bound: Optional[ScalarLike | VectorLike] = None,
        constraint_handler: Optional[ConstraintHandler] = None,
        mode: str = "max",
        name: Optional[str] = None,
        vectorized: bool = False,
        recalculate: bool = False,
        **kwargs,
    ):
        if name is None:
            # Not every callable has ``__name__`` (e.g. ``functools.partial`` objects or
            # instances defining ``__call__``); fall back to the type name for those.
            name = getattr(obj_func, "__name__", type(obj_func).__name__)

        self.obj_func = obj_func

        super().__init__(
            dimension=dimension,
            lower_bound=lower_bound,
            upper_bound=upper_bound,
            constraint_handler=constraint_handler,
            mode=mode,
            name=name,
            vectorized=vectorized,
            recalculate=recalculate,
            **kwargs,
        )

    def objective(self, solution: Any) -> VectorLike | ScalarLike:
        """Evaluate the wrapped callable on ``solution`` with the current keyword parameters."""

        return self.obj_func(solution, **self.current_kwargs)