Source code for bsmart.scans.DEAP

"""
Optimisation using genetic algorithm DEAP





BSMArt scan written by M. Goodsell


Requires:
        

pip3 install deap

https://github.com/DEAP/deap

"""

__meta__ = {
    "name": "DEAP",
    "requires": ["deap", "numpy"],
    "settings": {
        "Points": "Int",
        "DEAP": { 
                "Population Size": "Int", 
                "Generations": "Int", 
                "Global Only": "Bool"

        },
        "n_cores": "Int"
    }
}


# NB from deap/base.py the fitness has:
#     @property
#     def valid(self):
#         return len(self.wvalues) != 0




# from deap import creator, base, tools, algorithms

# creator.create("FitnessMax", base.Fitness, weights=(1.0,))
# creator.create("Individual", list, fitness=creator.FitnessMax)

# toolbox = base.Toolbox()

# toolbox.register("attr_bool", random.randint, 0, 1)
# toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, n=100)
# toolbox.register("population", tools.initRepeat, list, toolbox.individual)

# def evalOneMax(individual):
#     return sum(individual),

# toolbox.register("evaluate", evalOneMax)
# toolbox.register("mate", tools.cxTwoPoint)
# toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
# toolbox.register("select", tools.selTournament, tournsize=3)

# population = toolbox.population(n=300)

# NGEN=40
# for gen in range(NGEN):
#     offspring = algorithms.varAnd(population, toolbox, cxpb=0.5, mutpb=0.1)
#     fits = toolbox.map(toolbox.evaluate, offspring)
#     for fit, ind in zip(fits, offspring):
#         ind.fitness.values = fit
#     population = toolbox.select(offspring, k=len(population))
# top10 = tools.selBest(population, k=10)




from bsmart.core import Scan as Scan

import numpy as np
import math
from bsmart import debug


import os
import shutil

from bsmart import BSMutil


from bsmart.BSMlikelihood import MakeLikelihoods, MakeGlobalLikelihood, safe_float

import sys, os

#import matplotlib.pyplot as plt
import numpy as np
import random

from deap import creator, base, tools, algorithms



import math

