• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/net/TCPConvergenceLayer.cpp

Go to the documentation of this file.
00001 /*
00002  * TCPConvergenceLayer.cpp
00003  *
00004  *  Created on: 05.08.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "net/TCPConvergenceLayer.h"
00009 #include "core/BundleCore.h"
00010 #include "core/GlobalEvent.h"
00011 #include <ibrcommon/net/vinterface.h>
00012 #include "net/ConnectionEvent.h"
00013 #include <ibrcommon/thread/MutexLock.h>
00014 #include "routing/RequeueBundleEvent.h"
00015 #include <ibrcommon/Logger.h>
00016 #include <ibrcommon/net/tcpclient.h>
00017 #include <ibrcommon/Logger.h>
00018 #include <streambuf>
00019 
00020 #include <functional>
00021 #include <list>
00022 #include <algorithm>
00023 
00024 using namespace ibrcommon;
00025 
00026 namespace dtn
00027 {
00028         namespace net
00029         {
00030                 /*
00031                  * class TCPConvergenceLayer
00032                  */
00033                 const int TCPConvergenceLayer::DEFAULT_PORT = 4556;
00034 
00035                 TCPConvergenceLayer::TCPConvergenceLayer()
00036                 {
00037                         bindEvent(NodeEvent::className);
00038                 }
00039 
00040                 TCPConvergenceLayer::~TCPConvergenceLayer()
00041                 {
00042                         unbindEvent(NodeEvent::className);
00043                         join();
00044                 }
00045 
00046                 void TCPConvergenceLayer::bind(const ibrcommon::vinterface &net, int port)
00047                 {
00048                         _interfaces.push_back(net);
00049                         _tcpsrv.bind(net, port);
00050                         _portmap[net] = port;
00051                 }
00052 
00053                 dtn::core::Node::Protocol TCPConvergenceLayer::getDiscoveryProtocol() const
00054                 {
00055                         return Node::CONN_TCPIP;
00056                 }
00057 
00058                 void TCPConvergenceLayer::update(const ibrcommon::vinterface &iface, std::string &name, std::string &params) throw(dtn::net::DiscoveryServiceProvider::NoServiceHereException)
00059                 {
00060                         name = "tcpcl";
00061                         stringstream service;
00062 
00063                         // TODO: get the main address of this host, if no interface is specified
00064 
00065                         // search for the matching interface
00066                         for (std::list<ibrcommon::vinterface>::const_iterator it = _interfaces.begin(); it != _interfaces.end(); it++)
00067                         {
00068                                 const ibrcommon::vinterface &interface = *it;
00069                                 if (interface == iface)
00070                                 {
00071                                         try {
00072                                                 // get all addresses of this interface
00073                                                 std::list<vaddress> list = interface.getAddresses(ibrcommon::vaddress::VADDRESS_INET);
00074 
00075                                                 // if no address is returned... (goto catch block)
00076                                                 if (list.empty()) throw ibrcommon::Exception("no address found");
00077 
00078                                                 // fill in the ip address
00079                                                 service << "ip=" << list.front().get(false) << ";port=" << _portmap[iface] << ";";
00080                                         } catch (const ibrcommon::Exception&) {
00081                                                 // ... set the port only
00082                                                 service << "port=" << _portmap[iface] << ";";
00083                                         };
00084 
00085                                         params = service.str();
00086                                         return;
00087                                 }
00088                         }
00089 
00090                         throw dtn::net::DiscoveryServiceProvider::NoServiceHereException();
00091                 }
00092 
00093                 const std::string TCPConvergenceLayer::getName() const
00094                 {
00095                         return "TCPConvergenceLayer";
00096                 }
00097 
00098                 void TCPConvergenceLayer::open(const dtn::core::Node &n)
00099                 {
00100                         // search for an existing connection
00101                         ibrcommon::MutexLock l(_connections_cond);
00102 
00103                         for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00104                         {
00105                                 TCPConnection &conn = *(*iter);
00106 
00107                                 if (conn.match(n))
00108                                 {
00109                                         return;
00110                                 }
00111                         }
00112 
00113                         // create a connection
00114                         TCPConnection *conn = new TCPConnection(*this, n, dtn::core::BundleCore::local, 10);
00115 
00116                         // raise setup event
00117                         ConnectionEvent::raise(ConnectionEvent::CONNECTION_SETUP, n);
00118 
00119                         // add connection as pending
00120                         _connections.push_back( conn );
00121 
00122                         // start the ClientHandler (service)
00123                         conn->initialize();
00124 
00125                         // signal that there is a new connection
00126                         _connections_cond.signal(true);
00127 
00128                         return;
00129                 }
00130 
00131                 void TCPConvergenceLayer::queue(const dtn::core::Node &n, const ConvergenceLayer::Job &job)
00132                 {
00133                         // search for an existing connection
00134                         ibrcommon::MutexLock l(_connections_cond);
00135 
00136                         for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00137                         {
00138                                 TCPConnection &conn = *(*iter);
00139 
00140                                 if (conn.match(n))
00141                                 {
00142                                         conn.queue(job._bundle);
00143                                         IBRCOMMON_LOGGER_DEBUG(15) << "queued bundle to an existing tcp connection (" << conn.getNode().getEID().getString() << ")" << IBRCOMMON_LOGGER_ENDL;
00144 
00145                                         return;
00146                                 }
00147                         }
00148 
00149                         // create a connection
00150                         TCPConnection *conn = new TCPConnection(*this, n, dtn::core::BundleCore::local, 10);
00151 
00152                         // raise setup event
00153                         ConnectionEvent::raise(ConnectionEvent::CONNECTION_SETUP, n);
00154 
00155                         // add connection as pending
00156                         _connections.push_back( conn );
00157 
00158                         // start the ClientHandler (service)
00159                         conn->initialize();
00160 
00161                         // queue the bundle
00162                         conn->queue(job._bundle);
00163 
00164                         // signal that there is a new connection
00165                         _connections_cond.signal(true);
00166 
00167                         IBRCOMMON_LOGGER_DEBUG(15) << "queued bundle to an new tcp connection (" << conn->getNode().getEID().getString() << ")" << IBRCOMMON_LOGGER_ENDL;
00168                 }
00169 
00170                 void TCPConvergenceLayer::raiseEvent(const Event *evt)
00171                 {
00172                         const NodeEvent *node = dynamic_cast<const NodeEvent*>(evt);
00173 
00174                         if (node != NULL)
00175                         {
00176                                 if (node->getAction() == NODE_UNAVAILABLE)
00177                                 {
00178                                         // search for an existing connection
00179                                         ibrcommon::MutexLock l(_connections_cond);
00180                                         for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00181                                         {
00182                                                 TCPConnection &conn = *(*iter);
00183 
00184                                                 if (conn.match(*node))
00185                                                 {
00186                                                         // close the connection immediately
00187                                                         conn.shutdown();
00188                                                 }
00189                                         }
00190                                 }
00191                         }
00192                 }
00193 
00194                 void TCPConvergenceLayer::connectionUp(TCPConnection *conn)
00195                 {
00196                         ibrcommon::MutexLock l(_connections_cond);
00197                         for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00198                         {
00199                                 if (conn == (*iter))
00200                                 {
00201                                         // put pending connection to the active connections
00202                                         return;
00203                                 }
00204                         }
00205 
00206                         _connections.push_back( conn );
00207 
00208                         // signal that there is a new connection
00209                         _connections_cond.signal(true);
00210 
00211                         IBRCOMMON_LOGGER_DEBUG(15) << "tcp connection added (" << conn->getNode().getEID().getString() << ")" << IBRCOMMON_LOGGER_ENDL;
00212                 }
00213 
00214                 void TCPConvergenceLayer::connectionDown(TCPConnection *conn)
00215                 {
00216                         ibrcommon::MutexLock l(_connections_cond);
00217                         for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00218                         {
00219                                 if (conn == (*iter))
00220                                 {
00221                                         _connections.erase(iter);
00222                                         IBRCOMMON_LOGGER_DEBUG(15) << "tcp connection removed (" << conn->getNode().getEID().getString() << ")" << IBRCOMMON_LOGGER_ENDL;
00223 
00224                                         // signal that there is a connection less
00225                                         _connections_cond.signal(true);
00226                                         return;
00227                                 }
00228                         }
00229                 }
00230 
00231                 void TCPConvergenceLayer::componentRun()
00232                 {
00233                         // enable interruption in this thread
00234                         Thread::enable_interruption();
00235 
00236                         try {
00237                                 while (true)
00238                                 {
00239                                         // wait for incoming connections
00240                                         tcpstream *stream = _tcpsrv.accept();
00241 
00242                                         // create a new TCPConnection and return the pointer
00243                                         TCPConnection *obj = new TCPConnection(*this, stream, dtn::core::BundleCore::local, 10);
00244 
00245                                         // add the connection to the connection list
00246                                         connectionUp(obj);
00247 
00248                                         // initialize the object
00249                                         obj->initialize();
00250 
00251                                         // breakpoint
00252                                         ibrcommon::Thread::yield();
00253                                 }
00254                         } catch (const std::exception&) {
00255                                 // ignore all errors
00256                                 return;
00257                         }
00258                 }
00259 
00260                 bool TCPConvergenceLayer::__cancellation()
00261                 {
00262                         _tcpsrv.shutdown();
00263                         _tcpsrv.close();
00264                         Thread::interrupt();
00265                         return true;
00266                 }
00267 
00268                 void TCPConvergenceLayer::closeAll()
00269                 {
00270                         // search for an existing connection
00271                         ibrcommon::MutexLock l(_connections_cond);
00272                         for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00273                         {
00274                                 TCPConnection &conn = *(*iter);
00275 
00276                                 // close the connection immediately
00277                                 conn.shutdown();
00278                         }
00279                 }
00280 
00281                 void TCPConvergenceLayer::componentUp()
00282                 {
00283                         // listen on the socket, max. 5 concurrent awaiting connections
00284                         _tcpsrv.listen(5);
00285                 }
00286 
00287                 void TCPConvergenceLayer::componentDown()
00288                 {
00289                         // shutdown the TCP server
00290                         _tcpsrv.shutdown();
00291                         _tcpsrv.close();
00292 
00293                         // close all active connections
00294                         closeAll();
00295 
00296                         // wait until all tcp connections are down
00297                         {
00298                                 ibrcommon::MutexLock l(_connections_cond);
00299                                 while (_connections.size() > 0) _connections_cond.wait();
00300                         }
00301 
00302                         // interrupt the select thread
00303                         Thread::interrupt();
00304                 }
00305         }
00306 }

Generated on Wed Mar 30 2011 11:11:49 for IBR-DTNSuite by  doxygen 1.7.1