• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/net/TCPConvergenceLayer.h

Go to the documentation of this file.
00001 /*
00002  * TCPConvergenceLayer.h
00003  *
00004  *  Created on: 05.08.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #ifndef TCPCONVERGENCELAYER_H_
00009 #define TCPCONVERGENCELAYER_H_
00010 
00011 #include "Component.h"
00012 
00013 #include "core/EventReceiver.h"
00014 #include "core/NodeEvent.h"
00015 #include "core/Node.h"
00016 
00017 #include "net/GenericServer.h"
00018 #include "net/ConvergenceLayer.h"
00019 #include "net/DiscoveryService.h"
00020 #include "net/DiscoveryServiceProvider.h"
00021 
00022 #include <ibrdtn/data/Bundle.h>
00023 #include <ibrdtn/data/MetaBundle.h>
00024 #include <ibrdtn/data/Serializer.h>
00025 #include <ibrdtn/streams/StreamConnection.h>
00026 #include <ibrdtn/streams/StreamContactHeader.h>
00027 
00028 #include <ibrcommon/net/tcpserver.h>
00029 #include <ibrcommon/net/tcpstream.h>
00030 #include <ibrcommon/net/NetInterface.h>
00031 
00032 #include <memory>
00033 #include <ibrcommon/thread/Queue.h>
00034 
// NOTE(review): this using-directive sits at file scope in a header, so every
// translation unit that includes this file gets all of dtn::streams injected
// into its global lookup. The declarations below rely on it for the
// unqualified names StreamConnection and StreamContactHeader; removing it
// means qualifying those names as dtn::streams::... throughout this header.
00035 using namespace dtn::streams;
00036 
00037 namespace dtn
00038 {
00039         namespace net
00040         {
                      /**
                       * Convergence layer built on the ibrcommon TCP types (tcpstream /
                       * tcpserver). The class is at the same time a daemon Component, a
                       * ConvergenceLayer and a DiscoveryServiceProvider, and it holds one
                       * Server instance (_server) by value.
                       */
00045                 class TCPConvergenceLayer : public dtn::daemon::Component, public ConvergenceLayer, public DiscoveryServiceProvider
00046                 {
00047                 public:
                              /**
                               * A single TCP connection. It derives from GenericConnection
                               * parameterised on itself, from StreamConnection::Callback and
                               * from ibrcommon::DetachedThread.
                               */
00048                         class TCPConnection : public GenericConnection<TCPConvergenceLayer::TCPConnection>, public StreamConnection::Callback, public ibrcommon::DetachedThread
00049                         {
00050                         public:
                                      /**
                                       * @param tcpsrv  generic server instance, passed by reference
                                       * @param stream  raw pointer to the TCP stream; presumably ownership
                                       *                moves into _tcpstream (std::auto_ptr) - confirm in the .cpp
                                       * @param name    EID; presumably stored in _name - confirm in the .cpp
                                       * @param timeout defaults to 10; the unit is not visible in this header
                                       */
00051                                 TCPConnection(GenericServer<TCPConnection> &tcpsrv, ibrcommon::tcpstream *stream, const dtn::data::EID &name, const size_t timeout = 10);
00052 
00053                                 virtual ~TCPConnection();
00054 
00058                                 virtual void initialize();
00059 
00063                                 void shutdown();
00064 
                                      // Returns a StreamContactHeader by (const) value, i.e. a copy.
                                      // Presumably this is the peer header kept in _peer - confirm.
00069                                 const StreamContactHeader getHeader() const;
00070 
                                      // Returns a const reference to a Node; presumably the _node member - confirm.
00075                                 const dtn::core::Node& getNode() const;
00076 
                                      // Event callbacks. Presumably these override the virtuals of
                                      // StreamConnection::Callback (that base is not visible here).
00080                                 virtual void eventShutdown(StreamConnection::ConnectionShutdownCases csc);
00081                                 virtual void eventTimeout();
00082                                 virtual void eventError();
00083                                 virtual void eventConnectionUp(const StreamContactHeader &header);
00084                                 virtual void eventConnectionDown();
00085 
00086                                 virtual void eventBundleRefused();
00087                                 virtual void eventBundleForwarded();
00088                                 virtual void eventBundleAck(size_t ack);
00089 
00090                                 dtn::core::Node::Protocol getDiscoveryProtocol() const;
00091 
                                      // Accepts a bundle ID for this connection. Presumably it is handed
                                      // to _sender, which is a Queue of BundleID - confirm in the .cpp.
00096                                 void queue(const dtn::data::BundleID &bundle);
00097 
                                      // Stream-style operators taking the connection and a Bundle
                                      // (non-const for >>, const for <<). Declared as friends, so they
                                      // may access the non-public members of TCPConnection.
00098                                 friend TCPConvergenceLayer::TCPConnection& operator>>(TCPConvergenceLayer::TCPConnection &conn, dtn::data::Bundle &bundle);
00099                                 friend TCPConvergenceLayer::TCPConnection& operator<<(TCPConvergenceLayer::TCPConnection &conn, const dtn::data::Bundle &bundle);
00100 
00101                         protected:
00102                                 void rejectTransmission();
00103 
                                      // run()/finally() are presumably the thread entry and cleanup hooks
                                      // of ibrcommon::DetachedThread - that base is not visible here.
00104                                 void run();
00105                                 void finally();
00106 
00107                                 void clearQueue();
00108 
00109                                 void keepalive();
00110 
00111                         private:
                                      /**
                                       * Helper that is both a JoinableThread and a Queue of BundleID.
                                       * It stores references (not copies) to the owning connection and
                                       * to a keep-alive timeout value, so both referents must outlive it.
                                       */
00112                                 class Sender : public ibrcommon::JoinableThread, public ibrcommon::Queue<dtn::data::BundleID>
00113                                 {
00114                                 public:
00115                                         Sender(TCPConnection &connection, size_t &keepalive_timeout);
00116                                         virtual ~Sender();
00117 
00118                                 protected:
00119                                         void run();
00120                                         void finally();
00121                                         bool __cancellation();
00122 
00123                                 private:
00124                                         bool _abort;
00125                                         TCPConnection &_connection;
00126                                         size_t &_keepalive_timeout;
00127                                         dtn::data::BundleID _current_transfer;
00128                                 };
00129 
00130                                 StreamContactHeader _peer;
00131                                 dtn::core::Node _node;
00132 
                                      // NOTE(review): std::auto_ptr is deprecated since C++11 and removed
                                      // in C++17; this member has to become std::unique_ptr before the
                                      // header can be built with a newer language standard.
00133                                 std::auto_ptr<ibrcommon::tcpstream> _tcpstream;
00134                                 StreamConnection _stream;
00135 
00136                                 // This thread gets awaiting bundles of the queue
00137                                 // and transmit them to the peer.
                                      // NOTE(review): _sender is declared before _keepalive_timeout and
                                      // members are initialised in declaration order. If _sender is
                                      // constructed with a reference to _keepalive_timeout, the reference
                                      // is bound before that member has a value; Sender's constructor
                                      // must then not read through it. Confirm in the .cpp.
00138                                 Sender _sender;
00139 
00140                                 // handshake variables
00141                                 dtn::data::EID _name;
00142                                 size_t _timeout;
00143 
00144                                 ibrcommon::Queue<dtn::data::MetaBundle> _sentqueue;
00145                                 size_t _lastack;
00146                                 size_t _keepalive_timeout;
00147                         };
00148 
                              /**
                               * GenericServer specialised for TCPConnection that is also an
                               * EventReceiver. It holds the ibrcommon::tcpserver and a list of
                               * Connection bookkeeping entries.
                               */
00149                         class Server : public dtn::net::GenericServer<TCPConvergenceLayer::TCPConnection>, public dtn::core::EventReceiver
00150                         {
00151                         public:
00152                                 Server(ibrcommon::NetInterface net, int port);
00153                                 virtual ~Server();
00154 
00159                                 void queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job);
00160 
                                      // Receives a core event by pointer; presumably the EventReceiver
                                      // callback (that interface is not visible here).
00164                                 void raiseEvent(const dtn::core::Event *evt);
00165 
00169                                 virtual const std::string getName() const;
00170 
00171                         protected:
                                      // Returns a raw TCPConnection pointer; who owns the returned
                                      // object is not expressed in this header.
00172                                 TCPConvergenceLayer::TCPConnection* accept();
00173                                 void listen();
00174                                 void shutdown();
00175 
00176                                 void connectionUp(TCPConvergenceLayer::TCPConnection *conn);
00177                                 void connectionDown(TCPConvergenceLayer::TCPConnection *conn);
00178 
00179                         private:
                                      /**
                                       * Bookkeeping entry: a raw TCPConnection pointer together with
                                       * the peer Node and an "active" flag. All members are public.
                                       * NOTE(review): _connection is a raw pointer, so ownership and
                                       * lifetime are not expressed by the type - check ~Connection()
                                       * and the copy behaviour (entries live in a std::list by value).
                                       */
00180                                 class Connection
00181                                 {
00182                                 public:
00183                                         Connection(TCPConvergenceLayer::TCPConnection *conn, const dtn::core::Node &node, const bool &active = false);
00184                                         ~Connection();
00185 
                                              // Two match() overloads: one against a destination EID,
                                              // one against a NodeEvent.
00186                                         bool match(const dtn::data::EID &destination) const;
00187                                         bool match(const dtn::core::NodeEvent &evt) const;
00188 
                                              // Dereference operator yielding a TCPConnection reference;
                                              // presumably *_connection - confirm in the .cpp.
00189                                         TCPConvergenceLayer::TCPConnection& operator*();
00190 
00191                                         TCPConvergenceLayer::TCPConnection *_connection;
00192                                         dtn::core::Node _peer;
00193                                         bool _active;
00194                                 };
00195 
00196                                 ibrcommon::tcpserver _tcpsrv;
                                      // NOTE(review): std::list (and std::string above) are used without
                                      // <list> / <string> being included directly by this header;
                                      // presumably they arrive through one of the other includes.
00197                                 std::list<Connection> _connections;
00198                         };
00199 
                              // Takes the network interface (by value) and the port; presumably
                              // stored in _net / _port and passed on to _server - confirm in the .cpp.
00205                         TCPConvergenceLayer(ibrcommon::NetInterface net, int port);
00206 
00210                         virtual ~TCPConvergenceLayer();
00211 
00212                         dtn::core::Node::Protocol getDiscoveryProtocol() const;
00213 
00214                         void initialize();
00215                         void startup();
00216                         void terminate();
00217 
                              // Both strings are taken by non-const reference, so the callee can
                              // modify them; presumably they are outputs describing this service
                              // for discovery - confirm against DiscoveryServiceProvider.
00221                         void update(std::string &name, std::string &data);
00222                         bool onInterface(const ibrcommon::NetInterface &net) const;
00223 
00228                         void queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job);
00229 
00233                         virtual const std::string getName() const;
00234 
00235                 private:
                              // Declared only; the value is defined outside this header.
00236                         static const int DEFAULT_PORT;
00237                         bool _running;
00238 
00239                         ibrcommon::NetInterface _net;
00240                         int _port;
00241                         TCPConvergenceLayer::Server _server;
00242                 };
00243         }
00244 }
00245 
00246 #endif /* TCPCONVERGENCELAYER_H_ */

Generated on Thu Nov 11 2010 09:49:47 for IBR-DTNSuite by  doxygen 1.7.1