Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "net/TCPConvergenceLayer.h"
00009 #include "core/BundleCore.h"
00010 #include "core/GlobalEvent.h"
00011 #include <ibrcommon/net/vinterface.h>
00012 #include "net/ConnectionEvent.h"
00013 #include <ibrcommon/thread/MutexLock.h>
00014 #include "routing/RequeueBundleEvent.h"
00015 #include <ibrcommon/Logger.h>
00016 #include <ibrcommon/net/tcpclient.h>
00017 #include <ibrcommon/Logger.h>
00018 #include <streambuf>
00019
00020 #include <functional>
00021 #include <list>
00022 #include <algorithm>
00023
00024 using namespace ibrcommon;
00025
00026 namespace dtn
00027 {
00028 namespace net
00029 {
00030
00031
00032
00033 const int TCPConvergenceLayer::DEFAULT_PORT = 4556;
00034
00035 TCPConvergenceLayer::TCPConvergenceLayer()
00036 {
00037 bindEvent(NodeEvent::className);
00038 }
00039
00040 TCPConvergenceLayer::~TCPConvergenceLayer()
00041 {
00042 unbindEvent(NodeEvent::className);
00043 join();
00044 }
00045
00046 void TCPConvergenceLayer::bind(const ibrcommon::vinterface &net, int port)
00047 {
00048 _interfaces.push_back(net);
00049 _tcpsrv.bind(net, port);
00050 _portmap[net] = port;
00051 }
00052
00053 dtn::core::Node::Protocol TCPConvergenceLayer::getDiscoveryProtocol() const
00054 {
00055 return Node::CONN_TCPIP;
00056 }
00057
00058 void TCPConvergenceLayer::update(const ibrcommon::vinterface &iface, std::string &name, std::string ¶ms) throw(dtn::net::DiscoveryServiceProvider::NoServiceHereException)
00059 {
00060 name = "tcpcl";
00061 stringstream service;
00062
00063
00064
00065
00066 for (std::list<ibrcommon::vinterface>::const_iterator it = _interfaces.begin(); it != _interfaces.end(); it++)
00067 {
00068 const ibrcommon::vinterface &interface = *it;
00069 if (interface == iface)
00070 {
00071 try {
00072
00073 std::list<vaddress> list = interface.getAddresses(ibrcommon::vaddress::VADDRESS_INET);
00074
00075
00076 if (list.empty()) throw ibrcommon::Exception("no address found");
00077
00078
00079 service << "ip=" << list.front().get(false) << ";port=" << _portmap[iface] << ";";
00080 } catch (const ibrcommon::Exception&) {
00081
00082 service << "port=" << _portmap[iface] << ";";
00083 };
00084
00085 params = service.str();
00086 return;
00087 }
00088 }
00089
00090 throw dtn::net::DiscoveryServiceProvider::NoServiceHereException();
00091 }
00092
00093 const std::string TCPConvergenceLayer::getName() const
00094 {
00095 return "TCPConvergenceLayer";
00096 }
00097
00098 void TCPConvergenceLayer::open(const dtn::core::Node &n)
00099 {
00100
00101 ibrcommon::MutexLock l(_connections_cond);
00102
00103 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00104 {
00105 TCPConnection &conn = *(*iter);
00106
00107 if (conn.match(n))
00108 {
00109 return;
00110 }
00111 }
00112
00113
00114 TCPConnection *conn = new TCPConnection(*this, n, dtn::core::BundleCore::local, 10);
00115
00116
00117 ConnectionEvent::raise(ConnectionEvent::CONNECTION_SETUP, n);
00118
00119
00120 _connections.push_back( conn );
00121
00122
00123 conn->initialize();
00124
00125
00126 _connections_cond.signal(true);
00127
00128 return;
00129 }
00130
00131 void TCPConvergenceLayer::queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job)
00132 {
00133
00134 ibrcommon::MutexLock l(_connections_cond);
00135
00136 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00137 {
00138 TCPConnection &conn = *(*iter);
00139
00140 if (conn.match(n))
00141 {
00142 conn.queue(job._bundle);
00143 IBRCOMMON_LOGGER_DEBUG(15) << "queued bundle to an existing tcp connection (" << conn.getNode().getEID().getString() << ")" << IBRCOMMON_LOGGER_ENDL;
00144
00145 return;
00146 }
00147 }
00148
00149
00150 TCPConnection *conn = new TCPConnection(*this, n, dtn::core::BundleCore::local, 10);
00151
00152
00153 ConnectionEvent::raise(ConnectionEvent::CONNECTION_SETUP, n);
00154
00155
00156 _connections.push_back( conn );
00157
00158
00159 conn->initialize();
00160
00161
00162 conn->queue(job._bundle);
00163
00164
00165 _connections_cond.signal(true);
00166
00167 IBRCOMMON_LOGGER_DEBUG(15) << "queued bundle to an new tcp connection (" << conn->getNode().getEID().getString() << ")" << IBRCOMMON_LOGGER_ENDL;
00168 }
00169
00170 void TCPConvergenceLayer::raiseEvent(const Event *evt)
00171 {
00172 const NodeEvent *node = dynamic_cast<const NodeEvent*>(evt);
00173
00174 if (node != NULL)
00175 {
00176 if (node->getAction() == NODE_UNAVAILABLE)
00177 {
00178
00179 ibrcommon::MutexLock l(_connections_cond);
00180 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00181 {
00182 TCPConnection &conn = *(*iter);
00183
00184 if (conn.match(*node))
00185 {
00186
00187 conn.shutdown();
00188 }
00189 }
00190 }
00191 }
00192 }
00193
00194 void TCPConvergenceLayer::connectionUp(TCPConnection *conn)
00195 {
00196 ibrcommon::MutexLock l(_connections_cond);
00197 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00198 {
00199 if (conn == (*iter))
00200 {
00201
00202 return;
00203 }
00204 }
00205
00206 _connections.push_back( conn );
00207
00208
00209 _connections_cond.signal(true);
00210
00211 IBRCOMMON_LOGGER_DEBUG(15) << "tcp connection added (" << conn->getNode().getEID().getString() << ")" << IBRCOMMON_LOGGER_ENDL;
00212 }
00213
00214 void TCPConvergenceLayer::connectionDown(TCPConnection *conn)
00215 {
00216 ibrcommon::MutexLock l(_connections_cond);
00217 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00218 {
00219 if (conn == (*iter))
00220 {
00221 _connections.erase(iter);
00222 IBRCOMMON_LOGGER_DEBUG(15) << "tcp connection removed (" << conn->getNode().getEID().getString() << ")" << IBRCOMMON_LOGGER_ENDL;
00223
00224
00225 _connections_cond.signal(true);
00226 return;
00227 }
00228 }
00229 }
00230
00231 void TCPConvergenceLayer::componentRun()
00232 {
00233
00234 Thread::enable_interruption();
00235
00236 try {
00237 while (true)
00238 {
00239
00240 tcpstream *stream = _tcpsrv.accept();
00241
00242
00243 TCPConnection *obj = new TCPConnection(*this, stream, dtn::core::BundleCore::local, 10);
00244
00245
00246 connectionUp(obj);
00247
00248
00249 obj->initialize();
00250
00251
00252 ibrcommon::Thread::yield();
00253 }
00254 } catch (const std::exception&) {
00255
00256 return;
00257 }
00258 }
00259
00260 bool TCPConvergenceLayer::__cancellation()
00261 {
00262 _tcpsrv.shutdown();
00263 _tcpsrv.close();
00264 Thread::interrupt();
00265 return true;
00266 }
00267
00268 void TCPConvergenceLayer::closeAll()
00269 {
00270
00271 ibrcommon::MutexLock l(_connections_cond);
00272 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00273 {
00274 TCPConnection &conn = *(*iter);
00275
00276
00277 conn.shutdown();
00278 }
00279 }
00280
00281 void TCPConvergenceLayer::componentUp()
00282 {
00283
00284 _tcpsrv.listen(5);
00285 }
00286
00287 void TCPConvergenceLayer::componentDown()
00288 {
00289
00290 _tcpsrv.shutdown();
00291 _tcpsrv.close();
00292
00293
00294 closeAll();
00295
00296
00297 {
00298 ibrcommon::MutexLock l(_connections_cond);
00299 while (_connections.size() > 0) _connections_cond.wait();
00300 }
00301
00302
00303 Thread::interrupt();
00304 }
00305 }
00306 }