Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "net/HTTPConvergenceLayer.h"
00009 #include <ibrcommon/thread/MutexLock.h>
00010 #include "core/BundleCore.h"
00011 #include <ibrcommon/data/BLOB.h>
00012 #include <ibrdtn/data/Serializer.h>
00013 #include "core/BundleEvent.h"
00014 #include "net/TransferCompletedEvent.h"
00015 #include "net/TransferAbortedEvent.h"
00016 #include "net/BundleReceivedEvent.h"
00017 #include <ibrcommon/Logger.h>
00018
00019 #include <stdio.h>
00020 #include <fcntl.h>
00021 #include <sys/stat.h>
00022 #include <unistd.h>
00023
00024 #include <curl/curl.h>
00025
00026 namespace dtn
00027 {
00028 namespace net
00029 {
/**
 * libcurl read callback (CURLOPT_READFUNCTION): fills the upload buffer
 * with data taken from a std::istream.
 *
 * @param ptr   destination buffer provided by libcurl
 * @param size  size of a single element
 * @param nmemb number of elements that fit into the buffer
 * @param s     user pointer (CURLOPT_READDATA); must point to a std::istream
 * @return number of bytes copied into the buffer; 0 once nothing more
 *         could be extracted from the stream
 */
static size_t HTTPConvergenceLayer_callback_read(void *ptr, size_t size, size_t nmemb, void *s)
{
	std::istream &source = *static_cast<std::istream*>(s);
	const size_t capacity = size * nmemb;

	// pull at most one buffer worth of bytes out of the stream
	source.read(static_cast<char*>(ptr), capacity);

	// gcount() tells how many bytes the read above actually delivered
	return source.gcount();
}
00041
/**
 * libcurl write callback (CURLOPT_WRITEFUNCTION): appends a chunk of
 * received data to a std::ostream.
 *
 * @param ptr   buffer holding the received bytes (not null-terminated)
 * @param size  size of a single element
 * @param nmemb number of elements in the buffer
 * @param s     user pointer (CURLOPT_WRITEDATA); must point to a std::ostream
 * @return number of bytes taken care of. libcurl treats any value that
 *         differs from size * nmemb as a write error and aborts the
 *         transfer, so 0 is returned when the stream is in a failed state.
 */
static size_t HTTPConvergenceLayer_callback_write(void *ptr, size_t size, size_t nmemb, void *s)
{
	std::ostream *stream = static_cast<std::ostream*>(s);
	const char *buffer = static_cast<const char*>(ptr);
	const size_t length = size * nmemb;

	stream->write(buffer, static_cast<std::streamsize>(length));

	// do not pretend the data was stored if the sink rejected it
	if (stream->fail()) return 0;

	return length;
}
00051
00052 HTTPConvergenceLayer::HTTPConvergenceLayer(const std::string &server)
00053 : _server(server)
00054 {
00055 curl_global_init(CURL_GLOBAL_ALL);
00056 }
00057
00058 HTTPConvergenceLayer::~HTTPConvergenceLayer()
00059 {
00060 curl_global_cleanup();
00061 }
00062
00063 void HTTPConvergenceLayer::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00064 {
00065 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00066
00067 try {
00068
00069 const dtn::data::Bundle bundle = storage.get(job._bundle);
00070
00071 ibrcommon::MutexLock l(_write_lock);
00072
00073 ibrcommon::BLOB::Reference ref = ibrcommon::TmpFileBLOB::create();
00074 {
00075 ibrcommon::MutexLock reflock(ref);
00076 dtn::data::DefaultSerializer(*ref) << bundle;
00077 }
00078
00079 size_t length = ref.getSize();
00080 CURLcode res;
00081
00082 CURL *curl = curl_easy_init();
00083 if(curl)
00084 {
00085
00086 curl_easy_setopt(curl, CURLOPT_READFUNCTION, HTTPConvergenceLayer_callback_read);
00087
00088
00089 curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
00090
00091
00092 curl_easy_setopt(curl, CURLOPT_PUT, 1L);
00093
00094
00095
00096 curl_easy_setopt(curl, CURLOPT_URL, _server.c_str());
00097
00098
00099 curl_easy_setopt(curl, CURLOPT_READDATA, &(*ref));
00100
00101
00102
00103 curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE,
00104 (curl_off_t)length);
00105
00106 ibrcommon::MutexLock reflock(ref);
00107
00108 res = curl_easy_perform(curl);
00109
00110
00111 curl_easy_cleanup(curl);
00112
00113 dtn::net::TransferCompletedEvent::raise(job._destination, bundle);
00114 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_FORWARDED);
00115 }
00116 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00117
00118 dtn::net::TransferAbortedEvent::raise(EID(node.getURI()), job._bundle, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00119 }
00120 }
00121
00122 dtn::core::Node::Protocol HTTPConvergenceLayer::getDiscoveryProtocol() const
00123 {
00124 return dtn::core::Node::CONN_HTTP;
00125 }
00126
00127 void HTTPConvergenceLayer::componentUp()
00128 {
00129 }
00130
00131 void HTTPConvergenceLayer::componentRun()
00132 {
00133 std::string url = _server + "?eid=" + dtn::core::BundleCore::local.getString();
00134 CURLcode res;
00135
00136 while (true)
00137 {
00138 ibrcommon::BLOB::Reference ref = ibrcommon::TmpFileBLOB::create();
00139 CURL *curl = curl_easy_init();
00140
00141 if(curl)
00142 {
00143 curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
00144
00145
00146 curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1L);
00147
00148
00149 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, HTTPConvergenceLayer_callback_write);
00150
00151
00152 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &(*ref));
00153
00154 {
00155 ibrcommon::MutexLock reflock(ref);
00156 res = curl_easy_perform(curl);
00157 }
00158
00159
00160 curl_easy_cleanup(curl);
00161
00162 try {
00163 ibrcommon::MutexLock reflock(ref);
00164 dtn::data::Bundle bundle;
00165 dtn::data::DefaultDeserializer(*ref) >> bundle;
00166
00167
00168 dtn::net::BundleReceivedEvent::raise(dtn::data::EID(), bundle);
00169 } catch (ibrcommon::Exception ex) {
00170 IBRCOMMON_LOGGER_DEBUG(10) << "http error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00171 }
00172 }
00173 yield();
00174 }
00175 }
00176
00177 void HTTPConvergenceLayer::componentDown()
00178 {
00179 }
00180
00181 bool HTTPConvergenceLayer::__cancellation()
00182 {
00183
00184 return false;
00185 }
00186
00187 const std::string HTTPConvergenceLayer::getName() const
00188 {
00189 return "HTTPConvergenceLayer";
00190 }
00191 }
00192 }