Commit f0a58eaa authored by Aleksandr Konstantinov's avatar Aleksandr Konstantinov
Browse files

Merge branch 'slowdown_hunting' into 'master'

More changes to reduce a-rex slowdown. Fixes BUGZ-4020.

See merge request nordugrid/arc!1217
parents 74224ef9 184cbea6
......@@ -38,13 +38,14 @@ namespace Arc {
public:
StringData();
virtual ~StringData();
void Assign(std::string& str);
void Assign(std::string& str, int content_max_size = 0);
virtual void Append(char const* data, unsigned int size);
virtual void Remove(unsigned int size);
virtual char const* Get() const;
virtual unsigned int Size() const;
private:
std::string* content_;
int content_max_size_;
};
// working directory
......@@ -164,12 +165,12 @@ namespace Arc {
/// Associate stdout pipe of executable with string.
/** This method must be called before Start(). str object
must be valid as long as this object exists. */
void AssignStdout(std::string& str);
void AssignStdout(std::string& str, int max_size = 102400);
void AssignStdout(Data& str);
/// Associate stderr pipe of executable with string.
/** This method must be called before Start(). str object
must be valid as long as this object exists. */
void AssignStderr(std::string& str);
void AssignStderr(std::string& str, int max_size = 102400);
void AssignStderr(Data& str);
/// Associate stdin pipe of executable with string.
/** This method must be called before Start(). str object
......
......@@ -802,9 +802,9 @@ namespace Arc {
return (!running_);
}
void Run::AssignStdout(std::string& str) {
void Run::AssignStdout(std::string& str, int max_size) {
if (!running_) {
stdout_str_wrap_.Assign(str);
stdout_str_wrap_.Assign(str,max_size);
stdout_str_ = &stdout_str_wrap_;
}
}
......@@ -815,9 +815,9 @@ namespace Arc {
}
}
void Run::AssignStderr(std::string& str) {
void Run::AssignStderr(std::string& str, int max_size) {
if (!running_) {
stderr_str_wrap_.Assign(str);
stderr_str_wrap_.Assign(str,max_size);
stderr_str_ = &stderr_str_wrap_;
}
}
......@@ -872,18 +872,23 @@ namespace Arc {
}
Run::StringData::StringData(): content_(NULL) {
Run::StringData::StringData(): content_(NULL),content_max_size_(0) {
}
Run::StringData::~StringData() {
}
void Run::StringData::Assign(std::string& str) {
void Run::StringData::Assign(std::string& str, int content_max_size) {
content_max_size_ = content_max_size;
content_ = &str;
}
void Run::StringData::Append(char const* data, unsigned int size) {
if(content_) content_->append(data, size);
if(content_) {
if((content_max_size_ > 0) && (content_->length() < content_max_size_)) {
content_->append(data, size);
}
}
}
void Run::StringData::Remove(unsigned int size) {
......
......@@ -310,6 +310,8 @@ bool GridManager::thread() {
joblog->RunReporter(config_);
}
}
// TODO: review metrics calls to reduce frequency of calling gmetrics tool.
JobsMetrics* metrics = config_.GetJobsMetrics();
if(metrics) metrics->Sync();
// Process jobs which need attention ASAP
......
......@@ -24,7 +24,7 @@ HeartBeatMetrics::HeartBeatMetrics():enabled(false),proc(NULL) {
free = 0;
totalfree = 0;
time_lastupdate = (time_delta = (time_now = time(NULL)));
time_delta = 0;
time_update = false;
}
......@@ -45,21 +45,22 @@ void HeartBeatMetrics::SetGmetricPath(const char* path) {
}
void HeartBeatMetrics::ReportHeartBeatChange(const GMConfig& config) {
void HeartBeatMetrics::ReportHeartBeatChange(const GMConfig& config) {
if(!enabled) return; // not configured
Glib::RecMutex::Lock lock_(lock);
struct stat st;
std::string heartbeat_file(config.ControlDir() + "/gm-heartbeat");
if(Arc::FileStat(heartbeat_file, &st, true)){
time_lastupdate = st.st_mtime;
time_now = time(NULL);
time_delta = time_now - time_lastupdate;
time_update = true;
}
else{
logger.msg(Arc::ERROR,"Error with hearbeatfile: %s",heartbeat_file.c_str());
time_update = false;
}
struct stat st;
std::string heartbeat_file(config.ControlDir() + "/gm-heartbeat");
if(Arc::FileStat(heartbeat_file, &st, true)){
time_t time_lastupdate = st.st_mtime;
time_t time_now = time(NULL);
time_delta = time_now - time_lastupdate;
time_update = true;
}
else{
logger.msg(Arc::ERROR,"Error with hearbeatfile: %s",heartbeat_file.c_str());
time_update = false;
}
Sync();
}
......@@ -150,7 +151,7 @@ void HeartBeatMetrics::SyncAsync(void* arg) {
void HeartBeatMetrics::RunMetricsKicker(void* arg) {
// Currently it is not allowed to start new external process
// from inside process licker (todo: redesign).
// from inside process kicker (todo: redesign).
// So do it asynchronously from another thread.
Arc::CreateThreadFunction(&SyncAsync, arg);
}
......
......@@ -23,8 +23,6 @@ class HeartBeatMetrics {
std::string config_filename;
std::string tool_path;
time_t time_now;
time_t time_lastupdate;
time_t time_delta;
......
......@@ -16,85 +16,60 @@ static Arc::Logger& logger = Arc::Logger::getRootLogger();
JobStateList::JobStateList(int _limit):limit(_limit){
failures = 0;
length = 0;
this_node = NULL;
oldhead = NULL;
head = NULL;
tail = NULL;
}
JobStateList::~JobStateList(){}
JobStateList::JobNode::JobNode(JobStateList* _sl, JobNode* _prev, JobNode* _next, bool _isfailed, std::string _job_id):
sl(_sl),prev(_prev),next(_next),isfailed(_isfailed),job_id(_job_id){
//update the previously last node in the list to instead point to NULL, now point to the new node
if(prev)prev->next = this;
//this is maybe not necessary as in the current set-up the next pointer is always NULL, insce the next of the last item in the list is NULL
if(next)next->prev = this;
JobStateList::JobNode::JobNode(bool _isfailed, std::string _job_id):
isfailed(_isfailed),job_id(_job_id){
}
JobStateList::JobNode::~JobNode(){}
JobStateList::JobNode* JobStateList::NodeInList(std::string _job_id){
JobStateList::JobNode* JobStateList::NodeInList(std::string _job_id){
JobStateList::JobNode* it = head;
if (head != NULL){
while(it->next != NULL){
std::list<JobNode>::iterator it = nodes.begin();
while(it != nodes.end()){
if(it->job_id == _job_id){
return it;
}
it = it->next;
if(it->job_id == _job_id){
return &(*it);
}
++it;
}
return NULL;
}
void JobStateList::setFailure(bool _isfailed,std::string _job_id){
void JobStateList::SetFailure(bool _isfailed,std::string _job_id){
//check if the node is already in the list, and if it is update the failure status
this_node = NodeInList(_job_id);
//check if the node is already in the list, and if it is update the failure status
JobStateList::JobNode* this_node = NodeInList(_job_id);
if(this_node){
//existing job in the list
if(!this_node->isfailed && _isfailed){
//update the failure-state of the node
//only update once (i.e. if node was not failed before)
this_node->isfailed=_isfailed;
if(_isfailed)failures++;
failures++;
}
}
else{
if(head==NULL){
//first node in list
JobStateList::JobNode* node = new JobStateList::JobNode(this,NULL,NULL,_isfailed,_job_id);
head = tail = node;
length++;
if(_isfailed)failures++;
} else {
//put the new node at the end of the list (newest job)
JobStateList::JobNode* node = new JobStateList::JobNode(this,tail,NULL,_isfailed,_job_id);
tail = node;
length++;
if(_isfailed)failures++;
if(length>limit){
//list is now 1 too long, remove the old head of the list (oldest job)
oldhead = head;
head = oldhead->next;
length--;
if (oldhead->isfailed)failures--;
oldhead = NULL;
}
JobStateList::JobNode node(_isfailed,_job_id);
nodes.push_back(node);
if(_isfailed)failures++;
if(nodes.size()>limit){
//list is now 1 too long, remove the old head of the list (oldest job)
if(nodes.front().isfailed)failures--;
nodes.pop_front();
}
}
}
JobsMetrics::JobsMetrics():enabled(false),proc(NULL) {
JobsMetrics::JobsMetrics():enabled(false),proc(NULL),jobstatelist(100) {
job_fail_counter = 0;
std::memset(jobs_in_state, 0, sizeof(jobs_in_state));
std::memset(jobs_in_state_changed, 0, sizeof(jobs_in_state_changed));
......@@ -107,12 +82,9 @@ JobsMetrics::JobsMetrics():enabled(false),proc(NULL) {
time_lastupdate = time(NULL);
jobstatelist = new JobStateList(100);
}
JobsMetrics::~JobsMetrics() {
delete jobstatelist;
}
void JobsMetrics::SetEnabled(bool val) {
......@@ -128,7 +100,8 @@ void JobsMetrics::SetGmetricPath(const char* path) {
}
void JobsMetrics::ReportJobStateChange(const GMConfig& config, GMJobRef i, job_state_t old_state, job_state_t new_state) {
void JobsMetrics::ReportJobStateChange(const GMConfig& config, GMJobRef i, job_state_t old_state, job_state_t new_state) {
if(!enabled) return; // not configured
Glib::RecMutex::Lock lock_(lock);
......@@ -140,9 +113,9 @@ void JobsMetrics::SetGmetricPath(const char* path) {
*/
/*jobstatelist holds jobid and 1 for failed or 0 for non-failed job for 100 latest jobs */
jobstatelist->setFailure(i->CheckFailure(config),job_id);
job_fail_counter = jobstatelist->failures;
/*jobstatelist holds jobid and true for failed or false for non-failed job for 100 latest jobs */
jobstatelist.SetFailure(i->CheckFailure(config),job_id);
job_fail_counter = jobstatelist.failures;
fail_changed = true;
//actual states (jobstates)
......@@ -257,7 +230,7 @@ void JobsMetrics::SyncAsync(void* arg) {
void JobsMetrics::RunMetricsKicker(void* arg) {
// Currently it is not allowed to start new external process
// from inside process licker (todo: redesign).
// from inside process kicker (todo: redesign).
// So do it asynchronously from another thread.
Arc::CreateThreadFunction(&SyncAsync, arg);
}
......
......@@ -18,19 +18,15 @@ namespace ARex {
class JobStateList {
/*Holds sucess or fail of last 100 jobs */
private:
class JobNode {
public:
std::string job_id;
int isfailed;
JobStateList* sl;
JobStateList::JobNode* next;
JobStateList::JobNode* prev;
bool isfailed;
JobNode(JobStateList* _sl, JobNode* _prev=NULL, JobNode* _next=NULL, bool _isfailed=false, std::string _job_id="");
JobNode(bool _isfailed=false, std::string _job_id="");
~JobNode(void);
};
......@@ -39,19 +35,14 @@ namespace ARex {
private:
const int limit;
public:
int failures;
int length;
std::list<JobNode> nodes;
JobStateList::JobNode* this_node;
JobStateList::JobNode* oldhead;
JobStateList::JobNode* tail;
JobStateList::JobNode* head;
JobStateList::JobNode* NodeInList(std::string _job_id);
void setFailure(bool _isfailed, std::string _job_id);
public:
int failures;
JobStateList::JobNode* NodeInList(std::string _job_id);
void SetFailure(bool _isfailed, std::string _job_id);
JobStateList(int _limit);
~JobStateList(void);
......@@ -95,7 +86,7 @@ class JobsMetrics {
static void RunMetricsKicker(void* arg);
static void SyncAsync(void* arg);
JobStateList* jobstatelist;
JobStateList jobstatelist;
public:
JobsMetrics(void);
~JobsMetrics(void);
......
......@@ -48,6 +48,7 @@ namespace ARex {
void SpaceMetrics::ReportSpaceChange(const GMConfig& config) {
if(!enabled) return; // not configured
Glib::RecMutex::Lock lock_(lock);
/*Free sessiondir space*/
......@@ -221,7 +222,7 @@ namespace ARex {
void SpaceMetrics::RunMetricsKicker(void* arg) {
// Currently it is not allowed to start new external process
// from inside process licker (todo: redesign).
// from inside process kicker (todo: redesign).
// So do it asynchronously from another thread.
Arc::CreateThreadFunction(&SyncAsync, arg);
}
......
......@@ -38,7 +38,7 @@ void ARexService::InformationCollector(void) {
std::string stderr_str;
Arc::Run run(cmd);
run.AssignStdin(stdin_str);
run.AssignStdout(xml_str);
run.AssignStdout(xml_str, 1024*1024); // can information document become bigger than 1MB?
run.AssignStderr(stderr_str);
logger_.msg(Arc::DEBUG,"Resource information provider: %s",cmd);
if(!run.Start()) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment