Module nkcs.ea
Evolutionary algorithms for NKCS.
Expand source code
#!/usr/bin/python3
#
# Copyright (C) 2019--2024 Richard Preen <rpreen@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""Evolutionary algorithms for NKCS."""
from __future__ import annotations
import logging
import random
from copy import deepcopy
from operator import attrgetter
from typing import Final
import numpy as np
from .constants import Constants as Cons
from .nkcs import NKCS
from .surrogate import Model
# Set up default logging at INFO level. This runs as a side effect of importing
# the module; basicConfig has no effect if the root logger already has handlers.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the EA print helpers below.
logger = logging.getLogger("ea")
class Ind:
    """A binary NKCS individual within an evolutionary algorithm."""

    def __init__(self) -> None:
        """Initialise an individual with a random binary genome of length N."""
        self.fitness: float = 0  #: fitness
        self.genome: np.ndarray = np.random.randint(2, size=Cons.N)  #: genome

    def to_string(self) -> str:
        """Return the genes followed by the fitness, comma-separated."""
        return (
            ",".join([str(gene) for gene in self.genome]) + f",{self.fitness}"
        )

    def mutate(self) -> None:
        """Flip each gene in place, independently, with probability P_MUT."""
        for i, gene in enumerate(self.genome):
            if np.random.uniform(low=0, high=1) < Cons.P_MUT:
                # True bit flip: 0 -> 1 and 1 -> 0. The previous expression
                # yielded 1 in both branches, so genes could never return to 0.
                self.genome[i] = 1 if gene == 0 else 0

    def uniform_crossover(self, parent: Ind) -> None:
        """Perform uniform crossover in place.

        Each gene is overwritten with the corresponding gene of ``parent``
        when a random draw from {0, 1} is 1; otherwise it is left unchanged.
        """
        for i in range(len(self.genome)):
            if np.random.randint(2):
                self.genome[i] = parent.genome[i]
