"""
Base class for the Population module.
This module implements a data structure to hold the collection of solutions we are considering.
"""
from __future__ import annotations
import logging
from typing import Iterable, Tuple, Any, Optional, Iterator
from copy import copy
import numpy as np
from .objective_function import ObjectiveFunc
from .encoding import Encoding, DefaultEncoding
from .encodings import ParameterExtendingEncoding
from .utils import VectorLike, MatrixLike, MaskLike
logger = logging.getLogger(__name__)
[docs]
class Population:
"""Container for a set of candidate solutions and their fitness.
A ``Population`` holds the genotype matrix, fitness and objective
values, historical bests, and the current best individual. It
is the central data structure passed between components of the
optimization loop.
Parameters
----------
objfunc : ObjectiveFunc
The objective function that will evaluate the population.
genotype_matrix : ndarray
2-D array of shape ``(N, M)`` containing the genotypes.
encoding : Encoding, optional
The encoding used to translate between genotype and phenotype.
Defaults to :class:`DefaultEncoding`.
"""
def __init__(self, objfunc: ObjectiveFunc, genotype_matrix: MatrixLike, encoding: Optional[Encoding] = None):
# Objective function
self.objfunc = objfunc
# Population of solutions
self.genotype_matrix = genotype_matrix
# Size of the population
self.population_size = genotype_matrix.shape[0]
self.dimension = genotype_matrix.shape[1]
# Fitness of each individual in the population
self.fitness = np.full(self.population_size, -np.inf)
self.objective = np.full(self.population_size, -np.inf)
self.fitness_calculated = np.zeros(self.population_size)
# Best solution found so far
self.best = None
self.best_fitness = None
self.best_objective = None
# Best individual in each spot of the population
self.historical_best_matrix = genotype_matrix
self.historical_best_fitness = np.full(self.population_size, -np.inf)
# Encoding to use
if encoding is None:
encoding = DefaultEncoding()
self.encoding = encoding
self.index = 0
def __len__(self) -> int:
return self.genotype_matrix.shape[0]
def __iter__(self) -> Iterator[VectorLike]:
for row in self.genotype_matrix:
yield row
def __repr__(self) -> str:
return (
"Population{"
f"\n\tobjfunc = {self.objfunc.name}"
f"\n\tgenotype_matrix = {self.genotype_matrix}"
f"\n\tpop_size = {self.population_size}"
f"\n\tvec_size = {self.dimension}"
f"\n\tfitness = {self.fitness}"
f"\n\tobjective = {self.objective}"
f"\n\tfitness_calculated = {self.fitness_calculated}"
f"\n\thistorical_best_matrix = {self.historical_best_matrix}"
f"\n\thistorical_best_fitness = {self.historical_best_fitness}"
f"\n\tbest = {self.best}"
f"\n\tbest_fitness = {self.best_fitness}"
f"\n\tbest_objective = {self.best_objective}"
"\n}"
)
def __copy__(self) -> Population:
copied_pop = Population(self.objfunc, copy(self.genotype_matrix), encoding=self.encoding)
copied_pop.fitness = copy(self.fitness)
copied_pop.objective = copy(self.objective)
copied_pop.fitness_calculated = copy(self.fitness_calculated)
copied_pop.historical_best_matrix = copy(self.historical_best_matrix)
copied_pop.historical_best_fitness = copy(self.historical_best_fitness)
copied_pop.best = copy(self.best)
copied_pop.best_fitness = copy(self.best_fitness)
copied_pop.best_objective = copy(self.best_objective)
return copied_pop
[docs]
def best_individual(self) -> Tuple[MatrixLike, float]:
"""Return the best genotype and its maximized fitness value.
Returns
-------
best_genotype : MatrixLike
The genotype vector of the best individual.
best_fitness : float
The internal fitness (always maximized).
"""
return self.best, self.best_fitness
[docs]
def best_solution(self) -> Tuple[Any, float]:
"""Return the best decoded solution and its raw objective value.
Returns
-------
solution : Any
The decoded phenotype of the best individual.
objective : float
The raw objective value.
"""
# Decode needs a matrix, so we ad a virtual dimension
best_solution_vec = self.best[None, :]
# The encoding returns an iterable of solutions, so we extract the first (and only) one.
best_solution_vec = self.encoding.decode(best_solution_vec)
if isinstance(best_solution_vec, np.ndarray) and best_solution_vec.ndim > 1:
best_solution_vec = best_solution_vec.squeeze()
else:
best_solution_vec = best_solution_vec[0]
return best_solution_vec, self.best_objective
[docs]
def update_genotype(self, genotype_source: MatrixLike | Population) -> Population:
"""Replace the genotype matrix.
Parameters
----------
genotype_source : ndarray or Population
New genotypes. If a ``Population`` is given, its genotype
matrix is used.
Returns
-------
Population
``self``, with updated genotypes and, if the size changed,
re-initialized fitness and historical bests.
"""
if isinstance(genotype_source, Population):
genotype_matrix = genotype_source.genotype_matrix
else:
genotype_matrix = genotype_source
if genotype_matrix.shape[1] != self.dimension:
raise ValueError("Individual vector size should not change when updating the population.")
if len(genotype_matrix) != len(self.genotype_matrix):
self.fitness = np.full(len(genotype_matrix), -np.inf)
self.objective = np.full(len(genotype_matrix), -np.inf)
self.fitness_calculated = np.zeros(len(genotype_matrix))
self.historical_best_fitness = np.full(len(genotype_matrix), -np.inf)
self.historical_best_matrix = copy(genotype_matrix)
logger.debug("Genotype matrix will change size.")
else:
self.fitness_calculated = np.all(self.genotype_matrix == genotype_matrix, axis=1)
self.genotype_matrix = genotype_matrix
self.population_size = genotype_matrix.shape[0]
logger.debug("Updated genotype matrix.")
return self
[docs]
def take_selection(self, selection_idx: MaskLike) -> Population:
"""
Takes a subset of the population given a mask.
Parameters
----------
selection_idx: ndarray
An array of indices or a mask that indicate which individuals to take from the population.
Returns
-------
selected_population: Population
A copy of the population containing only the chosen individuals.
"""
selected_genotype_matrix = copy(self.genotype_matrix[selection_idx, :])
selected_pop = Population(self.objfunc, selected_genotype_matrix, encoding=self.encoding)
selected_pop.fitness = copy(self.fitness[selection_idx])
selected_pop.objective = copy(self.objective[selection_idx])
selected_pop.fitness_calculated = copy(self.fitness_calculated[selection_idx])
selected_pop.historical_best_matrix = copy(self.historical_best_matrix[selection_idx, :])
selected_pop.historical_best_fitness = copy(self.historical_best_fitness[selection_idx])
selected_pop.best = copy(self.best)
selected_pop.best_fitness = copy(self.best_fitness)
selected_pop.best_objective = copy(self.best_objective)
logger.debug("Taken selection from population.")
return selected_pop
[docs]
def apply_selection(self, selected_pop: Population, selection_idx: MaskLike) -> Population:
"""
Replaces the chosen individuals from the input population to the current population.
Parameters
----------
selected_pop: Population
Population where to take the individuals that will replace the ones in the population.
selection_idx: ndarray
An array of indices or a mask that indicate which individuals to take from the population.
Returns
-------
self: Population
"""
self.genotype_matrix[selection_idx, :] = selected_pop.genotype_matrix
self.fitness_calculated[selection_idx] = False
self.fitness[selection_idx] = selected_pop.fitness
self.objective[selection_idx] = selected_pop.objective
self.historical_best_matrix[selection_idx, :] = selected_pop.historical_best_matrix
self.historical_best_fitness[selection_idx] = selected_pop.historical_best_fitness
if self.best is None or (selected_pop.best is not None and self.best_fitness < selected_pop.best_fitness):
self.best = selected_pop.best
self.best_fitness = selected_pop.best_fitness
self.best_objective = selected_pop.best_objective
logger.debug("Applied precomputed selection from population.")
return self
[docs]
def take_slice(self, mask: MaskLike) -> Population:
"""
Takes a subset of the components in the population vectors.
Parameters
----------
mask: ndarray
An array of indices or a mask that indicate which components to take from each vector in the population.
Returns
-------
sliced_population: Population
A copy of the population containing the masked individuals.
"""
sliced_genotype_matrix = copy(self.genotype_matrix[:, mask])
sliced_pop = Population(self.objfunc, sliced_genotype_matrix, encoding=self.encoding)
sliced_pop.dimension = sliced_genotype_matrix.shape[1]
sliced_pop.historical_best_matrix = copy(self.historical_best_matrix[:, mask])
sliced_pop.historical_best_fitness = copy(self.historical_best_fitness)
sliced_pop.fitness_calculated = copy(self.fitness_calculated)
sliced_pop.fitness = copy(self.fitness)
sliced_pop.objective = copy(self.objective)
sliced_pop.best = copy(self.best)
sliced_pop.best_fitness = copy(self.best_fitness)
sliced_pop.best_objective = copy(self.best_objective)
logger.debug("Taken slice from population.")
return sliced_pop
[docs]
def apply_slice(self, sliced_pop: Population, mask: MaskLike) -> Population:
"""
Apply the values of the population to a subset of the components of the population vectors.
Parameters
----------
sliced_pop: Population
Population where to take the individuals from which we will take the components that will replace the ones in the
current population.
mask: ndarray
An array of indices or a mask that indicate which components to take from each vector in the population.
Returns
-------
self: Population
"""
self.genotype_matrix[:, mask] = sliced_pop.genotype_matrix
self.fitness_calculated[:] = False
if self.best is None or (sliced_pop.best is not None and self.best_fitness < sliced_pop.best_fitness):
self.best = sliced_pop.best
self.best_fitness = sliced_pop.best_fitness
self.best_objective = sliced_pop.best_objective
logger.debug("Applied precomputed slice from population.")
return self
[docs]
@staticmethod
def join_populations(population1: Population, population2: Population) -> Population:
"""Concatenate two populations into a new one.
Parameters
----------
population1 : Population
First population.
population2 : Population
Second population.
Returns
-------
Population
A new population containing all individuals from both inputs.
"""
joined_genotype_matrix = np.concatenate((population1.genotype_matrix, population2.genotype_matrix), axis=0)
joined_pop = Population(population1.objfunc, joined_genotype_matrix, encoding=population1.encoding)
joined_pop.historical_best_matrix = np.concatenate((population1.historical_best_matrix, population2.historical_best_matrix), axis=0)
joined_pop.historical_best_fitness = np.concatenate((population1.historical_best_fitness, population2.historical_best_fitness))
joined_pop.fitness_calculated = np.concatenate((population1.fitness_calculated, population2.fitness_calculated))
joined_pop.fitness = np.concatenate((population1.fitness, population2.fitness))
joined_pop.objective = np.concatenate((population1.objective, population2.objective))
if population1.best is None or (population2.best is not None and population1.best_fitness > population2.best_fitness):
joined_pop.best = population1.best
joined_pop.best_fitness = population1.best_fitness
joined_pop.best_objective = population1.best_objective
else:
joined_pop.best = population2.best
joined_pop.best_fitness = population2.best_fitness
joined_pop.best_objective = population2.best_objective
logger.debug("Merged two populations into one.")
return joined_pop
[docs]
def join(self, other_population: Population) -> Population:
"""
Adds to the current population the individuals of the input population.
Parameters
----------
other_population: Population
Population that will be concatenated with the current one.
Returns
-------
joined_populations: Population
A population containing both the individuals from the current population and the ones from the input population.
"""
self.genotype_matrix = np.concatenate((self.genotype_matrix, other_population.genotype_matrix), axis=0)
self.population_size += other_population.genotype_matrix.shape[0]
self.historical_best_matrix = np.concatenate((self.historical_best_matrix, other_population.historical_best_matrix), axis=0)
self.historical_best_fitness = np.concatenate((self.historical_best_fitness, other_population.historical_best_fitness))
self.fitness_calculated = np.concatenate((self.fitness_calculated, other_population.fitness_calculated), axis=0)
self.fitness = np.concatenate((self.fitness, other_population.fitness))
self.objective = np.concatenate((self.objective, other_population.objective))
if self.best is None or (other_population.best is not None and self.best_fitness < other_population.best_fitness):
self.best = other_population.best
self.best_fitness = other_population.best_fitness
self.best_objective = other_population.best_objective
logger.debug("Merged one population into the current one.")
return self
[docs]
def sort_population(self) -> Population:
"""
Sorts the individuals by fitness.
Returns
-------
self: Population
"""
fitness_order = np.argsort(self.fitness)
self.genotype_matrix = self.genotype_matrix[fitness_order, :]
self.historical_best_matrix = self.historical_best_matrix[fitness_order, :]
self.historical_best_fitness = self.historical_best_fitness[fitness_order]
self.fitness_calculated = self.fitness_calculated[fitness_order]
self.fitness = self.fitness[fitness_order]
self.objective = self.objective[fitness_order]
logger.debug("Sorted population.")
return self
[docs]
def update_best_from_parents(self, parents: Population) -> Population:
"""Update the best solution if a better one exists in *parents*.
Parameters
----------
parents : Population
Population whose best individual may improve the current one.
Returns
-------
Population
``self``, with possibly updated ``best``, ``best_fitness``,
and ``best_objective``.
"""
if self.best is None or (parents.best is not None and self.best_fitness < parents.best_fitness):
self.best = parents.best
self.best_fitness = parents.best_fitness
self.best_objective = parents.best_objective
return self
[docs]
def step(self, _progress: float = 0) -> Population:
"""
Updates the best solution in the population.
Returns
-------
self: Population
"""
if self.best is None or np.any(self.best_fitness < self.fitness):
best_idx = np.argmax(self.fitness)
self.best = self.genotype_matrix[best_idx, :]
self.best_fitness = self.fitness[best_idx]
self.best_objective = self.objective[best_idx]
return self
[docs]
def repeat(self, amount: int = 2) -> Population:
"""
Duplicates the individuals of the population.
Parameters
----------
amount: int, optional
The amount of times to repeat the individuals in the population.
Returns
-------
repeated_population: Population
"""
genotype_matrix = np.tile(self.genotype_matrix, (amount, 1))
fitness_calculated = np.tile(self.fitness_calculated, (amount))
fitness = np.tile(self.fitness, (amount))
objective = np.tile(self.objective, (amount))
best = self.best
best_fitness = self.best_fitness
best_objective = self.best_objective
new_population = Population(self.objfunc, genotype_matrix, encoding=self.encoding)
new_population.fitness_calculated = fitness_calculated
new_population.fitness = fitness
new_population.objective = objective
new_population.best = best
new_population.best_fitness = best_fitness
new_population.best_objective = best_objective
logger.debug("Added %d copies of each individual to the population", amount)
return new_population
[docs]
def calculate_fitness(self, parallel: bool = False, threads: int = 8) -> VectorLike:
"""
Calculates the fitness of the individual if it has not been calculated before
Parameters
----------
parallel: bool, optional
Whether to evaluate the individuals in the population in parallel.
threads: int, optional
Number of processes to use at once if calculating the fitness in parallel.
Returns
-------
fitness: ndarray
"""
prev_fitness = copy(self.fitness)
# Objective values and fitness values are modified in place after the call
self.fitness = self.objfunc.fitness(self)
improved_mask = prev_fitness < self.fitness
self.historical_best_fitness[improved_mask] = self.fitness[improved_mask]
self.historical_best_matrix[improved_mask, :] = self.genotype_matrix[improved_mask, :]
if self.best is None or np.any(self.fitness > self.best_fitness):
best_idx = np.argmax(self.fitness)
self.best = self.genotype_matrix[best_idx]
self.best_fitness = self.fitness[best_idx]
self.best_objective = self.objective[best_idx]
logger.debug("Updated the fitness of the individuals.")
return self
[docs]
def repair_solutions(self) -> Population:
"""
Repairs the solutions in the population.
Returns
-------
self: Population
"""
self.genotype_matrix = self.objfunc.repair_solution(self.genotype_matrix)
return self
[docs]
def decode(self, encoding: Optional[Encoding] = None) -> Iterable:
"""
Return the population passed through the decoding function defined in the encoding.
Returns
-------
decoded_population: Any
"""
if encoding is None:
encoding = self.encoding
return self.encoding.decode(self.genotype_matrix)
[docs]
def decode_params(self, encoding: Optional[Encoding] = None) -> Iterable:
"""Decode the auxiliary parameters stored in the genotype.
Only works with :class:`ParameterExtendingEncoding`.
Parameters
----------
encoding : Encoding, optional
Encoding to use; defaults to ``self.encoding``.
Returns
-------
dict or None
Dictionary of parameter arrays, or ``None`` if the encoding
does not support extended parameters.
"""
if encoding is None:
encoding = self.encoding
if isinstance(self.encoding, ParameterExtendingEncoding):
return self.encoding.decode_params(self.genotype_matrix)
else:
return None
[docs]
def encode(self, encoding: Optional[Encoding] = None) -> MatrixLike:
"""Encode the current population using the given encoding.
Parameters
----------
encoding : Encoding, optional
Encoding to use; defaults to ``self.encoding``.
Returns
-------
MatrixLike
The encoded genotype matrix.
"""
if encoding is None:
encoding = self.encoding
return encoding.encode(self.genotype_matrix)
[docs]
def get_state(self) -> dict:
"""Return a dictionary with the current population state.
Returns
-------
dict
Keys include ``genotype_matrix``, ``fitness``, ``objective``,
historical bests, and the best individual.
"""
data = {
"genotype_matrix": self.genotype_matrix,
"fitness": self.fitness,
"objective": self.objective,
"historical_best_matrix": self.genotype_matrix,
"historical_best_fitness": self.historical_best_fitness,
"best": self.best,
"best_fitness": self.best_fitness,
"best_objective": self.best_objective,
"encoding": type(self.encoding).__name__,
}
return data
[docs]
def debug_repr(self, max_solutions: int = 5, max_vars: int = 5) -> str:
"""Return a compact string representation for debugging.
Parameters
----------
max_solutions : int, optional
Maximum number of rows to include in the preview.
max_vars : int, optional
Maximum number of columns to include in the preview.
Returns
-------
str
"""
genotype_matrix = self.genotype_matrix
shape = genotype_matrix.shape
if genotype_matrix.size == 0:
matrix_preview = "[]"
elif genotype_matrix.size > max_solutions * max_vars:
n_sol = min(max_solutions, shape[0])
n_vars = min(max_vars, shape[1])
preview = genotype_matrix[:n_sol, :n_vars]
matrix_preview = f"array({preview}, shape={shape}) ... " f"(showing first {n_sol} rows, {n_vars} cols)"
else:
matrix_preview = f"array({genotype_matrix})"
return (
f"Population(\n"
f" objfunc={self.objfunc.name},\n"
f" size={self.population_size}, dims={self.dimension},\n"
f" fitness=[{self.fitness.min():.3e}, {self.fitness.max():.3e}],\n"
f" objective=[{self.objective.min():.3e}, {self.objective.max():.3e}],\n"
f" best_fitness={self.best_fitness:.3e},\n"
f" best_objective={self.best_objective:.3e},\n"
f" genotype_matrix={matrix_preview}\n"
f")"
)