• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/net/TCPConvergenceLayer.cpp

Go to the documentation of this file.
00001 /*
00002  * TCPConvergenceLayer.cpp
00003  *
00004  *  Created on: 05.08.2009
00005  *      Author: morgenro
00006  */
00007 
#include "net/TCPConvergenceLayer.h"

#include "core/BundleCore.h"
#include "core/GlobalEvent.h"
#include "net/ConnectionEvent.h"
#include "routing/RequeueBundleEvent.h"

#include "ibrcommon/net/NetInterface.h"
#include <ibrcommon/Logger.h>
#include <ibrcommon/net/tcpclient.h>
#include <ibrcommon/thread/MutexLock.h>

#include <algorithm>
#include <functional>
#include <list>
#include <sstream>
#include <streambuf>
#include <string>
00022 
00023 using namespace ibrcommon;
00024 
00025 namespace dtn
00026 {
00027         namespace net
00028         {
00029                 /*
00030                  * class TCPConvergenceLayer
00031                  */
00032                 const int TCPConvergenceLayer::DEFAULT_PORT = 4556;
00033 
00034                 TCPConvergenceLayer::TCPConvergenceLayer(ibrcommon::NetInterface net, int port)
00035                  : _net(net), _port(port), _server(net, port)
00036                 {
00037                 }
00038 
00039                 TCPConvergenceLayer::~TCPConvergenceLayer()
00040                 {
00041                 }
00042 
00043                 dtn::core::Node::Protocol TCPConvergenceLayer::getDiscoveryProtocol() const
00044                 {
00045                         return Node::CONN_TCPIP;
00046                 }
00047 
00048                 void TCPConvergenceLayer::initialize()
00049                 {
00050                         _server.initialize();
00051                 }
00052 
00053                 void TCPConvergenceLayer::startup()
00054                 {
00055                         _server.startup();
00056                 }
00057 
00058                 void TCPConvergenceLayer::terminate()
00059                 {
00060                         _server.terminate();
00061                 }
00062 
00063                 void TCPConvergenceLayer::update(std::string &name, std::string &params)
00064                 {
00065                         name = "tcpcl";
00066 
00067                         stringstream service; service << "ip=" << _net.getAddress() << ";port=" << _port << ";";
00068                         params = service.str();
00069                 }
00070 
00071                 bool TCPConvergenceLayer::onInterface(const ibrcommon::NetInterface &net) const
00072                 {
00073                         if (_net == net) return true;
00074                         return false;
00075                 }
00076 
00077                 void TCPConvergenceLayer::queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job)
00078                 {
00079                         _server.queue(n, job);
00080                 }
00081 
00082                 const std::string TCPConvergenceLayer::getName() const
00083                 {
00084                         return "TCPConvergenceLayer";
00085                 }
00086 
00087                 void TCPConvergenceLayer::Server::queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job)
00088                 {
00089                         {
00090                                 // search for an existing connection
00091                                 ibrcommon::MutexLock l(_lock);
00092 
00093                                 for (std::list<TCPConvergenceLayer::Server::Connection>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00094                                 {
00095                                         TCPConvergenceLayer::Server::Connection &conn = (*iter);
00096 
00097                                         if (conn.match(job._destination))
00098                                         {
00099                                                 if (!conn._active)
00100                                                 {
00101                                                         IBRCOMMON_LOGGER(warning) << "putting bundle on pending connection" << IBRCOMMON_LOGGER_ENDL;
00102                                                 }
00103 
00104                                                 (*conn).queue(job._bundle);
00105                                                 return;
00106                                         }
00107                                 }
00108                         }
00109 
00110                         try {
00111                                 // create a connection
00112                                 ibrcommon::tcpstream* stream = new ibrcommon::tcpclient(n.getAddress(), n.getPort());
00113 
00114                                 TCPConnection *conn = new TCPConnection((GenericServer<TCPConnection>&)*this, stream, dtn::core::BundleCore::local, 10);
00115 
00116                                 // raise setup event
00117                                 ConnectionEvent::raise(ConnectionEvent::CONNECTION_SETUP, n);
00118 
00119                                 ibrcommon::MutexLock l(_lock);
00120 
00121                                 // add connection as pending
00122                                 _connections.push_back( Connection( conn, n ) );
00123 
00124                                 // add the connection to the connection list
00125                                 add(conn);
00126 
00127                                 // start the ClientHandler (service)
00128                                 conn->initialize();
00129 
00130                                 conn->queue(job._bundle);
00131                         } catch (ibrcommon::SocketException ex) {
00132                                 // signal interruption of the transfer
00133                                 dtn::routing::RequeueBundleEvent::raise(job._destination, job._bundle);
00134                         }
00135                 }
00136 
00137                 TCPConvergenceLayer::Server::Server(ibrcommon::NetInterface net, int port)
00138                  : dtn::net::GenericServer<TCPConvergenceLayer::TCPConnection>(), _tcpsrv(net, port)
00139                 {
00140                         bindEvent(NodeEvent::className);
00141                 }
00142 
00143                 TCPConvergenceLayer::Server::~Server()
00144                 {
00145                         unbindEvent(NodeEvent::className);
00146                         join();
00147                 }
00148 
00149                 void TCPConvergenceLayer::Server::raiseEvent(const Event *evt)
00150                 {
00151                         const NodeEvent *node = dynamic_cast<const NodeEvent*>(evt);
00152 
00153                         if (node != NULL)
00154                         {
00155                                 if (node->getAction() == NODE_UNAVAILABLE)
00156                                 {
00157                                         // search for an existing connection
00158                                         ibrcommon::MutexLock l(_lock);
00159                                         for (std::list<TCPConvergenceLayer::Server::Connection>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00160                                         {
00161                                                 TCPConvergenceLayer::Server::Connection &conn = (*iter);
00162 
00163                                                 if (conn.match(*node))
00164                                                 {
00165                                                         // close the connection immediately
00166                                                         (*conn).shutdown();
00167                                                 }
00168                                         }
00169                                 }
00170                         }
00171                 }
00172 
00173                 const std::string TCPConvergenceLayer::Server::getName() const
00174                 {
00175                         return "TCPConvergenceLayer::Server";
00176                 }
00177 
00178                 void TCPConvergenceLayer::Server::connectionUp(TCPConvergenceLayer::TCPConnection *conn)
00179                 {
00180                         for (std::list<TCPConvergenceLayer::Server::Connection>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00181                         {
00182                                 TCPConvergenceLayer::Server::Connection &item = (*iter);
00183 
00184                                 if (conn == item._connection)
00185                                 {
00186                                         // put pending connection to the active connections
00187                                         item._active = true;
00188                                         return;
00189                                 }
00190                         }
00191 
00192                         _connections.push_back( Connection( conn, conn->getNode(), true ) );
00193                 }
00194 
00195                 void TCPConvergenceLayer::Server::connectionDown(TCPConvergenceLayer::TCPConnection *conn)
00196                 {
00197                         for (std::list<TCPConvergenceLayer::Server::Connection>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00198                         {
00199                                 TCPConvergenceLayer::Server::Connection &item = (*iter);
00200 
00201                                 if (conn == item._connection)
00202                                 {
00203                                         _connections.erase(iter);
00204                                         return;
00205                                 }
00206                         }
00207                 }
00208 
00209                 TCPConvergenceLayer::TCPConnection* TCPConvergenceLayer::Server::accept()
00210                 {
00211                         // wait for incoming connections
00212                         TCPConnection *conn = new TCPConnection(*this, _tcpsrv.accept(), dtn::core::BundleCore::local, 10);
00213                         {
00214                                 ibrcommon::MutexLock l(_lock);
00215 
00216                                 // add connection as pending
00217                                 _connections.push_back( Connection( conn, conn->getNode() ) );
00218                         }
00219 
00220                         return conn;
00221                 }
00222 
00223                 void TCPConvergenceLayer::Server::listen()
00224                 {
00225 
00226                 }
00227 
00228                 void TCPConvergenceLayer::Server::shutdown()
00229                 {
00230                         _tcpsrv.shutdown();
00231                         _tcpsrv.close();
00232                 }
00233 
00234                 TCPConvergenceLayer::Server::Connection::Connection(TCPConvergenceLayer::TCPConnection *conn, const dtn::core::Node &node, const bool &active)
00235                  : _connection(conn), _peer(node), _active(active)
00236                 {
00237 
00238                 }
00239 
00240                 TCPConvergenceLayer::Server::Connection::~Connection()
00241                 {
00242 
00243                 }
00244 
00245                 TCPConvergenceLayer::TCPConnection& TCPConvergenceLayer::Server::Connection::operator*()
00246                 {
00247                         return *_connection;
00248                 }
00249 
00250                 bool TCPConvergenceLayer::Server::Connection::match(const dtn::data::EID &destination) const
00251                 {
00252                         if (_peer.getURI() == destination.getNodeEID()) return true;
00253                         if (_connection->getNode().getURI() == destination.getNodeEID()) return true;
00254 
00255                         return false;
00256                 }
00257 
00258                 bool TCPConvergenceLayer::Server::Connection::match(const NodeEvent &evt) const
00259                 {
00260                         const dtn::core::Node &n = evt.getNode();
00261 
00262                         if (_peer.getURI() == n.getURI()) return true;
00263                         if (_connection->getNode().getURI() == n.getURI()) return true;
00264 
00265                         return false;
00266                 }
00267         }
00268 }

Generated on Thu Nov 11 2010 09:49:47 for IBR-DTNSuite by  doxygen 1.7.1