"""Module for algorithm stopping conditions and progress metric evaluation."""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import List, Optional, TYPE_CHECKING
import logging
import time
from dataclasses import dataclass
import pyparsing as pp
if TYPE_CHECKING:
from .algorithm import Algorithm
logger = logging.getLogger(__name__)
[docs]
class StoppingCondition(ABC):
[docs]
@abstractmethod
def restart(self):
"""Reset all counters and timers for a fresh run."""
[docs]
@abstractmethod
def update(self, algorithm: Algorithm):
"""Advance internal counters after one generation.
Parameters
----------
current_population : Population
The population at the end of the current generation. Its
best objective is used to update convergence tracking.
"""
[docs]
@abstractmethod
def is_finished(self, finished: bool = False) -> bool:
"""
Given the state of the algorithm, returns wether we have finished or not.
Parameters
----------
real_time_start: float
The time in seconds that passed since the algorithm was executed.
cpu_time_start: float
The time in seconds that the CPU has executed code in this algorithm.
Returns
-------
has_stopped: bool
Whether the algorithm has reached its end
"""
[docs]
@abstractmethod
def get_progress(self) -> float:
"""
Compute the current progress (0-1) according to the progress metric.
Parameters
----------
gen: int
The number of generations that has passed.
real_time_start: float
The time in seconds that passed since the algorithm was executed.
cpu_time_start: float
The time in seconds that the CPU has executed code in this algorithm.
Returns
-------
float
A value between 0 (start) and 1 (finished).
"""
[docs]
@abstractmethod
def get_state(self) -> dict:
"""Return a dictionary with the current state of the stopping condition.
Returns
-------
dict
Keys include ``iterations``, ``evaluations``, times, and
configuration limits.
"""
[docs]
@dataclass
class ParsedStoppingCondition(StoppingCondition):
"""Encapsulate the logic that decides when an optimization run should end.
A stopping condition is built from a logical expression that
combines **tokens** with ``and``, ``or`` and parentheses. Each
token has a corresponding numeric limit. The same expression
(or a separate one) can be used to compute a progress value
between 0 and 1 for parameter schedules.
Parameters
----------
condition_str : str
Logical expression defining when to stop (e.g.
``"max_iterations or real_time_limit"``).
progress_metric_str : str, optional
Logical expression defining how to compute the 0-1 progress
value. Defaults to *condition_str*.
max_iterations : int, optional
Maximum number of generations.
max_evaluations : int, optional
Maximum number of objective function evaluations.
real_time_limit : float, optional
Wall-clock time limit in seconds.
cpu_time_limit : float, optional
CPU time limit in seconds.
objective_target : float, optional
Target value for the raw objective.
max_patience : int, optional
Consecutive iterations without improvement before
``"convergence"`` triggers.
optimization_mode : str, optional
``"max"`` or ``"min"``, how the objective target and
convergence are evaluated.
"""
condition_str: str
progress_metric_str: Optional[str] = None
max_iterations: int = None
max_evaluations: int = None
real_time_limit: float = None
cpu_time_limit: float = None
objective_target: float = None
max_patience: int = None
optimization_mode: str = "max"
def __post_init__(self):
self.condition_str = self.condition_str
self.stop_cond_parsed = parse_stopping_cond(self.condition_str)
self._validate_required_params(self.condition_str, "stopping condition")
if self.progress_metric_str is None:
self.progress_metric_str = self.condition_str
self.progress_metric_parsed = self.stop_cond_parsed
else:
self.progress_metric_parsed = parse_stopping_cond(self.progress_metric_str)
self._validate_required_params(self.progress_metric_str, "progress metric")
# Set max patience to a sensible value, we need it to keep track of the missed iterations
if self.max_patience is None:
self.max_patience = 1
self.patience_left = self.max_patience
self.iterations = 0
self.evaluations = 0
self.real_time_start = time.time()
self.cpu_time_start = time.process_time()
self.real_time_spent = 0
self.cpu_time_spent = 0
self.prev_best_objective = None
self.first_best_objective = None
self.best_objective = None
def _validate_required_params(self, source_str: str, context: str):
"""Raise ValueError if a token appears in *source_str* but the
corresponding parameter is None."""
# Mapping from token to (attribute_name, value)
_token_map = {
"max_evaluations": ("max_evaluations", self.max_evaluations),
"max_iterations": ("max_iterations", self.max_iterations),
"real_time_limit": ("real_time_limit", self.real_time_limit),
"cpu_time_limit": ("cpu_time_limit", self.cpu_time_limit),
"objective_target": ("objective_target", self.objective_target),
"convergence": ("max_patience", self.max_patience),
}
for token, (attr, attr_value) in _token_map.items():
if token in source_str and attr_value is None:
raise ValueError(f'"{token}" appears in the {context} but "{attr}" is not set.')
[docs]
def restart(self):
"""Reset all counters and timers for a fresh run."""
self.iterations = 0
self.evaluations = 0
self.real_time_start = time.time()
self.cpu_time_start = time.process_time()
self.real_time_spent = 0
self.cpu_time_spent = 0
self.prev_best_objective = None
self.first_best_objective = None
[docs]
def update(self, algorithm: Algorithm):
"""Advance internal counters after one generation.
Parameters
----------
current_population : Population
The population at the end of the current generation. Its
best objective is used to update convergence tracking.
"""
current_population = algorithm.population
objfunc = algorithm.objfunc
self.iterations += 1
self.evaluations = objfunc.counter
self.real_time_spent = time.time() - self.real_time_start
self.cpu_time_spent = time.process_time() - self.cpu_time_start
if self.optimization_mode in {"max", "min"}:
_, best_objective = current_population.best_solution()
if self.prev_best_objective is not None:
if self.optimization_mode == "max":
improves = best_objective <= self.prev_best_objective
else:
improves = best_objective >= self.prev_best_objective
if improves:
self.patience_left -= 1
else:
self.patience_left = self.max_patience
if self.first_best_objective is None:
self.first_best_objective = best_objective
self.prev_best_objective = best_objective
self.best_objective = best_objective
else:
self.prev_best_objective = best_objective
logger.debug(
"Updated stopping condition parameters:\nfunc. evaluations = %d\n" "generations = %d\ntime = %f\ncpu_time = %f\nbest = %f\npatience = %d",
self.evaluations,
self.iterations,
self.real_time_start,
self.cpu_time_start,
self.best_objective,
self.patience_left,
)
[docs]
def is_finished(self, finished: bool = False) -> bool:
"""
Given the state of the algorithm, returns wether we have finished or not.
Parameters
----------
real_time_start: float
The time in seconds that passed since the algorithm was executed.
cpu_time_start: float
The time in seconds that the CPU has executed code in this algorithm.
Returns
-------
has_stopped: bool
Whether the algorithm has reached its end
"""
if finished:
return True
neval_reached = self.evaluations >= self.max_evaluations if self.max_evaluations is not None else False
ngen_reached = self.iterations >= self.max_iterations if self.max_iterations is not None else False
real_time_reached = self.real_time_spent >= self.real_time_limit if self.real_time_limit is not None else False
cpu_time_reached = self.cpu_time_spent >= self.cpu_time_limit if self.cpu_time_limit is not None else False
if (self.objective_target is not None) and (self.optimization_mode == "max"):
target_reached = (self.best_objective is not None) and (self.best_objective >= self.objective_target)
elif (self.objective_target is not None) and (self.optimization_mode == "min"):
target_reached = (self.best_objective is not None) and (self.best_objective <= self.objective_target)
else:
target_reached = False
if (self.patience_left is not None) and (self.optimization_mode in {"min", "max"}):
patience_reached = self.patience_left <= 0
else:
patience_reached = False
logger.debug(
"Evaluating stopping condition:\n\tcondition: %s\n\tfunc. evaluations = %d / %d"
"\n\tgenerations = %d / %d\n\ttime = %f / %f\n\tcpu_time = %f / %f\n\tbest = %f / %f\n\tpatience = %d",
self.condition_str,
self.max_evaluations,
self.evaluations,
self.iterations,
self.max_iterations,
self.real_time_start,
time.time(),
self.cpu_time_start,
time.process_time(),
self.best_objective,
self.objective_target,
self.patience_left,
)
return process_condition(
self.stop_cond_parsed, neval_reached, ngen_reached, real_time_reached, cpu_time_reached, target_reached, patience_reached
)
[docs]
def get_progress(self) -> float:
"""
Compute the current progress (0-1) according to the progress metric.
Parameters
----------
gen: int
The number of generations that has passed.
real_time_start: float
The time in seconds that passed since the algorithm was executed.
cpu_time_start: float
The time in seconds that the CPU has executed code in this algorithm.
Returns
-------
float
A value between 0 (start) and 1 (finished).
"""
tol = 1e-12
neval_reached = self.evaluations / self.max_evaluations if self.max_evaluations is not None else 0
ngen_reached = self.iterations / self.max_iterations if self.max_iterations is not None else 0
real_time_reached = self.real_time_spent / self.real_time_limit if self.real_time_limit is not None else 0
cpu_time_reached = self.cpu_time_spent / self.cpu_time_limit if self.cpu_time_limit is not None else 0
if (self.objective_target is not None) and (self.optimization_mode in {"min", "max"}):
if self.first_best_objective is not None and abs(self.first_best_objective - self.objective_target) <= tol * max(
abs(self.first_best_objective), abs(self.objective_target), 1
):
target_progress = 1
elif self.first_best_objective is not None:
fit_init_dist = self.first_best_objective - self.objective_target
fit_dist = self.best_objective - self.objective_target
target_progress = 1 - fit_dist / fit_init_dist
else:
target_progress = 0
else:
target_progress = 0
if (self.max_patience is not None) and (self.optimization_mode in {"min", "max"}):
patience_percentage = 1 - self.patience_left / self.max_patience
else:
patience_percentage = 0
return process_progress(
self.progress_metric_parsed, neval_reached, ngen_reached, real_time_reached, cpu_time_reached, target_progress, patience_percentage
)
[docs]
def get_state(self) -> dict:
"""Return a dictionary with the current state of the stopping condition.
Returns
-------
dict
Keys include ``iterations``, ``evaluations``, times, and
configuration limits.
"""
data = {
"class_name": self.__class__.__name__,
"stopped": self.is_finished(),
"progress": self.get_progress(),
"stop_condition": self.condition_str,
"progress_metric": self.progress_metric_str,
}
if self.max_patience is not None:
data["max_patience"] = self.max_patience
if self.max_iterations is not None:
data["max_iterations"] = self.max_iterations
if self.max_evaluations is not None:
data["max_evaluations"] = self.max_evaluations
if self.real_time_limit is not None:
data["real_time_limit"] = self.real_time_limit
if self.cpu_time_limit is not None:
data["cpu_time_limit"] = self.cpu_time_limit
data.update(
{
"patience_left": self.patience_left,
"iterations": self.iterations,
"evaluations": self.evaluations,
"real_time_spent": self.real_time_spent,
"cpu_time_spent": self.cpu_time_spent,
}
)
return data
[docs]
def parse_stopping_cond(condition_str: str) -> List[str | List]:
"""
This function parses an expression of the form "neval or cpu_time" into
a tree structure so that it can be further processed.
Parameters
----------
condition_str: str
The string to be parsed.
Returns
-------
token_list: List[str | List]
The list of tokens representing the original string.
"""
orop = pp.Literal("or")
andop = pp.Literal("and")
condition = pp.one_of(["max_evaluations", "max_iterations", "real_time_limit", "cpu_time_limit", "objective_target", "convergence"])
expr = pp.infix_notation(condition, [(orop, 2, pp.opAssoc.RIGHT), (andop, 2, pp.opAssoc.RIGHT)])
return expr.parse_string(condition_str).as_list()
[docs]
def process_condition(cond_parsed: List[str | List], neval: int, ngen: int, real_time: float, cpu_time: float, target: float, patience: int) -> bool:
"""
This function receives as an input an expression for the stopping condition
and the truth variable of the possible stopping conditions and returns wether to stop or not.
Parameters
----------
cond_parsed: List[str | List]
The list of tokens representing the parsed stopping condition.
neval: int
Number of function evaluations done.
ngen: int
Number of iterations done by the algorithm
real_time: float
Time since the start of the algorithm.
cpu_time: float
Time dedicated by the CPU to optimizing our function.
target: float
Fitness target.
patience: int
Number of time the algorithm has reached the same fitness value in a row.
Returns
-------
has_stopped: bool
Whether the algorithm has reached its end
"""
result = None
match cond_parsed:
case [cond1, "and", cond2]:
cond1_parsed = process_condition(cond1, neval, ngen, real_time, cpu_time, target, patience)
cond2_parsed = process_condition(cond2, neval, ngen, real_time, cpu_time, target, patience)
result = cond1_parsed and cond2_parsed
case [cond1, "or", cond2]:
cond1_parsed = process_condition(cond1, neval, ngen, real_time, cpu_time, target, patience)
cond2_parsed = process_condition(cond2, neval, ngen, real_time, cpu_time, target, patience)
result = cond1_parsed or cond2_parsed
case [cond1]:
result = process_condition(cond1, neval, ngen, real_time, cpu_time, target, patience)
case "max_evaluations":
result = neval
case "max_iterations":
result = ngen
case "real_time_limit":
result = real_time
case "cpu_time_limit":
result = cpu_time
case "objective_target":
result = target
case "convergence":
result = patience
return result
[docs]
def process_progress(cond_parsed: List[str | List], neval: int, ngen: int, real_time: float, cpu_time: float, target: float, patience: int) -> float:
"""
This function receives as an input an expression for the stopping condition
and the truth variable of the possible stopping conditions and returns wether to stop or not.
Parameters
----------
cond_parsed: List[str | List]
The list of tokens representing the parsed stopping condition.
neval: int
Number of function evaluations done.
ngen: int
Number of iterations done by the algorithm
real_time: float
Time since the start of the algorithm.
cpu_time: float
Time dedicated by the CPU to optimizing our function.
target: float
Fitness target.
patience: int
Number of time the algorithm has reached the same fitness value in a row.
Returns
-------
has_stopped: bool
Indicator of how close it the algorithm to finishing, 1 means the algorithm should be stopped.
"""
result = None
match cond_parsed:
case [cond1, "and", cond2]:
progress1 = process_progress(cond1, neval, ngen, real_time, cpu_time, target, patience)
progress2 = process_progress(cond2, neval, ngen, real_time, cpu_time, target, patience)
result = min(progress1, progress2)
case [cond1, "or", cond2]:
progress1 = process_progress(cond1, neval, ngen, real_time, cpu_time, target, patience)
progress2 = process_progress(cond2, neval, ngen, real_time, cpu_time, target, patience)
result = max(progress1, progress2)
case [cond1]:
result = process_progress(cond1, neval, ngen, real_time, cpu_time, target, patience)
case "max_evaluations":
result = neval
case "max_iterations":
result = ngen
case "real_time_limit":
result = real_time
case "cpu_time_limit":
result = cpu_time
case "objective_target":
result = target
case "convergence":
result = patience
return result