# coding: utf8
"""
Driver Class
============
Defines all methods used to interact with other classes.
:Example:
::
>> from batman import Driver
>> driver = Driver(settings, script_path, output_path)
>> driver.sampling_pod(update=False)
>> driver.write_pod()
>> driver.prediction(write=True)
>> driver.write_model()
>> driver.uq()
>> driver.visualization()
"""
import logging
import os
import pickle
from copy import copy
from concurrent import futures
import numpy as np
import sklearn.gaussian_process.kernels as kernels
from .pod import Pod
from .space import (Space, dists_to_ot)
from .surrogate import SurrogateModel
from .tasks import (SnapshotIO, ProviderPlugin, ProviderFile)
from .uq import UQ
from .visualization import (response_surface, Kiviat3D)
from .functions.utils import multi_eval
class Driver(object):
    """Driver class.

    Orchestrates the whole workflow: sampling of the parameter space,
    snapshot computation, optional POD, surrogate model fitting,
    resampling, prediction, UQ analysis and visualization.
    """

    logger = logging.getLogger(__name__)

    # Structure of the output directory
    fname_tree = {
        'snapshots': 'snapshots',
        'space': 'space.dat',
        'data': 'data.dat',
        'pod': 'surrogate/pod',
        'surrogate': 'surrogate',
        'predictions': 'predictions',
        'uq': 'uq',
        'visualization': 'visualization',
    }

    # Data provider for snapshots
    provider_class = {
        'plugin': ProviderPlugin,
        'file': ProviderFile,
    }

    def __init__(self, settings, fname):
        """Initialize Driver.

        From settings, init snapshot, space [POD, surrogate].

        :param dict settings: settings.
        :param str fname: output folder path.
        """
        self.settings = settings
        self.fname = fname
        try:
            os.makedirs(self.fname)
        except OSError:
            pass  # directory exists already

        # Space
        if 'resampling' in self.settings['space']:
            resamp_size = self.settings['space']['resampling']['resamp_size']
        else:
            resamp_size = 0

        if 'init_size' in self.settings['space']['sampling']:
            init_size = self.settings['space']['sampling']['init_size']
        else:  # when providing DoE as a list
            init_size = self.settings['space']['sampling']

        # Saltelli sampling intentionally produces duplicated points
        try:
            duplicate = self.settings['space']['sampling']['method'] == 'saltelli'
        except (KeyError, TypeError):
            # 'sampling' is a list (no 'method' key) or key missing
            duplicate = False

        self.space = Space(self.settings['space']['corners'],
                           init_size,
                           nrefine=resamp_size,
                           plabels=self.settings['snapshot']['plabels'],
                           duplicate=duplicate)

        # Asynchronous job manager
        self.async_pool = futures.ThreadPoolExecutor(
            max_workers=self.settings['snapshot']['max_workers'])

        # Snapshot Management
        args = settings['snapshot'].get('io', {})
        self.snapshot_io = SnapshotIO(parameter_names=settings['snapshot']['plabels'],
                                      feature_names=settings['snapshot']['flabels'],
                                      **args)

        provider_type = settings['snapshot']['provider']['type']
        self.logger.info('Select data provider type "{}"'.format(provider_type))
        self.provider = self.provider_class[provider_type](
            self.async_pool,
            self.snapshot_io,
            settings['snapshot']['provider'])

        self.snapshot_counter = 0

        # Sampling initialisation
        self.to_compute_points = copy(self.provider.known_points)
        if self.to_compute_points:
            # use points that were automatically discovered by the provider
            for point in self.to_compute_points:
                self.space += point
        else:
            # generate points according to settings
            space_provider = self.settings['space']['sampling']
            if isinstance(space_provider, list):
                # a list of points is provided
                self.logger.info('Reading list of points from the settings.')
                self.to_compute_points = space_provider
                self.space += space_provider
            elif isinstance(space_provider, dict):
                # use sampling method
                distributions = space_provider.get('distributions')
                discrete = self.settings['space']['sampling'].get('discrete')
                self.to_compute_points = self.space.sampling(
                    space_provider['init_size'],
                    space_provider['method'],
                    distributions,
                    discrete)
            else:
                self.logger.error('Bad space provider.')
                # NOTE(review): SystemError is an odd choice for a settings
                # error, but kept since callers may rely on it.
                raise SystemError

        # Pod
        if 'pod' in self.settings:
            settings_ = {'tolerance': self.settings['pod']['tolerance'],
                         'dim_max': self.settings['pod']['dim_max'],
                         'corners': self.settings['space']['corners'],
                         'nsample': self.space.doe_init,
                         'nrefine': resamp_size}
            self.pod = Pod(**settings_)
            self.pod.space.duplicate = duplicate
        else:
            self.pod = None
            self.logger.info('No POD is computed.')

        self.data = None

        # Surrogate model
        if 'surrogate' in self.settings:
            settings_ = {}
            if self.settings['surrogate']['method'] == 'pc':
                dists = self.settings['space']['sampling']['distributions']
                dists = dists_to_ot(dists)
                settings_ = {'strategy': self.settings['surrogate']['strategy'],
                             'degree': self.settings['surrogate']['degree'],
                             'distributions': dists,
                             'sample': self.space[:]}
            elif self.settings['surrogate']['method'] == 'evofusion':
                settings_ = {'cost_ratio': self.settings['surrogate']['cost_ratio'],
                             'grand_cost': self.settings['surrogate']['grand_cost']}
            elif self.settings['surrogate']['method'] == 'kriging':
                if 'kernel' in self.settings['surrogate']:
                    kernel = self.settings['surrogate']['kernel']
                    # SECURITY: eval of a settings-supplied string; builtins
                    # are disabled and only sklearn kernel names are exposed,
                    # but settings must still come from a trusted source.
                    try:
                        kernel = eval(kernel, {'__builtins__': None},
                                      kernels.__dict__)
                    except (TypeError, AttributeError):
                        self.logger.error('Scikit-Learn kernel unknown.')
                        raise SystemError
                    settings_ = {'kernel': kernel}
                settings_.update({
                    'noise': self.settings['surrogate'].get('noise', False),
                    'global_optimizer': self.settings['surrogate'].get('global_optimizer', True)
                })

            self.surrogate = SurrogateModel(self.settings['surrogate']['method'],
                                            self.settings['space']['corners'],
                                            **settings_)

            if self.settings['surrogate']['method'] == 'pc':
                # PC imposes its own quadrature sample: replace the DoE
                self.space.empty()
                sample = self.surrogate.predictor.sample
                self.space += sample
                if not self.provider.known_points:
                    self.to_compute_points = sample[:len(self.space)]
        else:
            self.surrogate = None
            self.logger.info('No surrogate is computed.')

    def sampling(self, points=None, update=False):
        """Create snapshots.

        Generates or retrieve the snapshots [and then perform the POD].

        :param :class:`Space` points: points to perform the sample from.
        :param bool update: perform dynamic or static computation.
        """
        if points is None:
            points = self.to_compute_points

        # Generate snapshots: a dict maps point -> snapshot directory,
        # otherwise directories are numbered from the current counter.
        if isinstance(points, dict):
            snapshot_points = points.items()
        else:
            snapshot_root = os.path.join(self.fname, self.fname_tree['snapshots'])
            snapshot_points = [(point,
                                os.path.join(snapshot_root,
                                             str(i + self.snapshot_counter)))
                               for i, point in enumerate(points)]
        snapshots = [self.provider.snapshot(p, d) for p, d in snapshot_points]
        self.snapshot_counter += len(snapshots)

        # Fit the Surrogate [and POD]
        if self.pod is not None:
            if update:
                self.surrogate.space.empty()
                # incremental POD update, one snapshot at a time
                # (was a side-effect-only list comprehension)
                for snapshot in snapshots:
                    self.pod.update(snapshot)
            else:
                self.pod.decompose(snapshots)
            self.data = self.pod.VS()
            points = self.pod.space
        else:
            if snapshots:
                snapdata = np.vstack([snap.data for snap in snapshots])
                if update:
                    snapdata = np.vstack([self.data, snapdata])
                    if len(snapdata) != len(self.space):  # no resampling
                        # rebuild data while registering the new points
                        snapdata = self.data
                        for snapshot in snapshots:
                            self.space += snapshot.point
                            snapdata = np.vstack([snapdata, snapshot.data])
                self.data = snapdata
            points = self.space

        # Fit only when a surrogate exists; the previous
        # try/except AttributeError also hid errors raised inside fit().
        if self.surrogate is not None:
            self.surrogate.fit(points, self.data, pod=self.pod)

    def resampling(self):
        """Resampling of the parameter space.

        Generate new samples if quality and number of sample are not satisfied.
        From a new sample, it re-generates the POD.
        """
        self.logger.info("\n----- Resampling parameter space -----")
        method = self.settings['space']['resampling']['method']
        extremum = self.settings['space']['resampling'].get('extremum')
        hybrid = self.settings['space']['resampling'].get('hybrid')
        discrete = self.settings['space']['sampling'].get('discrete')
        delta_space = self.settings['space']['resampling'].get('delta_space', 0.08)
        q2_criteria = self.settings['space']['resampling'].get('q2_criteria')
        pdf = self.settings.get('uq', {}).get('pdf')

        while len(self.space) < self.space.max_points_nb:
            self.logger.info("-> New iteration")
            # Stop early once the LOO quality criterion is reached
            # (quality check is skipped for the optimization method).
            if (method != 'optimization') and (q2_criteria is not None):
                quality, point_loo = self.surrogate.estimate_quality()
                if quality >= q2_criteria:
                    break
            else:
                point_loo = None

            new_point = self.space.refine(self.surrogate,
                                          method,
                                          point_loo=point_loo,
                                          delta_space=delta_space, dists=pdf,
                                          hybrid=hybrid, discrete=discrete,
                                          extremum=extremum)
            try:
                self.sampling(new_point, update=True)
            except ValueError:
                # refinement produced no usable point: stop resampling
                break

        if method == 'optimization':
            self.space.optimization_results(extremum=extremum)

    def write(self):
        """Write Surrogate [and POD] to disk."""
        if self.surrogate is not None:
            path = os.path.join(self.fname, self.fname_tree['surrogate'])
            try:
                os.makedirs(path)
            except OSError:
                pass  # directory exists already
            self.surrogate.write(path)
        else:
            path = os.path.join(self.fname, self.fname_tree['space'])
            self.space.write(path)
        if self.pod is not None:
            path = os.path.join(self.fname, self.fname_tree['pod'])
            try:
                os.makedirs(path)
            except OSError:
                pass  # directory exists already
            self.pod.write(path)
        elif (self.pod is None) and (self.surrogate is None):
            # neither POD nor surrogate: pickle the raw snapshot data
            path = os.path.join(self.fname, self.fname_tree['data'])
            with open(path, 'wb') as fdata:
                pickler = pickle.Pickler(fdata)
                pickler.dump(self.data)
            self.logger.debug('Wrote data to {}'.format(path))

    def read(self):
        """Read Surrogate [and POD] from disk."""
        if self.surrogate is not None:
            self.surrogate.read(os.path.join(self.fname,
                                             self.fname_tree['surrogate']))
            self.space[:] = self.surrogate.space[:]
            self.data = self.surrogate.data
        else:
            path = os.path.join(self.fname, self.fname_tree['space'])
            self.space.read(path)
        if self.pod is not None:
            self.pod.read(os.path.join(self.fname, self.fname_tree['pod']))
            self.surrogate.pod = self.pod
        elif (self.pod is None) and (self.surrogate is None):
            # SECURITY: unpickling; only load files this code wrote itself.
            path = os.path.join(self.fname, self.fname_tree['data'])
            with open(path, 'rb') as fdata:
                unpickler = pickle.Unpickler(fdata)
                self.data = unpickler.load()
            self.logger.debug('Data read from {}'.format(path))

    def restart(self):
        """Restart process."""
        # Surrogate [and POD] has already been computed
        self.logger.info('Restarting from previous computation...')
        to_compute_points = self.space[:]
        self.read()  # Reset space with actual computations
        self.snapshot_counter = len(self.space)

        if self.snapshot_counter < len(to_compute_points):
            # will add new points to be processed
            # [static or dynamic pod is finished]
            self.to_compute_points = [p for p in to_compute_points
                                      if p not in self.space]
        else:
            # automatic resampling has to continue from
            # the processed points
            self.to_compute_points = []

    def prediction(self, points, write=False):
        """Perform a prediction.

        :param points: point(s) to predict.
        :type points: :class:`space.point.Point` or array_like (n_samples, n_features).
        :param bool write: whether to write snapshots.
        :return: Result.
        :rtype: array_like (n_samples, n_features).
        :return: Standard deviation.
        :rtype: array_like (n_samples, n_features).
        """
        results, sigma = self.surrogate(points)

        if write:
            root_path = os.path.join(self.fname, self.fname_tree['predictions'])
            # normalize a single point into a one-element list
            try:
                points[0][0]
            except TypeError:
                points = [points]
            for i, (data, point) in enumerate(zip(results, points)):
                path = os.path.join(root_path, 'Newsnap{}'.format(i))
                try:
                    os.makedirs(path)
                except OSError:
                    pass  # directory exists already
                self.snapshot_io.write_point(path, point)
                self.snapshot_io.write_data(path, data)

        return results, sigma

    def uq(self):
        """Perform UQ analysis."""
        args = {}
        args['fname'] = os.path.join(self.fname, self.fname_tree['uq'])
        args['space'] = self.space
        args['indices'] = self.settings['uq']['type']
        args['plabels'] = self.settings['snapshot']['plabels']
        args['dists'] = self.settings['uq']['pdf']
        args['nsample'] = self.settings['uq']['sample']
        if self.pod is not None:
            # reconstruct snapshots from POD modes
            args['data'] = self.pod.mean_snapshot + np.dot(self.pod.U, self.data.T).T
        else:
            args['data'] = self.data
        args['test'] = self.settings['uq'].get('test')
        args['xdata'] = self.settings.get('visualization', {}).get('xdata')
        args['xlabel'] = self.settings.get('visualization', {}).get('xlabel')
        args['flabel'] = self.settings.get('visualization', {}).get('flabel')

        analyse = UQ(self.surrogate, **args)

        if self.surrogate is None:
            self.logger.warning("No surrogate model, be sure to have a "
                                "statistically significant sample to trust "
                                "following results.")
        analyse.sobol()
        analyse.error_propagation()

    def visualization(self):
        """Apply visualisation options."""
        p_len = len(self.settings['space']['corners'][0])

        # In case of POD, data need to be converted from modes to snapshots.
        if self.pod is not None:
            data = self.pod.mean_snapshot + np.dot(self.pod.U, self.data.T).T
        else:
            data = self.data
        output_len = np.asarray(data).shape[1]

        self.logger.info('Creating response surface...')
        args = {}
        if 'visualization' in self.settings:
            # xdata for output with dim > 1
            if ('xdata' in self.settings['visualization']) and (output_len > 1):
                args['xdata'] = self.settings['visualization']['xdata']
            elif output_len > 1:
                args['xdata'] = np.linspace(0, 1, output_len)

            # Plot Doe if doe option is True
            if ('doe' in self.settings['visualization']) and\
                    self.settings['visualization']['doe']:
                args['doe'] = self.space

            # Display resampling if resampling option is true
            if ('resampling' in self.settings['visualization']) and\
                    self.settings['visualization']['resampling']:
                args['resampling'] = self.settings['space']['resampling']['resamp_size']
            else:
                args['resampling'] = 0

            args['ticks_nbr'] = self.settings.get('visualization', {}).get('ticks_nbr', 10)
            args['contours'] = self.settings.get('visualization', {}).get('contours')
            args['range_cbar'] = self.settings.get('visualization', {}).get('range_cbar')
            args['axis_disc'] = self.settings.get('visualization', {}).get('axis_disc')
        else:
            args['xdata'] = np.linspace(0, 1, output_len) if output_len > 1 else None

        # Bounds fall back to the space corners when absent or wider than them
        try:
            args['bounds'] = self.settings['visualization']['bounds']
            for i, _ in enumerate(args['bounds'][0]):
                if (args['bounds'][0][i] < self.settings['space']['corners'][0][i])\
                        or (args['bounds'][1][i] > self.settings['space']['corners'][1][i]):
                    args['bounds'] = self.settings['space']['corners']
                    self.logger.warning("Specified bounds for visualisation are "
                                        "wider than space corners. Default value used.")
        except KeyError:
            args['bounds'] = self.settings['space']['corners']

        # Data based on surrogate model (function) or not
        if 'surrogate' in self.settings:
            args['fun'] = self.func
        else:
            args['sample'] = self.space
            args['data'] = data

        try:
            args['plabels'] = self.settings['visualization']['plabels']
        except KeyError:
            args['plabels'] = self.settings['snapshot']['plabels']

        # NOTE(review): 'flabel' is only filled for single-output cases;
        # the Kiviat branch below reads it unconditionally — confirm intent
        # for multi-output snapshots.
        if len(self.settings['snapshot']['flabels']) < 2:
            try:
                args['flabel'] = self.settings['visualization']['flabel']
            except KeyError:
                args['flabel'] = self.settings['snapshot']['flabels'][0]

        path = os.path.join(self.fname, self.fname_tree['visualization'])
        try:
            os.makedirs(path)
        except OSError:
            pass  # directory exists already

        if p_len < 5:
            # Creation of the response surface(s)
            args['fname'] = os.path.join(path, 'Response_Surface')
            response_surface(**args)
        else:
            # Creation of the Kiviat image
            args['fname'] = os.path.join(path, 'Kiviat.pdf')
            args['sample'] = self.space
            args['data'] = data
            if 'range_cbar' not in args:
                args['range_cbar'] = None
            if 'ticks_nbr' not in args:
                args['ticks_nbr'] = 10
            if 'kiviat_fill' not in args:
                args['kiviat_fill'] = True
            kiviat = Kiviat3D(args['sample'], args['bounds'], args['data'],
                              plabels=args['plabels'],
                              range_cbar=args['range_cbar'])
            kiviat.plot(fname=args['fname'], flabel=args['flabel'],
                        ticks_nbr=args['ticks_nbr'], fill=args['kiviat_fill'])

            # Creation of the Kiviat movie:
            args['fname'] = os.path.join(path, 'Kiviat.mp4')
            rate = 400
            kiviat.f_hops(frame_rate=rate, fname=args['fname'],
                          flabel=args['flabel'], fill=args['kiviat_fill'],
                          ticks_nbr=args['ticks_nbr'])

    @multi_eval
    def func(self, coords):
        """Evaluate the surrogate at a given point.

        This function calls the surrogate to compute a prediction.

        :param lst coords: The parameters set to calculate the solution from.
        :return: The function evaluation.
        :rtype: float.
        """
        f_eval, _ = self.surrogate(coords)
        return f_eval[0]