00001
00002
00003
00004
00005
00006
00007
00008 #include "net/TCPConvergenceLayer.h"
00009 #include "core/BundleCore.h"
00010 #include "core/GlobalEvent.h"
00011 #include "ibrcommon/net/NetInterface.h"
00012 #include "net/ConnectionEvent.h"
00013 #include <ibrcommon/thread/MutexLock.h>
00014 #include "routing/RequeueBundleEvent.h"
00015 #include <ibrcommon/Logger.h>
00016
00017 #include <sys/socket.h>
00018 #include <streambuf>
00019 #include <netinet/in.h>
00020 #include <arpa/inet.h>
00021
00022 #include <functional>
00023 #include <list>
00024 #include <algorithm>
00025
00026 using namespace ibrcommon;
00027
00028 namespace dtn
00029 {
00030 namespace net
00031 {
00032
00033
00034
00035 const int TCPConvergenceLayer::DEFAULT_PORT = 4556;
00036
00037 TCPConvergenceLayer::TCPConvergenceLayer(ibrcommon::NetInterface net, int port)
00038 : _net(net), _port(port), _server(net, port)
00039 {
00040 }
00041
00042 TCPConvergenceLayer::~TCPConvergenceLayer()
00043 {
00044 }
00045
00046 dtn::core::Node::Protocol TCPConvergenceLayer::getDiscoveryProtocol() const
00047 {
00048 return Node::CONN_TCPIP;
00049 }
00050
00051 void TCPConvergenceLayer::initialize()
00052 {
00053 _server.initialize();
00054 }
00055
00056 void TCPConvergenceLayer::startup()
00057 {
00058 _server.startup();
00059 }
00060
00061 void TCPConvergenceLayer::terminate()
00062 {
00063 _server.terminate();
00064 }
00065
00066 void TCPConvergenceLayer::update(std::string &name, std::string ¶ms)
00067 {
00068 name = "tcpcl";
00069
00070 stringstream service; service << "ip=" << _net.getAddress() << ";port=" << _port << ";";
00071 params = service.str();
00072 }
00073
00074 bool TCPConvergenceLayer::onInterface(const ibrcommon::NetInterface &net) const
00075 {
00076 if (_net.getInterface() == net.getInterface()) return true;
00077 return false;
00078 }
00079
00080 void TCPConvergenceLayer::queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job)
00081 {
00082 _server.queue(n, job);
00083 }
00084
00085 void TCPConvergenceLayer::Server::queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job)
00086 {
00087 {
00088
00089 ibrcommon::MutexLock l(_connection_lock);
00090
00091 for (std::list<TCPConvergenceLayer::Server::Connection>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00092 {
00093 TCPConvergenceLayer::Server::Connection &conn = (*iter);
00094
00095
00096 if ((*conn).free()) continue;
00097
00098 if (conn.match(job._destination))
00099 {
00100 if (!conn._active)
00101 {
00102 IBRCOMMON_LOGGER(warning) << "putting bundle on pending connection" << IBRCOMMON_LOGGER_ENDL;
00103 }
00104
00105 (*conn).queue(job._bundle);
00106 return;
00107 }
00108 }
00109 }
00110
00111 try {
00112 TCPConvergenceLayer::TCPConnection *conn = getConnection(n);
00113
00114
00115 add(conn);
00116
00117 conn->queue(job._bundle);
00118 } catch (ibrcommon::SocketException ex) {
00119
00120 dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle);
00121 }
00122 }
00123
00124 TCPConvergenceLayer::TCPConnection* TCPConvergenceLayer::Server::getConnection(const dtn::core::Node &n)
00125 {
00126 struct sockaddr_in sock_address;
00127 int sock = socket(AF_INET, SOCK_STREAM, 0);
00128
00129 if (sock <= 0)
00130 {
00131
00132 throw ibrcommon::SocketException("Could not create a socket.");
00133 }
00134
00135 sock_address.sin_family = AF_INET;
00136 sock_address.sin_addr.s_addr = inet_addr(n.getAddress().c_str());
00137 sock_address.sin_port = htons(n.getPort());
00138
00139 if (connect ( sock, (struct sockaddr *) &sock_address, sizeof (sock_address)) != 0)
00140 {
00141
00142 throw ibrcommon::SocketException("Could not connect to the server.");
00143 }
00144
00145
00146 TCPConnection *conn = new TCPConnection(new ibrcommon::tcpstream(sock));
00147
00148
00149 _connections.push_back( Connection( conn, n ) );
00150
00151
00152 ConnectionEvent::raise(ConnectionEvent::CONNECTION_SETUP, n);
00153
00154
00155 conn->initialize(dtn::core::BundleCore::local, 10);
00156
00157 return conn;
00158 }
00159
00160 TCPConvergenceLayer::Server::Server(ibrcommon::NetInterface net, int port)
00161 : dtn::net::GenericServer<TCPConvergenceLayer::TCPConnection>(), _tcpsrv(net, port)
00162 {
00163 bindEvent(NodeEvent::className);
00164 }
00165
00166 TCPConvergenceLayer::Server::~Server()
00167 {
00168 unbindEvent(NodeEvent::className);
00169 }
00170
00171 void TCPConvergenceLayer::Server::raiseEvent(const Event *evt)
00172 {
00173 const NodeEvent *node = dynamic_cast<const NodeEvent*>(evt);
00174
00175 if (node != NULL)
00176 {
00177 if (node->getAction() == NODE_UNAVAILABLE)
00178 {
00179
00180 ibrcommon::MutexLock l(_connection_lock);
00181 for (std::list<TCPConvergenceLayer::Server::Connection>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00182 {
00183 TCPConvergenceLayer::Server::Connection &conn = (*iter);
00184
00185
00186 if ( (*conn).free()) continue;
00187
00188 if (conn.match(*node))
00189 {
00190 (*conn).shutdown();
00191 }
00192 }
00193 }
00194 }
00195 }
00196
00197 void TCPConvergenceLayer::Server::connectionUp(TCPConvergenceLayer::TCPConnection *conn)
00198 {
00199 ibrcommon::MutexLock l(_connection_lock);
00200
00201 for (std::list<TCPConvergenceLayer::Server::Connection>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00202 {
00203 TCPConvergenceLayer::Server::Connection &item = (*iter);
00204
00205 if (conn == item._connection)
00206 {
00207
00208 item._active = true;
00209 return;
00210 }
00211 }
00212
00213 _connections.push_back( Connection( conn, conn->getNode(), true ) );
00214 }
00215
00216 void TCPConvergenceLayer::Server::connectionDown(TCPConvergenceLayer::TCPConnection *conn)
00217 {
00218 ibrcommon::MutexLock l(_connection_lock);
00219 for (std::list<TCPConvergenceLayer::Server::Connection>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00220 {
00221 TCPConvergenceLayer::Server::Connection &item = (*iter);
00222
00223 if (conn == item._connection)
00224 {
00225 _connections.erase(iter);
00226 return;
00227 }
00228 }
00229 }
00230
00231 TCPConvergenceLayer::TCPConnection* TCPConvergenceLayer::Server::accept()
00232 {
00233 try {
00234
00235 TCPConnection *conn = new TCPConnection(_tcpsrv.accept());
00236
00237 {
00238 ibrcommon::MutexLock l(_connection_lock);
00239
00240
00241 _connections.push_back( Connection( conn, conn->getNode() ) );
00242 }
00243
00244 conn->initialize(dtn::core::BundleCore::local, 10);
00245 return conn;
00246 } catch (ibrcommon::SocketException ex) {
00247
00248 return NULL;
00249 }
00250 }
00251
00252 void TCPConvergenceLayer::Server::listen()
00253 {
00254
00255 }
00256
00257 void TCPConvergenceLayer::Server::shutdown()
00258 {
00259 shutdownAll();
00260 }
00261
00262 TCPConvergenceLayer::Server::Connection::Connection(TCPConvergenceLayer::TCPConnection *conn, const dtn::core::Node &node, const bool &active)
00263 : _connection(conn), _peer(node), _active(active)
00264 {
00265
00266 }
00267
00268 TCPConvergenceLayer::Server::Connection::~Connection()
00269 {
00270
00271 }
00272
00273 TCPConvergenceLayer::TCPConnection& TCPConvergenceLayer::Server::Connection::operator*()
00274 {
00275 return *_connection;
00276 }
00277
00278 bool TCPConvergenceLayer::Server::Connection::match(const dtn::data::EID &destination) const
00279 {
00280 if (_peer.getURI() == destination.getNodeEID()) return true;
00281 if (_connection->getNode().getURI() == destination.getNodeEID()) return true;
00282
00283 return false;
00284 }
00285
00286 bool TCPConvergenceLayer::Server::Connection::match(const NodeEvent &evt) const
00287 {
00288 const dtn::core::Node &n = evt.getNode();
00289
00290 if (_peer.getURI() == n.getURI()) return true;
00291 if (_connection->getNode().getURI() == n.getURI()) return true;
00292
00293 return false;
00294 }
00295 }
00296 }