00001
00002
00003
00004
00005
00006
00007
00008 #ifndef STREAMCONNECTION_H_
00009 #define STREAMCONNECTION_H_
00010
00011
00012 #include "ibrdtn/data/Bundle.h"
00013 #include "ibrdtn/data/Exceptions.h"
00014 #include "ibrdtn/streams/StreamContactHeader.h"
00015 #include "ibrdtn/streams/StreamDataSegment.h"
00016 #include <ibrcommon/thread/Mutex.h>
00017 #include <ibrcommon/thread/MutexLock.h>
00018 #include <ibrcommon/thread/Timer.h>
00019 #include <ibrcommon/Exceptions.h>
00020 #include <ibrcommon/thread/ThreadSafeQueue.h>
00021 #include <iostream>
00022 #include <streambuf>
00023
00024 namespace dtn
00025 {
00026 namespace streams
00027 {
00028 class StreamConnection : public iostream
00029 {
00030 public:
00031 enum ConnectionState
00032 {
00033 CONNECTION_INITIAL = 0,
00034 CONNECTION_CONNECTED = 1,
00035 CONNECTION_SHUTDOWN = 2,
00036 CONNECTION_CLOSED = 3
00037 };
00038
00039 enum ConnectionShutdownCases
00040 {
00041 CONNECTION_SHUTDOWN_NOTSET = 0,
00042 CONNECTION_SHUTDOWN_IDLE = 1,
00043 CONNECTION_SHUTDOWN_ERROR = 2,
00044 CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN = 3,
00045 CONNECTION_SHUTDOWN_NODE_TIMEOUT = 4,
00046 CONNECTION_SHUTDOWN_PEER_SHUTDOWN = 5
00047 };
00048
00049 class TransmissionInterruptedException : public ibrcommon::IOException
00050 {
00051 public:
00052 TransmissionInterruptedException(const dtn::data::Bundle &bundle, const size_t position) throw()
00053 : ibrcommon::IOException("Transmission was interrupted."), _bundle(bundle), _position(position)
00054 {
00055 };
00056
00057 virtual ~TransmissionInterruptedException() throw ()
00058 {
00059 };
00060
00061 const dtn::data::Bundle _bundle;
00062 const size_t _position;
00063 };
00064
00065 class StreamClosedException : public ibrcommon::IOException
00066 {
00067 public:
00068 StreamClosedException(string what = "The stream has been closed.") throw() : IOException(what)
00069 {
00070 };
00071 };
00072
00073 class StreamErrorException : public ibrcommon::IOException
00074 {
00075 public:
00076 StreamErrorException(string what = "StreamError") throw() : IOException(what)
00077 {
00078 };
00079 };
00080
00081 class StreamShutdownException : public ibrcommon::IOException
00082 {
00083 public:
00084 StreamShutdownException(string what = "Shutdown message received.") throw() : IOException(what)
00085 {
00086 };
00087 };
00088
00089 class Callback
00090 {
00091 public:
00096 virtual void eventShutdown() = 0;
00097
00102 virtual void eventTimeout() = 0;
00103
00107 virtual void eventError() = 0;
00108
00112 virtual void eventBundleRefused() = 0;
00116 virtual void eventBundleForwarded() = 0;
00120 virtual void eventBundleAck(size_t ack) = 0;
00121
00126 virtual void eventConnectionUp(const StreamContactHeader &header) = 0;
00127
00131 virtual void eventConnectionDown() = 0;
00132 };
00133
00139 StreamConnection(StreamConnection::Callback &cb, iostream &stream);
00140
00145 virtual ~StreamConnection();
00146
00152 void handshake(const dtn::data::EID &eid, const size_t timeout = 10, const char flags = 0);
00153
00158 bool isConnected();
00159
00164 void wait(const size_t timeout = 0);
00165
00187 void shutdown(ConnectionShutdownCases csc = CONNECTION_SHUTDOWN_SIMPLE_SHUTDOWN);
00188
00192 void reject();
00193
00194 private:
00198 class StreamBuffer : public std::basic_streambuf<char, std::char_traits<char> >, public ibrcommon::SimpleTimerCallback
00199 {
00200 public:
00201 enum State
00202 {
00203 INITIAL = 0,
00204 IDLE = 1,
00205 DATA_AVAILABLE = 2,
00206 DATA_TRANSFER = 3,
00207 SHUTDOWN = 4
00208 };
00209
00210
00211 static const size_t BUFF_SIZE = 1024;
00212
00216 StreamBuffer(StreamConnection &conn, iostream &stream);
00217 virtual ~StreamBuffer();
00218
00224 const StreamContactHeader handshake(const StreamContactHeader &header);
00225
00229 void close();
00230
00234 void shutdown(const StreamDataSegment::ShutdownReason reason = StreamDataSegment::MSG_SHUTDOWN_NONE);
00235
00241
00242 size_t timeout(size_t identifier);
00243
00247 void reject();
00248
00252 bool isCompleted();
00253
00254 protected:
00255 virtual int sync();
00256 virtual int overflow(int = std::char_traits<char>::eof());
00257 virtual int underflow();
00258
00259 private:
00260 enum timerNames
00261 {
00262 TIMER_IN = 1,
00263 TIMER_OUT = 2
00264 };
00265
00266 enum StateBits
00267 {
00268 STREAM_FAILED = 1 << 0,
00269 STREAM_BAD = 1 << 1,
00270 STREAM_EOF = 1 << 2,
00271 STREAM_HANDSHAKE = 1 << 3,
00272 STREAM_SHUTDOWN = 1 << 4,
00273 STREAM_CLOSED = 1 << 5,
00274 STREAM_REJECT = 1 << 6,
00275 STREAM_SKIP = 1 << 7,
00276 STREAM_ACK_SUPPORT = 1 << 8,
00277 STREAM_NACK_SUPPORT = 1 << 9,
00278 STREAM_SOB = 1 << 10,
00279 STREAM_TIMER_SUPPORT = 1 << 11
00280 };
00281
00282 void skipData(size_t &size);
00283
00284 bool get(const StateBits bit) const;
00285 void set(const StateBits bit);
00286 void unset(const StateBits bit);
00287 bool good() const;
00288
00289 ibrcommon::Mutex _statelock;
00290 int _statebits;
00291
00292 StreamConnection &_conn;
00293
00294
00295 char *in_buf_;
00296
00297
00298 char *out_buf_;
00299 ibrcommon::Mutex _sendlock;
00300
00301 std::iostream &_stream;
00302
00303 size_t _recv_size;
00304
00305 size_t _in_timeout;
00306 size_t _out_timeout;
00307
00308 ibrcommon::Mutex _timer_lock;
00309 size_t _in_timeout_value;
00310 size_t _out_timeout_value;
00311 ibrcommon::SimpleTimer _timer;
00312
00313
00314
00315 ibrcommon::ThreadSafeQueue<StreamDataSegment> _segments;
00316 std::queue<StreamDataSegment> _rejected_segments;
00317
00318 ibrcommon::Mutex _underflow_mutex;
00319 size_t _underflow_data_remain;
00320 State _underflow_state;
00321
00322 ibrcommon::Mutex _overflow_mutex;
00323 };
00324
00328 void close();
00329
00330 void connectionTimeout();
00331
00332 void eventShutdown();
00333
00334 void eventBundleAck(size_t ack);
00335 void eventBundleRefused();
00336 void eventBundleForwarded();
00337
00338 StreamConnection::Callback &_callback;
00339
00340 ibrcommon::StatefulConditional<ConnectionState, CONNECTION_CLOSED> _in_state;
00341 ibrcommon::StatefulConditional<ConnectionState, CONNECTION_CLOSED> _out_state;
00342
00343 dtn::streams::StreamContactHeader _peer;
00344
00345 StreamConnection::StreamBuffer _buf;
00346
00347 ibrcommon::Mutex _shutdown_reason_lock;
00348 ConnectionShutdownCases _shutdown_reason;
00349 };
00350 }
00351 }
00352
00353 #endif
00354