Cubes not moving towards target in NEAT implementation from scratch in Python (not neat-python)

I have created a NEAT model from scratch in Python. Its neural networks are used to control the movement of cubes in 3D space as they move towards a target. This was meant to improve on a previous implementation of the Genetic Algorithm, where the cubes could learn to move towards a stationary target but couldn’t handle it when the target moved randomly. Unfortunately with this new implementation using NEAT the cubes don’t improve at moving towards the target even when the target does not move.

I have read part of the original NEAT paper but found it hard to understand, however after some research I think that I have understood the central components like innovation numbers, speciation and historical markings through said innovation numbers. Here is the current code for my Neural Network class:

from Connection import Connection
from Neuron import Neuron
import globalvars
import random
import copy
import line_profiler
import timeit
import numpy as np
import time

class Neural_Network():
    def __init__(self, num_inputs, num_outputs):
        self.genome_neurons = []
        self.genome_connections = []
        self.input_neurons = []
        self.output_neurons = []
        self.num_inputs = num_inputs
        self.num_outputs = num_outputs
        self.in_out_layers()
        self.runtime = 0
    
    def in_out_layers(self):
        for i in range(self.num_inputs):
            neuron = Neuron(i, None, None)
            self.genome_neurons.append(neuron)
            self.input_neurons.append(neuron)
        
        for i in range(self.num_inputs, self.num_inputs + self.num_outputs):
            neuron = Neuron(i, None, None)
            self.genome_neurons.append(neuron)
            self.output_neurons.append(neuron)
        
        neuron = Neuron(self.num_inputs+self.num_outputs, None, None)
        self.genome_neurons.append(neuron)
        self.input_neurons.append(neuron)
        neuron = Neuron(self.num_inputs+self.num_outputs+1, None, None)
        self.genome_neurons.append(neuron)
        self.input_neurons.append(neuron)
    
    def create_connection(self, input_neuron, output_neuron):
        new_connection = True
        
        for connection in globalvars.connections:
            if connection.input_neuron == input_neuron and connection.output_neuron == output_neuron:
                new_connection = False
                innovation_number = connection.innovation_number
            
        if new_connection == True:
            innovation_number = globalvars.next_innovation_number
            globalvars.next_innovation_number += 1
            
        connection = Connection(input_neuron, output_neuron, innovation_number)
        if connection not in self.genome_connections:
            self.genome_connections.append(connection)
            globalvars.connections.append(connection)
        
        return connection
    
    def create_node(self, connection):
        input_neuron = connection.input_neuron
        output_neuron = connection.output_neuron
        new_neuron = True
        
        for neuron in globalvars.neurons:
            if neuron.input_neuron == input_neuron and neuron.output_neuron == output_neuron:
                new_neuron = False
                neuron_id = neuron.id
                
        if new_neuron == True:
            neuron_id = globalvars.next_id
            globalvars.next_id += 1
        
        connection.active = False
        neuron = Neuron(neuron_id, input_neuron, output_neuron)
        globalvars.neurons.append(neuron)
        self.genome_neurons.append(neuron)
        connection1 = self.create_connection(input_neuron, neuron.id)
        connection2 = self.create_connection(neuron.id, output_neuron)
        connection1.weight = 1
        connection2.weight = connection.weight
    
    def run(self, inputs):
        neuron_map = {instance.id: instance for instance in self.genome_neurons}
        self.reset()
        
        outputs = []
        active_connections = []
        
        self.input_neurons[self.num_inputs].value = 1
        self.input_neurons[self.num_inputs+1].value = 0
        
        for i in range(self.num_inputs):
            self.input_neurons[i].value = inputs[i]
            
        for connection in self.genome_connections:
            if connection.active == True:
                active_connections.append(connection)
        
        for neuron in self.output_neurons:
            outputs.append(self.get_neuron_value(neuron.id, active_connections, neuron_map, visited=None))
        
        max_value = None
        max_index = None

        for i, value in enumerate(outputs):
            if value is not None:
                if max_value is None or value > max_value:
                    max_value = value
                    max_index = i
        
        return max_index
        
    def get_neuron_value(self, neuron_id, active_connections, neuron_map, visited):
        if visited is None:
            visited = set()

        if neuron_id in visited:
            return 0

        visited.add(neuron_id)
        
        neuron = neuron_map[neuron_id]
        for connection in active_connections:
            if connection.output_neuron == neuron_id:
                input_neuron = neuron_map[connection.input_neuron]
                if input_neuron.value == None:
                    neuron.sum += self.get_neuron_value(connection.input_neuron, active_connections, neuron_map, visited)
                else:
                    neuron.sum += input_neuron.value
        
        neuron.activate()
        
        return neuron.value
                
                
    def reset(self):
        for neuron in self.genome_neurons:
            neuron.sum = 0
            neuron.value = None
    
    def crossover(self, parent2):
        aligned_connections = []
        
        parent1_connections = self.genome_connections
        parent2_connections = parent2.genome_connections
        
        parent1_connections.sort(key=lambda x: x.innovation_number)
        parent2_connections.sort(key=lambda x: x.innovation_number)
        
        p1_index = 0
        p2_index = 0
        
        while p1_index < len(parent1_connections) and p2_index < len(parent2_connections):
            connection1 = copy.deepcopy(parent1_connections[p1_index])
            connection2 = copy.deepcopy(parent2_connections[p2_index])
            
            if connection1.innovation_number == connection2.innovation_number:
                aligned_connections.append(connection1 if random.random() < 0.5 else connection2)
                p1_index += 1
                p2_index += 1
            elif connection1.innovation_number < connection2.innovation_number:
                aligned_connections.append(connection1)
                p1_index += 1
            else:
                aligned_connections.append(connection2)
                p2_index += 1
        
        while p1_index < len(parent1_connections):
            aligned_connections.append(parent1_connections[p1_index])
            p1_index += 1
        
        while p2_index < len(parent2_connections):
            aligned_connections.append(parent2_connections[p2_index])
            p2_index += 1
        
        
        offspring = Neural_Network(self.num_inputs, self.num_outputs)
        offspring.genome_neurons = []
        offspring.input_neurons = []
        offspring.output_neurons = []
        offspring.genome_connections = aligned_connections
        offspring.in_out_layers()

        offspring_node_ids = []
        
        for neuron in offspring.genome_neurons:
            offspring_node_ids.append(neuron.id)
        
        for connection in aligned_connections:
            if connection.input_neuron not in offspring_node_ids:
                offspring_node_ids.append(connection.input_neuron)
            elif connection.output_neuron not in offspring_node_ids:
                offspring_node_ids.append(connection.output_neuron)
        
        for neuron_id in offspring_node_ids:
            for neuron in self.genome_neurons:
                if neuron.id == neuron_id:
                    new_neuron = Neuron(neuron.id, neuron.input_neuron, neuron.output_neuron)
                    if not any(new_neuron.id == neuron.id for neuron in offspring.genome_neurons):
                        offspring.genome_neurons.append(new_neuron)
            
            for neuron in parent2.genome_neurons:
                if neuron.id == neuron_id:
                    new_neuron = Neuron(neuron.id, neuron.input_neuron, neuron.output_neuron)
                    if not any(new_neuron.id == neuron.id for neuron in offspring.genome_neurons):
                        offspring.genome_neurons.append(new_neuron)
        
        return offspring

    def mutate_connection(self):
        non_output_neurons = self.genome_neurons[:self.num_inputs] + self.genome_neurons[self.num_inputs + self.num_outputs + 1:]
        input_neuron = random.choice(non_output_neurons)
        while True:
            output_neuron = random.choice(self.genome_neurons[self.num_inputs:])
            if output_neuron != input_neuron:
                break
        self.create_connection(input_neuron.id, output_neuron.id)
        
    def mutate_neuron(self):
        self.create_node(random.choice(self.genome_connections))
    
    def mutate_enable_disable(self):
        connection = random.choice(self.genome_connections)
        
        if connection.active == True:
            connection.active = False
        elif connection.active == False:
            connection.active = True
        
    def mutate_weight_shift(self):
        connection = random.choice(self.genome_connections)
        connection.weight *= (random.random() * 2)
        
    def mutate_weight_random(self):
        connection = random.choice(self.genome_connections)
        connection.weight = (random.random() * 4) - 2
    
    def mutate(self, probabilities):
        if probabilities[0] > random.random():
            self.mutate_connection()
            
        if probabilities[1] > random.random() and len(self.genome_connections) > 0:
            self.mutate_neuron()
            
        if probabilities[2] > random.random() and len(self.genome_connections) > 0:
            self.mutate_enable_disable()
            
        if probabilities[3] > random.random() and len(self.genome_connections) > 0:
            self.mutate_weight_shift()
        
        if probabilities[4] > random.random() and len(self.genome_connections) > 0:
            self.mutate_weight_random()

