# coding: utf8
"""
Kriging Class
=============

Interpolation using Gaussian Process method.

:Example:

::

    >> from batman.surrogate import Kriging
    >> import numpy as np
    >> sample = np.array([[2, 4], [3, 5], [6, 9]])
    >> data = np.array([[12, 1], [10, 2], [9, 4]])
    >> predictor = Kriging(sample, data)
    >> point = (5.0, 8.0)
    >> predictor.evaluate(point)
    (array([ 8.4526528 ,  3.57976035]), array([ 0.40982369,  0.05522197]))

Reference
---------
F. Pedregosa et al.: Scikit-learn: Machine Learning in Python. Journal of
Machine Learning Research. 2011. ArXiv ID: 1201.0490
"""
import logging
import warnings
import numpy as np
from scipy.optimize import differential_evolution
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import (WhiteKernel, Matern, ConstantKernel)
from ..misc import (NestedPool, cpu_system)
from ..functions.utils import multi_eval


class Kriging(object):
    """Kriging based on Gaussian Process."""

    logger = logging.getLogger(__name__)
    def __init__(self, sample, data, kernel=None, noise=False,
                 global_optimizer=True):
        r"""Create the predictor.

        Uses sample and data to construct a predictor using Gaussian Process.
        Input is to be normalized beforehand and, depending on the number of
        parameters, the kernel is adapted to be anisotropic.

        :attr:`self.data` contains the predictors as a list(array) of the
        size of the `output`. A predictor is created per column of `data`,
        i.e. one predictor per output component.

        If :attr:`noise` is a float, it will be used as :attr:`noise_level` by
        :class:`sklearn.gaussian_process.kernels.WhiteKernel`. Otherwise, if
        :attr:`noise` is ``True``, default values are used for the
        WhiteKernel. If :attr:`noise` is ``False``, no noise is added.

        A multiprocessing strategy is used:

        1. Create a process per mode, do not create if only one,
        2. Create `n_restart` (3 by default) processes per process.

        In the end, there are :math:`N = n_{restart} \times n_{modes}`
        processes. If there are not enough CPUs,
        :math:`N = \frac{n_{cpu}}{n_{restart}}`.

        :param array_like sample: Sample used to generate the data
            (n_samples, n_features).
        :param array_like data: Observed data (n_samples, n_features).
        :param kernel: Kernel from scikit-learn.
        :type kernel: :class:`sklearn.gaussian_process.kernels`.*
        :param float/bool noise: Noise level used in the kriging.
        :param bool global_optimizer: Whether to do global optimization or
            gradient-based optimization to estimate hyperparameters.
        """
        try:
            sample[0][0]
        except (TypeError, IndexError):
            pass
        else:
            sample = np.array(sample).reshape(len(sample), -1)

        dim = sample.shape[1]
        self.model_len = data.shape[1]
        if self.model_len == 1:
            data = data.ravel()

        if kernel is not None:
            self.kernel = kernel
        else:
            # Define the model settings: anisotropic Matern kernel
            l_scale = (1.0,) * dim
            scale_bounds = [(0.01, 100)] * dim
            self.kernel = ConstantKernel() * Matern(
                length_scale=l_scale, length_scale_bounds=scale_bounds)

        # Add noise to the kernel using a WhiteKernel
        if noise:
            if isinstance(noise, bool):
                noise = WhiteKernel()
            else:
                noise = WhiteKernel(noise_level=noise)
            self.kernel += noise

        # Global optimization
        args_optim = {'kernel': self.kernel, 'normalize_y': True}
        if global_optimizer:
            args_optim.update({'optimizer': self._optim_evolution,
                               'n_restarts_optimizer': 0})
            self.n_restart = 3
        else:
            args_optim.update({'n_restarts_optimizer': 10 * dim})
            self.n_restart = 1

        # Define the CPU multi-threading/processing strategy
        n_cpu_system = cpu_system()
        self.n_cpu = self.model_len
        if n_cpu_system // (self.n_restart * self.model_len) < 1:
            self.n_cpu = n_cpu_system // self.n_restart
        self.n_cpu = 1 if self.n_cpu == 0 else self.n_cpu

        def model_fitting(column):
            """Fit an instance of :class:`sklearn.GaussianProcessRegressor`."""
            gp = GaussianProcessRegressor(**args_optim)
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                # fit() returns the fitted regressor itself
                data = gp.fit(sample, column)
            hyperparameter = np.exp(gp.kernel_.theta)

            # Convergence check against bounds, only when the kernel is not
            # user defined
            if kernel is None:
                hyper_bounds = all([i[0] < j < i[1]
                                    for i, j in zip(scale_bounds,
                                                    hyperparameter[1:dim + 1])])
                if not hyper_bounds:
                    self.logger.warning("Hyperparameters optimization not "
                                        "converged: {}".format(gp.kernel_))

            return data, hyperparameter

        # Create a predictor per output column, parallelized if several
        if self.model_len > 1:
            pool = NestedPool(self.n_cpu)
            results = pool.imap(model_fitting, data.T)
            results = list(results)
            pool.terminate()
        else:
            results = [model_fitting(data)]

        # Gather results
        self.data, self.hyperparameter = zip(*results)
        self.logger.debug("Kernels:\n{}"
                          .format([gp.kernel_ for gp in self.data]))

    def _optim_evolution(self, obj_func, initial_theta, bounds):
        """Genetic optimization of the hyperparameters.

        Use a differential evolution (DE) strategy to optimize theta. The
        optimization is repeated several times using multiprocessing and the
        best result is returned.

        :param callable obj_func: function to optimize.
        :param lst(float) initial_theta: initial guess.
        :param lst(lst(float)) bounds: bounds.
        :return: theta_opt and func_min.
        :rtype: lst(float), float.
        """
        def func(args):
            """Get the objective value from scikit-learn."""
            return obj_func(args)[0]

        def fork_optimizer(i):
            """Optimize hyperparameters with an increasing population size."""
            results = differential_evolution(func, bounds,
                                             tol=0.001, popsize=15 + i)
            theta_opt = results.x
            func_min = results.fun
            return theta_opt, func_min

        pool = NestedPool(self.n_restart)
        results = pool.imap(fork_optimizer, range(self.n_restart))

        # Gather results
        results = list(results)
        pool.terminate()

        theta_opt, func_min = zip(*results)

        # Keep the best result across restarts
        min_idx = np.argmin(func_min)
        func_min = func_min[min_idx]
        theta_opt = theta_opt[min_idx]

        return theta_opt, func_min

    @multi_eval
    def evaluate(self, point):
        """Make a prediction.

        From a point, make a new prediction.

        :param array_like point: The point to evaluate (n_features,).
        :return: The predictions.
        :rtype: array_like (n_features,).
        """
        point_array = np.asarray(point).reshape(1, -1)
        prediction = np.empty(self.model_len)
        sigma = np.empty(self.model_len)

        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            # Compute a prediction per predictor
            for i, gp in enumerate(self.data):
                prediction[i], sigma[i] = gp.predict(point_array,
                                                     return_std=True,
                                                     return_cov=False)
        return prediction, sigma
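

if __name__ == '__main__':
    # Minimal usage sketch (illustrative, not part of the library API):
    # mirrors the docstring example above, adding a default WhiteKernel noise
    # term and the gradient-based optimizer instead of differential
    # evolution. Exact predictions and standard deviations depend on the
    # hyperparameter optimization.
    sample = np.array([[2, 4], [3, 5], [6, 9]])
    data = np.array([[12, 1], [10, 2], [9, 4]])
    predictor = Kriging(sample, data, noise=True, global_optimizer=False)
    prediction, sigma = predictor.evaluate((5.0, 8.0))
    print(prediction, sigma)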