Source code for ctrnn

"""Handles the continuous-time recurrent neural network implementation."""
from __future__ import division

from neat.graphs import required_for_output
from neat.six_util import itervalues, iteritems


[docs]class CTRNNNodeEval(object): def __init__(self, time_constant, activation, aggregation, bias, response, links): self.time_constant = time_constant self.activation = activation self.aggregation = aggregation self.bias = bias self.response = response self.links = links
[docs]class CTRNN(object): """Sets up the ctrnn network itself.""" def __init__(self, inputs, outputs, node_evals): self.input_nodes = inputs self.output_nodes = outputs self.node_evals = node_evals self.values = [{}, {}] for v in self.values: for k in inputs + outputs: v[k] = 0.0 for node, ne in iteritems(self.node_evals): v[node] = 0.0 for i, w in ne.links: v[i] = 0.0 self.active = 0 self.time_seconds = 0.0
[docs] def reset(self): self.values = [dict((k, 0.0) for k in v) for v in self.values] self.active = 0 self.time_seconds = 0.0
[docs] def set_node_value(self, node_key, value): for v in self.values: v[node_key] = value
[docs] def get_max_time_step(self): # pragma: no cover # TODO: Compute max time step that is known to be numerically stable for # the current network configuration. # pylint: disable=no-self-use raise NotImplementedError()
[docs] def advance(self, inputs, advance_time, time_step=None): """ Advance the simulation by the given amount of time, assuming that inputs are constant at the given values during the simulated time. """ final_time_seconds = self.time_seconds + advance_time # Use half of the max allowed time step if none is given. if time_step is None: # pragma: no cover time_step = 0.5 * self.get_max_time_step() if len(self.input_nodes) != len(inputs): raise RuntimeError("Expected {0:n} inputs, got {1:n}".format( len(self.input_nodes), len(inputs))) while self.time_seconds < final_time_seconds: dt = min(time_step, final_time_seconds - self.time_seconds) ivalues = self.values[self.active] ovalues = self.values[1 - self.active] self.active = 1 - self.active for i, v in zip(self.input_nodes, inputs): ivalues[i] = v ovalues[i] = v for node_key, ne in iteritems(self.node_evals): node_inputs = [ivalues[i] * w for i, w in ne.links] s = ne.aggregation(node_inputs) z = ne.activation(ne.bias + ne.response * s) ovalues[node_key] += dt / ne.time_constant * (-ovalues[node_key] + z) self.time_seconds += dt ovalues = self.values[1 - self.active] return [ovalues[i] for i in self.output_nodes]
[docs] @staticmethod def create(genome, config, time_constant): """ Receives a genome and returns its phenotype (a CTRNN). """ genome_config = config.genome_config required = required_for_output(genome_config.input_keys, genome_config.output_keys, genome.connections) # Gather inputs and expressed connections. node_inputs = {} for cg in itervalues(genome.connections): if not cg.enabled: continue i, o = cg.key if o not in required and i not in required: continue if o not in node_inputs: node_inputs[o] = [(i, cg.weight)] else: node_inputs[o].append((i, cg.weight)) node_evals = {} for node_key, inputs in iteritems(node_inputs): node = genome.nodes[node_key] activation_function = genome_config.activation_defs.get(node.activation) aggregation_function = genome_config.aggregation_function_defs.get(node.aggregation) node_evals[node_key] = CTRNNNodeEval(time_constant, activation_function, aggregation_function, node.bias, node.response, inputs) return CTRNN(genome_config.input_keys, genome_config.output_keys, node_evals)