Here is the code for the Neuron and Connection classes:

import random

class Connection:
    def __init__(self, input_neuron, output_neuron, innovation_number):
        self.input_neuron = input_neuron
        self.output_neuron = output_neuron
        self.active = True
        self.innovation_number = innovation_number
        self.weight = random.random() 

import numpy as np 

class Neuron:
    def __init__(self, id, input_neuron, output_neuron):
        self.id = id
        self.input_neuron = input_neuron
        self.output_neuron = output_neuron
        self.sum = 0
        self.value = None
    
    def activate(self):
        self.value = self.sigmoid(self.sum)

    def sigmoid(self, x): 
        z = self.clip(x, -20, 20)
        return 1 / (1 + np.e**-z)

    def clip(self, value, min_value, max_value):
        if value < min_value:
            return min_value
        elif value > max_value:
            return max_value
        else:
            return value

The input neuron and output neuron attributes in those classes are their ids, not the instances themselves. I have read that some people use the same type of variable for both neurons and connections, but I found using ids and innovation numbers to be easier to understand. I use a globalavrs file to keep a record of these independent of each network. The ‘neurons’ and ‘connections’ variables are arrays that contain instances of these classes. The next_innovation_number variable is an integer that begins at 1, and next_id is 16.

Here is the class I use to create the next generation. I have included several methods of choosing the best individuals for the next generation including just sorting them all and selecting those with the best scores, speciating using the neural networks and how similar they are and also speciating using the final position of the cubes controlled by the neural networks.

