"""Divides the population into species based on genomic distances."""
from __future__ import division
import math
import sys
import warnings
# Namespace is used for species so reproduction,
# stagnation won't trample over each other's attribute names
from argparse import Namespace
from itertools import count
from neat.math_util import mean, stdev, tmean, NORM_EPSILON
from neat.six_util import iteritems, iterkeys, itervalues
from neat.config import ConfigParameter, DefaultClassConfig
[docs]class Species(object):
def __init__(self, key, generation):
self.key = key
self.created = generation
self.representative = None
self.members = {}
self.reproduction_namespace = Namespace()
self.reproduction_namespace.adjusted_fitness = None
self.stagnation_namespace = Namespace()
self.stagnation_namespace.fitness = None
self.stagnation_namespace.last_improved = generation
self.stagnation_namespace.fitness_history = []
def _getAdjustedFitness(self): # pragma: no cover
"""
Backwards compatibility wrapper for species.adjusted_fitness;
use species.reproduction_namespace.adjusted_fitness instead.
"""
warnings.warn("Use species.reproduction_namespace for adjusted_fitness",
DeprecationWarning, stacklevel=2)
return self.reproduction_namespace.adjusted_fitness
def _setAdjustedFitness(self,value): # pragma: no cover
if not isinstance(value, float):
raise TypeError(
"Adjusted_fitness ({0!r}) should be a float, not {1!s}".format(
value, type(value)))
warnings.warn("Use species.reproduction_namespace for adjusted_fitness",
DeprecationWarning, stacklevel=2)
self.reproduction_namespace.adjusted_fitness = value
def _delAdjustedFitness(self): # pragma: no cover
warnings.warn("Use species.reproduction_namespace for adjusted_fitness",
DeprecationWarning, stacklevel=2)
self.reproduction_namespace.adjusted_fitness = None
adjusted_fitness = property(_getAdjustedFitness,
_setAdjustedFitness,
_delAdjustedFitness)
def _getFitness(self): # pragma: no cover
"""
Backwards compatibility wrapper for species.fitness;
use species.stagnation_namespace.fitness instead.
"""
warnings.warn("Use species.stagnation_namespace for fitness",
DeprecationWarning, stacklevel=2)
return self.stagnation_namespace.fitness
def _setFitness(self,value): # pragma: no cover
if not isinstance(value, float):
raise TypeError(
"Fitness ({0!r}) should be a float, not {1!s}".format(
value, type(value)))
warnings.warn("Use species.stagnation_namespace for fitness",
DeprecationWarning, stacklevel=2)
self.stagnation_namespace.fitness = value
def _delFitness(self): # pragma: no cover
warnings.warn("Use species.stagnation_namespace for fitness",
DeprecationWarning, stacklevel=2)
self.stagnation_namespace.fitness = None
fitness = property(_getFitness,
_setFitness,
_delFitness)
def _getLastImproved(self): # pragma: no cover
"""
Backwards compatibility wrapper for species.last_improved;
use species.stagnation_namespace.last_improved instead.
"""
warnings.warn("Use species.stagnation_namespace for last_improved",
DeprecationWarning, stacklevel=2)
return self.stagnation_namespace.last_improved
def _setLastImproved(self, value): # pragma: no cover
if not isinstance(value, int):
raise TypeError(
"Last_improved ({0!r}) should be an int, not {1!s}".format(
value, type(value)))
warnings.warn("Use species.stagnation_namespace for last_improved",
DeprecationWarning, stacklevel=2)
self.stagnation_namespace.last_improved = value
last_improved = property(_getLastImproved,
_setLastImproved)
def _getFitnessHistory(self): # pragma: no cover
"""
Partial (due to being a list) backwards compatibility
wrapper for species.fitness_history; use
species.stagnation_namespace.last_improved instead
"""
warnings.warn("Use species.stagnation_namespace for fitness_history",
stacklevel=2)
return self.stagnation_namespace.fitness_history
fitness_history = property(_getFitnessHistory)
[docs] def update(self, representative, members):
self.representative = representative
self.members = members
[docs] def get_fitnesses(self):
return [m.fitness for m in itervalues(self.members)]
[docs]class GenomeDistanceCache(object):
def __init__(self, config):
self.distances = {}
self.config = config
self.hits = 0
self.misses = 0
[docs] def __call__(self, genome0, genome1):
g0 = genome0.key
g1 = genome1.key
d = self.distances.get((g0, g1))
if d is None:
# Distance is not already computed.
d = genome0.distance(genome1, self.config)
self.distances[g0, g1] = d
self.distances[g1, g0] = d
self.misses += 1
else:
self.hits += 1
return d
[docs]class DefaultSpeciesSet(DefaultClassConfig):
""" Encapsulates the default speciation scheme. """
def __init__(self, config, reporters, reproduction=None):
# pylint: disable=super-init-not-called
self.species_set_config = config
self.reporters = reporters
self.reproduction = reproduction
self.indexer = count(1)
self.species = {}
self.genome_to_species = {}
self.orig_compatibility_threshold = config.compatibility_threshold
if config.desired_species_num_max < config.desired_species_num_min:
raise ValueError(
"Desired_species_num_max {0:n} but num_min {1:n}".format(
config.desired_species_num_max, config.desired_species_num_min))
if ((config.compatibility_threshold_adjust.lower() != 'fixed') or
(reproduction is not None)):
self.min_pop_seen = 10**sys.float_info.dig
self.max_pop_seen = 0
if reproduction is None: # pragma: no cover
raise RuntimeError(
"Need reproduction instance for species info (threshold_adjust {0!s})".format(
self.compatibility_threshold_adjust))
self.threshold_adjust_dict = reproduction.get_species_size_info()
if ((config.compatibility_threshold_adjust.lower() != 'fixed') or
(self.threshold_adjust_dict['genome_config'] is not None)):
if self.threshold_adjust_dict['genome_config'] is None: # pragma: no cover
raise RuntimeError(
"Need genome_config for species info (threshold_adjust {0!s})".format(
self.compatibility_threshold_adjust))
self.threshold_adjust_dict.update(
self.threshold_adjust_dict['genome_config'].get_compatibility_info())
# below is based on configuration values from Stanley's website as
# compared to advised adjustment size.
self.base_threshold_adjust = 0.3*max(
self.threshold_adjust_dict['weight_coefficient'],
(self.threshold_adjust_dict['disjoint_coefficient']/2),
(self.orig_compatibility_threshold/6))
# for max_stagnation
self.threshold_adjust_dict.update(
self.threshold_adjust_dict['stagnation'].get_stagnation_info())
[docs] @classmethod
def parse_config(cls, param_dict):
return DefaultClassConfig(param_dict,
[ConfigParameter('compatibility_threshold', float),
ConfigParameter('compatibility_threshold_adjust',
str, 'fixed'),
ConfigParameter('desired_species_num_max',
int, 0, default_ok=True),
ConfigParameter('desired_species_num_min',
int, 0, default_ok=True)])
def _find_desired_num_species(self, pop_size_high, pop_size_low): # DOCUMENT!
config = self.species_set_config
max_num_usable = math.ceil(pop_size_high/self.threshold_adjust_dict['min_size'])
if max_num_usable < 2:
raise ValueError(
"Pop_size {0:n} is too low for effective min species size {1:n}".format(
pop_size_high, self.threshold_adjust_dict['min_size']))
poss_num_high = min(max_num_usable, # just in case
math.ceil(pop_size_high/
self.threshold_adjust_dict['min_OK_size']))
if config.desired_species_num_min > 2: # NEED TEST!
max_use = max((poss_num_high-1),2)
if config.desired_species_num_min > max_use: # NEED TEST!
warnings.warn(
"Desired_species_num_min {0:n} is too high for pop_size {1:n};".format(
config.desired_species_num_min,pop_size_high)
+ " adjusting to max {0:n}".format(max_use))
config.desired_species_num_min = max_use
elif ((config.desired_species_num_min != 0)
and (config.desired_species_num_min != 2)): # NEED TEST!
warnings.warn(
"Desired_species_num_min of {0:n} not valid; treating as 0 (autoconfigure)".format(
config.desired_species_num_min))
config.desired_species_num_min = 0
if config.desired_species_num_max > 2:
if config.desired_species_num_max > max_num_usable: # NEED TEST!
warnings.warn(
"Desired_species_num_max {0:n} is too high for pop_size {1:n};".format(
config.desired_species_num_max,pop_size_high)
+ " adjusting to max {0:n}".format(max_num_usable))
config.desired_species_num_max = max_num_usable
elif config.desired_species_num_max != 0: # NEED TEST!
warnings.warn(
"Desired_species_num_max of {0:n} not valid; treating as 0 (autoconfigure)".format(
config.desired_species_num_max))
config.desired_species_num_max = 0
if config.desired_species_num_min:
to_return_low = config.desired_species_num_min
else:
to_return_low = max(2,math.floor(pop_size_low/
self.threshold_adjust_dict['min_good_size']))
if config.desired_species_num_max > to_return_low:
to_return_high = config.desired_species_num_max
elif config.desired_species_num_max: # NEED TEST!
warnings.warn(
"Desired_species_num_max {0:n} is too low for min (autoconfigure?);".format(
config.desired_species_num_max)
+ " adjusting to min+1 {0:n}".format(to_return_low+1))
config.desired_species_num_max = to_return_low+1
to_return_high = to_return_low+1
elif poss_num_high < 2:
raise ValueError(
"Pop_size {0:n} is too low to determine desired num species;".format(pop_size_high)
+ " need minimum of {0:n} given min_OK_size {1:n}".format(
(2*self.threshold_adjust_dict['min_OK_size']),
self.threshold_adjust_dict['min_OK_size']))
else:
to_return_high = min(max_num_usable,max(poss_num_high,(to_return_low+1)))
return (to_return_high,to_return_low)
def _adjust_compatibility_threshold(self, increase, curr_tmean, max_rep_dist): # DOCUMENT!
old_threshold = self.species_set_config.compatibility_threshold
if increase:
mult_threshold = 1.05*old_threshold
add_threshold = old_threshold + self.base_threshold_adjust
if old_threshold >= self.orig_compatibility_threshold:
new_threshold = min(mult_threshold,add_threshold)
else:
new_threshold = max(min(mult_threshold,add_threshold),
min(self.orig_compatibility_threshold,
max(mult_threshold,add_threshold)))
if (curr_tmean
< self.orig_compatibility_threshold) and (new_threshold
< curr_tmean):
new_threshold = min(curr_tmean,((new_threshold
+ self.orig_compatibility_threshold)/2))
which = 'increased'
else:
div_threshold = old_threshold/1.05
sub_threshold = old_threshold - self.base_threshold_adjust
if old_threshold <= self.orig_compatibility_threshold:
new_threshold = max(div_threshold,sub_threshold)
else:
new_threshold = min(max(div_threshold,sub_threshold),
max(self.orig_compatibility_threshold,
min(div_threshold,sub_threshold)))
new_threshold = max(new_threshold,NORM_EPSILON,min(max_rep_dist,old_threshold))
if new_threshold >= old_threshold: # pragma: no cover
return
which = 'decreased'
self.species_set_config.compatibility_threshold = new_threshold
self.reporters.info(
"Compatibility threshold (orig {0:n}) {1!s} by {2:n} to {3:n} (from {4:n})".format(
self.orig_compatibility_threshold,
which,
abs(self.species_set_config.compatibility_threshold-old_threshold),
self.species_set_config.compatibility_threshold,
old_threshold))
[docs] def speciate(self, config, population, generation):
"""
Place genomes into species by genetic similarity.
Note that this method assumes the current representatives of the species are from the old
generation, and that after speciation has been performed, the old representatives should be
dropped and replaced with representatives from the new generation. If you violate this
assumption, you should make sure other necessary parts of the code are updated to reflect
the new behavior.
"""
if not isinstance(population, dict): # TEST NEEDED!
raise TypeError("Population ({0!r}) should be a dict, not {1!s}".format(
population, type(population)))
# Find the best representatives for each existing species.
unspeciated = set(iterkeys(population))
distances = GenomeDistanceCache(config.genome_config)
gid_min_dist = dict((gid, sys.float_info.max) for gid in unspeciated)
new_representatives = {}
new_members = {}
species_list = list(iterkeys(self.species))
species_list.sort(reverse=True,
key=lambda x: self.species[x].reproduction_namespace.adjusted_fitness)
max_rep_dist = 0.0
for sid in species_list:
s = self.species[sid]
candidates = []
for gid in unspeciated:
g = population[gid]
d = distances(s.representative, g)
candidates.append((d, g))
gid_min_dist[gid] = min(gid_min_dist[gid], d)
# The new representative is the genome closest to the current representative.
rdist, new_rep = min(candidates, key=lambda x: x[0])
max_rep_dist = max(max_rep_dist, rdist)
new_rid = new_rep.key
if rdist >= self.species_set_config.compatibility_threshold: # pragma: no cover
warnings.warn(
"Closest genome {0:n} to species {1:n}: dist {2:n} (thresh {3:n})".format(
new_rid, sid, rdist, self.species_set_config.compatibility_threshold))
new_representatives[sid] = new_rid
new_members[sid] = [new_rid]
unspeciated.remove(new_rid)
self.reporters.info(
"Furthest rep from old species representatives was at {0:n} ({1:n}%)".format(
max_rep_dist,
(100.0*max_rep_dist/self.species_set_config.compatibility_threshold)))
unspeciated_list = list(unspeciated)
# Putting most distant from any others first, to serve as clustering seeds
unspeciated_list.sort(reverse=True, key=lambda x: gid_min_dist[x])
# Partition population into species based on genetic similarity.
for gid in unspeciated_list:
g = population[gid]
# Find the species with the most similar representative.
candidates = []
for sid, rid in iteritems(new_representatives):
rep = population[rid]
d = distances(rep, g)
if d < self.species_set_config.compatibility_threshold:
candidates.append((d, sid))
if candidates:
ignored_sdist, sid = min(candidates, key=lambda x: x[0])
new_members[sid].append(gid)
else:
# No species is similar enough, create a new species, using
# this genome as its representative.
sid = next(self.indexer)
new_representatives[sid] = gid
new_members[sid] = [gid]
# Update species collection based on new speciation.
self.genome_to_species = {}
for sid, rid in iteritems(new_representatives):
s = self.species.get(sid)
if s is None:
s = Species(sid, generation)
self.species[sid] = s
members = new_members[sid]
for gid in members:
self.genome_to_species[gid] = sid
member_dict = dict((gid, population[gid]) for gid in members)
s.update(population[rid], member_dict)
gdmean = mean(itervalues(distances.distances))
gdtmean = tmean(itervalues(distances.distances))
gdstdev = stdev(itervalues(distances.distances))
self.reporters.info(
'Mean genetic distance {0:n}, 50% trimmed mean {1:n}, standard deviation {2:n}'.format(
gdmean, gdtmean, gdstdev))
if self.species_set_config.compatibility_threshold_adjust.lower() == 'number':
if generation < 1:
self.reporters.info(
"Min_size is {0:n}, min_OK_size is {1:n}, min_good_size is {2:n}".format(
self.threshold_adjust_dict['min_size'],
self.threshold_adjust_dict['min_OK_size'],
self.threshold_adjust_dict['min_good_size']))
self.min_pop_seen = min(self.min_pop_seen,len(population))
self.max_pop_seen = max(self.max_pop_seen,len(population))
desired_num_species_high, desired_num_species_low = self._find_desired_num_species(
self.max_pop_seen, self.min_pop_seen)
if len(self.species) < desired_num_species_low:
self.reporters.info(
"Species num {0:n} below desired minimum {1:n}".format(
len(self.species), desired_num_species_low))
self._adjust_compatibility_threshold(increase=False,
curr_tmean=gdtmean,
max_rep_dist=max_rep_dist)
elif len(self.species) > desired_num_species_high:
self.reporters.info(
"Species num {0:n} above desired maximum {1:n}".format(
len(self.species), desired_num_species_high))
if (('max_stagnation' not in self.threshold_adjust_dict)
or (generation >= self.threshold_adjust_dict['max_stagnation'])
or (self.species_set_config.compatibility_threshold <
self.orig_compatibility_threshold)
or (self.species_set_config.compatibility_threshold <=
max_rep_dist)):
self._adjust_compatibility_threshold(increase=True,
curr_tmean=gdtmean,
max_rep_dist=max_rep_dist)
elif ((self.species_set_config.compatibility_threshold <= max_rep_dist)
and (len(self.species) > desired_num_species_low)):
self._adjust_compatibility_threshold(increase=True,
curr_tmean=gdtmean,
max_rep_dist=max_rep_dist)
elif self.species_set_config.compatibility_threshold_adjust.lower() != 'fixed':
raise ValueError(
"Unknown compatibility_threshold_adjust {!r}".format(
self.species_set_config.compatibility_threshold_adjust))
[docs] def get_species_id(self, individual_id):
return self.genome_to_species[individual_id]
[docs] def get_species(self, individual_id): # NEED TEST!
sid = self.genome_to_species[individual_id]
return self.species[sid]