Source code for metaheuristic_designer.stopping_condition

"""Module for algorithm stopping conditions and progress metric evaluation."""

from __future__ import annotations
from abc import ABC, abstractmethod
from typing import List, Optional, TYPE_CHECKING
import logging
import time
from dataclasses import dataclass
import pyparsing as pp

if TYPE_CHECKING:
    from .algorithm import Algorithm

logger = logging.getLogger(__name__)


class StoppingCondition(ABC):
    """Abstract interface for objects that decide when an optimization run ends.

    Concrete subclasses must implement every method declared here; the
    class cannot be instantiated directly.
    """

    @abstractmethod
    def restart(self):
        """Reset all counters and timers for a fresh run."""

    @abstractmethod
    def update(self, algorithm: Algorithm):
        """Advance the internal bookkeeping after one generation.

        Parameters
        ----------
        algorithm : Algorithm
            The algorithm whose current state is used to refresh the
            counters tracked by the stopping condition.
        """

    @abstractmethod
    def is_finished(self, finished: bool = False) -> bool:
        """Report whether the run should stop.

        Parameters
        ----------
        finished : bool, optional
            Externally supplied stop flag. Defaults to ``False``.

        Returns
        -------
        has_stopped : bool
            Whether the algorithm has reached its end.
        """

    @abstractmethod
    def get_progress(self) -> float:
        """Compute the current progress according to the progress metric.

        Returns
        -------
        float
            A value between 0 (start) and 1 (finished).
        """

    @abstractmethod
    def get_state(self) -> dict:
        """Return a dictionary describing the current state of the stopping condition.

        Returns
        -------
        dict
            Snapshot of the counters and configuration of the condition.
        """
