Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #ifndef TCPCONVERGENCELAYER_H_
00009 #define TCPCONVERGENCELAYER_H_
00010
00011 #include "Component.h"
00012
00013 #include "core/EventReceiver.h"
00014 #include "core/NodeEvent.h"
00015 #include "core/Node.h"
00016
00017 #include "net/GenericServer.h"
00018 #include "net/ConvergenceLayer.h"
00019 #include "net/DiscoveryService.h"
00020 #include "net/DiscoveryServiceProvider.h"
00021
00022 #include <ibrdtn/data/Bundle.h>
00023 #include <ibrdtn/data/MetaBundle.h>
00024 #include <ibrdtn/data/Serializer.h>
00025 #include <ibrdtn/streams/StreamConnection.h>
00026 #include <ibrdtn/streams/StreamContactHeader.h>
00027
00028 #include <ibrcommon/net/tcpserver.h>
00029 #include <ibrcommon/net/tcpstream.h>
00030 #include <ibrcommon/net/NetInterface.h>
00031
00032 #include <memory>
00033 #include <ibrcommon/thread/Queue.h>
00034
00035 using namespace dtn::streams;
00036
00037 namespace dtn
00038 {
00039 namespace net
00040 {
00045 class TCPConvergenceLayer : public dtn::daemon::Component, public ConvergenceLayer, public DiscoveryServiceProvider
00046 {
00047 public:
00048 class TCPConnection : public GenericConnection<TCPConvergenceLayer::TCPConnection>, public StreamConnection::Callback, public ibrcommon::DetachedThread
00049 {
00050 public:
00051 TCPConnection(GenericServer<TCPConnection> &tcpsrv, ibrcommon::tcpstream *stream, const dtn::data::EID &name, const size_t timeout = 10);
00052
00053 virtual ~TCPConnection();
00054
00058 virtual void initialize();
00059
00063 void shutdown();
00064
00069 const StreamContactHeader getHeader() const;
00070
00075 const dtn::core::Node& getNode() const;
00076
00080 virtual void eventShutdown(StreamConnection::ConnectionShutdownCases csc);
00081 virtual void eventTimeout();
00082 virtual void eventError();
00083 virtual void eventConnectionUp(const StreamContactHeader &header);
00084 virtual void eventConnectionDown();
00085
00086 virtual void eventBundleRefused();
00087 virtual void eventBundleForwarded();
00088 virtual void eventBundleAck(size_t ack);
00089
00090 dtn::core::Node::Protocol getDiscoveryProtocol() const;
00091
00096 void queue(const dtn::data::BundleID &bundle);
00097
00098 friend TCPConvergenceLayer::TCPConnection& operator>>(TCPConvergenceLayer::TCPConnection &conn, dtn::data::Bundle &bundle);
00099 friend TCPConvergenceLayer::TCPConnection& operator<<(TCPConvergenceLayer::TCPConnection &conn, const dtn::data::Bundle &bundle);
00100
00101 protected:
00102 void rejectTransmission();
00103
00104 void run();
00105 void finally();
00106
00107 void clearQueue();
00108
00109 void keepalive();
00110
00111 private:
00112 class Sender : public ibrcommon::JoinableThread, public ibrcommon::Queue<dtn::data::BundleID>
00113 {
00114 public:
00115 Sender(TCPConnection &connection, size_t &keepalive_timeout);
00116 virtual ~Sender();
00117
00118 protected:
00119 void run();
00120 void finally();
00121 bool __cancellation();
00122
00123 private:
00124 bool _abort;
00125 TCPConnection &_connection;
00126 size_t &_keepalive_timeout;
00127 dtn::data::BundleID _current_transfer;
00128 };
00129
00130 StreamContactHeader _peer;
00131 dtn::core::Node _node;
00132
00133 std::auto_ptr<ibrcommon::tcpstream> _tcpstream;
00134 StreamConnection _stream;
00135
00136
00137
00138 Sender _sender;
00139
00140
00141 dtn::data::EID _name;
00142 size_t _timeout;
00143
00144 ibrcommon::Queue<dtn::data::MetaBundle> _sentqueue;
00145 size_t _lastack;
00146 size_t _keepalive_timeout;
00147 };
00148
00149 class Server : public dtn::net::GenericServer<TCPConvergenceLayer::TCPConnection>, public dtn::core::EventReceiver
00150 {
00151 public:
00152 Server(ibrcommon::NetInterface net, int port);
00153 virtual ~Server();
00154
00159 void queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job);
00160
00164 void raiseEvent(const dtn::core::Event *evt);
00165
00169 virtual const std::string getName() const;
00170
00171 protected:
00172 TCPConvergenceLayer::TCPConnection* accept();
00173 void listen();
00174 void shutdown();
00175
00176 void connectionUp(TCPConvergenceLayer::TCPConnection *conn);
00177 void connectionDown(TCPConvergenceLayer::TCPConnection *conn);
00178
00179 private:
00180 class Connection
00181 {
00182 public:
00183 Connection(TCPConvergenceLayer::TCPConnection *conn, const dtn::core::Node &node, const bool &active = false);
00184 ~Connection();
00185
00186 bool match(const dtn::data::EID &destination) const;
00187 bool match(const dtn::core::NodeEvent &evt) const;
00188
00189 TCPConvergenceLayer::TCPConnection& operator*();
00190
00191 TCPConvergenceLayer::TCPConnection *_connection;
00192 dtn::core::Node _peer;
00193 bool _active;
00194 };
00195
00196 ibrcommon::tcpserver _tcpsrv;
00197 std::list<Connection> _connections;
00198 };
00199
00205 TCPConvergenceLayer(ibrcommon::NetInterface net, int port);
00206
00210 virtual ~TCPConvergenceLayer();
00211
00212 dtn::core::Node::Protocol getDiscoveryProtocol() const;
00213
00214 void initialize();
00215 void startup();
00216 void terminate();
00217
00221 void update(std::string &name, std::string &data);
00222 bool onInterface(const ibrcommon::NetInterface &net) const;
00223
00228 void queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job);
00229
00233 virtual const std::string getName() const;
00234
00235 private:
00236 static const int DEFAULT_PORT;
00237 bool _running;
00238
00239 ibrcommon::NetInterface _net;
00240 int _port;
00241 TCPConvergenceLayer::Server _server;
00242 };
00243 }
00244 }
00245
00246 #endif