|
IBR-DTNSuite 0.6
|
00001 00010 #include "net/HTTPConvergenceLayer.h" 00011 #include <ibrdtn/data/ScopeControlHopLimitBlock.h> 00012 #include <ibrcommon/AutoDelete.h> 00013 00014 namespace dtn 00015 { 00016 namespace net 00017 { 00018 00020 const int TIMEOUT = 1000; 00022 const int CONN_TIMEOUT = 5000; 00023 00025 const int HTTP_OK = 200; 00027 const int HTTP_NO_DATA = 410; 00028 00030 const int CURL_CONN_OK = 0; 00032 const int CURL_PARTIAL_FILE = 18; 00033 00034 /* CURL DEBUG SECTION START */ 00035 00039 struct data { 00040 char trace_ascii; 00041 }; 00042 00054 static void dump(const char *text, 00055 FILE *stream, unsigned char *ptr, size_t size, 00056 char nohex) 00057 { 00058 size_t i; 00059 size_t c; 00060 00061 unsigned int width=0x10; 00062 00063 if(nohex) 00064 00065 // without the hex output, we can fit more on screen 00066 00067 width = 0x40; 00068 00069 fprintf(stream, "%s, %10.10ld bytes (0x%8.8lx)\n", 00070 text, (long)size, (long)size); 00071 00072 for(i=0; i<size; i+= width) { 00073 00074 fprintf(stream, "%4.4lx: ", (long)i); 00075 00076 if(!nohex) { 00077 00078 // hex not disabled, show it 00079 00080 for(c = 0; c < width; c++) 00081 if(i+c < size) 00082 fprintf(stream, "%02x ", ptr[i+c]); 00083 else 00084 fputs(" ", stream); 00085 } 00086 00087 for(c = 0; (c < width) && (i+c < size); c++) { 00088 00089 // check for 0D0A; if found, skip past and start a new line of output 00090 00091 if (nohex && (i+c+1 < size) && ptr[i+c]==0x0D && ptr[i+c+1]==0x0A) { 00092 i+=(c+2-width); 00093 break; 00094 } 00095 fprintf(stream, "%c", 00096 (ptr[i+c]>=0x20) && (ptr[i+c]<0x80)?ptr[i+c]:'.'); 00097 00098 // check again for 0D0A, to avoid an extra \n if it's at width 00099 00100 if (nohex && (i+c+2 < size) && ptr[i+c+1]==0x0D && ptr[i+c+2]==0x0A) { 00101 i+=(c+3-width); 00102 break; 00103 } 00104 } 00105 00106 00107 fputc('\n', stream); 00108 } 00109 fflush(stream); 00110 } 00111 00112 00124 static int my_trace(CURL *handle, curl_infotype type, 00125 char *data, size_t size, void *userp) 00126 { 00127 struct data *config = (struct data *)userp; 00128 const char *text; 00129 (void)handle; // prevent compiler warning 00130 00131 switch (type) { 00132 case CURLINFO_TEXT: 00133 fprintf(stderr, "== Info: %s", data); 00134 default: // in case a new one is introduced to shock us 00135 return 0; 00136 00137 case CURLINFO_HEADER_IN: 00138 text = "<= Recv header"; 00139 break; 00140 case CURLINFO_DATA_IN: 00141 text = "<= Recv data"; 00142 break; 00143 } 00144 00145 dump(text, stderr, (unsigned char *)data, size, config->trace_ascii); 00146 return 0; 00147 } 00148 00149 /* CURL DEBUG SECTION END */ 00150 00151 00160 static size_t HTTPConvergenceLayer_callback_read(void *ptr, size_t size, size_t nmemb, void *s) 00161 { 00162 size_t retcode = 0; 00163 std::istream *stream = static_cast<std::istream*>(s); 00164 00165 if (stream->eof()) return 0; 00166 00167 char *buffer = static_cast<char*>(ptr); 00168 00169 stream->read(buffer, (size * nmemb)); 00170 retcode = stream->gcount(); 00171 00172 return retcode; 00173 } 00174 00183 static size_t HTTPConvergenceLayer_callback_write(void *ptr, size_t size, size_t nmemb, void *s) 00184 { 00185 std::ostream *stream = static_cast<std::ostream*>(s); 00186 char *buffer = static_cast<char*>(ptr); 00187 00188 if (!stream->good()) return 0; 00189 00190 stream->write(buffer, (size * nmemb)); 00191 stream->flush(); 00192 00193 return (size * nmemb); 00194 } 00195 00203 HTTPConvergenceLayer::HTTPConvergenceLayer(const std::string &server) 00204 : _server(server), _push_iob(NULL), _running(true) 00205 { 00206 curl_global_init(CURL_GLOBAL_ALL); 00207 } 00208 00209 00214 HTTPConvergenceLayer::~HTTPConvergenceLayer() 00215 { 00216 curl_global_cleanup(); 00217 } 00218 00226 DownloadThread::DownloadThread(ibrcommon::iobuffer &buf) 00227 : _stream(&buf) 00228 { 00229 } 00230 00235 DownloadThread::~DownloadThread() 00236 { 00237 join(); 00238 } 00239 00247 void DownloadThread::run() 00248 { 00249 try { 00250 while(_stream.good()) 00251 { 00252 try { 00253 dtn::data::Bundle bundle; 00254 dtn::data::DefaultDeserializer(_stream) >> bundle; 00255 00256 // increment value in the scope control hop limit block 00257 try { 00258 dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>(); 00259 schl.increment(); 00260 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { }; 00261 00262 // raise default bundle received event 00263 dtn::net::BundleReceivedEvent::raise(dtn::data::EID(), bundle); 00264 } catch (const ibrcommon::Exception &ex) { 00265 IBRCOMMON_LOGGER_DEBUG(10) << "http error: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00266 } 00267 00268 yield(); 00269 } 00270 } catch (const ibrcommon::ThreadException &e) { 00271 std::cerr << "error: " << e.what() << std::endl; 00272 } 00273 } 00274 00275 00289 void HTTPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job) 00290 { 00291 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00292 std::string url_send; 00293 00294 long http_code = 0; 00295 //double upload_size = 0; 00296 00297 try { 00298 // read the bundle out of the storage 00299 const dtn::data::Bundle bundle = storage.get(job._bundle); 00300 00301 ibrcommon::BLOB::Reference ref = ibrcommon::BLOB::create(); 00302 { 00303 ibrcommon::BLOB::iostream io = ref.iostream(); 00304 dtn::data::DefaultSerializer(*io) << bundle; 00305 } 00306 00307 ibrcommon::BLOB::iostream io = ref.iostream(); 00308 size_t length = io.size(); 00309 CURLcode res; 00310 CURL *curl_up; 00311 00312 url_send = _server + "?eid=" + dtn::core::BundleCore::local.getString(); 00313 //+ "&dst-eid=dtn://experthe-laptop/filetransfer"; 00314 //+ "&priority=2"; 00315 //+ "&ttl=3600"; 00316 00317 //if (job._bundle.source.getString() == (dtn::core::BundleCore::local.getString() + "/echo-client")) 00318 //{ 00319 // url_send = _server + "?eid=" + dtn::core::BundleCore::local.getString() + "&dst-eid=echo-client"; 00320 //} 00321 00322 curl_up = curl_easy_init(); 00323 if(curl_up) 00324 { 00325 /* we want to use our own read function */ 00326 curl_easy_setopt(curl_up, CURLOPT_READFUNCTION, HTTPConvergenceLayer_callback_read); 00327 00328 /* enable uploading */ 00329 curl_easy_setopt(curl_up, CURLOPT_UPLOAD, 1L); 00330 00331 /* HTTP PUT please */ 00332 curl_easy_setopt(curl_up, CURLOPT_PUT, 1L); 00333 00334 /* disable connection timeout */ 00335 curl_easy_setopt(curl_up, CURLOPT_CONNECTTIMEOUT, 0); 00336 00337 /* specify target URL, and note that this URL should include a file 00338 name, not only a directory */ 00339 curl_easy_setopt(curl_up, CURLOPT_URL, url_send.c_str()); 00340 00341 /* now specify which file to upload */ 00342 curl_easy_setopt(curl_up, CURLOPT_READDATA, &(*io)); 00343 00344 /* provide the size of the upload, we specicially typecast the value 00345 to curl_off_t since we must be sure to use the correct data size */ 00346 curl_easy_setopt(curl_up, CURLOPT_INFILESIZE_LARGE, 00347 (curl_off_t)length); 00348 00349 /* Now run off and do what you've been told! */ 00350 res = curl_easy_perform(curl_up); 00351 00352 if(res == CURL_CONN_OK) 00353 { 00354 /* get HTTP Header StatusCode */ 00355 curl_easy_getinfo (curl_up, CURLINFO_RESPONSE_CODE, &http_code); 00356 //curl_easy_getinfo (curl, CURLINFO_SIZE_UPLOAD, &upload_size); 00357 00358 /* DEBUG OUTPUT INFORMATION */ 00359 //std::cout << "CURL CODE : " << res << std::endl; 00360 //std::cout << "HTTP CODE : " << http_code << std::endl; 00361 //std::cout << "UPLOAD_SIZE: " << upload_size << " Bytes" << std::endl; 00362 /* DEBUG OUTPUT INFORMATION */ 00363 00364 if(http_code == HTTP_OK) 00365 { 00366 dtn::net::TransferCompletedEvent::raise(job._destination, bundle); 00367 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED); 00368 } 00369 } 00370 00371 curl_easy_cleanup(curl_up); 00372 } 00373 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { 00374 // send transfer aborted event 00375 dtn::net::TransferAbortedEvent::raise(node.getEID(), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED); 00376 } 00377 00378 } 00379 00383 dtn::core::Node::Protocol HTTPConvergenceLayer::getDiscoveryProtocol() const 00384 { 00385 return dtn::core::Node::CONN_HTTP; 00386 } 00387 00392 void HTTPConvergenceLayer::componentUp() 00393 { 00394 } 00395 00407 void HTTPConvergenceLayer::componentRun() 00408 { 00409 00410 std::string url = _server + "?eid=" + dtn::core::BundleCore::local.getString(); 00411 00412 CURL *curl_down; 00413 CURLcode res; 00414 00415 /* CURL DEBUG */ 00416 //struct data config; 00417 //config.trace_ascii = 1; /* enable ascii tracing */ 00418 00419 00420 //long http_code = 0; 00421 //double download_size = 0; 00422 //long connects = 0; 00423 00424 while (_running) 00425 { 00426 curl_down = curl_easy_init(); 00427 00428 while(curl_down) 00429 { 00430 curl_easy_setopt(curl_down, CURLOPT_URL, url.c_str()); 00431 00432 /* disable connection timeout */ 00433 curl_easy_setopt(curl_down, CURLOPT_CONNECTTIMEOUT, 0); 00434 00435 /* no progress meter please */ 00436 curl_easy_setopt(curl_down, CURLOPT_NOPROGRESS, 1L); 00437 00438 /* cURL DEBUG options */ 00439 //curl_easy_setopt(curl_down, CURLOPT_DEBUGFUNCTION, my_trace); 00440 //curl_easy_setopt(curl_down, CURLOPT_DEBUGDATA, &config); 00441 00442 //curl_easy_setopt(curl_down, CURLOPT_DEBUGFUNCTION, my_trace); 00443 //curl_easy_setopt(curl_down, CURLOPT_VERBOSE, 1); 00444 00445 // create a receiver thread 00446 { 00447 ibrcommon::MutexLock l(_push_iob_mutex); 00448 _push_iob = new ibrcommon::iobuffer(); 00449 } 00450 00451 ibrcommon::AutoDelete<ibrcommon::iobuffer> auto_kill(_push_iob); 00452 std::ostream os(_push_iob); 00453 DownloadThread receiver(*_push_iob); 00454 00455 /* send all data to this function */ 00456 curl_easy_setopt(curl_down, CURLOPT_WRITEFUNCTION, HTTPConvergenceLayer_callback_write); 00457 00458 /* now specify where to write data */ 00459 curl_easy_setopt(curl_down, CURLOPT_WRITEDATA, &os); 00460 00461 /* do curl */ 00462 receiver.start(); 00463 res = curl_easy_perform(curl_down); 00464 00465 { 00466 ibrcommon::MutexLock l(_push_iob_mutex); 00467 /* finalize iobuffer */ 00468 _push_iob->finalize(); 00469 receiver.join(); 00470 _push_iob = NULL; 00471 } 00472 00473 /* get HTTP Header StatusCode */ 00474 //curl_easy_getinfo (curl_down, CURLINFO_RESPONSE_CODE, &http_code); 00475 //curl_easy_getinfo (curl, CURLINFO_SIZE_DOWNLOAD, &download_size); 00476 //curl_easy_getinfo (curl, CURLINFO_NUM_CONNECTS, &connects); 00477 00478 /* DEBUG OUTPUT INFORMATION */ 00479 //std::cout << "CURL CODE : " << res << std::endl; 00480 //std::cerr << "HTTP CODE : " << http_code << std::endl; 00481 //std::cout << "DOWNLOAD_SIZE: " << download_size << " Bytes" << std::endl; 00482 //std::cout << "NUM_CONNECTS : " << connects << " Connects" << std::endl; 00483 /* DEBUG OUTPUT INFORMATION */ 00484 00485 /* Wait some time an retry to connect */ 00486 sleep(CONN_TIMEOUT); // Wenn Verbindung nicht hergestellt werden konnte warte 5 sec. 00487 IBRCOMMON_LOGGER_DEBUG(10) << "http error: " << "Couldn't connect to server ... wait " << CONN_TIMEOUT/1000 << "s until retry" << IBRCOMMON_LOGGER_ENDL; 00488 00489 } 00490 00491 /* always cleanup */ 00492 curl_easy_cleanup(curl_down); 00493 } 00494 yield(); 00495 } 00496 00502 void HTTPConvergenceLayer::componentDown() 00503 { 00504 00505 _running = false; 00506 00507 ibrcommon::MutexLock l(_push_iob_mutex); 00508 if (_push_iob != NULL) 00509 { 00510 _push_iob->finalize(); 00511 _push_iob = NULL; 00512 } 00513 00514 } 00515 00520 bool HTTPConvergenceLayer::__cancellation() 00521 { 00522 // since this is an receiving thread we have to cancel the hard way 00523 return false; 00524 } 00525 00530 const std::string HTTPConvergenceLayer::getName() const 00531 { 00532 return "HTTPConvergenceLayer"; 00533 } 00534 } 00535 }