• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

ibrdtn/ibrdtn/streams/StreamBuffer.cpp

Go to the documentation of this file.
00001 /*
00002  * bpstreambuf.cpp
00003  *
00004  *  Created on: 14.07.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "ibrdtn/streams/StreamConnection.h"
00009 #include <ibrcommon/Logger.h>
00010 #include <ibrcommon/TimeMeasurement.h>
00011 
00012 namespace dtn
00013 {
00014         namespace streams
00015         {
00016                 StreamConnection::StreamBuffer::StreamBuffer(StreamConnection &conn, iostream &stream, const size_t buffer_size)
00017                         : _buffer_size(buffer_size), _statebits(STREAM_SOB), _conn(conn), in_buf_(new char[buffer_size]), out_buf_(new char[buffer_size]), _stream(stream),
00018                           _recv_size(0), _timer(*this, 0), _underflow_data_remain(0), _underflow_state(IDLE)
00019                 {
00020                         // Initialize get pointer.  This should be zero so that underflow is called upon first read.
00021                         setg(0, 0, 0);
00022                         setp(out_buf_, out_buf_ + _buffer_size - 1);
00023                 }
00024 
00025                 StreamConnection::StreamBuffer::~StreamBuffer()
00026                 {
00027                         // stop all timer
00028                         _timer.remove();
00029 
00030                         // clear the own buffer
00031                         delete [] in_buf_;
00032                         delete [] out_buf_;
00033                 }
00034 
00035                 bool StreamConnection::StreamBuffer::get(const StateBits bit) const
00036                 {
00037                         return (_statebits & bit);
00038                 }
00039 
00040                 void StreamConnection::StreamBuffer::set(const StateBits bit)
00041                 {
00042                         ibrcommon::MutexLock l(_statelock);
00043                         _statebits |= bit;
00044                 }
00045 
00046                 void StreamConnection::StreamBuffer::unset(const StateBits bit)
00047                 {
00048                         ibrcommon::MutexLock l(_statelock);
00049                         _statebits &= ~(bit);
00050                 }
00051 
00052                 void StreamConnection::StreamBuffer::__error() const
00053                 {
00054                         IBRCOMMON_LOGGER_DEBUG(80) << "StreamBuffer Debugging" << IBRCOMMON_LOGGER_ENDL;
00055                         IBRCOMMON_LOGGER_DEBUG(80) << "---------------------------------------" << IBRCOMMON_LOGGER_ENDL;
00056                         IBRCOMMON_LOGGER_DEBUG(80) << "Buffer size: " << _buffer_size << IBRCOMMON_LOGGER_ENDL;
00057                         IBRCOMMON_LOGGER_DEBUG(80) << "State bits: " << _statebits << IBRCOMMON_LOGGER_ENDL;
00058                         IBRCOMMON_LOGGER_DEBUG(80) << "Recv size: " << _recv_size << IBRCOMMON_LOGGER_ENDL;
00059                         IBRCOMMON_LOGGER_DEBUG(80) << "Timeout: " << _in_timeout << IBRCOMMON_LOGGER_ENDL;
00060                         IBRCOMMON_LOGGER_DEBUG(80) << "Current timeout: " << _in_timeout_value << IBRCOMMON_LOGGER_ENDL;
00061                         IBRCOMMON_LOGGER_DEBUG(80) << "Segments: " << _segments.size() << IBRCOMMON_LOGGER_ENDL;
00062                         IBRCOMMON_LOGGER_DEBUG(80) << "Reject segments: " << _rejected_segments.size() << IBRCOMMON_LOGGER_ENDL;
00063                         IBRCOMMON_LOGGER_DEBUG(80) << "Underflow remaining: " << _underflow_data_remain << IBRCOMMON_LOGGER_ENDL;
00064                         IBRCOMMON_LOGGER_DEBUG(80) << "Underflow state: " << _underflow_state << IBRCOMMON_LOGGER_ENDL;
00065                         IBRCOMMON_LOGGER_DEBUG(80) << "---------------------------------------" << IBRCOMMON_LOGGER_ENDL;
00066 
00067                         if (_statebits & STREAM_FAILED)
00068                         {
00069                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_FAILED is set" << IBRCOMMON_LOGGER_ENDL;
00070                         }
00071 
00072                         if (_statebits & STREAM_BAD)
00073                         {
00074                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_BAD is set" << IBRCOMMON_LOGGER_ENDL;
00075                         }
00076 
00077                         if (_statebits & STREAM_EOF)
00078                         {
00079                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_EOF is set" << IBRCOMMON_LOGGER_ENDL;
00080                         }
00081 
00082                         if (_statebits & STREAM_SHUTDOWN)
00083                         {
00084                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_SHUTDOWN is set" << IBRCOMMON_LOGGER_ENDL;
00085                         }
00086 
00087                         if (_statebits & STREAM_CLOSED)
00088                         {
00089                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_CLOSED is set" << IBRCOMMON_LOGGER_ENDL;
00090                         }
00091 
00092                         if (!_stream.good())
00093                         {
00094                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: good() returned false" << IBRCOMMON_LOGGER_ENDL;
00095                         }
00096                 }
00097 
00098                 bool StreamConnection::StreamBuffer::__good() const
00099                 {
00100                         int badbits = STREAM_FAILED + STREAM_BAD + STREAM_EOF + STREAM_SHUTDOWN + STREAM_CLOSED;
00101                         return !(badbits & _statebits);
00102                 }
00103 
00112                 const StreamContactHeader StreamConnection::StreamBuffer::handshake(const StreamContactHeader &header)
00113                 {
00114                         try {
00115                                 // make the send-call atomic
00116                                 {
00117                                         ibrcommon::MutexLock l(_sendlock);
00118 
00119                                         // transfer the local header
00120                                         _stream << header << std::flush;
00121                                 }
00122 
00123                                 // receive the remote header
00124                                 StreamContactHeader peer;
00125                                 _stream >> peer;
00126 
00127                                 // enable/disable ACK/NACK support
00128                                 if (peer._flags & StreamContactHeader::REQUEST_ACKNOWLEDGMENTS) set(STREAM_ACK_SUPPORT);
00129                                 if (peer._flags & StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS) set(STREAM_NACK_SUPPORT);
00130 
00131                                 // set the incoming timer if set (> 0)
00132                                 if (peer._keepalive > 0)
00133                                 {
00134                                         set(STREAM_TIMER_SUPPORT);
00135 
00136                                         // read the timer values
00137                                         _in_timeout = header._keepalive * 2;
00138 
00139                                         // activate timer
00140                                         ibrcommon::MutexLock timerl(_timer_lock);
00141                                         _in_timeout_value = _in_timeout;
00142                                         _timer.set(1);
00143 
00144                                         // start the timer
00145                                         _timer.start();
00146                                 }
00147 
00148                                 // set handshake completed bit
00149                                 set(STREAM_HANDSHAKE);
00150 
00151                                 // return the received header
00152                                 return peer;
00153 
00154                         } catch (const std::exception&) {
00155                                 // set failed bit
00156                                 set(STREAM_FAILED);
00157 
00158                                 // shutdown the stream
00159                                 shutdown(StreamDataSegment::MSG_SHUTDOWN_VERSION_MISSMATCH);
00160 
00161                                 // call the shutdown event
00162                                 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00163 
00164                                 // forward the catched exception
00165                                 throw StreamErrorException("handshake not completed");
00166                         }
00167                 }
00168 
00174                 void StreamConnection::StreamBuffer::shutdown(const StreamDataSegment::ShutdownReason reason)
00175                 {
00176                         try {
00177                                 ibrcommon::MutexLock l(_sendlock);
00178                                 // send a SHUTDOWN message
00179                                 _stream << StreamDataSegment(reason) << std::flush;
00180                         } catch (const std::exception&) {
00181                                 // set failed bit
00182                                 set(STREAM_FAILED);
00183 
00184                                 throw StreamErrorException("can not send shutdown message");
00185                         }
00186                 }
00187 
00188                 void StreamConnection::StreamBuffer::keepalive()
00189                 {
00190                         try {
00191                                 ibrcommon::MutexLock l(_sendlock);
00192                                 _stream << StreamDataSegment() << std::flush;
00193                                 IBRCOMMON_LOGGER_DEBUG(15) << "KEEPALIVE sent" << IBRCOMMON_LOGGER_ENDL;
00194                         } catch (const std::exception&) {
00195                                 // set failed bit
00196                                 set(STREAM_FAILED);
00197                         }
00198                 }
00199 
00210                 size_t StreamConnection::StreamBuffer::timeout(size_t)
00211                 {
00212                         size_t in_timeout_value = 0;
00213                         {
00214                                 ibrcommon::MutexLock timerl(_timer_lock);
00215                                 _in_timeout_value--;
00216                                 in_timeout_value = _in_timeout_value;
00217                         }
00218 
00219                         if (in_timeout_value <= 0)
00220                         {
00221                                 IBRCOMMON_LOGGER_DEBUG(15) << "KEEPALIVE timeout reached -> shutdown connection" << IBRCOMMON_LOGGER_ENDL;
00222                                 _conn.shutdown(CONNECTION_SHUTDOWN_NODE_TIMEOUT);
00223                                 return 0;
00224                         }
00225 
00226                         return 1;
00227                 }
00228 
00229                 void StreamConnection::StreamBuffer::close()
00230                 {
00231                         // set shutdown bit
00232                         set(STREAM_SHUTDOWN);
00233                 }
00234 
00235                 void StreamConnection::StreamBuffer::shutdowntimers()
00236                 {
00237                         // stop all timer
00238                         _timer.remove();
00239                 }
00240 
00241                 void StreamConnection::StreamBuffer::reject()
00242                 {
00243                         // we have to reject the current transmission
00244                         // so we have to discard all all data until the next segment with a start bit
00245                         set(STREAM_REJECT);
00246 
00247                         // set the current in buffer to zero
00248                         // this should result in a underflow call on the next read
00249                         setg(0, 0, 0);
00250                 }
00251 
00252                 void StreamConnection::StreamBuffer::abort()
00253                 {
00254                         _segments.abort();
00255                 }
00256 
00257                 void StreamConnection::StreamBuffer::wait()
00258                 {
00259                         // TODO: get max time to wait out of the timeout values
00260                         size_t timeout = 0;
00261 
00262                         try {
00263                                 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): wait for completion of transmission, " << _segments.size() << " ACKs left" << IBRCOMMON_LOGGER_ENDL;
00264                                 _segments.wait(ibrcommon::Queue<StreamDataSegment>::QUEUE_EMPTY, timeout);
00265                                 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): transfer completed" << IBRCOMMON_LOGGER_ENDL;
00266                         } catch (const ibrcommon::QueueUnblockedException&) {
00267                                 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): transfer aborted (timeout)" << IBRCOMMON_LOGGER_ENDL;
00268                         }
00269                 }
00270 
00271                 // This function is called when the output buffer is filled.
00272                 // In this function, the buffer should be written to wherever it should
00273                 // be written to (in this case, the streambuf object that this is controlling).
00274                 int StreamConnection::StreamBuffer::overflow(int c)
00275                 {
00276                         IBRCOMMON_LOGGER_DEBUG(90) << "StreamBuffer::overflow() called" << IBRCOMMON_LOGGER_ENDL;
00277 
00278                         try {
00279                                 char *ibegin = out_buf_;
00280                                 char *iend = pptr();
00281 
00282                                 // mark the buffer as free
00283                                 setp(out_buf_, out_buf_ + _buffer_size - 1);
00284 
00285                                 // append the last character
00286                                 if(!traits_type::eq_int_type(c, traits_type::eof())) {
00287                                         *iend++ = traits_type::to_char_type(c);
00288                                 }
00289 
00290                                 // if there is nothing to send, just return
00291                                 if ((iend - ibegin) == 0)
00292                                 {
00293                                         return traits_type::not_eof(c);
00294                                 }
00295 
00296                                 // wrap a segment around the data
00297                                 StreamDataSegment seg(StreamDataSegment::MSG_DATA_SEGMENT, (iend - ibegin));
00298 
00299                                 // set the start flag
00300                                 if (get(STREAM_SOB))
00301                                 {
00302                                         seg._flags |= StreamDataSegment::MSG_MARK_BEGINN;
00303                                         unset(STREAM_SKIP);
00304                                         unset(STREAM_SOB);
00305                                 }
00306 
00307                                 if (char_traits<char>::eq_int_type(c, char_traits<char>::eof()))
00308                                 {
00309                                         // set the end flag
00310                                         seg._flags |= StreamDataSegment::MSG_MARK_END;
00311                                         set(STREAM_SOB);
00312                                 }
00313 
00314                                 if (!get(STREAM_SKIP))
00315                                 {
00316                                         // put the segment into the queue
00317                                         if (get(STREAM_ACK_SUPPORT))
00318                                         {
00319                                                 _segments.push(seg);
00320                                         }
00321                                         else if (seg._flags & StreamDataSegment::MSG_MARK_END)
00322                                         {
00323                                                 // without ACK support we have to assume that a bundle is forwarded
00324                                                 // when the last segment is sent.
00325                                                 _conn.eventBundleForwarded();
00326                                         }
00327 
00328                                         ibrcommon::MutexLock l(_sendlock);
00329                                         if (!_stream.good()) throw StreamErrorException("stream went bad");
00330 
00331                                         // write the segment to the stream
00332                                         _stream << seg;
00333                                         _stream.write(out_buf_, seg._value);
00334                                 }
00335 
00336                                 return traits_type::not_eof(c);
00337                         } catch (const StreamClosedException&) {
00338                                 // set failed bit
00339                                 set(STREAM_FAILED);
00340 
00341                                 IBRCOMMON_LOGGER_DEBUG(10) << "StreamClosedException in overflow()" << IBRCOMMON_LOGGER_ENDL;
00342 
00343                                 throw;
00344                         } catch (const StreamErrorException&) {
00345                                 // set failed bit
00346                                 set(STREAM_FAILED);
00347 
00348                                 IBRCOMMON_LOGGER_DEBUG(10) << "StreamErrorException in overflow()" << IBRCOMMON_LOGGER_ENDL;
00349 
00350                                 throw;
00351                         } catch (const ios_base::failure&) {
00352                                 // set failed bit
00353                                 set(STREAM_FAILED);
00354 
00355                                 IBRCOMMON_LOGGER_DEBUG(10) << "ios_base::failure in overflow()" << IBRCOMMON_LOGGER_ENDL;
00356 
00357                                 throw;
00358                         }
00359 
00360                         return traits_type::eof();
00361                 }
00362 
00363                 // This is called to flush the buffer.
00364                 // This is called when we're done with the file stream (or when .flush() is called).
00365                 int StreamConnection::StreamBuffer::sync()
00366                 {
00367                         int ret = traits_type::eq_int_type(this->overflow(traits_type::eof()),
00368                                                                                         traits_type::eof()) ? -1 : 0;
00369 
00370                         try {
00371                                 ibrcommon::MutexLock l(_sendlock);
00372 
00373                                 // ... and flush.
00374                                 _stream.flush();
00375                         } catch (const ios_base::failure&) {
00376                                 // set failed bit
00377                                 set(STREAM_BAD);
00378 
00379                                 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00380                         }
00381 
00382                         return ret;
00383                 }
00384 
00385                 void StreamConnection::StreamBuffer::skipData(size_t &size)
00386                 {
00387                         // a temporary buffer
00388                         char tmpbuf[_buffer_size];
00389 
00390                         try {
00391                                 //  and read until the next segment
00392                                 while (size > 0 && _stream.good())
00393                                 {
00394                                         size_t readsize = _buffer_size;
00395                                         if (size < _buffer_size) readsize = size;
00396 
00397                                         // to reject a bundle read all remaining data of this segment
00398                                         _stream.read(tmpbuf, readsize);
00399 
00400                                         // adjust the remain counter
00401                                         size -= readsize;
00402 
00403                                         // reset the incoming timer
00404                                         {
00405                                                 ibrcommon::MutexLock timerl(_timer_lock);
00406                                                 _in_timeout_value = _in_timeout;
00407                                         }
00408                                 }
00409                         } catch (const ios_base::failure &ex) {
00410                                 _underflow_state = IDLE;
00411                                 throw StreamErrorException("read error during data skip: " + std::string(ex.what()));
00412                         }
00413                 }
00414 
00415                 // Fill the input buffer.  This reads out of the streambuf.
00416                 int StreamConnection::StreamBuffer::underflow()
00417                 {
00418                         IBRCOMMON_LOGGER_DEBUG(90) << "StreamBuffer::underflow() called" << IBRCOMMON_LOGGER_ENDL;
00419 
00420                         try {
00421                                 if (_underflow_state == DATA_TRANSFER)
00422                                 {
00423                                         // on bundle reject
00424                                         if (get(STREAM_REJECT))
00425                                         {
00426                                                 // send NACK on bundle reject
00427                                                 if (get(STREAM_NACK_SUPPORT))
00428                                                 {
00429                                                         ibrcommon::MutexLock l(_sendlock);
00430                                                         if (!_stream.good()) throw StreamErrorException("stream went bad");
00431 
00432                                                         // send a REFUSE message
00433                                                         _stream << StreamDataSegment(StreamDataSegment::MSG_REFUSE_BUNDLE) << std::flush;
00434                                                 }
00435 
00436                                                 // skip data in this segment
00437                                                 skipData(_underflow_data_remain);
00438 
00439                                                 // return to idle state
00440                                                 _underflow_state = IDLE;
00441                                         }
00442                                         // send ACK if the data segment is received completely
00443                                         else if (_underflow_data_remain == 0)
00444                                         {
00445                                                 // New data segment received. Send an ACK.
00446                                                 if (get(STREAM_ACK_SUPPORT))
00447                                                 {
00448                                                         ibrcommon::MutexLock l(_sendlock);
00449                                                         if (!_stream.good()) throw StreamErrorException("stream went bad");
00450                                                         _stream << StreamDataSegment(StreamDataSegment::MSG_ACK_SEGMENT, _recv_size) << std::flush;
00451                                                 }
00452 
00453                                                 // return to idle state
00454                                                 _underflow_state = IDLE;
00455                                         }
00456                                 }
00457 
00458                                 // read segments until DATA is AVAILABLE
00459                                 while (_underflow_state == IDLE)
00460                                 {
00461                                         // container for segment data
00462                                         dtn::streams::StreamDataSegment seg;
00463 
00464                                         try {
00465                                                 // read the segment
00466                                                 if (!_stream.good()) throw StreamErrorException("stream went bad");
00467 
00468                                                 _stream >> seg;
00469                                         } catch (const ios_base::failure &ex) {
00470                                                 throw StreamErrorException("read error: " + std::string(ex.what()));
00471                                         }
00472 
00473                                         // reset the incoming timer
00474                                         {
00475                                                 ibrcommon::MutexLock timerl(_timer_lock);
00476                                                 _in_timeout_value = _in_timeout;
00477                                         }
00478 
00479                                         switch (seg._type)
00480                                         {
00481                                                 case StreamDataSegment::MSG_DATA_SEGMENT:
00482                                                 {
00483                                                         IBRCOMMON_LOGGER_DEBUG(70) << "MSG_DATA_SEGMENT received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL;
00484 
00485                                                         if (seg._flags & StreamDataSegment::MSG_MARK_BEGINN)
00486                                                         {
00487                                                                 _recv_size = seg._value;
00488                                                                 unset(STREAM_REJECT);
00489                                                         }
00490                                                         else
00491                                                         {
00492                                                                 _recv_size += seg._value;
00493                                                         }
00494 
00495                                                         // set the new data length
00496                                                         _underflow_data_remain = seg._value;
00497 
00498                                                         if (get(STREAM_REJECT))
00499                                                         {
00500                                                                 // send NACK on bundle reject
00501                                                                 if (get(STREAM_NACK_SUPPORT))
00502                                                                 {
00503                                                                         // lock for sending
00504                                                                         ibrcommon::MutexLock l(_sendlock);
00505                                                                         if (!_stream.good()) throw StreamErrorException("stream went bad");
00506 
00507                                                                         // send a NACK message
00508                                                                         _stream << StreamDataSegment(StreamDataSegment::MSG_REFUSE_BUNDLE, 0) << std::flush;
00509                                                                 }
00510 
00511                                                                 // skip data in this segment
00512                                                                 skipData(_underflow_data_remain);
00513                                                         }
00514                                                         else
00515                                                         {
00516                                                                 // announce the new data block
00517                                                                 _underflow_state = DATA_TRANSFER;
00518                                                         }
00519                                                         break;
00520                                                 }
00521 
00522                                                 case StreamDataSegment::MSG_ACK_SEGMENT:
00523                                                 {
00524                                                         IBRCOMMON_LOGGER_DEBUG(70) << "MSG_ACK_SEGMENT received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL;
00525 
00526                                                         // remove the segment in the queue
00527                                                         if (get(STREAM_ACK_SUPPORT))
00528                                                         {
00529                                                                 ibrcommon::Queue<StreamDataSegment>::Locked q = _segments.exclusive();
00530                                                                 if (q.empty())
00531                                                                 {
00532                                                                         IBRCOMMON_LOGGER(error) << "got an unexpected ACK with size of " << seg._value << IBRCOMMON_LOGGER_ENDL;
00533                                                                 }
00534                                                                 else
00535                                                                 {
00536                                                                         StreamDataSegment &qs = q.front();
00537 
00538                                                                         if (qs._flags & StreamDataSegment::MSG_MARK_END)
00539                                                                         {
00540                                                                                 _conn.eventBundleForwarded();
00541                                                                         }
00542 
00543                                                                         IBRCOMMON_LOGGER_DEBUG(60) << q.size() << " elements to ACK" << IBRCOMMON_LOGGER_ENDL;
00544 
00545                                                                         _conn.eventBundleAck(seg._value);
00546 
00547                                                                         q.pop();
00548                                                                 }
00549                                                         }
00550                                                         break;
00551                                                 }
00552 
00553                                                 case StreamDataSegment::MSG_KEEPALIVE:
00554                                                         IBRCOMMON_LOGGER_DEBUG(70) << "MSG_KEEPALIVE received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL;
00555                                                         break;
00556 
00557                                                 case StreamDataSegment::MSG_REFUSE_BUNDLE:
00558                                                 {
00559                                                         IBRCOMMON_LOGGER_DEBUG(70) << "MSG_REFUSE_BUNDLE received, flags: " << seg._flags << IBRCOMMON_LOGGER_ENDL;
00560 
00561                                                         // TODO: Test bundle rejection!
00562 
00563                                                         // remove the segment in the queue
00564                                                         if (get(STREAM_ACK_SUPPORT) && get(STREAM_NACK_SUPPORT))
00565                                                         {
00566                                                                 // skip segments
00567                                                                 if (!_rejected_segments.empty())
00568                                                                 {
00569                                                                         _rejected_segments.pop();
00570 
00571                                                                         // we received a NACK
00572                                                                         IBRCOMMON_LOGGER_DEBUG(30) << "NACK received, still " << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL;
00573                                                                 }
00574                                                                 else try
00575                                                                 {
00576                                                                         StreamDataSegment qs = _segments.getnpop();
00577 
00578                                                                         // we received a NACK
00579                                                                         IBRCOMMON_LOGGER_DEBUG(20) << "NACK received!" << IBRCOMMON_LOGGER_ENDL;
00580 
00581                                                                         // get all segment ACKs in the queue for this transmission
00582                                                                         while (!_segments.empty())
00583                                                                         {
00584                                                                                 StreamDataSegment &seg = _segments.front();
00585                                                                                 if (seg._flags & StreamDataSegment::MSG_MARK_BEGINN)
00586                                                                                 {
00587                                                                                         break;
00588                                                                                 }
00589 
00590                                                                                 // move the segments to another queue
00591                                                                                 _rejected_segments.push(seg);
00592                                                                                 _segments.pop();
00593                                                                         }
00594 
00595                                                                         // call event reject
00596                                                                         _conn.eventBundleRefused();
00597 
00598                                                                         // we received a NACK
00599                                                                         IBRCOMMON_LOGGER_DEBUG(30) << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL;
00600 
00601                                                                         // the queue is empty, then skip the current transfer
00602                                                                         if (_segments.empty())
00603                                                                         {
00604                                                                                 set(STREAM_SKIP);
00605 
00606                                                                                 // we received a NACK
00607                                                                                 IBRCOMMON_LOGGER_DEBUG(25) << "skip the current transfer" << IBRCOMMON_LOGGER_ENDL;
00608                                                                         }
00609 
00610                                                                 } catch (const ibrcommon::QueueUnblockedException&) {
00611                                                                         IBRCOMMON_LOGGER(error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
00612                                                                 }
00613 
00614                                                         }
00615                                                         else
00616                                                         {
00617                                                                 IBRCOMMON_LOGGER(error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
00618                                                         }
00619 
00620                                                         break;
00621                                                 }
00622 
00623                                                 case StreamDataSegment::MSG_SHUTDOWN:
00624                                                 {
00625                                                         IBRCOMMON_LOGGER_DEBUG(70) << "MSG_SHUTDOWN received" << IBRCOMMON_LOGGER_ENDL;
00626                                                         throw StreamShutdownException();
00627                                                 }
00628                                         }
00629                                 }
00630 
00631                                 // currently transferring data
00632                                 size_t readsize = _buffer_size;
00633                                 if (_underflow_data_remain < _buffer_size) readsize = _underflow_data_remain;
00634 
00635                                 try {
00636                                         if (!_stream.good()) throw StreamErrorException("stream went bad");
00637 
00638                                         // here receive the data
00639                                         _stream.read(in_buf_, readsize);
00640                                 } catch (const ios_base::failure &ex) {
00641                                         _underflow_state = IDLE;
00642                                         throw StreamErrorException("read error: " + std::string(ex.what()));
00643                                 }
00644 
00645                                 // adjust the remain counter
00646                                 _underflow_data_remain -= readsize;
00647 
00648                                 // Since the input buffer content is now valid (or is new)
00649                                 // the get pointer should be initialized (or reset).
00650                                 setg(in_buf_, in_buf_, in_buf_ + readsize);
00651 
00652                                 return traits_type::not_eof(in_buf_[0]);
00653 
00654                         } catch (const StreamClosedException&) {
00655                                 // set failed bit
00656                                 set(STREAM_FAILED);
00657 
00658                                 IBRCOMMON_LOGGER_DEBUG(10) << "StreamClosedException in underflow()" << IBRCOMMON_LOGGER_ENDL;
00659 
00660                         } catch (const StreamErrorException &ex) {
00661                                 // set failed bit
00662                                 set(STREAM_FAILED);
00663 
00664                                 IBRCOMMON_LOGGER_DEBUG(10) << "StreamErrorException in underflow(): " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00665 
00666                                 throw;
00667                         } catch (const StreamShutdownException&) {
00668                                 // set failed bit
00669                                 set(STREAM_FAILED);
00670 
00671                                 IBRCOMMON_LOGGER_DEBUG(10) << "StreamShutdownException in underflow()" << IBRCOMMON_LOGGER_ENDL;
00672                         }
00673 
00674                         return traits_type::eof();
00675                 }
00676         }
00677 }

Generated on Wed Mar 30 2011 11:11:49 for IBR-DTNSuite by  doxygen 1.7.1