IBR-DTNSuite 0.6

ibrdtn/ibrdtn/streams/StreamBuffer.cpp

Go to the documentation of this file.
00001 /*
00002  * bpstreambuf.cpp
00003  *
00004  *  Created on: 14.07.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "ibrdtn/streams/StreamConnection.h"
00009 #include <ibrcommon/Logger.h>
00010 #include <ibrcommon/TimeMeasurement.h>
00011 
00012 namespace dtn
00013 {
00014         namespace streams
00015         {
00016                 StreamConnection::StreamBuffer::StreamBuffer(StreamConnection &conn, iostream &stream, const size_t buffer_size)
00017                         : _buffer_size(buffer_size), _statebits(STREAM_SOB), _conn(conn), in_buf_(new char[buffer_size]), out_buf_(new char[buffer_size]), _stream(stream),
00018                           _recv_size(0), _underflow_data_remain(0), _underflow_state(IDLE)
00019                 {
00020                         // Initialize get pointer.  This should be zero so that underflow is called upon first read.
00021                         setg(0, 0, 0);
00022                         setp(out_buf_, out_buf_ + _buffer_size - 1);
00023                 }
00024 
00025                 StreamConnection::StreamBuffer::~StreamBuffer()
00026                 {
00027                         // clear the own buffer
00028                         delete [] in_buf_;
00029                         delete [] out_buf_;
00030                 }
00031 
00032                 bool StreamConnection::StreamBuffer::get(const StateBits bit) const
00033                 {
00034                         return (_statebits & bit);
00035                 }
00036 
00037                 void StreamConnection::StreamBuffer::set(const StateBits bit)
00038                 {
00039                         ibrcommon::MutexLock l(_statelock);
00040                         _statebits |= bit;
00041                 }
00042 
00043                 void StreamConnection::StreamBuffer::unset(const StateBits bit)
00044                 {
00045                         ibrcommon::MutexLock l(_statelock);
00046                         _statebits &= ~(bit);
00047                 }
00048 
00049                 void StreamConnection::StreamBuffer::__error() const
00050                 {
00051                         IBRCOMMON_LOGGER_DEBUG(80) << "StreamBuffer Debugging" << IBRCOMMON_LOGGER_ENDL;
00052                         IBRCOMMON_LOGGER_DEBUG(80) << "---------------------------------------" << IBRCOMMON_LOGGER_ENDL;
00053                         IBRCOMMON_LOGGER_DEBUG(80) << "Buffer size: " << _buffer_size << IBRCOMMON_LOGGER_ENDL;
00054                         IBRCOMMON_LOGGER_DEBUG(80) << "State bits: " << _statebits << IBRCOMMON_LOGGER_ENDL;
00055                         IBRCOMMON_LOGGER_DEBUG(80) << "Recv size: " << _recv_size << IBRCOMMON_LOGGER_ENDL;
00056                         IBRCOMMON_LOGGER_DEBUG(80) << "Segments: " << _segments.size() << IBRCOMMON_LOGGER_ENDL;
00057                         IBRCOMMON_LOGGER_DEBUG(80) << "Reject segments: " << _rejected_segments.size() << IBRCOMMON_LOGGER_ENDL;
00058                         IBRCOMMON_LOGGER_DEBUG(80) << "Underflow remaining: " << _underflow_data_remain << IBRCOMMON_LOGGER_ENDL;
00059                         IBRCOMMON_LOGGER_DEBUG(80) << "Underflow state: " << _underflow_state << IBRCOMMON_LOGGER_ENDL;
00060                         IBRCOMMON_LOGGER_DEBUG(80) << "---------------------------------------" << IBRCOMMON_LOGGER_ENDL;
00061 
00062                         if (_statebits & STREAM_FAILED)
00063                         {
00064                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_FAILED is set" << IBRCOMMON_LOGGER_ENDL;
00065                         }
00066 
00067                         if (_statebits & STREAM_BAD)
00068                         {
00069                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_BAD is set" << IBRCOMMON_LOGGER_ENDL;
00070                         }
00071 
00072                         if (_statebits & STREAM_EOF)
00073                         {
00074                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_EOF is set" << IBRCOMMON_LOGGER_ENDL;
00075                         }
00076 
00077                         if (_statebits & STREAM_SHUTDOWN)
00078                         {
00079                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_SHUTDOWN is set" << IBRCOMMON_LOGGER_ENDL;
00080                         }
00081 
00082                         if (_statebits & STREAM_CLOSED)
00083                         {
00084                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: STREAM_CLOSED is set" << IBRCOMMON_LOGGER_ENDL;
00085                         }
00086 
00087                         if (!_stream.good())
00088                         {
00089                                 IBRCOMMON_LOGGER_DEBUG(80) << "stream went bad: good() returned false" << IBRCOMMON_LOGGER_ENDL;
00090                         }
00091                 }
00092 
00093                 bool StreamConnection::StreamBuffer::__good() const
00094                 {
00095                         int badbits = STREAM_FAILED + STREAM_BAD + STREAM_EOF + STREAM_SHUTDOWN + STREAM_CLOSED;
00096                         return !(badbits & _statebits);
00097                 }
00098 
00107                 const StreamContactHeader StreamConnection::StreamBuffer::handshake(const StreamContactHeader &header)
00108                 {
00109                         try {
00110                                 // make the send-call atomic
00111                                 {
00112                                         ibrcommon::MutexLock l(_sendlock);
00113 
00114                                         // transfer the local header
00115                                         _stream << header << std::flush;
00116                                 }
00117 
00118                                 // receive the remote header
00119                                 StreamContactHeader peer;
00120                                 _stream >> peer;
00121 
00122                                 // enable/disable ACK/NACK support
00123                                 if (peer._flags & StreamContactHeader::REQUEST_ACKNOWLEDGMENTS) set(STREAM_ACK_SUPPORT);
00124                                 if (peer._flags & StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS) set(STREAM_NACK_SUPPORT);
00125 
00126                                 // set the incoming timer if set (> 0)
00127                                 if (peer._keepalive > 0)
00128                                 {
00129                                         // mark timer support
00130                                         set(STREAM_TIMER_SUPPORT);
00131                                 }
00132 
00133                                 // set handshake completed bit
00134                                 set(STREAM_HANDSHAKE);
00135 
00136                                 // return the received header
00137                                 return peer;
00138 
00139                         } catch (const std::exception&) {
00140                                 // set failed bit
00141                                 set(STREAM_FAILED);
00142 
00143                                 // shutdown the stream
00144                                 shutdown(StreamDataSegment::MSG_SHUTDOWN_VERSION_MISSMATCH);
00145 
00146                                 // call the shutdown event
00147                                 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00148 
00149                                 // forward the catched exception
00150                                 throw StreamErrorException("handshake not completed");
00151                         }
00152                 }
00153 
00159                 void StreamConnection::StreamBuffer::shutdown(const StreamDataSegment::ShutdownReason reason)
00160                 {
00161                         try {
00162                                 ibrcommon::MutexLock l(_sendlock);
00163                                 // send a SHUTDOWN message
00164                                 _stream << StreamDataSegment(reason) << std::flush;
00165                         } catch (const std::exception&) {
00166                                 // set failed bit
00167                                 set(STREAM_FAILED);
00168 
00169                                 throw StreamErrorException("can not send shutdown message");
00170                         }
00171                 }
00172 
00173                 void StreamConnection::StreamBuffer::keepalive()
00174                 {
00175                         try {
00176                                 ibrcommon::MutexLock l(_sendlock);
00177                                 _stream << StreamDataSegment() << std::flush;
00178                                 IBRCOMMON_LOGGER_DEBUG(15) << "KEEPALIVE sent" << IBRCOMMON_LOGGER_ENDL;
00179                         } catch (const std::exception&) {
00180                                 // set failed bit
00181                                 set(STREAM_FAILED);
00182                         }
00183                 }
00184 
00185                 void StreamConnection::StreamBuffer::close()
00186                 {
00187                         // set shutdown bit
00188                         set(STREAM_SHUTDOWN);
00189                 }
00190 
00191                 void StreamConnection::StreamBuffer::reject()
00192                 {
00193                         // we have to reject the current transmission
00194                         // so we have to discard all all data until the next segment with a start bit
00195                         set(STREAM_REJECT);
00196 
00197                         // set the current in buffer to zero
00198                         // this should result in a underflow call on the next read
00199                         setg(0, 0, 0);
00200                 }
00201 
00202                 void StreamConnection::StreamBuffer::abort()
00203                 {
00204                         _segments.abort();
00205                 }
00206 
00207                 void StreamConnection::StreamBuffer::wait()
00208                 {
00209                         // TODO: get max time to wait out of the timeout values
00210                         size_t timeout = 0;
00211 
00212                         try {
00213                                 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): wait for completion of transmission, " << _segments.size() << " ACKs left" << IBRCOMMON_LOGGER_ENDL;
00214                                 _segments.wait(ibrcommon::Queue<StreamDataSegment>::QUEUE_EMPTY, timeout);
00215                                 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): transfer completed" << IBRCOMMON_LOGGER_ENDL;
00216                         } catch (const ibrcommon::QueueUnblockedException&) {
00217                                 IBRCOMMON_LOGGER_DEBUG(15) << "waitCompleted(): transfer aborted (timeout)" << IBRCOMMON_LOGGER_ENDL;
00218                         }
00219                 }
00220 
00221                 // This function is called when the output buffer is filled.
00222                 // In this function, the buffer should be written to wherever it should
00223                 // be written to (in this case, the streambuf object that this is controlling).
00224                 int StreamConnection::StreamBuffer::overflow(int c)
00225                 {
00226                         IBRCOMMON_LOGGER_DEBUG(90) << "StreamBuffer::overflow() called" << IBRCOMMON_LOGGER_ENDL;
00227 
00228                         try {
00229                                 char *ibegin = out_buf_;
00230                                 char *iend = pptr();
00231 
00232                                 // mark the buffer as free
00233                                 setp(out_buf_, out_buf_ + _buffer_size - 1);
00234 
00235                                 // append the last character
00236                                 if(!traits_type::eq_int_type(c, traits_type::eof())) {
00237                                         *iend++ = traits_type::to_char_type(c);
00238                                 }
00239 
00240                                 // if there is nothing to send, just return
00241                                 if ((iend - ibegin) == 0)
00242                                 {
00243                                         return traits_type::not_eof(c);
00244                                 }
00245 
00246                                 // wrap a segment around the data
00247                                 StreamDataSegment seg(StreamDataSegment::MSG_DATA_SEGMENT, (iend - ibegin));
00248 
00249                                 // set the start flag
00250                                 if (get(STREAM_SOB))
00251                                 {
00252                                         seg._flags |= StreamDataSegment::MSG_MARK_BEGINN;
00253                                         unset(STREAM_SKIP);
00254                                         unset(STREAM_SOB);
00255                                 }
00256 
00257                                 if (char_traits<char>::eq_int_type(c, char_traits<char>::eof()))
00258                                 {
00259                                         // set the end flag
00260                                         seg._flags |= StreamDataSegment::MSG_MARK_END;
00261                                         set(STREAM_SOB);
00262                                 }
00263 
00264                                 if (!get(STREAM_SKIP))
00265                                 {
00266                                         // put the segment into the queue
00267                                         if (get(STREAM_ACK_SUPPORT))
00268                                         {
00269                                                 _segments.push(seg);
00270                                         }
00271                                         else if (seg._flags & StreamDataSegment::MSG_MARK_END)
00272                                         {
00273                                                 // without ACK support we have to assume that a bundle is forwarded
00274                                                 // when the last segment is sent.
00275                                                 _conn.eventBundleForwarded();
00276                                         }
00277 
00278                                         ibrcommon::MutexLock l(_sendlock);
00279                                         if (!_stream.good()) throw StreamErrorException("stream went bad");
00280 
00281                                         // write the segment to the stream
00282                                         _stream << seg;
00283                                         _stream.write(out_buf_, seg._value);
00284                                 }
00285 
00286                                 return traits_type::not_eof(c);
00287                         } catch (const StreamClosedException&) {
00288                                 // set failed bit
00289                                 set(STREAM_FAILED);
00290 
00291                                 IBRCOMMON_LOGGER_DEBUG(10) << "StreamClosedException in overflow()" << IBRCOMMON_LOGGER_ENDL;
00292 
00293                                 throw;
00294                         } catch (const StreamErrorException&) {
00295                                 // set failed bit
00296                                 set(STREAM_FAILED);
00297 
00298                                 IBRCOMMON_LOGGER_DEBUG(10) << "StreamErrorException in overflow()" << IBRCOMMON_LOGGER_ENDL;
00299 
00300                                 throw;
00301                         } catch (const ios_base::failure&) {
00302                                 // set failed bit
00303                                 set(STREAM_FAILED);
00304 
00305                                 IBRCOMMON_LOGGER_DEBUG(10) << "ios_base::failure in overflow()" << IBRCOMMON_LOGGER_ENDL;
00306 
00307                                 throw;
00308                         }
00309 
00310                         return traits_type::eof();
00311                 }
00312 
00313                 // This is called to flush the buffer.
00314                 // This is called when we're done with the file stream (or when .flush() is called).
00315                 int StreamConnection::StreamBuffer::sync()
00316                 {
00317                         int ret = traits_type::eq_int_type(this->overflow(traits_type::eof()),
00318                                                                                         traits_type::eof()) ? -1 : 0;
00319 
00320                         try {
00321                                 ibrcommon::MutexLock l(_sendlock);
00322 
00323                                 // ... and flush.
00324                                 _stream.flush();
00325                         } catch (const ios_base::failure&) {
00326                                 // set failed bit
00327                                 set(STREAM_BAD);
00328 
00329                                 _conn.shutdown(CONNECTION_SHUTDOWN_ERROR);
00330                         }
00331 
00332                         return ret;
00333                 }
00334 
00335                 void StreamConnection::StreamBuffer::skipData(size_t &size)
00336                 {
00337                         // a temporary buffer
00338                         char tmpbuf[_buffer_size];
00339 
00340                         try {
00341                                 //  and read until the next segment
00342                                 while (size > 0 && _stream.good())
00343                                 {
00344                                         size_t readsize = _buffer_size;
00345                                         if (size < _buffer_size) readsize = size;
00346 
00347                                         // to reject a bundle read all remaining data of this segment
00348                                         _stream.read(tmpbuf, readsize);
00349 
00350                                         // adjust the remain counter
00351                                         size -= readsize;
00352                                 }
00353                         } catch (const ios_base::failure &ex) {
00354                                 _underflow_state = IDLE;
00355                                 throw StreamErrorException("read error during data skip: " + std::string(ex.what()));
00356                         }
00357                 }
00358 
00359                 // Fill the input buffer.  This reads out of the streambuf.
00360                 int StreamConnection::StreamBuffer::underflow()
00361                 {
00362                         IBRCOMMON_LOGGER_DEBUG(90) << "StreamBuffer::underflow() called" << IBRCOMMON_LOGGER_ENDL;
00363 
00364                         try {
00365                                 if (_underflow_state == DATA_TRANSFER)
00366                                 {
00367                                         // on bundle reject
00368                                         if (get(STREAM_REJECT))
00369                                         {
00370                                                 // send NACK on bundle reject
00371                                                 if (get(STREAM_NACK_SUPPORT))
00372                                                 {
00373                                                         ibrcommon::MutexLock l(_sendlock);
00374                                                         if (!_stream.good()) throw StreamErrorException("stream went bad");
00375 
00376                                                         // send a REFUSE message
00377                                                         _stream << StreamDataSegment(StreamDataSegment::MSG_REFUSE_BUNDLE) << std::flush;
00378                                                 }
00379 
00380                                                 // skip data in this segment
00381                                                 skipData(_underflow_data_remain);
00382 
00383                                                 // return to idle state
00384                                                 _underflow_state = IDLE;
00385                                         }
00386                                         // send ACK if the data segment is received completely
00387                                         else if (_underflow_data_remain == 0)
00388                                         {
00389                                                 // New data segment received. Send an ACK.
00390                                                 if (get(STREAM_ACK_SUPPORT))
00391                                                 {
00392                                                         ibrcommon::MutexLock l(_sendlock);
00393                                                         if (!_stream.good()) throw StreamErrorException("stream went bad");
00394                                                         _stream << StreamDataSegment(StreamDataSegment::MSG_ACK_SEGMENT, _recv_size) << std::flush;
00395                                                 }
00396 
00397                                                 // return to idle state
00398                                                 _underflow_state = IDLE;
00399                                         }
00400                                 }
00401 
00402                                 // read segments until DATA is AVAILABLE
00403                                 while (_underflow_state == IDLE)
00404                                 {
00405                                         // container for segment data
00406                                         dtn::streams::StreamDataSegment seg;
00407 
00408                                         try {
00409                                                 // read the segment
00410                                                 if (!_stream.good()) throw StreamErrorException("stream went bad");
00411 
00412                                                 _stream >> seg;
00413                                         } catch (const ios_base::failure &ex) {
00414                                                 throw StreamErrorException("read error: " + std::string(ex.what()));
00415                                         }
00416 
00417                                         switch (seg._type)
00418                                         {
00419                                                 case StreamDataSegment::MSG_DATA_SEGMENT:
00420                                                 {
00421                                                         IBRCOMMON_LOGGER_DEBUG(70) << "MSG_DATA_SEGMENT received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL;
00422 
00423                                                         if (seg._flags & StreamDataSegment::MSG_MARK_BEGINN)
00424                                                         {
00425                                                                 _recv_size = seg._value;
00426                                                                 unset(STREAM_REJECT);
00427                                                         }
00428                                                         else
00429                                                         {
00430                                                                 _recv_size += seg._value;
00431                                                         }
00432 
00433                                                         // set the new data length
00434                                                         _underflow_data_remain = seg._value;
00435 
00436                                                         if (get(STREAM_REJECT))
00437                                                         {
00438                                                                 // send NACK on bundle reject
00439                                                                 if (get(STREAM_NACK_SUPPORT))
00440                                                                 {
00441                                                                         // lock for sending
00442                                                                         ibrcommon::MutexLock l(_sendlock);
00443                                                                         if (!_stream.good()) throw StreamErrorException("stream went bad");
00444 
00445                                                                         // send a NACK message
00446                                                                         _stream << StreamDataSegment(StreamDataSegment::MSG_REFUSE_BUNDLE, 0) << std::flush;
00447                                                                 }
00448 
00449                                                                 // skip data in this segment
00450                                                                 skipData(_underflow_data_remain);
00451                                                         }
00452                                                         else
00453                                                         {
00454                                                                 // announce the new data block
00455                                                                 _underflow_state = DATA_TRANSFER;
00456                                                         }
00457                                                         break;
00458                                                 }
00459 
00460                                                 case StreamDataSegment::MSG_ACK_SEGMENT:
00461                                                 {
00462                                                         IBRCOMMON_LOGGER_DEBUG(70) << "MSG_ACK_SEGMENT received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL;
00463 
00464                                                         // remove the segment in the queue
00465                                                         if (get(STREAM_ACK_SUPPORT))
00466                                                         {
00467                                                                 ibrcommon::Queue<StreamDataSegment>::Locked q = _segments.exclusive();
00468                                                                 if (q.empty())
00469                                                                 {
00470                                                                         IBRCOMMON_LOGGER(error) << "got an unexpected ACK with size of " << seg._value << IBRCOMMON_LOGGER_ENDL;
00471                                                                 }
00472                                                                 else
00473                                                                 {
00474                                                                         StreamDataSegment &qs = q.front();
00475 
00476                                                                         if (qs._flags & StreamDataSegment::MSG_MARK_END)
00477                                                                         {
00478                                                                                 _conn.eventBundleForwarded();
00479                                                                         }
00480 
00481                                                                         IBRCOMMON_LOGGER_DEBUG(60) << q.size() << " elements to ACK" << IBRCOMMON_LOGGER_ENDL;
00482 
00483                                                                         _conn.eventBundleAck(seg._value);
00484 
00485                                                                         q.pop();
00486                                                                 }
00487                                                         }
00488                                                         break;
00489                                                 }
00490 
00491                                                 case StreamDataSegment::MSG_KEEPALIVE:
00492                                                         IBRCOMMON_LOGGER_DEBUG(70) << "MSG_KEEPALIVE received, size: " << seg._value << IBRCOMMON_LOGGER_ENDL;
00493                                                         break;
00494 
00495                                                 case StreamDataSegment::MSG_REFUSE_BUNDLE:
00496                                                 {
00497                                                         IBRCOMMON_LOGGER_DEBUG(70) << "MSG_REFUSE_BUNDLE received, flags: " << seg._flags << IBRCOMMON_LOGGER_ENDL;
00498 
00499                                                         // TODO: Test bundle rejection!
00500 
00501                                                         // remove the segment in the queue
00502                                                         if (get(STREAM_ACK_SUPPORT) && get(STREAM_NACK_SUPPORT))
00503                                                         {
00504                                                                 // skip segments
00505                                                                 if (!_rejected_segments.empty())
00506                                                                 {
00507                                                                         _rejected_segments.pop();
00508 
00509                                                                         // we received a NACK
00510                                                                         IBRCOMMON_LOGGER_DEBUG(30) << "NACK received, still " << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL;
00511                                                                 }
00512                                                                 else try
00513                                                                 {
00514                                                                         StreamDataSegment qs = _segments.getnpop();
00515 
00516                                                                         // we received a NACK
00517                                                                         IBRCOMMON_LOGGER_DEBUG(20) << "NACK received!" << IBRCOMMON_LOGGER_ENDL;
00518 
00519                                                                         // get all segment ACKs in the queue for this transmission
00520                                                                         while (!_segments.empty())
00521                                                                         {
00522                                                                                 StreamDataSegment &seg = _segments.front();
00523                                                                                 if (seg._flags & StreamDataSegment::MSG_MARK_BEGINN)
00524                                                                                 {
00525                                                                                         break;
00526                                                                                 }
00527 
00528                                                                                 // move the segments to another queue
00529                                                                                 _rejected_segments.push(seg);
00530                                                                                 _segments.pop();
00531                                                                         }
00532 
00533                                                                         // call event reject
00534                                                                         _conn.eventBundleRefused();
00535 
00536                                                                         // we received a NACK
00537                                                                         IBRCOMMON_LOGGER_DEBUG(30) << _rejected_segments.size() << " segments to NACK" << IBRCOMMON_LOGGER_ENDL;
00538 
00539                                                                         // the queue is empty, then skip the current transfer
00540                                                                         if (_segments.empty())
00541                                                                         {
00542                                                                                 set(STREAM_SKIP);
00543 
00544                                                                                 // we received a NACK
00545                                                                                 IBRCOMMON_LOGGER_DEBUG(25) << "skip the current transfer" << IBRCOMMON_LOGGER_ENDL;
00546                                                                         }
00547 
00548                                                                 } catch (const ibrcommon::QueueUnblockedException&) {
00549                                                                         IBRCOMMON_LOGGER(error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
00550                                                                 }
00551 
00552                                                         }
00553                                                         else
00554                                                         {
00555                                                                 IBRCOMMON_LOGGER(error) << "got an unexpected NACK" << IBRCOMMON_LOGGER_ENDL;
00556                                                         }
00557 
00558                                                         break;
00559                                                 }
00560 
00561                                                 case StreamDataSegment::MSG_SHUTDOWN:
00562                                                 {
00563                                                         IBRCOMMON_LOGGER_DEBUG(70) << "MSG_SHUTDOWN received" << IBRCOMMON_LOGGER_ENDL;
00564                                                         throw StreamShutdownException();
00565                                                 }
00566                                         }
00567                                 }
00568 
00569                                 // currently transferring data
00570                                 size_t readsize = _buffer_size;
00571                                 if (_underflow_data_remain < _buffer_size) readsize = _underflow_data_remain;
00572 
00573                                 try {
00574                                         if (!_stream.good()) throw StreamErrorException("stream went bad");
00575 
00576                                         // here receive the data
00577                                         _stream.read(in_buf_, readsize);
00578                                 } catch (const ios_base::failure &ex) {
00579                                         _underflow_state = IDLE;
00580                                         throw StreamErrorException("read error: " + std::string(ex.what()));
00581                                 }
00582 
00583                                 // adjust the remain counter
00584                                 _underflow_data_remain -= readsize;
00585 
00586                                 // Since the input buffer content is now valid (or is new)
00587                                 // the get pointer should be initialized (or reset).
00588                                 setg(in_buf_, in_buf_, in_buf_ + readsize);
00589 
00590                                 return traits_type::not_eof(in_buf_[0]);
00591 
00592                         } catch (const StreamClosedException&) {
00593                                 // set failed bit
00594                                 set(STREAM_FAILED);
00595 
00596                                 IBRCOMMON_LOGGER_DEBUG(10) << "StreamClosedException in underflow()" << IBRCOMMON_LOGGER_ENDL;
00597 
00598                         } catch (const StreamErrorException &ex) {
00599                                 // set failed bit
00600                                 set(STREAM_FAILED);
00601 
00602                                 IBRCOMMON_LOGGER_DEBUG(10) << "StreamErrorException in underflow(): " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00603 
00604                                 throw;
00605                         } catch (const StreamShutdownException&) {
00606                                 // set failed bit
00607                                 set(STREAM_FAILED);
00608 
00609                                 IBRCOMMON_LOGGER_DEBUG(10) << "StreamShutdownException in underflow()" << IBRCOMMON_LOGGER_ENDL;
00610                         }
00611 
00612                         return traits_type::eof();
00613                 }
00614         }
00615 }