"""adripy contains functions for manipulating MSC Adams Drill files
"""
import os
import re
import subprocess
import time
import platform
import thornpy
import numpy as np
from .constants import TO_PARAMETER_PATTERN, TO_LENGTH_PARAM, ADRILL_IDS, ADRILL_PLUGIN_VAR, ACF_FUNNEL_PATTERN, ACF_INTEGRATOR_ERROR_PATTERN
from . import TMPLT_ENV
def turn_measure_on(string_file, tool_types=None, tool_numbers=None, tool_names=None):
    """Modify a string file to turn measure on for the designated tools.

    Tools may be designated by type, number (stack order), or name.

    Parameters
    ----------
    string_file : str
        Path to an MSC Adams Drill string file. Accepts full path or cdb alias.
    tool_types : list, optional
        List of tool types as seen in the string file (e.g. pdc_bit, motor, stabilizer)
    tool_numbers : list, optional
        List of stack orders corresponding to tools in the string
    tool_names : list, optional
        List of tool names

    Returns
    -------
    int
        Number of tools measured

    """
    # ``None`` defaults avoid sharing one mutable list object between calls.
    tool_types = [] if tool_types is None else tool_types
    tool_numbers = [] if tool_numbers is None else tool_numbers
    tool_names = [] if tool_names is None else tool_names

    n = 0
    mark = False

    # Convert to full path if cdb alias was used
    string_file = get_full_path(string_file)

    # Appending a suffix can never collide with the input file, unlike
    # replacing '.str' somewhere inside the path.
    tmp_file = string_file + '.tmp'

    def quoted_value(text):
        """Return the last quoted value on the line (spaces removed) or None."""
        parts = text.replace(' ', '').replace('\n', '').split("'")
        return parts[-2] if len(parts) >= 2 else None

    with open(string_file, 'r') as fid_str, open(tmp_file, 'w') as fid_str_tmp:
        for line in fid_str:
            lowered = line.lower()

            if line.startswith('$'):
                # Comment lines are copied untouched
                fid_str_tmp.write(line)

            elif mark and ' measure' in lowered:
                # This is the measure line of a tool that has been marked
                fid_str_tmp.write(" Measure = 'yes'\n")
                n += 1
                mark = False

            elif ' stack_order' in lowered:
                # Mark if the tool matches a designated stack order
                if int(line.replace(' ', '').replace('\n', '').split('=')[-1]) in tool_numbers:
                    mark = True
                fid_str_tmp.write(line)

            elif ' type' in lowered:
                # Mark if the tool matches a designated tool type
                if quoted_value(line) in tool_types:
                    mark = True
                fid_str_tmp.write(line)

            elif ' name' in lowered:
                # Mark if the tool matches a designated tool name
                if quoted_value(line) in tool_names:
                    mark = True
                fid_str_tmp.write(line)

            else:
                fid_str_tmp.write(line)

    # Swap the rewritten file into place in a single step
    os.replace(tmp_file, string_file)

    return n
def _read_cfg_databases(cfg_file, substitutions):
    """Return ``{name: location}`` for every DATABASE line of one configuration file."""
    databases = {}
    with open(cfg_file, 'r') as fid:
        for line in fid:
            if not line.startswith('DATABASE'):
                continue

            # Fields are: keyword, cdb name, cdb location (which may contain spaces)
            fields = re.split('[\t ]+', line, maxsplit=2)
            cdb_name = fields[1]
            cdb_loc = fields[-1].replace('\n', '')
            for token, value in substitutions:
                cdb_loc = cdb_loc.replace(token, value)

            databases[cdb_name] = thornpy.utilities.convert_path(cdb_loc)
    return databases


def get_adrill_cdbs(adrill_user_cfg, adrill_shared_cfg=None):
    """Return the names and locations of all user defined MSC Adams Drill databases (cdbs)

    Parameters
    ----------
    adrill_user_cfg : str
        Full path to an Adams Drill user configuration file. This should be in the user's HOME directory.
    adrill_shared_cfg : str
        Full path to an Adams Drill shared configuration file. This should be in the Adams Drill installation directory. (the default is None, which means that only user cdbs will be returned.)

    Returns
    -------
    dict
        A dictionary in which the cdb names are keys and the cdb locations are values.

    """
    home = os.path.expanduser('~')

    # User cdbs only expand $HOME
    cdbs = _read_cfg_databases(adrill_user_cfg, [('$HOME', home)])

    if adrill_shared_cfg:
        # $topdir is the directory two levels above the shared cfg file
        top_dir = os.path.split(os.path.split(adrill_shared_cfg)[0])[0]

        # Shared entries are read second, so they override user entries of the same name
        cdbs.update(_read_cfg_databases(adrill_shared_cfg, [('$HOME', home), ('$topdir', top_dir)]))

    return cdbs
def get_TO_param(filename, requested_parameter):
    """Return the value of a parameter in a tiem orbit file

    Parameters
    ----------
    filename : str
        Path to a tiem orbit file. Accepts full path or cdb alias.
    requested_parameter : str
        Name of a parameter in the file (not case sensitive)

    Returns
    -------
    :obj:`str` or :obj:`int` or :obj:`float`
        The value assigned to `requested_parameter` in `filename`. The first matching parameter line wins.

    Raises
    ------
    ValueError
        Raised if `filename` does not contain `requested_parameter`

    """
    # Check if CDB notation used and Convert
    filename = get_full_path(filename)
    wanted = requested_parameter.lower()

    # Read in the tiem orbit file
    with open(filename, 'r') as fid:
        lines = fid.readlines()

    for line in lines:
        # Only parameter lines are of interest
        if not TO_PARAMETER_PATTERN.match(line):
            continue

        # Split on the FIRST '=' only so values that contain '=' survive
        parameter, _, value = line.partition('=')
        if re.sub(r'\s', '', parameter).lower() != wanted:
            continue

        value = value.strip()
        if "'" in value:
            # Adams string: drop the quotes but keep interior whitespace
            # (e.g. file paths containing spaces)
            return value.replace("'", '').strip()

        # Number
        number = re.sub(r'\s', '', value)
        return int(number) if thornpy.numtype.str_is_int(number) else float(number)

    # The parameter was never found
    raise ValueError('{} does not contain the parameter {}'.format(filename, requested_parameter))
def fullNotation_to_cdbNotation(string_file):
    """Replaces all references in a string file that use full path notation to use CDB notation

    Parameters
    ----------
    string_file : str
        Full path to an Adams Drill string file

    Returns
    -------
    int
        Number of Property_File lines rewritten

    """
    # Initialize a variable to count the number of replacements
    n = 0

    # Suffixing (rather than replacing '.str' inside the path) guarantees the
    # temporary file never aliases the file being read.
    tmp_file = string_file + '.tmp'

    with open(string_file, 'r') as fid_str, open(tmp_file, 'w') as fid_str_tmp:
        for line in fid_str:
            quoted = line.split("'")
            is_property_line = re.match(' *Property_File *= *.*', line, flags=re.IGNORECASE)

            if is_property_line and '<' not in line and len(quoted) >= 3:
                # Property file line that doesn't use cdb notation
                property_file = thornpy.utilities.convert_path(quoted[1])

                # Keep the original line ending so the next line is not
                # glued onto this one.
                ending = '\n' if line.endswith('\n') else ''
                new_line = ' Property_File = \'{}\''.format(get_cdb_path(property_file)) + ending
                fid_str_tmp.write(new_line)

                # Increment the number of replacements
                n += 1

            else:
                # Anything else is written unchanged
                fid_str_tmp.write(line)

    os.replace(tmp_file, string_file)

    return n
def cdbNotation_to_fullNotation(string_file):
    """Replaces all references in a string file that use CDB notation to use full path notation

    Parameters
    ----------
    string_file : str
        Path to an Adams Drill string file. Accepts full path or cdb alias.

    Returns
    -------
    int
        Number of Property_File lines rewritten

    """
    n = 0

    # Convert to full filepath if cdb alias used
    string_file = get_full_path(string_file)

    # Suffixing (rather than replacing '.str' inside the path) guarantees the
    # temporary file never aliases the file being read.
    tmp_file = string_file + '.tmp'

    with open(string_file, 'r') as fid_str, open(tmp_file, 'w') as fid_str_tmp:
        for line in fid_str:
            quoted = line.split("'")
            is_property_line = re.match(' *Property_File *= *.*', line, flags=re.IGNORECASE)

            if is_property_line and '<' in line and len(quoted) >= 3:
                # Property file line that uses cdb notation
                property_file = thornpy.utilities.convert_path(quoted[1])

                # Keep the original line ending so the next line is not
                # glued onto this one.
                ending = '\n' if line.endswith('\n') else ''
                new_line = ' Property_File = \'{}\''.format(get_full_path(property_file)) + ending
                fid_str_tmp.write(new_line)

                # Increment the number of replacements
                n += 1

            else:
                # Anything else is written unchanged
                fid_str_tmp.write(line)

    os.replace(tmp_file, string_file)

    return n
def get_cdb_path(full_filepath):
    """Given the full path to a file located in a cdb, get_cdb_path returns the path to a file with the cdb path replaced by the cdb alias. `full_filepath` will be returned if no cdb is found in the path.

    Note
    ----
    The cdb location must be a leading directory of `full_filepath`. If several cdbs match (nested databases), the longest location wins. The shared configuration file is optional: if the ``ADRILL_SHARED_CFG`` environment variable is not set, only user cdbs are considered.

    Parameters
    ----------
    full_filepath : str
        Full file path to a file in a cdb

    Returns
    -------
    str
        Path to a file with the cdb path replaced by the cdb alias.

    """
    # Normalize the filepath
    full_filepath = thornpy.utilities.convert_path(full_filepath)

    # Return the given filepath if it already looks like a cdb filepath
    if re.search('^<.+>', full_filepath) is not None:
        return full_filepath

    # Get a dictionary of the known cdbs
    cdbs = get_adrill_cdbs(os.environ['ADRILL_USER_CFG'], os.environ.get('ADRILL_SHARED_CFG'))

    best_name = None
    best_loc = ''
    for name, loc in cdbs.items():
        # The cdb location has to be a prefix of the path ...
        if not loc or not full_filepath.startswith(loc):
            continue

        # ... and the prefix has to end on a path-component boundary, so that
        # 'a.cdb' does not claim files living in 'a.cdb2'.
        remainder = full_filepath[len(loc):]
        on_boundary = (not remainder) or remainder[0] in '/\\' or loc[-1] in '/\\'

        if on_boundary and len(loc) > len(best_loc):
            best_name, best_loc = name, loc

    if best_name is None:
        # No cdb contains this file
        return full_filepath

    # Swap only the leading cdb location for its alias
    return '<{}>'.format(best_name) + full_filepath[len(best_loc):]
def get_full_path(cdb_filepath):
    """Given the cdb path to a file located in a cdb, returns the path to a file with the cdb alias replaced by the cdb location. If `cdb_filepath` does not start with a cdb alias (``<alias>``), it is returned unchanged.

    Note
    ----
    The cdb alias in `cdb_filepath` is **not** case sensitive. The shared configuration file is optional: if the ``ADRILL_SHARED_CFG`` environment variable is not set, only user cdbs are considered.

    Parameters
    ----------
    cdb_filepath : str
        Path to a file, optionally starting with a cdb alias (e.g. ``<example_database>/tools/bit.pdc``).

    Returns
    -------
    str
        Path to the file with the cdb alias replaced by the cdb location.

    Raises
    ------
    ValueError
        Raised if the alias is not defined in the user or shared configuration file.

    """
    # Find a leading database alias. The character class keeps the match from
    # running on to a later '>' somewhere else in the path.
    match = re.match(r'<([^<>]+)>', cdb_filepath)

    if match is None:
        # Not cdb notation: nothing to resolve
        return cdb_filepath

    cdb_in_filepath = match.group(1)

    # Get a dictionary of the known cdbs
    user_cfg = os.environ['ADRILL_USER_CFG']
    shared_cfg = os.environ.get('ADRILL_SHARED_CFG')
    cdbs = get_adrill_cdbs(user_cfg, shared_cfg)

    for name, loc in cdbs.items():
        if cdb_in_filepath.lower() == name.lower():
            # Replace only the leading alias with the cdb location
            return loc + cdb_filepath[match.end():]

    # The alias is not in the cdbs dictionary
    raise ValueError('{} not in {} OR {}!'.format(cdb_in_filepath, user_cfg, shared_cfg))
def get_cdb_location(cdb_name):
    """Returns the location of cdb 'cdb_name'

    An exact name match is preferred. If there is none, the name is matched without regard to case, consistent with how cdb aliases are resolved in file paths.

    Parameters
    ----------
    cdb_name : str
        Alias of a cdb

    Returns
    -------
    str
        Location of cdb

    Raises
    ------
    KeyError
        Raised if no cdb of that name is defined in the configuration files.

    """
    cdbs = get_adrill_cdbs(os.environ['ADRILL_USER_CFG'], os.environ.get('ADRILL_SHARED_CFG'))

    if cdb_name in cdbs:
        return cdbs[cdb_name]

    # Fall back to a case-insensitive lookup
    for name, loc in cdbs.items():
        if name.lower() == cdb_name.lower():
            return loc

    raise KeyError(cdb_name)
def _get_tool_length(tool_file):
    """Return the length parameter found in a tool property file, or None if it has none."""
    file_type = ''
    with open(tool_file, 'r') as fid_tool:
        for tool_line in fid_tool:
            if (file_type
                    and 'top_drive' not in file_type.lower()
                    and tool_line.replace(' ', '').split('=')[0] in TO_LENGTH_PARAM[file_type.lower()]
                    and not tool_line.startswith('$')):
                # This parameter is the length parameter for this file type
                return float(tool_line.replace(' ', '').split('=')[1])

            if re.match(' *File_Type *= *.*', tool_line, flags=re.IGNORECASE):
                # Remember the file type; it selects which parameter holds the length
                file_type = tool_line.replace(' ', '').replace("'", '').replace('\n', '').split('=')[1]
    return None


def get_string_length(string_file, bha_only=False):
    """Gets the total length of the drill string defined in `string_file`

    Parameters
    ----------
    string_file : str
        Path to an Adams Drill string file. Accepts full path or cdb alias.
    bha_only : bool
        If True, the last two tool lengths found in the file (top most physical string and equivalent upper string) are left out of the sum

    Returns
    -------
    float
        Cumulative length of the string

    """
    # Convert to full filepath if cdb alias used
    string_file = get_full_path(string_file)

    tool_lengths = []
    with open(string_file, 'r') as fid:
        for line in fid:
            if re.match(' *Property_File *= *.*', line, flags=re.IGNORECASE):
                # At a property file line: look the length up in the tool file
                tool_file = get_full_path(thornpy.utilities.convert_path(line.split("'")[1]))
                tool_length = _get_tool_length(tool_file)
                if tool_length is not None:
                    tool_lengths.append(tool_length)

            # A joint count scales the most recently read tool length. It is
            # ignored when no tool length has been read yet.
            if ' number_of_joints' in line.lower() and tool_lengths:
                n_joints = int(line.replace(' ', '').split('=')[1])
                tool_lengths[-1] = tool_lengths[-1]*n_joints

    # Sum the tool lengths to get the string length
    return sum(tool_lengths[:-2]) if bha_only else sum(tool_lengths)
def get_bha_length(string_file):
    """Gets the length of the bottom hole assembly defined in `string_file`.

    This is the total string length NOT including the equivalent upper string and the highest most physical string.

    Parameters
    ----------
    string_file : str
        Full path to an Adams Drill string file

    Returns
    -------
    float
        Cumulative length of the bottom hole assembly

    """
    # Delegate to the general routine with the upper string entries excluded
    return get_string_length(string_file, bha_only=True)
def add_cdb_to_cfg(name, loc, cfg_file):
    """Adds cdb of name `name` and path `loc` to `cfg_file`

    Parameters
    ----------
    name : str
        Name of cdb (e.g. example_database)
    loc : str
        path to cdb (e.g. C:\\example_database.cdb)
    cfg_file : str
        Full filename of an adams drill configuration file

    Raises
    ------
    ValueError
        Raised if a cdb of the given name or path already exists in the given config file
    PermissionError
        Raised if the user does not have permissions to edit the given config file

    """
    loc = thornpy.utilities.convert_path(loc)
    cdbs = {}

    # Read config file
    with open(cfg_file, 'r') as fid:
        lines = fid.readlines()

    # Pull cdbs from config file into a dictionary
    for line in lines:
        if line.lower().startswith('database'):
            splt = re.split('[ \t]+', line.replace('\n', ''), maxsplit=2)
            cdbs[splt[1]] = thornpy.utilities.convert_path(splt[2])

    # Check if cdb name already exists
    if name in cdbs:
        raise ValueError('{} already exists in {}.'.format(name, cfg_file))

    # Check if cdb location already exists. This must be a value comparison;
    # an identity test on two separately built strings never matches.
    if loc in cdbs.values():
        raise ValueError('{} already exists in {}'.format(loc, cfg_file))

    # Add new cdb to cdbs dictionary
    cdbs[name] = loc
    entries = ['DATABASE {} {}\n'.format(cdb_name, cdb_loc) for cdb_name, cdb_loc in cdbs.items()]

    # Rewrite config file with new cdb
    try:
        with open(cfg_file, 'w') as fid:
            cdbs_written = False
            for line in lines:
                if not line.lower().startswith('database'):
                    fid.write(line)
                elif not cdbs_written:
                    # All DATABASE entries go where the first one used to be
                    fid.writelines(entries)
                    cdbs_written = True

            if not cdbs_written:
                # The file had no DATABASE lines at all: append the entries
                if lines and not lines[-1].endswith('\n'):
                    fid.write('\n')
                fid.writelines(entries)

    except PermissionError as err:
        raise PermissionError('You do not have permission to edit {}.'.format(cfg_file)) from err
def remove_cdb_from_cfg(name, cfg_file):
    """Removes cdb of name `name` from `cfg_file`

    Parameters
    ----------
    name : str
        Name of cdb (e.g. example_database)
    cfg_file : str
        Full filename of an adams drill configuration file

    Raises
    ------
    ValueError
        Raised if a cdb of the given name does not exist in the given config file
    PermissionError
        Raised if the user does not have permissions to edit the given config file

    """
    # Initialize cdbs dictionary
    cdbs = {}

    # Read config file
    with open(cfg_file, 'r') as fid:
        lines = fid.readlines()

    # Pull cdbs from config file into a dictionary
    for line in lines:
        if line.lower().startswith('database'):
            splt = re.split('[ \t]+', line.replace('\n', ''), maxsplit=2)
            cdbs[splt[1]] = thornpy.utilities.convert_path(splt[2])

    # Check if cdb name exists
    if name not in cdbs:
        raise ValueError('{} does not exist in {}.'.format(name, cfg_file))

    # Remove cdb from cdbs dictionary
    del cdbs[name]
    entries = ['DATABASE {} {}\n'.format(cdb_name, cdb_loc) for cdb_name, cdb_loc in cdbs.items()]

    # Rewrite config file with the cdb removed
    try:
        with open(cfg_file, 'w') as fid:
            cdbs_written = False
            for line in lines:
                if not line.lower().startswith('database'):
                    fid.write(line)
                elif not cdbs_written:
                    # All remaining DATABASE entries go where the first one used to be
                    fid.writelines(entries)
                    cdbs_written = True

    except PermissionError as err:
        raise PermissionError('You do not have permission to edit {}.'.format(cfg_file)) from err