class EA:
    """NKCS evolutionary algorithm.

    Holds one population and one archive of evaluated individuals per species
    (``Cons.S`` species) and counts the team evaluations performed.
    """

    def __init__(self) -> None:
        """Initialise the evolutionary algorithm."""
        self.evals: int = 0  #: number of evaluations performed
        self.archive: list[list[Ind]] = [[] for _ in range(Cons.S)]  #: archive
        self.pop: list[list[Ind]] = []  #: current species populations

    def run_initial(self, nkcs: NKCS) -> None:
        """Generate new random initial populations.

        Creates ``Cons.P`` random individuals for each species, then evaluates
        every individual in a team with one randomly chosen partner from each
        other species. Each evaluated individual is added to its species
        archive and counts as one evaluation.

        NOTE: populations are appended to ``self.pop``, so calling this again
        on the same instance adds further populations rather than replacing
        the existing ones.
        """
        # create the initial populations
        for _ in range(Cons.S):
            species = [Ind() for _ in range(Cons.P)]
            self.pop.append(species)
        # select random initial partners
        rpartners: list[Ind] = []
        team: list[Ind] = []
        for s in range(Cons.S):
            r = self.pop[s][np.random.randint(0, Cons.P)]
            rpartners.append(r)
            team.append(r)
        # evaluate initial populations
        for s in range(Cons.S):
            for p in range(Cons.P):
                team[s] = self.pop[s][p]
                genomes = [ind.genome for ind in team]
                team[s].fitness = nkcs.calc_team_fit(genomes)
                self.__update_archive(s, team[s])
                self.evals += 1
            # put this species' random partner back before the next species
            team[s] = rpartners[s]

    def __eval_team(self, nkcs: NKCS, team: list[Ind]) -> None:
        """Evaluate a candidate team.

        Every member's fitness becomes the larger of its current fitness and
        the team fitness, i.e. it is only updated if this is the best seen.
        """
        genomes = [ind.genome for ind in team]
        team_fit: Final[float] = nkcs.calc_team_fit(genomes)
        for s in range(Cons.S):
            team[s].fitness = max(team[s].fitness, team_fit)
        self.evals += 1  # total team evals performed

    def __get_team_best(self, s: int, child: Ind) -> list[Ind]:
        """Return a team assembled with the best partners for a child.

        The child fills the slot of species ``s``; every other slot holds the
        current best individual of that species.
        """
        team: list[Ind] = []
        for sp in range(Cons.S):
            if sp != s:
                team.append(self.get_best(sp))
            else:
                team.append(child)
        return team

    def __update_archive(self, s: int, p: Ind) -> None:
        """Add an evaluated individual to the species archive.

        The archive is capped at ``Cons.MAX_ARCHIVE`` entries; when the cap is
        exceeded the oldest entry is dropped.
        """
        self.archive[s].append(p)
        if len(self.archive[s]) > Cons.MAX_ARCHIVE:
            self.archive[s].pop(0)

    def update_perf(
        self, evals: np.ndarray, perf_best: np.ndarray, perf_avg: np.ndarray
    ) -> None:
        """Update current performance tracking.

        A data point is written only when the evaluation count is an exact
        multiple of ``Cons.P * Cons.S``. The three arrays are modified in
        place at index ``(multiple - 1)`` with the evaluation count, the
        highest best-fitness over all species, and the mean of the species'
        average fitnesses.
        """
        if self.evals % (Cons.P * Cons.S) == 0:
            # exact because of the modulo check above; integer division
            # avoids a float round-trip when computing the index
            g: Final[int] = self.evals // (Cons.P * Cons.S) - 1
            evals[g] = self.evals
            perf_best[g] = np.max([self.get_best_fit(s) for s in range(Cons.S)])
            perf_avg[g] = np.mean([self.get_avg_fit(s) for s in range(Cons.S)])

    def __create_offspring(self, p1: Ind, p2: Ind) -> Ind:
        """Create and return a new offspring.

        The child is a deep copy of ``p1`` with fitness reset to 0. With
        probability ``Cons.P_CROSS`` it is crossed with ``p2``, and it is
        always passed through mutation.
        """
        child = deepcopy(p1)
        child.fitness = 0
        if np.random.uniform(low=0, high=1) < Cons.P_CROSS:
            child.uniform_crossover(p2)
        child.mutate()
        return child

    def __add_offspring(self, s: int, child: Ind) -> None:
        """Add an offspring to the population and archive.

        With ``Cons.REPLACE == "tournament"`` a negative-tournament loser is
        always replaced. Otherwise the worst individual is replaced only if
        the child is strictly fitter. The population receives a deep copy of
        the child; the child itself is archived in either case.
        """
        if Cons.REPLACE == "tournament":
            self.pop[s].remove(self.__neg_tournament(s))
            self.pop[s].append(deepcopy(child))
        else:
            worst = self.get_worst(s)
            if worst.fitness < child.fitness:
                self.pop[s].remove(worst)
                self.pop[s].append(deepcopy(child))
        self.__update_archive(s, child)

    def __tournament(self, s: int) -> Ind:
        """Return the fittest of ``Cons.T_SIZE`` randomly sampled individuals."""
        tournament = random.sample(self.pop[s], Cons.T_SIZE)
        return max(tournament, key=attrgetter("fitness"))

    def __neg_tournament(self, s: int) -> Ind:
        """Return the least fit of ``Cons.T_SIZE`` randomly sampled individuals."""
        tournament = random.sample(self.pop[s], Cons.T_SIZE)
        return min(tournament, key=attrgetter("fitness"))

    def get_worst(self, s: int) -> Ind:
        """Return the least fit individual (not its index) of species ``s``."""
        return min(self.pop[s], key=attrgetter("fitness"))

    def get_best(self, s: int) -> Ind:
        """Return the fittest individual (not its index) of species ``s``."""
        return max(self.pop[s], key=attrgetter("fitness"))

    def get_best_fit(self, s: int) -> float:
        """Return the fitness of the best individual in a given species."""
        return self.get_best(s).fitness

    def get_avg_fit(self, s: int) -> float:
        """Return the average fitness of a given species."""
        fitnesses: list[float] = [p.fitness for p in self.pop[s]]
        mean = np.mean(fitnesses)
        return float(mean)

    def print_archive(self, s: int) -> None:
        """Log the archived individuals of species ``s`` at INFO level."""
        for p in self.archive[s]:
            # pass the text as an argument, matching print_pop, so it is
            # never interpreted as a format string
            logger.info("%s", p.to_string())

    def print_pop(self) -> None:
        """Log the current populations, species by species, at INFO level."""
        for s in range(Cons.S):
            logger.info("Species %d", s)
            for p in self.pop[s]:
                logger.info("%s", p.to_string())

    def run_ea(
        self, nkcs: NKCS, evals: np.ndarray, pbest: np.ndarray, pavg: np.ndarray
    ) -> None:
        """Execute a standard EA - partnering with the best candidates.

        Until ``Cons.MAX_EVALS`` evaluations are reached, each species in turn
        breeds one child from two tournament-selected parents, evaluates it
        with the best partners, inserts it, and updates the performance
        arrays. The limit is only checked between passes over the species.
        """
        while self.evals < Cons.MAX_EVALS:
            for s in range(Cons.S):
                parent1 = self.__tournament(s)
                parent2 = self.__tournament(s)
                child = self.__create_offspring(parent1, parent2)
                team = self.__get_team_best(s, child)
                self.__eval_team(nkcs, team)
                self.__add_offspring(s, child)
                self.update_perf(evals, pbest, pavg)

    def __create_candidates(self, s: int) -> np.ndarray:
        """Return M offspring genomes generated by 2 parents.

        Both parents are picked once by tournament; ``Cons.M`` offspring are
        then bred from that same pair and their genomes stacked into an array.
        """
        p1: Ind = self.__tournament(s)
        p2: Ind = self.__tournament(s)
        return np.asarray(
            [self.__create_offspring(p1, p2).genome for _ in range(Cons.M)]
        )

    def run_sea(
        self, nkcs: NKCS, evals: np.ndarray, pbest: np.ndarray, pavg: np.ndarray
    ) -> None:
        """Execute a surrogate-assisted EA.

        For each species, a new surrogate ``Model`` is fitted to the archived
        genomes and fitnesses, candidate offspring are scored by the model,
        and only the highest-scoring candidate is actually evaluated.
        """
        while self.evals < Cons.MAX_EVALS:
            for s in range(Cons.S):
                X_train = np.asarray([p.genome for p in self.archive[s]])
                y_train = np.asarray([p.fitness for p in self.archive[s]])
                X_predict = self.__create_candidates(s)
                model = Model()
                model.fit(X_train, y_train)
                # presumably one predicted score per candidate row -- confirm
                # against the surrogate Model implementation
                scores = model.score(X_predict)
                # evaluate single best candidate
                child = Ind()
                child.genome = np.copy(X_predict[np.argmax(scores)])
                team = self.__get_team_best(s, child)
                self.__eval_team(nkcs, team)
                self.__add_offspring(s, child)
                self.update_perf(evals, pbest, pavg)
