Source code for bsmart.scans.MLScanner.MLS_RFC

"""
MLScanner RFC method
--------------------



MLScanner method `MLS_RFC` based on code from 

* `<https://github.com/AHamamd150/MLscanner>`_
* `<https://arxiv.org/abs/2207.09959>`_

This scan implements an active learning strategy using a Random Forest Classifier (RFC)
to efficiently find "good" points in a parameter space. A point is considered "good" (Class 1)
if its Negative Log Likelihood (NLL) is below a specified threshold, and "bad" (Class 0) otherwise.

The process is as follows:

1.  **Initialization**: The scan begins by evaluating a small set of randomly generated points (`Bootstrap_Points`).
    It can also load an initial dataset from a CSV file (`InitCSV`).

2.  **Initial Training**: A Random Forest Classifier is trained on this initial dataset.
    Points are labeled as 1 (Good) or 0 (Bad) based on the `Threshold_Value`.

3.  **Active Learning Loop**: The scan enters a loop to iteratively discover new good points until
    a `Target_Points` count is reached. In each iteration:

    a. A large number of `Candidate_Points` are randomly generated.
    b. The trained RFC model predicts the probability of each candidate being "good".
    c. Candidates with the highest probability of being good, plus a small `Random_Fraction`,
       are selected for evaluation by the physics code.
    d. **Retraining**: The RFC is retrained with the newly discovered points, improving its
       ability to separate good regions from bad regions.

4.  **Data Collection**: All discovered good points (NLL < Threshold) are returned.

This method is particularly effective for high-dimensional parameter spaces where exhaustive
scanning is computationally prohibitive.
"""

__meta__ = {
 "name": "MLS_RFC",
 "requires": ["sklearn", "pandas", "numpy"],
 "settings": {
    "Networks": {
        "Iterations": "Number of active learning iterations (default: 10).",
        "Candidate_Points": "Number of candidate points to generate and score in each iteration (default: 500).",
        "Bootstrap_Points": "Number of initial random points to evaluate (default: 100).",
        "Points_Per_Iteration": "Number of candidate points to evaluate in each iteration (default: 300).",
        "Threshold_Value": "The threshold for the NLL to classify a point as 'good' (default: 1).",
        "Random_Fraction": "Fraction of points per iteration to be selected randomly, for exploration (default: 0.2).",
        "Estimators": "Number of trees in the forest (default: 300).",
        "Max_Depth": "Maximum depth of the tree (default: 50).",
        "Min_Samples_Split": "The minimum number of samples required to split an internal node (default: 2).",
        "Min_Samples_Leaf": "The minimum number of samples required to be at a leaf node (default: 1).",
        "Verbose": "Verbosity level (default: 0)."
    },
    "Setup": {
        "InitCSV": "Path to an optional CSV file with initial points to seed the scan.",
        "Points": "Number of points to generate *in total* before stopping (default: 1000)"
    }
    }
}

from bsmart.core import Scan as Scan
import os
import sys

import numpy as np
import pandas as pd
import sklearn
from sklearn.ensemble import RandomForestClassifier
import math
from bsmart.BSMlikelihood import MakeLikelihoods, MakeGlobalLikelihood, safe_float
from bsmart import debug



