Make Data Delivery code similar to DataMover in respect to Transfer method....

Make Data Delivery code similar to DataMover in respect to Transfer method. Fix reporting for case when Transfer is used and no endpoint reports file size.
parent 1c8dfcd1
......@@ -82,8 +82,10 @@ static void ReportStatus(DataStaging::DTRStatus::DTRStatusType st,
};
}
static void ReportOngoingStatus(unsigned long long int bytes) {
static unsigned long long int transfer_bytes = 0;
static void ReportOngoingStatus(unsigned long long int bytes) {
transfer_bytes = bytes;
// Send report on stdout
ReportStatus(DataStaging::DTRStatus::TRANSFERRING,
DataStaging::DTRErrorStatus::NONE_ERROR,
......@@ -330,32 +332,58 @@ int main(int argc,char* argv[]) {
bool eof_reached = false;
// checksum validation against supplied value
std::string calc_csum;
// These will stay positive if corresponding transfer type is not used
DataStatus source_st;
DataStatus dest_st;
DataStatus transfer_st;
// Check if datapoint handles transfer by itself
if (source->SupportsTransfer()) {
logger.msg(INFO, "Using internal transfer method of %s", source->str());
transfer_st = source->Transfer(dest->GetURL(), true, ReportOngoingStatus);
if (transfer_st.Passed()) {
eof_reached = true;
// so that full copy is reported back to scheduler
buffer.speed.verbose(false);
buffer.speed.transfer(GetFileSize(*source, *dest));
} else if (dest->Local()) {
dest->Remove(); // to allow retries
bool try_another_transfer = true;
if (try_another_transfer) {
if (source->SupportsTransfer()) {
logger.msg(INFO, "Using internal transfer method of %s", source->str());
transfer_st = source->Transfer(dest->GetURL(), true, ReportOngoingStatus);
if (transfer_st.Passed()) {
try_another_transfer = false;
eof_reached = true;
// so that full copy is reported back to scheduler
buffer.speed.verbose(false);
unsigned long long bytes = GetFileSize(*source, *dest);
if(bytes < transfer_bytes) bytes = transfer_bytes;
buffer.speed.transfer(bytes);
} else {
if (transfer_st != DataStatus::UnimplementedError) {
if (dest->Local())
dest->Remove(); // to allow retries
try_another_transfer = false;
} else {
logger.msg(INFO, "Internal transfer method is not supported for %s", source->str());
}
}
}
} else if (dest->SupportsTransfer()) {
logger.msg(INFO, "Using internal transfer method of %s", dest->str());
transfer_st = dest->Transfer(source->GetURL(), false, ReportOngoingStatus);
if (transfer_st.Passed()) {
eof_reached = true;
// so that full copy is reported back to scheduler
buffer.speed.verbose(false);
buffer.speed.transfer(GetFileSize(*source, *dest));
}
if (try_another_transfer) {
if (dest->SupportsTransfer()) {
logger.msg(INFO, "Using internal transfer method of %s", dest->str());
transfer_st = dest->Transfer(source->GetURL(), false, ReportOngoingStatus);
if (transfer_st.Passed()) {
try_another_transfer = false;
eof_reached = true;
// so that full copy is reported back to scheduler
buffer.speed.verbose(false);
unsigned long long bytes = GetFileSize(*source, *dest);
if(bytes < transfer_bytes) bytes = transfer_bytes;
buffer.speed.transfer(bytes);
} else {
if (transfer_st != DataStatus::UnimplementedError) {
try_another_transfer = false;
} else {
logger.msg(INFO, "Internal transfer method is not supported for %s", dest->str());
}
}
}
} else {
}
if (try_another_transfer) {
// Initiating transfer
source_st = source->StartReading(buffer);
if(!source_st) {
......@@ -419,9 +447,9 @@ int main(int argc,char* argv[]) {
buffer.speed.transferred_size(),
GetFileSize(*source,*dest),0);
// These will return false if buffer was not used
bool source_failed = buffer.error_read();
bool dest_failed = buffer.error_write();
// Error at source or destination
if(source_failed || !source_st) {
std::string err("Failed reading from source: "+source->CurrentLocation().str());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment