I have created a NEAT model from scratch in Python. Its neural networks are used to control the movement of cubes in 3D space as they move towards a target. This was meant to improve on a previous implementation of the Genetic Algorithm, where the cubes could learn to move towards a stationary target but couldn’t handle it when the target moved randomly. Unfortunately with this new implementation using NEAT the cubes don’t improve at moving towards the target even when the target does not move.
I have read part of the original NEAT paper but found it hard to understand, however after some research I think that I have understood the central components like innovation numbers, speciation and historical markings through said innovation numbers. Here is the current code for my Neural Network class:
from Connection import Connection
from Neuron import Neuron
import globalvars
import random
import copy
import line_profiler
import timeit
import numpy as np
import time
class Neural_Network():
def __init__(self, num_inputs, num_outputs):
self.genome_neurons = []
self.genome_connections = []
self.input_neurons = []
self.output_neurons = []
self.num_inputs = num_inputs
self.num_outputs = num_outputs
self.in_out_layers()
self.runtime = 0
def in_out_layers(self):
for i in range(self.num_inputs):
neuron = Neuron(i, None, None)
self.genome_neurons.append(neuron)
self.input_neurons.append(neuron)
for i in range(self.num_inputs, self.num_inputs + self.num_outputs):
neuron = Neuron(i, None, None)
self.genome_neurons.append(neuron)
self.output_neurons.append(neuron)
neuron = Neuron(self.num_inputs+self.num_outputs, None, None)
self.genome_neurons.append(neuron)
self.input_neurons.append(neuron)
neuron = Neuron(self.num_inputs+self.num_outputs+1, None, None)
self.genome_neurons.append(neuron)
self.input_neurons.append(neuron)
def create_connection(self, input_neuron, output_neuron):
new_connection = True
for connection in globalvars.connections:
if connection.input_neuron == input_neuron and connection.output_neuron == output_neuron:
new_connection = False
innovation_number = connection.innovation_number
if new_connection == True:
innovation_number = globalvars.next_innovation_number
globalvars.next_innovation_number += 1
connection = Connection(input_neuron, output_neuron, innovation_number)
if connection not in self.genome_connections:
self.genome_connections.append(connection)
globalvars.connections.append(connection)
return connection
def create_node(self, connection):
input_neuron = connection.input_neuron
output_neuron = connection.output_neuron
new_neuron = True
for neuron in globalvars.neurons:
if neuron.input_neuron == input_neuron and neuron.output_neuron == output_neuron:
new_neuron = False
neuron_id = neuron.id
if new_neuron == True:
neuron_id = globalvars.next_id
globalvars.next_id += 1
connection.active = False
neuron = Neuron(neuron_id, input_neuron, output_neuron)
globalvars.neurons.append(neuron)
self.genome_neurons.append(neuron)
connection1 = self.create_connection(input_neuron, neuron.id)
connection2 = self.create_connection(neuron.id, output_neuron)
connection1.weight = 1
connection2.weight = connection.weight
def run(self, inputs):
neuron_map = {instance.id: instance for instance in self.genome_neurons}
self.reset()
outputs = []
active_connections = []
self.input_neurons[self.num_inputs].value = 1
self.input_neurons[self.num_inputs+1].value = 0
for i in range(self.num_inputs):
self.input_neurons[i].value = inputs[i]
for connection in self.genome_connections:
if connection.active == True:
active_connections.append(connection)
for neuron in self.output_neurons:
outputs.append(self.get_neuron_value(neuron.id, active_connections, neuron_map, visited=None))
max_value = None
max_index = None
for i, value in enumerate(outputs):
if value is not None:
if max_value is None or value > max_value:
max_value = value
max_index = i
return max_index
def get_neuron_value(self, neuron_id, active_connections, neuron_map, visited):
if visited is None:
visited = set()
if neuron_id in visited:
return 0
visited.add(neuron_id)
neuron = neuron_map[neuron_id]
for connection in active_connections:
if connection.output_neuron == neuron_id:
input_neuron = neuron_map[connection.input_neuron]
if input_neuron.value == None:
neuron.sum += self.get_neuron_value(connection.input_neuron, active_connections, neuron_map, visited)
else:
neuron.sum += input_neuron.value
neuron.activate()
return neuron.value
def reset(self):
for neuron in self.genome_neurons:
neuron.sum = 0
neuron.value = None
def crossover(self, parent2):
aligned_connections = []
parent1_connections = self.genome_connections
parent2_connections = parent2.genome_connections
parent1_connections.sort(key=lambda x: x.innovation_number)
parent2_connections.sort(key=lambda x: x.innovation_number)
p1_index = 0
p2_index = 0
while p1_index < len(parent1_connections) and p2_index < len(parent2_connections):
connection1 = copy.deepcopy(parent1_connections[p1_index])
connection2 = copy.deepcopy(parent2_connections[p2_index])
if connection1.innovation_number == connection2.innovation_number:
aligned_connections.append(connection1 if random.random() < 0.5 else connection2)
p1_index += 1
p2_index += 1
elif connection1.innovation_number < connection2.innovation_number:
aligned_connections.append(connection1)
p1_index += 1
else:
aligned_connections.append(connection2)
p2_index += 1
while p1_index < len(parent1_connections):
aligned_connections.append(parent1_connections[p1_index])
p1_index += 1
while p2_index < len(parent2_connections):
aligned_connections.append(parent2_connections[p2_index])
p2_index += 1
offspring = Neural_Network(self.num_inputs, self.num_outputs)
offspring.genome_neurons = []
offspring.input_neurons = []
offspring.output_neurons = []
offspring.genome_connections = aligned_connections
offspring.in_out_layers()
offspring_node_ids = []
for neuron in offspring.genome_neurons:
offspring_node_ids.append(neuron.id)
for connection in aligned_connections:
if connection.input_neuron not in offspring_node_ids:
offspring_node_ids.append(connection.input_neuron)
elif connection.output_neuron not in offspring_node_ids:
offspring_node_ids.append(connection.output_neuron)
for neuron_id in offspring_node_ids:
for neuron in self.genome_neurons:
if neuron.id == neuron_id:
new_neuron = Neuron(neuron.id, neuron.input_neuron, neuron.output_neuron)
if not any(new_neuron.id == neuron.id for neuron in offspring.genome_neurons):
offspring.genome_neurons.append(new_neuron)
for neuron in parent2.genome_neurons:
if neuron.id == neuron_id:
new_neuron = Neuron(neuron.id, neuron.input_neuron, neuron.output_neuron)
if not any(new_neuron.id == neuron.id for neuron in offspring.genome_neurons):
offspring.genome_neurons.append(new_neuron)
return offspring
def mutate_connection(self):
non_output_neurons = self.genome_neurons[:self.num_inputs] + self.genome_neurons[self.num_inputs + self.num_outputs + 1:]
input_neuron = random.choice(non_output_neurons)
while True:
output_neuron = random.choice(self.genome_neurons[self.num_inputs:])
if output_neuron != input_neuron:
break
self.create_connection(input_neuron.id, output_neuron.id)
def mutate_neuron(self):
self.create_node(random.choice(self.genome_connections))
def mutate_enable_disable(self):
connection = random.choice(self.genome_connections)
if connection.active == True:
connection.active = False
elif connection.active == False:
connection.active = True
def mutate_weight_shift(self):
connection = random.choice(self.genome_connections)
connection.weight *= (random.random() * 2)
def mutate_weight_random(self):
connection = random.choice(self.genome_connections)
connection.weight = (random.random() * 4) - 2
def mutate(self, probabilities):
if probabilities[0] > random.random():
self.mutate_connection()
if probabilities[1] > random.random() and len(self.genome_connections) > 0:
self.mutate_neuron()
if probabilities[2] > random.random() and len(self.genome_connections) > 0:
self.mutate_enable_disable()
if probabilities[3] > random.random() and len(self.genome_connections) > 0:
self.mutate_weight_shift()
if probabilities[4] > random.random() and len(self.genome_connections) > 0:
self.mutate_weight_random()
Here is the code for the Neuron and Connection classes:
import random
class Connection:
def __init__(self, input_neuron, output_neuron, innovation_number):
self.input_neuron = input_neuron
self.output_neuron = output_neuron
self.active = True
self.innovation_number = innovation_number
self.weight = random.random()
import numpy as np
class Neuron:
def __init__(self, id, input_neuron, output_neuron):
self.id = id
self.input_neuron = input_neuron
self.output_neuron = output_neuron
self.sum = 0
self.value = None
def activate(self):
self.value = self.sigmoid(self.sum)
def sigmoid(self, x):
z = self.clip(x, -20, 20)
return 1 / (1 + np.e**-z)
def clip(self, value, min_value, max_value):
if value < min_value:
return min_value
elif value > max_value:
return max_value
else:
return value
The input neuron and output neuron attributes in those classes are their ids, not the instances themselves. I have read that some people use the same type of variable for both neurons and connections, but I found using ids and innovation numbers to be easier to understand. I use a globalavrs file to keep a record of these independent of each network. The ‘neurons’ and ‘connections’ variables are arrays that contain instances of these classes. The next_innovation_number variable is an integer that begins at 1, and next_id is 16.
Here is the class I use to create the next generation. I have included several methods of choosing the best individuals for the next generation including just sorting them all and selecting those with the best scores, speciating using the neural networks and how similar they are and also speciating using the final position of the cubes controlled by the neural networks.
from Individual_AI import Individual
import random
import globalvars
import barrier
import neural_network
import copy
import random
class GeneticAlgorithm:
def __init__(self):
self.population = []
self.current_generation = 1
self.num_moves = 50
self.pop_size = 200
self.num_selected = 20
self.mutation_probabilities = [0.6, 0.05, 0.2, 1.001, 0.675]
self.target_origin = ((globalvars.target_vertices[0][0] + globalvars.target_vertices[1][0])/2,
globalvars.target_vertices[0][1],
(globalvars.target_vertices[0][2] + globalvars.target_vertices[2][2])/2)
def create_population(self):
for i in range(self.pop_size):
nn = neural_network.Neural_Network(10, 4)
nn.mutate(self.mutation_probabilities)
ai = Individual(nn)
self.population.append(ai)
def create_next_generation(self):
self.mutation_probabilities = [0.6, 0.1, 0.2, 1.001, 0.675]
best_individuals = self.selection()
self.population = []
for individual in best_individuals:
print(individual.score)
for i in range(self.pop_size - len(best_individuals)):
parent1 = random.choice(best_individuals)
parent2 = random.choice(best_individuals)
offspring_nn = parent1.neural_network.crossover(parent2.neural_network)
offspring_nn.mutate(self.mutation_probabilities)
child = Individual(offspring_nn)
self.population.append(child)
for i in range(len(best_individuals)):
parent = best_individuals[i]
child_nn = copy.deepcopy(parent.neural_network)
child = Individual(child_nn)
self.population.append(child)
#self.randomise_target()
self.randomise_barriers()
def selection(self):
selected_individuals = []
sorted_individuals = sorted(self.population, key=lambda x: x.score, reverse=True)
selected_individuals = sorted_individuals[:self.num_selected]
return selected_individuals
def speciation(self):
selected_individuals = []
species = []
first_individual = self.population[0]
species.append([first_individual])
self.population.remove(first_individual)
for individual in self.population:
individual_is_new_species = True
nn = individual.neural_network
nn_innovation_nums = list(map(lambda instance: instance.innovation_number, nn.genome_connections))
nn_ids = list(map(lambda instance: instance.id, nn.genome_neurons))
for individual_species in species:
model_nn = individual_species[0].neural_network
model_innovation_nums = list(map(lambda instance: instance.innovation_number, model_nn.genome_connections))
model_ids = list(map(lambda instance: instance.id, model_nn.genome_neurons))
shared_innovation_nums = set(nn_innovation_nums) & set(model_innovation_nums)
shared_ids = set(nn_ids) & set(model_ids)
if len(shared_innovation_nums) > (len(nn_innovation_nums)/2) and len(shared_ids) > (len(nn_ids)-(nn.num_inputs + nn.num_outputs + 2))/2:
individual_species.append(individual)
individual_is_new_species = False
individual_species = sorted(individual_species, key=lambda x: x.score, reverse=True)
if individual_is_new_species == True:
species.append([individual])
num_species = len(species)
for _ in range(self.num_selected):
for individual_species in species:
if len(selected_individuals) < self.num_selected and len(individual_species) > 0:
if individual_species[0].score > 0 or num_species < 2 or len(selected_individuals) < 2:
selected_individuals.append(individual_species[0])
individual_species.pop(0)
return selected_individuals
def speciation2(self):
selected_individuals = []
species = []
first_individual = self.population[0]
species.append([first_individual])
self.population.remove(first_individual)
for individual in self.population:
individual_is_new_species = True
individual_xpos = individual.cube_pos[0]
individual_zpos = individual.cube_pos[2]
for individual_species in species:
species_xpos = individual_species[0].cube_pos[0]
species_zpos = individual_species[0].cube_pos[2]
difference_x = abs(individual_xpos - species_xpos)
difference_z = abs(individual_zpos - species_zpos)
if difference_x < 2 and difference_z < 2:
individual_species.append(individual)
individual_is_new_species = False
individual_species = sorted(individual_species, key=lambda x: x.score, reverse=True)
if individual_is_new_species == True:
species.append([individual])
num_species = len(species)
species = sorted(species, key=lambda x: x[0].score, reverse=True)
for _ in range(self.num_selected):
for individual_species in species:
if len(selected_individuals) < self.num_selected and len(individual_species) > 0:
if individual_species[0].score > 0 or num_species < 2 or len(selected_individuals) < 2:
selected_individuals.append(individual_species[0])
individual_species.pop(0)
return selected_individuals
def randomise_target(self):
randrange = int(self.current_generation/10)
x = self.target_origin[0] - randrange + random.randint(0, randrange*2)
z = self.target_origin[2] - randrange + random.randint(0, randrange*2)
if x < -20:
x = -20
elif x > 20:
x = 20
if z > -15:
z = -15
elif z < -55:
z = -55
globalvars.target_vertices = (
(x + 5, -9.5, z + 5),
(x, -9.5, z + 5),
(x, -9.5, z),
(x + 5, -9.5, z),
)
Although the target is not being randomised now I had tried implementing a gradual randomisation function where as the genetic algorithm cycled through generations the targets position started being more and more random. Unfortuantely that didn’t have much effect.
I have tried varying many of the hyperparameters like population size, number of moves per cube per generation, mutation rates and also the number of generations but it doesn’t work well. Although they may make some improvements this generally flattens out fairly quickly and they don’t become very fit. Another issue I had is that the amount of time it takes to run each network increases exponentially as they become more complex. I have tried removing neurons that have too many but this didn’t make a difference and I was worried it would limit exploration.
How can I make this NEAT implementation learn properly and run faster?
There is more code to this project, you can find it here: GitHub - Shbc314159/NEAT-ai (this is old, I will push some changes).
N.B. Here is my fitness function:
self.distance_from_target = ((self.x_center - self.cube_pos[0]) ** 2 +
(self.y_center - self.cube_pos[1]) ** 2 +
(self.z_center - self.cube_pos[2]) ** 2) ** 0.5
self.score += 1/self.distance_from_target
The x, y and z centers are the centers of the target.
I have asked about this many times on other forums on reddit and either no-one knows how NEAT works or they don’t know how the coding implementation should work so I would appreciate any answers even if they are not comprehensive.