# coding: utf8
"""
POD Class
=========
Defines the methods to compute the POD.
:Example:
::
>>> from pod import Pod
>>> pod = Pod(corners, nsample, tolerance, dim_max)
>>> pod.decompose(snapshots)
>>> pod.write(path)
>>> pod.estimate_quality()
References
----------
[1] M. Brand: Fast low-rank modifications of the thin singular value decomposition.
2006. DOI:10.1016/j.laa.2005.07.021
[2] T. Braconnier: Towards an adaptive POD/SVD surrogate model for aeronautic design.
Computers & Fluids. 2011. DOI:10.1016/j.compfluid.2010.09.002
"""
import logging
import os
import copy
import numpy as np
from ..surrogate import SurrogateModel
from ..misc import ProgressBar, NestedPool, cpu_system
from ..space import Space
class Pod(object):
"""POD class."""
logger = logging.getLogger(__name__)
# Directory structure to store a pod
directories = {
'mean_snapshot': 'Mean.txt',
'modes': 'Mods.npz',
}
# File name for storing the MPI independent POD data
pod_file_name = 'pod.npz'
# File name for storing the points
points_file_name = 'points.dat'
def __init__(self, corners, nsample, tolerance, dim_max, nrefine=0):
"""Initialize POD components.
The decomposition of the snapshot matrix is stored as attributes:
- U: Left singular vectors matrix, array_like (n_features, n_snapshots),
after filtering array_like (n_features, n_modes),
- S: Singular values matrix, array_like (n_modes, n_snapshots),
only the diagonal is stored, of length (n_modes),
- V: Right singular vectors matrix, array_like (n_snapshots, n_snapshots),
after filtering (n_snapshots, n_modes).
:param array_like corners: hypercube ([min, n_features], [max, n_features]).
:param int/array_like nsample: number of samples or list of samples of
shape (n_samples, n_features).
:param int nrefine: number of points to use for refinement.
:param float tolerance: basis modes filtering criteria.
:param int dim_max: maximum number of basis modes to keep.
"""
self.quality = None
self.predictor = None
self.leave_one_out_predictor = 'kriging'
self.corners = corners
self.space = Space(self.corners, nsample, nrefine)
# POD computation related
self.tolerance = tolerance
self.dim_max = dim_max
self.mean_snapshot = None
self.U = None
self.S = None
self.V = None
def __str__(self):
"""POD summary."""
s = ("\nPOD summary:\n"
"modes filtering tolerance: {}\n"
"dimension of parameter space: {}\n"
"number of snapshots: {}\n"
"number of data per snapshot: {}\n"
"maximum number of modes: {}\n"
"number of modes: {}\n"
"modes: {}\n"
.format(self.tolerance, self.space.dim, len(self.space),
self.mean_snapshot.shape[0], self.dim_max,
self.S.shape[0], self.S))
return s
def decompose(self, snapshots):
"""Create a POD from a set of snapshots.
:param lst(array) snapshots: snapshots to decompose, stacked as columns.
"""
self.logger.info('Decomposing POD basis...')
matrix = np.column_stack(tuple([s.data for s in snapshots]))
self._decompose(matrix)
for s in snapshots:
self.space += s.point
self.logger.info('Computed POD basis with %g modes', self.S.shape[0])
def update(self, snapshot):
"""Update POD with a new snapshot.
:param snapshot: new snapshot to update the POD with.
"""
self.logger.info('Updating POD basis...')
self._update(snapshot.data)
self.space += snapshot.point
self.logger.info('Updated POD basis with snapshot at point {}'
.format(snapshot.point))
def estimate_quality(self):
"""Quality estimator.
Estimate the quality of the POD by the leave-one-out method.
:return: Q2 and the location of the maximum error.
:rtype: float, array_like (n_features,).
"""
self.logger.info('Estimating POD quality...')
# Get rid of predictor creation messages
level_init = copy.copy(self.logger.getEffectiveLevel())
logging.getLogger().setLevel(logging.WARNING)
quality, point = self._estimate_quality(self.space)
logging.getLogger().setLevel(level_init)
self.quality = quality
self.logger.info('POD quality: {}, max error location at {}'
.format(quality, point))
return self.quality, point
def write(self, path):
"""Save a POD to disk.
:param str path: path to a directory.
"""
# create output directory if necessary
try:
os.makedirs(path)
except OSError:
pass
# points
self.space.write(os.path.join(path, self.points_file_name))
# mean snapshot
path_snapshot = os.path.join(path, self.directories['mean_snapshot'])
np.savetxt(path_snapshot, self.mean_snapshot)
# basis
points = np.vstack(tuple(self.space))
np.savez(os.path.join(path, self.pod_file_name),
parameters=points,
values=self.S,
vectors=self.V,
modes=self.U)
self.logger.info('Wrote POD to %s', path)
def read(self, path):
"""Read a POD from disk.
:param str path: path to a directory.
"""
# points
self.space.read(os.path.join(path, self.points_file_name))
# mean snapshot
path_snapshot = os.path.join(path, self.directories['mean_snapshot'])
self.mean_snapshot = np.atleast_1d(np.loadtxt(path_snapshot))
# basis
lazy_data = np.load(os.path.join(path, self.pod_file_name))
self.S = lazy_data['values']
self.V = lazy_data['vectors']
self.U = lazy_data['modes']
self.logger.info('Read POD from %s', path)
def VS(self):
"""Compute V*S matrix product.
S is diagonal and stored as a vector, thus the product reduces to a
broadcasted multiplication and (V*S).T = SV.T.
"""
return self.V * self.S
def _decompose(self, snapshots):
"""Perform the POD.
The snapshot matrix consists of snapshots arranged in columns.
Snapshots are centered with the mean snapshot, then the matrix is
decomposed using a reduced SVD from numpy.
`V` is stored as `V` and not as the conjugate transpose returned by numpy.
:param array_like snapshots: Snapshot matrix (nb of data per snapshot,
nb of samples)
"""
# compute mean snapshot
self.mean_snapshot = np.average(snapshots, 1)
# center snapshots
for i in range(snapshots.shape[1]):
snapshots[:, i] -= self.mean_snapshot
self.U, self.S, self.V = np.linalg.svd(snapshots, full_matrices=False)
self.V = self.V.T
self.U, self.S, self.V = self.filtering(self.U, self.S, self.V,
self.tolerance, self.dim_max)
@staticmethod
def filtering(U, S, V, tolerance, dim_max):
"""Remove lowest modes in U, S and V.
:param array_like U: (nb of data, nb of snapshots).
:param array_like S: (nb of modes).
:param array_like V: (nb of snapshots, nb of snapshots).
:param float tolerance: basis modes filtering criteria.
:param int dim_max: number of basis modes to keep.
:return: U (nb of data, nb of modes).
:rtype: array_like.
:return: S (nb of modes).
:rtype: array_like.
:return: V (nb of snapshots, nb of modes).
:rtype: array_like.
"""
total_sum = np.sum(S)
for i in range(S.shape[0]):
dim = i+1
with np.errstate(divide='ignore', invalid='ignore'):
if np.sum(S[:i + 1]) / total_sum > tolerance:
break
dim = min(dim, dim_max)
# copy ensures an array is not a slice of a bigger memory zone
if U.shape[1] != dim:
U = U[:, :dim].copy()
if S.shape[0] != dim:
S = S[:dim].copy()
if V.shape[1] != dim:
V = V[:, :dim].copy()
return U, S, V
def _update(self, snapshot):
"""Update POD with a new snapshot.
:param array_like snapshot: a snapshot, (n_features,).
"""
if self.mean_snapshot is None:
# start off with a mode that will be thrown away
# by filtering: 0. singular value
self.mean_snapshot = snapshot
self.U = np.zeros([snapshot.shape[0], 1])
self.U[0, 0] = 1.
self.S = np.zeros([1])
self.V = np.ones([1, 1])
else:
# backup and update mean snapshot
mean_snapshot_copy = self.mean_snapshot.copy()
s_nb = self.V.shape[0]
self.mean_snapshot = (s_nb * self.mean_snapshot + snapshot)\
/ (s_nb + 1)
# move to POD origin and project the snapshot on the POD basis
snapshot -= mean_snapshot_copy
s_proj = np.dot(self.U.T, snapshot)
# mpi.Allreduce(sendbuf=s_proj.copy(), recvbuf=s_proj, op=mpi.sum)
h = snapshot - np.dot(self.U, s_proj)
h_norm = np.linalg.norm(h)
h_norm *= h_norm
h_norm = np.sum(h_norm)
# h_norm = mpi.allreduce(h_norm, op=mpi.sum)
h_norm = np.sqrt(h_norm)
# St = | S  U^T s_proj |
#      | 0    norm(h)  |
self.S = np.column_stack([np.diag(self.S), s_proj])
self.S = np.vstack([self.S, np.zeros_like(self.S[0])])
self.S[-1, -1] = h_norm
# Ut = | U  h/norm(h) |
if h_norm == 0.:
h_norm = 1. # fix for h = 0
self.U = np.column_stack([self.U, h / h_norm])
# Vt = | V  0 |
#      | 0  1 |
self.V = np.vstack([self.V, np.zeros_like(self.V[0])])
self.V = np.column_stack([self.V, np.zeros_like(self.V[:, 0])])
self.V[-1, -1] = 1.
Ud, self.S, Vd_T = np.linalg.svd(self.S)
self.V = np.dot(self.V, Vd_T.T)
Un, self.S, self.V = self.downgrade(self.S, self.V)
self.U = np.dot(self.U, np.dot(Ud, Un))
self.U, self.S, self.V = self.filtering(self.U, self.S, self.V,
self.tolerance, self.dim_max)
@staticmethod
def downgrade(S, Vt):
r"""Downgrade by removing the kth row of V.
.. math:: S^{-k} &= U\Sigma R^T Q^T\\
S^{-k} &= UU'\Sigma'V'^TQ^T \\
S^{-k} &= U^{-k}\Sigma'V^{(-k)^T}
:param S: Singular values, array_like (n_modes,).
:param Vt: V without the kth row, array_like (n_snapshots - 1, n_modes).
:return: U', S', V(-k).T
:rtype: array_like.
"""
v = np.average(Vt, 0)
for row in Vt:
row -= v
Q, R = np.linalg.qr(Vt)
R = (S * R).T
Urot, S, V = np.linalg.svd(R, full_matrices=False)
V = np.dot(Q, V.T)
return Urot, S, V
def _estimate_quality(self, points):
r"""Quality estimation of the model.
The quality estimation is done using the leave-one-out method.
A parallel computation is performed by iterating over the
points of the DOE.
Q2 is computed and the point with max MSE is looked up.
A multithreading strategy is used:
1. Create :math:`N` threads with :math:`N=\frac{n_{cpu}}{n_{restart} \times n_{modes}}`,
2. If :math:`N < 1`, a single thread is used.
:param lst points: Points in the parameter space.
:return: Q2 error.
:rtype: float.
:return: Max MSE point.
:rtype: array_like (n_features,).
"""
points_nb = len(points)
data_len = self.U.shape[0]
error_l_two = np.empty(points_nb)
snapshot_value = np.empty((points_nb, data_len))
error_matrix = np.empty((points_nb, data_len))
var_matrix = np.empty((points_nb, data_len))
surrogate = SurrogateModel(self.leave_one_out_predictor,
self.corners)
def quality(i):
"""Error at a point.
:param int i: index of the point left out.
:return: snapshot value, L2 error and error vector at that point.
:rtype: array_like.
"""
# Remove point from matrix
V_1 = np.delete(self.V, i, 0)
(Urot, S_1, V_1) = self.downgrade(self.S, V_1)
(Urot, S_1, V_1) = self.filtering(Urot, S_1, V_1,
1.,
len(self.S))
points_1 = points[:]
points_1.pop(i)
new_pod = copy.deepcopy(self)
new_pod.space = points_1
new_pod.V = V_1
new_pod.S = S_1
# New prediction with points_nb - 1
surrogate.fit(new_pod.space, new_pod.V * new_pod.S)
prediction, _ = surrogate(points[i])
# MSE on the missing point
error_no_mod = np.dot(Urot, prediction[0]) - float(points_nb) /\
float(points_nb - 1) * self.V[i] * self.S
error_vector_ = np.dot(self.U, error_no_mod)
error_l_two_ = np.sqrt(np.sum(error_no_mod ** 2))
# Because V = V.T -> V[i] is a column so V[i]S = SV.T
snapshot_value_ = np.dot(self.U, self.V[i] * self.S)
return snapshot_value_, error_l_two_, error_vector_
# Multi-threading strategy
n_cpu_system = cpu_system()
n_cpu = n_cpu_system // (len(self.S) * 3)
if n_cpu < 1:
n_cpu = 1
elif n_cpu > points_nb:
n_cpu = points_nb
pool = NestedPool(n_cpu)
progress = ProgressBar(points_nb)
results = pool.imap(quality, range(points_nb))
for i in range(points_nb):
snapshot_value[i], error_l_two[i], error_matrix[i] = next(results)
progress()
pool.terminate()
mean = np.mean(snapshot_value, axis=0)
for i in range(points_nb):
var_matrix[i] = (mean - np.dot(self.U, self.V[i] * self.S)) ** 2
# Compute Q2
# Use a part of the code of the r2_score function
# From scikit-learn library
numerator = (error_matrix ** 2).sum(axis=0, dtype=np.float64)
denominator = np.sum(var_matrix, axis=0, dtype=np.float64)
nonzero_denominator = denominator != 0
nonzero_numerator = numerator != 0
valid_score = nonzero_denominator & nonzero_numerator
output_scores = np.ones([data_len])
output_scores[valid_score] = 1 - (numerator[valid_score] /
denominator[valid_score])
# arbitrarily set to zero to avoid -inf scores; having a constant
# y_true is not interesting for scoring a regression anyway
output_scores[nonzero_numerator & ~nonzero_denominator] = 0.
q2 = output_scores
index = error_l_two.argmax()
err_q2 = np.mean(q2)
return err_q2, points[index]