Source code for

# coding: utf8
Space class

Derives from :py:class:`list` and constitutes a groupment for points.
The space can be filled using low discrepancy sequences from
:class:`openturns.LowDiscrepancySequence`, it can be resampled or points can be
added manually.



    >> from import Space
    >> from batman.point import Point
    >> space = Space(settings)
    >> point = Point([12.3, 18.0])
    >> space += point

import logging
import os
import itertools
import numpy as np
from scipy.optimize import differential_evolution
from sklearn import preprocessing
from .sampling import Doe
from .point import Point
from .refiner import Refiner
from .. import visualization

[docs]class Space(list): """Manages the space of parameters.""" logger = logging.getLogger(__name__)
[docs] def __init__(self, corners, sample=np.inf, nrefine=0, plabels=None, multifidelity=None, duplicate=False): """Generate a Space. :param array_like corners: hypercube ([min, n_features], [max, n_features]). :param int/array_like sample: number of sample or list of sample of shape (n_samples, n_features). :param int nrefine: number of point to use for refinement. :param list(str) plabels: parameters' names. :param list(float) multifidelity: Whether to consider the first parameter as the fidelity level. It is a list of ['cost_ratio', 'grand_cost']. :param bool duplicate: Whether to allow duplicate points in space. """ if isinstance(sample, (int, float)): self.doe_init = sample else: self.doe_init = len(sample) if nrefine > 0: self.refiner = None self.max_points_nb = nrefine + self.doe_init else: self.max_points_nb = self.doe_init self.dim = len(corners[0]) self.multifidelity = multifidelity self.duplicate = duplicate # Multifidelity configuration if multifidelity is not None: self.doe_cheap = self._cheap_doe_from_expensive(self.doe_init)'Multifidelity with Ne: {} and Nc: {}' .format(self.doe_init, self.doe_cheap)) # create parameter list and omit fidelity if relevent if plabels is not None: self.plabels = plabels try: self.plabels.remove('fidelity') except ValueError: pass else: self.plabels = ["x" + str(i) for i in range(self.dim)] # corner points self.corners = [Point(p) for p in corners] # Point of the sample resampled around self.refined_pod_points = [] # corner points validation for i in range(self.dim): if corners[0][i] == corners[1][i]: raise ValueError('{}th corners coordinate are equal'
.format(i + 1)) def __str__(self): s = ("Space summary:\n" "Hypercube points: {}\n" "Number of points: {}\n" "Max number of points: {}").format([c for c in self.corners], len(self), self.max_points_nb) return s def __repr__(self): s = ("{}\n" "Points:\n" "{}").format(str(self), super(Space, self).__repr__()) return s def __iadd__(self, points): """Add `points` to the space. Raise if point already exists or if space is over full. :param array_like: point(s) to add to space (n_samples, n_features). """ # determine if adding one or multiple points try: points[0][0] except (TypeError, IndexError): points = [points] points_set = set(self) for point in points: # check point dimension is correct if (len(point) - 1 if self.multifidelity else len(point)) != self.dim: self.logger.warning("Ignoring Point - Coordinates dimensions" "mismatch - is {}, should be {}" .format(len(point), self.dim)) continue # check space is full if self.is_full(): self.logger.warning("Ignoring Point - Full Space - {}" .format(point)) continue point = Point(point) test_point = np.array(point)[1:] if self.multifidelity else np.array(point) # verify point is inside not_alien = (self.corners[0] <= test_point).all()\ & (test_point <= self.corners[1]).all() if not not_alien: self.logger.warning("Ignoring Point - Out of Space - {}" .format(point)) continue # verify point is not already in space if (point not in points_set) or self.duplicate: self.append(point) points_set.add(point) else: self.logger.warning("Ignoring Point - Duplicate - {}" .format(point)) continue return self
[docs] def sampling(self, n_samples=None, kind='halton', dists=None, discrete=None): """Create point samples in the parameter space. Minimum number of samples for halton and sobol: 4 For uniform sampling, the number of points is per dimensions. The points are registered into the space and replace existing ones. :param int n_samples: number of samples. :param str kind: method of sampling. :param lst(str) dists: List of valid openturns distributions as string. :param int discrete: index of the discrete variable :return: List of points. :rtype: self. """ if self.multifidelity and n_samples is None: n_samples = self._cheap_doe_from_expensive(self.doe_init) elif self.multifidelity and n_samples is not None: n_samples = self._cheap_doe_from_expensive(n_samples) elif not self.multifidelity and n_samples is None: n_samples = self.doe_init bounds = np.array(self.corners) doe = Doe(n_samples, bounds, kind, dists, discrete) samples = doe.generate() # concatenate cheap and expensive space and add identifier 0 or 1 if self.multifidelity: levels = np.vstack((np.zeros((self.doe_init, 1)), np.ones((self.doe_cheap, 1)))) samples = np.vstack((samples[0:self.doe_init, :], samples)) samples = np.hstack((levels, samples)) if kind == 'saltelli': self.duplicate = True self.empty() self += samples"Created {} samples with the {} method" .format(len(self), kind)) self.logger.debug("Points are:\n{}".format(samples))"Discrepancy is {}".format(self.discrepancy()))
return self
[docs] def refine(self, surrogate, method, point_loo=None, delta_space=0.08, dists=None, hybrid=None, discrete=None, extremum='min'): """Refine the sample, update space points and return the new point(s). :param surrogate: Surrogate. :type surrogate: :class:`batman.surrogate.SurrogateModel`. :param str method: Refinement method. :param array_like point_loo: Leave-one-out worst point (n_features,). :param float delta_space: Shrinking factor for the parameter space. :param lst(str) dists: List of valid openturns distributions as string. :param lst(lst(str, int)) hybrid: Navigator as list of [Method, n]. :param int discrete: Index of the discrete variable. :param str extremum: Minimization or maximization objective ['min', 'max']. :return: List of points to add. :rtype: Element or list of :class:``. """ # Refinement strategy if (self.refiner is None) and (method == 'hybrid'): strategy = [[m[0]] * m[1] for m in hybrid] self.hybrid = itertools.cycle(itertools.chain.from_iterable(strategy)) self.refiner = Refiner(surrogate, self.corners, delta_space, discrete) if method == 'sigma': new_point = self.refiner.sigma() elif method == 'discrepancy': new_point = self.refiner.discrepancy() elif method == 'loo_sigma': new_point = self.refiner.leave_one_out_sigma(point_loo) elif method == 'loo_sobol': new_point = self.refiner.leave_one_out_sobol(point_loo, dists) elif method == 'extrema': new_point, self.refined_pod_points = self.refiner.extrema(self.refined_pod_points) elif method == 'hybrid': new_point, self.refined_pod_points = self.refiner.hybrid(self.refined_pod_points, point_loo, next(self.hybrid), dists) elif method == 'optimization': new_point = self.refiner.optimization(extremum=extremum) elif method == 'sigma_discrepancy': new_point = self.refiner.sigma_discrepancy() try: point = [Point(point) for point in [new_point]] except TypeError: point = [Point(point) for point in new_point] # Check if points are added to space so only added points are returned points_set = set(self) new_point = [] for p in point: try: points_set.add(p) self += p except TypeError: # Empty list continue if p in self: new_point.append(p)'Refined sampling with new point: {}'.format(new_point))"New discrepancy is {}".format(self.discrepancy()))
return new_point
[docs] def empty(self): """Remove all points."""
del self[:]
[docs] def is_full(self): """Return whether the maximum number of points is reached."""
return len(self) >= self.max_points_nb def _cheap_doe_from_expensive(self, n): """Compute the number of points required for the cheap DOE. :param int n: size of the expensive design. :return: size of the cheap design. :rtype: int. """ doe_cheap = (self.multifidelity[1] - n) * self.multifidelity[0] doe_cheap = int(doe_cheap) if doe_cheap / float(n) <= 1: self.logger.error('Nc/Ne must be positive') raise SystemExit self.max_points_nb = n + doe_cheap return doe_cheap
[docs] def optimization_results(self, extremum): """Compute the optimal value. :param str extremum: minimization or maximization objective ['min', 'max']. """ sign = 1 if extremum == 'min' else -1 gen = [self.refiner.func(x, sign=sign) for x in self] arg_extremum = np.argmin(gen) _extremum = sign * gen[arg_extremum] _x = self[arg_extremum]'New extremum value is: f(x)={} for x={}' .format(_extremum, _x)) bounds = np.array(self.corners).T results = differential_evolution(self.refiner.func, bounds, args=(sign,)) _extremum = sign * _x = results.x'Optimization with surrogate: f(x)={} for x={}'
.format(_extremum, _x))
[docs] def discrepancy(self, sample=None): """Compute the centered discrepancy. :return: Centered discrepancy. :rtype: float. """ scaler = preprocessing.MinMaxScaler() if sample is None: sample = scaler.transform(self) else: sample = scaler.transform(sample) n_s = len(sample) abs_ = abs(sample - 0.5) disc1 = np.sum( + 0.5 * abs_ - 0.5 * abs_ ** 2, axis=1)) prod_arr = 1 for i in range(self.dim): s0 = sample[:, i] prod_arr *= (1 + 0.5 * abs(s0[:, None] - 0.5) + 0.5 * abs(s0 - 0.5) - 0.5 * abs(s0[:, None] - s0)) disc2 = prod_arr.sum() c2 = (13.0 / 12.0) ** self.dim - 2.0 / n_s * disc1 + 1.0 / (n_s ** 2) * disc2
return c2
[docs] def read(self, path): """Read space from the file `path`.""" self.empty() space = np.loadtxt(path) for p in space: self += p.flatten().tolist()
self.logger.debug('Space read from {}'.format(path))
[docs] def write(self, path): """Write space in file. After writting points, it plots them. :param str path: folder to save the points in. """ np.savetxt(path, self) resampling = len(self) - self.doe_init path = os.path.join(os.path.dirname(os.path.abspath(path)), 'DOE.pdf') visualization.doe(self, plabels=self.plabels, resampling=resampling, multifidelity=self.multifidelity, fname=path)
self.logger.debug('Space wrote to {}'.format(path))