IBR-DTN  1.0.0
HTTPConvergenceLayer.cpp
Go to the documentation of this file.
1 
27 #include "Configuration.h"
29 #include "core/BundleCore.h"
30 #include <memory>
31 
32 namespace dtn
33 {
34  namespace net
35  {
37  const int TIMEOUT = 1000;
39  const int CONN_TIMEOUT = 5000;
40 
42  const int HTTP_OK = 200;
44  const int HTTP_NO_DATA = 410;
45 
47  const int CURL_CONN_OK = 0;
49  const int CURL_PARTIAL_FILE = 18;
50 
59  static size_t HTTPConvergenceLayer_callback_read(void *ptr, size_t size, size_t nmemb, void *s)
60  {
61  size_t retcode = 0;
62  std::istream *stream = static_cast<std::istream*>(s);
63 
64  if (stream->eof()) return 0;
65 
66  char *buffer = static_cast<char*>(ptr);
67 
68  stream->read(buffer, (size * nmemb));
69  retcode = stream->gcount();
70 
71  return retcode;
72  }
73 
82  static size_t HTTPConvergenceLayer_callback_write(void *ptr, size_t size, size_t nmemb, void *s)
83  {
84  std::ostream *stream = static_cast<std::ostream*>(s);
85  char *buffer = static_cast<char*>(ptr);
86 
87  if (!stream->good()) return 0;
88 
89  stream->write(buffer, (size * nmemb));
90  stream->flush();
91 
92  return (size * nmemb);
93  }
94 
103  : _server(server), _push_iob(NULL), _running(true)
104  {
105  curl_global_init(CURL_GLOBAL_ALL);
106  }
107 
108 
114  {
115  curl_global_cleanup();
116  }
117 
125  DownloadThread::DownloadThread(ibrcommon::iobuffer &buf)
126  : _stream(&buf)
127  {
128  }
129 
135  {
136  join();
137  }
138 
146  void DownloadThread::run() throw ()
147  {
148  try {
149  // create a filter context
150  dtn::core::FilterContext context;
152 
153  while(_stream.good())
154  {
155  try {
156  dtn::data::Bundle bundle;
158 
159  // push bundle through the filter routines
160  context.setBundle(bundle);
162 
163  if (ret != BundleFilter::ACCEPT) continue;
164 
165  // raise default bundle received event
167  } catch (const ibrcommon::Exception &ex) {
168  IBRCOMMON_LOGGER_DEBUG_TAG("HTTPConvergenceLayer", 10) << "http error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
169  }
170 
171  yield();
172  }
173  } catch (const ibrcommon::ThreadException &e) {
174  std::cerr << "error: " << e.what() << std::endl;
175  }
176  }
177 
179  {
180  }
181 
196  {
198  std::string url_send;
199 
200  long http_code = 0;
201  //double upload_size = 0;
202 
203  // create a filter context
204  dtn::core::FilterContext context;
206 
207  try {
208  // read the bundle out of the storage
209  dtn::data::Bundle bundle = storage.get(job.getBundle());
210 
211  // push bundle through the filter routines
212  context.setBundle(bundle);
214 
215  if (ret != BundleFilter::ACCEPT) {
216  dtn::net::BundleTransfer local_job = job;
218  return;
219  }
220 
221  ibrcommon::BLOB::Reference ref = ibrcommon::BLOB::create();
222  {
223  ibrcommon::BLOB::iostream io = ref.iostream();
224  dtn::data::DefaultSerializer(*io) << bundle;
225  }
226 
227  ibrcommon::BLOB::iostream io = ref.iostream();
228  size_t length = io.size();
229  CURLcode res;
230  CURL *curl_up;
231 
232  url_send = _server + "?eid=" + dtn::core::BundleCore::local.getString();
233  //+ "&dst-eid=dtn://experthe-laptop/filetransfer";
234  //+ "&priority=2";
235  //+ "&ttl=3600";
236 
237  //if (job._bundle.source.getString() == (dtn::core::BundleCore::local.getString() + "/echo-client"))
238  //{
239  // url_send = _server + "?eid=" + dtn::core::BundleCore::local.getString() + "&dst-eid=echo-client";
240  //}
241 
242  curl_up = curl_easy_init();
243  if(curl_up)
244  {
245  /* we want to use our own read function */
246  curl_easy_setopt(curl_up, CURLOPT_READFUNCTION, HTTPConvergenceLayer_callback_read);
247 
248  /* enable uploading */
249  curl_easy_setopt(curl_up, CURLOPT_UPLOAD, 1L);
250 
251  /* HTTP PUT please */
252  curl_easy_setopt(curl_up, CURLOPT_PUT, 1L);
253 
254  /* disable connection timeout */
255  curl_easy_setopt(curl_up, CURLOPT_CONNECTTIMEOUT, 0);
256 
257  /* specify target URL, and note that this URL should include a file
258  name, not only a directory */
259  curl_easy_setopt(curl_up, CURLOPT_URL, url_send.c_str());
260 
261  /* now specify which file to upload */
262  curl_easy_setopt(curl_up, CURLOPT_READDATA, &(*io));
263 
264  /* provide the size of the upload, we specicially typecast the value
265  to curl_off_t since we must be sure to use the correct data size */
266  curl_easy_setopt(curl_up, CURLOPT_INFILESIZE_LARGE,
267  (curl_off_t)length);
268 
269  /* Now run off and do what you've been told! */
270  res = curl_easy_perform(curl_up);
271 
272  if(res == CURL_CONN_OK)
273  {
274  /* get HTTP Header StatusCode */
275  curl_easy_getinfo (curl_up, CURLINFO_RESPONSE_CODE, &http_code);
276  //curl_easy_getinfo (curl, CURLINFO_SIZE_UPLOAD, &upload_size);
277 
278  /* DEBUG OUTPUT INFORMATION */
279  //std::cout << "CURL CODE : " << res << std::endl;
280  //std::cout << "HTTP CODE : " << http_code << std::endl;
281  //std::cout << "UPLOAD_SIZE: " << upload_size << " Bytes" << std::endl;
282  /* DEBUG OUTPUT INFORMATION */
283 
284  if(http_code == HTTP_OK)
285  {
286  dtn::net::BundleTransfer local_job = job;
287  local_job.complete();
288  }
289  }
290 
291  curl_easy_cleanup(curl_up);
292  }
293  } catch (const dtn::storage::NoBundleFoundException&) {
294  // send transfer aborted event
295  dtn::net::BundleTransfer local_job = job;
297  }
298 
299  }
300 
305  {
307  }
308 
314  {
315  // routine checked for throw() on 15.02.2013
316  }
317 
330  {
331 
332  std::string url = _server + "?eid=" + dtn::core::BundleCore::local.getString();
333 
334  CURL *curl_down;
335 
336  while (_running)
337  {
338  curl_down = curl_easy_init();
339 
340  while(curl_down)
341  {
342  curl_easy_setopt(curl_down, CURLOPT_URL, url.c_str());
343 
344  /* disable connection timeout */
345  curl_easy_setopt(curl_down, CURLOPT_CONNECTTIMEOUT, 0);
346 
347  /* no progress meter please */
348  curl_easy_setopt(curl_down, CURLOPT_NOPROGRESS, 1L);
349 
350  /* cURL DEBUG options */
351  //curl_easy_setopt(curl_down, CURLOPT_DEBUGFUNCTION, my_trace);
352  //curl_easy_setopt(curl_down, CURLOPT_DEBUGDATA, &config);
353 
354  //curl_easy_setopt(curl_down, CURLOPT_DEBUGFUNCTION, my_trace);
355  //curl_easy_setopt(curl_down, CURLOPT_VERBOSE, 1);
356 
357  // create a receiver thread
358  {
359  ibrcommon::MutexLock l(_push_iob_mutex);
360  _push_iob = new ibrcommon::iobuffer();
361  }
362 
363  std::auto_ptr<ibrcommon::iobuffer> auto_kill(_push_iob);
364  std::ostream os(_push_iob);
365  DownloadThread receiver(*_push_iob);
366 
367  /* send all data to this function */
368  curl_easy_setopt(curl_down, CURLOPT_WRITEFUNCTION, HTTPConvergenceLayer_callback_write);
369 
370  /* now specify where to write data */
371  curl_easy_setopt(curl_down, CURLOPT_WRITEDATA, &os);
372 
373  /* do curl */
374  receiver.start();
375  curl_easy_perform(curl_down);
376 
377  {
378  ibrcommon::MutexLock l(_push_iob_mutex);
379  /* finalize iobuffer */
380  _push_iob->finalize();
381  receiver.join();
382  _push_iob = NULL;
383  }
384 
385  /* get HTTP Header StatusCode */
386  //curl_easy_getinfo (curl_down, CURLINFO_RESPONSE_CODE, &http_code);
387  //curl_easy_getinfo (curl, CURLINFO_SIZE_DOWNLOAD, &download_size);
388  //curl_easy_getinfo (curl, CURLINFO_NUM_CONNECTS, &connects);
389 
390  /* Wait some time an retry to connect */
391  ibrcommon::Thread::sleep(CONN_TIMEOUT); // Wenn Verbindung nicht hergestellt werden konnte warte 5 sec.
392  IBRCOMMON_LOGGER_DEBUG_TAG("HTTPConvergenceLayer", 10) << "http error: " << "Couldn't connect to server ... wait " << CONN_TIMEOUT/1000 << "s until retry" << IBRCOMMON_LOGGER_ENDL;
393  }
394 
395  /* always cleanup */
396  curl_easy_cleanup(curl_down);
397  }
398  yield();
399  }
400 
407  {
408  }
409 
415  {
416  _running = false;
417 
418  ibrcommon::MutexLock l(_push_iob_mutex);
419  if (_push_iob != NULL)
420  {
421  _push_iob->finalize();
422  _push_iob = NULL;
423  }
424  }
425 
430  const std::string HTTPConvergenceLayer::getName() const
431  {
432  return "HTTPConvergenceLayer";
433  }
434  }
435 }
const int CURL_PARTIAL_FILE
static dtn::data::EID local
Definition: BundleCore.h:79
HTTPConvergenceLayer(const std::string &server)
void setBundle(const dtn::data::Bundle &data)
dtn::core::Node::Protocol getDiscoveryProtocol() const
BundleFilter::ACTION filter(BundleFilter::TABLE table, const FilterContext &context, dtn::data::Bundle &bundle) const
Definition: BundleCore.cpp:598
const int CURL_CONN_OK
const int CONN_TIMEOUT
void setProtocol(const dtn::core::Node::Protocol &protocol)
bool _running
Definition: dtninbox.cpp:122
const int HTTP_NO_DATA
void queue(const dtn::core::Node &n, const dtn::net::BundleTransfer &job)
virtual const std::string getName() const
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
Size size() const
Definition: Bundle.cpp:258
void abort(const TransferAbortedEvent::AbortReason reason)
const dtn::data::MetaBundle & getBundle() const
std::string getString() const
Definition: EID.cpp:374
dtn::storage::BundleStorage & getStorage()
Definition: BundleCore.cpp:237
DownloadThread(ibrcommon::iobuffer &buf)
static void raise(const dtn::data::EID &peer, const dtn::data::Bundle &bundle, const bool local=false)
static BundleCore & getInstance()
Definition: BundleCore.cpp:82