A simple symbolic regression example

This notebook implements a symbolic regression pipeline based on Genetic Programming (GP) using the flex framework and its custom GPSymbolicRegressor.

The goal is to discover analytical expressions that best fit a given PMLB dataset by:

  • evolving symbolic expressions;

  • optimizing embedded numerical constants;

  • penalizing overly complex expressions;

  • evaluating performance on training and test datasets.

The code supports:

  • multi-variable regression problems;

  • automatic constant tuning using gradient-based optimization;

  • parallel execution using Ray.

Imports and Dependencies

First, we import all the required libraries for:

  • Genetic Programming: deap.gp, custom flex.gp utilities

  • Numerical computation: numpy, mygrad

  • Optimization: pygmo (for constant fitting)

  • Machine Learning utilities: scikit-learn

  • Parallelism: ray

  • Dataset generation: custom generate_dataset function

The code relies on both evolutionary optimization (for structure) and gradient-based optimization (for constants).

[1]:
%env RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
env: RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
[2]:
from deap import gp

from flex.gp import regressor as gps
from flex.gp.util import (
    detect_nested_trigonometric_functions,
    load_config_data,
    compile_individual_with_consts,
)
from flex.gp.primitives import add_primitives_to_pset_from_dict
import numpy as np
import ray

import warnings
import pygmo as pg

import re
from sklearn.metrics import r2_score

import time

import mygrad as mg
from mygrad._utils.lock_management import mem_guard_off

from functools import partial


from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from pmlb import fetch_data

import matplotlib.pyplot as plt
[3]:
# set up number of cpus per ray worker
num_cpus = 1
[4]:
# --- Custom generate dataset function ---
def generate_dataset(problem: str="1027_ESL", random_state: int=42, scaleXy: bool=True):
    np.random.seed(42)
    num_variables = 1
    scaler_X = None
    scaler_y = None

    # PMLB datasets
    X, y = fetch_data(problem, return_X_y=True, local_cache_dir="./datasets")

    num_variables = X.shape[1]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.25, random_state=random_state
    )

    y_train = y_train.reshape(-1, 1)
    y_test = y_test.reshape(-1, 1)

    if scaleXy:
        scaler_X = StandardScaler()
        scaler_y = StandardScaler()

        X_train_scaled = scaler_X.fit_transform(X_train)
        y_train_scaled = scaler_y.fit_transform(y_train)
        X_test_scaled = scaler_X.transform(X_test)
    else:
        X_train_scaled = X_train
        y_train_scaled = y_train
        X_test_scaled = X_test

    y_test = y_test.flatten()
    y_train_scaled = y_train_scaled.flatten()

    num_train_points = X_train.shape[0]


    # note y_test and y_train_scaled must be flattened
    return (
        X_train_scaled,
        y_train_scaled,
        X_test_scaled,
        y_test,
        scaler_X,
        scaler_y,
        num_variables,
        num_train_points,
    )

Evaluation Function and Constant Optimization

The core of the symbolic regression pipeline is the definition of the fitness function. First, we need to define an error measure, which in this case will be the standard MSE between the targets and the predictions. However, we remark that each individual carries some numerical constants that need to be optimized as well. For example, the individual

\[I(x) = c_1\sin(x) + c_2x^2 + c_3\]

may have a near 0 fitness for a given triple \((c_1, c_2, c_3)\) and a very large fitness for other triples. This simple examples shows that it is important to auto-optimize carefully the constants in each individual.

If the evaluation function is differentiable, then one natural choice to auto-optimize constants is the use of any gradient-based solver. In this example, we are going to use the library mygrad for autodifferentiation and pygmo for the optimization routine.

[5]:
def eval_model(individual, X, consts=[]):
    num_variables = X.shape[1]
    if num_variables > 1:
        X = [X[:, i] for i in range(num_variables)]
    else:
        X = [X]
    warnings.filterwarnings("ignore")
    y_pred = individual(*X, consts)
    return y_pred


def compute_MSE(individual, X, y, consts=[]):
    y_pred = eval_model(individual, X, consts)
    MSE = np.mean((y - y_pred) ** 2)

    if np.isnan(MSE) or np.isinf(MSE):
        MSE = 1e8

    return MSE


