Source code for bsmart.scans.MLScanner.MLS_GBR

"""!
MLScanner GBR (Gradient Boosting Regressor) method
---------------------------------------------------

MLScanner `MLS_GBR` method based on code from:

* `<https://github.com/AHamamd150/MLscanner>`_
* `<https://arxiv.org/abs/2207.09959>`_


This scan implements an active learning strategy using a Gradient Boosting Regressor (GBR)
to efficiently find "good" points in a parameter space. A point is considered "good" if the likelihood is below a specified threshold. This is a generalisation of the original algorithms; in the original package the scan looked for a `primary observable` to compare to a threshold. Since the likelihood can be set as "EXPUSER" for a given observable, the original case can also be accommodated -- but a likelihood is more generally useful. 

The process is as follows:

1.  **Initialization**: The scan begins by evaluating a small set of randomly generated points (`Bootstrap_Points`).
    It can also load an initial dataset from a CSV file (`InitCSV`).

2.  **Initial Training**: A Gradient Boosting Regressor is trained on this initial dataset to predict
    the Negative Log Likelihood (NLL) from the input parameters.

3.  **Active Learning Loop**: The scan enters a loop to iteratively discover new good points until
    a `Target_Points` count is reached. In each iteration:

    a. A large number of `Candidate_Points` are randomly generated.
    b. The trained GBR model predicts the NLL for these candidates.
    c. candidates with the lowest predicted NLL (best quality), plus a small `Random_Fraction`,
       are selected for evaluation by the physics code.
    d. **Retraining**: The GBR is retrained with the newly discovered points, becoming
       progressively better at identifying promising regions (low NLL).

4.  **Data Collection**: All discovered good points (NLL < Threshold) are returned.

This method is particularly effective for high-dimensional parameter spaces where exhaustive
scanning is computationally prohibitive.
"""

__meta__ = {
 "name": "MLS_GBR",
 "requires": ["sklearn", "pandas", "numpy"],
 "settings": {
    "Networks": {
        "Iterations": "Number of active learning iterations (default: 10).",
        "Candidate_Points": "Number of candidate points to generate and score in each iteration (default: 500).",
        "Bootstrap_Points": "Number of initial random points to evaluate (default: 100).",
        "Points_Per_Iteration": "Number of candidate points to evaluate in each iteration (default: 300).",
        "Threshold_Value": "The threshold for the NLL to consider a point 'good' (default: 1).",
        "Random_Fraction": "Fraction of points per iteration to be selected randomly (default: 0.2).",
        "Estimators": "Number of boosting stages to perform (default: 100).",
        "Max_Depth": "Maximum depth of the individual regression estimators (default: 30).",
        "LearningRate": "Learning rate shrinks the contribution of each tree by `learning_rate` (default: 1e-1).",
        "Verbose": "Verbosity level (default: 0)."
    },
    "Setup": {
        "InitCSV": "Path to an optional CSV file with initial points to seed the scan.",
        "Points": "Number of points to generate *in total* before stopping (default: 1000)"
    }
    }
}

from bsmart.core import Scan as Scan
import os
import sys
import pickle
import numpy as np
import pandas as pd
from sklearn.ensemble import GradientBoostingRegressor
import math
from bsmart.BSMlikelihood import MakeLikelihoods, MakeGlobalLikelihood, safe_float
from bsmart import debug