@dataclass
class ParsedStoppingCondition(StoppingCondition):
    """Stopping condition driven by a parsed logical expression.

    A stopping condition is built from a logical expression that combines
    **tokens** with ``and``, ``or`` and parentheses. The recognised tokens are
    ``max_evaluations``, ``max_iterations``, ``real_time_limit``,
    ``cpu_time_limit``, ``objective_target`` and ``convergence``. Each token
    needs its numeric limit to be configured (``convergence`` uses
    ``max_patience``). The same expression (or a separate one) is used to
    compute a progress value between 0 and 1 for parameter schedules.

    Parameters
    ----------
    condition_str : str
        Logical expression defining when to stop
        (e.g. ``"max_iterations or real_time_limit"``).
    progress_metric_str : str, optional
        Logical expression defining how to compute the 0-1 progress value.
        Defaults to *condition_str*.
    max_iterations : int, optional
        Maximum number of generations.
    max_evaluations : int, optional
        Maximum number of objective function evaluations.
    real_time_limit : float, optional
        Wall-clock time limit in seconds.
    cpu_time_limit : float, optional
        CPU time limit in seconds.
    objective_target : float, optional
        Target value for the raw objective.
    max_patience : int, optional
        Consecutive iterations without improvement before ``"convergence"``
        triggers. Falls back to 1 when not given and not required.
    optimization_mode : str, optional
        ``"max"`` or ``"min"``, how the objective target and convergence are
        evaluated. With any other value the objective is not tracked.

    Raises
    ------
    ValueError
        If a token is used in an expression while its limit is ``None``.
    """

    condition_str: str
    progress_metric_str: Optional[str] = None
    max_iterations: Optional[int] = None
    max_evaluations: Optional[int] = None
    real_time_limit: Optional[float] = None
    cpu_time_limit: Optional[float] = None
    objective_target: Optional[float] = None
    max_patience: Optional[int] = None
    optimization_mode: str = "max"

    def __post_init__(self):
        """Parse both expressions, validate their limits and initialise the run state."""
        self.stop_cond_parsed = parse_stopping_cond(self.condition_str)
        self._validate_required_params(self.condition_str, "stopping condition")

        if self.progress_metric_str is None:
            # The progress metric mirrors the stopping condition, which has
            # already been parsed and validated above.
            self.progress_metric_str = self.condition_str
            self.progress_metric_parsed = self.stop_cond_parsed
        else:
            self.progress_metric_parsed = parse_stopping_cond(self.progress_metric_str)
            self._validate_required_params(self.progress_metric_str, "progress metric")

        # Validation must run before this default is applied, otherwise a
        # missing ``max_patience`` could never be reported for "convergence".
        # A concrete value is still needed to keep track of missed iterations.
        if self.max_patience is None:
            self.max_patience = 1

        self._reset_run_state()

    def _validate_required_params(self, source_str: str, context: str):
        """Raise ValueError if a token appears in *source_str* but the corresponding parameter is None.

        Parameters
        ----------
        source_str : str
            Expression in which tokens are looked up (substring search).
        context : str
            Human readable name of the expression, used in the error message.
        """
        # (token used in the expression, attribute holding its limit)
        required = (
            ("max_evaluations", "max_evaluations"),
            ("max_iterations", "max_iterations"),
            ("real_time_limit", "real_time_limit"),
            ("cpu_time_limit", "cpu_time_limit"),
            ("objective_target", "objective_target"),
            ("convergence", "max_patience"),
        )
        for token, attr in required:
            if token in source_str and getattr(self, attr) is None:
                raise ValueError(f'"{token}" appears in the {context} but "{attr}" is not set.')

    def _reset_run_state(self):
        """Put every per-run counter, timer and objective tracker in its initial state."""
        self.patience_left = self.max_patience
        self.iterations = 0
        self.evaluations = 0
        self.real_time_start = time.time()
        self.cpu_time_start = time.process_time()
        self.real_time_spent = 0
        self.cpu_time_spent = 0
        self.prev_best_objective = None
        self.first_best_objective = None
        self.best_objective = None

    @staticmethod
    def _fraction(spent, limit):
        """Return ``spent / limit`` clamped to [0, 1]; 0 without a limit, 1 for a non-positive limit."""
        if limit is None:
            return 0
        if limit <= 0:
            # A non-positive budget is exhausted from the start; this also
            # avoids dividing by zero.
            return 1.0
        return min(1.0, max(0.0, spent / limit))

    def restart(self):
        """Reset all counters and timers for a fresh run.

        The remaining patience and the tracked objectives are reset as well,
        so that a previous run cannot trigger ``"convergence"`` or
        ``"objective_target"`` in the next one.
        """
        self._reset_run_state()

    def update(self, algorithm: Algorithm):
        """Advance internal counters after one generation.

        Parameters
        ----------
        algorithm : Algorithm
            The algorithm being run. ``algorithm.population.best_solution()``
            provides the best objective used for convergence tracking and
            ``algorithm.objfunc.counter`` the number of evaluations so far.
        """
        current_population = algorithm.population
        objfunc = algorithm.objfunc

        self.iterations += 1
        self.evaluations = objfunc.counter
        self.real_time_spent = time.time() - self.real_time_start
        self.cpu_time_spent = time.process_time() - self.cpu_time_start

        if self.optimization_mode in {"max", "min"}:
            _, best_objective = current_population.best_solution()

            if self.prev_best_objective is not None:
                # A generation is "stalled" when its best objective is not
                # strictly better than the one of the previous generation.
                if self.optimization_mode == "max":
                    stalled = best_objective <= self.prev_best_objective
                else:
                    stalled = best_objective >= self.prev_best_objective

                if stalled:
                    self.patience_left -= 1
                else:
                    self.patience_left = self.max_patience

            # The first observed objective is the reference point for the
            # objective-target progress.
            if self.first_best_objective is None:
                self.first_best_objective = best_objective
            self.prev_best_objective = best_objective
            self.best_objective = best_objective

        # %s is used throughout because some values may legitimately be None.
        logger.debug(
            "Updated stopping condition parameters:\nfunc. evaluations = %s\n"
            "generations = %s\ntime = %s\ncpu_time = %s\nbest = %s\npatience = %s",
            self.evaluations,
            self.iterations,
            self.real_time_spent,
            self.cpu_time_spent,
            self.best_objective,
            self.patience_left,
        )

    def is_finished(self, finished: bool = False) -> bool:
        """Given the state of the algorithm, return whether we have finished or not.

        Parameters
        ----------
        finished : bool, optional
            External stop flag; when true the method returns ``True``
            without evaluating the expression.

        Returns
        -------
        has_stopped : bool
            Whether the algorithm has reached its end.
        """
        if finished:
            return True

        def limit_hit(value, limit):
            # A limit that was never configured can never be reached.
            return limit is not None and value >= limit

        neval_reached = limit_hit(self.evaluations, self.max_evaluations)
        ngen_reached = limit_hit(self.iterations, self.max_iterations)
        real_time_reached = limit_hit(self.real_time_spent, self.real_time_limit)
        cpu_time_reached = limit_hit(self.cpu_time_spent, self.cpu_time_limit)

        target_reached = False
        if self.objective_target is not None and self.best_objective is not None:
            if self.optimization_mode == "max":
                target_reached = self.best_objective >= self.objective_target
            elif self.optimization_mode == "min":
                target_reached = self.best_objective <= self.objective_target

        patience_reached = False
        if (self.patience_left is not None) and (self.optimization_mode in {"min", "max"}):
            patience_reached = self.patience_left <= 0

        # Each pair is "current value / limit"; %s tolerates unset limits.
        logger.debug(
            "Evaluating stopping condition:\n\tcondition: %s\n\tfunc. evaluations = %s / %s"
            "\n\tgenerations = %s / %s\n\ttime = %s / %s\n\tcpu_time = %s / %s\n\tbest = %s / %s\n\tpatience = %s",
            self.condition_str,
            self.evaluations,
            self.max_evaluations,
            self.iterations,
            self.max_iterations,
            self.real_time_spent,
            self.real_time_limit,
            self.cpu_time_spent,
            self.cpu_time_limit,
            self.best_objective,
            self.objective_target,
            self.patience_left,
        )

        return process_condition(
            self.stop_cond_parsed,
            neval_reached,
            ngen_reached,
            real_time_reached,
            cpu_time_reached,
            target_reached,
            patience_reached,
        )

    def get_progress(self) -> float:
        """Compute the current progress (0-1) according to the progress metric.

        Every token contributes a value clamped to the [0, 1] range; tokens
        whose limit is not configured contribute 0. The values are combined
        following the progress metric expression.

        Returns
        -------
        float
            A value between 0 (start) and 1 (finished).
        """
        tol = 1e-12

        neval_progress = self._fraction(self.evaluations, self.max_evaluations)
        ngen_progress = self._fraction(self.iterations, self.max_iterations)
        real_time_progress = self._fraction(self.real_time_spent, self.real_time_limit)
        cpu_time_progress = self._fraction(self.cpu_time_spent, self.cpu_time_limit)

        target_progress = 0
        tracks_objective = self.optimization_mode in {"min", "max"}
        if (
            self.objective_target is not None
            and tracks_objective
            and self.first_best_objective is not None
            and self.best_objective is not None
        ):
            initial_gap = self.first_best_objective - self.objective_target
            scale = max(abs(self.first_best_objective), abs(self.objective_target), 1)
            if abs(initial_gap) <= tol * scale:
                # The run started (numerically) on the target already.
                target_progress = 1
            else:
                current_gap = self.best_objective - self.objective_target
                # Clamp: the best objective may overshoot the target or be
                # worse than the first recorded one.
                target_progress = min(1.0, max(0.0, 1 - current_gap / initial_gap))

        patience_progress = 0
        if (self.max_patience is not None) and tracks_objective:
            patience_progress = self._fraction(self.max_patience - self.patience_left, self.max_patience)

        return process_progress(
            self.progress_metric_parsed,
            neval_progress,
            ngen_progress,
            real_time_progress,
            cpu_time_progress,
            target_progress,
            patience_progress,
        )

    def get_state(self) -> dict:
        """Return a dictionary with the current state of the stopping condition.

        Returns
        -------
        dict
            Keys include ``class_name``, ``stopped``, ``progress``, the two
            expressions, the configured limits (only those that are set),
            ``patience_left``, ``iterations``, ``evaluations`` and the spent
            real and CPU times.
        """
        data = {
            "class_name": self.__class__.__name__,
            "stopped": self.is_finished(),
            "progress": self.get_progress(),
            "stop_condition": self.condition_str,
            "progress_metric": self.progress_metric_str,
        }

        # Only the limits that were actually configured are reported.
        for limit_name in ("max_patience", "max_iterations", "max_evaluations", "real_time_limit", "cpu_time_limit"):
            limit_value = getattr(self, limit_name)
            if limit_value is not None:
                data[limit_name] = limit_value

        data["patience_left"] = self.patience_left
        data["iterations"] = self.iterations
        data["evaluations"] = self.evaluations
        data["real_time_spent"] = self.real_time_spent
        data["cpu_time_spent"] = self.cpu_time_spent

        return data
