Source code for bsmart.scans.CMAES

"""
Optimisation using CMAES


https://github.com/CyberAgentAILab/cmaes


BSMArt scan written by M. Goodsell


Requires:
        

pip3 install cmaes



"""

__meta__ = {
    "name": "CMAES",
    "requires": ["cmaes", "numpy"],
    "settings": {
        "Points": "Number of points",
        "CMAES MaxLoss": "Float",
        "CMAESEpisodes": "Int",
        "CMAESPopulationSize": "Int",
        "CMAESMaxGenerations": "Int",
        "CMAESMean": "Float",
        "CMAESSigma": "Float",
        "Sigma Tolerance": "Float",
        "Initial Sample": "Path to file"
    }
}



from bsmart.core import Scan as Scan

import numpy as np
import math
from bsmart import debug


import os
import shutil

from bsmart import BSMutil


from bsmart.BSMlikelihood import MakeLikelihoods, MakeGlobalLikelihood, safe_float

import sys, os

import matplotlib.pyplot as plt
import numpy as np

import cmaes

from cmaes import CMA

import math

[docs] class NewScan(Scan): """Scanner class for CMAES Scans"""
[docs] def initialise(self): if self.runsettings.store_points_in_memory and self.runsettings.store_invalid_points: self.naive = False self.runsettings.invalid_return_value = 0 else: self.naive = True # we treat invalid points as bad self.runsettings.invalid_return_value = []
def __init__(self, inputs, log): Scan.__init__(self, inputs, log) self.downscalers,self.upscalers = BSMutil.create_scalers(self.inputs,log) self.n_variables=len(self.upscalers) #self.use_loglike=eval(self.inputs['Setup'].get('LogLike',"True")) self.Points=self.inputs['Setup'].get('Points',10000) self.maxloss = np.log(1 + np.finfo(np.float64).max) + 1 # Create the likelihood functions based on 'scaling' property of variables, if they have it; otherwise use gaussian likelihood #elf.get_likelihood,self.observable_masks=MakeGlobalLikelihood(self.inputs['Observables'],self.use_loglike) #self.get_likelihood,self.observable_masks=MakeGlobalLikelihood(self.inputs['Observables'],return_type="NLL") # need a negative log likelihood self.likelihood_fns, self.observable_masks = MakeLikelihoods(self.inputs["Observables"], loglike=True) #self.likelihoodfns=MakeLikelihoods(self.inputs['Observables'],True) ## use log likelihood if "CMAES MaxLoss" in self.inputs["Setup"]: if self.inputs["Setup"].get("CMAES MaxLoss") is not None: self.maxloss = float(self.inputs["Setup"].get("CMAES MaxLoss")) #self.all_valid=[] #self.all_invalid=[] if 'CMAESEpisodes' in self.inputs['Setup']: self.episodes=int(self.inputs['Setup'].get('CMAESEpisodes')) else: #self.population_size=2*self.runsettings.cores self.episodes = 1 if 'CMAESPopulationSize' in self.inputs['Setup']: self.population_size=int(self.inputs['Setup'].get('CMAESPopulationSize')) else: #self.population_size=2*self.runsettings.cores self.population_size = None if "CMAESMaxGenerations" in self.inputs["Setup"]: self.max_n_generations = int(self.inputs["Setup"]["CMAESMaxGenerations"]) else: if self.population_size is not None: self.max_n_generations = int(self.Points/self.population_size) else: self.max_n_generations = 100 if 'CMAESMean' in self.inputs['Setup']: #self.mean=int(self.inputs['Setup'].get('CMAESMean')) self.mean=float(self.inputs['Setup'].get('CMAESMean'))*np.ones(self.n_variables) self.fixed_mean = True else: # random mean self.mean=np.random.rand(self.n_variables) self.fixed_mean = False if 'CMAESSigma' in self.inputs['Setup']: self.sigma=float(self.inputs['Setup'].get('CMAESSigma')) else: # use a reasonable default value self.sigma=1.0/math.sqrt(self.n_variables) self.sigma_tolerance = 1e-4 if 'Sigma Tolerance' in self.inputs['Setup']: # to check for convergence self.sigma_tolerance = float(self.inputs['Setup']["Sigma Tolerance"]) if 'Initial Sample' in self.inputs['Setup']: from bsmart.ml.seed_points import get_previous_sample seed_inputs,_ = get_previous_sample(sample_file=self.inputs['Setup']['Initial Sample'], inputs=self.inputs, nsamples = self.population_size, returntype = "NLL") """ seed_inputs,nlls = get_previous_sample(sample_file=self.inputs['Setup']['Initial Sample'], inputs=self.inputs, nsamples = self.population_size, returntype = "NLL") print(seed_inputs.shape) print(nlls.shape) print(f"Minimum nll: {np.min(nlls)}") print(np.hstack([seed_inputs, nlls.reshape(-1,1)])) # nlls reports size as (n,), need to change to (n,1) """ # now we have to scale the inputs: #self.seed_inputs = np.array([[f(x) for x,f in zip(point,self.downscalers)] for point in seed_inputs]) # NB want to set the mean to be the best point, which is the first one in the list self.mean = np.array([f(x) for x,f in zip(seed_inputs[0],self.downscalers)]) #self.seed_inputs[0] # self.sigma=0.01 # less variation # self.seed_inputs = None self.seed_inputs = np.array([[f(x) for x,f in zip(point,self.downscalers)] for point in seed_inputs]) else: self.seed_inputs = None
[docs] def postprocess(self,Point, observables, data_point,temp_dir,log, lock=None): """ return the likelihood; we won't get this far if the point failed to be generated """ """ calculate likelihood """ # For this algorithm we are minimising the objective function #res=1e100 if self.naive: return np.sum(self.get_losses(observables)) else: return 1.0
[docs] def smooth_cap_loss(self,x): """ Caps the loss by applying a sigmoid. This is useful for losses that are unbounded. """ return -self.maxloss*np.expm1(x/self.maxloss) # assume x is negative and want to change its sign
[docs] def get_losses(self,observables): """ Returns a list of losses. The C-function loss should be zero if the observable is within allowed bounds, and greater than zero outside it. If we include a validity condition, then other observables, evaluated afterwards, will be returned as NaN. -> We should assign these the maximum loss ~ 710. This is then compatible with the explicit use of 'hierarchical observables', where all other observables are set to the maximum. """ likeit=iter(self.likelihood_fns) return [self.smooth_cap_loss((next(likeit))(val)) if mask and not math.isnan(val := safe_float(v)) else float((next(likeit) and False) or self.maxloss) for v, mask in zip(observables, self.observable_masks) if mask]
[docs] def run(self): """ CMAES scan: just runs CMA-ES. It takes a minus-log-likelihood at the function to minimise. Excellent algorithm for finding best points """ self.log.info("Starting CMA-ES scan!") bounds=np.zeros((self.n_variables,2)) bounds[:,1] = np.ones(self.n_variables) #print(bounds) #optimizer=CMA(mean=np.zeros(self.n_variables),sigma=1.0) #mean=0.5*np.ones(self.n_variables),sigma=0.3) """ for generation in range(self.Points): solutions = [] for _ in range(optimizer.population_size): x = optimizer.ask() value = loglike(x) solutions.append((x, value)) upvals=[f(1.0/(1+math.exp(-x))) for x,f in zip(x,self.upscalers)] print(f"#{generation} {value} ({x}) -> ({upvals})") optimizer.tell(solutions) """ for episode in range(self.episodes): if not self.fixed_mean: self.mean = np.random.rand(self.n_variables) optimizer=CMA(bounds=bounds, mean=self.mean, sigma=self.sigma, population_size=self.population_size, lr_adapt=True) self.log.info('Starting CMAES run for episode '+str(episode+1)) for generation in range(self.max_n_generations): batch=[] cube_batch=[] for _ in range(optimizer.population_size): x = optimizer.ask() cube_batch.append(x) #batch.append([f(1.0/(1+math.exp(-y))) for y,f in zip(x,self.upscalers)]) batch.append([f(y) for y,f in zip(x,self.upscalers)]) results=self.RunManager.run_batch(batch) #batch.append([f(1.0/(1+math.exp(-y))) for y,f in zip(x,self.upscalers)]) #solutions = [(x,value) for x,value in zip(cube_batch,results)] # check for bad values if self.naive: solutions = [(x,value) if isinstance(value, (int, float)) else (x,self.maxloss) for x,value in zip(cube_batch,results)] else: # keeping 'invalid' points batch_observables = [ point["observables"] for point in self.RunManager.all_batch_points ] losses = np.array([self.get_losses(obs) for obs in batch_observables]) fitnesses = np.sum(losses, axis=1) solutions=[(pop, value) for pop, value in zip(cube_batch, fitnesses)] #print(solutions) self.log.info(f"Losses for batch: {solutions}") optimizer.tell(solutions) stds = np.std(np.array(cube_batch),axis=0) print(f"std: {stds} ") current_sigma=max(stds) #current_sigma = optimizer.sigma if any([optimizer.should_stop(), current_sigma < self.sigma_tolerance]): self.log.info(f"Converged afer {generation} generations") break self.log.info(f"Finished CMAES run")