def create_cfg_file(filename, database_paths):
    """Create a cfg file with the databases whose paths are given in the database_paths list. Also sets the ADRILL_USER_CFG environment variable to the location of the new file.

    Parameters
    ----------
    filename : str
        Filename for the new configuration file.
    database_paths : list
        List of database paths to include in the configuration file. Each database is named after the last component of its path with '.cdb' removed.

    """
    # Create a list of database descriptions for the template
    databases = []
    for path in database_paths:
        name = os.path.split(path)[1].replace('.cdb', '')
        databases.append({'name': name, 'path': path})

    # Get the cfg template (closing the template file once it has been read)
    template_file = os.path.join(os.path.dirname(__file__), 'templates', 'template.cfg')
    with open(template_file) as fid:
        cfg_template = TMPLT_ENV.from_string(fid.read())

    # Write the new cfg file
    with open(filename, 'w') as fid:
        fid.write(cfg_template.render(databases=databases))

    # Point Adams Drill at the new user configuration file
    os.environ['ADRILL_USER_CFG'] = os.path.join(os.getcwd(), filename)
def build(string_file, solver_settings_file, working_directory, output_name=None, wait=True):
    """Builds adm, acf, and cmd files from string, event, and solver settings files.

    The event file is taken from the 'Event_Property_File' parameter of the string file.

    Parameters
    ----------
    string_file : str
        Filename of a drill string (.str) file.
    solver_settings_file : str
        Filename of a solver settings (.ssf) file.
    working_directory : str
        Path to the directory to put the adm, acf, and cmd
    output_name : str
        Base name of the adm, acf, and cmd files. (the default is None, which redefines the `output_name` to be the same as the 'OutputName' parameter in the string file)
    wait : bool
        If True, code execution waits until the build process terminates (or all three output files exist) before moving on. (default is True)

    Returns
    -------
    str
        Filename of the adams dataset (.adm) file.
    str
        Filename of the adams command (.acf) file.
    str
        Filename of the adams view command (.cmd) file.

    """
    # Set the output name
    if output_name is None:
        output_name = get_TO_param(string_file, 'OutputName')

    # Set the names of the output files
    adm_file = output_name + '.adm'
    acf_file = output_name + '.acf'
    cmd_file = output_name + '.cmd'

    # Format the string filename
    adams_formatted_str_filename = thornpy.utilities.convert_path(get_full_path(string_file)).replace(os.sep, '/')

    # Set the event filename and solver settings file name (relative paths if in the working directory)
    working_directory_path = thornpy.utilities.convert_path(working_directory)

    event_file = thornpy.utilities.convert_path(get_TO_param(string_file, 'Event_Property_File'))
    if os.path.dirname(event_file) == working_directory_path:
        evt_name = os.path.split(event_file)[-1]
    else:
        evt_name = thornpy.utilities.convert_path(event_file)

    # The solver settings file is handled the same way as the event file. In
    # particular the fallback must be a path string, not the (head, tail)
    # tuple returned by os.path.split.
    ssf_file = thornpy.utilities.convert_path(solver_settings_file)
    if os.path.dirname(ssf_file) == working_directory_path:
        ssf_name = os.path.split(ssf_file)[-1]
    else:
        ssf_name = ssf_file

    # Create aview script
    cmds = [
        f'plugin load plugin_name = {ADRILL_PLUGIN_VAR}\n',
        f'ds TOStart string_cfg_file = "{adams_formatted_str_filename}"\n',
        f'adrill build acf ssf="{ssf_name}" evt="{evt_name}"\n',
        f'file adams write file="{adm_file}"\n',
        f'simulation script write_acf sim_script_name = "{output_name}" file_name = "{acf_file}"\n',
        f'file command write entity_name = "{output_name}" file_name = "{cmd_file}"',
    ]

    # If the working directory doesn't exist, create it
    if not os.path.exists(working_directory):
        os.makedirs(working_directory)

    # Create the build.cmd script
    with open(os.path.join(working_directory, 'build.cmd'), 'w') as fid:
        fid.writelines(cmds)

    # Run adams to generate adm, acf, cmd
    if platform.system() == 'Windows':
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        process = subprocess.Popen('"{}" aview ru-s b build.cmd'.format(os.environ['ADAMS_LAUNCH_COMMAND']), cwd=working_directory, startupinfo=startupinfo)
    else:
        process = subprocess.Popen([os.environ['ADAMS_LAUNCH_COMMAND'], '-c', 'aview', 'ru-standard', 'b', 'build.cmd', 'exit'], cwd=working_directory)

    adm_file = os.path.join(working_directory, adm_file)
    acf_file = os.path.join(working_directory, acf_file)
    cmd_file = os.path.join(working_directory, cmd_file)

    # Wait for the process to finish (if requested)
    if wait:
        while True:
            if process.poll() is not None:
                # If the process has terminated, stop waiting
                break

            if os.path.exists(adm_file) and os.path.exists(acf_file) and os.path.exists(cmd_file):
                # The process hasn't terminated, but all three output files
                # exist: allow a few more seconds, then move on
                time.sleep(5)
                break

            # The model is still being built
            time.sleep(1)

    return adm_file, acf_file, cmd_file
# Block header, e.g. ``[DYNAMICS]``
TO_BLOCK_HEADER_PATTERN = re.compile(r'^\[[_0-9a-zA-Z]+\]\s*$')

# Subblock header, e.g. ``(FUNNEL)``
TO_SUBBLOCK_HEADER_PATTERN = re.compile(r'^\([_0-9a-zA-Z]+\)\s*$')

# Table header, e.g. ``{ maxit stab error }``
TO_TABLE_HEADER_PATTERN = re.compile(r'^\{(\s*[_0-9a-zA-Z])+\s*\}\s*$')

