Source code for bsmart.scans.Vegas

"""

Importance Sampling using vegas

Written by M. Goodsell



"""


from bsmart.core import Scan as Scan

import numpy as np
import math
from bsmart import debug


import os
import shutil

from bsmart import BSMutil


#from bsmart.scans import MCMC
import copy

from bsmart.BSMlikelihood import MakeLikelihoods, MakeGlobalLikelihood

import sys, os

import numpy as np

import vegas




class NewScan(Scan):
    """Scanner class for Vegas Scans.

    Samples the unit hypercube ``[0, 1]**n_variables`` with a
    ``vegas.Integrator``. Every sampled point is mapped through the
    upscalers, handed to the run manager, and the value returned for the
    point (see :meth:`postprocess`) is used as the integrand.
    """

    @staticmethod
    def _parse_flag(value):
        """Convert a setup value into a bool without evaluating it as code.

        Strings are matched case-insensitively against the usual spellings
        of true/false; any other type is converted with ``bool()``.

        Raises:
            ValueError: if a string is not a recognised boolean spelling.
        """
        if isinstance(value, str):
            text = value.strip().lower()
            if text in ("true", "1", "yes", "on"):
                return True
            if text in ("false", "0", "no", "off", "none"):
                return False
            raise ValueError(f"Cannot interpret {value!r} as a boolean")
        return bool(value)

    def __init__(self, inputs, log):
        """Read the 'Setup' options and build the global likelihood.

        Args:
            inputs: scan configuration; the 'Setup' and 'Observables'
                entries are read here.
            log: logger passed on to ``Scan.__init__`` and to
                ``BSMutil.create_scalers``.
        """
        Scan.__init__(self, inputs, log)

        self.downscalers, self.upscalers = BSMutil.create_scalers(self.inputs, log)
        self.n_variables = len(self.upscalers)

        setup = self.inputs['Setup']

        # Parsed explicitly rather than with eval(): the value comes from the
        # configuration and must never be executed as Python code.
        self.use_loglike = self._parse_flag(setup.get('LogLike', "True"))

        self.Points = int(setup.get('Points', 10000))
        """Number of points to sample. Default: 10000."""

        # Batch size; when 'Iterations' is given it takes precedence and the
        # batch size becomes Points / Iterations.
        self.batch_size = int(setup.get('Batch Size', 100))
        if 'Iterations' in setup:
            # Coerce first so that a value supplied as text works as well.
            self.batch_size = int(self.Points / float(setup['Iterations']))

        # Global likelihood function and observable masks built from the
        # 'Observables' section of the inputs.
        self.get_likelihood, self.observable_masks = MakeGlobalLikelihood(
            self.inputs['Observables'], self.use_loglike
        )

        # NOTE(review): epochs / LR / patience are read here but not used
        # anywhere in this class -- presumably inherited from a
        # training-based scanner; kept so existing attribute access works.
        self.epochs = int(setup.get("Epochs", 1000))
        self.LR = float(setup.get("LR", 1e-3))
        self.patience = int(setup.get("Patience", 30))

        # Unit-cube coordinates of every point that did / did not produce
        # a result, filled in by call_batch().
        self.all_valid = []
        self.all_invalid = []

    def postprocess(self, Point, observables, data_point, temp_dir, log, lock=None):
        """Return the likelihood of a successfully generated point.

        Only ``observables`` is used; the remaining parameters are accepted
        but ignored here. This is not reached for points that failed to be
        generated.
        """
        return self.get_likelihood(observables)

    def call_batch(self, input_numpy):
        """Run a batch of unit-cube points and return one value per point.

        Args:
            input_numpy: iterable of points, each an iterable with one
                coordinate per upscaler (the batch supplied by vegas).

        Returns:
            ``np.array`` holding, for every point, the run result, or
            ``0.0`` where the run produced no result.
        """
        upscaled = [
            [scale(float(coord)) for coord, scale in zip(point, self.upscalers)]
            for point in input_numpy
        ]
        results = self.RunManager.run_batch(upscaled)

        values = []
        for rvars, res in zip(input_numpy, results):
            coords = list(map(float, rvars))
            # An empty list marks a point without a result. It is tested
            # explicitly because ``res != []`` is ambiguous when ``res`` is
            # a NumPy scalar or array.
            if isinstance(res, list) and not res:
                # NOTE(review): failed points contribute 0.0 even when
                # use_loglike is set -- confirm that is the intended value
                # on a log scale.
                values.append(0.0)
                self.all_invalid.append(coords)
            else:
                self.all_valid.append(coords)
                values.append(res)
        return np.array(values)

    def run(self):
        """Run the vegas integration and print its summary."""
        self.log.info("Starting Vegas scan!")
        self.log.info(f"batch size: {self.batch_size}, points: {self.Points}")

        @vegas.lbatchintegrand
        def f(x):
            return self.call_batch(x)

        # Integration region: the unit box [0, 1]**n_variables.
        integ = vegas.Integrator(self.n_variables * [[0, 1]])

        # NOTE(review): nitn / neval are fixed below; self.Points and
        # self.batch_size are logged above but do not reach the integrator.
        # Warm-up pass; its estimate is discarded.
        integ(f, nitn=2, neval=200)

        result = integ(f, nitn=5, neval=1000)

        print(result.summary())
        print('result = %s Q = %.2f' % (result, result.Q))