XRootD
XrdHttpTpcTPC.cc
Go to the documentation of this file.
2 #include "XrdNet/XrdNetAddr.hh"
3 #include "XrdNet/XrdNetUtils.hh"
4 #include "XrdOuc/XrdOucEnv.hh"
5 #include "XrdSec/XrdSecEntity.hh"
8 #include "XrdSys/XrdSysFD.hh"
9 #include "XrdVersion.hh"
10 
14 #include "XrdOuc/XrdOucTUtils.hh"
16 #include "XrdHttp/XrdHttpUtils.hh"
17 
18 #include <curl/curl.h>
19 
20 #include <dlfcn.h>
21 #include <fcntl.h>
22 
23 #include <algorithm>
24 #include <memory>
25 #include <sstream>
26 #include <stdexcept>
27 #include <thread>
28 
29 #include "XrdHttpTpcState.hh"
30 #include "XrdHttpTpcStream.hh"
31 #include "XrdHttpTpcTPC.hh"
32 #include <fstream>
33 
34 using namespace TPC;
35 
36 XrdXrootdTpcMon* TPCHandler::TPCLogRecord::tpcMonitor = 0;
37 
38 uint64_t TPCHandler::m_monid{0};
39 int TPCHandler::m_marker_period = 5;
40 size_t TPCHandler::m_block_size = 16*1024*1024;
41 size_t TPCHandler::m_small_block_size = 1*1024*1024;
42 XrdSysMutex TPCHandler::m_monid_mutex;
43 bool TPCHandler::allowMissingCRL = false;
44 
46 
47 /******************************************************************************/
48 /* T P C H a n d l e r : : T P C L o g R e c o r d D e s t r u c t o r */
49 /******************************************************************************/
50 
51 TPCHandler::TPCLogRecord::~TPCLogRecord()
52 {
53 // Record monitoring data is enabled
54 //
55  if (tpcMonitor)
56  {XrdXrootdTpcMon::TpcInfo monInfo;
57 
58  monInfo.clID = clID.c_str();
59  monInfo.begT = begT;
60  gettimeofday(&monInfo.endT, 0);
61 
62  if (mTpcType == TpcType::Pull)
63  {monInfo.dstURL = local.c_str();
64  monInfo.srcURL = remote.c_str();
65  } else {
66  monInfo.dstURL = remote.c_str();
67  monInfo.srcURL = local.c_str();
69  }
70 
71  if (!status) monInfo.endRC = 0;
72  else if (tpc_status > 0) monInfo.endRC = tpc_status;
73  else monInfo.endRC = 1;
74  monInfo.strm = static_cast<unsigned char>(streams);
75  monInfo.fSize = (bytes_transferred < 0 ? 0 : bytes_transferred);
76  if (!isIPv6) monInfo.opts |= XrdXrootdTpcMon::TpcInfo::isIPv4;
77 
78  tpcMonitor->Report(monInfo);
79  }
80 }
81 
82 /******************************************************************************/
83 /* C u r l D e l e t e r : : o p e r a t o r ( ) */
84 /******************************************************************************/
85 
87 {
88  if (curl) curl_easy_cleanup(curl);
89 }
90 
91 /******************************************************************************/
92 /* s o c k o p t _ c a l l b a c k */
93 /******************************************************************************/
94 
103 int TPCHandler::sockopt_callback(void *clientp, curl_socket_t curlfd, curlsocktype purpose) {
104  TPCLogRecord * rec = (TPCLogRecord *)clientp;
105  if (purpose == CURLSOCKTYPE_IPCXN && rec && rec->pmarkManager.isEnabled()) {
106  // We will not reach this callback if the corresponding socket could not have been connected
107  // the socket is already connected only if the packet marking is enabled
108  return CURL_SOCKOPT_ALREADY_CONNECTED;
109  }
110  return CURL_SOCKOPT_OK;
111 }
112 
113 /******************************************************************************/
114 /* o p e n s o c k e t _ c a l l b a c k */
115 /******************************************************************************/
116 
117 
122 int TPCHandler::opensocket_callback(void *clientp,
123  curlsocktype purpose,
124  struct curl_sockaddr *aInfo)
125 {
126  /* CURLSOCKTYPE_IPCXN (for IP based connections) is the only type currently known by curl,
127  * so let's make sure to reject other types if they appear in the furure */
128  if (purpose != CURLSOCKTYPE_IPCXN)
129  return CURL_SOCKET_BAD;
130 
131  if (!aInfo)
132  return CURL_SOCKET_BAD;
133 
134  // Create the socket (note that O_CLOEXEC flag will be set)
135  int fd = XrdSysFD_Socket(aInfo->family, aInfo->socktype, aInfo->protocol);
136 
137  if (fd < 0) {
138  return CURL_SOCKET_BAD;
139  }
140 
141  if (!clientp)
142  return fd;
143 
144  XrdNetAddr thePeer(&(aInfo->addr));
145  TPCLogRecord *rec = static_cast<TPCLogRecord*>(clientp);
146 
147  /* Reject attempts to connect to local/private addresses unless allowed by configuration */
148  if ((!rec->allow_private && thePeer.isPrivate()) || (!rec->allow_local && thePeer.isLocal())) {
149  rec->tpc_status = 403; // Forbidden
150  rec->m_log->Emsg(rec->log_prefix.c_str(),
151  "Connection to local/private address is forbidden");
152  close(fd);
153  return CURL_SOCKET_BAD;
154  }
155 
156  rec->isIPv6 = (thePeer.isIPType(XrdNetAddrInfo::IPv6) && !thePeer.isMapped());
157 
158  std::stringstream connectErrMsg;
159  if(!rec->pmarkManager.connect(fd, &(aInfo->addr), aInfo->addrlen, CONNECT_TIMEOUT, connectErrMsg)) {
160  rec->m_log->Emsg(rec->log_prefix.c_str(), "Unable to connect socket: ", connectErrMsg.str().c_str());
161  close(fd);
162  return CURL_SOCKET_BAD;
163  }
164 
165  return fd;
166 }
167 
168 int TPCHandler::closesocket_callback(void *clientp, curl_socket_t fd) {
169  TPCLogRecord * rec = (TPCLogRecord *)clientp;
170 
171  // Destroy the PMark handle associated to the file descriptor before closing it.
172  // Otherwise, we would lose the socket usage information if the socket is closed before
173  // the PMark handle is closed.
174  rec->pmarkManager.endPmark(fd);
175 
176  return close(fd);
177 }
178 
179 /******************************************************************************/
180 /* s s l _ c t x _ c a l l b a c k */
181 /******************************************************************************/
182 
189 int TPCHandler::ssl_ctx_callback(CURL *curl, void *ssl_ctx, void *clientp) {
190  //TPCLogRecord * rec = (TPCLogRecord *)clientp;
191  SSL_CTX* ctx = static_cast<SSL_CTX*>(ssl_ctx);
192  SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, verify_callback);
193  return CURL_SOCKOPT_OK;
194 }
195 
196 int TPCHandler::verify_callback(int preverify_ok, X509_STORE_CTX* ctx) {
197  if (preverify_ok == 1) return 1;
198 
199  int err = X509_STORE_CTX_get_error(ctx);
200 
201  if (err == X509_V_ERR_UNABLE_TO_GET_CRL) {
202  X509_STORE_CTX_set_error(ctx, X509_V_OK);
203  return 1;
204  }
205 
206  return 0;
207 }
208 
209 /******************************************************************************/
210 /* p r e p a r e U R L */
211 /******************************************************************************/
212 
213 // See XrdHttpTpcUtils::prepareOpenURL() documentation
214 std::string TPCHandler::prepareURL(XrdHttpExtReq &req) {
215  XrdHttpTpcUtils::PrepareOpenURLParams parms {req.resource, req.headers, hdr2cgimap,req.mReprDigest};
216  return XrdHttpTpcUtils::prepareOpenURL(parms);
217 }
218 
219 bool TPCHandler::mismatchReprDigest(const std::map<std::string, std::string> & passiveSrvReprDigest, XrdHttpExtReq &req,
220  TPCLogRecord &rec) {
221  if(passiveSrvReprDigest.size()) {
222  for (const auto & [digestName, digestValue]: passiveSrvReprDigest) {
223  auto clientDigestMatch = req.mReprDigest.find(digestName);
224  if (clientDigestMatch != req.mReprDigest.end()) {
225  // We found a checksum type match between the client-provided one and the source server-provided one
226  if (clientDigestMatch->second != digestValue) {
227  // The checksum value does not match, return an error to the client 412 PRECONDITION_FAILED
228  std::stringstream errMsg;
229  errMsg << "Mismatch between client-provided and remote server checksums:"
230  << " client = (" << clientDigestMatch->first << "=" << clientDigestMatch->second << ")"
231  << " server = (" << digestName << "=" << digestValue << ")";
232  logTransferEvent(LogMask::Error, rec, "REPRDIGEST_VERIFY_FAIL", errMsg.str());
233  rec.status=412;
234  req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(errMsg, rec, CURLcode::CURLE_OK).c_str(), 0);
235  return true;
236  }
237  }
238  }
239  }
240  return false;
241 }
242 
243 /******************************************************************************/
244 /* e n c o d e _ x r o o t d _ o p a q u e _ t o _ u r i */
245 /******************************************************************************/
246 
247 // When processing a redirection from the filesystem layer, it is permitted to return
248 // some xrootd opaque data. The quoting rules for xrootd opaque data are significantly
249 // more permissive than a URI (basically, only '&' and '=' are disallowed while some
250 // URI parsers may dislike characters like '"'). This function takes an opaque string
251 // (e.g., foo=1&bar=2&baz=") and makes it safe for all URI parsers.
252 std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
253 {
254  std::stringstream parser(opaque);
255  std::string sequence;
256  std::stringstream output;
257  bool first = true;
258  while (getline(parser, sequence, '&')) {
259  if (sequence.empty()) {continue;}
260  size_t equal_pos = sequence.find('=');
261  char *val = NULL;
262  if (equal_pos != std::string::npos)
263  val = curl_easy_escape(curl, sequence.c_str() + equal_pos + 1, sequence.size() - equal_pos - 1);
264  // Do not emit parameter if value exists and escaping failed.
265  if (!val && equal_pos != std::string::npos) {continue;}
266 
267  if (!first) output << "&";
268  first = false;
269  output << sequence.substr(0, equal_pos);
270  if (val) {
271  output << "=" << val;
272  curl_free(val);
273  }
274  }
275  return output.str();
276 }
277 
278 /******************************************************************************/
279 /* T P C H a n d l e r : : C o n f i g u r e C u r l C A */
280 /******************************************************************************/
281 
282 void
283 TPCHandler::ConfigureCurlCA(CURL *curl)
284 {
285  auto ca_filename = m_ca_file ? m_ca_file->CAFilename() : "";
286  auto crl_filename = m_ca_file ? m_ca_file->CRLFilename() : "";
287  if (!ca_filename.empty() && !crl_filename.empty()) {
288  curl_easy_setopt(curl, CURLOPT_CAINFO, ca_filename.c_str());
289  //Check that the CRL file contains at least one entry before setting this option to curl
290  //Indeed, an empty CRL file will make curl unhappy and therefore will fail
291  //all HTTP TPC transfers (https://github.com/xrootd/xrootd/issues/1543)
292  std::ifstream in(crl_filename, std::ifstream::ate | std::ifstream::binary);
293  if(in.tellg() > 0 && m_ca_file->atLeastOneValidCRLFound()){
294  curl_easy_setopt(curl, CURLOPT_CRLFILE, crl_filename.c_str());
295  if (allowMissingCRL) {
296  // No need to set the callback if there is no need to do it
297  curl_easy_setopt(curl, CURLOPT_SSL_CTX_FUNCTION, ssl_ctx_callback);
298  }
299  } else {
300  std::ostringstream oss;
301  oss << "No valid CRL file has been found in the file " << crl_filename << ". Disabling CRL checking.";
302  m_log.Log(Warning,"TpcHandler",oss.str().c_str());
303  }
304  }
305  else if (!m_cadir.empty()) {
306  curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str());
307  }
308  if (!m_cafile.empty()) {
309  curl_easy_setopt(curl, CURLOPT_CAINFO, m_cafile.c_str());
310  }
311 }
312 
313 
314 bool TPCHandler::MatchesPath(const char *verb, const char *path) {
315  return !strcmp(verb, "COPY") || !strcmp(verb, "OPTIONS");
316 }
317 
318 /******************************************************************************/
319 /* P r e p a r e U R L */
320 /******************************************************************************/
321 
// Rewrites the alias schemes "davs://", "s3://" and "s3s://" to "https://".
// Any other URL is returned unchanged. Matching is case-sensitive.
static std::string PrepareURL(const std::string &url)
{
    static const std::string kAliasSchemes[] = { "davs://", "s3://", "s3s://" };

    const auto alias = std::find_if(std::begin(kAliasSchemes), std::end(kAliasSchemes),
                                    [&url](const std::string &scheme) {
                                        // rfind anchored at 0 is a prefix test.
                                        return url.rfind(scheme, 0) == 0;
                                    });

    if (alias == std::end(kAliasSchemes)) {
        return url;
    }
    return "https://" + url.substr(alias->size());
}
332 
// Returns true only for URLs that start with "https://" or "http://"
// (case-sensitive prefix match).
static bool IsAllowedScheme(const std::string& url)
{
    static const std::string kPermitted[] = { "https://", "http://" };

    return std::any_of(std::begin(kPermitted), std::end(kPermitted),
                       [&url](const std::string &scheme) {
                           return url.rfind(scheme, 0) == 0;
                       });
}
343 
344 /******************************************************************************/
345 /* T P C H a n d l e r : : P r o c e s s R e q */
346 /******************************************************************************/
347 
349  if (req.verb == "OPTIONS") {
350  return ProcessOptionsReq(req);
351  }
352  auto header = XrdOucTUtils::caseInsensitiveFind(req.headers,"credential");
353  if (header != req.headers.end()) {
354  if (header->second != "none") {
355  m_log.Emsg("ProcessReq", "COPY requested an unsupported credential type: ", header->second.c_str());
356  return req.SendSimpleResp(400, NULL, NULL, "COPY requestd an unsupported Credential type", 0);
357  }
358  }
359  header = XrdOucTUtils::caseInsensitiveFind(req.headers,"source");
360  if (header != req.headers.end()) {
361  std::string src = PrepareURL(header->second);
362  if (!IsAllowedScheme(src)) {
363  const char *error_src = "COPY rejected: disallowed scheme in source URL";
364  m_log.Emsg("ProcessReq", error_src, src.c_str());
365  return req.SendSimpleResp(400, NULL, NULL, error_src, 0);
366  }
367  return ProcessPullReq(src, req);
368  }
369  header = XrdOucTUtils::caseInsensitiveFind(req.headers,"destination");
370  if (header != req.headers.end()) {
371  const std::string& dst = header->second;
372  if (!IsAllowedScheme(dst)) {
373  const char *error_dst = "COPY rejected: disallowed scheme in destination URL";
374  m_log.Emsg("ProcessReq", error_dst, dst.c_str());
375  return req.SendSimpleResp(400, NULL, NULL, error_dst, 0);
376  }
377  return ProcessPushReq(header->second, req);
378  }
379  m_log.Emsg("ProcessReq", "COPY verb requested but no source or destination specified.");
380  return req.SendSimpleResp(400, NULL, NULL, "No Source or Destination specified", 0);
381 }
382 
383 /******************************************************************************/
384 /* T P C H a n d l e r D e s t r u c t o r */
385 /******************************************************************************/
386 
388  m_sfs = NULL;
389 }
390 
391 /******************************************************************************/
392 /* T P C H a n d l e r C o n s t r u c t o r */
393 /******************************************************************************/
394 
395 TPCHandler::TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv) :
396  m_allow_local(false),
397  m_allow_private(true),
398  m_desthttps(false),
399  m_fixed_route(false),
400  m_timeout(60),
401  m_first_timeout(120),
402  m_log(log->logger(), "TPC_"),
403  m_sfs(NULL)
404 {
405  if (!Configure(config, myEnv)) {
406  throw std::runtime_error("Failed to configure the HTTP third-party-copy handler.");
407  }
408 
409 // Extract out the TPC monitoring object (we share it with xrootd).
410 //
411  XrdXrootdGStream *gs = (XrdXrootdGStream*)myEnv->GetPtr("Tpc.gStream*");
412  if (gs)
413  TPCLogRecord::tpcMonitor = new XrdXrootdTpcMon("http",log->logger(),*gs);
414 }
415 
416 /******************************************************************************/
417 /* T P C H a n d l e r : : P r o c e s s O p t i o n s R e q */
418 /******************************************************************************/
419 
423 int TPCHandler::ProcessOptionsReq(XrdHttpExtReq &req) {
424  return req.SendSimpleResp(200, NULL, (char *) "DAV: 1\r\nDAV: <http://apache.org/dav/propset/fs/1>\r\nAllow: HEAD,GET,PUT,PROPFIND,DELETE,OPTIONS,COPY", NULL, 0);
425 }
426 
427 /******************************************************************************/
428 /* T P C H a n d l e r : : G e t A u t h z */
429 /******************************************************************************/
430 
431 std::string TPCHandler::GetAuthz(XrdHttpExtReq &req) {
432  std::string authz;
433  auto authz_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"authorization");
434  if (authz_header != req.headers.end()) {
435  std::stringstream ss;
436  ss << "authz=" << encode_str(authz_header->second);
437  authz += ss.str();
438  }
439  return authz;
440 }
441 
442 /******************************************************************************/
443 /* T P C H a n d l e r : : R e d i r e c t T r a n s f e r */
444 /******************************************************************************/
445 
// Answers the client with an HTTP 307 redirect for a COPY request.
//
// The redirect target text and port are read from `error` via getErrText(port).
// If the text is null/empty or the port is 0, a 500 response is sent instead.
// When the redirect helper is active and the client's address info is
// available, the target is passed through XrdXrootdRedirHelper::Redirect(),
// which may replace the target and port or report an error (answered with 500).
// The final target is split into host and cgi; the Location header is built as
// http[s]://host:port/redirect_resource, with the URI-encoded cgi appended.
//
//   curl              - only used to URI-escape the opaque data
//   redirect_resource - resource part placed after "host:port/" in Location
//   req               - request being answered
//   error             - error object carrying the redirect target text and port
//   rec               - log record; rec.status is set to the status sent
//
// Returns the value of req.SendSimpleResp().
446 int TPCHandler::RedirectTransfer(CURL *curl, const std::string &redirect_resource,
447  XrdHttpExtReq &req, XrdOucErrInfo &error, TPCLogRecord &rec)
448 {
449  int port;
450  const char *ptr = error.getErrText(port);
451  if ((ptr == NULL) || (*ptr == '\0') || (port == 0)) {
452  rec.status = 500;
453  std::stringstream ss;
454  ss << "Internal error: redirect without hostname";
455  logTransferEvent(LogMask::Error, rec, "REDIRECT_INTERNAL_ERROR", ss.str());
456  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
457  }
458 
459  // The XrdSfs layer hands back the redirect target in host[?cgi] form; the
460  // port arrived separately via getErrText() above. Default to that target
461  // and let the redirect plugin block below override it if it rewrites it.
462  std::string finalTarget = ptr;
463 
464  // When a redirect plugin (XrdXrootdRedirPI) is configured, run the COPY
465  // redirect target through it so the same plugin-driven routing applies
466  // as in the XRootD protocol's fsRedirPI(). The plugin may rewrite host,
467  // port, and CGI; on a fatal plugin error surface a 500 with the plugin's
468  // message rather than emit a redirect we know is wrong. See issue #2767.
469  if (XrdNetAddrInfo *clientAddr = req.GetSecEntity().addrInfo;
470  XrdXrootdRedirHelper::IsActive() && clientAddr) {
471  // Redirect() takes the host[?cgi] target as a single string and splits
472  // it itself; the non-negative port selects its host+port form.
473  int newPort = port;
474  std::string newTarget;
475  std::string errMsg;
476  auto outcome = XrdXrootdRedirHelper::Redirect(ptr, newPort, *clientAddr,
477  newTarget, errMsg);
// NOTE(review): the statement that opens the following branch (original line
// 478, presumably an `if` testing `outcome` for the rewritten case) is missing
// from this listing; the `} else if` below has no visible opener. Restore it
// from the upstream file before compiling.
479  finalTarget = std::move(newTarget);
480  port = newPort;
481  logTransferEvent(LogMask::Info, rec, "REDIRECT_PLUGIN_REWRITE",
482  finalTarget);
483  } else if (outcome == XrdXrootdRedirHelper::Outcome::Error) {
484  rec.status = 500;
485  std::stringstream ess;
486  ess << "Redirect plugin error: " << errMsg;
487  logTransferEvent(LogMask::Error, rec, "REDIRECT_PLUGIN_ERROR",
488  ess.str());
489  return req.SendSimpleResp(rec.status, nullptr, nullptr,
490  generateClientErr(ess, rec).c_str(), 0);
491  }
492  // Outcome::Unchanged: keep the original target.
493  }
494 
495  // Split the (possibly plugin-rewritten) host[?cgi] target: the host goes
496  // into the Location authority, the cgi into its query string. splitHostCgi
497  // keeps the leading '?' on cgi; the Location builder below wants the bare
498  // opaque body, so drop that '?' here.
499  std::string host;
500  std::string cgi;
501  splitHostCgi(finalTarget, host, cgi);
502  std::string opaque = cgi.empty() ? std::string() : cgi.substr(1);
503 
504  std::stringstream ss;
505  ss << "Location: http" << (m_desthttps ? "s" : "") << "://" << host << ":" << port << "/" << redirect_resource;
506 
507  if (!opaque.empty()) {
508  // redirect_resource (sourced from xrd-http-fullresource) may already
509  // carry the client's query string, so pick the separator accordingly
510  // to avoid emitting a malformed URL with two '?'.
511  char sep = (redirect_resource.find('?') == std::string::npos) ? '?' : '&';
512  ss << sep << encode_xrootd_opaque_to_uri(curl, opaque);
513  }
514 
515  rec.status = 307;
516  logTransferEvent(LogMask::Info, rec, "REDIRECT", ss.str());
// ss.str() is a temporary; it lives until the end of this full expression,
// so the pointer passed as the header remains valid for the call.
517  return req.SendSimpleResp(rec.status, NULL, const_cast<char *>(ss.str().c_str()),
518  NULL, 0);
519 }
520 
521 /******************************************************************************/
522 /* T P C H a n d l e r : : O p e n W a i t S t a l l */
523 /******************************************************************************/
524 
525 int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource,
526  int mode, int openMode, const XrdSecEntity &sec,
527  const std::string &authz)
528 {
529  int open_result;
530  while (1) {
531  int orig_ucap = fh.error.getUCap();
532  fh.error.setUCap(orig_ucap | XrdOucEI::uIPv64);
533  std::string opaque;
534  size_t pos = resource.find('?');
535  // Extract the path and opaque info from the resource
536  std::string path = resource.substr(0, pos);
537 
538  if (pos != std::string::npos) {
539  opaque = resource.substr(pos + 1);
540  }
541 
542  // Append the authz information if there are some
543  if(!authz.empty()) {
544  opaque += (opaque.empty() ? "" : "&");
545  opaque += authz;
546  }
547  open_result = fh.open(path.c_str(), mode, openMode, &sec, opaque.c_str());
548 
549  if ((open_result == SFS_STALL) || (open_result == SFS_STARTED)) {
550  int secs_to_stall = fh.error.getErrInfo();
551  if (open_result == SFS_STARTED) {secs_to_stall = secs_to_stall/2 + 5;}
552  std::this_thread::sleep_for (std::chrono::seconds(secs_to_stall));
553  }
554  break;
555  }
556  return open_result;
557 }
558 
559 /******************************************************************************/
560 /* T P C H a n d l e r : : P e r f o r m H E A D R e q u e s t */
561 /******************************************************************************/
562 
563 
564 
568 int TPCHandler::PerformHEADRequest(CURL *curl, XrdHttpExtReq &req, State &state,
569  bool &success, TPCLogRecord &rec, bool shouldReturnErrorToClient) {
570  success = false;
571  curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
572  // Set a custom timeout of 60 seconds (= CONNECT_TIMEOUT for convenience) for the HEAD request
573  curl_easy_setopt(curl, CURLOPT_TIMEOUT, CONNECT_TIMEOUT);
574  CURLcode res;
575  res = curl_easy_perform(curl);
576  //Immediately set the CURLOPT_NOBODY flag to 0 as we anyway
577  //don't want the next curl call to do be a HEAD request
578  curl_easy_setopt(curl, CURLOPT_NOBODY, 0);
579  // Reset the CURLOPT_TIMEOUT to no timeout (default)
580  curl_easy_setopt(curl, CURLOPT_TIMEOUT, 0L);
581  curl_easy_setopt(curl, CURLOPT_FAILONERROR, true);
582 
583  std::stringstream ss;
584 
585  if (state.GetStatusCode() >= 400)
586  res = CURLE_HTTP_RETURNED_ERROR;
587 
588  if (res != CURLE_OK) { /* curl failed */
589  ss << curl_easy_strerror(res);
590  switch (res) {
591  case CURLE_HTTP_RETURNED_ERROR: /* remote side may have returned an error */
592  rec.tpc_status = state.GetStatusCode(); /* relay status received from remote side to the client */
593  ss << ": remote host returned '" << rec.tpc_status << " "
594  << httpStatusToString(rec.tpc_status) << "' while fetching file size";
595  break;
596  case CURLE_COULDNT_CONNECT: /* socket callback may have failed */
597  switch (rec.tpc_status) {
598  case 403:
599  ss << ": connection to local/private addresses is forbidden";
600  break;
601  default:
602  ss << ": internal server failure";
603  rec.tpc_status = 500;
604  }
605  break;
606  default:
607  rec.tpc_status = 500;
608  state.SetErrorCode(500);
609  }
610  }
611 
612  if (rec.tpc_status >= 400) {
613  logTransferEvent(LogMask::Error, rec, "HEAD_FAIL", ss.str());
614  return shouldReturnErrorToClient ? req.SendSimpleResp(rec.tpc_status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
615  }
616 
617  success = true;
618  ss << "Successfully determined remote file information for pull request: "
619  << "size=" << state.GetContentLength();
620  if(state.GetReprDigest().size()) {
621  unsigned int cksumIndex = 1;
622  for(const auto & [cksumType,cksumValue]: state.GetReprDigest()) {
623  ss << " chksum" << cksumIndex << "=(" << cksumType << "," << cksumValue << ")";
624  cksumIndex++;
625  }
626  }
627  logTransferEvent(LogMask::Debug, rec, "HEAD_SUCCESS", ss.str());
628  return 0;
629 }
630 
631 int TPCHandler::GetRemoteFileInfoTPCPull(CURL *curl, XrdHttpExtReq &req, uint64_t &contentLength, std::map<std::string,std::string> & reprDigest, bool & success, TPCLogRecord &rec) {
632  State state(curl,req.tpcForwardCreds);
633  //Don't forget to copy the headers of the client's request before doing the HEAD call. Otherwise, if there is a need for authentication,
634  //it will fail
635  state.SetupHeadersForHEAD(req);
636  int result;
637  //In case we cannot get the file HEAD request, we return the error to the client
638  if ((result = PerformHEADRequest(curl, req, state, success, rec)) || !success) {
639  return result;
640  }
641  contentLength = state.GetContentLength();
642  reprDigest = state.GetReprDigest();
643  return result;
644 }
645 
646 /******************************************************************************/
647 /* T P C H a n d l e r : : S e n d P e r f M a r k e r */
648 /******************************************************************************/
649 
650 int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state) {
651  std::stringstream ss;
652  const std::string crlf = "\n";
653  ss << "Perf Marker" << crlf;
654  ss << "Timestamp: " << time(NULL) << crlf;
655  ss << "Stripe Index: 0" << crlf;
656  ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf;
657  ss << "Total Stripe Count: 1" << crlf;
658  // Include the TCP connection associated with this transfer; used by
659  // the TPC client for monitoring purposes.
660  std::string desc = state.GetConnectionDescription();
661  if (!desc.empty())
662  ss << "RemoteConnections: " << desc << crlf;
663  ss << "End" << crlf;
664  rec.bytes_transferred = state.BytesTransferred();
665  logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
666 
667  return req.ChunkResp(ss.str().c_str(), 0);
668 }
669 
670 /******************************************************************************/
671 /* T P C H a n d l e r : : S e n d P e r f M a r k e r */
672 /******************************************************************************/
673 
674 int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, std::vector<State*> &state,
675  off_t bytes_transferred)
676 {
677  // The 'performance marker' format is largely derived from how GridFTP works
678  // (e.g., the concept of `Stripe` is not quite so relevant here). See:
679  // https://twiki.cern.ch/twiki/bin/view/LCG/HttpTpcTechnical
680  // Example marker:
681  // Perf Marker\n
682  // Timestamp: 1537788010\n
683  // Stripe Index: 0\n
684  // Stripe Bytes Transferred: 238745\n
685  // Total Stripe Count: 1\n
686  // RemoteConnections: tcp:129.93.3.4:1234,tcp:[2600:900:6:1301:268a:7ff:fef6:a590]:2345\n
687  // End\n
688  //
689  std::stringstream ss;
690  const std::string crlf = "\n";
691  ss << "Perf Marker" << crlf;
692  ss << "Timestamp: " << time(NULL) << crlf;
693  ss << "Stripe Index: 0" << crlf;
694  ss << "Stripe Bytes Transferred: " << bytes_transferred << crlf;
695  ss << "Total Stripe Count: 1" << crlf;
696  // Build a list of TCP connections associated with this transfer; used by
697  // the TPC client for monitoring purposes.
698  bool first = true;
699  std::stringstream ss2;
700  for (std::vector<State*>::const_iterator iter = state.begin();
701  iter != state.end(); iter++)
702  {
703  std::string desc = (*iter)->GetConnectionDescription();
704  if (!desc.empty()) {
705  ss2 << (first ? "" : ",") << desc;
706  first = false;
707  }
708  }
709  if (!first)
710  ss << "RemoteConnections: " << ss2.str() << crlf;
711  ss << "End" << crlf;
712  rec.bytes_transferred = bytes_transferred;
713  logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
714 
715  return req.ChunkResp(ss.str().c_str(), 0);
716 }
717 
718 /******************************************************************************/
719 /* T P C H a n d l e r : : R u n C u r l W i t h U p d a t e s */
720 /******************************************************************************/
721 
722 int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
723  TPCLogRecord &rec)
724 {
725  // Create the multi-handle and add in the current transfer to it.
726  CURLM *multi_handle = curl_multi_init();
727  if (!multi_handle) {
728  rec.status = 500;
729  logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL",
730  "Failed to initialize a libcurl multi-handle");
731  std::stringstream ss;
732  ss << "Failed to initialize internal server memory";
733  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
734  }
735 
736  //curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 128*1024);
737 
738  CURLMcode mres;
739  mres = curl_multi_add_handle(multi_handle, curl);
740  if (mres) {
741  rec.status = 500;
742  std::stringstream ss;
743  ss << "Failed to add transfer to libcurl multi-handle: HTTP library failure=" << curl_multi_strerror(mres);
744  logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL", ss.str());
745  curl_multi_cleanup(multi_handle);
746  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
747  }
748 
749  // Start response to client prior to the first call to curl_multi_perform
750  int retval = req.StartChunkedResp(202, NULL, "Content-Type: text/plain");
751  if (retval) {
752  curl_multi_cleanup(multi_handle);
753  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
754  "Failed to send the initial response to the TPC client");
755  return retval;
756  } else {
757  logTransferEvent(LogMask::Debug, rec, "RESPONSE_START",
758  "Initial transfer response sent to the TPC client");
759  }
760 
761  // Transfer loop: use curl to actually run the transfer, but periodically
762  // interrupt things to send back performance updates to the client.
763  int running_handles = 1;
764  time_t last_marker = 0;
765  // Track how long it's been since the last time we recorded more bytes being transferred.
766  off_t last_advance_bytes = 0;
767  time_t last_advance_time = time(NULL);
768  time_t transfer_start = last_advance_time;
769  CURLcode res = static_cast<CURLcode>(-1);
770  do {
771  time_t now = time(NULL);
772  time_t next_marker = last_marker + m_marker_period;
773  if (now >= next_marker) {
774  off_t bytes_xfer = state.BytesTransferred();
775  if (bytes_xfer > last_advance_bytes) {
776  last_advance_bytes = bytes_xfer;
777  last_advance_time = now;
778  }
779  if (SendPerfMarker(req, rec, state)) {
780  curl_multi_remove_handle(multi_handle, curl);
781  curl_multi_cleanup(multi_handle);
782  logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
783  "Failed to send a perf marker to the TPC client");
784  return -1;
785  }
786  int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
787  if (now > last_advance_time + timeout) {
788  const char *log_prefix = rec.log_prefix.c_str();
789  bool tpc_pull = strncmp("Pull", log_prefix, 4) == 0;
790 
791  state.SetErrorCode(10);
792  std::stringstream ss;
793  ss << "Transfer failed because no bytes have been "
794  << (tpc_pull ? "received from the source (pull mode) in "
795  : "transmitted to the destination (push mode) in ") << timeout << " seconds.";
796  state.SetErrorMessage(ss.str());
797  curl_multi_remove_handle(multi_handle, curl);
798  curl_multi_cleanup(multi_handle);
799  break;
800  }
801  last_marker = now;
802  }
803  // The transfer will start after this point, notify the packet marking manager
804  rec.pmarkManager.startTransfer();
805  mres = curl_multi_perform(multi_handle, &running_handles);
806  if (mres == CURLM_CALL_MULTI_PERFORM) {
807  // curl_multi_perform should be called again immediately. On newer
808  // versions of curl, this is no longer used.
809  continue;
810  } else if (mres != CURLM_OK) {
811  break;
812  } else if (running_handles == 0) {
813  break;
814  }
815 
816  rec.pmarkManager.beginPMarks();
817  //printf("There are %d running handles\n", running_handles);
818 
819  // Harvest any messages, looking for CURLMSG_DONE.
820  CURLMsg *msg;
821  do {
822  int msgq = 0;
823  msg = curl_multi_info_read(multi_handle, &msgq);
824  if (msg && (msg->msg == CURLMSG_DONE)) {
825  CURL *easy_handle = msg->easy_handle;
826  res = msg->data.result;
827  curl_multi_remove_handle(multi_handle, easy_handle);
828  }
829  } while (msg);
830 
831  int64_t max_sleep_time = next_marker - time(NULL);
832  if (max_sleep_time <= 0) {
833  continue;
834  }
835  int fd_count;
836  mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count);
837  if (mres != CURLM_OK) {
838  break;
839  }
840  } while (running_handles);
841 
842  if (mres != CURLM_OK) {
843  std::stringstream ss;
844  ss << "Internal libcurl multi-handle error: HTTP library failure=" << curl_multi_strerror(mres);
845  logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", ss.str());
846 
847  curl_multi_remove_handle(multi_handle, curl);
848  curl_multi_cleanup(multi_handle);
849 
850  if ((retval = req.ChunkResp(generateClientErr(ss, rec).c_str(), 0))) {
851  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
852  "Failed to send error message to the TPC client");
853  return retval;
854  }
855  return req.ChunkResp(NULL, 0);
856  }
857 
858  // Harvest any messages, looking for CURLMSG_DONE.
859  CURLMsg *msg;
860  do {
861  int msgq = 0;
862  msg = curl_multi_info_read(multi_handle, &msgq);
863  if (msg && (msg->msg == CURLMSG_DONE)) {
864  CURL *easy_handle = msg->easy_handle;
865  res = msg->data.result;
866  curl_multi_remove_handle(multi_handle, easy_handle);
867  }
868  } while (msg);
869 
870  if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
871  curl_multi_remove_handle(multi_handle, curl);
872  curl_multi_cleanup(multi_handle);
873  std::stringstream ss;
874  ss << "Internal state error in libcurl";
875  logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", ss.str());
876 
877  if ((retval = req.ChunkResp(generateClientErr(ss, rec).c_str(), 0))) {
878  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
879  "Failed to send error message to the TPC client");
880  return retval;
881  }
882  return req.ChunkResp(NULL, 0);
883  }
884  curl_multi_cleanup(multi_handle);
885 
886  state.Flush();
887 
888  rec.bytes_transferred = state.BytesTransferred();
889  rec.tpc_status = state.GetStatusCode();
890 
891  // Explicitly finalize the stream (which will close the underlying file
892  // handle) before the response is sent. In some cases, subsequent HTTP
893  // requests can occur before the filesystem is done closing the handle -
894  // and those requests may occur against partial data.
895  state.Finalize();
896 
897  // Generate the final response back to the client.
898  std::stringstream ss;
899  bool success = false;
900  if (state.GetStatusCode() >= 400) {
901  std::string err = state.GetErrorMessage();
902  std::stringstream ss2;
903  ss2 << "Remote side failed with status code " << state.GetStatusCode();
904  if (!err.empty()) {
905  std::replace(err.begin(), err.end(), '\n', ' ');
906  ss2 << "; error message: \"" << err << "\"";
907  }
908  logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
909  ss << generateClientErr(ss2, rec);
910  } else if (state.GetErrorCode()) {
911  std::string err = state.GetErrorMessage();
912  if (err.empty()) {err = "(no error message provided)";}
913  else {std::replace(err.begin(), err.end(), '\n', ' ');}
914  std::stringstream ss2;
915  ss2 << "Error when interacting with local filesystem: " << err;
916  logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
917  ss << generateClientErr(ss2, rec);
918  } else if (res != CURLE_OK) {
919  std::stringstream ss2;
920  ss2 << "Internal transfer failure";
921  std::stringstream ss3;
922  ss3 << ss2.str() << ": " << curl_easy_strerror(res);
923  logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss3.str());
924  ss << generateClientErr(ss2, rec, res);
925  } else {
926  ss << "success: Created";
927  success = true;
928  }
929 
930  if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
931  logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR",
932  "Failed to send last update to remote client");
933  return retval;
934  } else if (success) {
935  logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS");
936  rec.status = 0;
937  }
938  return req.ChunkResp(NULL, 0);
939 }
940 
941 /******************************************************************************/
942 /* T P C H a n d l e r : : P r o c e s s P u s h R e q */
943 /******************************************************************************/
944 
945 int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) {
946  TPCLogRecord rec(req, TpcType::Push);
947  rec.allow_local = m_allow_local;
948  rec.allow_private = m_allow_private;
949  rec.log_prefix = "PushRequest";
950  rec.local = req.resource;
951  rec.remote = resource;
952  rec.m_log = &m_log;
953  char *name = req.GetSecEntity().name;
954  req.GetClientID(rec.clID);
955  if (name) rec.name = name;
956  logTransferEvent(LogMask::Info, rec, "PUSH_START", "Starting a push request");
957 
958  ManagedCurlHandle curlPtr(curl_easy_init());
959  auto curl = curlPtr.get();
960  if (!curl) {
961  std::stringstream ss;
962  ss << "Failed to initialize internal transfer resources";
963  rec.status = 500;
964  logTransferEvent(LogMask::Error, rec, "PUSH_FAIL", ss.str());
965  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
966  }
967  curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
968  curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2);
969  curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_1_1);
970 #if CURL_AT_LEAST_VERSION(7, 85, 0)
971  curl_easy_setopt(curl, CURLOPT_PROTOCOLS_STR, "https,http");
972  curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS_STR, "https,http");
973 #else
974  long protocols = CURLPROTO_HTTP | CURLPROTO_HTTPS;
975  curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols);
976  curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols);
977 #endif
978  curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
979  curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, &rec);
980  curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
981  curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
982  curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, &rec);
983  curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
984 
985  auto query_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"xrd-http-fullresource");
986  std::string redirect_resource = req.resource;
987  if (query_header != req.headers.end()) {
988  redirect_resource = query_header->second;
989  }
990 
991  AtomicBeg(m_monid_mutex);
992  uint64_t file_monid = AtomicInc(m_monid);
993  AtomicEnd(m_monid_mutex);
994  std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, file_monid));
995  if (!fh.get()) {
996  rec.status = 500;
997  std::stringstream ss;
998  ss << "Failed to initialize internal transfer file handle";
999  logTransferEvent(LogMask::Error, rec, "OPEN_FAIL",
1000  ss.str());
1001  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1002  }
1003  std::string full_url = prepareURL(req);
1004 
1005  std::string authz = GetAuthz(req);
1006 
1007  int open_results = OpenWaitStall(*fh, full_url, SFS_O_RDONLY, 0644,
1008  req.GetSecEntity(), authz);
1009  if (SFS_REDIRECT == open_results) {
1010  int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
1011  return result;
1012  } else if (SFS_OK != open_results) {
1013  int code;
1014  std::stringstream ss;
1015  const char *msg = fh->error.getErrText(code);
1016  if (msg == NULL) ss << "Failed to open local resource";
1017  else ss << msg;
1018  rec.status = mapErrNoToHttp(code);
1019  logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", msg);
1020  int resp_result = req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1021  fh->close();
1022  return resp_result;
1023  }
1024  ConfigureCurlCA(curl);
1025  curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
1026 
1027  Stream stream(std::move(fh), 0, 0, m_log);
1028  State state(0, stream, curl, true, req.tpcForwardCreds);
1029  state.SetupHeaders(req);
1030 
1031  return RunCurlWithUpdates(curl, req, state, rec);
1032 }
1033 
1034 /******************************************************************************/
1035 /* T P C H a n d l e r : : P r o c e s s P u l l R e q */
1036 /******************************************************************************/
1037 
1038 int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) {
1039  TPCLogRecord rec(req,TpcType::Pull);
1040  rec.allow_local = m_allow_local;
1041  rec.allow_private = m_allow_private;
1042  rec.log_prefix = "PullRequest";
1043  rec.local = req.resource;
1044  rec.remote = resource;
1045  rec.m_log = &m_log;
1046  char *name = req.GetSecEntity().name;
1047  req.GetClientID(rec.clID);
1048  if (name) rec.name = name;
1049  logTransferEvent(LogMask::Info, rec, "PULL_START", "Starting a pull request");
1050 
1051  ManagedCurlHandle curlPtr(curl_easy_init());
1052  auto curl = curlPtr.get();
1053  if (!curl) {
1054  std::stringstream ss;
1055  ss << "Failed to initialize internal transfer resources";
1056  rec.status = 500;
1057  logTransferEvent(LogMask::Error, rec, "PULL_FAIL", ss.str());
1058  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1059  }
1060 
1061  // ddavila 2023-01-05:
1062  // The following change was required by the Rucio/SENSE project where
1063  // multiple IP addresses, each from a different subnet, are assigned to a
1064  // single server and routed differently by SENSE.
1065  // The above requires the server to utilize the same IP, that was used to
1066  // start the TPC, for the resolution of the given TPC instead of
1067  // using any of the IPs available.
1068  if (m_fixed_route) {
1069  char ip[64];
1070  char ipType = 0;
1071 
1072  XrdNetAddrInfo *addrInfo = req.GetSecEntity().addrInfo;
1073  int sockFD = addrInfo ? addrInfo->SockFD() : -1;
1074 
1075  if (sockFD < 0 || XrdNetUtils::GetSokInfo(-sockFD, ip, sizeof(ip), ipType) < 0) {
1076  // The socket information could not be fetched for some reason, treat this tpc.fixed_route as "best-effort" instead
1077  // of failing the transfer
1078  logTransferEvent(LogMask::Error, rec, "FIXED_ROUTE_ERR", "Failed to determine local address of incoming fixed route request");
1079  } else {
1080  logTransferEvent(LogMask::Info, rec, "LOCAL IP", ip);
1081  curl_easy_setopt(curl, CURLOPT_INTERFACE, ip);
1082  }
1083  }
1084  curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
1085  curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2);
1086  curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_1_1);
1087 #if CURL_AT_LEAST_VERSION(7, 85, 0)
1088  curl_easy_setopt(curl, CURLOPT_PROTOCOLS_STR, "https,http");
1089  curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS_STR, "https,http");
1090 #else
1091  long protocols = CURLPROTO_HTTP | CURLPROTO_HTTPS;
1092  curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols);
1093  curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols);
1094 #endif
1095  curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
1096  curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, &rec);
1097  curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
1098  curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA , &rec);
1099  curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
1100  curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, &rec);
1101  curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
1102  std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, m_monid++));
1103  if (!fh.get()) {
1104  std::stringstream ss;
1105  ss << "Failed to initialize internal transfer file handle";
1106  rec.status = 500;
1107  logTransferEvent(LogMask::Error, rec, "PULL_FAIL", ss.str());
1108  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1109  }
1110  auto query_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"xrd-http-fullresource");
1111  std::string redirect_resource = req.resource;
1112  if (query_header != req.headers.end()) {
1113  redirect_resource = query_header->second;
1114  }
1116  auto overwrite_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"overwrite");
1117  if ((overwrite_header == req.headers.end()) || (overwrite_header->second == "T")) {
1118  if (! usingEC) mode = SFS_O_TRUNC;
1119  }
1120  int streams = 1;
1121  {
1122  auto streams_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"x-number-of-streams");
1123  if (streams_header != req.headers.end()) {
1124  int stream_req = -1;
1125  try {
1126  stream_req = std::stol(streams_header->second);
1127  } catch (...) { // Handled below
1128  }
1129  if (stream_req < 0 || stream_req > 100) {
1130  std::stringstream ss;
1131  ss << "Invalid request for number of streams";
1132  rec.status = 400;
1133  logTransferEvent(LogMask::Info, rec, "INVALID_REQUEST", ss.str());
1134  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1135  }
1136  streams = stream_req == 0 ? 1 : stream_req;
1137  }
1138  }
1139  rec.streams = streams;
1140  std::string full_url = prepareURL(req);
1141  std::string authz = GetAuthz(req);
1142  curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
1143  ConfigureCurlCA(curl);
1144  uint64_t sourceFileContentLength = 0;
1145  {
1146  //Get the content-length of the source file and pass it to the OSS layer
1147  //during the open
1148  bool success = false;
1149  bool mismatchDigests = false;
1150  std::map<std::string,std::string> sourceFileReprDigest;
1151  GetRemoteFileInfoTPCPull(curl, req, sourceFileContentLength, sourceFileReprDigest, success, rec);
1152  if(success) {
1153  //In the case we cannot get the information from the source server (offline or other error)
1154  //we just don't add the file information to the opaque of the local file to open
1155  full_url += "&oss.asize=" + std::to_string(sourceFileContentLength);
1156  mismatchDigests = mismatchReprDigest(sourceFileReprDigest,req,rec);
1157  }
1158  if(!success || mismatchDigests) {
1159  // We could not get remote file information, or the checksum provided by the client
1160  // does not match the source file one, we already sent the error to the client so we
1161  // just exit here
1162  return 0;
1163  }
1164  }
1165  int open_result = OpenWaitStall(*fh, full_url, mode|SFS_O_WRONLY,
1166  0644 | SFS_O_MKPTH,
1167  req.GetSecEntity(), authz);
1168  if (SFS_REDIRECT == open_result) {
1169  int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
1170  return result;
1171  } else if (SFS_OK != open_result) {
1172  int code;
1173  std::stringstream ss;
1174  const char *msg = fh->error.getErrText(code);
1175  if ((msg == NULL) || (*msg == '\0')) ss << "Failed to open local resource";
1176  else ss << msg;
1177  rec.status = mapErrNoToHttp(code);
1178  logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", ss.str());
1179  int resp_result = req.SendSimpleResp(rec.status, NULL, NULL,
1180  generateClientErr(ss, rec).c_str(), 0);
1181  fh->close();
1182  return resp_result;
1183  }
1184  Stream stream(std::move(fh), streams * m_pipelining_multiplier, streams > 1 ? m_block_size : m_small_block_size, m_log);
1185  State state(0, stream, curl, false, req.tpcForwardCreds);
1186  state.SetupHeaders(req);
1187  state.SetContentLength(sourceFileContentLength);
1188 
1189  if (streams > 1) {
1190  return RunCurlWithStreams(req, state, streams, rec);
1191  } else {
1192  return RunCurlWithUpdates(curl, req, state, rec);
1193  }
1194 }
1195 
1196 /******************************************************************************/
1197 /* T P C H a n d l e r : : l o g T r a n s f e r E v e n t */
1198 /******************************************************************************/
1199 
1200 void TPCHandler::logTransferEvent(LogMask mask, const TPCLogRecord &rec,
1201  const std::string &event, const std::string &message)
1202 {
1203  if (!(m_log.getMsgMask() & mask)) {return;}
1204 
1205  std::stringstream ss;
1206  ss << "event=" << event << ", local=" << rec.local << ", remote=" << rec.remote;
1207  if (rec.name.empty())
1208  ss << ", user=(anonymous)";
1209  else
1210  ss << ", user=" << rec.name;
1211  if (rec.streams != 1)
1212  ss << ", streams=" << rec.streams;
1213  if (rec.bytes_transferred >= 0)
1214  ss << ", bytes_transferred=" << rec.bytes_transferred;
1215  if (rec.status >= 0)
1216  ss << ", status=" << rec.status;
1217  if (rec.tpc_status >= 0)
1218  ss << ", tpc_status=" << rec.tpc_status;
1219  if (!message.empty())
1220  ss << "; " << message;
1221  m_log.Log(mask, rec.log_prefix.c_str(), ss.str().c_str());
1222 }
1223 
1224 std::string TPCHandler::generateClientErr(std::stringstream &err_ss, const TPCLogRecord &rec, CURLcode cCode) {
1225  std::stringstream ssret;
1226  ssret << "failure: " << err_ss.str() << ", local=" << rec.local <<", remote=" << rec.remote;
1227  if(cCode != CURLcode::CURLE_OK) {
1228  ssret << ", HTTP library failure=" << curl_easy_strerror(cCode);
1229  }
1230  return ssret.str();
1231 }
1232 /******************************************************************************/
1233 /* X r d H t t p G e t E x t H a n d l e r */
1234 /******************************************************************************/
1235 
1236 extern "C" {
1237 
1238 XrdHttpExtHandler *XrdHttpGetExtHandler(XrdSysError *log, const char * config, const char * /*parms*/, XrdOucEnv *myEnv) {
1239  if (curl_global_init(CURL_GLOBAL_DEFAULT)) {
1240  log->Emsg("TPCInitialize", "libcurl failed to initialize");
1241  return NULL;
1242  }
1243 
1244  TPCHandler *retval{NULL};
1245  if (!config) {
1246  log->Emsg("TPCInitialize", "TPC handler requires a config filename in order to load");
1247  return NULL;
1248  }
1249  try {
1250  log->Emsg("TPCInitialize", "Will load configuration for the TPC handler from", config);
1251  retval = new TPCHandler(log, config, myEnv);
1252  } catch (std::runtime_error &re) {
1253  log->Emsg("TPCInitialize", "Encountered a runtime failure when loading ", re.what());
1254  //printf("Provided env vars: %p, XrdInet*: %p\n", myEnv, myEnv->GetPtr("XrdInet*"));
1255  }
1256  return retval;
1257 }
1258 
1259 }
void CURL
XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC)
XrdHttpExtHandler * XrdHttpGetExtHandler(XrdSysError *log, const char *config, const char *, XrdOucEnv *myEnv)
static std::string PrepareURL(const std::string &url)
std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
static bool IsAllowedScheme(const std::string &url)
int mapErrNoToHttp(int errNo)
std::string httpStatusToString(int status)
Utility functions for XrdHTTP.
std::string encode_str(const std::string &str)
void splitHostCgi(std::string_view target, std::string &host, std::string &cgi)
#define close(a)
Definition: XrdPosix.hh:48
bool Debug
void getline(uchar *buff, int blen)
#define SFS_REDIRECT
#define SFS_O_MKPTH
#define SFS_STALL
#define SFS_O_RDONLY
#define SFS_STARTED
#define SFS_O_WRONLY
#define SFS_O_CREAT
int XrdSfsFileOpenMode
#define SFS_OK
#define SFS_O_TRUNC
#define AtomicInc(x)
#define AtomicBeg(Mtx)
#define AtomicEnd(Mtx)
@ Error
int GetStatusCode() const
off_t BytesTransferred() const
void SetErrorMessage(const std::string &error_msg)
int GetErrorCode() const
std::string GetErrorMessage() const
std::string GetConnectionDescription()
void SetupHeaders(XrdHttpExtReq &req)
void SetContentLength(const off_t content_length)
off_t GetContentLength() const
void SetErrorCode(int error_code)
const std::map< std::string, std::string > & GetReprDigest() const
void SetupHeadersForHEAD(XrdHttpExtReq &req)
TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv)
virtual int ProcessReq(XrdHttpExtReq &req)
virtual ~TPCHandler()
virtual bool MatchesPath(const char *verb, const char *path)
Tells if the incoming path is recognized as one of the paths that have to be processed.
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with NULL body.
void GetClientID(std::string &clid)
std::map< std::string, std::string > & headers
std::string resource
std::string verb
std::map< std::string, std::string > mReprDigest
Repr-Digest map where the key is the digest name and the value is the base64 encoded digest value.
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; body of request is sent over multiple parts using the SendChunkResp.
const XrdSecEntity & GetSecEntity() const
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.
static std::string prepareOpenURL(PrepareOpenURLParams &params)
static int GetSokInfo(int fd, char *theAddr, int theALen, char &theType)
Definition: XrdNetUtils.cc:533
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:281
const char * getErrText()
void setUCap(int ucval)
Set user capabilties.
static std::map< std::string, T >::const_iterator caseInsensitiveFind(const std::map< std::string, T > &m, const std::string &lowerCaseSearchKey)
Definition: XrdOucTUtils.hh:79
XrdNetAddrInfo * addrInfo
Entity's connection details.
Definition: XrdSecEntity.hh:80
char * name
Entity's name.
Definition: XrdSecEntity.hh:69
virtual XrdSfsFile * newFile(char *user=0, int MonID=0)=0
XrdOucErrInfo & error
virtual int open(const char *fileName, XrdSfsFileOpenMode openMode, mode_t createMode, const XrdSecEntity *client=0, const char *opaque=0)=0
virtual int close()=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:116
XrdSysLogger * logger(XrdSysLogger *lp=0)
Definition: XrdSysError.hh:175
int getMsgMask()
Definition: XrdSysError.hh:190
void Log(int mask, const char *esfx, const char *text1, const char *text2=0, const char *text3=0)
Definition: XrdSysError.hh:167
static Outcome Redirect(const char *trg, int &port, XrdNetAddrInfo &clientAddr, std::string &outTarget, std::string &errMsg)
std::unique_ptr< CURL, CurlDeleter > ManagedCurlHandle
@ Warning
void operator()(CURL *curl)
static const int uIPv64
ucap: Supports only IPv4 info
static const int isaPush