00001
00002
00003
00004
00005
00006
00007
00008 #include "net/ConnectionManager.h"
00009 #include "net/UDPConvergenceLayer.h"
00010 #include "net/TCPConvergenceLayer.h"
00011 #include "net/BundleReceivedEvent.h"
00012 #include "core/NodeEvent.h"
00013 #include "core/BundleEvent.h"
00014 #include "ibrcommon/net/tcpserver.h"
00015 #include "core/BundleCore.h"
00016 #include "routing/RequeueBundleEvent.h"
00017 #include "net/TransferCompletedEvent.h"
00018 #include "net/ConnectionEvent.h"
00019
00020 #include "core/NodeEvent.h"
00021 #include "core/TimeEvent.h"
00022
00023 #include <iostream>
00024 #include <iomanip>
00025 #include <algorithm>
00026 #include <functional>
00027 #include <typeinfo>
00028 #include <ibrcommon/Logger.h>
00029
00030 using namespace dtn::core;
00031
00032 namespace dtn
00033 {
00034 namespace net
00035 {
00036 struct CompareNodeDestination:
00037 public std::binary_function< dtn::core::Node, dtn::data::EID, bool > {
00038 bool operator() ( const dtn::core::Node &node, const dtn::data::EID &destination ) const {
00039 return dtn::data::EID(node.getURI()) == destination;
00040 }
00041 };
00042
00043 ConnectionManager::ConnectionManager()
00044 : _shutdown(false)
00045 {
00046 }
00047
00048 ConnectionManager::~ConnectionManager()
00049 {
00050 }
00051
00052 void ConnectionManager::componentUp()
00053 {
00054 bindEvent(TimeEvent::className);
00055 bindEvent(NodeEvent::className);
00056 bindEvent(ConnectionEvent::className);
00057 }
00058
00059 void ConnectionManager::componentDown()
00060 {
00061 {
00062 ibrcommon::MutexLock l(_cl_lock);
00063
00064 _cl.clear();
00065 }
00066
00067 unbindEvent(NodeEvent::className);
00068 unbindEvent(TimeEvent::className);
00069 unbindEvent(ConnectionEvent::className);
00070 }
00071
00072 void ConnectionManager::raiseEvent(const dtn::core::Event *evt)
00073 {
00074 const NodeEvent *nodeevent = dynamic_cast<const NodeEvent*>(evt);
00075 const TimeEvent *timeevent = dynamic_cast<const TimeEvent*>(evt);
00076
00077 if (timeevent != NULL)
00078 {
00079 if (timeevent->getAction() == TIME_SECOND_TICK)
00080 {
00081 check_discovered();
00082 }
00083 }
00084 else if (nodeevent != NULL)
00085 {
00086 Node n = nodeevent->getNode();
00087
00088 if (nodeevent->getAction() == NODE_INFO_UPDATED)
00089 {
00090 discovered(n);
00091 }
00092 }
00093
00094 try {
00095 const ConnectionEvent &connection = dynamic_cast<const ConnectionEvent&>(*evt);
00096
00097 switch (connection.state)
00098 {
00099 case ConnectionEvent::CONNECTION_UP:
00100 {
00101 ibrcommon::MutexLock l(_node_lock);
00102
00103
00104 if (!isNeighbor(connection.node))
00105 {
00106
00107 dtn::core::NodeEvent::raise(connection.node, dtn::core::NODE_AVAILABLE);
00108 }
00109
00110 _connected_nodes.insert(connection.node);
00111 break;
00112 }
00113
00114 case ConnectionEvent::CONNECTION_DOWN:
00115 {
00116 ibrcommon::MutexLock l(_node_lock);
00117 _connected_nodes.erase(connection.node);
00118
00119
00120 if (!isNeighbor(connection.node))
00121 {
00122
00123 dtn::core::NodeEvent::raise(connection.node, dtn::core::NODE_UNAVAILABLE);
00124 }
00125 break;
00126 }
00127
00128 default:
00129 break;
00130 }
00131
00132 } catch (std::bad_cast) {
00133
00134 }
00135 }
00136
00137 void ConnectionManager::addConnection(const dtn::core::Node &n)
00138 {
00139 ibrcommon::MutexLock l(_node_lock);
00140 _static_nodes.insert(n);
00141 dtn::core::NodeEvent::raise(n, dtn::core::NODE_AVAILABLE);
00142 }
00143
00144 void ConnectionManager::addConvergenceLayer(ConvergenceLayer *cl)
00145 {
00146 ibrcommon::MutexLock l(_cl_lock);
00147 _cl.insert( cl );
00148 }
00149
00150 void ConnectionManager::discovered(dtn::core::Node &node)
00151 {
00152
00153 if (EID(node.getURI()) == dtn::core::BundleCore::local) return;
00154
00155 ibrcommon::MutexLock l(_node_lock);
00156
00157
00158 if (!isNeighbor(node))
00159 {
00160
00161 dtn::core::NodeEvent::raise(node, dtn::core::NODE_AVAILABLE);
00162 }
00163
00164 _discovered_nodes.erase(node);
00165 _discovered_nodes.insert(node);
00166 }
00167
00168 void ConnectionManager::check_discovered()
00169 {
00170 std::list<dtn::core::Node> unavailables;
00171
00172 ibrcommon::MutexLock l(_node_lock);
00173
00174
00175 std::set<dtn::core::Node>::iterator iter = _discovered_nodes.begin();
00176
00177 while (iter != _discovered_nodes.end())
00178 {
00179 dtn::core::Node n = (*iter);
00180
00181
00182 _discovered_nodes.erase( iter++ );
00183
00184 if ( !n.decrementTimeout(1) )
00185 {
00186 unavailables.push_back(n);
00187 }
00188 else
00189 {
00190 _discovered_nodes.insert(n);
00191 }
00192 }
00193
00194 for(std::list<dtn::core::Node>::const_iterator iter = unavailables.begin(); iter != unavailables.end(); iter++)
00195 {
00196
00197 if (!isNeighbor(*iter))
00198 {
00199
00200 dtn::core::NodeEvent::raise(*iter, dtn::core::NODE_UNAVAILABLE);
00201 }
00202 }
00203 }
00204
00205 void ConnectionManager::queue(const dtn::core::Node &node, const ConvergenceLayer::Job &job)
00206 {
00207 if ((node.getProtocol() == Node::CONN_UNDEFINED) || (node.getProtocol() == Node::CONN_UNSUPPORTED))
00208 throw ConnectionNotAvailableException();
00209
00210 ibrcommon::MutexLock l(_cl_lock);
00211
00212
00213 for (std::set<ConvergenceLayer*>::iterator iter = _cl.begin(); iter != _cl.end(); iter++)
00214 {
00215 ConvergenceLayer *cl = (*iter);
00216 if (node.getProtocol() == cl->getDiscoveryProtocol())
00217 {
00218 cl->queue(node, job);
00219
00220
00221 return;
00222 }
00223 }
00224
00225 throw ConnectionNotAvailableException();
00226 }
00227
00228 void ConnectionManager::queue(const ConvergenceLayer::Job &job)
00229 {
00230 ibrcommon::MutexLock l(_node_lock);
00231
00232
00233 std::list<dtn::core::Node> match_rank;
00234
00235
00236 match_rank.push_back(dtn::core::Node(job._destination, Node::CONN_TCPIP));
00237
00238
00239 match_rank.push_back(dtn::core::Node(job._destination, Node::CONN_UDPIP));
00240
00241
00242 match_rank.push_back(dtn::core::Node(job._destination, Node::CONN_BLUETOOTH));
00243
00244
00245 match_rank.push_back(dtn::core::Node(job._destination, Node::CONN_ZIGBEE));
00246
00247
00248 match_rank.push_back(dtn::core::Node(job._destination, Node::CONN_HTTP));
00249
00250 if (IBRCOMMON_LOGGER_LEVEL >= 50)
00251 {
00252 IBRCOMMON_LOGGER_DEBUG(50) << "## static node list ##" << IBRCOMMON_LOGGER_ENDL;
00253 for (std::set<dtn::core::Node>::const_iterator iter = _static_nodes.begin(); iter != _static_nodes.end(); iter++)
00254 {
00255 const dtn::core::Node &n = (*iter);
00256 IBRCOMMON_LOGGER_DEBUG(50) << n.toString() << IBRCOMMON_LOGGER_ENDL;
00257 }
00258 }
00259
00260 if (IBRCOMMON_LOGGER_LEVEL >= 50)
00261 {
00262 IBRCOMMON_LOGGER_DEBUG(50) << "## dynamic node list ##" << IBRCOMMON_LOGGER_ENDL;
00263 for (std::set<dtn::core::Node>::const_iterator iter = _discovered_nodes.begin(); iter != _discovered_nodes.end(); iter++)
00264 {
00265 const dtn::core::Node &n = (*iter);
00266 IBRCOMMON_LOGGER_DEBUG(50) << n.toString() << IBRCOMMON_LOGGER_ENDL;
00267 }
00268 }
00269
00270 if (IBRCOMMON_LOGGER_LEVEL >= 50)
00271 {
00272 IBRCOMMON_LOGGER_DEBUG(50) << "## connected node list ##" << IBRCOMMON_LOGGER_ENDL;
00273 for (std::set<dtn::core::Node>::const_iterator iter = _connected_nodes.begin(); iter != _connected_nodes.end(); iter++)
00274 {
00275 const dtn::core::Node &n = (*iter);
00276 IBRCOMMON_LOGGER_DEBUG(50) << n.toString() << IBRCOMMON_LOGGER_ENDL;
00277 }
00278 }
00279
00280
00281 for (std::list<dtn::core::Node>::const_iterator imatch = match_rank.begin(); imatch != match_rank.end(); imatch++)
00282 {
00283 const dtn::core::Node &match = (*imatch);
00284 IBRCOMMON_LOGGER_DEBUG(50) << "match for " << match.toString() << IBRCOMMON_LOGGER_ENDL;
00285
00286 try {
00287
00288 std::set<dtn::core::Node>::const_iterator iter = _static_nodes.find(match);
00289
00290 if (iter != _static_nodes.end())
00291 {
00292 const dtn::core::Node &next = (*iter);
00293 IBRCOMMON_LOGGER_DEBUG(50) << "next hop: " << next.toString() << IBRCOMMON_LOGGER_ENDL;
00294
00295 queue(next, job);
00296 return;
00297 }
00298 } catch (...) {
00299
00300 }
00301
00302 try {
00303
00304 std::set<dtn::core::Node>::const_iterator iter = _discovered_nodes.find(match);
00305
00306 if (iter != _discovered_nodes.end())
00307 {
00308 const dtn::core::Node &next = (*iter);
00309 IBRCOMMON_LOGGER_DEBUG(50) << "next hop: " << next.toString() << IBRCOMMON_LOGGER_ENDL;
00310
00311 queue(next, job);
00312 return;
00313 }
00314 } catch (...) {
00315
00316 }
00317
00318 try {
00319
00320 std::set<dtn::core::Node>::const_iterator iter = _connected_nodes.find(match);
00321
00322 if (iter != _connected_nodes.end())
00323 {
00324 const dtn::core::Node &next = (*iter);
00325 IBRCOMMON_LOGGER_DEBUG(50) << "next hop: " << next.toString() << IBRCOMMON_LOGGER_ENDL;
00326
00327 queue(next, job);
00328 return;
00329 }
00330 } catch (...) {
00331
00332 }
00333 }
00334
00335 throw NeighborNotAvailableException("No active connection to this neighbor available!");
00336 }
00337
00338 void ConnectionManager::queue(const dtn::data::EID &eid, const dtn::data::Bundle &b)
00339 {
00340 queue( ConvergenceLayer::Job(eid, b) );
00341 }
00342
00343 const std::set<dtn::core::Node> ConnectionManager::getNeighbors()
00344 {
00345 ibrcommon::MutexLock l(_node_lock);
00346
00347 std::set<dtn::core::Node> _nodes;
00348
00349 for (std::set<dtn::core::Node>::const_iterator iter = _static_nodes.begin(); iter != _static_nodes.end(); iter++)
00350 {
00351 _nodes.insert( *iter );
00352 }
00353
00354 for (std::set<dtn::core::Node>::const_iterator iter = _discovered_nodes.begin(); iter != _discovered_nodes.end(); iter++)
00355 {
00356 _nodes.insert( *iter );
00357 }
00358
00359 for (std::set<dtn::core::Node>::const_iterator iter = _connected_nodes.begin(); iter != _connected_nodes.end(); iter++)
00360 {
00361 _nodes.insert( *iter );
00362 }
00363
00364 return _nodes;
00365 }
00366
00367
00368 bool ConnectionManager::isNeighbor(const dtn::core::Node &node)
00369 {
00370
00371 if (_static_nodes.find(node) != _static_nodes.end())
00372 {
00373 return true;
00374 }
00375
00376 if (_discovered_nodes.find(node) != _discovered_nodes.end())
00377 {
00378 return true;
00379 }
00380
00381 if (_connected_nodes.find(node) != _connected_nodes.end())
00382 {
00383 return true;
00384 }
00385
00386 return false;
00387 }
00388 }
00389 }