def parse_stopping_cond(condition_str: str) -> List[str | List]:
    """Parse a stopping-condition expression into a nested token list.

    The expression combines the tokens ``max_evaluations``,
    ``max_iterations``, ``real_time_limit``, ``cpu_time_limit``,
    ``objective_target`` and ``convergence`` with the binary operators
    ``and`` / ``or`` and parentheses, e.g.
    ``"max_iterations or (convergence and objective_target)"``.

    Both operators are right-associative, so chains produce nested
    three-element lists of the form ``[left, operator, right]``.

    Parameters
    ----------
    condition_str : str
        The string to be parsed.

    Returns
    -------
    token_list : List[str | List]
        The list of tokens representing the original string.

    Raises
    ------
    pyparsing.ParseException
        If the string, or any trailing part of it, is not a valid expression.
    """
    token = pp.one_of(
        [
            "max_evaluations",
            "max_iterations",
            "real_time_limit",
            "cpu_time_limit",
            "objective_target",
            "convergence",
        ]
    )
    or_op = pp.Literal("or")
    and_op = pp.Literal("and")

    # NOTE(review): infix_notation treats earlier levels as binding tighter,
    # so "or" groups before "and" here ("a or b and c" -> "(a or b) and c").
    # This is the opposite of the usual boolean convention -- confirm it is
    # intended; use parentheses in mixed expressions to be explicit.
    grammar = pp.infix_notation(
        token,
        [
            (or_op, 2, pp.opAssoc.RIGHT),
            (and_op, 2, pp.opAssoc.RIGHT),
        ],
    )

    # parse_all=True makes unparseable trailing text an error instead of
    # silently truncating the condition to the part that happened to parse.
    return grammar.parse_string(condition_str, parse_all=True).as_list()
