00001
00002
00003
00004
00005
00006
00007
00008 #ifndef TCPCONVERGENCELAYER_H_
00009 #define TCPCONVERGENCELAYER_H_
00010
00011 #include "Component.h"
00012
00013 #include "core/EventReceiver.h"
00014 #include "core/NodeEvent.h"
00015 #include "core/Node.h"
00016
00017 #include "net/GenericServer.h"
00018 #include "net/ConvergenceLayer.h"
00019 #include "net/DiscoveryService.h"
00020 #include "net/DiscoveryServiceProvider.h"
00021
00022 #include <ibrdtn/data/Bundle.h>
00023 #include <ibrdtn/data/Serializer.h>
00024 #include <ibrdtn/streams/StreamConnection.h>
00025 #include <ibrdtn/streams/StreamContactHeader.h>
00026
00027 #include <ibrcommon/net/tcpserver.h>
00028 #include <ibrcommon/net/tcpstream.h>
00029 #include <ibrcommon/net/NetInterface.h>
00030
00031 #include <memory>
00032 #include <ibrcommon/thread/ThreadSafeQueue.h>
00033
00034 using namespace dtn::streams;
00035
00036 namespace dtn
00037 {
00038 namespace net
00039 {
00044 class TCPConvergenceLayer : public dtn::daemon::Component, public ConvergenceLayer, public DiscoveryServiceProvider
00045 {
00046 public:
00047 class TCPConnection : public GenericConnection, public StreamConnection::Callback
00048 {
00049 public:
00050 TCPConnection(ibrcommon::tcpstream *stream);
00051 virtual ~TCPConnection();
00052
00056 void iamfree();
00057
00063 void initialize(const dtn::data::EID &name, const size_t timeout = 10);
00064
00068 void shutdown();
00069
00074 const StreamContactHeader getHeader() const;
00075
00080 const dtn::core::Node& getNode() const;
00081
00085 virtual void eventShutdown();
00086 virtual void eventTimeout();
00087 virtual void eventError();
00088 virtual void eventConnectionUp(const StreamContactHeader &header);
00089 virtual void eventConnectionDown();
00090
00091 virtual void eventBundleRefused();
00092 virtual void eventBundleForwarded();
00093 virtual void eventBundleAck(size_t ack);
00094
00095 bool free();
00096
00097 dtn::core::Node::Protocol getDiscoveryProtocol() const;
00098
00103 void queue(const dtn::data::Bundle &bundle);
00104
00105 friend TCPConvergenceLayer::TCPConnection& operator>>(TCPConvergenceLayer::TCPConnection &conn, dtn::data::Bundle &bundle);
00106 friend TCPConvergenceLayer::TCPConnection& operator<<(TCPConvergenceLayer::TCPConnection &conn, const dtn::data::Bundle &bundle);
00107
00108 protected:
00109 void handshake();
00110
00111 void rejectTransmission();
00112
00113 private:
00120 class Receiver : public ibrcommon::JoinableThread
00121 {
00122 public:
00123 Receiver(TCPConnection &connection);
00124 virtual ~Receiver();
00125 void run();
00126 void shutdown();
00127
00128 private:
00129 bool _running;
00130 TCPConnection &_connection;
00131 };
00132
00133 class Sender : public ibrcommon::JoinableThread, public ibrcommon::ThreadSafeQueue<dtn::data::Bundle>
00134 {
00135 public:
00136 Sender(TCPConnection &connection);
00137 virtual ~Sender();
00138 void run();
00139 void shutdown();
00140
00141 private:
00142 bool _running;
00143 TCPConnection &_connection;
00144 };
00145
00146 ibrcommon::Mutex _freemutex;
00147 bool _free;
00148
00149 StreamContactHeader _peer;
00150 dtn::core::Node _node;
00151
00152 std::auto_ptr<ibrcommon::tcpstream> _tcpstream;
00153 StreamConnection _stream;
00154
00155
00156
00157 Sender _sender;
00158
00159
00160 Receiver _receiver;
00161
00162
00163 dtn::data::EID _name;
00164 size_t _timeout;
00165
00166 ibrcommon::ThreadSafeQueue<dtn::data::Bundle> _sentqueue;
00167 size_t _lastack;
00168 };
00169
00170 class Server : public dtn::net::GenericServer<TCPConvergenceLayer::TCPConnection>, public dtn::core::EventReceiver
00171 {
00172 public:
00173 Server(ibrcommon::NetInterface net, int port);
00174 virtual ~Server();
00175
00180 void queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job);
00181
00185 void raiseEvent(const dtn::core::Event *evt);
00186
00187 protected:
00188 TCPConvergenceLayer::TCPConnection* accept();
00189 void listen();
00190 void shutdown();
00191
00192 void connectionUp(TCPConvergenceLayer::TCPConnection *conn);
00193 void connectionDown(TCPConvergenceLayer::TCPConnection *conn);
00194
00195 private:
00196 class Connection
00197 {
00198 public:
00199 Connection(TCPConvergenceLayer::TCPConnection *conn, const dtn::core::Node &node, const bool &active = false);
00200 ~Connection();
00201
00202 bool match(const dtn::data::EID &destination) const;
00203 bool match(const dtn::core::NodeEvent &evt) const;
00204
00205 TCPConvergenceLayer::TCPConnection& operator*();
00206
00207 TCPConvergenceLayer::TCPConnection *_connection;
00208 dtn::core::Node _peer;
00209 bool _active;
00210 };
00211
00212 TCPConnection* getConnection(const dtn::core::Node &n);
00213 ibrcommon::tcpserver _tcpsrv;
00214 ibrcommon::Conditional _connection_lock;
00215 std::list<Connection> _connections;
00216 };
00217
00223 TCPConvergenceLayer(ibrcommon::NetInterface net, int port);
00224
00228 virtual ~TCPConvergenceLayer();
00229
00230 dtn::core::Node::Protocol getDiscoveryProtocol() const;
00231
00232 void initialize();
00233 void startup();
00234 void terminate();
00235
00239 void update(std::string &name, std::string &data);
00240 bool onInterface(const ibrcommon::NetInterface &net) const;
00241
00246 void queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job);
00247
00248 private:
00249 static const int DEFAULT_PORT;
00250 bool _running;
00251
00252 ibrcommon::NetInterface _net;
00253 int _port;
00254 TCPConvergenceLayer::Server _server;
00255 };
00256 }
00257 }
00258
00259 #endif