def eval_MSE_and_tune_constants(tree, toolbox, X, y):
    individual, num_consts = compile_individual_with_consts(tree, toolbox)

    if num_consts > 0:

        eval_MSE = partial(compute_MSE, individual=individual, X=X, y=y)

        x0 = np.ones(num_consts)

        class fitting_problem:
            def fitness(self, x):
                total_err = eval_MSE(consts=x)
                # return [total_err + 0.*(np.linalg.norm(x, 2))**2]
                return [total_err]

            def gradient(self, x):
                with mem_guard_off:
                    xt = mg.tensor(x, copy=False)
                    f =  mg.tensor(self.fitness(xt)[0], copy=False)
                    f.backward()
                return xt.grad

            def get_bounds(self):
                return (-5.0 * np.ones(num_consts), 5.0 * np.ones(num_consts))

        # PYGMO SOLVER
        prb = pg.problem(fitting_problem())
        algo = pg.algorithm(pg.nlopt(solver="lbfgs"))
        # algo = pg.algorithm(pg.pso(gen=10))
        # pop = pg.population(prb, size=70)
        algo.extract(pg.nlopt).maxeval = 10
        pop = pg.population(prb, size=1)
        pop.push_back(x0)
        pop = algo.evolve(pop)
        MSE = pop.champion_f[0]
        consts = pop.champion_x

        if np.isinf(MSE) or np.isnan(MSE):
            MSE = 1e8
    else:
        MSE = compute_MSE(individual, X, y)
        consts = []
    return MSE, consts

Expression Complexity and Structure Checks

These helper functions analyze individuals to:

  • count trigonometric functions;

  • detect nested trigonometric expressions;

  • penalize overly complex or pathological solutions.

They are later used to regularize the fitness function.

[6]:
def check_trig_fn(ind):
    return len(re.findall("cos", str(ind))) + len(re.findall("sin", str(ind)))


def check_nested_trig_fn(ind):
    return detect_nested_trigonometric_functions(str(ind))

def get_features_batch(
    individuals_batch,
    individ_feature_extractors=[len, check_nested_trig_fn, check_trig_fn],
):
    features_batch = [
        [fe(i) for i in individuals_batch] for fe in individ_feature_extractors
    ]

    individ_length = features_batch[0]
    nested_trigs = features_batch[1]
    num_trigs = features_batch[2]
    return individ_length, nested_trigs, num_trigs

Fitness and Prediction Functions

The fitness function combines:

  • prediction error (MSE),

  • structural penalties (expression length),

  • functional penalties (nested trigonometric functions).

A Tarpeian selection strategy is applied to discard very large trees early.

[7]:
def predict(individuals_batch, toolbox, X, penalty, fitness_scale):

    predictions = [None] * len(individuals_batch)

    for i, tree in enumerate(individuals_batch):
        callable, _ = compile_individual_with_consts(tree, toolbox)
        predictions[i] = eval_model(callable, X, consts=tree.consts)

    return predictions


def compute_MSEs(individuals_batch, toolbox, X, y, penalty, fitness_scale):

    total_errs = [None] * len(individuals_batch)

    for i, tree in enumerate(individuals_batch):
        callable, _ = compile_individual_with_consts(tree, toolbox)
        total_errs[i] = compute_MSE(callable, X, y, consts=tree.consts)

    return total_errs


def compute_attributes(individuals_batch, toolbox, X, y, penalty, fitness_scale):

    attributes = [None] * len(individuals_batch)

    individ_length, nested_trigs, num_trigs = get_features_batch(individuals_batch)

    for i, tree in enumerate(individuals_batch):

        # Tarpeian selection
        if individ_length[i] >= 50:
            consts = None
            fitness = (1e8,)
        else:
            MSE, consts = eval_MSE_and_tune_constants(tree, toolbox, X, y)
            fitness = (
                fitness_scale
                * (
                    MSE
                    + 100000 * nested_trigs[i]
                    + 0.0 * num_trigs[i]
                    + penalty["reg_param"] * individ_length[i]
                ),
            )
        attributes[i] = {"consts": consts, "fitness": fitness}
    return attributes


def assign_attributes(individuals_batch, attributes):
    for ind, attr in zip(individuals_batch, attributes):
        ind.consts = attr["consts"]
        ind.fitness.values = attr["fitness"]

Main Training and Evaluation Pipeline

The following cell orchestrates the entire symbolic regression process:

  1. loads configuration from a YAML file;

  2. generates training and test datasets;

  3. builds the GP primitive set;

  4. initializes the symbolic regressor;

  5. trains the model;

  6. evaluates performance on training and test sets.

[8]:
regressor_params, config_file_data = load_config_data("simple_sr.yaml")

scaleXy = config_file_data["gp"]["scaleXy"]

# generate training and test datasets
(
    X_train_scaled,
    y_train_scaled,
    X_test_scaled,
    y_test,
    _,
    scaler_y,
    num_variables,
    _,
) = generate_dataset("1096_FacultySalaries", scaleXy=scaleXy, random_state=29802)

if num_variables == 1:
    pset = gp.PrimitiveSetTyped("Main", [float], float)
    pset.renameArguments(ARG0="x")
elif num_variables == 2:
    pset = gp.PrimitiveSetTyped("Main", [float, float], float)
    pset.renameArguments(ARG0="x")
    pset.renameArguments(ARG1="y")
