Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "net/ConnectionManager.h"
00009 #include "net/UDPConvergenceLayer.h"
00010 #include "net/TCPConvergenceLayer.h"
00011 #include "net/BundleReceivedEvent.h"
00012 #include "core/NodeEvent.h"
00013 #include "core/BundleEvent.h"
00014 #include "ibrcommon/net/tcpserver.h"
00015 #include "core/BundleCore.h"
00016 #include "routing/RequeueBundleEvent.h"
00017 #include "net/ConnectionEvent.h"
00018
00019 #include "core/NodeEvent.h"
00020 #include "core/TimeEvent.h"
00021
00022 #include <iostream>
00023 #include <iomanip>
00024 #include <algorithm>
00025 #include <functional>
00026 #include <typeinfo>
00027 #include <ibrcommon/Logger.h>
00028
00029 using namespace dtn::core;
00030
00031 namespace dtn
00032 {
00033 namespace net
00034 {
00035 struct CompareNodeDestination:
00036 public std::binary_function< dtn::core::Node, dtn::data::EID, bool > {
00037 bool operator() ( const dtn::core::Node &node, const dtn::data::EID &destination ) const {
00038 return dtn::data::EID(node.getURI()) == destination;
00039 }
00040 };
00041
00042 ConnectionManager::ConnectionManager()
00043 : _shutdown(false)
00044 {
00045 }
00046
00047 ConnectionManager::~ConnectionManager()
00048 {
00049 }
00050
00051 void ConnectionManager::componentUp()
00052 {
00053 bindEvent(TimeEvent::className);
00054 bindEvent(NodeEvent::className);
00055 bindEvent(ConnectionEvent::className);
00056 }
00057
00058 void ConnectionManager::componentDown()
00059 {
00060 {
00061 ibrcommon::MutexLock l(_cl_lock);
00062
00063 _cl.clear();
00064 }
00065
00066 unbindEvent(NodeEvent::className);
00067 unbindEvent(TimeEvent::className);
00068 unbindEvent(ConnectionEvent::className);
00069 }
00070
00071 void ConnectionManager::raiseEvent(const dtn::core::Event *evt)
00072 {
00073 try {
00074 const NodeEvent &nodeevent = dynamic_cast<const NodeEvent&>(*evt);
00075
00076 Node n = nodeevent.getNode();
00077
00078 if (nodeevent.getAction() == NODE_INFO_UPDATED)
00079 {
00080 discovered(n);
00081 }
00082 } catch (std::bad_cast) { }
00083
00084 try {
00085 const TimeEvent &timeevent = dynamic_cast<const TimeEvent&>(*evt);
00086
00087 if (timeevent.getAction() == TIME_SECOND_TICK)
00088 {
00089 check_discovered();
00090 }
00091 } catch (std::bad_cast) { }
00092
00093 try {
00094 const ConnectionEvent &connection = dynamic_cast<const ConnectionEvent&>(*evt);
00095
00096 switch (connection.state)
00097 {
00098 case ConnectionEvent::CONNECTION_UP:
00099 {
00100 ibrcommon::MutexLock l(_node_lock);
00101
00102
00103 if (!isNeighbor(connection.node))
00104 {
00105
00106 dtn::core::NodeEvent::raise(connection.node, dtn::core::NODE_AVAILABLE);
00107 }
00108
00109 _connected_nodes.insert(connection.node);
00110 break;
00111 }
00112
00113 case ConnectionEvent::CONNECTION_DOWN:
00114 {
00115 ibrcommon::MutexLock l(_node_lock);
00116
00117
00118 if (isNeighbor(connection.node))
00119 {
00120
00121 _connected_nodes.erase(connection.node);
00122
00123
00124 _discovered_nodes.erase(connection.node);
00125
00126
00127 if (!isNeighbor(connection.node))
00128 {
00129
00130 dtn::core::NodeEvent::raise(connection.node, dtn::core::NODE_UNAVAILABLE);
00131 }
00132 }
00133 break;
00134 }
00135
00136 default:
00137 break;
00138 }
00139
00140 } catch (std::bad_cast) {
00141
00142 }
00143 }
00144
00145 void ConnectionManager::addConnection(const dtn::core::Node &n)
00146 {
00147 ibrcommon::MutexLock l(_node_lock);
00148 _static_nodes.insert(n);
00149 dtn::core::NodeEvent::raise(n, dtn::core::NODE_AVAILABLE);
00150 }
00151
00152 void ConnectionManager::addConvergenceLayer(ConvergenceLayer *cl)
00153 {
00154 ibrcommon::MutexLock l(_cl_lock);
00155 _cl.insert( cl );
00156 }
00157
00158 void ConnectionManager::discovered(dtn::core::Node &node)
00159 {
00160
00161 if (EID(node.getURI()) == dtn::core::BundleCore::local) return;
00162
00163 ibrcommon::MutexLock l(_node_lock);
00164
00165
00166 if (!isNeighbor(node))
00167 {
00168
00169 dtn::core::NodeEvent::raise(node, dtn::core::NODE_AVAILABLE);
00170 }
00171
00172 _discovered_nodes.erase(node);
00173 _discovered_nodes.insert(node);
00174 }
00175
00176 void ConnectionManager::check_discovered()
00177 {
00178 ibrcommon::MutexLock l(_node_lock);
00179
00180
00181 std::set<dtn::core::Node>::iterator iter = _discovered_nodes.begin();
00182
00183 while (iter != _discovered_nodes.end())
00184 {
00185 dtn::core::Node n = (*iter);
00186
00187
00188 _discovered_nodes.erase( iter++ );
00189
00190 if ( !n.decrementTimeout(1) )
00191 {
00192
00193 _connected_nodes.erase(n);
00194
00195
00196 dtn::core::NodeEvent::raise(n, dtn::core::NODE_UNAVAILABLE);
00197 }
00198 else
00199 {
00200 _discovered_nodes.insert(n);
00201 }
00202 }
00203 }
00204
00205 void ConnectionManager::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00206 {
00207 if ((node.getProtocol() == Node::CONN_UNDEFINED) || (node.getProtocol() == Node::CONN_UNSUPPORTED))
00208 throw ConnectionNotAvailableException();
00209
00210 ibrcommon::MutexLock l(_cl_lock);
00211
00212
00213 for (std::set<ConvergenceLayer*>::iterator iter = _cl.begin(); iter != _cl.end(); iter++)
00214 {
00215 ConvergenceLayer *cl = (*iter);
00216 if (node.getProtocol() == cl->getDiscoveryProtocol())
00217 {
00218 cl->queue(node, job);
00219
00220
00221 return;
00222 }
00223 }
00224
00225 throw ConnectionNotAvailableException();
00226 }
00227
00228 void ConnectionManager::queue(const ConvergenceLayer::Job &job)
00229 {
00230 try {
00231 ibrcommon::MutexLock l(_node_lock);
00232
00233
00234 std::list<dtn::core::Node> match_rank;
00235
00236
00237 match_rank.push_back(dtn::core::Node(job._destination, Node::CONN_TCPIP));
00238
00239
00240 match_rank.push_back(dtn::core::Node(job._destination, Node::CONN_UDPIP));
00241
00242
00243 match_rank.push_back(dtn::core::Node(job._destination, Node::CONN_BLUETOOTH));
00244
00245
00246 match_rank.push_back(dtn::core::Node(job._destination, Node::CONN_ZIGBEE));
00247
00248
00249 match_rank.push_back(dtn::core::Node(job._destination, Node::CONN_HTTP));
00250
00251 if (IBRCOMMON_LOGGER_LEVEL >= 50)
00252 {
00253 IBRCOMMON_LOGGER_DEBUG(50) << "## static node list ##" << IBRCOMMON_LOGGER_ENDL;
00254 for (std::set<dtn::core::Node>::const_iterator iter = _static_nodes.begin(); iter != _static_nodes.end(); iter++)
00255 {
00256 const dtn::core::Node &n = (*iter);
00257 IBRCOMMON_LOGGER_DEBUG(50) << n.toString() << IBRCOMMON_LOGGER_ENDL;
00258 }
00259 }
00260
00261 if (IBRCOMMON_LOGGER_LEVEL >= 50)
00262 {
00263 IBRCOMMON_LOGGER_DEBUG(50) << "## dynamic node list ##" << IBRCOMMON_LOGGER_ENDL;
00264 for (std::set<dtn::core::Node>::const_iterator iter = _discovered_nodes.begin(); iter != _discovered_nodes.end(); iter++)
00265 {
00266 const dtn::core::Node &n = (*iter);
00267 IBRCOMMON_LOGGER_DEBUG(50) << n.toString() << IBRCOMMON_LOGGER_ENDL;
00268 }
00269 }
00270
00271 if (IBRCOMMON_LOGGER_LEVEL >= 50)
00272 {
00273 IBRCOMMON_LOGGER_DEBUG(50) << "## connected node list ##" << IBRCOMMON_LOGGER_ENDL;
00274 for (std::set<dtn::core::Node>::const_iterator iter = _connected_nodes.begin(); iter != _connected_nodes.end(); iter++)
00275 {
00276 const dtn::core::Node &n = (*iter);
00277 IBRCOMMON_LOGGER_DEBUG(50) << n.toString() << IBRCOMMON_LOGGER_ENDL;
00278 }
00279 }
00280
00281
00282 for (std::list<dtn::core::Node>::const_iterator imatch = match_rank.begin(); imatch != match_rank.end(); imatch++)
00283 {
00284 const dtn::core::Node &match = (*imatch);
00285 IBRCOMMON_LOGGER_DEBUG(50) << "match for " << match.toString() << IBRCOMMON_LOGGER_ENDL;
00286
00287 try {
00288
00289 std::set<dtn::core::Node>::const_iterator iter = _static_nodes.find(match);
00290
00291 if (iter != _static_nodes.end())
00292 {
00293 throw (*iter);
00294 }
00295 } catch (std::exception) { }
00296
00297 try {
00298
00299 std::set<dtn::core::Node>::const_iterator iter = _discovered_nodes.find(match);
00300
00301 if (iter != _discovered_nodes.end())
00302 {
00303 throw (*iter);
00304 }
00305 } catch (std::exception) { }
00306
00307 try {
00308
00309 std::set<dtn::core::Node>::const_iterator iter = _connected_nodes.find(match);
00310
00311 if (iter != _connected_nodes.end())
00312 {
00313 throw (*iter);
00314 }
00315 } catch (std::exception) { }
00316 }
00317 } catch (const dtn::core::Node next) {
00318 IBRCOMMON_LOGGER_DEBUG(50) << "next hop: " << next.toString() << IBRCOMMON_LOGGER_ENDL;
00319 queue(next, job);
00320 return;
00321 }
00322
00323 throw NeighborNotAvailableException("No active connection to this neighbor available!");
00324 }
00325
00326 void ConnectionManager::queue(const dtn::data::EID &eid, const dtn::data::BundleID &b)
00327 {
00328 queue( ConvergenceLayer::Job(eid, b) );
00329 }
00330
00331 const std::set<dtn::core::Node> ConnectionManager::getNeighbors()
00332 {
00333 ibrcommon::MutexLock l(_node_lock);
00334
00335 std::set<dtn::core::Node> _nodes;
00336
00337 for (std::set<dtn::core::Node>::const_iterator iter = _static_nodes.begin(); iter != _static_nodes.end(); iter++)
00338 {
00339 _nodes.insert( *iter );
00340 }
00341
00342 for (std::set<dtn::core::Node>::const_iterator iter = _discovered_nodes.begin(); iter != _discovered_nodes.end(); iter++)
00343 {
00344 _nodes.insert( *iter );
00345 }
00346
00347 for (std::set<dtn::core::Node>::const_iterator iter = _connected_nodes.begin(); iter != _connected_nodes.end(); iter++)
00348 {
00349 _nodes.insert( *iter );
00350 }
00351
00352 return _nodes;
00353 }
00354
00355
00356 bool ConnectionManager::isNeighbor(const dtn::core::Node &node)
00357 {
00358
00359 if (_static_nodes.find(node) != _static_nodes.end())
00360 {
00361 return true;
00362 }
00363
00364 if (_discovered_nodes.find(node) != _discovered_nodes.end())
00365 {
00366 return true;
00367 }
00368
00369 if (_connected_nodes.find(node) != _connected_nodes.end())
00370 {
00371 return true;
00372 }
00373
00374 return false;
00375 }
00376
00377 const std::string ConnectionManager::getName() const
00378 {
00379 return "ConnectionManager";
00380 }
00381 }
00382 }