Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
#include "ibrdtn/streams/StreamConnection.h"
#include <ibrcommon/Logger.h>
#include <ibrcommon/TimeMeasurement.h>
#include <vector>
00011
00012 namespace dtn
00013 {
00014 namespace streams
00015 {
00016 StreamConnection::StreamBuffer::StreamBuffer(StreamConnection &conn, iostream &stream, const size_t buffer_size)
00017 : _buffer_size(buffer_size), _statebits(STREAM_SOB), _conn(conn), in_buf_(new char[buffer_size]), out_buf_(new char[buffer_size]), _stream(stream),
00018 _recv_size(0), _timer(*this, 0), _underflow_data_remain(0), _underflow_state(IDLE)
00019 {
00020
00021 setg(0, 0, 0);
00022 setp(out_buf_, out_buf_ + _buffer_size - 1);
00023 }
00024
00025 StreamConnection::StreamBuffer::~StreamBuffer()
00026 {
00027
00028 _timer.remove();
00029
00030
00031 delete [] in_buf_;
00032 delete [] out_buf_;
00033 }
00034
00035 bool StreamConnection::StreamBuffer::get(const StateBits bit) const
00036 {
00037 return (_statebits & bit);
00038 }
00039
00040 void StreamConnection::StreamBuffer::set(const StateBits bit)
00041 {
00042 ibrcommon::MutexLock l(_statelock);
00043 _statebits |= bit;
00044 }
00045
00046 void StreamConnection::StreamBuffer::unset(const StateBits bit)
00047 {
00048 ibrcommon::MutexLock l(_statelock);
00049 _statebits &= ~(bit);
00050 }
00051
00052 void StreamConnection::StreamBuffer::__error() const
00053 {
00054 IBRCOMMON_LOGGER_DEBUG(80) << "StreamBuffer Debugging" << IBRCOMMON_LOGGER_ENDL;
00055 IBRCOMMON_LOGGER_DEBUG(80) << "---------------------------------------" << IBRCOMMON_LOGGER_ENDL;
00056 IBRCOMMON_LOGGER_DEBUG(80) << "Buffer size: " << _buffer_size << IBRCOMMON_LOGGER_ENDL;
00057 IBRCOMMON_LOGGER_DEBUG(80) << "State bits: " << _statebits << IBRCOMMON_LOGGER_ENDL;
00058 IBRCOMMON_LOGGER_DEBUG(80) << "Recv size: " << _recv_size << IBRCOMMON_LOGGER_ENDL;
00059 IBRCOMMON_LOGGER_DEBUG(80) << "Timeout: " << _in_timeout << IBRCOMMON_LOGGER_ENDL;
00060 IBRCOMMON_LOGGER_DEBUG(80) << "Current timeout: " << _in_timeout_value << IBRCOMMON_LOGGER_ENDL;
00061 IBRCOMMON_LOGGER_DEBUG(80) << "Segments: " << _segments.size() << IBRCOMMON_LOGGER_ENDL;
00062 IBRCOMMON_LOGGER_DEBUG(80) << "Reject segments: " << _rejected_segments.size() << IBRCOMMON_LOGGER_ENDL;
00063 IBRCOMMON_LOGGER_DEBUG(80) << "Underflow remaining: " << _underflow_data_remain << IBRCOMMON_LOGGER_ENDL;
00064 IBRCOMMON_LOGGER_DEBUG(80) << "Underflow state: " << _underflow_state << IBRCOMMON_LOGGER_ENDL;
00065 IBRCOMMON_LOGGER_DEBUG(80) << "---------------------------------------" << IBRCOMMON_LOGGER_ENDL;
00066
00067 if (_statebits & STREAM_FAILED)
00068 {
00069 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_FAILED is set" << IBRCOMMON_LOGGER_ENDL;
00070 }
00071
00072 if (_statebits & STREAM_BAD)
00073 {
00074 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_BAD is set" << IBRCOMMON_LOGGER_ENDL;
00075 }
00076
00077 if (_statebits & STREAM_EOF)
00078 {
00079 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_EOF is set" << IBRCOMMON_LOGGER_ENDL;
00080 }
00081
00082 if (_statebits & STREAM_SHUTDOWN)
00083 {
00084 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_SHUTDOWN is set" << IBRCOMMON_LOGGER_ENDL;
00085 }
00086
00087 if (_statebits & STREAM_CLOSED)
00088 {
00089 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_CLOSED is set" << IBRCOMMON_LOGGER_ENDL;
00090 }
00091
00092 if (!_stream.good())
00093 {
00094 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: good() returned false" << IBRCOMMON_LOGGER_ENDL;
00095 }
00096 }
00097
00098 bool StreamConnection::StreamBuffer::__good() const
00099 {
00100 int badbits = STREAM_FAILED + STREAM_BAD + STREAM_EOF + STREAM_SHUTDOWN + STREAM_CLOSED;
00101 return !(badbits & _statebits);
00102 }
00103
00112 const StreamContactHeader StreamConnection::StreamBuffer::handshake(const StreamContactHeader &header)
00113 {
00114 try {
00115
00116 {
00117 ibrcommon::MutexLock l(_sendlock);
00118
00119
00120 _stream << header << std::flush;
00121 }
00122
00123
00124 StreamContactHeader peer;
00125 _stream >> peer;
00126
00127
00128 if (peer._flags & StreamContactHeader::REQUEST_ACKNOWLEDGMENTS) set(STREAM_ACK_SUPPORT);
00129 if (peer._flags & StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS) set(STREAM_NACK_SUPPORT);
00130
00131
00132 if (peer._keepalive > 0)
00133 {
00134 set(STREAM_TIMER_SUPPORT);
00135
00136
00137 _in_timeout = header._keepalive * 2;
00138
00139
00140 ibrcommon::MutexLock timerl(_timer_lock);
00141 _in_timeout_value = _in_timeout;
00142 _timer.set(1);
00143
00144
00145 _timer.start();
00146 }
00147
00148
00149 set(STREAM_HANDSHAKE);
00150
00151
00152 return peer;
00153
00154 } catch (const std::exception&) {
00155
00156 set(STREAM_FAILED);
00157
00158
00159 shutdown(StreamDataSegment::MSG_SHUTDOWN_VERSION_MISSMATCH);
00160
00161
00162 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00163
00164
00165 throw StreamErrorException("handshake not completed");
00166 }
00167 }
00168
00174 void StreamConnection::StreamBuffer::shutdown(const StreamDataSegment::ShutdownReason reason)
00175 {
00176 try {
00177 ibrcommon::MutexLock l(_sendlock);
00178
00179 _stream << StreamDataSegment(reason) << std::flush;
00180 } catch (const std::exception&) {
00181
00182 set(STREAM_FAILED);
00183
00184 throw StreamErrorException("can not send shutdown message");
00185 }
00186 }
00187
00188 void StreamConnection::StreamBuffer::keepalive()
00189 {
00190 try {
00191 ibrcommon::MutexLock l(_sendlock);
00192 _stream << StreamDataSegment() << std::flush;
00193 IBRCOMMON_LOGGER_DEBUG(15) << "KEEPALIVE sent" << IBRCOMMON_LOGGER_ENDL;
00194 } catch (const std::exception&) {
00195
00196 set(STREAM_FAILED);
00197 }
00198 }
00199
00210 size_t StreamConnection::StreamBuffer::timeout(size_t)
00211 {
00212 size_t in_timeout_value = 0;
00213 {
00214 ibrcommon::MutexLock timerl(_timer_lock);
00215 _in_timeout_value--;
00216 in_timeout_value = _in_timeout_value;
00217 }
00218
00219 if (in_timeout_value <= 0)
00220 {
00221 IBRCOMMON_LOGGER_DEBUG(15) << "KEEPALIVE timeout reached -> shutdown connection" << IBRCOMMON_LOGGER_ENDL;
00222 _conn.shutdown(CONNECTION_SHUTDOWN_NODE_TIMEOUT);
00223 return 0;
00224 }
00225
00226 return 1;
00227 }
00228
00229 void StreamConnection::StreamBuffer::close()
00230 {
00231
00232 set(STREAM_SHUTDOWN);
00233 }
00234
00235 void StreamConnection::StreamBuffer::shutdowntimers()
00236 {
00237
00238 _timer.remove();
00239 }
00240
00241 void StreamConnection::StreamBuffer::reject()
00242 {
00243
00244
00245 set(STREAM_REJECT);
00246
00247
00248
00249 setg(0, 0, 0);
00250 }
00251
00252 void StreamConnection::StreamBuffer::abort()
00253 {
00254 _segments.abort();
00255 }
00256
00257 void StreamConnection::StreamBuffer::wait()
00258 {
00259
00260 size_t timeout = 0;
00261
00262 try {
00263 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): wait for completion of transmission, " << _segments.size() << " ACKs left" << IBRCOMMON_LOGGER_ENDL;
00264 _segments.wait(ibrcommon::Queue<StreamDataSegment>::QUEUE_EMPTY, timeout);
00265 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): transfer completed" << IBRCOMMON_LOGGER_ENDL;
00266 } catch (const ibrcommon::QueueUnblockedException&) {
00267 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): transfer aborted (timeout)" << IBRCOMMON_LOGGER_ENDL;
00268 }
00269 }
00270
00271
00272
00273
00274 int StreamConnection::StreamBuffer::overflow(int c)
00275 {
00276 IBRCOMMON_LOGGER_DEBUG(90) << "StreamBuffer::overflow() called" << IBRCOMMON_LOGGER_ENDL;
00277
00278 try {
00279 char *ibegin = out_buf_;
00280 char *iend = pptr();
00281
00282
00283 setp(out_buf_, out_buf_ + _buffer_size - 1);
00284
00285
00286 if(!traits_type::eq_int_type(c, traits_type::eof())) {
00287 *iend++ = traits_type::to_char_type(c);
00288 }
00289
00290
00291 if ((iend - ibegin) == 0)
00292 {
00293 return traits_type::not_eof(c);
00294 }
00295
00296
00297 StreamDataSegment seg(StreamDataSegment::MSG_DATA_SEGMENT, (iend - ibegin));
00298
00299
00300 if (get(STREAM_SOB))
00301 {
00302 seg._flags |= StreamDataSegment::MSG_MARK_BEGINN;
00303 unset(STREAM_SKIP);
00304 unset(STREAM_SOB);
00305 }
00306
00307 if (char_traits<char>::eq_int_type(c, char_traits<char>::eof()))
00308 {
00309
00310 seg._flags |= StreamDataSegment::MSG_MARK_END;
00311 set(STREAM_SOB);
00312 }
00313
00314 if (!get(STREAM_SKIP))
00315 {
00316
00317 if (get(STREAM_ACK_SUPPORT))
00318 {
00319 _segments.push(seg);
00320 }
00321 else if (seg._flags & StreamDataSegment::MSG_MARK_END)
00322 {
00323
00324
00325 _conn.eventBundleForwarded();
00326 }
00327
00328 ibrcommon::MutexLock l(_sendlock);
00329 if (!_stream.good()) throw StreamErrorException("stream went bad");
00330
00331
00332 _stream << seg;
00333 _stream.write(out_buf_, seg._value);
00334 }
00335
00336 return traits_type::not_eof(c);
00337 } catch (const StreamClosedException&) {
00338
00339 set(STREAM_FAILED);
00340
00341 IBRCOMMON_LOGGER_DEBUG(10) << "StreamClosedException in overflow()" << IBRCOMMON_LOGGER_ENDL;
00342
00343 throw;
00344 } catch (const StreamErrorException&) {
00345
00346 set(STREAM_FAILED);
00347
00348 IBRCOMMON_LOGGER_DEBUG(10) << "StreamErrorException in overflow()" << IBRCOMMON_LOGGER_ENDL;
00349
00350 throw;
00351 } catch (const ios_base::failure&) {
00352
00353 set(STREAM_FAILED);
00354
00355 IBRCOMMON_LOGGER_DEBUG(10) << "ios_base::failure in overflow()" << IBRCOMMON_LOGGER_ENDL;
00356
00357 throw;
00358 }
00359
00360 return traits_type::eof();
00361 }
00362
00363
00364
00365 int StreamConnection::StreamBuffer::sync()
00366 {
00367 int ret = traits_type::eq_int_type(this->overflow(traits_type::eof()),
00368 traits_type::eof()) ? -1 : 0;
00369
00370 try {
00371 ibrcommon::MutexLock l(_sendlock);
00372
00373
00374 _stream.flush();
00375 } catch (const ios_base::failure&) {
00376
00377 set(STREAM_BAD);
00378
00379 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00380 }
00381
00382 return ret;
00383 }
00384
00385 void StreamConnection::StreamBuffer::skipData(size_t &size)
00386 {
00387
00388 char tmpbuf[_buffer_size];
00389
00390 try {
00391
00392 while (size > 0 && _stream.good())
00393 {
00394 size_t readsize = _buffer_size;
00395 if (size < _buffer_size) readsize = size;
00396
00397
00398 _stream.read(tmpbuf, readsize);
00399
00400
00401 size -= readsize;
00402
00403
00404 {
00405 ibrcommon::MutexLock timerl(_timer_lock);
00406 _in_timeout_value = _in_timeout;
00407 }
00408 }
00409 } catch (const ios_base::failure &ex) {
00410 _underflow_state = IDLE;
00411 throw StreamErrorException("read error during data skip: " + std::string(ex.what()));
00412 }
00413 }
00414
00415
00416 int StreamConnection::StreamBuffer::underflow()
00417 {
00418 IBRCOMMON_LOGGER_DEBUG(90) << "StreamBuffer::underflow() called" << IBRCOMMON_LOGGER_ENDL;
00419
00420 try {
00421 if (_underflow_state == DATA_TRANSFER)
00422 {
00423
00424 if (get(STREAM_REJECT))
00425 {
00426
00427 if (get(STREAM_NACK_SUPPORT))
00428 {
00429 ibrcommon::MutexLock l(_sendlock);
00430 if (!_stream.good()) throw StreamErrorException("stream went bad");
00431
00432
00433 _stream << StreamDataSegment(StreamDataSegment::MSG_REFUSE_BUNDLE) << std::flush;
00434 }
00435
00436
00437 skipData(_underflow_data_remain);
00438
00439
00440 _underflow_state = IDLE;
00441 }
00442
00443 else if (_underflow_data_remain == 0)
00444 {
00445
00446 if (get(STREAM_ACK_SUPPORT))
00447 {
00448 ibrcommon::MutexLock l(_sendlock);
00449 if (!_stream.good()) throw StreamErrorException("stream went bad");
00450 _stream << StreamDataSegment(StreamDataSegment::MSG_ACK_SEGMENT, _recv_size) << std::flush;
00451 }
00452
00453
00454 _underflow_state = IDLE;
00455 }
00456 }
00457
00458
00459 while (_underflow_state == IDLE)
00460 {
00461
00462 dtn::streams::StreamDataSegment seg;
00463
00464 try {
00465
00466 if (!_stream.good()) throw StreamErrorException("stream went bad");
00467
00468 _stream >> seg;
00469 } catch (const ios_base::failure &ex) {
00470 throw StreamErrorException("read error: " + std::string(ex.what()));
00471 }
00472
00473
00474 {
00475 ibrcommon::MutexLock timerl(_timer_lock);
00476 _in_timeout_value = _in_timeout;
00477 }
00478
00479 switch (seg._type)
00480 {
00481 case StreamDataSegment::MSG_DATA_SEGMENT:
00482 {
00483 IBRCOMMON_LOGGER_DEBUG(70) << "MSG_DATA_SEGMENT received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL;
00484
00485 if (seg._flags & StreamDataSegment::MSG_MARK_BEGINN)
00486 {
00487 _recv_size = seg._value;
00488 unset(STREAM_REJECT);
00489 }
00490 else
00491 {
00492 _recv_size += seg._value;
00493 }
00494
00495
00496 _underflow_data_remain = seg._value;
00497
00498 if (get(STREAM_REJECT))
00499 {
00500
00501 if (get(STREAM_NACK_SUPPORT))
00502 {
00503
00504 ibrcommon::MutexLock l(_sendlock);
00505 if (!_stream.good()) throw StreamErrorException("stream went bad");
00506
00507
00508 _stream << StreamDataSegment(StreamDataSegment::MSG_REFUSE_BUNDLE, 0) << std::flush;
00509 }
00510
00511
00512 skipData(_underflow_data_remain);
00513 }
00514 else
00515 {
00516
00517 _underflow_state = DATA_TRANSFER;
00518 }
00519 break;
00520 }
00521
00522 case StreamDataSegment::MSG_ACK_SEGMENT:
00523 {
00524 IBRCOMMON_LOGGER_DEBUG(70) << "MSG_ACK_SEGMENT received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL;
00525
00526
00527 if (get(STREAM_ACK_SUPPORT))
00528 {
00529 ibrcommon::Queue<StreamDataSegment>::Locked q = _segments.exclusive();
00530 if (q.empty())
00531 {
00532 IBRCOMMON_LOGGER(error) << "got an unexpected ACK with size of " << seg._value << IBRCOMMON_LOGGER_ENDL;
00533 }
00534 else
00535 {
00536 StreamDataSegment &qs = q.front();
00537
00538 if (qs._flags & StreamDataSegment::MSG_MARK_END)
00539 {
00540 _conn.eventBundleForwarded();
00541 }
00542
00543 IBRCOMMON_LOGGER_DEBUG(60) << q.size() << " elements to ACK" << IBRCOMMON_LOGGER_ENDL;
00544
00545 _conn.eventBundleAck(seg._value);
00546
00547 q.pop();
00548 }
00549 }
00550 break;
00551 }
00552
00553 case StreamDataSegment::MSG_KEEPALIVE:
00554 IBRCOMMON_LOGGER_DEBUG(70) << "MSG_KEEPALIVE received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL;
00555 break;
00556
00557 case StreamDataSegment::MSG_REFUSE_BUNDLE:
00558 {
00559 IBRCOMMON_LOGGER_DEBUG(70) << "MSG_REFUSE_BUNDLE received, flags: " << seg._flags << IBRCOMMON_LOGGER_ENDL;
00560
00561
00562
00563
00564 if (get(STREAM_ACK_SUPPORT) && get(STREAM_NACK_SUPPORT))
00565 {
00566
00567 if (!_rejected_segments.empty())
00568 {
00569 _rejected_segments.pop();
00570
00571
00572 IBRCOMMON_LOGGER_DEBUG(30) << "NACK received, still " << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL;
00573 }
00574 else try
00575 {
00576 StreamDataSegment qs = _segments.getnpop();
00577
00578
00579 IBRCOMMON_LOGGER_DEBUG(20) << "NACK received!" << IBRCOMMON_LOGGER_ENDL;
00580
00581
00582 while (!_segments.empty())
00583 {
00584 StreamDataSegment &seg = _segments.front();
00585 if (seg._flags & StreamDataSegment::MSG_MARK_BEGINN)
00586 {
00587 break;
00588 }
00589
00590
00591 _rejected_segments.push(seg);
00592 _segments.pop();
00593 }
00594
00595
00596 _conn.eventBundleRefused();
00597
00598
00599 IBRCOMMON_LOGGER_DEBUG(30) << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL;
00600
00601
00602 if (_segments.empty())
00603 {
00604 set(STREAM_SKIP);
00605
00606
00607 IBRCOMMON_LOGGER_DEBUG(25) << "skip the current transfer" << IBRCOMMON_LOGGER_ENDL;
00608 }
00609
00610 } catch (const ibrcommon::QueueUnblockedException&) {
00611 IBRCOMMON_LOGGER(error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
00612 }
00613
00614 }
00615 else
00616 {
00617 IBRCOMMON_LOGGER(error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
00618 }
00619
00620 break;
00621 }
00622
00623 case StreamDataSegment::MSG_SHUTDOWN:
00624 {
00625 IBRCOMMON_LOGGER_DEBUG(70) << "MSG_SHUTDOWN received" << IBRCOMMON_LOGGER_ENDL;
00626 throw StreamShutdownException();
00627 }
00628 }
00629 }
00630
00631
00632 size_t readsize = _buffer_size;
00633 if (_underflow_data_remain < _buffer_size) readsize = _underflow_data_remain;
00634
00635 try {
00636 if (!_stream.good()) throw StreamErrorException("stream went bad");
00637
00638
00639 _stream.read(in_buf_, readsize);
00640 } catch (const ios_base::failure &ex) {
00641 _underflow_state = IDLE;
00642 throw StreamErrorException("read error: " + std::string(ex.what()));
00643 }
00644
00645
00646 _underflow_data_remain -= readsize;
00647
00648
00649
00650 setg(in_buf_, in_buf_, in_buf_ + readsize);
00651
00652 return traits_type::not_eof(in_buf_[0]);
00653
00654 } catch (const StreamClosedException&) {
00655
00656 set(STREAM_FAILED);
00657
00658 IBRCOMMON_LOGGER_DEBUG(10) << "StreamClosedException in underflow()" << IBRCOMMON_LOGGER_ENDL;
00659
00660 } catch (const StreamErrorException &ex) {
00661
00662 set(STREAM_FAILED);
00663
00664 IBRCOMMON_LOGGER_DEBUG(10) << "StreamErrorException in underflow(): " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00665
00666 throw;
00667 } catch (const StreamShutdownException&) {
00668
00669 set(STREAM_FAILED);
00670
00671 IBRCOMMON_LOGGER_DEBUG(10) << "StreamShutdownException in underflow()" << IBRCOMMON_LOGGER_ENDL;
00672 }
00673
00674 return traits_type::eof();
00675 }
00676 }
00677 }