"""
MLScanner RFC method
--------------------
MLScanner method `MLS_RFC` based on code from
* `<https://github.com/AHamamd150/MLscanner>`_
* `<https://arxiv.org/abs/2207.09959>`_
This scan implements an active learning strategy using a Random Forest Classifier (RFC)
to efficiently find "good" points in a parameter space. A point is considered "good" (Class 1)
if its Negative Log Likelihood (NLL) is below a specified threshold, and "bad" (Class 0) otherwise.
The process is as follows:
1. **Initialization**: The scan begins by evaluating a small set of randomly generated points (`Bootstrap_Points`).
It can also load an initial dataset from a CSV file (`InitCSV`).
2. **Initial Training**: A Random Forest Classifier is trained on this initial dataset.
Points are labeled as 1 (Good) or 0 (Bad) based on the `Threshold_Value`.
3. **Active Learning Loop**: The scan enters a loop to iteratively discover new good points until
the number of new good points requested via `Points` (in `Setup`) has been found. In each iteration:
a. A large number of `Candidate_Points` are randomly generated.
b. The trained RFC model predicts the probability of each candidate being "good".
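c. Candidates predicted to be good are ranked by probability and the best ones are selected; the batch
is then topped up with randomly drawn points (at least a `Random_Fraction` share, for exploration)
and all of them are evaluated by the physics code.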
d. **Retraining**: The RFC is retrained with the newly discovered points, improving its
ability to separate good regions from bad regions.
4. **Data Collection**: All discovered good points (NLL < Threshold) are returned.
This method is particularly effective for high-dimensional parameter spaces where exhaustive
scanning is computationally prohibitive.
"""
# Scan metadata: registered name, required packages and a human-readable
# description of every setting this scan reads from the input file.
__meta__ = {
    "name": "MLS_RFC",
    "requires": ["sklearn", "pandas", "numpy"],
    "settings": {
        # Each key below is looked up in the 'Networks' block first and in
        # 'Setup' second.
        "Networks": {
            # NOTE(review): 'Iterations' is read at construction time but the
            # active-learning loop in run() is bounded by 'Points', not by this
            # value -- confirm whether it should cap the loop.
            "Iterations": "Number of active learning iterations (default: 10).",
            "Candidate_Points": "Number of candidate points to generate and score in each iteration (default: 500).",
            "Bootstrap_Points": "Number of initial random points to evaluate (default: 100).",
            "Points_Per_Iteration": "Number of candidate points to evaluate in each iteration (default: 300).",
            "Threshold_Value": "The threshold for the NLL to classify a point as 'good' (default: 1).",
            "Random_Fraction": "Fraction of points per iteration to be selected randomly, for exploration (default: 0.2).",
            "Estimators": "Number of trees in the forest (default: 300).",
            "Max_Depth": "Maximum depth of the tree (default: 50).",
            "Min_Samples_Split": "The minimum number of samples required to split an internal node (default: 2).",
            "Min_Samples_Leaf": "The minimum number of samples required to be at a leaf node (default: 1).",
            # NOTE(review): 'Verbose' is stored on the scan but not used by run().
            "Verbose": "Verbosity level (default: 0)."
        },
        "Setup": {
            "InitCSV": "Path to an optional CSV file with initial points to seed the scan.",
            # The scan stops once this many good points have been found on top
            # of the good points present after the initial training step; the
            # code default is 10000.
            "Points": "Number of new good points (NLL below Threshold_Value) to find before stopping (default: 10000)"
        }
    }
}
from bsmart.core import Scan as Scan
import os
import sys
import numpy as np
import pandas as pd
import sklearn
from sklearn.ensemble import RandomForestClassifier
import math
from bsmart.BSMlikelihood import MakeLikelihoods, MakeGlobalLikelihood, safe_float
from bsmart import debug
[docs]
def generate_param_points(inputs, num_points):
    """Draw uniformly distributed random points for every ranged variable.

    Only entries of ``inputs['Variables']`` that carry a ``'RANGE'`` key are
    sampled; entries without one contribute no column.

    Args:
        inputs: Mapping with a ``'Variables'`` mapping of variable name to
            settings; ``settings['RANGE']`` holds ``[min, max]``.
        num_points: Number of points (rows) to draw.

    Returns:
        Array with one row per point and one column per ranged variable, in
        the iteration order of ``inputs['Variables']``.
    """
    bounds = [
        (spec['RANGE'][0], spec['RANGE'][1])
        for spec in inputs['Variables'].values()
        if 'RANGE' in spec
    ]
    # One independent draw per variable, in declaration order.
    columns = [np.random.uniform(low, high, num_points) for low, high in bounds]
    return np.array(columns).T
[docs]
class NewScan(Scan):
    """Active-learning scan driven by a Random Forest Classifier.

    Points are labelled good (1) when their NLL is below ``Threshold_Value``
    and bad (0) otherwise. The classifier is trained on a bootstrap sample
    (optionally extended by an ``InitCSV`` file) and then used repeatedly to
    pick promising candidates until the requested number of new good points
    has been found.
    """

    def initialise(self):
        """Configure the run settings and the citation for this scan."""
        self.runsettings.store_points_in_memory = True
        if self.runsettings.store_invalid_points:
            self.naive = False
            self.runsettings.invalid_return_value = [0]
        else:
            self.naive = True  # we treat invalid points as bad
            self.runsettings.invalid_return_value = []
        # The BibTeX text is kept flush-left so the stored string is unchanged.
        self.citations = """@article{Hammad:2022wpq,
author = "Hammad, A. and Park, Myeonghun and Ramos, Raymundo and Saha, Pankaj",
title = "{Exploration of parameter spaces assisted by machine learning}",
eprint = "2207.09959",
archivePrefix = "arXiv",
primaryClass = "hep-ph",
doi = "10.1016/j.cpc.2023.108902",
journal = "Comput. Phys. Commun.",
volume = "293",
pages = "108902",
year = "2023"
}
"""

    def __init__(self, inputs, log):
        """Read the scan settings and build the likelihood functions.

        Args:
            inputs: Parsed input configuration; the 'Networks', 'Setup',
                'Variables' and 'Observables' sections are read here.
            log: Logger handed on to the ``Scan`` base class.
        """
        Scan.__init__(self, inputs, log)
        print("start scan init")

        def get_setting(key, default, cast_type=int):
            # Look the key up in 'Networks' first, then fall back to 'Setup'.
            val = self.inputs['Networks'].get(key)
            if val is None:
                val = self.inputs['Setup'].get(key)
            if val is None:
                return default
            return cast_type(val)

        self.iteration = get_setting('Iterations', 10)
        self.candidate_points = get_setting('Candidate_Points', 500)
        self.bootstrap_points = get_setting('Bootstrap_Points', 100)
        self.points_per_iter = get_setting('Points_Per_Iteration', 300)
        # These two are real-valued: casting with int would truncate e.g. a
        # fraction of 0.3 to 0 and fail on a string such as "0.5".
        self.threshold_value = get_setting('Threshold_Value', 1.0, float)
        self.random_fraction = get_setting('Random_Fraction', 0.2, float)
        self.function_dim = int(len(self.inputs['Variables']))
        self.n_estimators = get_setting('Estimators', 300)
        self.max_depth = get_setting('Max_Depth', 50)
        self.min_samples_split = get_setting('Min_Samples_Split', 2)
        self.min_samples_leaf = get_setting('Min_Samples_Leaf', 1)
        self.verbose = get_setting('Verbose', 0)

        if "Cores" in self.inputs['Setup']:
            self.ncores = int(self.inputs['Setup']['Cores'])
            self.log.info('Setting number of cores to %d' % self.ncores)
        elif not hasattr(self, 'ncores'):
            # run() always reads self.ncores; None lets scikit-learn use its
            # default number of jobs.
            self.ncores = None

        self.target_points = int(self.inputs['Setup'].get('Points', 10000))
        self.n_variables = len(self.inputs['Variables'])

        print("make likelihoods")
        self.maxloss = np.log(1 + np.finfo(np.float64).max) + 1
        self.likelihood_fns, self.observable_masks = MakeLikelihoods(self.inputs["Observables"], loglike=True)
        self.primary_observable = 'NLL'

        self.InitCSV = self.inputs['Setup'].get('InitCSV')
        if self.InitCSV:
            if os.path.isfile(self.InitCSV):
                self.log.info(f'Will use InitCSV file: {self.InitCSV}')
            else:
                self.log.error(f'InitCSV file not found: {self.InitCSV}')
                self.InitCSV = None
        print("finished scan init")

    def postprocess(self, Point, observables, data_point, temp_dir, log, lock=None):
        """ return the likelihood; we won't get this far if the point failed to be generated """
        if self.naive:
            return np.sum(self.get_losses(observables))
        return 1.0

    def smooth_cap_loss(self, x):
        """
        Turn a log-likelihood value into a bounded loss.

        Computes ``-maxloss * expm1(x / maxloss)``: for small ``|x|`` this is
        approximately ``-x``, and it approaches ``maxloss`` as ``x`` goes to
        minus infinity. ``x`` is assumed to be negative, so the sign flips.
        """
        return -self.maxloss * np.expm1(x / self.maxloss)

    def get_losses(self, observables):
        """Return one loss per masked-in observable.

        Observables are paired positionally with ``self.observable_masks``;
        entries whose mask is falsy are skipped. Each remaining entry consumes
        the next function of ``self.likelihood_fns``. A value that converts to
        NaN yields ``self.maxloss``; otherwise the capped loss of the
        likelihood function applied to the value is used.
        """
        losses = []
        likelihoods = iter(self.likelihood_fns)
        for raw, mask in zip(observables, self.observable_masks):
            if not mask:
                continue
            val = safe_float(raw)
            # Always advance the iterator so functions stay aligned with
            # their observables, even when the value is unusable.
            fn = next(likelihoods)
            if math.isnan(val):
                losses.append(float(self.maxloss))
            else:
                losses.append(self.smooth_cap_loss(fn(val)))
        return losses

    @staticmethod
    def _good_probability(model, points):
        """Return the predicted probability of class 1 for each point."""
        proba = model.predict_proba(points)
        if proba.shape[1] == 2:
            return proba[:, 1]
        # Trained on a single class: predict_proba has one column only.
        return proba[:, 0] if model.classes_[0] == 1 else np.zeros(len(points))

    def _load_init_csv(self):
        """Read the InitCSV file and return its (parameters, NLL) arrays."""
        self.log.info(f'Loading InitCSV: {self.InitCSV}')
        df = pd.read_csv(self.InitCSV).dropna()
        csv_params = df[list(self.inputs['Variables'].keys())].values

        if self.primary_observable in df.columns:
            # Unparseable NLL entries become 1e10 and are filtered out later.
            csv_nll = pd.to_numeric(df[self.primary_observable], errors='coerce').fillna(1e10).values
            return csv_params, csv_nll

        # Recalculate NLL if missing
        self.log.info(f" '{self.primary_observable}' not found in InitCSV. Recalculating from Observables...")
        obs_keys = list(self.inputs['Observables'].keys())
        missing_obs = [k for k in obs_keys if k not in df.columns]
        if missing_obs:
            self.log.warning(f"InitCSV missing observables: {missing_obs}.")
        # Observables absent from the file are filled with NaN columns.
        columns = [
            pd.to_numeric(df[k], errors='coerce').values if k in df.columns
            else np.full(len(df), np.nan)
            for k in obs_keys
        ]
        csv_obs_data = np.column_stack(columns)
        csv_nll = np.array([np.sum(self.get_losses(row)) for row in csv_obs_data])
        return csv_params, csv_nll

    def run(self):
        """Run the bootstrap, the initial training and the active-learning loop.

        Returns:
            Tuple ``(params, nll)`` restricted to the points whose NLL is
            below ``self.threshold_value``.

        Exits via ``sys.exit`` when the bootstrap batch has no valid points,
        when no usable training data remains, or when fewer good points than
        variables are available for the initial training.
        """
        RFC = RandomForestClassifier(
            n_estimators=self.n_estimators, max_depth=self.max_depth,
            min_samples_split=self.min_samples_split,
            min_samples_leaf=self.min_samples_leaf,
            n_jobs=getattr(self, 'ncores', None), random_state=42
        )

        # 1. Bootstrap
        self.RunManager.run_batch(generate_param_points(self.inputs, self.bootstrap_points).tolist())
        if not self.RunManager.valid_batch_points:
            sys.exit("No valid points found in initial batch!")
        all_params, all_nll = self.extract_from_valid_points(self.RunManager.valid_batch_points)

        # 2. Init CSV
        if self.InitCSV:
            csv_params, csv_nll = self._load_init_csv()
            all_params = np.vstack([all_params, csv_params])
            all_nll = np.concatenate([all_nll, csv_nll])

        # 3. Filter Failures (keep clean data for training)
        mask = all_nll < 1e9
        if not np.any(mask):
            sys.exit('No valid training data!')
        all_params, all_nll = all_params[mask], all_nll[mask]

        # 4. Initial Training
        labels = (all_nll < self.threshold_value).astype(int)
        n_good = np.sum(labels)
        initial_good_points = n_good
        self.log.info(f"Initial good points: {initial_good_points}. Target new points: {self.target_points}. Stopping at: {initial_good_points + self.target_points}")
        if n_good < self.function_dim:
            sys.exit(f'Not enough good points ({n_good}) to train! Need {self.function_dim}+.')
        self.log.info(f'Training: {len(labels)} pts, {n_good} good ({n_good/len(labels)*100:.1f}%)')
        RFC.fit(all_params, labels)

        # Validation: compare predictions with labels on a few training points.
        idx = np.random.choice(len(all_params), min(5, len(all_params)), replace=False)
        for p, a in zip(RFC.predict(all_params[idx]), labels[idx]):
            self.log.info(f'Pred: {p} vs Act: {a}')

        # 5. Active Learning Loop
        run_num = 0
        while n_good < (self.target_points + initial_good_points):
            run_num += 1

            # Score fresh candidates once, keep those predicted good and rank
            # them by the probability of being good (highest first).
            cands = generate_param_points(self.inputs, self.candidate_points)
            probs = self._good_probability(RFC, cands)
            predicted_good = RFC.predict(cands) == 1
            ml_cands = cands[predicted_good]
            if len(ml_cands) > 0:
                ml_cands = ml_cands[np.argsort(probs[predicted_good])[::-1]]

            # Batch: ML-selected points first, topped up with random points.
            n_ml = int(self.points_per_iter * (1 - self.random_fraction))
            batch = ml_cands[:n_ml]
            n_rnd = self.points_per_iter - len(batch)
            self.log.info(f"ML predicts {len(batch)} good points; adding {n_rnd} random points")
            rnd_pts = generate_param_points(self.inputs, n_rnd)
            batch = np.vstack([batch, rnd_pts]) if len(batch) > 0 else rnd_pts

            # Execution
            self.RunManager.run_batch(batch.tolist())
            if not self.RunManager.valid_batch_points:
                # NOTE(review): the loop has no iteration cap, so it keeps
                # retrying for as long as batches come back empty.
                self.log.warning(f'Run {run_num}: no valid points in batch; retrying with new candidates')
                continue

            # Update
            new_params, new_nll = self.extract_from_valid_points(self.RunManager.valid_batch_points)
            all_params = np.vstack([all_params, new_params])
            all_nll = np.concatenate([all_nll, new_nll])

            # Retrain (filter failures again)
            mask = all_nll < 1e9
            labels = (all_nll[mask] < self.threshold_value).astype(int)
            RFC.fit(all_params[mask], labels)
            n_good = np.sum(labels)
            new_good = np.sum(new_nll < self.threshold_value)
            self.log.info(f'Run {run_num}: +{new_good} good. Total: {n_good}')

        self.log.info(f'Target reached: {n_good} points.')
        mask = all_nll < self.threshold_value
        return all_params[mask], all_nll[mask]