[docs] class NewScan(Scan): """Scanner class for NF Scans"""
[docs] def initialise(self): # overload initial settings. self.runsettings.store_points_in_memory = True # Force storage of info for hierarchical loss computation self.runsettings.store_invalid_points = True # Need this to keep points where self.runsettings.invalid_return_value = 0
def __init__(self, inputs, log): Scan.__init__(self, inputs, log) self.downscalers,self.upscalers = BSMutil.create_scalers(self.inputs,log) self.n_variables=len(self.upscalers) #self.use_loglike=eval(self.inputs['Setup'].get('LogLike',"True")) self.Points=self.inputs['Setup'].get('Points',10000) # Create the likelihood functions based on 'scaling' property of variables, if they have it; otherwise use gaussian likelihood #self.get_likelihood,self.observable_masks=MakeGlobalLikelihood(self.inputs['Observables'],self.use_loglike) #self.get_likelihood,self.observable_masks=MakeGlobalLikelihood(self.inputs['Observables'],loglike=True) self.likelihood_fns,self.observable_masks=MakeLikelihoods(self.inputs['Observables'],loglike=True) #self.likelihoodfns=MakeLikelihoods(self.inputs['Observables'],True) ## use log likelihood self.live_observables = sum(1 if mask else 0 for mask in self.observable_masks) #self.all_valid=[] #self.all_invalid=[] self.pop_size = max(10,self.inputs['Setup'].get('n_cores',1)) self.NGEN = 40 self.global_only = False if 'DEAP' in self.inputs['Setup']: self.pop_size = self.inputs['Setup']['DEAP'].get('Population Size',self.pop_size) self.NGEN = self.inputs['Setup']['DEAP'].get('Generations',self.NGEN) self.global_only = self.inputs['Setup']['DEAP'].get('Global Only',self.global_only) #self.maxloss = 1e100 self.maxloss = 1e8
[docs] def postprocess(self,Point, observables, data_point,temp_dir,log, lock=None): """ return the likelihood; we won't get this far if the point failed to be generated """ """ calculate NLL -> minimise this """ # For this algorithm we are minimising the objective function # return 1.0 """ res=1e100 #if self.use_loglike: if True: try: res = -self.get_likelihood(observables) except: pass """ """ else: try: res = -math.log(self.get_likelihood(observables)) except: pass """ #print(f"Res? {res}") # return "weights"; this should be a tuple, but we sort this later return res #-math.log(self.get_likelihood(observables))
#return self.get_likelihood(observables)
[docs] def get_losses(self,observables): """ Returns a list of losses. The C-function loss should be zero if the observable is within allowed bounds, and greater than zero outside it. If we include a validity condition, then other observables, evaluated afterwards, will be returned as NaN. -> We should assign these the maximum loss ~ 710. This is then compatible with the explicit use of 'hierarchical observables', where all other observables are set to the maximum. """ likeit=iter(self.likelihood_fns) #res = [(f := next(likeit))(val) if mask and not math.isnan(val := safe_float(v)) else ((next(likeit) and False) or self.maxloss if mask else self.maxloss) # for v, mask in zip(observables, self.observable_masks)] #print(f"Result: {res}") #return res #return [(f := next(likeit))(val) if mask and not math.isnan(val := safe_float(v)) else ((next(likeit) and False) or self.maxloss if mask else self.maxloss) # for v, mask in zip(observables, self.observable_masks)] #res = [(f := next(likeit))(val) if mask and not math.isnan(val := safe_float(v)) else (next(likeit) and False) or self.maxloss # for v, mask in zip(observables, self.observable_masks) if mask] #print(f"These obs: {[obs for obs,mask in zip(observables,self.observable_masks) if mask]}") #print(f"Give losses: {res}") #return res return tuple([(f := (-1.0/self.maxloss)*(next(likeit))(val)) if mask and not math.isnan(val := safe_float(v)) else float((next(likeit) and False) or 1.0) for v, mask in zip(observables, self.observable_masks) if mask])
#return tuple([(f := (-1.0)*(next(likeit))(val)) if mask and not math.isnan(val := safe_float(v)) else float((next(likeit) and False) or self.maxloss) # for v, mask in zip(observables, self.observable_masks) if mask])
[docs] def run(self): """ DEAP scan Version 1: just use it as an optimisation over a single global likelihood @todo: allow multiple likelihoods for different observables, with hierarchical evaluation """ self.log.info("Starting DEAP scan!") if self.global_only: creator.create("FitnessMin", base.Fitness, weights=(-1.0,)) else: #multi weight creator.create("FitnessMin", base.Fitness, weights=(-1.0,)*self.live_observables) creator.create("Individual", list, fitness=creator.FitnessMin) self.toolbox = base.Toolbox() #self.toolbox.register("attr_bool", random.randint, 0, 1) self.toolbox.register("random_float", random.random) self.toolbox.register("individual", tools.initRepeat, creator.Individual, self.toolbox.random_float, n=self.n_variables) self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual) #def evalOneMax(individual): # return sum(individual), #self.toolbox.register("evaluate", evalOneMax) #self.toolbox.register("mate", tools.cxTwoPoint) self.toolbox.register("mate", tools.cxBlend,alpha=0.2) # better for floats # mutFlipBit is usually for bools #self.toolbox.register("mutate", tools.mutFlipBit, indpb=0.05) # deap.tools.mutGaussian(individual, mu, sigma, indpb) self.toolbox.register("mutate",tools.mutGaussian, mu=0.0, sigma=0.2, indpb=1.0/math.sqrt(self.n_variables)) self.toolbox.register("select", tools.selTournament, tournsize=3) population = self.toolbox.population(n=self.pop_size) #NGEN=40 #self.NGEN for gen in range(self.NGEN): # cxpb = mating prob, mutpb = mutation prob offspring = algorithms.varAnd(population, self.toolbox, cxpb=0.4, mutpb=0.05) #print(offspring) batch=[] cube_batch=[] for child in offspring: cube_batch.append(child) #batch.append([f(1.0/(1+math.exp(-y))) for y,f in zip(x,self.upscalers)]) batch.append([f(y) for y,f in zip(child,self.upscalers)]) print(f"Batch: {batch}") results=self.RunManager.run_batch(batch) #print(self.RunManager.all_batch_points) #full_losses = [(x,value) if isinstance(value, (int, float)) else (x,1e100) for x,value in zip(cube_batch,results)] batch_observables=[point['observables'] for point in self.RunManager.all_batch_points] keep_observables=[[obs for obs,mask in zip(point_obs,self.observable_masks) if mask] for point_obs in batch_observables] #print(f"Got observables: {batch_observables}") weights = [self.get_losses(obs) for obs in batch_observables] for fit, ind,result in zip(weights, offspring,results): ind.fitness.values = fit print(f"weights: {weights}") #weights = [(value,) if isinstance(value, (int, float)) else (1e100,) for value in results] # """ # valids = [True if isinstance(value, (int, float)) else False for value in results] # weights = [(value,) if valid else (1e100,) for value,valid in zip(results,valids)] # print(f"Results: {results}") # for fit, ind,valid in zip(weights, offspring,valids): # if valid: # ind.fitness.values = fit # """ # """ # #The valid property is set when we give fit values # if ind.fitness.valid: # print(f"{fit} ->valid") # else: # print(f"{fit} -> INVALID") # """ population = self.toolbox.select(offspring, k=len(population)) top10 = tools.selBest(population, k=10) self.log.info(f"Best points: {top10}")