Commit 66f369cc authored by Maiken's avatar Maiken

Merge branch 'RTEs' into 'master'

More RTEs handling improvemets

See merge request nordugrid/arc!140
parents c60eed25 a25d6465
......@@ -119,21 +119,37 @@ def get_rte_path(sw):
rte_path = None
if os.path.exists('%s/rte/enabled/%s' % (Config.controldir, sw)):
rte_path = '%s/rte/enabled/%s' % (Config.controldir, sw)
elif os.path.exists('%s/rte/default/%s' % (Config.controldir, sw)):
rte_path = '%s/rte/default/%s' % (Config.controldir, sw)
else:
warn('Requested RunTimeEnvironment %s is missing or not enabled.' % (sw), 'common.submit')
return rte_path
def get_rte_params_path(sw):
rte_params_path = None
if os.path.exists('%s/rte/params/%s' % (Config.controldir, sw)):
rte_params_path = '%s/rte/params/%s' % (Config.controldir, sw)
return rte_params_path
# include optional params file to the RTE file content
def get_rte_content(sw):
rte_path = get_rte_path(sw)
if not rte_path:
return ""
rte_params_path = get_rte_params_path(sw)
rte_content = ''
if rte_params_path:
try:
with open(rte_params_path, 'r') as f:
rte_content += f.read()
except IOError:
pass
try:
with open(rte_path, 'r') as f:
return f.read()
except:
return ""
rte_content += f.read()
except IOError:
pass
return rte_content
# Limitation: In RTE stage 0, scripts MUST use the 'export' command if any
......@@ -304,9 +320,7 @@ class JobscriptAssembler(object):
"""
return self._stubs.get(stub, '')
def __init__(self, jobdesc):
self.jobdesc = jobdesc
self._stubs = {}
self._ignore = []
......@@ -359,14 +373,12 @@ class JobscriptAssembler(object):
self._setup_runtime_env()
self._parse()
def __getitem__(self, item):
return self.map.get(item, '')
def __setitem__(self, item, value):
self.map[item] = value
def _parse(self):
PARSE = 0
......
#@IgnoreInspection BashAddShebang
######################################################
#Common functions for submit scripts
######################################################
......@@ -278,10 +279,17 @@ RTE_include_default () {
}
RTE_path_set () {
rte_params_path="${CONFIG_controldir}/rte/params/${rte_name}"
if [ ! -f "$rte_params_path" ]; then
rte_params_path=""
fi
rte_path="${CONFIG_controldir}/rte/enabled/${rte_name}"
if [ ! -f "$rte_path" ]; then
echo "ERROR: Requested RunTimeEnvironment ${rte_name} is missing or not enabled." 1>&2
exit 1
rte_path="${CONFIG_controldir}/rte/default/${rte_name}"
if [ ! -f "$rte_path" ]; then
echo "ERROR: Requested RunTimeEnvironment ${rte_name} is missing or not enabled." 1>&2
exit 1
fi
fi
}
......@@ -310,6 +318,7 @@ RTE_to_jobscript () {
# add RTE script content as a function into the job script
echo "# RunTimeEnvironment function for ${rte_name}:" >> $LRMS_JOB_SCRIPT
echo "RTE_function_${rte_idx} () {" >> $LRMS_JOB_SCRIPT
[ -n "${rte_params_path}" ] && cat "${rte_params_path}" >> $LRMS_JOB_SCRIPT
cat "${rte_path}" >> $LRMS_JOB_SCRIPT
echo "}" >> $LRMS_JOB_SCRIPT
# next RTE
......
# description: copy proxy certificate to the job session directory
# param:COPY_CACERT_DIR:Yes,No:Yes:If set to Yes, CA certificate dir will be copied to the session directory along with proxy certificate
COPY_CACERT_DIR="${COPY_CACERT_DIR:-Yes}"
X509_CERT_DIR="${X509_CERT_DIR:-/etc/grid-security/certificates}"
if [ "x$1" = "x0" ]; then
mkdir -pv ${joboption_directory}/arc/certificates/
cp -rv ${X509_CERT_DIR}/* ${joboption_directory}/arc/certificates/
if [ "x$COPY_CACERT_DIR" = "xYes" ]; then
mkdir -pv ${joboption_directory}/arc/certificates/
cp -rv ${X509_CERT_DIR}/* ${joboption_directory}/arc/certificates/
fi
GM_JOBS="${ARC_LOCATION:-@prefix@}/@pkglibexecsubdir@/gm-jobs"
$GM_JOBS -J -S -D ${joboption_gridid} -o "${joboption_directory}/user.proxy"
if [ $? != 0 ]; then
......@@ -14,5 +18,7 @@ if [ "x$1" = "x0" ]; then
elif [ "x$1" = "x1" ]; then
export X509_USER_PROXY="${RUNTIME_JOB_DIR}/user.proxy"
export X509_USER_CERT="${RUNTIME_JOB_DIR}/user.proxy"
export X509_CERT_DIR="${RUNTIME_JOB_DIR}/arc/certificates"
if [ "x$COPY_CACERT_DIR" = "xYes" ]; then
export X509_CERT_DIR="${RUNTIME_JOB_DIR}/arc/certificates"
fi
fi
import os
import logging
from arc.utils import config
logger = logging.getLogger('ARCCTL.Common')
......@@ -20,6 +21,15 @@ except ImportError:
ARC_DATA_DIR = ARC_LOCATION + '/share/arc/'
def get_parsed_arcconf(config_file):
try:
config.parse_arc_conf(config_file)
arcconf = config
except IOError:
arcconf = None
return arcconf
class ComponentControl(object):
""" Common abstract class to ensure all implicit calls to methods are defined """
def control(self, args):
......
from ControlCommon import *
from arc.utils import config
import subprocess
import sys
import re
......@@ -8,20 +7,12 @@ import time
def complete_job_owner(prefix, parsed_args, **kwargs):
try:
config.parse_arc_conf(parsed_args.config)
arcconf = config
except IOError:
arcconf = None
arcconf = get_parsed_arcconf(parsed_args.config)
return JobsControl(arcconf).complete_owner(parsed_args)
def complete_job_id(prefix, parsed_args, **kwargs):
try:
config.parse_arc_conf(parsed_args.config)
arcconf = config
except IOError:
arcconf = None
arcconf = get_parsed_arcconf(parsed_args.config)
return JobsControl(arcconf).complete_job(parsed_args)
......@@ -173,9 +164,10 @@ class JobsControl(ComponentControl):
def kill_or_clean(self, args, action='-k'):
self.__get_jobs()
self.__job_exists(args.jobid)
for jobid in args.jobid:
self.__job_exists(jobid)
__JOB_RE = re.compile(r'^Job:\s*')
gmjobs_out = self.__run_gmjobs('-J -S ' + action + ' ' + args.jobid)
gmjobs_out = self.__run_gmjobs('-J -S ' + action + ' ' + ' '.join(args.jobid))
for line in iter(gmjobs_out.stdout.readline, ''):
if __JOB_RE.match(line):
sys.stdout.write(line)
......@@ -310,9 +302,6 @@ class JobsControl(ComponentControl):
jobs_log.add_argument('-s', '--service', help='Show ARC CE logs containing the jobID instead of job log',
action='store_true')
jobs_kill = jobs_actions.add_parser('kill', help='Cancel job')
jobs_kill.add_argument('jobid', help='Job ID').completer = complete_job_id
jobs_info = jobs_actions.add_parser('info', help='Show job main info')
jobs_info.add_argument('jobid', help='Job ID').completer = complete_job_id
......@@ -320,12 +309,15 @@ class JobsControl(ComponentControl):
jobs_attr.add_argument('jobid', help='Job ID').completer = complete_job_id
jobs_attr.add_argument('attr', help='Attribute name', nargs='?')
jobs_kill = jobs_actions.add_parser('kill', help='Cancel job')
jobs_kill.add_argument('jobid', nargs='*', help='Job ID').completer = complete_job_id
jobs_killall = jobs_actions.add_parser('killall', help='Cancel all jobs')
jobs_killall.add_argument('-s', '--state', help='Filter jobs by state', action='append', choices=__JOB_STATES)
jobs_killall.add_argument('-o', '--owner', help='Filter jobs by owner').completer = complete_job_owner
jobs_clean = jobs_actions.add_parser('clean', help='Clean job')
jobs_clean.add_argument('jobid', help='Job ID').completer = complete_job_id
jobs_clean.add_argument('jobid', nargs='*', help='Job ID').completer = complete_job_id
jobs_cleanall = jobs_actions.add_parser('cleanall', help='Clean all jobs')
jobs_cleanall.add_argument('-s', '--state', help='Filter jobs by state', action='append', choices=__JOB_STATES)
......
from ControlCommon import *
import sys
import re
from arc.utils import config
import fnmatch
def complete_rte_name(prefix, parsed_args, **kwargs):
try:
config.parse_arc_conf(parsed_args.config)
arcconf = config
except IOError:
arcconf = None
arcconf = get_parsed_arcconf(parsed_args.config)
if parsed_args.action == 'enable':
return RTEControl(arcconf).complete_enable()
if parsed_args.action == 'default':
......@@ -18,6 +14,22 @@ def complete_rte_name(prefix, parsed_args, **kwargs):
return RTEControl(arcconf).complete_disable()
if parsed_args.action == 'undefault':
return RTEControl(arcconf).complete_undefault()
if parsed_args.action == 'cat':
return RTEControl(arcconf).complete_all()
if parsed_args.action == 'params-get':
return RTEControl(arcconf).complete_all()
if parsed_args.action == 'params-set':
return RTEControl(arcconf).complete_all()
def complete_rte_params(prefix, parsed_args, **kwargs):
arcconf = get_parsed_arcconf(parsed_args.config)
return RTEControl(arcconf).complete_params(parsed_args.rte)
def complete_rte_params_values(prefix, parsed_args, **kwargs):
arcconf = get_parsed_arcconf(parsed_args.config)
return RTEControl(arcconf).complete_params_values(parsed_args.rte, parsed_args.parameter)
class RTEControl(ComponentControl):
......@@ -26,11 +38,17 @@ class RTEControl(ComponentControl):
if arcconfig is None:
self.logger.critical('Controlling RunTime Environments is not possible without arc.conf defined controldir')
sys.exit(1)
# define directories
self.control_rte_dir = arcconfig.get_value('controldir', 'arex').rstrip('/') + '/rte'
self.system_rte_dir = ARC_DATA_DIR.rstrip('/') + '/rte'
self.user_rte_dirs = arcconfig.get_value('runtimedir', 'arex', force_list=True)
if self.user_rte_dirs is None:
self.user_rte_dirs = []
# define internal structures to hold RTEs
self.all_rtes = {}
self.system_rtes = {}
self.user_rtes = {}
@staticmethod
def __get_dir_rtes(rtedir):
......@@ -68,7 +86,11 @@ class RTEControl(ComponentControl):
break
return description
def __get_rtes(self):
def __fetch_rtes(self):
"""Look for RTEs on the filesystem and fill the object structures"""
# run once per tool invocation
if self.all_rtes:
return
# available pre-installed RTEs
self.logger.debug('Indexing ARC defined RTEs from %s', self.system_rte_dir)
self.system_rtes = self.__get_dir_rtes(self.system_rte_dir)
......@@ -76,7 +98,6 @@ class RTEControl(ComponentControl):
self.logger.info('There are no RTEs found in ARC defined location %s', self.system_rte_dir)
# RTEs in user-defined locations
self.user_rtes = {}
for urte in self.user_rte_dirs:
self.logger.debug('Indexing user-defined RTEs from %s', urte)
rtes = self.__get_dir_rtes(urte)
......@@ -85,7 +106,6 @@ class RTEControl(ComponentControl):
self.user_rtes.update(rtes)
# all available RTEs
self.all_rtes = {}
self.all_rtes.update(self.system_rtes)
self.all_rtes.update(self.user_rtes)
......@@ -97,6 +117,39 @@ class RTEControl(ComponentControl):
self.logger.debug('Indexing default RTEs in %s', self.control_rte_dir + '/default')
self.default_rtes = self.__get_dir_rtes(self.control_rte_dir + '/default')
def __get_rte_file(self, rte):
self.__fetch_rtes()
if rte not in self.all_rtes:
self.logger.error('There is no %s RunTimeEnvironment found', rte)
sys.exit(1)
rte_file = self.all_rtes[rte]
return rte_file
def __get_rte_params_file(self, rte):
rte_params_path = self.control_rte_dir + '/params/'
rte_params_file = rte_params_path + rte
if os.path.exists(rte_params_file):
return rte_params_file
return None
def __get_rte_list(self, rtes):
rte_list = []
for r in rtes:
if r in self.all_rtes:
# filename match goes directly to list
rte_list.append(r)
else:
# check for glob wildcards in rte name
for irte in self.all_rtes.keys():
if fnmatch.fnmatch(irte, r):
self.logger.debug('Glob wildcard match for %s RTE, adding to the list.', irte)
rte_list.append(irte)
if not rte_list:
self.logger.error('Failed to find requested RunTimeEnvironment(s). '
'No RTEs that match \'%s\' are available.', r)
sys.exit(1)
return rte_list
def __list_brief(self):
for rte_type, rte_dict in [('system', self.system_rtes), ('user', self.user_rtes)]:
for rte, link in rte_dict.iteritems():
......@@ -143,7 +196,7 @@ class RTEControl(ComponentControl):
self.__list_rte(self.default_rtes, True, prefix='\t')
def list(self, args):
self.__get_rtes()
self.__fetch_rtes()
if args.enabled:
self.__list_rte(self.enabled_rtes, args.long)
elif args.default:
......@@ -160,18 +213,120 @@ class RTEControl(ComponentControl):
else:
self.__list_brief()
def enable(self, rte, force=False, rtetype='enabled'):
self.__get_rtes()
def __params_parse(self, rte):
rte_file = self.__get_rte_file(rte)
param_str = re.compile(r'#\s*param:([^:]+):([^:]+):([^:]+):(.*)$')
params = {}
with open(rte_file) as rte_f:
max_lines = 20
for line in rte_f:
param_re = param_str.match(line)
if param_re:
pname = param_re.group(1)
params[pname] = {
'name': pname,
'allowed_string': param_re.group(2),
'allowed_values': param_re.group(2).split(','),
'value': param_re.group(3),
'description': param_re.group(4)
}
params_defined = self.__params_read(rte)
if pname in params_defined:
params[pname]['value'] = params_defined[pname]
max_lines -= 1
if not max_lines:
break
return params
def __params_read(self, rte):
self.__fetch_rtes()
rte_params_file = self.__get_rte_params_file(rte)
params = {}
if rte_params_file:
kv_re = re.compile(r'^([^ =]+)="(.*)"\s*$')
with open(rte_params_file) as rte_parm_f:
for line in rte_parm_f:
kv = kv_re.match(line)
if kv:
params[kv.group(1)] = kv.group(2)
return params
def __params_write(self, rte, params):
rte_params_path = self.control_rte_dir + '/params/'
if not os.path.exists(rte_params_path):
self.logger.debug('Making control directory %s for RunTimeEnvironments parameters', rte_params_path)
os.makedirs(rte_params_path, mode=0755)
rte_dir_path = rte_params_path + '/'.join(rte.split('/')[:-1])
if not os.path.exists(rte_dir_path):
self.logger.debug('Making RunTimeEnvironment directory structure inside controldir %s', rte_dir_path)
os.makedirs(rte_dir_path, mode=0755)
rte_params_file = rte_params_path + rte
with open(rte_params_file, 'w') as rte_parm_f:
for p in params.values():
rte_parm_f.write('{name}="{value}"\n'.format(**p))
def params_get(self, rte, long=False):
params = self.__params_parse(rte)
for pdescr in params.values():
if long:
print '{name:>16} = {value:8} {description} (allowed values are: {allowed_string})'.format(**pdescr)
else:
print '{name}={value}'.format(**pdescr)
def params_set(self, rte, parameter, value):
params = self.__params_parse(rte)
if parameter not in params:
self.logger.error('There is no such parameter %s for RunTimeEnvironment %s', parameter, rte)
sys.exit(1)
if params[parameter]['allowed_string'] == 'string':
pass
elif params[parameter]['allowed_string'] == 'int':
if not re.match(r'[-0-9]+'):
self.logger.error('Parameter %s for RunTimeEnvironment %s should be integer', parameter, rte)
sys.exit(1)
elif value not in params[parameter]['allowed_values']:
self.logger.error('Parameter %s for RunTimeEnvironment %s should be one of %s',
parameter, rte, params[parameter]['allowed_string'])
sys.exit(1)
params[parameter]['value'] = value
self.__params_write(rte, params)
def cat_rte(self, rte):
rte_file = self.__get_rte_file(rte)
self.logger.info('Printing the content of %s RunTimeEnvironment from %s', rte, rte_file)
rte_params_file = self.__get_rte_params_file(rte)
if rte_params_file:
self.logger.info('Including the content of RunTimeEnvironment parameters file from %s', rte_params_file)
with open(rte_params_file) as rte_parm_f:
for line in rte_parm_f:
sys.stdout.write(line)
with open(rte_file, 'r') as rte_fd:
for line in rte_fd:
sys.stdout.write(line)
sys.stdout.flush()
def enable(self, rtes_def, force=False, rtetype='enabled'):
"""
Entry point for enable operation.
RTE definition can be glob wildcarded RTE name.
"""
self.__fetch_rtes()
for rte in self.__get_rte_list(rtes_def):
self.enable_rte(rte, force, rtetype)
def enable_rte(self, rte, force=False, rtetype='enabled'):
"""
Enables single RTE by name
"""
self.__fetch_rtes()
rte_enable_path = self.control_rte_dir + '/' + rtetype + '/'
if not os.path.exists(rte_enable_path):
self.logger.debug('Making control directory %s for %s RunTimeEnvironments', rte_enable_path, rtetype)
os.makedirs(rte_enable_path, mode=0755)
if rte not in self.all_rtes:
self.logger.error('Failed to enable RunTimeEnvironment. No such RTE file \'%s\' available.', rte)
sys.exit(1)
rte_dir_path = rte_enable_path + '/'.join(rte.split('/')[:-1])
if not os.path.exists(rte_dir_path):
self.logger.debug('Making RunTimeEnvironment directory structure inside controldir %s', rte_dir_path)
......@@ -203,8 +358,20 @@ class RTEControl(ComponentControl):
self.logger.debug('Linking RunTimeEnvironment file %s to %s', self.all_rtes[rte], rte_enable_path)
os.symlink(self.all_rtes[rte], rte_enable_path + rte)
def disable(self, rte, rtetype='enabled'):
self.__get_rtes()
def disable(self, rte_def, rtetype='enabled'):
"""
Entry point for disable operation.
RTE definition can be glob wildcarded RTE name.
"""
self.__fetch_rtes()
for rte in self.__get_rte_list(rte_def):
self.disable_rte(rte, rtetype)
def disable_rte(self, rte, rtetype='enabled'):
"""
Disables single RTE by name
"""
self.__fetch_rtes()
enabled_rtes = {}
if rtetype == 'enabled':
......@@ -239,26 +406,47 @@ class RTEControl(ComponentControl):
self.disable(args.rte)
elif args.action == 'undefault':
self.disable(args.rte, 'default')
elif args.action == 'cat':
self.cat_rte(args.rte)
elif args.action == 'params-get':
self.params_get(args.rte, args.long)
elif args.action == 'params-set':
self.params_set(args.rte, args.parameter, args.value)
else:
self.logger.critical('Unsupported RunTimeEnvironment control action %s', args.action)
sys.exit(1)
def complete_enable(self):
self.__get_rtes()
self.__fetch_rtes()
return list(set(self.system_rtes.keys() + self.user_rtes.keys()) - set(self.enabled_rtes.keys()))
def complete_default(self):
self.__get_rtes()
self.__fetch_rtes()
return list(set(self.system_rtes.keys() + self.user_rtes.keys()) - set(self.default_rtes.keys()))
def complete_disable(self):
self.__get_rtes()
self.__fetch_rtes()
return self.enabled_rtes.keys()
def complete_undefault(self):
self.__get_rtes()
self.__fetch_rtes()
return self.default_rtes.keys()
def complete_all(self):
self.__fetch_rtes()
return self.all_rtes.keys()
def complete_params(self, rte):
self.__fetch_rtes()
return self.__params_parse(rte).keys()
def complete_params_values(self, rte, param):
self.__fetch_rtes()
param_options = self.__params_parse(rte)[param]
if param_options['allowed_string'] == 'string' or param_options['allowed_string'] == 'int':
return []
return param_options['allowed_values']
@staticmethod
def register_parser(root_parser):
rte_ctl = root_parser.add_parser('rte', help='RunTime Environments')
......@@ -267,11 +455,11 @@ class RTEControl(ComponentControl):
rte_actions = rte_ctl.add_subparsers(title='RunTime Environments Actions', dest='action',
metavar='ACTION', help='DESCRIPTION')
rte_enable = rte_actions.add_parser('enable', help='Enable RTE to be used by A-REX')
rte_enable.add_argument('rte', help='RTE name').completer = complete_rte_name
rte_enable.add_argument('rte', nargs='*', help='RTE name').completer = complete_rte_name
rte_enable.add_argument('-f', '--force', help='Force RTE enabling', action='store_true')
rte_disable = rte_actions.add_parser('disable', help='Disable RTE to be used by A-REX')
rte_disable.add_argument('rte', help='RTE name').completer = complete_rte_name
rte_disable.add_argument('rte', nargs='*', help='RTE name').completer = complete_rte_name
rte_list = rte_actions.add_parser('list', help='List RunTime Environments')
rte_list.add_argument('-l', '--long', help='Detailed listing of RTEs', action='store_true')
......@@ -283,8 +471,20 @@ class RTEControl(ComponentControl):
rte_list_types.add_argument('-u', '--user', help='List available user-defined RTEs', action='store_true')
rte_default = rte_actions.add_parser('default', help='Transparently use RTE for every A-REX job')
rte_default.add_argument('rte', help='RTE name').completer = complete_rte_name
rte_default.add_argument('rte', nargs='*', help='RTE name').completer = complete_rte_name
rte_default.add_argument('-f', '--force', help='Force RTE enabling', action='store_true')
rte_undefault = rte_actions.add_parser('undefault', help='Remove RTE from transparent A-REX usage')
rte_undefault.add_argument('rte', help='RTE name').completer = complete_rte_name
rte_undefault.add_argument('rte', nargs='*', help='RTE name').completer = complete_rte_name
rte_cat = rte_actions.add_parser('cat', help='Print the content of RTE file')
rte_cat.add_argument('rte', help='RTE name').completer = complete_rte_name
rte_params_get = rte_actions.add_parser('params-get', help='List configurable RTE parameters')
rte_params_get.add_argument('rte', help='RTE name').completer = complete_rte_name
rte_params_get.add_argument('-l', '--long', help='Detailed listing of RTEs', action='store_true')
rte_params_set = rte_actions.add_parser('params-set', help='List configurable RTE parameters')
rte_params_set.add_argument('rte', help='RTE name').completer = complete_rte_name
rte_params_set.add_argument('parameter', help='RTE parameter to configure').completer = complete_rte_params
rte_params_set.add_argument('value', help='RTE parameter value to set').completer = complete_rte_params_values
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment