Commit ffb55404 authored by Aleksandr Konstantinov's avatar Aleksandr Konstantinov
Browse files

Merge branch 'bug_3841_3' into 'master'

Speed up jobs processing in DTR. Fixes BUGZ-3841

See merge request nordugrid/arc!926
parents ab99c28e ad922bf1
......@@ -85,6 +85,12 @@ void DTRGenerator::thread() {
elock.lock();
} else {
logger.msg(Arc::DEBUG, "%s: Returning canceled job from DTR generator", job->get_id());
elock.unlock();
{
Arc::AutoLock<Arc::SimpleCondition> dlock(dtrs_lock);
finished_jobs[job->get_id()] = std::string("Job was canceled while waiting in DTR queue");
}
elock.lock();
jobs_received.Erase(job);
jobs.RequestAttention(job); // pass job back to states processing
}
......@@ -120,12 +126,15 @@ void DTRGenerator::thread() {
elock.lock();
if(!jobAccepted) {
logger.msg(Arc::DEBUG, "%s: Re-requesting attention from DTR generator", job->get_id());
// processReceivedJob fills error in finished_jobs - no need to do that here
jobs_received.Erase(job); // release from queue cause 'jobs' queues have lower priority
jobs.RequestAttention(job); // pass job back to states processing
}
}
bool queuesEmpty = jobs_cancelled.empty() && dtrs_received.empty() && jobs_received.IsEmpty();
elock.unlock();
event_lock.wait(50000);
// wait till something arrives or go back to processing almost immediately if queues not empty
event_lock.wait(queuesEmpty ? 50000 : 100);
} // main processing loop
// stop scheduler - cancels all DTRs and waits for them to complete
scheduler->stop();
......@@ -543,9 +552,10 @@ bool DTRGenerator::processReceivedDTR(DataStaging::DTR_ptr dtr) {
std::multimap<std::string, std::string>::iterator> dtr_iterator = active_dtrs.equal_range(jobid);
if (dtr_iterator.first == dtr_iterator.second) {
finished_jobs[jobid] += std::string(""); // It is not clear either this is error. At least mark it as finished.
dlock.unlock();
logger.msg(Arc::WARNING, "No active job id %s", jobid);
// No DTRs recorded. But still we have job ref. It is probbaly safer to return it.
// No DTRs recorded. But still we have job ref. It is probably safer to return it.
jobs_processing.Erase(job);
jobs.RequestAttention(job);
return true;
......
......@@ -311,6 +311,11 @@ bool GMJobQueue::Exists(const GMJobRef& ref) const {
return (ref->queue == this);
}
bool GMJobQueue::IsEmpty() const {
Glib::RecMutex::Lock lock(lock_);
return queue_.empty();
}
void GMJobQueue::Sort(comparator_t compare) {
Glib::RecMutex::Lock lock(lock_);
queue_.sort(compare);
......
......@@ -184,6 +184,10 @@ public:
return (job_ == job);
}
bool operator==(GMJob* job) const {
return (job_ == job);
}
operator bool() const {
return job_ != NULL;
}
......@@ -259,6 +263,9 @@ class GMJobQueue {
//! Returns true if job is in queue
bool Exists(const GMJobRef& ref) const;
//! Returns true if there are no jobs in queue
bool IsEmpty() const;
//! Sort jobs in queue
void Sort(comparator_t compare);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment