IBR-DTNSuite 0.6

daemon/src/net/HTTPConvergenceLayer.cpp

Go to the documentation of this file.
00001 
00010 #include "net/HTTPConvergenceLayer.h"
00011 #include <ibrdtn/data/ScopeControlHopLimitBlock.h>
00012 #include <ibrcommon/AutoDelete.h>
00013 
00014 namespace dtn
00015 {
00016         namespace net
00017         {
00018 
/*
 * Timing values. CONN_TIMEOUT is the pause between two download attempts and
 * is expressed in milliseconds (the retry log message divides it by 1000 to
 * print seconds). TIMEOUT is presumably milliseconds as well - it is not
 * referenced by the code in this file.
 */
const int TIMEOUT = 1000;
const int CONN_TIMEOUT = 5000;

/*
 * HTTP status codes this convergence layer cares about. HTTP_OK is the only
 * answer that counts as a successful upload.
 */
const int HTTP_OK = 200;
const int HTTP_NO_DATA = 410;

/*
 * Numeric libcurl result codes (CURLcode values). CURL_CONN_OK is compared
 * against the result of curl_easy_perform().
 */
const int CURL_CONN_OK = 0;
const int CURL_PARTIAL_FILE = 18;
00033 
00034         /* CURL DEBUG SECTION START */
00035 
/**
 * Configuration handed to the cURL debug callback (my_trace) through its
 * user pointer.
 */
struct data
{
    /** non-zero: dump received data as plain text without the hex column */
    char trace_ascii;
};
00042 
/**
 * Writes a hex/ASCII dump of a memory region to a stdio stream.
 *
 * The output starts with a header line naming the region and its size. Each
 * following line shows the offset of its first byte, optionally the bytes in
 * hexadecimal, and the bytes as characters ('.' replaces everything below
 * 0x20 or at/above 0x80). In text mode (nohex set) a CR LF pair in the data
 * ends the current output line and is not printed itself.
 *
 * @param text   label printed in the header line
 * @param stream stdio stream to write to; it is flushed before returning
 * @param ptr    first byte of the region to dump
 * @param size   number of bytes to dump
 * @param nohex  non-zero: omit the hex column and print 64 bytes per line
 *               instead of 16
 */
static void dump(const char *text,
                 FILE *stream, unsigned char *ptr, size_t size,
                 char nohex)
{
    // without the hex output, we can fit more on screen
    const size_t width = nohex ? 0x40 : 0x10;

    // the casts match the conversion specifiers exactly (%ld -> long,
    // %lx -> unsigned long)
    fprintf(stream, "%s, %10.10ld bytes (0x%8.8lx)\n",
            text, (long)size, (unsigned long)size);

    size_t i = 0;
    while (i < size)
    {
        // Offset at which the next output line starts. A CR LF found below
        // moves it to the byte right after the line break.
        size_t next = i + width;

        fprintf(stream, "%4.4lx: ", (unsigned long)i);

        if (!nohex)
        {
            // hex not disabled, show it; pad short lines so that the
            // character column stays aligned
            for (size_t c = 0; c < width; c++)
            {
                if (i + c < size)
                    fprintf(stream, "%02x ", ptr[i + c]);
                else
                    fputs("   ", stream);
            }
        }

        for (size_t c = 0; (c < width) && (i + c < size); c++)
        {
            // check for 0D0A; if found, skip past and start a new line of output
            if (nohex && (i + c + 1 < size) && ptr[i + c] == 0x0D && ptr[i + c + 1] == 0x0A)
            {
                next = i + c + 2;
                break;
            }

            const unsigned char ch = ptr[i + c];
            fputc(((ch >= 0x20) && (ch < 0x80)) ? ch : '.', stream);

            // check again for 0D0A, to avoid an extra \n if it's at width
            if (nohex && (i + c + 2 < size) && ptr[i + c + 1] == 0x0D && ptr[i + c + 2] == 0x0A)
            {
                next = i + c + 3;
                break;
            }
        }

        fputc('\n', stream);
        i = next;
    }

    fflush(stream);
}
00111 
00112 
00124         static int my_trace(CURL *handle, curl_infotype type,
00125                      char *data, size_t size, void *userp)
00126         {
00127                   struct data *config = (struct data *)userp;
00128                   const char *text;
00129                   (void)handle; // prevent compiler warning
00130 
00131                   switch (type) {
00132                           case CURLINFO_TEXT:
00133                                 fprintf(stderr, "== Info: %s", data);
00134                           default: // in case a new one is introduced to shock us
00135                                 return 0;
00136 
00137                           case CURLINFO_HEADER_IN:
00138                                 text = "<= Recv header";
00139                                 break;
00140                           case CURLINFO_DATA_IN:
00141                                 text = "<= Recv data";
00142                                 break;
00143                   }
00144 
00145                   dump(text, stderr, (unsigned char *)data, size, config->trace_ascii);
00146                   return 0;
00147          }
00148 
00149         /* CURL DEBUG SECTION END */
00150 
00151 
/**
 * Read callback for libcurl uploads: fills the buffer offered by libcurl
 * with the next bytes of a std::istream.
 *
 * @param ptr   destination buffer
 * @param size  size of one element
 * @param nmemb number of elements that fit into the buffer
 * @param s     the std::istream to read from, passed as an opaque pointer
 * @return number of bytes stored in the buffer; 0 once the stream has hit
 *         its end
 */
static size_t HTTPConvergenceLayer_callback_read(void *ptr, size_t size, size_t nmemb, void *s)
{
    std::istream &input = *static_cast<std::istream*>(s);

    // a previous call already consumed the last byte
    if (input.eof())
        return 0;

    input.read(static_cast<char*>(ptr), (size * nmemb));

    // read() may deliver less than requested; gcount() tells how much arrived
    const size_t delivered = input.gcount();
    return delivered;
}
00174 
/**
 * Write callback for libcurl downloads: appends the received bytes to a
 * std::ostream and flushes it.
 *
 * libcurl treats any return value other than size * nmemb as an error and
 * aborts the transfer. 0 is therefore returned both when the stream is
 * already unusable and when writing or flushing the new data fails.
 *
 * @param ptr   received data
 * @param size  size of one element
 * @param nmemb number of received elements
 * @param s     the std::ostream to write to, passed as an opaque pointer
 * @return size * nmemb if all bytes were accepted by the stream, 0 otherwise
 */
static size_t HTTPConvergenceLayer_callback_write(void *ptr, size_t size, size_t nmemb, void *s)
{
    std::ostream *stream = static_cast<std::ostream*>(s);
    const char *buffer = static_cast<const char*>(ptr);
    const size_t length = size * nmemb;

    if (!stream->good()) return 0;

    stream->write(buffer, length);
    stream->flush();

    // do not claim success for data the stream refused to take
    if (!stream->good()) return 0;

    return length;
}
00195 
00203                 HTTPConvergenceLayer::HTTPConvergenceLayer(const std::string &server)
00204                  : _server(server), _push_iob(NULL), _running(true)
00205                 {
00206                         curl_global_init(CURL_GLOBAL_ALL);
00207                 }
00208 
00209 
00214                 HTTPConvergenceLayer::~HTTPConvergenceLayer()
00215                 {
00216                         curl_global_cleanup();
00217                 }
00218 
00226                 DownloadThread::DownloadThread(ibrcommon::iobuffer &buf)
00227                  : _stream(&buf)
00228                 {
00229                 }
00230 
00235                 DownloadThread::~DownloadThread()
00236                 {
00237                         join();
00238                 }
00239 
00247                 void DownloadThread::run()
00248                 {
00249                         try  {
00250                                 while(_stream.good())
00251                                 {
00252                                         try  {
00253                                                 dtn::data::Bundle bundle;
00254                                                 dtn::data::DefaultDeserializer(_stream) >> bundle;
00255 
00256                                                 // increment value in the scope control hop limit block
00257                                                 try {
00258                                                         dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>();
00259                                                         schl.increment();
00260                                                 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { };
00261 
00262                                                 // raise default bundle received event
00263                                                 dtn::net::BundleReceivedEvent::raise(dtn::data::EID(), bundle);
00264                                         } catch (const ibrcommon::Exception &ex) {
00265                                                         IBRCOMMON_LOGGER_DEBUG(10) << "http error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00266                                         }
00267 
00268                                         yield();
00269                                 }
00270                         } catch (const ibrcommon::ThreadException &e)  {
00271                                 std::cerr << "error: " << e.what() << std::endl;
00272                         }
00273                 }
00274 
00275 
00289                 void HTTPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00290                 {
00291                         dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00292                         std::string url_send;
00293 
00294                         long http_code = 0;
00295                         //double upload_size = 0;
00296 
00297                         try {
00298                                 // read the bundle out of the storage
00299                                 const dtn::data::Bundle bundle = storage.get(job._bundle);
00300 
00301                                 ibrcommon::BLOB::Reference ref = ibrcommon::BLOB::create();
00302                                 {
00303                                         ibrcommon::BLOB::iostream io = ref.iostream();
00304                                         dtn::data::DefaultSerializer(*io) << bundle;
00305                                 }
00306 
00307                                 ibrcommon::BLOB::iostream io = ref.iostream();
00308                                 size_t length = io.size();
00309                                 CURLcode res;
00310                                 CURL *curl_up;
00311 
00312                                 url_send = _server + "?eid=" + dtn::core::BundleCore::local.getString();
00313                                                                                  //+ "&dst-eid=dtn://experthe-laptop/filetransfer";
00314                                                                                  //+ "&priority=2";
00315                                                                                  //+ "&ttl=3600";
00316 
00317                                 //if (job._bundle.source.getString() == (dtn::core::BundleCore::local.getString() + "/echo-client"))
00318                                 //{
00319                                 //      url_send = _server + "?eid=" + dtn::core::BundleCore::local.getString() + "&dst-eid=echo-client";
00320                                 //}
00321 
00322                                 curl_up = curl_easy_init();
00323                                 if(curl_up)
00324                                 {
00325                                         /* we want to use our own read function */
00326                                         curl_easy_setopt(curl_up, CURLOPT_READFUNCTION, HTTPConvergenceLayer_callback_read);
00327 
00328                                         /* enable uploading */
00329                                         curl_easy_setopt(curl_up, CURLOPT_UPLOAD, 1L);
00330 
00331                                         /* HTTP PUT please */
00332                                         curl_easy_setopt(curl_up, CURLOPT_PUT, 1L);
00333 
00334                                         /* disable connection timeout */
00335                                         curl_easy_setopt(curl_up, CURLOPT_CONNECTTIMEOUT, 0);
00336 
00337                                         /* specify target URL, and note that this URL should include a file
00338                                            name, not only a directory */
00339                                         curl_easy_setopt(curl_up, CURLOPT_URL, url_send.c_str());
00340 
00341                                         /* now specify which file to upload */
00342                                         curl_easy_setopt(curl_up, CURLOPT_READDATA, &(*io));
00343 
00344                                         /* provide the size of the upload, we specicially typecast the value
00345                                            to curl_off_t since we must be sure to use the correct data size */
00346                                         curl_easy_setopt(curl_up, CURLOPT_INFILESIZE_LARGE,
00347                                                                          (curl_off_t)length);
00348 
00349                                         /* Now run off and do what you've been told! */
00350                                         res = curl_easy_perform(curl_up);
00351 
00352                                         if(res == CURL_CONN_OK)
00353                                         {
00354                                                 /* get HTTP Header StatusCode */
00355                                                 curl_easy_getinfo (curl_up, CURLINFO_RESPONSE_CODE, &http_code);
00356                                                 //curl_easy_getinfo (curl, CURLINFO_SIZE_UPLOAD, &upload_size);
00357 
00358                                                 /* DEBUG OUTPUT INFORMATION */
00359                                                 //std::cout << "CURL CODE    : " << res << std::endl;
00360                                                 //std::cout << "HTTP CODE    : " << http_code << std::endl;
00361                                                 //std::cout << "UPLOAD_SIZE: " << upload_size << " Bytes" << std::endl;
00362                                                 /* DEBUG OUTPUT INFORMATION */
00363 
00364                                                 if(http_code == HTTP_OK)
00365                                                 {
00366                                                         dtn::net::TransferCompletedEvent::raise(job._destination, bundle);
00367                                                         dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00368                                                 }
00369                                         }
00370 
00371                                         curl_easy_cleanup(curl_up);
00372                                 }
00373                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00374                                 // send transfer aborted event
00375                                 dtn::net::TransferAbortedEvent::raise(node.getEID(), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00376                         }
00377 
00378                 }
00379 
00383                 dtn::core::Node::Protocol HTTPConvergenceLayer::getDiscoveryProtocol() const
00384                 {
00385                         return dtn::core::Node::CONN_HTTP;
00386                 }
00387 
00392                 void HTTPConvergenceLayer::componentUp()
00393                 {
00394                 }
00395 
00407                 void HTTPConvergenceLayer::componentRun()
00408                 {
00409 
00410                         std::string url = _server + "?eid=" + dtn::core::BundleCore::local.getString();
00411 
00412                         CURL *curl_down;
00413                         CURLcode res;
00414 
00415                         /* CURL DEBUG */
00416             //struct data config;
00417                         //config.trace_ascii = 1; /* enable ascii tracing */
00418 
00419 
00420                         //long http_code = 0;
00421                         //double download_size = 0;
00422                         //long connects = 0;
00423 
00424                         while (_running)
00425                         {
00426                                 curl_down = curl_easy_init();
00427 
00428                                 while(curl_down)
00429                                 {
00430                                         curl_easy_setopt(curl_down, CURLOPT_URL, url.c_str());
00431 
00432                                         /* disable connection timeout */
00433                                         curl_easy_setopt(curl_down, CURLOPT_CONNECTTIMEOUT, 0);
00434 
00435                                         /* no progress meter please */
00436                                         curl_easy_setopt(curl_down, CURLOPT_NOPROGRESS, 1L);
00437 
00438                                         /* cURL DEBUG options */
00439                                         //curl_easy_setopt(curl_down, CURLOPT_DEBUGFUNCTION, my_trace);
00440                                     //curl_easy_setopt(curl_down, CURLOPT_DEBUGDATA, &config);
00441 
00442                                     //curl_easy_setopt(curl_down, CURLOPT_DEBUGFUNCTION, my_trace);
00443                                         //curl_easy_setopt(curl_down, CURLOPT_VERBOSE, 1);
00444 
00445                                         // create a receiver thread
00446                                         {
00447                                                 ibrcommon::MutexLock l(_push_iob_mutex);
00448                                                 _push_iob = new ibrcommon::iobuffer();
00449                                         }
00450 
00451                                         ibrcommon::AutoDelete<ibrcommon::iobuffer> auto_kill(_push_iob);
00452                                         std::ostream os(_push_iob);
00453                                         DownloadThread receiver(*_push_iob);
00454 
00455                                         /* send all data to this function  */
00456                                         curl_easy_setopt(curl_down, CURLOPT_WRITEFUNCTION, HTTPConvergenceLayer_callback_write);
00457 
00458                                         /* now specify where to write data */
00459                                         curl_easy_setopt(curl_down, CURLOPT_WRITEDATA, &os);
00460 
00461                                         /* do curl */
00462                                         receiver.start();
00463                                         res = curl_easy_perform(curl_down);
00464 
00465                                         {
00466                                                 ibrcommon::MutexLock l(_push_iob_mutex);
00467                                                 /* finalize iobuffer */
00468                                                 _push_iob->finalize();
00469                                                 receiver.join();
00470                                                 _push_iob = NULL;
00471                                         }
00472 
00473                                         /* get HTTP Header StatusCode */
00474                                         //curl_easy_getinfo (curl_down, CURLINFO_RESPONSE_CODE, &http_code);
00475                                         //curl_easy_getinfo (curl, CURLINFO_SIZE_DOWNLOAD, &download_size);
00476                                         //curl_easy_getinfo (curl, CURLINFO_NUM_CONNECTS, &connects);
00477 
00478                                         /* DEBUG OUTPUT INFORMATION */
00479                                         //std::cout << "CURL CODE    : " << res << std::endl;
00480                                         //std::cerr << "HTTP CODE    : " << http_code << std::endl;
00481                                         //std::cout << "DOWNLOAD_SIZE: " << download_size << " Bytes" << std::endl;
00482                                         //std::cout << "NUM_CONNECTS : " << connects << " Connects" << std::endl;
00483                                         /* DEBUG OUTPUT INFORMATION */
00484 
00485                                         /* Wait some time an retry to connect */
00486                                         sleep(CONN_TIMEOUT);  // Wenn Verbindung nicht hergestellt werden konnte warte 5 sec.
00487                                         IBRCOMMON_LOGGER_DEBUG(10) << "http error: " << "Couldn't connect to server ... wait " << CONN_TIMEOUT/1000 << "s until retry" << IBRCOMMON_LOGGER_ENDL;
00488 
00489                                 }
00490 
00491                                 /* always cleanup */
00492                                 curl_easy_cleanup(curl_down);
00493                         }
00494                         yield();
00495                 }
00496 
00502                 void HTTPConvergenceLayer::componentDown()
00503                 {
00504 
00505                         _running = false;
00506 
00507                         ibrcommon::MutexLock l(_push_iob_mutex);
00508                         if (_push_iob != NULL)
00509                         {
00510                                 _push_iob->finalize();
00511                                 _push_iob = NULL;
00512                         }
00513 
00514                 }
00515 
00520                 bool HTTPConvergenceLayer::__cancellation()
00521                 {
00522                         // since this is an receiving thread we have to cancel the hard way
00523                         return false;
00524                 }
00525 
00530                 const std::string HTTPConvergenceLayer::getName() const
00531                 {
00532                         return "HTTPConvergenceLayer";
00533                 }
00534         }
00535 }