# coding: utf8
"""
Polynomial Chaos class
======================
Interpolation using Polynomial Chaos method.
:Example:
::
>> from batman.surrogate import PC
>> from batman.functions import Michalewicz
>> import numpy as np
>> f = Michalewicz()
>> surrogate = PC('Quad', 5, [ot.Uniform(0, 1), ot.Uniform(0, 1)])
>> sample = np.array(surrogate.sample)
>> sample.shape
(36, 2)
>> data = f(sample)
>> surrogate.fit(sample, data)
>> point = [0.4, 0.6]
>> surrogate.evaluate(point)
array([ -8.642e-08])
"""
import logging
import openturns as ot
import numpy as np
from ..functions.utils import multi_eval
ot.ResourceMap.SetAsUnsignedInteger("DesignProxy-DefaultCacheSize", 0)
[docs]class PC(object):
"""Polynomial Chaos class."""
logger = logging.getLogger(__name__)
[docs] def __init__(self, strategy, degree, distributions, sample=None,
stieltjes=True):
"""Generate truncature and projection strategies.
Allong with the strategies the sample is storred as an attribute:
:attr:`sample` as well as the weights: :attr:`weights`.
:param str strategy: Least square or Quadrature ['LS', 'Quad'].
:param int degree: Polynomial degree.
:param distributions: Distributions of each input parameter.
:type distributions: lst(:class:`openturns.Distribution`)
:param int sample: Samples for least square.
:param bool stieltjes: Wether to use Stieltjes algorithm for the basis.
"""
# distributions
in_dim = len(distributions)
self.dist = ot.ComposedDistribution(distributions)
enumerateFunction = ot.EnumerateFunction(in_dim)
if stieltjes:
# Tend to result in performance issue
self.basis = ot.OrthogonalProductPolynomialFactory(
[ot.StandardDistributionPolynomialFactory(
ot.AdaptiveStieltjesAlgorithm(marginal))
for marginal in distributions], enumerateFunction)
else:
self.basis = ot.OrthogonalProductPolynomialFactory(
[ot.StandardDistributionPolynomialFactory(margin)
for margin in distributions], enumerateFunction)
self.n_basis = enumerateFunction.getStrataCumulatedCardinal(degree)
# Strategy choice for expansion coefficient determination
self.strategy = strategy
if self.strategy == 'LS': # least-squares method
self.sample = sample
else: # integration method
# redefinition of sample size
# n_samples = (degree + 1) ** in_dim
# marginal degree definition
# by default: the marginal degree for each input random
# variable is set to the total polynomial degree 'degree'+1
measure = self.basis.getMeasure()
degrees = [degree + 1] * in_dim
self.proj_strategy = ot.IntegrationStrategy(
ot.GaussProductExperiment(measure, degrees))
self.sample, self.weights = self.proj_strategy.getExperiment().generateWithWeights()
if not stieltjes:
transformation = ot.Function(ot.MarginalTransformationEvaluation(
[measure.getMarginal(i) for i in range(in_dim)],
distributions, False))
self.sample = transformation(self.sample)
self.pc = None
self.pc_result = None
[docs] def fit(self, sample, data):
"""Create the predictor.
The result of the Polynomial Chaos is stored as :attr:`pc_result` and
the surrogate is stored as :attr:`pc`.
:param array_like sample: The sample used to generate the data
(n_samples, n_features).
:param array_like data: The observed data (n_samples, [n_features]).
"""
if self.strategy == 'LS': # least-squares method
self.proj_strategy = ot.LeastSquaresStrategy(sample, data)
_, self.weights = self.proj_strategy.getExperiment().generateWithWeights()
sample_ = np.zeros_like(self.sample)
sample_[:len(sample)] = sample
sample_arg = np.all(np.isin(sample_, self.sample), axis=1)
weights = np.array(self.weights)[sample_arg]
trunc_strategy = ot.FixedStrategy(self.basis, self.n_basis)
pc_algo = ot.FunctionalChaosAlgorithm(sample, weights, data,
self.dist, trunc_strategy)
pc_algo.setProjectionStrategy(self.proj_strategy)
ot.Log.Show(ot.Log.ERROR)
pc_algo.run()
self.pc_result = pc_algo.getResult()
self.pc = self.pc_result.getMetaModel()
self.pc, self.pc_result
@multi_eval
def evaluate(self, point):
"""Make a prediction.
From a point, make a new prediction.
:param array_like point: The point to evaluate (n_features,).
:return: The predictions.
:rtype: array_like (n_features,).
"""
point_array = np.asarray(point).reshape(1, -1)
prediction = np.array(self.pc(point_array))
return prediction