Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
#include "ibrdtn/streams/StreamConnection.h"
#include <ibrcommon/Logger.h>
#include <ibrcommon/TimeMeasurement.h>

#include <vector>
00011
00012 namespace dtn
00013 {
00014 namespace streams
00015 {
00016 StreamConnection::StreamBuffer::StreamBuffer(StreamConnection &conn, iostream &stream, const size_t buffer_size)
00017 : _buffer_size(buffer_size), _statebits(STREAM_SOB), _conn(conn), in_buf_(new char[buffer_size]), out_buf_(new char[buffer_size]), _stream(stream),
00018 _recv_size(0), _timer(*this, 0), _underflow_data_remain(0), _underflow_state(IDLE)
00019 {
00020
00021 setg(0, 0, 0);
00022 setp(out_buf_, out_buf_ + _buffer_size - 1);
00023 }
00024
00025 StreamConnection::StreamBuffer::~StreamBuffer()
00026 {
00027
00028 _timer.remove();
00029
00030
00031 delete [] in_buf_;
00032 delete [] out_buf_;
00033 }
00034
00035 bool StreamConnection::StreamBuffer::get(const StateBits bit) const
00036 {
00037 return (_statebits & bit);
00038 }
00039
00040 void StreamConnection::StreamBuffer::set(const StateBits bit)
00041 {
00042 ibrcommon::MutexLock l(_statelock);
00043 _statebits |= bit;
00044 }
00045
00046 void StreamConnection::StreamBuffer::unset(const StateBits bit)
00047 {
00048 ibrcommon::MutexLock l(_statelock);
00049 _statebits &= ~(bit);
00050 }
00051
00052 void StreamConnection::StreamBuffer::__error() const
00053 {
00054 IBRCOMMON_LOGGER_DEBUG(80) << "StreamBuffer Debugging" << IBRCOMMON_LOGGER_ENDL;
00055 IBRCOMMON_LOGGER_DEBUG(80) << "---------------------------------------" << IBRCOMMON_LOGGER_ENDL;
00056 IBRCOMMON_LOGGER_DEBUG(80) << "Buffer size: " << _buffer_size << IBRCOMMON_LOGGER_ENDL;
00057 IBRCOMMON_LOGGER_DEBUG(80) << "State bits: " << _statebits << IBRCOMMON_LOGGER_ENDL;
00058 IBRCOMMON_LOGGER_DEBUG(80) << "Recv size: " << _recv_size << IBRCOMMON_LOGGER_ENDL;
00059 IBRCOMMON_LOGGER_DEBUG(80) << "Timeout: " << _in_timeout << IBRCOMMON_LOGGER_ENDL;
00060 IBRCOMMON_LOGGER_DEBUG(80) << "Current timeout: " << _in_timeout_value << IBRCOMMON_LOGGER_ENDL;
00061 IBRCOMMON_LOGGER_DEBUG(80) << "Segments: " << _segments.size() << IBRCOMMON_LOGGER_ENDL;
00062 IBRCOMMON_LOGGER_DEBUG(80) << "Reject segments: " << _rejected_segments.size() << IBRCOMMON_LOGGER_ENDL;
00063 IBRCOMMON_LOGGER_DEBUG(80) << "Underflow remaining: " << _underflow_data_remain << IBRCOMMON_LOGGER_ENDL;
00064 IBRCOMMON_LOGGER_DEBUG(80) << "Underflow state: " << _underflow_state << IBRCOMMON_LOGGER_ENDL;
00065 IBRCOMMON_LOGGER_DEBUG(80) << "---------------------------------------" << IBRCOMMON_LOGGER_ENDL;
00066
00067 if (_statebits & STREAM_FAILED)
00068 {
00069 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_FAILED is set" << IBRCOMMON_LOGGER_ENDL;
00070 }
00071
00072 if (_statebits & STREAM_BAD)
00073 {
00074 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_BAD is set" << IBRCOMMON_LOGGER_ENDL;
00075 }
00076
00077 if (_statebits & STREAM_EOF)
00078 {
00079 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_EOF is set" << IBRCOMMON_LOGGER_ENDL;
00080 }
00081
00082 if (_statebits & STREAM_SHUTDOWN)
00083 {
00084 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_SHUTDOWN is set" << IBRCOMMON_LOGGER_ENDL;
00085 }
00086
00087 if (_statebits & STREAM_CLOSED)
00088 {
00089 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_CLOSED is set" << IBRCOMMON_LOGGER_ENDL;
00090 }
00091
00092 if (!_stream.good())
00093 {
00094 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: good() returned false" << IBRCOMMON_LOGGER_ENDL;
00095 }
00096 }
00097
00098 bool StreamConnection::StreamBuffer::__good() const
00099 {
00100 int badbits = STREAM_FAILED + STREAM_BAD + STREAM_EOF + STREAM_SHUTDOWN + STREAM_CLOSED;
00101 return !(badbits & _statebits);
00102 }
00103
00112 const StreamContactHeader StreamConnection::StreamBuffer::handshake(const StreamContactHeader &header)
00113 {
00114 try {
00115
00116 {
00117 ibrcommon::MutexLock l(_sendlock);
00118
00119
00120 _stream << header << std::flush;
00121 }
00122
00123
00124 StreamContactHeader peer;
00125 _stream >> peer;
00126
00127
00128 if (peer._flags & StreamContactHeader::REQUEST_ACKNOWLEDGMENTS) set(STREAM_ACK_SUPPORT);
00129 if (peer._flags & StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS) set(STREAM_NACK_SUPPORT);
00130
00131
00132 if (peer._keepalive > 0)
00133 {
00134 set(STREAM_TIMER_SUPPORT);
00135
00136
00137 _in_timeout = header._keepalive * 2;
00138
00139
00140 ibrcommon::MutexLock timerl(_timer_lock);
00141 _in_timeout_value = _in_timeout;
00142 _timer.set(1);
00143
00144
00145 _timer.start();
00146 }
00147
00148
00149 set(STREAM_HANDSHAKE);
00150
00151
00152 return peer;
00153
00154 } catch (std::exception) {
00155
00156 set(STREAM_FAILED);
00157
00158
00159 shutdown(StreamDataSegment::MSG_SHUTDOWN_VERSION_MISSMATCH);
00160
00161
00162 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00163
00164
00165 throw StreamErrorException("handshake not completed");
00166 }
00167 }
00168
00174 void StreamConnection::StreamBuffer::shutdown(const StreamDataSegment::ShutdownReason reason)
00175 {
00176 try {
00177 ibrcommon::MutexLock l(_sendlock);
00178
00179 _stream << StreamDataSegment(reason) << std::flush;
00180 } catch (std::exception) {
00181
00182 set(STREAM_FAILED);
00183
00184 throw StreamErrorException("can not send shutdown message");
00185 }
00186 }
00187
00188 void StreamConnection::StreamBuffer::keepalive()
00189 {
00190 try {
00191 ibrcommon::MutexLock l(_sendlock);
00192 _stream << StreamDataSegment() << std::flush;
00193 IBRCOMMON_LOGGER_DEBUG(15) << "KEEPALIVE sent" << IBRCOMMON_LOGGER_ENDL;
00194 } catch (std::exception) {
00195
00196 set(STREAM_FAILED);
00197 }
00198 }
00199
00210 size_t StreamConnection::StreamBuffer::timeout(size_t)
00211 {
00212 size_t in_timeout_value = 0;
00213 {
00214 ibrcommon::MutexLock timerl(_timer_lock);
00215 _in_timeout_value--;
00216 in_timeout_value = _in_timeout_value;
00217 }
00218
00219 if (in_timeout_value <= 0)
00220 {
00221 IBRCOMMON_LOGGER_DEBUG(15) << "KEEPALIVE timeout reached -> shutdown connection" << IBRCOMMON_LOGGER_ENDL;
00222 _conn.shutdown(CONNECTION_SHUTDOWN_NODE_TIMEOUT);
00223 return 0;
00224 }
00225
00226 return 1;
00227 }
00228
00229 void StreamConnection::StreamBuffer::close()
00230 {
00231
00232 set(STREAM_SHUTDOWN);
00233 }
00234
00235 void StreamConnection::StreamBuffer::shutdowntimers()
00236 {
00237
00238 _timer.remove();
00239 }
00240
00241 void StreamConnection::StreamBuffer::reject()
00242 {
00243
00244
00245 set(STREAM_REJECT);
00246
00247
00248
00249 setg(0, 0, 0);
00250 }
00251
00252 void StreamConnection::StreamBuffer::abort()
00253 {
00254 _segments.abort();
00255 }
00256
00257 void StreamConnection::StreamBuffer::wait()
00258 {
00259
00260 size_t timeout = 0;
00261
00262 try {
00263 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): wait for completion of transmission, " << _segments.size() << " ACKs left" << IBRCOMMON_LOGGER_ENDL;
00264 _segments.wait(ibrcommon::Queue<StreamDataSegment>::QUEUE_EMPTY, timeout);
00265 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): transfer completed" << IBRCOMMON_LOGGER_ENDL;
00266 } catch (ibrcommon::QueueUnblockedException) {
00267 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): transfer aborted (timeout)" << IBRCOMMON_LOGGER_ENDL;
00268 }
00269 }
00270
00271
00272
00273
00274 int StreamConnection::StreamBuffer::overflow(int c)
00275 {
00276 IBRCOMMON_LOGGER_DEBUG(90) << "StreamBuffer::overflow() called" << IBRCOMMON_LOGGER_ENDL;
00277
00278 try {
00279 char *ibegin = out_buf_;
00280 char *iend = pptr();
00281
00282
00283 setp(out_buf_, out_buf_ + _buffer_size - 1);
00284
00285
00286 if(!traits_type::eq_int_type(c, traits_type::eof())) {
00287 *iend++ = traits_type::to_char_type(c);
00288 }
00289
00290
00291 if ((iend - ibegin) == 0)
00292 {
00293 return traits_type::not_eof(c);
00294 }
00295
00296
00297 StreamDataSegment seg(StreamDataSegment::MSG_DATA_SEGMENT, (iend - ibegin));
00298
00299
00300 if (get(STREAM_SOB))
00301 {
00302 seg._flags |= StreamDataSegment::MSG_MARK_BEGINN;
00303 unset(STREAM_SKIP);
00304 unset(STREAM_SOB);
00305 }
00306
00307 if (char_traits<char>::eq_int_type(c, char_traits<char>::eof()))
00308 {
00309
00310 seg._flags |= StreamDataSegment::MSG_MARK_END;
00311 set(STREAM_SOB);
00312 }
00313
00314 if (!get(STREAM_SKIP))
00315 {
00316
00317 if (get(STREAM_ACK_SUPPORT))
00318 {
00319 _segments.push(seg);
00320 }
00321 else if (seg._flags & StreamDataSegment::MSG_MARK_END)
00322 {
00323
00324
00325 _conn.eventBundleForwarded();
00326 }
00327
00328 ibrcommon::MutexLock l(_sendlock);
00329 if (!_stream.good()) throw StreamErrorException("stream went bad");
00330
00331
00332 _stream << seg;
00333 _stream.write(out_buf_, seg._value);
00334 }
00335
00336 return traits_type::not_eof(c);
00337 } catch (StreamClosedException ex) {
00338
00339 set(STREAM_FAILED);
00340
00341 IBRCOMMON_LOGGER_DEBUG(10) << "StreamClosedException in overflow()" << IBRCOMMON_LOGGER_ENDL;
00342
00343 throw;
00344 } catch (StreamErrorException ex) {
00345
00346 set(STREAM_FAILED);
00347
00348 IBRCOMMON_LOGGER_DEBUG(10) << "StreamErrorException in overflow()" << IBRCOMMON_LOGGER_ENDL;
00349
00350 throw;
00351 } catch (ios_base::failure ex) {
00352
00353 set(STREAM_FAILED);
00354
00355 IBRCOMMON_LOGGER_DEBUG(10) << "ios_base::failure in overflow()" << IBRCOMMON_LOGGER_ENDL;
00356
00357 throw;
00358 }
00359
00360 return traits_type::eof();
00361 }
00362
00363
00364
00365 int StreamConnection::StreamBuffer::sync()
00366 {
00367 int ret = traits_type::eq_int_type(this->overflow(traits_type::eof()),
00368 traits_type::eof()) ? -1 : 0;
00369
00370 try {
00371 ibrcommon::MutexLock l(_sendlock);
00372
00373
00374 _stream.flush();
00375 } catch (ios_base::failure ex) {
00376
00377 set(STREAM_BAD);
00378
00379 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00380 }
00381
00382 return ret;
00383 }
00384
00385 void StreamConnection::StreamBuffer::skipData(size_t &size)
00386 {
00387
00388 char tmpbuf[_buffer_size];
00389
00390 try {
00391
00392 while (size > 0 && _stream.good())
00393 {
00394 size_t readsize = _buffer_size;
00395 if (size < _buffer_size) readsize = size;
00396
00397
00398 _stream.read(tmpbuf, readsize);
00399
00400
00401 size -= readsize;
00402
00403
00404 {
00405 ibrcommon::MutexLock timerl(_timer_lock);
00406 _in_timeout_value = _in_timeout;
00407 }
00408 }
00409 } catch (ios_base::failure ex) {
00410 _underflow_state = IDLE;
00411 throw StreamErrorException("read error during data skip: " + std::string(ex.what()));
00412 }
00413 }
00414
00415
00416 int StreamConnection::StreamBuffer::underflow()
00417 {
00418 IBRCOMMON_LOGGER_DEBUG(90) << "StreamBuffer::underflow() called" << IBRCOMMON_LOGGER_ENDL;
00419
00420 try {
00421 if (_underflow_state == DATA_TRANSFER)
00422 {
00423
00424 if (get(STREAM_REJECT))
00425 {
00426
00427 {
00428 ibrcommon::MutexLock l(_sendlock);
00429 if (!_stream.good()) throw StreamErrorException("stream went bad");
00430
00431
00432 _stream << StreamDataSegment(StreamDataSegment::MSG_REFUSE_BUNDLE) << std::flush;
00433 }
00434
00435
00436 skipData(_underflow_data_remain);
00437
00438
00439 _underflow_state = IDLE;
00440 }
00441
00442 else if (_underflow_data_remain == 0)
00443 {
00444
00445 if (get(STREAM_ACK_SUPPORT))
00446 {
00447 ibrcommon::MutexLock l(_sendlock);
00448 if (!_stream.good()) throw StreamErrorException("stream went bad");
00449 _stream << StreamDataSegment(StreamDataSegment::MSG_ACK_SEGMENT, _recv_size) << std::flush;
00450 }
00451
00452
00453 _underflow_state = IDLE;
00454 }
00455 }
00456
00457
00458 while (_underflow_state == IDLE)
00459 {
00460
00461 dtn::streams::StreamDataSegment seg;
00462
00463 try {
00464
00465 if (!_stream.good()) throw StreamErrorException("stream went bad");
00466
00467 _stream >> seg;
00468 } catch (const ios_base::failure &ex) {
00469 throw StreamErrorException("read error: " + std::string(ex.what()));
00470 }
00471
00472
00473 {
00474 ibrcommon::MutexLock timerl(_timer_lock);
00475 _in_timeout_value = _in_timeout;
00476 }
00477
00478 switch (seg._type)
00479 {
00480 case StreamDataSegment::MSG_DATA_SEGMENT:
00481 {
00482 IBRCOMMON_LOGGER_DEBUG(70) << "MSG_DATA_SEGMENT received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL;
00483
00484 if (seg._flags & StreamDataSegment::MSG_MARK_BEGINN)
00485 {
00486 _recv_size = seg._value;
00487 unset(STREAM_REJECT);
00488 }
00489 else
00490 {
00491 _recv_size += seg._value;
00492 }
00493
00494
00495 _underflow_data_remain = seg._value;
00496
00497 if (get(STREAM_REJECT))
00498 {
00499
00500 {
00501
00502 ibrcommon::MutexLock l(_sendlock);
00503 if (!_stream.good()) throw StreamErrorException("stream went bad");
00504
00505
00506 _stream << StreamDataSegment(StreamDataSegment::MSG_REFUSE_BUNDLE, 0) << std::flush;
00507 }
00508
00509
00510 skipData(_underflow_data_remain);
00511 }
00512 else
00513 {
00514
00515 _underflow_state = DATA_TRANSFER;
00516 }
00517 break;
00518 }
00519
00520 case StreamDataSegment::MSG_ACK_SEGMENT:
00521 {
00522 IBRCOMMON_LOGGER_DEBUG(70) << "MSG_ACK_SEGMENT received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL;
00523
00524
00525 if (get(STREAM_ACK_SUPPORT))
00526 {
00527 ibrcommon::Queue<StreamDataSegment>::Locked q = _segments.exclusive();
00528 if (q.empty())
00529 {
00530 IBRCOMMON_LOGGER(error) << "got an unexpected ACK with size of " << seg._value << IBRCOMMON_LOGGER_ENDL;
00531 }
00532 else
00533 {
00534 StreamDataSegment &qs = q.front();
00535
00536 if (qs._flags & StreamDataSegment::MSG_MARK_END)
00537 {
00538 _conn.eventBundleForwarded();
00539 }
00540
00541 IBRCOMMON_LOGGER_DEBUG(60) << q.size() << " elements to ACK" << IBRCOMMON_LOGGER_ENDL;
00542
00543 _conn.eventBundleAck(seg._value);
00544
00545 q.pop();
00546 }
00547 }
00548 break;
00549 }
00550
00551 case StreamDataSegment::MSG_KEEPALIVE:
00552 IBRCOMMON_LOGGER_DEBUG(70) << "MSG_KEEPALIVE received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL;
00553 break;
00554
00555 case StreamDataSegment::MSG_REFUSE_BUNDLE:
00556 {
00557 IBRCOMMON_LOGGER_DEBUG(70) << "MSG_REFUSE_BUNDLE received, flags: " << seg._flags << IBRCOMMON_LOGGER_ENDL;
00558
00559
00560
00561
00562 if (get(STREAM_ACK_SUPPORT) && get(STREAM_NACK_SUPPORT))
00563 {
00564
00565 if (!_rejected_segments.empty())
00566 {
00567 _rejected_segments.pop();
00568
00569
00570 IBRCOMMON_LOGGER_DEBUG(30) << "NACK received, still " << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL;
00571 }
00572 else try
00573 {
00574 StreamDataSegment qs = _segments.getnpop();
00575
00576
00577 IBRCOMMON_LOGGER_DEBUG(20) << "NACK received!" << IBRCOMMON_LOGGER_ENDL;
00578
00579
00580 while (!_segments.empty())
00581 {
00582 StreamDataSegment &seg = _segments.front();
00583 if (seg._flags & StreamDataSegment::MSG_MARK_BEGINN)
00584 {
00585 break;
00586 }
00587
00588
00589 _rejected_segments.push(seg);
00590 _segments.pop();
00591 }
00592
00593
00594 _conn.eventBundleRefused();
00595
00596
00597 IBRCOMMON_LOGGER_DEBUG(30) << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL;
00598
00599
00600 if (_segments.empty())
00601 {
00602 set(STREAM_SKIP);
00603
00604
00605 IBRCOMMON_LOGGER_DEBUG(25) << "skip the current transfer" << IBRCOMMON_LOGGER_ENDL;
00606 }
00607
00608 } catch (ibrcommon::QueueUnblockedException) {
00609 IBRCOMMON_LOGGER(error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
00610 }
00611
00612 }
00613 else
00614 {
00615 IBRCOMMON_LOGGER(error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
00616 }
00617
00618 break;
00619 }
00620
00621 case StreamDataSegment::MSG_SHUTDOWN:
00622 {
00623 IBRCOMMON_LOGGER_DEBUG(70) << "MSG_SHUTDOWN received" << IBRCOMMON_LOGGER_ENDL;
00624 throw StreamShutdownException();
00625 }
00626 }
00627 }
00628
00629
00630 size_t readsize = _buffer_size;
00631 if (_underflow_data_remain < _buffer_size) readsize = _underflow_data_remain;
00632
00633 try {
00634 if (!_stream.good()) throw StreamErrorException("stream went bad");
00635
00636
00637 _stream.read(in_buf_, readsize);
00638 } catch (const ios_base::failure &ex) {
00639 _underflow_state = IDLE;
00640 throw StreamErrorException("read error: " + std::string(ex.what()));
00641 }
00642
00643
00644 _underflow_data_remain -= readsize;
00645
00646
00647
00648 setg(in_buf_, in_buf_, in_buf_ + readsize);
00649
00650 return traits_type::not_eof(in_buf_[0]);
00651
00652 } catch (StreamClosedException ex) {
00653
00654 set(STREAM_FAILED);
00655
00656 IBRCOMMON_LOGGER_DEBUG(10) << "StreamClosedException in underflow()" << IBRCOMMON_LOGGER_ENDL;
00657 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00658
00659 } catch (StreamErrorException ex) {
00660
00661 set(STREAM_FAILED);
00662
00663 IBRCOMMON_LOGGER_DEBUG(10) << "StreamErrorException in underflow(): " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00664
00665 throw;
00666 } catch (StreamShutdownException ex) {
00667
00668 set(STREAM_FAILED);
00669
00670 IBRCOMMON_LOGGER_DEBUG(10) << "StreamShutdownException in underflow()" << IBRCOMMON_LOGGER_ENDL;
00671 _conn.shutdown(CONNECTION_SHUTDOWN_PEER_SHUTDOWN);
00672 }
00673
00674 return traits_type::eof();
00675 }
00676 }
00677 }