|
IBR-DTNSuite 0.6
|
00001 /* 00002 * FloodRoutingExtension.cpp 00003 * 00004 * Created on: 18.02.2010 00005 * Author: morgenro 00006 */ 00007 00008 #include "routing/flooding/FloodRoutingExtension.h" 00009 #include "routing/QueueBundleEvent.h" 00010 #include "core/NodeEvent.h" 00011 #include "net/TransferCompletedEvent.h" 00012 #include "net/TransferAbortedEvent.h" 00013 #include "net/ConnectionEvent.h" 00014 #include "core/Node.h" 00015 #include "net/ConnectionManager.h" 00016 #include "Configuration.h" 00017 #include "core/BundleCore.h" 00018 00019 #include <ibrdtn/data/MetaBundle.h> 00020 #include <ibrcommon/thread/MutexLock.h> 00021 #include <ibrcommon/Logger.h> 00022 #include <ibrcommon/AutoDelete.h> 00023 00024 #include <functional> 00025 #include <list> 00026 #include <algorithm> 00027 #include <iomanip> 00028 #include <ios> 00029 #include <iostream> 00030 00031 #include <stdlib.h> 00032 #include <typeinfo> 00033 00034 namespace dtn 00035 { 00036 namespace routing 00037 { 00038 FloodRoutingExtension::FloodRoutingExtension() 00039 { 00040 // write something to the syslog 00041 IBRCOMMON_LOGGER(info) << "Initializing flooding routing module" << IBRCOMMON_LOGGER_ENDL; 00042 } 00043 00044 FloodRoutingExtension::~FloodRoutingExtension() 00045 { 00046 stop(); 00047 join(); 00048 } 00049 00050 void FloodRoutingExtension::stopExtension() 00051 { 00052 _taskqueue.abort(); 00053 } 00054 00055 void FloodRoutingExtension::notify(const dtn::core::Event *evt) 00056 { 00057 try { 00058 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt); 00059 _taskqueue.push( new ProcessBundleTask(queued.bundle, queued.origin) ); 00060 return; 00061 } catch (const std::bad_cast&) { }; 00062 00063 try { 00064 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt); 00065 const dtn::core::Node &n = nodeevent.getNode(); 00066 00067 if (nodeevent.getAction() == NODE_AVAILABLE) 00068 { 00069 const dtn::data::EID &eid = n.getEID(); 00070 00071 // send all (multi-hop) bundles in the storage to the neighbor 00072 _taskqueue.push( new SearchNextBundleTask(eid) ); 00073 } 00074 return; 00075 } catch (const std::bad_cast&) { }; 00076 00077 try { 00078 const dtn::net::ConnectionEvent &ce = dynamic_cast<const dtn::net::ConnectionEvent&>(*evt); 00079 00080 if (ce.state == dtn::net::ConnectionEvent::CONNECTION_UP) 00081 { 00082 // send all (multi-hop) bundles in the storage to the neighbor 00083 _taskqueue.push( new SearchNextBundleTask(ce.peer) ); 00084 } 00085 return; 00086 } catch (const std::bad_cast&) { }; 00087 00088 // The bundle transfer has been aborted 00089 try { 00090 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt); 00091 00092 // transfer the next bundle to this destination 00093 _taskqueue.push( new SearchNextBundleTask( aborted.getPeer() ) ); 00094 return; 00095 } catch (const std::bad_cast&) { }; 00096 00097 // A bundle transfer was successful 00098 try { 00099 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt); 00100 00101 // transfer the next bundle to this destination 00102 _taskqueue.push( new SearchNextBundleTask( completed.getPeer() ) ); 00103 return; 00104 } catch (const std::bad_cast&) { }; 00105 } 00106 00107 bool FloodRoutingExtension::__cancellation() 00108 { 00109 _taskqueue.abort(); 00110 return true; 00111 } 00112 00113 void FloodRoutingExtension::run() 00114 { 00115 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback 00116 { 00117 public: 00118 BundleFilter(const NeighborDatabase::NeighborEntry &entry) 00119 : _entry(entry) 00120 {}; 00121 00122 virtual ~BundleFilter() {}; 00123 00124 virtual size_t limit() const { return 10; }; 00125 00126 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const 00127 { 00128 // check Scope Control Block - do not forward bundles with hop limit == 0 00129 if (meta.hopcount == 0) 00130 { 00131 return false; 00132 } 00133 00134 // check Scope Control Block - do not forward non-group bundles with hop limit <= 1 00135 if ((meta.hopcount <= 1) && (meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON))) 00136 { 00137 return false; 00138 } 00139 00140 // do not forward to any blacklisted destination 00141 const dtn::data::EID dest = meta.destination.getNode(); 00142 if (_blacklist.find(dest) != _blacklist.end()) 00143 { 00144 return false; 00145 } 00146 00147 // do not forward bundles already known by the destination 00148 if (_entry.has(meta)) 00149 { 00150 return false; 00151 } 00152 00153 return true; 00154 }; 00155 00156 void blacklist(const dtn::data::EID& id) 00157 { 00158 _blacklist.insert(id); 00159 }; 00160 00161 private: 00162 std::set<dtn::data::EID> _blacklist; 00163 const NeighborDatabase::NeighborEntry &_entry; 00164 }; 00165 00166 dtn::core::BundleStorage &storage = (**this).getStorage(); 00167 00168 while (true) 00169 { 00170 try { 00171 Task *t = _taskqueue.getnpop(true); 00172 ibrcommon::AutoDelete<Task> killer(t); 00173 00174 IBRCOMMON_LOGGER_DEBUG(50) << "processing flooding task " << t->toString() << IBRCOMMON_LOGGER_ENDL; 00175 00176 try { 00177 try { 00178 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t); 00179 NeighborDatabase &db = (**this).getNeighborDB(); 00180 00181 ibrcommon::MutexLock l(db); 00182 NeighborDatabase::NeighborEntry &entry = db.get(task.eid); 00183 00184 // get the bundle filter of the neighbor 00185 BundleFilter filter(entry); 00186 00187 // some debug 00188 IBRCOMMON_LOGGER_DEBUG(40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL; 00189 00190 // blacklist the neighbor itself, because this is handles by neighbor routing extension 00191 filter.blacklist(task.eid); 00192 00193 // query all bundles from the storage 00194 const std::list<dtn::data::MetaBundle> list = storage.get(filter); 00195 00196 // send the bundles as long as we have resources 00197 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00198 { 00199 try { 00200 // transfer the bundle to the neighbor 00201 transferTo(entry, *iter); 00202 } catch (const NeighborDatabase::AlreadyInTransitException&) { }; 00203 } 00204 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) { 00205 } catch (const NeighborDatabase::NeighborNotAvailableException&) { 00206 } catch (const std::bad_cast&) { }; 00207 00208 try { 00209 dynamic_cast<ProcessBundleTask&>(*t); 00210 00211 // new bundles are forwarded to all neighbors 00212 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors(); 00213 00214 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++) 00215 { 00216 const dtn::core::Node &n = (*iter); 00217 00218 // transfer the next bundle to this destination 00219 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) ); 00220 } 00221 } catch (const std::bad_cast&) { }; 00222 } catch (const ibrcommon::Exception &ex) { 00223 IBRCOMMON_LOGGER_DEBUG(20) << "Exception occurred in FloodRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00224 } 00225 } catch (const std::exception&) { 00226 return; 00227 } 00228 00229 yield(); 00230 } 00231 } 00232 00233 /****************************************/ 00234 00235 FloodRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e) 00236 : eid(e) 00237 { } 00238 00239 FloodRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask() 00240 { } 00241 00242 std::string FloodRoutingExtension::SearchNextBundleTask::toString() 00243 { 00244 return "SearchNextBundleTask: " + eid.getString(); 00245 } 00246 00247 /****************************************/ 00248 00249 FloodRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta, const dtn::data::EID &o) 00250 : bundle(meta), origin(o) 00251 { } 00252 00253 FloodRoutingExtension::ProcessBundleTask::~ProcessBundleTask() 00254 { } 00255 00256 std::string FloodRoutingExtension::ProcessBundleTask::toString() 00257 { 00258 return "ProcessBundleTask: " + bundle.toString(); 00259 } 00260 } 00261 }