Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "net/TCPConvergenceLayer.h"
00009 #include "core/BundleCore.h"
00010 #include "core/GlobalEvent.h"
00011 #include "ibrcommon/net/NetInterface.h"
00012 #include "net/ConnectionEvent.h"
00013 #include <ibrcommon/thread/MutexLock.h>
00014 #include "routing/RequeueBundleEvent.h"
00015 #include <ibrcommon/Logger.h>
00016 #include <ibrcommon/net/tcpclient.h>
00017 #include <streambuf>
00018
00019 #include <functional>
00020 #include <list>
00021 #include <algorithm>
00022
00023 using namespace ibrcommon;
00024
00025 namespace dtn
00026 {
00027 namespace net
00028 {
00029
00030
00031
00032 const int TCPConvergenceLayer::DEFAULT_PORT = 4556;
00033
00034 TCPConvergenceLayer::TCPConvergenceLayer(ibrcommon::NetInterface net, int port)
00035 : _net(net), _port(port), _server(net, port)
00036 {
00037 }
00038
00039 TCPConvergenceLayer::~TCPConvergenceLayer()
00040 {
00041 }
00042
00043 dtn::core::Node::Protocol TCPConvergenceLayer::getDiscoveryProtocol() const
00044 {
00045 return Node::CONN_TCPIP;
00046 }
00047
00048 void TCPConvergenceLayer::initialize()
00049 {
00050 _server.initialize();
00051 }
00052
00053 void TCPConvergenceLayer::startup()
00054 {
00055 _server.startup();
00056 }
00057
00058 void TCPConvergenceLayer::terminate()
00059 {
00060 _server.terminate();
00061 }
00062
00063 void TCPConvergenceLayer::update(std::string &name, std::string ¶ms)
00064 {
00065 name = "tcpcl";
00066
00067 stringstream service; service << "ip=" << _net.getAddress() << ";port=" << _port << ";";
00068 params = service.str();
00069 }
00070
00071 bool TCPConvergenceLayer::onInterface(const ibrcommon::NetInterface &net) const
00072 {
00073 if (_net == net) return true;
00074 return false;
00075 }
00076
00077 void TCPConvergenceLayer::queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job)
00078 {
00079 _server.queue(n, job);
00080 }
00081
00082 const std::string TCPConvergenceLayer::getName() const
00083 {
00084 return "TCPConvergenceLayer";
00085 }
00086
00087 void TCPConvergenceLayer::Server::queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job)
00088 {
00089 {
00090
00091 ibrcommon::MutexLock l(_lock);
00092
00093 for (std::list<TCPConvergenceLayer::Server::Connection>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00094 {
00095 TCPConvergenceLayer::Server::Connection &conn = (*iter);
00096
00097 if (conn.match(job._destination))
00098 {
00099 if (!conn._active)
00100 {
00101 IBRCOMMON_LOGGER(warning) << "putting bundle on pending connection" << IBRCOMMON_LOGGER_ENDL;
00102 }
00103
00104 (*conn).queue(job._bundle);
00105 return;
00106 }
00107 }
00108 }
00109
00110 try {
00111
00112 ibrcommon::tcpstream* stream = new ibrcommon::tcpclient(n.getAddress(), n.getPort());
00113
00114 TCPConnection *conn = new TCPConnection((GenericServer<TCPConnection>&)*this, stream, dtn::core::BundleCore::local, 10);
00115
00116
00117 ConnectionEvent::raise(ConnectionEvent::CONNECTION_SETUP, n);
00118
00119 ibrcommon::MutexLock l(_lock);
00120
00121
00122 _connections.push_back( Connection( conn, n ) );
00123
00124
00125 add(conn);
00126
00127
00128 conn->initialize();
00129
00130 conn->queue(job._bundle);
00131 } catch (ibrcommon::SocketException ex) {
00132
00133 dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle);
00134 }
00135 }
00136
00137 TCPConvergenceLayer::Server::Server(ibrcommon::NetInterface net, int port)
00138 : dtn::net::GenericServer<TCPConvergenceLayer::TCPConnection>(), _tcpsrv(net, port)
00139 {
00140 bindEvent(NodeEvent::className);
00141 }
00142
00143 TCPConvergenceLayer::Server::~Server()
00144 {
00145 unbindEvent(NodeEvent::className);
00146 join();
00147 }
00148
00149 void TCPConvergenceLayer::Server::raiseEvent(const Event *evt)
00150 {
00151 const NodeEvent *node = dynamic_cast<const NodeEvent*>(evt);
00152
00153 if (node != NULL)
00154 {
00155 if (node->getAction() == NODE_UNAVAILABLE)
00156 {
00157
00158 ibrcommon::MutexLock l(_lock);
00159 for (std::list<TCPConvergenceLayer::Server::Connection>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00160 {
00161 TCPConvergenceLayer::Server::Connection &conn = (*iter);
00162
00163 if (conn.match(*node))
00164 {
00165
00166 (*conn).shutdown();
00167 }
00168 }
00169 }
00170 }
00171 }
00172
00173 const std::string TCPConvergenceLayer::Server::getName() const
00174 {
00175 return "TCPConvergenceLayer::Server";
00176 }
00177
00178 void TCPConvergenceLayer::Server::connectionUp(TCPConvergenceLayer::TCPConnection *conn)
00179 {
00180 for (std::list<TCPConvergenceLayer::Server::Connection>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00181 {
00182 TCPConvergenceLayer::Server::Connection &item = (*iter);
00183
00184 if (conn == item._connection)
00185 {
00186
00187 item._active = true;
00188 return;
00189 }
00190 }
00191
00192 _connections.push_back( Connection( conn, conn->getNode(), true ) );
00193 }
00194
00195 void TCPConvergenceLayer::Server::connectionDown(TCPConvergenceLayer::TCPConnection *conn)
00196 {
00197 for (std::list<TCPConvergenceLayer::Server::Connection>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00198 {
00199 TCPConvergenceLayer::Server::Connection &item = (*iter);
00200
00201 if (conn == item._connection)
00202 {
00203 _connections.erase(iter);
00204 return;
00205 }
00206 }
00207 }
00208
00209 TCPConvergenceLayer::TCPConnection* TCPConvergenceLayer::Server::accept()
00210 {
00211
00212 TCPConnection *conn = new TCPConnection(*this, _tcpsrv.accept(), dtn::core::BundleCore::local, 10);
00213 {
00214 ibrcommon::MutexLock l(_lock);
00215
00216
00217 _connections.push_back( Connection( conn, conn->getNode() ) );
00218 }
00219
00220 return conn;
00221 }
00222
00223 void TCPConvergenceLayer::Server::listen()
00224 {
00225
00226 }
00227
00228 void TCPConvergenceLayer::Server::shutdown()
00229 {
00230 _tcpsrv.shutdown();
00231 _tcpsrv.close();
00232 }
00233
00234 TCPConvergenceLayer::Server::Connection::Connection(TCPConvergenceLayer::TCPConnection *conn, const dtn::core::Node &node, const bool &active)
00235 : _connection(conn), _peer(node), _active(active)
00236 {
00237
00238 }
00239
00240 TCPConvergenceLayer::Server::Connection::~Connection()
00241 {
00242
00243 }
00244
00245 TCPConvergenceLayer::TCPConnection& TCPConvergenceLayer::Server::Connection::operator*()
00246 {
00247 return *_connection;
00248 }
00249
00250 bool TCPConvergenceLayer::Server::Connection::match(const dtn::data::EID &destination) const
00251 {
00252 if (_peer.getURI() == destination.getNodeEID()) return true;
00253 if (_connection->getNode().getURI() == destination.getNodeEID()) return true;
00254
00255 return false;
00256 }
00257
00258 bool TCPConvergenceLayer::Server::Connection::match(const NodeEvent &evt) const
00259 {
00260 const dtn::core::Node &n = evt.getNode();
00261
00262 if (_peer.getURI() == n.getURI()) return true;
00263 if (_connection->getNode().getURI() == n.getURI()) return true;
00264
00265 return false;
00266 }
00267 }
00268 }