"""This module is for sending scripts through Adams/PPT
"""
import os
import subprocess
import re
import time
import platform
import jinja2
from numpy import genfromtxt
from thornpy.signal import manually_clean_sig, remove_data_point, manually_clean_sigs, low_pass
from thornpy.signal import _clean_sig as clean_sig
from ..adamspy import get_log_errors, LOG_COMPLETE_PATTERN
LUNAR_SCRIPT_NAME = 'get_lunar_results.py'
GET_RESULTS_SCRIPT_NAME = 'get_results.py'
EDIT_RESULTS_SCRIPT_NAME = 'edit_results.py'
TEMP_OUTPUT_FILENAME = 'results.tmp'
LOG_NAME = 'aview.log'
TMPLT_ENV = jinja2.Environment(
loader=jinja2.PackageLoader('adamspy.postprocess', 'aview_scripts'),
autoescape=False,
keep_trailing_newline=True,
trim_blocks=True,
lstrip_blocks=True
)
_TIMEOUT = 300
def get_results(res_file, reqs_to_get, t_min=None, t_max=None, _just_write_script=False, timeout=_TIMEOUT):
"""Gets results from an Adams results (.res) file.
Example
-------
>>> result_file = 'example.res'
>>> t_min = 70
>>> t_max = 80
>>> reqs_to_get = {}
>>> reqs_to_get['MSE'] = ['Instantaneous_Bottom_MSE', 'Filtered_Surface_MSE']
>>> reqs_to_get['ROP_controls'] = ['Command_ROP', 'True_WOB']
    >>> requests = get_results(result_file, reqs_to_get, t_min, t_max)
Note
----
    This function only works with Requests. It does not work with Result Sets.
Note
----
    This function only works with XML results files.
Parameters
----------
    res_file : str
        Filename of an Adams results (.res) file
    reqs_to_get : dict
        Nested dictionary mapping request names to lists of component names to extract
    t_min : float, optional
        Minimum time for which to extract results (the default is None)
    t_max : float, optional
        Maximum time for which to extract results (the default is None)
    timeout : float, optional
        Number of seconds to wait for Adams View to finish before timing out (the default is _TIMEOUT)
Returns
-------
    dict
        Nested dictionary of request data keyed by request name and component name, plus a 'time' entry
"""
    with open(os.path.join(os.path.dirname(__file__), 'aview_scripts', GET_RESULTS_SCRIPT_NAME)) as fid:
        template = TMPLT_ENV.from_string(fid.read())
working_directory = os.path.dirname(res_file)
script_filename = _get_unique_filename(GET_RESULTS_SCRIPT_NAME)
output_filename = _get_unique_filename(TEMP_OUTPUT_FILENAME)
with open(os.path.join(working_directory, script_filename), 'w') as fid:
fid.write(template.render({'res_file': os.path.split(res_file)[-1], 'reqs_to_get': reqs_to_get, 't_min': t_min, 't_max': t_max, 'output_file': output_filename}))
if _just_write_script is False:
# Delete the aview.log file
try:
os.remove(os.path.join(working_directory, LOG_NAME))
except FileNotFoundError:
pass
except PermissionError:
pass
# Run the postprocessor
if platform.system() == 'Windows':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
subprocess.Popen('"{}" aview ru-s b {}'.format(os.environ['ADAMS_LAUNCH_COMMAND'], script_filename), cwd=working_directory, startupinfo=startupinfo)
else:
subprocess.Popen([os.environ['ADAMS_LAUNCH_COMMAND'], '-c', 'aview', 'ru-standard', 'b', script_filename, 'exit'], cwd=working_directory)
# Wait for complete
_wait(os.path.join(working_directory, LOG_NAME), timeout=timeout)
# Check the log file for errors
get_log_errors(os.path.join(working_directory, LOG_NAME))
# Read and return the results
data = genfromtxt(os.path.join(working_directory, output_filename), delimiter=',', names=True, dtype=None)
os.remove(os.path.join(working_directory, script_filename))
os.remove(os.path.join(working_directory, output_filename))
output_dict = {'time': list(data['time'])}
for res in reqs_to_get:
output_dict[res] = {}
for comp in reqs_to_get[res]:
output_dict[res][comp] = list(data[f'{res}_{comp}'])
return output_dict
def write_results(res_file, input_dict):
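    """Writes results to an Adams results (.res) file. Not yet implemented."""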
# TODO
return
def edit_results(res_file, input_dict, new_res_file=None, _just_write_script=False, timeout=_TIMEOUT):
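    """Edits the data in an Adams results (.res) file by running a generated Adams View script.

    Parameters
    ----------
    res_file : str
        Filename of an Adams results (.res) file
    input_dict : dict
        Nested dictionary of result sets and result components containing the new values
    new_res_file : str, optional
        Filename of the edited results file (the default is None, which overwrites `res_file`)
    timeout : float, optional
        Number of seconds to wait for Adams View to finish before timing out (the default is _TIMEOUT)

    Example
    -------
    An illustrative call; the filename and request names are hypothetical.

    >>> values = get_results('example.res', {'MSE': ['Instantaneous_Bottom_MSE']})
    >>> values['MSE']['Instantaneous_Bottom_MSE'] = [v * 2 for v in values['MSE']['Instantaneous_Bottom_MSE']]
    >>> edit_results('example.res', values)
    """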
    with open(os.path.join(os.path.dirname(__file__), 'aview_scripts', EDIT_RESULTS_SCRIPT_NAME)) as fid:
        template = TMPLT_ENV.from_string(fid.read())
working_directory = os.path.dirname(res_file)
new_res_file = os.path.split(res_file)[-1] if new_res_file is None else os.path.split(new_res_file)[-1]
script_name = _get_unique_filename(EDIT_RESULTS_SCRIPT_NAME)
with open(os.path.join(working_directory, script_name), 'w') as fid:
fid.write(template.render({'res_file': os.path.split(res_file)[-1], 'reqs_to_edit': input_dict, 'output_file': new_res_file}))
if _just_write_script is False:
# Delete the aview.log file
try:
os.remove(os.path.join(working_directory, LOG_NAME))
except FileNotFoundError:
pass
except PermissionError:
pass
if platform.system() == 'Windows':
# Run the postprocessor
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
subprocess.Popen('"{}" aview ru-s b {}'.format(os.environ['ADAMS_LAUNCH_COMMAND'], script_name), cwd=working_directory, startupinfo=startupinfo)
else:
subprocess.Popen([os.environ['ADAMS_LAUNCH_COMMAND'], '-c', 'aview', 'ru-s', 'b', script_name, 'exit'], cwd=working_directory)
# Wait for complete
_wait(os.path.join(working_directory, LOG_NAME), timeout=timeout)
# Check the log file for errors
get_log_errors(os.path.join(working_directory, LOG_NAME))
def get_lunar_results(res_files, reqs_to_get, t_min, t_max, output_file, _just_write_script=False, timeout=_TIMEOUT):
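    """Extracts requests from multiple Adams results (.res) files, writing one output file per request component plus one for the time signal.

    The working directory is taken from the directory of the first results file.

    Parameters
    ----------
    res_files : list of str
        Filenames of Adams results (.res) files
    reqs_to_get : dict
        Nested dictionary mapping request names to lists of component names to extract
    t_min : float
        Minimum time for which to extract results
    t_max : float
        Maximum time for which to extract results
    output_file : str
        Base name used to construct the output filenames
    timeout : float, optional
        Number of seconds to wait for Adams View to finish before timing out (the default is _TIMEOUT)

    Returns
    -------
    list of str
        Filenames of the output files that were written
    """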
    with open(os.path.join(os.path.dirname(__file__), 'aview_scripts', LUNAR_SCRIPT_NAME)) as fid:
        template = TMPLT_ENV.from_string(fid.read())
working_directory = os.path.dirname(res_files[0])
script_name = _get_unique_filename(LUNAR_SCRIPT_NAME)
with open(os.path.join(working_directory, script_name), 'w') as fid:
fid.write(template.render({'res_files': res_files, 'reqs_to_get': reqs_to_get, 't_min': t_min, 't_max': t_max, 'output_suffix': output_file}))
if _just_write_script is False:
# Delete the aview.log file
try:
os.remove(os.path.join(working_directory, LOG_NAME))
except FileNotFoundError:
pass
# Run the postprocessor
if platform.system() == 'Windows':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
subprocess.Popen('"{}" aview ru-s b {}'.format(os.environ['ADAMS_LAUNCH_COMMAND'], script_name), cwd=working_directory, startupinfo=startupinfo)
else:
            subprocess.Popen([os.environ['ADAMS_LAUNCH_COMMAND'], '-c', 'aview', 'ru-s', 'b', script_name, 'exit'], cwd=working_directory)
# Wait for complete
_wait(os.path.join(working_directory, LOG_NAME), timeout=timeout)
os.remove(os.path.join(working_directory, script_name))
# Make a list of the files that are written
res_output_files = [os.path.splitext(output_file)[0] + '_time' + os.path.splitext(output_file)[-1]]
for res_name, res_comps in reqs_to_get.items():
for res_comp in res_comps:
full_res_name = f'{res_name}_{res_comp}'
res_output_files.append(os.path.splitext(output_file)[0] + '_' + full_res_name + os.path.splitext(output_file)[-1])
return res_output_files
def _wait(log_file, sleep_time=0.2, timeout=300):
"""Waits for the log file to write the last line of the macro
Parameters
----------
log_file : str
filename of log file
sleep_time : float, optional
Time between checks, by default 0.2
    timeout : int, optional
        Duration in seconds after which to time out, by default 300
"""
for _i in range(int(timeout/sleep_time)):
ppt_log_file_exists = os.path.exists(log_file)
if ppt_log_file_exists is True:
# If ppt.log exists, open it and see if the results have been loaded
with open(log_file, 'r') as fid:
text = fid.read()
if re.search(LOG_COMPLETE_PATTERN, text):
break
time.sleep(sleep_time)
def manually_remove_spikes(res_file, reqs_to_clean, reqs_to_check=None, t_min=None, t_max=None, _just_write_script=False, timeout=_TIMEOUT, _inplace=False):
"""Allows the user to manually scan through the result sets to pick out points to eliminate.
Parameters
----------
res_file : str
Adams Results (.res) filename
    reqs_to_clean : dict
        Nested dictionary of result sets and result components to clean
    reqs_to_check : dict, optional
        Nested dictionary of result sets and result components to check for spikes, by default the same as reqs_to_clean
t_min : float, optional
        Minimum simulation time to clean, by default None
t_max : float, optional
Maximum simulation time to clean, by default None
timeout : float, optional
Number of seconds to wait for results to load before timing out, by default _TIMEOUT
Returns
-------
dict
Nested dictionary of cleaned results
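
    Example
    -------
    An illustrative call; the filename and request names are hypothetical.

    >>> reqs_to_clean = {'MSE': ['Instantaneous_Bottom_MSE']}
    >>> cleaned = manually_remove_spikes('example.res', reqs_to_clean)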
"""
if reqs_to_check is None:
reqs_to_check = reqs_to_clean
results = get_results(res_file, reqs_to_clean, t_min=t_min, t_max=t_max, _just_write_script=_just_write_script, timeout=timeout)
time_sig = results['time']
# Remove the spikes
for (res, res_comps) in [(r, rc) for r, rc in results.items() if r in reqs_to_check]:
for (res_comp, values) in [(rc, v) for rc, v in res_comps.items() if rc in reqs_to_check[res]]:
results[res][res_comp], i_mod = manually_clean_sig(time_sig, values, indices=True)
# If a modification was made to the signal, make that modification to the rest of the signals
if i_mod != []:
# Loop over all the other results
for (other_res, other_res_comps) in [(r, rc) for r, rc in results.items() if r != 'time']:
for (other_res_comp, other_values) in [(rc, v) for rc, v in other_res_comps.items() if not (other_res == res and rc == res_comp)]: #pylint: disable=no-member
                        # Accumulate the removals so that every flagged index
                        # is removed, not just the last one
                        cleaned_values = other_values
                        for i in i_mod:
                            cleaned_values = remove_data_point(time_sig, cleaned_values, i)
                        results[other_res][other_res_comp] = cleaned_values
# Update the analysis files
edit_results(res_file, results)
# Return the cleaned results
return results
def filter_results(res_file, reqs_to_clean, freq_cutoff, N_filter=5, reqs_to_check=None, t_min=None, t_max=None, _just_write_script=False, timeout=_TIMEOUT, _inplace=False, return_raw=False):
"""Similar to `manually_remove_spikes`, but allows user to plot the signals in batches. Instead of passing a dictionary for the `reqs_to_check` argument, pass a list of dictionaries and the results in each dictionary in the list will be plotted together.
Parameters
----------
res_file : str
Adams Results (.res) filename
reqs_to_clean : dict
Nested dictionary of result sets and result components to clean
    freq_cutoff : float
        Cutoff frequency of the filter in Hz
    N_filter : int, optional
        Order of the filter, by default 5
    reqs_to_check : list of dicts, optional
        Not currently used by this function, by default None
t_min : float, optional
        Minimum simulation time to clean, by default None
t_max : float, optional
Maximum simulation time to clean, by default None
    timeout : float, optional
        Number of seconds to wait for results to load before timing out, by default _TIMEOUT
    return_raw : bool, optional
        If True, also return the raw (unfiltered) results, by default False
Returns
-------
    dict
        Nested dictionary of filtered results. If `return_raw` is True, a tuple of (filtered results, raw results) is returned instead.
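
    Example
    -------
    An illustrative call; the filename, request names, and cutoff are hypothetical.

    >>> reqs_to_clean = {'MSE': ['Instantaneous_Bottom_MSE']}
    >>> filtered = filter_results('example.res', reqs_to_clean, freq_cutoff=10)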
"""
if reqs_to_check is None:
reqs_to_check = [reqs_to_clean]
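    # NOTE: reqs_to_check is accepted but not currently used; every signal in
    # reqs_to_clean is cleaned and filtered below.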
results = get_results(res_file, reqs_to_clean, t_min=t_min, t_max=t_max, _just_write_script=_just_write_script, timeout=timeout)
time_sig = results.pop('time')
filtered_results = {}
for res_name, res_comps in results.items():
filtered_results[res_name] = {}
for res_comp, values in res_comps.items(): # pylint: disable=no-member
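            # Remove outliers from the raw signal before low-pass filtering it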
cleaned_sig, _, _ = clean_sig(values, 3)
filtered_results[res_name][res_comp], _ = low_pass(cleaned_sig, time_sig, freq_cutoff, N=N_filter)
# Return the cleaned results
if return_raw is True:
return {'time': time_sig, **filtered_results}, {'time': time_sig, **results}
else:
return {'time': time_sig, **filtered_results}
def manually_remove_spikes_batch(res_file, reqs_to_clean, reqs_to_check=None, t_min=None, t_max=None, _just_write_script=False, timeout=_TIMEOUT, _inplace=False):
"""Similar to `manually_remove_spikes`, but allows user to plot the signals in batches. Instead of passing a dictionary for the `reqs_to_check` argument, pass a list of dictionaries and the results in each dictionary in the list will be plotted together.
Parameters
----------
res_file : str
Adams Results (.res) filename
reqs_to_clean : dict
Nested dictionary of result sets and result components to clean
    reqs_to_check : list of dicts, optional
        List of nested dictionaries of result sets and result components to check for spikes; the results in each dictionary are plotted together, by default [reqs_to_clean]
t_min : float, optional
        Minimum simulation time to clean, by default None
t_max : float, optional
Maximum simulation time to clean, by default None
timeout : float, optional
Number of seconds to wait for results to load before timing out, by default _TIMEOUT
Returns
-------
dict
Nested dictionary of cleaned results
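
    Example
    -------
    An illustrative call; the filename and request names are hypothetical.

    >>> reqs_to_clean = {'MSE': ['Instantaneous_Bottom_MSE', 'Filtered_Surface_MSE']}
    >>> batches = [{'MSE': ['Instantaneous_Bottom_MSE']}, {'MSE': ['Filtered_Surface_MSE']}]
    >>> cleaned = manually_remove_spikes_batch('example.res', reqs_to_clean, reqs_to_check=batches)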
"""
if reqs_to_check is None:
reqs_to_check = [reqs_to_clean]
results = get_results(res_file, reqs_to_clean, t_min=t_min, t_max=t_max, _just_write_script=_just_write_script, timeout=timeout)
time_sig = results['time']
# Create a flag indicating if the results have been modified and need to be rewritten
results_modified = False
for batch_to_check in reqs_to_check:
# Make a list/batch of values to clean
values_to_check = []
for (res, res_comps) in [(r, rc) for r, rc in results.items() if r in batch_to_check]:
for (res_comp, values) in [(rc, v) for rc, v in res_comps.items() if rc in batch_to_check[res]]:
values_to_check.append(values)
_, i_mod = manually_clean_sigs(time_sig, values_to_check, indices=True)
# If a modification was made to the signal, make that modification to the rest of the signals
if i_mod != []:
# Loop over all the results
for (res, res_comps) in [(r, rc) for r, rc in results.items() if r != 'time']:
                for (res_comp, values) in res_comps.items():
                    # Accumulate the removals so that every flagged index
                    # is removed, not just the last one
                    cleaned_values = values
                    for i in i_mod:
                        cleaned_values = remove_data_point(time_sig, cleaned_values, i)
                    results[res][res_comp] = cleaned_values
# Flag that the results have been modified
results_modified = True
# If the results were modified, update the analysis files
if results_modified is True:
edit_results(res_file, results)
# Return the cleaned results
return results
def _get_unique_filename(filename):
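    """Returns a filename that does not already exist by appending a numeric suffix (_1, _2, ...).

    Note that existence is checked relative to the current working directory.
    """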
if os.path.exists(filename):
for i in range(9999):
new_name, ext = os.path.splitext(filename)
new_name = new_name + f'_{i+1}'
if not os.path.exists(new_name + ext):
filename = new_name + ext
break
return filename