|
IBR-DTNSuite 0.6
|
00001 /* 00002 * TCPConvergenceLayer.cpp 00003 * 00004 * Created on: 05.08.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "net/TCPConvergenceLayer.h" 00009 #include "net/ConnectionEvent.h" 00010 #include "routing/RequeueBundleEvent.h" 00011 #include "core/BundleCore.h" 00012 00013 #include <ibrcommon/net/vinterface.h> 00014 #include <ibrcommon/thread/MutexLock.h> 00015 #include <ibrcommon/Logger.h> 00016 #include <ibrcommon/net/tcpclient.h> 00017 #include <ibrcommon/Logger.h> 00018 00019 #include <streambuf> 00020 #include <functional> 00021 #include <list> 00022 #include <algorithm> 00023 00024 using namespace ibrcommon; 00025 00026 namespace dtn 00027 { 00028 namespace net 00029 { 00030 /* 00031 * class TCPConvergenceLayer 00032 */ 00033 const int TCPConvergenceLayer::DEFAULT_PORT = 4556; 00034 00035 TCPConvergenceLayer::TCPConvergenceLayer() 00036 { 00037 } 00038 00039 TCPConvergenceLayer::~TCPConvergenceLayer() 00040 { 00041 join(); 00042 } 00043 00044 void TCPConvergenceLayer::bind(const ibrcommon::vinterface &net, int port) 00045 { 00046 _interfaces.push_back(net); 00047 _tcpsrv.bind(net, port); 00048 _portmap[net] = port; 00049 } 00050 00051 dtn::core::Node::Protocol TCPConvergenceLayer::getDiscoveryProtocol() const 00052 { 00053 return dtn::core::Node::CONN_TCPIP; 00054 } 00055 00056 void TCPConvergenceLayer::update(const ibrcommon::vinterface &iface, std::string &name, std::string ¶ms) throw(dtn::net::DiscoveryServiceProvider::NoServiceHereException) 00057 { 00058 name = "tcpcl"; 00059 stringstream service; 00060 00061 // TODO: get the main address of this host, if no interface is specified 00062 00063 // search for the matching interface 00064 for (std::list<ibrcommon::vinterface>::const_iterator it = _interfaces.begin(); it != _interfaces.end(); it++) 00065 { 00066 const ibrcommon::vinterface &interface = *it; 00067 if (interface == iface) 00068 { 00069 try { 00070 // get all addresses of this interface 00071 std::list<vaddress> list = interface.getAddresses(ibrcommon::vaddress::VADDRESS_INET); 00072 00073 // if no address is returned... (goto catch block) 00074 if (list.empty()) throw ibrcommon::Exception("no address found"); 00075 00076 // fill in the ip address 00077 service << "ip=" << list.front().get(false) << ";port=" << _portmap[iface] << ";"; 00078 } catch (const ibrcommon::Exception&) { 00079 // ... set the port only 00080 service << "port=" << _portmap[iface] << ";"; 00081 }; 00082 00083 params = service.str(); 00084 return; 00085 } 00086 } 00087 00088 throw dtn::net::DiscoveryServiceProvider::NoServiceHereException(); 00089 } 00090 00091 const std::string TCPConvergenceLayer::getName() const 00092 { 00093 return "TCPConvergenceLayer"; 00094 } 00095 00096 void TCPConvergenceLayer::open(const dtn::core::Node &n) 00097 { 00098 // search for an existing connection 00099 ibrcommon::MutexLock l(_connections_cond); 00100 00101 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00102 { 00103 TCPConnection &conn = *(*iter); 00104 00105 if (conn.match(n)) 00106 { 00107 return; 00108 } 00109 } 00110 00111 // create a connection 00112 TCPConnection *conn = new TCPConnection(*this, n, dtn::core::BundleCore::local, 10); 00113 00114 // raise setup event 00115 ConnectionEvent::raise(ConnectionEvent::CONNECTION_SETUP, n); 00116 00117 // add connection as pending 00118 _connections.push_back( conn ); 00119 00120 // start the ClientHandler (service) 00121 conn->initialize(); 00122 00123 // signal that there is a new connection 00124 _connections_cond.signal(true); 00125 00126 return; 00127 } 00128 00129 void TCPConvergenceLayer::queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job) 00130 { 00131 // search for an existing connection 00132 ibrcommon::MutexLock l(_connections_cond); 00133 00134 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00135 { 00136 TCPConnection &conn = *(*iter); 00137 00138 if (conn.match(n)) 00139 { 00140 conn.queue(job._bundle); 00141 IBRCOMMON_LOGGER_DEBUG(15) << "queued bundle to an existing tcp connection (" << conn.getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL; 00142 00143 return; 00144 } 00145 } 00146 00147 // create a connection 00148 TCPConnection *conn = new TCPConnection(*this, n, dtn::core::BundleCore::local, 10); 00149 00150 // raise setup event 00151 ConnectionEvent::raise(ConnectionEvent::CONNECTION_SETUP, n); 00152 00153 // add connection as pending 00154 _connections.push_back( conn ); 00155 00156 // start the ClientHandler (service) 00157 conn->initialize(); 00158 00159 // queue the bundle 00160 conn->queue(job._bundle); 00161 00162 // signal that there is a new connection 00163 _connections_cond.signal(true); 00164 00165 IBRCOMMON_LOGGER_DEBUG(15) << "queued bundle to an new tcp connection (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL; 00166 } 00167 00168 void TCPConvergenceLayer::connectionUp(TCPConnection *conn) 00169 { 00170 ibrcommon::MutexLock l(_connections_cond); 00171 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00172 { 00173 if (conn == (*iter)) 00174 { 00175 // put pending connection to the active connections 00176 return; 00177 } 00178 } 00179 00180 _connections.push_back( conn ); 00181 00182 // signal that there is a new connection 00183 _connections_cond.signal(true); 00184 00185 IBRCOMMON_LOGGER_DEBUG(15) << "tcp connection added (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL; 00186 } 00187 00188 void TCPConvergenceLayer::connectionDown(TCPConnection *conn) 00189 { 00190 ibrcommon::MutexLock l(_connections_cond); 00191 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00192 { 00193 if (conn == (*iter)) 00194 { 00195 _connections.erase(iter); 00196 IBRCOMMON_LOGGER_DEBUG(15) << "tcp connection removed (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL; 00197 00198 // signal that there is a connection less 00199 _connections_cond.signal(true); 00200 return; 00201 } 00202 } 00203 } 00204 00205 void TCPConvergenceLayer::componentRun() 00206 { 00207 try { 00208 while (true) 00209 { 00210 // wait for incoming connections 00211 tcpstream *stream = _tcpsrv.accept(); 00212 00213 // create a new TCPConnection and return the pointer 00214 TCPConnection *obj = new TCPConnection(*this, stream, dtn::core::BundleCore::local, 10); 00215 00216 // add the connection to the connection list 00217 connectionUp(obj); 00218 00219 // initialize the object 00220 obj->initialize(); 00221 00222 // breakpoint 00223 ibrcommon::Thread::yield(); 00224 } 00225 } catch (const std::exception&) { 00226 // ignore all errors 00227 return; 00228 } 00229 } 00230 00231 bool TCPConvergenceLayer::__cancellation() 00232 { 00233 _tcpsrv.shutdown(); 00234 _tcpsrv.close(); 00235 return true; 00236 } 00237 00238 void TCPConvergenceLayer::closeAll() 00239 { 00240 // search for an existing connection 00241 ibrcommon::MutexLock l(_connections_cond); 00242 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00243 { 00244 TCPConnection &conn = *(*iter); 00245 00246 // close the connection immediately 00247 conn.shutdown(); 00248 } 00249 } 00250 00251 void TCPConvergenceLayer::componentUp() 00252 { 00253 // listen on the socket, max. 5 concurrent awaiting connections 00254 _tcpsrv.listen(5); 00255 } 00256 00257 void TCPConvergenceLayer::componentDown() 00258 { 00259 // shutdown the TCP server 00260 _tcpsrv.shutdown(); 00261 _tcpsrv.close(); 00262 00263 // close all active connections 00264 closeAll(); 00265 00266 // wait until all tcp connections are down 00267 { 00268 ibrcommon::MutexLock l(_connections_cond); 00269 while (_connections.size() > 0) _connections_cond.wait(); 00270 } 00271 } 00272 } 00273 }