Source code for bsmart.scans.AffineMC

"""

Affine MCMC scan using the Goodman & Weare algorithm.

This is an excellent robust MCMC algorithm for larger numbers of dimensions. 

The chain, and automatic plots, are stored in a Results subdirectory. Results are updated after each pass. 

The relevant scan-specific settings are:

.. code-block:: text

    "Setup" : {
        "Steps": "int, Number of steps",
        "Walkers" : "int, Number of walkers, default 10 * number of variables"
    }





To store observable information inside the results file, it is necessary to set:

.. code-block:: json

    "Setup" : {
        "store_points_in_memory": "True",
        "store_invalid_points": "True"
    }

"""

__meta__ = {
    "name": "AffineMC",
    "requires": ["matplotlib","numpy","corner","pandas","seaborn"],
    "settings": {
        "Steps": "int, Number of steps",
        "Walkers" : "int, Number of walkers, default 10 * number of variables"
    }
}



from bsmart.core import Scan as Scan

import numpy as np
import math
from bsmart import debug


import os

from bsmart import BSMutil


from bsmart.BSMlikelihood import MakeGlobalLikelihood

import sys, os

import numpy as np



import math

[docs] class NewScan(Scan): """Scanner class for Affine MCMC Scans"""
[docs] def initialise(self): if self.runsettings.store_points_in_memory and self.runsettings.store_invalid_points: self.naive = False self.runsettings.invalid_return_value = 0 else: self.naive = True # we treat invalid points as bad self.runsettings.invalid_return_value = []
def __init__(self, inputs, log): Scan.__init__(self, inputs, log) self.downscalers,self.upscalers = BSMutil.create_scalers(self.inputs,log) self.n_variables=len(self.upscalers) #self.use_loglike=eval(self.inputs['Setup'].get('LogLike',"True")) self.Steps=self.inputs['Setup'].get('Steps',100) if 'Walkers' in self.inputs['Setup']: self.n_walkers = int(self.inputs['Setup']['Walkers']) else: #self.n_walkers = 10 * self.runsettings.cores self.n_walkers = 20 * self.n_variables #self.maxloss = np.log(1 + np.finfo(np.float64).max) + 1 self.maxloss = math.log(1+sys.float_info.max) + 1 # Create the likelihood functions based on 'scaling' property of variables, if they have it; otherwise use gaussian likelihood #elf.get_likelihood,self.observable_masks=MakeGlobalLikelihood(self.inputs['Observables'],self.use_loglike) #self.get_likelihood,self.observable_masks=MakeGlobalLikelihood(self.inputs['Observables'],return_type="NLL") # need a negative log likelihood if self.naive: self.getNLL,self.masks = MakeGlobalLikelihood(self.inputs['Observables'],return_type="NLL", smooth_losses= False) else: self.getNLL,self.masks = MakeGlobalLikelihood(self.inputs['Observables'],return_type="NLL", smooth_losses= True, max_loss=self.maxloss) self.n_observables = len(self.masks) # count *all* observables self.n_active_observables = sum(1 if mask else 0 for mask in self.masks) if 'Initial Sample' in self.inputs['Setup']: from bsmart.ml.seed_points import get_previous_sample seed_inputs,_ = get_previous_sample(sample_file=self.inputs['Setup']['Initial Sample'], inputs=self.inputs, nsamples = self.n_walkers, returntype = "NLL") # now we have to scale the inputs: #self.seed_inputs = np.array([[f(x) for x,f in zip(point,self.downscalers)] for point in seed_inputs]) # NB want to set the mean to be the best point, which is the first one in the list self.seed_inputs = np.array([[f(x) for x,f in zip(point,self.downscalers)] for point in seed_inputs]) else: self.seed_inputs = None
[docs] def postprocess(self,Point, observables, data_point,temp_dir,log, lock=None): """ return the likelihood; we won't get this far if the point failed to be generated """ """ calculate likelihood """ # For this algorithm we are minimising the objective function #res=1e100 if self.naive: return self.getNLL(observables) else: return 1.0
[docs] def run_batch(self,thetas): # Scale thetas, run the batch, return the likelihoods batch = [ [f(y) for y,f in zip(list(theta),self.upscalers)] for theta in thetas] results = self.RunManager.run_batch(batch) if self.naive: # postprocess returns the NLL, or [] if invalid losses = np.array([value if isinstance(value, (int, float)) else self.maxloss for value in results]) batch_observables = None else: # keeping 'invalid' points batch_observables = np.array([point["observables"] for point in self.RunManager.all_batch_points]) losses = np.array([self.getNLL(obs) for obs in batch_observables]) #print(f"losses: {losses}, of size {len(losses)}") return losses, batch_observables # want to keep the values of observables, only possible if not naive
[docs] def run(self): """ Affine MCMC: Uses Goodman & Weare Affine Invariant Ensemble Sampler """ self.log.info(f"Starting Affine MCMC scan with {self.n_walkers} walkers for {self.Steps} steps.") n_dim = self.n_variables n_walkers = self.n_walkers # Check if n_walkers is even, required for the split update if n_walkers % 2 != 0: self.log.warning(f"Number of walkers {n_walkers} is not even! Increasing by 1.") n_walkers += 1 self.n_walkers += 1 # 1. Initialization current_positions = np.zeros((n_walkers, n_dim)) if self.seed_inputs is not None: # Use seed inputs if available n_seeds = len(self.seed_inputs) if n_seeds >= n_walkers: current_positions = self.seed_inputs[:n_walkers] else: current_positions[:n_seeds] = self.seed_inputs # Fill the rest randomly current_positions[n_seeds:] = np.random.rand(n_walkers - n_seeds, n_dim) else: # Random initialization in [0, 1] hypercube current_positions = np.random.rand(n_walkers, n_dim) # Evaluate initial NLLs #batch_phys = [ [f(x) for x, f in zip(pos, self.upscalers)] for pos in current_positions ] #current_nlls = np.array(self.RunManager.run_batch(batch_phys)) current_nlls, current_observables = self.run_batch(current_positions) #print(f"current_nlls: {current_nlls}, current_observables: {current_observables}") # Handle potential invalid results in initial batch (though unlikely if random in [0,1]) # If run_batch returns something other than float (e.g. None or object), handle it. # Assuming run_batch returns list of floats/numeric based on CMAES.py analysis. # Setup specific for Affine Invariant stretch move a = 2.0 # Scale parameter, usually 2.0 # Arrays to store chain # shape: (Steps, Walkers, D) chain = np.zeros((self.Steps, n_walkers, n_dim)) chain_nll = np.zeros((self.Steps, n_walkers)) chain_observables = np.zeros((self.Steps, n_walkers, self.n_observables)) accepted = 0 total_proposals = 0 varlist = list(self.inputs['Variables'].keys()) if self.naive: obslist=[] else: obslist = list(self.inputs['Observables'].keys()) header = ','.join(varlist+obslist+['NLL']) # Save Results run_name = self.inputs['Setup'].get('RunName', 'AffineMC_Run') results_dir = os.path.join(self.inputs['Setup']['cwd'], run_name, 'Results') if not os.path.exists(results_dir): os.makedirs(results_dir) output_file = os.path.join(results_dir, "Results.csv") with open(output_file, 'w') as f: f.write(header + '\n') for step in range(self.Steps): # clear stored points from memory if not self.naive: self.RunManager.all_points = [] self.RunManager.valid_points = [] # Split walkers into two subsets: S1 (first half) and S2 (second half) half_k = n_walkers // 2 # We update S1 using S2, then S2 using S1 # This allows parallel updates within each subset #splits = [ (np.arange(0, half_k), np.arange(half_k, n_walkers)), np.arange(half_k, n_walkers), np.arange(0, half_k)) ] # random split all_indices = np.arange(n_walkers) np.random.shuffle(all_indices) splits = [(all_indices[:half_k], all_indices[half_k:]), (all_indices[half_k:], all_indices[:half_k])] for active_indices, passive_indices in splits: n_active = len(active_indices) n_passive = len(passive_indices) # Current active walkers X_active = current_positions[active_indices] NLL_active = current_nlls[active_indices] # Passive walkers to sample from (complementary set) X_passive = current_positions[passive_indices] # Generate proposals for all active walkers # For each active walker, choose a random passive walker # We can just sample n_active indices from 0 to n_passive-1 random_passive_indices = np.random.randint(0, n_passive, size=n_active) X_j = X_passive[random_passive_indices] # Stretch factor Z # g(z) ~ 1/sqrt(z) in [1/a, a] # CDF: G(z) = (sqrt(z) - sqrt(1/a)) / (sqrt(a) - sqrt(1/a)) # Inverse CDF: z = ( (sqrt(a) - sqrt(1/a))*u + sqrt(1/a) )^2 u = np.random.rand(n_active) z = ((a - 1.0) * u + 1.0)**2 / a # Note: The standard implementation (emcee) uses ((a-1)*u + 1)^2 / a Check: # if u=0 -> 1/a. if u=1 -> a^2/a = a. Correct. # Proposal: Y = X_j + z * (X_active - X_j) Y = X_j + z[:, None] * (X_active - X_j) # Check bounds [0, 1] # If OOB, NLL = infinity # We'll identify valid proposals in_bounds = np.all((Y >= 0.0) & (Y <= 1.0), axis=1) # Prepare batch evaluation # Only evaluate points in bounds valid_indices_local = np.where(in_bounds)[0] NLL_new_active = np.full(n_active, np.inf) observables_valid = None if len(valid_indices_local) > 0: # Extract valid proposals Y_valid = Y[valid_indices_local] # Upscale valid proposals #batch_phys_valid = [ [f(x) for x, f in zip(pos, self.upscalers)] for pos in Y_valid ] # Run batch #results_valid = self.RunManager.run_batch(batch_phys_valid) results_valid, observables_valid = self.run_batch(Y_valid) # Store results # Make sure to handle potential non-float returns if any for idx, res in enumerate(results_valid): if isinstance(res, (int, float)): NLL_new_active[valid_indices_local[idx]] = res else: NLL_new_active[valid_indices_local[idx]] = self.maxloss # 4. Metropolis-Hastings Acceptance # log_ratio = (D - 1) * log(z) + NLL_old - NLL_new # Accept if log(U) < log_ratio log_u = np.log(np.random.rand(n_active)) log_ratio = (n_dim - 1) * np.log(z) + NLL_active - NLL_new_active accept = log_u < log_ratio # Update positions and NLLs active_indices_accepted = active_indices[accept] local_indices_accepted = np.where(accept)[0] if len(active_indices_accepted) > 0: current_positions[active_indices_accepted] = Y[local_indices_accepted] current_nlls[active_indices_accepted] = NLL_new_active[local_indices_accepted] accepted += len(active_indices_accepted) if observables_valid is not None: # only true if not naive ... #print(observables_valid.shape) #print(current_observables.shape) # Map local accepted indices to indices in observables_valid # We know valid_indices_local contains all potentially accepted indices valid_positions = np.searchsorted(valid_indices_local, local_indices_accepted) current_observables[active_indices_accepted] = observables_valid[valid_positions] total_proposals += n_active # Store in chain chain[step] = current_positions chain_nll[step] = current_nlls flat_step = chain[step].reshape(-1, n_dim) flat_step_phys = np.array([ [f(x) for x, f in zip(pos, self.upscalers)] for pos in flat_step ]) if not self.naive: chain_observables[step] = current_observables with open(output_file, 'a') as f: np.savetxt(f, np.hstack((flat_step_phys, chain_observables[step].reshape(-1, self.n_observables), chain_nll[step].reshape(-1, 1))), delimiter=',', comments='') #np.savetxt(output_file, np.hstack((flat_step_phys, chain_observables[step].reshape(-1, self.n_observables), chain_nll[step].reshape(-1, 1))), delimiter=',', comments='', header=header) else: with open(output_file, 'a') as f: np.savetxt(f, np.hstack((flat_step_phys, chain_nll[step].reshape(-1, 1))), delimiter=',', comments='') if (step + 1) % 10 == 0 or step == 0: self.log.info(f"Step {step+1}/{self.Steps} completed. Mean NLL: {np.mean(current_nlls):.4f}. Acceptance rate: {accepted/total_proposals:.4f}") self.log.info(f"Finished AffineMC run. Final Acceptance rate: {accepted/total_proposals:.4f}") # Flatten chain for CSV export: (Steps * Walkers, D) #flat_chain = chain.reshape(-1, n_dim) # Upscale all points for saving #flat_phys = np.array([ [f(x) for x, f in zip(pos, self.upscalers)] for pos in flat_chain ]) #np.savetxt(output_file, flat_phys, delimiter=',', comments='', header=header) #np.savetxt(output_file, np.hstack((flat_phys, chain_observables.reshape(-1, self.n_observables), chain_nll.reshape(-1, 1))), delimiter=',', comments='', header=header) self.log.info(f"Results saved to {output_file}") if not self.runsettings.debug: print(f"Finished AffineMC run. Final Acceptance rate: {accepted/total_proposals:.4f}") print(f"Results saved to {output_file}") from bsmart import BSMplots ## Make an extra corner plot plotstub=os.path.join(results_dir,"corner_plot") BSMplots.make_auto_corner_csv(plotstub,os.path.join(results_dir,"Results.csv"),varlist,{}) os.system('python3 '+plotstub+'.py') plotstub=os.path.join(results_dir,'fancy_auto_corner') BSMplots.make_fancy_auto_corner_csv(plotstub,os.path.join(results_dir,"Results.csv"),varlist,{}) os.system('python3 '+plotstub+'.py')