23 #include <ibrcommon/Logger.h>
24 #include <ibrcommon/TimeMeasurement.h>
32 : _buffer_size(buffer_size), _statebits(STREAM_SOB),
_conn(conn), in_buf_(buffer_size), out_buf_(buffer_size), _stream(stream),
33 _recv_size(0), _underflow_data_remain(0), _underflow_state(IDLE), _idle_timer(*
this, 0)
37 setp(&out_buf_[0], &out_buf_[0] + _buffer_size - 1);
40 StreamConnection::StreamBuffer::~StreamBuffer()
46 bool StreamConnection::StreamBuffer::get(
const StateBits bit)
const
48 return (_statebits & bit);
// Setter for one state bit. A ibrcommon::MutexLock on _statelock is created
// at the top of the body.
// NOTE(review): the statement that actually modifies the state word is not
// visible in this excerpt.
51 void StreamConnection::StreamBuffer::set(
const StateBits bit)
53 ibrcommon::MutexLock l(_statelock);
// Counterpart of set() for clearing one state bit. A ibrcommon::MutexLock on
// _statelock is created at the top of the body.
// NOTE(review): the statement that actually modifies the state word is not
// visible in this excerpt.
57 void StreamConnection::StreamBuffer::unset(
const StateBits bit)
59 ibrcommon::MutexLock l(_statelock);
// Debug helper that dumps the buffer's internal state to the debug log
// (tag "StreamBuffer", verbosity 80): buffer size, state bits, received
// size, the sizes of _segments and _rejected_segments, and the underflow
// bookkeeping (_underflow_data_remain, _underflow_state). Afterwards it
// writes one line for each failure-related state bit that is set.
// The method is const; only log statements and member reads are visible.
63 void StreamConnection::StreamBuffer::__error()
const
65 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 80) <<
"StreamBuffer Debugging" << IBRCOMMON_LOGGER_ENDL;
66 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 80) <<
"---------------------------------------" << IBRCOMMON_LOGGER_ENDL;
67 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 80) <<
"Buffer size: " << _buffer_size << IBRCOMMON_LOGGER_ENDL;
68 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 80) <<
"State bits: " << _statebits << IBRCOMMON_LOGGER_ENDL;
69 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 80) <<
"Recv size: " << _recv_size.toString() << IBRCOMMON_LOGGER_ENDL;
70 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 80) <<
"Segments: " << _segments.size() << IBRCOMMON_LOGGER_ENDL;
71 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 80) <<
"Reject segments: " << _rejected_segments.size() << IBRCOMMON_LOGGER_ENDL;
72 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 80) <<
"Underflow remaining: " << _underflow_data_remain << IBRCOMMON_LOGGER_ENDL;
73 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 80) <<
"Underflow state: " << _underflow_state << IBRCOMMON_LOGGER_ENDL;
74 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 80) <<
"---------------------------------------" << IBRCOMMON_LOGGER_ENDL;
// One diagnostic line per failure-related bit found in _statebits. These are
// the same five bits that __good() combines into its mask.
76 if (_statebits & STREAM_FAILED)
78 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 80) <<
"stream went bad: STREAM_FAILED is set" << IBRCOMMON_LOGGER_ENDL;
81 if (_statebits & STREAM_BAD)
83 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 80) <<
"stream went bad: STREAM_BAD is set" << IBRCOMMON_LOGGER_ENDL;
86 if (_statebits & STREAM_EOF)
88 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 80) <<
"stream went bad: STREAM_EOF is set" << IBRCOMMON_LOGGER_ENDL;
91 if (_statebits & STREAM_SHUTDOWN)
93 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 80) <<
"stream went bad: STREAM_SHUTDOWN is set" << IBRCOMMON_LOGGER_ENDL;
96 if (_statebits & STREAM_CLOSED)
98 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 80) <<
"stream went bad: STREAM_CLOSED is set" << IBRCOMMON_LOGGER_ENDL;
// NOTE(review): the condition guarding this last message is not visible in
// this excerpt; judging by the text it presumably checks good() on the
// underlying stream -- confirm against the full file.
103 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 80) <<
"stream went bad: good() returned false" << IBRCOMMON_LOGGER_ENDL;
107 bool StreamConnection::StreamBuffer::__good()
const
109 int badbits = STREAM_FAILED | STREAM_BAD | STREAM_EOF | STREAM_SHUTDOWN | STREAM_CLOSED;
110 return !(badbits & _statebits);
128 ibrcommon::MutexLock l(_sendlock);
131 _stream << header << std::flush;
142 if (peer._keepalive > 0)
145 set(STREAM_TIMER_SUPPORT);
149 set(STREAM_HANDSHAKE);
151 }
catch (
const std::exception&) {
159 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
162 throw StreamErrorException(
"handshake not completed");
177 ibrcommon::MutexLock l(_sendlock);
180 }
catch (
const std::exception&) {
184 throw StreamErrorException(
"can not send shutdown message");
// Keepalive routine. It takes _sendlock through ibrcommon::MutexTryLock and
// logs "KEEPALIVE sent" at debug verbosity 15.
// NOTE(review): the statement that writes the keepalive message to the
// stream and the bodies of both catch handlers are not visible in this
// excerpt.
188 void StreamConnection::StreamBuffer::keepalive()
192 ibrcommon::MutexTryLock l(_sendlock);
194 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 15) <<
"KEEPALIVE sent" << IBRCOMMON_LOGGER_ENDL;
195 }
// NOTE(review): presumably raised when the try-lock cannot obtain _sendlock
// because another sender holds it -- confirm against ibrcommon.
catch (
const ibrcommon::MutexException&) {
199 }
// Any other standard exception is caught here as well, so nothing derived
// from std::exception leaves this try block.
catch (
const std::exception&) {
// Marks the buffer as shut down by setting the STREAM_SHUTDOWN state bit.
// __good() includes this bit in its failure mask, so it returns false
// afterwards.
205 void StreamConnection::StreamBuffer::close()
208 set(STREAM_SHUTDOWN);
211 void StreamConnection::StreamBuffer::reject()
222 void StreamConnection::StreamBuffer::abort()
// Waits until the queue of outstanding segments (_segments) reaches the
// QUEUE_EMPTY state, logging progress before and after the wait. If the wait
// ends with a QueueUnblockedException, the transfer is logged as aborted.
// NOTE(review): `timeout` is used in the body but does not appear in the
// signature as shown here; the signature in this excerpt is probably
// truncated -- confirm against the header.
227 void StreamConnection::StreamBuffer::wait()
233 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 15) <<
"waitCompleted(): wait for completion of transmission, " << _segments.size() <<
" ACKs left" << IBRCOMMON_LOGGER_ENDL;
// Wait on the segment queue until it is empty, bounded by `timeout`.
234 _segments.wait(ibrcommon::Queue<StreamDataSegment>::QUEUE_EMPTY, timeout);
235 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 15) <<
"waitCompleted(): transfer completed" << IBRCOMMON_LOGGER_ENDL;
236 }
// The queue wait ended without reaching QUEUE_EMPTY; only a log line is
// written here.
catch (
const ibrcommon::QueueUnblockedException&) {
237 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 15) <<
"waitCompleted(): transfer aborted (timeout)" << IBRCOMMON_LOGGER_ENDL;
// streambuf output hook: sends the content of the put area (out_buf_) to the
// underlying stream.
//
// c is the character that did not fit into the put area, or traits eof.
// The visible success paths return traits_type::not_eof(c). The last visible
// statement returns traits_type::eof(), after the handlers for
// StreamClosedException, StreamErrorException and ios_base::failure have
// written a log line.
//
// NOTE(review): several original lines of this function are missing from
// this excerpt (for example the declarations of `iend` and `seg`); the
// comments below describe only what the visible statements do.
244 std::char_traits<char>::int_type StreamConnection::StreamBuffer::overflow(std::char_traits<char>::int_type c)
246 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 90) <<
"overflow() called" << IBRCOMMON_LOGGER_ENDL;
249 char *ibegin = &out_buf_[0];
// Reset the put area to the start of out_buf_. Its end is placed one byte
// before the end of the buffer (_buffer_size - 1), which leaves room for the
// extra character appended below.
253 setp(&out_buf_[0], &out_buf_[0] + _buffer_size - 1);
// A real character (not eof) is appended behind the buffered data.
256 if(!traits_type::eq_int_type(c, traits_type::eof())) {
257 *iend++ = traits_type::to_char_type(c);
// Nothing buffered: the visible statement after this check returns success
// without writing to the stream.
261 if ((iend - ibegin) == 0)
263 return traits_type::not_eof(c);
// NOTE(review): the statements controlled by the next three conditions are
// not visible in this excerpt.
277 if (char_traits<char>::eq_int_type(c, char_traits<char>::eof()))
284 if (!
get(STREAM_SKIP))
287 if (
get(STREAM_ACK_SUPPORT))
295 _conn.eventBundleForwarded();
// The write happens under _sendlock. A stream that is no longer good()
// raises StreamErrorException before anything is written.
298 ibrcommon::MutexLock l(_sendlock);
299 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
303 _stream.write(&out_buf_[0], seg._value.get<
size_t>());
// Report the number of bytes written to the connection callback.
306 _conn._callback.addTrafficOut(seg._value.get<
size_t>());
309 return traits_type::not_eof(c);
310 }
catch (
const StreamClosedException&) {
314 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 10) <<
"StreamClosedException in overflow()" << IBRCOMMON_LOGGER_ENDL;
317 }
catch (
const StreamErrorException&) {
321 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 10) <<
"StreamErrorException in overflow()" << IBRCOMMON_LOGGER_ENDL;
324 }
catch (
const ios_base::failure&) {
328 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 10) <<
"ios_base::failure in overflow()" << IBRCOMMON_LOGGER_ENDL;
// Failure result of the function.
333 return traits_type::eof();
// streambuf sync hook. It flushes the put area by calling overflow() with
// eof and maps the result to `ret`: -1 if overflow() returned eof, otherwise
// 0.
// NOTE(review): the statements executed under _sendlock and the return
// statement are not visible in this excerpt.
338 int StreamConnection::StreamBuffer::sync()
340 int ret = traits_type::eq_int_type(this->overflow(traits_type::eof()),
341 traits_type::eof()) ? -1 : 0;
344 ibrcommon::MutexLock l(_sendlock);
348 }
// An ios_base::failure leads to an error shutdown of the connection.
catch (
const ios_base::failure&) {
352 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
361 std::vector<char> tmpbuf(_buffer_size);
365 while (size > 0 && _stream.good())
368 if (size < _buffer_size) readsize = size;
371 _stream.read(&tmpbuf[0], (std::streamsize)readsize);
374 _conn._callback.addTrafficIn(readsize);
382 }
catch (
const ios_base::failure &ex) {
383 _underflow_state = IDLE;
384 throw StreamErrorException(
"read error during data skip: " + std::string(ex.what()));
// streambuf input hook: refills the get area (in_buf_) with payload bytes of
// the current data segment and handles the other segment types that arrive
// on the stream.
//
// Visible behaviour:
//  - Two underflow states are used, IDLE and DATA_TRANSFER.
//    _underflow_data_remain counts the payload bytes of the current data
//    segment that have not been read yet.
//  - On success the get area is set to the bytes just read and
//    traits_type::not_eof(in_buf_[0]) is returned.
//  - StreamClosedException, StreamErrorException and StreamShutdownException
//    are caught and logged; the last visible statement returns
//    traits_type::eof().
//
// NOTE(review): many original lines of this function are missing from this
// excerpt (braces, the segment-type dispatch, the declarations of `seg` and
// `readsize`). The comments below describe only the visible statements, and
// the nesting they suggest is an assumption.
389 std::char_traits<char>::int_type StreamConnection::StreamBuffer::underflow()
391 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 90) <<
"StreamBuffer::underflow() called" << IBRCOMMON_LOGGER_ENDL;
// A data segment is currently being consumed.
394 if (_underflow_state == DATA_TRANSFER)
// STREAM_REJECT is set: the visible statements skip the remaining payload of
// the segment and return to IDLE. With STREAM_NACK_SUPPORT the stream is
// checked under _sendlock first.
397 if (
get(STREAM_REJECT))
400 if (
get(STREAM_NACK_SUPPORT))
402 ibrcommon::MutexLock l(_sendlock);
403 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
410 skipData(_underflow_data_remain);
413 _underflow_state = IDLE;
// The payload of the current segment has been consumed completely: go back
// to IDLE. With STREAM_ACK_SUPPORT the stream is checked under _sendlock
// first.
416 else if (_underflow_data_remain == 0)
419 if (
get(STREAM_ACK_SUPPORT))
421 ibrcommon::MutexLock l(_sendlock);
422 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
427 _underflow_state = IDLE;
// While IDLE, incoming segments are handled until the state changes.
432 while (_underflow_state == IDLE)
439 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
442 }
// I/O failures are translated into StreamErrorException carrying the
// original message.
catch (
const ios_base::failure &ex) {
443 throw StreamErrorException(
"read error: " + std::string(ex.what()));
// --- data segment ---
456 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 70) <<
"MSG_DATA_SEGMENT received, size: " << seg.
_value.
toString() << IBRCOMMON_LOGGER_ENDL;
461 unset(STREAM_REJECT);
// If STREAM_REJECT is set, the visible statements skip the payload instead of
// handing it to the reader.
471 if (
get(STREAM_REJECT))
474 if (
get(STREAM_NACK_SUPPORT))
477 ibrcommon::MutexLock l(_sendlock);
478 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
485 skipData(_underflow_data_remain);
// Switch to payload delivery; this ends the IDLE loop above.
490 _underflow_state = DATA_TRANSFER;
// --- acknowledgement segment ---
497 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 70) <<
"MSG_ACK_SEGMENT received, size: " << seg.
_value.
toString() << IBRCOMMON_LOGGER_ENDL;
500 if (
get(STREAM_ACK_SUPPORT))
// Exclusive access to the queue of sent segments while the ACK is processed.
502 ibrcommon::Queue<StreamDataSegment>::Locked q = _segments.exclusive();
505 IBRCOMMON_LOGGER_TAG(
"StreamBuffer", error) <<
"got an unexpected ACK with size of " << seg.
_value.
toString() << IBRCOMMON_LOGGER_ENDL;
511 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 60) << q.size() <<
" elements to ACK" << IBRCOMMON_LOGGER_ENDL;
517 _conn.eventBundleForwarded();
// --- keepalive segment ---
527 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 70) <<
"MSG_KEEPALIVE received, size: " << seg.
_value.
toString() << IBRCOMMON_LOGGER_ENDL;
// --- refuse-bundle (NACK) segment ---
532 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 70) <<
"MSG_REFUSE_BUNDLE received, flags: " << (int)seg.
_flags << IBRCOMMON_LOGGER_ENDL;
// The NACK is processed only if both ACK and NACK support are flagged.
537 if (
get(STREAM_ACK_SUPPORT) &&
get(STREAM_NACK_SUPPORT))
// Segments already marked as rejected are still outstanding: remove one from
// that queue.
540 if (!_rejected_segments.empty())
542 _rejected_segments.pop();
545 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 30) <<
"NACK received, still " << _rejected_segments.size() <<
" segments to NACK" << IBRCOMMON_LOGGER_ENDL;
552 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 20) <<
"NACK received!" << IBRCOMMON_LOGGER_ENDL;
// Loop over the pending sent segments; the visible statement pushes a
// segment onto _rejected_segments.
// NOTE(review): the statements that take segments out of _segments are not
// visible in this excerpt.
555 while (!_segments.empty())
564 _rejected_segments.push(seg);
569 _conn.eventBundleRefused();
572 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 30) << _rejected_segments.size() <<
" segments to NACK" << IBRCOMMON_LOGGER_ENDL;
575 if (_segments.empty())
580 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 25) <<
"skip the current transfer" << IBRCOMMON_LOGGER_ENDL;
583 }
catch (
const ibrcommon::QueueUnblockedException&) {
584 IBRCOMMON_LOGGER_TAG(
"StreamBuffer", error) <<
"got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
590 IBRCOMMON_LOGGER_TAG(
"StreamBuffer", error) <<
"got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
// --- shutdown segment: leaves via StreamShutdownException, which is caught
// and logged in the last handler of this function.
598 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 70) <<
"MSG_SHUTDOWN received" << IBRCOMMON_LOGGER_ENDL;
599 throw StreamShutdownException();
// Payload delivery: the read size is limited to the bytes still outstanding
// in the current segment when that is less than the buffer size.
606 if (_underflow_data_remain < _buffer_size) readsize = _underflow_data_remain;
609 if (!_stream.good())
throw StreamErrorException(
"stream went bad");
612 _stream.read(&in_buf_[0], (std::streamsize)readsize);
// Report the number of bytes read to the connection callback.
615 _conn._callback.addTrafficIn(readsize);
619 }
// A failed read resets the state to IDLE before the error is rethrown as
// StreamErrorException.
catch (
const ios_base::failure &ex) {
620 _underflow_state = IDLE;
621 throw StreamErrorException(
"read error: " + std::string(ex.what()));
// Account for the consumed bytes and expose them as the new get area.
625 _underflow_data_remain -= readsize;
629 setg(&in_buf_[0], &in_buf_[0], &in_buf_[0] + readsize);
// NOTE(review): the usual streambuf idiom returns
// traits_type::to_int_type(*gptr()). Passing the plain char to not_eof()
// still yields a non-eof value, but not necessarily the character's int_type
// value where char is signed -- confirm this is intended.
631 return traits_type::not_eof(in_buf_[0]);
633 }
catch (
const StreamClosedException&) {
637 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 10) <<
"StreamClosedException in underflow()" << IBRCOMMON_LOGGER_ENDL;
639 }
catch (
const StreamErrorException &ex) {
643 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 10) <<
"StreamErrorException in underflow(): " << ex.what() << IBRCOMMON_LOGGER_ENDL;
646 }
catch (
const StreamShutdownException&) {
650 IBRCOMMON_LOGGER_DEBUG_TAG(
"StreamBuffer", 10) <<
"StreamShutdownException in underflow()" << IBRCOMMON_LOGGER_ENDL;
// Failure result of the function.
653 return traits_type::eof();
// Timer callback taking an unnamed ibrcommon::Timer pointer. The only visible
// statement throws ibrcommon::Timer::StopTimerException.
// NOTE(review): the statements between the signature and the throw are not
// visible in this excerpt; presumably this is the handler for _idle_timer,
// which the constructor initialises with *this -- confirm.
656 size_t StreamConnection::StreamBuffer::timeout(ibrcommon::Timer*)
662 throw ibrcommon::Timer::StopTimerException();
// Configures the idle timer (_idle_timer) with the given timeout value.
// NOTE(review): the function may continue beyond the last statement shown in
// this excerpt (for example to start the timer) -- confirm against the full
// file.
665 void StreamConnection::StreamBuffer::enableIdleTimeout(
const dtn::data::Timeout &seconds)
667 _idle_timer.set(seconds);
ibrcommon::socketstream * _conn
std::string toString() const