Commit 387c14f6 authored by Maiken Pedersen's avatar Maiken Pedersen
Browse files

First commit of Andrej's arc-exporter — this is the python2 version

parent 6a75796a
# arc-exporter
ARC exporter by Andrej Filipcic
- hosts: all
become: true
vars:
bin_path: "/usr/local/bin"
arc_exporter_user: "arc_exporter"
roles:
- arc-exporter
#!/usr/bin/env python
from prometheus_client import start_http_server, Summary, Gauge, Counter
from dateutil.parser import parse
import sys
import time
import os
import re
import getopt
from datetime import datetime
from urlparse import urlparse
import subprocess
from collections import Counter
from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY
from arc.utils import config
# Parse arc.conf once at startup; every feature flag below is derived from
# which sections exist in the parsed configuration dictionary.
config.parse_arc_conf()

# A-REX metrics need both the [arex] section (control dir + log file path)
# and an existing dtr.state file inside the control directory.
ctrldir = ''
try:
    ctrldir = config.get_config_dict()['arex']['controldir']
    arexLogFile = config.get_config_dict()['arex']['logfile']
    arexOn = True
except KeyError:
    arexOn = False
# Combine with the config result instead of overriding it: if the [arex]
# section is missing, arexLogFile is undefined and enabling arexOn from the
# file check alone would crash later when LogReader(arexLogFile) is built.
arexOn = arexOn and os.path.exists(ctrldir + '/dtr.state')

# Data-delivery service metrics are enabled when its logfile is configured.
try:
    deliveryLogFile = config.get_config_dict()['datadelivery-service']['logfile']
    dtrOn = True
except KeyError:
    dtrOn = False

# Central data-staging metrics: dtrs maps DTR id -> in-flight transfer info.
try:
    cdeliveryLogFile = config.get_config_dict()['arex/data-staging']['logfile']
    cdtrOn = True
    dtrs = {}
except KeyError:
    cdtrOn = False
class LogReader:
    """Log reader helper: starts on the current logfile and follows logrotate."""

    def __init__(self, file):
        self.file = file
        self.f = open(file, 'r')

    def read(self):
        """Generator yielding each new line; yields None when no data is ready.

        When the file on disk becomes smaller than our read offset (logrotate
        replaced it), the handle is reopened from the beginning.
        """
        while True:
            try:
                line = self.f.readline()
            except (IOError, OSError):
                # Read failed: report "no data" and retry. The original fell
                # through to `if not line` here with `line` unbound (NameError).
                yield None
                continue
            if not line:
                # EOF: detect rotation by comparing offset with current size.
                if self.f.tell() > os.path.getsize(self.file):
                    self.f.close()
                    self.f = open(self.file, 'r')
                # Yield only None at EOF (the original also yielded the empty
                # string on the fall-through, which callers had to skip).
                yield None
            else:
                yield line
class DTRCollector(object):
    """Custom Prometheus collector for DTR transfer states.

    Implemented as a collector rather than a static Gauge because the whole
    arc_transfers metric family must be rebuilt from dtr.state on each scrape.
    """

    def collect(self):
        c = GaugeMetricFamily('arc_transfers', 'ARC Transfers',
                              labels=['state', 'share', 'host'])
        # Read the state file directly (the original shelled out to `cat`).
        try:
            with open(ctrldir + '/dtr.state') as f:
                content = f.read()
        except (IOError, OSError):
            # No state file: expose an empty metric family.
            yield c
            return
        d = []
        for raw in content.split('\n'):
            j = raw.split()
            # Need at least 4 fields to index j[3]; the original `< 2` guard
            # let 2-3 field lines raise IndexError.
            if len(j) < 4:
                continue
            state = j[1]
            share = j[3]
            host = ""
            if len(j) == 5:
                host = 'local'
            if len(j) == 6:
                host = j[5]
            d.append((state, share, host))
        # Aggregate identical (state, share, host) tuples into sample counts.
        for labels, count in Counter(d).items():
            c.add_metric(list(labels), count)
        yield c
# Metric objects are created only for the subsystems detected at startup;
# each LogReader tails its logfile incrementally between scrape cycles.
if arexOn:
    REGISTRY.register(DTRCollector())
    # Job state snapshot from gm-jobs
    jobs = Gauge('arc_jobs','ARC Jobs', [ 'state'])
    # Job state counter from arex.log
    jstats = Gauge('arc_job_count','ARC Jobs', [ 'state'])
    # gm-heartbeat file age in seconds
    gmtime = Gauge('arc_arex_heartbeat','AREX Heartbeart')
    arexlog=LogReader(arexLogFile)

if dtrOn:
    # Running count of successful transfers seen in the delivery log.
    dstats = Gauge('arc_delivery_count','ARC Data Delivery count')
    deliveryLog=LogReader(deliveryLogFile)

if cdtrOn:
    # Central staging: per-domain file counts and transferred byte totals.
    cdstats={}
    cdstats['files'] = Gauge('arc_central_staging_files','ARC Central Data Staging Files', ['type','cached','domain'])
    cdstats['size'] = Gauge('arc_central_staging_size','ARC Central Data Staging Size', ['type','cached','domain'])
    cdeliveryLog=LogReader(cdeliveryLogFile)
def getArcStats():
    """Collect one round of statistics and update the Prometheus metrics.

    Depending on which subsystems were detected at startup (arexOn / dtrOn /
    cdtrOn module flags), reads the gm-jobs snapshot via arcctl, tails
    arex.log for job-state transitions, checks the gm-heartbeat age, counts
    finished deliveries, and follows central-staging DTRs end to end.
    """
    if arexOn:
        # Job state snapshot: arcctl prints "STATE: count" lines.
        out, err = subprocess.Popen(['arcctl', '-d', 'CRITICAL', 'job', 'stats'],
                                    stdout=subprocess.PIPE).communicate()
        for i in out.split('\n'):
            j = i.split()
            if len(j) > 1:
                m = j[0][:-1]  # strip the trailing ':' from the state name
                n = j[1]
                jobs.labels(m).set(n)
        # Count job-state transitions logged since the previous cycle.
        for line in arexlog.read():
            if line is None:
                break
            j = line.split()
            if len(j) < 8:
                continue
            if "Job failure detected" in line:
                jstats.labels("FAILED").inc()
            if j[6] != 'State:':
                continue
            jstats.labels(j[7]).inc()
        # Heartbeat metric is the age of the gm-heartbeat file in seconds.
        mtime = os.stat(ctrldir + '/gm-heartbeat').st_mtime
        gmtime.set(time.time() - mtime)
    if dtrOn:
        # Count transfers the delivery service reports as TRANSFERRED.
        for line in deliveryLog.read():
            if line is None:
                break
            j = line.split()
            # Need at least 12 fields to index j[11]; the original `< 11`
            # guard was off by one and could raise IndexError.
            if len(j) < 12:
                continue
            if j[11] == 'TRANSFERRED':
                dstats.inc()
    if cdtrOn:
        # Central staging: follow each DTR from scheduling to completion,
        # then publish per-domain file counts and transferred-size totals.
        for line in cdeliveryLog.read():
            if line is None:
                break
            j = line.split()
            if len(j) < 6:
                continue
            if "Scheduler received new DTR" in line:
                dtr = j[5]
                dtrs[dtr] = {}
                d = dtrs[dtr]
                # Timestamp fields appear to be "[YYYY-MM-DD HH:MM:SS]" --
                # strip the surrounding brackets.
                d['start'] = j[0][1:] + ' ' + j[1][:-1]
                d['src'] = j[13]
                d['dst'] = j[15]
                d['cached'] = False
            dtr = j[5]
            if dtr not in dtrs:
                continue
            d = dtrs[dtr]
            if "Delivery received new DTR" in line:
                d['tsrc'] = j[13]
            # DataDelivery progress line carries the elapsed time and rate.
            # (The original repeated `len(j) > 11` twice in this condition.)
            if len(j) > 11 and j[6] == "DataDelivery:" and j[7].isdigit():
                d['time'] = j[7]
                d['rate'] = j[11]
            if "Transfer finished" in line:
                d['size'] = int(j[8])
            if "is cached" in line:
                d['cached'] = True
            if "Finished successfully" in line:
                # Best-effort: a malformed completion line must not kill the
                # exporter, so everything below is guarded.
                try:
                    d['end'] = j[0][1:] + ' ' + j[1][:-1]
                    s = parse(d['start'])
                    e = parse(d['end'])
                    d['duration'] = (e - s).total_seconds()
                    if "file:/" in d['src']:
                        d['type'] = 'upload'
                        t = urlparse(d['dst'])
                    else:
                        d['type'] = 'download'
                        if 'tsrc' in d:
                            t = urlparse(d['tsrc'])
                        else:
                            t = urlparse(d['src'])
                    if t is not None:
                        # Drop the first host label and any port, keeping the
                        # remote domain as the metric label.
                        d['domain'] = '.'.join(t.netloc.split('.')[1:]).split(":")[0]
                    dm = ''
                    if 'domain' in d:
                        dm = d['domain']
                    cdstats['files'].labels(d['type'], d['cached'], dm).inc()
                    if 'size' in d:
                        cdstats['size'].labels(d['type'], d['cached'], dm).inc(d['size'])
                    del dtrs[dtr]
                except Exception:
                    # Deliberate swallow (was a bare except); failures simply
                    # leave the DTR entry in place for a later cycle.
                    pass