else:
    pset = gp.PrimitiveSetTyped("Main", [float] * num_variables, float)

pset = add_primitives_to_pset_from_dict(pset, config_file_data["gp"]["primitives"])

batch_size = config_file_data["gp"]["batch_size"]
if config_file_data["gp"]["use_constants"]:
    pset.addTerminal(object, float, "c")

callback_func = assign_attributes
fitness_scale = 1.0

penalty = config_file_data["gp"]["penalty"]
common_params = {"penalty": penalty, "fitness_scale": fitness_scale}

gpsr = gps.GPSymbolicRegressor(
    pset_config=pset,
    fitness=compute_attributes,
    predict_func=predict,
    score_func=compute_MSEs,
    common_data=common_params,
    callback_func=callback_func,
    print_log=True,
    num_best_inds_str=1,
    save_best_individual=False,
    output_path="./",
    seed_str=None,
    batch_size=batch_size,
    num_cpus=num_cpus,
    remove_init_duplicates=True,
    **regressor_params,
)

tic = time.time()
gpsr.fit(X_train_scaled, y_train_scaled)
toc = time.time()

best = gpsr.get_best_individuals(n_ind=1)[0]

if hasattr(best, "consts"):
    print("Best parameters = ", best.consts)

print("Elapsed time = ", toc - tic)
individuals_per_sec = (
    (gpsr.get_last_gen() + 1)
    * gpsr.num_individuals
    * gpsr.num_islands
    / (toc - tic)
)
print("Individuals per sec = ", individuals_per_sec)

u_best = gpsr.predict(X_test_scaled)

# de-scale outputs before computing errors
if scaleXy:
    u_best = scaler_y.inverse_transform(u_best.reshape(-1, 1)).flatten()

MSE = np.mean((u_best - y_test) ** 2)
r2_test = r2_score(y_test, u_best)
print("MSE on the test set = ", MSE)
print("R^2 on the test set = ", r2_test)

pred_train = gpsr.predict(X_train_scaled)

if scaleXy:
    pred_train = scaler_y.inverse_transform(pred_train.reshape(-1, 1)).flatten()
    y_train_scaled = scaler_y.inverse_transform(
        y_train_scaled.reshape(-1, 1)
    ).flatten()

MSE = np.mean((pred_train - y_train_scaled) ** 2)
r2_train = r2_score(y_train_scaled, pred_train)
print("MSE on the training set = ", MSE)
print("R^2 on the training set = ", r2_train)

# ray is explicitly shut down at the end of the execution to release computational resources
ray.shutdown()
2026-02-25 11:18:31,563 INFO worker.py:1998 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 
Generating initial population(s)...
 Removing duplicates from initial population(s)...
 DONE.
DONE.
Evaluating initial population(s)...
DONE.
 -= START OF EVOLUTION =-
                           fitness                           size
                ------------------------------  ------------------------------
