IBR-DTNSuite 0.6

daemon/src/net/TCPConvergenceLayer.cpp

Go to the documentation of this file.
00001 /*
00002  * TCPConvergenceLayer.cpp
00003  *
00004  *  Created on: 05.08.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "net/TCPConvergenceLayer.h"
00009 #include "net/ConnectionEvent.h"
00010 #include "routing/RequeueBundleEvent.h"
00011 #include "core/BundleCore.h"
00012 
00013 #include <ibrcommon/net/vinterface.h>
00014 #include <ibrcommon/thread/MutexLock.h>
00015 #include <ibrcommon/Logger.h>
00016 #include <ibrcommon/net/tcpclient.h>
00017 #include <ibrcommon/Logger.h>
00018 
00019 #include <streambuf>
00020 #include <functional>
00021 #include <list>
00022 #include <algorithm>
00023 
00024 using namespace ibrcommon;
00025 
00026 namespace dtn
00027 {
00028         namespace net
00029         {
00030                 /*
00031                  * class TCPConvergenceLayer
00032                  */
00033                 const int TCPConvergenceLayer::DEFAULT_PORT = 4556;
00034 
00035                 TCPConvergenceLayer::TCPConvergenceLayer()
00036                 {
00037                 }
00038 
00039                 TCPConvergenceLayer::~TCPConvergenceLayer()
00040                 {
00041                         join();
00042                 }
00043 
00044                 void TCPConvergenceLayer::bind(const ibrcommon::vinterface &net, int port)
00045                 {
00046                         _interfaces.push_back(net);
00047                         _tcpsrv.bind(net, port);
00048                         _portmap[net] = port;
00049                 }
00050 
00051                 dtn::core::Node::Protocol TCPConvergenceLayer::getDiscoveryProtocol() const
00052                 {
00053                         return dtn::core::Node::CONN_TCPIP;
00054                 }
00055 
00056                 void TCPConvergenceLayer::update(const ibrcommon::vinterface &iface, std::string &name, std::string &params) throw(dtn::net::DiscoveryServiceProvider::NoServiceHereException)
00057                 {
00058                         name = "tcpcl";
00059                         stringstream service;
00060 
00061                         // TODO: get the main address of this host, if no interface is specified
00062 
00063                         // search for the matching interface
00064                         for (std::list<ibrcommon::vinterface>::const_iterator it = _interfaces.begin(); it != _interfaces.end(); it++)
00065                         {
00066                                 const ibrcommon::vinterface &interface = *it;
00067                                 if (interface == iface)
00068                                 {
00069                                         try {
00070                                                 // get all addresses of this interface
00071                                                 std::list<vaddress> list = interface.getAddresses(ibrcommon::vaddress::VADDRESS_INET);
00072 
00073                                                 // if no address is returned... (goto catch block)
00074                                                 if (list.empty()) throw ibrcommon::Exception("no address found");
00075 
00076                                                 // fill in the ip address
00077                                                 service << "ip=" << list.front().get(false) << ";port=" << _portmap[iface] << ";";
00078                                         } catch (const ibrcommon::Exception&) {
00079                                                 // ... set the port only
00080                                                 service << "port=" << _portmap[iface] << ";";
00081                                         };
00082 
00083                                         params = service.str();
00084                                         return;
00085                                 }
00086                         }
00087 
00088                         throw dtn::net::DiscoveryServiceProvider::NoServiceHereException();
00089                 }
00090 
00091                 const std::string TCPConvergenceLayer::getName() const
00092                 {
00093                         return "TCPConvergenceLayer";
00094                 }
00095 
00096                 void TCPConvergenceLayer::open(const dtn::core::Node &n)
00097                 {
00098                         // search for an existing connection
00099                         ibrcommon::MutexLock l(_connections_cond);
00100 
00101                         for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00102                         {
00103                                 TCPConnection &conn = *(*iter);
00104 
00105                                 if (conn.match(n))
00106                                 {
00107                                         return;
00108                                 }
00109                         }
00110 
00111                         // create a connection
00112                         TCPConnection *conn = new TCPConnection(*this, n, dtn::core::BundleCore::local, 10);
00113 
00114                         // raise setup event
00115                         ConnectionEvent::raise(ConnectionEvent::CONNECTION_SETUP, n);
00116 
00117                         // add connection as pending
00118                         _connections.push_back( conn );
00119 
00120                         // start the ClientHandler (service)
00121                         conn->initialize();
00122 
00123                         // signal that there is a new connection
00124                         _connections_cond.signal(true);
00125 
00126                         return;
00127                 }
00128 
00129                 void TCPConvergenceLayer::queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job)
00130                 {
00131                         // search for an existing connection
00132                         ibrcommon::MutexLock l(_connections_cond);
00133 
00134                         for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00135                         {
00136                                 TCPConnection &conn = *(*iter);
00137 
00138                                 if (conn.match(n))
00139                                 {
00140                                         conn.queue(job._bundle);
00141                                         IBRCOMMON_LOGGER_DEBUG(15) << "queued bundle to an existing tcp connection (" << conn.getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
00142 
00143                                         return;
00144                                 }
00145                         }
00146 
00147                         // create a connection
00148                         TCPConnection *conn = new TCPConnection(*this, n, dtn::core::BundleCore::local, 10);
00149 
00150                         // raise setup event
00151                         ConnectionEvent::raise(ConnectionEvent::CONNECTION_SETUP, n);
00152 
00153                         // add connection as pending
00154                         _connections.push_back( conn );
00155 
00156                         // start the ClientHandler (service)
00157                         conn->initialize();
00158 
00159                         // queue the bundle
00160                         conn->queue(job._bundle);
00161 
00162                         // signal that there is a new connection
00163                         _connections_cond.signal(true);
00164 
00165                         IBRCOMMON_LOGGER_DEBUG(15) << "queued bundle to an new tcp connection (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
00166                 }
00167 
00168                 void TCPConvergenceLayer::connectionUp(TCPConnection *conn)
00169                 {
00170                         ibrcommon::MutexLock l(_connections_cond);
00171                         for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00172                         {
00173                                 if (conn == (*iter))
00174                                 {
00175                                         // put pending connection to the active connections
00176                                         return;
00177                                 }
00178                         }
00179 
00180                         _connections.push_back( conn );
00181 
00182                         // signal that there is a new connection
00183                         _connections_cond.signal(true);
00184 
00185                         IBRCOMMON_LOGGER_DEBUG(15) << "tcp connection added (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
00186                 }
00187 
00188                 void TCPConvergenceLayer::connectionDown(TCPConnection *conn)
00189                 {
00190                         ibrcommon::MutexLock l(_connections_cond);
00191                         for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00192                         {
00193                                 if (conn == (*iter))
00194                                 {
00195                                         _connections.erase(iter);
00196                                         IBRCOMMON_LOGGER_DEBUG(15) << "tcp connection removed (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
00197 
00198                                         // signal that there is a connection less
00199                                         _connections_cond.signal(true);
00200                                         return;
00201                                 }
00202                         }
00203                 }
00204 
00205                 void TCPConvergenceLayer::componentRun()
00206                 {
00207                         try {
00208                                 while (true)
00209                                 {
00210                                         // wait for incoming connections
00211                                         tcpstream *stream = _tcpsrv.accept();
00212 
00213                                         // create a new TCPConnection and return the pointer
00214                                         TCPConnection *obj = new TCPConnection(*this, stream, dtn::core::BundleCore::local, 10);
00215 
00216                                         // add the connection to the connection list
00217                                         connectionUp(obj);
00218 
00219                                         // initialize the object
00220                                         obj->initialize();
00221 
00222                                         // breakpoint
00223                                         ibrcommon::Thread::yield();
00224                                 }
00225                         } catch (const std::exception&) {
00226                                 // ignore all errors
00227                                 return;
00228                         }
00229                 }
00230 
00231                 bool TCPConvergenceLayer::__cancellation()
00232                 {
00233                         _tcpsrv.shutdown();
00234                         _tcpsrv.close();
00235                         return true;
00236                 }
00237 
00238                 void TCPConvergenceLayer::closeAll()
00239                 {
00240                         // search for an existing connection
00241                         ibrcommon::MutexLock l(_connections_cond);
00242                         for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00243                         {
00244                                 TCPConnection &conn = *(*iter);
00245 
00246                                 // close the connection immediately
00247                                 conn.shutdown();
00248                         }
00249                 }
00250 
00251                 void TCPConvergenceLayer::componentUp()
00252                 {
00253                         // listen on the socket, max. 5 concurrent awaiting connections
00254                         _tcpsrv.listen(5);
00255                 }
00256 
00257                 void TCPConvergenceLayer::componentDown()
00258                 {
00259                         // shutdown the TCP server
00260                         _tcpsrv.shutdown();
00261                         _tcpsrv.close();
00262 
00263                         // close all active connections
00264                         closeAll();
00265 
00266                         // wait until all tcp connections are down
00267                         {
00268                                 ibrcommon::MutexLock l(_connections_cond);
00269                                 while (_connections.size() > 0) _connections_cond.wait();
00270                         }
00271                 }
00272         }
00273 }