Source code for flex.gp.util

import yaml
from itertools import chain
import ray
import numpy as np
from deap import gp
from flex.gp.cochain_primitives import Dimension, Rank


# Define how to handle the !dim tag
def dim_constructor(loader, node):
    """Build a ``Dimension`` from a YAML scalar tagged ``!dim``.

    Args:
        loader: The YAML loader invoking this constructor (not used here).
        node: The tagged YAML node. ``node.value`` is converted with ``int``;
            the configuration files are expected to carry the string ``'0'``
            or ``'1'``.

    Returns:
        ``Dimension(int(node.value))``.
    """
    dim_index = int(node.value)
    return Dimension(dim_index)
# Define how to handle the !rank tag
def rank_constructor(loader, node):
    """Map a YAML scalar tagged ``!rank`` to the matching ``Rank`` member.

    Recognised values are ``'SC'`` (``Rank.SCALAR``), ``'V'``
    (``Rank.VECTOR``) and ``'T'`` (``Rank.TENSOR``).

    Args:
        loader: The YAML loader invoking this constructor (not used here).
        node: The tagged YAML node; ``node.value`` holds the rank code.

    Returns:
        The ``Rank`` member associated with the code.

    Raises:
        ValueError: If the code is not one of the recognised values. A typo
            in the configuration is reported here instead of silently
            producing ``None``.
    """
    # Rank code used in the YAML file -> name of the ``Rank`` member.
    member_names = {"SC": "SCALAR", "V": "VECTOR", "T": "TENSOR"}
    code = node.value
    # Non-string values (e.g. from a non-scalar node) can never match a code.
    member = member_names.get(code) if isinstance(code, str) else None
    if member is None:
        raise ValueError(
            f"Unknown !rank value {code!r}; expected one of 'SC', 'V' or 'T'."
        )
    return getattr(Rank, member)
def load_config_data(filename):
    """Load problem settings from YAML file.

    The ``!dim`` and ``!rank`` tags are registered on ``yaml.SafeLoader``
    (the registration is made on the loader class itself) before the file is
    parsed with that loader.

    Args:
        filename: Path of the YAML configuration file to open.

    Returns:
        A tuple ``(regressor_params, config_file_data)`` where
        ``regressor_params`` is a flat dict of the settings read from the
        ``gp`` section and ``config_file_data`` is the full parsed document.

    Raises:
        KeyError: If a required entry of the ``gp`` section is missing.
    """
    # Register the custom tags with the SafeLoader before parsing.
    tag_constructors = (("!dim", dim_constructor), ("!rank", rank_constructor))
    for tag, constructor in tag_constructors:
        yaml.SafeLoader.add_constructor(tag, constructor)

    with open(filename) as stream:
        config_file_data = yaml.load(stream, Loader=yaml.SafeLoader)

    # Output key -> chain of keys to follow below the top-level "gp" section.
    required_settings = (
        ("num_individuals", ("num_individuals",)),
        ("generations", ("generations",)),
        ("num_islands", ("multi_island", "num_islands")),
        ("mig_freq", ("multi_island", "migration", "freq")),
        ("mig_frac", ("multi_island", "migration", "frac")),
        ("crossover_prob", ("crossover_prob",)),
        ("mut_prob", ("mut_prob",)),
        ("frac_elitist", ("frac_elitist",)),
        ("overlapping_generation", ("overlapping_generation",)),
        ("select_fun", ("select", "fun")),
        ("select_args", ("select", "kargs")),
        ("mut_fun", ("mutate", "fun")),
        ("mut_args", ("mutate", "kargs")),
        ("expr_mut_fun", ("mutate", "expr_mut")),
        ("expr_mut_args", ("mutate", "expr_mut_kargs")),
        ("crossover_fun", ("crossover", "fun")),
        ("crossover_args", ("crossover", "kargs")),
        ("validate", ("validate",)),
    )

    regressor_params = {}
    for param_name, key_path in required_settings:
        setting = config_file_data["gp"]
        for step in key_path:
            setting = setting[step]
        regressor_params[param_name] = setting

    # Optional regressor settings: copied only when present in the file.
    for optional_key in ("variation_mechanism", "early_stop_fitness_threshold"):
        if optional_key in config_file_data["gp"]:
            regressor_params[optional_key] = config_file_data["gp"][optional_key]

    return regressor_params, config_file_data
