"""Module for the DrillSim class
"""
import os
import subprocess
import glob
import platform
from thornpy.signal import step_function, low_pass
from numpy import linspace, argmax, array
from ..postprocess.xml import get_results, shrink_results
from .utilities import build, add_splines_to_acf, add_splines_to_adm
from ..postprocess import launch_ppt
from .string import DrillString
from .event import DrillEvent
from .solver_settings import DrillSolverSettings
from ..adamspy import get_simdur_from_acf, get_simdur_from_msg
from . import modify_acf_solver_settings
[docs]class DrillSim(): #pylint: disable=too-many-instance-attributes
"""Contains data defining the files that make up an Adams Drill input deck.
Parameters
----------
string : DrillString
:class:`DrillString` object representing the string to be used in the simulation
event : DrillEvent
:class:`DrillEvent` object representing the string to be used in the simulation
solver_settings : DrillSolverSettings
:class:`DrillSolverSettings` object representing the string to be used in the simulation
directory : str
Path to the directory in which to put the simulation files
analysis_name : str
Name of the analysis. Used for all file prefixes
Attributes
----------
string : DrillString
:class:`DrillString` object representing the string to be used in the simulation
event : DrillEvent
:class:`DrillEvent` object representing the string to be used in the simulation
solver_settings : DrillSolverSettings
:class:`DrillSolverSettings` object representing the string to be used in the simulation
directory : str
Path to the directory in which to put the simulation files
analysis_name : str
Name of the analysis. Used for all file prefixes
string_filename : str
Filename of the analysis' string file relative to :attr:`directory`
adm_filename : str
Filename of the analysis' adm file relative to :attr:`directory`
acf_filename : str
Filename of the analysis' acf file relative to :attr:`directory`
cmd_filename : str
Filename of the analysis' cmd file relative to :attr:`directory`
res_filename : str
Filename of the analysis' res file relative to :attr:`directory`
results : dict
Simulation results
results_units : dict
Units of simulation results
pason_inputs : dict
Contains the cleaned pason signals that will be used as inputs to the Adams model. Keys are 'time', 'rop', 'wob', 'rpm', and 'gpm'. Each value is a :obj:`list` with two entries. The first is the signal and the second is the associated time.
built : bool
Indicates whether the input deck (adm, acf, and cmd files) has been built yet for this DrillSim
solved : bool
Indicates whether the :class:`DrillSim` has been solved and results file (.res) has been generated.
RAMP_TIME : dict
Class attribute containing standard ramp times for rpm, gpm, wob, and rop
CUTOFF_FREQ : dict
Class attribute containing default cutoff frequencies for smoothing pason inputs.
Example
-------
>>> my_drillsim = adripy.DrillSim(my_string, my_event, my_solversettings, os.getcwd(), 'MyAnalysis')
"""
RAMP_TIME = {
'rpm': 10,
'gpm': 20
}
CUTOFF_FREQ = {
'wob': .1,
'rop': .1
}
def __init__(self, string, event, solver_settings, directory, analysis_name, write_TO_files=True): #pylint: disable=too-many-arguments
"""Sets instance attributes and writes the string, event, and solver settings files to `directory`.
"""
self.string = string
self.event = event
self.solver_settings = solver_settings
self.directory = directory
self.analysis_name = analysis_name
self.string_filename = ''
self.adm_filename = ''
self.acf_filename = ''
self.cmd_filename = ''
self.res_filename = ''
self.msg_filename = ''
self.results = None
self.results_units = None
# If `directory` does not exist, create it
if not os.path.exists(self.directory):
os.makedirs(self.directory)
# Write the TO files to the `directory`
if write_TO_files:
self.write_tiem_orbit_files()
self.run_proc = None
self.pason_inputs = {}
# Flags
self.built = False
self.solved = False
[docs] @classmethod
def read_from_directory(cls, directory):
"""Returns a :class:`DrillSim` object from an existing directory that was previously created by a :class:`DrillSim` object.
Note
----
This method does not read the `pason_inputs` attribute
Parameters
----------
directory : str
Path to a directory previously created by a :class:`DrillSim` object.
Returns
-------
DrillSim
:class:`DrillSim` object created from an existing directory that was previously created by a :class:`DrillSim` object.
"""
# If `directory` is not a directory, raise an error
if not os.path.isdir(directory):
raise OSError(f'No directory found with name {directory}!')
# Get string, event, ssf filenames
string_filename = glob.glob(os.path.join(directory, '*.' + DrillString._EXT))[0]
event_filename = glob.glob(os.path.join(directory, '*.' + DrillEvent._EXT))[0]
solver_settings_filename = glob.glob(os.path.join(directory, '*.' + DrillSolverSettings._EXT))[0]
# Create string, event, ssf objects
string = DrillString.read_from_file(string_filename)
event = DrillEvent.read_from_file(event_filename)
solver_settings = DrillSolverSettings.read_from_file(solver_settings_filename)
# Get analysis name
analysis_name = string.parameters['ModelName']
# Create the DrillSim object
drill_sim = cls(string, event, solver_settings, directory, analysis_name, write_TO_files=False)
# Define file names
drill_sim.string_filename = os.path.relpath(string_filename, directory)
adm_files = glob.glob(os.path.join(directory, '*.adm'))
drill_sim.adm_filename = os.path.relpath(adm_files[0], directory) if adm_files else ''
acf_files = glob.glob(os.path.join(directory, '*.acf'))
drill_sim.acf_filename = os.path.relpath(acf_files[0], directory) if acf_files else ''
cmd_files = glob.glob(os.path.join(directory, '*.cmd'))
drill_sim.cmd_filename = os.path.relpath(cmd_files[0], directory) if cmd_files else ''
res_files = glob.glob(os.path.join(directory, '*.res'))
drill_sim.res_filename = os.path.relpath(res_files[0], directory) if res_files else ''
msg_files = glob.glob(os.path.join(directory, '*.msg'))
drill_sim.msg_filename = os.path.relpath(msg_files[0], directory) if msg_files else ''
# Set the `built` flag
if drill_sim.acf_filename and drill_sim.adm_filename:
drill_sim.built = True
# set the `solved` flag
if drill_sim.res_filename:
drill_sim.solved = True
return drill_sim
[docs] def build(self, wait=True):
"""This method builds the input deck. It launches Adams View in batch, reads in the `self.string.filename` and `self.solver_settings.filename` files, and runs the Adams Drill macro ``ds tostart`` to build a drill string model. Then it saves the model files (.acf, .adm, and .cmd) to the `self.directory` directory.
Parameters
----------
wait : bool
If True, code execution waits until the build process terminates before moving on. (default is True)
"""
# Build the model
adm, acf, cmd = build(os.path.join(self.directory, self.string_filename), self.solver_settings.filename, self.directory, wait=wait)
# store the new filenames as attributes
self.adm_filename = os.path.relpath(adm, self.directory)
self.acf_filename = os.path.relpath(acf, self.directory)
self.cmd_filename = os.path.relpath(cmd, self.directory)
if self.pason_inputs:
self._add_adm_splines()
self._add_acf_splines()
# Flag this simulation as built
self.built = True
[docs] def run(self, wait=True): #pylint: disable=no-self-use
"""Run the simulation
Parameters
----------
wait : bool
If True the application will wait for the process to complete before continuing.
"""
if platform.system() == 'Windows':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
self.run_proc = subprocess.Popen('"{}" ru-s "{}"'.format(os.environ['ADAMS_LAUNCH_COMMAND'], self.acf_filename), cwd=self.directory, startupinfo=startupinfo)
else:
self.run_proc = subprocess.Popen([os.environ['ADAMS_LAUNCH_COMMAND'], '-c', 'ru-standard', 'i', self.acf_filename, 'exit'], cwd=self.directory)
self.msg_filename = f'{self.analysis_name}.msg'
self.res_filename = f'{self.analysis_name}.res'
# Flag this simulation as solved
self.solved = True
# Wait for the process to complete before moving on.
if wait:
self.run_proc.wait()
[docs] def write_tiem_orbit_files(self):
"""Writes the solver settings and event files and publishes the string file to the simulation directory.
Note
----
When the string file gets published, all the supporting tool files and the hole file are copied to the simulation directory.
"""
# solver settings file
self.solver_settings.write_to_file(self.analysis_name, directory=self.directory)
# event file
self.event.parameters['Event_Name'] = self.analysis_name
self.event.write_to_file(directory=self.directory)
# string file
self.string.parameters['Event_Property_File'] = os.path.split(self.event.filename)[1]
self.string.parameters['ModelName'] = self.analysis_name
self.string.parameters['OutputName'] = self.analysis_name
abs_str_file = self.string.write_to_file(directory=self.directory, publish=True)
self.string_filename = os.path.relpath(abs_str_file, self.directory)
[docs] def read_results(self, reqs_to_read=None, t_min=None, t_max=None, shrink_results_file=False, overwrite_pickle=False, use_iterparse=False):
"""Reads results from the results file.
Example
-------
>>> drillsim.run()
>>> t_min = 70
>>> t_max = 71
>>> reqs_to_read = {}
>>> reqs_to_read['MSE'] = ['Instantaneous_Bottom_MSE', 'Filtered_Surface_MSE']
>>> reqs_to_read['ROP_controls'] = ['Command_ROP', 'True_WOB']
>>> results, units = drillsim.read_results(reqs_to_read, t_min, t_max)
>>> results['MSE']['Instantaneous_Bottom_MSE']
[5000.5621, 5000.8913, ]
Parameters
----------
reqs_to_read : dict, optional
Dictionary of results data (the default is None, which gets all the results)
t_min : float, optional
Mininmum time for which to get results (the default is None, which gets results starting at the first time step)
t_max : float, optional
Maximum time for which to get results (the default is None, which gets results up to the last time step)
shrink_results_file : bool, optional
If True, the results file will be rewritten to include only the requests in `reqs_to_read` and the time period between `t_max` and `t_min`.
"""
self.results, self.results_units = get_results(os.path.join(self.directory, self.res_filename), reqs_to_read, t_min, t_max, return_units=True, overwrite_pickle=overwrite_pickle, use_iterparse=use_iterparse)
if shrink_results_file and any([reqs_to_read is not None, t_min is not None, t_max is not None]):
shrink_results(os.path.join(self.directory, self.res_filename), reqs_to_keep=reqs_to_read, t_min=t_min, t_max=t_max, in_place=True)
[docs] def launch_ppt(self, wait=False):
"""Launches the postprocessor and loads :attr:`res_filename`
Parameters
----------
wait : bool, optional
If `True`, code execution will freeze until the postprocessor is closed. (the default is False)
Raises
------
FileNotFoundError
Raised if no results file exists in :attr:`directory`.
"""
# Check for results file
if not self.res_filename:
res_files = glob.glob(os.path.join(self.directory, '*.res'))
if not res_files:
raise FileNotFoundError('No results file found! You must run the simultion using DrillSim.solve before you can launch the postprocessor to view the results.')
self.res_filename = os.path.relpath(res_files[0], self.directory)
# Launch the postprocessor
launch_ppt(os.path.join(self.directory, self.res_filename), wait=wait)
[docs] @classmethod
def clean_pason(cls, pason_data, sig_type, t_min=None, t_max=None, show_plot=True, ramp_time=None, cutoff_freq=None):
"""Returns a cleaned up pason signal to be used as input to an Adams Drill model.
Parameters
----------
pason_data : PasonData
Pason signal to be used as input.
sig_type : str
Type of signal. Options are 'wob', 'rpm', 'gpm', or 'rop'.
t_min : float
Minimum time to crop to the dataset to. (default is None which uses the beginning of the dataset)
t_max : float
Maximum time to crop the dataset to. (default is None which uses the end of the dataset)
show_plot : bool, optional
If True, shows a plot of the resulting setpoints (the default is True)
Returns
-------
list
Cleaned signal
list
Associated time signal
"""
def crop_signal(t_min, t_max, signal, time):
if t_min is None:
t_min = time[0]
if t_max is None:
t_max = time[-1]
i_min = argmax(array(time)>=t_min)
i_max = argmax(array(time)>=t_max)
signal = signal[i_min:i_max+1]
time = time[i_min:i_max+1] - time[i_min]
return signal, time
if sig_type in ['rpm', 'gpm']:
# For rpm and gpm get a stepped signal
if ramp_time is None:
ramp_time = cls.RAMP_TIME[sig_type]
signal, time = cls.get_stepped_signal(pason_data, ramp_time, sig_type, show_plot=False)
elif sig_type in ['wob', 'rop']:
# For wob and rop get a filterd signal
signal, time = cls.get_filtered_signal(pason_data, sig_type, cutoff_freq=cutoff_freq)
else:
raise ValueError('Value given for sig_type must be rpm, gpm, rop, or wob.')
# Crop the signals if t_min or t_max are given
if t_min or t_max:
signal, time = crop_signal(t_min, t_max, signal, time)
# If show_plots is true
if show_plot:
pason_data.plt.Figure()
pason_data.plt.plot(pason_data.time, pason_data.data[sig_type], linewidth=2)
pason_data.plt.ylabel(sig_type)
pason_data.plt.plot(time, signal, linewidth=1)
pason_data.plt.xlabel('Time (sec)')
pason_data.plt.title(sig_type.upper())
pason_data.plt.show()
return list(signal), list(time)
[docs] @classmethod
def get_filtered_signal(cls, pason_data, sig_type, cutoff_freq=None):
"""Gets a filtered signal from the wob or rop data of a :class:`PasonData` object.
This method uses a digital zero-phase 5th order butterworth filter with a
Parameters
----------
pason_data : PasonData
A :class:`PasonData` object
sig_type : str
Type of signal. Options are 'wob' or 'rop'
cutoff_freq : float, optional
Cutoff frequency to be used in the lowpass filter. (the default is None, which uses value from PasonData.CUTOFF_FREQ)
Raises
------
ValueError
Raised if `sig_type` isn't 'wob' or 'rop'
Returns
-------
list
Filtered signal
list
Time associated with filterd signal
"""
# Check that the correct sig_type was passed.
if sig_type not in ['wob', 'rop']:
raise ValueError('sig_type must be wob or rop')
# Set the cutoff frequency if it wasn't given
if cutoff_freq is None:
cutoff_freq = cls.CUTOFF_FREQ[sig_type]
signal, time = low_pass(pason_data.data[sig_type], pason_data.time, cutoff_freq)
return signal, time
[docs] @classmethod
def get_stepped_signal(cls, pason_data, ramp_time, sig_type, show_plot=True):
"""Gets the stepped signal from the rpm or gpm setpoints of a :class:`PasonData` object.
Parameters
----------
pason_data : PasonData
A :class:`PasonData` object.
ramp_time : float
Time it takes to ramp the signal
sig_type : str
Type of signal options are 'rpm' or 'gpm'
show_plot : bool
If True, shows a plot of the resulting setpoints (the default is True)
Returns
-------
list
Stepped signal
list
Associated time signal
"""
def make_index(start, stop, step, endpoint=False):
num = (stop-start)/step
if not num.is_integer():
raise ValueError('(stop-start)/step must be a whole number.')
num = int(num)
index = list(linspace(start, stop, num if endpoint is False else num+1, endpoint=endpoint, dtype=float))
return index
# Check that the correct sig_type was passed.
if sig_type not in ['rpm', 'gpm']:
raise ValueError('sig_type must be rpm or gpm')
# Get the sample period
samp_per = round(pason_data.time[1]-pason_data.time[0], 3)
# Set the ramp time if it is given
# Determine the half ramp time rounded to the sample period
half_ramp_time = round(ramp_time/samp_per/2)*samp_per
# Get the setpoints from the pason data
set_points = pason_data.get_setpoints(sig_type, show_plot=show_plot)
# Initialize time and signal lists with the segment from the begining of the dataset to the beginning of the first ramp
time = make_index(set_points[0][0], set_points[0][0]+half_ramp_time, samp_per)
signal = [set_points[0][1] for t in time]
# Add intermediate time segments
for (sp_time, sp_val), (nxt_sp_time, nxt_sp_val) in zip(set_points[:-1], set_points[1:]):
time_seg = make_index(sp_time + half_ramp_time, nxt_sp_time + half_ramp_time, samp_per)
signal.extend(step_function(time_seg, nxt_sp_time-half_ramp_time, sp_val, nxt_sp_time+half_ramp_time, nxt_sp_val))
time.extend(time_seg)
# Add the segment from the end of the last ramp to the end of the dataset
time_seg = make_index(set_points[-1][0]+half_ramp_time, pason_data.time[-1], samp_per, endpoint=True)
signal.extend([set_points[-1][1] for t in time_seg])
time.extend(time_seg)
return signal, time
[docs] def get_duration(self, use_acf=False):
"""Returns the duration of the :class:`DrillSim` simulation based on contents of the acf file or the msg file.
Parameters
----------
use_acf : bool (optional)
Set this to `True` if you want to use the acf file even if the :class:`DrillSim` has been solved. (the default is `False`)
Returns
-------
float
Duration of the simulation
"""
if use_acf is True:
duration = get_simdur_from_acf(os.path.join(self.directory, self.acf_filename)) if self.built is True else None
else:
duration = get_simdur_from_msg(os.path.join(self.directory, self.msg_filename)) if self.solved is True else 0
return duration
[docs] def modify_solver_settings(self, new_solver_settings):
"""Modifies the contents of the Adams Command (.acf) file given in `acf_file` to apply the solver settings specified in the Solver Settings (.ssf) file given in `ssf_file`.
Parameters
----------
solver_settings : DrillSolverSettings
Solver settings to use in updating the Adams Command file.
Note
----
If the DrillSim.built is True, Only the static equilibrium funnel and the integrator error are modified
"""
# Remove the current solver settings file
os.remove(self.solver_settings.filename)
# Check if this drill sim has already been built
if self.built:
# If already built, update the funnel
for i, (maxit, stab, error, imbal, tlim, alim) in enumerate(new_solver_settings.parameters['_Funnel']):
self.solver_settings.add_funnel_step(maxit, stab, error, imbal, tlim, alim, clear_existing=True if i==0 else False)
# Update the integrator error
self.solver_settings.parameters['Error'] = new_solver_settings.parameters['Error']
# Modify the acf file
modify_acf_solver_settings(os.path.join(self.directory, self.acf_filename), new_solver_settings.parameters['Funnel'], new_solver_settings.parameters['Error'])
else:
# If not already built, simply replace self.sover_settings
self.solver_settings = new_solver_settings
# Write the modified solver settings
self.solver_settings.write_to_file(self.analysis_name, directory=self.directory)
def _add_adm_splines(self):
"""Adds splines to the adm file
"""
add_splines_to_adm(os.path.join(self.directory, self.adm_filename), self.pason_inputs)
def _add_acf_splines(self):
"""Adds splines to the acf file
"""
add_splines_to_acf(os.path.join(self.directory, self.acf_filename))