# coding: utf8
"""
Refinement Class
================
This class defines all resampling strategies that can be used.
It implements the following methods:
- :func:`Refiner.func`
- :func:`Refiner.func_sigma`
- :func:`Refiner.pred_sigma`
- :func:`Refiner.distance_min`
- :func:`Refiner.hypercube`
- :func:`Refiner.sigma`
- :func:`Refiner.leave_one_out_sigma`
- :func:`Refiner.leave_one_out_sobol`
- :func:`Refiner.extrema`
- :func:`Refiner.hybrid`
- :func:`Refiner.optimization`
:Example::
>> corners = ((10, 400), (18, 450))
>> resample = Refiner(pod, corners)
>> new_point = resample.sigma()
"""
import logging
import copy
import warnings
from scipy.optimize import (differential_evolution, basinhopping)
from scipy.stats import norm
import numpy as np
from sklearn import preprocessing
import batman as bat
from .sampling import Doe
from ..misc import optimization
[docs]class Refiner(object):
"""Resampling the space of parameters."""
logger = logging.getLogger(__name__)
[docs] def __init__(self, data, corners, delta_space=0.08, discrete=None):
"""Initialize the refiner with the Surrogate and space corners.
Points data are scaled between ``[0, 1]`` based on the size of the
corners taking into account a :param:``delta_space`` factor.
:param data: Surrogate or space
:type data: :class:`batman.surrogate.SurrogateModel` or
:class:`batman.space.Space`.
:param array_like corners: hypercube ([min, n_features], [max, n_features]).
:param float delta_space: Shrinking factor for the parameter space.
:param int discrete: index of the discrete variable.
"""
if isinstance(data, bat.surrogate.SurrogateModel):
self.surrogate = data
else:
self.surrogate = bat.surrogate.SurrogateModel('kriging', data.corners)
self.surrogate.space = data
self.logger.debug("Using Space instance instead of SurrogateModel "
"-> restricted to discrepancy refiner")
if self.surrogate.pod is None:
self.pod_S = 1
else:
self.pod_S = self.surrogate.pod.S
self.space = self.surrogate.space
self.points = copy.deepcopy(self.space[:])
self.discrete = discrete
self.corners = np.array(corners).T
self.dim = len(self.corners)
# Prevent delta space contraction on discrete
_dim = list(range(self.dim))
if self.discrete is not None:
_dim.pop(self.discrete)
# Inner delta space contraction: delta_space * 2 factor
for i in _dim:
self.corners[i, 0] = self.corners[i, 0] + delta_space\
* (self.corners[i, 1]-self.corners[i, 0])
self.corners[i, 1] = self.corners[i, 1] - delta_space\
* (self.corners[i, 1]-self.corners[i, 0])
# Data scaling
self.scaler = preprocessing.MinMaxScaler()
self.scaler.fit(self.corners.T)
self.points = self.scaler.transform(self.points)
[docs] def func(self, coords, sign=1):
r"""Get the prediction for a given point.
Retrieve Gaussian Process estimation. The function returns plus or
minus the function depending on the sign.
`-1` if we want to find the max and `1` if we want the min.
:param lst(float) coords: coordinate of the point
:param float sign: -1. or 1.
:return: L2 norm of the function at the point
:rtype: float
"""
f, _ = self.surrogate(coords)
modes_weights = np.array(self.pod_S ** 2).reshape(-1, 1)
sum_f = np.average(f, weights=modes_weights)
return sign * sum_f
[docs] def pred_sigma(self, coords):
"""Prediction and sigma.
Same as :func:`Refiner.func` and :func:`Refiner.func_sigma`.
Function prediction and sigma are weighted using POD modes.
:param lst(float) coords: coordinate of the point
:returns: sum_f and sum_sigma
:rtype: floats
"""
f, sigma = self.surrogate(coords)
modes_weights = np.array(self.pod_S ** 2).reshape(-1, 1)
sum_f = np.average(f, weights=modes_weights)
sum_sigma = np.average(sigma, weights=modes_weights)
return sum_f, sum_sigma
[docs] def distance_min(self, point):
"""Get the distance of influence.
Compute the distance, Linf norm between the anchor point and
every sampling points. Linf allows to add this lenght to all
coordinates and ensure that no points will be within this hypercube.
It returns the minimal distance. :attr:`point` needs to be scaled by
:attr:`self.corners` so the returned distance is scaled.
:param array_like point: Anchor point.
:return: The distance to the nearest point.
:rtype: float.
"""
point = self.scaler.transform(point.reshape(1, -1))[0]
distances = np.array([np.linalg.norm(pod_point - point)
for _, pod_point in enumerate(self.points)])
# Do not get itself
distances = distances[np.nonzero(distances)]
distance = min(distances)
self.logger.debug("Distance min: {}".format(distance))
return distance
[docs] def hypercube_distance(self, point, distance):
"""Get the hypercube to add a point in.
Propagate the distance around the anchor.
:attr:`point` is scaled by :attr:`self.corners` and input distance has
to be. Ensure that new values are bounded by corners.
:param array_like point: Anchor point.
:param float distance: The distance of influence.
:return: The hypercube around the point.
:rtype: array_like.
"""
point = self.scaler.transform(point.reshape(1, -1))[0]
hypercube = np.array([point - distance, point + distance])
hypercube = self.scaler.inverse_transform(hypercube)
hypercube = hypercube.T
self.logger.debug("Prior Hypercube:\n{}".format(hypercube))
hypercube[:, 0] = np.minimum(hypercube[:, 0], self.corners[:, 1])
hypercube[:, 0] = np.maximum(hypercube[:, 0], self.corners[:, 0])
hypercube[:, 1] = np.minimum(hypercube[:, 1], self.corners[:, 1])
hypercube[:, 1] = np.maximum(hypercube[:, 1], self.corners[:, 0])
self.logger.debug("Post Hypercube:\n{}".format(hypercube))
return hypercube
[docs] def hypercube_optim(self, point):
"""Get the hypercube to add a point in.
Compute the largest hypercube around the point based on the *L2-norm*.
Ensure that only the *leave-one-out* point lies within it.
Ensure that new values are bounded by corners.
:param np.array point: Anchor point.
:return: The hypercube around the point (a point per column).
:rtype: array_like.
"""
distance = self.distance_min(point) / 3
x0 = self.hypercube_distance(point, distance).flatten('F')
point = np.minimum(point, self.corners[:, 1])
point = np.maximum(point, self.corners[:, 0])
point = self.scaler.transform(point.reshape(1, -1))[0]
gen = (p for p in self.points if not np.allclose(p, point))
def min_norm(hypercube):
"""Compute euclidean distance.
:param np.array hypercube: [x1, y1, x2, y2, ...]
:return: distance of between hypercube points
:rtype: float
"""
hypercube = hypercube.reshape(2, self.dim)
try:
hypercube = self.scaler.transform(hypercube)
except ValueError: # If the hypercube is nan
return np.inf
# Sort coordinates
for i in range(self.dim):
hypercube[:, i] = hypercube[hypercube[:, i].argsort()][:, i]
hypercube = hypercube.T
diff = hypercube[:, 1] - hypercube[:, 0]
n = - np.linalg.norm(diff)
# Check aspect ratio
aspect = abs(diff)
with np.errstate(divide='ignore', invalid='ignore'):
aspect = abs(np.divide(np.power(np.max(aspect), self.dim),
np.prod(aspect)))
aspect = np.power(aspect, 1 / self.dim)
if not aspect <= 1.5:
return np.inf
# Verify that LOO point is inside
insiders = (hypercube[:, 0] <= point).all() & (point <= hypercube[:, 1]).all()
if not insiders:
return np.inf
# Verify that no other point is inside
insiders = np.array([True if (hypercube[:, 0] <= p).all() &
(p <= hypercube[:, 1]).all() else False
for p in gen]).any()
if insiders:
return np.inf
return n
bounds = np.reshape([self.corners] * 2, (self.dim * 2, 2))
# results = differential_evolution(min_norm, bounds, popsize=100)
minimizer_kwargs = {"method": "L-BFGS-B", "bounds": bounds}
results = basinhopping(min_norm, x0,
niter=1000, minimizer_kwargs=minimizer_kwargs)
hypercube = results.x.reshape(2, self.dim)
for i in range(self.dim):
hypercube[:, i] = hypercube[hypercube[:, i].argsort()][:, i]
hypercube = hypercube.T
self.logger.debug("Corners:\n{}".format(self.corners))
self.logger.debug("Optimization Hypercube:\n{}".format(hypercube))
return hypercube
[docs] def discrepancy(self):
"""Find the point that minimize the discrepancy.
:return: The coordinate of the point to add.
:rtype: lst(float).
"""
self.logger.debug("Discrepancy strategy")
init_discrepancy = self.surrogate.space.discrepancy()
@optimization(self.corners, self.discrete)
def func_discrepancy(coords):
"""Discrepancy of the augmented space."""
sample = np.vstack([self.surrogate.space[:], coords])
return self.surrogate.space.discrepancy(sample)
min_x, new_discrepancy = func_discrepancy()
rel_change = (new_discrepancy - init_discrepancy) / init_discrepancy
self.logger.debug("Relative change in discrepancy: {}%".format(rel_change))
return min_x
[docs] def sigma(self, hypercube=None):
"""Find the point at max Sigma.
It returns the point where the variance (sigma) is maximum.
To do so, it uses Gaussian Process information.
A genetic algorithm get the global maximum of the function.
:param array_like hypercube: Corners of the hypercube.
:return: The coordinate of the point to add.
:rtype: lst(float).
"""
if hypercube is None:
hypercube = self.corners
self.logger.debug("Sigma strategy")
@optimization(hypercube, self.discrete)
def func_sigma(coords):
r"""Get the Sigma for a given point.
Retrieve Gaussian Process estimation of sigma.
A composite indicator is constructed using POD's modes.
.. math:: \sum S_i^2 \times \sigma_i
Function returns `- sum_sigma` in order to have a minimization
problem.
:param lst(float) coords: coordinate of the point.
:return: - sum_sigma.
:rtype: float.
"""
_, sigma = self.surrogate(coords)
sum_sigma = np.sum(self.pod_S ** 2 * sigma)
return - sum_sigma
min_x, _ = func_sigma()
return min_x
[docs] def leave_one_out_sigma(self, point_loo):
"""Mixture of Leave-one-out and Sigma.
Estimate the quality of the POD by *leave-one-out cross validation*
(LOOCV), and add a point arround the max error point.
The point is added within an hypercube around the max error point.
The size of the hypercube is equal to the distance with
the nearest point.
:param tuple point_loo: leave-one-out point.
:return: The coordinate of the point to add.
:rtype: lst(float).
"""
self.logger.info("Leave-one-out + Sigma strategy")
# Get the point of max error by LOOCV
point = np.array(point_loo)
# Construct the hypercube around the point
# distance = self.distance_min(point)
# hypercube = self.hypercube_distance(point, distance)
hypercube = self.hypercube_optim(point)
# Global search of the point within the hypercube
point = self.sigma(hypercube)
return point
[docs] def leave_one_out_sobol(self, point_loo, dists):
"""Mixture of Leave-one-out and Sobol' indices.
Same as function :func:`leave_one_out_sigma` but change the shape
of the hypercube. Using Sobol' indices, the corners are shrinked
by the corresponding percentage of the total indices.
:param tuple point_loo: leave-one-out point.
:param lst(str) dists: List of valid openturns distributions as string.
:return: The coordinate of the point to add.
:rtype: lst(float).
"""
self.logger.info("Leave-one-out + Sobol strategy")
# Get the point of max error by LOOCV
point = np.array(point_loo)
# Get Sobol' indices
analyse = bat.uq.UQ(self.surrogate, dists=dists)
indices = analyse.sobol()[2]
indices = indices * (indices > 0)
indices = preprocessing.normalize(indices.reshape(1, -1), norm='max')
# Prevent indices inferior to 0.1
indices[indices < 0.1] = 0.1
# Construct the hypercube around the point
hypercube = self.hypercube_optim(point)
# Modify the hypercube with Sobol' indices
for i in range(self.dim):
hypercube[i, 0] = hypercube[i, 0] + (1 - indices[0, i])\
* (hypercube[i, 1]-hypercube[i, 0]) / 2
hypercube[i, 1] = hypercube[i, 1] - (1 - indices[0, i])\
* (hypercube[i, 1]-hypercube[i, 0]) / 2
self.logger.debug("Post Hypercube:\n{}".format(hypercube))
# Global search of the point within the hypercube
point = self.sigma(hypercube)
return point
[docs] def extrema(self, refined_pod_points):
"""Find the min or max point.
Using an anchor point based on the extremum value at sample points,
search the hypercube around it. If a new extremum is found,it uses
*Nelder-Mead* method to add a new point.
The point is then bounded back by the hypercube.
:return: The coordinate of the point to add
:rtype: lst(float)
"""
self.logger.info("Extrema strategy")
points = np.delete(self.points, refined_pod_points, 0)
point = None
new_points = []
# Get max-max and max-min then min-max and min-min
for sign in [-1., 1.]:
self.logger.debug("Sign (-1 : Maximum ; 1 : Minimum) -> {}"
.format(sign))
# Get a sample point where there is an extrema around
while point is None:
# Get min or max point
evaluations = np.array([self.func(pod_point, sign)
for _, pod_point in enumerate(points)])
try:
min_idx = np.argmin(evaluations)
except ValueError:
point = True
break
point = points[min_idx]
point_eval = min(evaluations) * sign
self.logger.debug("Extremum located at sample point: {} -> {}"
.format(point, point_eval))
# Construct the hypercube around the point
distance = self.distance_min(point)
hypercube = self.hypercube_distance(point, distance)
# Global search of the point within the hypercube
first_extremum = differential_evolution(self.func,
hypercube,
args=(sign,))
first_extremum.fun *= sign
self.logger.debug("Optimization first extremum: {} -> {}"
.format(first_extremum.x,
first_extremum.fun))
second_extremum = differential_evolution(self.func,
hypercube,
args=(-sign,))
second_extremum.fun *= - sign
self.logger.debug("Optimization second extremum: {} -> {}"
.format(second_extremum.x,
second_extremum.fun))
# Check for new extrema, compare with the sample point
if sign * first_extremum.fun < sign * point_eval:
# Nelder-Mead expansion
first_extremum = np.array([first_extremum.x +
(first_extremum.x - point)])
# Constrain to the hypercube
first_extremum = np.maximum(first_extremum,
hypercube[:, 0])
first_extremum = np.minimum(first_extremum,
hypercube[:, 1])
new_points.append(first_extremum[0].tolist())
self.logger.debug("Extremum-max: {}"
.format(first_extremum[0]))
if sign * second_extremum.fun > sign * point_eval:
second_extremum = np.array([second_extremum.x +
(second_extremum.x - point)])
second_extremum = np.maximum(second_extremum,
hypercube[:, 0])
second_extremum = np.minimum(second_extremum,
hypercube[:, 1])
if (second_extremum != first_extremum).all():
new_points.append(second_extremum[0].tolist())
self.logger.debug("Extremum-min: {}"
.format(second_extremum[0]))
else:
self.logger.debug("Extremum-min egal: not added.")
else:
point = None
points = np.delete(points, min_idx, 0)
point = None
refined_pod_points.append(min_idx)
return new_points, refined_pod_points
[docs] def hybrid(self, refined_pod_points, point_loo, method, dists):
"""Composite resampling strategy.
Uses all methods one after another to add new points.
It uses the navigator defined within settings file.
:param lst(int) refined_pod_points: points idx not to consider for extrema
:param point_loo: leave one out point
:type point_loo: :class:`batman.space.point.Point`
:param str strategy: resampling method
:param lst(str) dists: List of valid openturns distributions as string.
:return: The coordinate of the point to add
:rtype: lst(float)
"""
self.logger.info(">>---Hybrid strategy---<<")
if method == 'sigma':
new_point = self.sigma()
elif method == 'sigma':
new_point = self.discrepancy()
elif method == 'loo_sigma':
new_point = self.leave_one_out_sigma(point_loo)
elif method == 'loo_sobol':
new_point = self.leave_one_out_sobol(point_loo, dists)
elif method == 'extrema':
new_point, refined_pod_points = self.extrema(refined_pod_points)
elif method == 'discrepancy':
new_point = self.discrepancy()
elif method == 'optimization':
new_point = self.optimization()
else:
self.logger.exception("Resampling method does't exits")
raise SystemExit
return new_point, refined_pod_points
[docs] def optimization(self, method='EI', extremum='min'):
"""Maximization of the Probability/Expected Improvement.
:param str method: Flag ['EI', 'PI'].
:param str extremum: minimization or maximization objective
['min', 'max'].
:return: The coordinate of the point to add.
:rtype: lst(float).
"""
sign = 1 if extremum == 'min' else -1
gen = [self.func(x, sign=sign)
for x in self.scaler.inverse_transform(self.points)]
arg_min = np.argmin(gen)
min_value = gen[arg_min]
min_x = self.points[arg_min]
self.logger.info('Current extrema value is: f(x)={} for x={}'
.format(sign * min_value, min_x))
# Do not check point is close on the discrete parameter
if self.discrete is not None:
no_discrete = [list(range(self.points.shape[1]))]
no_discrete[0].pop(self.discrete)
else:
no_discrete = None
@optimization(self.corners, self.discrete)
def probability_improvement(x):
"""Do probability of improvement."""
x_scaled = self.scaler.transform(x.reshape(1, -1))
too_close = np.array([True if np.linalg.norm(
x_scaled[0][no_discrete] - p[no_discrete], -1) < 0.02
else False for p in self.points]).any()
if too_close:
return np.inf
pred, sigma = self.pred_sigma(x)
pred = sign * pred
std_dev = np.sqrt(sigma)
pi = norm.cdf((target - pred) / std_dev)
return - pi
@optimization(self.corners, self.discrete)
def expected_improvement(x):
"""Do expected improvement."""
x_scaled = self.scaler.transform(x.reshape(1, -1))
too_close = np.array([True if np.linalg.norm(
x_scaled[0][no_discrete] - p[no_discrete], -1) < 0.02
else False for p in self.points]).any()
if too_close:
return np.inf
pred, sigma = self.pred_sigma(x)
pred = sign * pred
std_dev = np.sqrt(sigma)
diff = min_value - pred
ei = diff * norm.cdf(diff / std_dev)\
+ std_dev * norm.pdf(diff / std_dev)
return - ei
with warnings.catch_warnings():
warnings.simplefilter("ignore")
if method == 'PI':
target = min_value - 0.1 * np.abs(min_value)
max_ei, _ = probability_improvement()
else:
max_ei, _ = expected_improvement()
return max_ei
[docs] def sigma_discrepancy(self, weights=None):
"""Maximization of the composite indicator: sigma - discrepancy.
:param list(float) weights: respectively weights of sigma and discrepancy.
:return: The coordinate of the point to add.
:rtype: lst(float).
"""
weights = [0.5, 0.5] if weights is None else weights
doe = Doe(500, self.corners, 'halton')
sample = doe.generate()
_, sigma = zip(*[self.pred_sigma(s) for s in sample])
disc = [1 / self.surrogate.space.discrepancy(
np.vstack([self.surrogate.space, p])) for p in sample]
sigma = np.array(sigma).reshape(-1, 1)
disc = np.array(disc).reshape(-1, 1)
scale_sigma = preprocessing.StandardScaler().fit(sigma)
scale_disc = preprocessing.StandardScaler().fit(disc)
@optimization(self.corners, self.discrete)
def f_obj(x):
"""Maximize the inverse of the discrepancy plus sigma."""
_, sigma = self.pred_sigma(x)
sigma = scale_sigma.transform(sigma.reshape(1, -1))
disc = 1 / self.surrogate.space.discrepancy(
np.vstack([self.surrogate.space, x]))
disc = scale_disc.transform(disc.reshape(1, -1))
sigma_disc = sigma * weights[0] + disc * weights[1]
return - sigma_disc
max_sigma_disc, _ = f_obj()
return max_sigma_disc