|
IBR-DTNSuite 0.6
|
00001 /* 00002 * EpidemicRoutingExtension.cpp 00003 * 00004 * Created on: 18.02.2010 00005 * Author: morgenro 00006 */ 00007 00008 #include "routing/epidemic/EpidemicRoutingExtension.h" 00009 #include "routing/epidemic/EpidemicControlMessage.h" 00010 00011 #include "routing/QueueBundleEvent.h" 00012 #include "net/TransferCompletedEvent.h" 00013 #include "net/TransferAbortedEvent.h" 00014 #include "core/BundleExpiredEvent.h" 00015 #include "core/NodeEvent.h" 00016 #include "core/TimeEvent.h" 00017 #include "core/Node.h" 00018 #include "net/ConnectionManager.h" 00019 #include "Configuration.h" 00020 #include "core/BundleCore.h" 00021 #include "core/BundleEvent.h" 00022 00023 #include <ibrdtn/data/MetaBundle.h> 00024 #include <ibrcommon/thread/MutexLock.h> 00025 #include <ibrcommon/Logger.h> 00026 #include <ibrcommon/AutoDelete.h> 00027 00028 #include <functional> 00029 #include <list> 00030 #include <algorithm> 00031 00032 #include <iomanip> 00033 #include <ios> 00034 #include <iostream> 00035 #include <set> 00036 00037 #include <stdlib.h> 00038 #include <typeinfo> 00039 00040 namespace dtn 00041 { 00042 namespace routing 00043 { 00044 EpidemicRoutingExtension::EpidemicRoutingExtension() 00045 : _endpoint(_taskqueue, _purge_vector) 00046 { 00047 // write something to the syslog 00048 IBRCOMMON_LOGGER(info) << "Initializing epidemic routing module" << IBRCOMMON_LOGGER_ENDL; 00049 } 00050 00051 EpidemicRoutingExtension::~EpidemicRoutingExtension() 00052 { 00053 stop(); 00054 join(); 00055 } 00056 00057 void EpidemicRoutingExtension::stopExtension() 00058 { 00059 _taskqueue.abort(); 00060 } 00061 00062 void EpidemicRoutingExtension::notify(const dtn::core::Event *evt) 00063 { 00064 // If an incoming bundle is received, forward it to all connected neighbors 00065 try { 00066 dynamic_cast<const QueueBundleEvent&>(*evt); 00067 00068 // new bundles trigger a recheck for all neighbors 00069 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors(); 00070 00071 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++) 00072 { 00073 const dtn::core::Node &n = (*iter); 00074 00075 // transfer the next bundle to this destination 00076 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) ); 00077 } 00078 return; 00079 } catch (const std::bad_cast&) { }; 00080 00081 // On each time event look for expired stuff 00082 try { 00083 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt); 00084 00085 if (time.getAction() == dtn::core::TIME_SECOND_TICK) 00086 { 00087 // check all lists for expired entries 00088 _taskqueue.push( new ExpireTask(time.getTimestamp()) ); 00089 } 00090 return; 00091 } catch (const std::bad_cast&) { }; 00092 00093 // If a new neighbor comes available, send him a request for the summary vector 00094 // If a neighbor went away we can free the stored summary vector 00095 try { 00096 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt); 00097 const dtn::core::Node &n = nodeevent.getNode(); 00098 00099 if (nodeevent.getAction() == NODE_AVAILABLE) 00100 { 00101 // query a new summary vector from this neighbor 00102 _taskqueue.push( new QuerySummaryVectorTask( (**this).getNeighborDB(), n.getEID(), _endpoint ) ); 00103 } 00104 00105 return; 00106 } catch (const std::bad_cast&) { }; 00107 00108 // The bundle transfer has been aborted 00109 try { 00110 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt); 00111 00112 // transfer the next bundle to this destination 00113 _taskqueue.push( new SearchNextBundleTask( aborted.getPeer() ) ); 00114 00115 return; 00116 } catch (const std::bad_cast&) { }; 00117 00118 // A bundle transfer was successful 00119 try { 00120 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt); 00121 00122 // create a transfer completed task 00123 _taskqueue.push( new TransferCompletedTask( completed.getPeer(), completed.getBundle() ) ); 00124 return; 00125 } catch (const std::bad_cast&) { }; 00126 } 00127 00128 bool EpidemicRoutingExtension::__cancellation() 00129 { 00130 _taskqueue.abort(); 00131 return true; 00132 } 00133 00134 void EpidemicRoutingExtension::run() 00135 { 00136 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback 00137 { 00138 public: 00139 BundleFilter(const NeighborDatabase::NeighborEntry &entry) 00140 : _entry(entry) 00141 {}; 00142 00143 virtual ~BundleFilter() {}; 00144 00145 virtual size_t limit() const { return 10; }; 00146 00147 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const 00148 { 00149 // do not forward any epidemic control message 00150 // this is done by the neighbor routing module 00151 if (meta.source == (dtn::core::BundleCore::local + "/routing/epidemic")) 00152 { 00153 return false; 00154 } 00155 00156 // do not forward to any blacklisted destination 00157 const dtn::data::EID dest = meta.destination.getNode(); 00158 if (_blacklist.find(dest) != _blacklist.end()) 00159 { 00160 return false; 00161 } 00162 00163 // do not forward bundles already known by the destination 00164 // throws BloomfilterNotAvailableException if no filter is available or it is expired 00165 if (_entry.has(meta, true)) 00166 { 00167 return false; 00168 } 00169 00170 return true; 00171 }; 00172 00173 void blacklist(const dtn::data::EID& id) 00174 { 00175 _blacklist.insert(id); 00176 }; 00177 00178 private: 00179 std::set<dtn::data::EID> _blacklist; 00180 const NeighborDatabase::NeighborEntry &_entry; 00181 }; 00182 00183 dtn::core::BundleStorage &storage = (**this).getStorage(); 00184 00185 while (true) 00186 { 00187 try { 00188 Task *t = _taskqueue.getnpop(true); 00189 ibrcommon::AutoDelete<Task> killer(t); 00190 00191 IBRCOMMON_LOGGER_DEBUG(50) << "processing epidemic task " << t->toString() << IBRCOMMON_LOGGER_ENDL; 00192 00193 try { 00197 try { 00198 ExecutableTask &etask = dynamic_cast<ExecutableTask&>(*t); 00199 etask.execute(); 00200 } catch (const std::bad_cast&) { }; 00201 00205 try { 00206 ExpireTask &task = dynamic_cast<ExpireTask&>(*t); 00207 _purge_vector.expire(task.timestamp); 00208 } catch (const std::bad_cast&) { }; 00209 00215 try { 00216 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t); 00217 NeighborDatabase &db = (**this).getNeighborDB(); 00218 00219 ibrcommon::MutexLock l(db); 00220 NeighborDatabase::NeighborEntry &entry = db.get(task.eid); 00221 00222 try { 00223 // get the bundle filter of the neighbor 00224 BundleFilter filter(entry); 00225 00226 // some debug output 00227 IBRCOMMON_LOGGER_DEBUG(40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL; 00228 00229 // blacklist the neighbor itself, because this is handled by neighbor routing extension 00230 filter.blacklist(task.eid); 00231 00232 // query some unknown bundle from the storage, the list contains max. 10 items. 00233 const std::list<dtn::data::MetaBundle> list = storage.get(filter); 00234 00235 // send the bundles as long as we have resources 00236 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00237 { 00238 try { 00239 // transfer the bundle to the neighbor 00240 transferTo(entry, *iter); 00241 } catch (const NeighborDatabase::AlreadyInTransitException&) { }; 00242 } 00243 } catch (const NeighborDatabase::BloomfilterNotAvailableException&) { 00244 // query a new summary vector from this neighbor 00245 _taskqueue.push( new QuerySummaryVectorTask( (**this).getNeighborDB(), task.eid, _endpoint ) ); 00246 } 00247 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) { 00248 } catch (const NeighborDatabase::NeighborNotAvailableException&) { 00249 } catch (const std::bad_cast&) { }; 00250 00254 try { 00255 TransferCompletedTask &task = dynamic_cast<TransferCompletedTask&>(*t); 00256 00257 try { 00258 // add this bundle to the purge vector if it is delivered to its destination 00259 if (( task.peer.getNode() == task.meta.destination.getNode() ) && (task.meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON)) 00260 { 00261 IBRCOMMON_LOGGER(notice) << "singleton bundle added to purge vector: " << task.meta.toString() << IBRCOMMON_LOGGER_ENDL; 00262 00263 // add it to the purge vector 00264 _purge_vector.add(task.meta); 00265 } 00266 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { }; 00267 00268 // transfer the next bundle to this destination 00269 _taskqueue.push( new SearchNextBundleTask( task.peer ) ); 00270 } catch (const std::bad_cast&) { }; 00271 00275 try { 00276 const ProcessEpidemicBundleTask &task = dynamic_cast<ProcessEpidemicBundleTask&>(*t); 00277 processECM(task.bundle); 00278 } catch (const std::bad_cast&) { }; 00279 00280 } catch (const ibrcommon::Exception &ex) { 00281 IBRCOMMON_LOGGER_DEBUG(20) << "Exception occurred in EpidemicRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00282 } 00283 } catch (const std::exception&) { 00284 return; 00285 } 00286 00287 yield(); 00288 } 00289 } 00290 00291 /****************************************/ 00292 00293 EpidemicRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e) 00294 : eid(e) 00295 { } 00296 00297 EpidemicRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask() 00298 { } 00299 00300 std::string EpidemicRoutingExtension::SearchNextBundleTask::toString() 00301 { 00302 return "SearchNextBundleTask: " + eid.getString(); 00303 } 00304 00305 /****************************************/ 00306 00307 EpidemicRoutingExtension::ProcessEpidemicBundleTask::ProcessEpidemicBundleTask(const dtn::data::Bundle &b) 00308 : bundle(b) 00309 { } 00310 00311 EpidemicRoutingExtension::ProcessEpidemicBundleTask::~ProcessEpidemicBundleTask() 00312 { } 00313 00314 std::string EpidemicRoutingExtension::ProcessEpidemicBundleTask::toString() 00315 { 00316 return "ProcessEpidemicBundleTask: " + bundle.toString(); 00317 } 00318 00319 /****************************************/ 00320 00321 EpidemicRoutingExtension::ExpireTask::ExpireTask(const size_t t) 00322 : timestamp(t) 00323 { } 00324 00325 EpidemicRoutingExtension::ExpireTask::~ExpireTask() 00326 { } 00327 00328 std::string EpidemicRoutingExtension::ExpireTask::toString() 00329 { 00330 return "ExpireTask"; 00331 } 00332 00333 /****************************************/ 00334 00335 EpidemicRoutingExtension::TransferCompletedTask::TransferCompletedTask(const dtn::data::EID &e, const dtn::data::MetaBundle &m) 00336 : peer(e), meta(m) 00337 { } 00338 00339 EpidemicRoutingExtension::TransferCompletedTask::~TransferCompletedTask() 00340 { } 00341 00342 std::string EpidemicRoutingExtension::TransferCompletedTask::toString() 00343 { 00344 return "TransferCompletedTask"; 00345 } 00346 00347 /****************************************/ 00348 00349 EpidemicRoutingExtension::QuerySummaryVectorTask::QuerySummaryVectorTask(NeighborDatabase &db, const dtn::data::EID &o, EpidemicEndpoint &e) 00350 : ndb(db), origin(o), endpoint(e) 00351 { } 00352 00353 EpidemicRoutingExtension::QuerySummaryVectorTask::~QuerySummaryVectorTask() 00354 { } 00355 00356 void EpidemicRoutingExtension::QuerySummaryVectorTask::execute() const 00357 { 00358 try { 00359 // lock the list of neighbors 00360 ibrcommon::MutexLock l(ndb); 00361 NeighborDatabase::NeighborEntry &entry = ndb.get(origin); 00362 00363 // acquire resources to send a summary vector request 00364 entry.acquireFilterRequest(); 00365 00366 // call the query method at the epidemic endpoint instance 00367 endpoint.query(origin); 00368 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) { 00369 } catch (const NeighborDatabase::NeighborNotAvailableException&) { }; 00370 } 00371 00372 std::string EpidemicRoutingExtension::QuerySummaryVectorTask::toString() 00373 { 00374 return "QuerySummaryVectorTask: " + origin.getString(); 00375 } 00376 00377 EpidemicRoutingExtension::EpidemicEndpoint::EpidemicEndpoint(ibrcommon::Queue<EpidemicRoutingExtension::Task* > &queue, dtn::routing::BundleSummary &purge) 00378 : _taskqueue(queue), _purge_vector(purge) 00379 { 00380 AbstractWorker::initialize("/routing/epidemic", true); 00381 } 00382 00383 EpidemicRoutingExtension::EpidemicEndpoint::~EpidemicEndpoint() 00384 { 00385 } 00386 00387 void EpidemicRoutingExtension::EpidemicEndpoint::callbackBundleReceived(const Bundle &b) 00388 { 00389 _taskqueue.push( new ProcessEpidemicBundleTask(b) ); 00390 } 00391 00392 void EpidemicRoutingExtension::EpidemicEndpoint::send(const dtn::data::Bundle &b) 00393 { 00394 transmit(b); 00395 } 00396 00397 void EpidemicRoutingExtension::EpidemicEndpoint::query(const dtn::data::EID &origin) 00398 { 00399 // create a new request for the summary vector of the neighbor 00400 EpidemicControlMessage ecm; 00401 00402 // set message type 00403 ecm.type = EpidemicControlMessage::ECM_QUERY_SUMMARY_VECTOR; 00404 00405 // create a new bundle 00406 dtn::data::Bundle req; 00407 00408 // set the source of the bundle 00409 req._source = dtn::core::BundleCore::local + "/routing/epidemic"; 00410 00411 // set the destination of the bundle 00412 req.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true); 00413 req._destination = origin + "/routing/epidemic"; 00414 00415 // limit the lifetime to 60 seconds 00416 req._lifetime = 60; 00417 00418 // set high priority 00419 req.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, true); 00420 req.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true); 00421 00422 dtn::data::PayloadBlock &p = req.push_back<PayloadBlock>(); 00423 ibrcommon::BLOB::Reference ref = p.getBLOB(); 00424 00425 // serialize the request into the payload 00426 { 00427 ibrcommon::BLOB::iostream ios = ref.iostream(); 00428 (*ios) << ecm; 00429 } 00430 00431 // send the bundle 00432 transmit(req); 00433 } 00434 00435 void EpidemicRoutingExtension::processECM(const dtn::data::Bundle &bundle) 00436 { 00437 // read the ecm 00438 const dtn::data::PayloadBlock &p = bundle.getBlock<dtn::data::PayloadBlock>(); 00439 ibrcommon::BLOB::Reference ref = p.getBLOB(); 00440 EpidemicControlMessage ecm; 00441 00442 // locked within this region 00443 { 00444 ibrcommon::BLOB::iostream s = ref.iostream(); 00445 (*s) >> ecm; 00446 } 00447 00448 // if this is a request answer with an summary vector 00449 if (ecm.type == EpidemicControlMessage::ECM_QUERY_SUMMARY_VECTOR) 00450 { 00451 // create a new request for the summary vector of the neighbor 00452 EpidemicControlMessage response_ecm; 00453 00454 // set message type 00455 response_ecm.type = EpidemicControlMessage::ECM_RESPONSE; 00456 00457 // add own summary vector to the message 00458 const SummaryVector vec = (**this).getSummaryVector(); 00459 response_ecm.setSummaryVector(vec); 00460 00461 // add own purge vector to the message 00462 response_ecm.setPurgeVector(_purge_vector); 00463 00464 // create a new bundle 00465 dtn::data::Bundle answer; 00466 00467 // set the source of the bundle 00468 answer._source = dtn::core::BundleCore::local + "/routing/epidemic"; 00469 00470 // set the destination of the bundle 00471 answer.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true); 00472 answer._destination = bundle._source; 00473 00474 // limit the lifetime to 60 seconds 00475 answer._lifetime = 60; 00476 00477 // set high priority 00478 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, true); 00479 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true); 00480 00481 dtn::data::PayloadBlock &p = answer.push_back<PayloadBlock>(); 00482 ibrcommon::BLOB::Reference ref = p.getBLOB(); 00483 00484 // serialize the request into the payload 00485 { 00486 ibrcommon::BLOB::iostream ios = ref.iostream(); 00487 (*ios) << response_ecm; 00488 } 00489 00490 // transfer the bundle to the neighbor 00491 _endpoint.send(answer); 00492 } 00493 else if (ecm.type == EpidemicControlMessage::ECM_RESPONSE) 00494 { 00495 if (ecm.flags & EpidemicControlMessage::ECM_CONTAINS_SUMMARY_VECTOR) 00496 { 00497 // get the summary vector (bloomfilter) of this ECM 00498 const ibrcommon::BloomFilter &filter = ecm.getSummaryVector().getBloomFilter(); 00499 00505 try { 00506 NeighborDatabase &db = (**this).getNeighborDB(); 00507 ibrcommon::MutexLock l(db); 00508 NeighborDatabase::NeighborEntry &entry = db.get(bundle._source.getNode()); 00509 entry.update(filter, bundle._lifetime); 00510 00511 // trigger the search-for-next-bundle procedure 00512 _taskqueue.push( new SearchNextBundleTask( entry.eid ) ); 00513 } catch (const NeighborDatabase::NeighborNotAvailableException&) { }; 00514 } 00515 00516 if (ecm.flags & EpidemicControlMessage::ECM_CONTAINS_PURGE_VECTOR) 00517 { 00518 // get the purge vector (bloomfilter) of this ECM 00519 const ibrcommon::BloomFilter &purge = ecm.getPurgeVector().getBloomFilter(); 00520 00521 dtn::core::BundleStorage &storage = (**this).getStorage(); 00522 00523 try { 00524 while (true) 00525 { 00526 // delete bundles in the purge vector 00527 const dtn::data::MetaBundle meta = storage.remove(purge); 00528 00529 // log the purged bundle 00530 IBRCOMMON_LOGGER(notice) << "bundle purged: " << meta.toString() << IBRCOMMON_LOGGER_ENDL; 00531 00532 // gen a report 00533 dtn::core::BundleEvent::raise(meta, BUNDLE_DELETED, StatusReportBlock::DEPLETED_STORAGE); 00534 00535 // add this bundle to the own purge vector 00536 _purge_vector.add(meta); 00537 } 00538 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { }; 00539 } 00540 } 00541 } 00542 } 00543 }