def process_request(t):
    """Run one metrics-collection cycle, then sleep *t* seconds.

    Called in a loop from __main__; the sleep sets the refresh interval
    between scrapes of the ARC logs and state files.
    """
    getArcStats()
    time.sleep(t)
#def usage():
# print("""
# usage: arc-exporter.py [--port <port>] [--refresh <seconds>]
# [-f space_command] [ -c <arex_config_file> | <dir1> [<dir2> [...]] ]
# --help - This help
#
#""")
if __name__ == '__main__':
    # Default scrape port, overridable with "--port <n>" on the command line.
    port = 9101
    if len(sys.argv) == 3 and sys.argv[1] == "--port":
        port = int(sys.argv[2])
    # Expose the /metrics endpoint.
    start_http_server(port)
    # Refresh the statistics every 10 seconds, forever.
    while True:
        process_request(10)
[Unit]
Description=ARC Prometheus Exporter
After=network.target
[Service]
User=arc_exporter
Group=arc_exporter
Type=simple
ExecStart=/usr/bin/python /usr/local/bin/arc-exporter
[Install]
WantedBy=multi-user.target
---
- name: Install python-pip prerequisite for arc-exporter
yum:
name: python-pip
- name: Install prerequisites for arc-exporter
pip:
name: ['prometheus_client','python-dateutil']
#name: ['prometheus_client','python-dateutil','python-urllib']
- name: Copy binary to bin-folder
copy:
src: arc-exporter
dest: "{{ bin_path }}/arc-exporter"
- name: Set 755 permissions on arc-exporter file
file:
name: "{{ bin_path }}/arc-exporter"
mode: '0755'
- name: Create arc_exporter user
user:
name: "{{ arc_exporter_user }}"
system: yes
shell: /bin/false
- name: Copy systemd service file
copy:
src: files/arc-exporter.service
dest: /etc/systemd/system/arc-exporter.service
owner: root
group: root
backup: yes
mode: '0644'
- name: Reload systemd daemon
systemd:
daemon_reload: yes
- name: Start arc-exporter
systemd:
name: arc-exporter
state: started
- name: Enable arc-exporter
systemd:
name: arc-exporter
enabled: yes
- name: Check status of arc-exporter
systemd:
name: arc-exporter
...
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment