00001
00002
00003
00004
00005
00006
00007
00008 #include "ibrdtn/streams/StreamConnection.h"
00009 #include <ibrcommon/Logger.h>
00010
00011 namespace dtn
00012 {
00013 namespace streams
00014 {
00015 StreamConnection::StreamBuffer::StreamBuffer(StreamConnection &conn, iostream &stream)
00016 : _statebits(STREAM_SOB), _conn(conn), in_buf_(new char[BUFF_SIZE]), out_buf_(new char[BUFF_SIZE]), _stream(stream),
00017 _recv_size(0), _timer(*this, 0), _underflow_data_remain(0), _underflow_state(IDLE)
00018 {
00019
00020 setg(0, 0, 0);
00021 setp(out_buf_, out_buf_ + BUFF_SIZE - 1);
00022 }
00023
00024 StreamConnection::StreamBuffer::~StreamBuffer()
00025 {
00026
00027 _timer.remove();
00028
00029
00030 delete [] in_buf_;
00031 delete [] out_buf_;
00032 }
00033
00034 bool StreamConnection::StreamBuffer::get(const StateBits bit) const
00035 {
00036 return (_statebits & bit);
00037 }
00038
00039 void StreamConnection::StreamBuffer::set(const StateBits bit)
00040 {
00041 ibrcommon::MutexLock l(_statelock);
00042 _statebits |= bit;
00043 }
00044
00045 void StreamConnection::StreamBuffer::unset(const StateBits bit)
00046 {
00047 ibrcommon::MutexLock l(_statelock);
00048 _statebits &= ~(bit);
00049 }
00050
00051 bool StreamConnection::StreamBuffer::good() const
00052 {
00053 int badbits = STREAM_FAILED + STREAM_BAD + STREAM_EOF + STREAM_SHUTDOWN + STREAM_CLOSED;
00054 return !(badbits & _statebits) && _stream.good();
00055 }
00056
00065 const StreamContactHeader StreamConnection::StreamBuffer::handshake(const StreamContactHeader &header)
00066 {
00067
00068 _stream.exceptions(std::ios::badbit | std::ios::eofbit | std::ios::failbit);
00069
00070 try {
00071 ibrcommon::MutexLock l(_sendlock);
00072
00073
00074 _stream << header << std::flush;
00075
00076
00077 StreamContactHeader peer;
00078 _stream >> peer;
00079
00080
00081 if (peer._flags & StreamContactHeader::REQUEST_ACKNOWLEDGMENTS) set(STREAM_ACK_SUPPORT);
00082 if (peer._flags & StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS) set(STREAM_NACK_SUPPORT);
00083
00084
00085 if (peer._keepalive > 0)
00086 {
00087 set(STREAM_TIMER_SUPPORT);
00088
00089
00090 _in_timeout = header._keepalive * 2;
00091 _out_timeout = peer._keepalive - 2;
00092
00093
00094 ibrcommon::MutexLock timerl(_timer_lock);
00095 _in_timeout_value = _in_timeout;
00096 _out_timeout_value = _out_timeout;
00097 _timer.set(1);
00098
00099
00100 _timer.start();
00101 }
00102
00103
00104 set(STREAM_HANDSHAKE);
00105
00106
00107 return peer;
00108
00109 } catch (...) {
00110
00111
00112 set(STREAM_FAILED);
00113
00114
00115 shutdown(StreamDataSegment::MSG_SHUTDOWN_VERSION_MISSMATCH);
00116
00117
00118 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00119
00120
00121 throw StreamErrorException("handshake not completed");
00122 }
00123 }
00124
00130 void StreamConnection::StreamBuffer::shutdown(const StreamDataSegment::ShutdownReason reason)
00131 {
00132 try {
00133 ibrcommon::MutexLock l(_sendlock);
00134
00135 _stream << StreamDataSegment(reason) << std::flush;
00136 } catch (...) {
00137
00138 set(STREAM_FAILED);
00139
00140 throw StreamErrorException("can not send shutdown message");
00141 }
00142 }
00143
00154 size_t StreamConnection::StreamBuffer::timeout(size_t)
00155 {
00156 ibrcommon::MutexLock timerl(_timer_lock);
00157
00158 _out_timeout_value--;
00159 _in_timeout_value--;
00160
00161 if (_out_timeout_value <= 0)
00162 {
00163 try {
00164 ibrcommon::MutexLock l(_sendlock);
00165 _stream << StreamDataSegment() << std::flush;
00166 IBRCOMMON_LOGGER_DEBUG(15) << "KEEPALIVE sent" << IBRCOMMON_LOGGER_ENDL;
00167 } catch (...) {
00168
00169 set(STREAM_FAILED);
00170 return 0;
00171 }
00172
00173 _out_timeout_value = _out_timeout;
00174 }
00175
00176 if (_in_timeout_value <= 0)
00177 {
00178 IBRCOMMON_LOGGER_DEBUG(15) << "KEEPALIVE timeout reached -> shutdown connection" << IBRCOMMON_LOGGER_ENDL;
00179 _conn.shutdown(CONNECTION_SHUTDOWN_IDLE);
00180 return 0;
00181 }
00182
00183 return 1;
00184 }
00185
00186 void StreamConnection::StreamBuffer::close()
00187 {
00188
00189 set(STREAM_SHUTDOWN);
00190 }
00191
00192 void StreamConnection::StreamBuffer::reject()
00193 {
00194
00195
00196 set(STREAM_REJECT);
00197
00198
00199
00200 setg(0, 0, 0);
00201 }
00202
00203 bool StreamConnection::StreamBuffer::isCompleted()
00204 {
00205 size_t size = _segments.size();
00206 IBRCOMMON_LOGGER_DEBUG(15) << size << " segments to confirm" << IBRCOMMON_LOGGER_ENDL;
00207 return (size == 0);
00208 }
00209
00210
00211
00212
00213 int StreamConnection::StreamBuffer::overflow(int c)
00214 {
00215
00216 if (!good()) throw StreamErrorException();
00217
00218
00219 ibrcommon::MutexLock l(_overflow_mutex);
00220
00221 try {
00222 char *ibegin = out_buf_;
00223 char *iend = pptr();
00224
00225
00226 setp(out_buf_, out_buf_ + BUFF_SIZE - 1);
00227
00228
00229 if(!traits_type::eq_int_type(c, traits_type::eof())) {
00230 *iend++ = traits_type::to_char_type(c);
00231 }
00232
00233
00234 if ((iend - ibegin) == 0)
00235 {
00236 return traits_type::not_eof(c);
00237 }
00238
00239
00240 StreamDataSegment seg(StreamDataSegment::MSG_DATA_SEGMENT, (iend - ibegin));
00241
00242
00243 if (get(STREAM_SOB))
00244 {
00245 seg._flags |= StreamDataSegment::MSG_MARK_BEGINN;
00246 unset(STREAM_SKIP);
00247 unset(STREAM_SOB);
00248 }
00249
00250 if (char_traits<char>::eq_int_type(c, char_traits<char>::eof()))
00251 {
00252
00253 seg._flags |= StreamDataSegment::MSG_MARK_END;
00254 set(STREAM_SOB);
00255 }
00256
00257 if (!get(STREAM_SKIP))
00258 {
00259 ibrcommon::MutexLock l(_sendlock);
00260
00261
00262 if (get(STREAM_ACK_SUPPORT))
00263 {
00264 _segments.push(seg);
00265 }
00266 else if (seg._flags & StreamDataSegment::MSG_MARK_END)
00267 {
00268
00269
00270 _conn.eventBundleForwarded();
00271 }
00272
00273
00274 _stream << seg;
00275 _stream.write(out_buf_, seg._value);
00276 }
00277
00278 return traits_type::not_eof(c);
00279 } catch (StreamClosedException ex) {
00280
00281 set(STREAM_FAILED);
00282
00283 IBRCOMMON_LOGGER_DEBUG(10) << "StreamClosedException in overflow()" << IBRCOMMON_LOGGER_ENDL;
00284 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00285 } catch (StreamErrorException ex) {
00286
00287 set(STREAM_FAILED);
00288
00289 IBRCOMMON_LOGGER_DEBUG(10) << "StreamErrorException in overflow()" << IBRCOMMON_LOGGER_ENDL;
00290 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00291 } catch (ios_base::failure ex) {
00292
00293 set(STREAM_FAILED);
00294
00295 IBRCOMMON_LOGGER_DEBUG(10) << "ios_base::failure in overflow()" << IBRCOMMON_LOGGER_ENDL;
00296 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00297 }
00298
00299 return traits_type::eof();
00300 }
00301
00302
00303
00304 int StreamConnection::StreamBuffer::sync()
00305 {
00306 int ret = traits_type::eq_int_type(this->overflow(traits_type::eof()),
00307 traits_type::eof()) ? -1 : 0;
00308
00309 try {
00310 ibrcommon::MutexLock l(_sendlock);
00311
00312
00313 _stream.flush();
00314 } catch (ios_base::failure ex) {
00315
00316 set(STREAM_BAD);
00317
00318 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00319 }
00320
00321 return ret;
00322 }
00323
00324 void StreamConnection::StreamBuffer::skipData(size_t &size)
00325 {
00326
00327 char tmpbuf[BUFF_SIZE];
00328
00329 try {
00330
00331 while (size > 0 && _stream.good())
00332 {
00333 size_t readsize = BUFF_SIZE;
00334 if (size < BUFF_SIZE) readsize = size;
00335
00336
00337 _stream.read(tmpbuf, readsize);
00338
00339
00340 size -= readsize;
00341 }
00342 } catch (ios_base::failure ex) {
00343 _underflow_state = IDLE;
00344 throw StreamErrorException("read error");
00345 }
00346 }
00347
00348
00349 int StreamConnection::StreamBuffer::underflow()
00350 {
00351
00352 if (!good()) throw StreamErrorException();
00353
00354
00355 ibrcommon::MutexLock l(_underflow_mutex);
00356
00357 try {
00358 if (_underflow_state == DATA_TRANSFER)
00359 {
00360
00361 if (get(STREAM_REJECT))
00362 {
00363
00364 {
00365 ibrcommon::MutexLock l(_sendlock);
00366
00367
00368 _stream << StreamDataSegment(StreamDataSegment::MSG_REFUSE_BUNDLE) << std::flush;
00369 }
00370
00371
00372 skipData(_underflow_data_remain);
00373
00374
00375 _underflow_state = IDLE;
00376 }
00377
00378 else if (_underflow_data_remain == 0)
00379 {
00380
00381 if (get(STREAM_ACK_SUPPORT))
00382 {
00383 ibrcommon::MutexLock l(_sendlock);
00384 _stream << StreamDataSegment(StreamDataSegment::MSG_ACK_SEGMENT, _recv_size) << std::flush;
00385 }
00386
00387
00388 _underflow_state = IDLE;
00389 }
00390 }
00391
00392
00393 while (_underflow_state == IDLE)
00394 {
00395
00396 dtn::streams::StreamDataSegment seg;
00397
00398 try {
00399
00400 _stream >> seg;
00401 } catch (ios_base::failure ex) {
00402 throw StreamErrorException("read error");
00403 }
00404
00405
00406 {
00407 ibrcommon::MutexLock timerl(_timer_lock);
00408 _in_timeout_value = _in_timeout;
00409 }
00410
00411 switch (seg._type)
00412 {
00413 case StreamDataSegment::MSG_DATA_SEGMENT:
00414 {
00415 if (seg._flags & StreamDataSegment::MSG_MARK_BEGINN)
00416 {
00417 _recv_size = seg._value;
00418 unset(STREAM_REJECT);
00419 }
00420 else
00421 {
00422 _recv_size += seg._value;
00423 }
00424
00425
00426 _underflow_data_remain = seg._value;
00427
00428 if (get(STREAM_REJECT))
00429 {
00430
00431 {
00432
00433 ibrcommon::MutexLock l(_sendlock);
00434
00435
00436 _stream << StreamDataSegment(StreamDataSegment::MSG_REFUSE_BUNDLE, 0) << std::flush;
00437 }
00438
00439
00440 skipData(_underflow_data_remain);
00441 }
00442 else
00443 {
00444
00445 _underflow_state = DATA_TRANSFER;
00446 }
00447 break;
00448 }
00449
00450 case StreamDataSegment::MSG_ACK_SEGMENT:
00451 {
00452
00453 if (get(STREAM_ACK_SUPPORT))
00454 {
00455 ibrcommon::LockedQueue<StreamDataSegment> q = _segments.LockedAccess();
00456
00457 if ((*q).empty())
00458 {
00459 IBRCOMMON_LOGGER(error) << "got an unexpected ACK with size of " << seg._value << IBRCOMMON_LOGGER_ENDL;
00460 }
00461 else
00462 {
00463 StreamDataSegment &qs = (*q).front();
00464
00465 if (qs._flags & StreamDataSegment::MSG_MARK_END)
00466 {
00467 _conn.eventBundleForwarded();
00468 }
00469
00470 (*q).pop();
00471
00472 _conn.eventBundleAck(seg._value);
00473 }
00474 }
00475 break;
00476 }
00477
00478 case StreamDataSegment::MSG_KEEPALIVE:
00479 break;
00480
00481 case StreamDataSegment::MSG_REFUSE_BUNDLE:
00482 {
00483
00484 if (get(STREAM_ACK_SUPPORT) && get(STREAM_NACK_SUPPORT))
00485 {
00486 ibrcommon::LockedQueue<StreamDataSegment> q = _segments.LockedAccess();
00487
00488
00489 if (!_rejected_segments.empty())
00490 {
00491 _rejected_segments.pop();
00492
00493
00494 IBRCOMMON_LOGGER_DEBUG(30) << "NACK received, still " << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL;
00495 }
00496 else if ((*q).empty())
00497 {
00498 IBRCOMMON_LOGGER(error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
00499 }
00500 else
00501 {
00502
00503 IBRCOMMON_LOGGER_DEBUG(20) << "NACK received!" << IBRCOMMON_LOGGER_ENDL;
00504
00505
00506 (*q).pop();
00507
00508
00509 while (!(*q).empty())
00510 {
00511 if ((*q).front()._flags & StreamDataSegment::MSG_MARK_BEGINN)
00512 {
00513 break;
00514 }
00515
00516
00517 _rejected_segments.push((*q).front());
00518 (*q).pop();
00519 }
00520
00521
00522 _conn.eventBundleRefused();
00523
00524
00525 IBRCOMMON_LOGGER_DEBUG(30) << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL;
00526
00527
00528 if ((*q).empty())
00529 {
00530 set(STREAM_SKIP);
00531
00532
00533 IBRCOMMON_LOGGER_DEBUG(25) << "skip the current transfer" << IBRCOMMON_LOGGER_ENDL;
00534 }
00535 }
00536 }
00537 else
00538 {
00539 IBRCOMMON_LOGGER(error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
00540 }
00541
00542 break;
00543 }
00544
00545 case StreamDataSegment::MSG_SHUTDOWN:
00546 {
00547 throw StreamShutdownException();
00548 }
00549 }
00550 }
00551
00552
00553 size_t readsize = BUFF_SIZE;
00554 if (_underflow_data_remain < BUFF_SIZE) readsize = _underflow_data_remain;
00555
00556 try {
00557
00558 _stream.read(in_buf_, readsize);
00559 } catch (ios_base::failure ex) {
00560 _underflow_state = IDLE;
00561 throw StreamErrorException("read error");
00562 }
00563
00564
00565 _underflow_data_remain -= readsize;
00566
00567
00568
00569 setg(in_buf_, in_buf_, in_buf_ + readsize);
00570
00571 return traits_type::not_eof(in_buf_[0]);
00572
00573 } catch (StreamClosedException ex) {
00574
00575 set(STREAM_FAILED);
00576
00577 IBRCOMMON_LOGGER_DEBUG(10) << "StreamClosedException in underflow()" << IBRCOMMON_LOGGER_ENDL;
00578 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00579 } catch (StreamErrorException ex) {
00580
00581 set(STREAM_FAILED);
00582
00583 IBRCOMMON_LOGGER_DEBUG(10) << "StreamErrorException in underflow(): " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00584 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00585 } catch (StreamShutdownException ex) {
00586
00587 set(STREAM_FAILED);
00588
00589 IBRCOMMON_LOGGER_DEBUG(10) << "StreamShutdownException in underflow()" << IBRCOMMON_LOGGER_ENDL;
00590 _conn.shutdown(CONNECTION_SHUTDOWN_PEER_SHUTDOWN);
00591 }
00592
00593 return traits_type::eof();
00594 }
00595 }
00596 }