|
IBR-DTNSuite 0.6
|
00001 00010 #include "net/HTTPConvergenceLayer.h" 00011 #include <ibrcommon/AutoDelete.h> 00012 00013 namespace dtn 00014 { 00015 namespace net 00016 { 00017 00019 const int TIMEOUT = 1000; 00021 const int CONN_TIMEOUT = 5000; 00022 00024 const int HTTP_OK = 200; 00026 const int HTTP_NO_DATA = 410; 00027 00029 const int CURL_CONN_OK = 0; 00031 const int CURL_PARTIAL_FILE = 18; 00032 00033 /* CURL DEBUG SECTION START */ 00034 00038 struct data { 00039 char trace_ascii; 00040 }; 00041 00053 static void dump(const char *text, 00054 FILE *stream, unsigned char *ptr, size_t size, 00055 char nohex) 00056 { 00057 size_t i; 00058 size_t c; 00059 00060 unsigned int width=0x10; 00061 00062 if(nohex) 00063 00064 // without the hex output, we can fit more on screen 00065 00066 width = 0x40; 00067 00068 fprintf(stream, "%s, %10.10ld bytes (0x%8.8lx)\n", 00069 text, (long)size, (long)size); 00070 00071 for(i=0; i<size; i+= width) { 00072 00073 fprintf(stream, "%4.4lx: ", (long)i); 00074 00075 if(!nohex) { 00076 00077 // hex not disabled, show it 00078 00079 for(c = 0; c < width; c++) 00080 if(i+c < size) 00081 fprintf(stream, "%02x ", ptr[i+c]); 00082 else 00083 fputs(" ", stream); 00084 } 00085 00086 for(c = 0; (c < width) && (i+c < size); c++) { 00087 00088 // check for 0D0A; if found, skip past and start a new line of output 00089 00090 if (nohex && (i+c+1 < size) && ptr[i+c]==0x0D && ptr[i+c+1]==0x0A) { 00091 i+=(c+2-width); 00092 break; 00093 } 00094 fprintf(stream, "%c", 00095 (ptr[i+c]>=0x20) && (ptr[i+c]<0x80)?ptr[i+c]:'.'); 00096 00097 // check again for 0D0A, to avoid an extra \n if it's at width 00098 00099 if (nohex && (i+c+2 < size) && ptr[i+c+1]==0x0D && ptr[i+c+2]==0x0A) { 00100 i+=(c+3-width); 00101 break; 00102 } 00103 } 00104 00105 00106 fputc('\n', stream); 00107 } 00108 fflush(stream); 00109 } 00110 00111 00123 static int my_trace(CURL *handle, curl_infotype type, 00124 char *data, size_t size, void *userp) 00125 { 00126 struct data *config = (struct data *)userp; 00127 const char *text; 00128 (void)handle; // prevent compiler warning 00129 00130 switch (type) { 00131 case CURLINFO_TEXT: 00132 fprintf(stderr, "== Info: %s", data); 00133 default: // in case a new one is introduced to shock us 00134 return 0; 00135 00136 case CURLINFO_HEADER_IN: 00137 text = "<= Recv header"; 00138 break; 00139 case CURLINFO_DATA_IN: 00140 text = "<= Recv data"; 00141 break; 00142 } 00143 00144 dump(text, stderr, (unsigned char *)data, size, config->trace_ascii); 00145 return 0; 00146 } 00147 00148 /* CURL DEBUG SECTION END */ 00149 00150 00159 static size_t HTTPConvergenceLayer_callback_read(void *ptr, size_t size, size_t nmemb, void *s) 00160 { 00161 size_t retcode = 0; 00162 std::istream *stream = static_cast<std::istream*>(s); 00163 00164 if (stream->eof()) return 0; 00165 00166 char *buffer = static_cast<char*>(ptr); 00167 00168 stream->read(buffer, (size * nmemb)); 00169 retcode = stream->gcount(); 00170 00171 return retcode; 00172 } 00173 00182 static size_t HTTPConvergenceLayer_callback_write(void *ptr, size_t size, size_t nmemb, void *s) 00183 { 00184 std::ostream *stream = static_cast<std::ostream*>(s); 00185 char *buffer = static_cast<char*>(ptr); 00186 00187 if (!stream->good()) return 0; 00188 00189 stream->write(buffer, (size * nmemb)); 00190 stream->flush(); 00191 00192 return (size * nmemb); 00193 } 00194 00202 HTTPConvergenceLayer::HTTPConvergenceLayer(const std::string &server) 00203 : _server(server), _push_iob(NULL), _running(true) 00204 { 00205 curl_global_init(CURL_GLOBAL_ALL); 00206 } 00207 00208 00213 HTTPConvergenceLayer::~HTTPConvergenceLayer() 00214 { 00215 curl_global_cleanup(); 00216 } 00217 00225 DownloadThread::DownloadThread(ibrcommon::iobuffer &buf) 00226 : _stream(&buf) 00227 { 00228 } 00229 00234 DownloadThread::~DownloadThread() 00235 { 00236 join(); 00237 } 00238 00246 void DownloadThread::run() 00247 { 00248 try { 00249 while(_stream.good()) 00250 { 00251 try { 00252 dtn::data::Bundle bundle; 00253 dtn::data::DefaultDeserializer(_stream) >> bundle; 00254 00255 // raise default bundle received event 00256 dtn::net::BundleReceivedEvent::raise(dtn::data::EID(), bundle); 00257 } catch (const ibrcommon::Exception &ex) { 00258 IBRCOMMON_LOGGER_DEBUG(10) << "http error: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00259 } 00260 00261 yield(); 00262 } 00263 } catch (const ibrcommon::ThreadException &e) { 00264 std::cerr << "error: " << e.what() << std::endl; 00265 } 00266 } 00267 00268 00282 void HTTPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job) 00283 { 00284 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00285 std::string url_send; 00286 00287 long http_code = 0; 00288 //double upload_size = 0; 00289 00290 try { 00291 // read the bundle out of the storage 00292 const dtn::data::Bundle bundle = storage.get(job._bundle); 00293 00294 ibrcommon::BLOB::Reference ref = ibrcommon::TmpFileBLOB::create(); 00295 { 00296 ibrcommon::BLOB::iostream io = ref.iostream(); 00297 dtn::data::DefaultSerializer(*io) << bundle; 00298 } 00299 00300 ibrcommon::BLOB::iostream io = ref.iostream(); 00301 size_t length = io.size(); 00302 CURLcode res; 00303 CURL *curl_up; 00304 00305 url_send = _server + "?eid=" + dtn::core::BundleCore::local.getString(); 00306 //+ "&dst-eid=dtn://experthe-laptop/filetransfer"; 00307 //+ "&priority=2"; 00308 //+ "&ttl=3600"; 00309 00310 //if (job._bundle.source.getString() == (dtn::core::BundleCore::local.getString() + "/echo-client")) 00311 //{ 00312 // url_send = _server + "?eid=" + dtn::core::BundleCore::local.getString() + "&dst-eid=echo-client"; 00313 //} 00314 00315 curl_up = curl_easy_init(); 00316 if(curl_up) 00317 { 00318 /* we want to use our own read function */ 00319 curl_easy_setopt(curl_up, CURLOPT_READFUNCTION, HTTPConvergenceLayer_callback_read); 00320 00321 /* enable uploading */ 00322 curl_easy_setopt(curl_up, CURLOPT_UPLOAD, 1L); 00323 00324 /* HTTP PUT please */ 00325 curl_easy_setopt(curl_up, CURLOPT_PUT, 1L); 00326 00327 /* disable connection timeout */ 00328 curl_easy_setopt(curl_up, CURLOPT_CONNECTTIMEOUT, 0); 00329 00330 /* specify target URL, and note that this URL should include a file 00331 name, not only a directory */ 00332 curl_easy_setopt(curl_up, CURLOPT_URL, url_send.c_str()); 00333 00334 /* now specify which file to upload */ 00335 curl_easy_setopt(curl_up, CURLOPT_READDATA, &(*io)); 00336 00337 /* provide the size of the upload, we specicially typecast the value 00338 to curl_off_t since we must be sure to use the correct data size */ 00339 curl_easy_setopt(curl_up, CURLOPT_INFILESIZE_LARGE, 00340 (curl_off_t)length); 00341 00342 /* Now run off and do what you've been told! */ 00343 res = curl_easy_perform(curl_up); 00344 00345 if(res == CURL_CONN_OK) 00346 { 00347 /* get HTTP Header StatusCode */ 00348 curl_easy_getinfo (curl_up, CURLINFO_RESPONSE_CODE, &http_code); 00349 //curl_easy_getinfo (curl, CURLINFO_SIZE_UPLOAD, &upload_size); 00350 00351 /* DEBUG OUTPUT INFORMATION */ 00352 //std::cout << "CURL CODE : " << res << std::endl; 00353 //std::cout << "HTTP CODE : " << http_code << std::endl; 00354 //std::cout << "UPLOAD_SIZE: " << upload_size << " Bytes" << std::endl; 00355 /* DEBUG OUTPUT INFORMATION */ 00356 00357 if(http_code == HTTP_OK) 00358 { 00359 dtn::net::TransferCompletedEvent::raise(job._destination, bundle); 00360 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED); 00361 } 00362 } 00363 00364 curl_easy_cleanup(curl_up); 00365 } 00366 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { 00367 // send transfer aborted event 00368 dtn::net::TransferAbortedEvent::raise(node.getEID(), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED); 00369 } 00370 00371 } 00372 00376 dtn::core::Node::Protocol HTTPConvergenceLayer::getDiscoveryProtocol() const 00377 { 00378 return dtn::core::Node::CONN_HTTP; 00379 } 00380 00385 void HTTPConvergenceLayer::componentUp() 00386 { 00387 } 00388 00400 void HTTPConvergenceLayer::componentRun() 00401 { 00402 00403 std::string url = _server + "?eid=" + dtn::core::BundleCore::local.getString(); 00404 00405 CURL *curl_down; 00406 CURLcode res; 00407 00408 /* CURL DEBUG */ 00409 //struct data config; 00410 //config.trace_ascii = 1; /* enable ascii tracing */ 00411 00412 00413 //long http_code = 0; 00414 //double download_size = 0; 00415 //long connects = 0; 00416 00417 while (_running) 00418 { 00419 curl_down = curl_easy_init(); 00420 00421 while(curl_down) 00422 { 00423 curl_easy_setopt(curl_down, CURLOPT_URL, url.c_str()); 00424 00425 /* disable connection timeout */ 00426 curl_easy_setopt(curl_down, CURLOPT_CONNECTTIMEOUT, 0); 00427 00428 /* no progress meter please */ 00429 curl_easy_setopt(curl_down, CURLOPT_NOPROGRESS, 1L); 00430 00431 /* cURL DEBUG options */ 00432 //curl_easy_setopt(curl_down, CURLOPT_DEBUGFUNCTION, my_trace); 00433 //curl_easy_setopt(curl_down, CURLOPT_DEBUGDATA, &config); 00434 00435 //curl_easy_setopt(curl_down, CURLOPT_DEBUGFUNCTION, my_trace); 00436 //curl_easy_setopt(curl_down, CURLOPT_VERBOSE, 1); 00437 00438 // create a receiver thread 00439 { 00440 ibrcommon::MutexLock l(_push_iob_mutex); 00441 _push_iob = new ibrcommon::iobuffer(); 00442 } 00443 00444 ibrcommon::AutoDelete<ibrcommon::iobuffer> auto_kill(_push_iob); 00445 std::ostream os(_push_iob); 00446 DownloadThread receiver(*_push_iob); 00447 00448 /* send all data to this function */ 00449 curl_easy_setopt(curl_down, CURLOPT_WRITEFUNCTION, HTTPConvergenceLayer_callback_write); 00450 00451 /* now specify where to write data */ 00452 curl_easy_setopt(curl_down, CURLOPT_WRITEDATA, &os); 00453 00454 /* do curl */ 00455 receiver.start(); 00456 res = curl_easy_perform(curl_down); 00457 00458 { 00459 ibrcommon::MutexLock l(_push_iob_mutex); 00460 /* finalize iobuffer */ 00461 _push_iob->finalize(); 00462 receiver.join(); 00463 _push_iob = NULL; 00464 } 00465 00466 /* get HTTP Header StatusCode */ 00467 //curl_easy_getinfo (curl_down, CURLINFO_RESPONSE_CODE, &http_code); 00468 //curl_easy_getinfo (curl, CURLINFO_SIZE_DOWNLOAD, &download_size); 00469 //curl_easy_getinfo (curl, CURLINFO_NUM_CONNECTS, &connects); 00470 00471 /* DEBUG OUTPUT INFORMATION */ 00472 //std::cout << "CURL CODE : " << res << std::endl; 00473 //std::cerr << "HTTP CODE : " << http_code << std::endl; 00474 //std::cout << "DOWNLOAD_SIZE: " << download_size << " Bytes" << std::endl; 00475 //std::cout << "NUM_CONNECTS : " << connects << " Connects" << std::endl; 00476 /* DEBUG OUTPUT INFORMATION */ 00477 00478 /* Wait some time an retry to connect */ 00479 sleep(CONN_TIMEOUT); // Wenn Verbindung nicht hergestellt werden konnte warte 5 sec. 00480 IBRCOMMON_LOGGER_DEBUG(10) << "http error: " << "Couldn't connect to server ... wait " << CONN_TIMEOUT/1000 << "s until retry" << IBRCOMMON_LOGGER_ENDL; 00481 00482 } 00483 00484 /* always cleanup */ 00485 curl_easy_cleanup(curl_down); 00486 } 00487 yield(); 00488 } 00489 00495 void HTTPConvergenceLayer::componentDown() 00496 { 00497 00498 _running = false; 00499 00500 ibrcommon::MutexLock l(_push_iob_mutex); 00501 if (_push_iob != NULL) 00502 { 00503 _push_iob->finalize(); 00504 _push_iob = NULL; 00505 } 00506 00507 } 00508 00513 bool HTTPConvergenceLayer::__cancellation() 00514 { 00515 // since this is an receiving thread we have to cancel the hard way 00516 return false; 00517 } 00518 00523 const std::string HTTPConvergenceLayer::getName() const 00524 { 00525 return "HTTPConvergenceLayer"; 00526 } 00527 } 00528 }