32 #include <ibrcommon/TimeMeasurement.h>
33 #include <ibrcommon/Logger.h>
38 #define AVG_RTT_WEIGHT 0.875
44 const std::string DatagramConnection::TAG =
"DatagramConnection";
47 : _send_state(SEND_IDLE), _recv_state(RECV_IDLE), _callback(callback), _identifier(identifier), _stream(*this, params.max_msg_length), _sender(*this, _stream),
48 _last_ack(0), _next_seqno(0), _head_buf(params.max_msg_length), _head_len(0), _params(params), _avg_rtt(static_cast<double>(params.initial_timeout))
64 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) <<
"shutdown()" << IBRCOMMON_LOGGER_ENDL;
75 }
catch (
const ibrcommon::Exception&) { };
80 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) <<
"run()" << IBRCOMMON_LOGGER_ENDL;
97 deserializer >> bundle;
104 case BundleFilter::ACCEPT:
109 case BundleFilter::REJECT:
113 case BundleFilter::DROP:
117 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) <<
"Bundle rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
122 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) <<
"Received an invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
128 }
catch (std::exception &ex) {
129 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) <<
"Main-thread died: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
135 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) <<
"setup()" << IBRCOMMON_LOGGER_ENDL;
143 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 40) <<
"finally()" << IBRCOMMON_LOGGER_ENDL;
146 ibrcommon::MutexLock l(_ack_cond);
148 }
catch (
const std::exception&) { };
156 }
catch (
const std::exception&) { };
161 }
catch (
const ibrcommon::MutexException&) { };
176 _sender.queue.push(job);
186 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) <<
"frame received, flags: " << (int)flags <<
", seqno: " << seqno <<
", len: " << len << IBRCOMMON_LOGGER_ENDL;
194 if (_next_seqno != seqno)
202 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 45) <<
"full segment received" << IBRCOMMON_LOGGER_ENDL;
205 _stream.queue(buf, len,
true);
208 _recv_state = RECV_IDLE;
210 else if (flags & DatagramService::SEGMENT_FIRST)
212 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 45) <<
"first segment received" << IBRCOMMON_LOGGER_ENDL;
216 if (_recv_state == RECV_IDLE)
220 ::memcpy(&_head_buf[0], buf, len);
224 _recv_state = RECV_HEAD;
226 else if (_recv_state == RECV_HEAD)
231 ::memcpy(&_head_buf[0], buf, len);
242 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 45) << ((flags &
DatagramService::SEGMENT_LAST) ?
"last" :
"middle") <<
" segment received" << IBRCOMMON_LOGGER_ENDL;
245 if (_recv_state == RECV_HEAD)
248 _stream.queue(&_head_buf[0], _head_len,
true);
252 _recv_state = RECV_TRANSMISSION;
256 _stream.queue(buf, len,
false);
258 if (flags & DatagramService::SEGMENT_LAST)
261 _recv_state = RECV_IDLE;
268 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 15) <<
"sequence number received " << seqno <<
", expected " << ex.
expected_seqno << IBRCOMMON_LOGGER_ENDL;
290 unsigned int seqno = _last_ack;
292 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 25) <<
"frame to send, flags: " << (int)flags <<
", seqno: " << seqno <<
", len: " << len << IBRCOMMON_LOGGER_ENDL;
297 ibrcommon::TimeMeasurement tm;
303 for (
size_t i = 0; i < _params.retry_limit; ++i)
305 IBRCOMMON_LOGGER_DEBUG_TAG(DatagramConnection::TAG, 30) <<
"transmit frame seqno: " << seqno << IBRCOMMON_LOGGER_ENDL;
308 _callback.callback_send(*
this, flags, seqno, getIdentifier(), buf, len);
311 _send_state = SEND_WAIT_ACK;
315 ibrcommon::Conditional::gettimeout(static_cast<size_t>(_avg_rtt * 2) + 1, &ts);
318 ibrcommon::MutexLock l(_ack_cond);
321 while (_last_ack != ((seqno + 1) % _params.max_seq_numbers))
330 _send_state = last ? SEND_IDLE : SEND_NEXT;
333 adjust_rtt(tm.getMilliseconds());
336 _callback.reportSuccess(i, tm.getMilliseconds());
339 }
catch (
const ibrcommon::Conditional::ConditionalAbortException &e) {
340 if (e.reason == ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT)
342 IBRCOMMON_LOGGER_DEBUG_TAG(
DatagramConnection::TAG, 20) <<
"ack timeout for seqno " << seqno << IBRCOMMON_LOGGER_ENDL;
345 adjust_rtt(static_cast<double>(_avg_rtt) * 2);
359 _send_state = SEND_ERROR;
362 _callback.reportFailure();
371 ibrcommon::MutexLock l(_ack_cond);
377 ibrcommon::Conditional::gettimeout(static_cast<size_t>(_avg_rtt * 2) + 1, &ts);
380 while (sw_frames_full()) _ack_cond.wait(&ts);
383 _sw_frames.push_back(window_frame());
385 window_frame &new_frame = _sw_frames.back();
387 new_frame.flags = flags;
388 new_frame.seqno = seqno;
389 new_frame.buf.assign(buf, buf+len);
393 new_frame.tm.start();
396 _callback.callback_send(*
this, new_frame.flags, new_frame.seqno, getIdentifier(), &new_frame.buf[0], new_frame.buf.size());
399 _last_ack = (seqno + 1) % _params.max_seq_numbers;
402 _send_state = SEND_WAIT_ACK;
405 ibrcommon::Conditional::gettimeout(static_cast<size_t>(_avg_rtt * 2) + 1, &ts);
409 while ((last && !_sw_frames.empty()) || (!last && sw_frames_full()))
413 }
catch (
const ibrcommon::Conditional::ConditionalAbortException &e) {
414 if (e.reason == ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT)
422 _send_state = SEND_ERROR;
425 _callback.reportFailure();
433 _send_state = last ? SEND_IDLE : SEND_NEXT;
437 IBRCOMMON_LOGGER_DEBUG_TAG(
DatagramConnection::TAG, 30) <<
"transmit frame seqno: " << seqno << IBRCOMMON_LOGGER_ENDL;
440 _callback.callback_send(*
this, flags, seqno, getIdentifier(), buf, len);
443 _send_state = last ? SEND_IDLE : SEND_NEXT;
446 ibrcommon::MutexLock l(_ack_cond);
447 _last_ack = (seqno + 1) % _params.max_seq_numbers;
451 bool DatagramConnection::sw_frames_full()
456 void DatagramConnection::sw_timeout(
bool last)
463 ibrcommon::MutexLock l(_ack_cond);
466 adjust_rtt(static_cast<double>(_avg_rtt) * 2);
468 if (_sw_frames.size() > 0)
470 window_frame &front_frame = _sw_frames.front();
472 IBRCOMMON_LOGGER_DEBUG_TAG(
DatagramConnection::TAG, 20) <<
"ack timeout for seqno " << front_frame.seqno << IBRCOMMON_LOGGER_ENDL;
476 _send_state = SEND_ERROR;
487 for (std::list<window_frame>::iterator it = _sw_frames.begin(); it != _sw_frames.end(); ++it)
489 window_frame &retry_frame = (*it);
492 _callback.
callback_send(*
this, retry_frame.flags, retry_frame.seqno,
getIdentifier(), &retry_frame.buf[0], retry_frame.buf.size());
499 _send_state = SEND_WAIT_ACK;
502 ibrcommon::Conditional::gettimeout(static_cast<size_t>(_avg_rtt * 2) + 1, &ts);
506 while ((last && !_sw_frames.empty()) || (!last && sw_frames_full()))
510 }
catch (
const ibrcommon::Conditional::ConditionalAbortException &e) {
511 if (e.reason == ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT)
527 if (temporary)
return;
538 ibrcommon::MutexLock l(_ack_cond);
542 if (_sw_frames.size() > 0) {
543 window_frame &f = _sw_frames.front();
550 adjust_rtt(f.tm.getMilliseconds());
556 _sw_frames.pop_front();
566 _ack_cond.signal(
true);
579 void DatagramConnection::adjust_rtt(
double value)
582 double new_rtt = _avg_rtt;
590 IBRCOMMON_LOGGER_DEBUG_TAG(
DatagramConnection::TAG, 40) <<
"RTT adjusted, measured value: " << std::setprecision(4) << value <<
", new avg. RTT: " << std::setprecision(4) << _avg_rtt << IBRCOMMON_LOGGER_ENDL;
594 : std::iostream(this), _buf_size(maxmsglen), _first_segment(true), _last_segment(false),
595 _queue_buf(_buf_size), _queue_buf_len(0), _queue_buf_head(false),
596 _out_buf(_buf_size), _in_buf(_buf_size),
597 _abort(false), _skip(false), _reject(false), _callback(conn)
606 setp(&_out_buf[0], &_out_buf[0] + _buf_size - 1);
609 DatagramConnection::Stream::~Stream()
616 ibrcommon::MutexLock l(_queue_buf_cond);
620 while (_queue_buf_len > 0)
623 _queue_buf_cond.wait();
627 ::memcpy(&_queue_buf[0], buf, len);
630 _queue_buf_len = len;
631 _queue_buf_head = isFirst;
634 _queue_buf_cond.signal();
635 }
catch (ibrcommon::Conditional::ConditionalAbortException &ex) {
640 void DatagramConnection::Stream::skip()
642 ibrcommon::MutexLock l(_queue_buf_cond);
644 _queue_buf_cond.signal(
true);
647 void DatagramConnection::Stream::reject()
649 ibrcommon::MutexLock l(_queue_buf_cond);
653 _queue_buf_cond.signal(
true);
656 void DatagramConnection::Stream::close()
658 ibrcommon::MutexLock l(_queue_buf_cond);
660 _queue_buf_cond.abort();
663 int DatagramConnection::Stream::sync()
667 _last_segment =
true;
669 int ret = std::char_traits<char>::eq_int_type(this->overflow(
670 std::char_traits<char>::eof()), std::char_traits<char>::eof()) ? -1
676 std::char_traits<char>::int_type DatagramConnection::Stream::overflow(std::char_traits<char>::int_type c)
682 char *ibegin = &_out_buf[0];
688 setp(&_out_buf[0], &_out_buf[0] + _buf_size - 1);
690 if (!std::char_traits<char>::eq_int_type(c, std::char_traits<char>::eof()))
692 *iend++ = std::char_traits<char>::to_char_type(c);
701 IBRCOMMON_LOGGER_DEBUG_TAG(
DatagramConnection::TAG, 35) <<
"Stream::overflow() nothing to sent" << IBRCOMMON_LOGGER_ENDL;
702 return std::char_traits<char>::not_eof(c);
707 if (_first_segment) _skip =
false;
710 if (!_skip) _callback.stream_send(&_out_buf[0], bytes, _last_segment);
713 _first_segment = _last_segment;
714 _last_segment =
false;
716 IBRCOMMON_LOGGER_DEBUG_TAG(
DatagramConnection::TAG, 35) <<
"Stream::overflow() exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
725 return std::char_traits<char>::not_eof(c);
728 std::char_traits<char>::int_type DatagramConnection::Stream::underflow()
733 ibrcommon::MutexLock l(_queue_buf_cond);
734 if (_abort)
throw ibrcommon::Exception(
"stream aborted");
737 while ((_queue_buf_len == 0) || (_reject && !_queue_buf_head))
741 _queue_buf_cond.signal(
true);
743 if (_abort)
throw ibrcommon::Exception(
"stream aborted");
744 _queue_buf_cond.wait();
751 ::memcpy(&_in_buf[0], &_queue_buf[0], _queue_buf_len);
755 setg(&_in_buf[0], &_in_buf[0], &_in_buf[0] + _queue_buf_len);
759 _queue_buf_cond.signal();
761 return std::char_traits<char>::not_eof(_in_buf[0]);
762 }
catch (ibrcommon::Conditional::ConditionalAbortException &ex) {
768 : _stream(stream), _connection(conn), _skip(false)
772 DatagramConnection::Sender::~Sender()
776 void DatagramConnection::Sender::skip() throw ()
783 void DatagramConnection::Sender::run() throw ()
793 context.
setProtocol(_connection._callback.getDiscoveryProtocol());
799 while(_stream.good())
813 if (ret != BundleFilter::ACCEPT) {
822 serializer << bundle; _stream.flush();
842 IBRCOMMON_LOGGER_DEBUG_TAG(
DatagramConnection::TAG, 25) <<
"Sender::run() stream destroyed"<< IBRCOMMON_LOGGER_ENDL;
843 }
catch (
const ibrcommon::QueueUnblockedException &ex) {
844 IBRCOMMON_LOGGER_DEBUG_TAG(
DatagramConnection::TAG, 25) <<
"Sender::run() exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
846 }
catch (std::exception &ex) {
847 IBRCOMMON_LOGGER_DEBUG_TAG(
DatagramConnection::TAG, 25) <<
"Sender::run() exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
851 void DatagramConnection::Sender::finally() throw ()
855 void DatagramConnection::Sender::__cancellation() throw ()
std::string toString() const
void nack(const unsigned int &seqno, const bool temporary)
virtual dtn::core::Node::Protocol getDiscoveryProtocol() const =0
void setBundle(const dtn::data::Bundle &data)
virtual void callback_ack(DatagramConnection &connection, const unsigned int &seqno, const std::string &destination)=0
const size_t expected_seqno
BundleFilter::ACTION filter(BundleFilter::TABLE table, const FilterContext &context, dtn::data::Bundle &bundle) const
unsigned int max_seq_numbers
void queue(const dtn::net::BundleTransfer &job)
void setProtocol(const dtn::core::Node::Protocol &protocol)
void setPeer(const dtn::data::EID &endpoint)
virtual void callback_send(DatagramConnection &connection, const char &flags, const unsigned int &seqno, const std::string &destination, const char *buf, const dtn::data::Length &len)=0
virtual void reportSuccess(size_t retries, double rtt)
const dtn::data::EID & getNeighbor() const
virtual void connectionUp(const DatagramConnection *conn)=0
virtual void __cancellation()
virtual ~DatagramConnection()
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
void abort(const TransferAbortedEvent::AbortReason reason)
virtual void connectionDown(const DatagramConnection *conn)=0
DatagramConnection(const std::string &identifier, const DatagramService::Parameter &params, DatagramConnectionCallback &callback)
const dtn::data::MetaBundle & getBundle() const
std::string getString() const
const dtn::data::EID & getPeerEID()
dtn::storage::BundleStorage & getStorage()
const std::string & getIdentifier() const
virtual void reportFailure()
void setPeerEID(const dtn::data::EID &peer)
static void raise(const dtn::data::EID &peer, const dtn::data::Bundle &bundle, const bool local=false)
static BundleCore & getInstance()
void ack(const unsigned int &seqno)