Commit ee3dcf02 authored by Aleksandr Konstantinov's avatar Aleksandr Konstantinov

Merge branch 'dev-rest' into 'master'

Dev rest

See merge request nordugrid/arc!1085
parents 6201d689 0acb0724
......@@ -18,6 +18,7 @@
#include <arc/communication/ClientInterface.h>
#include <arc/delegation/DelegationInterface.h>
#include "SubmitterPluginREST.h"
#include "JobControllerPluginREST.h"
namespace Arc {
......@@ -95,6 +96,12 @@ namespace Arc {
if(pos != std::string::npos) id.erase(0,pos+1);
if(job_id == id) {
(*itJob)->State = JobStateARCREST(job_state);
// (*itJob)->RestartState = ;
// (*itJob)->StageInDir = (std::string)aid["esainfo:StageInDirectory"];
// (*itJob)->StageOutDir = (std::string)aid["esainfo:StageInDirectory"];
// (*itJob)->SessionDir = (std::string)aid["esainfo:StageInDirectory"];
// (*itJob)->DelegationID.push_back ;
// (*itJob)->JobID = ;
break;
}
}
......@@ -105,13 +112,14 @@ namespace Arc {
std::list<Job*>& jobs;
};
JobStateProcessor infoNodeProcessor(jobs);
JobStateProcessor stateProcessor(jobs);
Arc::URL currentServiceUrl;
std::list<std::string> IDs;
for (std::list<Job*>::const_iterator it = jobs.begin(); it != jobs.end(); ++it) {
if(!currentServiceUrl || (currentServiceUrl != GetAddressOfResource(**it))) {
if(!IDs.empty()) {
ProcessJobs(currentServiceUrl, "status", 200, IDs, IDsProcessed, IDsNotProcessed, infoNodeProcessor);
std::list<std::string> fakeIDs = IDs;
ProcessJobs(usercfg, currentServiceUrl, "status", 200, IDs, IDsProcessed, IDsNotProcessed, stateProcessor);
}
currentServiceUrl = GetAddressOfResource(**it);
}
......@@ -119,7 +127,8 @@ namespace Arc {
IDs.push_back((*it)->JobID);
}
if(!IDs.empty()) {
ProcessJobs(currentServiceUrl, "status", 200, IDs, IDsProcessed, IDsNotProcessed, infoNodeProcessor);
std::list<std::string> fakeIDs = IDs;
ProcessJobs(usercfg, currentServiceUrl, "status", 200, IDs, IDsProcessed, IDsNotProcessed, stateProcessor);
}
}
......@@ -132,7 +141,7 @@ namespace Arc {
for (std::list<Job*>::const_iterator it = jobs.begin(); it != jobs.end(); ++it) {
if(!currentServiceUrl || (currentServiceUrl != GetAddressOfResource(**it))) {
if(!IDs.empty()) {
if (!ProcessJobs(currentServiceUrl, "clean", 202, IDs, IDsProcessed, IDsNotProcessed, infoNodeProcessor))
if (!ProcessJobs(usercfg, currentServiceUrl, "clean", 202, IDs, IDsProcessed, IDsNotProcessed, infoNodeProcessor))
ok = false;
}
currentServiceUrl = GetAddressOfResource(**it);
......@@ -141,7 +150,7 @@ namespace Arc {
IDs.push_back((*it)->JobID);
}
if(!IDs.empty()) {
if (!ProcessJobs(currentServiceUrl, "clean", 202, IDs, IDsProcessed, IDsNotProcessed, infoNodeProcessor)) {
if (!ProcessJobs(usercfg, currentServiceUrl, "clean", 202, IDs, IDsProcessed, IDsNotProcessed, infoNodeProcessor)) {
ok = false;
}
}
......@@ -158,7 +167,7 @@ namespace Arc {
for (std::list<Job*>::const_iterator it = jobs.begin(); it != jobs.end(); ++it) {
if(!currentServiceUrl || (currentServiceUrl != GetAddressOfResource(**it))) {
if(!IDs.empty()) {
if (!ProcessJobs(currentServiceUrl, "kill", 202, IDs, IDsProcessed, IDsNotProcessed, infoNodeProcessor))
if (!ProcessJobs(usercfg, currentServiceUrl, "kill", 202, IDs, IDsProcessed, IDsNotProcessed, infoNodeProcessor))
ok = false;
}
currentServiceUrl = GetAddressOfResource(**it);
......@@ -167,7 +176,7 @@ namespace Arc {
IDs.push_back((*it)->JobID);
}
if(!IDs.empty()) {
if (!ProcessJobs(currentServiceUrl, "kill", 202, IDs, IDsProcessed, IDsNotProcessed, infoNodeProcessor)) {
if (!ProcessJobs(usercfg, currentServiceUrl, "kill", 202, IDs, IDsProcessed, IDsNotProcessed, infoNodeProcessor)) {
ok = false;
}
}
......@@ -177,10 +186,9 @@ namespace Arc {
bool JobControllerPluginREST::RenewJobs(const std::list<Job*>& jobs, std::list<std::string>& IDsProcessed, std::list<std::string>& IDsNotProcessed, bool isGrouped) const {
bool ok = true;
/*
for (std::list<Job*>::const_iterator it = jobs.begin(); it != jobs.end(); ++it) {
Arc::URL delegationUrl(GetAddressOfResource(**it));
delegationUrl.ChangePath(delegationUrl.Path()+DelegPrefix);
delegationUrl.ChangePath(delegationUrl.Path()+"/rest/1.0/delegations");
// 1. Fetch/find delegation ids for each job
if((*it)->DelegationID.empty()) {
logger.msg(INFO, "Job %s has no delegation associated. Can't renew such job.", (*it)->JobID);
......@@ -194,7 +202,7 @@ namespace Arc {
for(;did != (*it)->DelegationID.end();++did) {
std::string delegationId(*did);
if(!delegationId.empty()) {
if(!GetDelegation(delegationUrl, delegationId)) {
if(!SubmitterPluginREST::GetDelegation(*usercfg, delegationUrl, delegationId)) {
logger.msg(INFO, "Job %s failed to renew delegation %s.", (*it)->JobID, *did);
break;
}
......@@ -207,8 +215,6 @@ namespace Arc {
}
IDsProcessed.push_back((*it)->JobID);
}
*/
ok = false; // not implemented yet
return ok;
}
......@@ -221,7 +227,7 @@ namespace Arc {
for (std::list<Job*>::const_iterator it = jobs.begin(); it != jobs.end(); ++it) {
if(!currentServiceUrl || (currentServiceUrl != GetAddressOfResource(**it))) {
if(!IDs.empty()) {
if (!ProcessJobs(currentServiceUrl, "restart", 202, IDs, IDsProcessed, IDsNotProcessed, infoNodeProcessor)) {
if (!ProcessJobs(usercfg, currentServiceUrl, "restart", 202, IDs, IDsProcessed, IDsNotProcessed, infoNodeProcessor)) {
ok = false;
}
}
......@@ -231,7 +237,7 @@ namespace Arc {
IDs.push_back((*it)->JobID);
}
if(!IDs.empty()) {
if (!ProcessJobs(currentServiceUrl, "restart", 202, IDs, IDsProcessed, IDsNotProcessed, infoNodeProcessor)) {
if (!ProcessJobs(usercfg, currentServiceUrl, "restart", 202, IDs, IDsProcessed, IDsNotProcessed, infoNodeProcessor)) {
ok = false;
}
}
......@@ -239,9 +245,9 @@ namespace Arc {
return ok;
}
bool JobControllerPluginREST::ProcessJobs(Arc::URL const & resourceUrl, std::string const & action, int successCode,
bool JobControllerPluginREST::ProcessJobs(const UserConfig* usercfg, Arc::URL const & resourceUrl, std::string const & action, int successCode,
std::list<std::string>& IDs, std::list<std::string>& IDsProcessed, std::list<std::string>& IDsNotProcessed,
InfoNodeProcessor& infoNodeProcessor) const {
InfoNodeProcessor& infoNodeProcessor) {
Arc::URL statusUrl(resourceUrl);
statusUrl.ChangePath(statusUrl.Path()+"/rest/1.0/jobs");
statusUrl.AddHTTPOption("action",action);
......@@ -270,7 +276,7 @@ namespace Arc {
Arc::MCC_Status res = client.process(std::string("POST"), attributes, &request, &info, &response);
if((!res) || (info.code != 201)) {
logger.msg(WARNING, "Failed to process jobs - wrong response: %u", info.code);
if(response) logger.msg(DEBUG, "Content: %s", response->Content());
if(response && response->Content()) logger.msg(DEBUG, "Content: %s", response->Content());
delete response; response = NULL;
for (std::list<std::string>::const_iterator it = IDs.begin(); it != IDs.end(); ++it) {
logger.msg(WARNING, "Failed to process job: %s", *it);
......@@ -279,8 +285,8 @@ namespace Arc {
return false;
}
logger.msg(DEBUG, "Content: %s", response->Content());
Arc::XMLNode jobs_list(response->Content());
if(response->Content()) logger.msg(DEBUG, "Content: %s", response->Content());
Arc::XMLNode jobs_list(response->Content()?response->Content():"");
delete response; response = NULL;
if(!jobs_list || (jobs_list.Name() != "jobs")) {
logger.msg(WARNING, "Failed to process jobs - failed to parse response");
......
......@@ -29,18 +29,18 @@ namespace Arc {
virtual bool GetURLToJobResource(const Job& job, Job::ResourceType resource, URL& url) const;
virtual bool GetJobDescription(const Job& job, std::string& desc_str) const;
private:
static URL GetAddressOfResource(const Job& job);
static Logger logger;
class InfoNodeProcessor {
public:
virtual void operator()(std::string const& job_id, XMLNode info_node) {};
};
bool ProcessJobs(Arc::URL const & resourceUrl, std::string const & action, int successCode,
static bool ProcessJobs(const UserConfig* usercfg, Arc::URL const & resourceUrl, std::string const & action, int successCode,
std::list<std::string>& IDs, std::list<std::string>& IDsProcessed, std::list<std::string>& IDsNotProcessed,
InfoNodeProcessor& infoNodeProcessor) const;
InfoNodeProcessor& infoNodeProcessor);
private:
static URL GetAddressOfResource(const Job& job);
static Logger logger;
};
......
......@@ -9,6 +9,7 @@
#include <arc/message/PayloadRaw.h>
#include <arc/communication/ClientInterface.h>
#include "JobControllerPluginREST.h"
#include "JobListRetrieverPluginREST.h"
namespace Arc {
......@@ -18,10 +19,11 @@ namespace Arc {
EndpointQueryingStatus JobListRetrieverPluginREST::Query(const UserConfig& usercfg, const Endpoint& endpoint, std::list<Job>& jobs, const EndpointQueryOptions<Job>&) const {
EndpointQueryingStatus s(EndpointQueryingStatus::FAILED);
URL url(endpoint.URLString);
URL url(CreateURL(endpoint.URLString));
if (!url) {
return s;
}
URL jobIDsUrl(url);
url.ChangePath(url.Path()+"/rest/1.0/jobs");
logger.msg(DEBUG, "Collecting Job (A-REX REST jobs) information.");
......@@ -48,6 +50,8 @@ namespace Arc {
return s;
if(jobs_list.Name() != "jobs")
return s;
std::list<std::string> IDs;
std::list<Job*> idJobs;
for(Arc::XMLNode job = jobs_list["job"]; (bool)job; ++job) {
std::string id = job["id"];
if(id.empty()) continue;
......@@ -73,8 +77,43 @@ namespace Arc {
// j.DelegationID.push_back(delegationId); - TODO: Implement through reading job.#.status
jobs.push_back(j);
idJobs.push_back(&(jobs.back()));
IDs.push_back(id);
}
class JobDelegationsProcessor: public JobControllerPluginREST::InfoNodeProcessor {
public:
JobDelegationsProcessor(std::list<Job*>& jobs): jobs(jobs) {}
virtual void operator()(std::string const& id, XMLNode node) {
std::string job_id = node["id"];
XMLNode job_delegation_id = node["delegation_id"];
if((bool)job_delegation_id && !job_id.empty()) {
for(std::list<Job*>::iterator itJob = jobs.begin(); itJob != jobs.end(); ++itJob) {
std::string id = (*itJob)->JobID;
std::string::size_type pos = id.rfind('/');
if(pos != std::string::npos) id.erase(0,pos+1);
if(job_id == id) {
while(job_delegation_id) {
(*itJob)->DelegationID.push_back((std::string)job_delegation_id);
++job_delegation_id;
}
break;
}
}
}
}
private:
std::list<Job*>& jobs;
};
std::list<std::string> processedIDs;
std::list<std::string> notProcessedIDs;
JobDelegationsProcessor delegationsProcessor(idJobs);
JobControllerPluginREST::ProcessJobs(&usercfg, jobIDsUrl, "delegations", 200, IDs, processedIDs, notProcessedIDs, delegationsProcessor);
// TODO: Because listing/obtaining content is too generic operation
// maybe it is unsafe to claim that operation suceeded if nothing
// was retrieved.
......
......@@ -34,10 +34,10 @@ namespace Arc {
return pos != std::string::npos && lower(endpoint.substr(0, pos)) != "http" && lower(endpoint.substr(0, pos)) != "https";
}
bool SubmitterPluginREST::GetDelegation(Arc::URL url, std::string& delegationId) const {
bool SubmitterPluginREST::GetDelegation(const UserConfig& usercfg, Arc::URL url, std::string& delegationId) {
std::string delegationRequest;
Arc::MCCConfig cfg;
usercfg->ApplyToConfig(cfg);
usercfg.ApplyToConfig(cfg);
Arc::ClientHTTP client(cfg, url);
std::string delegationPath;
if(delegationId.empty()) {
......@@ -45,7 +45,7 @@ namespace Arc {
Arc::PayloadRaw request;
Arc::PayloadRawInterface* response(NULL);
Arc::HTTPClientInfo info;
Arc::MCC_Status res = client.process(std::string("POST"), &request, &info, &response);
Arc::MCC_Status res = client.process(std::string("POST"), url.FullPath(), &request, &info, &response);
if(!res) {
logger.msg(VERBOSE, "Failed to communicate to delegation endpoint.");
delete response;
......@@ -73,12 +73,13 @@ namespace Arc {
}
delegationId = delegationPath.substr(id_pos+1);
} else {
url.ChangePath(url.Path() + "/" + delegationId);
url.AddHTTPOption("action","renew");
delegationPath = url.Path() + "/" + delegationId;
delegationPath = url.Path();
Arc::PayloadRaw request;
Arc::PayloadRawInterface* response(NULL);
Arc::HTTPClientInfo info;
Arc::MCC_Status res = client.process(std::string("POST"), delegationPath, &request, &info, &response);
Arc::MCC_Status res = client.process(std::string("POST"), url.FullPath(), &request, &info, &response);
if(!res) {
logger.msg(VERBOSE, "Failed to communicate to delegation endpoint.");
delete response;
......@@ -172,7 +173,7 @@ namespace Arc {
if(jobdescs.empty())
return retval;
if(!GetDelegation(delegationUrl, delegationId)) {
if(!GetDelegation(*usercfg, delegationUrl, delegationId)) {
logger.msg(INFO, "Unable to submit jobs. Failed to delegate credentials.");
for (std::list<JobDescription>::const_iterator it = jobdescs.begin(); it != jobdescs.end(); ++it) {
notSubmitted.push_back(&*it);
......
......@@ -31,8 +31,9 @@ namespace Arc {
virtual SubmissionStatus Submit(const std::list<JobDescription>& jobdescs, const ExecutionTarget& et, EntityConsumer<Job>& jc, std::list<const JobDescription*>& notSubmitted);
virtual bool Migrate(const std::string& jobid, const JobDescription& jobdesc, const ExecutionTarget& et, bool forcemigration, Job& job);
static bool GetDelegation(const UserConfig& usercfg, Arc::URL url, std::string& delegationId);
private:
bool GetDelegation(Arc::URL url, std::string& delegationId) const;
bool AddDelegation(std::string& product, std::string const& delegationId);
SubmissionStatus SubmitInternal(const std::list<JobDescription>& jobdescs, const ExecutionTarget* et, const std::string& endpoint,
EntityConsumer<Job>& jc, std::list<const JobDescription*>& notSubmitted);
......
......@@ -24,10 +24,10 @@ namespace Arc {
Logger TargetInformationRetrieverPluginREST::logger(Logger::getRootLogger(), "TargetInformationRetrieverPlugin.REST");
Arc::EndpointQueryingStatus TargetInformationRetrieverPluginREST::Query(const Arc::UserConfig& uc, const Arc::Endpoint& cie, std::list<Arc::ComputingServiceType>& csList, const Arc::EndpointQueryOptions<Arc::ComputingServiceType>&) const {
logger.msg(DEBUG, "Querying WSRF GLUE2 computing info endpoint.");
logger.msg(DEBUG, "Querying WSRF GLUE2 computing REST endpoint.");
// TODO: autoversion
URL url(cie.URLString);
URL url(CreateURL(cie.URLString));
if (!url) {
return EndpointQueryingStatus(EndpointQueryingStatus::FAILED,"URL "+cie.URLString+" can't be processed");
}
......
......@@ -730,7 +730,9 @@ Arc::MCC_Status ARexRest::processDelegations(Arc::Message& inmsg,Arc::Message& o
if(!delegation_stores_.GetRequest(config_.DelegationDir(),delegationId,config->GridName(),delegationRequest)) {
return HTTPFault(inmsg,outmsg,500,"Failed generating delegation request");
}
return HTTPPOSTResponse(inmsg,outmsg,delegationRequest,"application/x-pem-file",delegationId);
Arc::URL base(inmsg.Attributes()->get("HTTP:ENDPOINT"));
return HTTPPOSTResponse(inmsg,outmsg,delegationRequest,"application/x-pem-file",base.Path()+"/"+delegationId);
}
logger_.msg(Arc::VERBOSE, "process: method %s is not supported for subpath %s",context.method,context.processed);
return HTTPFault(inmsg,outmsg,501,"Not Implemented");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment