Commit 2ff297c6 authored by Maiken's avatar Maiken
Browse files

Merge branch 'accounting_rereporting_fixes' into 'master'

Series of accounting fixes

See merge request nordugrid/arc!747
parents f217679e 69e57365
......@@ -71,8 +71,6 @@ debian/tmp/usr/share/man/man8/gm-*.8
debian/tmp/usr/lib/python?.?/site-packages/arc/ssm/__init__.py*
debian/tmp/usr/lib/python?.?/site-packages/arc/ssm/crypto.py*
debian/tmp/usr/lib/python?.?/site-packages/arc/ssm/ssm2.py*
debian/tmp/usr/lib/python?.?/site-packages/arc/ssm/brokers.py*
debian/tmp/usr/lib/python?.?/site-packages/arc/ssm/sender.cfg
debian/tmp/usr/lib/arc/arccandypond
debian/tmp/usr/share/arc/rte/ENV/LRMS-SCRATCH
......
......@@ -14,35 +14,50 @@
namespace ArcJura
{
// Construct APEL destination during republishing using provided URL and topic
ApelDestination::ApelDestination(std::string url_, std::string topic_):
logger(Arc::Logger::rootLogger, "JURA.ApelReReporter"),
rereport(true),
use_ssl("true"),
use_ssl(false),
urn(0),
sequence(0),
usagerecordset(Arc::NS("","http://eu-emi.eu/namespaces/2012/11/computerecord"),
"UsageRecords")
{
init(url_.substr(5), topic_, "", "", "", "");
// define APEL URL stripping 'APEL:' if present
std::string apel_url = url_;
if (url_.substr(0,5) == "APEL:") {
apel_url = url_.substr(5);
}
// check SSL url
if (apel_url.substr(0,5) == "https") {
use_ssl = true;
}
// NOTE that cert/key/cadir path will be read from environment variables or defaults are used
init(apel_url, topic_, "", "", "", "");
}
// Construct APEL destination during normal publishing cycle from joblog and arc.conf
ApelDestination::ApelDestination(JobLogFile& joblog, const Config::APEL &_conf):
logger(Arc::Logger::rootLogger, "JURA.ApelDestination"),
conf(_conf),
rereport(false),
use_ssl("false"),
use_ssl(false),
urn(0),
sequence(0),
usagerecordset(Arc::NS("","http://eu-emi.eu/namespaces/2012/11/computerecord"),
"UsageRecords")
{
init(joblog["loggerurl"].substr(5), joblog["topic"], joblog["outputdir"], joblog["certificate_path"], joblog["key_path"], joblog["ca_certificates_dir"]);
//From jobreport_options:
//Settings from arc.conf:
max_ur_set_size=conf.urbatchsize;
use_ssl=conf.use_ssl;
// WARNING: 'loggerurl' should contains 'APEL:' prefix.
// Jura adds this prefix when original A-REX joblogs are converted to per-destination joblogs in accordance to configuration in arc.conf
init(joblog["loggerurl"].substr(5), joblog["topic"], joblog["outputdir"], joblog["certificate_path"], joblog["key_path"], joblog["ca_certificates_dir"]);
}
void ApelDestination::init(std::string serviceurl_,std::string topic_, std::string outputdir_, std::string cert_, std::string key_, std::string ca_)
......@@ -85,7 +100,7 @@ namespace ArcJura
service_url = url;
if (url.Protocol()!="https")
{
logger.msg(Arc::ERROR, "Protocol is %s, should be https",
logger.msg(Arc::ERROR, "Protocol is %s. It is recommended to use secure connection with https.",
url.Protocol());
}
host=url.Host();
......@@ -97,7 +112,7 @@ namespace ArcJura
//read the previous aggregation records
if (!rereport)
aggregationManager = new CARAggregation(host,port,topic, true);
aggregationManager = new CARAggregation(host,port,topic,true,use_ssl);
//Get Batch Size:
//Default value:
......@@ -315,50 +330,27 @@ namespace ArcJura
);
}
int retval;
//ssmsend <hostname> <port> <topic> <key path> <cert path> <cadir path> <messages path> <use_ssl>"
std::string command;
std::vector<std::string> ssm_pathes;
std::string exec_cmd = "ssmsend";
//RedHat: /usr/libexec/arc/ssm_master
ssm_pathes.push_back("/usr/libexec/arc/"+exec_cmd);
ssm_pathes.push_back("/usr/local/libexec/arc/"+exec_cmd);
// Ubuntu/Debian: /usr/lib/arc/ssm_master
ssm_pathes.push_back("/usr/lib/arc/"+exec_cmd);
ssm_pathes.push_back("/usr/local/lib/arc/"+exec_cmd);
// If you don't use non-standard prefix for a compilation you will
// use this extra location.
std::ostringstream prefix;
prefix << INSTPREFIX << "/" << PKGLIBEXECSUBDIR << "/";
ssm_pathes.push_back(prefix.str()+exec_cmd);
// Find the location of the ssm_master
std::string ssm_command = "./ssm/"+exec_cmd;
for (int i=0; i<(int)ssm_pathes.size(); i++) {
std::ifstream ssmfile(ssm_pathes[i].c_str());
if (ssmfile) {
// The file exists,
ssm_command = ssm_pathes[i];
ssmfile.close();
break;
}
}
//ssmsend -H <hostname> -p <port> -t <topic> -k <key path> -c <cert path> -C <cadir path> -m <messages path> [--ssl]"
std::string command = INSTPREFIX "/" PKGLIBEXECSUBDIR "/ssmsend";
command = ssm_command;
command += " " + service_url.Host(); //host
command += " -H " + service_url.Host(); //host
std::stringstream port;
port << service_url.Port();
command += " " + port.str(); //port
command += " " + topic; //topic
command += " " + cfg.key; //certificate key
command += " " + cfg.cert; //certificate
command += " " + cfg.cadir; //cadir
command += " " + default_path; //messages path
command += " " + use_ssl; //use_ssl
command += " -p " + port.str(); //port
command += " -t " + topic; //topic
command += " -k " + cfg.key; //certificate key
command += " -c " + cfg.cert; //certificate
command += " -C " + cfg.cadir; //cadir
command += " -m " + default_path; //messages path
command += " -d " + Arc::level_to_string(logger.getThreshold()); // loglevel
if (use_ssl) {
command += " --ssl"; //use_ssl
}
command += "";
logger.msg(Arc::INFO, "Running SSM client using: %s", command);
retval = system(command.c_str());
logger.msg(Arc::DEBUG, "system retval: %d", retval);
logger.msg(Arc::DEBUG, "SSM client exit code: %d", retval);
if (retval == 0) {
return Arc::MCC_Status(Arc::STATUS_OK,
"apelclient",
......
......@@ -33,7 +33,7 @@ namespace ArcJura
int max_ur_set_size;
bool rereport;
/** Require to set to ture this option by production message broker */
std::string use_ssl;
bool use_ssl;
/** Actual number of usage records in set */
int urn;
/** File name extension */
......
......@@ -22,12 +22,12 @@ namespace ArcJura
aggregationrecordset(Arc::NS("","http://eu-emi.eu/namespaces/2012/11/aggregatedcomputerecord"),
"SummaryRecords")
{
init(_host, "", "");
init(_host, "", "", false);
}
CARAggregation::CARAggregation(std::string _host, std::string _port, std::string _topic, bool synch):
CARAggregation::CARAggregation(std::string _host, std::string _port, std::string _topic, bool synch, bool _ssl):
logger(Arc::Logger::rootLogger, "JURA.CARAggregation"),
use_ssl("false"),
use_ssl(false),
sequence(0),
aggr_record_update_need(false),
synch_message(false),
......@@ -35,10 +35,10 @@ namespace ArcJura
"SummaryRecords")
{
synch_message = synch;
init(_host, _port, _topic);
init(_host, _port, _topic, _ssl);
}
void CARAggregation::init(std::string _host, std::string _port, std::string _topic)
void CARAggregation::init(std::string _host, std::string _port, std::string _topic, bool _ssl)
{
ns[""] = "http://eu-emi.eu/namespaces/2012/11/aggregatedcomputerecord";
ns["urf"] = "http://eu-emi.eu/namespaces/2012/11/computerecord";
......@@ -65,6 +65,7 @@ namespace ArcJura
host = _host;
port = _port;
topic = _topic;
use_ssl = _ssl;
//read the previous aggregation records
std::string default_path = (std::string)JURA_DEFAULT_DIR_PREFIX + "/urs/";
aggr_record_location = default_path + host + "_aggregation_records.xml";
......@@ -164,48 +165,25 @@ namespace ArcJura
);
}
int retval;
//ssmsend <hostname> <port> <topic> <key path> <cert path> <cadir path> <messages path> <use_ssl>"
std::string command;
std::vector<std::string> ssm_pathes;
std::string exec_cmd = "ssmsend";
//RedHat: /usr/libexec/arc/ssm_master
ssm_pathes.push_back("/usr/libexec/arc/"+exec_cmd);
ssm_pathes.push_back("/usr/local/libexec/arc/"+exec_cmd);
// Ubuntu/Debian: /usr/lib/arc/ssm_master
ssm_pathes.push_back("/usr/lib/arc/"+exec_cmd);
ssm_pathes.push_back("/usr/local/lib/arc/"+exec_cmd);
// If you don't use non-standard prefix for a compilation you will
// use this extra location.
std::ostringstream prefix;
prefix << INSTPREFIX << "/" << PKGLIBEXECSUBDIR << "/";
ssm_pathes.push_back(prefix.str()+exec_cmd);
// Find the location of the ssm_master
std::string ssm_command = "./ssm/"+exec_cmd;
for (int i=0; i<(int)ssm_pathes.size(); i++) {
std::ifstream ssmfile(ssm_pathes[i].c_str());
if (ssmfile) {
// The file exists,
ssm_command = ssm_pathes[i];
ssmfile.close();
break;
}
//ssmsend -H <hostname> -p <port> -t <topic> -k <key path> -c <cert path> -C <cadir path> -m <messages path> [--ssl]"
std::string command = INSTPREFIX "/" PKGLIBEXECSUBDIR "/ssmsend";
command += " -H " + host; //host
command += " -p " + port; //port
command += " -t " + topic; //topic
command += " -k " + cfg.key; //certificate key
command += " -c " + cfg.cert; //certificate
command += " -C " + cfg.cadir; //cadir
command += " -m " + default_path; //messages path
command += " -d " + Arc::level_to_string(logger.getThreshold()); // loglevel
if (use_ssl) {
command += " --ssl"; //use_ssl
}
command = ssm_command;
command += " " + host; //host
command += " " + port; //port
command += " " + topic; //topic
command += " " + cfg.key; //certificate key
command += " " + cfg.cert; //certificate
command += " " + cfg.cadir; //cadir
command += " " + default_path; //messages path
command += " " + use_ssl; //use_ssl
command += "";
logger.msg(Arc::INFO, "Running SSM client using: %s", command);
retval = system(command.c_str());
logger.msg(Arc::DEBUG, "system retval: %d", retval);
logger.msg(Arc::DEBUG, "SSM client return value: %d", retval);
if (retval == 0) {
return Arc::MCC_Status(Arc::STATUS_OK,
"apelclient",
......
......@@ -20,7 +20,7 @@ namespace ArcJura
std::string topic;
/** Require to set to true this option by production message broker */
std::string use_ssl;
bool use_ssl;
/** File name extension */
int sequence;
/** location of Aggregation Records */
......@@ -32,7 +32,7 @@ namespace ArcJura
Arc::NS ns;
Arc::NS ns_query;
void init(std::string _host, std::string _port, std::string _topic);
void init(std::string _host, std::string _port, std::string _topic, bool _ssl);
/** Send records to the accounting server. */
Arc::MCC_Status send_records(const std::string &urset);
/** Update all records sending dates */
......@@ -57,7 +57,7 @@ namespace ArcJura
/**
* Constructor for record reporting.
*/
CARAggregation(std::string _host, std::string _port, std::string _topic, bool synch);
CARAggregation(std::string _host, std::string _port, std::string _topic, bool synch, bool _ssl);
~CARAggregation();
/** Generated record from CAR record, collects it into the
......
......@@ -44,8 +44,12 @@ namespace ArcJura
Arc::VOMSTrustList voms_trust_dn;
voms_trust_dn.AddRegex(".*");
std::vector<Arc::VOMSACInfo> voms_attributes;
// parse VOMS AC suppressing any parsing ERRORs unless DEBUG level is requested
Arc::LogLevel current_loglevel = Arc::Logger::getRootLogger().getThreshold();
if ( current_loglevel != Arc::DEBUG ) Arc::Logger::getRootLogger().setThreshold(Arc::FATAL);
parseVOMSAC(holder, ca_dir, "", voms_dir, voms_trust_dn, voms_attributes, true, true);
//parseVOMSAC(cert_str, ca_dir, "", voms_dir, voms_trust_dn, voms_attributes, true, true);
Arc::Logger::getRootLogger().setThreshold(current_loglevel);
return voms_attributes;
}
......
......@@ -132,7 +132,8 @@ int main(int argc, char **argv)
{
std::cout << urls[i] << std::endl;
// Tokenize service URL
std::string host, port, endpoint;
std::string host, port, endpoint, protocol;
bool use_ssl = false;
if (urls[i].empty())
{
logger.msg(Arc::ERROR, "ServiceURL missing");
......@@ -146,6 +147,7 @@ int main(int argc, char **argv)
os<<url.Port();
port=os.str();
endpoint=url.Path();
protocol=url.Protocol();
}
if (topics[i].empty())
......@@ -153,8 +155,11 @@ int main(int argc, char **argv)
logger.msg(Arc::ERROR, "Topic missing for a (%s) host.", urls[i]);
continue;
}
if (protocol == "https") {
use_ssl = true;
}
logger.msg(Arc::INFO, "Aggregation record(s) sending to %s", host);
aggr = new ArcJura::CARAggregation(host, port, topics[i], sync);
aggr = new ArcJura::CARAggregation(host, port, topics[i], sync, use_ssl);
if ( !year.empty() )
{
......
pkgpythondir = $(PYTHON_SITE_ARCH)/arc/ssm
pkgpython_PYTHON = ssm2.py crypto.py brokers.py sender.cfg __init__.py
pkgpython_PYTHON = ssm2.py crypto.py __init__.py
pkglibexec_SCRIPTS = ssmsend
This modified Python codes are part of the SSM that developed by APEL.
NorduGrid ARC distribution includes necessary SSM python libraries to submit records to APEL.
This is necessary thus we provides NorduGrid ARC packaging for number of platforms APEL SSM is not packaged.
Prepare selected files from original SSM distribution:
brokers.py
crypto.py
SSM libraries are developed by APEL team and can be found at https://github.com/apel/ssm
Following files are copied from APEL SSM codebase:
__init__.py
sender.cfg
crypto.py
ssm2.py
sender.py -> ssmsend.in
Apply arc-ssm.patch to add ARC modifications and update files.
The 'ssmsend' utility is a helper script to invoke SSM to send accounting record to APEL from ARC Jura.
ARC Jura prepare the records to be sent and call 'ssmsend' utlity with proper parameters,
NOTE: ARC patch includes futurize for Python3 compatibility. Run futurize to make sure new SSM code that not covered by arc-ssm.patch also updated.
Records submission logic in 'ssmsend' obey APEL sender.py code logic. Everything else is a glue with ARC code.
In case of operating system has APEL SSM libraries installed - these libraries will be used.
ARC redistributed libraries are used as a fallback option and targeted for OSes without APEL SSM packaging support.
ARC also works for most modern Linux distrbutions that have Python3 as a default Python version.
At the time of writting the only modification to redistributed SSM libraries is 'futurization' to support Python 3.
Changelog:
*Wed May 1 2019 Andrii Salnikov <manf@grid.org.ua>
- update to ssm2 2.3.0-2
- ssmsend rework specially for ARC Jura
*Sun Sep 9 2018 Andrii Salnikov <manf@grid.org.ua>
-update to ssm2 2.3.0
......
diff -ur 2.2.1-orig/brokers.py 2.2.1-arc/brokers.py
--- 2.2.1-orig/brokers.py 2018-05-14 16:00:55.000000001 +0300
+++ 2.2.1-arc/brokers.py 2018-08-24 22:16:20.065833696 +0300
@@ -18,6 +18,9 @@
Class to interact with a BDII LDAP server to retrieve information about
the stomp brokers specified in a network.
'''
+
+from __future__ import print_function
+
import ldap
import logging
@@ -140,12 +143,12 @@
def print_brokers(text, service, network):
brokers = BG.get_broker_hosts_and_ports(service, network)
# Print section heading
- print '==', text, '=='
+ print('==', text, '==')
# Print brokers in form 'host:port'
for broker in brokers:
- print '%s:%i' % (broker[0], broker[1])
+ print('%s:%i' % (broker[0], broker[1]))
# Leave space between sections
- print
+ print()
print_brokers('SSL production brokers', STOMP_SSL_SERVICE, 'PROD')
print_brokers('Production brokers', STOMP_SERVICE, 'PROD')
diff -ur 2.2.1-orig/crypto.py 2.2.1-arc/crypto.py
--- 2.2.1-orig/crypto.py 2018-05-14 16:00:55.000000001 +0300
+++ 2.2.1-arc/crypto.py 2018-09-05 13:44:16.186307157 +0300
@@ -57,7 +57,7 @@
try:
cert = _from_file(certpath)
key = _from_file(keypath)
- except IOError, e:
+ except IOError as e:
log.error('Could not find cert or key file: %s', e)
return False
@@ -101,7 +101,7 @@
return signed_msg
- except OSError, e:
+ except OSError as e:
log.error('Failed to sign message: %s', e)
raise CryptoException('Message signing failed. Check cert and key permissions.')
@@ -289,7 +289,7 @@
'''
Return the certificate subject's DN, in legacy openssl format.
'''
- p1 = Popen(['openssl', 'x509', '-noout', '-subject'],
+ p1 = Popen(['openssl', 'x509', '-noout', '-subject', '-nameopt', 'oneline'],
stdin=PIPE, stdout=PIPE, stderr=PIPE)
subject, error = p1.communicate(certstring)
@@ -298,7 +298,18 @@
log.error(error)
raise CryptoException('Failed to get subject: %s' % error)
- subject = subject.strip()[9:] # remove 'subject= ' from the front
+ subject = subject.strip()[8:] # remove 'subject=' from the front
+ subject = subject.lstrip() # even if there is space after subject=
+ subject = subject.split(', ')
+
+ for idx in range(len(subject)):
+ el = subject[idx].split(' = ')
+ el = '='.join(el)
+ subject.pop(idx)
+ subject.insert(idx,el)
+
+ subject = '/' + '/'.join(subject)
+
return subject
diff -ur 2.2.1-orig/sender.cfg 2.2.1-arc/sender.cfg
--- 2.2.1-orig/sender.cfg 2018-05-14 16:00:55.000000001 +0300
+++ 2.2.1-arc/sender.cfg 2018-09-05 13:44:16.186307157 +0300
@@ -6,15 +6,15 @@
# The SSM will query a BDII to find brokers available. These details are for the
# EGI production broker network
-bdii: ldap://lcg-bdii.cern.ch:2170
-network: PROD
+#bdii: ldap://lcg-bdii.cern.ch:2170
+#network: PROD
# OR (these details will only be used if the broker network settings aren't used)
-#host: test-msg01.afroditi.hellasgrid.gr
+#host: test-msg02.afroditi.hellasgrid.gr
#port: 6163
# broker authentication. If use_ssl is set, the certificates configured
# in the mandatory [certificates] section will be used.
-use_ssl: true
+use_ssl: false
################################################################################
@@ -36,13 +36,13 @@
[messaging]
# Queue to which SSM will send messages
-destination:
+destination: /queue/global.accounting.cputest.CENTRAL
# Outgoing messages will be read and removed from this directory.
path: /var/spool/apel/outgoing
[logging]
-logfile: /var/log/apel/ssmsend.log
+logfile: /var/spool/arc/ssm/ssmsend.log
# Available logging levels:
# DEBUG, INFO, WARN, ERROR, CRITICAL
level: INFO
diff -ur 2.2.1-orig/ssm2.py 2.2.1-arc/ssm2.py
--- 2.2.1-orig/ssm2.py 2018-05-14 16:00:55.000000001 +0300
+++ 2.2.1-arc/ssm2.py 2018-09-05 13:44:16.187307133 +0300
@@ -16,6 +16,8 @@
@author: Will Rogers
'''
+from __future__ import absolute_import
+
# It's possible for SSM to be used without SSL, and the ssl module isn't in the
# standard library until 2.6, so this makes it safe for earlier Python versions.
try:
@@ -24,7 +26,7 @@
# ImportError is raised later on if SSL is actually requested.
ssl = None
-from ssm import crypto
+from . import crypto
from dirq.QueueSimple import QueueSimple
from dirq.queue import Queue
@@ -238,7 +240,7 @@
if 'application/pkcs7-mime' in text or 'application/x-pkcs7-mime' in text:
try:
text = crypto.decrypt(text, self._cert, self._key)
- except crypto.CryptoException, e:
+ except crypto.CryptoException as e:
error = 'Failed to decrypt message: %s' % e
log.error(error)
return None, None, error
@@ -246,7 +248,7 @@
# always signed
try:
message, signer = crypto.verify(text, self._capath, self._check_crls)
- except crypto.CryptoException, e:
+ except crypto.CryptoException as e:
error = 'Failed to verify message: %s' % e
log.error(error)
return None, None, error
@@ -326,7 +328,7 @@
try:
# Remove empty dirs and unlock msgs older than 5 min (default)
self._outq.purge()
- except OSError, e:
+ except OSError as e:
log.warn('OSError raised while purging message queue: %s', e)
############################################################################
@@ -388,10 +390,10 @@
try:
self.start_connection()
break
- except ConnectFailedException, e:
+ except ConnectFailedException as e:
# ConnectFailedException doesn't provide a message.
log.warn('Failed to connect to %s:%s.', host, port)
- except Ssm2Exception, e:
+ except Ssm2Exception as e:
log.warn('Failed to connect to %s:%s: %s', host, port, e)
if not self.connected:
@@ -478,7 +480,7 @@
f.write(str(os.getpid()))
f.write('\n')
f.close()
- except IOError, e:
+ except IOError as e:
log.warn('Failed to create pidfile %s: %s', self._pidfile, e)
self.handle_connect()
@@ -494,7 +496,7 @@
os.remove(self._pidfile)
else:
log.warn('pidfile %s not found.', self._pidfile)
- except IOError, e:
+ except IOError as e:
log.warn('Failed to remove pidfile %s: %e', self._pidfile, e)
log.warn('SSM may not start again until it is removed.')
diff -ur 2.2.1-orig/ssmsend.in 2.2.1-arc/ssmsend.in
--- 2.2.1-orig/ssmsend.in 2018-05-14 16:00:55.000000001 +0300
+++ 2.2.1-arc/ssmsend.in 2018-09-08 13:14:30.983453275 +0300
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!@PYTHON@
# Copyright (C) 2012 STFC
#
@@ -19,27 +19,39 @@
@author: Will Rogers
'''
-from ssm import __version__, set_up_logging, LOG_BREAK
-from ssm.ssm2 import Ssm2, Ssm2Exception
-from ssm.crypto import CryptoException
-from ssm.brokers import StompBrokerGetter, STOMP_SERVICE, STOMP_SSL_SERVICE
+from __future__ import print_function
-import logging.config
-import ldap
import sys
import os
+
+# ARC-prefix path in PYTHONPATH
+arc_prefix_pythonpath = '@PYTHON_SITE_ARCH@'.replace('${prefix}', '@prefix@')
+if os.path.isdir(arc_prefix_pythonpath):
+ if arc_prefix_pythonpath not in sys.path:
+ sys.path.insert(1, arc_prefix_pythonpath)
+