# Table row: a run of quoted strings, or a run of numbers optionally followed
# by the "<- use this format for constant values" hint
TO_TABLE_LINE_PATTERN = re.compile(
    r"^((\s*\'[_0-9a-zA-Z]+\')+)|((\s*-?[\+-\.eE0-9]+)+)\s*(<- use this format for constant values)?\s$"
)
def read_TO_file(filename):
    """Reads a Tiem Orbit file into a dictionary of parameters

    Block and subblock names are stored as written in the file. Parameter names and table column names are stored in lower case.

    Example
    -------
    This example prints the value of the `Integrator` parameter from the `DYNAMICS` block of a solver settings file.

    >>> ssf = read_TO_file('example.ssf')
    >>> integ = ssf['DYNAMICS']['integrator']
    >>> print(integ)
    HHT

    This example prints `Maxit` from the `FUNNEL` subblock of the `STATICS` block of a solver settings file.

    >>> ssf = adripy.read_TO_file('example.ssf')
    >>> maxit = ssf['STATICS']['FUNNEL']['maxit']
    >>> print(maxit)
    [100, 50, 50, 50]

    Parameters
    ----------
    filename : str
        Filename of the Tiem Orbit file

    Returns
    -------
    dict
        Nested :obj:`dict` of the blocks, subblocks, and parameters.

    Raises
    ------
    FileNotFoundError
        Raised if `filename` does not exist
    TiemOrbitSyntaxError
        Raised if a table row does not have one value per table column

    """
    # Resolve the path once so the existence check and the read agree
    full_filename = get_full_path(thornpy.utilities.convert_path(filename))
    if not os.path.exists(full_filename):
        raise FileNotFoundError(f'{filename} does not exist!')

    # Read in the TO file
    with open(full_filename, 'r') as fid:
        lines = fid.readlines()

    current_block = None
    current_subblock = None
    current_table_headers = []
    current_table_data = {}
    parameters = {}

    def current_section():
        """Return the dictionary that parameters are currently being added to."""
        block = parameters[current_block]
        return block if current_subblock is None else block[current_subblock]

    for line in lines:
        # For each line determine if we are at a new Block, new SubBlock, or Table

        if TO_BLOCK_HEADER_PATTERN.match(line):
            # If a block is encountered, reset currents
            current_block = re.sub('[\\[\\]\\s\\n]', '', line)
            current_subblock = None
            current_table_headers = []
            current_table_data = {}

            # Create a new parameters dictionary entry
            parameters[current_block] = {}

        elif TO_SUBBLOCK_HEADER_PATTERN.match(line):
            # If a subblock is encountered, reset currents
            current_subblock = re.sub('[\\(\\)\\s\\n]', '', line)
            current_table_headers = []
            current_table_data = {}

            # Create a new parameters dictionary entry
            parameters[current_block][current_subblock] = {}

        elif TO_TABLE_HEADER_PATTERN.match(line):
            # If a table is encountered, get the table headers
            current_table_headers = re.split(' +', line.replace('{', '').replace('}', '').strip())

            # Make a dictionary of empty lists to put the table data in
            current_table_data = {header: [] for header in current_table_headers}

            # Add empty table data to parameters dictionary
            section = current_section()
            for header in current_table_headers:
                section[header.lower()] = []

        elif TO_PARAMETER_PATTERN.match(line):
            # Split on the first '=' only so values that contain '=' survive
            parameter, value = re.split('\\s*=\\s*', line.strip(), maxsplit=1)

            # Format the value
            if "'" in value:
                # If value is an adams string
                value = value.replace("'", '').strip()
            else:
                # If value is a number
                value = int(value) if thornpy.numtype.str_is_int(value) else float(value)

            # Add parameter to parameters dictionary
            current_section()[parameter.lower()] = value

        elif current_table_headers != []:
            # If already at a table
            if TO_TABLE_LINE_PATTERN.match(line):
                # If the current line looks like a table entry
                values = re.split(' +', line.replace('<- use this format for constant values', '').strip())

                if len(values) != len(current_table_headers):
                    # If the number of values doesn't match the number of table headers raise an error
                    raise TiemOrbitSyntaxError(f'Incorrect syntax found while processing a table in the {current_block} block of {filename}!')

                # Add the values to the table data dictionary
                for value, header in zip(values, current_table_headers):
                    if "'" in value:
                        # If value is an adams string
                        current_table_data[header].append(value.replace("'", ''))
                    else:
                        # If value is a number
                        current_table_data[header].append(int(value) if thornpy.numtype.str_is_int(value) else float(value))

                # Add table data to parameters dictionary
                section = current_section()
                for header in current_table_headers:
                    section[header.lower()] = current_table_data[header]

    return parameters
def isabs(filename):
    """Checks if a filepath is absolute.

    Note
    ----
    This function is derived from :meth:`os.path.isabs`. The only difference is that it also returns `True` if the filepath uses cdb alias names.

    Parameters
    ----------
    filename : str
        Path to a file. Can be relative, absolute, or use cdb aliases.

    Returns
    -------
    bool
        True if path is absolute or uses cdb aliases. False for relative paths and for anything that is not a string.

    """
    # Anything that is not a string cannot be a path
    if not isinstance(filename, str):
        return False

    # Resolving a cdb alias changes the path; a plain path comes back unchanged
    uses_cdb_alias = filename != get_full_path(filename)

    return uses_cdb_alias or os.path.isabs(filename)
def add_splines_to_adm(adm_file, splines):
    """Adds splines inputs to an Adams Drill dataset (.adm) file.

    Adds a block titled SPLINES above the OUTPUT block of the ADM file. The SPLINES block contains a SPLINE statement for each drilling parameter in `ADRILL_IDS`.

    Two layouts are accepted for `splines`:

    * per-parameter ``[time, values]`` pairs, as in the Example below, or
    * a shared ``splines['time']`` list plus one list of values per parameter.

    Example
    -------
    >>> time = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
    >>> gpm = [0, 78.125, 250.0, 421.875, 500, 500, 500, 500, 500, 500, 500]
    >>> rpm = [0, 0, 0, 0, 12, 37, 50, 50, 50, 50, 50]
    >>> wob = [0, 0, 0, 0, 0, 0, 0, 10, 29, 40, 40]
    >>> rop = [0, 0, 0, 0, 0, 0, 0, 25, 74, 100, 100]
    >>> splines = {}
    >>> splines['gpm'] = [time, gpm]
    >>> splines['rpm'] = [time, rpm]
    >>> splines['wob'] = [time, wob]
    >>> splines['rop'] = [time, rop]
    >>> adm_file = 'demo.adm'
    >>> add_splines_to_adm(adm_file, splines)

    Parameters
    ----------
    adm_file : str
        Filename of Adams Drill dataset (.adm) file
    splines : dict
        Dictionary containing the spline data for the drilling parameters. See above for dictionary makeup.

    """
    output_header = '!----------------------------------- OUTPUT ------------------------------------'

    # Create a block of code to add to the adm file
    code_block = {}
    for param in ADRILL_IDS:
        if 'time' in splines:
            # Shared time vector, one list of values per parameter
            x_values, y_values = splines['time'], splines[param]
        else:
            # Documented layout: [time, values] for each parameter
            x_values, y_values = splines[param]

        code_block[param] = ''.join([
            "!\n",
            "! adams_view_name='{}_Spline'\n".format(param.upper()),
            "SPLINE/{:d}\n".format(ADRILL_IDS[param]['spline']),
            ", LINEAR_EXTRAPOLATE\n",
            ", X={}\n".format(str(x_values).replace('[', '').replace(']', '')),
            ", Y={}\n".format(','.join('{:1.2f}'.format(val) for val in y_values)),
        ])

    # Add the block of code to a temporary adm file. A suffixed name can never
    # alias the adm file itself.
    tmp_file = adm_file + '.tmp'
    with open(adm_file, 'r') as fid, open(tmp_file, 'w') as fid_new:
        insert_splines_here = False
        for line in fid:
            if insert_splines_here:
                # This is the line following the OUTPUT header. The SPLINES
                # block is written first, then the OUTPUT header that was held
                # back and a '!' line in place of the current line.
                fid_new.write('!----------------------------------- SPLINES ------------------------------------\n')
                for param in code_block:
                    fid_new.write(code_block[param])
                fid_new.write("!\n")
                fid_new.write(output_header + "\n")
                fid_new.write("!\n")
                insert_splines_here = False

            elif output_header in line:
                # Hold the OUTPUT header back until the splines are written
                insert_splines_here = True

            else:
                fid_new.write(line)

    # Replace the adm file with the temporary adm file
    os.replace(tmp_file, adm_file)
def _acf_spline_function(variable_id):
    """Return the spline-driven FUNCTION line for a drilling parameter VARIABLE id, or None for any other id."""
    templates = (
        ('gpm', ', FUNCTION = STEP(TIME, 0,0,1,1)*VARVAL(11021)*ABS(AKISPL(TIME,0,{:d}, 0))\n'),
        ('rop', ', FUNCTION = STEP(TIME, 0,0,1,1)*(AKISPL(TIME,0,{:d}, 0))/3600\n'),
        ('rpm', ', FUNCTION = STEP(TIME, 0,0,1,1)*(AKISPL(TIME,0,{:d}, 0))*(PI/30)\n'),
        ('wob', ', FUNCTION = STEP(TIME, 0,0,1,1)*(AKISPL(TIME,0,{:d}, 0))*1000\n'),
    )
    for param, template in templates:
        if variable_id == ADRILL_IDS[param]['variable']:
            return template.format(ADRILL_IDS[param]['spline'])
    return None


def add_splines_to_acf(acf_file):
    """Adds spline references to the input variable function statements in an Adams Drill Command (.acf) file.

    The line following each gpm, rop, rpm and wob VARIABLE statement is replaced by a FUNCTION that reads the corresponding spline. VARIABLE statements with any other id are left untouched.

    Parameters
    ----------
    acf_file : str
        File name of an Adams Drill Command (.acf) file.

    """
    # A suffixed temporary name can never alias the acf file itself
    tmp_file = acf_file + '.tmp'

    # go through original acf and create a new acf
    with open(acf_file, 'r') as fid, open(tmp_file, 'w') as fid_new:
        modify = False
        skip = False
        variable_id = 0

        for line in fid:
            if modify:
                # This line follows a VARIABLE statement
                modify = False
                new_line = _acf_spline_function(variable_id)
                if new_line is not None:
                    # Drilling parameter: replace its function line
                    fid_new.write(new_line)

                    # After the gpm function, the next line is dropped unless
                    # it is a comment (handled in the skip branch below)
                    skip = variable_id == ADRILL_IDS['gpm']['variable']
                    continue
                # Any other variable: fall through and treat as a normal line

            elif skip:
                # This is a line that should be skipped unless it is a comment
                skip = False
                if line.startswith('!'):
                    fid_new.write(line)
                continue

            # Normal line
            fid_new.write(line)
            if 'VARIABLE' in line:
                try:
                    variable_id = int(line.split('/')[1].replace('&', '').replace(' ', ''))
                except (IndexError, ValueError):
                    # Mentions VARIABLE but is not a 'VARIABLE/<id>' statement
                    continue
                modify = True

    # Replace original acf file
    os.replace(tmp_file, acf_file)
def modify_acf_solver_settings(acf_file, statics=None, error=None):
    """Modifies the contents of the Adams Command (.acf) file given in `acf_file` to apply the static funnel given in `statics` and the dynamic solver error tolerance given in `error`.

    Parameters
    ----------
    acf_file : str
        Path to Adams Command (.acf) file to be updated
    statics : list
        List of six equal-length lists (maxit, stab, error, imbal, tlim, alim) where each list contains the values of a particular equilibrium setting at each step in the funnel. (Defaults to the value already in the acf file.)
    error : float
        Error tolerance to use for the dynamics solver. (Defaults to the value already in the acf file.)

    Raises
    ------
    ValueError
        Raised if `statics` is not six lists of equal length.

    Example
    -------
    The following example modifies example.acf to have a three-step static funnel and a dynamic solver error tolerance of 0.0001.

    >>> maxit = [500, 500, 500]
    >>> stab = [8.0, 1.0, .01]
    >>> error = [1.0, 0.1, .01]
    >>> imbal = [4.0, 2.0, .01]
    >>> tlim = [4.0, 2.0, 1.0]
    >>> alim = [4.0, 2.0, 1.0] # Assumes DEGREES
    >>> statics = [maxit, stab, error, imbal, tlim, alim]
    >>> modify_acf_solver_settings('example.acf', statics=statics, error=.0001)

    """
    # Read the current acf text
    with open(acf_file, 'r') as fid:
        acf_text = fid.read()

    # Replace the static funnel
    if statics is not None:
        if len(statics) != 6 or not all(len(statics[0]) == len(param) for param in statics):
            raise ValueError('The statics list does not have the correct shape.')

        # Create the new funnel text: one equil/sim pair per funnel step
        funnel = ''.join(
            f'equil/maxit={maxit}, stab={stab}, error={step_error}, imbal={imbal}, tlim={tlim}, alim={alim}D\nsim/stat\n'
            for maxit, stab, step_error, imbal, tlim, alim in zip(*statics)
        )

        # Replace the funnel
        acf_text = ACF_FUNNEL_PATTERN.sub('!\n!\n' + funnel, acf_text)

    # Replace the dynamic error tolerance
    if error is not None:
        error_text = f', ERROR = {error}'
        acf_text = ACF_INTEGRATOR_ERROR_PATTERN.sub(r'\1\2' + error_text + r'\4', acf_text)

    # Write a temporary acf file
    tmp_file = acf_file + '._tmp_'
    with open(tmp_file, 'w') as fid:
        fid.write(acf_text)

    # Replace the existing acf file with the temporary in a single step
    os.replace(tmp_file, acf_file)
