"""
Base class for the Algorithm module.
This module implements the main loop of the optimization algorithm using a search strategy.
"""
from __future__ import annotations
import logging
from typing import Tuple, Any, Optional
import json
import signal
from .history_tracker import HistoryTracker
from .reporters import create_reporter
from .reporter import Reporter
from .reporters import VerboseReporter
from .objective_function import ObjectiveFunc
from .search_strategy import SearchStrategy
from .population import Population
from .stopping_condition import StoppingCondition
from .initializer import Initializer
from .checkpointer import Checkpointer
from .utils import NumpyEncoder, VectorLike
logger = logging.getLogger(__name__)
[docs]
class TerminationException(Exception):
"""
Custom exception to handle SIGTERM
"""
[docs]
class Algorithm:
"""
Orchestrates a complete optimisation run.
An :class:`Algorithm` combines a :class:`ObjectiveFunc` with a
:class:`SearchStrategy` and manages the iteration loop, stopping
conditions, reporting, history tracking, and checkpointing.
All runtime settings can be supplied as plain keyword arguments
(e.g., ``max_iterations=200``) or as pre-built objects
(:class:`StoppingCondition`, :class:`Reporter`, etc.). The
keyword-argument style is convenient for quick experiments; the
object-based style gives finer control and reusability.
Parameters
----------
objfunc : ObjectiveFunc
The objective function to optimise.
search_strategy : SearchStrategy
Strategy that defines one iteration of the algorithm.
name : str, optional
Display name for the algorithm (defaults to the strategy's name).
stop_cond : str, optional
Expression that defines the stopping condition (see
:class:`StoppingCondition`). Default ``"real_time_limit"``.
progress_metric : str, optional
Token used to compute the 0-1 progress value for parameter
schedules. Defaults to the same tokens as *stop_cond*.
max_iterations : int, optional
Maximum number of iterations (default 1000).
max_evaluations : int, optional
Maximum number of objective evaluations (default 1e5).
real_time_limit : float, optional
Wall-clock time limit in seconds (default 60).
cpu_time_limit : float, optional
CPU time limit in seconds (default 60).
objective_target : float, optional
Target value for the raw objective (default 1e-10).
max_patience : int, optional
Iterations without improvement before ``convergence`` stops
(default 100).
verbose_timer : float, optional
Interval in seconds between prints when using the default
:class:`VerboseReporter` (default 0.5).
track_median / track_worst / track_full_objective / track_full_population / track_diversity : bool, optional
Flags forwarded to the :class:`HistoryTracker` when one is not
supplied explicitly.
checkpoint_file / checkpoint_time_frequency / checkpoint_iteration_frequency : optional
Arguments used to construct a :class:`Checkpointer` when
*checkpointer* is not given.
stopping_condition : StoppingCondition, optional
Explicit stopping condition object.
reporter : str or Reporter, optional
Reporter instance or name (``"tqdm"``, ``"silent"``, ``"verbose"``).
history_tracker : HistoryTracker, optional
Explicit history tracker.
checkpointer : Checkpointer, optional
Explicit checkpointer.
parallel : bool, optional
Whether to evaluate the population in parallel (currently
reserved for future use).
threads : int, optional
Number of threads for parallel evaluation (reserved).
"""
def __init__(
self,
objfunc: ObjectiveFunc,
search_strategy: SearchStrategy,
name: Optional[str] = None,
stop_cond: str = "real_time_limit",
progress_metric: Optional[str] = None,
max_iterations: int = 1000,
max_evaluations: int = 1e5,
real_time_limit: float = 60.0,
cpu_time_limit: float = 60.0,
objective_target: float = 1e-10,
max_patience: int = 100,
verbose_timer: float = 0.5,
track_median: bool = False,
track_worst: bool = False,
track_full_objective: bool = False,
track_full_population: bool = False,
track_diversity: bool = False,
checkpoint_file: Optional[str] = None,
checkpoint_time_frequency: Optional[float] = None,
checkpoint_iteration_frequency: Optional[float] = None,
stopping_condition: Optional[StoppingCondition] = None,
reporter: Optional[str | Reporter] = None,
history_tracker: Optional[HistoryTracker] = None,
checkpointer: Optional[Checkpointer] = None,
parallel: bool = False,
threads: int = 8,
):
super().__init__()
self.search_strategy = search_strategy
self.objfunc = objfunc
if name is None:
name = self.search_strategy.name
self.name = name
# Parallel parameters
self.parallel = parallel
self.threads = threads
# Stopping conditions
if stopping_condition is None:
stopping_condition = StoppingCondition(
condition_str=stop_cond,
progress_metric_str=progress_metric,
real_time_limit=real_time_limit,
cpu_time_limit=cpu_time_limit,
objective_target=objective_target,
max_evaluations=max_evaluations,
max_iterations=max_iterations,
max_patience=max_patience,
optimization_mode=objfunc.mode,
)
self.stopping_condition = stopping_condition
# Reporter
if reporter is None:
reporter = VerboseReporter(verbose_timer=verbose_timer)
elif isinstance(reporter, str):
reporter = create_reporter(reporter)
self.reporter = reporter
# History Tracker
if history_tracker is None:
history_tracker = HistoryTracker(
track_best=True,
track_median=track_median,
track_worst=track_worst,
track_full_objective=track_full_objective,
track_full_population=track_full_population,
track_diversity=track_diversity,
)
self.history_tracker = history_tracker
# Checkpointer
if checkpointer is not None or checkpoint_file is not None:
if checkpointer is None:
checkpointer = Checkpointer(
checkpoint_file=checkpoint_file,
iteration_frequency=checkpoint_iteration_frequency,
time_frequency=checkpoint_time_frequency,
)
self.checkpointer = checkpointer
else:
logger.info("Checkpointing is disabled since no checkpoint file was indicated.")
self.checkpointer = None
self._stop_requested = False
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""Handles SIGINT Os-level signals.
Parameters
----------
signum : Signal number identifier (unused)
frame : Frame of the signal (unused)
"""
self._stop_requested = True
@property
def initializer(self) -> Initializer:
return self.search_strategy.initializer
@initializer.setter
def initializer(self, new_initializer):
self.search_strategy.initializer = new_initializer
@property
def iterations(self) -> int:
return self.stopping_condition.iterations
@property
def evaluations(self) -> int:
return self.objfunc.counter
@property
def patience_left(self) -> int:
return self.stopping_condition.patience_left
@property
def progress(self) -> float:
return self.stopping_condition.get_progress()
@property
def population(self) -> Population:
return self.search_strategy.population
[docs]
def gather_parameters(self) -> dict:
"""
Collect the current parameters of the underlying search strategy.
Returns
-------
dict
A dictionary of parameter names and their current values.
"""
return self.search_strategy.gather_parameters()
[docs]
def best_solution(self) -> Tuple[Any, float]:
"""
Return the best decoded solution and its raw objective value.
Returns
-------
best_solution: Tuple[Any, float]
A pair of the best individual with its objective value.
"""
return self.search_strategy.best_solution()
[docs]
def best_individual(self) -> Tuple[VectorLike, float]:
"""
Return the best genotype and its internal fitness value.
Returns
-------
best_solution: Tuple[VectorLike, float]
A pair of the best individual with its fitness.
"""
return self.search_strategy.best_individual()
[docs]
def restart(self, restart_objfunc: bool = True):
"""
Reset internal counters and, optionally, the objective function.
Parameters
----------
restart_objfunc : bool, optional
If ``True``, also reset the objective function's evaluation
counter.
"""
if restart_objfunc:
self.objfunc.restart()
self.stopping_condition.restart()
self.history_tracker.restart()
if self.checkpointer is not None:
self.checkpointer.restart()
logger.debug("Reset the data of the algorithm.")
[docs]
def initialize(self, reset_objfunc: bool = True) -> Population:
"""
Create and evaluate the initial population.
Parameters
----------
reset_objfunc : bool, optional
Passed through to :meth:`restart`.
Returns
-------
Population
The evaluated initial population.
"""
self.restart(reset_objfunc)
initial_population = self.search_strategy.initialize(self.objfunc)
initial_population = self.search_strategy.evaluate_population(initial_population, self.parallel, self.threads)
self.search_strategy.population = initial_population
return initial_population
def _log_debug(self, text, population):
"""
Util for debugging population info.
"""
if logger.isEnabledFor(logging.DEBUG):
logger.debug(text, population.debug_repr())
[docs]
def step(self, population: Population = None) -> Population:
"""
Execute one iteration of the optimisation loop.
The default implementation performs: parent selection ->
perturbation -> evaluation -> survivor selection.
Parameters
----------
population : Population, optional
The population at the start of the iteration. If not given,
the currently stored population is used.
Returns
-------
Population
The population after the iteration.
"""
# Get the population of this generation
if population is None:
population = self.search_strategy.population
else:
self.search_strategy.population = population
self._log_debug("Original population:\n%s", population)
# Generate their parents
parents = self.search_strategy.select_parents(population)
self._log_debug("Parent selection\n%s", parents)
# Evolve the selected parents
offspring = self.search_strategy.perturb(parents)
self._log_debug("Perturbed\n%s", offspring)
# Get the fitness of the individuals
offspring = self.search_strategy.evaluate_population(offspring, self.parallel, self.threads)
self._log_debug("Evaluated\n%s", offspring)
# Select the individuals that remain for the next generation
new_population = self.search_strategy.select_individuals(population, offspring)
self._log_debug("Selected\n%s", new_population)
self.search_strategy.population = new_population
# Update in cascade all the objects involved in the optimization
self.search_strategy.step(progress=self.progress)
self._log_debug("Updated end\n%s", new_population)
return new_population
[docs]
def resume(self) -> Population:
"""
Resume an interrupted run from the last checkpoint.
Returns
-------
Population
The final population after the run completes.
"""
return self.optimize(resume=True)
[docs]
def optimize(self, resume: bool = False) -> Population:
"""
Run the optimisation loop until a stopping condition is met.
Parameters
----------
resume : bool, optional
If ``True``, do not reset the algorithm state - continue
from the current population and counters.
Returns
-------
Population
The final population.
Raises
------
KeyboardInterrupt, TerminationException
If the process is interrupted, a checkpoint is attempted
before re-raising.
"""
self.reporter.log_init(self)
# initialize clocks
if not resume:
self.restart()
self.stopping_condition.restart()
# Initialize search strategy and record initial values.
logger.info("Generating initial solutions...")
population = self.population if resume else self.initialize()
self.history_tracker.step(self)
# Search until the stopping condition is met
logger.info("Starting main optimization loop...")
try:
while not self.stopping_condition.is_finished(self.search_strategy.finish):
logger.debug("Started iteration %d...", self.iterations)
population = self.step(population=population)
self.stopping_condition.step(self.population)
self.reporter.log_step(self)
self.history_tracker.step(self)
if self.checkpointer is not None:
self.checkpointer.checkpoint(self)
if self._stop_requested:
raise TerminationException
except (KeyboardInterrupt, TerminationException) as e:
if self.checkpointer is not None:
self.checkpointer.save(self)
self.reporter.log_end(self)
logger.info("Optimization aborted by an OS signal.")
raise e
self.reporter.log_end(self)
logger.info("Optimization finished.")
return population
[docs]
def get_state(self, store_population: bool = False) -> dict:
"""
Serialise the current algorithm state to a dictionary.
Parameters
----------
store_population : bool, optional
If ``True``, include the complete genotype matrix.
Returns
-------
dict
Dictionary representation of the algorithm state.
"""
data = {
"class_name": self.__class__.__name__,
"name": self.name,
"objfunc": self.objfunc.get_state(),
"stopping_condition": self.stopping_condition.get_state(),
"search_strategy": self.search_strategy.get_state(store_population),
"history": self.history_tracker.get_state(),
}
return data
[docs]
def store_state(
self,
file_name: str = "dumped_state.json",
readable: bool = False,
):
"""
Serialise the current algorithm state to a JSON file.
Parameters
----------
file_name : str, optional
Destination path (default ``"dumped_state.json"``).
readable : bool, optional
If ``True``, produce indented JSON (larger but human
readable).
"""
dumped = json.dumps(self.get_state(), cls=NumpyEncoder, indent=4 if readable else None)
with open(file_name, "w", encoding="utf-8") as fp:
fp.write(dumped)
[docs]
def to_pandas(self):
"""
Shorthand for ``self.history_tracker.to_pandas()``.
Returns
-------
pandas.DataFrame
Per-iteration summary of tracked metrics.
"""
return self.history_tracker.to_pandas()
[docs]
def to_pandas_full_objective(self):
"""
Shorthand for ``self.history_tracker.to_pandas_full_objective()``.
Returns
-------
pandas.DataFrame
Wide-format DataFrame with the full objective vector per
generation.
"""
return self.history_tracker.to_pandas_full_objective()