Commit 5f430729 authored by Aleksandr Konstantinov's avatar Aleksandr Konstantinov

Merge branch 'dmc_external_2' into 'master'

Adding support for Transfer and Transfer3rdParty in external DMCs. (Fixes BUGZ-3890)

See merge request nordugrid/arc!894
parents 7173ee72 35011798
......@@ -44,6 +44,21 @@ namespace ArcDMCGridFTP {
return is_secure;
}
bool DataPointGridFTPDelegate::SetURL(const URL& u) {
if ((u.Protocol() != "gsiftp") && (u.Protocol() != "ftp")) {
return false;
}
if (u.Host() != url.Host()) {
return false;
}
// Globus FTP handle allows changing url completely
url = u;
if(triesleft < 1) triesleft = 1;
ResetMeta();
return true;
}
} // namespace ArcDMCGridFTP
extern Arc::PluginDescriptor const ARC_PLUGINS_TABLE_NAME[] = {
......
......@@ -31,6 +31,7 @@ namespace ArcDMCGridFTP {
virtual ~DataPointGridFTPDelegate();
static Plugin* Instance(PluginArgument *arg);
virtual bool RequiresCredentials() const;
virtual bool SetURL(const Arc::URL&);
private:
bool is_secure;
......
This diff is collapsed.
......@@ -52,17 +52,24 @@ namespace ArcDMCSRM {
virtual DataStatus Rename(const URL& newurl);
virtual const std::string DefaultCheckSum() const;
virtual bool ProvidesMeta() const;
virtual bool AcceptsMeta() const;
virtual bool IsStageable() const;
virtual std::vector<URL> TransferLocations() const;
virtual void ClearTransferLocations();
// Methods below are delegated to underlying TURL handler
virtual DataStatus Transfer(const URL& otherendpoint, bool source, TransferCallback callback = NULL);
virtual bool SupportsTransfer() const;
private:
SRMClientRequest *srm_request; /* holds SRM request ID between Prepare* and Finish* */
AutoPointer<SRMClientRequest> srm_request; /* holds SRM request ID between Prepare* and Finish* */
static Logger logger;
std::vector<URL> turls; /* TURLs returned from prepare methods */
URL r_url; /* URL used for redirected operations in Start/Stop Reading/Writing */
DataHandle *r_handle; /* handle used for redirected operations in Start/Stop Reading/Writing */
mutable AutoPointer<DataHandle> r_handle; /* handle used for redirected operations in Start/Stop Reading/Writing */
bool reading;
bool writing;
DataStatus SetupHandler(DataStatus::DataStatusType base_error) const;
DataStatus ListFiles(std::list<FileInfo>& files, DataPointInfoType verb, int recursion);
/** Check protocols given in list can be used, and if not remove them */
void CheckProtocols(std::list<std::string>& transport_protocols);
......
......@@ -433,7 +433,7 @@ namespace ArcDMCXrootd {
logger.msg(WARNING, "Not getting checksum of zip constituent");
} else {
char buf[256];
if (XrdPosixXrootd::Getxattr(u.plainstr().c_str(), "xroot.cksum", &buf, sizeof(buf)) == -1) {
if (XrdPosixXrootd::Getxattr(u.plainstr().c_str(), "xroot.cksum", buf, sizeof(buf)) == -1) {
logger.msg(WARNING, "Could not get checksum of %s: %s", u.plainstr(), StrError(errno));
} else {
std::string csum(buf);
......@@ -481,6 +481,7 @@ namespace ArcDMCXrootd {
}
struct dirent* entry;
errno = 0; // reset because it is used as error indicator
while ((entry = XrdPosixXrootd::Readdir(dir))) {
FileInfo f;
if (verb > INFO_TYPE_NAME) {
......@@ -489,10 +490,12 @@ namespace ArcDMCXrootd {
}
f.SetName(entry->d_name);
files.push_back(f);
errno = 0;
}
if (errno != 0) {
logger.msg(VERBOSE, "Error while reading dir %s: %s", url.plainstr(), StrError(errno));
return DataStatus(DataStatus::ListError, errno);
int errNo = errno;
logger.msg(VERBOSE, "Error while reading dir %s: %s", url.plainstr(), StrError(errNo));
return DataStatus(DataStatus::ListError, errNo);
}
XrdPosixXrootd::Closedir(dir);
......
......@@ -41,6 +41,32 @@ namespace ArcDMCXrootd {
return true;
}
bool DataPointXrootdDelegate::RequiresCredentialsInFile() const {
return true;
}
bool DataPointXrootdDelegate::SupportsTransfer() const {
return true;
}
DataStatus DataPointXrootdDelegate::List(std::list<FileInfo>& files, DataPoint::DataPointInfoType verb) {
// Xrootd is slow when quering with higher verbosity. Here we simulate internal behavior of the
// plugin by performing Stat externally. Proper solution would be to push obtaineed infomation
// about eah file as soon as it is obtained. But so far quick hack shuld be enough.
if (verb <= INFO_TYPE_NAME)
return DataPointDelegate::List(files, verb);
DataStatus result = DataPointDelegate::List(files, INFO_TYPE_NAME);
if (!result) return result;
URL url_orig = url;
for(std::list<FileInfo>::iterator f = files.begin(); f != files.end(); ++f) {
std::string name = f->GetName();
url = Arc::URL(url_orig.plainstr() + '/' + name);
Stat(*f, verb);
f->SetName(name);
}
return result;
}
} // namespace ArcDMCGridFTP
extern Arc::PluginDescriptor const ARC_PLUGINS_TABLE_NAME[] = {
......
......@@ -23,6 +23,10 @@ namespace ArcDMCXrootd {
virtual ~DataPointXrootdDelegate();
static Plugin* Instance(PluginArgument *arg);
virtual bool RequiresCredentials() const;
virtual bool RequiresCredentialsInFile() const;
virtual bool SupportsTransfer() const;
virtual DataStatus List(std::list<FileInfo>& files, DataPoint::DataPointInfoType verb = INFO_TYPE_ALL);
};
} // namespace ArcDMCGridFTP
......
......@@ -14,6 +14,7 @@ namespace Arc {
char const DataExternalComm::DataStatusTag = 'S';
char const DataExternalComm::FileInfoTag = 'F';
char const DataExternalComm::DataChunkTag = 'D';
char const DataExternalComm::TransferStatusTag = 'T';
static char const entrySep = '\n';
static char const itemSep = ',';
......@@ -194,6 +195,7 @@ namespace Arc {
outstream<<"meta:"<<encode(attr->first)<<elemSep<<encode(attr->second)<<itemSep;
}
outstream<<entrySep;
outstream.flush();
return !outstream.fail();
}
......@@ -232,6 +234,32 @@ namespace Arc {
outstream<<status.GetErrno()<<itemSep;
outstream<<encode(status.GetDesc())<<itemSep;
outstream<<entrySep;
outstream.flush();
return !outstream.fail();
}
// -------------------------------------------
// ---------- TransferStatus -----------------
// -------------------------------------------
// ------------- control ---------------
bool DataExternalComm::InEntry(Arc::Run& run, int timeout, Arc::DataExternalComm::TransferStatus& status) {
try {
status.bytes_count = itemIn<uint64_t>(run,timeout);
return (InTag(run,timeout) == entrySep); // Check for proper end of entry
} catch(std::exception const&) {
}
return false;
}
// ------------- child ---------------
bool DataExternalComm::OutEntry(std::ostream& outstream, Arc::DataExternalComm::TransferStatus const& status) {
outstream<<status.bytes_count<<itemSep;
outstream<<entrySep;
outstream.flush();
return !outstream.fail();
}
......
......@@ -18,6 +18,7 @@ namespace Arc {
static char const DataStatusTag;
static char const FileInfoTag;
static char const DataChunkTag;
static char const TransferStatusTag;
template<typename T> static bool InEntry(std::istream& instream, T& entry) {
try {
......@@ -42,6 +43,14 @@ namespace Arc {
static bool OutEntry(std::ostream& outstream, Arc::FileInfo const& info);
static bool InEntry(Arc::Run& run, int timeout, Arc::FileInfo& info);
class TransferStatus {
public:
TransferStatus(unsigned long long int count) : bytes_count(count) {};
unsigned long long int bytes_count;
};
static bool OutEntry(std::ostream& outstream, TransferStatus const& info);
static bool InEntry(Arc::Run& run, int timeout, TransferStatus& info);
static bool OutEntry(std::ostream& outstream, Arc::DataStatus const& status);
static bool InEntry(Arc::Run& run, int timeout, Arc::DataStatus& status);
......
......@@ -71,7 +71,7 @@ namespace Arc {
//}
result = plugin->StartReading(buffer);
if (result.Passed()) {
if (!result.Passed()) {
plugin->FinishReading(true);
return result;
}
......@@ -95,8 +95,9 @@ namespace Arc {
plugin->StopReading();
plugin->FinishReading(buffer.error());
if(buffer.error())
if(buffer.error()) {
return DataStatus::ReadError;
}
return DataStatus::Success;
}
......@@ -135,7 +136,7 @@ namespace Arc {
//}
result = plugin->StartWriting(buffer);
if (result.Passed()) {
if (!result.Passed()) {
plugin->FinishReading(true);
return result;
}
......@@ -217,6 +218,26 @@ namespace Arc {
return plugin->Rename(newurl);
}
DataStatus DataExternalHelper::Transfer(const URL& otherendpoint, bool source, DataPoint::TransferCallback callback) {
if (!plugin)
return DataStatus::NotInitializedError;
return plugin->Transfer(otherendpoint, source, callback);
}
DataStatus DataExternalHelper::Transfer3rdParty(const URL& source, const URL& destination, DataPoint::TransferCallback callback) {
if (!plugin)
return DataStatus::NotInitializedError;
class DataPoint3rdParty: public DataPoint {
public:
DataStatus execute(const URL& source, const URL& destination, DataPoint::TransferCallback callback) {
return Transfer3rdParty(source, destination, callback);
}
};
return static_cast<DataPoint3rdParty*>(plugin)->execute(source, destination, callback);
}
DataExternalHelper::DataExternalHelper(char const * path, char const * name, const URL& url, const UserConfig& usercfg, std::istream& instream, std::ostream& outstream):
plugin(NULL),
plugins(NULL),
......@@ -258,6 +279,10 @@ namespace Arc {
} // namespace Arc
static void TransferCallback(unsigned long long int bytes_transferred) {
Arc::DataExternalComm::TransferStatus transfer_status(bytes_transferred);
Arc::DataExternalComm::OutEntry(std::cout<<Arc::DataExternalComm::TransferStatusTag, transfer_status);
}
int main(int argc, char* argv[]) {
using namespace Arc;
......@@ -320,7 +345,7 @@ int main(int argc, char* argv[]) {
std::cout << Arc::IString("%s version %s", "arc-dmc", VERSION) << std::endl;
exit(0);
}
std::cerr << Arc::IString("Expecting Command and URL provided");
std::cerr << Arc::IString("Expecting Module, Command and URL provided");
exit(1);
}
} catch(...) {
......@@ -396,6 +421,33 @@ int main(int argc, char* argv[]) {
throw Arc::DataStatus(Arc::DataStatus::GenericError, "Unexpected arguments");
}
result = handler->Stat(verb);
} else if(command == Arc::DataPointDelegate::TransferFromCommand) {
if(params.empty()) {
throw Arc::DataStatus(Arc::DataStatus::GenericError, "Expecting other URL among arguments");
}
std::string other_url_str = params.front(); params.pop_front();
if(!params.empty()) {
throw Arc::DataStatus(Arc::DataStatus::GenericError, "Unexpected arguments");
}
result = handler->Transfer(other_url_str,true,&TransferCallback);
} else if(command == Arc::DataPointDelegate::TransferToCommand) {
if(params.empty()) {
throw Arc::DataStatus(Arc::DataStatus::GenericError, "Expecting other URL among arguments");
}
std::string other_url_str = params.front(); params.pop_front();
if(!params.empty()) {
throw Arc::DataStatus(Arc::DataStatus::GenericError, "Unexpected arguments");
}
result = handler->Transfer(other_url_str,false,&TransferCallback);
} else if(command == Arc::DataPointDelegate::Transfer3rdCommand) {
if(params.empty()) {
throw Arc::DataStatus(Arc::DataStatus::GenericError, "Expecting other URL among arguments");
}
std::string other_url_str = params.front(); params.pop_front();
if(!params.empty()) {
throw Arc::DataStatus(Arc::DataStatus::GenericError, "Unexpected arguments");
}
result = handler->Transfer3rdParty(url_str,other_url_str,&TransferCallback);
} else {
if(!params.empty()) {
throw Arc::DataStatus(Arc::DataStatus::GenericError, "Unexpected arguments");
......
......@@ -43,6 +43,8 @@ namespace Arc {
DataStatus CreateDirectory(bool with_parents=false);
DataStatus Read();
DataStatus Write();
DataStatus Transfer(const URL& otherendpoint, bool source, DataPoint::TransferCallback callback);
DataStatus Transfer3rdParty(const URL& source, const URL& destination, DataPoint::TransferCallback callback);
static Logger logger;
};
......
......@@ -769,25 +769,46 @@ namespace Arc {
}
}
source_url.AddCheckSumObject(&crc_source);
if (source_url.SupportsTransfer()) {
logger.msg(INFO, "Using internal transfer method of %s", source_url.str());
URL dest_url(cacheable ? chdest.GetURL() : destination.GetURL());
DataStatus datares = source_url.Transfer(dest_url, true, show_progress ? transfer_cb : NULL);
if (!datares.Passed()) {
DataHandle d(destination.GetURL(), destination.GetUserConfig());
Delete(*d);
if (source.NextLocation()) {
logger.msg(VERBOSE, "(Re)Trying next source");
continue;
bool try_another_transfer = true;
if (try_another_transfer) {
if (source_url.SupportsTransfer()) {
logger.msg(INFO, "Using internal transfer method of %s", source_url.str());
URL dest_url(cacheable ? chdest.GetURL() : destination.GetURL());
DataStatus datares = source_url.Transfer(dest_url, true, show_progress ? transfer_cb : NULL);
if (!datares.Passed()) {
if (source.NextLocation()) {
logger.msg(VERBOSE, "(Re)Trying next source");
continue;
}
if (cacheable)
cache.StopAndDelete(canonic_url);
// Check if SupportsTransfer was too optimistic
if (datares != DataStatus::UnimplementedError) return datares;
logger.msg(INFO, "Internal transfer method is not supported for %s", source_url.str());
} else {
try_another_transfer = false;
}
if (cacheable)
cache.StopAndDelete(canonic_url);
return datares;
}
} else if (destination.SupportsTransfer()) {
logger.msg(INFO, "Using internal transfer method of %s", destination.str());
return destination.Transfer(source_url.GetURL(), false, transfer_cb);
} else {
}
if (try_another_transfer) {
if (destination.SupportsTransfer()) {
logger.msg(INFO, "Using internal transfer method of %s", destination.str());
DataStatus datares = destination.Transfer(source_url.GetURL(), false, show_progress ? transfer_cb : NULL);
if (!datares.Passed()) {
if (source.NextLocation()) {
logger.msg(VERBOSE, "(Re)Trying next source");
continue;
}
// Check if SupportsTransfer was too optimistic
if (datares != DataStatus::UnimplementedError) return datares;
logger.msg(INFO, "Internal transfer method is not supported for %s", destination.str());
} else {
try_another_transfer = false;
}
}
}
if (try_another_transfer) {
logger.msg(INFO, "Using buffered transfer method");
unsigned int wait_time;
DataStatus datares = source_url.PrepareReading(max_inactivity_time, wait_time);
if (!datares.Passed()) {
......
......@@ -29,6 +29,9 @@ namespace Arc {
char const * DataPointDelegate::StatCommand = "stat";
char const * DataPointDelegate::ListCommand = "list";
char const * DataPointDelegate::RenameCommand = "rename";
char const * DataPointDelegate::TransferFromCommand = "transferfrom";
char const * DataPointDelegate::TransferToCommand = "transferto";
char const * DataPointDelegate::Transfer3rdCommand = "transfer3";
Logger DataPointDelegate::logger(Logger::getRootLogger(), "DataPoint.Delegate");
......@@ -84,8 +87,11 @@ namespace Arc {
}
DataStatus DataPointDelegate::EndCommand(Arc::CountedPointer<Arc::Run>& run, DataStatus::DataStatusType errCode, char tag) {
if(tag == DataExternalComm::ErrorTag) {
return DataStatus(errCode, "Comunication error while waiting for data status from helper process for "+url.plainstr());
}
if(tag != DataExternalComm::DataStatusTag) {
return DataStatus(errCode, "Unexpected data status tag from helper process for "+url.plainstr());
return DataStatus(errCode, "Unexpected tag while waiting for data status from helper process for "+url.plainstr());
}
DataStatus result;
if(!DataExternalComm::InEntry(*run, 1000*usercfg.Timeout(), result)) {
......@@ -201,7 +207,7 @@ namespace Arc {
// If error in buffer then write thread have already called abort
if(buffer && !buffer->eof_read() && !buffer->error()) { // otherwise it will exit itself
logger.msg(VERBOSE, "StopWriting: aborting connection");
logger.msg(VERBOSE, "StopReading: aborting connection");
buffer->error_read(true);
}
helper_run->Kill(1); // kill anyway - it won't get worse
......@@ -227,14 +233,15 @@ namespace Arc {
if (it->buffer->eof_read()) break;
if (!it->buffer->for_read(h, l, true)) { // eof or error
if (it->buffer->error()) { // error -> abort reading
logger.msg(VERBOSE, "read_thread: for_read failed - aborting: %s",
it->url.plainstr());
logger.msg(VERBOSE, "read_thread: for_read failed - aborting: %s", it->url.plainstr());
}
break;
}
if(chunkReader.complete()) {
tag = DataExternalComm::InTag(*run, 1000 * it->usercfg.Timeout());
// No timeout here. Timeouts are implemented externally in DataMover through call to StopReading().
tag = DataExternalComm::InTag(*run, -1);
if(tag != DataExternalComm::DataChunkTag) {
logger.msg(DEBUG, "read_thread: non-data tag '%c' from external process - leaving: %s", tag, it->url.plainstr());
it->buffer->is_read(h, 0, 0);
break;
}
......@@ -243,6 +250,7 @@ namespace Arc {
unsigned long long int offset = 0;
unsigned long long int size = l;
if(!chunkReader.read(*run, 1000 * it->usercfg.Timeout(), (*(it->buffer))[h], offset, size)) {
logger.msg(VERBOSE, "read_thread: data read error from external process - aborting: %s", it->url.plainstr());
it->buffer->is_read(h, 0, 0);
it->buffer->error_read(true);
break;
......@@ -295,8 +303,8 @@ namespace Arc {
if(buffer && !buffer->eof_write() && !buffer->error()) { // otherwise it will exit itself
logger.msg(VERBOSE, "StopWriting: aborting connection");
buffer->error_write(true);
helper_run->Kill(1);
}
helper_run->Kill(1);
// Waiting for data transfer thread to finish
cond.wait();
helper_run = NULL;
......@@ -361,16 +369,16 @@ namespace Arc {
// no buffers and no errors - must be pure eof
o = buffer.eof_position();
DataExternalComm::DataChunkExtBuffer dc;
if((!DataExternalComm::OutTag(*run, timeout, DataExternalComm::DataChunkTag)) || (!dc.write(*run, timeout, NULL, o, 0))) {
// No timeout here. Timeouts are implemented externally through call to StopWriting().
if((!DataExternalComm::OutTag(*run, -1, DataExternalComm::DataChunkTag)) || (!dc.write(*run, -1, NULL, o, 0))) {
out_failed = true;
break;
}
buffer.eof_write(true);
break;
}
if(l > 0) {
DataExternalComm::DataChunkExtBuffer dc;
if((!DataExternalComm::OutTag(*run, timeout, DataExternalComm::DataChunkTag)) || (!dc.write(*run, timeout, buffer[h], o, l))) {
if((!DataExternalComm::OutTag(*run, -1, DataExternalComm::DataChunkTag)) || (!dc.write(*run, -1, buffer[h], o, l))) {
logger.msg(VERBOSE, "write_thread: out failed - aborting");
buffer.is_notwritten(h);
out_failed = true;
......@@ -379,18 +387,21 @@ namespace Arc {
}
buffer.is_written(h);
}
}
logger.msg(VERBOSE, "write_thread: exiting");
if(out_failed) {
buffer.error_write(true);
// Communication with helper failed but may still read status
it->data_status = it->EndCommand(run, DataStatus::WriteError);
} else if(buffer.error_write()) {
// Report generic error
it->data_status = DataStatus::WriteError;
logger.msg(VERBOSE, "write_thread: exiting");
if(out_failed) {
// Communication with helper failed but may still read status
it->data_status = it->EndCommand(run, DataStatus::WriteError);
buffer.error_write(true);
} else if(buffer.error_write()) {
// Report generic error
it->data_status = DataStatus::WriteError;
} else {
// So far so good - read status
it->data_status = it->EndCommand(run, DataStatus::WriteError);
buffer.eof_write(true);
}
} else {
// So far so good - read status
it->data_status = it->EndCommand(run, DataStatus::WriteError);
it->data_status = DataStatus::WriteError;
}
it->cond.signal(); // Report to control thread that data transfer thread finished
}
......@@ -467,11 +478,70 @@ namespace Arc {
Arc::CountedPointer<Arc::Run> run;
DataStatus result = StartCommand(run, argv, DataStatus::RenameError);
if(!result) return result;
result = EndCommand(run, DataStatus::ListError);
result = EndCommand(run, DataStatus::RenameError);
if(!result) return result;
return DataStatus::Success;
}
DataStatus DataPointDelegate::Transfer(const URL& otherendpoint, bool source, TransferCallback callback) {
if(!SupportsTransfer())
return DataStatus(DataStatus::UnimplementedError, EOPNOTSUPP);
if (reading) return DataStatus::IsReadingError;
if (writing) return DataStatus::IsWritingError;
std::list<std::string> argv(additional_args);
if (source) {
argv.push_back(TransferFromCommand);
} else {
argv.push_back(TransferToCommand);
}
argv.push_back(url.fullstr());
argv.push_back(otherendpoint.fullstr());
Arc::CountedPointer<Arc::Run> run;
DataStatus result = StartCommand(run, argv, DataStatus::TransferError);
if(!result) return result;
// Read callback information till end tag is received
// Looks like Transfer() method was designed to not timeout.
// Hence waiting for each tag indefinitely.
char tag = DataExternalComm::InTag(*run, -1);
while(tag == DataExternalComm::TransferStatusTag) {
DataExternalComm::TransferStatus transfer_status(0);
if(!DataExternalComm::InEntry(*run, 1000*usercfg.Timeout(), transfer_status)) {
return DataStatus(DataStatus::TransferError, "Failed to read data transfer status from helper process for "+url.plainstr());
}
if(callback) (*callback)(transfer_status.bytes_count);
tag = DataExternalComm::InTag(*run, -1);
}
result = EndCommand(run, DataStatus::TransferError, tag);
if(!result) return result;
return DataStatus::Success;
}
DataStatus DataPointDelegate::Transfer3rdParty(const URL& source, const URL& destination, TransferCallback callback) {
std::list<std::string> argv(additional_args);
argv.push_back(Transfer3rdCommand);
argv.push_back(source.fullstr());
argv.push_back(destination.fullstr());
Arc::CountedPointer<Arc::Run> run;
DataStatus result = StartCommand(run, argv, DataStatus::TransferError);
if(!result) return result;
// Read callback information till end tag is received
// Looks like Transfer3rdParty() method was designed to not timeout.
// Hence waiting for each tag indefinitely.
char tag = DataExternalComm::InTag(*run, -1);
while(tag == DataExternalComm::TransferStatusTag) {
DataExternalComm::TransferStatus transfer_status(0);
if(!DataExternalComm::InEntry(*run, 1000*usercfg.Timeout(), transfer_status)) {
return DataStatus(DataStatus::TransferError, "Failed to read data transfer status from helper process for "+url.plainstr());
}
if(callback) (*callback)(transfer_status.bytes_count);
tag = DataExternalComm::InTag(*run, -1);
}
result = EndCommand(run, DataStatus::TransferError, tag);
if(!result) return result;
return DataStatus::Success;
}
DataPointDelegate::DataPointDelegate(char const* module_name, const URL& url, const UserConfig& usercfg, PluginArgument* parg) :
DataPointDirect(url, usercfg, parg),
reading(false),
......@@ -516,21 +586,6 @@ namespace Arc {
return false;
}
bool DataPointDelegate::SetURL(const URL& u) {
// implement
if ((u.Protocol() != "gsiftp") && (u.Protocol() != "ftp")) {
return false;
}
if (u.Host() != url.Host()) {
return false;
}
// Globus FTP handle allows changing url completely
url = u;
if(triesleft < 1) triesleft = 1;
ResetMeta();
return true;
}
std::string::size_type const DataPointDelegate::LogRedirect::level_size_max_ = 32;
std::string::size_type const DataPointDelegate::LogRedirect::buffer_size_max_ = 4096;
......
......@@ -51,6 +51,9 @@ namespace Arc {
DataStatus EndCommand(Arc::CountedPointer<Arc::Run>& run, DataStatus::DataStatusType errCode);
DataStatus EndCommand(Arc::CountedPointer<Arc::Run>& run, DataStatus::DataStatusType errCode, char tag);
protected:
virtual DataStatus Transfer3rdParty(const URL& source, const URL& destination, TransferCallback callback = NULL);
public:
static char const * ReadCommand;
static char const * WriteCommand;
......@@ -61,6 +64,9 @@ namespace Arc {
static char const * StatCommand;
static char const * ListCommand;
static char const * RenameCommand;
static char const * TransferFromCommand;
static char const * TransferToCommand;
static char const * Transfer3rdCommand;
/// Create object which starts special executable which loads specified module
DataPointDelegate(char const* module_name, const URL& url, const UserConfig& usercfg, PluginArgument* parg);
......@@ -71,7 +77,6 @@ namespace Arc {
virtual ~DataPointDelegate();
static Plugin* Instance(PluginArgument *arg);
virtual bool SetURL(const URL& url);
virtual DataStatus StartReading(DataBuffer& buf);
virtual DataStatus StartWriting(DataBuffer& buf,
DataCallback *space_cb = NULL);
......@@ -83,6 +88,7 @@ namespace Arc {
virtual DataStatus Stat(FileInfo& file, DataPointInfoType verb = INFO_TYPE_ALL);
virtual DataStatus List(std::list<FileInfo>& files, DataPointInfoType verb = INFO_TYPE_ALL);
virtual DataStatus Rename(const URL& newurl);
virtual DataStatus Transfer(const URL& otherendpoint, bool source, TransferCallback callback = NULL);
virtual bool WriteOutOfOrder();
virtual bool ProvidesMeta() const;
virtual const std::string DefaultCheckSum() const;
......
......@@ -82,8 +82,10 @@ static void ReportStatus(DataStaging::DTRStatus::DTRStatusType st,
};
}
static void ReportOngoingStatus(unsigned long long int bytes) {
static unsigned long long int transfer_bytes = 0;
static void ReportOngoingStatus(unsigned long long int bytes) {
transfer_bytes = bytes;