"""
Optimisation using CMAES with Novelty Detection
Requires `pip install cmaes`, see:
https://github.com/CyberAgentAILab/cmaes
BSMArt scan to be describe in an upcoming publication.
This scan uses cmaes to optimise a log-likelihood constructed from selected observables.
All desired observables must use the scaling CFUNCTION with a range of values for which the observable is in an acceptable range (other observables should have "SCALING": "OFF" or at least no MEAN/VARIANCE/RANGE set so that they will be ignored.)
It is recommended to use this also with a validity condition, so that no further codes are evaluated when the observables are outside of this range, and the code chain can focus on first obtaining acceptable values for that/those observables.
For example, here we tell BSMArt to run SPheno and check the Higgs mass lies within 3 GeV of its observed value. We also record the lightest neutralino and selectron masses, construct a derived observable based on the difference of their masses, and insist that it is greater than zero:
.. code-block:: json
"Codes": {
"SPheno": {
"Command": "< >",
"InputFile": "< >",
"OutputFile": "< >",
"Observables":
{
"mh": {
"SLHA": [
"MASS",
[25]
],
"SCALING": "CFUNCTION",
"MEAN": 125.09,
"VARIANCE": 3,
"VALID": [122.09,128.09]
},
"mchi1": {
"SLHA" : [ "MASS", [1000022]], "SCALING": "OFF"]
},
"mse1": {
"SLHA" : [ "MASS", [1000011]], "SCALING": "OFF"]
},
"dmse1mchi1" : {
"FUNCTION": "mse1-mchi1",
"SCALING": "CFUNCTION",
"RANGE": [0,100000]
}
},
"Conditions": ["mse1>mchi1"],
"Run": "True"
}
"""
__meta__ = {
"name": "CMAES_ND",
"requires": ["cmaes", "pyod", "sklearn", "traceback", "pandas"],
"settings": {
"CMAESPopulationSize": "Number of points to run per batch (optional, auto-calculated if not set)",
"CMAESMean": "Initial mean for CMA-ES optimizer (optional, random if not set)",
"CMAESSigma": "Initial step size for CMA-ES (optional, default: 1/sqrt(n_variables))",
"CMAESMaxGenerations": "Number of batches to run (default: 100)",
"CMAES MaxLoss": "Maximum loss value override (optional)",
"CMAESPathSeeds": "Path to seed file (.csv or .parquet) for initial mean (optional)",
"CMAESNoveltyDetection": "Enable novelty detection using HBOS (default: True)",
"CMAESHBOSNBins": "Number of bins for HBOS novelty detection (default: 100)",
"CMAESFocus": "Dict with Variables and Observables lists for novelty detection focus (optional)",
"CMAESHierarchy": "List of hierarchical observables to focus on first (optional)",
"CMAESNoGoodPointsPatience": "Number of generations without good points before stopping (default: 5000)",
"CMAESGoodPointsEarlyStop": "Target number of good points for early stopping (default: 5000)",
"CMAESBestLossPatience": "Patience for best loss counter (default: 200)",
"CMAESBestNValidConstraintsPatience": "Patience for best n valid constraints counter (default: 2000)",
"Initial Sample": "Path to initial sample file for warm start (optional)",
},
}
import math
import os
import shutil
import sys
import traceback
from collections import OrderedDict
from bsmart import BSMutil
import cmaes
from bsmart import debug
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# from BSMlikelihood import MakeLikelihoods, MakeGlobalLikelihood
from bsmart.BSMlikelihood import MakeLikelihoods, safe_float
from cmaes import CMA
from bsmart.core import Scan as Scan
from pyod.models.hbos import HBOS
from sklearn.preprocessing import minmax_scale
[docs]
class HBOSPenaliser:
def __init__(self, n_bins=100, *args, **kwargs):
self.n_bins = n_bins
self.hbos = HBOS(n_bins=self.n_bins)
[docs]
def fit(self, X):
self.hbos = self.hbos.fit(X)
return self
[docs]
def get_penalties(self, x):
try:
x = np.float64(x)
_x = np.nan_to_num(x, -np.inf)
scores = self.hbos.predict_proba(_x)
except Exception as e:
print("-" * 60)
traceback.print_exc(file=sys.stdout)
print("-" * 60)
print(x)
print(_x)
exit(1)
return scores[:, 0]
[docs]
class Optimiser:
def __init__(self, direction="min", *args, **kwargs):
self.direction = direction
self.args = args
self.kwargs = kwargs
def _sigmoid(self, x):
return 1 / (1 + np.exp(-x))
[docs]
def fit(self, X):
return self
[docs]
def get_penalties(self, X):
factor = 1 if self.direction == "min" else -1
_X = minmax_scale(X, feature_range=(-1, 1))
return self._sigmoid(factor * _X).mean(axis=1)
PENALTIES = {"HBOS": HBOSPenaliser, "Optimiser": Optimiser}
[docs]
class NewScan(Scan):
"""Scanner class for CMAES-ND Scans"""
[docs]
def initialise(self): # overload initial settings.
self.runsettings.store_points_in_memory = (
True # Force storage of info for hierarchical loss computation
)
self.runsettings.store_invalid_points = True # Need this to keep points where
self.runsettings.invalid_return_value = 0
self.citations = """@article{deSouza:2025uxb,
author = "de Souza, Fernando Abreu and Castro, Nuno Filipe and Crispim Rom{\\~a}o, Miguel and Porod, Werner",
title = "{Exploring scotogenic parameter spaces and mapping uncharted dark matter phenomenology with multi-objective search algorithms}",
eprint = "2505.08862",
archivePrefix = "arXiv",
primaryClass = "hep-ph",
reportNumber = "IPPP/25/29",
doi = "10.1007/JHEP10(2025)116",
journal = "JHEP",
volume = "10",
pages = "116",
year = "2025"
}
"""
def __init__(self, inputs, log):
Scan.__init__(self, inputs, log)
self.downscalers, self.upscalers = BSMutil.create_scalers(self.inputs, log)
self.n_variables = len(self.upscalers)
self.use_loglike = eval(self.inputs["Setup"].get("LogLike", "True"))
self.Points = self.inputs["Setup"].get("Points", 10000)
self.Variables = self.inputs["Variables"]
self.maxloss = np.log(1 + np.finfo(np.float64).max) + 1
if "CMAES MaxLoss" in self.inputs["Setup"]:
if self.inputs["Setup"].get("CMAES MaxLoss") is not None:
self.maxloss = float(self.inputs["Setup"].get("CMAES MaxLoss"))
# Create the likelihood functions based on 'scaling' property of variables, if they have it; otherwise use gaussian likelihood
# self.get_likelihood,self.observable_masks=MakeGlobalLikelihood(self.inputs['Observables'],self.use_loglike)
# self.likelihood_fns,self.observable_masks=MakeLikelihoods(self.inputs['Observables'],self.use_loglike)
self.likelihood_fns, self.observable_masks = MakeLikelihoods(
self.inputs["Observables"], loglike=True
)
self.Observables = list(self.inputs["Observables"].keys())
self.observable_indices = {obs: idx for idx, obs in enumerate(self.Observables)}
self.Constraints = [
obs for obs, mask in zip(self.Observables, self.observable_masks) if mask
]
# Build constraint_likelihoods by matching observables to likelihood functions
# Note: MakeLikelihoods only creates likelihood functions for non-OFF observables,
# so we need to match by iterating through observables in the same order
likelihood_fn_iter = iter(self.likelihood_fns)
self.constraint_likelihoods = []
for obs, mask in zip(self.Observables, self.observable_masks):
if mask:
# This observable should have a likelihood function
try:
self.constraint_likelihoods.append(next(likelihood_fn_iter))
except StopIteration:
self.log.error(f"Missing likelihood function for constraint: {obs}")
raise ValueError(
f"Missing likelihood function for constraint: {obs}. "
f"This suggests a mismatch between observable masks and likelihood functions."
)
## Debugging: Log constraint analysis
self.log.info("=" * 80)
self.log.info("CMAES-ND: Constraint Analysis")
self.log.info("=" * 80)
self.log.info(f"Total observables found: {len(self.Observables)}")
self.log.info("")
self.log.info("Observable details:")
for obs, mask in zip(self.Observables, self.observable_masks):
scaling = self.inputs["Observables"][obs].get("SCALING", "NOT SET")
mask_status = "ACTIVE (mask=True)" if mask else "INACTIVE (mask=False)"
self.log.info(f" - {obs}: SCALING={scaling}, {mask_status}")
self.log.info("")
self.log.info(f"Active constraints (mask=True): {len(self.Constraints)}")
self.log.info(f"Constraint names: {self.Constraints}")
self.log.info(
f"constraint_likelihoods count: {len(self.constraint_likelihoods)}"
)
if len(self.Constraints) != len(self.constraint_likelihoods):
self.log.error(
f"ERROR: Length mismatch! Constraints={len(self.Constraints)}, "
f"constraint_likelihoods={len(self.constraint_likelihoods)}"
)
self.log.info("")
self.log.info("Constraint-to-likelihood mapping:")
for i, (obs_name, likelihood_fn) in enumerate(
zip(self.Constraints, self.constraint_likelihoods)
):
self.log.info(
f" [{i}] {obs_name} -> likelihood_fn type: {type(likelihood_fn).__name__}"
)
self.log.info("=" * 80)
self.log.info("")
## Check that the observables used as constraints are defined with the CFUNCTION method.
obsok = True
for obs in self.Constraints:
if self.inputs["Observables"][obs]["SCALING"] != "CFUNCTION":
self.log.error(
"Observable "
+ obs
+ " should be defined with the CFUNCTION scaling method. Cannot proceed with the scan!"
)
if not obsok:
raise NameError("Incorrectly defined observables")
self.population_size = None
if "CMAESPopulationSize" in self.inputs["Setup"]:
if self.inputs["Setup"].get("CMAESPopulationSize") is not None:
self.population_size = int(
self.inputs["Setup"].get("CMAESPopulationSize")
)
self.mean = np.random.rand(self.n_variables)
if "CMAESMean" in self.inputs["Setup"]:
if self.inputs["Setup"].get("CMAESMean") is not None:
self.mean = float(self.inputs["Setup"].get("CMAESMean")) * np.ones(
self.n_variables
)
self.sigma = 1.0 / math.sqrt(self.n_variables)
if "CMAESSigma" in self.inputs["Setup"]:
if self.inputs["Setup"].get("CMAESSigma") is not None:
self.sigma = float(self.inputs["Setup"].get("CMAESSigma"))
if "CMAESPathSeeds" in self.inputs["Setup"]:
path_seeds = self.inputs["Setup"].get("CMAESPathSeeds")
try:
if ".csv" in path_seeds:
centroid_seeds = pd.read_csv(path_seeds)
else:
centroid_seeds = pd.read_parquet(path_seeds)
print(f"Using seeds in {path_seeds}")
except:
raise Exception("Seeds file for CMAES must be .csv or .parquet!")
# Extract values in original space from the seed file
seed_values_original = np.array(
centroid_seeds.sample(n=1)[self.inputs["Variables"].keys()]
.iloc[0]
.to_list()
)
# Apply downscalers to convert from original space to normalized [0,1] space for CMA-ES
self.mean = np.array(
[f(x) for x, f in zip(seed_values_original, self.downscalers)]
)
self.novelty_detection = True
if "CMAESNoveltyDetection" in self.inputs["Setup"]:
self.novelty_detection = eval(
str(self.inputs["Setup"]["CMAESNoveltyDetection"])
)
if self.novelty_detection:
# self.hbos_n_bins = int(self.inputs["Setup"].get("CMAESHBOSNBins", 100))
# self.nd_estimator = HBOSPenaliser(n_bins=self.hbos_n_bins)
self.nd_estimator = PENALTIES[
self.inputs["Setup"].get("CMAESNoveltyDetection", "HBOS")
](**self.inputs["Setup"].get("CMAESNoveltyDetectionParams", {}))
self.log.info(f"Using novelty detection estimator: {self.nd_estimator}")
if "CMAESFocus" in self.inputs["Setup"]:
self.focus = self.inputs["Setup"]["CMAESFocus"]
else:
self.focus = {
"Variables": list(self.Variables.keys()),
"Observables": list(self.Observables),
}
if "CMAESMaxGenerations" in self.inputs["Setup"]:
self.max_n_generations = self.inputs["Setup"]["CMAESMaxGenerations"]
else:
self.max_n_generations = 100
if (
"CMAESHierarchy" in self.inputs["Setup"]
and len(self.inputs["Setup"]["CMAESHierarchy"]) > 0
):
self.use_hierarchy = True
self.hierarchy_list = self.inputs["Setup"]["CMAESHierarchy"]
invalid_hierarchy = [
obs for obs in self.hierarchy_list if obs not in self.Constraints
]
if len(invalid_hierarchy) > 0:
raise NameError(
"Hierarchy observables must be constrained observables. "
f"Invalid entries: {invalid_hierarchy}"
)
self.hierarchy_indices = sorted(
[self.Constraints.index(obs) for obs in self.hierarchy_list]
)
else:
self.use_hierarchy = False
self.resultsdir = os.path.join(
self.inputs["Setup"]["cwd"], self.inputs["Setup"]["RunName"], "Results"
)
self.results_csv = os.path.join(self.resultsdir, "Results.csv")
self.resume = False
if os.path.exists(self.resultsdir) and not self.resume:
shutil.rmtree(self.resultsdir)
if not os.path.exists(self.resultsdir):
os.makedirs(self.resultsdir)
# Create the likelihood functions based on 'scaling' property of variables, if they have it; otherwise use gaussian likelihood
# self.get_likelihood,self.observable_masks=MakeGlobalLikelihood(self.inputs['Observables'],self.use_loglike)
if "Initial Sample" in self.inputs["Setup"]:
from bsmart.ml.seed_points import get_previous_sample
seed_inputs, _ = get_previous_sample(
sample_file=self.inputs["Setup"]["Initial Sample"],
inputs=self.inputs,
nsamples=self.population_size,
returntype="NLL",
)
"""
seed_inputs,nlls = get_previous_sample(sample_file=self.inputs['Setup']['Initial Sample'],
inputs=self.inputs,
nsamples = self.population_size,
returntype = "NLL")
print(seed_inputs.shape)
print(nlls.shape)
print(f"Minimum nll: {np.min(nlls)}")
print(np.hstack([seed_inputs, nlls.reshape(-1,1)])) # nlls reports size as (n,), need to change to (n,1)
"""
# now we have to scale the inputs:
# self.seed_inputs = np.array([[f(x) for x,f in zip(point,self.downscalers)] for point in seed_inputs])
# NB want to set the mean to be the best point, which is the first one in the list
self.mean = np.array(
[f(x) for x, f in zip(seed_inputs[0], self.downscalers)]
) # self.seed_inputs[0]
# self.sigma=0.01 # less variation
# self.seed_inputs = None
self.seed_inputs = np.array(
[
[f(x) for x, f in zip(point, self.downscalers)]
for point in seed_inputs
]
)
else:
self.seed_inputs = None
# self.all_valid=[]
# self.all_invalid=[]
[docs]
def postprocess(self, Point, observables, data_point, temp_dir, log, lock=None):
# Codes have run successfully; return 1. The actual loss is computed elsewhere, taking into account
return 1.0 # -math.log(self.get_likelihood(observables))
[docs]
def get_losses(self, observables):
"""Returns a list of losses.
The C-function loss should be zero if the observable is within allowed bounds, and greater than zero outside it.
If we include a validity condition, then other observables, evaluated afterwards, will be returned as NaN.
-> We should assign these the maximum loss ~ 710.
This is then compatible with the explicit use of 'hierarchical observables', where all other observables are set to the maximum.
"""
losses = []
for obs_name, likelihood in zip(self.Constraints, self.constraint_likelihoods):
obs_idx = self.observable_indices[obs_name]
val = safe_float(observables[obs_idx])
if math.isnan(val):
losses.append(self.maxloss)
else:
losses.append((-1.0) * likelihood(val))
return losses
"""
Equivalent to
losses=[]
for v, mask in zip(observables, self.observable_masks):
if mask:
f=next(likeit)
val = safe_float(v)
if not math.isnan(val):
losses.append(val)
else:
losses.append(self.maxloss)
return losses
"""
[docs]
def run(self):
"""
Algorithm:
Our initial samples are in [0,1] since we use scalers.
Subsequent rounds use the normalizing flow to generate samples
"""
"""
def C_function(obs, obs_lower, obs_upper):
C_max = np.log(1 + np.finfo(np.float64).max) + 1
C_value = C_max
if isinstance(obs, str):
try:
obs = float(obs)
except:
raise Exception(f"Observable value {obs} not a string or float!")
if isinstance(obs, float):
C_value = np.log10(max(0.0, -obs + obs_lower, obs - obs_upper) + 1)
return C_value
"""
self.log.info("Starting CMA-ES scan!")
# print(f"Do we store outputs? {self.RunManager.settings.store_outputs}")
# CMA-ES bounded between [0,1]
bounds = np.zeros((self.n_variables, 2))
bounds[:, 1] = np.ones(self.n_variables)
"""
if self.seed_inputs is not None: # warm start
cube_batch = self.seed_inputs
batch = [[f(y) for y,f in zip(x,self.upscalers)] for x in self.seed_inputs]
results=self.RunManager.run_batch(batch)
#print(self.RunManager.all_batch_points)
#full_losses = [(x,value) if isinstance(value, (int, float)) else (x,1e100) for x,value in zip(cube_batch,results)]
batch_observables=[point['observables'] for point in self.RunManager.all_batch_points]
keep_observables=[[obs for obs,mask in zip(point_obs,self.observable_masks) if mask] for point_obs in batch_observables]
#print(f"Got observables: {batch_observables}")
losses_raw = np.array([self.get_losses(obs) for obs in batch_observables])
else:
initial_cov = None
"""
initial_cov = None
optimizer = CMA(
bounds=bounds,
mean=self.mean,
sigma=self.sigma,
population_size=self.population_size,
cov=initial_cov,
)
gen_idx = 0
no_good_points_counter = 0
no_good_points_counter_patience = int(
self.inputs["Setup"].get("CMAESNoGoodPointsPatience", 5000)
)
n_good_points_early_stop = int(
self.inputs["Setup"].get("CMAESGoodPointsEarlyStop", 5000)
)
best_loss = np.inf
best_loss_counter = 0
best_loss_patience = int(self.inputs["Setup"].get("CMAESBestLossPatience", 200))
best_n_valid_constraints = 0.0
best_n_valid_constraints_counter = 0
best_n_valid_constraints_patience = int(
self.inputs["Setup"].get("CMAESBestNValidConstraintsPatience", 2000)
)
good_points_counter = 0
all_good_points = pd.DataFrame()
for generation in range(self.max_n_generations):
batch = []
cube_batch = []
self.RunManager.all_points = [] # don't need to store these overall
self.RunManager.valid_points = []
if generation == 0 and self.seed_inputs is not None:
cube_batch = self.seed_inputs
batch = [[f(y) for y, f in zip(x, self.upscalers)] for x in cube_batch]
else:
for _ in range(optimizer.population_size):
x = optimizer.ask()
cube_batch.append(x)
# batch.append([f(1.0/(1+math.exp(-y))) for y,f in zip(x,self.upscalers)])
batch.append([f(y) for y, f in zip(x, self.upscalers)])
results = self.RunManager.run_batch(batch)
# print(self.RunManager.all_batch_points)
# full_losses = [(x,value) if isinstance(value, (int, float)) else (x,1e100) for x,value in zip(cube_batch,results)]
batch_observables = [
point["observables"] for point in self.RunManager.all_batch_points
]
# print(f"Got observables: {batch_observables}")
losses_raw = np.array([self.get_losses(obs) for obs in batch_observables])
"""
#Ordering of points in all_batch_points was not the same as for the inputs!!
for point,inpoint in zip(self.RunManager.all_batch_points,batch):
print(f" {list(map(float,point['inputs']))},{list(map(float,inpoint))}")
"""
# losses_raw = np.array(self.get_losses(point['observables']) for point in self.RunManager.all_batch_points)
"""
losses_raw = np.array([[C_function(obs_val, self.Observables[obs]["MEAN"] - self.Observables[obs]["VARIANCE"], self.Observables[obs] ["MEAN"] + self.Observables[obs]["VARIANCE"]) for obs_val, obs in zip(res, self.Observables)] for res in results])
"""
# self.log.info(f"Losses shape: {losses_raw.shape}")
# print(f"Observables: {self.Observables}")
# self.log.info(f"relevant obs: {batch_observables}")
# self.log.info(f"Losses: {losses_raw}")
"""
The C-function loss should be zero if the observable is within allowed bounds, and greater than zero outside it.
If we include a validity condition, then other observables, evaluated afterwards, will be returned as NaN.
-> We should assign these the maximum loss ~ 710.
This is then compatible with the explicit use of 'hierarchical observables', where all other observables are set to the maximum.
In other words, if we use validity conditions then we probably don't need to define hierarchical observables at all.
"""
if self.use_hierarchy: # focus first on "hierarchical observables"
h_losses = np.ones_like(losses_raw) * (
np.log(1 + np.finfo(np.float64).max) + 1
)
for idx, loss in enumerate(losses_raw):
print(idx, loss)
if np.all(loss[self.hierarchy_indices] == 0):
h_losses[idx] = losses_raw[idx]
else:
h_losses[idx, self.hierarchy_indices] = loss[
self.hierarchy_indices
]
losses = h_losses.copy()
else:
losses = losses_raw.copy()
# print(f"Losses: {losses}")
fitnesses_raw = np.sum(losses, axis=1)
losses_scaled = minmax_scale(losses, axis=0)
fitnesses = np.sum(losses_scaled, axis=1)
# Build arrays for DataFrame
cube_arr = np.array(cube_batch)
batch_arr = np.array(batch)
obs_arr = np.array(batch_observables)
losses_arr = np.array(losses)
total_loss_arr = np.array(fitnesses_raw).reshape(-1, 1)
# Ensure all arrays are 2D
if cube_arr.ndim == 1:
cube_arr = cube_arr.reshape(-1, 1)
if batch_arr.ndim == 1:
batch_arr = batch_arr.reshape(-1, 1)
if obs_arr.ndim == 1:
obs_arr = obs_arr.reshape(-1, 1)
if losses_arr.ndim == 1:
losses_arr = losses_arr.reshape(-1, 1)
# Verify array shapes match expected dimensions
n_points = len(batch)
n_vars = len(self.Variables)
n_obs = len(self.Observables)
n_constraints = len(self.Constraints)
assert cube_arr.shape == (
n_points,
n_vars,
), f"cube_arr shape {cube_arr.shape} != expected ({n_points}, {n_vars})"
assert batch_arr.shape == (
n_points,
n_vars,
), f"batch_arr shape {batch_arr.shape} != expected ({n_points}, {n_vars})"
assert obs_arr.shape == (
n_points,
n_obs,
), f"obs_arr shape {obs_arr.shape} != expected ({n_points}, {n_obs})"
assert losses_arr.shape == (
n_points,
n_constraints,
), f"losses_arr shape {losses_arr.shape} != expected ({n_points}, {n_constraints})"
# Build column names
box_var_cols = ["box_" + v for v in self.Variables.keys()]
var_cols = list(self.Variables.keys())
obs_cols = list(self.Observables)
constraint_cols = ["C" + c for c in self.Constraints]
all_columns = (
box_var_cols + var_cols + obs_cols + constraint_cols + ["total_loss"]
)
all_points = pd.DataFrame(
np.hstack([cube_arr, batch_arr, obs_arr, losses_arr, total_loss_arr]),
columns=all_columns,
)
# all_points = pd.to_numeric(all_points,errors="coerce")
good_points = all_points.query("total_loss == 0")
# print(all_points['fitness'].dtype)
# print(f"These are good: {all_points[all_points['fitness']< 1.0]}")
penalties = np.ones_like(fitnesses)
if len(good_points) > 0:
self.log.info(f"Found good points in this batch:\n{good_points}")
if (
(len(all_good_points) > 1)
and (len(good_points) > 1)
and (self.novelty_detection)
):
self.log.info("///////////////////////////////////////////////////////")
self.log.info(all_good_points)
self.log.info(self.nd_estimator)
losses_scaled = minmax_scale(losses, axis=0)
fitnesses = np.sum(losses_scaled, axis=1)
# For variables, use box space (normalized [0,1]); for observables, use actual values
box_var_columns = ["box_" + v for v in self.focus.get("Variables", [])]
obs_columns = self.focus.get("Observables", [])
penalty_columns = box_var_columns + obs_columns
penalty_estimator = self.nd_estimator.fit(
X=all_good_points[penalty_columns].values.reshape(
-1, len(penalty_columns)
)
)
self.log.info(good_points)
penalties = np.zeros_like(fitnesses)
penalties = penalty_estimator.get_penalties(
all_points[penalty_columns].values.reshape(-1, len(penalty_columns))
)
##
# else:
# # Don't do novelty detection yet since we are still trying to find a minimum
# self.log.info(
# f"{len(good_points)} good points found; {len(all_good_points)} found previously"
# )
# penalties = np.ones_like(fitnesses)
# # self.log.info(f"\n{all_points}")
# Mask penalties: 1 for invalid points, ND penalty for valid points
penalties = np.where(np.all(np.array(losses) == 0, axis=1), penalties, 1.0)
fitnesses = fitnesses + penalties
# fitnesses = np.where(
# np.all(np.array(losses) == 0, axis=1), penalties, fitnesses + 1
# )
# all_points["total_loss"] = np.sum(losses, axis=1)
all_points["ratio_n_valid_constraints"] = np.where(
losses == 0.0, 1, 0
).mean(axis=1)
all_points["good_point"] = (all_points["total_loss"] == 0.0).astype(int)
all_points["fitness"] = fitnesses - penalties
all_points["penalty"] = penalties
# self.log.info(f"\n{all_points}")
# Select only the columns we want to display
display_cols = (
self.Constraints # constrained observables
+ constraint_cols # their losses
+ [
"total_loss",
"ratio_n_valid_constraints",
"good_point",
"fitness",
"penalty",
]
)
self.log.info(f"\n{all_points[display_cols]}")
if not os.path.exists(self.results_csv):
all_points.to_csv(self.results_csv)
else:
all_points.to_csv(self.results_csv, mode="a", header=False)
# solutions = [(pop,value) for pop,value in zip(cube_batch, fitnesses_raw)]
solutions = [(pop, value) for pop, value in zip(cube_batch, fitnesses)]
# all_good_points = pd.concat([all_good_points, good_points])
optimizer.tell(solutions)
gen_best_loss = losses.sum(axis=1).min()
if gen_best_loss < best_loss:
best_loss = gen_best_loss
best_loss_counter = 0
else:
best_loss_counter += 1
gen_best_n_valid_constraints = all_points["ratio_n_valid_constraints"].max()
if gen_best_n_valid_constraints > best_n_valid_constraints:
best_n_valid_constraints = gen_best_n_valid_constraints
best_n_valid_constraints_counter = 0
else:
best_n_valid_constraints_counter += 1
good_points_counter += all_points["good_point"].sum()
# good_points_found += results["good_point"].sum()
if all_points["good_point"].sum() > 0:
all_good_points = pd.concat(
[all_good_points, all_points[all_points.good_point == 1]],
ignore_index=True,
)
no_good_points_counter = 0
else:
no_good_points_counter += 1
# =====================================================================================
gen_idx += 1
if good_points_counter >= n_good_points_early_stop:
print(
f"Stopped! Max number of {n_good_points_early_stop} good points reached."
)
break # Always break when target number of good results have been reached
if gen_idx >= self.max_n_generations:
print(f"Stopped! Max number of {gen_idx} generations reached.")
break # Always use N_GEN as maximum budget
if any(
[
optimizer.should_stop(),
all(
[
best_loss_counter >= best_loss_patience,
best_n_valid_constraints_counter
>= best_n_valid_constraints_patience,
]
),
]
):
if no_good_points_counter < no_good_points_counter_patience:
continue # Still finding good results
else:
print(
f"Stopped! Patience of {no_good_points_counter_patience} generations with no good points reached"
)
break # Else, break as we're not restarting