|
IBR-DTNSuite 0.6
|
00001 /* 00002 * TCPConvergenceLayer.h 00003 * 00004 * Created on: 05.08.2009 00005 * Author: morgenro 00006 */ 00007 00008 #ifndef TCPCONVERGENCELAYER_H_ 00009 #define TCPCONVERGENCELAYER_H_ 00010 00011 #include "Component.h" 00012 00013 #include "core/NodeEvent.h" 00014 #include "core/Node.h" 00015 #include "net/ConvergenceLayer.h" 00016 #include "net/DiscoveryService.h" 00017 #include "net/DiscoveryServiceProvider.h" 00018 00019 #include <ibrdtn/data/Bundle.h> 00020 #include <ibrdtn/data/EID.h> 00021 #include <ibrdtn/data/MetaBundle.h> 00022 #include <ibrdtn/data/Serializer.h> 00023 #include <ibrdtn/streams/StreamConnection.h> 00024 #include <ibrdtn/streams/StreamContactHeader.h> 00025 00026 #include <ibrcommon/net/tcpserver.h> 00027 #include <ibrcommon/net/tcpstream.h> 00028 #include <ibrcommon/net/vinterface.h> 00029 00030 #include <memory> 00031 #include <ibrcommon/thread/Queue.h> 00032 00033 using namespace dtn::streams; 00034 00035 namespace dtn 00036 { 00037 namespace net 00038 { 00039 class TCPConvergenceLayer; 00040 00041 class TCPConnection : public StreamConnection::Callback, public ibrcommon::DetachedThread 00042 { 00043 public: 00052 TCPConnection(TCPConvergenceLayer &tcpsrv, ibrcommon::tcpstream *stream, const dtn::data::EID &name, const size_t timeout = 10); 00053 00062 TCPConnection(TCPConvergenceLayer &tcpsrv, const dtn::core::Node &node, const dtn::data::EID &name, const size_t timeout = 10); 00063 00068 virtual ~TCPConnection(); 00069 00073 virtual void initialize(); 00074 00078 void shutdown(); 00079 00084 const StreamContactHeader& getHeader() const; 00085 00090 const dtn::core::Node& getNode() const; 00091 00095 virtual void eventShutdown(StreamConnection::ConnectionShutdownCases csc); 00096 virtual void eventTimeout(); 00097 virtual void eventError(); 00098 virtual void eventConnectionUp(const StreamContactHeader &header); 00099 virtual void eventConnectionDown(); 00100 00101 virtual void eventBundleRefused(); 00102 virtual void eventBundleForwarded(); 00103 virtual void eventBundleAck(size_t ack); 00104 00105 dtn::core::Node::Protocol getDiscoveryProtocol() const; 00106 00111 void queue(const dtn::data::BundleID &bundle); 00112 00113 bool match(const dtn::core::Node &n) const; 00114 bool match(const dtn::data::EID &destination) const; 00115 bool match(const dtn::core::NodeEvent &evt) const; 00116 00117 friend TCPConnection& operator>>(TCPConnection &conn, dtn::data::Bundle &bundle); 00118 friend TCPConnection& operator<<(TCPConnection &conn, const dtn::data::Bundle &bundle); 00119 00120 protected: 00121 void rejectTransmission(); 00122 00123 void setup(); 00124 void run(); 00125 void finally(); 00126 bool __cancellation(); 00127 00128 void clearQueue(); 00129 00130 void keepalive(); 00131 bool good() const; 00132 00133 private: 00134 class Sender : public ibrcommon::JoinableThread, public ibrcommon::Queue<dtn::data::BundleID> 00135 { 00136 public: 00137 Sender(TCPConnection &connection, size_t &keepalive_timeout); 00138 virtual ~Sender(); 00139 00140 protected: 00141 void run(); 00142 void finally(); 00143 bool __cancellation(); 00144 00145 private: 00146 TCPConnection &_connection; 00147 size_t &_keepalive_timeout; 00148 dtn::data::BundleID _current_transfer; 00149 }; 00150 00151 StreamContactHeader _peer; 00152 dtn::core::Node _node; 00153 00154 std::auto_ptr<ibrcommon::tcpstream> _tcpstream; 00155 StreamConnection _stream; 00156 00157 // This thread gets awaiting bundles of the queue 00158 // and transmit them to the peer. 00159 Sender _sender; 00160 00161 // handshake variables 00162 dtn::data::EID _name; 00163 size_t _timeout; 00164 00165 ibrcommon::Queue<dtn::data::MetaBundle> _sentqueue; 00166 size_t _lastack; 00167 size_t _keepalive_timeout; 00168 00169 TCPConvergenceLayer &_callback; 00170 }; 00171 00176 class TCPConvergenceLayer : public dtn::daemon::IndependentComponent, public ConvergenceLayer, public DiscoveryServiceProvider 00177 { 00178 friend class TCPConnection; 00179 public: 00185 TCPConvergenceLayer(); 00186 00190 virtual ~TCPConvergenceLayer(); 00191 00197 void bind(const ibrcommon::vinterface &net, int port); 00198 00203 void queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job); 00204 00210 void open(const dtn::core::Node &n); 00211 00215 virtual const std::string getName() const; 00216 00221 dtn::core::Node::Protocol getDiscoveryProtocol() const; 00222 00226 void update(const ibrcommon::vinterface &iface, std::string &name, std::string &data) throw(dtn::net::DiscoveryServiceProvider::NoServiceHereException); 00227 00228 protected: 00229 bool __cancellation(); 00230 00231 void componentUp(); 00232 void componentRun(); 00233 void componentDown(); 00234 00235 private: 00239 void closeAll(); 00240 00245 void connectionUp(TCPConnection *conn); 00246 00252 void connectionDown(TCPConnection *conn); 00253 00254 static const int DEFAULT_PORT; 00255 bool _running; 00256 00257 ibrcommon::tcpserver _tcpsrv; 00258 00259 ibrcommon::Conditional _connections_cond; 00260 std::list<TCPConnection*> _connections; 00261 std::list<ibrcommon::vinterface> _interfaces; 00262 std::map<ibrcommon::vinterface, unsigned int> _portmap; 00263 }; 00264 } 00265 } 00266 00267 #endif /* TCPCONVERGENCELAYER_H_ */