Source code for bsmart.scans.MCMC

"""
Simple MCMC scan


Simple MCMC scan implementing the Metropolis-Hastings algorithm

Launches a seperate scan for each core, with the option to merge the
results at the end via "Merge Results":"True" in "Setup". Of course, will
also run in single-core mode.

For variables we require a range and a variance for the jumps:

.. code-block:: text

    "Variables": {
         "lambda": { "RANGE": [0.01,1.0], 
                     "VARIANCE": 0.2}
         },


Several possibilities for likelihood functions are provided -- see the BSMlikelihood file.

E.g. gaussian, Log likelihood, "USER", or "OFF" for observables that should not be taken into account for the likelihood calculation (but the user wants to retain them in the summary file for other purposes). These are set by providing e.g.:

.. code-block:: text

    "Observables": {
    	"mh" : { "SLHA": ["MASS", [25]], "SCALING": "GAUSS",
    		"MEAN": 125.0,
    		"VARIANCE":3.0
    	      }


or "SCALING":"LOG", or "SCALING":"OFF"

It is not necessary to specify the scaling explicitly: gaussian is taken by default

The results are summarised in a csv file (or several) in the Results subdirectory of the scan, which contains only the points retained by the scan (along with an ID for each point); the raw points will as usual be retained in the Spectrum_Files directory if so desired. 

"""

__meta__ = {
    "name": "MCMC",
    "requires": ["numpy"],
    "settings": {
        "Points": "Int",
        "Valid Points": "Int",
        "Merge Results": "Boolean",
        "Initial Sample": "Path to initial sample file",
        "Sample LogLike Name": "Column name for LogLike (if initial sample and relevant)",
        "Sample NLL Name": "Column name for NLL (if initial sample and relevant)",
        "Sample Likelihood Name": "Column name for Likelihood (if initial sample and relevant)"
    }
}



from bsmart.core import Scan as Scan

import numpy as np
import math
from bsmart import debug
import multiprocessing as mp

import queue
import time
import os
import shutil

from bsmart.HEPRun import UpdateProgress, DataPoint

from bsmart.BSMlikelihood import MakeLikelihoods, MakeGlobalLikelihood