[docs] def generate_param_points(inputs, num_points): variables_range = [] for varname in inputs['Variables']: if 'RANGE' in inputs['Variables'][varname]: varmin = inputs['Variables'][varname]['RANGE'][0] varmax = inputs['Variables'][varname]['RANGE'][1] variables_range.append(np.random.uniform(varmin, varmax, num_points)) return np.array(variables_range).T
[docs] class NewScan(Scan):
[docs] def initialise(self): self.runsettings.store_points_in_memory = True self.runsettings.invalid_return_value = [0] if self.runsettings.store_invalid_points: self.naive = False self.runsettings.invalid_return_value = [0] else: self.naive = True # we treat invalid points as bad self.runsettings.invalid_return_value = [] self.citations = """@article{Hammad:2022wpq, author = "Hammad, A. and Park, Myeonghun and Ramos, Raymundo and Saha, Pankaj", title = "{Exploration of parameter spaces assisted by machine learning}", eprint = "2207.09959", archivePrefix = "arXiv", primaryClass = "hep-ph", doi = "10.1016/j.cpc.2023.108902", journal = "Comput. Phys. Commun.", volume = "293", pages = "108902", year = "2023" } """
def __init__(self, inputs, log): Scan.__init__(self, inputs, log) print("start scan init") # Helper to get setting from Networks or Setup def get_setting(key, default, cast_type=int): val = self.inputs['Networks'].get(key) if val is None: val = self.inputs['Setup'].get(key) if val is None: return default return cast_type(val) self.iteration = get_setting('Iterations', 10) self.candidate_points = get_setting('Candidate_Points', 500) self.bootstrap_points = get_setting('Bootstrap_Points', 100) self.points_per_iter = get_setting('Points_Per_Iteration', 300) self.threshold_value = get_setting('Threshold_Value', 1.0) #self.target_points = get_setting('Target_Points', 20000) self.function_dim = int(len(self.inputs['Variables'])) self.random_fraction = get_setting('Random_Fraction', 0.2) self.n_estimators = get_setting('Estimators', 300) self.max_depth = get_setting('Max_Depth', 50) self.min_samples_split = get_setting('Min_Samples_Split', 2) self.min_samples_leaf = get_setting('Min_Samples_Leaf', 1) self.verbose = get_setting('Verbose', 0) if "Cores" in self.inputs['Setup']: self.ncores = int(self.inputs['Setup']['Cores']) #torch.set_num_threads(self.ncores) self.log.info('Setting number of cores to %d' % self.ncores) self.target_points = int(self.inputs['Setup'].get('Points', 10000)) #self.num_points = int(self.inputs['Setup'].get('Points', 10000)) self.n_variables = len(self.inputs['Variables']) print("make likelihoods") self.maxloss = np.log(1 + np.finfo(np.float64).max) + 1 self.likelihood_fns, self.observable_masks = MakeLikelihoods(self.inputs["Observables"], loglike=True) self.primary_observable = 'NLL' self.InitCSV = self.inputs['Setup'].get('InitCSV') if self.InitCSV: if os.path.isfile(self.InitCSV): self.log.info(f'Will use InitCSV file: {self.InitCSV}') else: self.log.error(f'InitCSV file not found: {self.InitCSV}') self.InitCSV = None print("finished scan init")
[docs] def postprocess(self, Point, observables, data_point, temp_dir, log, lock=None): """ return the likelihood; we won't get this far if the point failed to be generated """ if self.naive: return np.sum(self.get_losses(observables)) else: return 1.0
[docs] def smooth_cap_loss(self, x): """ Caps the loss by applying a sigmoid. This is useful for losses that are unbounded. """ return -self.maxloss*np.expm1(x/self.maxloss) # assume x is negative and want to change its sign
[docs] def get_losses(self, observables): """ Returns a list of losses. """ likeit = iter(self.likelihood_fns) return [self.smooth_cap_loss((next(likeit))(val)) if mask and not math.isnan(val := safe_float(v)) else float((next(likeit) and False) or self.maxloss) for v, mask in zip(observables, self.observable_masks) if mask]
[docs] def extract_from_valid_points(self, valid_points): # inputs contains the parameter values params = np.array([point['inputs'] for point in valid_points]) batch_observables = [point["observables"] for point in valid_points] # Calculate NLL (fitness) for each point using get_losses logic losses = np.array([self.get_losses(obs) for obs in batch_observables]) fitnesses = np.sum(losses, axis=1) return params, fitnesses
[docs] def run(self): RFC = RandomForestClassifier( n_estimators=self.n_estimators, max_depth=self.max_depth, min_samples_split=self.min_samples_split, min_samples_leaf=self.min_samples_leaf, n_jobs=self.ncores, random_state=42 ) # 1. Bootstrap self.RunManager.run_batch(generate_param_points(self.inputs, self.bootstrap_points).tolist()) if not self.RunManager.valid_batch_points: sys.exit("No valid points found in initial batch!") all_params, all_nll = self.extract_from_valid_points(self.RunManager.valid_batch_points) # 2. Init CSV # 2. Init CSV if self.InitCSV: self.log.info(f'Loading InitCSV: {self.InitCSV}') df = pd.read_csv(self.InitCSV).dropna() csv_params = df[list(self.inputs['Variables'].keys())].values # Recalculate NLL if missing if self.primary_observable in df.columns: all_obs = pd.to_numeric(df[self.primary_observable], errors='coerce').fillna(1e10).values else: self.log.info(f" '{self.primary_observable}' not found in InitCSV. Recalculating from Observables...") obs_keys = list(self.inputs['Observables'].keys()) missing_obs = [k for k in obs_keys if k not in df.columns] if missing_obs: self.log.warning(f"InitCSV missing observables: {missing_obs}.") csv_obs_data = [] for k in obs_keys: if k in df.columns: csv_obs_data.append(pd.to_numeric(df[k], errors='coerce').values) else: csv_obs_data.append(np.full(len(df), np.nan)) csv_obs_data = np.column_stack(csv_obs_data) all_obs = np.array([np.sum(self.get_losses(row)) for row in csv_obs_data]) all_params = np.vstack([all_params, csv_params]) all_nll = np.concatenate([all_nll, all_obs]) # 3. Filter Failures (keep clean data for training) mask = all_nll < 1e9 if not np.any(mask): sys.exit('No valid training data!') all_params, all_nll = all_params[mask], all_nll[mask] # 4. Initial Training labels = (all_nll < self.threshold_value).astype(int) n_good = np.sum(labels) initial_good_points = n_good self.log.info(f"Initial good points: {initial_good_points}. Target new points: {self.target_points}. Stopping at: {initial_good_points + self.target_points}") if n_good < self.function_dim: sys.exit(f'Not enough good points ({n_good}) to train! Need {self.function_dim}+.') self.log.info(f'Training: {len(labels)} pts, {n_good} good ({n_good/len(labels)*100:.1f}%)') RFC.fit(all_params, labels) # Validation idx = np.random.choice(len(all_params), min(5, len(all_params)), replace=False) for p, a in zip(RFC.predict(all_params[idx]), labels[idx]): self.log.info(f'Pred: {p} vs Act: {a}') # 5. Active Learning Loop run_num = 0 while n_good < (self.target_points + initial_good_points): run_num += 1 # Candidates cands = generate_param_points(self.inputs, self.candidate_points) # Predict Probabilities probs_full = RFC.predict_proba(cands) # --- Single Class Fix Start --- # If training data has only one class (e.g. all good), predict_proba returns (N, 1) if probs_full.shape[1] == 2: probs = probs_full[:, 1] else: # Handle single-class case: check if the single class is 1 (Good) or 0 (Bad) probs = probs_full[:, 0] if RFC.classes_[0] == 1 else np.zeros(len(cands)) # --- Single Class Fix End --- # Select best candidates and filter sorted_idx = np.argsort(probs)[::-1] # Descending order ml_cands = cands[sorted_idx] # Filter those predicted as good (prob > 0.5) # Actually, we can just take the top N even if prob < 0.5 to encourage exploration, # but strictly speaking we want "good" points. # Let's use the explicit prediction for filtering to match the logic "ml_cands = cands[RFC.predict(cands) == 1]" # BUT sorted by probability. preds = RFC.predict(cands) ml_cands = cands[preds == 1] # If we want to sort the *good* candidates by confidence: if len(ml_cands) > 0: probs_ml_full = RFC.predict_proba(ml_cands) if probs_ml_full.shape[1] == 2: ml_probs = probs_ml_full[:, 1] else: ml_probs = probs_ml_full[:, 0] if RFC.classes_[0] == 1 else np.zeros(len(ml_cands)) ml_cands = ml_cands[np.argsort(ml_probs)[::-1]] # Batch n_ml = int(self.points_per_iter * (1 - self.random_fraction)) batch = ml_cands[:n_ml] n_rnd = self.points_per_iter - len(batch) self.log.info(f"ML predicts {len(batch)} good points; adding {n_rnd} random points") #n_rnd = int(self.points_per_iter * self.random_fraction) rnd_pts = generate_param_points(self.inputs, n_rnd) batch = np.vstack([batch, rnd_pts]) if len(batch) > 0 else rnd_pts # Execution self.RunManager.run_batch(batch.tolist()) if not self.RunManager.valid_batch_points: continue # Update new_params, new_nll = self.extract_from_valid_points(self.RunManager.valid_batch_points) all_params = np.vstack([all_params, new_params]) all_nll = np.concatenate([all_nll, new_nll]) # Retrain (filter failures again) mask = all_nll < 1e9 labels = (all_nll[mask] < self.threshold_value).astype(int) RFC.fit(all_params[mask], labels) n_good = np.sum(labels) new_good = np.sum(new_nll < self.threshold_value) self.log.info(f'Run {run_num}: +{new_good} good. Total: {n_good}') self.log.info(f'Target reached: {n_good} points.') mask = all_nll < self.threshold_value return all_params[mask], all_nll[mask]