IBR-DTN  1.0.0
DatagramConnection.cpp
Go to the documentation of this file.
1 /*
2  * DatagramConnection.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "Configuration.h"
23 #include "net/DatagramConnection.h"
25 #include "core/BundleEvent.h"
27 #include "core/BundleCore.h"
28 
29 #include <ibrdtn/utils/Utils.h>
30 #include <ibrdtn/data/Serializer.h>
31 
32 #include <ibrcommon/TimeMeasurement.h>
33 #include <ibrcommon/Logger.h>
34 #include <string.h>
35 
36 #include <iomanip>
37 
38 #define AVG_RTT_WEIGHT 0.875
39 
40 namespace dtn
41 {
42  namespace net
43  {
44  const std::string DatagramConnection::TAG = "DatagramConnection";
45 
46  DatagramConnection::DatagramConnection(const std::string &identifier, const DatagramService::Parameter &params, DatagramConnectionCallback &callback)
47  : _send_state(SEND_IDLE), _recv_state(RECV_IDLE), _callback(callback), _identifier(identifier), _stream(*this, params.max_msg_length), _sender(*this, _stream),
48  _last_ack(0), _next_seqno(0), _head_buf(params.max_msg_length), _head_len(0), _params(params), _avg_rtt(static_cast<double>(params.initial_timeout))
49  {
50  }
51 
53  {
54  // do not destroy this instance as long as
55  // the sender thread is running
56  _sender.join();
57 
58  // join ourself
59  join();
60  }
61 
63  {
64  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "shutdown()" << IBRCOMMON_LOGGER_ENDL;
65 
66  // close the stream
68  }
69 
71  {
72  // close the stream
73  try {
74  _stream.close();
75  } catch (const ibrcommon::Exception&) { };
76  }
77 
78  void DatagramConnection::run() throw ()
79  {
80  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "run()" << IBRCOMMON_LOGGER_ENDL;
81 
82  // create a filter context
84  context.setPeer(_peer_eid);
85  context.setProtocol(_callback.getDiscoveryProtocol());
86 
87  // create a deserializer for the stream
89 
90  try {
91  while(_stream.good())
92  {
93  try {
94  dtn::data::Bundle bundle;
95 
96  // read the bundle out of the stream
97  deserializer >> bundle;
98 
99  // push bundle through the filter routines
100  context.setBundle(bundle);
102 
103  switch (ret) {
104  case BundleFilter::ACCEPT:
105  // raise default bundle received event
106  dtn::net::BundleReceivedEvent::raise(_peer_eid, bundle, false);
107  break;
108 
109  case BundleFilter::REJECT:
110  throw dtn::data::Validator::RejectedException("rejected by input filter");
111  break;
112 
113  case BundleFilter::DROP:
114  break;
115  }
116  } catch (const dtn::data::Validator::RejectedException &ex) {
117  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Bundle rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
118 
119  // TODO: send NACK
120  _stream.reject();
121  } catch (const dtn::InvalidDataException &ex) {
122  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Received an invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
123 
124  // TODO: send NACK
125  _stream.reject();
126  }
127  }
128  } catch (std::exception &ex) {
129  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Main-thread died: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
130  }
131  }
132 
134  {
135  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "setup()" << IBRCOMMON_LOGGER_ENDL;
136 
137  _callback.connectionUp(this);
138  _sender.start();
139  }
140 
142  {
143  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "finally()" << IBRCOMMON_LOGGER_ENDL;
144 
145  try {
146  ibrcommon::MutexLock l(_ack_cond);
147  _ack_cond.abort();
148  } catch (const std::exception&) { };
149 
150  try {
151  // shutdown the sender thread
152  _sender.stop();
153 
154  // wait until all operations are stopped
155  _sender.join();
156  } catch (const std::exception&) { };
157 
158  try {
159  // remove this connection from the connection list
160  _callback.connectionDown(this);
161  } catch (const ibrcommon::MutexException&) { };
162  }
163 
164  const std::string& DatagramConnection::getIdentifier() const
165  {
166  return _identifier;
167  }
168 
174  {
175  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 15) << "queue bundle " << job.getBundle().toString() << " to " << job.getNeighbor().getString() << IBRCOMMON_LOGGER_ENDL;
176  _sender.queue.push(job);
177  }
178 
184  void DatagramConnection::queue(const char &flags, const unsigned int &seqno, const char *buf, const dtn::data::Length &len)
185  {
186  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "frame received, flags: " << (int)flags << ", seqno: " << seqno << ", len: " << len << IBRCOMMON_LOGGER_ENDL;
187 
188  try {
189  // we will accept every sequence number on first segments
190  // if this is not the first segment
191  if (!(flags & DatagramService::SEGMENT_FIRST))
192  {
193  // if the sequence number is not expected
194  if (_next_seqno != seqno)
195  // then drop it and send an ack
196  throw WrongSeqNoException(_next_seqno);
197  }
198 
199  // if this is the last segment then...
200  if ((flags & DatagramService::SEGMENT_FIRST) && (flags & DatagramService::SEGMENT_LAST))
201  {
202  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 45) << "full segment received" << IBRCOMMON_LOGGER_ENDL;
203 
204  // forward the last segment to the stream
205  _stream.queue(buf, len, true);
206 
207  // switch to IDLE state
208  _recv_state = RECV_IDLE;
209  }
210  else if (flags & DatagramService::SEGMENT_FIRST)
211  {
212  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 45) << "first segment received" << IBRCOMMON_LOGGER_ENDL;
213 
214  // the first segment is only allowed on IDLE state or on
215  // retransmissions due to lost ACKs
216  if (_recv_state == RECV_IDLE)
217  {
218  // first segment received
219  // store the segment in a buffer
220  ::memcpy(&_head_buf[0], buf, len);
221  _head_len = len;
222 
223  // enter the HEAD state
224  _recv_state = RECV_HEAD;
225  }
226  else if (_recv_state == RECV_HEAD)
227  {
228  // last ACK seams to be lost or the peer has been restarted after
229  // sending the first segment
230  // overwrite the buffer with the new segment
231  ::memcpy(&_head_buf[0], buf, len);
232  _head_len = len;
233  }
234  else
235  {
236  // failure - abort the stream
237  throw DatagramException("stream went inconsistent");
238  }
239  }
240  else
241  {
242  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 45) << ((flags & DatagramService::SEGMENT_LAST) ? "last" : "middle") << " segment received" << IBRCOMMON_LOGGER_ENDL;
243 
244  // this is one segment after the HEAD flush the buffers
245  if (_recv_state == RECV_HEAD)
246  {
247  // forward HEAD buffer to the stream
248  _stream.queue(&_head_buf[0], _head_len, true);
249  _head_len = 0;
250 
251  // switch to TRANSMISSION state
252  _recv_state = RECV_TRANSMISSION;
253  }
254 
255  // forward the current segment to the stream
256  _stream.queue(buf, len, false);
257 
258  if (flags & DatagramService::SEGMENT_LAST)
259  {
260  // switch to IDLE state
261  _recv_state = RECV_IDLE;
262  }
263  }
264 
265  // increment next sequence number
266  _next_seqno = (seqno + 1) % _params.max_seq_numbers;
267  } catch (const WrongSeqNoException &ex) {
268  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 15) << "sequence number received " << seqno << ", expected " << ex.expected_seqno << IBRCOMMON_LOGGER_ENDL;
269  }
270 
272  {
273  // send ack for this message
274  _callback.callback_ack(*this, _next_seqno, getIdentifier());
275  }
276  }
277 
278  void DatagramConnection::stream_send(const char *buf, const dtn::data::Length &len, bool last) throw (DatagramException)
279  {
280  // build the right flags
281  char flags = 0;
282 
283  // if this is the first segment, then set the FIRST bit
284  if (_send_state == SEND_IDLE) flags |= DatagramService::SEGMENT_FIRST;
285 
286  // if this is the last segment, then set the LAST bit
287  if (last) flags |= DatagramService::SEGMENT_LAST;
288 
289  // set the seqno for this segment
290  unsigned int seqno = _last_ack;
291 
292  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "frame to send, flags: " << (int)flags << ", seqno: " << seqno << ", len: " << len << IBRCOMMON_LOGGER_ENDL;
293 
294  if (_params.flowcontrol == DatagramService::FLOW_STOPNWAIT)
295  {
296  // measure the time until the ack is received
297  ibrcommon::TimeMeasurement tm;
298 
299  // start time measurement
300  tm.start();
301 
302  // max. 5 retries
303  for (size_t i = 0; i < _params.retry_limit; ++i)
304  {
305  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 30) << "transmit frame seqno: " << seqno << IBRCOMMON_LOGGER_ENDL;
306 
307  // send the datagram
308  _callback.callback_send(*this, flags, seqno, getIdentifier(), buf, len);
309 
310  // enter the wait state
311  _send_state = SEND_WAIT_ACK;
312 
313  // set timeout to twice the average round-trip-time
314  struct timespec ts;
315  ibrcommon::Conditional::gettimeout(static_cast<size_t>(_avg_rtt * 2) + 1, &ts);
316 
317  try {
318  ibrcommon::MutexLock l(_ack_cond);
319 
320  // wait here for an ACK
321  while (_last_ack != ((seqno + 1) % _params.max_seq_numbers))
322  {
323  _ack_cond.wait(&ts);
324  }
325 
326  // stop the measurement
327  tm.stop();
328 
329  // success!
330  _send_state = last ? SEND_IDLE : SEND_NEXT;
331 
332  // adjust the average rtt
333  adjust_rtt(tm.getMilliseconds());
334 
335  // report result
336  _callback.reportSuccess(i, tm.getMilliseconds());
337 
338  return;
339  } catch (const ibrcommon::Conditional::ConditionalAbortException &e) {
340  if (e.reason == ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT)
341  {
342  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 20) << "ack timeout for seqno " << seqno << IBRCOMMON_LOGGER_ENDL;
343 
344  // fail -> increment the future timeout
345  adjust_rtt(static_cast<double>(_avg_rtt) * 2);
346 
347  // retransmit the frame
348  continue;
349  }
350  else
351  {
352  // aborted
353  break;
354  }
355  }
356  }
357 
358  // maximum number of retransmissions hit
359  _send_state = SEND_ERROR;
360 
361  // report failure
362  _callback.reportFailure();
363 
364  // transmission failed - abort the stream
365  throw DatagramException("transmission failed - abort the stream");
366  }
367  else if (_params.flowcontrol == DatagramService::FLOW_SLIDING_WINDOW)
368  {
369  try {
370  // lock the ACK variables and frame window
371  ibrcommon::MutexLock l(_ack_cond);
372 
373  // timeout value
374  struct timespec ts;
375 
376  // set timeout to twice the average round-trip-time
377  ibrcommon::Conditional::gettimeout(static_cast<size_t>(_avg_rtt * 2) + 1, &ts);
378 
379  // wait until window has at least one free slot
380  while (sw_frames_full()) _ack_cond.wait(&ts);
381 
382  // add new frame to the window
383  _sw_frames.push_back(window_frame());
384 
385  window_frame &new_frame = _sw_frames.back();
386 
387  new_frame.flags = flags;
388  new_frame.seqno = seqno;
389  new_frame.buf.assign(buf, buf+len);
390  new_frame.retry = 0;
391 
392  // start RTT measurement
393  new_frame.tm.start();
394 
395  // send the datagram
396  _callback.callback_send(*this, new_frame.flags, new_frame.seqno, getIdentifier(), &new_frame.buf[0], new_frame.buf.size());
397 
398  // increment next sequence number
399  _last_ack = (seqno + 1) % _params.max_seq_numbers;
400 
401  // enter the wait state
402  _send_state = SEND_WAIT_ACK;
403 
404  // set timeout to twice the average round-trip-time
405  ibrcommon::Conditional::gettimeout(static_cast<size_t>(_avg_rtt * 2) + 1, &ts);
406 
407  // wait until one more slot is available
408  // or no more frames are to ACK (if this was the last frame)
409  while ((last && !_sw_frames.empty()) || (!last && sw_frames_full()))
410  {
411  _ack_cond.wait(&ts);
412  }
413  } catch (const ibrcommon::Conditional::ConditionalAbortException &e) {
414  if (e.reason == ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT)
415  {
416  // timeout - retransmit the whole window
417  sw_timeout(last);
418  }
419  else
420  {
421  // maximum number of retransmissions hit
422  _send_state = SEND_ERROR;
423 
424  // report failure
425  _callback.reportFailure();
426 
427  // transmission failed - abort the stream
428  throw DatagramException("transmission failed - abort the stream");
429  }
430  }
431 
432  // if this is the last segment switch directly to IDLE
433  _send_state = last ? SEND_IDLE : SEND_NEXT;
434  }
435  else
436  {
437  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 30) << "transmit frame seqno: " << seqno << IBRCOMMON_LOGGER_ENDL;
438 
439  // send the datagram
440  _callback.callback_send(*this, flags, seqno, getIdentifier(), buf, len);
441 
442  // if this is the last segment switch directly to IDLE
443  _send_state = last ? SEND_IDLE : SEND_NEXT;
444 
445  // increment next sequence number
446  ibrcommon::MutexLock l(_ack_cond);
447  _last_ack = (seqno + 1) % _params.max_seq_numbers;
448  }
449  }
450 
451  bool DatagramConnection::sw_frames_full()
452  {
453  return _sw_frames.size() >= (_params.max_seq_numbers / 2);
454  }
455 
456  void DatagramConnection::sw_timeout(bool last)
457  {
458  // timeout value
459  struct timespec ts;
460 
461  while (true) {
462  try {
463  ibrcommon::MutexLock l(_ack_cond);
464 
465  // fail -> increment the future timeout
466  adjust_rtt(static_cast<double>(_avg_rtt) * 2);
467 
468  if (_sw_frames.size() > 0)
469  {
470  window_frame &front_frame = _sw_frames.front();
471 
472  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 20) << "ack timeout for seqno " << front_frame.seqno << IBRCOMMON_LOGGER_ENDL;
473 
474  if (front_frame.retry > _params.retry_limit) {
475  // maximum number of retransmissions hit
476  _send_state = SEND_ERROR;
477 
478  // report failure
479  _callback.reportFailure();
480 
481  // transmission failed - abort the stream
482  throw DatagramException("transmission failed - abort the stream");
483  }
484  }
485 
486  // retransmit the window
487  for (std::list<window_frame>::iterator it = _sw_frames.begin(); it != _sw_frames.end(); ++it)
488  {
489  window_frame &retry_frame = (*it);
490 
491  // send the datagram
492  _callback.callback_send(*this, retry_frame.flags, retry_frame.seqno, getIdentifier(), &retry_frame.buf[0], retry_frame.buf.size());
493 
494  // increment retry counter
495  retry_frame.retry++;
496  }
497 
498  // enter the wait state
499  _send_state = SEND_WAIT_ACK;
500 
501  // set timeout to twice the average round-trip-time
502  ibrcommon::Conditional::gettimeout(static_cast<size_t>(_avg_rtt * 2) + 1, &ts);
503 
504  // wait until one more slot is available
505  // or no more frames are to ACK (if this was the last frame)
506  while ((last && !_sw_frames.empty()) || (!last && sw_frames_full()))
507  {
508  _ack_cond.wait(&ts);
509  }
510  } catch (const ibrcommon::Conditional::ConditionalAbortException &e) {
511  if (e.reason == ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT)
512  {
513  // timeout again - repeat at while loop
514  continue;
515  }
516  }
517 
518  // done
519  return;
520  }
521  }
522 
523  void DatagramConnection::nack(const unsigned int &seqno, const bool temporary)
524  {
525  // if the NACK is temporary skip ignore it
526  // and repeat the frame after the timeout
527  if (temporary) return;
528 
529  // skip the currently transmitted bundle
530  _sender.skip();
531 
532  // handle the NACK as an ACK to move on with the next frame
533  ack(seqno);
534  }
535 
536  void DatagramConnection::ack(const unsigned int &seqno)
537  {
538  ibrcommon::MutexLock l(_ack_cond);
539 
540  switch (_params.flowcontrol) {
542  if (_sw_frames.size() > 0) {
543  window_frame &f = _sw_frames.front();
544 
545  if (seqno == ((f.seqno + 1) % _params.max_seq_numbers)) {
546  // stop the measurement
547  f.tm.stop();
548 
549  // adjust the average rtt
550  adjust_rtt(f.tm.getMilliseconds());
551 
552  // report result
553  _callback.reportSuccess(f.retry, f.tm.getMilliseconds());
554 
555  // remove front element
556  _sw_frames.pop_front();
557  }
558  }
559  break;
560 
561  default:
562  _last_ack = seqno;
563  break;
564  }
565 
566  _ack_cond.signal(true);
567  }
568 
570  {
571  _peer_eid = peer;
572  }
573 
575  {
576  return _peer_eid;
577  }
578 
579  void DatagramConnection::adjust_rtt(double value)
580  {
581  // convert current avg to float
582  double new_rtt = _avg_rtt;
583 
584  // calculate average
585  new_rtt = (new_rtt * AVG_RTT_WEIGHT) + ((1 - AVG_RTT_WEIGHT) * value);
586 
587  // assign the new value
588  _avg_rtt = new_rtt;
589 
590  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "RTT adjusted, measured value: " << std::setprecision(4) << value << ", new avg. RTT: " << std::setprecision(4) << _avg_rtt << IBRCOMMON_LOGGER_ENDL;
591  }
592 
593  DatagramConnection::Stream::Stream(DatagramConnection &conn, const dtn::data::Length &maxmsglen)
594  : std::iostream(this), _buf_size(maxmsglen), _first_segment(true), _last_segment(false),
595  _queue_buf(_buf_size), _queue_buf_len(0), _queue_buf_head(false),
596  _out_buf(_buf_size), _in_buf(_buf_size),
597  _abort(false), _skip(false), _reject(false), _callback(conn)
598  {
599  // Initialize get pointer. This should be zero so that underflow
600  // is called upon first read.
601  setg(0, 0, 0);
602 
603  // mark the buffer for outgoing data as free
604  // the +1 sparse the first byte in the buffer and leave room
605  // for the processing flags of the segment
606  setp(&_out_buf[0], &_out_buf[0] + _buf_size - 1);
607  }
608 
609  DatagramConnection::Stream::~Stream()
610  {
611  }
612 
613  void DatagramConnection::Stream::queue(const char *buf, const dtn::data::Length &len, bool isFirst) throw (DatagramException)
614  {
615  try {
616  ibrcommon::MutexLock l(_queue_buf_cond);
617  if (_abort) throw DatagramException("stream aborted");
618 
619  // wait until the buffer is free
620  while (_queue_buf_len > 0)
621  {
622  if (_abort) throw DatagramException("stream aborted");
623  _queue_buf_cond.wait();
624  }
625 
626  // copy the new data into the buffer, but leave out the first byte (header)
627  ::memcpy(&_queue_buf[0], buf, len);
628 
629  // store the buffer length
630  _queue_buf_len = len;
631  _queue_buf_head = isFirst;
632 
633  // notify waiting threads
634  _queue_buf_cond.signal();
635  } catch (ibrcommon::Conditional::ConditionalAbortException &ex) {
636  throw DatagramException("stream aborted");
637  }
638  }
639 
640  void DatagramConnection::Stream::skip()
641  {
642  ibrcommon::MutexLock l(_queue_buf_cond);
643  _skip = true;
644  _queue_buf_cond.signal(true);
645  }
646 
647  void DatagramConnection::Stream::reject()
648  {
649  ibrcommon::MutexLock l(_queue_buf_cond);
650 
651  // set reject flag for futher frames
652  _reject = true;
653  _queue_buf_cond.signal(true);
654  }
655 
656  void DatagramConnection::Stream::close()
657  {
658  ibrcommon::MutexLock l(_queue_buf_cond);
659  _abort = true;
660  _queue_buf_cond.abort();
661  }
662 
663  int DatagramConnection::Stream::sync()
664  {
665  // We process the last segment in the set. Set this variable, so
666  // that this information is available for the overflow method.
667  _last_segment = true;
668 
669  int ret = std::char_traits<char>::eq_int_type(this->overflow(
670  std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
671  : 0;
672 
673  return ret;
674  }
675 
676  std::char_traits<char>::int_type DatagramConnection::Stream::overflow(std::char_traits<char>::int_type c)
677  {
678  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "Stream::overflow()" << IBRCOMMON_LOGGER_ENDL;
679 
680  if (_abort) throw DatagramException("stream aborted");
681 
682  char *ibegin = &_out_buf[0];
683  char *iend = pptr();
684 
685  // mark the buffer for outgoing data as free
686  // the +1 sparse the first byte in the buffer and leave room
687  // for the processing flags of the segment
688  setp(&_out_buf[0], &_out_buf[0] + _buf_size - 1);
689 
690  if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
691  {
692  *iend++ = std::char_traits<char>::to_char_type(c);
693  }
694 
695  // bytes to send
696  const dtn::data::Length bytes = (iend - ibegin);
697 
698  // if there is nothing to send, just return
699  if (bytes == 0)
700  {
701  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 35) << "Stream::overflow() nothing to sent" << IBRCOMMON_LOGGER_ENDL;
702  return std::char_traits<char>::not_eof(c);
703  }
704 
705  try {
706  // disable skipping if this is the first segment
707  if (_first_segment) _skip = false;
708 
709  // send segment to CL, use callback interface
710  if (!_skip) _callback.stream_send(&_out_buf[0], bytes, _last_segment);
711 
712  // set the flags for the next segment
713  _first_segment = _last_segment;
714  _last_segment = false;
715  } catch (const DatagramException &ex) {
716  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 35) << "Stream::overflow() exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
717 
718  // close this stream
719  close();
720 
721  // re-throw the DatagramException
722  throw;
723  }
724 
725  return std::char_traits<char>::not_eof(c);
726  }
727 
728  std::char_traits<char>::int_type DatagramConnection::Stream::underflow()
729  {
730  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "Stream::underflow()" << IBRCOMMON_LOGGER_ENDL;
731 
732  try {
733  ibrcommon::MutexLock l(_queue_buf_cond);
734  if (_abort) throw ibrcommon::Exception("stream aborted");
735 
736  // ignore this frame if this frame set is rejected
737  while ((_queue_buf_len == 0) || (_reject && !_queue_buf_head))
738  {
739  // clear the buffer
740  _queue_buf_len = 0;
741  _queue_buf_cond.signal(true);
742 
743  if (_abort) throw ibrcommon::Exception("stream aborted");
744  _queue_buf_cond.wait();
745  }
746 
747  // reset reject
748  _reject = false;
749 
750  // copy the queue buffer to an internal buffer
751  ::memcpy(&_in_buf[0], &_queue_buf[0], _queue_buf_len);
752 
753  // Since the input buffer content is now valid (or is new)
754  // the get pointer should be initialized (or reset).
755  setg(&_in_buf[0], &_in_buf[0], &_in_buf[0] + _queue_buf_len);
756 
757  // mark the queue buffer as free
758  _queue_buf_len = 0;
759  _queue_buf_cond.signal();
760 
761  return std::char_traits<char>::not_eof(_in_buf[0]);
762  } catch (ibrcommon::Conditional::ConditionalAbortException &ex) {
763  throw DatagramException("stream aborted");
764  }
765  }
766 
767  DatagramConnection::Sender::Sender(DatagramConnection &conn, Stream &stream)
768  : _stream(stream), _connection(conn), _skip(false)
769  {
770  }
771 
772  DatagramConnection::Sender::~Sender()
773  {
774  }
775 
776  void DatagramConnection::Sender::skip() throw ()
777  {
778  // skip all data of the current transmission
779  _skip = true;
780  _stream.skip();
781  }
782 
783  void DatagramConnection::Sender::run() throw ()
784  {
785  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) << "Sender::run()"<< IBRCOMMON_LOGGER_ENDL;
786 
787  try {
788  // get reference to the storage
790 
791  // create a filter context
792  dtn::core::FilterContext context;
793  context.setProtocol(_connection._callback.getDiscoveryProtocol());
794 
795  // create a standard serializer
796  dtn::data::DefaultSerializer serializer(_stream);
797 
798  // as long as the stream is marked as good ...
799  while(_stream.good())
800  {
801  // get the next job
802  dtn::net::BundleTransfer job = queue.poll();
803 
804  try {
805  // read the bundle out of the storage
806  dtn::data::Bundle bundle = storage.get(job.getBundle());
807 
808  // push bundle through the filter routines
809  context.setBundle(bundle);
810  context.setPeer(job.getNeighbor());
812 
813  if (ret != BundleFilter::ACCEPT) {
815  continue;
816  }
817 
818  // reset skip flag
819  _skip = false;
820 
821  // write the bundle into the stream
822  serializer << bundle; _stream.flush();
823 
824  // check if the stream is still marked as good
825  if (_stream.good())
826  {
827  // check if last transmission was refused
828  if (_skip) {
829  // send transfer aborted event
831  } else {
832  // bundle send completely - raise bundle event
833  job.complete();
834  }
835  }
836  } catch (const dtn::storage::NoBundleFoundException&) {
837  // could not load the bundle, abort the job
839  }
840  }
841 
842  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Sender::run() stream destroyed"<< IBRCOMMON_LOGGER_ENDL;
843  } catch (const ibrcommon::QueueUnblockedException &ex) {
844  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Sender::run() exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
845  return;
846  } catch (std::exception &ex) {
847  IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) << "Sender::run() exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
848  }
849  }
850 
851  void DatagramConnection::Sender::finally() throw ()
852  {
853  }
854 
855  void DatagramConnection::Sender::__cancellation() throw ()
856  {
857  // abort all blocking operations on the stream
858  _stream.close();
859 
860  // abort blocking calls on the queue
861  queue.abort();
862  }
863  } /* namespace data */
864 } /* namespace dtn */
std::string toString() const
Definition: BundleID.cpp:190
void nack(const unsigned int &seqno, const bool temporary)
virtual dtn::core::Node::Protocol getDiscoveryProtocol() const =0
size_t Length
Definition: Number.h:33
void setBundle(const dtn::data::Bundle &data)
virtual void callback_ack(DatagramConnection &connection, const unsigned int &seqno, const std::string &destination)=0
#define AVG_RTT_WEIGHT
BundleFilter::ACTION filter(BundleFilter::TABLE table, const FilterContext &context, dtn::data::Bundle &bundle) const
Definition: BundleCore.cpp:598
void queue(const dtn::net::BundleTransfer &job)
void setProtocol(const dtn::core::Node::Protocol &protocol)
void setPeer(const dtn::data::EID &endpoint)
virtual void callback_send(DatagramConnection &connection, const char &flags, const unsigned int &seqno, const std::string &destination, const char *buf, const dtn::data::Length &len)=0
const std::string TAG
Definition: dtnoutbox.cpp:62
virtual void reportSuccess(size_t retries, double rtt)
const dtn::data::EID & getNeighbor() const
virtual void connectionUp(const DatagramConnection *conn)=0
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
void abort(const TransferAbortedEvent::AbortReason reason)
virtual void connectionDown(const DatagramConnection *conn)=0
DatagramConnection(const std::string &identifier, const DatagramService::Parameter &params, DatagramConnectionCallback &callback)
const dtn::data::MetaBundle & getBundle() const
std::string getString() const
Definition: EID.cpp:374
const dtn::data::EID & getPeerEID()
dtn::storage::BundleStorage & getStorage()
Definition: BundleCore.cpp:237
const std::string & getIdentifier() const
void setPeerEID(const dtn::data::EID &peer)
static void raise(const dtn::data::EID &peer, const dtn::data::Bundle &bundle, const bool local=false)
static BundleCore & getInstance()
Definition: BundleCore.cpp:82
void ack(const unsigned int &seqno)