|
IBR-DTNSuite 0.6
|
00001 /* 00002 * FloodRoutingExtension.cpp 00003 * 00004 * Created on: 18.02.2010 00005 * Author: morgenro 00006 */ 00007 00008 #include "routing/flooding/FloodRoutingExtension.h" 00009 #include "routing/QueueBundleEvent.h" 00010 #include "core/NodeEvent.h" 00011 #include "net/TransferCompletedEvent.h" 00012 #include "net/TransferAbortedEvent.h" 00013 #include "core/Node.h" 00014 #include "net/ConnectionManager.h" 00015 #include "Configuration.h" 00016 #include "core/BundleCore.h" 00017 00018 #include <ibrdtn/data/MetaBundle.h> 00019 #include <ibrcommon/thread/MutexLock.h> 00020 #include <ibrcommon/Logger.h> 00021 #include <ibrcommon/AutoDelete.h> 00022 00023 #include <functional> 00024 #include <list> 00025 #include <algorithm> 00026 #include <iomanip> 00027 #include <ios> 00028 #include <iostream> 00029 00030 #include <stdlib.h> 00031 #include <typeinfo> 00032 00033 namespace dtn 00034 { 00035 namespace routing 00036 { 00037 FloodRoutingExtension::FloodRoutingExtension() 00038 { 00039 // write something to the syslog 00040 IBRCOMMON_LOGGER(info) << "Initializing flooding routing module" << IBRCOMMON_LOGGER_ENDL; 00041 } 00042 00043 FloodRoutingExtension::~FloodRoutingExtension() 00044 { 00045 stop(); 00046 join(); 00047 } 00048 00049 void FloodRoutingExtension::stopExtension() 00050 { 00051 _taskqueue.abort(); 00052 } 00053 00054 void FloodRoutingExtension::notify(const dtn::core::Event *evt) 00055 { 00056 try { 00057 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt); 00058 _taskqueue.push( new ProcessBundleTask(queued.bundle, queued.origin) ); 00059 return; 00060 } catch (const std::bad_cast&) { }; 00061 00062 try { 00063 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt); 00064 const dtn::core::Node &n = nodeevent.getNode(); 00065 00066 if (nodeevent.getAction() == NODE_AVAILABLE) 00067 { 00068 const dtn::data::EID &eid = n.getEID(); 00069 00070 // send all (multi-hop) bundles in the storage to the neighbor 00071 _taskqueue.push( new SearchNextBundleTask(eid) ); 00072 } 00073 return; 00074 } catch (const std::bad_cast&) { }; 00075 00076 // The bundle transfer has been aborted 00077 try { 00078 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt); 00079 00080 // transfer the next bundle to this destination 00081 _taskqueue.push( new SearchNextBundleTask( aborted.getPeer() ) ); 00082 return; 00083 } catch (const std::bad_cast&) { }; 00084 00085 // A bundle transfer was successful 00086 try { 00087 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt); 00088 00089 // transfer the next bundle to this destination 00090 _taskqueue.push( new SearchNextBundleTask( completed.getPeer() ) ); 00091 return; 00092 } catch (const std::bad_cast&) { }; 00093 } 00094 00095 bool FloodRoutingExtension::__cancellation() 00096 { 00097 _taskqueue.abort(); 00098 return true; 00099 } 00100 00101 void FloodRoutingExtension::run() 00102 { 00103 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback 00104 { 00105 public: 00106 BundleFilter(const NeighborDatabase::NeighborEntry &entry) 00107 : _entry(entry) 00108 {}; 00109 00110 virtual ~BundleFilter() {}; 00111 00112 virtual size_t limit() const { return 10; }; 00113 00114 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const 00115 { 00116 // do not forward to any blacklisted destination 00117 const dtn::data::EID dest = meta.destination.getNode(); 00118 if (_blacklist.find(dest) != _blacklist.end()) 00119 { 00120 return false; 00121 } 00122 00123 // do not forward bundles already known by the destination 00124 if (_entry.has(meta)) 00125 { 00126 return false; 00127 } 00128 00129 return true; 00130 }; 00131 00132 void blacklist(const dtn::data::EID& id) 00133 { 00134 _blacklist.insert(id); 00135 }; 00136 00137 private: 00138 std::set<dtn::data::EID> _blacklist; 00139 const NeighborDatabase::NeighborEntry &_entry; 00140 }; 00141 00142 dtn::core::BundleStorage &storage = (**this).getStorage(); 00143 00144 while (true) 00145 { 00146 try { 00147 Task *t = _taskqueue.getnpop(true); 00148 ibrcommon::AutoDelete<Task> killer(t); 00149 00150 IBRCOMMON_LOGGER_DEBUG(50) << "processing flooding task " << t->toString() << IBRCOMMON_LOGGER_ENDL; 00151 00152 try { 00153 try { 00154 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t); 00155 NeighborDatabase &db = (**this).getNeighborDB(); 00156 00157 ibrcommon::MutexLock l(db); 00158 NeighborDatabase::NeighborEntry &entry = db.get(task.eid); 00159 00160 // get the bundle filter of the neighbor 00161 BundleFilter filter(entry); 00162 00163 // some debug 00164 IBRCOMMON_LOGGER_DEBUG(40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL; 00165 00166 // blacklist the neighbor itself, because this is handles by neighbor routing extension 00167 filter.blacklist(task.eid); 00168 00169 // query all bundles from the storage 00170 const std::list<dtn::data::MetaBundle> list = storage.get(filter); 00171 00172 // send the bundles as long as we have resources 00173 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00174 { 00175 try { 00176 // transfer the bundle to the neighbor 00177 transferTo(entry, *iter); 00178 } catch (const NeighborDatabase::AlreadyInTransitException&) { }; 00179 } 00180 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) { 00181 } catch (const NeighborDatabase::NeighborNotAvailableException&) { 00182 } catch (const std::bad_cast&) { }; 00183 00184 try { 00185 dynamic_cast<ProcessBundleTask&>(*t); 00186 00187 // new bundles are forwarded to all neighbors 00188 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors(); 00189 00190 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++) 00191 { 00192 const dtn::core::Node &n = (*iter); 00193 00194 // transfer the next bundle to this destination 00195 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) ); 00196 } 00197 } catch (const std::bad_cast&) { }; 00198 } catch (const ibrcommon::Exception &ex) { 00199 IBRCOMMON_LOGGER_DEBUG(20) << "Exception occurred in FloodRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00200 } 00201 } catch (const std::exception&) { 00202 return; 00203 } 00204 00205 yield(); 00206 } 00207 } 00208 00209 /****************************************/ 00210 00211 FloodRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e) 00212 : eid(e) 00213 { } 00214 00215 FloodRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask() 00216 { } 00217 00218 std::string FloodRoutingExtension::SearchNextBundleTask::toString() 00219 { 00220 return "SearchNextBundleTask: " + eid.getString(); 00221 } 00222 00223 /****************************************/ 00224 00225 FloodRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta, const dtn::data::EID &o) 00226 : bundle(meta), origin(o) 00227 { } 00228 00229 FloodRoutingExtension::ProcessBundleTask::~ProcessBundleTask() 00230 { } 00231 00232 std::string FloodRoutingExtension::ProcessBundleTask::toString() 00233 { 00234 return "ProcessBundleTask: " + bundle.toString(); 00235 } 00236 } 00237 }