Commit 292424c6 authored by Andrii Salnikov's avatar Andrii Salnikov

arcctl job stats interface

parent 66f369cc
......@@ -375,6 +375,7 @@ RTE_stage0 () {
# run RTE stage 0
# WARNING!!! IN SOME CASES DUE TO DIRECT SOURCING OF RTE SCRIPT WITHOUT ANY SAFETY CHECKS
# SPECIALLY CRAFTED RTES CAN BROKE CORRECT SUBMISSION (e.g. RTE redefine 'rte_idx' variable)
[ -n "${rte_params_path}" ] && source "${rte_params_path}" 1>&2
sourcewithargs "$rte_path" $args_value 1>&2
rte0_exitcode=$?
if [ $rte0_exitcode -ne 0 ] ; then
......
......@@ -36,7 +36,7 @@ class JobsControl(ComponentControl):
self.logger.critical('Jobs control cannot work without controldir.')
sys.exit(1)
self.logger.debug('Using controldir location: %s', self.control_dir)
self.cache_ttl = 60
self.cache_ttl = 30
self.jobs = {}
def __get_config_value(self, block, option, default_value=None):
......@@ -48,6 +48,7 @@ class JobsControl(ComponentControl):
@staticmethod
def __run_gmjobs(args, stderr=False):
__GMJOBS = [ARC_LIBEXEC_DIR + '/gm-jobs']
__GMJOBS = ['ssh', 'arc6.grid.org.ua'] + __GMJOBS
loglevel = logging.getLogger('ARCCTL').getEffectiveLevel()
__GMJOBS += ['-x', {50: 'FATAL', 40: 'ERROR', 30: 'WARNING', 20: 'INFO', 10: 'DEBUG'}[loglevel]]
if stderr:
......@@ -132,6 +133,9 @@ class JobsControl(ComponentControl):
jobre = __JOB_RE.match(line)
if jobre:
if job_dict:
for attr in __JOB_ATTRS.keys():
if attr not in job_dict:
job_dict[attr] = 'N/A'
self.jobs[job_dict['id']] = job_dict
job_dict = {'id': jobre.group(1)}
continue
......@@ -140,6 +144,9 @@ class JobsControl(ComponentControl):
if attr_re:
job_dict[attr] = attr_re.group(1)
if job_dict:
for attr in __JOB_ATTRS.keys():
if attr not in job_dict:
job_dict[attr] = 'N/A'
self.jobs[job_dict['id']] = job_dict
# dump jobs dictionary to cache
with open(__cache_file, 'wb') as cfd:
......@@ -242,6 +249,61 @@ class JobsControl(ComponentControl):
for k, v in job_attrs.iteritems():
print '{:<32}: {}'.format(k, v)
def job_stats(self, args):
__RE_JOBSTATES = re.compile(r'^\s*([A-Z]+):\s+([0-9]+)\s+\(([0-9]+)\)\s*$')
__RE_TOTALSTATS = re.compile(r'^\s*([A-Za-z]+):\s+([0-9]+)/([-0-9]+)\s*$')
__RE_DATASTATS = re.compile(r'^\s*Processing:\s+([0-9]+)\+([0-9]+)\s*$')
jobstates = []
totalstats = []
data_download = 0
data_upload = 0
# collect information from gm-jobs
gmjobs_out = self.__run_gmjobs('-J')
for line in iter(gmjobs_out.stdout.readline, ''):
js_re = __RE_JOBSTATES.match(line)
if js_re:
state = js_re.group(1)
jobstates.append({'processing': js_re.group(2), 'waiting': js_re.group(3), 'state': state})
continue
t_re = __RE_TOTALSTATS.match(line)
if t_re:
limit = t_re.group(3)
if limit == '-1':
limit = 'unlimited'
totalstats.append({'jobs': t_re.group(2), 'limit': limit, 'state': t_re.group(1)})
continue
ds_re = __RE_DATASTATS.match(line)
if ds_re:
data_download = ds_re.group(1)
data_upload = ds_re.group(2)
# show general stats per-state
if not args.no_states:
for s in jobstates:
if args.long:
print '{state}\n Processing: {processing:>10}\n Waiting: {waiting:>10}'.format(**s)
else:
print '{state:>11}: {processing:>8} ({waiting})'.format(**s)
# show total stats if requested
if args.total:
for t in totalstats:
if args.long:
if t['state'] == 'Accepted':
t['state'] = 'Total number of jobs accepted for further processing by A-REX'
elif t['state'] == 'Running':
t['state'] = 'Total number of jobs running in LRMS backend'
elif t['state'] == 'Total':
t['state'] = 'Total number of jobs managed by A-REX (including completed)'
print '{state}\n Jobs: {jobs:>15}\n Limit: {limit:>15}'.format(**t)
else:
print '{state:>11}: {jobs:>8} of {limit}'.format(**t)
if args.data_staging:
if args.long:
print 'Processing jobs in data-staging:'
print ' Downloading: {:>9}'.format(data_download)
print ' Uploading: {:>9}'.format(data_upload)
else:
print ' Processing: {:>8} + {}'.format(data_download, data_upload)
def control(self, args):
self.cache_ttl = args.cachettl
if args.action == 'list':
......@@ -263,6 +325,8 @@ class JobsControl(ComponentControl):
self.job_log(args)
elif args.action == 'attr':
self.job_getattr(args)
elif args.action == 'stats':
self.job_stats(args)
def complete_owner(self, args):
owners = []
......@@ -284,7 +348,7 @@ class JobsControl(ComponentControl):
__JOB_STATES = ['ACCEPTED', 'PREPARING', 'SUBMIT', 'INLRMS', 'FINISHING', 'FINISHED', 'DELETED', 'CANCELING']
jobs_ctl = root_parser.add_parser('job', help='A-REX Jobs')
jobs_ctl.add_argument('-t', '--cachettl', action='store', type=int, default=60,
jobs_ctl.add_argument('-t', '--cachettl', action='store', type=int, default=30,
help='GM-Jobs output caching validity in seconds (default is %(default)s)')
jobs_ctl.set_defaults(handler_class=JobsControl)
......@@ -322,3 +386,9 @@ class JobsControl(ComponentControl):
jobs_cleanall = jobs_actions.add_parser('cleanall', help='Clean all jobs')
jobs_cleanall.add_argument('-s', '--state', help='Filter jobs by state', action='append', choices=__JOB_STATES)
jobs_cleanall.add_argument('-o', '--owner', help='Filter jobs by owner').completer = complete_job_owner
jobs_stats = jobs_actions.add_parser('stats', help='Show jobs statistics')
jobs_stats.add_argument('-S', '--no-states', help='Do not show per-state job stats', action='store_true')
jobs_stats.add_argument('-t', '--total', help='Show server total stats', action='store_true')
jobs_stats.add_argument('-d', '--data-staging', help='Show server datastaging stats', action='store_true')
jobs_stats.add_argument('-l', '--long', help='Detailed output of stats', action='store_true')
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment