Source code for splendor.agents.our_agents.genetic_algorithm.evolve

import shutil
from argparse import ArgumentParser
from csv import writer as csv_writer
from datetime import datetime
from multiprocessing import cpu_count, Pool
from pathlib import Path

from splendor.agents.our_agents.genetic_algorithm.genes import (
    Gene,
    ManagerGene,
    StrategyGene,
)
from splendor.agents.our_agents.genetic_algorithm.genetic_algorithm_agent import (
    GeneAlgoAgent,
)
from splendor.game import Game
from splendor.Splendor import features
from splendor.Splendor.utils import LimitRoundsGameRule

import numpy as np


POPULATION_SIZE = 24  # 60
GENERATIONS = 100
MUTATION_RATE = 0.2
DEPENDECY_DEGREE = 3
FOUR_PLAYERS = 4
PLAYERS_OPTIONS = (2, 3, 4)
WORKING_DIR = Path().absolute()
FOLDER_FORMAT = "%y-%m-%d_%H-%M-%S"
# SELECTION = (POPULATION_SIZE // 3) or 2
# RETURN_SIZE = (POPULATION_SIZE // 12) or 1
WINNER_BONUS = 0
MAX_PROCESS = cpu_count() // 2
STATS_FILE = "stats.csv"
STATS_HEADERS = (
    "generation",
    "players_count",
    "rounds_count",
    "nobles_taken",
    "scores_mean",
    "player1_score",
    "player2_score",
    "player3_score",
    "player4_score",
    "tier1_left",
    "tier2_left",
    "tier3_left",
)