def get_motor_curve(stall_torque, max_torque, max_rpm, min_rpm, save_filename=None):
    """Returns data to generate a theoretical motor curve.

    The curve is made of two parabolas. In the extended stall range (rpm from 0 up to `min_rpm`) the torque falls from `stall_torque` to `max_torque`. In the normal operating range the torque falls from `max_torque` to zero as rpm goes from `min_rpm` to `max_rpm`.

    Parameters
    ----------
    stall_torque : float
        Torque at which the motor stalls (klbf-ft)
    max_torque : float
        Maximum operating torque (klbf-ft)
    max_rpm : float
        Maximum operating motor speed
    min_rpm : float
        Minimum operating motor speed
    save_filename : str, optional
        If given, the motor curve is also written to this file as comma separated rpm, torque rows, by default None

    Returns
    -------
    list
        Rpm points in normal operating range, measured from `min_rpm` (i.e. starting at 0)
    list
        Rpm points in extended stall range
    list
        Torque points in normal operating range
    list
        Torque points in extended stall range

    Example
    -------
    This example plots the motor curve with a dashed line for the extended stall range.

    >>> import matplotlib.pyplot as plt
    >>> stall_torque = 20
    >>> max_torque = 15
    >>> max_rpm = 100
    >>> min_rpm = 80
    >>> nor_rpm, esr_rpm, nor_torque, esr_torque = get_motor_curve(stall_torque, max_torque, max_rpm, min_rpm)
    >>> plt.plot([rpm + min_rpm for rpm in nor_rpm], nor_torque)
    >>> plt.plot(esr_rpm, esr_torque, linestyle='--')
    >>> plt.show()

    """
    free_rpm = max_rpm - min_rpm

    # Normal operating range: max_torque at 0, zero torque at free_rpm
    a_nor = -max_torque/(free_rpm**2)
    b_nor = 0
    c_nor = max_torque

    # Extended stall range: stall_torque at 0, max_torque at min_rpm
    a_esr = (stall_torque - max_torque)/(min_rpm**2)
    b_esr = 2*(max_torque - stall_torque)/min_rpm
    c_esr = stall_torque

    # One point per rpm
    w_nor = np.arange(0, free_rpm+1, 1)
    w_esr = np.arange(0, min_rpm, 1)

    q_nor = a_nor*w_nor**2 + b_nor*w_nor + c_nor
    q_esr = a_esr*w_esr**2 + b_esr*w_esr + c_esr

    if save_filename is not None:
        with open(save_filename, 'w') as fid:
            fid.write('RPM, Torque (klbf-ft)\n')

            # Extended stall range data points
            for rpm, torque in zip(w_esr, q_esr):
                fid.write('{},{}\n'.format(rpm, torque))

            # Normal operating range data points, with the rpm shifted by the rpm at max torque
            for rpm, torque in zip(w_nor, q_nor):
                fid.write('{},{}\n'.format(rpm+min_rpm, torque))

    return list(w_nor), list(w_esr), list(q_nor), list(q_esr)
def get_motor_coefficients(min_rpms, max_rpms, flows):
    """Returns the motor coefficients used in the Adams Drill motor model.

    Each pair of coefficients (A, B) is the slope and intercept of a least-squares straight line ``rpm = A*flow + B``.

    Parameters
    ----------
    min_rpms : list
        List of the minimum operating motor speeds for each flow rate.
    max_rpms : list
        List of the maximum operating motor speeds for each flow rate.
    flows : list
        List of corresponding flow rates

    Returns
    -------
    float
        NOR Max (No Load) RPM A
    float
        NOR Max (No Load) RPM B
    float
        NOR Min (Max dP) RPM A
    float
        NOR Min (Max dP) RPM B

    """
    # First order fits; polyfit returns the highest order coefficient first
    nor_min_rpm_a, nor_min_rpm_b = np.polyfit(flows, min_rpms, 1)
    nor_max_rpm_a, nor_max_rpm_b = np.polyfit(flows, max_rpms, 1)

    return nor_max_rpm_a, nor_max_rpm_b, nor_min_rpm_a, nor_min_rpm_b
class MotorModel():
    """Adams Drill motor model defined by linear rpm-versus-flow fits and two torque limits.

    Attributes
    ----------
    nor_max_rpm_a : float
        Slope of the linear fit of maximum rpm against flow rate
    nor_max_rpm_b : float
        Intercept of the linear fit of maximum rpm against flow rate
    nor_min_rpm_a : float
        Slope of the linear fit of minimum rpm against flow rate
    nor_min_rpm_b : float
        Intercept of the linear fit of minimum rpm against flow rate
    max_torque : float
        Maximum operating torque (klbf-ft)
    stall_torque : float
        Torque at which the motor stalls (klbf-ft)

    """

    def __init__(self, min_rpms, max_rpms, flows, stall_torque, max_torque):
        """Initializes an Adams Motor Model

        Parameters
        ----------
        min_rpms : list
            List of the minimum operating motor speeds for each flow rate.
        max_rpms : list
            List of the maximum operating motor speeds for each flow rate.
        flows : list
            List of corresponding flow rates
        stall_torque : float
            Torque at which the motor stalls (klbf-ft)
        max_torque : float
            Maximum operating torque (klbf-ft)

        """
        nor_max_rpm_a, nor_max_rpm_b, nor_min_rpm_a, nor_min_rpm_b = get_motor_coefficients(min_rpms, max_rpms, flows)

        self.nor_max_rpm_a = nor_max_rpm_a
        self.nor_max_rpm_b = nor_max_rpm_b
        self.nor_min_rpm_a = nor_min_rpm_a
        self.nor_min_rpm_b = nor_min_rpm_b
        self.max_torque = max_torque
        self.stall_torque = stall_torque

    def degrade_motor(self, degradation):
        """Degrades the motor by `degradation` (in place)

        Every rpm coefficient and both torque limits are multiplied by the factor.

        Parameters
        ----------
        degradation : float
            Motor degradation factor

        """
        self.nor_max_rpm_a *= degradation
        self.nor_max_rpm_b *= degradation
        self.nor_min_rpm_a *= degradation
        self.nor_min_rpm_b *= degradation
        self.max_torque *= degradation
        self.stall_torque *= degradation

    def get_torque(self, rpm, flow, degradation=1):
        """Gets the motor output torque given a motor rpm and flow rate.

        Below the minimum rpm the extended stall range parabola is used (stall
        torque at zero rpm, max torque at the minimum rpm).  Between the minimum
        and maximum rpm the normal operating range parabola is used (max torque
        at the minimum rpm, zero torque at the maximum rpm).

        Parameters
        ----------
        rpm : float
            Motor rpm
        flow : float
            Motor flow rate
        degradation : float
            Motor degradation factor (default is 1 which means there is no degradation).
            The model itself is not modified.

        Returns
        -------
        float
            Motor output torque

        Raises
        ------
        MotorModelError
            Raised if the rpm given is greater than the motors maximum output rpm.

        """
        # Scale each coefficient exactly once so that the result matches what
        # `degrade_motor(degradation)` followed by `get_torque(rpm, flow)` gives.
        nor_min_rpm_a = self.nor_min_rpm_a*degradation
        nor_min_rpm_b = self.nor_min_rpm_b*degradation
        nor_max_rpm_a = self.nor_max_rpm_a*degradation
        nor_max_rpm_b = self.nor_max_rpm_b*degradation
        max_torque = self.max_torque*degradation
        stall_torque = self.stall_torque*degradation

        # The coefficients already carry the degradation, do not apply it again here
        min_rpm = np.polyval([nor_min_rpm_a, nor_min_rpm_b], flow)
        max_rpm = np.polyval([nor_max_rpm_a, nor_max_rpm_b], flow)
        free_rpm = max_rpm - min_rpm

        if rpm < min_rpm:
            # Extended stall range: parabola in the absolute rpm
            a_coef = (stall_torque - max_torque)/(min_rpm**2)
            b_coef = 2*(max_torque - stall_torque)/min_rpm
            c_coef = stall_torque
            curve_rpm = rpm

        elif rpm > max_rpm:
            raise MotorModelError(f'The rpm provided is greater than the max rpm of {max_rpm}')

        else:
            # Normal operating range: parabola in the rpm measured from min_rpm
            a_coef = -max_torque/(free_rpm**2)
            b_coef = 0
            c_coef = max_torque
            curve_rpm = rpm - min_rpm

        return a_coef*curve_rpm**2 + b_coef*curve_rpm + c_coef

    def get_motor_curve(self, flow):
        """Returns data to generate a theoretical motor curve.

        Parameters
        ----------
        flow : float
            Flow rate at which to generate the motor curve

        Returns
        -------
        float
            Minimum operating rpm at `flow`
        float
            Maximum operating rpm at `flow`
        list
            Rpm points in normal operating range (measured from the minimum rpm)
        list
            Rpm points in extended stall range
        list
            Torque points in normal operating range
        list
            Torque points in extended stall range

        Example
        -------
        This example plots the motor curve with a dashed line for the extended stall range.

        >>> import matplotlib.pyplot as plt
        >>> stall_torque = 20
        >>> max_torque = 15
        >>> max_rpms = [80, 100, 120]
        >>> min_rpms = [50, 70, 90]
        >>> flows = [400, 500, 600]
        >>> motor_model = MotorModel(min_rpms, max_rpms, flows, stall_torque, max_torque)
        >>> min_rpm, max_rpm, nor_rpm, esr_rpm, nor_torque, esr_torque = motor_model.get_motor_curve(550)
        >>> plt.plot([rpm + min_rpm for rpm in nor_rpm], nor_torque)
        >>> plt.plot(esr_rpm, esr_torque, linestyle='--')
        >>> plt.show()

        """
        min_rpm = np.polyval([self.nor_min_rpm_a, self.nor_min_rpm_b], flow)
        max_rpm = np.polyval([self.nor_max_rpm_a, self.nor_max_rpm_b], flow)

        # Inside a method this name resolves to the module level `get_motor_curve` function
        nor_rpm, esr_rpm, nor_torque, esr_torque = get_motor_curve(self.stall_torque, self.max_torque, max_rpm, min_rpm)

        return min_rpm, max_rpm, nor_rpm, esr_rpm, nor_torque, esr_torque

    @classmethod
    def build_from_coeffs(cls, nor_max_rpm_a, nor_max_rpm_b, nor_min_rpm_a, nor_min_rpm_b, max_torque, stall_torque):
        """Used to build a motor model if you already know the motor coefficients.

        Parameters
        ----------
        nor_max_rpm_a : float
            Slope of the linear fit of maximum rpm against flow rate
        nor_max_rpm_b : float
            Intercept of the linear fit of maximum rpm against flow rate
        nor_min_rpm_a : float
            Slope of the linear fit of minimum rpm against flow rate
        nor_min_rpm_b : float
            Intercept of the linear fit of minimum rpm against flow rate
        max_torque : float
            Maximum operating torque
        stall_torque : float
            Torque at which the motor stalls

        Returns
        -------
        MotorModel
            A :obj:`MotorModel` object

        """
        # The rpm and flow lists are placeholders that only exist to satisfy the
        # constructor, the coefficients fitted from them are overwritten below.
        motor_model = cls([0, 1], [0, 1], [1, 2], stall_torque, max_torque)

        motor_model.nor_max_rpm_a = nor_max_rpm_a
        motor_model.nor_max_rpm_b = nor_max_rpm_b
        motor_model.nor_min_rpm_a = nor_min_rpm_a
        motor_model.nor_min_rpm_b = nor_min_rpm_b

        return motor_model
class MotorModelError(Exception):
    """Raised by :meth:`MotorModel.get_torque` when the rpm provided is greater than the motor's max rpm."""
class TiemOrbitSyntaxError(Exception):
    """Exception type for syntax problems in TiemOrbit style files.

    NOTE(review): no raise site is visible in this part of the module - confirm
    the exact conditions against the callers.
    """
class cdbError(Exception):
    """Exception type for cdb related failures.

    NOTE(review): no raise site is visible in this part of the module - confirm
    the exact conditions against the callers.
    """