Commit c8c36c21 authored by Andrii Salnikov's avatar Andrii Salnikov

Merge branch 'master' into archery-timeout

parents 76b9681f 24855bb3
......@@ -1628,12 +1628,18 @@ if test "x$enables_xrootd" = "xyes"; then
fi
SAVE_CPPFLAGS=$CPPFLAGS
CPPFLAGS="$CPPFLAGS $XROOTD_CPPFLAGS"
AC_CHECK_HEADER(XrdPosix/XrdPosixXrootd.hh, [], [enables_xrootd="no"] [#include <iostream>])
AC_CHECK_HEADER(XrdCl/XrdClBuffer.hh, [], [
XROOTD_CPPFLAGS="-std=c++0x $XROOTD_CPPFLAGS"
CPPFLAGS="$SAVE_CPPFLAGS $XROOTD_CPPFLAGS"
# Disable Autoconf caching
unset ac_cv_header_XrdCl_XrdClBuffer_hh
AC_CHECK_HEADER(XrdCl/XrdClBuffer.hh, [], [enables_xrootd="no"])
])
CPPFLAGS=$SAVE_CPPFLAGS
SAVE_LDFLAGS=$LDFLAGS
LDFLAGS="$LDFLAGS $XROOTD_LDFLAGS"
AC_CHECK_LIB([XrdPosix], [_init],
[XROOTD_LIBS="$XROOTD_LDFLAGS -lXrdPosix"], [enables_xrootd="no"])
[XROOTD_LIBS="$XROOTD_LDFLAGS -lXrdPosix -lXrdCl"], [enables_xrootd="no"])
LDFLAGS=$SAVE_LDFLAGS
fi
AC_SUBST(XROOTD_CPPFLAGS)
......
......@@ -36,7 +36,7 @@
#
# xROOTd
#
%if %{?fedora}%{!?fedora:0} >= 12 || %{?rhel}%{!?rhel:0}
%if %{?fedora}%{!?fedora:0} >= 24 || %{?rhel}%{!?rhel:0}
%global with_xrootd %{!?_without_xrootd:1}%{?_without_xrootd:0}
%else
%global with_xrootd 0
......@@ -218,7 +218,7 @@ BuildRequires: python2-devel
BuildRequires: python3-devel
%endif
%if %{?rhel}%{!?rhel:0} == 7
BuildRequires: python34-devel
BuildRequires: python36-devel
%endif
%if %{with_pylint}
BuildRequires: pylint
......@@ -243,11 +243,7 @@ BuildRequires: globus-gssapi-gsi-devel >= 12.2
BuildRequires: globus-gssapi-gsi-devel < 12.2
%endif
%if %{with_xrootd}
%if %{?fedora}%{!?fedora:0} >= 17 || %{?rhel}%{!?rhel:0} >= 5
BuildRequires: xrootd-client-devel
%else
BuildRequires: xrootd-devel
%endif
BuildRequires: xrootd-client-devel >= 4.5.0
%endif
%if %{with_gfal}
BuildRequires: gfal2-devel
......
......@@ -262,8 +262,9 @@
#subject=/O=Grid/O=Big VO/CN=Main Boss
#subject=/O=Grid/O=Big VO/CN=Deputy Boss
## file = path - Processes a list of DNs stored in an external file one per line and adds those to
## the authgroup.
## file = path - Processes a list of DNs stored in an external file one per line
## in grid-mapfile format (see map_with_file from [mapping] block, unixname is ignored)
## and adds those to the authgroup.
## sequenced
## default: undefined
#file=/etc/grid-security/local_users
......@@ -373,9 +374,10 @@
#map_to_pool=atlas /etc/grid-security/pool/atlas
## map_with_file = authgroup_name file - for users that belongs to specified
## authgroup the subject of certificate is matched against a list of subjects
## stored in the specified "file", one per line followed by a local UNIX account
## name. This rule can be used to implement legacy grid-mapfile aproach.
## authgroup the DN of certificate is matched against a list of DNs stored in
## the specified "file", one per line followed by a local UNIX account name.
## The DN must be quoted if it contains blank spaces.
## This rule can be used to implement legacy grid-mapfile aproach.
## sequenced
## default: undefined
#map_with_file=any /etc/grid-security/grid-mapfile
......
......@@ -623,7 +623,7 @@ namespace ArcDMCGFAL {
return DataStatus::Success;
}
DataStatus DataPointGFAL::Transfer3rdParty(const URL& source, const URL& destination, DataPoint::Callback3rdParty callback) {
DataStatus DataPointGFAL::Transfer3rdParty(const URL& source, const URL& destination, DataPoint::TransferCallback callback) {
if (source.Protocol() == "lfc") lfc_host = source.Host();
GFALEnvLocker gfal_lock(usercfg, lfc_host);
......
......@@ -48,7 +48,7 @@ namespace ArcDMCGFAL {
protected:
// 3rd party transfer (destination pulls from source)
virtual DataStatus Transfer3rdParty(const URL& source, const URL& destination, DataPoint::Callback3rdParty callback = NULL);
virtual DataStatus Transfer3rdParty(const URL& source, const URL& destination, DataPoint::TransferCallback callback = NULL);
private:
DataStatus do_stat(const URL& stat_url, FileInfo& file, DataPointInfoType verb);
......
......@@ -11,7 +11,7 @@ namespace ArcDMCGFAL {
// Callback passed to gfal. It calls DataPoint callback and fills relevant
// info. DataPoint pointer is stored in user_data.
void GFALTransfer3rdParty::gfal_3rd_party_callback(gfalt_transfer_status_t h, const char* src, const char* dst, gpointer user_data) {
DataPoint::Callback3rdParty* cb = (DataPoint::Callback3rdParty*)user_data;
DataPoint::TransferCallback* cb = (DataPoint::TransferCallback*)user_data;
if (cb && *cb) {
GError * err = NULL;
size_t bytes = gfalt_copy_get_bytes_transfered(h, &err);
......@@ -25,7 +25,7 @@ namespace ArcDMCGFAL {
}
GFALTransfer3rdParty::GFALTransfer3rdParty(const URL& src, const URL& dest,
const Arc::UserConfig& cfg, DataPoint::Callback3rdParty cb)
const Arc::UserConfig& cfg, DataPoint::TransferCallback cb)
: source(src), destination(dest), transfer_timeout(cfg.Timeout()), callback(cb) {};
DataStatus GFALTransfer3rdParty::Transfer() {
......
......@@ -14,14 +14,14 @@ namespace ArcDMCGFAL {
public:
/// Constructor
GFALTransfer3rdParty(const URL& source, const URL& dest,
const Arc::UserConfig& cfg, DataPoint::Callback3rdParty cb);
const Arc::UserConfig& cfg, DataPoint::TransferCallback cb);
/// Start transfer
DataStatus Transfer();
private:
URL source;
URL destination;
int transfer_timeout;
DataPoint::Callback3rdParty callback;
DataPoint::TransferCallback callback;
static Logger logger;
/// Callback that is passed to GFAL2. It calls our Callback3rdParty callback
static void gfal_3rd_party_callback(gfalt_transfer_status_t h, const char* src, const char* dst, gpointer user_data);
......
......@@ -232,6 +232,13 @@ namespace ArcDMCRucio {
return DataStatus::Success;
}
DataStatus DataPointRucio::CompareLocationMetadata() const {
if (CurrentLocationHandle() && CurrentLocationHandle()->GetURL().HTTPOption("xrdcl.unzip", "") == "") {
return DataPointIndex::CompareLocationMetadata();
}
return DataStatus::Success;
}
DataStatus DataPointRucio::PreRegister(bool replication, bool force) {
if (url.Path().find("/objectstores/") == 0) return DataStatus::Success;
return DataStatus(DataStatus::PreRegisterError, ENOTSUP, "Writing to Rucio is not supported");
......
......@@ -58,6 +58,8 @@ namespace ArcDMCRucio {
virtual Arc::DataStatus List(std::list<Arc::FileInfo>& files, Arc::DataPoint::DataPointInfoType verb = INFO_TYPE_ALL);
virtual Arc::DataStatus CreateDirectory(bool with_parents=false);
virtual Arc::DataStatus Rename(const Arc::URL& newurl);
// Override to disable checks for zip archives
virtual Arc::DataStatus CompareLocationMetadata() const;
protected:
static Arc::Logger logger;
private:
......
......@@ -3,6 +3,9 @@
#endif
#include <fcntl.h>
#include <XrdCl/XrdClPropertyList.hh>
#include <XrdCl/XrdClDefaultEnv.hh>
#include <XrdCl/XrdClLog.hh>
#include <arc/StringConv.h>
#include <arc/data/DataBuffer.h>
......@@ -17,6 +20,20 @@ namespace ArcDMCXrootd {
Logger DataPointXrootd::logger(Logger::getRootLogger(), "DataPoint.Xrootd");
XrdPosixXrootd DataPointXrootd::xrdposix;
XrootdProgressHandler::XrootdProgressHandler(DataPoint::TransferCallback callback)
: cb(callback), cancel(false) {}
void XrootdProgressHandler::JobProgress(uint16_t jobNum,
uint64_t bytesProcessed,
uint64_t bytesTotal) {
cb(bytesProcessed);
}
bool XrootdProgressHandler::ShouldCancel(uint64_t jobNum) {
return cancel;
}
DataPointXrootd::DataPointXrootd(const URL& url, const UserConfig& usercfg, PluginArgument* parg)
: DataPointDirect(url, usercfg, parg),
fd(-1),
......@@ -44,6 +61,54 @@ namespace ArcDMCXrootd {
return new DataPointXrootd(*dmcarg, *dmcarg, dmcarg);
}
DataStatus DataPointXrootd::copy_file(std::string source, std::string dest, TransferCallback callback) {
XrdCl::PropertyList props;
XrdCl::PropertyList results;
// Check for special source/dest to handle
if (source.find("file:/") == 0) {
// xrootd doesn't like the arc file:/ urls so remove the protocol
source = source.substr(5);
}
if (source == "stdio:/stdin") {
source = "-";
}
if (dest.find("file:/") == 0) {
// xrootd doesn't like the arc file:/ urls so remove the protocol
dest = dest.substr(5);
}
if (dest == "stdio:/stdout") {
dest = "-";
}
props.Set("source", source);
props.Set("target", dest);
XrdCl::CopyProcess copy;
XrdCl::XRootDStatus st = copy.AddJob(props, &results);
if (!st.IsOK()) {
logger.msg(ERROR, "Failed to create xrootd copy job: %s", st.GetErrorMessage());
return DataStatus(DataStatus::TransferError, st.GetErrorMessage());
}
XrdCl::PropertyList processConfig;
processConfig.Set("jobType", "configuration" );
processConfig.Set("parallel", 1 );
copy.AddJob(processConfig, 0 );
copy.Prepare();
XrdCl::CopyProgressHandler * cph = NULL;
if (callback) {
cph = new XrootdProgressHandler(callback);
}
st = copy.Run(cph);
if (cph) delete cph;
if (!st.IsOK()) {
logger.msg(ERROR, "Failed to copy %s: %s", source, st.GetErrorMessage());
return DataStatus(DataStatus::TransferError, st.GetErrorMessage());
}
return DataStatus::Success;
}
void DataPointXrootd::read_file_start(void* arg) {
((DataPointXrootd*)arg)->read_file();
}
......@@ -486,6 +551,18 @@ namespace ArcDMCXrootd {
return DataStatus::Success;
}
DataStatus DataPointXrootd::Transfer(const URL& otherendpoint, bool source, TransferCallback callback) {
if (source) {
return copy_file(url.plainstr(), otherendpoint.plainstr(), callback);
} else {
return copy_file(otherendpoint.plainstr(), url.plainstr(), callback);
}
}
bool DataPointXrootd::SupportsTransfer() const {
return true;
}
bool DataPointXrootd::RequiresCredentialsInFile() const {
return true;
}
......@@ -494,8 +571,15 @@ namespace ArcDMCXrootd {
// TODO xrootd lib logs to stderr - need to redirect to log file for DTR
// Level 1 enables some messages which go to stdout - which messes up
// communication in DTR so better to use no debugging
if (logger.getThreshold() == DEBUG) XrdPosixXrootd::setDebug(1);
else XrdPosixXrootd::setDebug(0);
XrdCl::Log *log = XrdCl::DefaultEnv::GetLog();
if (logger.getThreshold() == DEBUG) {
XrdPosixXrootd::setDebug(1);
log->SetLevel(XrdCl::Log::DumpMsg);
}
else {
XrdPosixXrootd::setDebug(0);
log->SetLevel(XrdCl::Log::ErrorMsg);
}
}
} // namespace ArcDMCXrootd
......
......@@ -3,6 +3,7 @@
#include <list>
#include <XrdPosix/XrdPosixXrootd.hh>
#include <XrdCl/XrdClCopyProcess.hh>
#include <arc/data/DataPointDirect.h>
......@@ -10,6 +11,19 @@ namespace ArcDMCXrootd {
using namespace Arc;
/**
* Progress handler class that is used to pass data to callback in Transfer()
*/
class XrootdProgressHandler : public XrdCl::CopyProgressHandler {
public:
XrootdProgressHandler(DataPoint::TransferCallback callback);
void JobProgress(uint16_t jobNum, uint64_t bytesProcessed, uint64_t bytesTotal);
bool ShouldCancel(uint64_t jobNum);
private:
DataPoint::TransferCallback cb;
bool cancel;
};
/**
* xrootd is a protocol for data access across large scale storage clusters.
* More information can be found at http://xrootd.slac.stanford.edu/
......@@ -35,6 +49,8 @@ namespace ArcDMCXrootd {
virtual DataStatus Remove();
virtual DataStatus CreateDirectory(bool with_parents=false);
virtual DataStatus Rename(const URL& newurl);
virtual DataStatus Transfer(const URL& otherendpoint, bool source, TransferCallback callback = NULL);
virtual bool SupportsTransfer() const;
virtual bool RequiresCredentialsInFile() const;
private:
......@@ -43,6 +59,7 @@ namespace ArcDMCXrootd {
static void write_file_start(void* arg);
void read_file();
void write_file();
DataStatus copy_file(std::string source, std::string dest, TransferCallback callback = NULL);
/// must be called everytime a new XrdClient is created
void set_log_level();
......
......@@ -378,6 +378,7 @@ namespace Arc {
protocol == "httpg" ||
protocol == "arc" ||
protocol == "srm" ||
protocol == "root" ||
protocol == "rucio" ) {
pos = path.find("?");
if (pos != std::string::npos) {
......
......@@ -991,6 +991,7 @@ namespace Arc {
if (node_ == NULL) return "";
xmlNsPtr ns_ = xmlSearchNsByHref(node_->doc, node_, (const xmlChar*)urn);
if (!ns_) return "";
if (!(ns_->prefix)) return "";
return (char*)(ns_->prefix);
}
......
This diff is collapsed.
......@@ -111,7 +111,11 @@ namespace Arc {
return DataStatus::Success;
}
DataStatus DataPoint::Transfer3rdParty(const URL& source, const URL& destination, Callback3rdParty callback) {
DataStatus DataPoint::Transfer(const URL& otherendpoint, bool source, TransferCallback callback) {
return DataStatus(DataStatus::UnimplementedError, EOPNOTSUPP);
}
DataStatus DataPoint::Transfer3rdParty(const URL& source, const URL& destination, TransferCallback callback) {
return DataStatus(DataStatus::UnimplementedError, EOPNOTSUPP);
}
......@@ -207,6 +211,10 @@ namespace Arc {
return false;
}
bool DataPoint::SupportsTransfer() const {
return false;
}
void DataPoint::SetMeta(const DataPoint& p) {
if (!CheckSize())
SetSize(p.GetSize());
......@@ -259,7 +267,7 @@ namespace Arc {
}
DataStatus DataPoint::Transfer3rdParty(const URL& source, const URL& destination,
const UserConfig& usercfg, Callback3rdParty callback) {
const UserConfig& usercfg, TransferCallback callback) {
// to load GFAL instead of ARC's DMCs we create a fake URL with gfal protocol
URL gfal_url(destination);
gfal_url.ChangeProtocol("gfal");
......
......@@ -122,13 +122,13 @@ namespace Arc {
: public Plugin {
public:
/// Callback for use in 3rd party transfer.
/// Callback for use in protocol-internal or 3rd party transfers.
/**
* Will be called periodically during the transfer with the number of bytes
* transferred so far.
* \param bytes_transferred the number of bytes transferred so far
*/
typedef void(*Callback3rdParty)(unsigned long long int bytes_transferred);
typedef void(*TransferCallback)(unsigned long long int bytes_transferred);
/// Describes the latency to access this URL
/**
......@@ -159,6 +159,22 @@ namespace Arc {
INFO_TYPE_ALL = 127 ///< All the parameters.
};
/// Do a transfer in a single operation
/**
* This method is designed for plugins which support doing transfers
* between different endpoints in a single operation, rather than two
* separate DataPoints communicating with each other via a DataBuffer.
* \param endpoint Other endpoint to transfer to or from
* \param source Whether this DataPoint is the source (true) or destination
* (false)
* \param callback Optional monitoring callback
* \return Outcome of transfer
* \since 6.0.0
*/
virtual DataStatus Transfer(const URL& otherendpoint, bool source,
TransferCallback callback = NULL);
/// Perform third party transfer.
/**
* Credentials are delegated to the destination and it pulls data from the
......@@ -175,7 +191,7 @@ namespace Arc {
* \return outcome of transfer
*/
static DataStatus Transfer3rdParty(const URL& source, const URL& destination,
const UserConfig& usercfg, Callback3rdParty callback = NULL);
const UserConfig& usercfg, TransferCallback callback = NULL);
/// Destructor.
virtual ~DataPoint();
......@@ -643,6 +659,9 @@ namespace Arc {
/// Check if URL should be staged or queried for Transport URL (TURL)
virtual bool IsStageable() const;
/// Returns true if DataPoint supports internal transfer
virtual bool SupportsTransfer() const;
/// Check if endpoint can have any use from meta information.
virtual bool AcceptsMeta() const = 0;
......@@ -825,7 +844,7 @@ namespace Arc {
* \param callback Optional monitoring callback
* \return outcome of transfer
*/
virtual DataStatus Transfer3rdParty(const URL& source, const URL& destination, Callback3rdParty callback = NULL);
virtual DataStatus Transfer3rdParty(const URL& source, const URL& destination, TransferCallback callback = NULL);
};
/** \cond Class used by DataHandle to load the required DMC. */
......
......@@ -327,6 +327,19 @@ namespace Arc {
return registered;
}
bool DataPointIndex::SupportsTransfer() const {
if (!h || !*h)
return false;
return (*h)->SupportsTransfer();
}
DataStatus DataPointIndex::Transfer(const URL& otherendpoint, bool source,
TransferCallback callback) {
if (!h || !*h)
return DataStatus::NoLocationError;
return (*h)->Transfer(otherendpoint, source, callback);
}
DataStatus DataPointIndex::StartReading(DataBuffer& buffer) {
if (!h || !*h)
return DataStatus::NoLocationError;
......
......@@ -46,7 +46,7 @@ namespace Arc {
virtual void SetCheckSum(const std::string& val);
virtual void SetSize(const unsigned long long int val);
virtual bool Registered() const;
virtual bool SupportsTransfer() const;
virtual void SetTries(const int n);
// the following are relayed to the current location
......@@ -67,6 +67,8 @@ namespace Arc {
virtual DataStatus FinishWriting(bool error = false);
virtual std::vector<URL> TransferLocations() const;
virtual void ClearTransferLocations();
virtual DataStatus Transfer(const URL& otherendpoint, bool source,
TransferCallback callback = NULL);
virtual DataStatus Check(bool check_meta);
......
......@@ -25,13 +25,20 @@ AuthResult AuthUser::match_file(const char* line) {
for(;f.good();) {
std::string buf;
getline(f,buf);
buf = Arc::trim(buf);
if(buf.empty()) continue;
AuthResult res = match_subject(buf.c_str());
if(res != AAA_NO_MATCH) {
f.close();
return res;
};
//buf = Arc::trim(buf);
std::string::size_type p = 0;
for(;p<buf.length();++p) if(!isspace(buf[p])) break;
// Extract subject from the line.
// It is either till white space or quoted string.
// There are also comment lines starting from # and empty lines.
if(p>=buf.length()) continue;
if(buf[p] == '#') continue;
std::string subj;
p = Arc::get_token(subj,buf,p," ","\"","\"");
if(subj.empty()) continue; // can't match empty subject - it is dangerous
if(subject_ != subj) continue;
f.close();
return AAA_POSITIVE_MATCH;
};
f.close();
return AAA_NO_MATCH;
......
......@@ -20,6 +20,7 @@ using namespace Arc;
static Arc::Logger logger(Arc::Logger::getRootLogger(), "DataDelivery");
static bool delivery_shutdown = false;
static Arc::Time start_time;
static void sig_shutdown(int)
{
......@@ -81,6 +82,22 @@ static void ReportStatus(DataStaging::DTRStatus::DTRStatusType st,
};
}
static void ReportOngoingStatus(unsigned long long int bytes) {
// Send report on stdout
ReportStatus(DataStaging::DTRStatus::TRANSFERRING,
DataStaging::DTRErrorStatus::NONE_ERROR,
DataStaging::DTRErrorStatus::NO_ERROR_LOCATION,
"", bytes, 0, 0);
// Log progress in log
time_t t = Arc::Period(Arc::Time() - start_time).GetPeriod();
logger.msg(INFO, "%5u s: %10.1f kB %8.1f kB/s",
(unsigned int)t,
((double)bytes) / 1024,
(t == 0) ? 0 : ((double)bytes) / 1024 / t);
}
static unsigned long long int GetFileSize(const DataPoint& source, const DataPoint& dest) {
if(source.CheckSize()) return source.GetSize();
if(dest.CheckSize()) return dest.GetSize();
......@@ -89,7 +106,6 @@ static unsigned long long int GetFileSize(const DataPoint& source, const DataPoi
int main(int argc,char* argv[]) {
Arc::Time start_time;
// log to stderr
Arc::Logger::getRootLogger().setThreshold(Arc::VERBOSE); //TODO: configurable
Arc::LogStream logcerr(std::cerr);
......@@ -225,7 +241,7 @@ int main(int argc,char* argv[]) {
else if (!proxy_cred.empty()) source_cfg.CredentialString(proxy_cred);
if(!source_ca_path.empty()) source_cfg.CACertificatesDirectory(source_ca_path);
//source_cfg.UtilsDirPath(...); - probably not needed
DataHandle source(source_url,source_cfg);
DataHandle source(source_url, source_cfg);
if(!source) {
logger.msg(ERROR, "Source URL not supported: %s", source_url.str());
_exit(-1);
......@@ -310,50 +326,80 @@ int main(int argc,char* argv[]) {
}
}
// Initiating transfer
DataStatus source_st = source->StartReading(buffer);
if(!source_st) {
ReportStatus(DataStaging::DTRStatus::TRANSFERRED,
(source_url.Protocol()!="file") ?
(source_st.Retryable() ? DataStaging::DTRErrorStatus::TEMPORARY_REMOTE_ERROR :
DataStaging::DTRErrorStatus::PERMANENT_REMOTE_ERROR) :
DataStaging::DTRErrorStatus::LOCAL_FILE_ERROR,
DataStaging::DTRErrorStatus::ERROR_SOURCE,
std::string("Failed reading from source: ")+source->CurrentLocation().str()+
" : "+std::string(source_st),
0,0,0);
_exit(-1);
//return -1;
};
DataStatus dest_st = dest->StartWriting(buffer);
if(!dest_st) {
ReportStatus(DataStaging::DTRStatus::TRANSFERRED,
(dest_url.Protocol() != "file") ?
(dest_st.Retryable() ? DataStaging::DTRErrorStatus::TEMPORARY_REMOTE_ERROR :
DataStaging::DTRErrorStatus::PERMANENT_REMOTE_ERROR) :
DataStaging::DTRErrorStatus::LOCAL_FILE_ERROR,
DataStaging::DTRErrorStatus::ERROR_DESTINATION,
std::string("Failed writing to destination: ")+dest->CurrentLocation().str()+
" : "+std::string(dest_st),
0,0,0);
_exit(-1);
//return -1;
};
// While transfer is running in another threads
// here we periodically report status to parent
bool reported = false;
bool eof_reached = false;
for(;!buffer.error() && !delivery_shutdown;) {
if(buffer.eof_read() && buffer.eof_write()) {
eof_reached = true; break;
// checksum validation against supplied value
std::string calc_csum;
DataStatus source_st;
DataStatus dest_st;
DataStatus transfer_st;
// Check if datapoint handles transfer by itself
if (source->SupportsTransfer()) {
logger.msg(INFO, "Using internal transfer method of %s", source->str());
transfer_st = source->Transfer(dest->GetURL(), true, ReportOngoingStatus);
if (transfer_st.Passed()) {
eof_reached = true;
// so that full copy is reported back to scheduler
buffer.speed.verbose(false);
buffer.speed.transfer(GetFileSize(*source, *dest));
}
} else if (dest->SupportsTransfer()) {
logger.msg(INFO, "Using internal transfer method of %s", dest->str());
transfer_st = dest->Transfer(source->GetURL(), false, ReportOngoingStatus);
if (transfer_st.Passed()) {
eof_reached = true;
// so that full copy is reported back to scheduler
buffer.speed.verbose(false);
buffer.speed.transfer(GetFileSize(*source, *dest));
}
} else {
// Initiating transfer
source_st = source->StartReading(buffer);
if(!source_st) {
ReportStatus(DataStaging::DTRStatus::TRANSFERRED,
(source_url.Protocol()!="file") ?
(source_st.Retryable() ? DataStaging::DTRErrorStatus::TEMPORARY_REMOTE_ERROR :
DataStaging::DTRErrorStatus::PERMANENT_REMOTE_ERROR) :
DataStaging::DTRErrorStatus::LOCAL_FILE_ERROR,
DataStaging::DTRErrorStatus::ERROR_SOURCE,
std::string("Failed reading from source: ")+source->CurrentLocation().str()+
" : "+std::string(source_st),
0,0,0);
_exit(-1);
//return -1;
};
ReportStatus(DataStaging::DTRStatus::TRANSFERRING,
DataStaging::DTRErrorStatus::NONE_ERROR,
DataStaging::DTRErrorStatus::NO_ERROR_LOCATION,
"",
buffer.speed.transferred_size(),
GetFileSize(*source,*dest),0);
buffer.wait_any();
};
dest_st = dest->StartWriting(buffer);
if(!dest_st) {
ReportStatus(DataStaging::DTRStatus::TRANSFERRED,
(dest_url.Protocol() != "file") ?
(dest_st.Retryable() ? DataStaging::DTRErrorStatus::TEMPORARY_REMOTE_ERROR :
DataStaging::DTRErrorStatus::PERMANENT_REMOTE_ERROR) :
DataStaging::DTRErrorStatus::LOCAL_FILE_ERROR,
DataStaging::DTRErrorStatus::ERROR_DESTINATION,
std::string("Failed writing to destination: ")+dest->CurrentLocation().str()+
" : "+std::string(dest_st),
0,0,0);
_exit(-1);
//return -1;
}
// While transfer is running in another threads
// here we periodically report status to parent
for(;!buffer.error() && !delivery_shutdown;) {
if(buffer.eof_read() && buffer.eof_write()) {
eof_reached = true; break;
};
ReportStatus(DataStaging::DTRStatus::TRANSFERRING,
DataStaging::DTRErrorStatus::NONE_ERROR,
DataStaging::DTRErrorStatus::NO_ERROR_LOCATION,
"",