|
IBR-DTNSuite 0.6
|
00001 /* 00002 * bpstreambuf.cpp 00003 * 00004 * Created on: 14.07.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "ibrdtn/streams/StreamConnection.h" 00009 #include <ibrcommon/Logger.h> 00010 #include <ibrcommon/TimeMeasurement.h> 00011 00012 namespace dtn 00013 { 00014 namespace streams 00015 { 00016 StreamConnection::StreamBuffer::StreamBuffer(StreamConnection &conn, iostream &stream, const size_t buffer_size) 00017 : _buffer_size(buffer_size), _statebits(STREAM_SOB), _conn(conn), in_buf_(new char[buffer_size]), out_buf_(new char[buffer_size]), _stream(stream), 00018 _recv_size(0), _underflow_data_remain(0), _underflow_state(IDLE) 00019 { 00020 // Initialize get pointer. This should be zero so that underflow is called upon first read. 00021 setg(0, 0, 0); 00022 setp(out_buf_, out_buf_ + _buffer_size - 1); 00023 } 00024 00025 StreamConnection::StreamBuffer::~StreamBuffer() 00026 { 00027 // clear the own buffer 00028 delete [] in_buf_; 00029 delete [] out_buf_; 00030 } 00031 00032 bool StreamConnection::StreamBuffer::get(const StateBits bit) const 00033 { 00034 return (_statebits & bit); 00035 } 00036 00037 void StreamConnection::StreamBuffer::set(const StateBits bit) 00038 { 00039 ibrcommon::MutexLock l(_statelock); 00040 _statebits |= bit; 00041 } 00042 00043 void StreamConnection::StreamBuffer::unset(const StateBits bit) 00044 { 00045 ibrcommon::MutexLock l(_statelock); 00046 _statebits &= ~(bit); 00047 } 00048 00049 void StreamConnection::StreamBuffer::__error() const 00050 { 00051 IBRCOMMON_LOGGER_DEBUG(80) << "StreamBuffer Debugging" << IBRCOMMON_LOGGER_ENDL; 00052 IBRCOMMON_LOGGER_DEBUG(80) << "---------------------------------------" << IBRCOMMON_LOGGER_ENDL; 00053 IBRCOMMON_LOGGER_DEBUG(80) << "Buffer size: " << _buffer_size << IBRCOMMON_LOGGER_ENDL; 00054 IBRCOMMON_LOGGER_DEBUG(80) << "State bits: " << _statebits << IBRCOMMON_LOGGER_ENDL; 00055 IBRCOMMON_LOGGER_DEBUG(80) << "Recv size: " << _recv_size << IBRCOMMON_LOGGER_ENDL; 00056 IBRCOMMON_LOGGER_DEBUG(80) << "Segments: " << _segments.size() << IBRCOMMON_LOGGER_ENDL; 00057 IBRCOMMON_LOGGER_DEBUG(80) << "Reject segments: " << _rejected_segments.size() << IBRCOMMON_LOGGER_ENDL; 00058 IBRCOMMON_LOGGER_DEBUG(80) << "Underflow remaining: " << _underflow_data_remain << IBRCOMMON_LOGGER_ENDL; 00059 IBRCOMMON_LOGGER_DEBUG(80) << "Underflow state: " << _underflow_state << IBRCOMMON_LOGGER_ENDL; 00060 IBRCOMMON_LOGGER_DEBUG(80) << "---------------------------------------" << IBRCOMMON_LOGGER_ENDL; 00061 00062 if (_statebits & STREAM_FAILED) 00063 { 00064 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_FAILED is set" << IBRCOMMON_LOGGER_ENDL; 00065 } 00066 00067 if (_statebits & STREAM_BAD) 00068 { 00069 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_BAD is set" << IBRCOMMON_LOGGER_ENDL; 00070 } 00071 00072 if (_statebits & STREAM_EOF) 00073 { 00074 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_EOF is set" << IBRCOMMON_LOGGER_ENDL; 00075 } 00076 00077 if (_statebits & STREAM_SHUTDOWN) 00078 { 00079 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_SHUTDOWN is set" << IBRCOMMON_LOGGER_ENDL; 00080 } 00081 00082 if (_statebits & STREAM_CLOSED) 00083 { 00084 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_CLOSED is set" << IBRCOMMON_LOGGER_ENDL; 00085 } 00086 00087 if (!_stream.good()) 00088 { 00089 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: good() returned false" << IBRCOMMON_LOGGER_ENDL; 00090 } 00091 } 00092 00093 bool StreamConnection::StreamBuffer::__good() const 00094 { 00095 int badbits = STREAM_FAILED + STREAM_BAD + STREAM_EOF + STREAM_SHUTDOWN + STREAM_CLOSED; 00096 return !(badbits & _statebits); 00097 } 00098 00107 const StreamContactHeader StreamConnection::StreamBuffer::handshake(const StreamContactHeader &header) 00108 { 00109 try { 00110 // make the send-call atomic 00111 { 00112 ibrcommon::MutexLock l(_sendlock); 00113 00114 // transfer the local header 00115 _stream << header << std::flush; 00116 } 00117 00118 // receive the remote header 00119 StreamContactHeader peer; 00120 _stream >> peer; 00121 00122 // enable/disable ACK/NACK support 00123 if (peer._flags & StreamContactHeader::REQUEST_ACKNOWLEDGMENTS) set(STREAM_ACK_SUPPORT); 00124 if (peer._flags & StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS) set(STREAM_NACK_SUPPORT); 00125 00126 // set the incoming timer if set (> 0) 00127 if (peer._keepalive > 0) 00128 { 00129 // mark timer support 00130 set(STREAM_TIMER_SUPPORT); 00131 } 00132 00133 // set handshake completed bit 00134 set(STREAM_HANDSHAKE); 00135 00136 // return the received header 00137 return peer; 00138 00139 } catch (const std::exception&) { 00140 // set failed bit 00141 set(STREAM_FAILED); 00142 00143 // shutdown the stream 00144 shutdown(StreamDataSegment::MSG_SHUTDOWN_VERSION_MISSMATCH); 00145 00146 // call the shutdown event 00147 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR); 00148 00149 // forward the catched exception 00150 throw StreamErrorException("handshake not completed"); 00151 } 00152 } 00153 00159 void StreamConnection::StreamBuffer::shutdown(const StreamDataSegment::ShutdownReason reason) 00160 { 00161 try { 00162 ibrcommon::MutexLock l(_sendlock); 00163 // send a SHUTDOWN message 00164 _stream << StreamDataSegment(reason) << std::flush; 00165 } catch (const std::exception&) { 00166 // set failed bit 00167 set(STREAM_FAILED); 00168 00169 throw StreamErrorException("can not send shutdown message"); 00170 } 00171 } 00172 00173 void StreamConnection::StreamBuffer::keepalive() 00174 { 00175 try { 00176 ibrcommon::MutexLock l(_sendlock); 00177 _stream << StreamDataSegment() << std::flush; 00178 IBRCOMMON_LOGGER_DEBUG(15) << "KEEPALIVE sent" << IBRCOMMON_LOGGER_ENDL; 00179 } catch (const std::exception&) { 00180 // set failed bit 00181 set(STREAM_FAILED); 00182 } 00183 } 00184 00185 void StreamConnection::StreamBuffer::close() 00186 { 00187 // set shutdown bit 00188 set(STREAM_SHUTDOWN); 00189 } 00190 00191 void StreamConnection::StreamBuffer::reject() 00192 { 00193 // we have to reject the current transmission 00194 // so we have to discard all all data until the next segment with a start bit 00195 set(STREAM_REJECT); 00196 00197 // set the current in buffer to zero 00198 // this should result in a underflow call on the next read 00199 setg(0, 0, 0); 00200 } 00201 00202 void StreamConnection::StreamBuffer::abort() 00203 { 00204 _segments.abort(); 00205 } 00206 00207 void StreamConnection::StreamBuffer::wait() 00208 { 00209 // TODO: get max time to wait out of the timeout values 00210 size_t timeout = 0; 00211 00212 try { 00213 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): wait for completion of transmission, " << _segments.size() << " ACKs left" << IBRCOMMON_LOGGER_ENDL; 00214 _segments.wait(ibrcommon::Queue<StreamDataSegment>::QUEUE_EMPTY, timeout); 00215 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): transfer completed" << IBRCOMMON_LOGGER_ENDL; 00216 } catch (const ibrcommon::QueueUnblockedException&) { 00217 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): transfer aborted (timeout)" << IBRCOMMON_LOGGER_ENDL; 00218 } 00219 } 00220 00221 // This function is called when the output buffer is filled. 00222 // In this function, the buffer should be written to wherever it should 00223 // be written to (in this case, the streambuf object that this is controlling). 00224 int StreamConnection::StreamBuffer::overflow(int c) 00225 { 00226 IBRCOMMON_LOGGER_DEBUG(90) << "StreamBuffer::overflow() called" << IBRCOMMON_LOGGER_ENDL; 00227 00228 try { 00229 char *ibegin = out_buf_; 00230 char *iend = pptr(); 00231 00232 // mark the buffer as free 00233 setp(out_buf_, out_buf_ + _buffer_size - 1); 00234 00235 // append the last character 00236 if(!traits_type::eq_int_type(c, traits_type::eof())) { 00237 *iend++ = traits_type::to_char_type(c); 00238 } 00239 00240 // if there is nothing to send, just return 00241 if ((iend - ibegin) == 0) 00242 { 00243 return traits_type::not_eof(c); 00244 } 00245 00246 // wrap a segment around the data 00247 StreamDataSegment seg(StreamDataSegment::MSG_DATA_SEGMENT, (iend - ibegin)); 00248 00249 // set the start flag 00250 if (get(STREAM_SOB)) 00251 { 00252 seg._flags |= StreamDataSegment::MSG_MARK_BEGINN; 00253 unset(STREAM_SKIP); 00254 unset(STREAM_SOB); 00255 } 00256 00257 if (char_traits<char>::eq_int_type(c, char_traits<char>::eof())) 00258 { 00259 // set the end flag 00260 seg._flags |= StreamDataSegment::MSG_MARK_END; 00261 set(STREAM_SOB); 00262 } 00263 00264 if (!get(STREAM_SKIP)) 00265 { 00266 // put the segment into the queue 00267 if (get(STREAM_ACK_SUPPORT)) 00268 { 00269 _segments.push(seg); 00270 } 00271 else if (seg._flags & StreamDataSegment::MSG_MARK_END) 00272 { 00273 // without ACK support we have to assume that a bundle is forwarded 00274 // when the last segment is sent. 00275 _conn.eventBundleForwarded(); 00276 } 00277 00278 ibrcommon::MutexLock l(_sendlock); 00279 if (!_stream.good()) throw StreamErrorException("stream went bad"); 00280 00281 // write the segment to the stream 00282 _stream << seg; 00283 _stream.write(out_buf_, seg._value); 00284 } 00285 00286 return traits_type::not_eof(c); 00287 } catch (const StreamClosedException&) { 00288 // set failed bit 00289 set(STREAM_FAILED); 00290 00291 IBRCOMMON_LOGGER_DEBUG(10) << "StreamClosedException in overflow()" << IBRCOMMON_LOGGER_ENDL; 00292 00293 throw; 00294 } catch (const StreamErrorException&) { 00295 // set failed bit 00296 set(STREAM_FAILED); 00297 00298 IBRCOMMON_LOGGER_DEBUG(10) << "StreamErrorException in overflow()" << IBRCOMMON_LOGGER_ENDL; 00299 00300 throw; 00301 } catch (const ios_base::failure&) { 00302 // set failed bit 00303 set(STREAM_FAILED); 00304 00305 IBRCOMMON_LOGGER_DEBUG(10) << "ios_base::failure in overflow()" << IBRCOMMON_LOGGER_ENDL; 00306 00307 throw; 00308 } 00309 00310 return traits_type::eof(); 00311 } 00312 00313 // This is called to flush the buffer. 00314 // This is called when we're done with the file stream (or when .flush() is called). 00315 int StreamConnection::StreamBuffer::sync() 00316 { 00317 int ret = traits_type::eq_int_type(this->overflow(traits_type::eof()), 00318 traits_type::eof()) ? -1 : 0; 00319 00320 try { 00321 ibrcommon::MutexLock l(_sendlock); 00322 00323 // ... and flush. 00324 _stream.flush(); 00325 } catch (const ios_base::failure&) { 00326 // set failed bit 00327 set(STREAM_BAD); 00328 00329 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR); 00330 } 00331 00332 return ret; 00333 } 00334 00335 void StreamConnection::StreamBuffer::skipData(size_t &size) 00336 { 00337 // a temporary buffer 00338 char tmpbuf[_buffer_size]; 00339 00340 try { 00341 // and read until the next segment 00342 while (size > 0 && _stream.good()) 00343 { 00344 size_t readsize = _buffer_size; 00345 if (size < _buffer_size) readsize = size; 00346 00347 // to reject a bundle read all remaining data of this segment 00348 _stream.read(tmpbuf, readsize); 00349 00350 // adjust the remain counter 00351 size -= readsize; 00352 } 00353 } catch (const ios_base::failure &ex) { 00354 _underflow_state = IDLE; 00355 throw StreamErrorException("read error during data skip: " + std::string(ex.what())); 00356 } 00357 } 00358 00359 // Fill the input buffer. This reads out of the streambuf. 00360 int StreamConnection::StreamBuffer::underflow() 00361 { 00362 IBRCOMMON_LOGGER_DEBUG(90) << "StreamBuffer::underflow() called" << IBRCOMMON_LOGGER_ENDL; 00363 00364 try { 00365 if (_underflow_state == DATA_TRANSFER) 00366 { 00367 // on bundle reject 00368 if (get(STREAM_REJECT)) 00369 { 00370 // send NACK on bundle reject 00371 if (get(STREAM_NACK_SUPPORT)) 00372 { 00373 ibrcommon::MutexLock l(_sendlock); 00374 if (!_stream.good()) throw StreamErrorException("stream went bad"); 00375 00376 // send a REFUSE message 00377 _stream << StreamDataSegment(StreamDataSegment::MSG_REFUSE_BUNDLE) << std::flush; 00378 } 00379 00380 // skip data in this segment 00381 skipData(_underflow_data_remain); 00382 00383 // return to idle state 00384 _underflow_state = IDLE; 00385 } 00386 // send ACK if the data segment is received completely 00387 else if (_underflow_data_remain == 0) 00388 { 00389 // New data segment received. Send an ACK. 00390 if (get(STREAM_ACK_SUPPORT)) 00391 { 00392 ibrcommon::MutexLock l(_sendlock); 00393 if (!_stream.good()) throw StreamErrorException("stream went bad"); 00394 _stream << StreamDataSegment(StreamDataSegment::MSG_ACK_SEGMENT, _recv_size) << std::flush; 00395 } 00396 00397 // return to idle state 00398 _underflow_state = IDLE; 00399 } 00400 } 00401 00402 // read segments until DATA is AVAILABLE 00403 while (_underflow_state == IDLE) 00404 { 00405 // container for segment data 00406 dtn::streams::StreamDataSegment seg; 00407 00408 try { 00409 // read the segment 00410 if (!_stream.good()) throw StreamErrorException("stream went bad"); 00411 00412 _stream >> seg; 00413 } catch (const ios_base::failure &ex) { 00414 throw StreamErrorException("read error: " + std::string(ex.what())); 00415 } 00416 00417 switch (seg._type) 00418 { 00419 case StreamDataSegment::MSG_DATA_SEGMENT: 00420 { 00421 IBRCOMMON_LOGGER_DEBUG(70) << "MSG_DATA_SEGMENT received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL; 00422 00423 if (seg._flags & StreamDataSegment::MSG_MARK_BEGINN) 00424 { 00425 _recv_size = seg._value; 00426 unset(STREAM_REJECT); 00427 } 00428 else 00429 { 00430 _recv_size += seg._value; 00431 } 00432 00433 // set the new data length 00434 _underflow_data_remain = seg._value; 00435 00436 if (get(STREAM_REJECT)) 00437 { 00438 // send NACK on bundle reject 00439 if (get(STREAM_NACK_SUPPORT)) 00440 { 00441 // lock for sending 00442 ibrcommon::MutexLock l(_sendlock); 00443 if (!_stream.good()) throw StreamErrorException("stream went bad"); 00444 00445 // send a NACK message 00446 _stream << StreamDataSegment(StreamDataSegment::MSG_REFUSE_BUNDLE, 0) << std::flush; 00447 } 00448 00449 // skip data in this segment 00450 skipData(_underflow_data_remain); 00451 } 00452 else 00453 { 00454 // announce the new data block 00455 _underflow_state = DATA_TRANSFER; 00456 } 00457 break; 00458 } 00459 00460 case StreamDataSegment::MSG_ACK_SEGMENT: 00461 { 00462 IBRCOMMON_LOGGER_DEBUG(70) << "MSG_ACK_SEGMENT received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL; 00463 00464 // remove the segment in the queue 00465 if (get(STREAM_ACK_SUPPORT)) 00466 { 00467 ibrcommon::Queue<StreamDataSegment>::Locked q = _segments.exclusive(); 00468 if (q.empty()) 00469 { 00470 IBRCOMMON_LOGGER(error) << "got an unexpected ACK with size of " << seg._value << IBRCOMMON_LOGGER_ENDL; 00471 } 00472 else 00473 { 00474 StreamDataSegment &qs = q.front(); 00475 00476 if (qs._flags & StreamDataSegment::MSG_MARK_END) 00477 { 00478 _conn.eventBundleForwarded(); 00479 } 00480 00481 IBRCOMMON_LOGGER_DEBUG(60) << q.size() << " elements to ACK" << IBRCOMMON_LOGGER_ENDL; 00482 00483 _conn.eventBundleAck(seg._value); 00484 00485 q.pop(); 00486 } 00487 } 00488 break; 00489 } 00490 00491 case StreamDataSegment::MSG_KEEPALIVE: 00492 IBRCOMMON_LOGGER_DEBUG(70) << "MSG_KEEPALIVE received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL; 00493 break; 00494 00495 case StreamDataSegment::MSG_REFUSE_BUNDLE: 00496 { 00497 IBRCOMMON_LOGGER_DEBUG(70) << "MSG_REFUSE_BUNDLE received, flags: " << seg._flags << IBRCOMMON_LOGGER_ENDL; 00498 00499 // TODO: Test bundle rejection! 00500 00501 // remove the segment in the queue 00502 if (get(STREAM_ACK_SUPPORT) && get(STREAM_NACK_SUPPORT)) 00503 { 00504 // skip segments 00505 if (!_rejected_segments.empty()) 00506 { 00507 _rejected_segments.pop(); 00508 00509 // we received a NACK 00510 IBRCOMMON_LOGGER_DEBUG(30) << "NACK received, still " << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL; 00511 } 00512 else try 00513 { 00514 StreamDataSegment qs = _segments.getnpop(); 00515 00516 // we received a NACK 00517 IBRCOMMON_LOGGER_DEBUG(20) << "NACK received!" << IBRCOMMON_LOGGER_ENDL; 00518 00519 // get all segment ACKs in the queue for this transmission 00520 while (!_segments.empty()) 00521 { 00522 StreamDataSegment &seg = _segments.front(); 00523 if (seg._flags & StreamDataSegment::MSG_MARK_BEGINN) 00524 { 00525 break; 00526 } 00527 00528 // move the segments to another queue 00529 _rejected_segments.push(seg); 00530 _segments.pop(); 00531 } 00532 00533 // call event reject 00534 _conn.eventBundleRefused(); 00535 00536 // we received a NACK 00537 IBRCOMMON_LOGGER_DEBUG(30) << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL; 00538 00539 // the queue is empty, then skip the current transfer 00540 if (_segments.empty()) 00541 { 00542 set(STREAM_SKIP); 00543 00544 // we received a NACK 00545 IBRCOMMON_LOGGER_DEBUG(25) << "skip the current transfer" << IBRCOMMON_LOGGER_ENDL; 00546 } 00547 00548 } catch (const ibrcommon::QueueUnblockedException&) { 00549 IBRCOMMON_LOGGER(error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL; 00550 } 00551 00552 } 00553 else 00554 { 00555 IBRCOMMON_LOGGER(error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL; 00556 } 00557 00558 break; 00559 } 00560 00561 case StreamDataSegment::MSG_SHUTDOWN: 00562 { 00563 IBRCOMMON_LOGGER_DEBUG(70) << "MSG_SHUTDOWN received" << IBRCOMMON_LOGGER_ENDL; 00564 throw StreamShutdownException(); 00565 } 00566 } 00567 } 00568 00569 // currently transferring data 00570 size_t readsize = _buffer_size; 00571 if (_underflow_data_remain < _buffer_size) readsize = _underflow_data_remain; 00572 00573 try { 00574 if (!_stream.good()) throw StreamErrorException("stream went bad"); 00575 00576 // here receive the data 00577 _stream.read(in_buf_, readsize); 00578 } catch (const ios_base::failure &ex) { 00579 _underflow_state = IDLE; 00580 throw StreamErrorException("read error: " + std::string(ex.what())); 00581 } 00582 00583 // adjust the remain counter 00584 _underflow_data_remain -= readsize; 00585 00586 // Since the input buffer content is now valid (or is new) 00587 // the get pointer should be initialized (or reset). 00588 setg(in_buf_, in_buf_, in_buf_ + readsize); 00589 00590 return traits_type::not_eof(in_buf_[0]); 00591 00592 } catch (const StreamClosedException&) { 00593 // set failed bit 00594 set(STREAM_FAILED); 00595 00596 IBRCOMMON_LOGGER_DEBUG(10) << "StreamClosedException in underflow()" << IBRCOMMON_LOGGER_ENDL; 00597 00598 } catch (const StreamErrorException &ex) { 00599 // set failed bit 00600 set(STREAM_FAILED); 00601 00602 IBRCOMMON_LOGGER_DEBUG(10) << "StreamErrorException in underflow(): " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00603 00604 throw; 00605 } catch (const StreamShutdownException&) { 00606 // set failed bit 00607 set(STREAM_FAILED); 00608 00609 IBRCOMMON_LOGGER_DEBUG(10) << "StreamShutdownException in underflow()" << IBRCOMMON_LOGGER_ENDL; 00610 } 00611 00612 return traits_type::eof(); 00613 } 00614 } 00615 }