def detect_nested_trigonometric_functions(equation):
    """Tell whether a trigonometric call appears inside another one.

    The string is scanned left to right. A trigonometric call is recognised
    when the text at the current position starts (case-insensitively) with
    ``sin`` or ``cos`` and an opening parenthesis is reached before any
    space. While inside such a call, further parentheses are tracked so that
    the matching closing parenthesis can be identified.

    Args:
        equation: The expression, as a string.

    Returns:
        ``1`` if a trigonometric call is found while already inside another
        trigonometric call, ``0`` otherwise.
    """
    trig_names = ("sin", "cos")
    stop_chars = ("(", " ")
    length = len(equation)
    depth = 0  # parenthesis depth counted from the outermost trig call
    pos = 0

    while pos < length:
        starts_trig = any(
            equation[pos : pos + len(name)].lower() == name for name in trig_names
        )
        if starts_trig:
            # Skip the rest of the function name, up to '(' or a space.
            paren = pos
            while paren < length and equation[paren] not in stop_chars:
                paren += 1
            if paren < length and equation[paren] == "(":
                if depth > 0:
                    # A trig call opened inside another trig call.
                    return 1
                depth += 1
                pos = paren  # resume scanning right after this '('
        else:
            char = equation[pos]
            if char == "(":
                # Ordinary parentheses only matter inside a trig call.
                if depth > 0:
                    depth += 1
            elif char == ")" and depth > 0:
                depth -= 1
        pos += 1

    return 0
def mapper(f, individuals, toolbox_ref, batch_size):
    """Evaluate ``f`` over ``individuals`` in batches and flatten the results.

    ``individuals`` is sliced into consecutive chunks of at most
    ``batch_size`` items; ``f(chunk, toolbox_ref)`` is called once per chunk
    and the collected return values are resolved together with ``ray.get``.
    The per-batch results are then concatenated in batch order.

    Args:
        f: Callable invoked as ``f(batch, toolbox_ref)``. Its return values
            are passed to ``ray.get``, so it presumably launches a Ray remote
            task and returns an object reference (confirm against callers).
        individuals: Sliceable sequence of individuals to evaluate.
        toolbox_ref: Passed through unchanged as second argument of ``f``.
        batch_size: Maximum number of individuals per batch; must be >= 1.

    Returns:
        A flat list with the items of every batch result, in order.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1. A negative value
            would otherwise yield an empty result without evaluating anything.
    """
    if batch_size < 1:
        raise ValueError(
            f"batch_size must be a positive integer, got {batch_size!r}"
        )
    pending = [
        f(individuals[start : start + batch_size], toolbox_ref)
        for start in range(0, len(individuals), batch_size)
    ]
    return list(chain.from_iterable(ray.get(pending)))
def dummy_fitness(individuals_str, toolbox, X, y):
    """Return a placeholder fitness of ``(0.0,)`` for every individual.

    Args:
        individuals_str: Sized collection of individuals; only its length is
            used.
        toolbox: Unused.
        X: Unused.
        y: Unused.

    Returns:
        A list containing one ``(0.0,)`` tuple per individual.
    """
    n_individuals = len(individuals_str)
    return [(0.0,) for _ in range(n_individuals)]
def dummy_score(individuals_str, toolbox, X, y):
    """Return a placeholder score of ``0.0`` for every individual.

    Args:
        individuals_str: Sized collection of individuals; only its length is
            used.
        toolbox: Unused.
        X: Unused.
        y: Unused.

    Returns:
        A list containing one ``0.0`` per individual.
    """
    n_individuals = len(individuals_str)
    return [0.0 for _ in range(n_individuals)]
