• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

ibrdtn/ibrdtn/streams/StreamConnection.h

Go to the documentation of this file.
00001 /*
00002  * StreamConnection.h
00003  *
00004  *  Created on: 01.07.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #ifndef STREAMCONNECTION_H_
00009 #define STREAMCONNECTION_H_
00010 
00011 
00012 #include "ibrdtn/data/Bundle.h"
00013 #include "ibrdtn/data/Exceptions.h"
00014 #include "ibrdtn/streams/StreamContactHeader.h"
00015 #include "ibrdtn/streams/StreamDataSegment.h"
00016 #include <ibrcommon/thread/Mutex.h>
00017 #include <ibrcommon/thread/MutexLock.h>
00018 #include <ibrcommon/thread/Timer.h>
00019 #include <ibrcommon/Exceptions.h>
00020 #include <ibrcommon/thread/Queue.h>
00021 #include <iostream>
00022 #include <streambuf>
00023 
00024 namespace dtn
00025 {
00026         namespace streams
00027         {
00028                 class StreamConnection : public iostream
00029                 {
00030                 public:
00031                         enum ConnectionShutdownCases
00032                         {
00033                                 CONNECTION_SHUTDOWN_NOTSET = 0,
00034                                 CONNECTION_SHUTDOWN_IDLE = 1,
00035                                 CONNECTION_SHUTDOWN_ERROR = 2,
00036                                 CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN = 3,
00037                                 CONNECTION_SHUTDOWN_NODE_TIMEOUT = 4,
00038                                 CONNECTION_SHUTDOWN_PEER_SHUTDOWN = 5
00039                         };
00040 
00041                         class TransmissionInterruptedException : public ibrcommon::IOException
00042                         {
00043                                 public:
00044                                         TransmissionInterruptedException(const dtn::data::Bundle &bundle, const size_t position) throw()
00045                                          : ibrcommon::IOException("Transmission was interrupted."), _bundle(bundle), _position(position)
00046                                         {
00047                                         };
00048 
00049                                         virtual ~TransmissionInterruptedException() throw ()
00050                                         {
00051                                         };
00052 
00053                                         const dtn::data::Bundle _bundle;
00054                                         const size_t _position;
00055                         };
00056 
00057                         class StreamClosedException : public ibrcommon::IOException
00058                         {
00059                         public:
00060                                 StreamClosedException(string what = "The stream has been closed.") throw() : IOException(what)
00061                                 {
00062                                 };
00063                         };
00064 
00065                         class StreamErrorException : public ibrcommon::IOException
00066                         {
00067                         public:
00068                                 StreamErrorException(string what = "StreamError") throw() : IOException(what)
00069                                 {
00070                                 };
00071                         };
00072 
00073                         class StreamShutdownException : public ibrcommon::IOException
00074                         {
00075                         public:
00076                                 StreamShutdownException(string what = "Shutdown message received.") throw() : IOException(what)
00077                                 {
00078                                 };
00079                         };
00080 
00081                         class Callback
00082                         {
00083                         public:
00088                                 virtual void eventShutdown(StreamConnection::ConnectionShutdownCases csc) = 0;
00089 
00094                                 virtual void eventTimeout() = 0;
00095 
00099                                 virtual void eventError() = 0;
00100 
00104                                 virtual void eventBundleRefused() = 0;
00108                                 virtual void eventBundleForwarded() = 0;
00112                                 virtual void eventBundleAck(size_t ack) = 0;
00113 
00118                                 virtual void eventConnectionUp(const StreamContactHeader &header) = 0;
00119 
00123                                 virtual void eventConnectionDown() = 0;
00124                         };
00125 
00131                         StreamConnection(StreamConnection::Callback &cb, iostream &stream, const size_t buffer_size = 4096);
00132 
00137                         virtual ~StreamConnection();
00138 
00144                         void handshake(const dtn::data::EID &eid, const size_t timeout = 10, const char flags = 0);
00145 
00167                         void shutdown(ConnectionShutdownCases csc = CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN);
00168 
00172                         void reject();
00173 
00177                         void keepalive();
00178 
00179 
00180                 private:
00184                         class StreamBuffer : public std::basic_streambuf<char, std::char_traits<char> >, public ibrcommon::SimpleTimerCallback
00185                         {
00186                         public:
00187                                 enum State
00188                                 {
00189                                         INITIAL = 0,
00190                                         IDLE = 1,
00191                                         DATA_AVAILABLE = 2,
00192                                         DATA_TRANSFER = 3,
00193                                         SHUTDOWN = 4
00194                                 };
00195 
00199                                 StreamBuffer(StreamConnection &conn, iostream &stream, const size_t buffer_size = 1024);
00200                                 virtual ~StreamBuffer();
00201 
00207                                 const StreamContactHeader handshake(const StreamContactHeader &header);
00208 
00212                                 void close();
00213 
00217                                 void shutdowntimers();
00218 
00222                                 void shutdown(const StreamDataSegment::ShutdownReason reason = StreamDataSegment::MSG_SHUTDOWN_NONE);
00223 
00227                                 size_t timeout(size_t identifier);
00228 
00232                                 void reject();
00233 
00237                                 void wait();
00238 
00239                                 void abort();
00240 
00244                                 void keepalive();
00245 
00246                         protected:
00247                                 virtual int sync();
00248                                 virtual int overflow(int = std::char_traits<char>::eof());
00249                                 virtual int underflow();
00250 
00251                         private:
00255                                 bool __good() const;
00256 
00260                                 void __error() const;
00261 
00262                                 enum timerNames
00263                                 {
00264                                         TIMER_IN = 1,
00265                                         TIMER_OUT = 2
00266                                 };
00267 
00268                                 enum StateBits
00269                                 {
00270                                         STREAM_FAILED = 1 << 0,
00271                                         STREAM_BAD = 1 << 1,
00272                                         STREAM_EOF = 1 << 2,
00273                                         STREAM_HANDSHAKE = 1 << 3,
00274                                         STREAM_SHUTDOWN = 1 << 4,
00275                                         STREAM_CLOSED = 1 << 5,
00276                                         STREAM_REJECT = 1 << 6,
00277                                         STREAM_SKIP = 1 << 7,
00278                                         STREAM_ACK_SUPPORT = 1 << 8,
00279                                         STREAM_NACK_SUPPORT = 1 << 9,
00280                                         STREAM_SOB = 1 << 10,                   // start of bundle
00281                                         STREAM_TIMER_SUPPORT = 1 << 11
00282                                 };
00283 
00284                                 void skipData(size_t &size);
00285 
00286                                 bool get(const StateBits bit) const;
00287                                 void set(const StateBits bit);
00288                                 void unset(const StateBits bit);
00289 
00290                                 const size_t _buffer_size;
00291 
00292                                 ibrcommon::Mutex _statelock;
00293                                 int _statebits;
00294 
00295                                 StreamConnection &_conn;
00296 
00297                                 // Input buffer
00298                                 char *in_buf_;
00299 
00300                                 // Output buffer
00301                                 char *out_buf_;
00302                                 ibrcommon::Mutex _sendlock;
00303 
00304                                 std::iostream &_stream;
00305 
00306                                 size_t _recv_size;
00307 
00308                                 size_t _in_timeout;
00309 
00310                                 ibrcommon::Mutex _timer_lock;
00311                                 size_t _in_timeout_value;
00312                                 ibrcommon::SimpleTimer _timer;
00313 
00314                                 // this queue contains all sent data segments
00315                                 // they are removed if an ack or nack is received
00316                                 ibrcommon::Queue<StreamDataSegment> _segments;
00317                                 std::queue<StreamDataSegment> _rejected_segments;
00318 
00319                                 size_t _underflow_data_remain;
00320                                 State _underflow_state;
00321                         };
00322 
00326                         void close();
00327 
00328                         void connectionTimeout();
00329 
00330                         void eventShutdown(StreamConnection::ConnectionShutdownCases csc);
00331 
00332                         void eventBundleAck(size_t ack);
00333                         void eventBundleRefused();
00334                         void eventBundleForwarded();
00335 
00336                         StreamConnection::Callback &_callback;
00337 
00338                         dtn::streams::StreamContactHeader _peer;
00339 
00340                         StreamConnection::StreamBuffer _buf;
00341 
00342                         ibrcommon::Mutex _shutdown_reason_lock;
00343                         ConnectionShutdownCases _shutdown_reason;
00344                 };
00345         }
00346 }
00347 
00348 #endif /* STREAMCONNECTION_H_ */
00349 

Generated on Wed Mar 30 2011 11:11:49 for IBR-DTNSuite by  doxygen 1.7.1