from Individual_AI import Individual
import random
import globalvars
import barrier
import neural_network
import copy 
import random

class GeneticAlgorithm: 
    def __init__(self):
        self.population = []
        self.current_generation = 1
        self.num_moves = 50
        self.pop_size = 200
        self.num_selected = 20
        self.mutation_probabilities = [0.6, 0.05, 0.2, 1.001, 0.675]
        self.target_origin = ((globalvars.target_vertices[0][0] + globalvars.target_vertices[1][0])/2,
                              globalvars.target_vertices[0][1],
                              (globalvars.target_vertices[0][2] + globalvars.target_vertices[2][2])/2)
    
    def create_population(self):
        for i in range(self.pop_size):
            nn = neural_network.Neural_Network(10, 4)
            nn.mutate(self.mutation_probabilities)
            ai = Individual(nn)
            self.population.append(ai) 
                
    def create_next_generation(self):
        self.mutation_probabilities = [0.6, 0.1, 0.2, 1.001, 0.675]
        best_individuals = self.selection()
        self.population = []

        for individual in best_individuals:
            print(individual.score)
    
        for i in range(self.pop_size - len(best_individuals)):
            parent1 = random.choice(best_individuals)
            parent2 = random.choice(best_individuals)
            offspring_nn = parent1.neural_network.crossover(parent2.neural_network)
            offspring_nn.mutate(self.mutation_probabilities)
            child = Individual(offspring_nn)
            self.population.append(child)
        
        for i in range(len(best_individuals)):
            parent = best_individuals[i]
            child_nn = copy.deepcopy(parent.neural_network)
            child = Individual(child_nn)
            self.population.append(child)   
                    
        #self.randomise_target()
        self.randomise_barriers()
    
    def selection(self):
        selected_individuals = []
        sorted_individuals = sorted(self.population, key=lambda x: x.score, reverse=True)
        selected_individuals = sorted_individuals[:self.num_selected]
        
        return selected_individuals   
    
    def speciation(self):
        selected_individuals = []
        species = []
        
        first_individual = self.population[0]
        species.append([first_individual])
        self.population.remove(first_individual)
        
        for individual in self.population:
            individual_is_new_species = True
            
            nn = individual.neural_network
            nn_innovation_nums = list(map(lambda instance: instance.innovation_number, nn.genome_connections))
            nn_ids = list(map(lambda instance: instance.id, nn.genome_neurons))
            
            for individual_species in species:
                model_nn = individual_species[0].neural_network
                model_innovation_nums = list(map(lambda instance: instance.innovation_number, model_nn.genome_connections))
                model_ids = list(map(lambda instance: instance.id, model_nn.genome_neurons))
                
                shared_innovation_nums = set(nn_innovation_nums) & set(model_innovation_nums)
                shared_ids = set(nn_ids) & set(model_ids)
                
                if len(shared_innovation_nums) > (len(nn_innovation_nums)/2) and len(shared_ids) > (len(nn_ids)-(nn.num_inputs + nn.num_outputs + 2))/2:
                    individual_species.append(individual)
                    individual_is_new_species = False
                    individual_species = sorted(individual_species, key=lambda x: x.score, reverse=True)
            
            if individual_is_new_species == True:
                species.append([individual])

        num_species = len(species)
        
        for _ in range(self.num_selected):
            for individual_species in species:
                if len(selected_individuals) < self.num_selected and len(individual_species) > 0:
                    if individual_species[0].score > 0 or num_species < 2 or len(selected_individuals) < 2:
                        selected_individuals.append(individual_species[0])
                        individual_species.pop(0)
         
        return selected_individuals  
    
    def speciation2(self):
        selected_individuals = []
        species = []
        
        first_individual = self.population[0]
        species.append([first_individual])
        self.population.remove(first_individual)
        
        for individual in self.population:
            individual_is_new_species = True
            
            individual_xpos = individual.cube_pos[0]
            individual_zpos = individual.cube_pos[2]
            
            for individual_species in species:
                species_xpos = individual_species[0].cube_pos[0]
                species_zpos = individual_species[0].cube_pos[2]
                
                difference_x = abs(individual_xpos - species_xpos)
                difference_z = abs(individual_zpos - species_zpos)
                
                if difference_x < 2 and difference_z < 2:
                    individual_species.append(individual)
                    individual_is_new_species = False
                    individual_species = sorted(individual_species, key=lambda x: x.score, reverse=True)
            
            if individual_is_new_species == True:
                species.append([individual])

        num_species = len(species)
        species = sorted(species, key=lambda x: x[0].score, reverse=True)
        
        for _ in range(self.num_selected):
            for individual_species in species:
                if len(selected_individuals) < self.num_selected and len(individual_species) > 0:
                    if individual_species[0].score > 0 or num_species < 2 or len(selected_individuals) < 2:
                        selected_individuals.append(individual_species[0])
                        individual_species.pop(0)
        
        return selected_individuals          

    def randomise_target(self):
        randrange = int(self.current_generation/10)
        x = self.target_origin[0] - randrange + random.randint(0, randrange*2)
        z = self.target_origin[2] - randrange + random.randint(0, randrange*2)
        
        if x < -20:
            x = -20
        elif x > 20:
            x = 20
        
        if z > -15:
            z = -15
        elif z < -55:
            z = -55
        
        globalvars.target_vertices = (
            (x + 5, -9.5, z + 5),
            (x, -9.5, z + 5),
            (x, -9.5, z),
            (x + 5, -9.5, z),
        )