def dummy_predict(individuals_str, toolbox, X):
    """Return an all-zero placeholder prediction for every individual.

    Args:
        individuals_str: Sized collection of individuals; only its length is
            used.
        toolbox: Unused.
        X: Sized collection of samples; only its length is used.

    Returns:
        A list with one 1-D zero array of length ``len(X)`` per individual.
        Each entry is a distinct array, so modifying one prediction in place
        does not affect the others.
    """
    n_samples = len(X)
    # Build a fresh array per individual: list multiplication would repeat a
    # reference to one shared array in every slot.
    return [np.zeros(n_samples) for _ in range(len(individuals_str))]
def compile_individuals(toolbox, individuals_str_batch):
    """Compile every individual of a batch with ``toolbox.compile``.

    Args:
        toolbox: Object exposing ``compile(expr=...)``.
        individuals_str_batch: Iterable of individuals to compile.

    Returns:
        A list with the result of ``toolbox.compile`` for each individual, in
        input order.
    """
    compiled = []
    for individual in individuals_str_batch:
        compiled.append(toolbox.compile(expr=individual))
    return compiled
def compile_individual_with_consts(tree, toolbox, special_term_name="c"):
    """Compile a tree after indexing each occurrence of a constant terminal.

    The tree is cloned with ``toolbox.clone``. Every ``gp.Terminal`` in the
    clone whose name equals ``special_term_name`` (and does not start with
    ``"ARG"``) is replaced by a new terminal named
    ``"<special_term_name>[k]"``, where ``k`` counts the occurrences from 0.
    The clone is then compiled with ``special_term_name`` passed as an extra
    argument.

    Args:
        tree: The tree to compile; it is cloned, and the clone is modified.
        toolbox: Object exposing ``clone`` and ``compile``.
        special_term_name: Name of the terminal to be indexed.

    Returns:
        A tuple ``(individual, const_idx)`` with the compiled individual and
        the number of terminals that were replaced.
    """
    working_tree = toolbox.clone(tree)
    n_consts = 0

    for position, node in enumerate(working_tree):
        if not isinstance(node, gp.Terminal):
            continue
        if node.name.startswith("ARG") or node.name != special_term_name:
            continue
        indexed_name = f"{special_term_name}[{n_consts}]"
        working_tree[position] = gp.Terminal(indexed_name, True, float)
        n_consts += 1

    compiled = toolbox.compile(expr=working_tree, extra_args=[special_term_name])
    return compiled, n_consts
def fitness_value(ind):
    """Return the ``values`` stored on an individual's ``fitness`` attribute.

    Args:
        ind: Object exposing ``fitness.values``.

    Returns:
        ``ind.fitness.values``, unchanged.
    """
    fitness = ind.fitness
    return fitness.values
def avg_func(values):
    """Return the mean of ``values`` rounded to 4 decimal places.

    Args:
        values: Array-like of numbers.

    Returns:
        ``np.mean(values)`` rounded with ``np.around`` to 4 decimals.
    """
    mean_value = np.mean(values)
    return np.around(mean_value, decimals=4)
def std_func(values):
    """Return the standard deviation of ``values`` rounded to 4 decimals.

    Args:
        values: Array-like of numbers.

    Returns:
        ``np.std(values)`` rounded with ``np.around`` to 4 decimals.
    """
    std_value = np.std(values)
    return np.around(std_value, decimals=4)
def min_func(values):
    """Return the minimum of ``values`` rounded to 4 decimal places.

    Args:
        values: Array-like of numbers.

    Returns:
        ``np.min(values)`` rounded with ``np.around`` to 4 decimals.
    """
    smallest = np.min(values)
    return np.around(smallest, decimals=4)
def max_func(values):
    """Return the maximum of ``values`` rounded to 4 decimal places.

    Args:
        values: Array-like of numbers.

    Returns:
        ``np.max(values)`` rounded with ``np.around`` to 4 decimals.
    """
    largest = np.max(values)
    return np.around(largest, decimals=4)