Classes
class EA
-
NKCS evolutionary algorithm.
Initialise the evolutionary algorithm.
Expand source code
class EA: """NKCS evolutionary algorithm.""" def __init__(self) -> None: """Initialise the evolutionary algorithm.""" self.evals: int = 0 #: number of evaluations performed self.archive: list[list[Ind]] = [[] for _ in range(Cons.S)] #: archive self.pop: list[list[Ind]] = [] #: current species populations def run_initial(self, nkcs: NKCS) -> None: """Generate new random initial populations.""" # create the initial populations for _ in range(Cons.S): species = [Ind() for _ in range(Cons.P)] self.pop.append(species) # select random initial partners rpartners: list[Ind] = [] team: list[Ind] = [] for s in range(Cons.S): r = self.pop[s][np.random.randint(0, Cons.P)] rpartners.append(r) team.append(r) # evaluate initial populations for s in range(Cons.S): for p in range(Cons.P): team[s] = self.pop[s][p] genomes = [ind.genome for ind in team] team[s].fitness = nkcs.calc_team_fit(genomes) self.__update_archive(s, team[s]) self.evals += 1 team[s] = rpartners[s] def __eval_team(self, nkcs: NKCS, team: list[Ind]) -> None: """Evaluate a candidate team. Assign fitness to each individual if it's the best seen. 
""" genomes = [ind.genome for ind in team] team_fit: Final[float] = nkcs.calc_team_fit(genomes) for s in range(Cons.S): team[s].fitness = max(team[s].fitness, team_fit) self.evals += 1 # total team evals performed def __get_team_best(self, s: int, child: Ind) -> list[Ind]: """Return a team assembled with the best partners for a child.""" team: list[Ind] = [] for sp in range(Cons.S): if sp != s: team.append(self.get_best(sp)) else: team.append(child) return team def __update_archive(self, s: int, p: Ind) -> None: """Add an evaluated individual to the species archive.""" self.archive[s].append(p) if len(self.archive[s]) > Cons.MAX_ARCHIVE: self.archive[s].pop(0) def update_perf( self, evals: np.ndarray, perf_best: np.ndarray, perf_avg: np.ndarray ) -> None: """Update current performance tracking.""" if self.evals % (Cons.P * Cons.S) == 0: g: Final[int] = int((self.evals / (Cons.P * Cons.S)) - 1) evals[g] = self.evals perf_best[g] = np.max([self.get_best_fit(s) for s in range(Cons.S)]) perf_avg[g] = np.mean([self.get_avg_fit(s) for s in range(Cons.S)]) def __create_offspring(self, p1: Ind, p2: Ind) -> Ind: """Create and returns a new offspring.""" child = deepcopy(p1) child.fitness = 0 if np.random.uniform(low=0, high=1) < Cons.P_CROSS: child.uniform_crossover(p2) child.mutate() return child def __add_offspring(self, s: int, child: Ind) -> None: """Add an offspring to the population and archive.""" if Cons.REPLACE == "tournament": self.pop[s].remove(self.__neg_tournament(s)) self.pop[s].append(deepcopy(child)) else: worst = self.get_worst(s) if worst.fitness < child.fitness: self.pop[s].remove(worst) self.pop[s].append(deepcopy(child)) self.__update_archive(s, child) def __tournament(self, s: int) -> Ind: """Return a parent selected by tournament.""" tournament = random.sample(self.pop[s], Cons.T_SIZE) return max(tournament, key=attrgetter("fitness")) def __neg_tournament(self, s: int) -> Ind: """Return an individual selected by negative tournament.""" tournament = 
random.sample(self.pop[s], Cons.T_SIZE) return min(tournament, key=attrgetter("fitness")) def get_worst(self, s: int) -> Ind: """Return the index of the least fit individual in the population.""" return min(self.pop[s], key=attrgetter("fitness")) def get_best(self, s: int) -> Ind: """Return the index of the best individual in the population.""" return max(self.pop[s], key=attrgetter("fitness")) def get_best_fit(self, s: int) -> float: """Return the fitness of the best individual in a given species.""" return self.get_best(s).fitness def get_avg_fit(self, s: int) -> float: """Return the average fitness of a given species.""" fitnesses: list[float] = [p.fitness for p in self.pop[s]] mean = np.mean(fitnesses) return float(mean) def print_archive(self, s: int) -> None: """Print the archived individuals.""" for p in self.archive[s]: logger.info(p.to_string()) def print_pop(self) -> None: """Print the current populations.""" for s in range(Cons.S): logger.info("Species %d", s) for p in self.pop[s]: logger.info("%s", p.to_string()) def run_ea( self, nkcs: NKCS, evals: np.ndarray, pbest: np.ndarray, pavg: np.ndarray ) -> None: """Execute a standard EA - partnering with the best candidates.""" while self.evals < Cons.MAX_EVALS: for s in range(Cons.S): parent1 = self.__tournament(s) parent2 = self.__tournament(s) child = self.__create_offspring(parent1, parent2) team = self.__get_team_best(s, child) self.__eval_team(nkcs, team) self.__add_offspring(s, child) self.update_perf(evals, pbest, pavg) def __create_candidates(self, s: int) -> np.ndarray: """Return M offspring genomes generated by 2 parents.""" p1: Ind = self.__tournament(s) p2: Ind = self.__tournament(s) return np.asarray( [self.__create_offspring(p1, p2).genome for _ in range(Cons.M)] ) def run_sea( self, nkcs: NKCS, evals: np.ndarray, pbest: np.ndarray, pavg: np.ndarray ) -> None: """Execute a surrogate-assisted EA.""" while self.evals < Cons.MAX_EVALS: for s in range(Cons.S): X_train = np.asarray([p.genome for p 
in self.archive[s]]) y_train = np.asarray([p.fitness for p in self.archive[s]]) X_predict = self.__create_candidates(s) model = Model() model.fit(X_train, y_train) scores = model.score(X_predict) # evaluate single best candidate child = Ind() child.genome = np.copy(X_predict[np.argmax(scores)]) team = self.__get_team_best(s, child) self.__eval_team(nkcs, team) self.__add_offspring(s, child) self.update_perf(evals, pbest, pavg)
Instance variables
var archive
-
archive
var evals
-
number of evaluations performed
var pop
-
current species populations
Methods
def get_avg_fit(self, s: int) ‑> float
-
Return the average fitness of a given species.
Expand source code
def get_avg_fit(self, s: int) -> float: """Return the average fitness of a given species.""" fitnesses: list[float] = [p.fitness for p in self.pop[s]] mean = np.mean(fitnesses) return float(mean)
def get_best(self, s: int) ‑> Ind
-
Return the best individual in the population (the individual itself, not its index).
Expand source code
def get_best(self, s: int) -> Ind: """Return the fittest individual (not its index) in the population of species s.""" return max(self.pop[s], key=attrgetter("fitness"))
def get_best_fit(self, s: int) ‑> float
-
Return the fitness of the best individual in a given species.
Expand source code
def get_best_fit(self, s: int) -> float: """Return the fitness of the best individual in a given species.""" return self.get_best(s).fitness
def get_worst(self, s: int) ‑> Ind
-
Return the least fit individual in the population (the individual itself, not its index).
Expand source code
def get_worst(self, s: int) -> Ind: """Return the least fit individual (not its index) in the population of species s.""" return min(self.pop[s], key=attrgetter("fitness"))
def print_archive(self, s: int) ‑> None
-
Print the archived individuals.
Expand source code
def print_archive(self, s: int) -> None: """Print the archived individuals.""" for p in self.archive[s]: logger.info(p.to_string())
def print_pop(self) ‑> None
-
Print the current populations.
Expand source code
def print_pop(self) -> None: """Print the current populations.""" for s in range(Cons.S): logger.info("Species %d", s) for p in self.pop[s]: logger.info("%s", p.to_string())
def run_ea(self, nkcs: NKCS, evals: np.ndarray, pbest: np.ndarray, pavg: np.ndarray) ‑> None
-
Execute a standard EA - partnering with the best candidates.
Expand source code
def run_ea( self, nkcs: NKCS, evals: np.ndarray, pbest: np.ndarray, pavg: np.ndarray ) -> None: """Execute a standard EA - partnering with the best candidates.""" while self.evals < Cons.MAX_EVALS: for s in range(Cons.S): parent1 = self.__tournament(s) parent2 = self.__tournament(s) child = self.__create_offspring(parent1, parent2) team = self.__get_team_best(s, child) self.__eval_team(nkcs, team) self.__add_offspring(s, child) self.update_perf(evals, pbest, pavg)
def run_initial(self, nkcs: NKCS) ‑> None
-
Generate new random initial populations.
Expand source code
def run_initial(self, nkcs: NKCS) -> None: """Generate new random initial populations.""" # create the initial populations for _ in range(Cons.S): species = [Ind() for _ in range(Cons.P)] self.pop.append(species) # select random initial partners rpartners: list[Ind] = [] team: list[Ind] = [] for s in range(Cons.S): r = self.pop[s][np.random.randint(0, Cons.P)] rpartners.append(r) team.append(r) # evaluate initial populations for s in range(Cons.S): for p in range(Cons.P): team[s] = self.pop[s][p] genomes = [ind.genome for ind in team] team[s].fitness = nkcs.calc_team_fit(genomes) self.__update_archive(s, team[s]) self.evals += 1 team[s] = rpartners[s]
def run_sea(self, nkcs: NKCS, evals: np.ndarray, pbest: np.ndarray, pavg: np.ndarray) ‑> None
-
Execute a surrogate-assisted EA.
Expand source code
def run_sea( self, nkcs: NKCS, evals: np.ndarray, pbest: np.ndarray, pavg: np.ndarray ) -> None: """Execute a surrogate-assisted EA.""" while self.evals < Cons.MAX_EVALS: for s in range(Cons.S): X_train = np.asarray([p.genome for p in self.archive[s]]) y_train = np.asarray([p.fitness for p in self.archive[s]]) X_predict = self.__create_candidates(s) model = Model() model.fit(X_train, y_train) scores = model.score(X_predict) # evaluate single best candidate child = Ind() child.genome = np.copy(X_predict[np.argmax(scores)]) team = self.__get_team_best(s, child) self.__eval_team(nkcs, team) self.__add_offspring(s, child) self.update_perf(evals, pbest, pavg)
def update_perf(self, evals: np.ndarray, perf_best: np.ndarray, perf_avg: np.ndarray) ‑> None
-
Update current performance tracking.
Expand source code
def update_perf( self, evals: np.ndarray, perf_best: np.ndarray, perf_avg: np.ndarray ) -> None: """Update current performance tracking.""" if self.evals % (Cons.P * Cons.S) == 0: g: Final[int] = int((self.evals / (Cons.P * Cons.S)) - 1) evals[g] = self.evals perf_best[g] = np.max([self.get_best_fit(s) for s in range(Cons.S)]) perf_avg[g] = np.mean([self.get_avg_fit(s) for s in range(Cons.S)])
class Ind
-
A binary NKCS individual within an evolutionary algorithm.
Initialise a random individual.
Expand source code
class Ind:
    """A binary NKCS individual within an evolutionary algorithm."""

    def __init__(self) -> None:
        """Initialise an individual with a random binary genome of length N."""
        self.fitness: float = 0  #: fitness
        self.genome: np.ndarray = np.random.randint(2, size=Cons.N)  #: genome

    def to_string(self) -> str:
        """Return the genes followed by the fitness, comma-separated."""
        return (
            ",".join([str(gene) for gene in self.genome]) + f",{self.fitness}"
        )

    def mutate(self) -> None:
        """Flip each gene in place, independently, with probability P_MUT."""
        for i, gene in enumerate(self.genome):
            if np.random.uniform(low=0, high=1) < Cons.P_MUT:
                # True bit flip: 0 -> 1 and 1 -> 0. The previous expression
                # yielded 1 in both branches, so genes could never return to 0.
                self.genome[i] = 1 if gene == 0 else 0

    def uniform_crossover(self, parent: Ind) -> None:
        """Perform uniform crossover in place.

        Each gene is overwritten with the corresponding gene of ``parent``
        when a random draw from {0, 1} is 1; otherwise it is left unchanged.
        """
        for i in range(len(self.genome)):
            if np.random.randint(2):
                self.genome[i] = parent.genome[i]
Instance variables
var fitness
-
fitness
var genome
-
genome
Methods
def mutate(self) ‑> None
-
Mutate an individual.
Expand source code
def mutate(self) -> None:
    """Flip each gene in place, independently, with probability P_MUT."""
    for i, gene in enumerate(self.genome):
        if np.random.uniform(low=0, high=1) < Cons.P_MUT:
            # True bit flip: 0 -> 1 and 1 -> 0. The previous expression
            # yielded 1 in both branches, so genes could never return to 0.
            self.genome[i] = 1 if gene == 0 else 0
def to_string(self) ‑> str
-
Return a string representation of an individual.
Expand source code
def to_string(self) -> str: """Return a string representation of an individual.""" return ( ",".join([str(gene) for gene in self.genome]) + f",{self.fitness}" )
def uniform_crossover(self, parent: Ind) ‑> None
-
Perform uniform crossover.
Expand source code
def uniform_crossover(self, parent: Ind) -> None: """Perform uniform crossover.""" for i in range(len(self.genome)): if np.random.randint(2): self.genome[i] = parent.genome[i]