[docs] class NewScan(Scan): """Scanner class for multicore MCMC Scans""" def __init__(self, inputs, log): Scan.__init__(self, inputs, log) self.header=[] self.header.append('ID') self.variances = [] self.lock = None for v in self.inputs['Variables']: self.variances.append(self.inputs['Variables'][v]['VARIANCE']) self.header.append(v) ## append the variable name ... # Create the likelihood functions based on 'scaling' property of variables, if they have it; otherwise use gaussian likelihood for o in self.inputs['Observables']: self.header.append(o) #self.use_loglike=eval(self.inputs['Setup'].get('LogLike',"False")) #self.likelihoodfns=MakeLikelihoods(self.inputs['Observables'],self.use_loglike) #self.get_likelihood,_=MakeGlobalLikelihood(self.inputs['Observables'],self.use_loglike) self.get_likelihood,_=MakeGlobalLikelihood(self.inputs['Observables'],return_type="Likelihood") self.header.append('Likelihood') #self.headerlinestring='# '+', '.join(self.header) self.headerlinestring=','.join(self.header) if 'Points' in self.inputs['Setup']: self.maxpoints=self.inputs['Setup']['Points'] else: self.maxpoints=None if 'Valid Points' in self.inputs['Setup']: self.maxvalidpoints=self.inputs['Setup']['Valid Points'] else: self.maxvalidpoints=None if not self.maxvalidpoints and not self.maxpoints: log.error('No Points target or Valid Points target') raise if self.maxvalidpoints: self.target=self.maxvalidpoints else: self.target=self.maxpoints self.resultsdir=os.path.join(self.inputs['Setup']['cwd'], self.inputs['Setup']['RunName'],'Results') self.merge_results = self.inputs['Setup'].get('Merge Results',False) """ For resuming, or using an initial sample, we need just to have a starting point per core. The initial sample should also be copied to the results """ self.starting_points=[] # list of input values and their likelihoods self.merged_filename=os.path.join(self.resultsdir,'MCMC_results') self.number_of_prior_points=0 self.number_of_prior_valid=0 if self.resume: # this handled by core.py if os.path.exists(os.path.join(self.resultsdir,'MCMC_results')): # Previous scan actually completed! self.inputs['Setup']['Initial Sample'] = self.merged_filename self.number_of_prior_valid = max(0,len(list(open(self.merged_filename, 'r'))) - 1) elif all([os.path.exists(os.path.join(self.resultsdir,'MCMC_run_'+str(core))) for core in range(len(self.RunManager.CoreRuns))]): # results files from all cores exist; read from each sample to get starting point, and merge the files into a results file if required totaldf = None for core in range(len(self.RunManager.CoreRuns)): resultfile=os.path.join(self.resultsdir,'MCMC_run_'+str(core)) core_inputs,core_likes=self.get_previous_sample(resultfile) if len(core_inputs) > 0: self.starting_points.append([list(core_inputs[-1]),float(core_likes[-1])]) self.number_of_prior_valid += len(core_inputs) else: # empty file self.starting_points.append([[],0]) elif os.path.exists(self.RunManager.csv_filename): self.inputs['Setup']["Initial Sample"] = self.RunManager.csv_filename # we just treat this as an initial sample if os.path.exists(self.RunManager.csv_filename): # count number of lines in self.RunManager.csv_filename self.number_of_prior_points = max(0,len(list(open(self.RunManager.csv_filename, 'r'))) - 1) if len(self.starting_points) == 0 and 'Initial Sample' in self.inputs['Setup'] and os.path.exists(self.inputs['Setup']['Initial Sample']): self.prior_sample, self.prior_likes = self.get_previous_sample(self.inputs['Setup']['Initial Sample']) # now must make some choices of where to start ... from bsmart.ml.seed_points import select_seeds # select seeds tries to minimis the nll selected_point_ids=list(select_seeds(self.prior_sample,-np.log(self.prior_likes),nseeds=self.runsettings.cores)) self.starting_points=[ [self.prior_sample[id],self.prior_likes[id]] for id in selected_point_ids] if len(self.starting_points) < self.runsettings.cores: self.starting_points = self.starting_points + [[],0] * (self.runsettings.cores - len(self.starting_points) ) # set up results directory and files if os.path.exists(self.resultsdir) and not self.resume: shutil.rmtree(self.resultsdir) if not os.path.exists(self.resultsdir): os.makedirs(self.resultsdir) # initialise the scans self.ScanThreads=[] for x in range(len(self.RunManager.CoreRuns)): self.ScanThreads.append(MCMC_onecore(self,x)) if len(self.starting_points) > 0: for numthread,pt in enumerate(self.starting_points): log.info("Adding initial point " +str(pt)) self.ScanThreads[numthread].last_point = list(pt[0]) self.ScanThreads[numthread].likelihood_last_step = float(pt[1])
[docs] def get_previous_sample(self,sample_file): """ Routine to load data from a previous sample and return a couple of numpy arrays of the values """ self.log.info('Attempting to load prior sample '+sample_file) try: import pandas as pd df = pd.read_csv(sample_file) columns=df.columns.tolist() sample_inputs= df[self.inputs['Variables'].keys()].to_numpy() if 'Sample LogLike Name' in self.inputs["Setup"]: likes = np.exp(df[self.inputs["Setup"]["Sample LogLike Name"]].to_numpy()) elif 'Sample NLL Name' in self.inputs["Setup"]: likes = np.exp(-df[self.inputs["Setup"]["Sample NLL Name"]].to_numpy()) elif 'Sample Likelihood Name' in self.inputs["Setup"]: likes = df[self.inputs["Setup"]["Sample Likelihood Name"]].to_numpy() elif 'Likelihood' in columns: likes = df['Likelihood'].to_numpy() elif 'Result' in columns: #if eval(self.inputs["Setup"].get("LogLike","True")): # likes = df['Result'].to_numpy() #else: # likes = df['Likelihood'].to_numpy() # Assume this comes from a prior MCMC scan likes = df['Result'].to_numpy() else: raise ValueError("Missing required columns for likelihood calculation") self.log.info('Loaded Initial sample '+sample_file) return sample_inputs,likes except Exception as e: self.log.error('Failed to load initial sample from file, will generate new one; '+str(e)) return np.empty(),np.empty()
[docs] def get_next_pointnumber(self): if self.lock: self.lock.acquire() nextid=self.next_id.value if not self.maxpoints or (nextid < self.maxpoints): self.next_id.value=self.next_id.value + 1 else: self.lock.release() raise # raise an exception to show we're done self.lock.release() else: nextid=self.next_id if not self.maxpoints or (self.next_id < self.maxpoints): self.next_id=self.next_id + 1 else: raise # raise an exception to show we're done if not self.maxvalidpoints: # target is number of *valid* points in chain UpdateProgress(nextid, self.target) return nextid
[docs] def propose_point(self,last_point): okay = False while okay == False: next_step = [np.random.normal(x, y) for x, y in zip(last_point, self.variances)] okay = True # check if point is in the given range for v, r in zip(next_step, self.inputs['Variables'].values()): if v < min(r['RANGE'][0],r['RANGE'][1]): okay = False if v > max(r['RANGE'][0],r['RANGE'][1]): okay = False return next_step
[docs] def get_random_point(self): return [np.random.uniform(x['RANGE'][0], x['RANGE'][1]) for x in self.inputs['Variables'].values()]
[docs] def run(self): start_time=time.time() self.log.debug('Started MCMC scan') if len(self.ScanThreads) > 1: manager = mp.Manager() List_all = manager.list() List_valid = manager.list() self.lock=manager.Lock() #self.next_id=mp.Value('i',0) ## next_id is an integer (hence 'i') self.next_id=mp.Value('i',self.number_of_prior_points) ## next_id is an integer (hence 'i') #self.valid_points=mp.Value('i',0) self.valid_points=mp.Value('i',self.number_of_prior_valid) processes = [mp.Process(target=thread.run,args=(List_all, List_valid)) for thread in self.ScanThreads] for p in processes: p.start() for p in processes: p.join() self.all_points_run=list(List_all) self.valid_points_run=list(List_valid) else: self.all_points_run=[] self.valid_points_run=[] self.next_id=self.number_of_prior_points self.valid_points=self.number_of_prior_valid self.ScanThreads[0].run(self.all_points_run,self.valid_points_run) self.RunManager.all_points=self.all_points_run self.RunManager.valid_points=self.valid_points_run end_time=time.time() self.log.debug('Finished MCMC scan after %.3f s' %(end_time-start_time)) UpdateProgress(self.target, self.target, message='',finish=True) ## merge output files if that is asked for if self.inputs['Setup']['Cores'] > 1: if 'Merge Results' in self.inputs['Setup']: if eval(self.inputs['Setup']['Merge Results']): merged_file=os.path.join(self.resultsdir,'MCMC_results') if not os.path.exists(merged_file): # don't overwrite previous results! with open(merged_file,'w') as FO: FO.write(self.headerlinestring+'\n') for thread in self.ScanThreads: debug.command_line_log('tail -n +2 '+thread.resultsfile+' >> '+merged_file,self.log) os.remove(thread.resultsfile)
#def postprocess(self,Point, observables, outslha,temp_dir,log, lock=None):
[docs] def postprocess(self,Point, observables, data_point,temp_dir,log, lock=None): """ return the likelihood; we won't get this far if the point failed to be generated """ """ calculate likelihood """ res=0.0 # for the MCMC we need likelihoods, not logL #if self.use_loglike: # try: # res = math.exp(self.get_likelihood(observables)) # except: # pass #else: try: res = self.get_likelihood(observables) except: res = 0.0 #print(f"Got likelihood: {res}") return res ''' lh = 1. for v, f in zip(observables,self.likelihoodfns): lh = lh * f(float(v)) return lh '''
[docs] class MCMC_onecore(): def __init__(self,ParentScan,CoreNumber): self.ParentScan=ParentScan self.runner=ParentScan.RunManager.CoreRuns[CoreNumber] self.last_point = [] self.likelihood_last_step = 0 #self.steps = 1 #self.points_total = 1 self.converged = False # set up results file self.resultsfile=os.path.join(ParentScan.resultsdir,'MCMC_run_'+str(CoreNumber)) with open(self.resultsfile,'w') as FO: # even if we resume, we restart here FO.write(ParentScan.headerlinestring+'\n')
[docs] def add_valid_point(self): if self.ParentScan.lock: self.ParentScan.lock.acquire() if not self.ParentScan.maxvalidpoints or (self.ParentScan.valid_points.value < self.ParentScan.maxvalidpoints): self.ParentScan.valid_points.value=self.ParentScan.valid_points.value+1 else: self.ParentScan.lock.release() raise ## raises an error indicating we're done self.ParentScan.lock.release() else: # single core mode if not self.ParentScan.maxvalidpoints or (self.ParentScan.valid_points < self.ParentScan.maxvalidpoints): self.ParentScan.valid_points=self.ParentScan.valid_points+1 if self.ParentScan.maxvalidpoints is not None: UpdateProgress(self.ParentScan.valid_points.value,self.ParentScan.maxvalidpoints)
[docs] def run(self,all_list,valid_list): self.runner.log.debug('Starting MCMC scan') # Initialise the random seed in each thread, otherwise we get identical points!! np.random.seed() # start with random point found_start=False # For resume!! if self.last_point != [] and self.likelihood_last_step > 0: found_start = True self.runner.log.info("Starting scan at point "+str(self.last_point)+" with likelihood "+str(self.likelihood_last_step)) while not found_start: try: tempID=self.ParentScan.get_next_pointnumber() except: ## have already exhausted the number of points to try return point = self.ParentScan.get_random_point() try: #outlh=self.runner.run_point(point,tempID,lock=self.ParentScan.lock,make_point_container=True) outlh=self.runner.run_point(point,tempID,lock=self.ParentScan.lock) """ if self.ParentScan.RunManager.settings.store_points: tpoint=DataPoint(self.runner.last_point.ID,self.runner.last_point.inputs) tpoint.outputs=self.runner.last_point.outputs tpoint.obs_dict=self.runner.last_point.obs_dict all_list.append(tpoint) """ #if self.ParentScan.RunManager.settings.store_points: # valid_list.append(self.runner.last_point) self.likelihood_last_step = outlh self.add_valid_point() found_start=True self.last_point=point except: pass #if self.ParentScan.RunManager.settings.store_points: # all_list.append(self.runner.last_point) # have a valid starting point, now we can engage the general algorithm self.reached_end=False while not self.reached_end: # First check in case one of the other threads just completed the list if self.ParentScan.maxvalidpoints is not None and (self.ParentScan.valid_points.value >= self.ParentScan.maxvalidpoints): self.reached_end = True break try: tempID=self.ParentScan.get_next_pointnumber() except: self.reached_end=True return point=self.ParentScan.propose_point(self.last_point) try: #outlh=self.runner.run_point(point,tempID,self.ParentScan.lock,make_point_container=True) outlh=self.runner.run_point(point,tempID,self.ParentScan.lock) except: continue """ if self.ParentScan.RunManager.settings.store_points: tpoint=DataPoint(self.runner.last_point.ID,self.runner.last_point.inputs) tpoint.outputs=self.runner.last_point.outputs tpoint.obs_dict=self.runner.last_point.obs_dict all_list.append(tpoint) #all_list.append(self.runner.last_point) """ # check whether to keep the point #print(f"checking {outlh} for {point} against {self.likelihood_last_step} for {self.last_point}") if (outlh > self.likelihood_last_step) or (np.random.uniform(0,1)*self.likelihood_last_step < outlh): self.likelihood_last_step = outlh #if self.ParentScan.RunManager.settings.store_points: # valid_list.append(self.runner.last_point) """ if self.ParentScan.RunManager.settings.store_points: valid_list.append(tpoint) """ self.last_point=point # write the output to our file outline=[str(tempID)] outline.extend(map(str,point)) outline.extend(map(str,self.runner.last_observables)) outline.append(str(outlh)) with open(self.resultsfile,'a') as FO: FO.write(','.join(outline)+'\n') try: self.add_valid_point() except: # have reached the end print("Reached end of scan!") self.reached_end=True return """ # this is redundant, right?!! else: continue """ return