...
 
Commits (2)
  • Ievgen Sliusar's avatar
    Add write() method to JURA::JobLogFile · 616060fa
    Ievgen Sliusar authored
    Also handle inheritance of accounting record from old file if
    file is being saved with different name.
    This is useful because multiple destinations may consume the same
    accounting record and do not spill archive directory with duplicates.
    616060fa
  • Ievgen Sliusar's avatar
    Implement per-destination logfile generation. · 3386f5ba
    Ievgen Sliusar authored
    Generate multiple files from A-REX-produced records (without loggerurl)
    with specific logger URLs and force directory rescanning.
    Submit all logfiles which have loggerurl defined.
    3386f5ba
......@@ -189,6 +189,47 @@ namespace ArcJura
return count;
}
int JobLogFile::write(const std::string& filename_)
{
int count=0; //number of saved values
// save inherited filename for archiving!
if ( !filename.empty() && !(*this)["jobreport_option_archiving"].empty() )
{
std::string base_fn;
size_type seppos=filename.rfind('/');
if (seppos==std::string::npos)
base_fn=filename;
else
base_fn=filename.substr(seppos+1,std::string::npos);
(*this)["archiving_basefilename"] = base_fn;
}
//filename=_filename;
std::ofstream logfile(filename_.c_str());
std::map<std::string,std::string>::iterator it;
for(it=begin(); it!=end(); it++)
{
if ( it->first != "inputfile" && it->first != "outputfile" && it->second != "" )
{
logfile << it->first << "=" << it->second << std::endl;
count++;
}
}
for (int i=0; i<(int)inputfiles.size(); i++) {
logfile << "inputfile=" << inputfiles[i] << std::endl;
count++;
}
for (int i=0; i<(int)outputfiles.size(); i++) {
logfile << "outputfile=" << inputfiles[i] << std::endl;
count++;
}
logfile.close();
return count;
}
void JobLogFile::createUsageRecord(Arc::XMLNode &usagerecord,
const char *recordid_prefix)
{
......@@ -1332,12 +1373,15 @@ namespace ArcJura
if ((*this)["jobreport_option_archiving"].empty()) return std::string();
//if set, archive file name corresponds to original job log file
std::string base_fn;
size_type seppos=filename.rfind('/');
if (seppos==std::string::npos)
base_fn=filename;
else
base_fn=filename.substr(seppos+1,std::string::npos);
std::string base_fn = (*this)["archiving_basefilename"];
if (base_fn.empty())
{
size_type seppos=filename.rfind('/');
if (seppos==std::string::npos)
base_fn=filename;
else
base_fn=filename.substr(seppos+1,std::string::npos);
}
if (car) {
return (*this)["jobreport_option_archiving"]+"/usagerecordCAR."+base_fn;
......
......@@ -29,6 +29,8 @@ namespace ArcJura
JobLogFile(const std::string& _filename):allow_remove(true) { parse(_filename); }
/** Reloads and parses A-REX job log. */
int parse(const std::string& _filename);
/** Write job log to another file */
int write(const std::string& _filename);
/** Creates an OGF Job Usage Record from parsed log files.
* - Missing UR properties:
* -# ProcessID: Local PID(s) of job. Extraction is LRMS-specific and \n
......
......@@ -13,6 +13,8 @@
#include <time.h>
#include <sstream>
#include <glibmm.h>
#include <arc/ArcRegex.h>
#include <arc/Utils.h>
......@@ -60,6 +62,7 @@ namespace ArcJura
std::map<std::string,std::string> dest_to_duplicate;
//Collect job log file names from job log dir
//(to know where to get usage data from)
bool rescan = false;
DIR *dirp = NULL;
dirent *entp = NULL;
errno=0;
......@@ -92,8 +95,14 @@ namespace ArcJura
// Seek "<jobnumber>.<randomstring>" files.
Arc::RegularExpression logfilepattern("^[0-9A-Za-z]+\\.[^.]+$");
errno = 0;
while ((entp = readdir(dirp)) != NULL)
while ((entp = readdir(dirp)) != NULL || rescan)
{
if (entp == NULL) // rescan
{
rewinddir(dirp);
rescan = false;
continue;
}
if (logfilepattern.match(entp->d_name))
{
//Parse log file
......@@ -146,14 +155,18 @@ namespace ArcJura
{
//TODO: handle logfile removing problem by multiple destination
// it is not remove jet only when expired
// if it is virgin logfile from a-rex
if ( (*logfile)["loggerurl"].empty() )
{
//Create SGAS reports
std::vector<Config::SGAS> const & sgases = config.getSGAS();
for (int i=0; i<(int)sgases.size(); i++) {
JobLogFile *dupl_logfile=
new JobLogFile(*logfile);
dupl_logfile->allowRemove(false);
std::string tmpfilename = fname + "_XXXXXX";
Glib::mkstemp(tmpfilename);
//dupl_logfile->allowRemove(false);
// Set only loggerurl option
(*dupl_logfile)["loggerurl"] = sgases[i].targeturl.fullstr();
......@@ -170,9 +183,11 @@ namespace ArcJura
{
(*dupl_logfile)["vo_filters"] = vo_filters;
}
//Write
dupl_logfile->write(tmpfilename);
//Pass job log file content to the appropriate
//logging destination
dests->report(*dupl_logfile,sgases[i]);
//dests->report(*dupl_logfile,sgases[i]);
//(deep copy performed)
delete dupl_logfile;
}
......@@ -182,7 +197,9 @@ namespace ArcJura
for (int i=0; i<(int)apels.size(); i++) {
JobLogFile *dupl_logfile=
new JobLogFile(*logfile);
dupl_logfile->allowRemove(false);
std::string tmpfilename = fname + "_XXXXXX";
Glib::mkstemp(tmpfilename);
//dupl_logfile->allowRemove(false);
// Set loggerurl and topic option
(*dupl_logfile)["loggerurl"] = "APEL:" + apels[i].targeturl.fullstr();
(*dupl_logfile)["topic"] = apels[i].topic;
......@@ -198,13 +215,57 @@ namespace ArcJura
if (!apels[i].benchmark_description.empty()) {
(*dupl_logfile)["jobreport_option_benchmark_description"] = apels[i].benchmark_description;
}
//Write
dupl_logfile->write(tmpfilename);
delete dupl_logfile;
}
// delete a-rex log file after all child logfiles were created
logfile->remove();
// new files need to be processed
rescan = true;
}
else // if it is our remade one for specific source
{
std::string loggerurl = (*logfile)["loggerurl"];
const Config::ACCOUNTING *aconf = NULL;
if ( !(*logfile)["topic"].empty() || loggerurl.substr(0,4) == "APEL")
{ //Search APEL configs
loggerurl.erase(0, 5); // cut "APEL:"
std::vector<Config::APEL> const & apels = config.getAPEL();
for (int i=0; i<(int)apels.size(); i++) {
if (apels[i].targeturl == loggerurl) {
aconf = new Config::APEL(apels[i]);
break;
}
}
}
else //Search SGAS configs
{
std::vector<Config::SGAS> const & sgases = config.getSGAS();
for (int i=0; i<(int)sgases.size(); i++) {
if (sgases[i].targeturl == loggerurl) {
aconf = new Config::SGAS(sgases[i]);
break;
}
}
}
if (aconf != NULL) {
//Pass job log file content to the appropriate
//logging destination
dests->report(*dupl_logfile,apels[i]);
dests->report(*logfile, *aconf);
//(deep copy performed)
delete dupl_logfile;
delete aconf;
}
else
{
// remove logfile because it has no reporter configuration
logfile->remove();
}
}
}
}
//B. Interactive mode: submit only to services specified by
......