IBR-DTNSuite 0.6

daemon/src/net/HTTPConvergenceLayer.cpp

Go to the documentation of this file.
00001 
00010 #include "net/HTTPConvergenceLayer.h"
00011 #include <ibrcommon/AutoDelete.h>
00012 
00013 namespace dtn
00014 {
00015         namespace net
00016         {
00017 
00019         const int TIMEOUT = 1000;
00021         const int CONN_TIMEOUT = 5000;
00022 
00024         const int HTTP_OK = 200;
00026         const int HTTP_NO_DATA = 410;
00027 
00029         const int CURL_CONN_OK = 0;
00031         const int CURL_PARTIAL_FILE = 18;
00032 
00033         /* CURL DEBUG SECTION START */
00034 
00038         struct data {
00039                 char trace_ascii;
00040         };
00041 
00053         static void dump(const char *text,
00054                                                 FILE *stream, unsigned char *ptr, size_t size,
00055                                                 char nohex)
00056         {
00057                   size_t i;
00058                   size_t c;
00059 
00060                   unsigned int width=0x10;
00061 
00062                   if(nohex)
00063 
00064                         // without the hex output, we can fit more on screen
00065 
00066                         width = 0x40;
00067 
00068                   fprintf(stream, "%s, %10.10ld bytes (0x%8.8lx)\n",
00069                                   text, (long)size, (long)size);
00070 
00071                   for(i=0; i<size; i+= width) {
00072 
00073                         fprintf(stream, "%4.4lx: ", (long)i);
00074 
00075                         if(!nohex) {
00076 
00077                           // hex not disabled, show it
00078 
00079               for(c = 0; c < width; c++)
00080                                 if(i+c < size)
00081                                   fprintf(stream, "%02x ", ptr[i+c]);
00082                                 else
00083                                   fputs("   ", stream);
00084                         }
00085 
00086                         for(c = 0; (c < width) && (i+c < size); c++) {
00087 
00088                           // check for 0D0A; if found, skip past and start a new line of output
00089 
00090                    if (nohex && (i+c+1 < size) && ptr[i+c]==0x0D && ptr[i+c+1]==0x0A) {
00091                                 i+=(c+2-width);
00092                                 break;
00093                           }
00094                           fprintf(stream, "%c",
00095                                           (ptr[i+c]>=0x20) && (ptr[i+c]<0x80)?ptr[i+c]:'.');
00096 
00097                           // check again for 0D0A, to avoid an extra \n if it's at width
00098 
00099                           if (nohex && (i+c+2 < size) && ptr[i+c+1]==0x0D && ptr[i+c+2]==0x0A) {
00100                                 i+=(c+3-width);
00101                                 break;
00102                           }
00103                         }
00104 
00105 
00106                         fputc('\n', stream);
00107                   }
00108                   fflush(stream);
00109         }
00110 
00111 
00123         static int my_trace(CURL *handle, curl_infotype type,
00124                      char *data, size_t size, void *userp)
00125         {
00126                   struct data *config = (struct data *)userp;
00127                   const char *text;
00128                   (void)handle; // prevent compiler warning
00129 
00130                   switch (type) {
00131                           case CURLINFO_TEXT:
00132                                 fprintf(stderr, "== Info: %s", data);
00133                           default: // in case a new one is introduced to shock us
00134                                 return 0;
00135 
00136                           case CURLINFO_HEADER_IN:
00137                                 text = "<= Recv header";
00138                                 break;
00139                           case CURLINFO_DATA_IN:
00140                                 text = "<= Recv data";
00141                                 break;
00142                   }
00143 
00144                   dump(text, stderr, (unsigned char *)data, size, config->trace_ascii);
00145                   return 0;
00146          }
00147 
00148         /* CURL DEBUG SECTION END */
00149 
00150 
00159                 static size_t HTTPConvergenceLayer_callback_read(void *ptr, size_t size, size_t nmemb, void *s)
00160                 {
00161                         size_t retcode = 0;
00162                         std::istream *stream = static_cast<std::istream*>(s);
00163 
00164                         if (stream->eof()) return 0;
00165 
00166                         char *buffer = static_cast<char*>(ptr);
00167 
00168                         stream->read(buffer, (size * nmemb));
00169                         retcode = stream->gcount();
00170 
00171                         return retcode;
00172                 }
00173 
00182                 static size_t HTTPConvergenceLayer_callback_write(void *ptr, size_t size, size_t nmemb, void *s)
00183                 {
00184                         std::ostream *stream = static_cast<std::ostream*>(s);
00185                         char *buffer = static_cast<char*>(ptr);
00186 
00187                         if (!stream->good()) return 0;
00188 
00189                         stream->write(buffer, (size * nmemb));
00190                         stream->flush();
00191 
00192                         return (size * nmemb);
00193                 }
00194 
00202                 HTTPConvergenceLayer::HTTPConvergenceLayer(const std::string &server)
00203                  : _server(server), _push_iob(NULL), _running(true)
00204                 {
00205                         curl_global_init(CURL_GLOBAL_ALL);
00206                 }
00207 
00208 
00213                 HTTPConvergenceLayer::~HTTPConvergenceLayer()
00214                 {
00215                         curl_global_cleanup();
00216                 }
00217 
00225                 DownloadThread::DownloadThread(ibrcommon::iobuffer &buf)
00226                  : _stream(&buf)
00227                 {
00228                 }
00229 
00234                 DownloadThread::~DownloadThread()
00235                 {
00236                         join();
00237                 }
00238 
00246                 void DownloadThread::run()
00247                 {
00248                         try  {
00249                                 while(_stream.good())
00250                                 {
00251                                         try  {
00252                                                 dtn::data::Bundle bundle;
00253                                                 dtn::data::DefaultDeserializer(_stream) >> bundle;
00254 
00255                                                 // raise default bundle received event
00256                                                 dtn::net::BundleReceivedEvent::raise(dtn::data::EID(), bundle);
00257                                         } catch (const ibrcommon::Exception &ex) {
00258                                                         IBRCOMMON_LOGGER_DEBUG(10) << "http error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00259                                         }
00260 
00261                                         yield();
00262                                 }
00263                         } catch (const ibrcommon::ThreadException &e)  {
00264                                 std::cerr << "error: " << e.what() << std::endl;
00265                         }
00266                 }
00267 
00268 
00282                 void HTTPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00283                 {
00284                         dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00285                         std::string url_send;
00286 
00287                         long http_code = 0;
00288                         //double upload_size = 0;
00289 
00290                         try {
00291                                 // read the bundle out of the storage
00292                                 const dtn::data::Bundle bundle = storage.get(job._bundle);
00293 
00294                                 ibrcommon::BLOB::Reference ref = ibrcommon::TmpFileBLOB::create();
00295                                 {
00296                                         ibrcommon::BLOB::iostream io = ref.iostream();
00297                                         dtn::data::DefaultSerializer(*io) << bundle;
00298                                 }
00299 
00300                                 ibrcommon::BLOB::iostream io = ref.iostream();
00301                                 size_t length = io.size();
00302                                 CURLcode res;
00303                                 CURL *curl_up;
00304 
00305                                 url_send = _server + "?eid=" + dtn::core::BundleCore::local.getString();
00306                                                                                  //+ "&dst-eid=dtn://experthe-laptop/filetransfer";
00307                                                                                  //+ "&priority=2";
00308                                                                                  //+ "&ttl=3600";
00309 
00310                                 //if (job._bundle.source.getString() == (dtn::core::BundleCore::local.getString() + "/echo-client"))
00311                                 //{
00312                                 //      url_send = _server + "?eid=" + dtn::core::BundleCore::local.getString() + "&dst-eid=echo-client";
00313                                 //}
00314 
00315                                 curl_up = curl_easy_init();
00316                                 if(curl_up)
00317                                 {
00318                                         /* we want to use our own read function */
00319                                         curl_easy_setopt(curl_up, CURLOPT_READFUNCTION, HTTPConvergenceLayer_callback_read);
00320 
00321                                         /* enable uploading */
00322                                         curl_easy_setopt(curl_up, CURLOPT_UPLOAD, 1L);
00323 
00324                                         /* HTTP PUT please */
00325                                         curl_easy_setopt(curl_up, CURLOPT_PUT, 1L);
00326 
00327                                         /* disable connection timeout */
00328                                         curl_easy_setopt(curl_up, CURLOPT_CONNECTTIMEOUT, 0);
00329 
00330                                         /* specify target URL, and note that this URL should include a file
00331                                            name, not only a directory */
00332                                         curl_easy_setopt(curl_up, CURLOPT_URL, url_send.c_str());
00333 
00334                                         /* now specify which file to upload */
00335                                         curl_easy_setopt(curl_up, CURLOPT_READDATA, &(*io));
00336 
00337                                         /* provide the size of the upload, we specicially typecast the value
00338                                            to curl_off_t since we must be sure to use the correct data size */
00339                                         curl_easy_setopt(curl_up, CURLOPT_INFILESIZE_LARGE,
00340                                                                          (curl_off_t)length);
00341 
00342                                         /* Now run off and do what you've been told! */
00343                                         res = curl_easy_perform(curl_up);
00344 
00345                                         if(res == CURL_CONN_OK)
00346                                         {
00347                                                 /* get HTTP Header StatusCode */
00348                                                 curl_easy_getinfo (curl_up, CURLINFO_RESPONSE_CODE, &http_code);
00349                                                 //curl_easy_getinfo (curl, CURLINFO_SIZE_UPLOAD, &upload_size);
00350 
00351                                                 /* DEBUG OUTPUT INFORMATION */
00352                                                 //std::cout << "CURL CODE    : " << res << std::endl;
00353                                                 //std::cout << "HTTP CODE    : " << http_code << std::endl;
00354                                                 //std::cout << "UPLOAD_SIZE: " << upload_size << " Bytes" << std::endl;
00355                                                 /* DEBUG OUTPUT INFORMATION */
00356 
00357                                                 if(http_code == HTTP_OK)
00358                                                 {
00359                                                         dtn::net::TransferCompletedEvent::raise(job._destination, bundle);
00360                                                         dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00361                                                 }
00362                                         }
00363 
00364                                         curl_easy_cleanup(curl_up);
00365                                 }
00366                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00367                                 // send transfer aborted event
00368                                 dtn::net::TransferAbortedEvent::raise(node.getEID(), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00369                         }
00370 
00371                 }
00372 
00376                 dtn::core::Node::Protocol HTTPConvergenceLayer::getDiscoveryProtocol() const
00377                 {
00378                         return dtn::core::Node::CONN_HTTP;
00379                 }
00380 
00385                 void HTTPConvergenceLayer::componentUp()
00386                 {
00387                 }
00388 
00400                 void HTTPConvergenceLayer::componentRun()
00401                 {
00402 
00403                         std::string url = _server + "?eid=" + dtn::core::BundleCore::local.getString();
00404 
00405                         CURL *curl_down;
00406                         CURLcode res;
00407 
00408                         /* CURL DEBUG */
00409             //struct data config;
00410                         //config.trace_ascii = 1; /* enable ascii tracing */
00411 
00412 
00413                         //long http_code = 0;
00414                         //double download_size = 0;
00415                         //long connects = 0;
00416 
00417                         while (_running)
00418                         {
00419                                 curl_down = curl_easy_init();
00420 
00421                                 while(curl_down)
00422                                 {
00423                                         curl_easy_setopt(curl_down, CURLOPT_URL, url.c_str());
00424 
00425                                         /* disable connection timeout */
00426                                         curl_easy_setopt(curl_down, CURLOPT_CONNECTTIMEOUT, 0);
00427 
00428                                         /* no progress meter please */
00429                                         curl_easy_setopt(curl_down, CURLOPT_NOPROGRESS, 1L);
00430 
00431                                         /* cURL DEBUG options */
00432                                         //curl_easy_setopt(curl_down, CURLOPT_DEBUGFUNCTION, my_trace);
00433                                     //curl_easy_setopt(curl_down, CURLOPT_DEBUGDATA, &config);
00434 
00435                                     //curl_easy_setopt(curl_down, CURLOPT_DEBUGFUNCTION, my_trace);
00436                                         //curl_easy_setopt(curl_down, CURLOPT_VERBOSE, 1);
00437 
00438                                         // create a receiver thread
00439                                         {
00440                                                 ibrcommon::MutexLock l(_push_iob_mutex);
00441                                                 _push_iob = new ibrcommon::iobuffer();
00442                                         }
00443 
00444                                         ibrcommon::AutoDelete<ibrcommon::iobuffer> auto_kill(_push_iob);
00445                                         std::ostream os(_push_iob);
00446                                         DownloadThread receiver(*_push_iob);
00447 
00448                                         /* send all data to this function  */
00449                                         curl_easy_setopt(curl_down, CURLOPT_WRITEFUNCTION, HTTPConvergenceLayer_callback_write);
00450 
00451                                         /* now specify where to write data */
00452                                         curl_easy_setopt(curl_down, CURLOPT_WRITEDATA, &os);
00453 
00454                                         /* do curl */
00455                                         receiver.start();
00456                                         res = curl_easy_perform(curl_down);
00457 
00458                                         {
00459                                                 ibrcommon::MutexLock l(_push_iob_mutex);
00460                                                 /* finalize iobuffer */
00461                                                 _push_iob->finalize();
00462                                                 receiver.join();
00463                                                 _push_iob = NULL;
00464                                         }
00465 
00466                                         /* get HTTP Header StatusCode */
00467                                         //curl_easy_getinfo (curl_down, CURLINFO_RESPONSE_CODE, &http_code);
00468                                         //curl_easy_getinfo (curl, CURLINFO_SIZE_DOWNLOAD, &download_size);
00469                                         //curl_easy_getinfo (curl, CURLINFO_NUM_CONNECTS, &connects);
00470 
00471                                         /* DEBUG OUTPUT INFORMATION */
00472                                         //std::cout << "CURL CODE    : " << res << std::endl;
00473                                         //std::cerr << "HTTP CODE    : " << http_code << std::endl;
00474                                         //std::cout << "DOWNLOAD_SIZE: " << download_size << " Bytes" << std::endl;
00475                                         //std::cout << "NUM_CONNECTS : " << connects << " Connects" << std::endl;
00476                                         /* DEBUG OUTPUT INFORMATION */
00477 
00478                                         /* Wait some time an retry to connect */
00479                                         sleep(CONN_TIMEOUT);  // Wenn Verbindung nicht hergestellt werden konnte warte 5 sec.
00480                                         IBRCOMMON_LOGGER_DEBUG(10) << "http error: " << "Couldn't connect to server ... wait " << CONN_TIMEOUT/1000 << "s until retry" << IBRCOMMON_LOGGER_ENDL;
00481 
00482                                 }
00483 
00484                                 /* always cleanup */
00485                                 curl_easy_cleanup(curl_down);
00486                         }
00487                         yield();
00488                 }
00489 
00495                 void HTTPConvergenceLayer::componentDown()
00496                 {
00497 
00498                         _running = false;
00499 
00500                         ibrcommon::MutexLock l(_push_iob_mutex);
00501                         if (_push_iob != NULL)
00502                         {
00503                                 _push_iob->finalize();
00504                                 _push_iob = NULL;
00505                         }
00506 
00507                 }
00508 
00513                 bool HTTPConvergenceLayer::__cancellation()
00514                 {
00515                         // since this is an receiving thread we have to cancel the hard way
00516                         return false;
00517                 }
00518 
00523                 const std::string HTTPConvergenceLayer::getName() const
00524                 {
00525                         return "HTTPConvergenceLayer";
00526                 }
00527         }
00528 }