00001
00002
00003
00004
00005
00006
00007
00008 #include "net/HTTPConvergenceLayer.h"
00009 #include <ibrcommon/thread/MutexLock.h>
00010 #include "core/BundleCore.h"
00011 #include <ibrcommon/data/BLOB.h>
00012 #include <ibrdtn/data/Serializer.h>
00013 #include "core/BundleEvent.h"
00014 #include "net/TransferCompletedEvent.h"
00015 #include "net/BundleReceivedEvent.h"
00016 #include <ibrcommon/Logger.h>
00017
00018 #include <stdio.h>
00019 #include <fcntl.h>
00020 #include <sys/stat.h>
00021 #include <unistd.h>
00022
00023 #include <curl/curl.h>
00024
00025 namespace dtn
00026 {
00027 namespace net
00028 {
00029 static size_t HTTPConvergenceLayer_callback_read(void *ptr, size_t size, size_t nmemb, void *s)
00030 {
00031 size_t retcode = 0;
00032 std::istream *stream = static_cast<std::istream*>(s);
00033 char *buffer = static_cast<char*>(ptr);
00034
00035 stream->read(buffer, (size * nmemb));
00036 retcode = stream->gcount();
00037
00038 return retcode;
00039 }
00040
00041 static size_t HTTPConvergenceLayer_callback_write(void *ptr, size_t size, size_t nmemb, void *s)
00042 {
00043 std::ostream *stream = static_cast<std::ostream*>(s);
00044 char *buffer = static_cast<char*>(ptr);
00045
00046 stream->write(buffer, (size * nmemb));
00047
00048 return (size * nmemb);
00049 }
00050
00051 HTTPConvergenceLayer::HTTPConvergenceLayer(const std::string &server)
00052 : _server(server)
00053 {
00054 curl_global_init(CURL_GLOBAL_ALL);
00055 }
00056
00057 HTTPConvergenceLayer::~HTTPConvergenceLayer()
00058 {
00059 curl_global_cleanup();
00060 }
00061
00062 void HTTPConvergenceLayer::queue(const dtn::core::Node&, const ConvergenceLayer::Job &job)
00063 {
00064 ibrcommon::MutexLock l(_write_lock);
00065
00066 ibrcommon::BLOB::Reference ref = ibrcommon::TmpFileBLOB::create();
00067 {
00068 ibrcommon::MutexLock reflock(ref);
00069 dtn::data::DefaultSerializer(*ref) << job._bundle;
00070 }
00071
00072 size_t length = ref.getSize();
00073 CURLcode res;
00074
00075 CURL *curl = curl_easy_init();
00076 if(curl)
00077 {
00078
00079 curl_easy_setopt(curl, CURLOPT_READFUNCTION, HTTPConvergenceLayer_callback_read);
00080
00081
00082 curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
00083
00084
00085 curl_easy_setopt(curl, CURLOPT_PUT, 1L);
00086
00087
00088
00089 curl_easy_setopt(curl, CURLOPT_URL, _server.c_str());
00090
00091
00092 curl_easy_setopt(curl, CURLOPT_READDATA, &(*ref));
00093
00094
00095
00096 curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE,
00097 (curl_off_t)length);
00098
00099 ibrcommon::MutexLock reflock(ref);
00100
00101 res = curl_easy_perform(curl);
00102
00103
00104 curl_easy_cleanup(curl);
00105
00106 dtn::net::TransferCompletedEvent::raise(job._destination, job._bundle);
00107 dtn::core::BundleEvent::raise(job._bundle, dtn::core::BUNDLE_FORWARDED);
00108 }
00109 }
00110
00111 dtn::core::Node::Protocol HTTPConvergenceLayer::getDiscoveryProtocol() const
00112 {
00113 return dtn::core::Node::CONN_HTTP;
00114 }
00115
00116 void HTTPConvergenceLayer::componentUp()
00117 {
00118 }
00119
00120 void HTTPConvergenceLayer::componentRun()
00121 {
00122 std::string url = _server + "?eid=" + dtn::core::BundleCore::local.getString();
00123 CURLcode res;
00124
00125 while (isRunning())
00126 {
00127 ibrcommon::BLOB::Reference ref = ibrcommon::TmpFileBLOB::create();
00128 CURL *curl = curl_easy_init();
00129
00130 if(curl)
00131 {
00132 curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
00133
00134
00135 curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1L);
00136
00137
00138 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, HTTPConvergenceLayer_callback_write);
00139
00140
00141 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &(*ref));
00142
00143 {
00144 ibrcommon::MutexLock reflock(ref);
00145 res = curl_easy_perform(curl);
00146 }
00147
00148
00149 curl_easy_cleanup(curl);
00150
00151 try {
00152 ibrcommon::MutexLock reflock(ref);
00153 dtn::data::Bundle bundle;
00154 dtn::data::DefaultDeserializer(*ref) >> bundle;
00155
00156
00157 dtn::net::BundleReceivedEvent::raise(dtn::data::EID(), bundle);
00158 } catch (ibrcommon::Exception ex) {
00159 IBRCOMMON_LOGGER_DEBUG(10) << "http error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00160 }
00161 }
00162 yield();
00163 }
00164 }
00165
00166 void HTTPConvergenceLayer::componentDown()
00167 {
00168 }
00169 }
00170 }