"""
--------------------------------------------------------------------------------------------------
HEPRun class for handling running and IO with HEP tools.
--------------------------------------------------------------------------------------------------
Part of BSMArt, but it is self-contained (nb requires zslha and the classes for the different tools)
and you can import it into your own programs
In that case the useful classes are:
HEPRun (which manages everything)
CoreRunner (which manages running on one core)
DataPoint
You can use HEPRun to run batches of points (using multiprocessing!)
or CoreRunner.run_point(point) to run individual points
This is useful for MPI running mode, where the multiprocessing is handled outside the program.
Allowed settings in the input dictionary ("Setup" block unless otherwise specified):
- RunName: Name of the run.
- Cores: Number of cores to use.
- TempDir: Temporary working directory.
- debug: Enable debug mode (boolean).
- SLHA_Output: Boolean, control SLHA output generation.
- Skip SLHA Files: Boolean, for pure python scans (skips I/O).
- Output File: Name for collected outputs.
- Spectrum File: Name for intermediate spectrum file.
- StoreInputs: Boolean, whether to store input Les Houches files.
- StoreAllPoints: Boolean, whether to store all points (default True).
- Store Output Files: Boolean, alias for StoreAllPoints.
- Short: Boolean, enable tabbed output.
- csv: Boolean, enable CSV output.
- csv name: Name for the CSV file.
- Precision: Integer, precision for observables.
- Store Invalid: Boolean, store invalid points.
- Store In Memory: Boolean, store DataPoint objects in memory.
- StoreSeparateFiles: Boolean, store separate spectrum files.
- StoreEverything: Boolean, retain all inputs and outputs for each point.
- Use TQDM: Boolean, use tqdm progress bar.
Other blocks:
- Codes: Dictionary of codes to run.
- Observables: Dictionary of observables.
- Variables: Dictionary of variables.
- Derived Variables: Dictionary of derived variables.
- Fitting: Dictionary for fitting inputs.
- MPI: Dictionary containing MPI settings (Size, Rank).
"""
# For use with traceback.print_exc() in debugging:
#import traceback
# standard packages
import os
import sys
from bsmart import debug
import logging
import shutil
import multiprocessing as mp
try:
if mp.get_start_method() != 'fork':
mp.set_start_method('fork', force=True)
except (ValueError, RuntimeError):
pass
from collections import OrderedDict
from bsmart import zslha
import importlib
import queue
import time
import math
import copy
from typing import Optional, Union, Callable
try:
from tqdm import tqdm
have_tqdm = True
except:
have_tqdm = False
##############################################################################################################################
#
# Define here the RunSettings class
#
##############################################################################################################################
[docs]
class RunSettings:
"""
Container class to store settings to be passed to the running module
Ordinarily these are filled in in core.py
"""
def __init__(self, inputs: Union[str, dict, OrderedDict] = None):
self.name: str = ""
self.cores: int = 1 # Number of cores
self.work_dir: str = ""
self.output_dir: str = ""
self.inputs_dir: str = ""
self.store_outputs: bool = (
True # Whether to store output spc files, either concatenated or as separate files
)
self.store_inputs: bool = (
False # Whether to store input Les Houches files, false by default
)
self.debug: bool = False # Whether to enable additional debugging
self.MAINpath: str = ""
#self.packagepath: str = ""
self.codes: dict = {}
self.observables = OrderedDict()
self.input_filename: str = ""
self.output_filename: str = "SLHA_output"
self.spc_filename: str = "SLHA_output"
self.output_file_suffix: str = ".slha"
self.input_file_suffix: str = ".in"
self.no_slha_output: bool = (
True # Tells the scan not to expect an slha file output. This might be the case for a pythonic tool.
)
self.no_slha_input: bool = (
False # Tells the scan not to write an input file. Intended either for fake scans, or ones where all codes are handled by python
)
self.csv_output: bool = False
self.csv_name: str = ""
self.tabbed_output: bool = False
self.tabbed_name: str = ""
self.store_invalid_points: bool = False # store invalid points
self.store_separate_files = False
self.store_points_in_memory = False ## store DataPoint objects of the points in memory. NB not including the whole spectrum file ...
self.deep_store_points_in_memory = False ## Even store the spectrum files in memory. Ignored if store_points is false
self.invalid_return_value = [] # This should be specified by the scan!
self.store_everything = False # Retain all inputs and outputs for each point
self.variables = None
self.varnames = None
self.marginal_variables = None
self.fitting_inputs = None
self.use_MPI = False
self.derived_variables = None
self.precision = None # numerical precision to store observables
self.use_tqdm = have_tqdm
if inputs is not None:
if isinstance(inputs, str): # have just provided a name and nothing else
self.name = inputs
else:
self.name = inputs["Setup"]["RunName"]
self.cores = inputs["Setup"]["Cores"]
self.work_dir = inputs["Setup"]["TempDir"]
self.debug = eval(inputs["Setup"]["debug"])
#self.packagepath = inputs["Setup"]["PackagePath"]
self.no_slha_output = (
False # above it is set to True, but in usual running we want false
)
if inputs["MPI"]["Size"] > 1:
self.use_MPI = True
if "SLHA_Output" in inputs["Setup"]:
self.no_slha_output = not eval(str(inputs["Setup"]["SLHA_Output"]))
if (
"Skip SLHA Files" in inputs["Setup"]
): # For scans with purely python tools, where we don't want to keep i/o and just use datapoints
self.no_slha_output = eval(str(inputs["Setup"]["Skip SLHA Files"]))
self.no_slha_input = self.no_slha_output
if "Codes" not in inputs:
raise NameError("No codes found in input file")
else:
self.codes = inputs["Codes"]
### add observables defined within the code to the global observables list if present
for code in inputs["Codes"]:
if (
eval(inputs["Codes"][code]["Run"])
and "Observables" in inputs["Codes"][code]
):
for obs in inputs["Codes"][code]["Observables"]:
inputs["Observables"][obs] = inputs["Codes"][code][
"Observables"
][obs]
firstcode = next(iter(inputs["Codes"]))
if (
"Output File" in inputs["Setup"]
): # This sets the name for collected outputs: spectrum files, csv or tabbed outputs
self.output_filename = inputs["Setup"]["Output File"]
if "Spectrum File" in inputs["Setup"] and inputs["Setup"]["Spectrum File"] != "": # This sets the name for the intermediate spectrum file
self.spc_filename = inputs["Setup"]["Spectrum File"]
elif "OutputFile" in inputs["Codes"][firstcode] and inputs["Codes"][firstcode]["OutputFile"] != "":
# take default from the output of the first code, if it has one. This is fine if it's SPheno, for example
self.spc_filename = inputs["Codes"][firstcode]["OutputFile"]
## the input file should be the input for the first code that we run, presumably SPheno. Note we imported the json as an OrderedDict
if not self.no_slha_input:
self.input_filename = inputs["Codes"][firstcode]["InputFile"]
else:
self.input_filename = self.spc_filename
# If the
if "StoreInputs" in inputs["Setup"]:
self.store_inputs = eval(str(inputs["Setup"]["StoreInputs"]))
else:
self.store_inputs = False
self.store_outputs = True
if "StoreAllPoints" in inputs["Setup"]:
self.store_outputs = eval(inputs["Setup"]["StoreAllPoints"])
if "Store Output Files" in inputs["Setup"]:
self.store_outputs = eval(inputs["Setup"]["Store Output Files"])
if "Short" in inputs["Setup"]:
self.tabbed_output = eval(str(inputs["Setup"]["Short"]))
if "csv" in inputs["Setup"]:
self.csv_output = eval(str(inputs["Setup"]["csv"]))
if "csv name" in inputs["Setup"]:
self.csv_name = str(inputs["Setup"]["csv name"])
if "Precision" in inputs["Setup"]:
self.precision = int(inputs["Setup"]["Precision"])
if "Store Invalid" in inputs["Setup"]:
self.store_invalid_points = eval(inputs["Setup"]["Store Invalid"])
if "Store In Memory" in inputs["Setup"]:
self.store_points_in_memory = eval(
inputs["Setup"]["Store In Memory"]
)
if "StoreSeparateFiles" in inputs["Setup"]:
self.store_separate_files = eval(
str(inputs["Setup"]["StoreSeparateFiles"])
)
# self.store_everything=False # this is already set above by default
if "StoreEverything" in inputs["Setup"]:
self.store_everything = eval(
str(inputs["Setup"]["StoreEverything"])
)
if self.store_everything:
self.store_outputs = True
if inputs["MPI"]["Size"] > 1:
self.all_dir = os.path.join(
inputs["Setup"]["cwd"],
inputs["Setup"]["RunName"],
"All_Outputs",
"MPI_" + str(inputs["MPI"]["Rank"]),
)
else:
self.all_dir = os.path.join(
inputs["Setup"]["cwd"],
inputs["Setup"]["RunName"],
"All_Outputs",
)
if inputs["MPI"]["Size"] > 1:
self.inputs_dir = os.path.join(
inputs["Setup"]["cwd"],
inputs["Setup"]["RunName"],
"Input_Files",
"MPI_" + str(inputs["MPI"]["Rank"]),
)
self.output_dir = os.path.join(
inputs["Setup"]["cwd"],
inputs["Setup"]["RunName"],
"Spectrum_Files",
"MPI_" + str(inputs["MPI"]["Rank"]),
)
else:
self.inputs_dir = os.path.join(
inputs["Setup"]["cwd"],
inputs["Setup"]["RunName"],
"Input_Files",
)
self.output_dir = os.path.join(
inputs["Setup"]["cwd"],
inputs["Setup"]["RunName"],
"Spectrum_Files",
)
# self.output_filename='SLHA_output' # already set above
# else: # already set above
# self.spc_filename='SLHA_output'
if "Variables" in inputs:
self.variables = inputs["Variables"]
if hasattr(inputs["Variables"], "keys"):
self.varnames = list(inputs["Variables"].keys())
else:
self.varnames = list(inputs["Variables"])
if "Derived Variables" in inputs:
"""
Allow for functions of the variables, which should be provided as a dictionary like
{"dvar1": "math.exp(mh)", ...}
These will be treated like variables when writing the input files etc, but won't show up in the csv file
"""
self.derived_variables = inputs["Derived Variables"]
if "Observables" in inputs:
self.observables = inputs["Observables"]
if "Fitting" in inputs:
self.fitting_inputs = inputs["Fitting"]
if "Use TQDM" in inputs["Setup"]:
self.use_tqdm = have_tqdm and eval(
inputs["Setup"]["Use TQDM"]
) # use it if we have it!
##############################################################################################################################
#
# Define here the DataPoint class
#
##############################################################################################################################
[docs]
class DataPoint:
"""Class to hold info about each point.
We need at least an ID.
A list of inputs is the first step to filling.
observables are a list of floats
outputs are a list of *results* from the computation, intended to be returned to the scan. E.g. a likelihood
valid is whether the point passes validity checks etc
"""
def __init__(self, ID: int, inputs: list = None):
self.ID = ID
if inputs is None:
self.inputs = []
else:
self.inputs = inputs
# self.varnames: list=None # list of names of the variables
self.outputs: list = []
self.observables: list = []
self.valid = False
self.var_dict = None
self.varnames = None
self.marginal_variables = None
self.obs_dict = OrderedDict() # future use
self.spc = None
[docs]
def clear_obs(self):
self.outputs = []
self.observables = []
self.valid = False
# self.var_dict=None
self.obs_dict = OrderedDict() # future use
self.spc = None
[docs]
def make_var_dict(self, var_names: list):
self.var_dict = {
var_name: var_val for var_name, var_val in zip(var_names, self.inputs)
}
self.var_dict["VARIABLE"] = self.inputs
self.var_dict["math"] = math
[docs]
def get_var_dict(self, var_names: list = None):
if self.var_dict is not None:
return self.var_dict
if var_names is not None:
var_dict = {
var_name: var_val for var_name, var_val in zip(var_names, self.inputs)
}
var_dict["VARIABLE"] = (
self.inputs
) ## Added for backward compatibility, maybe remove?
else:
if (
self.var_dict is not None
): # don't want to overwrite it if we already evaluated it!
return self.var_dict
var_dict = {
"VARIABLE": self.inputs
} ## Added for backward compatibility, maybe remove?
var_dict["math"] = (
math ## to allow maths functions such as sqrt or cos/sin/tan!
)
return var_dict
# def calculate_derived_vars(self,derived_vars: dict):
# for var in
[docs]
def get_all_dict(self) -> dict:
if self.var_dict is not None:
res = self.var_dict
else:
res = self.get_var_dict()
if self.obs_dict is not None:
res.update(self.obs_dict)
return res
[docs]
def to_dict(self, all_data: bool = False):
"""
Creates a dictionary representation.
If all_data is True, we export the spc object too
"""
# return dict(self.__dict__)
res = {"ID": self.ID, "inputs": self.inputs}
# for var,val in self.var_dict.items():
# if var != 'math' and var != '__builtins__':
# print(f"{var}")
res["variables"] = {
var_name: var_val
for var_name, var_val in self.var_dict.items()
if var_name != "math" and var_name != "__builtins__"
}
if self.obs_dict is not None:
res["variables"].update(self.obs_dict)
res["observables"] = list(self.observables)
res["results"] = self.outputs
res["valid"] = self.valid
# res['observables'] = copy.deepcopy(self.observables)
# res['results'] = copy.deepcopy(self.outputs)
# res['valid'] = self.valid
if all_data and self.spc is not None:
res["spc"] = copy.deepcopy(self.spc.to_dict())
return res
[docs]
def UpdateProgress(number, length, message="", finish=False):
"""
Ridiculously basic progress bar
Implemented here so that we don't need to install extra packages
If tqdm is installed, you can use it in this function. See the example below:
from tqdm import tqdm
with tqdm(total=length) as pbar:
for i in range(number):
# Do stuff...
pbar.update()
"""
## below is not compatible with python < 3.6
# string=f'['
if length > 0:
string = "["
done = number * 50 // length
todo = 50 - done
percentage = number * 100 // length
if done != 0:
done = done - 1
string = (
string
+ ("=" * done)
+ ">"
+ (" " * todo)
+ "] "
+ str(percentage)
+ "% "
+ message
)
# if number == length:
if finish:
print(string)
else:
print(string, end="\r")
def _normalize_valid_bound(val):
"""
Normalize bounds used in VALID intervals to floats or None.
Accepts:
- None -> None
- float/int (incl. +/-inf) -> float(val)
- strings like 'inf', '+inf', 'infty', 'infinity', '-inf' -> +/-inf
- other strings convertible to float -> float(value)
"""
if val is None:
return None
if isinstance(val, (int, float)):
return float(val)
if isinstance(val, str):
s = val.strip().lower()
if s in ("inf", "+inf", "infty", "infinity", "+infty", "+infinity"):
return float("inf")
if s in ("-inf", "-infty", "-infinity"):
return float("-inf")
# fall back to numeric parse
return float(s)
return float(val)
[docs]
def create_validator(obs, range):
# Normalise bounds so that VALID can safely use numeric or string +/-inf
norm_range = [_normalize_valid_bound(v) for v in range]
varmax = max(norm_range)
varmin = min(norm_range)
def tfunc(obs_dict):
val = float(eval(obs, obs_dict))
if val < varmin:
return False
if val > varmax:
return False
return True
return tfunc
[docs]
def create_condition(cond_string):
def tfunc(obs_dict):
# print(f"Checking condition {cond_string}:")
# print(f"{bool(eval(cond_string,obs_dict))}")
return bool(eval(cond_string, obs_dict))
return tfunc
"""
def smart_format(value,precision):
if isinstance(value, (float)):
return "{0:.{1}g}".format(value, precision)
return str(value)
"""
"""
def format_variable(value):
if isinstance(value, (float)):
return str(round(value,8))
return str(value)
"""
##############################################################################################################################
#
# Define here the HepTool class
#
##############################################################################################################################
##############################################################################################################################
#
# Define here the write_inputfile function
#
##############################################################################################################################
def _create_default_write_inputfile(
input_filename : str,
log: logging.Logger,
block_info: Union[dict, OrderedDict] = None,
):
"""
Routine to create a function that will write an slha input file from a template and/or list of blocks.
"""
log.info('Reading bare spc: '+str(input_filename))
try:
inputspc= zslha.read(input_filename,verbose=False)
except Exception as e:
log.error('Problem reading slha file '+input_filename+', ' + str(e))
inputspc = zslha.SLHA()
# in case there is no template and we want to use json input
if 'Blocks' in block_info:
try:
for current_block in block_info['Blocks']:
if current_block not in inputspc.blocks:
inputspc.blocks[current_block]={}
for key in block_info['Blocks'][current_block].keys():
inputspc.blocks[current_block][key] = block_info['Blocks'][current_block][key]
except Exception as e:
log.debug('Exception reading json block input: '+str(e))
# now create the function to write the input file
def write_lh_file(
point: list,
dir: str,
name: str,
values_dict: dict=None,
Overloads: list = None
):
""" write Les Houches input file for given parameter point """
lhname=os.path.join(dir,name)
if Overloads is not None:
"""
Allow overloading of block values, e.g. during marginalisation
Format: a list of form
[["SPHENOINPUTS",[1],1.0],...]
"""
# obtain a new copy of the input spc
runspc = zslha.SLHA(inputspc) #
for overload in Overloads:
if len(overload) < 3:
continue
block=overload[0].upper()
blockkey=str(overload[1])[1:-1].replace(" ", "")
value=overload[2]
if block in runspc.blocks:
runspc.blocks[block][blockkey] = value
if values_dict is None:
zslha.write_lh(runspc, point, lhname,self.varnames)
else:
zslha.write_lh_dict(runspc, values_dict, lhname)
return write_lh_file
##############################################################################################################################
#
# Define here the HEPRun class
#
##############################################################################################################################
[docs]
class HEPRun:
"""
This is the main class to handle running.
"""
def __init__(self,
runsettings: RunSettings = None,
write_inputfile: Callable = None,
postprocess: Callable = None,
log: logging.Logger = None,
inputs: Union[dict,OrderedDict] = {},
):
if runsettings is None:
# use the inputs instead!
self.settings = RunSettings(inputs)
else:
self.settings = runsettings
if write_inputfile is None:
self.write_inputfile = _create_default_write_inputfile(
self.settings.input_filename,
self.log,
inputs,
)
else:
self.write_inputfile = write_inputfile
self.postprocess = postprocess
self.work_dir = runsettings.work_dir
self.use_tqdm = runsettings.use_tqdm
if log == None:
self.log = debug.new_logger(
self.settings.debug,
self.settings.name,
os.path.join(self.settings.work_dir, self.settings.name),
)
self.log.debug("Initialise HEPRun")
else:
log.debug("Initialise HEPRun")
self.log = log
if self.settings.store_points_in_memory:
self.all_points = []
self.valid_points = []
self.all_batch_points = []
self.valid_batch_points = []
self.nextpointID = 1
if self.settings.cores > 1:
self.input_points = mp.Queue()
self.output_fileheaders = mp.Queue()
self.invalid_fileheaders = mp.Queue()
self.progress_messages = mp.Queue()
else:
self.input_points = queue.Queue()
self.output_fileheaders = queue.Queue()
self.invalid_fileheaders = queue.Queue()
self.progress_messages = queue.Queue()
if self.settings.fitting_inputs is not None:
self.do_marginalisation = True
if "Method" in self.settings.fitting_inputs:
self.marginalisation_method = self.settings.fitting_inputs["Method"]
else:
self.marginalisation_method = None
self.set_up_marginalisation()
if "Options" in self.settings.fitting_inputs:
## should be dict, but need to evaluate if there is a "True"
self.marginalisation_options = {
x: eval(str(self.settings.fitting_inputs["Options"][x]))
for x in self.settings.fitting_inputs["Options"]
}
else:
self.marginalisation_options = {"ftol": 1e-1, "disp": False, "eps": 0.1}
# print('Options:')
# print(dict(eval(self.marginalisation_options)))
# raise
else:
self.do_marginalisation = False
self.tools = []
self.set_up_codes()
## Set up a run handler for each core
self.CoreRuns = []
for x in range(self.settings.cores):
self.CoreRuns.append(CoreRunner(self, "Core_" + str(x)))
self.all_tools_run_state()
## set up an output file and how we format output if we want to store csv or tab values
if self.settings.csv_output:
if self.settings.csv_name == "":
# self.csv_filename=os.path.join(self.settings.output_dir,self.settings.output_filename)
# if self.settings.store_outputs:
log.info(
"CSV output without a csv file name -> appending .csv to output filename"
)
# self.csv_filename=self.csv_filename+'.csv'
csv_filename = self.settings.output_filename + ".csv"
else:
csv_filename = self.settings.csv_name
self.csv_filename = os.path.join(self.settings.output_dir, csv_filename)
# else:
# self.csv_filename=os.path.join(self.settings.output_dir,self.settings.csv_name)
if os.path.isfile(self.csv_filename):
os.remove(self.csv_filename)
self.output_fileheaders.put([self.csv_filename, ","])
if self.settings.store_invalid_points: # keep a csv file for invalid points
self.invalid_csv_filename = os.path.join(
self.settings.output_dir, "invalid_" + csv_filename
)
self.invalid_fileheaders.put([self.invalid_csv_filename, ","])
if self.settings.tabbed_output:
if self.settings.tabbed_name == "":
tabbed_name = self.settings.output_filename + ".dat"
self.tabbed_filename = os.path.join(
self.settings.output_dir, self.settings.output_filename
)
if self.settings.store_outputs:
log.info(
"Tabbed output without a tab file name -> appending .dat to output filename"
)
self.tabbed_filename = self.tabbed_filename + ".dat"
else:
tabbed_name = self.settings.tabbed_name
self.tabbed_filename = os.path.join(
self.settings.output_dir, self.settings.tabbed_name
)
if os.path.isfile(self.tabbed_filename):
os.path.remove(self.tabbed_filename)
self.output_fileheaders.put([self.tabbed_filename, "\t"])
if self.settings.store_invalid_points: # keep a csv file for invalid points
self.invalid_tabbed_filename = os.path.join(
self.settings.output_dir, "invalid_" + tabbed_name
)
self.invalid_fileheaders.put([self.invalid_tabbed_filename, "\t"])
# Define functions for formatting variables and observables
# In zslha the variables are printed out only to 8 significant figures anyway.
"""
def format_variable(value):
if isinstance(value, (float)):
return "{0:.{1}g}".format(value, 8)
return str(value)
if self.settings.precision is not None:
def format_observable(self,value):
print(value)
if isinstance(value, (float)):
return "{0:.{1}g}".format(value, self.settings.precision)
return str(value)
else:
def format_observable(self,value):
print(value)
return str(value)
"""
self.format_variable = format_variable # defined above, rounds to 8 decimal places, since that is what is done in zslha
if self.settings.precision is not None:
self.format_observable = create_smart_format(self.settings.precision)
else:
self.format_observable = lambda x: str(x)
if self.settings.store_inputs:
self.input_filename_prefix = os.path.join(
self.settings.inputs_dir, self.settings.name + "_"
)
if self.settings.store_outputs:
if self.settings.store_separate_files:
# self.output_filename_prefix=os.path.join(self.settings.output_dir,self.settings.name+'_')
self.output_filename_prefix = os.path.join(
self.settings.output_dir, self.settings.output_filename + "_"
)
else:
self.output_filename = os.path.join(
self.settings.output_dir, self.settings.output_filename
)
if self.settings.store_everything:
# self.output_directory_prefix=os.path.join(self.settings.all_dir,self.settings.name+'_')
# self.output_filename_prefix=os.path.join(self.settings.all_dir,self.settings.output_filename+'_')
self.output_directory_prefix = os.path.join(
self.settings.all_dir, self.settings.output_filename + "_"
)
[docs]
def set_up_marginalisation(self):
import bsmart.marginalisation as marginalisation
if "Variables" not in self.settings.fitting_inputs:
raise NameError("Fitting must include Variables!")
self.marginal_variables = list(self.settings.fitting_inputs["Variables"].keys())
# print('Marginal variables: ' +str(self.marginal_variables))
self.marginal_downscalers, self.marginal_upscalers, self.marginal_ranges = (
marginalisation.create_scalers_marginal(
self.settings.fitting_inputs, self.log
)
)
self.number_tools_run_marginalisation = 0
# print('here')
if "Observables" not in self.settings.fitting_inputs:
raise NameError("Fitting must include Observables!")
self.marginal_observables = list(
self.settings.fitting_inputs["Observables"].keys()
)
if (
(
self.marginalisation_method == None
or self.marginalisation_method == "Brentq"
or self.marginalisation_method != "Minimise"
)
and len(self.marginal_variables) == 1
and len(self.marginal_observables) == 1
):
self.oned_marginal = True
self.marginal_chisquared = marginalisation.create_diff_marginal(
self.settings.fitting_inputs["Observables"]
)
else:
if self.marginalisation_method == "Minimise":
self.marginalisation_method = None
self.oned_marginal = False
self.marginal_chisquared = marginalisation.create_chisquare_marginal(
self.settings.fitting_inputs["Observables"]
)
[docs]
def set_up_codes(self):
"""Set up the HEP Tools needed in the scan"""
# classpath = os.path.join(self.settings.packagepath, "tools")
found_observables = []
for toolname in self.settings.codes:
self.log.debug("Setting up " + toolname)
if eval(self.settings.codes[toolname]["Run"]):
"""
Allow definition of observables inside the tool
"""
if self.do_marginalisation and len(found_observables) < len(
self.marginal_observables
):
self.number_tools_run_marginalisation += 1
if "Observables" in self.settings.codes[toolname]:
for obs in self.settings.codes[toolname]["Observables"]:
# print(obs)
self.settings.observables[obs] = self.settings.codes[toolname][
"Observables"
][obs]
if self.do_marginalisation and obs in self.marginal_observables:
found_observables.append(obs)
# Try to import from bsmart.tools
tool_module = None
try:
tool_module = importlib.import_module("bsmart.tools." + toolname)
except ImportError:
pass
if tool_module:
try:
self.tools.append(
tool_module.NewTool(
toolname, self.settings.codes[toolname], self.settings
)
)
except Exception as e:
self.log.critical(
"Failed to initialise " + toolname + " from bsmart.tools, error " + str(e)
)
raise SystemExit
else:
self.log.warning(
"Required tool "
+ toolname
+ " not found in bsmart.tools, trying user directory ./Tools"
)
## try a user directory
classfile = os.path.join(os.getcwd(), "Tools", toolname + ".py")
if not os.path.isfile(classfile):
self.log.critical(
"Required tool not present "
+ toolname
+ ", should be in file "
+ classfile
)
raise SystemExit
try:
if os.path.join(os.getcwd(), "Tools") not in sys.path:
sys.path.append(os.path.join(os.getcwd(), "Tools"))
new_class = importlib.import_module(toolname)
self.tools.append(
new_class.NewTool(
toolname, self.settings.codes[toolname], self.settings
)
)
except Exception as e:
self.log.critical(
"Failed to initialise " + toolname + " ,error " + str(e)
)
raise SystemExit
## If we didn't find all the observables in the tools
if self.do_marginalisation and len(found_observables) < len(
self.marginal_observables
):
self.extra_observable_step = True
else:
self.extra_observable_step = False
[docs]
def newpoint(self, inputs):
nextID = self.nextpointID
self.nextpointID = self.nextpointID + 1
return DataPoint(nextID, inputs)
[docs]
def new_point_to_stack(self, inputs):
nextID = self.nextpointID
self.nextpointID = self.nextpointID + 1
# self.input_points.put(DataPoint(nextID,inputs))
self.input_points.put(
{"ID": nextID, "inputs": inputs}
) # since we're not storing anything here
[docs]
def run_batch(self, sample):
sample_size = len(sample)
sample_start = self.nextpointID
for point in sample:
self.new_point_to_stack(point)
for _ in range(len(self.CoreRuns)):
self.input_points.put(None) ## Add a 'sentinel' to indicate termination
self.log.info(
"Starting batch run of "
+ str(sample_size)
+ " points over "
+ str(self.settings.cores)
+ " cores"
)
start_time = time.time()
if not self.use_tqdm:
UpdateProgress(0, sample_size, (" 0 of " + str(sample_size) + " done"))
self.all_batch_points = []
self.valid_batch_points = []
if self.settings.cores > 1:
with mp.Manager() as manager:
# List_all = manager.list() # better to replace this with a queue!
# Using a manager queue is more robust, especially with 'spawn' or 'forkserver' start methods
# which are becoming default in newer Python versions.
result_queue = manager.Queue()
lock = manager.Lock()
# processes = [mp.Process(target=corerun.run_all,args=(List_all, List_valid, sample_start, sample_size,lock)) for corerun in self.CoreRuns]
processes = [
mp.Process(
target=corerun.run_all,
args=(result_queue, sample_start, sample_size, lock),
)
for corerun in self.CoreRuns
]
for p in processes:
p.start()
if self.use_tqdm:
# with tqdm(total=sample_size,ncols=50) as pbar:
pbar = tqdm(total=sample_size, ncols=50)
else:
pbar = None
while True:
try:
item = result_queue.get(timeout=0.1)
# print(f"Got item {item}")
if item["valid"]:
self.valid_batch_points.append(item)
self.all_batch_points.append(item)
if self.use_tqdm:
pbar.update(1)
except (
queue.Empty
): # Queue.Empty, not necessarily finished! Check! Could just have timed out, weird!
if all(not w.is_alive() for w in processes):
break
continue
except KeyboardInterrupt: # something went wrong or interrupted
self.log.warning("Running interrupted")
for p in processes:
p.terminate()
for p in processes:
p.join()
raise
except (
Exception
) as e: # Something else when wrong. Should not ignore
self.log.error("Running hit exception " + str(e))
for p in processes:
p.terminate()
for p in processes:
p.join()
raise
# except KeyboardInterrupt:
# except: # something went wrong
# continue
for p in processes:
p.join()
nfinal = len(self.all_batch_points)
nvalid = len(self.valid_batch_points)
else:
result_queue = queue.Queue()
if self.use_tqdm:
pbar = tqdm(total=sample_size, ncols=50)
else:
pbar = None
self.CoreRuns[0].run_all(result_queue, sample_start, sample_size, Lock=None, pbar=pbar)
while True:
try:
item = result_queue.get_nowait()
except queue.Empty: # Queue.Empty, not necessarily finished! Check!
break
except Exception as e:
raise NameError("Error collecting results, " + str(e))
if item["valid"]:
self.valid_batch_points.append(item)
self.all_batch_points.append(item)
if self.use_tqdm:
pbar.update(1)
nfinal = len(self.all_batch_points)
nvalid = len(self.valid_batch_points)
if self.settings.store_points_in_memory:
self.all_points = self.all_points + self.all_batch_points
self.valid_points = self.valid_points + self.valid_batch_points
message = "Batch finished, %d of %d points evaluated are valid" % (
nvalid,
nfinal,
)
if not self.use_tqdm:
UpdateProgress(sample_size, sample_size, message, True)
self.log.info("Batch finished ...")
self.log.info(
"... with %d points in total, of which %d are valid" % (nfinal, nvalid)
)
end_time = time.time()
self.log.info("Batch run took %f seconds " % (end_time - start_time))
### Now return the results, just the result values in the correct order:
if self.settings.cores > 1:
return_vals = [[] for x in range(len(self.all_batch_points))]
reordered_all_batch = [None] * len(return_vals)
for point in self.all_batch_points:
return_vals[point["ID"] - sample_start] = point["results"]
reordered_all_batch[point["ID"] - sample_start] = point
self.all_batch_points = reordered_all_batch
# no need to reorder the valid list, since we know it will have holes?
# otherwise could do
# self.valid_batch_points.sort(key=lambda x: x["ID"])
#
# return_vals[point.ID-sample_start] = point.outputs ## use the ID to order the points
return return_vals
else: ## points already correctly ordered
return [point["results"] for point in self.all_batch_points]
# return [point.outputs for point in self.all_batch_points] ### note we set the output to self.settings.invalid_return_value
##############################################################################################################################
#
# Define here the CoreRunner class
#
##############################################################################################################################
[docs]
class CoreRunner:
"""Class to handle running on one core, takes HEPRun as parent, which becomes the self.RunHandler"""
def __init__(self, ParentRunner, name):
ParentRunner.log.debug("Setting up runner name " + name)
## set up temp directory and output directory
self.RunHandler = ParentRunner
# self.output_file=ParentRunner.settings.output_filename
self.spc_file = ParentRunner.settings.spc_filename
self.name = name
self.temp_dir = os.path.join(self.RunHandler.work_dir, name)
if os.path.isdir(self.temp_dir):
shutil.rmtree(self.temp_dir)
os.makedirs(self.temp_dir)
self.log = debug.new_core_logger(
self.RunHandler.settings.debug, name, os.path.join(self.temp_dir, name)
)
self.last_observables = None
# self.last_spc=None # this won't pickle
self.last_point_dict = None # We need this to pass all point information back and forth, in case we are storing it.
self.run_tools = (
{}
) ### defined to allow tools to not necessarily be run every time
# for code in self.ParentRunner.tools
def __del__(self):
"""destructor method: delete working directory to clear up?"""
if not self.RunHandler.settings.debug:
if os.path.isdir(
self.temp_dir
): ## needed because we may have already deleted it
shutil.rmtree(self.temp_dir)
# def run_all(self, all_list, valid_list, sample_start, sample_size, Lock=None):
[docs]
def run_all(self, run_queue, sample_start, sample_size, Lock=None, pbar=None):
"""
Runs points from a list; usually invoked by run_batch in the RunHandler.
"""
while True: # not self.RunHandler.input_points.empty():
try:
# point=self.RunHandler.input_points.get()
self.last_point_dict = self.RunHandler.input_points.get(timeout=1.0)
# print(f"got {self.last_point_dict}")
except:
# self.RunHandler.progress_messages.put('')
return 0
if self.last_point_dict is None:
return 0
message = ""
point_number_in_batch = int(self.last_point_dict["ID"]) - sample_start + 1
try:
if self.RunHandler.settings.store_points_in_memory:
# Do not need to pass the point data, it is already specified above
# output=self.run_point(None,None,lock=Lock,point_data=point_dict) # should be able to access this object
output = self.run_point(None, None, lock=Lock)
else:
output = self.run_point(
self.last_point_dict["inputs"],
self.last_point_dict["ID"],
lock=Lock,
)
# point_dict = copy.deepcopy(self.last_point_dict)
# point_dict = self.last_point_dict
self.last_point_dict["valid"] = True
if "results" not in self.last_point_dict:
self.last_point_dict["results"] = output
message = (
"Batch point "
+ str(point_number_in_batch)
+ "/"
+ str(sample_size)
+ " is valid! "
)
if self.RunHandler.settings.debug:
message = message + str(self.last_point_dict["inputs"])
# message=message+str(point.inputs)
except Exception as e:
#traceback.print_exc()
self.log.debug("Running hit exception " + str(e))
# point_dict = self.last_point_dict
self.last_point_dict["valid"] = False
self.last_point_dict["results"] = (
self.RunHandler.settings.invalid_return_value
)
message = (
"Batch point "
+ str(point_number_in_batch)
+ "/"
+ str(sample_size)
+ " is not valid! "
)
if self.RunHandler.settings.debug:
message = message + str(self.last_point_dict["inputs"])
run_queue.put(self.last_point_dict)
# self.RunHandler.progress_messages.put(message)
if not self.RunHandler.use_tqdm:
UpdateProgress(point_number_in_batch, sample_size, message)
elif pbar: # only provided in single core mode.
pbar.update(1)
[docs]
def run_point(self, Point=None, ID=None, lock=None):
"""
Run a single point: create LH file, set codes to work on it; uses a single core for this
If ID is None, we assume we are running in single core mode
Point is just a list of floats corresponding to the input variables.
"""
if Point is None: # get data indirectly via the dictionary.
Point = self.last_point_dict[
"inputs"
] # if this is missing, get an exception -> ok. Still need this further down
tID = self.last_point_dict["ID"]
point_container = DataPoint(self.last_point_dict["ID"], Point)
else:
if not ID:
"""running in single core mode, no batch, but we may want to store the point/files for which we need a unique ID"""
point_container = self.RunHandler.newpoint(Point)
tID = point_container.ID
else:
tID = ID
"""
If we are handling the cores within the scan, we might want to still store the points (e.g. in the MCMC scan).
Then we need to create a point container for the point
Change of plan: now we always create the point so that we can pass info between the tools
"""
# if self.RunHandler.settings.store_points and make_point_container:
# point_container=DataPoint(tID,Point)
point_container = DataPoint(tID, Point)
"""
If we have no point ID, then we are running a single point in single core mode,
so we should stick it on the stack if we are storing points. BUT this is no good in multicore mode:
the points will be lost at the end. So instead we just keep the *last* point and leave it up to the
scan to handle the storage in that case.
"""
# if self.RunHandler.settings.store_points and ((not ID) or (make_point_container)):
if self.RunHandler.settings.varnames is not None:
point_container.make_var_dict(self.RunHandler.settings.varnames)
if self.RunHandler.settings.derived_variables is not None:
"""
Derived variables should be a function of the variables.
We expect a dictionary of the form
{"name": "expression", ... }
If you need something that depends on the observables too, then I will treat that as an observable
and you should define it as an observable with the 'FUNCTION' property
"""
vardict = point_container.get_var_dict()
derived = {
var: eval(self.RunHandler.settings.derived_variables[var], vardict)
for var in self.RunHandler.settings.derived_variables
}
point_container.var_dict.update(vardict)
# if self.RunHandler.settings.store_points and (not ID):
# This doesn't work (pickling)
# self.last_point=point_container
self.log.debug("Running point with input parameters: %s" % str(Point))
point_container.valid = True
os.chdir(self.temp_dir)
if self.RunHandler.do_marginalisation:
# print('Let us do fit')
try:
self.do_marginalisation(point_container)
point_container.clear_obs() # Entire toolchain will be run again with marginalised values
point_container.valid = True
except: # failure to marginalise ...
self.log.warning("Marginalisation failed")
point_container.valid = False
# if self.RunHandler.settings.store_points_in_memory:
if self.RunHandler.settings.store_invalid_points:
point_container.marginal_variables = [0.0] * len(
self.RunHandler.marginal_variables
)
if self.RunHandler.settings.store_points_in_memory:
self.last_point_dict = point_container.to_dict()
"""
self.last_point_dict = point_container.to_dict()
point_container.clear_obs()
point_container.valid = False
self.last_point_dict = point_container.to_dict()
self.last_observables=[]
point_all_dict=point_container.get_all_dict()
for obs,val in zip(self.RunHandler.settings.observables.keys(),self.RunHandler.settings.observables.values()):
#self.last_observables.append('NaN') # for invalid points we will be missing observables, declare them as NaN
self.last_observables.append(math.nan)
"""
else:
raise
## Now the results should be stored in the point_container.var_dict
if (
not self.RunHandler.settings.no_slha_output
): # clear output files if we need them
if os.path.exists(self.spc_file):
try:
os.remove(self.spc_file)
except Exception:
self.log.debug("Failed to remove file", exc_info=True)
if (
point_container.valid
): # haven't tried to do marginalisation or it didn't fail
if (
not self.RunHandler.settings.no_slha_input
): # check if we need an input file, and create it
if os.path.exists(self.RunHandler.settings.input_filename):
try:
os.remove(self.RunHandler.settings.input_filename)
except Exception:
self.log.debug("Failed to remove file", exc_info=True)
self.RunHandler.write_inputfile(
Point,
self.temp_dir,
self.RunHandler.settings.input_filename,
point_container.var_dict,
)
if self.RunHandler.settings.store_inputs:
shutil.copyfile(
self.RunHandler.settings.input_filename,
self.RunHandler.input_filename_prefix
+ str(tID)
+ self.RunHandler.settings.input_file_suffix,
)
# moved this up, here we check whether to go on because if the marginalisation failed
# point_container.valid=True
for code in self.RunHandler.tools:
# print(self.run_tools)
if self.run_tools[code.name]:
self.log.debug("Launching " + str(code))
# point_container.valid=False
try:
code.launch(
self.spc_file, self.temp_dir, self.log, point_container
)
# point_container.valid=True
except Exception:
self.log.debug(
self.name + " failed to run " + code.name, exc_info=True
)
point_container.valid = False
# if self.last_point_dict is not None and self.RunHandler.settings.store_points_in_memory: # try to keep whatever observables we already gleaned
if self.RunHandler.settings.store_points_in_memory:
self.last_point_dict = point_container.to_dict()
# self.last_point_dict = point_data
break
# raise #### return with error
if (
not point_container.valid
and not self.RunHandler.settings.store_invalid_points
):
# no need to continue
raise
"""
if self.RunHandler.settings.no_slha_output:
try:
result=self.RunHandler.postprocess(Point,self.temp_dir, self.log, lock)
self.last_observables = None
except:
self.log.debug(self.name+' failed to produce point')
raise
else:
"""
if not self.RunHandler.settings.no_slha_output:
# read in the spc if we expect it and haven't already done so
## slha output -> read slha file first and extract observables
if point_container.spc is None:
# self.last_slha=zslha.read(self.output_file)
if os.path.isfile(self.spc_file):
point_container.spc = zslha.read(self.spc_file)
else:
self.last_slha = point_container.spc
# self.RunHandler.settings.observables contains all observables.
# point_container should already have some values loaded into it. We just need to update them if not present
# self.last_observables=[self.last_slha.Value(obs['SLHA'][0], obs['SLHA'][1]) for obs in self.RunHandler.settings.observables.values()]
if len(self.RunHandler.settings.observables) > 0:
try:
self.last_observables = []
point_all_dict = point_container.get_all_dict()
for obs, val in zip(
self.RunHandler.settings.observables.keys(),
self.RunHandler.settings.observables.values(),
):
if (
obs in point_container.obs_dict
): # Trust that the correct value has been entered by the tools into the point container
self.last_observables.append(point_container.obs_dict[obs])
elif (
point_container.valid
): # observables that are not specified in the tool, but directly in the settings: extract these after all tools have run.
obsval = 0.0
if (
"SLHA" in val
): # New: allow for observables that are not SLHA values but may be functions.
obsval = float(
point_container.spc.Value(
val["SLHA"][0], val["SLHA"][1]
)
)
elif "FUNCTION" in val:
obsval = eval(
val["FUNCTION"], point_all_dict
) # allow for derived variables
else:
raise NameError(
"Unknown observable information: "
+ str(obs)
+ ", "
+ str(val)
)
self.last_observables.append(obsval)
point_all_dict[obs] = obsval
point_container.obs_dict[obs] = obsval
else:
# self.last_observables.append('NaN') # for invalid points we will be missing observables, declare them as NaN
self.last_observables.append(math.nan)
except Exception as e:
self.log.debug(
self.name
+ " point "
+ str(tID)
+ " failed to read observables, "
+ str(e)
)
point_container.valid = False
if not self.RunHandler.settings.store_invalid_points:
raise
if point_container.valid: # point ran successfully
try:
# result=self.RunHandler.postprocess(Point,self.last_observables,self.last_slha,self.temp_dir, self.log, lock)
result = self.RunHandler.postprocess(
Point,
self.last_observables,
point_container,
self.temp_dir,
self.log,
lock,
)
point_container.outputs = result
except:
self.log.debug(
self.name + " point " + str(tID) + " failed postprocessing"
)
point_container.valid = False
if not self.RunHandler.settings.store_invalid_points:
raise
else:
result = self.RunHandler.settings.invalid_return_value
if (
self.RunHandler.settings.csv_output
or self.RunHandler.settings.tabbed_output
):
# output_fileheaders will be a list with [0] being the filename and [1] being the separator.
if (
point_container.valid and not self.RunHandler.output_fileheaders.empty()
) or (
not point_container.valid
and not self.RunHandler.invalid_fileheaders.empty()
):
headerline = []
if tID:
headerline.append("ID")
if self.RunHandler.settings.variables is not None:
for v in self.RunHandler.settings.variables:
headerline.append(v)
else:
for x in range(len(Point)):
headerline.append("VARIABLE[" + str(x) + "]")
if self.RunHandler.do_marginalisation:
for v in self.RunHandler.marginal_variables:
headerline.append(v)
for obs in self.RunHandler.settings.observables:
headerline.append(obs)
if point_container.valid:
if hasattr(result, "__len__") and len(result) > 1:
for x in range(len(result)):
headerline.append("RESULT[" + str(x) + "]")
else:
if str(result) != "":
headerline.append("Result")
if point_container.valid:
getlist = self.RunHandler.output_fileheaders
else:
getlist = self.RunHandler.invalid_fileheaders
# while not self.RunHandler.output_fileheaders.empty():
# header_to_write=self.RunHandler.output_fileheaders.get()
while not getlist.empty():
header_to_write = getlist.get()
# header_to_write is [filename,separator]
headerlinestring = header_to_write[1].join(headerline)
## check to see if file already exists and prepend the header
if lock:
lock.acquire()
if not os.path.isfile(header_to_write[0]):
debug.command_line_log(
"echo "
+ headerlinestring
+ " >> "
+ str(header_to_write[0]),
self.log,
)
else: # either we have some results already, or we are resuming an old scan
with open(header_to_write[0], "r+") as f:
s = f.read()
if not s.startswith(headerlinestring):
f.seek(0)
f.write(headerlinestring + "\n" + s)
if lock:
lock.release()
# end if not self.RunHandler.output_fileheaders.empty():
outline = []
if tID:
outline.append(str(tID))
# outline.extend(map(str,Point))
outline.extend(map(self.RunHandler.format_variable, Point))
if self.RunHandler.do_marginalisation:
# outline.extend(map(str,point_container.marginal_variables))
outline.extend(
map(
self.RunHandler.format_variable,
point_container.marginal_variables,
)
)
# outline.extend(map(str,self.last_observables))
if self.last_observables is not None:
outline.extend(
map(self.RunHandler.format_observable, self.last_observables)
)
if hasattr(result, "__len__") and len(result) > 1:
# outline.extend(map(str,result))
outline.extend(map(self.RunHandler.format_observable, result))
else:
if str(result) != "":
# outline.append(str(result))
outline.append(self.RunHandler.format_observable(result))
if self.RunHandler.settings.csv_output:
outlinestr = ",".join(outline)
if point_container.valid:
outf = self.RunHandler.csv_filename
else:
outf = self.RunHandler.invalid_csv_filename
if lock:
lock.acquire()
debug.command_line_log(
"echo " + outlinestr + " >> " + outf, self.log
)
# debug.command_line_log("echo " + outlinestr + " >> " + self.RunHandler.csv_filename, self.log)
if lock:
lock.release()
if self.RunHandler.settings.tabbed_output:
outlinestr = "\t".join(outline)
if point_container.valid:
outf = self.RunHandler.tabbed_filename
else: # we already quit if we are not storing data
outf = self.RunHandler.invalid_tabbed_filename
if lock:
lock.acquire()
debug.command_line_log(
"echo " + outlinestr + " >> " + outf, self.log
)
# debug.command_line_log("echo " + outlinestr + " >> " + self.RunHandler.tabbed_filename, self.log)
if lock:
lock.release()
if self.RunHandler.settings.store_everything:
## copy everything except the log which is called <name>, and anything else that might be onerous
shutil.copytree(
self.temp_dir,
self.RunHandler.output_directory_prefix + str(tID),
ignore=shutil.ignore_patterns("*.out", self.name),
)
if self.RunHandler.settings.store_outputs:
# check first whether the point is valid. If so, then there must be an spc file.
# If the point is not valid, only store the file if we are told to -- and if it exists!
if point_container.valid or (
self.RunHandler.settings.store_invalid_points
and os.path.isfile(self.spc_file)
):
if self.RunHandler.settings.store_separate_files:
shutil.copyfile(
self.spc_file,
self.RunHandler.output_filename_prefix
+ str(tID)
+ self.RunHandler.settings.output_file_suffix,
)
else:
if lock:
lock.acquire()
debug.command_line_log(
"cat "
+ self.spc_file
+ " >> "
+ self.RunHandler.output_filename,
self.log,
)
debug.command_line_log(
'echo "ENDOFPARAMETERPOINT" >> '
+ self.RunHandler.output_filename,
self.log,
)
if lock:
lock.release()
# if self.RunHandler.settings.store_points and ((not ID) or (make_point_container)):
# point_container.outputs=result
# point_container.valid=True
if (
self.last_observables is not None
): ## if no slha then no observables to store ...
point_container.observables = self.last_observables
# if not point_container.valid and not self.RunHandler.settings.store_invalid_points:
# raise
# point_container.valid=True
# if self.last_point_dict is not None and self.RunHandler.settings.store_points_in_memory:
if self.RunHandler.settings.store_points_in_memory:
self.last_point_dict = point_container.to_dict()
# self.last_point_dict = point_data
if not point_container.valid:
raise
# self.last_point=point_container
# self.last_point_dict = point_container.to_dict() # by default not including the spc file
return result # this is for scans that just want a function that returns a number or some quantity
[docs]
def do_marginalisation(self, point_container):
import scipy
# print('hello!')
# var_dict=point_container.make_var_dict(self.RunHandler.settings.varnames)
# var_dict=point_container.get_var_dict(self.RunHandler.settings.varnames)
var_dict = point_container.var_dict
# print(var_dict)
def fun(x0): ## function to pass to optimisation routine
try:
point_container.clear_obs()
# print("h1"+str(x0))
if os.path.exists(self.RunHandler.settings.input_filename):
try:
os.remove(self.RunHandler.settings.input_filename)
except Exception:
self.log.debug("Failed to remove file", exc_info=True)
if os.path.exists(self.spc_file):
try:
os.remove(self.spc_file)
except Exception:
self.log.debug("Failed to remove file", exc_info=True)
# if len(self.RunHandler.marginal_variables) == 1: # special handling for 1d case
if self.RunHandler.oned_marginal:
for var, x in zip(self.RunHandler.marginal_variables, x0):
var_dict[var] = x
else:
for var, x, f in zip(
self.RunHandler.marginal_variables,
x0,
self.RunHandler.marginal_downscalers,
):
var_dict[var] = f(x)
if (
"Overloads" in self.RunHandler.marginalisation_options
): # allow overriding values in the input file, e.g. turning off decays in SPheno
self.RunHandler.write_inputfile(
point_container.inputs,
self.temp_dir,
self.RunHandler.settings.input_filename,
var_dict,
self.RunHandler.marginalisation_options["Overloads"],
)
else:
self.RunHandler.write_inputfile(
point_container.inputs,
self.temp_dir,
self.RunHandler.settings.input_filename,
var_dict,
)
for ncode in range(self.RunHandler.number_tools_run_marginalisation):
code = self.RunHandler.tools[ncode]
self.log.debug("Launching " + str(code))
try:
code.launch(
self.spc_file, self.temp_dir, self.log, point_container
)
except Exception:
self.log.debug(
self.name + " failed to run " + code.name, exc_info=True
)
raise #### return with error
if self.RunHandler.extra_observable_step:
if point_container.obs_dict is None:
point_container.obs_dict = OrderedDict()
for obs, val in zip(
self.RunHandler.settings.observables.keys(),
self.RunHandler.settings.observables.values(),
):
# point_container.obs_dict[obs] = float(point_container.spc.Value(val['SLHA'][0], val['SLHA'][1]))
if (
"SLHA" in val
): # New: allow for observables that are not SLHA values but may be functions.
point_container.obs_dict[obs] = float(
point_container.spc.Value(
val["SLHA"][0], val["SLHA"][1]
)
)
elif "FUNCTION" in val:
point_container.obs_dict[obs] = eval(
val["FUNCTION"], point_container.get_all_dict()
) # allow for derived variables
out_observables = [
float(eval(x, point_container.obs_dict))
for x in self.RunHandler.marginal_observables
]
res = self.RunHandler.marginal_chisquared(out_observables)
# print('res: '+str(res))
return res
except:
return 1e32
# result=scipy.optimize.minimize(fun, x0, args=(), method=None, jac=None, hess=None, hessp=None, bounds=None, constraints=(), tol=None)
# print('Try optimize')
try:
# result=scipy.optimize.minimize(fun, [0.0]*len(self.RunHandler.marginal_variables), tol=1.0)
# result=scipy.optimize.minimize(fun, [0.0]*len(self.RunHandler.marginal_variables), method=self.RunHandler.marginalisation_options, options={'xatol': 1,'gtol': 1e-22, 'disp': True})
# result=scipy.optimize.minimize(fun, [0.0]*len(self.RunHandler.marginal_variables), method=self.RunHandler.marginalisation_method, options=dict(eval(self.RunHandler.marginalisation_options)))
if self.RunHandler.oned_marginal:
import bsmart.marginalisation as marginalisation
result = marginalisation.oned_usebrentq(
fun,
self.RunHandler.marginal_ranges,
self.log,
options=self.RunHandler.marginalisation_options,
)
else:
result = scipy.optimize.minimize(
fun,
[0.0] * len(self.RunHandler.marginal_variables),
method=self.RunHandler.marginalisation_method,
options=self.RunHandler.marginalisation_options,
)
# result=scipy.optimize.basinhopping(fun, [0.0]*len(self.RunHandler.marginal_variables))
except Exception as e:
self.log.debug("Failed to optimize:" + str(e))
raise
# print('ran'+str(result.x))
results = []
if self.RunHandler.oned_marginal:
results.append(result)
for var in self.RunHandler.marginal_variables:
point_container.var_dict[var] = result
else:
for var, x, f in zip(
self.RunHandler.marginal_variables,
result.x,
self.RunHandler.marginal_downscalers,
):
res = f(x)
# print(var+':'+str(res))
try:
# print(point_container.var_dict)
point_container.var_dict[var] = res
except Exception as e:
self.log.debug("Failed to set point container " + str(e))
results.append(res)
point_container.marginal_variables = results
# print('Results: '+str(results))
return