Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "net/HTTPConvergenceLayer.h"
00009 #include <ibrcommon/thread/MutexLock.h>
00010 #include "core/BundleCore.h"
00011 #include <ibrcommon/data/BLOB.h>
00012 #include <ibrdtn/data/Serializer.h>
00013 #include "core/BundleEvent.h"
00014 #include "net/TransferCompletedEvent.h"
00015 #include "net/TransferAbortedEvent.h"
00016 #include "net/BundleReceivedEvent.h"
00017 #include <ibrcommon/Logger.h>
00018
00019 #include <stdio.h>
00020 #include <fcntl.h>
00021 #include <sys/stat.h>
00022 #include <unistd.h>
00023
00024 #include <curl/curl.h>
00025
00026 namespace dtn
00027 {
00028 namespace net
00029 {
/**
 * Read callback handed to libcurl via CURLOPT_READFUNCTION. It copies
 * data from the std::istream given as user pointer into curl's buffer.
 *
 * @param ptr   buffer to fill
 * @param size  size of a single element in bytes
 * @param nmemb number of elements that fit into the buffer
 * @param s     user pointer; has to point to a std::istream
 * @return the number of bytes the stream delivered for this call
 */
static size_t HTTPConvergenceLayer_callback_read(void *ptr, size_t size, size_t nmemb, void *s)
{
    std::istream &source = *static_cast<std::istream*>(s);
    const size_t capacity = size * nmemb;

    // try to fill the whole buffer; gcount() tells how much was really extracted
    source.read(static_cast<char*>(ptr), capacity);

    return source.gcount();
}
00041
/**
 * Write callback handed to libcurl via CURLOPT_WRITEFUNCTION. It appends
 * the received data to the std::ostream given as user pointer.
 *
 * @param ptr   received data
 * @param size  size of a single element in bytes
 * @param nmemb number of received elements
 * @param s     user pointer; has to point to a std::ostream
 * @return the number of bytes consumed (size * nmemb), or 0 if the stream
 *         is in a failed state after the write. libcurl treats a return
 *         value that differs from size * nmemb as a write error and aborts
 *         the transfer.
 */
static size_t HTTPConvergenceLayer_callback_write(void *ptr, size_t size, size_t nmemb, void *s)
{
    std::ostream *stream = static_cast<std::ostream*>(s);
    const char *buffer = static_cast<const char*>(ptr);
    const size_t length = size * nmemb;

    stream->write(buffer, static_cast<std::streamsize>(length));

    // do not claim success if the data could not be stored
    if (stream->fail())
    {
        return 0;
    }

    return length;
}
00051
00052 HTTPConvergenceLayer::HTTPConvergenceLayer(const std::string &server)
00053 : _server(server)
00054 {
00055 curl_global_init(CURL_GLOBAL_ALL);
00056 }
00057
00058 HTTPConvergenceLayer::~HTTPConvergenceLayer()
00059 {
00060 curl_global_cleanup();
00061 }
00062
00063 void HTTPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00064 {
00065 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00066
00067 try {
00068
00069 const dtn::data::Bundle bundle = storage.get(job._bundle);
00070
00071 ibrcommon::MutexLock l(_write_lock);
00072
00073 ibrcommon::BLOB::Reference ref = ibrcommon::TmpFileBLOB::create();
00074 {
00075 ibrcommon::BLOB::iostream io = ref.iostream();
00076 dtn::data::DefaultSerializer(*io) << bundle;
00077 }
00078
00079 ibrcommon::BLOB::iostream io = ref.iostream();
00080 size_t length = io.size();
00081 CURLcode res;
00082
00083 CURL *curl = curl_easy_init();
00084 if(curl)
00085 {
00086
00087 curl_easy_setopt(curl, CURLOPT_READFUNCTION, HTTPConvergenceLayer_callback_read);
00088
00089
00090 curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
00091
00092
00093 curl_easy_setopt(curl, CURLOPT_PUT, 1L);
00094
00095
00096
00097 curl_easy_setopt(curl, CURLOPT_URL, _server.c_str());
00098
00099
00100 curl_easy_setopt(curl, CURLOPT_READDATA, &(*io));
00101
00102
00103
00104 curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE,
00105 (curl_off_t)length);
00106
00107
00108 res = curl_easy_perform(curl);
00109
00110
00111 curl_easy_cleanup(curl);
00112
00113 dtn::net::TransferCompletedEvent::raise(job._destination, bundle);
00114 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00115 }
00116 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00117
00118 dtn::net::TransferAbortedEvent::raise(EID(node.getURI()), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00119 }
00120 }
00121
00122 dtn::core::Node::Protocol HTTPConvergenceLayer::getDiscoveryProtocol() const
00123 {
00124 return dtn::core::Node::CONN_HTTP;
00125 }
00126
00127 void HTTPConvergenceLayer::componentUp()
00128 {
00129 }
00130
00131 void HTTPConvergenceLayer::componentRun()
00132 {
00133 std::string url = _server + "?eid=" + dtn::core::BundleCore::local.getString();
00134 CURLcode res;
00135
00136 while (true)
00137 {
00138 ibrcommon::BLOB::Reference ref = ibrcommon::TmpFileBLOB::create();
00139 ibrcommon::BLOB::iostream io = ref.iostream();
00140
00141 CURL *curl = curl_easy_init();
00142
00143 if(curl)
00144 {
00145 curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
00146
00147
00148 curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1L);
00149
00150
00151 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, HTTPConvergenceLayer_callback_write);
00152
00153
00154 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &(*io));
00155
00156 {
00157 ibrcommon::MutexLock reflock(ref);
00158 res = curl_easy_perform(curl);
00159 }
00160
00161
00162 curl_easy_cleanup(curl);
00163
00164 try {
00165 dtn::data::Bundle bundle;
00166 dtn::data::DefaultDeserializer(*io) >> bundle;
00167
00168
00169 dtn::net::BundleReceivedEvent::raise(dtn::data::EID(), bundle);
00170 } catch (const ibrcommon::Exception &ex) {
00171 IBRCOMMON_LOGGER_DEBUG(10) << "http error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00172 }
00173 }
00174 yield();
00175 }
00176 }
00177
00178 void HTTPConvergenceLayer::componentDown()
00179 {
00180 }
00181
00182 bool HTTPConvergenceLayer::__cancellation()
00183 {
00184
00185 return false;
00186 }
00187
00188 const std::string HTTPConvergenceLayer::getName() const
00189 {
00190 return "HTTPConvergenceLayer";
00191 }
00192 }
00193 }