Source code for batman.functions.telemac_mascaret.run_mascaret.mascaret_api

# coding: utf-8
"""
MascaretAPI class
=================

Python Wrapper for Mascaret.

"""
import sys
import os
import logging
import json
from collections import OrderedDict
import ctypes
import itertools
import numpy as np
import batman.space.gp_1d_sampler as gp_1d_sampler


class MascaretApi(object):
    """Mascaret API.

    Thin ``ctypes`` wrapper around the Mascaret shared library. Every call
    to the library goes through the :attr:`error` setter, which terminates
    the programme (``SystemExit``) as soon as a non-zero code is returned.
    """

    logger = logging.getLogger(__name__)
    _error = 0

    @property
    def error(self):
        """Last stored API error code (0 whenever a call succeeded)."""
        return self._error

    @error.setter
    def error(self, value):
        """Detect errors.

        Overwrite the attribute setter to detect API errors. If
        :attr:`error` is not set to zero, the API message is logged and the
        programme is terminated.

        :param int value: return code of a Mascaret API call.
        :raises SystemExit: if ``value`` is not zero.
        """
        if value != 0:
            self.logger.error("API error:\n{}".format(self.error_message()))
            raise SystemExit
        self._error = 0

    def __init__(self, settings, user_settings):
        """Constructor.

        1. Loads the Mascaret library with :meth:`MascaretApi.load_mascaret`,
        2. Creates an instance of Mascaret with
           :meth:`MascaretApi.create_model`,
        3. Reads model files from "settings" with
           :meth:`MascaretApi.file_model`,
        4. Gets model size with :meth:`MascaretApi.model_size`,
        5. Gets the simulation times with :meth:`MascaretApi.simu_times`,
        6. Reads and applies user defined parameters from
           ``user_settings``,
        7. Gets the curvilinear abscissa with :meth:`MascaretApi.curv_abs`,
        8. Initializes the model with :meth:`MascaretApi.init_model`,
        9. Reads the cross sections with :attr:`MascaretApi.cross_section`.

        :param str settings: path of the *JSON* model settings file.
        :param str user_settings: path of the *JSON* user settings file.
        """
        self.logger.info('Using MascaretApi')

        # Load the library mascaret.so shipped next to this module
        libmascaret = os.path.join(
            os.path.dirname(os.path.realpath(__file__)), 'lib/mascaret.so')
        self.load_mascaret(libmascaret)

        # Create an instance of MASCARET
        self.create_model()

        # Read model files
        self.file_model(settings)

        # Get model size (kept as a ctypes.c_int, see model_size)
        self.nb_nodes = self.model_size

        # Property access on purpose: it stores dt, t0 and tend on self
        self.simu_times

        # Read and apply user defined parameters
        self.user_defined(user_settings)

        # Get the curv abs for each index
        self.curv_abs()

        # Initialize model
        self.init_model()

        # Property access on purpose: reads and logs Model.CrossSection
        self.cross_section

    # ------------------------------------------------------------------
    # Private helpers wrapping the repetitive ctypes calls
    # ------------------------------------------------------------------
    def _var_size(self, var_name):
        """Query the three sizes of a Mascaret variable.

        Uses :meth:`C_GET_TAILLE_VAR_MASCARET`.

        :param bytes var_name: name of the variable.
        :return: the three sizes as ``ctypes.c_int`` objects.
        :rtype: tuple(ctypes.c_int)
        """
        size1 = ctypes.c_int()
        size2 = ctypes.c_int()
        size3 = ctypes.c_int()
        self.error = self.libmascaret.C_GET_TAILLE_VAR_MASCARET(
            self.id_masc, ctypes.c_char_p(var_name), 0,
            ctypes.byref(size1), ctypes.byref(size2), ctypes.byref(size3))
        return size1, size2, size3

    def _get_double(self, var_name, idx1=0, idx2=0, idx3=0):
        """Read one double of a Mascaret variable.

        Uses :meth:`C_GET_DOUBLE_MASCARET`.

        :param bytes var_name: name of the variable.
        :param int idx1: first index given to the API.
        :param int idx2: second index given to the API.
        :param int idx3: third index given to the API.
        :return: the value read.
        :rtype: ctypes.c_double
        """
        value_c = ctypes.c_double()
        self.error = self.libmascaret.C_GET_DOUBLE_MASCARET(
            self.id_masc, ctypes.c_char_p(var_name), idx1, idx2, idx3,
            ctypes.byref(value_c))
        return value_c

    def _set_double(self, var_name, value, idx1=0, idx2=0, idx3=0):
        """Write one double of a Mascaret variable.

        Uses :meth:`C_SET_DOUBLE_MASCARET`.

        :param bytes var_name: name of the variable.
        :param float value: value to write.
        :param int idx1: first index given to the API.
        :param int idx2: second index given to the API.
        :param int idx3: third index given to the API.
        """
        self.error = self.libmascaret.C_SET_DOUBLE_MASCARET(
            self.id_masc, ctypes.c_char_p(var_name), idx1, idx2, idx3,
            ctypes.c_double(value))

    # ------------------------------------------------------------------
    # Library and model life cycle
    # ------------------------------------------------------------------
    def load_mascaret(self, libmascaret):
        """Load Mascaret library.

        :param str libmascaret: path to the library
        :raises SystemExit: if the library cannot be loaded or the platform
            is neither Linux nor macOS.
        """
        # Only used for the debug log: a missing variable must not crash.
        ld_library = os.environ.get('LD_LIBRARY_PATH', '')
        self.logger.debug('LD_LIBRARY_PATH: {}'.format(ld_library))
        self.logger.info('Loading {}...'.format(libmascaret))

        if not sys.platform.startswith(('linux', 'darwin')):
            self.logger.error('Unsupported OS. Only macOS or Unix!')
            raise SystemExit

        try:
            self.libmascaret = ctypes.CDLL(libmascaret)
        except Exception as tb:
            self.logger.exception("Unable to load: mascaret.so. Check the "
                                  "environment variable LIBMASCARET: {}"
                                  .format(tb))
            raise SystemExit
        else:
            self.logger.info('Library loaded.')

    @property
    def model_size(self):
        """Get model size (number of nodes).

        Uses :meth:`C_GET_TAILLE_VAR_MASCARET`.

        :return: Size of the model; callers read it through ``.value``
        :rtype: ctypes.c_int
        """
        self.logger.debug("Getting size var in model #{}..."
                          .format(self.id_masc.value))
        nb_nodes, _, _ = self._var_size(b'Model.X')
        self.logger.debug('Get nb of nodes: size_X={}.'
                          .format(nb_nodes.value))
        return nb_nodes

    def init_model(self):
        """Initialize the model from constant values.

        ``init_cst`` in :attr:`user_settings` along with ``Q_cst`` and
        ``Z_cst`` values or from :file:`file.lig` in :attr:`settings`.

        Uses Mascaret Api :meth:`C_INIT_LIGNE_MASCARET` or
        :meth:`C_INIT_ETAT_MASCARET`.
        """
        if 'init_cst' in self.user_settings:
            # Initialize Mascaret Model from values
            size = self.nb_nodes.value
            init_cst = self.user_settings['init_cst']
            # One value per node: np.full repeats a scalar (or broadcasts a
            # one-element list) over the whole line.
            q = np.full(size, init_cst['Q_cst'], dtype=float)
            z = np.full(size, init_cst['Z_cst'], dtype=float)
            q_c = (ctypes.c_double * size)(*q.tolist())
            z_c = (ctypes.c_double * size)(*z.tolist())
            self.logger.debug('Initilizing MASCARET from constant value...')
            self.error = self.libmascaret.C_INIT_LIGNE_MASCARET(
                self.id_masc, ctypes.byref(q_c), ctypes.byref(z_c),
                self.nb_nodes)
            self.logger.debug(
                'State constant initialisation successfull from constant value.')
        else:
            # Initialize Mascaret Model from file
            init_file_name_c = ctypes.c_char_p(self.settings['files']['lig'])
            self.logger.debug('Initializing MASCARET from lig...')
            self.error = self.libmascaret.C_INIT_ETAT_MASCARET(
                self.id_masc, init_file_name_c, self.iprint)
            self.logger.debug('State initialisation successfull from lig.')

    @property
    def simu_times(self):
        """Get the simulation times from :file:`.xcas` in :attr:`settings`.

        Uses Mascaret Api :meth:`C_GET_DOUBLE_MASCARET`. As a side effect
        the values are stored as ``ctypes.c_double`` in :attr:`dt`,
        :attr:`t0` and :attr:`tend`.

        :return: time step, initial time and final time
        :rtype: tuple(float)
        """
        # dt
        self.logger.debug('Getting time step...')
        self.dt = self._get_double(b'Model.DT')
        self.logger.debug('Time step: dt={}.'.format(self.dt.value))

        # t0
        self.logger.debug('Getting initial time...')
        self.t0 = self._get_double(b'Model.InitTime')
        self.logger.debug('Initial time: t0={}.'.format(self.t0.value))

        # tend
        self.logger.debug('Getting end time...')
        self.tend = self._get_double(b'Model.MaxCompTime')
        self.logger.debug('End time: tend={}.'.format(self.tend.value))

        return (self.dt.value, self.t0.value, self.tend.value)

    def create_model(self):
        """Create an instance of Mascaret.

        Uses Mascaret Api :meth:`C_CREATE_MASCARET`. Stores the model
        identifier in :attr:`id_masc` and sets :attr:`iprint` to 1.
        """
        id_masc = ctypes.c_int()
        self.logger.debug('Creating a model...')
        self.error = self.libmascaret.C_CREATE_MASCARET(ctypes.byref(id_masc))
        self.logger.debug('Model created.')
        self.id_masc = ctypes.c_int(id_masc.value)
        # .opt and .lis written only if iprint = 1 at import AND calcul steps
        self.iprint = 1

    def file_model(self, settings):
        """Read model files from :file:`settings` which is a *JSON* file.

        (.xcas, .geo, .lig, .loi, .dtd)
        Uses Mascaret Api :meth:`C_IMPORT_MODELE_MASCARET`. File names in
        ``self.settings['files']`` are converted in place from ``str`` to
        ``bytes``.

        :param str settings: path of *JSON* settings file
        """
        with open(settings, 'rb') as fd:
            content = fd.read().decode('utf8')
        # No ``encoding`` keyword: it was removed from json.loads in 3.9.
        self.settings = json.loads(content, object_pairs_hook=OrderedDict)
        self.nb_bc = None

        # Convert all values of settings from str to bytes
        files = self.settings['files']
        file_type = []
        file_name = []
        for key, entry in files.items():
            key_b = key.encode('utf8')
            try:
                value = entry.encode('utf8')
            except AttributeError:  # In case of a list, loop over it
                for i, sub in enumerate(entry):
                    sub_value = sub.encode('utf8')
                    file_name.append(sub_value)
                    files[key][i] = sub_value
                    file_type.append(key_b)
            else:
                file_name.append(value)
                files[key] = value
                file_type.append(key_b)

        # Import a model
        len_file = len(file_name)
        file_name_c = (ctypes.c_char_p * len_file)(*file_name)
        file_type_c = (ctypes.c_char_p * len_file)(*file_type)
        self.logger.debug('Importing a model...')
        self.error = self.libmascaret.C_IMPORT_MODELE_MASCARET(
            self.id_masc, file_name_c, file_type_c, len_file, self.iprint)
        self.logger.info("Model imported with:\n-> file_name: {}\n-> file_type: {}."
                         .format(file_name, file_type))

    def __del__(self):
        """Delete a model."""
        # Nothing to release when construction stopped before the library
        # was loaded or the model was created.
        if not (hasattr(self, 'libmascaret') and hasattr(self, 'id_masc')):
            return
        self.logger.debug('Deleting instance #{}...'.format(self.id_masc.value))
        self.error = self.libmascaret.C_DELETE_MASCARET(self.id_masc)
        self.logger.debug("Model #{} deleted.".format(self.id_masc.value))

    def user_defined(self, user_settings=None):
        """Read user parameters from :file:`user_settings` and apply values.

        Look for ``Q_BC`` (``Q_BC={'idx','value'}``), ``Ks``
        (``Ks={'zone','idx','value', 'ind_zone'}``, the ``Ks`` for 1 point
        or 1 zone) and ``bathy``.
        Use :meth:`zone_friction_minor`, :meth:`friction_minor`,
        :meth:`bc_qt` and :meth:`cross_section`.

        When ``user_settings`` is None the file is not read and the
        already stored :attr:`user_settings` are applied again.

        :param str user_settings: Path of the *JSON* settings file
        """
        if user_settings is not None:
            with open(user_settings, 'rb') as fd:
                content = fd.read().decode('utf8')
            # No ``encoding`` keyword: it was removed from json.loads in 3.9.
            self.user_settings = json.loads(
                content, object_pairs_hook=OrderedDict)

        if 'Q_BC' in self.user_settings:
            self.logger.debug('In user_defined for bc_qt')
            self.bc_qt = self.user_settings['Q_BC']

        if 'Ks' in self.user_settings:
            self.logger.debug('In user_defined for ks')
            if self.user_settings['Ks']['zone']:
                self.zone_friction_minor = self.user_settings['Ks']
            else:
                self.friction_minor = self.user_settings['Ks']

        if 'bathy' in self.user_settings:
            self.logger.debug('In user_defined for bathy')
            self.cross_section = self.user_settings['bathy']

    def __repr__(self):
        """Class informations based on settings.

        The template is filled positionally with the values of
        ``settings['files']`` followed by those of ``user_settings``, in
        the order in which they appear in the *JSON* files.
        """
        has_casier = 'casier' in self.settings['files']

        string = ("MODEL FILES:\n"
                  " -- xcas: {}\n"
                  " -- geo: {}\n")
        if has_casier:
            string += (" -- casier: {}\n")
        string += (" -- res: {}\n")
        if has_casier:
            string += (" -- res_casier: {}\n"
                       " -- res_liaison: {}\n")
        string += (" -- listing: {}\n")
        if has_casier:
            string += (" -- listing_casier: {}\n"
                       " -- listing_liaison: {}\n")
        if has_casier:
            string += (" -- damocle: {}\n")
        string += (" -- lig: {}\n"
                   " -- loi:\n")
        # One placeholder per law file
        string += ' {}\n' * len(self.settings['files']['loi'])

        string += '\nUSER SETTINGS:\n'
        if 'init_cst' in self.user_settings:
            string += (" -- Init model with cst values:\n"
                       " > Q_cst: {}\n"
                       " > Z_cst: {}\n")
        if 'Q_BC' in self.user_settings:
            string += (" -- Change the upstream flow rate:\n"
                       " > Index: {}\n"
                       " > Value: {}\n")
        if 'Ks' in self.user_settings:
            string += (" -- Change the friction coefficient:\n"
                       " > By zone: {}\n"
                       " > Index: {}\n"
                       " > Value: {}\n"
                       " > Zone index: {}\n")
        if 'MC' in self.user_settings:
            string += (" -- Monte-Carlo settings:\n"
                       " > Ks distribution: {}\n"
                       " > Ks parameter 1: {}\n"
                       " > Ks parameter 2: {}\n"
                       " > Number of simulations: {}\n")
        if 'bathy' in self.user_settings:
            string += (" -- Change bathymetry:\n"
                       " > Flag shift all bathy by dz: {}\n"
                       " > Shift Index Profil: {}\n"
                       " > Shift dz: {}\n"
                       " > Gp Lp: {}\n")
        if 'misc' in self.user_settings:
            string += (" -- Miscellaneous:\n"
                       " > Print boundary conditions: {}\n"
                       " > Output all state: {}\n"
                       " > Output index: {}\n")

        # Flatten: nested dicts are replaced by their values, then lists
        # are expanded one level.
        src = []
        for mapping in (self.settings['files'], self.user_settings):
            for val in mapping.values():
                src.extend(val.values() if isinstance(val, dict) else [val])
        src_ = []
        for val in src:
            if isinstance(val, list):
                src_.extend(val)
            else:
                src_.append(val)

        return string.format(*src_)

    # ------------------------------------------------------------------
    # Running
    # ------------------------------------------------------------------
    def run_mascaret(self, x=None, Qtime=None, flag=None, saveall=False):
        """Run Mascaret simulation.

        Use Mascaret Api :meth:`C_CALCUL_MASCARET`.
        If :attr:`x` is not None, ``Ks`` and ``Q`` are modified before
        running. When :attr:`flag` is None, both parameters are modified.
        Thus :attr:`x` needs to be set accordingly. If the flag is set to
        ``Ks``, then only this parameter is considered.
        If :attr:`x` is None and :attr:`Qtime` is not None, the boundary
        conditions are given by :attr:`Qtime` and
        :meth:`C_CALCUL_MASCARET_CONDITION_LIMITE` is used.

        :param list x: inputs [Ks, Q]
        :param tuple Qtime: ``(nb_timebc, tab_timebc_c, tab_CL1_c,
            tab_CL2_c)``, the last three being passed by reference to the
            API
        :param str flag: None, 'Ks' or 'Q'
        :param bool saveall: Change the default name of the Results file
        :return: water level interpolated at ``misc['curv_abs']``, or
            (abscissa, Z, Q) over all nodes if ``misc['all_outstate']`` is
            true, or water level at ``misc['index_outstate']``
        :rtype: double
        :raises ValueError: if ``misc['curv_abs']`` is not strictly inside
            the range of curvilinear abscissa of the model
        """
        if x is not None:
            if (flag is None) or (flag == 'Ks'):
                if self.user_settings['Ks']['zone']:
                    self.zone_friction_minor = {
                        'ind_zone': self.user_settings['Ks']['ind_zone'],
                        'value': x[0]}
                else:
                    self.friction_minor = {
                        'idx': self.user_settings['Ks']['idx'],
                        'value': x[0]}
            elif flag == 'Q':
                self.bc_qt = {'idx': self.user_settings['Q_BC']['idx'],
                              'value': x[0]}
            if flag is None:
                self.bc_qt = {'idx': self.user_settings['Q_BC']['idx'],
                              'value': x[1]}

            self.empty_opt()
            self.logger.info('Running Mascaret...')
            self.error = self.libmascaret.C_CALCUL_MASCARET(
                self.id_masc, self.t0, self.tend, self.dt, self.iprint)
        elif Qtime is not None:
            nb_timebc, tab_timebc_c, tab_CL1_c, tab_CL2_c = Qtime
            self.error = self.libmascaret.C_CALCUL_MASCARET_CONDITION_LIMITE(
                self.id_masc, self.t0, self.tend, self.dt,
                ctypes.byref(tab_timebc_c), nb_timebc,
                ctypes.byref(tab_CL1_c), ctypes.byref(tab_CL2_c),
                self.iprint)
        else:
            self.empty_opt()
            self.logger.info('Running Mascaret...')
            self.error = self.libmascaret.C_CALCUL_MASCARET(
                self.id_masc, self.t0, self.tend, self.dt, self.iprint)

        self.logger.info('Mascaret ran.')

        if saveall:
            os.rename('ResultatsOpthyca.opt',
                      'ResultatsOpthyca_' + str(x).replace(' ', '-') + '.opt')

        misc = self.user_settings['misc']
        if 'curv_abs' in misc:
            target = misc['curv_abs']
            ticks = self.curv_abs()
            # Position of the node just before the first abscissa strictly
            # greater than the target.
            id_tick = next((pos - 1 for pos, tick in enumerate(ticks)
                            if tick > target), None)
            if id_tick is None or id_tick < 0:
                raise ValueError(
                    "misc['curv_abs']={} is outside of the model abscissa"
                    .format(target))
            # NOTE(review): ``ticks`` is a 0-based list whereas allstate()
            # queries State.Z with 1-based indices; state(id_tick) may thus
            # be shifted by one node - confirm against the Mascaret API.
            y1 = self.state(id_tick).value
            y2 = self.state(id_tick + 1).value
            # Linear interpolation between the two surrounding nodes
            output = (y2 * (target - ticks[id_tick])
                      + y1 * (ticks[id_tick + 1] - target))\
                / (ticks[id_tick + 1] - ticks[id_tick])
            return output
        elif misc['all_outstate'] is True:
            self.logger.debug('Z in class: {}'.format(self.allstate()))
            self.logger.debug('Q in class: {}'.format(self.allstateQ()))
            return self.curv_abs(), self.allstate(), self.allstateQ()
        else:
            return self.state(misc['index_outstate']).value

    def __call__(self, x=None, Qtime=None, saveall=False):
        """Run the application using :attr:`user_settings`.

        If ``MC`` is in :attr:`user_settings` and ``x`` is None, a design
        of experiment is sampled (stored in :attr:`doe`) and given to
        :meth:`run_mascaret`; otherwise a single simulation is run. The
        output is stored in :attr:`results`.

        :param list x: inputs [Ks, Q]
        :param tuple Qtime: boundary conditions, see :meth:`run_mascaret`
        :param bool saveall: Change the default name of the Results file
        :return: output of :meth:`run_mascaret`
        :raises ValueError: if ``MC`` defines neither ``distKs`` nor
            ``distQ``
        """
        if ('MC' in self.user_settings) and (x is None):
            self.logger.info('Performing a Monte-Carlo analysis...')
            mc = self.user_settings['MC']
            # Number of samples, one if not given
            n = mc.get('Ne', 1)

            has_ks = 'distKs' in mc
            has_q = 'distQ' in mc
            if not (has_ks or has_q):
                raise ValueError(
                    "'MC' settings require 'distKs' and/or 'distQ'")

            nx = has_ks + has_q
            self.doe = np.empty((n, nx))

            if has_ks:
                ks = self.doe[:, 0]
                flag = 'Ks'
                if mc['distKs'] == "G":
                    ks[:] = np.random.normal(mc['muKs'], mc['sigmaKs'], n)
                elif mc['distKs'] == "U":
                    ks[:] = np.random.uniform(mc['minKs'], mc['maxKs'], n)

            if has_q:
                if has_ks:
                    q = self.doe[:, 1]
                    flag = None
                else:
                    q = self.doe[:, 0]
                    flag = 'Q'
                if mc['distQ'] == "G":
                    q[:] = np.random.normal(mc['muQ'], mc['sigmaQ'], n)
                elif mc['distQ'] == "U":
                    q[:] = np.random.uniform(mc['minQ'], mc['maxQ'], n)

            self.logger.debug('Design of Experiment:\n{}'.format(self.doe))
            # NOTE(review): the whole design matrix is given as ``x`` while
            # run_mascaret reads x[0] and x[1] - confirm this is intended.
            h = self.run_mascaret(self.doe, flag=flag, saveall=True)
        else:
            self.logger.info('Performing a single MASCARET simulation...')
            h = self.run_mascaret(x, Qtime=Qtime, saveall=saveall)

        self.results = h
        return self.results

    def error_message(self):
        """Error message wrapper.

        Uses Mascaret Api :meth:`C_GET_ERREUR_MASCARET`.

        :return: Error message (``bytes`` as read from the library, or a
            fallback ``str`` if the message could not be retrieved)
        """
        err_mess_c = ctypes.POINTER(ctypes.c_char_p)()
        error = self.libmascaret.C_GET_ERREUR_MASCARET(
            self.id_masc, ctypes.byref(err_mess_c))
        if error != 0:
            return 'Error could not be retrieved from MASCARET...'
        return ctypes.string_at(err_mess_c)

    # ------------------------------------------------------------------
    # Boundary conditions
    # ------------------------------------------------------------------
    def info_all_bc(self):
        """Return numbers and names of all boundary conditions.

        Use Mascaret Api :meth:`C_GET_NB_CONDITION_LIMITE_MASCARET` and
        :meth:`C_GET_NOM_CONDITION_LIMITE_MASCARET`. Also stores the result
        in :attr:`nb_bc`, :attr:`l_name_all_bc` and :attr:`l_num_all_bc`.

        :return: number of boundary conditions, their names and their law
            numbers
        :rtype: ctypes.c_int, list(bytes), list(int)
        """
        # Rating curve do not count
        nb_bc = ctypes.c_int()
        self.logger.debug(
            'Info all : Getting the number of boundary conditions...')
        self.error = self.libmascaret.C_GET_NB_CONDITION_LIMITE_MASCARET(
            self.id_masc, ctypes.byref(nb_bc))
        self.nb_bc = nb_bc.value
        self.logger.debug(
            ' Info all : Number of boundary conditions: {}.'.format(self.nb_bc))

        l_name_all_bc = []
        l_num_all_bc = []
        self.logger.debug(
            'Info all Getting name of the boundary conditions...')
        for k in range(nb_bc.value):
            name_all_bc = ctypes.POINTER(ctypes.c_char_p)()
            n_law = ctypes.c_int()
            self.error = self.libmascaret.C_GET_NOM_CONDITION_LIMITE_MASCARET(
                self.id_masc, k + 1, ctypes.byref(name_all_bc),
                ctypes.byref(n_law))
            l_name_all_bc.append(ctypes.string_at(name_all_bc))
            l_num_all_bc.append(n_law.value)

        self.l_name_all_bc = l_name_all_bc
        self.l_num_all_bc = l_num_all_bc
        # Placeholders added: the values were never written to the log.
        self.logger.debug("In info_all_bc: self.l_name_all_bc {} "
                          "self.l_num_all_bc {}"
                          .format(self.l_name_all_bc, self.l_num_all_bc))
        self.logger.debug('Info all: BC info get.')
        return nb_bc, l_name_all_bc, l_num_all_bc

    @property
    def bc_qt(self):
        """Get boundary conditions Qt.

        Use Mascaret Api :meth:`C_GET_TAILLE_VAR_MASCARET` and
        :meth:`C_GET_DOUBLE_MASCARET`. If ``misc['info_bc']`` is True in
        :attr:`user_settings`, each law is also logged.

        :return: boundary conditions for Qt, one row per boundary
            condition and one column per time step
        :rtype: numpy.ndarray
        """
        self.logger.debug('In bc_qt Getter')
        var_name = b'Model.Graph.Discharge'

        self.logger.debug('Getting the size of Model.Graph.Discharge...')
        # size1: nb of BC, size2: nb of time steps in BC, size3: not used
        size1, size2, size3 = self._var_size(var_name)
        self.logger.debug('Size Model.Graph.Discharge= {} {} {}.'
                          .format(size1.value, size2.value, size3.value))

        bc_qt = np.ones((size1.value, size2.value), float)
        self.logger.debug('Getting discharge values...')
        for k, kk in itertools.product(range(size1.value),
                                       range(size2.value)):
            bc_qt[k, kk] = self._get_double(var_name, k + 1, kk + 1).value
        self.logger.debug('BC Q(t) get.')

        if self.user_settings['misc']['info_bc'] is True:
            if self.nb_bc is None:
                self.info_all_bc()
            for k in range(self.nb_bc):
                self.logger.info("Info Getter Loi Q: {} {} {}"
                                 .format(self.l_name_all_bc[k],
                                         self.l_num_all_bc[k],
                                         bc_qt[k, :]))
        return bc_qt

    @bc_qt.setter
    def bc_qt(self, q_bc):
        """Set boundary condition Qt.

        Use Mascaret Api :meth:`C_GET_TAILLE_VAR_MASCARET` and
        :meth:`C_SET_DOUBLE_MASCARET`. The current table is read, row
        ``idx`` is replaced by ``value`` and the whole table is written
        back.

        :param dict q_bc: Boundary conditions on Qt ``{'idx','value'}``
        """
        self.logger.debug('In bc_qt Setter')
        new_tab_q_bc = self.bc_qt
        idx, value = q_bc['idx'], q_bc['value']
        new_tab_q_bc[idx, :] = value

        var_name = b'Model.Graph.Discharge'
        self.logger.debug('Setting the size of Model.Graph.Discharge...')
        # size1: nb of BC, size2: nb of time steps in BC, size3: not used
        size1, size2, size3 = self._var_size(var_name)
        self.logger.debug('Size Model.Graph.Discharge= {} {} {}.'
                          .format(size1.value, size2.value, size3.value))

        self.logger.debug('Setting discharge values...')
        for k, kk in itertools.product(range(size1.value),
                                       range(size2.value)):
            self._set_double(var_name, new_tab_q_bc[k, kk], k + 1, kk + 1)
        self.logger.debug('BC Q(t) set {}'.format(self.bc_qt))

    # ------------------------------------------------------------------
    # Friction
    # ------------------------------------------------------------------
    @property
    def ind_zone_frot(self):
        """Get indices of the beginning and end of all the friction zones.

        Use Mascaret Api :meth:`C_GET_TAILLE_VAR_MASCARET` and
        :meth:`C_GET_INT_MASCARET`.

        :return: Index of beginning and end
        :rtype: list(int), list(int)
        """
        var_name = ctypes.c_char_p(b'Model.FrictionZone.FirstNode')
        self.logger.debug(
            'Getting the size of Model.FrictionZone.FirstNode...')
        size1, _, _ = self._var_size(b'Model.FrictionZone.FirstNode')
        self.logger.debug(
            'Number of Friction Zones at first node: {}.'.format(size1.value))

        l_ind_beg_zone = []
        self.logger.debug('Getting friction indices at the beginning...')
        for k in range(size1.value):
            ind_beg_zone_c = ctypes.c_int()
            self.error = self.libmascaret.C_GET_INT_MASCARET(
                self.id_masc, var_name, k + 1, 0, 0,
                ctypes.byref(ind_beg_zone_c))
            l_ind_beg_zone.append(ind_beg_zone_c.value)
        self.logger.debug('Friction indices at the beginning get.')

        var_name = ctypes.c_char_p(b'Model.FrictionZone.LastNode')
        self.logger.debug('Getting the size of Model.FrictionZone.LastNode...')
        size1, _, _ = self._var_size(b'Model.FrictionZone.LastNode')
        self.logger.debug(
            'Number of Friction Zones at last node: {}.'.format(size1.value))

        l_ind_end_zone = []
        self.logger.debug('Getting friction indices at the end...')
        for k in range(size1.value):
            ind_end_zone_c = ctypes.c_int()
            self.error = self.libmascaret.C_GET_INT_MASCARET(
                self.id_masc, var_name, k + 1, 0, 0,
                ctypes.byref(ind_end_zone_c))
            l_ind_end_zone.append(ind_end_zone_c.value)
        self.logger.debug('Friction indices at the end get.')

        self.logger.debug('Get list index for all friction zones OK.')
        return l_ind_beg_zone, l_ind_end_zone

    @property
    def zone_friction_minor(self):
        """Get minor friction coefficient at zone :attr:`ind_zone`.

        Use :attr:`ind_zone_frot` and :attr:`friction_minor`.
        :attr:`ind_zone` is the one stored by the last call to the setter;
        :attr:`ks_idx` is overwritten while looping over the zone.

        :return: Friction coefficient at zone
        :rtype: list(float)
        """
        l_ind_beg_zone, l_ind_end_zone = self.ind_zone_frot
        idx_beg_zone = l_ind_beg_zone[self.ind_zone]
        idx_end_zone = l_ind_end_zone[self.ind_zone]

        zone_friction = []
        for index in range(idx_beg_zone, idx_end_zone + 1):
            self.ks_idx = index
            zone_friction.append(self.friction_minor)
        self.logger.debug('Zone Ks get.')
        return zone_friction

    @zone_friction_minor.setter
    def zone_friction_minor(self, ks):
        """Change minor friction coefficient at zone.

        Use :attr:`ind_zone_frot` and :meth:`friction_minor`.

        :param dict ks: Friction coeffcient at zone ``{'ind_zone','value'}``
        """
        ind_zone, value = ks['ind_zone'], ks['value']
        l_ind_beg_zone, l_ind_end_zone = self.ind_zone_frot
        idx_beg_zone = l_ind_beg_zone[ind_zone]
        idx_end_zone = l_ind_end_zone[ind_zone]
        # Remembered for the getter
        self.ind_zone = ind_zone

        for index in range(idx_beg_zone, idx_end_zone + 1):
            self.logger.debug(index)
            self.friction_minor = {'idx': index, 'value': value}
        self.logger.debug('Zone Ks set.')

    @property
    def friction_minor(self):
        """Get minor friction coefficient at index :attr:`.ks_idx`.

        Use Mascaret Api :meth:`C_GET_TAILLE_VAR_MASCARET` and
        :meth:`C_GET_DOUBLE_MASCARET`.

        :return: Minor friction coefficient
        :rtype: float
        """
        var_name = b'Model.FricCoefMainCh'
        self.logger.debug('Getting the size of Model.FricCoefMainCh...')
        size1, size2, size3 = self._var_size(var_name)
        self.logger.debug('Size of Model.FricCoefMinCh = {} {} {}.'
                          .format(size1.value, size2.value, size3.value))

        self.logger.debug('Getting friction values.')
        ks_c = self._get_double(var_name, self.ks_idx)
        self.logger.debug('Ks value= {}.'.format(ks_c.value))
        return ks_c.value

    @friction_minor.setter
    def friction_minor(self, ks):
        """Change minor friction coefficient.

        Use Mascaret Api :meth:`C_SET_DOUBLE_MASCARET`. The index is
        stored in :attr:`ks_idx`.

        :param dict ks: Minor friction coefficient ``{'idx','value'}``
        """
        ks_c = ctypes.c_double(ks['value'])
        self.logger.debug('ks_c = {}'.format(ks_c.value))
        self.logger.debug('Ks new value= {}'.format(ks))
        self.ks_idx = ks['idx']
        self.logger.debug('Setting Ks value...')
        self._set_double(b'Model.FricCoefMainCh', ks_c.value, self.ks_idx)
        self.logger.debug('Ks changed.')

    # ------------------------------------------------------------------
    # State
    # ------------------------------------------------------------------
    def allstate(self):
        """Get state Z at all simulation points.

        Use Mascaret Api :meth:`C_GET_TAILLE_VAR_MASCARET` and
        :meth:`C_GET_DOUBLE_MASCARET`.

        :return: ``State.Z`` at each simulation point
        :rtype: list of floats
        """
        var_name = b'State.Z'
        self.logger.debug('Getting the size of State.Z...')
        itemp0, itemp1, itemp2 = self._var_size(var_name)
        self.logger.debug('itemp= {} {} {}.'
                          .format(itemp0.value, itemp1.value, itemp2.value))

        self.logger.debug('Getting the value of all State.Z...')
        # Indices given to the API start at 1
        state_res = [self._get_double(var_name, index).value
                     for index in range(1, itemp0.value + 1)]
        self.logger.debug('State get.')
        return state_res

    def allstateQ(self):
        """Get state Q at all simulation points.

        Use Mascaret Api :meth:`C_GET_TAILLE_VAR_MASCARET` and
        :meth:`C_GET_DOUBLE_MASCARET`.

        :return: ``State.Q`` at each simulation point
        :rtype: list of floats
        """
        var_name = b'State.Q'
        self.logger.debug('Getting the size of State.Q...')
        itemp0, itemp1, itemp2 = self._var_size(var_name)
        self.logger.debug('itemp= {} {} {}.'
                          .format(itemp0.value, itemp1.value, itemp2.value))

        self.logger.debug('Getting the value of all State.Q...')
        # Indices given to the API start at 1
        stateQ_res = [self._get_double(var_name, index).value
                      for index in range(1, itemp0.value + 1)]
        self.logger.debug('State Q get.')
        return stateQ_res

    def state(self, index):
        """Get state Z at given index.

        Use Mascaret Api :meth:`C_GET_TAILLE_VAR_MASCARET` and
        :meth:`C_GET_DOUBLE_MASCARET`.

        :param int index: Index to get the state from, passed as is to the
            API
        :return: ``State.Z`` at the given index; read it through ``.value``
        :rtype: ctypes.c_double
        """
        var_name = b'State.Z'
        self.logger.debug('Getting the size of State.Z...')
        itemp0, itemp1, itemp2 = self._var_size(var_name)
        self.logger.debug('itemp= {} {} {}.'
                          .format(itemp0.value, itemp1.value, itemp2.value))

        self.logger.debug('Getting the value of State.Z...')
        z_res_c = self._get_double(var_name, index)
        self.logger.debug('State get.')
        return z_res_c

    def curv_abs(self):
        """Get abscurv over entire domain.

        Use Mascaret Api :meth:`C_GET_TAILLE_VAR_MASCARET` and
        :meth:`C_GET_DOUBLE_MASCARET`.

        :return: ``Model.X`` at each simulation point
        :rtype: list of float
        """
        var_name = b'Model.X'
        self.logger.debug('Getting the size of Model.X...')
        itemp0, itemp1, itemp2 = self._var_size(var_name)
        self.logger.debug('itemp= {} {} {}.'
                          .format(itemp0.value, itemp1.value, itemp2.value))

        res = []
        self.logger.debug('Getting the value of Model.X...')
        for k in range(1, itemp0.value + 1):
            x_res_c = self._get_double(var_name, k)
            self.logger.debug('x_res_c.value= {}.'.format(x_res_c.value))
            res.append(x_res_c.value)
        self.logger.debug('Model.X get.')
        return res

    # ------------------------------------------------------------------
    # Bathymetry
    # ------------------------------------------------------------------
    @property
    def cross_section(self):
        """Get CrossSection everywhere.

        Uses Mascaret Api :meth:`C_GET_TAILLE_VAR_MASCARET` and
        :meth:`C_GET_DOUBLE_MASCARET`.

        :return: unique ``RelAbs`` values, ``Zbot`` per section, ``RelAbs``
            per point and ``Y`` per point
        :rtype: numpy.ndarray, list(float), list(float), list(float)
        """
        # Model.CrossSection.RelAbs curvilinear abscissa of the cross section
        var_name_RelAbs = b'Model.CrossSection.RelAbs'
        # Model.CrossSection.X y-coordinate on the cross section
        var_name_X = b'Model.CrossSection.X'
        # Model.CrossSection.Y z-coordinate on the cross section
        var_name_Y = b'Model.CrossSection.Y'
        # Model.CrossSection.Zbot Min of z-coordinate on the cross section
        var_name_Z = b'Model.CrossSection.Zbot'

        sizes = self._var_size(var_name_RelAbs)
        self.logger.debug('In Getter, size Model.CrossSection.RelAbs = {} {} {}'
                          .format(*[size.value for size in sizes]))

        sizeX1, sizeX2, sizeX3 = self._var_size(var_name_X)
        self.logger.debug('In Getter, size Model.CrossSection.X = {} {} {}'
                          .format(sizeX1.value, sizeX2.value, sizeX3.value))

        sizes = self._var_size(var_name_Y)
        self.logger.debug('In Getter, size Model.CrossSection.Y = {} {} {}'
                          .format(*[size.value for size in sizes]))

        sizes = self._var_size(var_name_Z)
        self.logger.debug('In Getter, size Model.CrossSection.Zbot = {} {} {}'
                          .format(*[size.value for size in sizes]))

        res_Zbot = []
        res_Z = []
        res_RelAbs = []

        # Loop on number of section
        for n_sec in range(sizeX1.value):
            Zbot_c = self._get_double(var_name_Z, n_sec + 1)
            self.logger.debug(
                'In Getter Cross Section, Zbot = {}'.format(Zbot_c.value))
            res_Zbot.append(Zbot_c.value)

            # Loop on number of point by section
            for n_pts in range(sizeX2.value):
                Z_c = self._get_double(var_name_Y, n_sec + 1, n_pts + 1)
                self.logger.debug(
                    'In Getter Cross Section, Z ={}'.format(Z_c.value))
                res_Z.append(Z_c.value)

                # Abscissa of the section, repeated for each of its points
                RelAbs_c = self._get_double(var_name_RelAbs, n_sec + 1)
                self.logger.debug(
                    'In Getter Cross Section, RelAbs ={}'.format(RelAbs_c.value))
                res_RelAbs.append(RelAbs_c.value)

        res_RelAbsBot = np.unique(res_RelAbs)

        info = ("Getter Cross Section summary:\n"
                "table X: {}\n"
                "table X bot unique: {}\n"
                "table Z: {}\n"
                "table Z bot: {}"
                ).format(res_RelAbs, res_RelAbsBot, res_Z, res_Zbot)
        self.logger.info(info)

        return res_RelAbsBot, res_Zbot, res_RelAbs, res_Z

    @cross_section.setter
    def cross_section(self, bathy):
        """Change cross section z by +dz.

        Use Mascaret Api :meth:`C_GET_DOUBLE_MASCARET` and
        :meth:`C_SET_DOUBLE_MASCARET`. If ``Lp`` is in
        ``user_settings['bathy']``, the shift is sampled with
        :class:`Gp1dSampler`, otherwise it is the constant ``dz``. If
        ``user_settings['bathy']['all_bathy']`` is True all sections are
        shifted, otherwise only the section ``idx``.

        :param dict bathy: Displacement of all bathymetry
            ``{'bathy','all_bathy','idx','dz'}``
        """
        var_name_Y = b'Model.CrossSection.Y'
        sizeY1, sizeY2, sizeY3 = self._var_size(var_name_Y)
        self.logger.debug('In setter, size Model.CrossSection.Y = {} {} {}'
                          .format(sizeY1.value, sizeY2.value, sizeY3.value))

        var_name_Z = b'Model.CrossSection.Zbot'
        sizeZ1, sizeZ2, sizeZ3 = self._var_size(var_name_Z)
        self.logger.debug('In setter, size Model.Zbot = {} {} {}'
                          .format(sizeZ1.value, sizeZ2.value, sizeZ3.value))

        if 'Lp' in self.user_settings['bathy']:
            # Read the abscissa once: the getter queries the whole model
            abscissa = self.cross_section[0]
            sampler = gp_1d_sampler.Gp1dSampler(t_ini=abscissa[0],
                                                t_end=abscissa[-1],
                                                Nt=sizeZ1.value,
                                                sigma=bathy['dz'],
                                                theta=bathy['Lp'],
                                                x=[[abscissa[0]],
                                                   [abscissa[-1]]])
            shift_dz = sampler.sample()['Values'][0]
        else:
            shift_dz = np.zeros(sizeZ1.value, float) + bathy['dz']

        if self.user_settings['bathy']['all_bathy'] is True:
            # Loop on number of section
            for n_sec in range(sizeY1.value):
                Zbot_c = self._get_double(var_name_Z, n_sec + 1)
                self.logger.debug(
                    'In Setter Cross Section, Zbot = {}'.format(Zbot_c.value))
                self._set_double(var_name_Z, Zbot_c.value + shift_dz[n_sec],
                                 n_sec + 1)

                # Loop on number of point by section
                for n_pts in range(sizeY2.value):
                    Z_c = self._get_double(var_name_Y, n_sec + 1, n_pts + 1)
                    self.logger.debug(
                        'In Setter Cross Section, Z ={}'.format(Z_c.value))
                    self._set_double(var_name_Y,
                                     Z_c.value + shift_dz[n_sec],
                                     n_sec + 1, n_pts + 1)
        else:
            idx = bathy['idx']
            self.logger.debug(
                'In Setter Cross Section, profil idx = {}'.format(idx))
            Zbot_c = self._get_double(var_name_Z, idx + 1)
            self.logger.debug(
                'In Setter Cross Section, Zbot = {}'.format(Zbot_c.value))
            # A single section is shifted, by the first value of shift_dz
            self._set_double(var_name_Z, Zbot_c.value + shift_dz[0], idx + 1)

            for n_pts in range(sizeY2.value):
                Z_c = self._get_double(var_name_Y, idx + 1, n_pts + 1)
                self.logger.debug(
                    'In Setter Cross Section, Z = {}'.format(Z_c.value))
                self._set_double(var_name_Y, Z_c.value + shift_dz[0],
                                 idx + 1, n_pts + 1)

        self.logger.debug(
            'In Setter, Cross section bathymetry shifted by dz= {}'.format(shift_dz))

    def empty_opt(self):
        """Hack to be able to re-launch Mascaret.

        Truncates :file:`ResultatsOpthyca.opt` in the current directory.
        """
        with open("ResultatsOpthyca.opt", 'w'):
            self.logger.debug('Cleaning results to launch a new run.')