[docs] def generate_param_points(inputs, num_points): variables_range = [] for varname in inputs['Variables']: if 'RANGE' in inputs['Variables'][varname]: varmin = inputs['Variables'][varname]['RANGE'][0] varmax = inputs['Variables'][varname]['RANGE'][1] variables_range.append(np.random.uniform(varmin, varmax, num_points)) return np.array(variables_range).T
[docs] class NewScan(Scan):
[docs] def initialise(self): """ Need to make sure we override certain settings """ self.runsettings.store_points_in_memory = True self.runsettings.invalid_return_value = [0] if self.runsettings.store_invalid_points: self.naive = False self.runsettings.invalid_return_value = [0] else: self.naive = True # we treat invalid points as bad self.runsettings.invalid_return_value = [] self.citations = """@article{Hammad:2022wpq, author = "Hammad, A. and Park, Myeonghun and Ramos, Raymundo and Saha, Pankaj", title = "{Exploration of parameter spaces assisted by machine learning}", eprint = "2207.09959", archivePrefix = "arXiv", primaryClass = "hep-ph", doi = "10.1016/j.cpc.2023.108902", journal = "Comput. Phys. Commun.", volume = "293", pages = "108902", year = "2023" } """
def __init__(self, inputs, log): Scan.__init__(self, inputs, log) print("start scan init") # Helper to get setting from Networks or Setup def get_setting(key, default, cast_type=int): val = self.inputs['Networks'].get(key) if val is None: val = self.inputs['Setup'].get(key) if val is None: return default return cast_type(val) self.iteration = get_setting('Iterations', 10) self.candidate_points = get_setting('Candidate_Points', 500) self.bootstrap_points = get_setting('Bootstrap_Points', 100) self.points_per_iter = get_setting('Points_Per_Iteration', 300) self.threshold_value = get_setting('Threshold_Value', 1, float) #self.target_points = get_setting('Target Points', 20000) self.function_dim = int(len(self.inputs['Variables'])) self.random_fraction = get_setting('Random_Fraction', 0.2, float) # GBR specific settings self.learning_rate = get_setting('LearningRate', 0.01, float) self.n_estimators = get_setting('Estimators', 100) self.max_depth = get_setting('Max_Depth', 30) self.subsample = get_setting('Subsample', 1.0, float) self.min_samples_split = get_setting('Min_Samples_Split', 2) self.min_samples_leaf = get_setting('Min_Samples_Leaf', 1) self.verbose = get_setting('Verbose', 0) print(f"verbos: {self.verbose}") if "Cores" in self.inputs['Setup']: self.ncores = int(self.inputs['Setup']['Cores']) self.log.info('Setting number of cores to %d' % self.ncores) self.target_points = int(self.inputs['Setup'].get('Points', 10000)) self.n_variables = len(self.inputs['Variables']) print("make likelihoods") self.maxloss = np.log(1 + np.finfo(np.float64).max) + 1 self.likelihood_fns, self.observable_masks = MakeLikelihoods(self.inputs["Observables"], loglike=True) self.primary_observable = 'NLL' self.InitCSV = self.inputs['Setup'].get('InitCSV') if self.InitCSV: if os.path.isfile(self.InitCSV): self.log.info(f'Will use InitCSV file: {self.InitCSV}') else: self.log.error(f'InitCSV file not found: {self.InitCSV}') self.InitCSV = None print("finished scan init")
[docs] def postprocess(self, Point, observables, data_point, temp_dir, log, lock=None): """ return the likelihood; we won't get this far if the point failed to be generated """ if self.naive: return np.sum(self.get_losses(observables)) else: return 1.0
[docs] def smooth_cap_loss(self, x): """ Caps the loss by applying a sigmoid. This is useful for losses that are unbounded. """ return -self.maxloss*np.expm1(x/self.maxloss) # assume x is negative and want to change its sign
[docs] def get_losses(self, observables): """ Returns a list of losses. """ likeit = iter(self.likelihood_fns) return [self.smooth_cap_loss((next(likeit))(val)) if mask and not math.isnan(val := safe_float(v)) else float((next(likeit) and False) or self.maxloss) for v, mask in zip(observables, self.observable_masks) if mask]
[docs] def extract_from_valid_points(self, valid_points): # inputs contains the parameter values params = np.array([point['inputs'] for point in valid_points]) batch_observables = [point["observables"] for point in valid_points] # Calculate NLL (fitness) for each point using get_losses logic losses = np.array([self.get_losses(obs) for obs in batch_observables]) fitnesses = np.sum(losses, axis=1) return params, fitnesses
[docs] def run(self): GBR = GradientBoostingRegressor( learning_rate=self.learning_rate, n_estimators=self.n_estimators, max_depth=self.max_depth, subsample=self.subsample, min_samples_split=self.min_samples_split, min_samples_leaf=self.min_samples_leaf, random_state=42 ) # 1. Bootstrap self.RunManager.run_batch(generate_param_points(self.inputs, self.bootstrap_points).tolist()) if not self.RunManager.valid_batch_points: sys.exit("No valid points found in initial batch!") all_params, all_nll = self.extract_from_valid_points(self.RunManager.valid_batch_points) # 2. Init CSV # 2. Init CSV if self.InitCSV: self.log.info(f'Loading InitCSV: {self.InitCSV}') df = pd.read_csv(self.InitCSV).dropna() csv_params = df[list(self.inputs['Variables'].keys())].values if self.primary_observable in df.columns: csv_nll = pd.to_numeric(df[self.primary_observable], errors='coerce').fillna(1e10).values else: self.log.info(f" '{self.primary_observable}' not found in InitCSV. Recalculating from Observables...") obs_keys = list(self.inputs['Observables'].keys()) missing_obs = [k for k in obs_keys if k not in df.columns] if missing_obs: self.log.warning(f"InitCSV missing observables: {missing_obs}") csv_obs_data = [] for k in obs_keys: if k in df.columns: csv_obs_data.append(pd.to_numeric(df[k], errors='coerce').values) else: csv_obs_data.append(np.full(len(df), np.nan)) csv_obs_data = np.column_stack(csv_obs_data) csv_nll = np.array([np.sum(self.get_losses(row)) for row in csv_obs_data]) all_params = np.vstack([all_params, csv_params]) all_nll = np.concatenate([all_nll, csv_nll]) # 3. Filter Failures mask = all_nll < 1e9 if not np.any(mask): sys.exit('No valid training data after filtering failures!') all_params, all_nll = all_params[mask], all_nll[mask] # 4. Initial Training n_good = np.sum(all_nll < self.threshold_value) initial_good_points = n_good self.log.info(f"Initial good points: {initial_good_points}. Target new points: {self.target_points}. Stopping at: {initial_good_points + self.target_points}") self.log.info(f'Training: {len(all_nll)} pts, {n_good} good ({n_good/len(all_nll)*100:.1f}%). Stats: Mean={np.mean(all_nll):.2f}, Std={np.std(all_nll):.2f}') GBR.fit(all_params, all_nll) # Validation idx = np.random.choice(len(all_params), min(5, len(all_params)), replace=False) for p, a in zip(GBR.predict(all_params[idx]), all_nll[idx]): self.log.info(f'Pred: {p:.2f} vs Act: {a:.2f} (Err: {abs(p-a):.2f})') # 5. Active Learning Loop run_num = 0 while n_good < (self.target_points + initial_good_points): run_num += 1 # Candidates cands = generate_param_points(self.inputs, self.candidate_points) ml_cands = cands[GBR.predict(cands) < self.threshold_value] # Batch Construction n_ml = int(self.points_per_iter * (1 - self.random_fraction)) batch = ml_cands[:n_ml] n_rnd = self.points_per_iter - len(batch) self.log.info(f"ML predicts {len(batch)} good points; adding {n_rnd} random points") #n_rnd = int(self.points_per_iter * self.random_fraction) rnd_pts = generate_param_points(self.inputs, n_rnd) batch = np.vstack([batch, rnd_pts]) if len(batch) > 0 else rnd_pts # Execution self.RunManager.run_batch(batch.tolist()) if not self.RunManager.valid_batch_points: continue # Update Data new_params, new_nll = self.extract_from_valid_points(self.RunManager.valid_batch_points) all_params = np.vstack([all_params, new_params]) all_nll = np.concatenate([all_nll, new_nll]) # Retrain (ignoring failures) mask = all_nll < 1e9 GBR.fit(all_params[mask], all_nll[mask]) n_good = np.sum(all_nll < self.threshold_value) new_good = np.sum(new_nll < self.threshold_value) self.log.info(f'Run {run_num}: +{new_good} good. Total: {n_good}') self.log.info(f'Target reached: {n_good} points.') mask = all_nll < self.threshold_value return all_params[mask], all_nll[mask]