• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/net/HTTPConvergenceLayer.cpp

Go to the documentation of this file.
00001 /*
00002  * HTTPConvergenceLayer.cpp
00003  *
00004  *  Created on: 29.07.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "net/HTTPConvergenceLayer.h"
00009 #include <ibrcommon/thread/MutexLock.h>
00010 #include "core/BundleCore.h"
00011 #include <ibrcommon/data/BLOB.h>
00012 #include <ibrdtn/data/Serializer.h>
00013 #include "core/BundleEvent.h"
00014 #include "net/TransferCompletedEvent.h"
00015 #include "net/TransferAbortedEvent.h"
00016 #include "net/BundleReceivedEvent.h"
00017 #include <ibrcommon/Logger.h>
00018 
00019 #include <stdio.h>
00020 #include <fcntl.h>
00021 #include <sys/stat.h>
00022 #include <unistd.h>
00023 
00024 #include <curl/curl.h>
00025 
00026 namespace dtn
00027 {
00028         namespace net
00029         {
00030                 static size_t HTTPConvergenceLayer_callback_read(void *ptr, size_t size, size_t nmemb, void *s)
00031                 {
00032                         size_t retcode = 0;
00033                         std::istream *stream = static_cast<std::istream*>(s);
00034                         char *buffer = static_cast<char*>(ptr);
00035 
00036                         stream->read(buffer, (size * nmemb));
00037                         retcode = stream->gcount();
00038 
00039                         return retcode;
00040                 }
00041 
00042                 static size_t HTTPConvergenceLayer_callback_write(void *ptr, size_t size, size_t nmemb, void *s)
00043                 {
00044                         std::ostream *stream = static_cast<std::ostream*>(s);
00045                         char *buffer = static_cast<char*>(ptr);
00046 
00047                         stream->write(buffer, (size * nmemb));
00048 
00049                         return (size * nmemb);
00050                 }
00051 
00052                 HTTPConvergenceLayer::HTTPConvergenceLayer(const std::string &server)
00053                  : _server(server)
00054                 {
00055                         curl_global_init(CURL_GLOBAL_ALL);
00056                 }
00057 
00058                 HTTPConvergenceLayer::~HTTPConvergenceLayer()
00059                 {
00060                         curl_global_cleanup();
00061                 }
00062 
00063                 void HTTPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00064                 {
00065                         dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00066 
00067                         try {
00068                                 // read the bundle out of the storage
00069                                 const dtn::data::Bundle bundle = storage.get(job._bundle);
00070 
00071                                 ibrcommon::MutexLock l(_write_lock);
00072 
00073                                 ibrcommon::BLOB::Reference ref = ibrcommon::TmpFileBLOB::create();
00074                                 {
00075                                         ibrcommon::BLOB::iostream io = ref.iostream();
00076                                         dtn::data::DefaultSerializer(*io) << bundle;
00077                                 }
00078 
00079                                 ibrcommon::BLOB::iostream io = ref.iostream();
00080                                 size_t length = io.size();
00081                                 CURLcode res;
00082 
00083                                 CURL *curl = curl_easy_init();
00084                                 if(curl)
00085                                 {
00086                                         /* we want to use our own read function */
00087                                         curl_easy_setopt(curl, CURLOPT_READFUNCTION, HTTPConvergenceLayer_callback_read);
00088 
00089                                         /* enable uploading */
00090                                         curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
00091 
00092                                         /* HTTP PUT please */
00093                                         curl_easy_setopt(curl, CURLOPT_PUT, 1L);
00094 
00095                                         /* specify target URL, and note that this URL should include a file
00096                                            name, not only a directory */
00097                                         curl_easy_setopt(curl, CURLOPT_URL, _server.c_str());
00098 
00099                                         /* now specify which file to upload */
00100                                         curl_easy_setopt(curl, CURLOPT_READDATA, &(*io));
00101 
00102                                         /* provide the size of the upload, we specicially typecast the value
00103                                            to curl_off_t since we must be sure to use the correct data size */
00104                                         curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE,
00105                                                                          (curl_off_t)length);
00106 
00107                                         /* Now run off and do what you've been told! */
00108                                         res = curl_easy_perform(curl);
00109 
00110                                         /* always cleanup */
00111                                         curl_easy_cleanup(curl);
00112 
00113                                         dtn::net::TransferCompletedEvent::raise(job._destination, bundle);
00114                                         dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00115                                 }
00116                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00117                                 // send transfer aborted event
00118                                 dtn::net::TransferAbortedEvent::raise(EID(node.getURI()), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00119                         }
00120                 }
00121 
00122                 dtn::core::Node::Protocol HTTPConvergenceLayer::getDiscoveryProtocol() const
00123                 {
00124                         return dtn::core::Node::CONN_HTTP;
00125                 }
00126 
00127                 void HTTPConvergenceLayer::componentUp()
00128                 {
00129                 }
00130 
00131                 void HTTPConvergenceLayer::componentRun()
00132                 {
00133                         std::string url = _server + "?eid=" + dtn::core::BundleCore::local.getString();
00134                         CURLcode res;
00135 
00136                         while (true)
00137                         {
00138                                 ibrcommon::BLOB::Reference ref = ibrcommon::TmpFileBLOB::create();
00139                                 ibrcommon::BLOB::iostream io = ref.iostream();
00140 
00141                                 CURL *curl = curl_easy_init();
00142 
00143                                 if(curl)
00144                                 {
00145                                         curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
00146 
00147                                         /* no progress meter please */
00148                                         curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1L);
00149 
00150                                         /* send all data to this function  */
00151                                         curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, HTTPConvergenceLayer_callback_write);
00152 
00153                                         /* now specify which file to upload */
00154                                         curl_easy_setopt(curl, CURLOPT_WRITEDATA, &(*io));
00155 
00156                                         {
00157                                                 ibrcommon::MutexLock reflock(ref);
00158                                                 res = curl_easy_perform(curl);
00159                                         }
00160 
00161                                         /* always cleanup */
00162                                         curl_easy_cleanup(curl);
00163 
00164                                         try {
00165                                                 dtn::data::Bundle bundle;
00166                                                 dtn::data::DefaultDeserializer(*io) >> bundle;
00167 
00168                                                 // raise default bundle received event
00169                                                 dtn::net::BundleReceivedEvent::raise(dtn::data::EID(), bundle);
00170                                         } catch (const ibrcommon::Exception &ex) {
00171                                                 IBRCOMMON_LOGGER_DEBUG(10) << "http error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00172                                         }
00173                                 }
00174                                 yield();
00175                         }
00176                 }
00177 
00178                 void HTTPConvergenceLayer::componentDown()
00179                 {
00180                 }
00181 
00182                 bool HTTPConvergenceLayer::__cancellation()
00183                 {
00184                         // since this is an receiving thread we have to cancel the hard way
00185                         return false;
00186                 }
00187 
00188                 const std::string HTTPConvergenceLayer::getName() const
00189                 {
00190                         return "HTTPConvergenceLayer";
00191                 }
00192         }
00193 }

Generated on Wed Mar 30 2011 11:11:49 for IBR-DTNSuite by  doxygen 1.7.1