Source code for batman.uq.uq

# coding: utf8
"""
UQ class
========

This class is intended to implement statistical tools provided by the OpenTURNS
framework.

.. seealso:: The documentation of the used class
             :class:`openturns.SensitivityAnalysis`.

It is called using option `-u`. Configuration is done from *settings*:

1. Computes Sobol' indices (map and block or aggregated),
2. Compares the result with a known function if available,
3. Propagate uncertainties from input distributions.

At each step, an output file is written within the UQ folder.

It implements the following methods:

- :func:`UQ.func`
- :func:`UQ.int_func`
- :func:`UQ.error_pod`
- :func:`UQ.sobol`
- :func:`UQ.error_propagation`

Usage
-----
The *settings* file must contain the following parameters::

    'UQ' : {
        'method' : sobol, (or FAST)
        'type' : aggregated, (or block)
        'sample' : 20000,
        'pdf' : [Normal(sigma, mu), Uniform(inf, sup)] (OpenTURNS factories)
    }

:Example:

::

    >> analyse = UQ(surrogate, settings, output)
    >> analyse.sobol()
    >> analyse.error_propagation()

References
----------
A. Marrel, N. Saint-Geours. M. De Lozzo: Sensitivity Analysis of Spatial and/or
Temporal Phenomena. Handbook of Uncertainty Quantification. 2015.
DOI:10.1007/978-3-319-11259-6_39-1

B. Iooss: Revue sur l’analyse de sensibilité globale de modèles numériques.
Journal de la Société Française de Statistique. 2010

M. Baudin, A. Dutfoy, B. Iooss, A. Popelin: OpenTURNS: An industrial software
for uncertainty quantification in simulation. 2015. ArXiv ID: 1501.05242

"""
import logging
import os
import itertools
import numpy as np
import openturns as ot
from openturns.viewer import View
from sklearn.metrics import (r2_score, mean_squared_error)
from ..functions.utils import multi_eval
from ..input_output import formater
from .. import functions as func_ref
from .. import visualization


[docs]class UQ: """Uncertainty Quantification class.""" logger = logging.getLogger(__name__)
[docs] def __init__(self, surrogate, dists=None, nsample=5000, method='sobol', indices='aggregated', space=None, data=None, plabels=None, xlabel=None, flabel=None, xdata=None, fname=None, test=None): """Init the UQ class. From the settings file, it gets: - Method to use for the Sensitivity Analysis (SA), - Type of Sobol' indices to compute, - Number of points per sample to use for SA (:math:`N(2p+2)` predictions), resulting storage is 6N(out+p)*8 octets => 184Mo if N=1e4 - Method to use to predict a new snapshot, - The list of input variables, - The lengh of the output function. Also, it creates the `model` and `int_model` as :class:`openturns.PythonFunction`. :param surrogate: Surrogate model. :type surrogate: class:`batman.surrogate.SurrogateModel`. :param space: sample space (can be a list). :type space: class:`batman.space.Space`. :param array_like data: Snapshot's data (n_samples, n_features). :param list(str) plabels: parameters' names. :param str xlabel: label of the discretization parameter. :param str flabel: name of the quantity of interest. :param array_like xdata: 1D discretization of the function (n_features,). :param str fname: folder output path. :param str test: Test function from class:`batman.functions`. """ self.logger.info("\n----- UQ module -----") self.test = test self.fname = fname self.xlabel = xlabel self.flabel = flabel if self.fname is not None: try: os.mkdir(fname) except OSError: self.logger.debug("Output folder already exists.") finally: self.io = formater('json') # IOFormatSelector('fmt_tp_fortran') else: self.logger.debug("Not using output folder.") self.surrogate = surrogate self.p_len = len(dists) if plabels is None: self.plabels = ["x" + str(i) for i in range(self.p_len)] else: self.plabels = plabels self.method_sobol = method self.type_indices = indices self.space = space # Generate samples self.points_sample = nsample dists = ','.join(['ot.' 
+ dists[i] for i in range(self.p_len)]) try: self.distribution = eval('ot.ComposedDistribution([' + dists + '])', {'__builtins__': None}, {'ot': __import__('openturns')}) except (TypeError, AttributeError): self.logger.error('OpenTURNS distribution unknown.') raise SystemError self.sample = ot.LHSExperiment(self.distribution, self.points_sample, True, True).generate() self.logger.info("Created {} samples with an LHS experiment" .format(self.points_sample)) try: # With surrogate model try: # Functional output f_eval, _ = self.surrogate(self.sample[0]) self.output_len = len(f_eval[0]) except ValueError: self.output_len = 1 self.output = self.func(self.sample) self.init_size = self.surrogate.space.doe_init except TypeError: self.sample = space self.init_size = len(space) self.output_len = data.shape[1] self.output = data self.points_sample = self.init_size if (xdata is None) and (self.output_len > 1): self.xdata = np.linspace(0, 1, self.output_len) else:
self.xdata = xdata def __repr__(self): """Information about object.""" return ("UQ object: Method({}), Input({}), Distribution({})" .format(self.method_sobol, self.plabels, self.distribution)) @multi_eval def func(self, coords): """Evaluate the surrogate at a given point. This function calls the surrogate to compute a prediction. :param lst coords: The parameters set to calculate the solution from. :return: The fonction evaluation. :rtype: float. """ f_eval, _ = self.surrogate(coords) return f_eval[0] @multi_eval def int_func(self, coords): """Evaluate the surrogate at a given point and return the integral. Same as :func:`func` but compute the integral using the trapezoidale rule. It simply returns the prediction in case of a scalar output. :param lst coords: The parameters set to calculate the solution from. :return: The integral of the function. :rtype: float. """ f_eval, _ = self.surrogate(coords) f_eval = np.trapz(f_eval[0]) return f_eval
[docs] def error_model(self, indices, function): r"""Compute the error between the POD and the analytic function. .. warning:: For test purpose only. Choises are `Ishigami`, `Rosenbrock`, `Michalewicz`, `G_Function` and `Channel_Flow` test functions. From the surrogate of the function, evaluate the error using the analytical evaluation of the function on the sample points. .. math:: Q^2 = 1 - \frac{err_{l2}}{var_{model}} Knowing that :math:`err_{l2} = \sum \frac{(prediction - reference)^2}{n}`, :math:`var_{model} = \sum \frac{(prediction - mean)^2}{n}` Also, it computes the mean square error on the Sobol first andtotal order indices. A summary is written within `model_err.dat`. :param array_like indices: Sobol first order indices computed using the POD. :param str function: name of the analytic function. :return: err_q2, mse, s_l2_2nd, s_l2_1st, s_l2_total. :rtype: array_like. """ fun = func_ref.__dict__[function]() s_l2_2nd = np.sqrt(np.sum((fun.s_second - indices[0]) ** 2)) s_l2_1st = np.sqrt(np.sum((fun.s_first - indices[1]) ** 2)) s_l2_total = np.sqrt(np.sum((fun.s_total - indices[2]) ** 2)) # Q2 computation y_ref = np.array(fun(self.sample)) y_pred = np.array(self.func(self.sample)) err_q2 = r2_score(y_ref, y_pred, multioutput='uniform_average') # MSE computation mse = mean_squared_error(y_ref, y_pred, multioutput='uniform_average') self.logger.info("\n----- Surrogate Model Error -----\n" "Q2: {}\n" "MSE: {}\n" "L2(sobol 2nd, 1st and total order indices error): " "{}, {}, {}" .format(err_q2, mse, s_l2_2nd, s_l2_1st, s_l2_total)) # Write error to file model_err.dat if self.fname is not None: with open(os.path.join(self.fname, 'model_err.dat'), 'w') as f: f.writelines("{} {} {} {} {} {}".format(self.init_size, err_q2, self.points_sample, s_l2_2nd, s_l2_1st, s_l2_total)) # Visual tests if fun.d_out == 1: # cobweb = ot.VisualTest.DrawCobWeb(self.sample, y_pred, y_min, y_max, 'red', False) # View(cobweb).show() qq_plot = ot.VisualTest_DrawQQplot(y_ref, y_pred) 
View(qq_plot).save(os.path.join(self.fname, 'qq_plot.png')) else: self.logger.debug("Cannot draw QQplot with output dimension > 1") else: self.logger.debug("No output folder to write errors in")
return err_q2, mse, s_l2_2nd, s_l2_1st, s_l2_total
[docs] def sobol(self): """Compute Sobol' indices. It returns the second, first and total order indices of Sobol'. Two methods are possible for the indices: - `sobol` - `FAST` .. warning:: The second order indices are only available with the sobol method. Also, when there is no surrogate (ensemble mode), FAST is not available and the DoE must have been generated with `saltelli`. And two types of computation are availlable for the global indices: - `block` - `aggregated` If *aggregated*, *map* indices are computed. In case of a scalar value, all types returns the same values. *block* indices are written within `sensitivity.dat` and aggregated indices within `sensitivity_aggregated.dat`. Finally, it calls :func:`error_pod` in order to compare the indices with their analytical values. :return: Sobol' indices. :rtype: array_like. """ indices = [[], [], []] aggregated = [[], [], []] indices_conf = [[], []] if self.type_indices == 'block': sobol_model = self.int_func sobol_len = 1 else: sobol_model = self.func sobol_len = self.output_len if self.method_sobol == 'sobol': self.logger.info("\n----- Sobol' indices -----") if self.surrogate is not None: size = self.points_sample input_design = ot.SobolIndicesAlgorithmImplementation.Generate( self.distribution, size, True) output_design = sobol_model(input_design) self.logger.info("Created {} samples for Sobol'" .format(len(output_design))) else: input_design = self.space output_design = self.output size = len(self.space) // (2 * self.p_len + 2) # Martinez, Saltelli, MauntzKucherenko, Jansen ot.ResourceMap.SetAsBool('MartinezSensitivityAlgorithm-UseAsmpytoticInterval', True) sobol = ot.SaltelliSensitivityAlgorithm(input_design, output_design, size) for i in range(sobol_len): try: indices[0].append(np.array(sobol.getSecondOrderIndices(i))) except TypeError: indices[0].append(np.zeros((self.p_len, self.p_len))) self.logger.debug("-> Second order:\n{}\n".format(indices[0])) elif self.method_sobol == 'FAST': 
self.logger.info("\n----- FAST indices -----") if self.output_len > 1: wrap_fun = sobol_model else: def wrap_fun(x): return [sobol_model(x)] fast_model = ot.PythonFunction(self.p_len, self.output_len, wrap_fun) sobol = ot.FAST(ot.Function(fast_model), self.distribution, self.points_sample) output_design = sobol_model(self.sample) self.logger.warning("No Second order indices with FAST") # try block used to handle boundary conditions with fixed values for i in range(sobol_len): try: indices[1].append(np.array(sobol.getFirstOrderIndices(i))) except TypeError: indices[1].append(np.zeros(self.p_len)) try: indices[2].append(np.array(sobol.getTotalOrderIndices(i))) except TypeError: indices[2].append(np.zeros(self.p_len)) self.logger.debug("-> First order:\n{}\n" "-> Total:\n{}\n" .format(*indices[1:])) # Write Sobol' indices to file: block or map if self.fname is not None: i1 = np.reshape(indices[1], (sobol_len, self.p_len)) i2 = np.reshape(indices[2], (sobol_len, self.p_len)) data = np.append(i1, i2, axis=1) names = [] for p in self.plabels: names += ['S_' + str(p)] for p in self.plabels: names += ['S_T_' + str(p)] if (self.output_len > 1) and (self.type_indices != 'block'): full_names = ['x'] + names data = np.append(self.xdata, data) else: full_names = names self.io.write(os.path.join(self.fname, 'sensitivity.json'), data, full_names) else: self.logger.debug("No output folder to write indices in") # Aggregated Indices if self.type_indices == 'aggregated': self.logger.info("\n----- Aggregated Sensitivity Indices -----") output_var = output_design.var(axis=0) sum_var_indices = [np.zeros((self.p_len, self.p_len)), np.zeros((self.p_len)), np.zeros((self.p_len))] # Compute manually for FAST and second order, otherwise OT if self.method_sobol == 'FAST': agg_range = [0, 1, 2] else: agg_range = [0] for i, j in itertools.product(range(self.output_len), agg_range): try: indices[:][j][i] = np.nan_to_num(indices[:][j][i]) sum_var_indices[j] += float(output_var[i]) * 
indices[:][j][i] except IndexError: sum_var_indices[j] = np.inf sum_var = np.sum(output_var) for i in range(3): aggregated[i] = sum_var_indices[i] / sum_var if self.method_sobol != 'FAST': aggregated[1] = np.array(sobol.getAggregatedFirstOrderIndices()) aggregated[2] = np.array(sobol.getAggregatedTotalOrderIndices()) indices_conf[0] = sobol.getFirstOrderIndicesInterval() indices_conf[1] = sobol.getTotalOrderIndicesInterval() self.logger.info("-> First order confidence:\n{}\n" "-> Total order confidence:\n{}\n" .format(*indices_conf)) self.logger.info("Aggregated_indices:\n" "-> Second order:\n{}\n" "-> First order:\n{}\n" "-> Total order:\n{}\n" .format(*aggregated)) # Write aggregated indices to file if self.fname is not None: ind_total_first = np.array(aggregated[1:]) # .flatten('F') i1 = np.array(aggregated[1]) # .flatten('F') i2 = np.array(aggregated[2]) # .flatten('F') if self.method_sobol != 'FAST': i1_min = np.array(indices_conf[0].getLowerBound()) # .flatten('F') i1_max = np.array(indices_conf[0].getUpperBound()) # .flatten('F') i2_min = np.array(indices_conf[1].getLowerBound()) # .flatten('F') i2_max = np.array(indices_conf[1].getUpperBound()) # .flatten('F') # layout: [S_min_P1, S_min_P2, ..., S_P1, S_p2, ...] 
data = np.array([i1_min, i1, i1_max, i2_min, i2, i2_max]).flatten() names = [i + str(p) for i, p in itertools.product(['S_min_', 'S_', 'S_max_', 'S_T_min_', 'S_T_', 'S_T_max_'], self.plabels)] conf1 = np.vstack((i1_min, i2_min)) conf1 = np.ravel(ind_total_first - conf1, order='F') conf2 = np.vstack((i1_max, i2_max)) conf2 = np.ravel(conf2 - ind_total_first, order='F') conf = np.vstack((conf1, conf2)) else: conf = None names = [i + str(p) for i, p in itertools.product(['S_', 'S_T_'], self.plabels)] data = np.append(i1, i2) self.io.write(os.path.join(self.fname, 'sensitivity_aggregated.json'), data, names) else: self.logger.debug( "No output folder to write aggregated indices in") full_indices = [aggregated[1], aggregated[2], indices[1], indices[2]] else: full_indices = [indices[1][0], indices[2][0]] aggregated = [indices[0][0], indices[1][0], indices[2][0]] conf = None self.xdata = None # Plot if self.fname is not None: path = os.path.join(self.fname, 'sensitivity.pdf') visualization.sobol(full_indices, plabels=self.plabels, conf=conf, xdata=self.xdata, fname=path) # Compute error of the POD with a known function if (self.type_indices in ['aggregated', 'block'])\ and (self.test is not None) and (self.surrogate is not None): self.error_model(aggregated, self.test)
return aggregated
[docs] def error_propagation(self): """Compute the moments. 1st, 2nd order moments are computed for every output of the function. Also compute the PDF for these outputs, and compute correlations (YY and XY) and correlation (YY). Both exported as 2D cartesian plots. Files are respectivelly: * :file:`pdf-moment.dat`, moments [discretized on curvilinear abscissa] * :file:`pdf.dat` -> the PDFs [discretized on curvilinear abscissa] * :file:`correlation_covariance.dat` -> correlation and covariance YY * :file:`correlation_XY.dat` -> correlation XY * :file:`pdf.pdf`, plot of the PDF (with moments if dim > 1) """ self.logger.info("\n----- Uncertainty Propagation -----") # Covariance and correlation matrices self.logger.info('Creating Covariance/correlation and figures...') if (self.output_len > 1) and (self.type_indices != 'block'): visualization.corr_cov(self.output, self.sample, self.xdata, fname=os.path.join(self.fname, 'corr_cov.pdf')) # Create and plot the PDFs + moments self.logger.info('Creating PDF and figures...') visualization.pdf(self.output, self.xdata, xlabel=self.xlabel, flabel=self.flabel, fname=os.path.join(self.fname, 'pdf.pdf'),
moments=True)