# coding: utf8
"""Misc functions.
Implements functions:
- :func:`misc.clean_path`,
- :func:`misc.check_yes_no`,
- :func:`misc.ask_path`,
- :func:`misc.abs_path`,
- :func:`misc.import_config`,
- :class:`misc.ProgressBar`
"""
import os
import sys
import logging
import re
import json
import time
import jsonschema
import numpy as np
from pathos.multiprocessing import cpu_count
from scipy.optimize import differential_evolution
from .nested_pool import NestedPool
[docs]def clean_path(path):
"""Return an absolute and normalized path."""
return os.path.abspath(os.path.normpath(path))
[docs]def check_yes_no(prompt, default):
"""Ask user for delete confirmation.
:param str prompt: yes-no question
:param str default: default value
:returns: true if yes
:rtype: boolean
"""
logger = logging.getLogger('User checking')
while True:
try:
try:
value = raw_input(prompt) # safe python 2
except NameError:
value = input(prompt)
except ValueError:
logger.error("Sorry, I didn't understand that.")
continue
value = value.lower()
if not all(x in 'yesno ' for x in value.lower()):
logger.error('Sorry, your response must be yes, or no.')
continue
elif value == '':
value = default
break
else:
break
answer = True if value.strip()[0] == 'y' else False
return answer
[docs]def ask_path(prompt, default, root):
"""Ask user for a folder path.
:param str prompt: Ask.
:param str default: default value.
:param str root: root path.
:returns: path if folder exists.
:rtype: str.
"""
logger = logging.getLogger('User checking')
while True:
try:
try:
path = raw_input(prompt)
except NameError:
path = input(prompt)
except ValueError:
logger.error("Sorry, I didn't understand that.")
continue
if path == '':
path = default
if not os.path.isdir(os.path.join(root, path)):
logger.error("Output folder not found: {}".format(path))
continue
else:
break
return path
[docs]def abs_path(value):
"""Get absolute path."""
return os.path.abspath(value)
[docs]def import_config(path_config, path_schema):
"""Import a configuration file."""
logger = logging.getLogger('Settings Validation')
def minify_comments(file, **kwargs):
"""Minify comments in JSON file.
Deserialize `file` to a Python object using
`commentjson <https://pypi.python.org/pypi/commentjson>`_ package.
:param file: serialized JSON string with or without comments.
:param kwargs: all the arguments that `json.loads <http://docs.python.org/
2/library/json.html#json.loads>`_ accepts.
:raises: Parsing Exception from ``json.loads``.
:returns: dict or list.
"""
file = file.read().decode('utf8')
regex = r'\s*(#|\/{2}).*$'
regex_inline = r'(:?(?:\s)*([A-Za-z\d\.{}]*)|((?<=\").*\"),?)(?:\s)*(((#|(\/{2})).*)|)$'
lines = file.split('\n')
for index, line in enumerate(lines):
if re.search(regex, line):
if re.search(r'^' + regex, line, re.IGNORECASE):
lines[index] = ""
elif re.search(regex_inline, line):
lines[index] = re.sub(regex_inline, r'\1', line)
try:
return json.loads('\n'.join(lines),
encoding="utf-8", **kwargs)
except Exception as tb:
logger.exception("JSON error, cannot load configuration file: {}"
.format(tb))
raise SyntaxError
with open(path_config, 'rb') as file:
settings = minify_comments(file)
with open(path_schema, 'rb') as file:
schema = json.loads(file.read().decode('utf8'))
error = False
try:
validator = jsonschema.Draft4Validator(schema)
for error in sorted(validator.iter_errors(settings), key=str):
logger.error("Error: {}\n-> Origin: {}\n-> Schema: {}"
.format(error.message, error.path,
json.dumps(error.schema, indent=1)))
error = True
except jsonschema.ValidationError as e:
logger.exception(e.message)
if not error:
logger.info('Settings successfully imported and checked')
else:
logger.error('Error were found in configuration file: JSON syntax...')
raise SyntaxError
return settings
[docs]class ProgressBar(object):
"""Print progress bar in console."""
[docs] def __init__(self, total):
"""Create a bar.
:param int total: number of iterations
"""
self.total = total
self.calls = 1
self.progress = 0.
sys.stdout.write("Progress | " +
" " * 50 +
" |" + "0.0% ")
self.init_time = time.time()
[docs] def __call__(self):
"""Update bar."""
self.progress = (self.calls) / float(self.total) * 100
eta, vel = self.compute_eta()
self.show_progress(eta, vel)
self.calls += 1
[docs] def compute_eta(self):
"""Compute ETA.
Compare current time with init_time.
:return: eta, vel
:rtype: str
"""
end_time = time.time()
iter_time = (end_time - self.init_time) / self.calls
eta = (self.total - self.calls) * iter_time
eta = time.strftime("%H:%M:%S", time.gmtime(eta))
vel = str(1. / iter_time)
return eta, vel
[docs] def show_progress(self, eta=None, vel=None):
"""Print bar and ETA if relevant.
:param str eta: ETA in H:M:S
:param str vel: iteration/second
"""
p_bar = int(np.floor(self.progress / 2))
sys.stdout.write("\rProgress | " +
"-" * (p_bar - 1) + "~" +
" " * (50 - p_bar - 1) +
" |" + str(self.progress) + "% ")
if self.progress == 100:
sys.stdout.write('\n')
del self
elif eta and vel:
sys.stdout.write("| ETA: " + eta + " (at " + vel + " it/s) ")
sys.stdout.flush()
[docs]def cpu_system():
"""Number of CPU of system."""
try:
n_cpu_system = cpu_count()
except NotImplementedError:
n_cpu_system = os.sysconf('SC_NPROCESSORS_ONLN')
return 1 if n_cpu_system == 0 or n_cpu_system is None else n_cpu_system
[docs]def optimization(bounds, discrete=None):
"""Perform a discret or a continuous/discrete optimization.
If a variable is discrete, the decorator allows to find the optimum by
doing an optimization per discrete value and then returns the optimum.
:param array_like bounds: bounds for optimization ([min, max], n_features).
:param int discrete: index of the discrete variable.
"""
def optimize(fun):
"""Compute several optimizations."""
def combinatory_optimization(i, bounds=bounds, discrete=discrete):
"""One optimization.
:param int i: index of the optimization plan.
:param array_like bounds: bounds for optimization ([min, max], n_features).
:param int discrete: index of the discrete variable.
:returns: min_x, min_fun.
:rtype: floats.
"""
bounds[discrete] = [i, i]
with np.errstate(divide='ignore', invalid='ignore'):
results = differential_evolution(fun, bounds)
min_x = results.x
min_fun = results.fun
return min_x, min_fun
def wrapper_fun_obj():
"""Two behaviour depending on discrete."""
if discrete is not None:
start = int(np.ceil(bounds[discrete, 0]))
end = int(np.ceil(bounds[discrete, 1]))
discrete_range = range(start, end) if end > start else [start]
pool = NestedPool(cpu_system())
results = pool.imap(combinatory_optimization, discrete_range)
# Gather results
results = list(results)
pool.terminate()
min_x, min_fun = zip(*results)
# Find best results
min_idx = np.argmin(min_fun)
min_fun = min_fun[min_idx]
min_x = min_x[min_idx]
else:
results = differential_evolution(fun, bounds)
min_x = results.x
min_fun = results.fun
return min_x, min_fun
return wrapper_fun_obj
return optimize