IBR-DTNSuite 0.6

ibrdtn/ibrdtn/streams/StreamConnection.h

Go to the documentation of this file.
00001 /*
00002  * StreamConnection.h
00003  *
00004  *  Created on: 01.07.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #ifndef STREAMCONNECTION_H_
00009 #define STREAMCONNECTION_H_
00010 
00011 
00012 #include "ibrdtn/data/Bundle.h"
00013 #include "ibrdtn/data/Exceptions.h"
00014 #include "ibrdtn/streams/StreamContactHeader.h"
00015 #include "ibrdtn/streams/StreamDataSegment.h"
00016 #include <ibrcommon/thread/Mutex.h>
00017 #include <ibrcommon/thread/MutexLock.h>
00018 #include <ibrcommon/thread/Timer.h>
00019 #include <ibrcommon/Exceptions.h>
00020 #include <ibrcommon/thread/Queue.h>
00021 #include <iostream>
00022 #include <streambuf>
00023 
00024 namespace dtn
00025 {
00026         namespace streams
00027         {
00028                 class StreamConnection : public iostream
00029                 {
00030                 public:
00031                         enum ConnectionShutdownCases
00032                         {
00033                                 CONNECTION_SHUTDOWN_NOTSET = 0,
00034                                 CONNECTION_SHUTDOWN_IDLE = 1,
00035                                 CONNECTION_SHUTDOWN_ERROR = 2,
00036                                 CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN = 3,
00037                                 CONNECTION_SHUTDOWN_NODE_TIMEOUT = 4,
00038                                 CONNECTION_SHUTDOWN_PEER_SHUTDOWN = 5
00039                         };
00040 
00041                         class TransmissionInterruptedException : public ibrcommon::IOException
00042                         {
00043                                 public:
00044                                         TransmissionInterruptedException(const dtn::data::Bundle &bundle, const size_t position) throw()
00045                                          : ibrcommon::IOException("Transmission was interrupted."), _bundle(bundle), _position(position)
00046                                         {
00047                                         };
00048 
00049                                         virtual ~TransmissionInterruptedException() throw ()
00050                                         {
00051                                         };
00052 
00053                                         const dtn::data::Bundle _bundle;
00054                                         const size_t _position;
00055                         };
00056 
00057                         class StreamClosedException : public ibrcommon::IOException
00058                         {
00059                         public:
00060                                 StreamClosedException(string what = "The stream has been closed.") throw() : IOException(what)
00061                                 {
00062                                 };
00063                         };
00064 
00065                         class StreamErrorException : public ibrcommon::IOException
00066                         {
00067                         public:
00068                                 StreamErrorException(string what = "StreamError") throw() : IOException(what)
00069                                 {
00070                                 };
00071                         };
00072 
00073                         class StreamShutdownException : public ibrcommon::IOException
00074                         {
00075                         public:
00076                                 StreamShutdownException(string what = "Shutdown message received.") throw() : IOException(what)
00077                                 {
00078                                 };
00079                         };
00080 
00081                         class Callback
00082                         {
00083                         public:
00088                                 virtual void eventShutdown(StreamConnection::ConnectionShutdownCases csc) = 0;
00089 
00094                                 virtual void eventTimeout() = 0;
00095 
00099                                 virtual void eventError() = 0;
00100 
00104                                 virtual void eventBundleRefused() = 0;
00108                                 virtual void eventBundleForwarded() = 0;
00112                                 virtual void eventBundleAck(size_t ack) = 0;
00113 
00118                                 virtual void eventConnectionUp(const StreamContactHeader &header) = 0;
00119 
00123                                 virtual void eventConnectionDown() = 0;
00124                         };
00125 
00131                         StreamConnection(StreamConnection::Callback &cb, iostream &stream, const size_t buffer_size = 4096);
00132 
00137                         virtual ~StreamConnection();
00138 
00144                         void handshake(const dtn::data::EID &eid, const size_t timeout = 10, const char flags = 0);
00145 
00167                         void shutdown(ConnectionShutdownCases csc = CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN);
00168 
00172                         void reject();
00173 
00177                         void keepalive();
00178 
00179 
00180                 private:
00184                         class StreamBuffer : public std::basic_streambuf<char, std::char_traits<char> >
00185                         {
00186                         public:
00187                                 enum State
00188                                 {
00189                                         INITIAL = 0,
00190                                         IDLE = 1,
00191                                         DATA_AVAILABLE = 2,
00192                                         DATA_TRANSFER = 3,
00193                                         SHUTDOWN = 4
00194                                 };
00195 
00199                                 StreamBuffer(StreamConnection &conn, iostream &stream, const size_t buffer_size = 1024);
00200                                 virtual ~StreamBuffer();
00201 
00207                                 const StreamContactHeader handshake(const StreamContactHeader &header);
00208 
00212                                 void close();
00213 
00217                                 void shutdown(const StreamDataSegment::ShutdownReason reason = StreamDataSegment::MSG_SHUTDOWN_NONE);
00218 
00222                                 void reject();
00223 
00227                                 void wait();
00228 
00229                                 void abort();
00230 
00234                                 void keepalive();
00235 
00236                         protected:
00237                                 virtual int sync();
00238                                 virtual int overflow(int = std::char_traits<char>::eof());
00239                                 virtual int underflow();
00240 
00241                         private:
00245                                 bool __good() const;
00246 
00250                                 void __error() const;
00251 
00252                                 enum timerNames
00253                                 {
00254                                         TIMER_IN = 1,
00255                                         TIMER_OUT = 2
00256                                 };
00257 
00258                                 enum StateBits
00259                                 {
00260                                         STREAM_FAILED = 1 << 0,
00261                                         STREAM_BAD = 1 << 1,
00262                                         STREAM_EOF = 1 << 2,
00263                                         STREAM_HANDSHAKE = 1 << 3,
00264                                         STREAM_SHUTDOWN = 1 << 4,
00265                                         STREAM_CLOSED = 1 << 5,
00266                                         STREAM_REJECT = 1 << 6,
00267                                         STREAM_SKIP = 1 << 7,
00268                                         STREAM_ACK_SUPPORT = 1 << 8,
00269                                         STREAM_NACK_SUPPORT = 1 << 9,
00270                                         STREAM_SOB = 1 << 10,                   // start of bundle
00271                                         STREAM_TIMER_SUPPORT = 1 << 11
00272                                 };
00273 
00274                                 void skipData(size_t &size);
00275 
00276                                 bool get(const StateBits bit) const;
00277                                 void set(const StateBits bit);
00278                                 void unset(const StateBits bit);
00279 
00280                                 const size_t _buffer_size;
00281 
00282                                 ibrcommon::Mutex _statelock;
00283                                 int _statebits;
00284 
00285                                 StreamConnection &_conn;
00286 
00287                                 // Input buffer
00288                                 char *in_buf_;
00289 
00290                                 // Output buffer
00291                                 char *out_buf_;
00292                                 ibrcommon::Mutex _sendlock;
00293 
00294                                 std::iostream &_stream;
00295 
00296                                 size_t _recv_size;
00297 
00298                                 // this queue contains all sent data segments
00299                                 // they are removed if an ack or nack is received
00300                                 ibrcommon::Queue<StreamDataSegment> _segments;
00301                                 std::queue<StreamDataSegment> _rejected_segments;
00302 
00303                                 size_t _underflow_data_remain;
00304                                 State _underflow_state;
00305                         };
00306 
00307                         void connectionTimeout();
00308 
00309                         void eventShutdown(StreamConnection::ConnectionShutdownCases csc);
00310 
00311                         void eventBundleAck(size_t ack);
00312                         void eventBundleRefused();
00313                         void eventBundleForwarded();
00314 
00315                         StreamConnection::Callback &_callback;
00316 
00317                         dtn::streams::StreamContactHeader _peer;
00318 
00319                         StreamConnection::StreamBuffer _buf;
00320 
00321                         ibrcommon::Mutex _shutdown_reason_lock;
00322                         ConnectionShutdownCases _shutdown_reason;
00323                 };
00324         }
00325 }
00326 
00327 #endif /* STREAMCONNECTION_H_ */
00328