[docs] def mutate(gene: Gene, progress: float, mutate_rate: float): """ Mutates a single gene. """ def _mutate(value): """ Mutation method is based on the following article (page 112) http://web.ist.utl.pt/adriano.simoes/tese/referencias/Michalewicz%20Z.%20Genetic%20Algorithms%20+%20Data%20Structures%20=%20Evolution%20Programs%20%283ed%29.PDF """ diff = value - np.random.choice((gene.LOWER_BOUND, gene.UPPER_BOUND)) power = (1 - progress) ** DEPENDECY_DEGREE return value - diff * (1 - np.random.rand() ** power) gene.mutate(mutate_rate, _mutate)
[docs] def mutate_population( population: list[GeneAlgoAgent], progress: float, mutation_rate: float ): """ Mutates the genes of the population. """ for agent in population: mutate(agent._manager_gene, progress, mutation_rate) mutate(agent._strategy_gene_1, progress, mutation_rate) mutate(agent._strategy_gene_2, progress, mutation_rate) mutate(agent._strategy_gene_3, progress, mutation_rate)
def _crossover(dna1: np.array, dna2: np.array) -> tuple[np.array, np.array]: """ Crossover method is based on the following article (page 9) https://www.cs.us.es/~fsancho/ficheros/IA2019/TheContinuousGeneticAlgorithm.pdf """ split_point = np.random.randint(len(dna1)) mix_coefficient = np.random.rand() diff = dna1[split_point] - dna2[split_point] new_value_1 = dna1[split_point] - mix_coefficient * diff new_value_2 = dna2[split_point] + mix_coefficient * diff child1 = np.hstack((dna1[:split_point], [new_value_1], dna2[split_point + 1 :])) child2 = np.hstack((dna2[:split_point], [new_value_2], dna1[split_point + 1 :])) return child1, child2
[docs] def crossover(mom: Gene, dad: Gene) -> tuple[Gene, Gene]: """ Executes crossover between 2 genes, which produces 2 children. """ cls = mom.__class__ if not isinstance(dad, cls): raise TypeError("Crossover works only between genes of the same type") if len(cls.SHAPE) == 1: child_dna_1, child_dna_2 = _crossover(mom._dna, dad._dna) return cls(child_dna_1), cls(child_dna_2) elif len(cls.SHAPE) == 2: children_dna = ( _crossover(dna1, dna2) for dna1, dna2 in zip(mom._dna.T, dad._dna.T) ) child_dna_1, child_dna_2 = zip(*children_dna) return cls(np.vstack(child_dna_1).T), cls(np.vstack(child_dna_2).T) raise ValueError(f"Unsupported DNA shape for crossover {cls.SHAPE}")
[docs] def mate(parents: list[GeneAlgoAgent], population_size: int) -> list[GeneAlgoAgent]: """ Creates new individual by randomly choosing 2 parents and mating them till we got enough individuals. """ CHILDREN_PER_MATING = 2 PARENTS_PER_MATING = 2 children = list() matings = (population_size - len(parents)) // CHILDREN_PER_MATING for _ in range(matings): mom, dad = np.random.choice(parents, PARENTS_PER_MATING, False) managers = crossover(mom._manager_gene, dad._manager_gene) strategies_1 = crossover(mom._strategy_gene_1, dad._strategy_gene_1) strategies_2 = crossover(mom._strategy_gene_2, dad._strategy_gene_2) strategies_3 = crossover(mom._strategy_gene_3, dad._strategy_gene_3) for i in range(CHILDREN_PER_MATING): children.append( GeneAlgoAgent( 0, managers[i], strategies_1[i], strategies_2[i], strategies_3[i] ) ) return children
[docs] def single_game(agents) -> tuple[Game, dict]: """ Runs a single game of Splendor (with the Engine) using the given agents. """ np.random.shuffle(agents) names = list() for i, agent in enumerate(agents): agent.id = i names.append(str(i)) game = Game( LimitRoundsGameRule, agents, len(agents), seed=np.random.randint(1e8, dtype=int), agents_namelist=names, ) return game, game.Run()
def _evaluate_multiprocess( population: list[GeneAlgoAgent], players_count: int, ) -> list[tuple[Game, dict]]: """ """ games = len(population) // players_count if players_count == FOUR_PLAYERS: agents_generator = (population[i : i + FOUR_PLAYERS] for i in range(0, len(population), FOUR_PLAYERS)) else: agents_generator = (population[i::games] for i in range(games)) with Pool(MAX_PROCESS) as pool: return pool.map(single_game, agents_generator) def _evaluate( population: list[GeneAlgoAgent], players_count: int, quiet: bool, ) -> list[tuple[Game, dict]]: """ """ results = list() games = len(population) // players_count for i in range(games): if not quiet: print( f" game number {i+1} " f"({datetime.now().strftime(FOLDER_FORMAT)})" ) if players_count == FOUR_PLAYERS: agents = population[i * FOUR_PLAYERS : (i + 1) * FOUR_PLAYERS] else: agents = population[i::games] results.append(single_game(agents)) return results
[docs] def evaluate( population: list[GeneAlgoAgent], quiet: bool, multiprocess: bool, ) -> dict[GeneAlgoAgent, int]: """ Measures the fitness of each individual by having them play against each other. Each individual plays in 3 games with 1,2 and 3 rivals. """ pool = list() games_stats = list() evaluation = [0] * len(population) for players_count in PLAYERS_OPTIONS: if not quiet: print(f" evaluating games of {players_count} players") if multiprocess: results = _evaluate_multiprocess(population, players_count) else: results = _evaluate(population, players_count, quiet) for game, result in results: max_score = max(result["scores"].values()) for agent in game.agents: evaluation[agent.population_id] += result["scores"][agent.id] if result["scores"][agent.id] == max_score: evaluation[agent.population_id] += WINNER_BONUS stats = [ players_count, len(result["actions"]) // players_count, players_count + 1 - len(game.game_rule.current_game_state.board.nobles), np.mean(tuple(result["scores"].values())) ] stats.extend(result["scores"].get(i, "None") for i in range(FOUR_PLAYERS)) cards_in_play = zip( game.game_rule.current_game_state.board.decks, game.game_rule.current_game_state.board.dealt ) stats.extend(len(deck) + len(tuple(filter(None, dealt))) for deck, dealt in cards_in_play) games_stats.append(stats) return evaluation, games_stats
[docs] def sort_by_fitness( population: list[GeneAlgoAgent], folder: Path, message: str, quiet: bool, multiprocess: bool, ) -> list[list]: if not quiet: print(message) for i, agent in enumerate(population): agent.population_id = i evaluation, games_stats = evaluate(population, quiet, multiprocess) population.sort(key=lambda agent: evaluation[agent.population_id], reverse=True) if not quiet: print( ' Saving the best agent ' f'({evaluation[population[0].population_id]})' ) folder.mkdir() population[0].save(folder) return games_stats
[docs] def generate_initial_population(population_size: int): """ Creates agents with random genes. """ return [ GeneAlgoAgent( 0, ManagerGene.random(), StrategyGene.random(), StrategyGene.random(), StrategyGene.random(), ) for _ in range(population_size) ]
[docs] def evolve( population_size: int = POPULATION_SIZE, generations: int = GENERATIONS, mutation_rate: float = MUTATION_RATE, working_dir: Path = WORKING_DIR, seed: int = None, quiet: bool = False, multiprocess: bool = False, ): """ Genetic algorithm evolution process. In each generation `selection_size` are kept and used for mating. Returns the top `return_size` individuals of the last generation. """ start_time = datetime.now() selection_size = (population_size // 3) or 2 return_size = (population_size // 12) or 1 if seed is not None: np.random.seed(seed) folder = working_dir / start_time.strftime(FOLDER_FORMAT) folder.mkdir() shutil.copy(features.__file__, folder) if not quiet: print(f"({folder.name}) Starting evolution with") print(f" population: {population_size}") print(f" selection: {selection_size}") population = generate_initial_population(population_size) with open(folder / STATS_FILE, "w", newline="\n") as stats_file: stats_csv = csv_writer(stats_file) stats_csv.writerow(STATS_HEADERS) for generation in range(generations): progress = generation / generations generation += 1 games_stats = sort_by_fitness(population, folder / str(generation), f'Gen {generation}', quiet, multiprocess) for stats in games_stats: stats.insert(0, generation) stats_csv.writerow(stats) parents = population[:selection_size] np.random.shuffle(parents) children = mate(parents, population_size) mutate_population(children, progress, mutation_rate) population = parents + children np.random.shuffle(population) games_stats = sort_by_fitness(population, folder / "final", "Final", quiet, multiprocess) for stats in games_stats: stats.insert(0, "final") stats_csv.writerow(stats) if not quiet: print(f"Done (run time was {datetime.now() - start_time})") return population[:return_size]
[docs] def main(): parser = ArgumentParser( prog="evolve", description="Evolves a Splendor agent using genetic algorithm.", ) parser.add_argument( "-p", "--population-size", default=POPULATION_SIZE, type=int, help="Size of the population (should be multiple of 12)", ) parser.add_argument( "-g", "--generations", default=GENERATIONS, type=int, help="Amount of generations", ) parser.add_argument( "-m", "--mutation-rate", default=MUTATION_RATE, type=float, help="Probability to mutate (should be in the range [0,1])", ) parser.add_argument( "-w", "--working-dir", default=WORKING_DIR, type=Path, help="Path to directory to work in (will create a directory with " "current timestamp for each run)", ) parser.add_argument( "-s", "--seed", type=int, help="Seed to set for numpy's random number generator", ) parser.add_argument( "--multiprocess", action='store_true', help="Use multiprocessing to evolve faster", ) parser.add_argument('-q', '--quiet', action='store_true') options = parser.parse_args() if options.population_size <= 0 or (options.population_size % 12): raise ValueError( "To work properly, population size should be a " f"multiple of 12 (not {options.population_size})" ) if options.generations <= 0: raise ValueError(f"Invalid amount of generations {options.generations}") if not 0 <= options.mutation_rate <= 1: raise ValueError(f"Invalid mutation rate value {options.mutation_rate}") evolve(**options.__dict__)
if __name__ == "__main__": main()