[docs] def process_condition(cond_parsed: List[str | List], neval: int, ngen: int, real_time: float, cpu_time: float, target: float, patience: int) -> bool: """ This function receives as an input an expression for the stopping condition and the truth variable of the possible stopping conditions and returns wether to stop or not. Parameters ---------- cond_parsed: List[str | List] The list of tokens representing the parsed stopping condition. neval: int Number of function evaluations done. ngen: int Number of iterations done by the algorithm real_time: float Time since the start of the algorithm. cpu_time: float Time dedicated by the CPU to optimizing our function. target: float Fitness target. patience: int Number of time the algorithm has reached the same fitness value in a row. Returns ------- has_stopped: bool Whether the algorithm has reached its end """ result = None match cond_parsed: case [cond1, "and", cond2]: cond1_parsed = process_condition(cond1, neval, ngen, real_time, cpu_time, target, patience) cond2_parsed = process_condition(cond2, neval, ngen, real_time, cpu_time, target, patience) result = cond1_parsed and cond2_parsed case [cond1, "or", cond2]: cond1_parsed = process_condition(cond1, neval, ngen, real_time, cpu_time, target, patience) cond2_parsed = process_condition(cond2, neval, ngen, real_time, cpu_time, target, patience) result = cond1_parsed or cond2_parsed case [cond1]: result = process_condition(cond1, neval, ngen, real_time, cpu_time, target, patience) case "max_evaluations": result = neval case "max_iterations": result = ngen case "real_time_limit": result = real_time case "cpu_time_limit": result = cpu_time case "objective_target": result = target case "convergence": result = patience return result
[docs] def process_progress(cond_parsed: List[str | List], neval: int, ngen: int, real_time: float, cpu_time: float, target: float, patience: int) -> float: """ This function receives as an input an expression for the stopping condition and the truth variable of the possible stopping conditions and returns wether to stop or not. Parameters ---------- cond_parsed: List[str | List] The list of tokens representing the parsed stopping condition. neval: int Number of function evaluations done. ngen: int Number of iterations done by the algorithm real_time: float Time since the start of the algorithm. cpu_time: float Time dedicated by the CPU to optimizing our function. target: float Fitness target. patience: int Number of time the algorithm has reached the same fitness value in a row. Returns ------- has_stopped: bool Indicator of how close it the algorithm to finishing, 1 means the algorithm should be stopped. """ result = None match cond_parsed: case [cond1, "and", cond2]: progress1 = process_progress(cond1, neval, ngen, real_time, cpu_time, target, patience) progress2 = process_progress(cond2, neval, ngen, real_time, cpu_time, target, patience) result = min(progress1, progress2) case [cond1, "or", cond2]: progress1 = process_progress(cond1, neval, ngen, real_time, cpu_time, target, patience) progress2 = process_progress(cond2, neval, ngen, real_time, cpu_time, target, patience) result = max(progress1, progress2) case [cond1]: result = process_progress(cond1, neval, ngen, real_time, cpu_time, target, patience) case "max_evaluations": result = neval case "max_iterations": result = ngen case "real_time_limit": result = real_time case "cpu_time_limit": result = cpu_time case "objective_target": result = target case "convergence": result = patience return result