gen     evals   min     avg     max     std     min     avg     max     std
1       2000    0.1103  0.6129  1.1296  0.3316  2       8.0595  21      3.8143
Best individuals of this generation:
sub(mul(c, ARG1), mul(c, add(ARG2, ARG1)))
2       2000    0.1103  0.3508  0.7454  0.1588  2       8.302   24      4.1524
Best individuals of this generation:
sub(mul(c, ARG1), mul(c, add(ARG2, ARG1)))
3       2000    0.1103  0.247   0.3748  0.075   2       7.982   24      4.2672
Best individuals of this generation:
sub(mul(c, ARG1), mul(c, sub(ARG2, ARG1)))
4       2000    0.1061  0.2048  0.2911  0.0589  2       8.306   26      4.397
Best individuals of this generation:
aq(ARG1, mul(sub(ARG2, ARG1), sub(add(ARG2, ARG3), c)))
5       2000    0.1061  0.1666  0.2527  0.0349  3       9.182   30      4.6258
Best individuals of this generation:
aq(ARG1, mul(sub(ARG2, ARG1), sub(add(ARG2, ARG3), c)))
6       2000    0.1061  0.1456  0.1732  0.0135  3       8.649   25      4.5359
Best individuals of this generation:
aq(ARG1, mul(sub(ARG2, ARG1), sub(add(ARG2, ARG3), c)))
7       2000    0.1061  0.1388  0.1532  0.0085  3       8.7245  25      4.6155
Best individuals of this generation:
aq(ARG1, mul(sub(ARG2, ARG1), sub(add(ARG2, ARG3), c)))
8       2000    0.1025  0.135   0.1453  0.0064  3       8.984   26      4.6724
Best individuals of this generation:
aq(sub(mul(c, ARG1), sin(ARG2)), sub(mul(c, ARG2), mul(c, ARG1)))
9       2000    0.1025  0.1326  0.14    0.0052  3       8.775   26      4.217
Best individuals of this generation:
aq(sub(mul(c, ARG1), sin(ARG2)), sub(mul(c, ARG2), mul(c, ARG1)))
10      2000    0.1025  0.1309  0.1368  0.0046  3       7.96    26      4.1211
Best individuals of this generation:
aq(sub(mul(c, ARG1), sin(ARG2)), sub(mul(c, ARG2), mul(c, ARG1)))
11      2000    0.1025  0.1296  0.135   0.0044  3       7.452   26      4.1453
Best individuals of this generation:
aq(sub(mul(c, ARG1), sin(ARG2)), sub(mul(c, ARG2), mul(c, ARG1)))
12      2000    0.1025  0.1287  0.1332  0.0043  3       6.97    25      4.1004
Best individuals of this generation:
aq(sub(mul(c, ARG1), sin(ARG2)), sub(mul(c, ARG2), mul(c, ARG1)))
13      2000    0.1025  0.1277  0.1321  0.0043  3       6.9555  28      4.2998
Best individuals of this generation:
aq(sub(mul(c, ARG1), sin(ARG2)), sub(mul(c, ARG2), mul(c, ARG1)))
14      2000    0.1025  0.127   0.1311  0.0044  3       7.129   28      4.4758
Best individuals of this generation:
aq(sub(mul(c, ARG1), sin(ARG2)), sub(mul(c, ARG2), mul(c, ARG1)))
15      2000    0.1025  0.1262  0.131   0.0044  3       7.443   28      4.7715
Best individuals of this generation:
aq(sub(mul(c, ARG1), sin(ARG2)), sub(mul(c, ARG2), mul(c, ARG1)))
16      2000    0.1025  0.1254  0.13    0.0045  3       7.3915  28      4.9913
Best individuals of this generation:
aq(sub(mul(c, ARG1), sin(ARG2)), sub(mul(c, ARG2), mul(c, ARG1)))
17      2000    0.1025  0.1247  0.129   0.0046  3       7.422   29      5.281
Best individuals of this generation:
aq(sub(mul(c, ARG1), sin(ARG2)), sub(mul(c, ARG2), mul(c, ARG1)))
18      2000    0.1025  0.1238  0.1284  0.0047  3       8.152   29      5.586
Best individuals of this generation:
aq(sub(mul(c, ARG1), sin(ARG2)), sub(mul(c, ARG2), mul(c, ARG1)))
19      2000    0.1025  0.123   0.1274  0.0049  3       8.9855  29      5.7858
Best individuals of this generation:
aq(sub(mul(c, ARG1), sin(ARG2)), sub(mul(c, ARG2), mul(c, ARG1)))
20      2000    0.1025  0.1222  0.127   0.0049  3       10.028  29      5.8074
Best individuals of this generation:
aq(sub(mul(c, ARG1), sin(ARG2)), sub(mul(c, ARG2), mul(c, ARG1)))
 -= END OF EVOLUTION =-
The best individual is aq(sub(mul(c, ARG1), sin(ARG2)), sub(mul(c, ARG2), mul(c, ARG1)))
The best fitness on the training set is 0.1025
Best parameters =  [1.9244978  2.9227811  2.16785583]
Elapsed time =  89.945805311203
Individuals per sec =  466.9478454796689
MSE on the test set =  1.535581223536389
R^2 on the test set =  0.796316488032204
MSE on the training set =  2.2604829120253043
R^2 on the training set =  0.9115115245302139
[9]:
# get the sympy version of the best individual
best_expression_sympy_str = str(gpsr.get_best_individual_sympy())
best_expression_sympy_str
[9]:
'(1.9244978027630006*ARG1 - sin(ARG2))/sqrt((-2.1678558285490226*ARG1 + 2.922781097169807*ARG2)**2 + 1)'

Plots

[10]:
fitness = gpsr.get_train_fit_history()
generations = gpsr.generations
plt.plot(np.arange(1, generations+1), fitness)
plt.xlabel("Generations")
plt.ylabel("Fitness")
plt.xticks([1, 5, 10, 15, 20])
plt.grid(True, linestyle=":", linewidth=0.5)
plt.show()
../_images/tutorials_simple_sr_22_0.png
[11]:
y_min = min(np.min(y_train_scaled), np.min(y_test))
y_max = max(np.max(y_train_scaled), np.max(y_test))

plt.scatter(y_train_scaled, pred_train, s=5, label="Training")
plt.scatter(y_test, u_best, s=5, label="Test")
plt.plot([y_min, y_max], [y_min, y_max], "k--", linewidth=1)
plt.grid(True, linestyle=":", linewidth=0.5)
plt.xlabel("Targets")
plt.ylabel("Predictions")
plt.legend()
plt.show()
../_images/tutorials_simple_sr_23_0.png