Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #ifndef STREAMCONNECTION_H_
00009 #define STREAMCONNECTION_H_
00010
00011
00012 #include "ibrdtn/data/Bundle.h"
00013 #include "ibrdtn/data/Exceptions.h"
00014 #include "ibrdtn/streams/StreamContactHeader.h"
00015 #include "ibrdtn/streams/StreamDataSegment.h"
00016 #include <ibrcommon/thread/Mutex.h>
00017 #include <ibrcommon/thread/MutexLock.h>
00018 #include <ibrcommon/thread/Timer.h>
00019 #include <ibrcommon/Exceptions.h>
00020 #include <ibrcommon/thread/Queue.h>
00021 #include <iostream>
00022 #include <streambuf>
00023
00024 namespace dtn
00025 {
00026 namespace streams
00027 {
00028 class StreamConnection : public iostream
00029 {
00030 public:
00031 enum ConnectionShutdownCases
00032 {
00033 CONNECTION_SHUTDOWN_NOTSET = 0,
00034 CONNECTION_SHUTDOWN_IDLE = 1,
00035 CONNECTION_SHUTDOWN_ERROR = 2,
00036 CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN = 3,
00037 CONNECTION_SHUTDOWN_NODE_TIMEOUT = 4,
00038 CONNECTION_SHUTDOWN_PEER_SHUTDOWN = 5
00039 };
00040
00041 class TransmissionInterruptedException : public ibrcommon::IOException
00042 {
00043 public:
00044 TransmissionInterruptedException(const dtn::data::Bundle &bundle, const size_t position) throw()
00045 : ibrcommon::IOException("Transmission was interrupted."), _bundle(bundle), _position(position)
00046 {
00047 };
00048
00049 virtual ~TransmissionInterruptedException() throw ()
00050 {
00051 };
00052
00053 const dtn::data::Bundle _bundle;
00054 const size_t _position;
00055 };
00056
00057 class StreamClosedException : public ibrcommon::IOException
00058 {
00059 public:
00060 StreamClosedException(string what = "The stream has been closed.") throw() : IOException(what)
00061 {
00062 };
00063 };
00064
00065 class StreamErrorException : public ibrcommon::IOException
00066 {
00067 public:
00068 StreamErrorException(string what = "StreamError") throw() : IOException(what)
00069 {
00070 };
00071 };
00072
00073 class StreamShutdownException : public ibrcommon::IOException
00074 {
00075 public:
00076 StreamShutdownException(string what = "Shutdown message received.") throw() : IOException(what)
00077 {
00078 };
00079 };
00080
00081 class Callback
00082 {
00083 public:
00088 virtual void eventShutdown(StreamConnection::ConnectionShutdownCases csc) = 0;
00089
00094 virtual void eventTimeout() = 0;
00095
00099 virtual void eventError() = 0;
00100
00104 virtual void eventBundleRefused() = 0;
00108 virtual void eventBundleForwarded() = 0;
00112 virtual void eventBundleAck(size_t ack) = 0;
00113
00118 virtual void eventConnectionUp(const StreamContactHeader &header) = 0;
00119
00123 virtual void eventConnectionDown() = 0;
00124 };
00125
00131 StreamConnection(StreamConnection::Callback &cb, iostream &stream, const size_t buffer_size = 4096);
00132
00137 virtual ~StreamConnection();
00138
00144 void handshake(const dtn::data::EID &eid, const size_t timeout = 10, const char flags = 0);
00145
00167 void shutdown(ConnectionShutdownCases csc = CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN);
00168
00172 void reject();
00173
00177 void keepalive();
00178
00179
00180 private:
00184 class StreamBuffer : public std::basic_streambuf<char, std::char_traits<char> >, public ibrcommon::SimpleTimerCallback
00185 {
00186 public:
00187 enum State
00188 {
00189 INITIAL = 0,
00190 IDLE = 1,
00191 DATA_AVAILABLE = 2,
00192 DATA_TRANSFER = 3,
00193 SHUTDOWN = 4
00194 };
00195
00199 StreamBuffer(StreamConnection &conn, iostream &stream, const size_t buffer_size = 1024);
00200 virtual ~StreamBuffer();
00201
00207 const StreamContactHeader handshake(const StreamContactHeader &header);
00208
00212 void close();
00213
00217 void shutdowntimers();
00218
00222 void shutdown(const StreamDataSegment::ShutdownReason reason = StreamDataSegment::MSG_SHUTDOWN_NONE);
00223
00227 size_t timeout(size_t identifier);
00228
00232 void reject();
00233
00237 void wait();
00238
00239 void abort();
00240
00244 void keepalive();
00245
00246 protected:
00247 virtual int sync();
00248 virtual int overflow(int = std::char_traits<char>::eof());
00249 virtual int underflow();
00250
00251 private:
00255 bool __good() const;
00256
00260 void __error() const;
00261
00262 enum timerNames
00263 {
00264 TIMER_IN = 1,
00265 TIMER_OUT = 2
00266 };
00267
00268 enum StateBits
00269 {
00270 STREAM_FAILED = 1 << 0,
00271 STREAM_BAD = 1 << 1,
00272 STREAM_EOF = 1 << 2,
00273 STREAM_HANDSHAKE = 1 << 3,
00274 STREAM_SHUTDOWN = 1 << 4,
00275 STREAM_CLOSED = 1 << 5,
00276 STREAM_REJECT = 1 << 6,
00277 STREAM_SKIP = 1 << 7,
00278 STREAM_ACK_SUPPORT = 1 << 8,
00279 STREAM_NACK_SUPPORT = 1 << 9,
00280 STREAM_SOB = 1 << 10,
00281 STREAM_TIMER_SUPPORT = 1 << 11
00282 };
00283
00284 void skipData(size_t &size);
00285
00286 bool get(const StateBits bit) const;
00287 void set(const StateBits bit);
00288 void unset(const StateBits bit);
00289
00290 const size_t _buffer_size;
00291
00292 ibrcommon::Mutex _statelock;
00293 int _statebits;
00294
00295 StreamConnection &_conn;
00296
00297
00298 char *in_buf_;
00299
00300
00301 char *out_buf_;
00302 ibrcommon::Mutex _sendlock;
00303
00304 std::iostream &_stream;
00305
00306 size_t _recv_size;
00307
00308 size_t _in_timeout;
00309
00310 ibrcommon::Mutex _timer_lock;
00311 size_t _in_timeout_value;
00312 ibrcommon::SimpleTimer _timer;
00313
00314
00315
00316 ibrcommon::Queue<StreamDataSegment> _segments;
00317 std::queue<StreamDataSegment> _rejected_segments;
00318
00319 size_t _underflow_data_remain;
00320 State _underflow_state;
00321 };
00322
00326 void close();
00327
00328 void connectionTimeout();
00329
00330 void eventShutdown(StreamConnection::ConnectionShutdownCases csc);
00331
00332 void eventBundleAck(size_t ack);
00333 void eventBundleRefused();
00334 void eventBundleForwarded();
00335
00336 StreamConnection::Callback &_callback;
00337
00338 dtn::streams::StreamContactHeader _peer;
00339
00340 StreamConnection::StreamBuffer _buf;
00341
00342 ibrcommon::Mutex _shutdown_reason_lock;
00343 ConnectionShutdownCases _shutdown_reason;
00344 };
00345 }
00346 }
00347
00348 #endif
00349