Although the target is not being randomised now I had tried implementing a gradual randomisation function where as the genetic algorithm cycled through generations the targets position started being more and more random. Unfortuantely that didn’t have much effect.

I have tried varying many of the hyperparameters like population size, number of moves per cube per generation, mutation rates and also the number of generations but it doesn’t work well. Although they may make some improvements this generally flattens out fairly quickly and they don’t become very fit. Another issue I had is that the amount of time it takes to run each network increases exponentially as they become more complex. I have tried removing neurons that have too many but this didn’t make a difference and I was worried it would limit exploration.

How can I make this NEAT implementation learn properly and run faster?
There is more code to this project, you can find it here: https://github.com/Shbc314159/NEAT-ai (this is old, I will push some changes).

N.B. Here is my fitness function:

        self.distance_from_target = ((self.x_center - self.cube_pos[0]) ** 2 + 
                            (self.y_center - self.cube_pos[1]) ** 2 + 
                            (self.z_center - self.cube_pos[2]) ** 2) ** 0.5 
        self.score += 1/self.distance_from_target

The x, y and z centers are the centers of the target.

I have asked about this many times on other forums on reddit and either no-one knows how NEAT works or they don’t know how the coding implementation should work so I would appreciate any answers even if they are not comprehensive.

I can’t comment on the learning as that’s very subject specific and I would have to understand how your model is over/under fitting and what the characteristics of the data set are.

In terms of performance, I’d ask, what are your goals? If the algorithm is indeed exponential, getting it to polynomial time is sometimes worth doing its own research paper on.

You might already know, but if you just want to speed execution, I’d implement it in C++ or Rust since those are compiled languages (as opposed to Python being interpreted). Whenever we get good performance out of ML algorithms in python it’s simply because it’s leveraging C or C++ libraries under the hood explicitly.

1 Like

Thanks for your answer. After running some tests, I believe the performance may actually be polynomial as it is the number of neurons * the number of connections. I have thought and am trying to implement it in c++, however as I’m already running the program with things like pypy and nuitka which do transform some elements of it to cpp, I’m not sure how much I will gain in performance.

About the learning issue - I’m not actually using a dataset or backpropagation or anything like that. The networks are run on real time input from the cubes about their environment, and the improvement occurs after each generation as the best networks are selected then mutated and crossed over.

Not sure if I mentionned this in the answer, but I have got it working with the XOR problem. It solves it within around 100 generations, creating a solution with 6 hidden neurons (far from ideal, I know). However when I tried approximating some basic functions like y = 5x and y = x**2 with it NEAT failed spectacularly. The output of the best network after 1000 generations, when visualised, was generally either a flat line or a sigmoid function (which is its activation). I’m still new to this topic, but my understanding was that networks should be able to approximate functions other than their own activation functions. Does this seem like an error in the running of the networks or their evolution/improvement?
If you give me an example neural network with weights, connections etc I can implement it and test if it runs as it should. As of yet I have been unable to find examples of any other than the xor problem online.