Source code for adamspy.adripy.event

"""Contains the DrillEvent class
"""
import os
import logging
import copy
import thornpy
from . import TMPLT_ENV
from .utilities import read_TO_file, get_cdb_path, get_full_path

# Configure Logging
LOG = logging.getLogger(__name__)

class DrillEvent():
    """Holds all the data necessary to write an Adams Drill event (.evt) file.

    Once the DrillEvent is instanced, ramp parameters must be defined before
    the DrillEvent is written to an event file.

    Note
    ----
    After instancing the class, at least one simulation step must be added
    using the `add_simulation_step()` method. Also, at least one ramp must be
    added for each of the four drilling parameters (i.e. wob, rpm, gpm, rop)
    using the `add_ramp()` method. All other (scalar) parameters in the event
    file can be specified when the DrillEvent is instanced using kwargs or
    they can be set later using:

    >>> drill_event.parameters[parameter] = value

    Note
    ----
    WOB should be in lbf

    Attributes
    ----------
    parameters : dict
        Dictionary of the parameters that make up an Adams Drill event. The
        keys are the names in `_SCALAR_PARAMETERS` and `_TABLE_PARAMETERS`.
        Each table parameter is stored column-wise (a list of column lists)
        under its own name and row-wise (a list of row tuples) under the same
        name prefixed with an underscore. The whole dictionary is handed to
        the event file template when the event is written.
    filename : str
        Name of the event file (.evt) in which this event is stored. This
        attribute is initially empty and is populated by the
        `write_to_file()` and `read_from_file()` methods.

    """

    _SCALAR_PARAMETERS = [
        'Units',
        'Event_Name',
        'Drive_Type',
        'Measurement_Tool',
        'Start_Depth',
        'Off_Bottom',
        'Initial_Drive_Torque',
        'Motor_Type',
        'Filter_Time_Constant',
        'Motor_Bend_Start',
        'Motor_Bend_Ramp',
        'Mud_Density',
        'Impact_Damping_Penetration',
        'Impact_Exponent',
        'MWD_Pulsing',
        'NperRev',
        'N',
        'S_threshold',
        'C_hi',
        'Plotting_4D',
        'Start_Time',
        'End_Time',
        'Plotting_Interval',
        'Start_Distance',
        'End_Distance'
    ]

    _DEFAULT_PARAMETER_SCALARS = {
        'Units': 'Imperial',
        'Drive_Type': 'WITH_MOTOR',
        'Measurement_Tool': 'TOS',
        'Initial_Drive_Torque': 0,
        'Motor_Type': '3D',
        'Filter_Time_Constant': 0.05,
        'Motor_Bend_Start': 1.0,
        'Motor_Bend_Ramp': 9.0,
        'Mud_Density': 75,
        'Impact_Damping_Penetration': 0.005,
        'Impact_Exponent': 1.05,
        'MWD_Pulsing': 'On',
        'NperRev': 'off',
        'N': 1,
        'S_threshold': 0.55,
        'C_hi': 1.05,
        'Plotting_4D': 'off',
        'Start_Time': 160,
        'End_Time': 200,
        'Plotting_Interval': 0.1,
        'Start_Distance': 2.0,
        'End_Distance': 100.0
    }

    _TABLE_PARAMETERS = [
        'ROTARY_RPM',
        'MOTOR',
        'FLOW_RATE',
        'WOB',
        'ROP',
        'NPERREV',
        'DYNAMICS'
    ]

    _DEFAULT_PARAMETER_TABLES = {
        'ROTARY_RPM': ((), (), ()),
        'MOTOR': ((0,), (1,), (1,)),
        'FLOW_RATE': ((), (), ()),
        'WOB': ((), (), ()),
        'ROP': ((), (), ()),
        'NPERREV': ((0,), (1,)),
        'DYNAMICS': ((), ())
    }

    # Table names used by older event files, mapped to the current names.
    _LEGACY_TABLE_NAMES = {
        'TOP_DRIVE': 'ROTARY_RPM',
        'PUMP_FLOW': 'FLOW_RATE'
    }

    _CDB_TABLE = 'events.tbl'
    _EXT = 'evt'

    def __init__(self, event_name, start_depth, off_bottom, **kwargs):
        """Initializes the :class:`DrillEvent` class.

        Parameters
        ----------
        event_name : str
            Name of the event.
        start_depth : float
            Start depth of the event in feet.
        off_bottom : float
            Starting distance from bottom.
        **kwargs
            Any other scalar event parameters. These take precedence over the
            class defaults.

        """
        self.parameters = kwargs
        self.parameters['Event_Name'] = event_name
        self.parameters['Start_Depth'] = start_depth
        self.parameters['Off_Bottom'] = off_bottom

        # Apply default parameters
        self._apply_defaults()

        self.filename = ''

    def add_simulation_step(self, end_time, output_step_size=0.05, clear_existing=False):
        """Adds a dynamic simulation step.

        Parameters
        ----------
        end_time : float
            End time of the step in seconds.
        output_step_size : float
            Output step size of the step in seconds. (default is 0.05)
        clear_existing : bool
            If true, existing simulation steps are deleted before this one is
            added.

        """
        if clear_existing:
            self.parameters['DYNAMICS'] = [[], []]

        self.parameters['DYNAMICS'][0].append(end_time)
        self.parameters['DYNAMICS'][1].append(output_step_size)
        self._update_rows('DYNAMICS')

    def add_ramp(self, parameter, start_time, ramp_duration, delta, clear_existing=False):
        """Adds a ramp to the specified ramp parameter.

        Parameters
        ----------
        parameter : str
            Ramp parameter, options are 'ROTARY_RPM', 'WOB', 'FLOW_RATE', or
            'ROP'. The legacy names 'TOP_DRIVE' and 'PUMP_FLOW' are accepted
            as aliases of 'ROTARY_RPM' and 'FLOW_RATE'.
        start_time : float
            Start time of the ramp.
        ramp_duration : float
            Duration of the ramp.
        delta : float
            Delta of the ramp. Units should be in **rpm**, **lbf**, **gpm**,
            or **ft/sec**.
        clear_existing : bool
            If true, existing ramps for the specified parameter will be
            deleted.

        """
        # Older code and files call these tables by their legacy names
        parameter = self._LEGACY_TABLE_NAMES.get(parameter, parameter)

        if clear_existing:
            self.parameters[parameter] = [[], [], []]

        self.parameters[parameter][0].append(start_time)
        self.parameters[parameter][1].append(ramp_duration)
        self.parameters[parameter][2].append(delta)
        self._update_rows(parameter)

    def write_to_file(self, directory=None, filename=None, cdb=None):
        """Creates an event file from the DrillEvent object.

        The destination is chosen from the first argument that is given, in
        the order `directory`, `cdb`, `filename`.

        Note
        ----
        Uses file version 1.0

        Parameters
        ----------
        directory : str
            Directory in which to write the file.
        filename : str
            Name of the file to write. When `directory` or `cdb` is given,
            only the base name is used (any path and a trailing event file
            extension are stripped) and it defaults to
            self.parameters['Event_Name']. When neither is given, `filename`
            is used as the path of the file to write.
        cdb : str
            Name of the cdb in which to write the file. Only used if
            `directory` is not given.

        Returns
        -------
        str
            The value stored in :attr:`filename`, i.e. the written file path
            after it has been passed through `get_cdb_path`.

        Raises
        ------
        ValueError
            Raised if not all parameters have been defined or if none of
            `directory`, `filename` and `cdb` is given.

        """
        # Raise an error if the parameters can't be validated
        if not self.validate():
            raise ValueError('The parameters could not be validated.')

        if directory is not None:
            # Write to the given directory
            filepath = os.path.join(directory, f'{self._event_basename(filename)}.{self._EXT}')

        elif cdb is not None:
            # Write to the events table of the given cdb
            filepath = get_full_path(os.path.join(f'<{cdb}>', self._CDB_TABLE, f'{self._event_basename(filename)}.{self._EXT}'))

        elif filename is not None:
            # Only a filename is given, so treat it as the path to write
            filepath = thornpy.utilities.convert_path(filename)

        else:
            # Raise an error if none of the arguments are provided
            raise ValueError('Either directory, filename, or cdb must be passed.')

        # Rebuild the row-wise tables so that they reflect any edits made
        # directly to the column-wise tables since the last ramp/step was added
        for table_parameter in self._TABLE_PARAMETERS:
            self._update_rows(table_parameter)

        template_path = os.path.join(os.path.dirname(__file__), 'templates', f'template.{self._EXT}')
        with open(template_path) as fid:
            event_template = TMPLT_ENV.from_string(fid.read())

        with open(filepath, 'w') as fid:
            fid.write(event_template.render(self.parameters))

        self.filename = get_cdb_path(filepath)

        return self.filename

    def validate(self):
        """Determines if all parameters have been set

        Every scalar parameter must be present in :attr:`parameters` and
        every column of every table parameter must be non-empty. Each problem
        found is logged at the critical level.

        Returns
        -------
        bool
            `True` if all parameters have been set. Otherwise `False`.

        """
        validated = True

        # Check that all parameters exist in the self.parameters dictionary
        for param_name in self._SCALAR_PARAMETERS:
            if param_name not in self.parameters:
                LOG.critical('%s not found in %s', param_name, self.filename)
                validated = False

        # Check that no table is missing and that no table column is empty
        for param_name in self._TABLE_PARAMETERS:
            columns = self.parameters.get(param_name)
            if columns is None or not all(columns):
                LOG.critical('%s not found in %s', param_name, self.filename)
                validated = False

        return validated

    @classmethod
    def read_from_file(cls, filename):
        """Reads an Adams Drill event (.evt) file and sets
        :attr:`DrillEvent.parameters` based on data in the file.

        Parameters
        ----------
        filename : str
            Filename of an Adams Drill event (.evt) file.

        Returns
        -------
        DrillEvent
            Event populated from the file, with :attr:`filename` set to the
            `filename` argument.

        """
        # Read the TO data into a dictionary
        to_data = read_TO_file(get_full_path(filename))

        # Create an empty event object
        event = cls('', '', '')

        # Extract the DrillEvent parameters from the TO dictionary
        event._get_params_from_TO_data(to_data) #pylint: disable=protected-access

        # Set the filename attribute
        event.filename = filename

        return event

    def _update_rows(self, table_parameter):
        """Rebuilds the row-wise copy of a table parameter.

        The rows are stored as a list (not a bare `zip` iterator) so that
        they can be iterated more than once.
        """
        self.parameters['_' + table_parameter] = list(zip(*self.parameters[table_parameter]))

    def _event_basename(self, filename):
        """Returns the event file name without its path or trailing extension.

        Falls back to the event name if `filename` is None.
        """
        if filename is None:
            return self.parameters['Event_Name']

        basename = os.path.split(filename)[-1]
        extension = f'.{self._EXT}'

        # Only strip the extension from the end of the name
        if basename.endswith(extension):
            basename = basename[:-len(extension)]

        return basename

    def _apply_defaults(self):
        """Applies defaults from class variables

        Scalar defaults are only applied where no value has been given.
        Table parameters are always reset to their defaults.
        """
        # Applies normal parameter defaults
        for scalar_parameter, value in self._DEFAULT_PARAMETER_SCALARS.items():
            if scalar_parameter not in self.parameters:
                self.parameters[scalar_parameter] = copy.copy(value)

        # Applies defaults to all ramp parameters
        for table_parameter, table in self._DEFAULT_PARAMETER_TABLES.items():
            self.parameters[table_parameter] = [list(tup) for tup in table]
            self._update_rows(table_parameter)

    def _get_params_from_TO_data(self, tiem_orbit_data):
        """Reads the event parameters out of a dictionary of Tiem Orbit data
        generated by :func:`adamspy.adripy.utilities.read_TO_file`

        Note
        ----
        Adams 2018 uses *delta* values for the surface parameters where as
        Adams 2019 uses *at end* values. :class:`DrillEvent` always uses
        *delta* values, so any *at end* values are converted in this method.
        WOB values from files of version 2.0 or later are multiplied by 1000
        (klbf to lbf).

        Parameters
        ----------
        tiem_orbit_data : dict
            Dictionary of Tiem Orbit data. It is not modified.

        Raises
        ------
        ValueError
            An event parameter could not be found

        """
        file_version = float(tiem_orbit_data['ADAMS_DRILL_HEADER']['file_version'])

        for param in self._SCALAR_PARAMETERS:
            self.parameters[param] = self._find_scalar(tiem_orbit_data, param)

        for param in self._TABLE_PARAMETERS:
            if not self._read_table(tiem_orbit_data, param, file_version):
                raise ValueError(f'{param} not found!')

            self._update_rows(param)

    @staticmethod
    def _find_scalar(tiem_orbit_data, param):
        """Returns the value of a scalar parameter found in the Tiem Orbit
        data, raising a ValueError if it can't be found.
        """
        key = param.lower()

        # The plotting start and end times share their names with table
        # columns, so they are only looked for in the PLOT_4D block
        plot_only = key in ('start_time', 'end_time')

        for block_name, block in tiem_orbit_data.items():
            if not isinstance(block, dict):
                continue

            if plot_only:
                if block_name.lower() == 'plot_4d' and key in block:
                    return block[key]
                continue

            if key in block:
                # The parameter is directly in this block
                return block[key]

            for sub_block in block.values():
                # Otherwise look inside each sub block of this block
                if isinstance(sub_block, dict) and key in sub_block:
                    return sub_block[key]

        raise ValueError(f'{param} not found!')

    def _read_table(self, tiem_orbit_data, param, file_version):
        """Fills the columns of one table parameter from the Tiem Orbit data.

        Returns `True` if the table was found. Otherwise `False`.
        """
        name = param.upper()
        table = self.parameters[param]

        for block_name, block in tiem_orbit_data.items():
            if not isinstance(block, dict):
                continue

            if block_name == name:
                if name == 'DYNAMICS':
                    # The simulation steps have their own column names
                    table[0] = block['end_time']
                    table[1] = block['output_step_size']
                else:
                    table[0] = block['start_time']
                    table[1] = block['ramp_duration']
                    if len(table) > 2:
                        table[2] = block[self._value_column(param, block, ('start_time', 'ramp_duration'))]
                return True

            # Sub blocks of this block, with legacy table names translated to
            # the current ones (done on a copy to leave the input untouched)
            sub_blocks = {header: value for header, value in block.items() if isinstance(value, dict)}
            for legacy_name, current_name in self._LEGACY_TABLE_NAMES.items():
                if legacy_name in sub_blocks:
                    sub_blocks[current_name] = sub_blocks.pop(legacy_name)

            if name in sub_blocks:
                self._read_ramp_table(param, sub_blocks[name], file_version)
                return True

        return False

    @staticmethod
    def _value_column(param, table_data, time_columns, lists_only=False):
        """Returns the name of the first column in `table_data` that is not
        one of the `time_columns`.
        """
        candidates = [col for col in table_data
                      if col not in time_columns
                      and (not lists_only or isinstance(table_data[col], list))]

        if not candidates:
            raise ValueError(f'{param} has no value column!')

        return candidates[0]

    def _read_ramp_table(self, param, table_data, file_version):
        """Fills the columns of a ramp table from its sub block, converting
        *at end* notation to *ramp duration* / *delta* notation.
        """
        table = self.parameters[param]
        table[0] = table_data['start_time']

        if 'ramp_duration' in table_data:
            # If the .evt file uses *ramp duration* notation, no conversion necessary
            table[1] = table_data['ramp_duration']

        elif 'end_time' in table_data:
            # If the .evt file uses *at end* notation, convert to *ramp duration* notation
            table[1] = [end - start for end, start in zip(table_data['end_time'], table_data['start_time'])]

        if len(table) > 2:
            last_col = self._value_column(param, table_data, ('start_time', 'ramp_duration', 'end_time'), lists_only=True)
            values = table_data[last_col]

            if 'delta' in last_col:
                # If .evt file uses 2018 notation (delta)
                table[2] = values

            elif 'at_end' in last_col:
                # If .evt file uses 2019 notation (value at end). The first
                # "delta" is the first "at end" value and every other "delta"
                # is the difference between consecutive "at end" values
                table[2] = values[:1] + [value - prev_value for prev_value, value in zip(values, values[1:])]

        # Check if need to convert wob from klbf to lbf
        if file_version >= 2.0 and param.lower() == 'wob':
            table[2] = [p*1000 for p in table[2]]